DRILL-8495: Tried to remove unmanaged buffer (#2913)
rymarm authored and jnturton committed May 17, 2024
Parent: 13b494a · Commit: f0b1d26
Showing 5 changed files with 56 additions and 13 deletions.

File: HiveDefaultRecordReader.java
@@ -168,7 +168,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
   protected boolean empty;

   /**
-   * Buffer used for population of partition vectors and to fill in data into vectors via writers
+   * Buffer used for population of partition vectors
    */
   private final DrillBuf drillBuf;

@@ -238,7 +238,7 @@ public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition par
     this.proxyUserGroupInfo = proxyUgi;
     this.empty = inputSplits == null || inputSplits.isEmpty();
     this.inputSplitsIterator = empty ? Collections.emptyIterator() : inputSplits.iterator();
-    this.drillBuf = context.getManagedBuffer().reallocIfNeeded(256);
+    this.drillBuf = context.getManagedBuffer();
     this.partitionVectors = new ValueVector[0];
     this.partitionValues = new Object[0];
     setColumns(projectedColumns);
@@ -333,7 +333,7 @@ private Callable<Void> getInitTask(OutputMutator output) {
     this.selectedStructFieldRefs = new StructField[selectedColumnNames.size()];
     this.columnValueWriters = new HiveValueWriter[selectedColumnNames.size()];
     this.outputWriter = new VectorContainerWriter(output, /*enabled union*/ false);
-    HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(drillBuf, outputWriter.getWriter());
+    HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(fragmentContext.getManagedBufferManager(), outputWriter.getWriter());
     for (int refIdx = 0; refIdx < selectedStructFieldRefs.length; refIdx++) {
       String columnName = selectedColumnNames.get(refIdx);
       StructField fieldRef = finalObjInspector.getStructFieldRef(columnName);
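
Note on the constructor change above: getManagedBuffer() already returns a small (256 bytes by default) buffer tracked by the fragment's BufferManager, so the chained reallocIfNeeded(256) was effectively a no-op and is dropped. The sketch below is illustrative only, not part of this commit; DrillBuf.reallocIfNeeded and BufferManager.getManagedBuffer are the APIs involved, while the helper class is an assumption showing the growth pattern writers rely on instead:

    import io.netty.buffer.DrillBuf;
    import org.apache.drill.exec.ops.BufferManager;

    // Illustrative helper: grow a managed buffer before copying data in.
    class ManagedCopy {
      private DrillBuf buf;

      ManagedCopy(BufferManager bufferManager) {
        // Tracked by the manager and released with the fragment, not here.
        this.buf = bufferManager.getManagedBuffer();
      }

      void copy(byte[] bytes) {
        // reallocIfNeeded may replace the buffer inside the BufferManager,
        // so the returned reference must be kept; a stale reference fails
        // later with "Tried to remove unmanaged buffer" (DRILL-8495).
        buf = buf.reallocIfNeeded(bytes.length);
        buf.setBytes(0, bytes);
      }
    }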

File: HiveValueWriterFactory.java
@@ -21,8 +21,8 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;

-import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.BufferManager;
 import org.apache.drill.exec.store.hive.writers.complex.HiveListWriter;
 import org.apache.drill.exec.store.hive.writers.complex.HiveMapWriter;
 import org.apache.drill.exec.store.hive.writers.complex.HiveStructWriter;
@@ -97,18 +97,18 @@ public final class HiveValueWriterFactory {
   private static final Logger logger = LoggerFactory.getLogger(HiveValueWriterFactory.class);

   /**
-   * Buffer shared across created Hive writers. May be used by writer for reading data
-   * to buffer then from buffer to vector.
+   * Buffer manager used to create buffers for Hive writers for reading data
+   * to buffer then from buffer to vector if needed.
    */
-  private final DrillBuf drillBuf;
+  private final BufferManager bufferManager;

   /**
    * Used to manage and create column writers.
    */
   private final SingleMapWriter rootWriter;

-  public HiveValueWriterFactory(DrillBuf drillBuf, SingleMapWriter rootWriter) {
-    this.drillBuf = drillBuf;
+  public HiveValueWriterFactory(BufferManager bufferManager, SingleMapWriter rootWriter) {
+    this.bufferManager = bufferManager;
     this.rootWriter = rootWriter;
   }

@@ -200,7 +200,7 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case BINARY: {
         VarBinaryWriter writer = extractWriter(name, parentWriter,
             MapWriter::varBinary, ListWriter::varBinary, UnionVectorWriter::varBinary);
-        return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, drillBuf);
+        return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case BOOLEAN: {
         BitWriter writer = extractWriter(name, parentWriter,
@@ -240,12 +240,12 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case STRING: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveStringWriter((StringObjectInspector) inspector, writer, drillBuf);
+        return new HiveStringWriter((StringObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case VARCHAR: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, drillBuf);
+        return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case TIMESTAMP: {
         TimeStampWriter writer = extractWriter(name, parentWriter,
@@ -260,7 +260,7 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case CHAR: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, drillBuf);
+        return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case DECIMAL: {
         DecimalTypeInfo decimalType = (DecimalTypeInfo) typeInfo;
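
The factory now hands each variable-length writer its own buffer from bufferManager.getManagedBuffer() rather than one shared DrillBuf. With a shared buffer, the first writer's reallocIfNeeded replaced the buffer inside the BufferManager while the remaining writers kept stale references, and their next realloc raised the "Tried to remove unmanaged buffer" error of DRILL-8495. A minimal sketch of how such a writer consumes its private buffer (an approximation of a writer like HiveBinaryWriter; the class and field names are assumptions, but VarBinaryWriter.writeVarBinary and DrillBuf.reallocIfNeeded are real Drill APIs):

    import io.netty.buffer.DrillBuf;
    import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;

    // Approximation of a per-writer value writer such as HiveBinaryWriter.
    class BinaryValueWriterSketch {
      private final VarBinaryWriter writer;
      private DrillBuf drillBuf; // owned by this writer, never shared

      BinaryValueWriterSketch(VarBinaryWriter writer, DrillBuf drillBuf) {
        this.writer = writer;
        this.drillBuf = drillBuf; // from bufferManager.getManagedBuffer()
      }

      void write(byte[] bytes) {
        drillBuf = drillBuf.reallocIfNeeded(bytes.length); // grow past 256 bytes if needed
        drillBuf.setBytes(0, bytes);
        writer.writeVarBinary(0, bytes.length, drillBuf);  // copy into the value vector
      }
    }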

File: TestHiveStorage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.hive;

+import static org.apache.drill.shaded.guava.com.google.common.base.Strings.repeat;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -462,6 +463,24 @@ public void testTableWithEmptyParquet() throws Exception {
       .go();
   }

+  @Test // see DRILL-8495
+  public void testReadingHiveDataBiggerThan256Bytes() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from hive.`256_bytes_plus_table`")
+        .unOrdered()
+        .baselineColumns(
+            "char_col",
+            "varchar_col",
+            "binary_col",
+            "string_col")
+        .baselineValues(
+            repeat("A", 255),
+            repeat("B", 1200),
+            repeat("C", 320).getBytes(),
+            repeat("D", 2200))
+        .go();
+  }
+
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) {
     for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) {
       assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName()));

File: TestInfoSchemaOnHiveStorage.java
@@ -56,6 +56,7 @@ public void showTablesFromDb() throws Exception{
       .baselineValues("hive.default", "hive_view_m")
       .baselineValues("hive.default", "view_over_hive_view")
       .baselineValues("hive.default", "table_with_empty_parquet")
+      .baselineValues("hive.default", "256_bytes_plus_table")
       .go();

     testBuilder()
@@ -268,6 +269,7 @@ public void showInfoSchema() throws Exception {
       .baselineValues("DRILL", "hive.default", "hive_view_m", "TABLE")
       .baselineValues("DRILL", "hive.default", "view_over_hive_view", "VIEW")
       .baselineValues("DRILL", "hive.default", "table_with_empty_parquet", "TABLE")
+      .baselineValues("DRILL", "hive.default", "256_bytes_plus_table", "TABLE")
       .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
       .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
       .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")

File: HiveTestDataGenerator.java
@@ -23,6 +23,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;

+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.hive.HiveTestUtilities;
@@ -99,6 +100,9 @@ private void generateDataInternal(Driver hiveDriver) throws Exception {
       FileUtils.forceDelete(emptyTableLocation);
     }

+    // generate table with variable length columns and populate it with different size data
+    generateTableWithVariableLengthColumns(hiveDriver);
+
     // create a Hive table that has columns with data types which are supported for reading in Drill.
     testDataFile = generateAllTypesDataFile();
     executeQuery(hiveDriver,
@@ -609,4 +613,22 @@ private String generateTestDataWithHeadersAndFooters(String tableName, int rowCo

     return sb.toString();
   }
+
+  private void generateTableWithVariableLengthColumns(Driver hiveDriver) {
+    executeQuery(hiveDriver, "CREATE TABLE IF NOT EXISTS 256_bytes_plus_table (" +
+        " char_col CHAR(255)," +
+        " varchar_col VARCHAR(1500)," +
+        " binary_col BINARY," +
+        " string_col STRING" +
+        ")");
+
+    String insertQuery = String.format("INSERT INTO 256_bytes_plus_table VALUES\n" +
+        " ('%s', '%s', '%s', '%s')",
+        Strings.repeat("A", 255),
+        Strings.repeat("B", 1200),
+        Strings.repeat("C", 320),
+        Strings.repeat("D", 2200));
+
+    executeQuery(hiveDriver, insertQuery);
+  }
 }
