[HUDI-8898] Support INSERT SQL statement with a subset of columns in Spark 3.5 #12692

Open
wants to merge 2 commits into base: master
Conversation

yihua (Contributor) commented Jan 23, 2025

Change Logs

This PR fixes the issue that an INSERT SQL statement with a subset of columns fails on a Hudi table in Spark 3.5. The same statement succeeds in Spark 3.4.

In Spark 3.5, the resolution rules ResolveUserSpecifiedColumns and ResolveDefaultColumns are removed (see the code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]] from apache/spark#41262). The logic of resolving the user-specified columns and default values, which is required for an INSERT with a subset of the table's columns to work properly, is deferred to PreprocessTableInsertion for v1 INSERT.

Note that because HoodieAnalysis intercepts the InsertIntoStatement after Spark's built-in resolution rules are applied, the logic of resolving the user-specified columns and default values may no longer be applied. To make INSERT with a user-specified subset of columns work, the custom resolution rule HoodieSpark35ResolveColumnsForInsertInto is added to achieve the same, before InsertIntoStatement is converted into InsertIntoHoodieTableCommand. Here's the behavior difference before and after the fix on Spark 3.5, when InsertIntoStatement is intercepted:

Before the fix, the query inside InsertIntoStatement:

LocalRelation [col1#128, col2#129, col3#130, col4#131]

After the fix, the query inside InsertIntoStatement:

Project [id#140, name#139, price#146, ts#147L, dt#137]
+- Project [null AS _hoodie_commit_time#141, null AS _hoodie_commit_seqno#142, null AS _hoodie_record_key#143, null AS _hoodie_partition_path#144, null AS _hoodie_file_name#145, id#140, name#139, null AS price#146, cast(ts#138 as bigint) AS ts#147L, dt#137]
   +- Project [col1#133 AS dt#137, col2#134 AS ts#138, col3#135 AS name#139, col4#136 AS id#140]
      +- LocalRelation [col1#133, col2#134, col3#135, col4#136]
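
Conceptually, the new rule re-projects the INSERT query into the table's column order and fills in the columns the user did not specify, producing the projections shown above. Below is a minimal sketch of that idea, not the PR's actual implementation; it omits default-value resolution, case-insensitive name matching, and the Hudi-table check:

import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Sketch only: re-project an INSERT with user-specified columns into table-column order.
object ResolveColumnsForInsertIntoSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case i: InsertIntoStatement
        if i.table.resolved && i.query.resolved && i.userSpecifiedCols.nonEmpty
          && i.table.isInstanceOf[LogicalRelation] =>
      val lr = i.table.asInstanceOf[LogicalRelation]
      // Map each user-specified column name to the corresponding query output attribute.
      val byName = i.userSpecifiedCols.zip(i.query.output).toMap
      // Project into table-column order; unspecified columns are filled with NULL
      // (the real rule also resolves DEFAULT values and handles case sensitivity).
      val projectList = lr.output.map { attr =>
        byName.get(attr.name)
          .map(expr => Alias(Cast(expr, attr.dataType), attr.name)())
          .getOrElse(Alias(Literal(null, attr.dataType), attr.name)())
      }
      // Reset userSpecifiedCols so downstream rules see a fully aligned query.
      i.copy(userSpecifiedCols = Nil, query = Project(projectList, i.query))
  }
}

The rule in this PR additionally restricts the match to Hudi tables (via sparkAdapter.isHoodieTable), as shown in the diff context further below.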

New tests are added in TestInsertTable: "Test Insert Into with subset of columns" and "Test Insert Into with subset of columns on Parquet table". The test on the Hudi table fails before the fix and passes after the fix.

Reproducing the failure in Spark 3.5 (covered by the new tests above):
Create table:

     create table $tableName (
       id int,
       dt string,
       name string,
       price double,
       ts long
     ) using hudi
     tblproperties (primaryKey = 'id')
     location '/tmp/table'

INSERT INTO with a subset of columns

         insert into $tableName (dt, ts, name, id)
         values ('2025-01-04', 4000, 'a4', 4)

It fails with

[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`h1`, the reason is not enough data columns:
Table columns: `id`, `name`, `price`, `ts`, `dt`.
Data columns: `dt`, `ts`, `name`, `id`.
org.apache.spark.sql.AnalysisException: [INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`h1`, the reason is not enough data columns:
Table columns: `id`, `name`, `price`, `ts`, `dt`.
Data columns: `dt`, `ts`, `name`, `id`.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotWriteNotEnoughColumnsToTableError(QueryCompilationErrors.scala:2126)
    at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:70)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:51)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns$(HoodieSpark3CatalystPlanUtils.scala:46)
    at org.apache.spark.sql.HoodieSpark35CatalystPlanUtils$.resolveOutputColumns(HoodieSpark35CatalystPlanUtils.scala:32)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:168)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:145)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:99)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:62) 
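
After the fix, the same statement succeeds and the unspecified column price is filled with NULL. An illustrative check (the expected row mirrors the values asserted in the new tests):

// Illustrative check only, using the table created above.
spark.sql(s"insert into $tableName (dt, ts, name, id) values ('2025-01-04', 4000, 'a4', 4)")
// Expected row after the fix: 4, a4, null, 4000, 2025-01-04
spark.sql(s"select id, name, price, ts, dt from $tableName where id = 4").show()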

Impact

Fixes INSERT SQL statement with a subset of columns in Spark 3.5

Risk level

low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Jan 23, 2025
@yihua yihua changed the title to [HUDI-8898] Support INSERT SQL statement with a subset of columns in Spark 3.5 on Jan 23, 2025
yihua (Contributor, Author) commented Jan 23, 2025

@KnightChess @jonvex could you guys help review this PR given you have expertise in Spark SQL integration?

@yihua yihua requested review from KnightChess and jonvex January 23, 2025 01:41
if table.resolved && query.resolved
&& i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
&& sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
table match {
yihua (Contributor, Author) commented:

Originally, I directly called PreprocessTableInsertion.apply(plan) to resolve the columns and add a projection for columns not present in the user-specified columns. Then I noticed that for an empty table (EmptyRelation in the old read path) or a MOR table with log files (MergeOnReadSnapshotRelation in the old read path), the plan may not get preprocessed, as PreprocessTableInsertion does not recognize Hudi relations. So I had to copy the logic and adapt it here so that INSERT INTO a Hudi table with a subset of columns specified works. But I'm wondering if I should extract the projection functionality so it's easier to maintain (that'll take more time for me to figure out). @KnightChess wdyt?

yihua (Contributor, Author) commented:

Also are these relations v2? It seems that they are mixed with v1 logic.

Contributor commented:

> But I'm wondering if I should extract the functionality of projection so it's easier to maintain

I think both approaches are fine, but I prefer having separate implementations for different versions, even if it leads to some code duplication. Even if the abstraction works for Spark 3.4 and Spark 3.5 at the moment, there might be significant changes in future versions like Spark 4.0 and Spark 4.1, making it difficult for a single logic to adapt to multiple versions. This could even result in the creation of abstractions like spark3_45Common and spark4_123Common in the future, which would further complicate things.

Contributor commented:

And with separate implementations for different versions, if full support for v2 relations is achieved in the future, this Rule can simply be removed without incurring significant refactoring costs. What do you think?

KnightChess (Contributor) commented Jan 23, 2025:

> Also are these relations v2? It seems that they are mixed with v1 logic.

Hudi is currently mostly implemented using the v1 approach, and v2 relations in writes generally fall back to v1 relations, so I think it's OK for now.

yihua (Contributor, Author) commented:

Sounds good. Then I'll keep HoodieSpark35ResolveColumnsForInsertInto as an independent resolution rule for Spark 3.5 only (Spark 3.4 does not need such logic, as the new tests have already passed on Spark 3.4 without any changes).

@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer()

if (HoodieSparkUtils.gteqSpark3_5) {
rules += (_ => instantiateKlass(
"org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
yihua (Contributor, Author) commented:

@KnightChess: If we handle all the projection and ordering of columns in the HoodieSpark35ResolveColumnsForInsertInto resolution rule, can we deprecate the projection logic in ResolveImplementationsEarly for InsertIntoStatement?

// Create a project if this is an INSERT INTO query with specified cols.
            val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
              ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
              sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
            } else {
              None
            }

in

case class ResolveImplementationsEarly() extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan match {
      // Convert to InsertIntoHoodieTableCommand
      case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
        relation match {
          // NOTE: In Spark >= 3.2, Hudi relations will be resolved as [[DataSourceV2Relation]]s by default;
          //       However, currently, fallback will be applied downgrading them to V1 relations, hence
          //       we need to check whether we could proceed here, or has to wait until fallback rule kicks in
          case lr: LogicalRelation =>
            // Create a project if this is an INSERT INTO query with specified cols.
            val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
              ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
              sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
            } else {
              None
            }
            new InsertIntoHoodieTableCommand(lr, projectByUserSpecified.getOrElse(query), partition, overwrite)
          case _ => iis
        }

Contributor commented:

Yes, if the above logic is added, then the logic here can be removed.

yihua (Contributor, Author) commented:

Regarding this one, I'm keeping the logic as it is still required by Spark 3.4. On Spark 3.5, after applying the new resolution rule HoodieSpark35ResolveColumnsForInsertInto, userSpecifiedCols is reset to empty, so the projection in ResolveImplementationsEarly is essentially not invoked any more.

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build



@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer()

if (HoodieSparkUtils.gteqSpark3_5) {
Contributor commented:

This rule should be applied after the HoodieSpark35DataSourceV2ToV1Fallback rule: by the time the INSERT INTO relation is resolved again, the v2 relation has already fallen back to a v1 relation.

yihua (Contributor, Author) commented:

Makes sense. Fixed.
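
For reference, a sketch of the registration in HoodieAnalysis.customResolutionRules with the ordering constraint from this thread spelled out as a comment (the surrounding lines mirror the diff above; this is not additional code from the PR):

// The rule must take effect after the DataSourceV2-to-V1 fallback
// (HoodieSpark35DataSourceV2ToV1Fallback), so that InsertIntoStatement.table
// is already a v1 LogicalRelation when the rule matches.
if (HoodieSparkUtils.gteqSpark3_5) {
  rules += (_ => instantiateKlass(
    "org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
}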

jonvex (Contributor) left a comment:

Looks good to me

Seq(1, "a1", 10.0, 1000, "2025-01-01"),
Seq(2, "a2", 20.0, 2000, "2025-01-02"),
Seq(3, "a3", 30.0, 3000, "2025-01-03"),
Seq(4, "a4", null, 4000, "2025-01-04")
Contributor commented:

What happens if the column is non-nullable?

yihua (Contributor, Author) commented:

That's going to fail the write, since the values for a non-nullable column cannot be null.
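
For illustration, a hypothetical table (not part of the PR's tests) where the omitted column is declared NOT NULL; filling it with NULL is expected to fail the write:

// Hypothetical schema: 'name' is non-nullable.
spark.sql(
  """create table t_subset_nonnull (
    |  id int,
    |  name string not null,
    |  price double,
    |  ts long
    |) using hudi
    |tblproperties (primaryKey = 'id')""".stripMargin)

// 'name' is omitted from the column list, so it would be filled with NULL;
// the write is expected to fail because NULL cannot be written to a non-nullable column.
spark.sql("insert into t_subset_nonnull (id, ts) values (1, 1000)")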
