Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5769] Ensure partitions created by async indexer are not deleted by regular writers #12662

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.storage.StoragePath;
Expand Down Expand Up @@ -2054,16 +2055,80 @@ public boolean useBloomIndexBucketizedChecking() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}

/**
* Determines if the metadata bloom filter index is enabled.
*
* <p>The bloom filter index is enabled if:
* <ul>
* <li>The metadata table is enabled and bloom filter index is enabled in the metadata configuration, or</li>
* <li>The bloom filter index is not explicitly marked for dropping in the metadata configuration.</li>
* </ul>
*
* @return {@code true} if the metadata bloom filter index is enabled, {@code false} otherwise.
*/
public boolean isMetadataBloomFilterIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
codope marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Determines if the metadata column stats index is enabled.
*
* <p>The column stats index is enabled if:
* <ul>
* <li>The metadata table is enabled and column stats index is enabled in the metadata configuration, or</li>
* <li>The column stats index is not explicitly marked for dropping in the metadata configuration.</li>
* </ul>
*
* @return {@code true} if the metadata column stats index is enabled, {@code false} otherwise.
*/
public boolean isMetadataColumnStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
}

/**
* Determines if the partition stats index is enabled.
*
* <p>The partition stats index is enabled if:
* <ul>
* <li>The metadata table is enabled and partition stats index is enabled in the metadata configuration, or</li>
* <li>The partition stats index is not explicitly marked for dropping in the metadata configuration.</li>
* </ul>
*
* @return {@code true} if the partition stats index is enabled, {@code false} otherwise.
*/
public boolean isPartitionStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled();
return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled() || !isDropMetadataIndex(MetadataPartitionType.PARTITION_STATS.getPartitionPath());
}

/**
 * Reports whether the record index is treated as enabled for this write config.
 *
 * <p>The result is {@code true} as soon as either of the following holds:
 * <ul>
 *   <li>the metadata configuration has the record index switched on, or</li>
 *   <li>the record index partition is not the index named for dropping
 *       (see {@link #isDropMetadataIndex(String)}).</li>
 * </ul>
 *
 * <p>NOTE(review): when no index is named for dropping, the second condition alone yields
 * {@code true}, even if the enable flag is off. Confirm this is intended for every caller.
 *
 * @return {@code true} if the record index is considered enabled, {@code false} otherwise.
 */
public boolean isRecordIndexEnabled() {
  if (metadataConfig.isRecordIndexEnabled()) {
    return true;
  }
  // Not explicitly enabled: still reported as enabled unless this index is the one marked for dropping.
  String recordIndexPartition = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
  return !isDropMetadataIndex(recordIndexPartition);
}

/**
 * Tells whether the given metadata index is the one the metadata configuration asks to drop.
 *
 * <p>The answer is {@code true} only when the configured index-to-drop value is non-empty
 * and is exactly equal to {@code indexName}; in every other case it is {@code false}.
 *
 * @param indexName the name of the metadata index to check
 * @return {@code true} if the specified metadata index is marked for dropping, {@code false} otherwise.
 */
public boolean isDropMetadataIndex(String indexName) {
  String indexToDrop = getMetadataConfig().getMetadataIndexToDrop();
  if (!StringUtils.nonEmpty(indexToDrop)) {
    // Nothing is configured for dropping.
    return false;
  }
  return indexToDrop.equals(indexName);
}

