Skip to content

Commit

Permalink
Add new config and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
codope committed Jan 20, 2025
1 parent fabb82e commit 899cd0a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.storage.StoragePath;
Expand Down Expand Up @@ -2040,15 +2041,19 @@ public boolean useBloomIndexBucketizedChecking() {
}

/**
 * Reports whether the bloom filter index of the metadata table is treated as enabled.
 *
 * NOTE(review): this span comes from a commit diff view, so the two {@code return} lines below
 * are presumably the pre-change and post-change versions of one statement rather than two
 * statements of the real method - confirm against the actual file.
 *
 * @return the value of the boolean expression in the return statement
 */
public boolean isMetadataBloomFilterIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
// NOTE(review): && binds tighter than ||, so this evaluates as
// (isMetadataTableEnabled() && isBloomFilterIndexEnabled()) || !isDropMetadataIndex(...).
// It is therefore true whenever isDropMetadataIndex(...) is false for BLOOM_FILTERS,
// regardless of the two enable flags - confirm that this is intended.
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
}

/**
 * Reports whether the column stats index of the metadata table is treated as enabled.
 *
 * NOTE(review): this span comes from a commit diff view, so the two {@code return} lines below
 * are presumably the pre-change and post-change versions of one statement rather than two
 * statements of the real method - confirm against the actual file.
 *
 * @return the value of the boolean expression in the return statement
 */
public boolean isMetadataColumnStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
// NOTE(review): && binds tighter than ||, so this evaluates as
// (isMetadataTableEnabled() && isColumnStatsIndexEnabled()) || !isDropMetadataIndex(...).
// It is therefore true whenever isDropMetadataIndex(...) is false for COLUMN_STATS,
// regardless of the two enable flags - confirm that this is intended.
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
}

/**
 * Reports whether the partition stats index of the metadata table is treated as enabled.
 *
 * NOTE(review): this span comes from a commit diff view, so the two {@code return} lines below
 * are presumably the pre-change and post-change versions of one statement rather than two
 * statements of the real method - confirm against the actual file.
 *
 * @return the value of the boolean expression in the return statement
 */
public boolean isPartitionStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled();
// NOTE(review): && binds tighter than ||, so this evaluates as
// (isMetadataTableEnabled() && isPartitionStatsIndexEnabled()) || !isDropMetadataIndex(...).
// It is therefore true whenever isDropMetadataIndex(...) is false for PARTITION_STATS,
// regardless of the two enable flags - confirm that this is intended.
return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.PARTITION_STATS.getPartitionPath());
}

/**
 * Checks whether the given index is the one the metadata config asks to drop.
 *
 * @param indexName index name to compare against the configured drop target
 * @return {@code true} only when a non-empty drop target is configured and it equals
 *         {@code indexName} exactly; {@code false} otherwise
 */
public boolean isDropMetadataIndex(String indexName) {
  final String indexToDrop = getMetadataConfig().getMetadataIndexToDrop();
  // Nothing configured (null or empty) means no index is being dropped.
  if (!StringUtils.nonEmpty(indexToDrop)) {
    return false;
  }
  return indexToDrop.equals(indexName);
}

public int getPartitionStatsIndexParallelism() {
Expand Down Expand Up @@ -2557,7 +2562,7 @@ public boolean isLogCompactionEnabledOnMetadata() {
}

/**
 * Reports whether the record index is treated as enabled.
 *
 * NOTE(review): this span comes from a commit diff view, so the two {@code return} lines below
 * are presumably the pre-change and post-change versions of one statement rather than two
 * statements of the real method - confirm against the actual file.
 *
 * @return the value of the boolean expression in the return statement
 */
public boolean isRecordIndexEnabled() {
return metadataConfig.isRecordIndexEnabled();
// NOTE(review): because of the trailing "|| !isDropMetadataIndex(...)", this is true whenever
// isDropMetadataIndex(...) is false for RECORD_INDEX, even if
// metadataConfig.isRecordIndexEnabled() is false - confirm that this is intended.
return metadataConfig.isRecordIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.RECORD_INDEX.getPartitionPath());
}

public int getRecordIndexMinFileGroupCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,9 +1097,9 @@ engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, get

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
if (dataWriteConfig.isRecordIndexEnabled() && RECORD_INDEX.isMetadataPartitionAvailable(dataMetaClient)) {
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()), commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameParser;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
Expand Down Expand Up @@ -1051,10 +1051,41 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp
if (isMetadataTable() || !config.isMetadataTableEnabled()) {
return false;
}
boolean metadataIndexDisabled = !partitionType.isMetadataPartitionAvailable(metaClient);
boolean metadataIndexDisabled = isMetadataIndexDisabled(partitionType);
return metadataIndexDisabled && metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
}

