2 changes: 2 additions & 0 deletions jobmanagers/config.json
@@ -81,6 +81,8 @@
},
"pbspro": {
"cmd": "qsub",
"queue_query": "pbs_queue.py",
"queue_query_grace_secs": 300,
"mem_is_vmem": true,
"envs": [ ]
},
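The two new keys hook a queue-query helper into the pbspro job mode: outstanding job IDs are piped to the named script on stdin, and any ID it prints back is treated as still alive (that contract is taken from the script's docstrings below). `queue_query_grace_secs` presumably gives a 300-second grace period before a job that stops appearing in the query output is treated as failed. A rough smoke test of the assumed stdin/stdout contract, with made-up job IDs:

```python
# Hypothetical smoke test for the assumed queue_query contract:
# whitespace-separated job IDs go in on stdin, still-alive IDs come back on stdout.
# The job IDs below are made up; this only does something useful on a PBS Pro host.
import subprocess

def query_alive(jobids):
    result = subprocess.run(
        ["python", "jobmanagers/pbs_queue.py"],
        input=" ".join(jobids),
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.split()

if __name__ == "__main__":
    print(query_alive(["1234.pbsserver", "1235.pbsserver"]))
```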
65 changes: 65 additions & 0 deletions jobmanagers/pbs_queue.py
@@ -0,0 +1,65 @@
#!/usr/bin/env python
#
# Copyright (c) 2017 10X Genomics, Inc. All rights reserved.
#

"""Queries qstat about a list of jobs and parses the output, returning the list
of jobs which are queued, running, or on hold."""

import subprocess
import sys
import json

# PBS Pro "job states" to be regarded as "alive"
ALIVE = {'Q', 'H', 'W', 'S', 'R', 'E'}
Member
This could be simply

Suggested change
ALIVE = {'Q', 'H', 'W', 'S', 'R', 'E'}
ALIVE = "QHWSRE"

`in` will still work on it the same way and, for a set that small, will probably be faster than a hash lookup.
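For reference, the two membership tests can be compared with a quick micro-benchmark (timings vary by machine and Python build; this is only a sketch):

```python
# Micro-benchmark of set vs. string membership for single-character job states.
# One behavioural difference to keep in mind: 'QH' in "QHWSRE" is True,
# while 'QH' in the set is False; it does not matter here since states are single characters.
import timeit

as_set = timeit.timeit("'E' in s", setup="s = {'Q', 'H', 'W', 'S', 'R', 'E'}")
as_str = timeit.timeit("'E' in s", setup="s = 'QHWSRE'")
print(f"set:    {as_set:.3f}s")
print(f"string: {as_str:.3f}s")
```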



def get_ids():
    """Returns the set of jobids to query from standard input."""
    ids = []
    for jobid in sys.stdin:
        ids.extend(jobid.split())
    return ids


def mkopts(ids):
    """Gets the command line for qstat."""
    if not ids:
        sys.exit(0)
    return ['qstat', '-x', '-F', 'json', '-f'] + ids
Member
Please format this script with black for consistency with the other scripts.



def execute(cmd):
    """Executes qstat and captures its output."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
        out, err = proc.communicate()
    if proc.returncode:
        raise OSError(err)
    if not isinstance(out, str):
        out = out.decode()
    if len(out) < 500:
        sys.stderr.write(out)
    else:
        sys.stderr.write(out[:496] + '...')
    return out


def parse_output(out):
    """Parses the JSON-format output of qstat and yields the ids of pending
    jobs."""
    data = json.loads(out)
    for jid, info in data.get('Jobs', {}).items():
        if info.get('job_state') in ALIVE:
            yield jid


def main():
    """Reads a set of ids from standard input, queries qstat, and outputs the
    jobids to standard output for jobs which are in the pending state."""
    for jobid in parse_output(execute(mkopts(get_ids()))):
        sys.stdout.write(f'{jobid}\n')
    return 0


if __name__ == '__main__':
    sys.exit(main())
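For anyone unfamiliar with qstat's JSON mode: parse_output() expects an object whose "Jobs" member maps each job ID to its attributes, including "job_state" (that shape is read straight from the code above; real `qstat -x -F json -f` output carries many more fields and may vary between PBS Pro versions). A self-contained sketch with a hand-written payload and made-up job IDs:

```python
# Hand-written stand-in for `qstat -x -F json -f` output, reduced to the fields
# parse_output() actually reads; the job IDs are invented for illustration.
import json

SAMPLE = json.dumps({
    "Jobs": {
        "1234.pbsserver": {"job_state": "R"},  # running  -> kept
        "1235.pbsserver": {"job_state": "F"},  # finished -> dropped
        "1236.pbsserver": {"job_state": "H"},  # held     -> kept
    }
})

ALIVE = {'Q', 'H', 'W', 'S', 'R', 'E'}

def parse_output(out):
    data = json.loads(out)
    for jid, info in data.get('Jobs', {}).items():
        if info.get('job_state') in ALIVE:
            yield jid

print(list(parse_output(SAMPLE)))  # ['1234.pbsserver', '1236.pbsserver']
```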
10 changes: 7 additions & 3 deletions jobmanagers/pbspro.template.example
@@ -6,9 +6,12 @@
# Setup Instructions
# =============================================================================
#
# 1. Add any other necessary PBSpro arguments such as queue (-q) or account
# (-A). If your system requires a walltime (-l walltime), 24 hours (24:00:00)
# is sufficient. We recommend you do not remove any arguments below or
# 1. Add any other necessary PBSpro arguments such as queue (-q), account
# (-A), project (-P) or volumes (-l storage=).
# If your system requires a walltime (-l walltime), 24 hours (24:00:00)
# is sufficient, but this can often be reduced to 4 hours or less if
# '--maxjobs' is at least 10.
Comment on lines +12 to +13
Member
While it's probably fine to set 4 hours regardless, this comment is misleading. The wall time being set in this template is for each individual job, not for the overall pipeline run. --maxjobs controls the number of jobs that can be queued simultaneously; increasing it may improve the overall pipeline wall time, but it would not be expected to improve the runtime of each individual job; more likely, in fact, it would cause higher I/O contention, which would increase the runtime of those jobs.

Author
I haven't seen any jobs run for more than an hour in cluster mode, but happy to be educated here. My naive assumption was that --maxjobs affected the number (and size) of the chunks when input data is split. If I'm wrong then you can revert the second half of this change.
My reason for reducing this is that on our HPC, and perhaps on PBS Pro systems more generally, account quota is "reserved" based on resources × walltime. We often run dozens of cellranger or spaceranger jobs in parallel, so requesting too much walltime can result in all of our quota getting reserved, even though we only get billed based on actual usage. Hogging quota like this can impact the rest of the team.
Perhaps

24 hours (24:00:00) is sufficient but this can be adjusted by comparing against actual usage

In our system, PBS Pro dumps actual usage information to the _stdout files.

Member
--maxjobs controls a semaphore limiting the number of jobs which are queued to the cluster at a time. It does not affect the size or number of jobs in total. Together with --jobinterval, the intent is to prevent a single pipeline job from slamming a cluster hard enough to make sysadmins angry at our users.

I have not previously encountered a cluster that charged based on reserved wall time as opposed to actually-used wall time; if you have that, then more careful accounting is certainly in order! You can get the actual wall time (as measured by the job itself, so it might be a slight underestimate as far as the cluster is concerned) from the _perf json file at the end of the run, though care must be taken when interpreting what that means in anything but the leaf nodes of the graph.

In our internal infrastructure, we usually run on spot instances in AWS, which are prone to involuntary preemption, so we aim to keep individual jobs under 1 hour (for most datasets, anyway) to avoid losing too much work when one of them fails this way and needs to be restarted. I think in most cases very few jobs will run longer than 15 minutes, even, depending on the performance of your hardware (the slow stages tend to be mostly I/O limited).

Author
Thanks for the clarification. We only get billed for actual usage, but potential usage gets "reserved" until the jobs finish. 1000 jobs (from multiple parallel runs) all claiming 24 hours walltime can result in all our quota getting reserved, which prevents new jobs from queuing, breaking the pipeline orchestrating the parallel runs.
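A back-of-the-envelope illustration of that reservation effect (the job count and runtimes below are made-up examples, not measurements):

```python
# Illustrative only: job count and runtimes are examples, not measurements.
jobs = 1000
requested_walltime_h = 24  # reserved against the account quota per job
actual_runtime_h = 1       # roughly what each job ends up using

print(f"reserved: {jobs * requested_walltime_h} job-hours")  # 24000
print(f"billed:   {jobs * actual_runtime_h} job-hours")      # 1000
# Dropping the request to 4 hours cuts the reservation to 4000 job-hours
# while leaving the bill unchanged.
```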

# We recommend you do not remove any arguments below or
# Martian may not run properly.
#
# 2. Change filename of pbspro.template.example to pbspro.template.
@@ -18,6 +21,7 @@
# =============================================================================
#
#PBS -N __MRO_JOB_NAME__
#PBS -S /bin/bash
#PBS -V
#PBS -l select=1:ncpus=__MRO_THREADS__
#PBS -l mem=__MRO_MEM_GB__gb
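As an illustration of the additions described in step 1 of the template, a site-specific copy might gain lines such as the following; the queue, account, and walltime values are placeholders for whatever your site requires:

```
#PBS -q workq
#PBS -A my_account
#PBS -l walltime=04:00:00
```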