Skip to content

Commit bcde09a

Browse files
authored
Merge pull request #183 from marbl/limit-concurrent-partitions
2 parents 0a23225 + 2de0840 commit bcde09a

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

parsnp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ from pathlib import Path
2424

2525
from tqdm import tqdm
2626

27-
__version__ = "2.1.4"
27+
__version__ = "2.1.5"
2828
reroot_tree = True #use --midpoint-reroot
2929
random_seeded = random.Random(42)
3030

@@ -540,6 +540,11 @@ def parse_args():
540540
type = int,
541541
default = 1,
542542
help = "Number of threads to use")
543+
misc_args.add_argument(
544+
"--max-concurrent-partitions",
545+
type = int,
546+
default = None,
547+
help = "Maximum number of partitions to run in parallel. Unlimited by default")
543548
misc_args.add_argument(
544549
"--force-overwrite",
545550
"--fo",
@@ -1191,6 +1196,7 @@ if __name__ == "__main__":
11911196
curated = args.curated
11921197
aligner = ALIGNER_TO_IDX[args.alignment_program.lower()]
11931198
threads = args.threads
1199+
max_concurrent_partitions = args.max_concurrent_partitions if args.max_concurrent_partitions else numpy.inf
11941200
unaligned = "0" if not args.unaligned else "1"
11951201
# mincluster = args.min_cluster_size # (using args directly)
11961202
diagdiff = args.max_diagonal_difference
@@ -1507,7 +1513,7 @@ SETTINGS:
15071513
#initiate parallelPhiPack tasks
15081514
#3)run parsnp (cores, grid?)
15091515
if args.no_partition or len(finalfiles) < 2*args.min_partition_size:
1510-
if len(finalfiles) < 2*args.min_partition_size:
1516+
if not args.no_partition and len(finalfiles) < 2*args.min_partition_size:
15111517
logger.info(f"Too few genomes to run partitions of size >{args.min_partition_size}. Running all genomes at once.")
15121518
# Editing the ini file to be used for parsnp-aligner (which is different from parsnip as a mumi finder)
15131519
if not inifile_exists:
@@ -1562,6 +1568,9 @@ SETTINGS:
15621568
for partition_list_file in os.listdir(partition_list_dir):
15631569
chunk_labels.append(chunk_label_parser.search(partition_list_file).groups()[0])
15641570

1571+
num_partitions = len(chunk_labels)
1572+
concurrent_partition_count = min(args.threads, max_concurrent_partitions, num_partitions)
1573+
threads_per_partition = max(1, args.threads // concurrent_partition_count)
15651574
chunk_output_dirs = []
15661575
for cl in chunk_labels:
15671576
chunk_output_dir = f"{partition_output_dir}/{partition.CHUNK_PREFIX}-{cl}-out"
@@ -1571,11 +1580,12 @@ SETTINGS:
15711580
with open(f"{partition_list_dir}/{partition.CHUNK_PREFIX}-{cl}.txt", 'r') as partition_file:
15721581
partition_seq_files = [f for f in partition_file.read().splitlines()]
15731582

1574-
write_inifile_2(inifiled, chunk_output_dir, unaligned, args, auto_ref, query, partition_seq_files, ref, max(1, args.threads // len(chunk_labels)))
1583+
write_inifile_2(inifiled, chunk_output_dir, unaligned, args, auto_ref, query, partition_seq_files, ref, max(1, threads_per_partition))
15751584

15761585
logger.info("Running partitions...")
1586+
logger.debug(f"Number of concurrent partitions is capped at {concurrent_partition_count}, with {threads_per_partition} threads for each partition.")
15771587
good_chunks = set(chunk_labels)
1578-
with Pool(args.threads) as pool:
1588+
with Pool(concurrent_partition_count) as pool:
15791589
return_codes = tqdm(
15801590
pool.imap(run_parsnp_aligner, chunk_output_dirs, chunksize=1),
15811591
total=len(chunk_output_dirs),

0 commit comments

Comments
 (0)