1- import datetime
21import logging
32
43from bulldozer .bulldozer_worker .infra import Infra , InfraException
@@ -74,7 +73,7 @@ class RunsetState:
7473
7574
7675class TaskReason :
77- COMPLETED = "task completed successfully"
76+ COMPLETED = "Task completed successfully"
7877
7978
8079class Base :
@@ -237,7 +236,7 @@ def check_destroy_conditions(self):
237236 _ , runner = self .bulldozer_cl .get_runner (runner_id )
238237 # check for destroy flag set
239238 if runner .get ("destroy" ):
240- raise DestroyFlagSet ("Destroy flag is set" )
239+ raise DestroyFlagSet ("Aborted - Destroy flag is set" )
241240
242241 destroy_conditions = runner .get ("destroy_conditions" , {})
243242 # check budget condition
@@ -250,7 +249,8 @@ def check_destroy_conditions(self):
250249 "current (estimated): %f" , runner_id , max_budget , cost )
251250 if max_budget < cost :
252251 raise BudgetExceeded (
253- f"Budget exceeded max: { max_budget } , current: { cost } " )
252+ f"Aborted - Budget exceeded max: { max_budget } , "
253+ f"current: { cost } " )
254254 max_duration = destroy_conditions .get ("max_duration" )
255255 if max_duration :
256256 LOG .info ("checking for max duration %d for runner %s" ,
@@ -263,7 +263,7 @@ def check_destroy_conditions(self):
263263 runner_id , now , threshold )
264264 if now > threshold :
265265 raise TimeoutConditionExceeded (
266- f"Duration exceeded: current time: { now } "
266+ f"Aborted - Duration exceeded: current time: { now } "
267267 f"threshold: { threshold } "
268268 )
269269
@@ -366,15 +366,18 @@ def _exec(self):
366366 }
367367 )
368368 infra .destroy ()
369-
369+ self .bulldozer_cl .update_runner (
370+ runner_id ,
371+ state = TaskState .DESTROYED ,
372+ destroyed_at = utcnow_timestamp ()
373+ )
370374 except Exception as exc :
371375 # basically exception
372376 LOG .exception ("Cleanup problem: %s" , str (exc ))
373- finally :
374377 self .bulldozer_cl .update_runner (
375378 runner_id ,
376- state = TaskState .ERROR ,
377- destroyed_at = utcnow_timestamp ())
379+ state = TaskState .ERROR )
380+ finally :
378381 self .update_reason ()
379382 self .message .ack ()
380383
@@ -402,6 +405,11 @@ def _exec(self):
402405 tags = runner .get ("tags" , dict ())
403406 # opens ingress ports for runner instance
404407 open_ingress = runner .get ("open_ingress" , False )
408+ _ , runset = self .bulldozer_cl .runset_get (runner ["runset_id" ])
409+ spot_settings = runset .get ("spot_settings" )
410+ spot_price = None
411+ if spot_settings :
412+ spot_price = spot_settings .get ("spot_price" )
405413
406414 if hp is not None and isinstance (hp , dict ):
407415 for k , v in hp .items ():
@@ -413,7 +421,7 @@ def _exec(self):
413421 state = TaskState .STARTING )
414422 _ , cloud_account = self .rest_cl .cloud_account_get (
415423 cloud_account_id , True )
416- # TODO: get cloud type form cloud account to support multi-cloud
424+ # TODO: get cloud type from cloud account to support multi-cloud
417425 # Now only AWS is supported
418426 c_type = "AWS"
419427 if not self .body .get ("type" ):
@@ -448,6 +456,7 @@ def _exec(self):
448456 key = None ,
449457 tags = tags ,
450458 open_ingress = open_ingress ,
459+ spot_price = spot_price
451460 )
452461
453462 LOG .info ("Created runner id=%s, instance=%s, ip=%s" ,
@@ -494,7 +503,7 @@ def _exec(self):
494503 current_time , wait_time )
495504 if current_time > wait_time :
496505 # TODO: Do we need automatically destroy env?
497- raise ArceeWaitException ("Arcee wait exceeded" )
506+ raise ArceeWaitException ("Aborted - Arcee wait exceeded" )
498507 else :
499508 self .update_run_info (run_id , runner )
500509 self .bulldozer_cl .update_runner (
0 commit comments