Skip to content

Commit

Permalink
Temporary fix for SPARK-36990 (#43)
Browse files Browse the repository at this point in the history
* Temporary fix for SPARK-36990
As we cannot differentiate between uint32 and int32, use LongIntegerUpdater by default, which converts an int32 to a long using signed conversion.
  • Loading branch information
catalinii authored Oct 14, 2021
1 parent dc38be5 commit bbf7a91
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
return new IntegerUpdater();
} else if (sparkType == DataTypes.LongType) {
return new IntegerUpdater();
return new LongIntegerUpdater();
} else if (sparkType == DataTypes.ByteType) {
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
Expand Down Expand Up @@ -246,14 +246,14 @@ public void decodeSingleDictionaryId(
}
}

private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
private static class LongIntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
valuesReader.readUnsignedIntegers(total, values, offset);
valuesReader.readIntegersAsLongs(total, values, offset);
}

@Override
Expand All @@ -266,7 +266,7 @@ public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putLong(offset, Integer.toUnsignedLong(valuesReader.readInteger()));
values.putLong(offset, valuesReader.readInteger());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public void skipIntegers(int total) {
in.skip(total * 4L);
}

/**
 * Reads {@code total} consecutive 32-bit integers and writes each one as a long into
 * {@code c}, filling rows {@code rowId} through {@code rowId + total - 1}.
 *
 * <p>Every value is obtained with {@link ByteBuffer#getInt()} and handed directly to
 * {@code putLong}; the int is widened to long at that call, i.e. using the signed
 * conversion, so negative ints stay negative.
 *
 * @param total number of integers to read; nothing is written when it is not positive
 * @param c     destination column vector
 * @param rowId index of the first row to write
 */
@Override
public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
  // Each encoded value occupies four bytes.
  ByteBuffer source = getBuffer(total * 4);
  int row = rowId;
  for (int remaining = total; remaining > 0; remaining--) {
    c.putLong(row, source.getInt());
    row++;
  }
}


@Override
public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
}
}

/**
 * Not supported by this reader; every call fails.
 *
 * @param total ignored
 * @param c     ignored
 * @param rowId ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
  final String reason = "only readInts is valid.";
  throw new UnsupportedOperationException(reason);
}

@Override
public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public interface VectorizedValuesReader {
void readShorts(int total, WritableColumnVector c, int rowId);
void readIntegers(int total, WritableColumnVector c, int rowId);
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readIntegersAsLongs(int total, WritableColumnVector c, int rowId);
void readUnsignedIntegers(int total, WritableColumnVector c, int rowId);
void readUnsignedLongs(int total, WritableColumnVector c, int rowId);
void readLongs(int total, WritableColumnVector c, int rowId);
Expand Down

0 comments on commit bbf7a91

Please sign in to comment.