@@ -18,36 +18,45 @@ type WakuContext* = object
18
18
userData* : pointer
19
19
eventCallback* : pointer
20
20
eventUserdata* : pointer
21
+ running: Atomic[bool ] # To control when the thread is running
21
22
22
23
const git_version* {.strdefine.} = " n/a"
23
24
const versionString = " version / git commit hash: " & waku.git_version
24
25
25
- # To control when the thread is running
26
- # TODO : this should be part of the context so multiple instances can be executed
27
- var running: Atomic[bool ]
28
-
29
26
proc runWaku(ctx: ptr WakuContext) {.async.} =
30
27
# # This is the worker body. This runs the Waku node
31
28
# # and attends library user requests (stop, connect_to, etc.)
32
- info " Starting Waku" , version = versionString
33
29
34
30
var waku: Waku
35
31
36
- while running.load == true :
32
+ while true :
37
33
await ctx.reqSignal.wait()
38
34
39
- # Trying to get a request from the libwaku main thread
35
+ if ctx.running.load == false :
36
+ break ;
37
+
38
+ # # Trying to get a request from the libwaku requestor thread
40
39
var request: ptr InterThreadRequest
41
40
let recvOk = ctx.reqChannel.tryRecv(request)
42
- if recvOk == true :
43
- let resultResponse = waitFor InterThreadRequest.process(request, addr waku)
41
+ if not recvOk:
42
+ error " waku thread could not receive a request"
43
+ continue
44
+
45
+ # # Handle the request
46
+ let resultResponse = waitFor InterThreadRequest.process(request, addr waku)
47
+
48
+ # # Converting a `Result` into a thread-safe transferable response type
49
+ let threadSafeResp = InterThreadResponse.createShared(resultResponse)
44
50
45
- # # Converting a `Result` into a thread-safe transferable response type
46
- let threadSafeResp = InterThreadResponse.createShared(resultResponse)
51
+ # # Send the response back to the thread that sent the request
52
+ let sentOk = ctx.respChannel.trySend(threadSafeResp)
53
+ if not sentOk:
54
+ error " could not send a request to the requester thread" , original_request = $ request[]
47
55
48
- # # The error-handling is performed in the main thread
49
- discard ctx.respChannel.trySend(threadSafeResp)
50
- discard ctx.respSignal.fireSync()
56
+ let fireRes = ctx.respSignal.fireSync()
57
+ if fireRes.isErr():
58
+ error " could not fireSync back to requester thread" ,
59
+ original_request = $ request[], error = fireRes.error
51
60
52
61
proc run(ctx: ptr WakuContext) {.thread.} =
53
62
# # Launch waku worker
@@ -62,7 +71,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
62
71
ctx.respSignal = ThreadSignalPtr.new().valueOr:
63
72
return err(" couldn't create respSignal ThreadSignalPtr" )
64
73
65
- running.store(true )
74
+ ctx. running.store(true )
66
75
67
76
try :
68
77
createThread(ctx.thread, run, ctx)
@@ -74,15 +83,20 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
74
83
75
84
return ok(ctx)
76
85
77
- proc stopWakuThread* (ctx: ptr WakuContext): Result[void , string ] =
78
- running.store(false )
79
- let fireRes = ctx.reqSignal.fireSync()
80
- if fireRes.isErr():
81
- return err(" error in stopWakuThread: " & $ fireRes.error)
82
- discard ctx.reqSignal.close()
83
- discard ctx.respSignal.close()
86
+ proc destroyWakuThread* (ctx: ptr WakuContext): Result[void , string ] =
87
+ ctx.running.store(false )
88
+
89
+ let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
90
+ return err(" error in destroyWakuThread: " & $ error)
91
+ if not signaledOnTime:
92
+ error " failed to signal reqSignal on time in destroyWakuThread"
93
+ return err(" failed to signal reqSignal on time in destroyWakuThread" )
94
+
84
95
joinThread(ctx.thread)
96
+ ? ctx.reqSignal.close()
97
+ ? ctx.respSignal.close()
85
98
freeShared(ctx)
99
+
86
100
return ok()
87
101
88
102
proc sendRequestToWakuThread* (
0 commit comments