diff --git a/pyfgaws/batch/api.py b/pyfgaws/batch/api.py index 972e2f1..c5ad800 100644 --- a/pyfgaws/batch/api.py +++ b/pyfgaws/batch/api.py @@ -263,6 +263,12 @@ def stream(self) -> Optional[str]: """The log stream for the job, if available.""" return self.describe_job()["container"].get("logStreamName") + @property + def group(self) -> Optional[str]: + """The log group for the job, if available.""" + options = self.describe_job()["container"]["logConfiguration"]["options"] # type: ignore + return options.get("awslogs-group") # type: ignore + def submit(self) -> SubmitJobResponseTypeDef: """Submits this job.""" @@ -325,7 +331,7 @@ def get_status(self) -> Optional[Status]: return Status.from_string(self.describe_job()["status"]) def describe_job(self) -> JobDetailTypeDef: - """Gets detauled information about this job.""" + """Gets detailed information about this job.""" jobs_response = self.client.describe_jobs(jobs=[self.job_id]) job_statuses = jobs_response["jobs"] assert len(job_statuses) == 1 diff --git a/pyfgaws/batch/tools.py b/pyfgaws/batch/tools.py index b231a2d..56be205 100644 --- a/pyfgaws/batch/tools.py +++ b/pyfgaws/batch/tools.py @@ -77,6 +77,10 @@ def watch_job( logger.info(f"Watching job with name '{job.name}' and id '{job.job_id}'") if print_logs: _log_it(region_name=region_name, job=job, logger=logger, delay=delay) + if delay is None: + time.sleep(DEFAULT_LOGS_POLLING_INTERVAL) + else: + time.sleep(delay) job.wait_on_complete(delay=delay) end_status = job.get_status() @@ -202,12 +206,15 @@ def _watch_logs( client: Optional[logs.Client] = boto3.client( service_name="logs", region_name=region_name # type: ignore ) - log: Log = Log(client=client, group="/aws/batch/job", stream=job.stream) + log: Log = Log(client=client, group=job.group, stream=job.stream) try: while True: - for line in log: - logger.info(line) + try: + for line in log: + logger.info(line) + except client.exceptions.ResourceNotFoundException: + logger.warning("The log stream has not been created, will try again.") time.sleep(polling_interval) log.reset() if not indefinitely: