
[#1796] fix(spark): Implicitly unregister map output on fetch failure#1797

Draft
zuston wants to merge 23 commits into apache:master from zuston:1796

Conversation

@zuston
Member

@zuston zuston commented Jun 17, 2024

What changes were proposed in this pull request?

  1. Implicitly unregister map output on fetch failure
  2. Introduce a unified RssShuffleStatus to track stage task failures and, based on it, fix the incorrect retry check condition, which compared the total task-failure count against spark.task.maxFailures instead of counting failures per partitionId or per shuffle server.
  3. Replace the 2-phase write/fetch RPCs with a single RPC in reportFetch/WriteFailure to speed things up.
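The per-partition retry check described in item 2 could be sketched roughly as follows. This is a minimal illustration, not the PR's actual RssShuffleStatus API; the class and method names here are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: count task failures per partition, rather than
// comparing the raw total of task failures against spark.task.maxFailures,
// so the stage retry only triggers when one partition keeps failing.
class PartitionFailureTracker {
  private final Map<Integer, Integer> failuresByPartition = new HashMap<>();
  private final int maxFailures; // mirrors spark.task.maxFailures

  PartitionFailureTracker(int maxFailures) {
    this.maxFailures = maxFailures;
  }

  // Record one task failure for the given partition and report whether
  // that single partition has now reached the retry threshold.
  boolean recordFailure(int partitionId) {
    int count = failuresByPartition.merge(partitionId, 1, Integer::sum);
    return count >= maxFailures;
  }
}
```

With maxFailures = 3, two failures on partition 0 plus one on partition 1 does not trigger a retry; a third failure on partition 0 does.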

Why are the changes needed?

Fix: #1796. #1801 #1798

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests

@zuston zuston marked this pull request as draft June 17, 2024 06:36
@zuston zuston requested a review from advancedxy June 17, 2024 06:36
@zuston
Member Author

zuston commented Jun 17, 2024

Could you help review this, @advancedxy? If I've misunderstood anything, feel free to point it out.

@github-actions

github-actions bot commented Jun 17, 2024

Test Results

2 116 files   -   533  2 116 suites   - 533   2h 28m 20s ⏱️ - 2h 58m 20s
  656 tests  -   289    640 ✅  -   304   1 💤 ±0   2 ❌ + 2   13 🔥 + 13 
9 826 runs   - 1 955  9 586 ✅  - 2 180  15 💤 ±0  30 ❌ +30  195 🔥 +195 

For more details on these failures and errors, see this check.

Results for commit 73e5020. ± Comparison against base commit f8e4329.

This pull request removes 289 tests.
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testCombineBuffer
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testCommitBlocksWhenMemoryShuffleDisabled
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testOnePartition
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteException
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteNormal
org.apache.hadoop.mapred.SortWriteBufferTest ‑ testReadWrite
org.apache.hadoop.mapred.SortWriteBufferTest ‑ testSortBufferIterator
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ applyDynamicClientConfTest
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ baskAttemptIdTest
org.apache.hadoop.mapreduce.RssMRUtilsTest ‑ blockConvertTest
…


RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
TaskContext taskContext = TaskContext.get();
RssReassignServersRequest rssReassignServersRequest =
Contributor

Thanks for bringing this up.

I think I made a mistake in the previous impl, which doesn't unregister all the map output on shuffle fetch failure.
I think the right place to unregister the map output is ShuffleManagerGrpcService's reportShuffleFetchFailure. When a sufficient number of fetch failures has been reported, it should unregister all the map output and tell the client to report a FetchFailedException. Shuffle server reassignment could also be triggered, if configured to do so.
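The flow suggested above, counting fetch-failure reports on the server side, unregistering all map output once past a threshold, and telling the client to surface a FetchFailedException, might be sketched like this. All names here are illustrative assumptions, not the actual ShuffleManagerGrpcService API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the suggested server-side handling: once enough
// fetch failures are reported for a shuffle, unregister all map output
// exactly once and tell each subsequent reporter to throw a
// FetchFailedException so Spark recomputes the parent stage.
class FetchFailureHandler {
  private final AtomicInteger reportedFailures = new AtomicInteger();
  private final int threshold;
  private boolean unregistered = false;

  FetchFailureHandler(int threshold) {
    this.threshold = threshold;
  }

  // Returns true when the caller should surface a FetchFailedException.
  synchronized boolean onFetchFailureReported(Runnable unregisterAllMapOutput) {
    if (reportedFailures.incrementAndGet() >= threshold) {
      if (!unregistered) {
        unregisterAllMapOutput.run(); // clear stale map statuses only once
        unregistered = true;
      }
      return true;
    }
    return false;
  }
}
```

The unregister action is passed in as a Runnable purely to keep the sketch self-contained; in the real service it would call into the shuffle manager.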

Member Author

Good insight. Thanks for your advice; I'll proceed with that.

@zuston zuston marked this pull request as ready for review June 19, 2024 09:23
@zuston
Member Author

zuston commented Jun 19, 2024

cc @yl09099 Could you help check the write failure logic? I have refactored that part of the code to align it with the fetch failure handling.

LOG.warn("The shuffleId:{}, stageId:{} has been retried. Ignore it.");
return false;
}
if (shuffleStatus.getTaskFailureAttemptCount() >= sparkTaskMaxFailures) {
Contributor

In SMJ (sort-merge join), one stage has two shuffle readers. If tasks fail due to two different shuffle readers, the condition readerShuffleStatus.getTaskFailureAttemptCount() >= sparkTaskMaxFailures will not behave as expected.

Member Author

Let me think more about this case. Do you have any further solutions in mind?

Contributor

Should we add a state object for the stage that contains all the shuffle statuses in that stage?

class RssShuffleStageFailureState {
   int stageId;
   List<RssShuffleStatus> shuffleStatusList;
   boolean activateStageRetry();
}
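A compilable expansion of the sketch above might look like the following. The aggregation rule inside activateStageRetry() is an assumption for illustration, summing failures across all shuffle readers of the stage so that the SMJ case (failures split across two readers) still counts toward one stage-level threshold; it is not the PR's actual logic.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the PR's per-shuffle failure status.
class RssShuffleStatus {
  private int taskFailureAttemptCount = 0;

  void incTaskFailure() { taskFailureAttemptCount++; }
  int getTaskFailureAttemptCount() { return taskFailureAttemptCount; }
}

// Stage-level state aggregating every shuffle status in the stage.
class RssShuffleStageFailureState {
  final int stageId;
  final List<RssShuffleStatus> shuffleStatusList = new ArrayList<>();
  private final int sparkTaskMaxFailures;

  RssShuffleStageFailureState(int stageId, int sparkTaskMaxFailures) {
    this.stageId = stageId;
    this.sparkTaskMaxFailures = sparkTaskMaxFailures;
  }

  // Sum failures across all shuffle readers of this stage, so failures
  // spread over two readers (the SMJ case) still reach the threshold.
  boolean activateStageRetry() {
    int total = 0;
    for (RssShuffleStatus s : shuffleStatusList) {
      total += s.getTaskFailureAttemptCount();
    }
    return total >= sparkTaskMaxFailures;
  }
}
```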

Member Author

Sounds good. The stageId is a good trigger condition for the retry check. But I'm not sure whether I've missed anything, especially some corner cases. Could you give some extra advice? @jerqi @advancedxy

Member Author

Done

return true;
}
return false;
}
Contributor

Consider this case: the reader triggers a retry, and the retry is recorded. Later, after the writer fails to write data several times, another retry should be triggered, but this method reports that the retry has already been performed.

Member Author

Yes. A retry for the same stageId and attemptNumber will occur only once; is this incorrect? @yl09099

@zuston
Member Author

zuston commented Jun 24, 2024

Could you help review this? @advancedxy

@codecov-commenter

Codecov Report

Attention: Patch coverage is 0% with 48 lines in your changes missing coverage. Please review.

Project coverage is 53.43%. Comparing base (dddcced) to head (afea817).
Report is 20 commits behind head on master.

Files Patch % Lines
...t/request/RssReportShuffleFetchFailureRequest.java 0.00% 15 Missing ⚠️
...t/request/RssReportShuffleWriteFailureRequest.java 0.00% 10 Missing ⚠️
...ffle/common/exception/RssFetchFailedException.java 0.00% 8 Missing ⚠️
...ffle/client/impl/grpc/ShuffleServerGrpcClient.java 0.00% 7 Missing ⚠️
...client/impl/grpc/ShuffleServerGrpcNettyClient.java 0.00% 4 Missing ⚠️
...torage/handler/impl/ComposedClientReadHandler.java 0.00% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1797      +/-   ##
============================================
- Coverage     53.53%   53.43%   -0.11%     
- Complexity     2356     2395      +39     
============================================
  Files           368      371       +3     
  Lines         16852    17156     +304     
  Branches       1540     1571      +31     
============================================
+ Hits           9022     9167     +145     
- Misses         7303     7451     +148     
- Partials        527      538      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@advancedxy
Contributor

Could you help review this? @advancedxy

There are CI failures; you may need to take a look at those first.

I'll take a look at this later tonight or tomorrow.

@advancedxy
Contributor

advancedxy commented Jun 25, 2024

Thanks for working on this. I did a quick overview of this change, and I think it's quite large to review. It would be best to keep this PR open and split it into several smaller PRs, namely:

  1. the API/interface change that includes the shuffle server info in RssFetchFailedException (we may discuss that more in the new PR), RssReportShuffleFetchFailureRequest, and RssShuffleStatus
  2. the rework of the shuffle manager service and RssStageResubmitManager
  3. unifying the logic for fetch failure and write failure handling
  4. the remaining logic, if necessary

@zuston
Member Author

zuston commented Jun 25, 2024

Thanks for working on this. I did a quick overview of this change, and I think it's quite large to review. It would be best to keep this PR open and split it into several smaller PRs, namely:

  1. the API/interface change that includes the shuffle server info in RssFetchFailedException (we may discuss that more in the new PR), RssReportShuffleFetchFailureRequest, and RssShuffleStatus
  2. the rework of the shuffle manager service and RssStageResubmitManager
  3. unifying the logic for fetch failure and write failure handling
  4. the remaining logic, if necessary

Emm... Sorry, I don't think this is a huge change; it's mostly based on your previous great work and just fixes some bugs. And I don't have much time to rework it into multiple PRs, so I hope we could review it here and check whether any critical problems exist.

@advancedxy
Contributor

Emm... Sorry, I don't think this is a huge change; it's mostly based on your previous great work and just fixes some bugs.

I agree it's not huge, but it's large enough that it requires sufficient mental capacity and time to review, which unfortunately I don't have until this weekend.

And I don't have much time to rework it into multiple PRs, so I hope we could review it here and check whether any critical problems exist.

Well understood. However, I think this PR should be split into at least two PRs: one handling fetch failures and one handling write failures. Write failure is different from fetch failure and should be decoupled.

@jerqi do you have the time to review this by any chance? Otherwise, it will take some time and will probably be reviewed this weekend; does that sound right to you? @zuston

@zuston
Member Author

zuston commented Jun 25, 2024

Good to know. Thanks for your clear reply. @advancedxy

@zuston
Member Author

zuston commented Jun 25, 2024

However, I think this PR should be split into at least two PRs: one handling fetch failures and one handling write failures. Write failure is different from fetch failure and should be decoupled.

Could you explain the difference between fetch and write failure?

@zuston zuston marked this pull request as draft June 26, 2024 10:52
@advancedxy
Contributor

Could you explain the difference between fetch and write failure?

For starters, you should report fetch failures and write failures via different request types. The stage retry logic is also a bit off: you need to retry the parent stage for a fetch failure, but the current stage for a write failure. They may share common logic for removing map output data, etc., but they should be handled in separate PRs instead of one, which makes the changes huge and hard to review.
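The retry-target distinction described above can be stated compactly. This enum and method are purely hypothetical illustrations, not code from the PR: a fetch failure means the parent stage's map output is bad and must be regenerated, while a write failure means the current stage must be re-run.

```java
// Hypothetical sketch of the retry-target rule: fetch failures resubmit
// the parent (map) stage, write failures resubmit the current stage.
enum FailureKind { FETCH_FAILURE, WRITE_FAILURE }

class StageRetryTarget {
  // Pick which stage to resubmit for the given failure kind.
  static int stageToRetry(FailureKind kind, int currentStageId, int parentStageId) {
    return kind == FailureKind.FETCH_FAILURE ? parentStageId : currentStageId;
  }
}
```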

@zuston
Member Author

zuston commented Jun 27, 2024

After digging into this feature, I found many bugs and improvements that need to be addressed, so I have to split them into small patches. This PR will serve as the collection point for them, to verify the overall availability.

The stage retry logic is also a bit off: you need to retry the parent stage for a fetch failure, but the current stage for a write failure.

Yes. This is the distinction I have made in the current PR.



Development

Successfully merging this pull request may close these issues.

[Bug] Remaining map output when enable the stage retry on fetch failure

5 participants