diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java index cae6d0049..8cd6cd201 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java @@ -36,6 +36,7 @@ public class CFWorkerInfo implements WorkerInfo private final int stageId; private final String operatorName; private final List hashValues; + private boolean passSchema; public CFWorkerInfo(String ip, int port, long transId, int stageId, String operatorName, List hashValues) @@ -46,6 +47,7 @@ public CFWorkerInfo(String ip, int port, long transId, int stageId, this.stageId = stageId; this.operatorName = operatorName; this.hashValues = hashValues; + this.passSchema = false; } public CFWorkerInfo(TurboProto.WorkerInfo workerInfo) @@ -56,8 +58,13 @@ public CFWorkerInfo(TurboProto.WorkerInfo workerInfo) this.stageId = workerInfo.getStageId(); this.operatorName = workerInfo.getOperatorName(); this.hashValues = workerInfo.getHashValuesList(); + this.passSchema = false; } + public void setPassSchema(boolean passSchema) { this.passSchema = passSchema; } + + public boolean getPassSchema() { return passSchema; } + public String getIp() { return ip; diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java index a2f7b597c..e272910f1 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java @@ -129,6 +129,10 @@ public void addWorker(Worker worker) } else { // multiple-to-one stream + if (workerIndexAssigner <= downStreamWorkerNum) + { + worker.getWorkerInfo().setPassSchema(true); + } List workerIndexes = new ArrayList<>(); workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum); this.workerIndexAssigner++; @@ -137,6 +141,7 @@ public void addWorker(Worker worker) } else { // assume one-to-one stream + worker.getWorkerInfo().setPassSchema(true); List workerIndexs = new ArrayList<>(this.workerIndexAssigner); this.workerIndexAssigner++; this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs);