-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RTO Task Overhaul (BugFix and Support to run multiple subtasks) #14623
base: master
Are you sure you want to change the base?
Changes from all commits
4ab1610
c71bcac
d0ca568
8db838b
3233c33
65e6aef
fd496bf
fec0b65
11c84be
31b3960
aaa72e3
2992595
58eb51c
f4ed406
07f831c
318e89e
b8e0daf
8e80c09
30a9459
d29af8d
8bcbe0f
01381a2
75fb4ba
546f27e
fae4aaa
97e8c49
4ddfbb1
7d3fa68
19b83c6
68cc920
84f471a
e365e91
1baf68e
1754dbc
ff4017e
9d329bc
21185e6
324a4cf
0f27067
0eba7cc
02b94fe
6405c35
f77e817
5cd9bdf
592f65c
cf97b6e
f2dcef2
b3cebb4
e8c1b9b
9eacf76
9a2eb80
32628d0
cc70645
903d519
066d925
66ff5ad
b307458
2d1f086
f183b85
46bbd20
00cc1bc
dca5736
6040793
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pinot.common.minion; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.UUID; | ||
|
||
|
||
/** | ||
* ExpectedRealtimeOfflineTaskResult is created in | ||
* {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor} | ||
* before uploading offline segment(s) to the offline table. | ||
* | ||
* The <code>_segmentsFrom</code> denotes the input RealtimeSegments. | ||
* The <code>_segmentsTo</code> denotes the expected offline segemnts. | ||
* The <code>_id</code> denotes the unique identifier of object. | ||
* The <code>_taskID</code> denotes the minion taskId. | ||
* The <code>_taskFailure</code> denotes the failure status of minion task handling the | ||
* current ExpectedResult. This is modified in | ||
* {@link org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskGenerator} | ||
* when a prev minion task is failed. | ||
* | ||
*/ | ||
public class ExpectedSubtaskResult { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we name this something like RealtimeToOfflineCheckpoint or something ? |
||
private final List<String> _segmentsFrom; | ||
private final List<String> _segmentsTo; | ||
private final String _id; | ||
private final String _taskID; | ||
private boolean _taskFailure = false; | ||
|
||
public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, String taskID) { | ||
_segmentsFrom = segmentsFrom; | ||
_segmentsTo = segmentsTo; | ||
_taskID = taskID; | ||
_id = UUID.randomUUID().toString(); | ||
} | ||
|
||
public ExpectedSubtaskResult(List<String> segmentsFrom, List<String> segmentsTo, | ||
String id, String taskID, boolean taskFailure) { | ||
_segmentsFrom = segmentsFrom; | ||
_segmentsTo = segmentsTo; | ||
_id = id; | ||
_taskID = taskID; | ||
_taskFailure = taskFailure; | ||
} | ||
|
||
public String getTaskID() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this itself be a uniqueId instead of new _id field? |
||
return _taskID; | ||
} | ||
|
||
public String getId() { | ||
return _id; | ||
} | ||
|
||
public List<String> getSegmentsFrom() { | ||
return _segmentsFrom; | ||
} | ||
|
||
public List<String> getSegmentsTo() { | ||
return _segmentsTo; | ||
} | ||
|
||
public boolean isTaskFailure() { | ||
return _taskFailure; | ||
} | ||
|
||
public void setTaskFailure() { | ||
_taskFailure = true; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (!(o instanceof ExpectedSubtaskResult)) { | ||
return false; | ||
} | ||
ExpectedSubtaskResult that = (ExpectedSubtaskResult) o; | ||
return Objects.equals(_id, that._id); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(_id); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,57 +18,183 @@ | |
*/ | ||
package org.apache.pinot.common.minion; | ||
|
||
import com.google.common.base.Preconditions; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.helix.zookeeper.datamodel.ZNRecord; | ||
|
||
|
||
/** | ||
* Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>. | ||
* The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed. | ||
* | ||
* Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>. The <code>_windowStartMs</code> | ||
* denotes the time (exclusive) until which it's certain that tasks have been completed successfully. The | ||
* <code>_expectedSubtaskResultMap</code> contains the expected RTO tasks result info. This map can contain both | ||
* completed and in-completed Tasks expected Results. This map is used by generator to validate whether a potential | ||
* segment (for RTO task) has already been successfully processed as a RTO task in the past or not. The | ||
* <code>_windowStartMs</code> and <code>_windowEndMs</code> denote the window bucket time of currently not | ||
* successfully completed minion task. bucket: [_windowStartMs, _windowEndMs) The window is updated by generator when | ||
* it's certain that prev minon task run is successful. | ||
* <p> | ||
* This gets serialized and stored in zookeeper under the path | ||
* MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask | ||
* | ||
* PinotTaskGenerator: | ||
* The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>, | ||
* to determine the window of execution for the task it is generating. | ||
* The window of execution will be [watermarkMs, watermarkMs + bucketSize) | ||
* | ||
* PinotTaskExecutor: | ||
* The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: | ||
* - Verify that is is running the latest task scheduled by the task generator | ||
* - Update the watermark as the end of the window that it executed for | ||
* <p> | ||
* PinotTaskGenerator: The <code>_windowStartMs</code>> is used by the | ||
* <code>RealtimeToOfflineSegmentsTaskGenerator</code>, to determine the window of execution of the prev task based on | ||
* which it generates new task. | ||
* <p> | ||
* PinotTaskExecutor: The same windowStartMs is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to: | ||
* - Verify that it's running the latest task scheduled by the task generator. | ||
* - The _expectedSubtaskResultMap is updated before the offline segments are uploaded to the table. | ||
*/ | ||
public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata { | ||
|
||
private static final String WATERMARK_KEY = "watermarkMs"; | ||
private static final String WINDOW_START_KEY = "watermarkMs"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Value should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For Backward compatibility the value should remain same. Like when we deploy, watermark should be picked what was previously set. |
||
private static final String WINDOW_END_KEY = "windowEndMs"; | ||
private static final String COMMA_SEPARATOR = ","; | ||
private static final String SEGMENT_NAME_TO_EXPECTED_SUBTASK_RESULT_ID_KEY = "segmentToExpectedSubtaskResultId"; | ||
|
||
private final String _tableNameWithType; | ||
private final long _watermarkMs; | ||
private long _windowStartMs; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would this change cause backward incompatibility issues during deployments? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. _windowStartMs will be read from |
||
private long _windowEndMs; | ||
private final Map<String, ExpectedSubtaskResult> _expectedSubtaskResultMap; | ||
private final Map<String, String> _segmentNameToExpectedSubtaskResultID; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this Map? Can we not use the previous map itself to get the list of segmentsFrom and segmentsTo for a specific taskId? We can the construct this map in memory, from that right? |
||
|
||
public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) { | ||
public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs) { | ||
_windowStartMs = windowStartMs; | ||
_tableNameWithType = tableNameWithType; | ||
_watermarkMs = watermarkMs; | ||
_expectedSubtaskResultMap = new HashMap<>(); | ||
_segmentNameToExpectedSubtaskResultID = new HashMap<>(); | ||
} | ||
|
||
public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long windowStartMs, | ||
long windowEndMs, | ||
Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap, | ||
Map<String, String> segmentNameToExpectedSubtaskResultID) { | ||
_tableNameWithType = tableNameWithType; | ||
_windowStartMs = windowStartMs; | ||
_expectedSubtaskResultMap = expectedSubtaskResultMap; | ||
_windowEndMs = windowEndMs; | ||
_segmentNameToExpectedSubtaskResultID = segmentNameToExpectedSubtaskResultID; | ||
} | ||
|
||
public String getTableNameWithType() { | ||
return _tableNameWithType; | ||
} | ||
|
||
/** | ||
* Get the watermark in millis | ||
*/ | ||
public long getWatermarkMs() { | ||
return _watermarkMs; | ||
public void setWindowStartMs(long windowStartMs) { | ||
_windowStartMs = windowStartMs; | ||
} | ||
|
||
public long getWindowStartMs() { | ||
return _windowStartMs; | ||
} | ||
|
||
public long getWindowEndMs() { | ||
return _windowEndMs; | ||
} | ||
|
||
public void setWindowEndMs(long windowEndMs) { | ||
_windowEndMs = windowEndMs; | ||
} | ||
|
||
public Map<String, ExpectedSubtaskResult> getExpectedSubtaskResultMap() { | ||
return _expectedSubtaskResultMap; | ||
} | ||
|
||
public Map<String, String> getSegmentNameToExpectedSubtaskResultID() { | ||
return _segmentNameToExpectedSubtaskResultID; | ||
} | ||
|
||
public void addExpectedSubTaskResult( | ||
ExpectedSubtaskResult newExpectedSubtaskResult) { | ||
|
||
List<String> segmentsFrom = newExpectedSubtaskResult.getSegmentsFrom(); | ||
|
||
for (String segmentName : segmentsFrom) { | ||
if (_segmentNameToExpectedSubtaskResultID.containsKey(segmentName)) { | ||
String prevExpectedSubtaskResultID = | ||
_segmentNameToExpectedSubtaskResultID.get(segmentName); | ||
|
||
ExpectedSubtaskResult prevExpectedSubtaskResult = | ||
_expectedSubtaskResultMap.get(prevExpectedSubtaskResultID); | ||
|
||
// check if prevExpectedRealtimeToOfflineSubtaskResult is not null, since it could | ||
// have been removed in the same minion run previously. | ||
if (prevExpectedSubtaskResult != null) { | ||
Preconditions.checkState(prevExpectedSubtaskResult.isTaskFailure(), | ||
"ExpectedSubtaskResult can only be replaced if it's of a failed task"); | ||
} | ||
} | ||
|
||
_segmentNameToExpectedSubtaskResultID.put(segmentName, | ||
newExpectedSubtaskResult.getId()); | ||
_expectedSubtaskResultMap.put(newExpectedSubtaskResult.getId(), | ||
newExpectedSubtaskResult); | ||
} | ||
} | ||
|
||
public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) { | ||
long watermark = znRecord.getLongField(WATERMARK_KEY, 0); | ||
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark); | ||
long windowStartMs = znRecord.getLongField(WINDOW_START_KEY, 0); | ||
long windowEndMs = znRecord.getLongField(WINDOW_END_KEY, 0); | ||
Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap = | ||
new HashMap<>(); | ||
Map<String, List<String>> listFields = znRecord.getListFields(); | ||
|
||
if (listFields != null) { | ||
for (Map.Entry<String, List<String>> listField : listFields.entrySet()) { | ||
String expectedSubtaskResultId = listField.getKey(); | ||
|
||
List<String> value = listField.getValue(); | ||
Preconditions.checkState(value.size() == 4); | ||
|
||
List<String> segmentsFrom = Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR)); | ||
List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), COMMA_SEPARATOR)); | ||
String taskID = value.get(2); | ||
boolean taskFailure = Boolean.parseBoolean(value.get(3)); | ||
|
||
expectedSubtaskResultMap.put(expectedSubtaskResultId, | ||
new ExpectedSubtaskResult(segmentsFrom, segmentsTo, expectedSubtaskResultId, taskID, | ||
taskFailure) | ||
); | ||
} | ||
} | ||
|
||
Map<String, Map<String, String>> mapFields = znRecord.getMapFields(); | ||
Map<String, String> segmentNameToExpectedSubtaskResultID = new HashMap<>(); | ||
if (mapFields != null) { | ||
segmentNameToExpectedSubtaskResultID = mapFields.get(SEGMENT_NAME_TO_EXPECTED_SUBTASK_RESULT_ID_KEY); | ||
} | ||
|
||
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), windowStartMs, windowEndMs, | ||
expectedSubtaskResultMap, segmentNameToExpectedSubtaskResultID); | ||
} | ||
|
||
public ZNRecord toZNRecord() { | ||
ZNRecord znRecord = new ZNRecord(_tableNameWithType); | ||
znRecord.setLongField(WATERMARK_KEY, _watermarkMs); | ||
znRecord.setLongField(WINDOW_START_KEY, _windowStartMs); | ||
znRecord.setLongField(WINDOW_END_KEY, _windowEndMs); | ||
|
||
if (_expectedSubtaskResultMap != null) { | ||
for (String expectedSubtaskResultID : _expectedSubtaskResultMap.keySet()) { | ||
ExpectedSubtaskResult expectedSubtaskResult = | ||
_expectedSubtaskResultMap.get(expectedSubtaskResultID); | ||
|
||
String segmentsFrom = String.join(COMMA_SEPARATOR, expectedSubtaskResult.getSegmentsFrom()); | ||
String segmentsTo = String.join(COMMA_SEPARATOR, expectedSubtaskResult.getSegmentsTo()); | ||
String taskId = expectedSubtaskResult.getTaskID(); | ||
boolean taskFailure = expectedSubtaskResult.isTaskFailure(); | ||
|
||
List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, taskId, Boolean.toString(taskFailure)); | ||
|
||
String id = expectedSubtaskResult.getId(); | ||
znRecord.setListField(id, listEntry); | ||
} | ||
} | ||
|
||
znRecord.setMapField(SEGMENT_NAME_TO_EXPECTED_SUBTASK_RESULT_ID_KEY, | ||
_segmentNameToExpectedSubtaskResultID); | ||
return znRecord; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this applicable for other tasks. If not, I'd suggest moving this data structure to a RTO specific module.