diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index dc4c0713f17bb..00f7b8b6a13ee 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -73,7 +73,7 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) { return new IntegerUpdater(); } else if (sparkType == DataTypes.LongType) { - return new IntegerUpdater(); + return new LongIntegerUpdater(); } else if (sparkType == DataTypes.ByteType) { return new ByteUpdater(); } else if (sparkType == DataTypes.ShortType) { @@ -246,14 +246,14 @@ public void decodeSingleDictionaryId( } } - private static class UnsignedIntegerUpdater implements ParquetVectorUpdater { + private static class LongIntegerUpdater implements ParquetVectorUpdater { @Override public void readValues( int total, int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - valuesReader.readUnsignedIntegers(total, values, offset); + valuesReader.readIntegersAsLongs(total, values, offset); } @Override @@ -266,7 +266,7 @@ public void readValue( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - values.putLong(offset, Integer.toUnsignedLong(valuesReader.readInteger())); + values.putLong(offset, valuesReader.readInteger()); } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 39591be3b4be4..4018c1c44b205 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -97,6 +97,16 @@ public void skipIntegers(int total) { in.skip(total * 4L); } + @Override + public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, buffer.getInt()); + } + } + + @Override public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 4; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index af739a52d8ed0..166784d1e669e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -311,6 +311,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } } + @Override + public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index fc4eac94d1c46..f4f5af990cb42 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -43,6 +43,7 @@ public interface VectorizedValuesReader { void readShorts(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); + void readIntegersAsLongs(int total, WritableColumnVector c, int rowId); void readUnsignedIntegers(int total, WritableColumnVector c, int rowId); void readUnsignedLongs(int total, WritableColumnVector c, int rowId); void readLongs(int total, WritableColumnVector c, int rowId);