Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-3548] Optimize IO path Python interface for SystemDS #2189

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion scripts/perftest/python/io/load_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
[
"from systemds.script_building.script import DMLScript",
"import numpy as np",
"array = np.loadtxt(src, delimiter=',')",
"import os",
"if os.path.isdir(src):",
" files = [os.path.join(src, f) for f in os.listdir(src)]",
" array = np.concatenate([np.loadtxt(f, delimiter=',') for f in files])",
"else:",
" array = np.loadtxt(src, delimiter=',')",
"if dtype is not None:",
" array = array.astype(dtype)",
]
Expand Down
7 changes: 6 additions & 1 deletion scripts/perftest/python/io/load_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
[
"from systemds.script_building.script import DMLScript",
"import pandas as pd",
"df = pd.read_csv(src, header=None)",
"import os",
"if os.path.isdir(src):",
" files = [os.path.join(src, f) for f in os.listdir(src)]",
" df = pd.concat([pd.read_csv(f, header=None) for f in files])",
"else:",
" df = pd.read_csv(src, header=None)",
"if dtype is not None:",
" df = df.astype(dtype)",
]
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ public void reset() {
reset(0, true);
}

public void setRow(int c, Object[] row) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add javadocs for these methods.

for (int i = 0; i < row.length; i++) {
set(c, i, row[i]);
}
}

/**
* Append a row to the end of the data frame, where all row fields are boxed objects according to the schema.
*
Expand Down Expand Up @@ -753,6 +759,41 @@ else if(column != null && column.size() != _nRow)
_msize = -1;
}

/**
 * Appends a chunk of data to the end of column {@code c}. The first chunk is
 * taken by reference (no copy); subsequent chunks are concatenated via
 * {@link ArrayFactory#append}.
 *
 * NOTE(review): {@code _nRow} is set/incremented on every call, so appending
 * chunks to more than one column would over-count rows -- confirm this is
 * only used in a one-column-at-a-time (or single-column) ingestion path.
 *
 * @param c     column index to append to
 * @param chunk column data to append; not copied when it becomes the first
 *              chunk of the column
 */
public void appendColumnChunk(int c, Array<?> chunk) {
	if (_coldata == null) {
		_coldata = new Array[getNumColumns()];
	}

	if (_coldata[c] == null) {
		// first chunk: adopt the array as-is and initialize the row count
		_coldata[c] = chunk;
		_nRow = chunk.size();
	} else {
		// later chunks: concatenate and grow the row count
		_coldata[c] = ArrayFactory.append(_coldata[c], chunk);
		_nRow += chunk.size();
	}

	// invalidate the cached in-memory size estimate
	_msize = -1;
}

/**
 * Writes a chunk of data into column {@code colIndex} starting at row
 * {@code offset}, allocating the destination column on demand.
 *
 * NOTE(review): {@code colSize} only takes effect when {@code _coldata} is
 * still null (it seeds {@code _nRow}); when columns were allocated earlier,
 * the destination is sized from the pre-existing {@code _nRow}. Confirm that
 * callers establish the total row count before the first chunked write,
 * otherwise later chunks with larger offsets would overflow the allocation.
 *
 * @param colIndex column index to write into
 * @param chunk    column data to copy; its value type must match the
 *                 destination column
 * @param offset   destination row offset of the first element of the chunk
 * @param colSize  total number of rows, used only for the initial allocation
 */
public void setColumnChunk(int colIndex, Array<?> chunk, int offset, int colSize) {
	if (_coldata == null) {
		_coldata = new Array[getNumColumns()];
		_nRow = colSize;
	}

	if (_coldata[colIndex] == null) {
		// lazily allocate the full destination column for this value type
		_coldata[colIndex] = ArrayFactory.allocate(chunk.getValueType(), _nRow);
	}

	if (_coldata[colIndex].getValueType() != chunk.getValueType()) {
		throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " +
			_coldata[colIndex].getValueType() + " but got " + chunk.getValueType());
	}

	// copy chunk into [offset, offset + chunk.size() - 1] of the column
	ArrayFactory.set(_coldata[colIndex], chunk, offset, offset + chunk.size() - 1, _nRow);
}


