Skip to content

Commit 1dbcbf9

Browse files
committed
allow for group files containing paths to be reused
also added some error handling
1 parent 428c66e commit 1dbcbf9

File tree

4 files changed

+92
-26
lines changed

4 files changed

+92
-26
lines changed

scripts/gufi_distributed.py

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,20 @@ def run_slurm(args, target, cmd):
8484
stdout=subprocess.PIPE,
8585
cwd=os.getcwd())
8686
out, _ = proc.communicate()
87+
88+
if proc.returncode != 0:
89+
return None
90+
8791
return out.split()[-1].decode()
8892

8993
# wait for all jobs (not sbatch) to complete
9094
def handle_slurm_procs(args, jobids):
95+
after = ':'.join(jobid for jobid in jobids if jobid)
96+
9197
# pylint: disable=consider-using-with
92-
wait = subprocess.Popen([args.sbatch, '--nodes=1', '--nodelist={0}'.format(args.hosts[1]),
93-
'--dependency', 'after:' + ':'.join(jobids), '--wait', '/dev/stdin'],
98+
wait = subprocess.Popen([args.sbatch, '--nodes=1', '--nodelist={0}'.format(args.hosts[1])] +
99+
                        (['--dependency', 'after:' + after] if len(after) != 0 else []) +
100+
['--wait', '/dev/stdin'],
94101
stdin=subprocess.PIPE,
95102
stdout=subprocess.DEVNULL, # Python 3.3
96103
shell=True)
@@ -152,36 +159,58 @@ def group_dirs(dirs, splits, sort):
152159

153160
# Step 3
154161
# Write the group to a per-node file and start jobs
162+
# pylint: disable=too-many-locals
155163
def schedule_subtrees(args, dir_count, group_size, groups, subtree_cmd):
156164
targets = args.hosts[0]
157-
print('Splitting {0} paths into {1} groups of max size {2}'.format(dir_count,
158-
len(targets),
159-
group_size))
165+
166+
# pylint: disable=unnecessary-lambda-assignment
167+
gen_filename = lambda idx : os.path.realpath('{0}.{1}'.format(args.group_file_prefix, idx))
160168

161169
procs = []
162-
for i, (group, target) in enumerate(zip(groups, targets)):
163-
count = len(group)
170+
if args.use_existing_group_files:
171+
print('Using existing files')
172+
for i, target in enumerate(targets):
173+
filename = gen_filename(i) # files not mapped to targets are ignored
164174

165-
if count == 0:
166-
break
175+
# not checking for existence of file - if it doesn't exist, -D will fail
167176

168-
print(' Range {0}: {1} path{2} on {3}'.format(i, count, 's' if count != 1 else '', target))
169-
print(' {0} {1}'.format(group[0], group[-1]))
177+
print(' Range {0}: Contents of {1} on {2}'.format(i, filename, target))
170178

171-
if args.dry_run:
172-
continue
179+
if args.dry_run:
180+
continue
181+
182+
cmd = subtree_cmd(args, filename, i, target)
173183

174-
# write group to per-node file
175-
filename = os.path.realpath('{0}.{1}'.format(args.group_file_prefix, i))
176-
with open(filename, 'w', encoding='utf-8') as f:
177-
for path in group:
178-
f.write(path)
179-
f.write('\n')
184+
# run the command to process the subtree
185+
procs += [DISTRIBUTORS[args.distributor][0](args, target, cmd)]
186+
else:
187+
print('Splitting {0} paths into {1} groups of max size {2}'.format(dir_count,
188+
len(targets),
189+
group_size))
190+
for i, (group, target) in enumerate(zip(groups, targets)):
180191

181-
cmd = subtree_cmd(args, filename, i, target)
192+
count = len(group)
182193

183-
# run the command to process the subtree
184-
procs += [DISTRIBUTORS[args.distributor][0](args, target, cmd)]
194+
if count == 0:
195+
break
196+
197+
print(' Range {0}: {1} path{2} on {3}'.format(i, count, 's' if count != 1 else '', target))
198+
print(' {0} {1}'.format(group[0], group[-1]))
199+
200+
if args.dry_run:
201+
continue
202+
203+
# write group to per-node file
204+
filename = gen_filename(i)
205+
with open(filename, 'w', encoding='utf-8') as f:
206+
for path in group:
207+
f.write(path)
208+
f.write('\n')
209+
210+
cmd = subtree_cmd(args, filename, i, target)
211+
212+
# run the command to process the subtree
213+
procs += [DISTRIBUTORS[args.distributor][0](args, target, cmd)]
185214

186215
return procs
187216

@@ -221,8 +250,13 @@ def clock_diff(start, end):
221250
def distribute_work(args, root, schedule_subtree_func, schedule_top_func):
222251
start = clock()
223252

224-
dirs = dirs_at_level(root, args.level)
225-
group_size, groups = group_dirs(dirs, len(args.hosts[0]), args.sort)
253+
if args.use_existing_group_files:
254+
dirs = []
255+
group_size = None
256+
groups = None
257+
else:
258+
dirs = dirs_at_level(root, args.level)
259+
group_size, groups = group_dirs(dirs, len(args.hosts[0]), args.sort)
226260

227261
# launch jobs in parallel
228262
procs = schedule_subtrees(args, len(dirs), group_size, groups,
@@ -286,11 +320,15 @@ def parse_args(name, desc):
286320

287321
parser.add_argument('--dry-run', action='store_true')
288322

289-
parser.add_argument('--group_file_prefix', metavar='path',
323+
parser.add_argument('--group-file-prefix', metavar='path',
290324
type=str,
291325
default='path_list',
292326
help='prefix for file containing paths to be processed by one node')
293327

328+
parser.add_argument('--use-existing-group-files',
329+
action='store_true',
330+
help='use existing group files (up to the number of targets) instead of running find(1)')
331+
294332
parser.add_argument('--sort',
295333
choices=SORT_DIRS.keys(),
296334
default='path',

test/regression/gufi_distributed.expected

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ prefix/unusual#? directory ,/unusual, name?#
105105
# Diff original index results against the trace files
106106
$ diff <(gufi_query -d " " -S "SELECT rpath(sname, sroll) FROM vrsummary;" -E "SELECT rpath(sname, sroll) || '/' || name FROM vrpentries;" "prefix" | sort | sed 's/search\///g') <(cat traces.0.0 traces.1.0 traces.2.0 traces.3.0 traces.0 | awk -F'|' '{ print $1 }' | sort)
107107

108+
# Use existing group files (path_list.4 does not exist)
109+
$ gufi_dir2index_distributed --sbatch "sbatch" --gufi_dir2index "gufi_dir2index" --use-existing-group-files slurm "hostfile" 1 "prefix" "search2" 2>/dev/null
110+
Using existing files
111+
Range 0: Contents of path_list.0 on localhost
112+
Range 1: Contents of path_list.1 on 127.0.0.1
113+
Range 2: Contents of path_list.2 on localhost
114+
Range 3: Contents of path_list.3 on 127.0.0.1
115+
Range 4: Contents of path_list.4 on localhost
116+
Process upper directories up to and including level 0 on 127.0.0.1
117+
Waiting for slurm jobs to complete
118+
108119
#####################################
109120

110121
#####################################
@@ -209,4 +220,15 @@ prefix/unusual#? directory ,/unusual, name?#
209220
# Diff original index results against the trace files
210221
$ diff <(gufi_query -d " " -S "SELECT rpath(sname, sroll) FROM vrsummary;" -E "SELECT rpath(sname, sroll) || '/' || name FROM vrpentries;" "prefix" | sort | sed 's/search\///g') <(cat traces.0.0 traces.1.0 traces.2.0 traces.3.0 traces.0 | awk -F'|' '{ print $1 }' | sort)
211222

223+
# Use existing group files (path_list.4 does not exist)
224+
$ gufi_dir2index_distributed --ssh "ssh" --gufi_dir2index "gufi_dir2index" --use-existing-group-files ssh "hostfile" 1 "prefix" "search2" 2>/dev/null | tail -n 9
225+
Using existing files
226+
Range 0: Contents of path_list.0 on localhost
227+
Range 1: Contents of path_list.1 on 127.0.0.1
228+
Range 2: Contents of path_list.2 on localhost
229+
Range 3: Contents of path_list.3 on 127.0.0.1
230+
Range 4: Contents of path_list.4 on localhost
231+
Process upper directories up to and including level 0 on 127.0.0.1
232+
Waiting for ssh jobs to complete
233+
212234
#####################################

test/regression/gufi_distributed.sh.in

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ echo "# Diff original index results against the trace files"
210210
# shellcheck disable=SC2145,SC2153
211211
run_no_sort "@DIFF@ <(${GUFI_QUERY} -d \" \" -S \"SELECT rpath(sname, sroll) FROM vrsummary;\" -E \"SELECT rpath(sname, sroll) || '/' || name FROM vrpentries;\" \"${INDEXROOT}\" | sort | @SED@ 's/${SEARCH//\//\\/}\\///g') <(cat ${traces[@]} | @AWK@ -F'${DELIM}' '{ print \$1 }' | sort)"
212212

213+
echo "# Use existing group files (path_list.4 does not exist)"
214+
run_no_sort "${GUFI_DIR2INDEX_DISTRIBUTED} --sbatch \"${SBATCH_FAKE}\" --gufi_dir2index \"${GUFI_DIR2INDEX}\" --use-existing-group-files slurm \"${HOSTFILE}\" ${DISTRIBUTED_LEVEL} \"${SRCDIR}\" \"${SEARCH2}\" 2>/dev/null"
215+
213216
# cleanup
214217
cleanup
215218
echo "#####################################"
@@ -249,6 +252,9 @@ run_sort "cat ${traces[@]} | @AWK@ -F'${DELIM}' '{ print \$1 }'"
249252
echo "# Diff original index results against the trace files"
250253
# shellcheck disable=SC2145,SC2153
251254
run_no_sort "@DIFF@ <(${GUFI_QUERY} -d \" \" -S \"SELECT rpath(sname, sroll) FROM vrsummary;\" -E \"SELECT rpath(sname, sroll) || '/' || name FROM vrpentries;\" \"${INDEXROOT}\" | sort | @SED@ 's/${SEARCH//\//\\/}\\///g') <(cat ${traces[@]} | @AWK@ -F'${DELIM}' '{ print \$1 }' | sort)"
255+
256+
echo "# Use existing group files (path_list.4 does not exist)"
257+
run_no_sort "${GUFI_DIR2INDEX_DISTRIBUTED} --ssh \"${SSH_FAKE}\" --gufi_dir2index \"${GUFI_DIR2INDEX}\" --use-existing-group-files ssh \"${HOSTFILE}\" ${DISTRIBUTED_LEVEL} \"${SRCDIR}\" \"${SEARCH2}\" 2>/dev/null | tail -n 9"
252258
echo "#####################################"
253259
) 2>&1 | remove_distributed_output | tee "${OUTPUT}"
254260

test/regression/sbatch.fake.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,6 @@ then
108108
$(</dev/stdin)
109109
else
110110
"$@"
111-
fi > "${out}"
111+
fi > "${out}" || true # always return a job id
112112

113113
echo "${jobid}"

0 commit comments

Comments
 (0)