
[#1824] feat(spark): Support map side combine of shuffle writer #1825


Merged: 6 commits into apache:master on Jun 26, 2024

Conversation

@wForget (Member) commented Jun 24, 2024

What changes were proposed in this pull request?

Support map side combine of shuffle writer

Why are the changes needed?

Fix: #1824

Does this PR introduce any user-facing change?

Yes, support new shuffle writer behavior.

How was this patch tested?

Added integration test

github-actions bot commented Jun 24, 2024

Test Results

2,657 files (+16) · 2,657 suites (+16) · 5h 30m 44s ⏱️ (+2m 34s)
946 tests (+2): 945 ✅ (+3), 1 💤 (±0), 0 ❌ (−1)
11,789 runs (+16): 11,774 ✅ (+17), 15 💤 (±0), 0 ❌ (−1)

Results for commit d48ba34. ± Comparison against base commit 1482804.


if (isCombine) {
  createCombiner = shuffleDependency.aggregator().get().createCombiner();
  // When enabled, combine records on the map side before they are written.
  if (RssSparkConfig.toRssConf(sparkConf).get(RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED)) {
    iterator = shuffleDependency.aggregator().get().combineValuesByKey(records, taskContext);
Member

Do we need to check the existence of shuffleDependency.aggregator()?

Member

BTW, will this map-side combine add disk/memory burden, especially given the tight disk space on k8s?

Member Author

> Do we need to check the existence of shuffleDependency.aggregator()?

It seems not; there is a relevant check in ShuffleDependency:

https://github.com/apache/spark/blob/2ac2710b46be70064cd7286a9c86deb1ddc979cb/core/src/main/scala/org/apache/spark/Dependency.scala#L89-L91
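For reference, the linked lines in Spark's ShuffleDependency read roughly as follows (quoted from the Spark source; exact wording may vary by version):

if (mapSideCombine) {
  require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
}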

Member

Good to know this.

Member Author

> BTW, will this map-side combine add disk/memory burden, especially given the tight disk space on k8s?

I think it is possible when there are spills. So I added a configuration to control whether it is enabled.

Member

Yeah, I get your point; it just deserves extra attention.

Since this implication is easy to miss, let's state it explicitly in the config description.

@zuston zuston changed the title [#1824] Support map side combine of shuffle writer [#1824] feat(spark): Support map side combine of shuffle writer Jun 25, 2024
WriteAndReadMetricsSparkListener listener = new WriteAndReadMetricsSparkListener();
spark.sparkContext().addSparkListener(listener);

Thread.sleep(4000);
Contributor

Hmm, why such a long sleep time?
It's not a problem introduced by this PR, but I'm not a big fan of using sleep directly in tests. Could we replace it with something like a semaphore or similar?

@wForget (Member Author) commented Jun 25, 2024

Most of the test cases keep it; I guess it's there to wait for the shuffle server to be ready. Do we still need to remove it?

Contributor

You can leave it as it is for now. However, it introduces a lot of unnecessary sleep time in the integration tests; we should address it in a follow-up PR.
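As a rough sketch of such a follow-up (illustration only; isShuffleServerReady() is a hypothetical helper, not an existing Uniffle API):

// Sketch of a follow-up: poll a readiness condition with a deadline instead
// of sleeping a fixed 4 seconds. isShuffleServerReady() is a hypothetical
// stand-in for a real health check against the shuffle servers.
long deadlineMs = System.currentTimeMillis() + 10_000;
while (!isShuffleServerReady() && System.currentTimeMillis() < deadlineMs) {
  Thread.sleep(100); // short poll; exits as soon as the servers are ready
}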

ConfigOptions.key("rss.client.mapSideCombine.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable map side combine of shuffle writer.");
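For context, a minimal sketch of enabling this from an application, assuming the conventional "spark." prefix that Uniffle applies to rss.client.* options:

import org.apache.spark.SparkConf;

// Sketch: turning on map-side combine from the client side. The "spark."
// prefix is an assumption based on how other rss.client.* options are set.
SparkConf conf = new SparkConf()
    .set("spark.rss.client.mapSideCombine.enabled", "true");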
Contributor

Could you also update the user documentation about this config?

I believe the reason we didn't perform map-side combine when writing is that keeping the map side lightweight can improve write throughput. The documentation should state that trade-off as well.
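A possible shape for that doc entry (a sketch; the exact wording is an assumption based on this thread):

rss.client.mapSideCombine.enabled (default: false): Whether to perform map-side combine in the shuffle writer. Enabling it can reduce the amount of shuffle data written for combinable operators, but it may add memory/disk pressure on the map side when the combine spills.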

Comment on lines 75 to 77
sc.parallelize(data, 10).mapToPair(x -> new Tuple2<>(x % 10, 1)).reduceByKey((x, y) -> x + y)
    .collect().stream()
    .forEach(x -> result.put(x._1 + "-result-value", x._2));
@advancedxy (Contributor) commented Jun 25, 2024

One more thing: could we add more test cases to demonstrate the map-side combine?

Member Author

This test uses map-side combine by default in the vanilla Spark implementation. We checked that RSS behaves the same as vanilla Spark.

Contributor

I mean more RDD operations that require map-side combine, other than reduceByKey.

Member Author

> I mean more RDD operations that require map-side combine, other than reduceByKey.

Got it, I will add tests for other operators.
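For illustration, a sketch of such coverage using aggregateByKey and foldByKey, which also combine on the map side in vanilla Spark (the fixtures sc/data/result and the result-key names below are assumptions mirroring the existing test):

// Sketch: additional operators that perform map-side combine by default.
sc.parallelize(data, 10)
    .mapToPair(x -> new Tuple2<>(x % 10, 1))
    .aggregateByKey(0, (acc, v) -> acc + v, (a, b) -> a + b)
    .collect().stream()
    .forEach(x -> result.put(x._1 + "-aggregate-result-value", x._2));

sc.parallelize(data, 10)
    .mapToPair(x -> new Tuple2<>(x % 10, 1))
    .foldByKey(0, (x, y) -> x + y)
    .collect().stream()
    .forEach(x -> result.put(x._1 + "-fold-result-value", x._2));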

}

// check map side combine
assertEquals(100L, result.get("0-write-records"));
Member Author

When map-side combine is in effect, the shuffle write record count of stage 0 is 100; otherwise it is 1000. (The 1000 input records are mapped to 10 distinct keys across 10 partitions, so each partition combines down to at most 10 records: 10 × 10 = 100.)

@advancedxy (Contributor) left a comment

Almost LGTM now; it would be great if we could update the documentation as well.

@rickyma (Contributor) commented Jun 25, 2024

Would you mind adding the new config into the markdown documentation?

@wForget (Member Author) commented Jun 26, 2024

@advancedxy @rickyma Thank you for your review, I will add the doc later.

@advancedxy (Contributor) left a comment

LGTM, thanks for your contribution.

Merging this as there's no objection for now. Others are welcome to leave comments; they will be addressed in follow-up PRs.

@advancedxy merged commit e0996f2 into apache:master on Jun 26, 2024
43 checks passed
zuston pushed a commit to zuston/incubator-uniffle that referenced this pull request on Feb 11, 2025: [#1824] feat(spark): Support map side combine of shuffle writer (apache#1825)