Update postrun.json when job fails (#353)
* Update postrun.json for failed runs

* Update postrun on idle termination

* Remove unnecessary datetime
alexander-veit authored Oct 8, 2021
1 parent 51677a8 commit bb84597
Showing 5 changed files with 21 additions and 13 deletions.
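The common thread in the check_task.py hunks below is a record-then-raise pattern: before any failure path (idle termination, a missing EC2 instance, or unintended termination) raises, the postrun JSON is updated so that an end time is recorded and metrics can still be computed. A minimal, hedged sketch of that pattern follows; `record_postrun`, `fail_job`, and `JobFailedError` are hypothetical names, not Tibanna's API, and the timestamp format is assumed for illustration.

```python
from datetime import datetime, timezone

class JobFailedError(Exception):
    """Hypothetical stand-in for Tibanna's failure exceptions."""
    pass

def record_postrun(postrun: dict) -> None:
    # Stamp an end time so downstream metrics code has one even when the
    # job never finished; the format string is assumed for illustration.
    postrun["end_time"] = datetime.now(timezone.utc).strftime("%Y%m%d-%H:%M:%S-UTC")
    # ...in Tibanna this dictionary would then be written back to the log bucket.

def fail_job(postrun: dict, reason: str) -> None:
    record_postrun(postrun)       # persist the end time *before* raising
    raise JobFailedError(reason)
```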
2 changes: 1 addition & 1 deletion docs/api.rst
@@ -586,5 +586,5 @@ Retrieve a cost estimate for a specific job. This will be available as soon as t
update_tsv This flag specifies whether to update the cost in the tsv file that
stores metrics information on the S3 bucket

force Return the estimate, even if the actual cost is available
force Return the estimate, even if the actual cost is available
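The two parameters documented above control the cost-estimate lookup. A hedged usage sketch follows, assuming the Python entry point is `API().cost_estimate` and that it accepts `update_tsv` and `force` keywords; the exact signature is not shown in this diff.

```python
from tibanna.core import API

# Hedged sketch: the cost_estimate signature and "my-job-id" are assumptions,
# not confirmed by this commit.
estimate = API().cost_estimate(
    job_id="my-job-id",
    update_tsv=True,   # also update the cost stored in the metrics tsv on S3
    force=True,        # return the estimate even if the actual cost is available
)
print(estimate)
```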

2 changes: 1 addition & 1 deletion tibanna/_version.py
@@ -1,4 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "1.7.0"
__version__ = "1.7.1"
6 changes: 3 additions & 3 deletions tibanna/awsem.py
Expand Up @@ -55,9 +55,9 @@ def update(self, **kwargs):
setattr(self, k, v)

@property
def start_time_as_str(self):
def start_time_as_datetime(self):
if not self.start_time:
return ''
return None
else:
return datetime.strptime(self.start_time, AWSEM_TIME_STAMP_FORMAT)

@@ -311,7 +311,7 @@ def create_Output(self, Output):
self.Output = AwsemPostRunJsonOutput(**Output)

@property
def end_time_as_str(self):
def end_time_as_datetime(self):
try:
return datetime.strptime(self.end_time, AWSEM_TIME_STAMP_FORMAT)
except:
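The awsem.py hunks rename the `*_as_str` properties to `*_as_datetime` and make a missing start time return `None` rather than an empty string, so callers always get either a `datetime` or `None`. Below is a hedged reconstruction of the two properties on a standalone class; the surrounding postrun-JSON job class and the exact `AWSEM_TIME_STAMP_FORMAT` value are assumptions.

```python
from datetime import datetime

# Assumed for illustration; Tibanna defines this constant in tibanna/vars.py.
AWSEM_TIME_STAMP_FORMAT = "%Y%m%d-%H:%M:%S-UTC"

class JobSketch:
    """Hedged stand-in for the postrun-JSON job object in awsem.py."""

    def __init__(self, start_time=None, end_time=None):
        self.start_time = start_time
        self.end_time = end_time

    @property
    def start_time_as_datetime(self):
        if not self.start_time:
            return None  # previously '' — None is friendlier to datetime arithmetic
        return datetime.strptime(self.start_time, AWSEM_TIME_STAMP_FORMAT)

    @property
    def end_time_as_datetime(self):
        try:
            return datetime.strptime(self.end_time, AWSEM_TIME_STAMP_FORMAT)
        except (TypeError, ValueError):
            return None  # end_time missing or malformed, e.g. job still running
```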
16 changes: 12 additions & 4 deletions tibanna/check_task.py
@@ -23,7 +23,7 @@
JobAbortedException,
AWSEMErrorHandler
)
from .vars import PARSE_AWSEM_TIME
from .vars import PARSE_AWSEM_TIME, AWSEM_TIME_STAMP_FORMAT
from .core import API


@@ -66,6 +66,7 @@ def run(self):
if start_time + timedelta(minutes=10) < now:
try:
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
except:
pass # most likely already terminated or never initiated
raise EC2IdleException("Failed to find jobid %s, ec2 is not initializing for too long. Terminating the instance." % jobid)
@@ -108,16 +108,19 @@ def run(self):
res = boto3.client('ec2').describe_instances(InstanceIds=[instance_id])
except Exception as e:
if 'InvalidInstanceID.NotFound' in str(e):
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json) # We need to record the end time
raise EC2UnintendedTerminationException("EC2 is no longer found for job %s - please rerun." % jobid)
else:
raise e
if not res['Reservations']:
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json) # We need to record the end time
raise EC2UnintendedTerminationException("EC2 is no longer found for job %s - please rerun." % jobid)
else:
ec2_state = res['Reservations'][0]['Instances'][0]['State']['Name']
if ec2_state in ['stopped', 'shutting-down', 'terminated']:
errmsg = "EC2 is terminated unintendedly for job %s - please rerun." % jobid
logger.error(errmsg)
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json) # We need to record the end time
raise EC2UnintendedTerminationException(errmsg)

# check CPU utilization for the past hour
@@ -144,6 +148,9 @@ def terminate_idle_instance(self, jobid, instance_id, cpu, ebs_read):
if not ebs_read or ebs_read < 1000: # minimum 1kb
# in case the instance is copying files using <1% cpu for more than 1hr, do not terminate it.
try:
bucket_name = self.input_json['config']['log_bucket']
public_postrun_json = self.input_json['config'].get('public_postrun_json', False)
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json) # We need to record the end time
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
errmsg = (
"Nothing has been running for the past hour for job %s,"
Expand All @@ -166,6 +173,7 @@ def handle_postrun_json(self, bucket_name, jobid, input_json, public_read=False)
postrunjsoncontent = json.loads(read_s3(bucket_name, postrunjson))
prj = AwsemPostRunJson(**postrunjsoncontent)
prj.Job.update(instance_id=input_json['config'].get('instance_id', ''))
prj.Job.update(end_time=datetime.now(tzutc()).strftime(AWSEM_TIME_STAMP_FORMAT))
self.handle_metrics(prj)
logger.debug("inside funtion handle_postrun_json")
logger.debug("content=\n" + json.dumps(prj.as_dict(), indent=4))
@@ -199,14 +207,14 @@ def handle_metrics(self, prj):
try:
resources = self.TibannaResource(prj.Job.instance_id,
prj.Job.filesystem,
prj.Job.start_time_as_str,
prj.Job.end_time_as_str or datetime.now())
prj.Job.start_time_as_datetime,
prj.Job.end_time_as_datetime)

except Exception as e:
raise MetricRetrievalException("error getting metrics: %s" % str(e))
prj.Job.update(Metrics=resources.as_dict())
self.API().plot_metrics(prj.Job.JOBID, directory='/tmp/tibanna_metrics/',
force_upload=True, open_browser=False,
endtime=prj.Job.end_time_as_str or datetime.now(),
endtime=prj.Job.end_time_as_datetime,
filesystem=prj.Job.filesystem,
instance_id=prj.Job.instance_id)
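Taken together, the check_task.py changes make `handle_postrun_json` responsible for stamping both the instance id and an end time into postrun.json, and every failure path now calls it before raising. A hedged sketch of that flow follows, with the S3 helpers and metrics step reduced to injected placeholders; the `.postrun.json` key name, the `Job` layout, and the timestamp format are assumptions.

```python
import json
from datetime import datetime, timezone

AWSEM_TIME_STAMP_FORMAT = "%Y%m%d-%H:%M:%S-UTC"  # assumed value for illustration

def handle_postrun_json_sketch(read_s3, put_s3, bucket, jobid, input_json):
    """Hedged sketch of the post-commit flow; read_s3/put_s3 are injected stand-ins."""
    key = jobid + ".postrun.json"                 # assumed object key
    content = json.loads(read_s3(bucket, key))
    job = content.setdefault("Job", {})
    # Record which instance ran the job and when it (effectively) ended,
    # even if the run failed or the instance was terminated while idle.
    job["instance_id"] = input_json["config"].get("instance_id", "")
    job["end_time"] = datetime.now(timezone.utc).strftime(AWSEM_TIME_STAMP_FORMAT)
    put_s3(bucket, key, json.dumps(content, indent=4))
```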
8 changes: 4 additions & 4 deletions tibanna/core.py
@@ -893,7 +893,7 @@ def plot_metrics(self, job_id, sfn=None, directory='.', open_browser=True, force
if postrunjsonstr:
postrunjson = AwsemPostRunJson(**json.loads(postrunjsonstr))
job = postrunjson.Job
if hasattr(job, 'end_time_as_str') and job.end_time_as_str:
if hasattr(job, 'end_time_as_datetime') and job.end_time_as_datetime:
job_complete = True
else:
job_complete = False
@@ -921,10 +921,10 @@ def plot_metrics(self, job_id, sfn=None, directory='.', open_browser=True, force
webbrowser.open(METRICS_URL(log_bucket, job_id))
return None
# report not already on s3 with a lock
starttime = job.start_time_as_str
starttime = job.start_time_as_datetime
if not endtime:
if hasattr(job, 'end_time_as_str') and job.end_time_as_str:
endtime = job.end_time_as_str
if hasattr(job, 'end_time_as_datetime') and job.end_time_as_datetime:
endtime = job.end_time_as_datetime
else:
endtime = datetime.utcnow()
if hasattr(job, 'filesystem') and job.filesystem:
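With the renamed properties, plot_metrics works with datetime values directly and only falls back to "now" when the job has no recorded end time. A hedged sketch of that selection logic follows; `choose_plot_window` is a hypothetical helper and `job` stands in for `postrunjson.Job`.

```python
from datetime import datetime

def choose_plot_window(job, endtime=None):
    """Pick the metrics plotting window, mirroring the updated plot_metrics logic."""
    starttime = job.start_time_as_datetime
    if not endtime:
        if getattr(job, "end_time_as_datetime", None):
            endtime = job.end_time_as_datetime   # finished, or end time stamped on failure
        else:
            endtime = datetime.utcnow()          # still running: plot up to now
    return starttime, endtime
```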