/**
 * Decides whether the index behind the given metadata partition type counts as disabled.
 *
 * <p>For COLUMN_STATS, BLOOM_FILTERS, RECORD_INDEX and PARTITION_STATS the answer is the negation
 * of the matching "enabled" check on the write config. For EXPRESSION_INDEX and SECONDARY_INDEX
 * it is the negation of {@code partitionType.isMetadataPartitionAvailable(metaClient)}.
 * Any other type is logged at debug level and reported as not disabled.
 *
 * @param partitionType metadata partition type to inspect
 * @return {@code true} if the corresponding index is considered disabled, {@code false} otherwise
 */
private boolean isMetadataIndexDisabled(MetadataPartitionType partitionType) {
  switch (partitionType) {
    // NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable.
    //       It cannot be the case that metadata is enabled but FILES is disabled.
    case COLUMN_STATS:
      return !config.isMetadataColumnStatsIndexEnabled();
    case BLOOM_FILTERS:
      return !config.isMetadataBloomFilterIndexEnabled();
    case RECORD_INDEX:
      return !config.isRecordIndexEnabled();
    // PARTITION_STATS should have same behavior as COLUMN_STATS
    case PARTITION_STATS:
      return !config.isPartitionStatsIndexEnabled();
    // Expression and Secondary index can be in different partitions for different keys,
    // and do not delete unless DROP INDEX is called.
    case EXPRESSION_INDEX:
    case SECONDARY_INDEX:
      return !partitionType.isMetadataPartitionAvailable(metaClient);
    default:
      LOG.debug("Not a valid metadata partition type: " + partitionType.name());
      return false;
  }
}

private boolean shouldExecuteMetadataTableDeletion() {
// Only execute metadata table deletion when all the following conditions are met
// (1) This is data table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.metadata.MetadataPartitionType;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/**
* Configurations used by the HUDI Metadata Table.
Expand Down Expand Up @@ -391,6 +394,16 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating secondary index.");

/**
 * Config naming the metadata index to drop. The key is {@code METADATA_PREFIX + ".index.drop"},
 * it has no default value, and it is marked as available since version 1.0.1.
 *
 * The documentation text appends the partition path of every {@link MetadataPartitionType}
 * value, joined with ", ".
 *
 * NOTE(review): the sentence "The index name either starts with or matches exactly can be one of
 * the following" reads as garbled - tidy the wording, and verify that consumers of this config
 * really support prefix ("starts with") matching before advertising it.
 */
public static final ConfigProperty<String> DROP_METADATA_INDEX = ConfigProperty
.key(METADATA_PREFIX + ".index.drop")
.noDefaultValue()
.sinceVersion("1.0.1")
.withDocumentation("Drop the specified index. "
+ "The value should be the name of the index to delete. You can check index names using `SHOW INDEXES` command. "
+ "The index name either starts with or matches exactly can be one of the following: "
+ StringUtils.join(Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()), ", "));

/**
 * Reads the value stored under {@code MAX_LOG_FILE_SIZE_BYTES_PROP} as a long.
 *
 * @return the configured value; presumably a size in bytes, going by the property name
 */
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
Expand Down Expand Up @@ -552,6 +565,10 @@ public int getSecondaryIndexParallelism() {
return getInt(SECONDARY_INDEX_PARALLELISM);
}

/**
 * Reads the value stored under {@link #DROP_METADATA_INDEX} as a string.
 *
 * @return the configured index name; since the config declares no default value this is
 *         presumably null when unset - callers should check before using it (TODO confirm)
 */
public String getMetadataIndexToDrop() {
return getString(DROP_METADATA_INDEX);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
Expand Down Expand Up @@ -760,6 +777,11 @@ public Builder withSecondaryIndexParallelism(int parallelism) {
return this;
}

/**
 * Stores the given index name under {@code DROP_METADATA_INDEX} on the config being built.
 *
 * @param indexName value to set for the drop-index config
 * @return this builder, to allow call chaining
 */
public Builder withDropMetadataIndex(String indexName) {
metadataConfig.setValue(DROP_METADATA_INDEX, indexName);
return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.hudi.common.util.{ParquetUtils, StringUtils}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
import org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions, config}
import org.apache.spark.sql._
Expand Down Expand Up @@ -162,6 +163,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
val metadataOpts3 = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false",
HoodieMetadataConfig.DROP_METADATA_INDEX.key -> COLUMN_STATS.getPartitionPath,
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7" // ignore c4,c6,c8.
)
// disable col stats
Expand Down

0 comments on commit 899cd0a

Please sign in to comment.