Skip to content

[#2472] feat(spark): Add an rpc method to obtain the uniffleId and delete the Write Stage for retry at the same time. #2473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ public Thread newThread(Runnable r) {
ShuffleDataDistributionType.NORMAL,
RssMRConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
remoteMergeEnable
? MergeContext.newBuilder()
.setKeyClass(conf.getMapOutputKeyClass().getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ public void registerShuffle(
RemoteStorageInfo storageType,
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

Expand Down Expand Up @@ -548,9 +547,6 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign,
long retryIntervalMs,
int retryTimes) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,6 @@ public void registerShuffle(
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

Expand Down Expand Up @@ -781,9 +780,6 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign,
long retryIntervalMs,
int retryTimes) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,6 @@ public class RssSparkConfig {
.withDeprecatedKeys(RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription("Whether to enable the resubmit stage for fetch/write failure");

public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED =
ConfigOptions.key("rss.stageRetry.fetchFailureEnabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription(
"If set to true, the stage retry mechanism will be enabled when a fetch failure occurs.");

public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED =
ConfigOptions.key("rss.stageRetry.writeFailureEnabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription(
"If set to true, the stage retry mechanism will be enabled when a write failure occurs.");

public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
ConfigOptions.key("rss.blockId.selfManagementEnabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
Expand All @@ -54,7 +55,7 @@
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED;

public class RssSparkShuffleUtils {

Expand Down Expand Up @@ -366,17 +367,21 @@ public static RssException reportRssFetchFailedException(
SparkConf sparkConf,
String appId,
int shuffleId,
int uniffleShuffleId,
int stageAttemptId,
int stageAttemptNumber,
Set<Integer> failedPartitions) {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_ENABLED)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
for (int partitionId : failedPartitions) {
RssReportShuffleFetchFailureRequest req =
new RssReportShuffleFetchFailureRequest(
appId,
shuffleId,
uniffleShuffleId,
stageAttemptId,
stageAttemptNumber,
partitionId,
rssFetchFailedException.getMessage());
RssReportShuffleFetchFailureResponse response =
Expand Down Expand Up @@ -404,4 +409,8 @@ public static boolean isSparkUIEnabled(SparkConf conf) {
}
return false;
}

public static String getAppShuffleIdentifier(int appShuffleId, TaskContext context) {
return appShuffleId + "-" + context.stageId() + "-" + context.stageAttemptNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@
import java.io.IOException;
import java.util.Map;

import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;

import org.apache.uniffle.common.util.JavaUtils;

public class ShuffleHandleInfoManager implements Closeable {
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
private Map<Integer, MutableShuffleHandleInfo> shuffleIdToShuffleHandleInfo;

public ShuffleHandleInfoManager() {
this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
}

public ShuffleHandleInfo get(int shuffleId) {
public MutableShuffleHandleInfo get(int shuffleId) {
return shuffleIdToShuffleHandleInfo.get(shuffleId);
}

public void remove(int shuffleId) {
shuffleIdToShuffleHandleInfo.remove(shuffleId);
}

public void register(int shuffleId, ShuffleHandleInfo handle) {
public void register(int shuffleId, MutableShuffleHandleInfo handle) {
shuffleIdToShuffleHandleInfo.put(shuffleId, handle);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ private RssFetchFailedIterator(Builder builder, Iterator<Product2<K, C>> iterato
public static class Builder {
private String appId;
private int shuffleId;
private int uniffleShuffleId;
private int partitionId;
private int stageAttemptId;
private int stageAttemptNumber;
private Supplier<ShuffleManagerClient> managerClientSupplier;

private Builder() {}
Expand All @@ -64,6 +66,11 @@ Builder shuffleId(int shuffleId) {
return this;
}

Builder uniffleShuffleId(int uniffleShuffleId) {
this.uniffleShuffleId = uniffleShuffleId;
return this;
}

Builder partitionId(int partitionId) {
this.partitionId = partitionId;
return this;
Expand All @@ -74,6 +81,11 @@ Builder stageAttemptId(int stageAttemptId) {
return this;
}

Builder stageAttemptNumber(int stageAttemptNumber) {
this.stageAttemptNumber = stageAttemptNumber;
return this;
}

Builder managerClientSupplier(Supplier<ShuffleManagerClient> managerClientSupplier) {
this.managerClientSupplier = managerClientSupplier;
return this;
Expand All @@ -95,7 +107,9 @@ private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
new RssReportShuffleFetchFailureRequest(
builder.appId,
builder.shuffleId,
builder.uniffleShuffleId,
builder.stageAttemptId,
builder.stageAttemptNumber,
builder.partitionId,
e.getMessage());
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@
public class AddBlockEvent {

private String taskId;
private int stageAttemptNumber;
private List<ShuffleBlockInfo> shuffleDataInfoList;
private List<Runnable> processedCallbackChain;
private WriteBufferManager bufferManager;

public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList) {
this(taskId, 0, shuffleDataInfoList, null);
this(taskId, shuffleDataInfoList, null);
}

public AddBlockEvent(
String taskId,
int stageAttemptNumber,
List<ShuffleBlockInfo> shuffleDataInfoList,
WriteBufferManager writeBufferManager) {
this.taskId = taskId;
this.stageAttemptNumber = stageAttemptNumber;
this.shuffleDataInfoList = shuffleDataInfoList;
this.processedCallbackChain = new ArrayList<>();
this.bufferManager = writeBufferManager;
Expand All @@ -55,10 +52,6 @@ public String getTaskId() {
return taskId;
}

public int getStageAttemptNumber() {
return stageAttemptNumber;
}

public List<ShuffleBlockInfo> getShuffleDataInfoList() {
return shuffleDataInfoList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
try {
result =
shuffleWriteClient.sendShuffleData(
rssAppId,
event.getStageAttemptNumber(),
shuffleBlockInfoList,
() -> !isValidTask(taskId));
rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId));
putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
putFailedBlockSendTracker(
taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
Expand Down
Loading
Loading