+ public Set<String> getSegmentsTo() {
+ return _segmentsTo;
+ }
+
+ public boolean isFailed() {
+ return _failed;
+ }
+
+ public void setFailed() {
+ _failed = true;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
index 56f089021970..964126f89ccd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -18,57 +18,151 @@
*/
package org.apache.pinot.common.minion;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
- * Metadata for the minion task of type RealtimeToOfflineSegmentsTask.
- * The watermarkMs denotes the time (exclusive) upto which tasks have been executed.
- *
+ * Metadata for the minion task of type RealtimeToOfflineSegmentsTask. The _windowStartMs
+ * denotes the time (exclusive) until which it is certain that tasks have completed successfully. The
+ * _checkPoints list contains the expected results of RTO tasks. It can contain expected results of both
+ * completed and incomplete tasks, and is used by the generator to validate whether a potential segment (for an
+ * RTO task) has already been successfully processed as an RTO task in the past. The
+ * _windowStartMs and _windowEndMs denote the window bucket time of the minion task that has not yet
+ * completed successfully, i.e. the bucket [_windowStartMs, _windowEndMs). The window is advanced by the generator
+ * only when it is certain that the previous minion task run was successful.
+ *
* This gets serialized and stored in zookeeper under the path
* MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
- *
- * PinotTaskGenerator:
- * The watermarkMs is used by the RealtimeToOfflineSegmentsTaskGenerator,
- * to determine the window of execution for the task it is generating.
- * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
- *
- * PinotTaskExecutor:
- * The same watermark is used by the RealtimeToOfflineSegmentsTaskExecutor, to:
- * - Verify that is is running the latest task scheduled by the task generator
- * - Update the watermark as the end of the window that it executed for
+ *
+ * PinotTaskGenerator: The _windowStartMs is used by the RealtimeToOfflineSegmentsTaskGenerator to determine the
+ * window of execution of the previous task, based on which it generates the new task.
+ *
+ * PinotTaskExecutor: The same windowStartMs is used by the RealtimeToOfflineSegmentsTaskExecutor to:
+ * - Verify that it's running the latest task scheduled by the task generator.
+ * - Update _checkPoints before the offline segments are uploaded to the table.
*/
public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {
- private static final String WATERMARK_KEY = "watermarkMs";
+ private static final String WINDOW_START_KEY = "watermarkMs";
+ private static final String WINDOW_END_KEY = "windowEndMs";
+ private static final String COMMA_SEPARATOR = ",";
private final String _tableNameWithType;
- private final long _watermarkMs;
+ private long _windowStartMs;
+ private long _windowEndMs;
+ private final List<RealtimeToOfflineCheckpointCheckPoint> _checkPoints;
- public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
+ public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs) {
+ _windowStartMs = windowStartMs;
_tableNameWithType = tableNameWithType;
- _watermarkMs = watermarkMs;
+ _checkPoints = new ArrayList<>();
+ }
+
+ public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs, long windowEndMs,
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints) {
+ _windowStartMs = windowStartMs;
+ _windowEndMs = windowEndMs;
+ _tableNameWithType = tableNameWithType;
+ _checkPoints = checkPoints;
}
public String getTableNameWithType() {
return _tableNameWithType;
}
- /**
- * Get the watermark in millis
- */
- public long getWatermarkMs() {
- return _watermarkMs;
+ public void setWindowStartMs(long windowStartMs) {
+ _windowStartMs = windowStartMs;
+ }
+
+ public long getWindowStartMs() {
+ return _windowStartMs;
+ }
+
+ public long getWindowEndMs() {
+ return _windowEndMs;
+ }
+
+ public void setWindowEndMs(long windowEndMs) {
+ _windowEndMs = windowEndMs;
+ }
+
+ public List<RealtimeToOfflineCheckpointCheckPoint> getCheckPoints() {
+ return _checkPoints;
+ }
+
+ public void addCheckpoint(RealtimeToOfflineCheckpointCheckPoint newCheckPoint) {
+ if (canAddCheckpoint(newCheckPoint)) {
+ _checkPoints.add(newCheckPoint);
+ }
+ }
+
+ private boolean canAddCheckpoint(RealtimeToOfflineCheckpointCheckPoint newCheckPoint) {
+ Set<String> segmentsFrom = newCheckPoint.getSegmentsFrom();
+ for (String segmentName : segmentsFrom) {
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : _checkPoints) {
+ if (checkPoint.isFailed()) {
+ continue;
+ }
+ Set<String> prevSegmentsFrom = checkPoint.getSegmentsFrom();
+ Preconditions.checkState(!prevSegmentsFrom.contains(segmentName),
+ "A live checkpoint already exists for segment: " + segmentName);
+ }
+ }
+ return true;
}
public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
- long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
- return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark);
+ long windowStartMs = znRecord.getLongField(WINDOW_START_KEY, 0);
+ long windowEndMs = znRecord.getLongField(WINDOW_END_KEY, 0);
+
+ Map<String, List<String>> listFields = znRecord.getListFields();
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = new ArrayList<>();
+
+ if (listFields != null) {
+ for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
+ String checkpointID = listField.getKey();
+
+ List<String> value = listField.getValue();
+ Preconditions.checkState(value.size() == 4);
+
+ Set<String> segmentsFrom = new HashSet<>(Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR)));
+ Set<String> segmentsTo = new HashSet<>(Arrays.asList(StringUtils.split(value.get(1), COMMA_SEPARATOR)));
+ String taskID = value.get(2);
+ boolean isFailedCheckpoint = Boolean.parseBoolean(value.get(3));
+
+ checkPoints.add(new RealtimeToOfflineCheckpointCheckPoint(segmentsFrom, segmentsTo, checkpointID, taskID,
+ isFailedCheckpoint));
+ }
+ }
+
+ return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), windowStartMs, windowEndMs, checkPoints);
}
public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_tableNameWithType);
- znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
+ znRecord.setLongField(WINDOW_START_KEY, _windowStartMs);
+ znRecord.setLongField(WINDOW_END_KEY, _windowEndMs);
+
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : _checkPoints) {
+ String segmentsFrom = String.join(COMMA_SEPARATOR, checkPoint.getSegmentsFrom());
+ String segmentsTo = String.join(COMMA_SEPARATOR, checkPoint.getSegmentsTo());
+ String taskId = checkPoint.getTaskID();
+ boolean isFailedCheckpoint = checkPoint.isFailed();
+ String id = checkPoint.getId();
+
+ List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, taskId, Boolean.toString(isFailedCheckpoint));
+ znRecord.setListField(id, listEntry);
+ }
+
return znRecord;
}
}
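Reviewer note: to summarize the serialization format above, each checkpoint becomes one ZNRecord list field keyed by the checkpoint id, holding exactly four entries (comma-joined segmentsFrom, comma-joined segmentsTo, the task id, and the failed flag), while the window lives in the `watermarkMs`/`windowEndMs` simple fields. A minimal round-trip sketch against the API introduced in this diff; the table and segment names are invented:

```java
import java.util.Arrays;
import java.util.HashSet;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.minion.RealtimeToOfflineCheckpointCheckPoint;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;

public class CheckpointRoundTripSketch {
  public static void main(String[] args) {
    // Window [1000, 2000) that has not yet been confirmed successful.
    RealtimeToOfflineSegmentsTaskMetadata metadata =
        new RealtimeToOfflineSegmentsTaskMetadata("myTable_REALTIME", 1000L);
    metadata.setWindowEndMs(2000L);

    // One checkpoint: two realtime input segments expected to map to one offline segment.
    metadata.addCheckpoint(new RealtimeToOfflineCheckpointCheckPoint(
        new HashSet<>(Arrays.asList("myTable__0__0__20240101T0000Z", "myTable__1__0__20240101T0000Z")),
        new HashSet<>(Arrays.asList("myTable_OFFLINE_seg0")),
        "Task_RealtimeToOfflineSegmentsTask_1"));

    // Serialize: "watermarkMs"/"windowEndMs" simple fields plus one 4-entry list field per checkpoint.
    ZNRecord znRecord = metadata.toZNRecord();

    // Deserialize: the window and all checkpoints survive the round trip.
    RealtimeToOfflineSegmentsTaskMetadata restored =
        RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
    System.out.println(restored.getWindowStartMs() + " .. " + restored.getWindowEndMs());
  }
}
```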
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
index 4f109940fa2e..80a6d37f33ad 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
@@ -18,7 +18,15 @@
*/
package org.apache.pinot.common.metadata;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineCheckpointCheckPoint;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.testng.annotations.Test;
@@ -41,6 +49,89 @@ public void testToFromZNRecord() {
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(), "testTable_REALTIME");
- assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), 1000);
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), 1000);
+ }
+
+ @Test
+ public void testToFromZNRecordWithWindowIntervalAndExpectedResults() {
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = new ArrayList<>();
+ RealtimeToOfflineCheckpointCheckPoint checkPoint =
+ new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("githubEvents__0__0__20241213T2002Z", "githubEvents__0__0__20241213T2003Z")),
+ new HashSet<>(Arrays.asList("githubEventsOffline__0__0__20241213T2002Z",
+ "githubEventsOffline__0__0__20241213T2003Z")),
+ "1");
+ RealtimeToOfflineCheckpointCheckPoint checkPoint1 =
+ new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("githubEvents__0__0__20241213T2102Z", "githubEvents__0__0__20241213T2203Z")),
+ new HashSet<>(Arrays.asList("githubEventsOffline__0__0__20241213T2032Z",
+ "githubEventsOffline__0__0__20241213T2403Z")),
+ "2");
+
+ checkPoints.add(checkPoint);
+ checkPoints.add(checkPoint1);
+
+ RealtimeToOfflineSegmentsTaskMetadata originalMetadata =
+ new RealtimeToOfflineSegmentsTaskMetadata("testTable_REALTIME", 1000, 2000, checkPoints);
+
+ ZNRecord znRecord = originalMetadata.toZNRecord();
+ assertEquals(znRecord.getId(), "testTable_REALTIME");
+ assertEquals(znRecord.getSimpleField("watermarkMs"), "1000");
+ assertEquals(znRecord.getSimpleField("windowEndMs"), "2000");
+ Map<String, List<String>> listFields = znRecord.getListFields();
+
+ for (String id : listFields.keySet()) {
+ List<String> fields = listFields.get(id);
+ assertEquals(fields.size(), 4);
+ String taskID = fields.get(2);
+ boolean taskFailure = Boolean.parseBoolean(fields.get(3));
+ assert !taskFailure;
+
+ switch (taskID) {
+ case "1":
+ assertEquals(fields.get(0), String.join(",", checkPoint.getSegmentsFrom()));
+ assertEquals(fields.get(1), String.join(",", checkPoint.getSegmentsTo()));
+ break;
+ case "2":
+ assertEquals(fields.get(0), String.join(",", checkPoint1.getSegmentsFrom()));
+ assertEquals(fields.get(1), String.join(",", checkPoint1.getSegmentsTo()));
+ break;
+ default:
+ throw new RuntimeException("invalid taskID");
+ }
+ }
+
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
+
+ assert isEqual(realtimeToOfflineSegmentsTaskMetadata, originalMetadata);
+ }
+
+ private boolean isEqual(RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,
+ RealtimeToOfflineSegmentsTaskMetadata originalMetadata) {
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(), originalMetadata.getWindowEndMs());
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), originalMetadata.getWindowStartMs());
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(), originalMetadata.getTableNameWithType());
+
+ originalMetadata.getCheckPoints().sort(Comparator.comparing(RealtimeToOfflineCheckpointCheckPoint::getId));
+ realtimeToOfflineSegmentsTaskMetadata.getCheckPoints()
+ .sort(Comparator.comparing(RealtimeToOfflineCheckpointCheckPoint::getId));
+
+ for (int checkpointIndex = 0; checkpointIndex < originalMetadata.getCheckPoints().size(); checkpointIndex++) {
+ assert isEqual((originalMetadata.getCheckPoints().get(checkpointIndex)),
+ realtimeToOfflineSegmentsTaskMetadata.getCheckPoints().get(checkpointIndex));
+ }
+ return true;
+ }
+
+ private boolean isEqual(RealtimeToOfflineCheckpointCheckPoint checkPoint1,
+ RealtimeToOfflineCheckpointCheckPoint checkPoint2) {
+ return Objects.equals(checkPoint1.getSegmentsFrom(), checkPoint2.getSegmentsFrom())
+ && Objects.equals(checkPoint1.getSegmentsTo(), checkPoint2.getSegmentsTo())
+ && Objects.equals(checkPoint1.getId(), checkPoint2.getId())
+ && Objects.equals(checkPoint1.getTaskID(), checkPoint2.getTaskID());
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 296c981c1821..e6da066b051b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -31,6 +31,7 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
@@ -228,7 +229,7 @@ public void testRealtimeToOfflineSegmentsTask()
.map(ColumnPartitionConfig::getNumPartitions).reduce((a, b) -> a * b)
.orElseThrow(() -> new RuntimeException("Expected accumulated result but not found.")) : 1;
- long expectedWatermark = _dataSmallestTimeMs + 86400000;
+ long expectedWatermark = _dataSmallestTimeMs;
for (int i = 0; i < 3; i++) {
// Schedule task
assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
@@ -245,7 +246,7 @@ public void testRealtimeToOfflineSegmentsTask()
segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineTableName);
assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
- long expectedOfflineSegmentTimeMs = expectedWatermark - 86400000;
+ long expectedOfflineSegmentTimeMs = expectedWatermark;
for (int j = (numOfflineSegmentsPerTask * i); j < segmentsZKMetadata.size(); j++) {
SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(j);
assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
@@ -262,6 +263,42 @@ public void testRealtimeToOfflineSegmentsTask()
}
testHardcodedQueries();
+
+ // Delete all offline segments to test how the generator handles a previous minion task failure
+ List<String> allOfflineSegments = _helixResourceManager.getSegmentsFor(_offlineTableName, true);
+ PinotResourceManagerResponse response = _helixResourceManager.deleteSegments(_offlineTableName, allOfflineSegments);
+ assert response.isSuccessful();
+ allOfflineSegments = _helixResourceManager.getSegmentsFor(_offlineTableName, true);
+ assertEquals(allOfflineSegments.size(), 0);
+ expectedWatermark -= 86400000;
+
+ // Schedule task
+ assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertTrue(_taskResourceManager.getTaskQueues().contains(
+ PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+ // Should not generate more tasks
+ MinionTaskTestUtils.assertNoTaskSchedule(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+ // Wait at most 600 seconds for all tasks COMPLETED
+ waitForTaskToComplete(expectedWatermark, _realtimeTableName);
+ // Check that the segments landed in the offline table
+ segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineTableName);
+ assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask));
+
+ long expectedOfflineSegmentTimeMs = expectedWatermark;
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+ assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
+ assertEquals(segmentZKMetadata.getEndTimeMs(), expectedOfflineSegmentTimeMs);
+ if (segmentPartitionConfig != null) {
+ assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(),
+ segmentPartitionConfig.getColumnPartitionMap().keySet());
+ for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) {
+ assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(partitionColumn).size(), 1);
+ }
+ }
+ }
}
@Test
@@ -279,7 +316,7 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
.map(ColumnPartitionConfig::getNumPartitions).reduce((a, b) -> a * b)
.orElseThrow(() -> new RuntimeException("Expected accumulated result but not found.")) : 1;
- long expectedWatermark = _dataSmallestMetadataTableTimeMs + 86400000;
+ long expectedWatermark = _dataSmallestMetadataTableTimeMs;
_taskManager.cleanUpTask();
for (int i = 0; i < 3; i++) {
// Schedule task
@@ -297,7 +334,7 @@ public void testRealtimeToOfflineSegmentsMetadataPushTask()
segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineMetadataTableName);
assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
- long expectedOfflineSegmentTimeMs = expectedWatermark - 86400000;
+ long expectedOfflineSegmentTimeMs = expectedWatermark;
for (int j = (numOfflineSegmentsPerTask * i); j < segmentsZKMetadata.size(); j++) {
SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(j);
assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
@@ -332,7 +369,7 @@ private void waitForTaskToComplete(long expectedWatermark, String realtimeTableN
RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
assertNotNull(minionTaskMetadata);
- assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
+ assertEquals(minionTaskMetadata.getWindowStartMs(), expectedWatermark);
}
@AfterClass
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index bb1fc70afafa..49fb5dcb99d2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -21,11 +21,17 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.minion.RealtimeToOfflineCheckpointCheckPoint;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
@@ -39,10 +45,13 @@
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,17 +70,18 @@
* Before beginning the task, the watermarkMs is checked in the minion task metadata ZNode,
* located at MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
* It should match the windowStartMs.
- * The version of the znode is cached.
*
- * After the segments are uploaded, this task updates the watermarkMs in the minion task metadata ZNode.
- * The znode version is checked during update,
- * and update only succeeds if version matches with the previously cached version
+ * Before the segments are uploaded, this task updates the _checkPoints
+ * in the minion task metadata ZNode.
+ * The znode version is checked during the update, and the update is retried (up to a maximum number of attempts)
+ * until the znode version matches the expected one. This is needed because multiple subtasks run in parallel,
+ * so there can be a race condition when updating the znode.
*/
public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
+ private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
- private int _expectedVersion = Integer.MIN_VALUE;
public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
MinionConf minionConf) {
@@ -82,7 +92,6 @@ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionT
/**
* Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
* Checks that the watermarkMs from the ZNode matches the windowStartMs in the task configs.
- * If yes, caches the ZNode version to check during update.
*/
@Override
public void preProcess(PinotTaskConfig pinotTaskConfig) {
@@ -99,12 +108,10 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
- Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() <= windowStartMs,
- "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s shouldn't be larger than windowStartMs: %d in task"
- + " configs for table: %s. ZNode may have been modified by another task",
- realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), windowStartMs, realtimeTableName);
-
- _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+ Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs() == windowStartMs,
+ "windowStartMs in RealtimeToOfflineSegmentsTask metadata: %s must match windowStartMs: %s in task"
+ + " configs for table: %s. ZNode may have been modified by another task",
+ realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), windowStartMs, realtimeTableName);
}
@Override
@@ -156,6 +163,11 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
+ // Since multiple subtasks run in parallel, append a UUID to the segment name
+ // so that the generated segment names cannot conflict.
+ segmentProcessorConfigBuilder.setSegmentNameGenerator(
+ new SimpleSegmentNameGenerator(rawTableName, null, true, false));
+
// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));
@@ -194,19 +206,57 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
/**
* Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
- * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
- * watermark in the ZNode
- * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
+ * Before uploading the segments, updates the metadata with the expected results
+ * of a successful execution of the current subtask.
+ * The expected result recorded in the metadata is read by the next run of the task generator
+ * to ensure data correctness.
*/
@Override
- public void postProcess(PinotTaskConfig pinotTaskConfig) {
- Map configs = pinotTaskConfig.getConfigs();
- String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
- long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
- RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
- new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
- _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
- _expectedVersion);
+ protected void preUploadSegments(SegmentUploadContext context)
+ throws Exception {
+ super.preUploadSegments(context);
+ String realtimeTableName = context.getTableNameWithType();
+ PinotTaskConfig pinotTaskConfig = context.getPinotTaskConfig();
+ String taskId = pinotTaskConfig.getTaskId();
+ int attemptCount = 0;
+ try {
+ attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+ try {
+ ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+ _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+ RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+ // Adding the checkpoint might fail.
+ // In case of failure, a runtime exception is thrown.
+ RealtimeToOfflineSegmentsTaskMetadata updatedRealtimeToOfflineSegmentsTaskMetadata =
+ getUpdatedTaskMetadata(context, realtimeToOfflineSegmentsTaskZNRecord);
+
+ // Setting to zookeeper might fail due to version mismatch, but in this case
+ // the exception is caught and retried.
+ _minionTaskZkMetadataManager.setTaskMetadataZNRecord(updatedRealtimeToOfflineSegmentsTaskMetadata,
+ RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ expectedVersion);
+ return true;
+ } catch (ZkBadVersionException e) {
+ LOGGER.info(
+ "Version changed while updating checkpoints in RTO task metadata for table: {}, taskId: {}. "
+ + "Retrying.",
+ realtimeTableName, taskId);
+ return false;
+ }
+ });
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Failed to update the RealtimeToOfflineSegmentsTaskMetadata during preUploadSegments. "
+ + "(tableName = %s), (attemptCount = %d), (taskId = %s)", realtimeTableName, attemptCount, taskId);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+ LOGGER.info(
+ "Successfully updated the RealtimeToOfflineSegmentsTaskMetadata during preUploadSegments for table: {}, "
+ + "attemptCount: {}, taskId: {}",
+ realtimeTableName, attemptCount, taskId);
}
@Override
@@ -215,4 +265,34 @@ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifi
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.emptyMap());
}
+
+ private RealtimeToOfflineSegmentsTaskMetadata getUpdatedTaskMetadata(SegmentUploadContext context,
+ ZNRecord realtimeToOfflineSegmentsTaskZNRecord) {
+
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+
+ RealtimeToOfflineCheckpointCheckPoint checkPoint = getExpectedSubtaskResult(context);
+
+ realtimeToOfflineSegmentsTaskMetadata.addCheckpoint(checkPoint);
+
+ return realtimeToOfflineSegmentsTaskMetadata;
+ }
+
+ private RealtimeToOfflineCheckpointCheckPoint getExpectedSubtaskResult(
+ SegmentUploadContext context) {
+
+ PinotTaskConfig pinotTaskConfig = context.getPinotTaskConfig();
+ String taskId = pinotTaskConfig.getTaskId();
+
+ Set<String> segmentsFrom =
+ Arrays.stream(StringUtils.split(context.getInputSegmentNames(), MinionConstants.SEGMENT_NAME_SEPARATOR))
+ .map(String::trim).collect(Collectors.toSet());
+
+ Set<String> segmentsTo =
+ context.getSegmentConversionResults().stream().map(SegmentConversionResult::getSegmentName)
+ .collect(Collectors.toSet());
+
+ return new RealtimeToOfflineCheckpointCheckPoint(segmentsFrom, segmentsTo, taskId);
+ }
}
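Reviewer note: the preUploadSegments override above replaces the old cached-version postProcess with an optimistic-concurrency loop: read the znode and its version, apply the change, write back with the expected version, and retry with exponential backoff when a concurrent subtask bumps the version first. A condensed sketch of that pattern follows; `VersionedStore` is a hypothetical stand-in for MinionTaskZkMetadataManager, and the retry policy parameters mirror the executor's DEFAULT_RETRY_POLICY:

```java
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;

public class OptimisticZkUpdateSketch {
  // Hypothetical stand-in for MinionTaskZkMetadataManager.
  interface VersionedStore {
    ZNRecord read();                                   // returned record carries its ZK version
    void write(ZNRecord record, int expectedVersion);  // throws ZkBadVersionException on mismatch
  }

  // Same shape as the executor's policy: 5 attempts, 1s initial delay, 2x backoff.
  private static final RetryPolicy RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);

  static void readModifyWrite(VersionedStore store) throws Exception {
    RETRY_POLICY.attempt(() -> {
      try {
        ZNRecord record = store.read();
        int expectedVersion = record.getVersion();
        // ... mutate the record here, e.g. append a checkpoint ...
        record.setSimpleField("lastUpdatedMs", String.valueOf(System.currentTimeMillis()));
        // Succeeds only if no other writer touched the znode since we read it.
        store.write(record, expectedVersion);
        return true;   // done, stop retrying
      } catch (ZkBadVersionException e) {
        return false;  // lost the race: re-read and retry after backoff
      }
    });
  }
}
```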
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 128610ae6411..b9d9f98b1d35 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
@@ -29,9 +30,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineCheckpointCheckPoint;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -49,6 +53,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Segment;
import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +84,9 @@
* Such segments will be checked for segment endTime, to ensure there's no overflow into CONSUMING segments
*
* - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ *
+ * - The generator owns the responsibility of ensuring that previous minion tasks were successful; only then is
+ * the watermark advanced.
*/
@TaskGenerator
public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
@@ -86,6 +94,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
private static final String DEFAULT_BUCKET_PERIOD = "1d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
+ private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = Integer.MAX_VALUE;
@Override
public String getTaskType() {
@@ -107,6 +116,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType);
// Only schedule 1 task of this type, per table
+ // There can still be a scenario where the generator generates an additional task while the previous task
+ // is just about to be enqueued in the Helix queue.
Map<String, TaskState> incompleteTasks =
TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
if (!incompleteTasks.isEmpty()) {
@@ -116,12 +127,12 @@ public List generateTasks(List tableConfigs) {
}
// Get all segment metadata for completed segments (DONE/UPLOADED status).
- List<SegmentZKMetadata> completedSegmentsZKMetadata = new ArrayList<>();
+ List<SegmentZKMetadata> completedRealtimeSegmentsZKMetadata = new ArrayList<>();
Map<Integer, String> partitionToLatestLLCSegmentName = new HashMap<>();
Set<Integer> allPartitions = new HashSet<>();
- getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestLLCSegmentName,
+ getCompletedSegmentsInfo(realtimeTableName, completedRealtimeSegmentsZKMetadata, partitionToLatestLLCSegmentName,
allPartitions);
- if (completedSegmentsZKMetadata.isEmpty()) {
+ if (completedRealtimeSegmentsZKMetadata.isEmpty()) {
LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
realtimeTableName, taskType);
continue;
@@ -147,106 +158,380 @@ public List generateTasks(List tableConfigs) {
long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
- // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd =
- // windowStart + bucket.
- long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
- long windowEndMs = windowStartMs + bucketMs;
+ ZNRecord realtimeToOfflineZNRecord =
+ _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ realtimeTableName);
+ int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1;
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ getRTOTaskMetadata(realtimeTableName, completedRealtimeSegmentsZKMetadata, bucketMs,
+ realtimeToOfflineZNRecord);
+
+ // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark.
+ long windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
// Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
// (exclusive)
- List<String> segmentNames = new ArrayList<>();
- List<String> downloadURLs = new ArrayList<>();
Set<String> lastLLCSegmentPerPartition = new HashSet<>(partitionToLatestLLCSegmentName.values());
- boolean skipGenerate = false;
- while (true) {
- // Check that execution window is older than bufferTime
- if (windowEndMs > System.currentTimeMillis() - bufferMs) {
- LOGGER.info(
- "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
- + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
- skipGenerate = true;
- break;
+
+ // Get all offline table segments.
+ // These are used to validate if previous minion task was successful or not
+ String offlineTableName =
+ TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+ Set<String> existingOfflineTableSegmentNames =
+ new HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName, true));
+
+ // In case of previous minion task failures, get info
+ // about the failed minion subtasks. They need to be reprocessed.
+ List<RealtimeToOfflineCheckpointCheckPoint> failedTaskCheckpoints =
+ getFailedCheckpoints(realtimeToOfflineSegmentsTaskMetadata, existingOfflineTableSegmentNames);
+
+ // In case of a partial failure of the segment upload in the previous minion task run,
+ // the data is inconsistent, so delete the corresponding offline segments immediately.
+ Set<String> failedRealtimeSegments;
+ List<SegmentZKMetadata> segmentsToBeReProcessed = new ArrayList<>();
+
+ if (!failedTaskCheckpoints.isEmpty()) {
+ failedRealtimeSegments = new HashSet<>();
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : failedTaskCheckpoints) {
+ failedRealtimeSegments.addAll(checkPoint.getSegmentsFrom());
}
+ LOGGER.warn("found prev minion task failures for table: {}, failed task RealtimeSegments: {}",
+ realtimeTableName, failedRealtimeSegments);
- for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
- String segmentName = segmentZKMetadata.getSegmentName();
- long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
- long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
- // Check overlap with window
- if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
- // If last completed segment is being used, make sure that segment crosses over end of window.
- // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
- // would be skipped forever.
- if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
- LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
- + "generation: {}", segmentName, taskType);
- skipGenerate = true;
- break;
- }
- segmentNames.add(segmentName);
- downloadURLs.add(segmentZKMetadata.getDownloadUrl());
- }
+ deleteInvalidOfflineSegments(offlineTableName, existingOfflineTableSegmentNames, failedTaskCheckpoints);
+ segmentsToBeReProcessed = filterOutDeletedSegments(failedRealtimeSegments, completedRealtimeSegmentsZKMetadata);
+ }
+
+ // If there are no segments to be reprocessed, the previous task run succeeded
+ boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+ List<List<String>> segmentNamesGroupList = new ArrayList<>();
+ Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+ // maxNumRecordsPerTask is used to divide a minion task among
+ // multiple subtasks to improve performance.
+ int maxNumRecordsPerSubTask =
+ taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY) != null
+ ? Integer.parseInt(
+ taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+ : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+ List<SegmentZKMetadata> segmentsToBeScheduled;
+
+ if (!prevMinionTaskSuccessful) {
+ List<String> segmentsNamesToBeReprocessed = new ArrayList<>();
+ for (SegmentZKMetadata segmentZKMetadata : segmentsToBeReProcessed) {
+ segmentsNamesToBeReprocessed.add(segmentZKMetadata.getSegmentName());
}
- if (skipGenerate || !segmentNames.isEmpty()) {
- break;
+ LOGGER.warn(
+ "Found prev minion task failures. Re-Scheduling previously failed task input segments: {} of table: {}",
+ segmentsNamesToBeReprocessed, realtimeTableName);
+ segmentsToBeScheduled = segmentsToBeReProcessed;
+ } else {
+ // If all offline segments of the previous minion tasks were successfully uploaded,
+ // we can clear the state of the previous minion tasks as it is now useless.
+ if (!realtimeToOfflineSegmentsTaskMetadata.getCheckPoints().isEmpty()) {
+ realtimeToOfflineSegmentsTaskMetadata.getCheckPoints().clear();
+ // The windowEndTime of the previous minion task needs to be re-used to pick the
+ // next windowStartTime. This matters when the user changes the minion config after
+ // a task run has completed, so windowStartTime cannot simply be watermark + bucketMs.
+ windowStartMs = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
}
+ long windowEndMs = windowStartMs + bucketMs;
+ // since window changed, pick new segments.
+ segmentsToBeScheduled =
+ generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata, windowStartMs, windowEndMs, bucketMs,
+ bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition, realtimeToOfflineSegmentsTaskMetadata);
+ }
+
+ divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL,
+ maxNumRecordsPerSubTask);
+
+ if (segmentNamesGroupList.isEmpty()) {
+ continue;
+ }
+
+ List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+ long newWindowStartTime = realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+ long newWindowEndTime = realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+ LOGGER.info(
+ "Generating tasks for: {} with window start time: {}, window end time: {}, table: {}", taskType,
+ windowStartMs, newWindowEndTime, realtimeTableName);
+
+ for (List<String> segmentNameList : segmentNamesGroupList) {
+ List<String> downloadURLList = getDownloadURLList(segmentNameList, segmentNameVsDownloadURL);
+ Preconditions.checkState(segmentNameList.size() == downloadURLList.size());
+ pinotTaskConfigsForTable.add(
+ createPinotTaskConfig(segmentNameList, downloadURLList, realtimeTableName, taskConfigs, tableConfig,
+ newWindowStartTime,
+ newWindowEndTime, taskType));
+ }
+ try {
+ _clusterInfoAccessor
+ .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+ MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ expectedVersion);
+ } catch (ZkBadVersionException e) {
+ LOGGER.error(
+ "Version changed while updating RTO task metadata for table: {}, skip scheduling. There are "
+ + "multiple task schedulers for the same table, need to investigate!", realtimeTableName);
+ // skip this table for this minion run
+ continue;
+ }
+
+ pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+ LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType);
+ }
+ return pinotTaskConfigs;
+ }
+
+ @Override
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
+ // check table is not upsert
+ Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
+ "RealtimeToOfflineTask doesn't support upsert table!");
+ // check no malformed period
+ TimeUtils.convertPeriodToMillis(
+ taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d"));
+ TimeUtils.convertPeriodToMillis(
+ taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d"));
+ TimeUtils.convertPeriodToMillis(
+ taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s"));
+ // check mergeType is correct
+ Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+ .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name())
+ .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
- LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
- taskType, windowStartMs, windowEndMs);
- windowStartMs = windowEndMs;
- windowEndMs += bucketMs;
+ // check schema is not null
+ Preconditions.checkNotNull(schema, "Schema should not be null!");
+ // check no mis-configured columns
+ Set<String> columnNames = schema.getColumnNames();
+ for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+ if (entry.getKey().endsWith(".aggregationType")) {
+ Preconditions.checkState(columnNames.contains(
+ StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+ String.format("Column \"%s\" not found in schema!", entry.getKey()));
+ try {
+ // check that it's a valid aggregation function type
+ AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+ // check that a value aggregator is available
+ if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+ throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
+ }
+ } catch (IllegalArgumentException e) {
+ String err =
+ String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+ throw new IllegalStateException(err);
+ }
}
+ }
+ }
+
+ private List<String> getDownloadURLList(List<String> segmentNameList, Map<String, String> segmentNameVsDownloadURL) {
+ List downloadURLList = new ArrayList<>();
+ for (String segmentName : segmentNameList) {
+ downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+ }
+ return downloadURLList;
+ }
+
+ @VisibleForTesting
+ List<RealtimeToOfflineCheckpointCheckPoint> getFailedCheckpoints(
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,
+ Set<String> existingOfflineTableSegmentNames) {
+
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = realtimeToOfflineSegmentsTaskMetadata.getCheckPoints();
+
+ Set<String> failedCheckpointSegments = new HashSet<>();
+ List<RealtimeToOfflineCheckpointCheckPoint> failedCheckPoints = new ArrayList<>();
- if (skipGenerate) {
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : checkPoints) {
+ if (checkPoint.isFailed()) {
+ // A checkpoint is marked as failed only after its invalid offline segments
+ // have been deleted. This checkpoint has already been marked as failed,
+ // so it's safe to skip it here.
continue;
}
+ // Get the offline segments
+ Set<String> segmentTo = checkPoint.getSegmentsTo();
+ // If not all offline segments corresponding to a realtime segment exist,
+ // there was an issue with the previous minion task, and the segment needs
+ // to be re-processed.
+ boolean taskSuccessful = checkIfAllSegmentsExists(segmentTo, existingOfflineTableSegmentNames);
+
+ if (!taskSuccessful) {
+ Set segmentsFrom = checkPoint.getSegmentsFrom();
+ for (String segmentFrom : segmentsFrom) {
+ Preconditions.checkState(!failedCheckpointSegments.contains(segmentFrom),
+ "Multiple live checkpoints found for segment: " + segmentFrom);
+ failedCheckpointSegments.add(segmentFrom);
+ }
+ failedCheckPoints.add(checkPoint);
+ }
+ }
+
+ return failedCheckPoints;
+ }
+
+ private boolean checkIfAllSegmentsExists(Set<String> expectedSegments,
+ Set<String> currentTableSegments) {
+ return currentTableSegments.containsAll(expectedSegments);
+ }
+
+ @VisibleForTesting
+ void deleteInvalidOfflineSegments(String offlineTableName, Set<String> existingOfflineTableSegmentNames,
+ List<RealtimeToOfflineCheckpointCheckPoint> failedTaskCheckpoints) {
+
+ List<String> invalidOfflineSegments = new ArrayList<>();
+
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : failedTaskCheckpoints) {
+ Set<String> expectedCorrespondingOfflineSegments = checkPoint.getSegmentsTo();
+ List<String> segmentsToDelete =
+ getSegmentsToDelete(expectedCorrespondingOfflineSegments, existingOfflineTableSegmentNames);
+
+ if (!segmentsToDelete.isEmpty()) {
+ invalidOfflineSegments.addAll(segmentsToDelete);
+ }
+ }
+
+ if (!invalidOfflineSegments.isEmpty()) {
+ LOGGER.warn("Deleting invalid offline segments: {} of table: {}", invalidOfflineSegments, offlineTableName);
+ PinotResourceManagerResponse pinotResourceManagerResponse = _clusterInfoAccessor.getPinotHelixResourceManager()
+ .deleteSegments(offlineTableName, invalidOfflineSegments);
+
+ Preconditions.checkState(pinotResourceManagerResponse.isSuccessful(),
+ String.format("Unable to delete invalid offline segments: %s", invalidOfflineSegments));
+ }
- Map configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
- _clusterInfoAccessor);
- configs.putAll(getBaseTaskConfigs(tableConfig, segmentNames));
- configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
- configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
-
- // Segment processor configs
- configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs));
- configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs));
- String roundBucketTimePeriod = taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
- if (roundBucketTimePeriod != null) {
- configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, roundBucketTimePeriod);
+ // All invalid segments have been sent to the controller for deletion.
+ // Now we can mark these checkpoints as failed.
+ for (RealtimeToOfflineCheckpointCheckPoint checkPoint : failedTaskCheckpoints) {
+ checkPoint.setFailed();
+ }
+ }
+
+ private List<String> getSegmentsToDelete(Set<String> expectedCorrespondingOfflineSegments,
+ Set<String> existingOfflineTableSegmentNames) {
+ List<String> segmentsToDelete = new ArrayList<>();
+
+ // Iterate over all expected corresponding offline segments of the realtime segments to be reprocessed,
+ // and collect those that still exist; they need to be deleted.
+ for (String expectedCorrespondingOfflineSegment : expectedCorrespondingOfflineSegments) {
+ if (existingOfflineTableSegmentNames.contains(expectedCorrespondingOfflineSegment)) {
+ segmentsToDelete.add(expectedCorrespondingOfflineSegment);
}
- // NOTE: Check and put both keys for backward-compatibility
- String mergeType = taskConfigs.get(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY);
- if (mergeType == null) {
- mergeType = taskConfigs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+ }
+ return segmentsToDelete;
+ }
+
+ @VisibleForTesting
+ List<SegmentZKMetadata> filterOutDeletedSegments(Set<String> segmentNames,
+ List<SegmentZKMetadata> currentTableSegments) {
+
+ List<SegmentZKMetadata> segmentZKMetadataListToRet = new ArrayList<>();
+
+ // filter out deleted/removed segments.
+ for (SegmentZKMetadata segmentZKMetadata : currentTableSegments) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (segmentNames.contains(segmentName)) {
+ segmentZKMetadataListToRet.add(segmentZKMetadata);
}
- if (mergeType != null) {
- configs.put(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, mergeType);
- configs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, mergeType);
+ }
+ return segmentZKMetadataListToRet;
+ }
+
+ @VisibleForTesting
+ List<SegmentZKMetadata> generateNewSegmentsToProcess(List<SegmentZKMetadata> completedSegmentsZKMetadata,
+ long windowStartMs, long windowEndMs, long bucketMs, long bufferMs, String bufferTimePeriod,
+ Set<String> lastLLCSegmentPerPartition,
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) {
+
+ String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+ List<SegmentZKMetadata> segmentZKMetadataList = new ArrayList<>();
+
+ while (true) {
+ // Check that execution window is older than bufferTime
+ if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+ LOGGER.info(
+ "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping "
+ + "task generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+ return new ArrayList<>();
}
- for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
- if (entry.getKey().endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
- configs.put(entry.getKey(), entry.getValue());
+
+ for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
+ long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
+
+ // Check overlap with window.
+ if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
+ // If last completed segment is being used, make sure that segment crosses over end of window.
+ // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
+ // would be skipped forever.
+ if (lastLLCSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
+ LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ + "generation: {}", segmentName, taskType);
+ return new ArrayList<>();
+ }
+ segmentZKMetadataList.add(segmentZKMetadata);
}
}
- String maxNumRecordsPerSegment = taskConfigs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
- if (maxNumRecordsPerSegment != null) {
- configs.put(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, maxNumRecordsPerSegment);
+
+ if (!segmentZKMetadataList.isEmpty()) {
+ break;
}
- pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
- LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType);
+ LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
+ taskType, windowStartMs, windowEndMs);
+ windowStartMs = windowEndMs;
+ windowEndMs += bucketMs;
+ }
+
+ // At this point, there are segments that need to be processed for RTO.
+ // Since we have input segments, we can advance the metadata to the new window.
+ realtimeToOfflineSegmentsTaskMetadata.setWindowStartMs(windowStartMs);
+ realtimeToOfflineSegmentsTaskMetadata.setWindowEndMs(windowEndMs);
+ return segmentZKMetadataList;
+ }
+
+ @VisibleForTesting
+ void divideSegmentsAmongSubtasks(List<SegmentZKMetadata> segmentsToBeScheduled,
+ List<List<String>> segmentNamesGroupList, Map<String, String> segmentNameVsDownloadURL,
+ int maxNumRecordsPerSubTask) {
+ long numRecordsAdded = 0;
+ List<String> segmentNames = new ArrayList<>();
+
+ for (SegmentZKMetadata segmentZKMetadata : segmentsToBeScheduled) {
+ segmentNames.add(segmentZKMetadata.getSegmentName());
+ segmentNameVsDownloadURL.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata.getDownloadUrl());
+
+ numRecordsAdded += segmentZKMetadata.getTotalDocs();
+
+ if (numRecordsAdded >= maxNumRecordsPerSubTask) {
+ segmentNamesGroupList.add(segmentNames);
+ segmentNames = new ArrayList<>();
+ numRecordsAdded = 0;
+ }
+ }
+
+ if (!segmentNames.isEmpty()) {
+ segmentNamesGroupList.add(segmentNames);
}
- return pinotTaskConfigs;
}
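Reviewer note: divideSegmentsAmongSubtasks above greedily packs segments into groups until the configured record budget (maxNumRecordsPerTask) is crossed, and each group becomes one subtask. A standalone sketch of that packing logic; the segment names and record counts are invented:

```java
import java.util.ArrayList;
import java.util.List;

public class SubtaskPackingSketch {
  public static void main(String[] args) {
    // (segmentName, totalDocs) pairs standing in for SegmentZKMetadata; counts are invented.
    String[] names = {"seg0", "seg1", "seg2", "seg3"};
    long[] docs = {400, 700, 300, 900};
    int maxNumRecordsPerSubTask = 1000;

    List<List<String>> groups = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long numRecordsAdded = 0;
    for (int i = 0; i < names.length; i++) {
      current.add(names[i]);
      numRecordsAdded += docs[i];
      // Same greedy rule as divideSegmentsAmongSubtasks: close the group once the budget is crossed.
      if (numRecordsAdded >= maxNumRecordsPerSubTask) {
        groups.add(current);
        current = new ArrayList<>();
        numRecordsAdded = 0;
      }
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    System.out.println(groups);  // [[seg0, seg1], [seg2, seg3]]
  }
}
```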
/**
* Fetch completed (DONE/UPLOADED) segment and partition information
*
- * @param realtimeTableName the realtime table name
- * @param completedSegmentsZKMetadata list for collecting the completed (DONE/UPLOADED) segments ZK metadata
+ * @param realtimeTableName the realtime table name
+ * @param completedSegmentsZKMetadata list for collecting the completed (DONE/UPLOADED) segments ZK metadata
* @param partitionToLatestLLCSegmentName map for collecting the partitionId to the latest LLC segment name
- * @param allPartitions set for collecting all partition ids
+ * @param allPartitions set for collecting all partition ids
*/
private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer> allPartitions) {
@@ -283,81 +568,72 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
- private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
- long bucketMs) {
- ZNRecord realtimeToOfflineZNRecord =
- _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
- realtimeTableName);
- RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
- realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
- realtimeToOfflineZNRecord) : null;
-
- if (realtimeToOfflineSegmentsTaskMetadata == null) {
- // No ZNode exists. Cold-start.
- long watermarkMs;
-
- // Find the smallest time from all segments
- long minStartTimeMs = Long.MAX_VALUE;
- for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
- minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs());
- }
- Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
+ private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String realtimeTableName,
+ List<SegmentZKMetadata> completedSegmentsZKMetadata,
+ long bucketMs, ZNRecord realtimeToOfflineZNRecord) {
- // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
- // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window
- // [20200813, 20200814)
- watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
+ if (realtimeToOfflineZNRecord != null) {
+ return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineZNRecord);
+ }
- // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
- realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
- _clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1);
+ // No ZNode exists. Cold-start.
+ long watermarkMs;
+
+ // Find the smallest time from all segments
+ long minStartTimeMs = Long.MAX_VALUE;
+ for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+ minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs());
}
- return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
+ Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
+
+ // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
+ // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window
+ // [20200813, 20200814)
+ watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
+
+ return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
}
- @Override
- public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
- // check table is not upsert
- Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
- "RealtimeToOfflineTask doesn't support upsert table!");
- // check no malformed period
- TimeUtils.convertPeriodToMillis(
- taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "2d"));
- TimeUtils.convertPeriodToMillis(
- taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "1d"));
- TimeUtils.convertPeriodToMillis(
- taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, "1s"));
- // check mergeType is correct
- Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name())
- .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name())
- .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");
- // check schema is not null
- Preconditions.checkNotNull(schema, "Schema should not be null!");
- // check no mis-configured columns
- Set<String> columnNames = schema.getColumnNames();
+ private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList, List<String> downloadURLList,
+ String realtimeTableName, Map<String, String> taskConfigs, TableConfig tableConfig, long windowStartMs,
+ long windowEndMs, String taskType) {
+
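+ // Base task configs: table name and segment names, plus the download/upload URLs used by the minion workers.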
+ Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
+ _clusterInfoAccessor);
+ configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR));
+ configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+
+ // Segment processor configs
+ configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs));
+ configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs));
+ String roundBucketTimePeriod = taskConfigs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
+ if (roundBucketTimePeriod != null) {
+ configs.put(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY, roundBucketTimePeriod);
+ }
+ // NOTE: Check and put both keys for backward-compatibility
+ String mergeType = taskConfigs.get(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY);
+ if (mergeType == null) {
+ mergeType = taskConfigs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+ }
+ if (mergeType != null) {
+ configs.put(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, mergeType);
+ configs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, mergeType);
+ }
for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
- if (entry.getKey().endsWith(".aggregationType")) {
- Preconditions.checkState(columnNames.contains(
- StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
- String.format("Column \"%s\" not found in schema!", entry.getKey()));
- try {
- // check that it's a valid aggregation function type
- AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue());
- // check that a value aggregator is available
- if (!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
- throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
- }
- } catch (IllegalArgumentException e) {
- String err =
- String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
- throw new IllegalStateException(err);
- }
+ if (entry.getKey().endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+ configs.put(entry.getKey(), entry.getValue());
}
}
+ String maxNumRecordsPerSegment = taskConfigs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+ if (maxNumRecordsPerSegment != null) {
+ configs.put(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, maxNumRecordsPerSegment);
+ }
+
+ return new PinotTaskConfig(taskType, configs);
}
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 754f7224a248..fa1daac1baea 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -18,18 +18,26 @@
*/
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.TaskState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineCheckpointCheckPoint;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
@@ -41,13 +49,16 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -61,6 +72,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
private final Map _streamConfigs = new HashMap<>();
@@ -95,6 +107,11 @@ public void testGenerateTasksCheckConfigs() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
.thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);
@@ -151,6 +168,11 @@ public void testGenerateTasksSimultaneousConstraints() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME))
.thenReturn(getIdealState(REALTIME_TABLE_NAME, Lists.newArrayList(segmentZKMetadata.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);
@@ -244,6 +266,11 @@ public void testGenerateTasksNoMinionMetadata() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
// StartTime calculated using segment metadata
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
@@ -305,6 +332,11 @@ public void testGenerateTasksWithMinionMetadata() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
// Default configs
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
@@ -376,6 +408,122 @@ public void testGenerateTasksWithMinionMetadata() {
assertEquals(configs.get("m1" + RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX), "MAX");
}
+ @Test
+ public void testGenerateNewSegmentsToProcess() {
+ List<SegmentZKMetadata> completedSegmentsZKMetadata = new ArrayList<>();
+
+ long hourMillis = 3600 * 1000;
+ long pastTime = System.currentTimeMillis() - (2 * 24 * hourMillis);
+
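+ // seg_1/seg_2 span the hour bucket starting at pastTime + 1h; seg_3/seg_4 start about 6h after pastTime.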
+ ZNRecord znRecord1 = new ZNRecord("seg_1");
+ znRecord1.setSimpleField(CommonConstants.Segment.START_TIME, String.valueOf(pastTime + hourMillis));
+ znRecord1.setSimpleField(CommonConstants.Segment.END_TIME, String.valueOf(pastTime + 2 * hourMillis));
+
+ ZNRecord znRecord2 = new ZNRecord("seg_2");
+ znRecord2.setSimpleField(CommonConstants.Segment.START_TIME, String.valueOf(pastTime + hourMillis + 1));
+ znRecord2.setSimpleField(CommonConstants.Segment.END_TIME, String.valueOf(pastTime + 2 * hourMillis - 90));
+
+ ZNRecord znRecord3 = new ZNRecord("seg_3");
+ znRecord3.setSimpleField(CommonConstants.Segment.START_TIME, String.valueOf(pastTime + 6 * hourMillis + 1));
+ znRecord3.setSimpleField(CommonConstants.Segment.END_TIME, String.valueOf(pastTime + 8 * hourMillis));
+
+ ZNRecord znRecord4 = new ZNRecord("seg_4");
+ znRecord4.setSimpleField(CommonConstants.Segment.START_TIME, String.valueOf(pastTime + 6 * hourMillis + 90));
+ znRecord4.setSimpleField(CommonConstants.Segment.END_TIME, String.valueOf(pastTime + 8 * hourMillis + 12));
+
+ List<ZNRecord> znRecordList = ImmutableList.of(znRecord1, znRecord2, znRecord3, znRecord4);
+ for (ZNRecord znRecord : znRecordList) {
+ znRecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, TimeUnit.MILLISECONDS.toString());
+ completedSegmentsZKMetadata.add(new SegmentZKMetadata(znRecord));
+ }
+
+ Set<String> lastLLCSegmentPerPartition = new HashSet<>();
+ lastLLCSegmentPerPartition.add("seg_4");
+
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ new RealtimeToOfflineSegmentsTaskMetadata("test_REALTIME", 1);
+
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ List<SegmentZKMetadata> segmentZKMetadataList =
+ generator.generateNewSegmentsToProcess(completedSegmentsZKMetadata, pastTime, pastTime + hourMillis, hourMillis,
+ (24 * hourMillis), "1d", lastLLCSegmentPerPartition,
+ realtimeToOfflineSegmentsTaskMetadata);
+
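+ // Only seg_1 and seg_2 overlap the first eligible bucket; the metadata window advances to [pastTime + 1h, pastTime + 2h).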
+ assertEquals(segmentZKMetadataList.size(), 2);
+ assertEquals(segmentZKMetadataList.get(0).getSegmentName(), "seg_1");
+ assertEquals(segmentZKMetadataList.get(1).getSegmentName(), "seg_2");
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs(), pastTime + hourMillis);
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs(), pastTime + 2 * hourMillis);
+ }
+
+ @Test
+ public void testGenerateTasksWithSegmentUploadFailure() {
+ // Seed ZK task metadata with a checkpoint whose offline segments were only partially uploaded.
+ ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+ when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ getRealtimeToOfflineSegmentsTaskMetadata();
+ when(mockClusterInfoProvide
+ .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME)).thenReturn(
+ realtimeToOfflineSegmentsTaskMetadata.toZNRecord()); // 21 May 2020 UTC
+ SegmentZKMetadata segmentZKMetadata1 =
+ getSegmentZKMetadata("githubEvents__0__0__20241213T2002Z", Status.DONE, 1589972400000L, 1590048000000L,
+ TimeUnit.MILLISECONDS, "download1"); // 05-20-2020T11:00:00 to 05-21-2020T08:00:00 UTC
+ SegmentZKMetadata segmentZKMetadata2 =
+ getSegmentZKMetadata("githubEvents__0__0__20241213T2003Z", Status.DONE, 1590048000000L, 1590134400000L,
+ TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to 05-22-2020T08:00:00 UTC
+ when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(segmentZKMetadata1, segmentZKMetadata2));
+ when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
+ Lists.newArrayList(segmentZKMetadata1.getSegmentName(), segmentZKMetadata2.getSegmentName())));
+
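+ // Only one of the two offline segments recorded in the checkpoint exists, i.e. the previous upload partially failed.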
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(
+ List.of("githubEventsOffline__0__0__20241213T2002Z"));
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+ ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+ when(mockPinotHelixResourceManager.deleteSegments(Mockito.eq(OFFLINE_TABLE_NAME), captor.capture())).thenReturn(
+ PinotResourceManagerResponse.success(""));
+
+ // Default configs
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
+ List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+
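+ // The orphaned offline segment from the partially failed run must be deleted before the window is re-scheduled.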
+ List<String> capturedList = captor.getValue();
+ assertEquals(capturedList.size(), 1);
+ assertEquals(capturedList.get(0), "githubEventsOffline__0__0__20241213T2002Z");
+
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
+ "githubEvents__0__0__20241213T2002Z,githubEvents__0__0__20241213T2003Z");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
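+ // The regenerated task covers the same window as the failed checkpoint.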
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1589972400000");
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590058800000");
+ }
+
+ private RealtimeToOfflineSegmentsTaskMetadata getRealtimeToOfflineSegmentsTaskMetadata() {
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = new ArrayList<>();
+ RealtimeToOfflineCheckpointCheckPoint checkPoint =
+ new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("githubEvents__0__0__20241213T2002Z", "githubEvents__0__0__20241213T2003Z")),
+ new HashSet<>(Arrays.asList("githubEventsOffline__0__0__20241213T2002Z",
+ "githubEventsOffline__0__0__20241213T2003Z")),
+ "1");
+
+ checkPoints.add(checkPoint);
+ return new RealtimeToOfflineSegmentsTaskMetadata("testTable_REALTIME", 1589972400000L, 1590058800000L,
+ checkPoints);
+ }
+
/**
* Tests for skipping task generation due to CONSUMING segments overlap with window
*/
@@ -403,6 +551,11 @@ public void testOverflowIntoConsuming() {
RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
// last COMPLETED segment's endTime is less than windowEnd time. CONSUMING segment overlap. Skip task
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
@@ -453,6 +606,11 @@ public void testTimeGap() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
Lists.newArrayList(segmentZKMetadata.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);
@@ -486,6 +644,11 @@ public void testBuffer() {
when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(getIdealState(REALTIME_TABLE_NAME,
Lists.newArrayList(segmentZKMetadata.getSegmentName())));
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockPinotHelixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true)).thenReturn(new ArrayList<>());
+
+ when(mockClusterInfoProvide.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+
RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);
@@ -525,7 +688,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
.addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
- when(mockPinotHelixResourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema);
+ when(mockPinotHelixResourceManager.getSchemaForTableConfig(any())).thenReturn(schema);
RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
taskGenerator.init(mockClusterInfoAccessor);
@@ -645,6 +808,153 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
taskGenerator.validateTaskConfigs(tableConfig, schema, validAgg2Config);
}
+ @Test
+ public void testDivideSegmentsAmongSubtasks() {
+ RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+
+ ZNRecord znRecord1 = new ZNRecord("seg_1");
+ znRecord1.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "70");
+ znRecord1.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_1.tar");
+
+ ZNRecord znRecord2 = new ZNRecord("seg_2");
+ znRecord2.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "30");
+ znRecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_2.tar");
+
+ ZNRecord znRecord3 = new ZNRecord("seg_3");
+ znRecord3.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "101");
+ znRecord3.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_3.tar");
+
+ ZNRecord znRecord4 = new ZNRecord("seg_4");
+ znRecord4.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "1");
+ znRecord4.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_4.tar");
+
+ ZNRecord znRecord5 = new ZNRecord("seg_5");
+ znRecord5.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "98");
+ znRecord5.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_5.tar");
+
+ ZNRecord znRecord6 = new ZNRecord("seg_6");
+ znRecord6.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "123");
+ znRecord6.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_6.tar");
+
+ ZNRecord znRecord7 = new ZNRecord("seg_7");
+ znRecord7.setSimpleField(CommonConstants.Segment.TOTAL_DOCS, "1");
+ znRecord7.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "seg_7.tar");
+
+ List<ZNRecord> znRecordList =
+ ImmutableList.of(znRecord1, znRecord2, znRecord3, znRecord4, znRecord5, znRecord6, znRecord7);
+
+ List<SegmentZKMetadata> segmentsToBeScheduled = new ArrayList<>();
+ List<List<String>> segmentNamesGroupList = new ArrayList<>();
+ Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+ int maxNumRecordsPerSubTask = 100;
+
+ for (ZNRecord znRecord : znRecordList) {
+ segmentsToBeScheduled.add(new SegmentZKMetadata(znRecord));
+ }
+
+ taskGenerator.divideSegmentsAmongSubtasks(segmentsToBeScheduled, segmentNamesGroupList, segmentNameVsDownloadURL,
+ maxNumRecordsPerSubTask);
+
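+ // A group closes once its running doc count reaches the 100-record limit, so the last segment added may push it
+ // past the limit: {70, 30}, {101}, {1, 98, 123}, {1}.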
+ assertEquals(segmentNamesGroupList.size(), 4);
+ assertEquals(String.join(",", segmentNamesGroupList.get(0)), "seg_1,seg_2");
+ assertEquals(String.join(",", segmentNamesGroupList.get(1)), "seg_3");
+ assertEquals(String.join(",", segmentNamesGroupList.get(2)), "seg_4,seg_5,seg_6");
+ assertEquals(String.join(",", segmentNamesGroupList.get(3)), "seg_7");
+
+ assertEquals(segmentNameVsDownloadURL.size(), 7);
+ for (String segmentName : segmentNameVsDownloadURL.keySet()) {
+ assertEquals(segmentNameVsDownloadURL.get(segmentName), segmentName + ".tar");
+ }
+ }
+
+ @Test
+ public void testGetFailedCheckpoints() {
+ RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+ Set<String> segmentsPresentInOfflineTable =
+ new HashSet<>(Arrays.asList("seg_1", "seg_2", "seg_3", "seg_4", "seg_5", "seg_6", "seg_7"));
+
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = new ArrayList<>();
+ checkPoints.add(new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("seg_realtime_1", "seg_realtime_2")),
+ new HashSet<>(Arrays.asList("seg_1", "seg_4", "seg_5")),
+ "1", "task_1", true)
+ );
+ checkPoints.add(new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("seg_realtime_3", "seg_realtime_4")),
+ new HashSet<>(Arrays.asList("seg_2")),
+ "2")
+ );
+ RealtimeToOfflineCheckpointCheckPoint checkPoint = new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("seg_realtime_5", "seg_realtime_6")),
+ new HashSet<>(Arrays.asList("seg_6", "seg_8")),
+ "2");
+ checkPoints.add(checkPoint);
+
+ RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+ new RealtimeToOfflineSegmentsTaskMetadata("test_REALTIME", System.currentTimeMillis() - 100000,
+ System.currentTimeMillis() - 1000, checkPoints);
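+ // All offline segments of the first two checkpoints exist in the table; only the last checkpoint is incomplete
+ // (seg_8 is missing from the offline table).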
+ List<RealtimeToOfflineCheckpointCheckPoint> failedCheckpoints =
+ taskGenerator.getFailedCheckpoints(realtimeToOfflineSegmentsTaskMetadata, segmentsPresentInOfflineTable);
+
+ assertEquals(failedCheckpoints.size(), 1);
+ assertTrue(!failedCheckpoints.get(0).isFailed());
+ assertEquals(failedCheckpoints.get(0).getId(), checkPoint.getId());
+ }
+
+ @Test
+ public void testFilterOutDeletedSegments() {
+ RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+ Set<String> segmentNames = new HashSet<>(Arrays.asList("seg_1", "seg_2", "seg_3", "seg_4"));
+ List<SegmentZKMetadata> currentTableSegments =
+ Arrays.asList(new SegmentZKMetadata("seg_1"), new SegmentZKMetadata("seg_3"), new SegmentZKMetadata("seg_4"));
+ List<SegmentZKMetadata> segmentZKMetadataList =
+ taskGenerator.filterOutDeletedSegments(segmentNames, currentTableSegments);
+ assertEquals(segmentZKMetadataList.size(), 3);
+ StringBuilder liveSegmentNames = new StringBuilder();
+ for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+ liveSegmentNames.append(segmentZKMetadata.getSegmentName()).append(",");
+ }
+ assertEquals(liveSegmentNames.toString(), "seg_1,seg_3,seg_4,");
+ }
+
+ @Test
+ public void testDeleteInvalidOfflineSegments() {
+ Set<String> existingOfflineSegmentNames = new HashSet<>(Arrays.asList("seg_1", "seg_2", "seg_3", "seg_4"));
+
+ List<RealtimeToOfflineCheckpointCheckPoint> checkPoints = new ArrayList<>();
+ checkPoints.add(new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("seg_realtime_1", "seg_realtime_2")),
+ new HashSet<>(Arrays.asList("seg_1", "seg_4", "seg_5")),
+ "1")
+ );
+ checkPoints.add(new RealtimeToOfflineCheckpointCheckPoint(
+ new HashSet<>(Arrays.asList("seg_realtime_3", "seg_realtime_4")),
+ new HashSet<>(Arrays.asList("seg_2")),
+ "1")
+ );
+
+ ClusterInfoAccessor mockClusterInfoAccessor = mock(ClusterInfoAccessor.class);
+ PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ when(mockClusterInfoAccessor.getPinotHelixResourceManager()).thenReturn(mockPinotHelixResourceManager);
+ RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator();
+ taskGenerator.init(mockClusterInfoAccessor);
+ ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+ when(mockPinotHelixResourceManager.deleteSegments(Mockito.eq("test_OFFLINE"), captor.capture())).thenReturn(
+ PinotResourceManagerResponse.success(""));
+
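+ // seg_5 was never uploaded, so only the checkpoint segments that actually exist (seg_1, seg_4, seg_2) are
+ // deleted, and both checkpoints get marked failed.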
+ taskGenerator.deleteInvalidOfflineSegments("test_OFFLINE", existingOfflineSegmentNames, checkPoints);
+ List<String> capturedList = captor.getValue();
+
+ assertTrue(checkPoints.get(0).isFailed());
+ assertTrue(checkPoints.get(1).isFailed());
+
+ StringBuilder segmentNames = new StringBuilder();
+ for (String segmentName : capturedList) {
+ segmentNames.append(segmentName).append(",");
+ }
+ assertEquals(segmentNames.toString(), "seg_1,seg_4,seg_2,");
+ }
+
private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status, long startTime, long endTime,
TimeUnit timeUnit, String downloadURL) {
SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata(segmentName);