37
37
from fireworks .utilities .fw_utilities import get_fw_logger
38
38
from fireworks .utilities .fw_serializers import recursive_dict
39
39
40
-
41
40
__author__ = 'Anubhav Jain'
42
41
__copyright__ = 'Copyright 2013, The Materials Project'
43
42
__version__ = '0.1'
44
43
__maintainer__ = 'Anubhav Jain'
45
44
46
45
__date__ = 'Jan 30, 2013'
47
46
47
+
48
48
# TODO: lots of duplication reduction and cleanup possible
49
49
50
50
@@ -86,7 +86,7 @@ def __enter__(self):
86
86
# could not acquire lock b/c WF is already locked for writing
87
87
while not links_dict :
88
88
ctr += 1
89
- time_incr = ctr / 10.0 + random .random ()/ 100.0
89
+ time_incr = ctr / 10.0 + random .random () / 100.0
90
90
time .sleep (time_incr ) # wait a bit for lock to free up
91
91
waiting_time += time_incr
92
92
if waiting_time > self .expire_secs : # too much time waiting, expire lock
@@ -225,15 +225,15 @@ def update_spec(self, fw_ids, spec_document, mongo=False):
225
225
if mongo :
226
226
mod_spec = spec_document
227
227
else :
228
- mod_spec = {"$set" : {("spec." + k ): v for k , v in spec_document .items ()} }
228
+ mod_spec = {"$set" : {("spec." + k ): v for k , v in spec_document .items ()}}
229
229
230
230
allowed_states = ["READY" , "WAITING" , "FIZZLED" , "DEFUSED" , "PAUSED" ]
231
231
self .fireworks .update_many ({'fw_id' : {"$in" : fw_ids },
232
232
'state' : {"$in" : allowed_states }}, mod_spec )
233
233
for fw in self .fireworks .find ({'fw_id' : {"$in" : fw_ids }, 'state' : {"$nin" : allowed_states }},
234
234
{"fw_id" : 1 , "state" : 1 }):
235
235
self .m_logger .warning ("Cannot update spec of fw_id: {} with state: {}. "
236
- "Try rerunning first" .format (fw ['fw_id' ], fw ['state' ]))
236
+ "Try rerunning first" .format (fw ['fw_id' ], fw ['state' ]))
237
237
238
238
@classmethod
239
239
def from_dict (cls , d ):
@@ -246,11 +246,12 @@ def from_dict(cls, d):
246
246
user_indices = d .get ('user_indices' , [])
247
247
wf_user_indices = d .get ('wf_user_indices' , [])
248
248
ssl = d .get ('ssl' , False )
249
- ssl_ca_certs = d .get ('ssl_ca_certs' , d .get ('ssl_ca_file' , None )) # ssl_ca_file was the old notation for FWS < 1.5.5
249
+ ssl_ca_certs = d .get ('ssl_ca_certs' ,
250
+ d .get ('ssl_ca_file' , None )) # ssl_ca_file was the old notation for FWS < 1.5.5
250
251
ssl_certfile = d .get ('ssl_certfile' , None )
251
252
ssl_keyfile = d .get ('ssl_keyfile' , None )
252
253
ssl_pem_passphrase = d .get ('ssl_pem_passphrase' , None )
253
- authsource = d .get ('authsource' , None )
254
+ authsource = d .get ('authsource' , None )
254
255
uri_mode = d .get ('uri_mode' , False )
255
256
return LaunchPad (d ['host' ], port , name , username , password ,
256
257
logdir , strm_lvl , user_indices , wf_user_indices , ssl ,
@@ -770,17 +771,17 @@ def tuneup(self, bkground=True):
770
771
try :
771
772
self .db .command ({'compact' : 'fireworks' })
772
773
self .db .command ({'compact' : 'launches' })
773
- except :
774
+ except Exception :
774
775
self .m_logger .debug ('Database compaction failed (not critical)' )
775
776
776
- def pause_fw (self ,fw_id ):
777
+ def pause_fw (self , fw_id ):
777
778
"""
778
779
Given the firework id, pauses the firework and refresh the workflow
779
780
780
781
Args:
781
782
fw_id(int): firework id
782
783
"""
783
- allowed_states = ['WAITING' , 'READY' , 'RESERVED' ]
784
+ allowed_states = ['WAITING' , 'READY' , 'RESERVED' ]
784
785
f = self .fireworks .find_one_and_update (
785
786
{'fw_id' : fw_id , 'state' : {'$in' : allowed_states }},
786
787
{'$set' : {'state' : 'PAUSED' , 'updated_on' : datetime .datetime .utcnow ()}})
@@ -790,7 +791,6 @@ def pause_fw(self,fw_id):
790
791
self .m_logger .error ('No pausable (WAITING,READY,RESERVED) Firework exists with fw_id: {}' .format (fw_id ))
791
792
return f
792
793
793
-
794
794
def defuse_fw (self , fw_id , rerun_duplicates = True ):
795
795
"""
796
796
Given the firework id, defuse the firework and refresh the workflow.
@@ -809,8 +809,8 @@ def defuse_fw(self, fw_id, rerun_duplicates=True):
809
809
if not f :
810
810
self .rerun_fw (fw_id , rerun_duplicates )
811
811
f = self .fireworks .find_one_and_update (
812
- {'fw_id' : fw_id , 'state' : {'$in' : allowed_states }},
813
- {'$set' : {'state' : 'DEFUSED' , 'updated_on' : datetime .datetime .utcnow ()}})
812
+ {'fw_id' : fw_id , 'state' : {'$in' : allowed_states }},
813
+ {'$set' : {'state' : 'DEFUSED' , 'updated_on' : datetime .datetime .utcnow ()}})
814
814
if f :
815
815
self ._refresh_wf (fw_id )
816
816
return f
@@ -966,7 +966,7 @@ def _get_a_fw_to_run(self, query=None, fw_id=None, checkout=True):
966
966
if checkout :
967
967
m_fw = self .fireworks .find_one_and_update (m_query ,
968
968
{'$set' : {'state' : 'RESERVED' ,
969
- 'updated_on' : datetime .datetime .utcnow ()}},
969
+ 'updated_on' : datetime .datetime .utcnow ()}},
970
970
sort = sortby )
971
971
else :
972
972
m_fw = self .fireworks .find_one (m_query , {'fw_id' : 1 , 'spec' : 1 }, sort = sortby )
@@ -1149,8 +1149,8 @@ def detect_lostruns(self, expiration_secs=RUN_EXPIRATION_SECS, fizzle=False, rer
1149
1149
m_l = self .get_launch_by_id (ld ['launch_id' ])
1150
1150
utime = m_l ._get_time ('RUNNING' , use_update_time = True )
1151
1151
ctime = m_l ._get_time ('RUNNING' , use_update_time = False )
1152
- if (not max_runtime or (utime - ctime ).seconds <= max_runtime ) and \
1153
- (not min_runtime or (utime - ctime ).seconds >= min_runtime ):
1152
+ if (not max_runtime or (utime - ctime ).seconds <= max_runtime ) and \
1153
+ (not min_runtime or (utime - ctime ).seconds >= min_runtime ):
1154
1154
bad_launch = True
1155
1155
if bad_launch :
1156
1156
lost_launch_ids .append (ld ['launch_id' ])
@@ -1344,12 +1344,11 @@ def complete_launch(self, launch_id, action=None, state='COMPLETED'):
1344
1344
metadata = {"launch_id" : launch_id })
1345
1345
launch_db_dict ["action" ] = {"gridfs_id" : str (action_id )}
1346
1346
self .m_logger .warning ("The size of the launch document was too large. Saving "
1347
- "the action in gridfs." )
1347
+ "the action in gridfs." )
1348
1348
1349
1349
self .launches .find_one_and_replace ({'launch_id' : m_launch .launch_id },
1350
1350
launch_db_dict , upsert = True )
1351
1351
1352
-
1353
1352
# find all the fws that have this launch
1354
1353
for fw in self .fireworks .find ({'launches' : launch_id }, {'fw_id' : 1 }):
1355
1354
fw_id = fw ['fw_id' ]
@@ -1385,7 +1384,7 @@ def get_new_fw_id(self, quantity=1):
1385
1384
"""
1386
1385
try :
1387
1386
return self .fw_id_assigner .find_one_and_update ({}, {'$inc' : {'next_fw_id' : quantity }})['next_fw_id' ]
1388
- except :
1387
+ except Exception :
1389
1388
raise ValueError ("Could not get next FW id! If you have not yet initialized the database,"
1390
1389
" please do so by performing a database reset (e.g., lpad reset)" )
1391
1390
@@ -1395,7 +1394,7 @@ def get_new_launch_id(self):
1395
1394
"""
1396
1395
try :
1397
1396
return self .fw_id_assigner .find_one_and_update ({}, {'$inc' : {'next_launch_id' : 1 }})['next_launch_id' ]
1398
- except :
1397
+ except Exception :
1399
1398
raise ValueError ("Could not get next launch id! If you have not yet initialized the "
1400
1399
"database, please do so by performing a database reset (e.g., lpad reset)" )
1401
1400
@@ -1421,7 +1420,7 @@ def _upsert_fws(self, fws, reassign_all=False):
1421
1420
# this is the FIRST fw_id we should use
1422
1421
first_new_id = self .get_new_fw_id (quantity = len (fws ))
1423
1422
1424
- for new_id , fw in enumerate (fws , start = first_new_id ):
1423
+ for new_id , fw in enumerate (fws , start = first_new_id ):
1425
1424
old_new [fw .fw_id ] = new_id
1426
1425
fw .fw_id = new_id
1427
1426
used_ids .append (new_id )
@@ -1467,7 +1466,7 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
1467
1466
reruns = []
1468
1467
if rerun_duplicates :
1469
1468
f = self .fireworks .find_one ({"fw_id" : fw_id , "spec._dupefinder" : {"$exists" : True }},
1470
- {'launches' :1 })
1469
+ {'launches' : 1 })
1471
1470
if f :
1472
1471
for d in self .fireworks .find ({"launches" : {"$in" : f ['launches' ]},
1473
1472
"fw_id" : {"$ne" : fw_id }}, {"fw_id" : 1 }):
@@ -1486,12 +1485,11 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
1486
1485
1487
1486
# If no launch recovery specified, unset the firework recovery spec
1488
1487
else :
1489
- set_spec = {"$unset" :{"spec._recovery" :"" }}
1490
- self .fireworks .find_one_and_update ({"fw_id" :fw_id }, set_spec )
1491
-
1488
+ set_spec = {"$unset" : {"spec._recovery" : "" }}
1489
+ self .fireworks .find_one_and_update ({"fw_id" : fw_id }, set_spec )
1492
1490
1493
1491
# rerun this FW
1494
- if m_fw ['state' ] in ['ARCHIVED' , 'DEFUSED' ] :
1492
+ if m_fw ['state' ] in ['ARCHIVED' , 'DEFUSED' ]:
1495
1493
self .m_logger .info ("Cannot rerun fw_id: {}: it is {}." .format (fw_id , m_fw ['state' ]))
1496
1494
elif m_fw ['state' ] == 'WAITING' and not recover_launch :
1497
1495
self .m_logger .debug ("Skipping rerun fw_id: {}: it is already WAITING." .format (fw_id ))
@@ -1546,7 +1544,7 @@ def _refresh_wf(self, fw_id):
1546
1544
self ._update_wf (wf , updated_ids )
1547
1545
except LockedWorkflowError :
1548
1546
self .m_logger .info ("fw_id {} locked. Can't refresh!" .format (fw_id ))
1549
- except :
1547
+ except Exception :
1550
1548
# some kind of internal error - an example is that fws serialization changed due to
1551
1549
# code updates and thus the Firework object can no longer be loaded from db description
1552
1550
# Action: *manually* mark the fw and workflow as FIZZLED
@@ -1614,7 +1612,7 @@ def _steal_launches(self, thief_fw):
1614
1612
except NotImplementedError :
1615
1613
verified = True # no dupefinder.verify() implemented, skip verification
1616
1614
1617
- except : # we want to catch any exceptions from testing an empty dict, which the dupefinder might not be designed for
1615
+ except Exception : # we want to catch any exceptions from testing an empty dict, which the dupefinder might not be designed for
1618
1616
pass
1619
1617
1620
1618
if not verified :
@@ -1755,7 +1753,7 @@ def recover_offline(self, launch_id, ignore_errors=False, print_errors=False):
1755
1753
{"$set" : {"updated_on" : datetime .datetime .utcnow ().isoformat ()}})
1756
1754
return None
1757
1755
1758
- except :
1756
+ except Exception :
1759
1757
if print_errors :
1760
1758
self .m_logger .error ("failed recovering launch_id {}.\n {}" .format (
1761
1759
launch_id , traceback .format_exc ()))
@@ -1961,7 +1959,7 @@ def partial_fw(self):
1961
1959
1962
1960
@property
1963
1961
def full_fw (self ):
1964
- #map(self._get_launch_data, self.db_launch_fields)
1962
+ # map(self._get_launch_data, self.db_launch_fields)
1965
1963
for launch_field in self .db_launch_fields :
1966
1964
self ._get_launch_data (launch_field )
1967
1965
return self ._fw
@@ -2000,7 +1998,7 @@ def get_action_from_gridfs(action_dict, fallback_fs):
2000
1998
on its identifier, otherwise simply returns the dictionary in input.
2001
1999
Should be used when accessing a launch to ensure the presence of the
2002
2000
correct action dictionary.
2003
-
2001
+
2004
2002
Args:
2005
2003
action_dict (dict): the dictionary contained in the "action" key of a launch
2006
2004
document.
0 commit comments