Skip to content

Commit

Permalink
modify map between workers from two stages
Browse files Browse the repository at this point in the history
  • Loading branch information
huasiy committed Oct 28, 2024
1 parent f82f85d commit 119fe31
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void addStageCoordinator(StageCoordinator stageCoordinator, StageDependen
checkArgument(stageCoordinator.getStageId() == stageDependency.getCurrentStageId(),
"the stageDependency does not belong to the stageCoordinator");
this.stageCoordinators.put(stageId, stageCoordinator);
if (stageDependency.getDownStreamStageId() != -1)
{
StageCoordinator parentStageCoordinator = this.stageCoordinators.get(stageDependency.getDownStreamStageId());
stageCoordinator.setDownStreamWorkerNum(parentStageCoordinator.getFixedWorkerNum());
}
this.stageDependencies.put(stageId, stageDependency);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ public class StageCoordinator

private final int stageId;
private final boolean isQueued;
private int downStreamWorkerNum;
private final int fixedWorkerNum;
private final TaskQueue<Task> taskQueue;
private final Map<Long, Worker<CFWorkerInfo>> workerIdToWorkers = new ConcurrentHashMap<>();
// this.workers is used for dependency checking, no concurrent reads and writes
private final List<Worker<CFWorkerInfo>> workers = new ArrayList<>();
private final Map<Long, Integer> workerIdToWorkerIndex = new ConcurrentHashMap<>();
private final AtomicInteger workerIndexAssigner = new AtomicInteger(0);
private final Map<Long, List<Integer>> workerIdToWorkerIndex = new ConcurrentHashMap<>();
private int workerIndexAssigner;
private final Object lock = new Object();

/**
Expand All @@ -78,6 +79,8 @@ public StageCoordinator(int stageId, int workerNum)
this.isQueued = false;
this.fixedWorkerNum = workerNum;
this.taskQueue = null;
this.downStreamWorkerNum = 0;
this.workerIndexAssigner = 0;
}

/**
Expand All @@ -94,6 +97,8 @@ public StageCoordinator(int stageId, List<Task> pendingTasks)
this.isQueued = true;
this.fixedWorkerNum = 0;
this.taskQueue = new TaskQueue<>(pendingTasks);
this.downStreamWorkerNum = 0;
this.workerIndexAssigner = 0;
}

/**
Expand All @@ -105,7 +110,37 @@ public void addWorker(Worker<CFWorkerInfo> worker)
synchronized (this.lock)
{
this.workerIdToWorkers.put(worker.getWorkerId(), worker);
this.workerIdToWorkerIndex.put(worker.getWorkerId(), this.workerIndexAssigner.getAndIncrement());
if (fixedWorkerNum > 0 && downStreamWorkerNum > 0)
{
if (downStreamWorkerNum > fixedWorkerNum)
{
// one-to-multiple stream
List<Integer> workerIndexs = new ArrayList<>();
int num = downStreamWorkerNum / fixedWorkerNum;
if (downStreamWorkerNum > fixedWorkerNum*num)
{
num++;
}
for (int i = 0; i < num; i++)
{
workerIndexs.add(this.workerIndexAssigner % this.downStreamWorkerNum);
this.workerIndexAssigner++;
}
} else
{
// multiple-to-one stream
List<Integer> workerIndexes = new ArrayList<>();
workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum);
this.workerIndexAssigner++;
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexes);
}
} else
{
// assume one-to-one stream
List<Integer> workerIndexs = new ArrayList<>(this.workerIndexAssigner);
this.workerIndexAssigner++;
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs);
}
this.workers.add(worker);
if (!this.isQueued && this.workers.size() == this.fixedWorkerNum)
{
Expand Down Expand Up @@ -191,14 +226,14 @@ public Worker<CFWorkerInfo> getWorker(long workerId)
* @param workerId the (global) id of the worker
* @return the index of the worker in this stage, or < 0 if the worker is not found
*/
public int getWorkerIndex(long workerId)
public List<Integer> getWorkerIndex(long workerId)
{
Integer index = this.workerIdToWorkerIndex.get(workerId);
List<Integer> index = this.workerIdToWorkerIndex.get(workerId);
if (index != null)
{
return index;
}
return -1;
return null;
}

/**
Expand Down Expand Up @@ -246,4 +281,20 @@ public List<Worker<CFWorkerInfo>> getWorkers()
{
return this.workers;
}

/**
* set down stream workers num
*/
public void setDownStreamWorkerNum(int downStreamWorkerNum)
{
this.downStreamWorkerNum = downStreamWorkerNum;
}

/**
* get worker num of this stage
*/
public int getFixedWorkerNum()
{
return this.fixedWorkerNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,21 @@ public void getDownstreamWorkers(TurboProto.GetDownstreamWorkersRequest request,
}
else
{
int workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId);
if (workerIndex < 0)
List<Integer> workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId);
if (workerIndex == null)
{
builder.setErrorCode(ErrorCode.WORKER_COORDINATE_WORKER_NOT_FOUND);
}
else
{
if (workerIndex >= downStreamWorkers.size())
{
builder.setErrorCode(ErrorCode.WORKER_COORDINATE_NO_DOWNSTREAM);
}
else
for (Integer index : workerIndex)
{
// get the worker with the same index in the downstream stage as the downstream worker
builder.setErrorCode(SUCCESS);
builder.addDownstreamWorkers(downStreamWorkers.get(workerIndex).getWorkerInfo().toProto());
if (index < downStreamWorkers.size())
{
builder.addDownstreamWorkers(downStreamWorkers.get(index).getWorkerInfo().toProto());
}
}
}
}
Expand Down

0 comments on commit 119fe31

Please sign in to comment.