Skip to content

Commit

Permalink
[feat] batch monitor can monitor a queue indefinitely (#24)
Browse files Browse the repository at this point in the history
Adding the `--monitor-queue` option to monitor a job queue for new jobs
and only complete when all jobs have completed
  • Loading branch information
nh13 authored Apr 19, 2023
1 parent 74508e6 commit 2dd353a
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions pyfgaws/batch/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,40 +233,55 @@ def monitor(
region_name: Optional[str] = None,
delay: Optional[int] = None,
per_job: bool = False,
monitor_queue: bool = False,
) -> None:
"""Monitor the Batch jobs with given job identifiers or job queue.
If job identifiers are given, then this tool will exit once all jobs have reached a terminal
state. If a job queue is given, then this tool will keep polling for new jobs as long as there
are known jobs that are not in a terminal state. In other words, exit once all known jobs have
completed, and if a queue is given, keep checking to see if any new jobs show up.
Args:
job_ids: the AWS batch job identifier(s)
queue: the name of the AWS batch queue
region_name: the AWS region
delay: the number of seconds to wait after polling for status(es).
per_job: true to monitor per-job status information, otherwise jobs will be summarized by
status
monitor-queue: never exit and keep monitoring for new jobs. Can only be used with the
`--queue` option
"""
logger = logging.getLogger(__name__)
assert (
job_ids is not None or queue is not None
), "Either --job-ids or --queue must be specified"
assert job_ids is None or queue is None, "Both --job-ids or --queue cannot be specified"
assert queue is None or not monitor_queue, "--queue must be used with --monitor-queue"

client: batch.Client = boto3.client(
service_name="batch", region_name=region_name # type: ignore
)

# get the job ids from the queue if necessary
if job_ids is None:
logger.info(f"Retrieving job ids for queue: {queue}")
job_ids = list_jobs(client=client, queue=queue)

# get the list of batch jobs from job ids
jobs: List[BatchJob] = [BatchJob.from_id(client=client, job_id=job_id) for job_id in job_ids]

logger.info(f"Monitoring {len(job_ids):,d} jobs.")
# get the list of batch jobs from job ids. If using a queue, the job ids will be retrieved
# later
jobs: List[BatchJob] = []
if job_ids is not None:
jobs = [BatchJob.from_id(client=client, job_id=job_id) for job_id in job_ids]
logger.info(f"Monitoring {len(job_ids):,d} jobs.")
else:
logger.info(f"Monitoring jobs for queue: {queue}")

while True:
print("\033[H") # clear the screen!
logger.info("Polling job status")

# if monitoring a queue, get the full list of jobs
if queue is not None:
logger.info(f"Retrieving job ids for queue: {queue}")
# add newly seen jobs
for job_id in list_jobs(client=client, queue=queue):
if all(job_id != job.job_id for job in jobs):
jobs.append(BatchJob.from_id(client=client, job_id=job_id))

# print out detailed stats, and update status counts
status_counts: Dict[Optional[Status], int] = {status: 0 for status in Status}
Expand All @@ -286,16 +301,19 @@ def monitor(
if per_job:
print(column_it(table_detail, delimiter=" "))

# we are done polling if we're not monitoring a queue and all the known jobs are complete
all_done = not monitor_queue and num_done == len(jobs)

# print out summary stats
if num_done == len(job_ids) or not per_job:
if all_done or not per_job:
table_summary: List[List[str]] = [["STATUS", "COUNT"]]
for status, count in status_counts.items():
status_name = "Unknown" if status is None else status.name
table_summary.append([status_name, f"{count:,d}"])
print("\033[2J\033[H") # clear the screen!
print(column_it(table_summary, delimiter=" "))

if num_done == len(job_ids):
if all_done:
break

if delay is None:
Expand Down

0 comments on commit 2dd353a

Please sign in to comment.