Skip to content

Commit

Permalink
Spark Thrift Server Fixes by rmenon
Browse files Browse the repository at this point in the history
  • Loading branch information
Catalin Toda committed Oct 8, 2021
1 parent 3fc8380 commit c94d53a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>2.3.9</hive.version>
<hive23.version>2.3.9</hive23.version>
<hive.version>2.3.6.41</hive.version>
<hive23.version>2.3.6.41</hive23.version>
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
Expand Down Expand Up @@ -1803,7 +1803,7 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
<classifier>${hive.classifier}</classifier>
<version>${hive.version}</version>
<version>2.3.6.41</version>
<scope>${hive.deps.scope}</scope>
<exclusions>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ public class ColumnBasedSet implements RowSet {
public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class);

public ColumnBasedSet(TableSchema schema) {
descriptors = schema.toTypeDescriptors();
columns = new ArrayList<ColumnBuffer>();
for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
columns.add(new ColumnBuffer(colDesc.getType()));
if (schema == null) {
descriptors = new TypeDescriptor[0];
columns = new ArrayList<ColumnBuffer>();
} else {
descriptors = schema.toTypeDescriptors();
columns = new ArrayList<ColumnBuffer>();
for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
columns.add(new ColumnBuffer(colDesc.getType()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
public class HiveCommandOperation extends ExecuteStatementOperation {
private CommandProcessor commandProcessor;
private TableSchema resultSchema = null;
private int readRows = 0;

/**
* For processors other than Hive queries (Driver), they output to session.out (a temp file)
Expand Down Expand Up @@ -156,10 +157,11 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H
}
List<String> rows = readResults((int) maxRows);
RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);

rowSet.setStartOffset(readRows);
for (String row : rows) {
rowSet.addRow(new String[] {row});
}
readRows += rows.size();
return rowSet;
}

Expand Down Expand Up @@ -210,5 +212,6 @@ private void resetResultReader() {
ServiceUtils.cleanup(LOG, resultReader);
resultReader = null;
}
readRows = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle,
public RowSet getOperationLogRowSet(OperationHandle opHandle,
FetchOrientation orientation, long maxRows)
throws HiveSQLException {
TableSchema tableSchema = new TableSchema(getLogSchema());
RowSet rowSet = RowSetFactory.create(tableSchema,
getOperation(opHandle).getProtocolVersion(), false);

// get the OperationLog object from the operation
OperationLog operationLog = getOperation(opHandle).getOperationLog();
if (operationLog == null) {
Expand All @@ -256,17 +260,14 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle,

// read logs
List<String> logs;
rowSet.setStartOffset(operationLog.getStartPosition(isFetchFirst(orientation)));
try {
logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
} catch (SQLException e) {
throw new HiveSQLException(e.getMessage(), e.getCause());
}


// convert logs to RowSet
TableSchema tableSchema = new TableSchema(getLogSchema());
RowSet rowSet = RowSetFactory.create(tableSchema,
getOperation(opHandle).getProtocolVersion(), false);
for (String log : logs) {
rowSet.addRow(new String[] {log});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,11 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H
fetchStarted = true;
driver.setMaxRows((int) maxRows);
if (driver.getResults(convey)) {
return decode(convey, rowSet);
decode(convey, rowSet);
}
long startRowOffset = driver.getStartRowOffset();
rowSet.setStartOffset(startRowOffset);
driver.setStartRowOffset(startRowOffset + rowSet.numRows());
return rowSet;
} catch (IOException e) {
throw new HiveSQLException(e);
Expand Down

0 comments on commit c94d53a

Please sign in to comment.