Skip to content

Commit 598bf72

Browse files
committed
modify map between workers from two stages
1 parent f82f85d commit 598bf72

File tree

3 files changed

+69
-14
lines changed

3 files changed

+69
-14
lines changed

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public void addStageCoordinator(StageCoordinator stageCoordinator, StageDependen
6666
checkArgument(stageCoordinator.getStageId() == stageDependency.getCurrentStageId(),
6767
"the stageDependency does not belong to the stageCoordinator");
6868
this.stageCoordinators.put(stageId, stageCoordinator);
69+
if (stageDependency.getDownStreamStageId() != -1)
70+
{
71+
StageCoordinator parentStageCoordinator = this.stageCoordinators.get(stageDependency.getDownStreamStageId());
72+
stageCoordinator.setDownStreamWorkerNum(parentStageCoordinator.getFixedWorkerNum());
73+
}
6974
this.stageDependencies.put(stageId, stageDependency);
7075
}
7176

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,14 @@ public class StageCoordinator
5353

5454
private final int stageId;
5555
private final boolean isQueued;
56+
private int downStreamWorkerNum;
5657
private final int fixedWorkerNum;
5758
private final TaskQueue<Task> taskQueue;
5859
private final Map<Long, Worker<CFWorkerInfo>> workerIdToWorkers = new ConcurrentHashMap<>();
5960
// this.workers is used for dependency checking, no concurrent reads and writes
6061
private final List<Worker<CFWorkerInfo>> workers = new ArrayList<>();
61-
private final Map<Long, Integer> workerIdToWorkerIndex = new ConcurrentHashMap<>();
62-
private final AtomicInteger workerIndexAssigner = new AtomicInteger(0);
62+
private final Map<Long, List<Integer>> workerIdToWorkerIndex = new ConcurrentHashMap<>();
63+
private int workerIndexAssigner;
6364
private final Object lock = new Object();
6465

6566
/**
@@ -78,6 +79,8 @@ public StageCoordinator(int stageId, int workerNum)
7879
this.isQueued = false;
7980
this.fixedWorkerNum = workerNum;
8081
this.taskQueue = null;
82+
this.downStreamWorkerNum = 0;
83+
this.workerIndexAssigner = 0;
8184
}
8285

8386
/**
@@ -94,6 +97,8 @@ public StageCoordinator(int stageId, List<Task> pendingTasks)
9497
this.isQueued = true;
9598
this.fixedWorkerNum = 0;
9699
this.taskQueue = new TaskQueue<>(pendingTasks);
100+
this.downStreamWorkerNum = 0;
101+
this.workerIndexAssigner = 0;
97102
}
98103

99104
/**
@@ -105,7 +110,37 @@ public void addWorker(Worker<CFWorkerInfo> worker)
105110
synchronized (this.lock)
106111
{
107112
this.workerIdToWorkers.put(worker.getWorkerId(), worker);
108-
this.workerIdToWorkerIndex.put(worker.getWorkerId(), this.workerIndexAssigner.getAndIncrement());
113+
if (fixedWorkerNum > 0 && downStreamWorkerNum > 0)
114+
{
115+
if (downStreamWorkerNum > fixedWorkerNum)
116+
{
117+
// one-to-multiple stream
118+
List<Integer> workerIndexs = new ArrayList<>();
119+
int num = downStreamWorkerNum / fixedWorkerNum;
120+
if (downStreamWorkerNum > fixedWorkerNum*num)
121+
{
122+
num++;
123+
}
124+
for (int i = 0; i < num; i++)
125+
{
126+
workerIndexs.add(this.workerIndexAssigner % this.downStreamWorkerNum);
127+
this.workerIndexAssigner++;
128+
}
129+
} else
130+
{
131+
// multiple-to-one stream
132+
List<Integer> workerIndexes = new ArrayList<>();
133+
workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum);
134+
this.workerIndexAssigner++;
135+
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexes);
136+
}
137+
} else
138+
{
139+
// assume one-to-one stream
140+
List<Integer> workerIndexs = new ArrayList<>(this.workerIndexAssigner);
141+
this.workerIndexAssigner++;
142+
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs);
143+
}
109144
this.workers.add(worker);
110145
if (!this.isQueued && this.workers.size() == this.fixedWorkerNum)
111146
{
@@ -191,14 +226,14 @@ public Worker<CFWorkerInfo> getWorker(long workerId)
191226
* @param workerId the (global) id of the worker
192227
* @return the index of the worker in this stage, or < 0 if the worker is not found
193228
*/
194-
public int getWorkerIndex(long workerId)
229+
public List<Integer> getWorkerIndex(long workerId)
195230
{
196-
Integer index = this.workerIdToWorkerIndex.get(workerId);
231+
List<Integer> index = this.workerIdToWorkerIndex.get(workerId);
197232
if (index != null)
198233
{
199234
return index;
200235
}
201-
return -1;
236+
return null;
202237
}
203238

204239
/**
@@ -246,4 +281,20 @@ public List<Worker<CFWorkerInfo>> getWorkers()
246281
{
247282
return this.workers;
248283
}
284+
285+
/**
286+
* set down stream workers num
287+
*/
288+
public void setDownStreamWorkerNum(int downStreamWorkerNum)
289+
{
290+
this.downStreamWorkerNum = downStreamWorkerNum;
291+
}
292+
293+
/**
294+
* get worker num of this stage
295+
*/
296+
public int getFixedWorkerNum()
297+
{
298+
return this.fixedWorkerNum;
299+
}
249300
}

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,21 @@ public void getDownstreamWorkers(TurboProto.GetDownstreamWorkersRequest request,
9999
}
100100
else
101101
{
102-
int workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId);
103-
if (workerIndex < 0)
102+
List<Integer> workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId);
103+
if (workerIndex == null)
104104
{
105105
builder.setErrorCode(ErrorCode.WORKER_COORDINATE_WORKER_NOT_FOUND);
106106
}
107107
else
108108
{
109-
if (workerIndex >= downStreamWorkers.size())
110-
{
111-
builder.setErrorCode(ErrorCode.WORKER_COORDINATE_NO_DOWNSTREAM);
112-
}
113-
else
109+
for (Integer index : workerIndex)
114110
{
115111
// get the worker with the same index in the downstream stage as the downstream worker
116112
builder.setErrorCode(SUCCESS);
117-
builder.addDownstreamWorkers(downStreamWorkers.get(workerIndex).getWorkerInfo().toProto());
113+
if (index < downStreamWorkers.size())
114+
{
115+
builder.addDownstreamWorkers(downStreamWorkers.get(index).getWorkerInfo().toProto());
116+
}
118117
}
119118
}
120119
}

0 commit comments

Comments
 (0)