@Override
public void write(DataOutput out) throws IOException {
final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault();
Expand Down
114 changes: 60 additions & 54 deletions src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,75 +128,81 @@ public static Array<?> convert(byte[] data, int numElements, Types.ValueType val
buffer.order(ByteOrder.LITTLE_ENDIAN);

Array<?> array = ArrayFactory.allocate(valueType, numElements);
readBufferIntoArray(buffer, array, valueType, numElements);

// Process the data based on the value type
switch(valueType) {
case UINT8:
for(int i = 0; i < numElements; i++) {
return array;
}

// Row conversion is currently only supported when all columns share the same
// datatype; this placeholder simply decodes the payload once and boxes every
// element into an Object[].
public static Object[] convertRow(byte[] data, int numElements, Types.ValueType valueType) {
	final Array<?> decoded = convert(data, numElements, valueType);

	final Object[] result = new Object[numElements];
	int pos = 0;
	while(pos < numElements) {
		result[pos] = decoded.get(pos);
		pos++;
	}
	return result;
}

// Decodes several same-length columns from one fused little-endian byte
// payload, one Array per entry in valueTypes, consuming the buffer in order.
public static Array<?>[] convertFused(byte[] data, int numElements, Types.ValueType[] valueTypes) {
	final ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);

	final Array<?>[] result = new Array<?>[valueTypes.length];
	for(int op = 0; op < result.length; op++) {
		final Array<?> target = ArrayFactory.allocate(valueTypes[op], numElements);
		readBufferIntoArray(buffer, target, valueTypes[op], numElements);
		result[op] = target;
	}

	return result;
}

private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types.ValueType valueType, int numElements) {
for (int i = 0; i < numElements; i++) {
switch (valueType) {
case UINT8:
array.set(i, (int) (buffer.get() & 0xFF));
}
break;
case INT32:
for(int i = 0; i < numElements; i++) {
array.set(i, buffer.getInt());
}
break;
case INT64:
for(int i = 0; i < numElements; i++) {
array.set(i, buffer.getLong());
}
break;
case FP32:
for(int i = 0; i < numElements; i++) {
break;
case INT32:
case HASH32:
array.set(i, buffer.getInt());
break;
case INT64:
case HASH64:
array.set(i, buffer.getLong());
break;
case FP32:
array.set(i, buffer.getFloat());
}
break;
case FP64:
for(int i = 0; i < numElements; i++) {
break;
case FP64:
array.set(i, buffer.getDouble());
}
break;
case BOOLEAN:
for(int i = 0; i < numElements; i++) {
break;
case BOOLEAN:
if (array instanceof BooleanArray) {
((BooleanArray) array).set(i, buffer.get() != 0);
} else if (array instanceof BitSetArray) {
((BitSetArray) array).set(i, buffer.get() != 0);
} else {
throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
}
}
break;
case STRING:
for(int i = 0; i < numElements; i++) {
buffer.order(ByteOrder.BIG_ENDIAN);
int strLen = buffer.getInt();
buffer.order(ByteOrder.LITTLE_ENDIAN);
byte[] strBytes = new byte[strLen];
break;
case STRING:
int strLength = buffer.getInt();
byte[] strBytes = new byte[strLength];
buffer.get(strBytes);
array.set(i, new String(strBytes, StandardCharsets.UTF_8));
}
break;
case CHARACTER:
for(int i = 0; i < numElements; i++) {
break;
case CHARACTER:
array.set(i, buffer.getChar());
}
break;
case HASH32:
for(int i = 0; i < numElements; i++) {
array.set(i, buffer.getInt());
}
break;
case HASH64:
for(int i = 0; i < numElements; i++) {
array.set(i, buffer.getLong());
}
break;
default:
throw new DMLRuntimeException("Unsupported value type: " + valueType);
break;
default:
throw new DMLRuntimeException("Unsupported value type: " + valueType);
}
}

return array;
}

public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
Expand Down
121 changes: 72 additions & 49 deletions src/main/python/systemds/utils/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
else:
arr = np_arr.ravel().astype(np.float64)
value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64
buf = bytearray(arr.tobytes())
buf = arr.tobytes()

# Send data to java.
try:
Expand All @@ -82,31 +82,36 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
)


def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"):
    """Converts a given pandas column or row and stores it in a FrameBlock.

    String/object series are serialized as a stream of
    (little-endian uint32 length, UTF-8 bytes) pairs, matching the decoder in
    Py4jConverterUtils; all other dtypes are sent as their raw numpy buffer.

    :param jvm: The JVMView of the current SystemDS context.
    :param fb: The FrameBlock to add the converted data to.
    :param idx: The current column/row index.
    :param num_elements: The number of rows/columns in the pandas DataFrame.
    :param value_type: The ValueType of the column/row.
    :param pd_series: The pandas column or row to convert.
    :param conversion: The type of conversion to perform, "column" or "row".
    """
    if pd_series.dtype == "string" or pd_series.dtype == "object":
        byte_data = bytearray()
        for value in pd_series.astype(str):
            encoded_value = value.encode("utf-8")
            # little-endian length prefix, matching the Java-side decoder
            byte_data.extend(struct.pack("<I", len(encoded_value)))
            byte_data.extend(encoded_value)
    else:
        # NOTE(review): fillna("") on a numeric series may upcast or raise in
        # newer pandas -- confirm the intended NaN handling for non-string dtypes
        byte_data = pd_series.fillna("").to_numpy().tobytes()

    utils = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils
    if conversion == "column":
        converted_array = utils.convert(byte_data, num_elements, value_type)
        fb.setColumn(idx, converted_array)
    elif conversion == "row":
        converted_array = utils.convertRow(byte_data, num_elements, value_type)
        fb.setRow(idx, converted_array)


