Skip to content

Commit 69805f9

Browse files
committed
one worker write to one port of down stream worker
1 parent 326949d commit 69805f9

File tree

4 files changed

+18
-11
lines changed

4 files changed

+18
-11
lines changed

pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
public class Worker<WI extends WorkerInfo>
3232
{
3333
private final long workerId;
34+
private int workerPortIndex;
3435
private final Lease lease;
3536
private final WI workerInfo;
3637
private boolean terminated;
3738

38-
public Worker(long workerId, Lease lease, WI workerInfo)
39+
public Worker(long workerId, Lease lease, int workerPortIndex, WI workerInfo)
3940
{
4041
this.workerId = workerId;
42+
this.workerPortIndex = workerPortIndex;
4143
this.lease = requireNonNull(lease, "lease is null");
4244
this.workerInfo = requireNonNull(workerInfo, "worker info is null");
4345
this.terminated = false;
@@ -48,6 +50,10 @@ public long getWorkerId()
4850
return workerId;
4951
}
5052

53+
public void setWorkerPortIndex(int index) { this.workerPortIndex = index; }
54+
55+
public int getWorkerPortIndex() { return workerPortIndex; }
56+
5157
public WI getWorkerInfo()
5258
{
5359
return workerInfo;

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public void addWorker(Worker<CFWorkerInfo> worker)
115115
if (downStreamWorkerNum > fixedWorkerNum)
116116
{
117117
// one-to-multiple stream
118+
// TODO: find a query to test
118119
List<Integer> workerIndexs = new ArrayList<>();
119120
int num = downStreamWorkerNum / fixedWorkerNum;
120121
if (downStreamWorkerNum > fixedWorkerNum*num)
@@ -129,10 +130,11 @@ public void addWorker(Worker<CFWorkerInfo> worker)
129130
} else
130131
{
131132
// multiple-to-one stream
132-
if (workerIndexAssigner < downStreamWorkerNum)
133-
{
134-
worker.getWorkerInfo().setPassSchema(true);
135-
}
133+
// if (workerIndexAssigner < downStreamWorkerNum)
134+
// {
135+
// worker.getWorkerInfo().setPassSchema(true);
136+
// }
137+
worker.setWorkerPortIndex(this.workerIndexAssigner / this.downStreamWorkerNum);
136138
List<Integer> workerIndexes = new ArrayList<>();
137139
workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum);
138140
this.workerIndexAssigner++;
@@ -141,7 +143,7 @@ public void addWorker(Worker<CFWorkerInfo> worker)
141143
} else
142144
{
143145
// assume one-to-one stream
144-
worker.getWorkerInfo().setPassSchema(true);
146+
worker.setWorkerPortIndex(0);
145147
List<Integer> workerIndexs = new ArrayList<>(this.workerIndexAssigner);
146148
this.workerIndexAssigner++;
147149
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs);

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,8 @@ public Worker<CFWorkerInfo> registerWorker(CFWorkerInfo workerInfo) throws Worke
7474
{
7575
throw new WorkerCoordinateException("failed to register worker, error code=" + response.getErrorCode());
7676
}
77-
workerInfo.setPassSchema(response.getPassSchema());
7877
return new Worker<>(response.getWorkerId(),
79-
new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), workerInfo);
78+
new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), response.getWorkerPortIndex(), workerInfo);
8079
}
8180

8281
/**

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request,
5757
CFWorkerInfo workerInfo = new CFWorkerInfo(request.getWorkerInfo());
5858
Lease lease = new Lease(WorkerLeasePeriodMs, System.currentTimeMillis());
5959
long workerId = CFWorkerManager.Instance().createWorkerId();
60-
Worker<CFWorkerInfo> worker = new Worker<>(workerId, lease, workerInfo);
60+
Worker<CFWorkerInfo> worker = new Worker<>(workerId, lease, 0, workerInfo);
6161
CFWorkerManager.Instance().registerCFWorker(worker);
6262
log.debug("register worker, local address: " + workerInfo.getIp() + ", transId: " + workerInfo.getTransId()
6363
+ ", stageId: " + workerInfo.getStageId() + ", workerId: " + workerId);
@@ -67,8 +67,8 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request,
6767
requireNonNull(stageCoordinator, "stage coordinator is not found");
6868
stageCoordinator.addWorker(worker);
6969
TurboProto.RegisterWorkerResponse response = TurboProto.RegisterWorkerResponse.newBuilder()
70-
.setErrorCode(SUCCESS).setWorkerId(workerId).setLeasePeriodMs(lease.getPeriodMs())
71-
.setLeaseStartTimeMs(lease.getStartTimeMs()).setPassSchema(worker.getWorkerInfo().getPassSchema()).build();
70+
.setErrorCode(SUCCESS).setWorkerId(workerId).setWorkerPortIndex(worker.getWorkerPortIndex()).setLeasePeriodMs(lease.getPeriodMs())
71+
.setLeaseStartTimeMs(lease.getStartTimeMs()).build();
7272
responseObserver.onNext(response);
7373
responseObserver.onCompleted();
7474
}

0 commit comments

Comments
 (0)