public int getPartitionStatsIndexParallelism() {
Expand Down Expand Up @@ -2575,10 +2640,6 @@ public boolean isLogCompactionEnabledOnMetadata() {
return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}

/**
 * Returns whether the record index is enabled, exactly as reported by the metadata configuration.
 *
 * @return the result of {@code metadataConfig.isRecordIndexEnabled()}.
 */
public boolean isRecordIndexEnabled() {
return metadataConfig.isRecordIndexEnabled();
}

/**
 * Returns the record index minimum file group count by delegating to the metadata configuration.
 *
 * @return the result of {@code metadataConfig.getRecordIndexMinFileGroupCount()}.
 */
public int getRecordIndexMinFileGroupCount() {
return metadataConfig.getRecordIndexMinFileGroupCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,9 +1096,9 @@ engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, get

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
if (getMetadataPartitionsToUpdate().contains(RECORD_INDEX.getPartitionPath())) {
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()), commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameParser;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
Expand Down Expand Up @@ -1045,12 +1045,17 @@ public void deleteMetadataIndexIfNecessary() {
/**
 * Decides whether the given metadata table partition should be deleted.
 *
 * <p>Deletion is requested only when all of the following hold:
 * <ol>
 *   <li>this table is not itself a metadata table and the metadata table is enabled in the write config;</li>
 *   <li>{@code isMetadataIndexDisabled} reports the index for this partition type as disabled;</li>
 *   <li>the metadata partitions recorded in the table config contain this partition's path.</li>
 * </ol>
 * NOTE: Inflight metadata partitions are not considered as they could have been inflight due to async indexer.
 *
 * @param partitionType the metadata partition type to evaluate
 * @return {@code true} if the partition should be deleted, {@code false} otherwise.
 */
private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionType) {
  final boolean dataTableWithMetadataEnabled = !isMetadataTable() && config.isMetadataTableEnabled();
  if (!dataTableWithMetadataEnabled) {
    return false;
  }
  if (!isMetadataIndexDisabled(partitionType)) {
    return false;
  }
  return metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
}

private boolean isMetadataIndexDisabled(MetadataPartitionType partitionType) {
boolean metadataIndexDisabled;
switch (partitionType) {
// NOTE: FILES partition type is always considered in sync with hoodie.metadata.enable.
Expand All @@ -1064,12 +1069,21 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp
case RECORD_INDEX:
metadataIndexDisabled = !config.isRecordIndexEnabled();
break;
// PARTITION_STATS should have same behavior as COLUMN_STATS
case PARTITION_STATS:
metadataIndexDisabled = !config.isPartitionStatsIndexEnabled();
codope marked this conversation as resolved.
Show resolved Hide resolved
break;
// Expression and Secondary index can be in different partitions for different keys,
// and do not delete unless DROP INDEX is called.
case EXPRESSION_INDEX:
case SECONDARY_INDEX:
metadataIndexDisabled = !partitionType.isMetadataPartitionAvailable(metaClient);
break;
default:
LOG.debug("Not a valid metadata partition type: " + partitionType.name());
return false;
}
return metadataIndexDisabled
&& metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
return metadataIndexDisabled;
}

private boolean shouldExecuteMetadataTableDeletion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.metadata.MetadataPartitionType;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/**
* Configurations used by the HUDI Metadata Table.
Expand Down Expand Up @@ -391,6 +394,16 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating secondary index.");

// Config naming the metadata index to drop. It is declared with no default value.
// The documentation string below enumerates the partition path of every MetadataPartitionType
// as the set of recognised index names (or name prefixes).
public static final ConfigProperty<String> DROP_METADATA_INDEX = ConfigProperty
.key(METADATA_PREFIX + ".index.drop")
.noDefaultValue()
.sinceVersion("1.0.1")
.withDocumentation("Drop the specified index. "
+ "The value should be the name of the index to delete. You can check index names using `SHOW INDEXES` command. "
+ "The index name either starts with or matches exactly can be one of the following: "
+ StringUtils.join(Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()), ", "));

/**
 * Returns the value stored under {@code MAX_LOG_FILE_SIZE_BYTES_PROP}, read as a {@code long}.
 */
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
Expand Down Expand Up @@ -552,6 +565,10 @@ public int getSecondaryIndexParallelism() {
return getInt(SECONDARY_INDEX_PARALLELISM);
}

/**
 * Returns the string value stored under {@code DROP_METADATA_INDEX}, i.e. the name of the metadata index to drop.
 *
 * <p>NOTE(review): the property is declared without a default, so this presumably yields
 * {@code null} when it is not configured; confirm against {@code getString}.
 */
public String getMetadataIndexToDrop() {
return getString(DROP_METADATA_INDEX);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
Expand Down Expand Up @@ -760,6 +777,11 @@ public Builder withSecondaryIndexParallelism(int parallelism) {
return this;
}

/**
 * Sets {@code DROP_METADATA_INDEX} on the metadata config being built.
 *
 * @param indexName the name of the metadata index to drop
 * @return this builder, to allow call chaining
 */
public Builder withDropMetadataIndex(String indexName) {
metadataConfig.setValue(DROP_METADATA_INDEX, indexName);
return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
import org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions, config}
import org.apache.spark.sql._
Expand Down Expand Up @@ -163,6 +164,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
val metadataOpts3 = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false",
HoodieMetadataConfig.DROP_METADATA_INDEX.key -> COLUMN_STATS.getPartitionPath,
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7" // ignore c4,c5,c8.
)
// disable col stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,36 @@ public void testIndexerForRecordIndex() {
"streamer-config/indexer-record-index.properties");
}

/**
 * Verifies that partitions created by the async indexer are not deleted by regular writers
 * when the partition is not enabled in the regular writer config.
 * <p>
 * 1. Upsert with metadata enabled with default configs (RECORD_INDEX is disabled by default).
 * 2. Run async indexer for RECORD_INDEX.
 * 3. Upsert again with the same writer configs (RECORD_INDEX is still not enabled for the writer).
 * 4. Validate that the RECORD_INDEX partition has not been deleted.
 */
@Test
public void testIndexerWithDifferentIngestionWriterConfig() {
  final String tableName = "indexer_test";
  final String recordIndexPath = RECORD_INDEX.getPartitionPath();

  // Step 1: regular writer upserts with the metadata table on and everything else at defaults.
  HoodieMetadataConfig.Builder writerMetadataConfig = HoodieMetadataConfig.newBuilder().enable(true);
  upsertToTable(writerMetadataConfig.build(), tableName);

  // Step 2: run the indexer with a config that has only RECORD_INDEX enabled.
  indexMetadataPartitionsAndAssert(
      RECORD_INDEX,
      Arrays.asList(FILES, RECORD_INDEX),
      Arrays.asList(BLOOM_FILTERS),
      tableName,
      "streamer-config/indexer-record-index.properties");
  // The partition must be registered in the table config and be present on storage.
  assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(recordIndexPath));
  assertTrue(metadataPartitionExists(basePath(), context(), recordIndexPath));

  // Step 3: regular writer upserts again, still without RECORD_INDEX enabled in its config.
  upsertToTable(writerMetadataConfig.build(), tableName);

  // Step 4: the RECORD_INDEX partition must have survived the regular write.
  assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(recordIndexPath));
  assertTrue(metadataPartitionExists(basePath(), context(), recordIndexPath));
}

@Test
public void testIndexerWithWriterFinishingFirst() throws IOException {
// Test the case where the indexer is running, i.e., the delta commit in the metadata table
Expand Down
Loading