diff --git a/library/libwaku.nim b/library/libwaku.nim index a5828f5d8..70fac3923 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -168,7 +168,7 @@ proc waku_destroy( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread.stopWakuThread(ctx).handleRes(callback, userData) + waku_thread.destroyWakuThread(ctx).handleRes(callback, userData) proc waku_version( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 5967cbfdd..44e81d4dc 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -18,36 +18,45 @@ type WakuContext* = object userData*: pointer eventCallback*: pointer eventUserdata*: pointer + running: Atomic[bool] # To control when the thread is running const git_version* {.strdefine.} = "n/a" const versionString = "version / git commit hash: " & waku.git_version -# To control when the thread is running -# TODO: this should be part of the context so multiple instances can be executed -var running: Atomic[bool] - proc runWaku(ctx: ptr WakuContext) {.async.} = ## This is the worker body. This runs the Waku node ## and attends library user requests (stop, connect_to, etc.) - info "Starting Waku", version = versionString var waku: Waku - while running.load == true: + while true: await ctx.reqSignal.wait() - # Trying to get a request from the libwaku main thread + if ctx.running.load == false: + break; + + ## Trying to get a request from the libwaku requestor thread var request: ptr InterThreadRequest let recvOk = ctx.reqChannel.tryRecv(request) - if recvOk == true: - let resultResponse = waitFor InterThreadRequest.process(request, addr waku) + if not recvOk: + error "waku thread could not receive a request" + continue + + ## Handle the request + let resultResponse = waitFor InterThreadRequest.process(request, addr waku) + + ## Converting a `Result` into a thread-safe transferable response type + let threadSafeResp = InterThreadResponse.createShared(resultResponse) - ## Converting a `Result` into a thread-safe transferable response type - let threadSafeResp = InterThreadResponse.createShared(resultResponse) + ## Send the response back to the thread that sent the request + let sentOk = ctx.respChannel.trySend(threadSafeResp) + if not sentOk: + error "could not send a request to the requester thread", original_request = $request[] - ## The error-handling is performed in the main thread - discard ctx.respChannel.trySend(threadSafeResp) - discard ctx.respSignal.fireSync() + let fireRes = ctx.respSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", + original_request = $request[], error = fireRes.error proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker @@ -62,7 +71,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = ctx.respSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create respSignal ThreadSignalPtr") - running.store(true) + ctx.running.store(true) try: createThread(ctx.thread, run, ctx) @@ -74,15 +83,20 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = return ok(ctx) -proc stopWakuThread*(ctx: ptr WakuContext): Result[void, string] = - running.store(false) - let fireRes = ctx.reqSignal.fireSync() - if fireRes.isErr(): - return err("error in stopWakuThread: " & $fireRes.error) - discard ctx.reqSignal.close() - discard ctx.respSignal.close() +proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroyWakuThread: " & $error) + if not signaledOnTime: + error "failed to signal reqSignal on time in destroyWakuThread" + return err("failed to signal reqSignal on time in destroyWakuThread") + joinThread(ctx.thread) + ?ctx.reqSignal.close() + ?ctx.respSignal.close() freeShared(ctx) + return ok() proc sendRequestToWakuThread*(