1
1
import json
2
2
import multiprocessing
3
+ import sys
3
4
import tempfile
4
- import time
5
- import traceback
6
5
7
6
from pytest import raises
8
7
9
8
from labthings_fastapi .server import server_from_config , ThingServer
10
9
from labthings_fastapi .server .cli import serve_from_cli
11
10
12
11
13
- class ProcessPropagatingExceptions (multiprocessing .Process ):
14
- """A process that remembers exceptons, and raises them on join()
12
+ def monitored_target (target , conn , * args , ** kwargs ):
13
+ """Monitor stdout and exceptions from a function"""
14
+ # The lines below copy stdout messages to a pipe
15
+ # which allows us to monitor STDOUT and STDERR
16
+ for output , name in [(sys .stdout , "stdout" ), (sys .stderr , "stderr" )]:
17
+
18
+ def write_wrapper (message ):
19
+ conn .send ((name , message ))
20
+
21
+ output .write = write_wrapper
22
+
23
+ try :
24
+ ret = target (* args , ** kwargs )
25
+ conn .send (("success" , ret ))
26
+ except Exception as e :
27
+ conn .send (("exception" , e ))
28
+ except SystemExit as e :
29
+ conn .send (("exit" , e ))
30
+
31
+
32
+ class MonitoredProcess (multiprocessing .Process ):
33
+ """A process that monitors stdout and propagates exceptions to `join()`
15
34
16
35
With thanks to:
17
36
https://stackoverflow.com/questions/63758186
18
37
"""
19
38
20
- def __init__ (self , * args , ** kwargs ):
21
- multiprocessing .Process .__init__ (self , * args , ** kwargs )
39
+ def __init__ (self , target = None , ** kwargs ):
22
40
self ._pconn , self ._cconn = multiprocessing .Pipe ()
23
- self ._exception = None
24
-
25
- def run (self ):
26
- try :
27
- multiprocessing .Process .run (self )
28
- self ._cconn .send (None )
29
- except Exception as e :
30
- tb = traceback .format_exc ()
31
- self ._cconn .send ((e , tb ))
32
-
33
- @property
34
- def exception (self ):
35
- if self ._pconn .poll ():
36
- self ._exception = self ._pconn .recv ()
37
- return self ._exception
38
-
39
- def join (self ):
41
+ args = (target , self ._cconn ) + kwargs .pop ("args" , ())
42
+ multiprocessing .Process .__init__ (
43
+ self , target = monitored_target , args = args , ** kwargs
44
+ )
45
+
46
+ def run_monitored (self , terminate_outputs = [], timeout = 10 ):
47
+ """Run the process, monitoring stdout and exceptions"""
48
+ self .start ()
40
49
try :
41
- if self .exception :
42
- e , _tb = self .exception
43
- raise e
50
+ while self ._pconn .poll (timeout ):
51
+ event , m = self ._pconn .recv ()
52
+ if event == "success" :
53
+ return m
54
+ elif event in ("exception" , "exit" ):
55
+ raise m
56
+ elif event in ("stdout" , "stderr" ):
57
+ print (f"{ event .upper ()} : { m } " )
58
+ if any (output in m for output in terminate_outputs ):
59
+ self .terminate ()
60
+ break
61
+ else :
62
+ raise RuntimeError (f"Unknown event: { event } , { m !r} " )
63
+ else :
64
+ raise TimeoutError ("Timed out waiting for process output" )
44
65
finally :
45
- multiprocessing . Process . join (self )
66
+ self . join ()
46
67
47
68
48
69
CONFIG = {
@@ -64,13 +85,8 @@ def test_server_from_config():
64
85
65
86
def check_serve_from_cli (args : list [str ] = []):
66
87
"""Check we can create a server from the command line"""
67
- p = ProcessPropagatingExceptions (
68
- target = serve_from_cli , args = (args ,)
69
- )
70
- p .start ()
71
- time .sleep (1 )
72
- p .terminate ()
73
- p .join ()
88
+ p = MonitoredProcess (target = serve_from_cli , args = (args ,))
89
+ p .run_monitored (terminate_outputs = ["Application startup complete" ])
74
90
75
91
76
92
def test_serve_from_cli_with_config_json ():
@@ -101,3 +117,7 @@ def test_serve_with_no_config():
101
117
"""
102
118
with raises (RuntimeError ):
103
119
check_serve_from_cli ([])
120
+
121
+
122
+ if __name__ == "__main__" :
123
+ test_serve_from_cli_with_config_json ()
0 commit comments