 from pathlib import Path
 from dataclasses import dataclass
 
-SPARK_MIRROR_PATH = str(Path("../spark_mirror").resolve())
-TPCH_SPARK_PATH = str(Path("../tpch-spark").resolve())
-
 
 def bang(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
     cmd = [str(part) for part in cmd]
@@ -29,6 +26,8 @@ def must(cmd, dry_run, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
 @dataclass
 class Service:
     service_args: any
+    spark_mirror_path: Path
+    spark_master_ip: str
     output_dir: Path
     dry_run: bool
 
@@ -59,22 +58,22 @@ def __enter__(self):
         # launch spark master and worker
         self._master = must(
             [
-                f"{SPARK_MIRROR_PATH}/sbin/start-master.sh",
-                *("--host", "130.207.125.81"),
+                f"{self.spark_mirror_path}/sbin/start-master.sh",
+                *("--host", self.spark_master_ip),
                 *(
                     "--properties-file",
-                    f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf",
+                    f"{self.spark_mirror_path}/conf/spark-dg-config.conf",
                 ),
             ],
             self.dry_run,
         )
         self._worker = must(
             [
-                f"{SPARK_MIRROR_PATH}/sbin/start-worker.sh",
-                "spark://130.207.125.81:7077",
+                f"{self.spark_mirror_path}/sbin/start-worker.sh",
+                f"spark://{self.spark_master_ip}:7077",
                 *(
                     "--properties-file",
-                    f"{SPARK_MIRROR_PATH}/conf/spark-dg-config.conf",
+                    f"{self.spark_mirror_path}/conf/spark-dg-config.conf",
                 ),
             ],
             self.dry_run,
@@ -90,9 +89,9 @@ def clean(self):
         if self._service:
             self._service.wait()
         if self._master:
-            must([f"{SPARK_MIRROR_PATH}/sbin/stop-master.sh"], self.dry_run)
+            must([f"{self.spark_mirror_path}/sbin/stop-master.sh"], self.dry_run)
         if self._worker:
-            must([f"{SPARK_MIRROR_PATH}/sbin/stop-worker.sh"], self.dry_run)
+            must([f"{self.spark_mirror_path}/sbin/stop-worker.sh"], self.dry_run)
 
     def __exit__(self, type, value, traceback):
         self.clean()
@@ -101,6 +100,9 @@ def __exit__(self, type, value, traceback):
 @dataclass
 class Launcher:
     launcher_args: any
+    spark_mirror_path: Path
+    spark_master_ip: str
+    tpch_spark_path: Path
     output_dir: Path
     dry_run: bool
 
@@ -113,8 +115,9 @@ def launch(self):
             [
                 *("python3", "-u", "-m", "rpc.launch_tpch_queries"),
                 *self.launcher_args,
-                *("--spark-mirror-path", SPARK_MIRROR_PATH),
-                *("--tpch-spark-path", TPCH_SPARK_PATH),
+                *("--spark-master-ip", self.spark_master_ip),
+                *("--spark-mirror-path", self.spark_mirror_path),
+                *("--tpch-spark-path", self.tpch_spark_path),
             ],
             self.dry_run,
             stdout=f_out,
@@ -127,19 +130,27 @@ class Experiment:
     name: str
     service_args: any
     launcher_args: any
-    args: any
 
-    def run(self):
-        output_dir = self.args.output_dir / self.name
+    def run(self, args):
+        output_dir = args.output_dir / self.name
         if not output_dir.exists():
             output_dir.mkdir(parents=True)
 
         with Service(
             service_args=self.service_args,
+            spark_mirror_path=args.spark_mirror_path,
+            spark_master_ip=args.spark_master_ip,
             output_dir=output_dir,
-            dry_run=self.args.dry_run,
+            dry_run=args.dry_run,
         ) as s:
-            Launcher(self.launcher_args, output_dir, self.args.dry_run).launch()
+            Launcher(
+                launcher_args=self.launcher_args,
+                spark_mirror_path=args.spark_mirror_path,
+                spark_master_ip=args.spark_master_ip,
+                tpch_spark_path=args.tpch_spark_path,
+                output_dir=output_dir,
+                dry_run=args.dry_run,
+            ).launch()
 
 
 def main():
@@ -149,6 +160,24 @@ def main():
         action="store_true",
         help="Prints commands that will be executed for each experiment",
     )
+    parser.add_argument(
+        "--spark-mirror-path",
+        type=Path,
+        required=True,
+        help="Path to spark-mirror repository",
+    )
+    parser.add_argument(
+        "--spark-master-ip",
+        type=str,
+        required=True,
+        help="IP address of node running Spark master",
+    )
+    parser.add_argument(
+        "--tpch-spark-path",
+        type=Path,
+        required=True,
+        help="Path to TPC-H Spark repository",
+    )
     parser.add_argument("--output-dir", type=Path, default=Path("exp-output"))
     args = parser.parse_args()
 
@@ -183,19 +212,6 @@ def main():
183212 * ("--scheduler_plan_ahead_no_consideration_gap" , 1 ),
184213 ]
185214 experiments = [
186- Experiment (
187- name = "edf-q300-hard" ,
188- service_args = [
189- * base_args ,
190- * edf_args ,
191- * variance_args ,
192- ],
193- launcher_args = [
194- * ("--num_queries" , 300 ),
195- * ("--variable_arrival_rate" , 0.052 ),
196- ],
197- args = args ,
198- ),
199215 Experiment (
200216 name = "dsched-q300-hard" ,
201217 service_args = [
@@ -207,14 +223,13 @@ def main():
207223 * ("--num_queries" , 300 ),
208224 * ("--variable_arrival_rate" , 0.052 ),
209225 ],
210- args = args ,
211226 ),
212227 ]
213228
214229 for i , experiment in enumerate (experiments ):
215230 try :
216231 print (f"=== { experiment .name } ({ i + 1 } /{ len (experiments )} ) ===" )
217- experiment .run ()
232+ experiment .run (args )
218233 print ("=== done ===" )
219234 except Exception as e :
220235 print (traceback .format_exc ())