5
5
import sys
6
6
import threading
7
7
from asyncio import Future
8
- from datetime import datetime
8
+ import datetime
9
9
from enum import Enum
10
- from typing import Dict , List , Optional
11
- import numpy as np
10
+ from typing import Dict , Optional
12
11
12
+ import numpy as np
13
13
import websockets
14
14
from rx .subject import BehaviorSubject
15
15
16
16
import deepkit .globals
17
17
from deepkit .home import get_home_config
18
- from deepkit .model import ExperimentOptions , FolderLink
18
+ from deepkit .model import FolderLink
19
19
20
20
21
21
def is_in_directory (filepath , directory ):
@@ -31,10 +31,15 @@ def json_converter(obj):
31
31
return int (obj )
32
32
elif isinstance (obj , np .floating ):
33
33
return float (obj )
34
+ elif isinstance (obj , np .float ):
35
+ return float (obj )
34
36
elif isinstance (obj , np .ndarray ):
35
37
return obj .tolist ()
36
38
elif isinstance (obj , datetime .datetime ):
37
- return obj .__str__ ()
39
+ # we assume all datetime instances are UTC
40
+ return obj .strftime ('%Y-%m-%dT%H:%M:%S.%fZ' )
41
+ else :
42
+ return str (obj )
38
43
39
44
40
45
class JobStatus (Enum ):
@@ -47,23 +52,36 @@ class JobStatus(Enum):
47
52
class Client (threading .Thread ):
48
53
connection : websockets .WebSocketClientProtocol
49
54
50
- def __init__ (self , options : ExperimentOptions ):
55
+ def __init__ (self , project : Optional [str ] = None ,
56
+ account : Optional [str ] = None ,
57
+ try_pick_up = False ,
58
+ parent_experiment = None ,
59
+ silent = False ):
51
60
self .connected = BehaviorSubject (False )
52
- self .options : ExperimentOptions = options
61
+ self .project = project
62
+ self .account = account
63
+ self .parent_experiment = parent_experiment
64
+ self .silent = silent
53
65
54
66
self .host = os .environ .get ('DEEPKIT_HOST' , '127.0.0.1' )
55
67
self .socket_path = os .environ .get ('DEEPKIT_SOCKET' , None )
56
68
self .ssl = os .environ .get ('DEEPKIT_SSL' , '0' ) is '1'
57
69
self .port = int (os .environ .get ('DEEPKIT_PORT' , '8960' ))
58
- self .job_token = os .environ .get ('DEEPKIT_JOB_ACCESSTOKEN' , None )
59
- self .job_id = os .environ .get ('DEEPKIT_JOB_ID' , None )
60
70
71
+ self .job_token = None
72
+ self .job_id = None
73
+
74
+ if try_pick_up :
75
+ # is set by Deepkit cli
76
+ self .job_token = os .environ .get ('DEEPKIT_JOB_ACCESSTOKEN' , None )
77
+ self .job_id = os .environ .get ('DEEPKIT_JOB_ID' , None )
78
+
79
+ # is set by deepkit.login()
61
80
self .token = os .environ .get ('DEEPKIT_ACCESSTOKEN' , None )
62
81
63
82
self .result_status = None
64
83
65
84
self .message_id = 0
66
- self .account = 'localhost'
67
85
self .callbacks : Dict [int , asyncio .Future ] = {}
68
86
self .subscriber : Dict [int , any ] = {}
69
87
self .stopping = False
@@ -102,24 +120,25 @@ def shutdown(self):
102
120
async def stop_and_sync (self ):
103
121
self .stopping = True
104
122
105
- if deepkit .utils .in_self_execution ():
123
+ if deepkit .utils .in_self_execution () or self . result_status :
106
124
# only when we are in self execution do we set status, time stamps etc
107
- # otherwise the CLI is doing that and the server.
125
+ # otherwise the CLI is doing that and the server. Or when
126
+ # the experiment set result_status explicitly.
108
127
109
128
# done = 150, //when all tasks are done
110
129
# aborted = 200, //when at least one task aborted
111
130
# failed = 250, //when at least one task failed
112
131
# crashed = 300, //when at least one task crashed
113
132
self .patches ['status' ] = 150
114
- self .patches ['ended' ] = datetime .now (). isoformat ()
115
- self .patches ['tasks.main.ended' ] = datetime .now (). isoformat ()
133
+ self .patches ['ended' ] = datetime .datetime . utcnow ()
134
+ self .patches ['tasks.main.ended' ] = datetime .datetime . utcnow ()
116
135
117
136
# done = 500,
118
137
# aborted = 550,
119
138
# failed = 600,
120
139
# crashed = 650,
121
140
self .patches ['tasks.main.status' ] = 500
122
- self .patches ['tasks.main.instances.0.ended' ] = datetime .now (). isoformat ()
141
+ self .patches ['tasks.main.instances.0.ended' ] = datetime .datetime . utcnow ()
123
142
124
143
# done = 500,
125
144
# aborted = 550,
@@ -338,12 +357,12 @@ async def send_messages(self, connection):
338
357
try :
339
358
j = json .dumps (m , default = json_converter )
340
359
except TypeError as e :
341
- print ('Could not send message since JSON error' , e , m )
360
+ print ('Could not send message since JSON error' , e , m , file = sys . stderr )
342
361
continue
343
362
await connection .send (j )
344
363
self .queue .remove (m )
345
364
except Exception as e :
346
- print ("Failed sending, exit send_messages" )
365
+ print ("Failed sending, exit send_messages" , file = sys . stderr )
347
366
raise e
348
367
349
368
if len (self .patches ) > 0 :
@@ -363,7 +382,7 @@ async def send_messages(self, connection):
363
382
send
364
383
],
365
384
'timeout' : 60
366
- }))
385
+ }, default = json_converter ))
367
386
368
387
for i in send .keys ():
369
388
if self .patches [i ] == send [i ]:
@@ -396,7 +415,7 @@ async def handle_messages(self, connection):
396
415
del self .callbacks [res ['id' ]]
397
416
398
417
if not self .stopping :
399
- print ("Deepkit: lost connection. reconnect ..." )
418
+ self . log ("Deepkit: lost connection. reconnect ..." )
400
419
self .connecting = self .loop .create_future ()
401
420
self .connected .on_next (False )
402
421
self .loop .create_task (self ._connect ())
@@ -411,7 +430,7 @@ async def _connected(self, id: str, token: str):
411
430
self .connection = await websockets .connect (url )
412
431
except Exception as e :
413
432
# try again later
414
- print ('Unable to connect' , e )
433
+ self . log ('Unable to connect' , e )
415
434
await asyncio .sleep (1 )
416
435
self .loop .create_task (self ._connect ())
417
436
return
@@ -430,7 +449,7 @@ async def _connected(self, id: str, token: str):
430
449
}
431
450
}, lock = False )
432
451
433
- await self .connection .send (json .dumps (message ))
452
+ await self .connection .send (json .dumps (message , default = json_converter ))
434
453
435
454
res = await self .callbacks [message ['id' ]]
436
455
if not res ['result' ] or res ['result' ] is not True :
@@ -440,7 +459,7 @@ async def _connected(self, id: str, token: str):
440
459
441
460
self .connecting .set_result (True )
442
461
if self .connections > 0 :
443
- print ("Deepkit: Reconnected." )
462
+ self . log ("Deepkit: Reconnected." )
444
463
445
464
self .connected .on_next (True )
446
465
self .connections += 1
@@ -469,20 +488,21 @@ async def _connect(self):
469
488
link : Optional [FolderLink ] = None
470
489
471
490
user_token = self .token
472
- account_name = 'dynamic '
491
+ account_name = 'none '
473
492
474
493
if not user_token :
475
494
config = get_home_config ()
476
495
# when no user_token is given (via deepkit.login() for example)
477
496
# we need to find the host, port, token from the user config in ~/.deepkit/config
478
- if self .options .account :
479
- account_config = config .get_account_for_name (self .options .account )
480
- elif not self .options .project :
497
+ if not self .account and not self .project :
498
+ # neither account nor project given: find both via the folder link of sys.path[0]
481
499
link = config .get_folder_link_of_directory (sys .path [0 ])
482
500
account_config = config .get_account_for_id (link .accountId )
501
+ elif self .account and not self .project :
502
+ account_config = config .get_account_for_name (self .account )
483
503
else :
484
- # default to localhost
485
- account_config = config .get_account_for_name ( 'localhost' )
504
+ # default to first account configured
505
+ account_config = config .get_first_account ( )
486
506
487
507
account_name = account_config .name
488
508
self .host = account_config .host
@@ -496,7 +516,7 @@ async def _connect(self):
496
516
self .connection = await websockets .connect (url )
497
517
except Exception as e :
498
518
self .offline = True
499
- print (f"Deepkit: App not started or server not reachable. Monitoring disabled. { e } " )
519
+ print (f"Deepkit: App not started or server not reachable. Monitoring disabled. { e } " , file = sys . stderr )
500
520
self .connecting .set_result (False )
501
521
return
502
522
@@ -513,24 +533,30 @@ async def _connect(self):
513
533
if not res ['result' ]:
514
534
raise Exception ('Login invalid' )
515
535
536
+ project_name = ''
516
537
if link :
538
+ project_name = link .name
517
539
projectId = link .projectId
518
540
else :
519
- if not self .options . project :
520
- raise Exception ('No project defined. Please use project="project-name"'
541
+ if not self .project :
542
+ raise Exception ('No project defined. Please use project="project-name" '
521
543
'to specify which project to use.' )
522
544
523
- project = await self ._action ('app' , 'getProjectForPublicName' , [self .options .project ], lock = False )
545
+ project = await self ._action ('app' , 'getProjectForPublicName' , [self .project ], lock = False )
546
+
524
547
if not project :
525
548
raise Exception (
526
- f'No project found for name { self .options . project } . Make sure it exists before using it. '
549
+ f'No project found for name { self .project } . Make sure it exists before using it. '
527
550
f'Do you use the correct account? (used { account_name } )' )
528
-
551
+ project_name = project [ 'name' ]
529
552
projectId = project ['id' ]
530
553
531
- job = await self ._action ('app' , 'createJob' , [projectId ],
554
+ job = await self ._action ('app' , 'createJob' , [projectId , self . parent_experiment ],
532
555
lock = False )
533
556
557
+ prefix = "Sub experiment" if self .parent_experiment else "Experiment"
558
+ self .log (f"{ prefix } #{ job ['number' ]} created in project { project_name } using account { account_name } " )
559
+
534
560
deepkit .globals .loaded_job_config = job ['config' ]['config' ]
535
561
self .job_token = await self ._action ('app' , 'getJobAccessToken' , [job ['id' ]], lock = False )
536
562
self .job_id = job ['id' ]
@@ -542,3 +568,6 @@ async def _connect(self):
542
568
self .connecting .set_exception (e )
543
569
544
570
self .queue = queue_copy + self .queue
571
+
572
def log(self, *message: object):
    """Print a status message to stdout unless the client is silent.

    Args:
        *message: Parts of the message. They are forwarded unchanged to
            ``print``, so non-string values (for example an exception
            instance) are accepted and rendered via ``str()``.
    """
    if not self.silent:
        print(*message)
0 commit comments