def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
Expand All @@ -121,58 +126,76 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):

jvm: JVMView = sds.java_gateway.jvm
java_gate: JavaGateway = sds.java_gateway
jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType

# pandas type mapping to systemds Valuetypes
data_type_mapping = {
np.dtype(np.object_): jvm.org.apache.sysds.common.Types.ValueType.STRING,
np.dtype(np.int64): jvm.org.apache.sysds.common.Types.ValueType.INT64,
np.dtype(np.float64): jvm.org.apache.sysds.common.Types.ValueType.FP64,
np.dtype(np.bool_): jvm.org.apache.sysds.common.Types.ValueType.BOOLEAN,
np.dtype("<M8[ns]"): jvm.org.apache.sysds.common.Types.ValueType.STRING,
np.dtype(np.int32): jvm.org.apache.sysds.common.Types.ValueType.INT32,
np.dtype(np.float32): jvm.org.apache.sysds.common.Types.ValueType.FP32,
np.dtype(np.uint8): jvm.org.apache.sysds.common.Types.ValueType.UINT8,
np.dtype(np.str_): jvm.org.apache.sysds.common.Types.ValueType.CHARACTER,
"object": jc_ValueType.STRING,
"int64": jc_ValueType.INT64,
"float64": jc_ValueType.FP64,
"bool": jc_ValueType.BOOLEAN,
"string": jc_ValueType.STRING,
"int32": jc_ValueType.INT32,
"float32": jc_ValueType.FP32,
"uint8": jc_ValueType.UINT8,
}
schema = []
col_names = []

for col_name, dtype in dict(pd_df.dtypes).items():
# schema and j_valueTypeArray are essentially doubled but accessing a Java array is costly,
# while also being necessary for FrameBlock, so we create one for Python and one for Java.
col_names = []
schema = []
j_valueTypeArray = java_gate.new_array(jc_ValueType, cols)
j_colNameArray = java_gate.new_array(jvm.java.lang.String, cols)
for i, (col_name, dtype) in enumerate(dict(pd_df.dtypes).items()):
j_colNameArray[i] = str(col_name)
col_names.append(col_name)
if dtype in data_type_mapping.keys():
schema.append(data_type_mapping[dtype])
type_key = str(dtype)
if type_key in data_type_mapping:
schema.append(data_type_mapping[type_key])
j_valueTypeArray[i] = data_type_mapping[type_key]
else:
schema.append(jvm.org.apache.sysds.common.Types.ValueType.STRING)
schema.append(jc_ValueType.STRING)
j_valueTypeArray[i] = jc_ValueType.STRING

try:
jc_ValueType = jvm.org.apache.sysds.common.Types.ValueType
jc_String = jvm.java.lang.String
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))

# execution speed increases with optimized code when the number of rows exceeds 4
if rows > 4:
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
# Row conversion if more columns than rows and all columns have the same type, otherwise column
conversion_type = "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column"
if conversion_type == "row":
pd_df = pd_df.transpose()
col_names = pd_df.columns.tolist() # re-calculate col names

fb = jc_FrameBlock(j_valueTypeArray, rows)
fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows if conversion_type == "column" else None)
if conversion_type == "row":
fb.ensureAllocatedColumns(rows)

# We use .submit() with explicit .result() calling to properly propagate exceptions
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
lambda j, col_name: convert_column(
jvm, rows, j, schema[j], pd_df[col_name], fb, col_name
),
range(len(col_names)),
col_names,
)
futures = [
executor.submit(
convert,
jvm,
fb,
i,
rows if conversion_type == "column" else cols,
schema[i],
pd_df[col_name],
conversion_type,
)
for i, col_name in enumerate(col_names)
]

for future in concurrent.futures.as_completed(futures):
future.result()

return fb
else:
j_dataArray = java_gate.new_array(jc_String, rows, cols)
j_colNameArray = java_gate.new_array(jc_String, len(col_names))

for j, col_name in enumerate(col_names):
j_valueTypeArray[j] = schema[j]
j_colNameArray[j] = str(col_names[j])
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)

for i in range(col_data.shape[0]):
Expand Down
Loading