-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#2086] feat(spark): Support cut partition to slices and served by multiply server #2093
Conversation
fb19661
to
2d5d3ed
Compare
abf947f
to
059cf59
Compare
65ea348
to
6f9d6ee
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. Thanks for your effort. Left some comments.
@@ -52,7 +52,7 @@ public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase { | |||
*/ | |||
private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers; | |||
|
|||
private Map<String, Set<ShuffleServerInfo>> excludedServerToReplacements; | |||
private Map<String, List<ShuffleServerInfo>> excludedServerToReplacements; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If using list instead of set, is it possible to store the duplicate replacement servers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change, what the set of excludedServerToReplacements
stored the servers about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert the set -> list change.
proto/src/main/proto/Rss.proto
Outdated
@@ -62,6 +62,8 @@ message RequireBufferResponse { | |||
int64 requireBufferId = 1; | |||
StatusCode status = 2; | |||
string retMsg = 3; | |||
// need split partitions | |||
repeated int32 partitionIds = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not marking the partitionIds name as the needSplitPartitionIds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -148,4 +148,9 @@ public static boolean limitHugePartition( | |||
} | |||
return false; | |||
} | |||
|
|||
public static boolean hasExceedPartitionSplitLimit( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not extract the general logic to limit the partitioned data? I think the partition split and huge partition limit could be scoped in the same abstract limit policies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address this after other comments addressed and tested.
704f57d
to
9437c2e
Compare
if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) { | ||
return Collections.emptySet(); | ||
return Collections.emptyList(); | ||
} | ||
excludedServerToReplacements.put(receivingFailureServerId, replacements); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston I found there are something wrong with excludedServerToReplacements
for split partition, as the server is not faulty server only just cannot store more data for specific partition, so it may should not be added to the exclude servers.
Reference to
public Set<String> listExcludedServers() {
return excludedServerToReplacements.keySet();
}
9835f38
to
cadf1da
Compare
cadf1da
to
aaf472b
Compare
Overall lgtm. I hope some test cases could be added to cover this case. |
Have any update on this? @maobaolong If you have applied in your internal cluster, please let me know. Thanks |
@zuston Yeah, I test this on our pressure test cluster and verified by our pressure test case. We have 10 node in that cluster, and run a 17TiB shuffle data spark application with 3 shuffle stage, there are 2000 partition for each shuffle stage. I verified the shuffle split result by #2133 and #2156. BTW, #2137 this PR make me easy to dynamic update conf and happy on test this feature. |
a3b64dc
to
831ee0f
Compare
505c625
to
993c00b
Compare
@zuston UT has been added, would you like to take another look? |
993c00b
to
949d57f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@zuston Thank you so much for the discussion and reviewing for this feature, I cannot finish this work without your help! Merge it now! |
… by multiply server (apache#2093) ### What changes were proposed in this pull request? Support sliced store partition to multiply server. Limitation: - Only finished tested the netty mode. ### Why are the changes needed? Fix: apache#2086 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Start multiply servers and coordinator on local - Start a spark standalone env on local - Start spark-shell and execute `test.scala` ```Console bin/spark-shell --master spark://localhost:7077 --deploy-mode client --conf spark.rss.client.reassign.blockRetryMaxTimes=3 --conf spark.rss.writer.buffer.spill.size=30 --conf spark.rss.client.reassign.enabled=true --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999 --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1 -i test.scala ``` - test.scala ```scala val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7))); val result = data.reduceByKey(_ + _); result.collect().foreach(println); System.exit(0); ``` <img width="410" alt="image" src="https://github.com/user-attachments/assets/7c72fa3e-cfb5-4361-9875-a82b6aeeedfb"> (cherry picked from commit 051a247)
What changes were proposed in this pull request?
Support sliced store partition to multiply server.
Limitation:
Why are the changes needed?
Fix: #2086
Does this PR introduce any user-facing change?
No.
How was this patch tested?
test.scala
bin/spark-shell --master spark://localhost:7077 --deploy-mode client --conf spark.rss.client.reassign.blockRetryMaxTimes=3 --conf spark.rss.writer.buffer.spill.size=30 --conf spark.rss.client.reassign.enabled=true --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999 --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1 -i test.scala