From d1925612f501a011116ff51b0f550be619b2ffc9 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Fri, 19 Apr 2024 15:05:12 +0530 Subject: [PATCH 1/7] chore(rln-relay): resultify rln-relay 1/n --- .../test_rln_group_manager_onchain.nim | 162 ++++++------ .../rln_keystore_generator.nim | 7 +- waku/waku_rln_relay/conversion_utils.nim | 2 +- .../group_manager/group_manager_base.nim | 11 +- .../group_manager/on_chain/group_manager.nim | 240 ++++++++++-------- .../group_manager/on_chain/retry_wrapper.nim | 11 +- .../group_manager/static/group_manager.nim | 17 +- waku/waku_rln_relay/rln/wrappers.nim | 3 +- waku/waku_rln_relay/rln_relay.nim | 64 ++--- 9 files changed, 264 insertions(+), 253 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index 0596a8e96b..3d95a4e40d 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -8,7 +8,6 @@ else: import std/[options, os, osproc, sequtils, deques, streams, strutils, tempfiles], stew/[results, byteutils], - stew/shims/net as stewNet, testutils/unittests, chronos, chronicles, @@ -218,7 +217,8 @@ suite "Onchain group manager": asyncTest "should initialize successfully": let manager = await setup() - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error check: manager.ethRpc.isSome() @@ -231,7 +231,8 @@ suite "Onchain group manager": asyncTest "should error on initialization when loaded metadata does not match": let manager = await setup() - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error let metadataSetRes = manager.setMetadata() assert metadataSetRes.isOk(), metadataSetRes.error @@ -253,50 +254,43 @@ suite "Onchain group manager": ethContractAddress: $differentContractAddress, rlnInstance: manager.rlnInstance, ) - expect(ValueError): - await manager2.init() + (await manager2.init()).isErrOr: + raiseAssert "Expected error when contract address doesn't match" asyncTest "should error when keystore path and password are provided but file doesn't exist": let manager = await setup() manager.keystorePath = some("/inexistent/file") manager.keystorePassword = some("password") - expect(CatchableError): - await manager.init() + (await manager.init()).isErrOr: + raiseAssert "Expected error when keystore file doesn't exist" asyncTest "startGroupSync: should start group sync": let manager = await setup() - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error + await manager.stop() asyncTest "startGroupSync: should guard against uninitialized state": let manager = await setup() - try: - await manager.startGroupSync() - except CatchableError: - assert true - except Exception: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.startGroupSync()).isErrOr: + raiseAssert "Expected error when not initialized" await manager.stop() asyncTest "startGroupSync: should sync to the state of the group": let manager = await setup() let credentials = generateCredentials(manager.rlnInstance) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error let fut = newFuture[void]("startGroupSync") @@ -324,16 +318,15 @@ suite "Onchain group manager": await manager.register(credentials, UserMessageLimit(1)) else: await manager.register(credentials) - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() await fut - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error check: merkleRootBefore != merkleRootAfter @@ -343,12 +336,11 @@ suite "Onchain group manager": let manager = await setup() const credentialCount = 6 let credentials = generateCredentials(manager.rlnInstance, credentialCount) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error type TestGroupSyncFuts = array[0 .. credentialCount - 1, Future[void]] var futures: TestGroupSyncFuts @@ -377,7 +369,8 @@ suite "Onchain group manager": try: manager.onRegister(generateCallback(futures, credentials)) - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error for i in 0 ..< credentials.len(): when defined(rln_v2): @@ -389,10 +382,8 @@ suite "Onchain group manager": await allFutures(futures) - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error check: merkleRootBefore != merkleRootAfter @@ -421,18 +412,14 @@ suite "Onchain group manager": asyncTest "register: should register successfully": let manager = await setup() - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error let idCommitment = generateCredentials(manager.rlnInstance).idCommitment - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error try: when defined(rln_v2): @@ -447,10 +434,8 @@ suite "Onchain group manager": assert false, "exception raised when calling register: " & getCurrentExceptionMsg() - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error check: merkleRootAfter.inHex() != merkleRootBefore.inHex() manager.latestIndex == 1 @@ -480,9 +465,11 @@ suite "Onchain group manager": fut.complete() manager.onRegister(callback) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error try: - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error when defined(rln_v2): await manager.register( RateCommitment( @@ -519,7 +506,8 @@ suite "Onchain group manager": asyncTest "validateRoot: should validate good root": let manager = await setup() let credentials = generateCredentials(manager.rlnInstance) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error let fut = newFuture[void]() @@ -541,7 +529,8 @@ suite "Onchain group manager": manager.onRegister(callback) try: - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error when defined(rln_v2): await manager.register(credentials, UserMessageLimit(1)) else: @@ -578,12 +567,10 @@ suite "Onchain group manager": asyncTest "validateRoot: should reject bad root": let manager = await setup() - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error let credentials = generateCredentials(manager.rlnInstance) @@ -620,7 +607,8 @@ suite "Onchain group manager": asyncTest "verifyProof: should verify valid proof": let manager = await setup() let credentials = generateCredentials(manager.rlnInstance) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error let fut = newFuture[void]() @@ -642,7 +630,8 @@ suite "Onchain group manager": manager.onRegister(callback) try: - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error when defined(rln_v2): await manager.register(credentials, UserMessageLimit(1)) else: @@ -679,12 +668,10 @@ suite "Onchain group manager": asyncTest "verifyProof: should reject invalid proof": let manager = await setup() - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error let idCredential = generateCredentials(manager.rlnInstance) @@ -724,19 +711,19 @@ suite "Onchain group manager": let invalidProof = invalidProofRes.get() # verify the proof (should be false) - let verifiedRes = manager.verifyProof(messageBytes, invalidProof) - require: - verifiedRes.isOk() + let verified = manager.verifyProof(messageBytes, invalidProof).valueOr: + raiseAssert $error check: - verifiedRes.get() == false + verified == false await manager.stop() asyncTest "backfillRootQueue: should backfill roots in event of chain reorg": let manager = await setup() const credentialCount = 6 let credentials = generateCredentials(manager.rlnInstance, credentialCount) - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error type TestBackfillFuts = array[0 .. credentialCount - 1, Future[void]] var futures: TestBackfillFuts @@ -766,7 +753,9 @@ suite "Onchain group manager": try: manager.onRegister(generateCallback(futures, credentials)) - await manager.startGroupSync() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error + for i in 0 ..< credentials.len(): when defined(rln_v2): await manager.register(credentials[i], UserMessageLimit(1)) @@ -798,7 +787,8 @@ suite "Onchain group manager": asyncTest "isReady should return false if ethRpc is none": let manager = await setup() - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error manager.ethRpc = none(Web3) @@ -815,7 +805,8 @@ suite "Onchain group manager": asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": let manager = await setup() - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error var isReady = true try: @@ -830,13 +821,12 @@ suite "Onchain group manager": asyncTest "isReady should return true if ethRpc is ready": let manager = await setup() - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error # node can only be ready after group sync is done - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error + var isReady = false try: isReady = await manager.isReady() diff --git a/tools/rln_keystore_generator/rln_keystore_generator.nim b/tools/rln_keystore_generator/rln_keystore_generator.nim index 6be2e1a270..c904519c0d 100644 --- a/tools/rln_keystore_generator/rln_keystore_generator.nim +++ b/tools/rln_keystore_generator/rln_keystore_generator.nim @@ -56,8 +56,11 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) = ethPrivateKey: some(conf.rlnRelayEthPrivateKey), ) try: - waitFor groupManager.init() - except CatchableError: + (waitFor groupManager.init()).isOkOr: + error "failure while initializing OnchainGroupManager", error = $error + quit(1) + # handling the exception is required since waitFor raises an exception + except Exception, CatchableError: error "failure while initializing OnchainGroupManager", error = getCurrentExceptionMsg() quit(1) diff --git a/waku/waku_rln_relay/conversion_utils.nim b/waku/waku_rln_relay/conversion_utils.nim index 59070c6a5a..5744507f8a 100644 --- a/waku/waku_rln_relay/conversion_utils.nim +++ b/waku/waku_rln_relay/conversion_utils.nim @@ -7,7 +7,7 @@ import std/[sequtils, strutils, algorithm], web3, chronicles, - stew/[arrayops, results, endians2], + stew/[arrayops, endians2], stint import ./constants, ./protocol_types import ../waku_keystore diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 99dff9635c..6fce958300 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -40,18 +40,15 @@ type GroupManager* = ref object of RootObj # This proc is used to initialize the group manager # Any initialization logic should be implemented here -method init*(g: GroupManager): Future[void] {.base, async.} = - raise - newException(CatchableError, "init proc for " & $g.type & " is not implemented yet") +method init*(g: GroupManager): Future[GroupManagerResult[void]] {.base, async.} = + return err("init proc for " & $g.type & " is not implemented yet") # This proc is used to start the group sync process # It should be used to sync the group state with the rest of the group members method startGroupSync*( g: GroupManager -): Future[void] {.base, async: (raises: [Exception]).} = - raise newException( - CatchableError, "startGroupSync proc for " & $g.type & " is not implemented yet" - ) +): Future[GroupManagerResult[void]] {.base, async.} = + return err("startGroupSync proc for " & $g.type & " is not implemented yet") # This proc is used to register a new identity commitment into the merkle tree # The user may or may not have the identity secret to this commitment diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 118e6c5abe..74dc93f7ca 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -121,11 +121,26 @@ template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(CatchableError, "OnchainGroupManager is not initialized") -template retryWrapper( - g: OnchainGroupManager, res: auto, errStr: string, body: untyped -): auto = - retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): - body +proc resultifiedInitGuard(g: OnchainGroupManager): GroupManagerResult[void] = + try: + initializedGuard(g) + return ok() + except CatchableError: + return err("OnchainGroupManager is not initialized") + +proc retryWrapper[T]( + g: OnchainGroupManager, fut: Future[T], errStr: string +): Future[T] {.async.} = + return await retryWrapper(RetryStrategy.new(), errStr, g.onFatalErrorAction, fut) + +proc resultifiedRetryWrapper[T]( + g: OnchainGroupManager, fut: Future[T], errStr: string +): Future[RlnRelayResult[T]] {.async.} = + try: + return + ok(await retryWrapper(RetryStrategy.new(), errStr, g.onFatalErrorAction, fut)) + except CatchableError: + return err(errStr & ": " & getCurrentExceptionMsg()) proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] = try: @@ -314,24 +329,24 @@ else: let registryContract = g.registryContract.get() let membershipFee = g.membershipFee.get() - var gasPrice: int - g.retryWrapper(gasPrice, "Failed to get gas price"): - int(await ethRpc.provider.eth_gasPrice()) * 2 + let gasPrice = + int( + await g.retryWrapper(ethRpc.provider.eth_gasPrice(), "Failed to get gas price") + ) * 2 let idCommitment = credentials.idCommitment.toUInt256() - var txHash: TxHash let storageIndex = g.usingStorageIndex.get() debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex - g.retryWrapper(txHash, "Failed to register the member"): - await registryContract.register(storageIndex, idCommitment).send( - gasPrice = gasPrice - ) + let txHash = await g.retryWrapper( + registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice), + "Failed to register the member", + ) # wait for the transaction to be mined - var tsReceipt: ReceiptObject - g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): - await ethRpc.getMinedTransactionReceipt(txHash) + let tsReceipt = await g.retryWrapper( + ethRpc.getMinedTransactionReceipt(txHash), "Failed to get the transaction receipt" + ) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) # the receipt topic holds the hash of signature of the raised events @@ -450,14 +465,17 @@ proc getRawEvents( let ethRpc = g.ethRpc.get() let rlnContract = g.rlnContract.get() - var events: JsonNode - g.retryWrapper(events, "Failed to get the events"): - await rlnContract.getJsonLogs( - MemberRegistered, - fromBlock = some(fromBlock.blockId()), - toBlock = some(toBlock.blockId()), + try: + return await g.retryWrapper( + rlnContract.getJsonLogs( + MemberRegistered, + fromBlock = some(fromBlock.blockId()), + toBlock = some(toBlock.blockId()), + ), + "Failed to get the events", ) - return events + except CatchableError: + raise newException(CatchableError, "failed to get the events") proc getBlockTable( g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber @@ -532,7 +550,7 @@ proc handleRemovedEvents( proc getAndHandleEvents( g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: BlockNumber -): Future[bool] {.async: (raises: [Exception]).} = +): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) let blockTable = await g.getBlockTable(fromBlock, toBlock) try: @@ -550,8 +568,6 @@ proc getAndHandleEvents( else: trace "rln metadata persisted", blockNumber = g.latestProcessedBlock - return true - proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = g.blockFetchingActive = false @@ -559,9 +575,7 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = g.blockFetchingActive = true while g.blockFetchingActive: - var retCb: bool - g.retryWrapper(retCb, "Failed to run the interval block fetching loop"): - await cb() + await g.retryWrapper(cb(), "Failed to run the interval block fetching loop") await sleepAsync(interval) # using asyncSpawn is OK here since @@ -571,20 +585,19 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = proc getNewBlockCallback(g: OnchainGroupManager): proc = let ethRpc = g.ethRpc.get() - proc wrappedCb(): Future[bool] {.async, gcsafe.} = - var latestBlock: BlockNumber - g.retryWrapper(latestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + proc wrappedCb(): Future[void] {.async, gcsafe.} = + let latestBlock = cast[BlockNumber](await g.retryWrapper( + ethRpc.provider.eth_blockNumber(), "Failed to get the latest block number" + )) if latestBlock <= g.latestProcessedBlock: return # get logs from the last block # inc by 1 to prevent double processing let fromBlock = g.latestProcessedBlock + 1 - var handleBlockRes: bool - g.retryWrapper(handleBlockRes, "Failed to handle new block"): - await g.getAndHandleEvents(fromBlock, latestBlock) - return true + await g.retryWrapper( + g.getAndHandleEvents(fromBlock, latestBlock), "Failed to handle new block" + ) return wrappedCb @@ -598,13 +611,11 @@ proc startListeningToEvents( g.runInInterval(newBlockCallback, DefaultBlockPollRate) proc batchAwaitBlockHandlingFuture( - g: OnchainGroupManager, futs: seq[Future[bool]] + g: OnchainGroupManager, futs: seq[Future[void]] ): Future[void] {.async: (raises: [Exception]).} = for fut in futs: try: - var handleBlockRes: bool - g.retryWrapper(handleBlockRes, "Failed to handle block"): - await fut + await g.retryWrapper(fut, "Failed to handle block") except CatchableError: raise newException( CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg() @@ -633,11 +644,11 @@ proc startOnchainSync( blockNumber = g.rlnContractDeployedBlockNumber g.rlnContractDeployedBlockNumber - var futs = newSeq[Future[bool]]() - var currentLatestBlock: BlockNumber - g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) - + var futs = newSeq[Future[void]]() + var currentLatestBlock = cast[BlockNumber](await g.retryWrapper( + ethRpc.provider.eth_blockNumber(), "Failed to get the latest block number" + )) + try: # we always want to sync from last processed block => latest # chunk events @@ -646,11 +657,11 @@ proc startOnchainSync( # then fetch the new toBlock if fromBlock >= currentLatestBlock: break - + if fromBlock + blockChunkSize.uint > currentLatestBlock.uint: - g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) - + currentLatestBlock = cast[BlockNumber](await g.retryWrapper( + ethRpc.provider.eth_blockNumber(), "Failed to get the latest block number" + )) let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) debug "fetching events", fromBlock = fromBlock, toBlock = toBlock @@ -658,7 +669,7 @@ proc startOnchainSync( futs.add(g.getAndHandleEvents(fromBlock, toBlock)) if futs.len >= maxFutures or toBlock == currentLatestBlock: await g.batchAwaitBlockHandlingFuture(futs) - futs = newSeq[Future[bool]]() + futs = newSeq[Future[void]]() fromBlock = toBlock + 1 except CatchableError: raise newException( @@ -676,17 +687,14 @@ proc startOnchainSync( method startGroupSync*( g: OnchainGroupManager -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) +): Future[GroupManagerResult[void]] {.async.} = + ?resultifiedInitGuard(g) # Get archive history try: await startOnchainSync(g) - except CatchableError: - raise newException( - CatchableError, - "failed to start onchain sync service: " & getCurrentExceptionMsg(), - ) - return + return ok() + except CatchableError, Exception: + return err("failed to start group sync: " & getCurrentExceptionMsg()) method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = g.registerCb = some(cb) @@ -694,23 +702,28 @@ method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = g.withdrawCb = some(cb) -method init*(g: OnchainGroupManager): Future[void] {.async.} = - var ethRpc: Web3 +method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} = # check if the Ethereum client is reachable - g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): - await newWeb3(g.ethClientUrl) + let ethRpc = ( + await g.resultifiedRetryWrapper( + newWeb3(g.ethClientUrl), "Failed to connect to the Ethereum client" + ) + ).valueOr: + return err("failed to connect to the Ethereum client: " & $error) # Set the chain id - var chainId: Quantity - g.retryWrapper(chainId, "Failed to get the chain id"): - await ethRpc.provider.eth_chainId() + let chainId = ( + await g.resultifiedRetryWrapper( + ethRpc.provider.eth_chainId(), "Failed to get the chain id" + ) + ).valueOr: + return err("failed to get the chain id: " & $error) g.chainId = some(chainId) if g.ethPrivateKey.isSome(): let pk = g.ethPrivateKey.get() - let pkParseRes = keys.PrivateKey.fromHex(pk) - if pkParseRes.isErr(): - raise newException(ValueError, "could not parse the private key") - ethRpc.privateKey = some(pkParseRes.get()) + let parsedPk = keys.PrivateKey.fromHex(pk).valueOr: + return err("failed to parse the private key" & ": " & $error) + ethRpc.privateKey = some(parsedPk) ethRpc.defaultAccount = ethRpc.privateKey.get().toPublicKey().toCanonicalAddress().Address @@ -718,14 +731,21 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress) # get the current storage index - var usingStorageIndex: Uint16 - g.retryWrapper(usingStorageIndex, "Failed to get the storage index"): - await registryContract.usingStorageIndex().call() + let usingStorageIndex = ( + await g.resultifiedRetryWrapper( + registryContract.usingStorageIndex().call(), "Failed to get the storage index" + ) + ).valueOr: + return err("failed to get the storage index: " & $error) g.usingStorageIndex = some(usingStorageIndex) - var rlnContractAddress: Address - g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"): - await registryContract.storages(usingStorageIndex).call() + let rlnContractAddress = ( + await g.resultifiedRetryWrapper( + registryContract.storages(usingStorageIndex).call(), + "Failed to get the rln contract address", + ) + ).valueOr: + return err("failed to get the rln contract address: " & $error) let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress) g.ethRpc = some(ethRpc) @@ -733,9 +753,9 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = g.registryContract = some(registryContract) if g.keystorePath.isSome() and g.keystorePassword.isSome(): - if not existsFile(g.keystorePath.get()): + if not fileExists(g.keystorePath.get()): error "File provided as keystore path does not exist", path = g.keystorePath.get() - raise newException(CatchableError, "missing keystore") + return err("File provided as keystore path does not exist") var keystoreQuery = KeystoreMembership( membershipContract: @@ -744,17 +764,14 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = if g.membershipIndex.isSome(): keystoreQuery.treeIndex = MembershipIndex(g.membershipIndex.get()) waku_rln_membership_credentials_import_duration_seconds.nanosecondTime: - let keystoreCredRes = getMembershipCredentials( + let keystoreCred = getMembershipCredentials( path = g.keystorePath.get(), password = g.keystorePassword.get(), query = keystoreQuery, appInfo = RLNAppInfo, - ) - if keystoreCredRes.isErr(): - raise newException( - CatchableError, "could not parse the keystore: " & $keystoreCredRes.error - ) - let keystoreCred = keystoreCredRes.get() + ).valueOr: + return err("failed to get the keystore credentials: " & $error) + g.membershipIndex = some(keystoreCred.treeIndex) when defined(rln_v2): g.userMessageLimit = some(keystoreCred.userMessageLimit) @@ -764,15 +781,9 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = .memberExists(keystoreCred.identityCredential.idCommitment.toUInt256()) .call() if membershipExists == 0: - raise newException( - CatchableError, "the provided commitment does not have a membership" - ) + return err("the commitment does not have a membership") except CatchableError: - raise newException( - CatchableError, - "could not check if the commitment exists on the contract: " & - getCurrentExceptionMsg(), - ) + return err("failed to check if the commitment has a membership") g.idCredentials = some(keystoreCred.identityCredential) @@ -782,22 +793,29 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = elif metadataGetOptRes.get().isSome(): let metadata = metadataGetOptRes.get().get() if metadata.chainId != uint64(g.chainId.get()): - raise newException(ValueError, "persisted data: chain id mismatch") + return err("persisted data: chain id mismatch") if metadata.contractAddress != g.ethContractAddress.toLower(): - raise newException(ValueError, "persisted data: contract address mismatch") + return err("persisted data: contract address mismatch") g.latestProcessedBlock = metadata.lastProcessedBlock g.validRoots = metadata.validRoots.toDeque() # check if the contract exists by calling a static function - var membershipFee: Uint256 - g.retryWrapper(membershipFee, "Failed to get the membership deposit"): - await rlnContract.MEMBERSHIP_DEPOSIT().call() + let membershipFee = ( + await g.resultifiedRetryWrapper( + rlnContract.MEMBERSHIP_DEPOSIT().call(), "Failed to get the membership deposit" + ) + ).valueOr: + return err("failed to get the membership deposit: " & $error) g.membershipFee = some(membershipFee) - var deployedBlockNumber: Uint256 - g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"): - await rlnContract.deployedBlockNumber().call() + let deployedBlockNumber = ( + await g.resultifiedRetryWrapper( + rlnContract.deployedBlockNumber().call(), + "Failed to get the deployed block number", + ) + ).valueOr: + return err("failed to get the deployed block number: " & $error) debug "using rln storage", deployedBlockNumber, rlnContractAddress g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) @@ -807,9 +825,9 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock - var newEthRpc: Web3 - g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"): - await newWeb3(g.ethClientUrl) + let newEthRpc = await g.retryWrapper( + newWeb3(g.ethClientUrl), "Failed to reconnect with the Ethereum client" + ) newEthRpc.ondisconnect = ethRpc.ondisconnect g.ethRpc = some(newEthRpc) @@ -826,6 +844,8 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true + return ok() + method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = g.blockFetchingActive = false @@ -841,9 +861,9 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async, gcsafe.} = let ethRpc = g.ethRpc.get() - var syncing: JsonNode - g.retryWrapper(syncing, "Failed to get the syncing status"): - await ethRpc.provider.eth_syncing() + let syncing = await g.retryWrapper( + ethRpc.provider.eth_syncing(), "Failed to get the syncing status" + ) return syncing.getBool() method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = @@ -852,9 +872,9 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = if g.ethRpc.isNone(): return false - var currentBlock: BlockNumber - g.retryWrapper(currentBlock, "Failed to get the current block number"): - cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) + let currentBlock = cast[BlockNumber](await g.retryWrapper( + g.ethRpc.get().provider.eth_blockNumber(), "Failed to get the current block number" + )) if g.latestProcessedBlock < currentBlock: return false diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim index eaa239c80c..d2d8f92f0f 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -1,5 +1,6 @@ import ../../../common/error_handling import chronos +import results type RetryStrategy* = object shouldRetry*: bool @@ -9,21 +10,19 @@ type RetryStrategy* = object proc new*(T: type RetryStrategy): RetryStrategy = return RetryStrategy(shouldRetry: true, retryDelay: 1000.millis, retryCount: 3) -template retryWrapper*( - res: auto, +proc retryWrapper*[T]( retryStrategy: RetryStrategy, errStr: string, errCallback: OnFatalErrorHandler = nil, - body: untyped, -): auto = + fut: Future[T], +): Future[T] {.async.} = var retryCount = retryStrategy.retryCount var shouldRetry = retryStrategy.shouldRetry var exceptionMessage = "" while shouldRetry and retryCount > 0: try: - res = body - shouldRetry = false + return await fut except: retryCount -= 1 exceptionMessage = getCurrentExceptionMsg() diff --git a/waku/waku_rln_relay/group_manager/static/group_manager.nim b/waku/waku_rln_relay/group_manager/static/group_manager.nim index 48d38dd725..55853df0ce 100644 --- a/waku/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/static/group_manager.nim @@ -10,7 +10,7 @@ template initializedGuard*(g: StaticGroupManager): untyped = if not g.initialized: raise newException(ValueError, "StaticGroupManager is not initialized") -method init*(g: StaticGroupManager): Future[void] {.async.} = +method init*(g: StaticGroupManager): Future[GroupManagerResult[void]] {.async.} = let groupSize = g.groupSize groupKeys = g.groupKeys @@ -18,14 +18,13 @@ method init*(g: StaticGroupManager): Future[void] {.async.} = if g.membershipIndex.isSome(): g.membershipIndex.get() else: - raise newException(ValueError, "Membership index is not set") + return err("membershipIndex is not set") if membershipIndex < MembershipIndex(0) or membershipIndex >= MembershipIndex(groupSize): - raise newException( - ValueError, + return err( "Invalid membership index. Must be within 0 and " & $(groupSize - 1) & "but was " & - $membershipIndex, + $membershipIndex ) when defined(rln_v2): g.userMessageLimit = some(DefaultUserMessageLimit) @@ -39,15 +38,13 @@ method init*(g: StaticGroupManager): Future[void] {.async.} = ) ) let leaves = rateCommitments.toLeaves().valueOr: - raise newException( - ValueError, "Failed to convert rate commitments to leaves: " & $error - ) + return err("Failed to convert rate commitments to leaves: " & $error) let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, leaves) else: let idCommitments = groupKeys.mapIt(it.idCommitment) let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, idCommitments) if not membersInserted: - raise newException(ValueError, "Failed to insert members into the merkle tree") + return err("Failed to insert members into the merkle tree") discard g.slideRootQueue() @@ -55,7 +52,7 @@ method init*(g: StaticGroupManager): Future[void] {.async.} = g.initialized = true - return + return ok() method startGroupSync*( g: StaticGroupManager diff --git a/waku/waku_rln_relay/rln/wrappers.nim b/waku/waku_rln_relay/rln/wrappers.nim index 3f7c87ed4d..1f26e2eec4 100644 --- a/waku/waku_rln_relay/rln/wrappers.nim +++ b/waku/waku_rln_relay/rln/wrappers.nim @@ -4,8 +4,7 @@ import options, eth/keys, stew/[arrayops, byteutils, results, endians2], - std/[sequtils, strformat, strutils, tables], - nimcrypto/utils + std/[sequtils, strutils, tables] import ./rln_interface, ../conversion_utils, ../protocol_types, ../protocol_metrics import ../../waku_core, ../../waku_keystore diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 0b1506620e..fc2491ca44 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -273,7 +273,7 @@ proc validateMessageAndUpdateLog*( ## validates the message and updates the log to prevent double messaging ## in future messages - let result = rlnPeer.validateMessage(msg, timeOption) + let isValidMessage = rlnPeer.validateMessage(msg, timeOption) let decodeRes = RateLimitProof.init(msg.proof) if decodeRes.isErr(): @@ -288,7 +288,7 @@ proc validateMessageAndUpdateLog*( # insert the message to the log (never errors) discard rlnPeer.updateLog(msgProof.epoch, proofMetadataRes.get()) - return result + return isValidMessage proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] = ## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module @@ -397,23 +397,22 @@ proc generateRlnValidator*( proc mount( conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler) -): Future[WakuRlnRelay] {.async: (raises: [Exception]).} = +): Future[RlnRelayResult[WakuRlnRelay]] {.async.} = var groupManager: GroupManager wakuRlnRelay: WakuRLNRelay # create an RLN instance - let rlnInstanceRes = createRLNInstance(tree_path = conf.rlnRelayTreePath) - if rlnInstanceRes.isErr(): - raise newException(CatchableError, "RLN instance creation failed") - let rlnInstance = rlnInstanceRes.get() + let rlnInstance = createRLNInstance(tree_path = conf.rlnRelayTreePath).valueOr: + return err("could not create RLN instance: " & $error) + if not conf.rlnRelayDynamic: # static setup - let parsedGroupKeysRes = StaticGroupKeys.toIdentityCredentials() - if parsedGroupKeysRes.isErr(): - raise newException(ValueError, "Static group keys are not valid") + let parsedGroupKeys = StaticGroupKeys.toIdentityCredentials().valueOr: + return err("could not parse static group keys: " & $error) + groupManager = StaticGroupManager( groupSize: StaticGroupSize, - groupKeys: parsedGroupKeysRes.get(), + groupKeys: parsedGroupKeys, membershipIndex: conf.rlnRelayCredIndex, rlnInstance: rlnInstance, onFatalErrorAction: conf.onFatalErrorAction, @@ -442,25 +441,33 @@ proc mount( ) # Initialize the groupManager - await groupManager.init() + (await groupManager.init()).isOkOr: + return err("could not initialize the group manager: " & $error) # Start the group sync - await groupManager.startGroupSync() + (await groupManager.startGroupSync()).isOkOr: + return err("could not start the group sync: " & $error) when defined(rln_v2): - return WakuRLNRelay( - groupManager: groupManager, - nonceManager: - NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float), - rlnEpochSizeSec: conf.rlnEpochSizeSec, - rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), - onFatalErrorAction: conf.onFatalErrorAction, + return ok( + WakuRLNRelay( + groupManager: groupManager, + nonceManager: + NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float), + rlnEpochSizeSec: conf.rlnEpochSizeSec, + rlnMaxEpochGap: + max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), + onFatalErrorAction: conf.onFatalErrorAction, + ) ) else: - return WakuRLNRelay( - groupManager: groupManager, - rlnEpochSizeSec: conf.rlnEpochSizeSec, - rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), - onFatalErrorAction: conf.onFatalErrorAction, + return ok( + WakuRLNRelay( + groupManager: groupManager, + rlnEpochSizeSec: conf.rlnEpochSizeSec, + rlnMaxEpochGap: + max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1), + onFatalErrorAction: conf.onFatalErrorAction, + ) ) proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = @@ -486,7 +493,6 @@ proc new*( ## The rln-relay protocol can be mounted in two modes: on-chain and off-chain. ## Returns an error if the rln-relay protocol could not be mounted. try: - let rlnRelay = await mount(conf, registrationHandler) - return ok(rlnRelay) - except: - return err("exception in new WakuRlnRelay: " & getCurrentExceptionMsg()) + return await mount(conf, registrationHandler) + except CatchableError: + return err("could not mount the rln-relay protocol: " & getCurrentExceptionMsg()) From cbefb76d9f06ee80658fd0b94e586a7a8e078267 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:06:47 +0530 Subject: [PATCH 2/7] fix: v2 too --- .../group_manager/on_chain/group_manager.nim | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 74dc93f7ca..1ac9941b7f 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -272,26 +272,28 @@ when defined(rln_v2): let registryContract = g.registryContract.get() let membershipFee = g.membershipFee.get() - var gasPrice: int - g.retryWrapper(gasPrice, "Failed to get gas price"): - int(await ethRpc.provider.eth_gasPrice()) * 2 + let gasPrice = + int( + await g.retryWrapper(ethRpc.provider.eth_gasPrice(), "Failed to get gas price") + ) * 2 let idCommitment = identityCredential.idCommitment.toUInt256() - var txHash: TxHash let storageIndex = g.usingStorageIndex.get() debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex, userMessageLimit = userMessageLimit - g.retryWrapper(txHash, "Failed to register the member"): - await registryContract - .register(storageIndex, idCommitment, u256(userMessageLimit)) - .send(gasPrice = gasPrice) + let txHash = await g.retryWrapper( + registryContract.register(storageIndex, idCommitment, u256(userMessageLimit)).send( + gasPrice = gasPrice + ), + "Failed to register the member", + ) # wait for the transaction to be mined - var tsReceipt: ReceiptObject - g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): - await ethRpc.getMinedTransactionReceipt(txHash) + let tsReceipt = await g.retryWrapper( + ethRpc.getMinedTransactionReceipt(txHash), "Failed to get the transaction receipt" + ) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) # the receipt topic holds the hash of signature of the raised events From ac19c31308b2c30982b9b3188ade6d9131a5dc73 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Fri, 19 Apr 2024 19:42:51 +0530 Subject: [PATCH 3/7] fix: for static group manager --- .../test_rln_group_manager_static.nim | 100 ++++++++---------- .../group_manager/static/group_manager.nim | 12 ++- 2 files changed, 53 insertions(+), 59 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_static.nim b/tests/waku_rln_relay/test_rln_group_manager_static.nim index 29c987f246..b6ff5a511a 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_static.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_static.nim @@ -30,12 +30,11 @@ proc generateCredentials(rlnInstance: ptr RLN, n: int): seq[IdentityCredential] suite "Static group manager": setup: - let rlnInstanceRes = - createRlnInstance(tree_path = genTempPath("rln_tree", "group_manager_static")) - require: - rlnInstanceRes.isOk() + let rlnInstance = createRlnInstance( + tree_path = genTempPath("rln_tree", "group_manager_static") + ).valueOr: + raiseAssert $error - let rlnInstance = rlnInstanceRes.get() let credentials = generateCredentials(rlnInstance, 10) let manager {.used.} = StaticGroupManager( @@ -46,16 +45,14 @@ suite "Static group manager": ) asyncTest "should initialize successfully": - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error + + (await manager.init()).isOkOr: + raiseAssert $error + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error - await manager.init() - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() check: manager.idCredentials.isSome() manager.groupKeys.len == 10 @@ -66,16 +63,14 @@ suite "Static group manager": merkleRootAfter.inHex() != merkleRootBefore.inHex() asyncTest "startGroupSync: should start group sync": - await manager.init() + (await manager.init()).isOkOr: + raiseAssert $error require: manager.validRoots.len() == 1 manager.rlnInstance.getMerkleRoot().get() == manager.validRoots[0] - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (await manager.startGroupSync()).isOkOr: + raiseAssert $error asyncTest "startGroupSync: should guard against uninitialized state": let manager = StaticGroupManager( @@ -84,13 +79,9 @@ suite "Static group manager": groupKeys: @[], rlnInstance: rlnInstance, ) - try: - await manager.startGroupSync() - except ValueError: - assert true - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + + (await manager.startGroupSync()).isErrOr: + raiseAssert "StartGroupSync: expected error" asyncTest "register: should guard against uninitialized state": let manager = StaticGroupManager( @@ -117,17 +108,14 @@ suite "Static group manager": assert false, "exception raised: " & getCurrentExceptionMsg() asyncTest "register: should register successfully": - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error let idCommitment = generateCredentials(manager.rlnInstance).idCommitment - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error try: when defined(rln_v2): await manager.register( @@ -139,10 +127,8 @@ suite "Static group manager": await manager.register(idCommitment) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error check: merkleRootAfter.inHex() != merkleRootBefore.inHex() manager.latestIndex == 10 @@ -171,8 +157,10 @@ suite "Static group manager": try: manager.onRegister(callback) - await manager.init() - await manager.startGroupSync() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error when defined(rln_v2): await manager.register( RateCommitment( @@ -199,25 +187,21 @@ suite "Static group manager": assert false, "exception raised: " & getCurrentExceptionMsg() asyncTest "withdraw: should withdraw successfully": - await manager.init() - try: - await manager.startGroupSync() - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error let idSecretHash = credentials[0].idSecretHash - let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootBeforeRes.isOk() - let merkleRootBefore = merkleRootBeforeRes.get() + let merkleRootBefore = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error + try: await manager.withdraw(idSecretHash) except Exception, CatchableError: assert false, "exception raised: " & getCurrentExceptionMsg() - let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() - require: - merkleRootAfterRes.isOk() - let merkleRootAfter = merkleRootAfterRes.get() + let merkleRootAfter = manager.rlnInstance.getMerkleRoot().valueOr: + raiseAssert $error check: merkleRootAfter.inHex() != merkleRootBefore.inHex() @@ -245,8 +229,10 @@ suite "Static group manager": try: manager.onWithdraw(callback) - await manager.init() - await manager.startGroupSync() + (await manager.init()).isOkOr: + raiseAssert $error + (await manager.startGroupSync()).isOkOr: + raiseAssert $error await manager.withdraw(idSecretHash) except Exception, CatchableError: diff --git a/waku/waku_rln_relay/group_manager/static/group_manager.nim b/waku/waku_rln_relay/group_manager/static/group_manager.nim index 55853df0ce..c577512fac 100644 --- a/waku/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/static/group_manager.nim @@ -10,6 +10,13 @@ template initializedGuard*(g: StaticGroupManager): untyped = if not g.initialized: raise newException(ValueError, "StaticGroupManager is not initialized") +proc resultifiedInitGuard(g: StaticGroupManager): GroupManagerResult[void] = + try: + initializedGuard(g) + return ok() + except CatchableError: + return err("StaticGroupManager is not initialized") + method init*(g: StaticGroupManager): Future[GroupManagerResult[void]] {.async.} = let groupSize = g.groupSize @@ -56,9 +63,10 @@ method init*(g: StaticGroupManager): Future[GroupManagerResult[void]] {.async.} method startGroupSync*( g: StaticGroupManager -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) +): Future[GroupManagerResult[void]] {.async.} = + ?g.resultifiedInitGuard() # No-op + return ok() when defined(rln_v2): method register*( From d3c1490ff8a43aed51c8f80828073b48d130b33c Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:02:16 +0200 Subject: [PATCH 4/7] fix: cleanup, make PR digestable --- .../group_manager/on_chain/group_manager.nim | 83 ++++++++----------- .../group_manager/on_chain/retry_wrapper.nim | 4 +- 2 files changed, 36 insertions(+), 51 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 75dc37819c..7a46e70a6d 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -613,10 +613,13 @@ proc startListeningToEvents( proc batchAwaitBlockHandlingFuture( g: OnchainGroupManager, futs: seq[Future[bool]] -): Future[void] {.async: (raises: [Exception]).} = +): Future[bool] {.async: (raises: [Exception]).} = for fut in futs: try: - await g.retryWrapper(fut, "Failed to handle block") + var res: bool + g.retryWrapper(res, "Failed to handle block"): + await fut + discard res except CatchableError: raise newException( CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg() @@ -668,7 +671,7 @@ proc startOnchainSync( await sleepAsync(rpcDelay) futs.add(g.getAndHandleEvents(fromBlock, toBlock)) if futs.len >= maxFutures or toBlock == currentLatestBlock: - await g.batchAwaitBlockHandlingFuture(futs) + discard await g.batchAwaitBlockHandlingFuture(futs) futs = newSeq[Future[bool]]() fromBlock = toBlock + 1 except CatchableError: @@ -705,19 +708,13 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} = # check if the Ethereum client is reachable var ethRpc: Web3 - let ethRpc = ( - await g.resultifiedRetryWrapper( - newWeb3(g.ethClientUrl), "Failed to connect to the Ethereum client" - ) - ).valueOr: - return err("failed to connect to the Ethereum client: " & $error) + g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): + await newWeb3(g.ethClientUrl) + # Set the chain id - let chainId = ( - await g.resultifiedRetryWrapper( - ethRpc.provider.eth_chainId(), "Failed to get the chain id" - ) - ).valueOr: - return err("failed to get the chain id: " & $error) + var chainId: Quantity + g.retryWrapper(chainId, "Failed to get the chain id"): + await ethRpc.provider.eth_chainId() g.chainId = some(chainId) if g.ethPrivateKey.isSome(): @@ -732,21 +729,14 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress) # get the current storage index - let usingStorageIndex = ( - await g.resultifiedRetryWrapper( - registryContract.usingStorageIndex().call(), "Failed to get the storage index" - ) - ).valueOr: - return err("failed to get the storage index: " & $error) + var usingStorageIndex: Uint16 + g.retryWrapper(usingStorageIndex, "Failed to get the storage index"): + await registryContract.usingStorageIndex().call() g.usingStorageIndex = some(usingStorageIndex) - let rlnContractAddress = ( - await g.resultifiedRetryWrapper( - registryContract.storages(usingStorageIndex).call(), - "Failed to get the rln contract address", - ) - ).valueOr: - return err("failed to get the rln contract address: " & $error) + var rlnContractAddress: Address + g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"): + await registryContract.storages(usingStorageIndex).call() let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress) g.ethRpc = some(ethRpc) @@ -802,21 +792,14 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} g.validRoots = metadata.validRoots.toDeque() # check if the contract exists by calling a static function - let membershipFee = ( - await g.resultifiedRetryWrapper( - rlnContract.MEMBERSHIP_DEPOSIT().call(), "Failed to get the membership deposit" - ) - ).valueOr: - return err("failed to get the membership deposit: " & $error) + var membershipFee: Uint256 + g.retryWrapper(membershipFee, "Failed to get the membership deposit"): + await rlnContract.MEMBERSHIP_DEPOSIT().call() g.membershipFee = some(membershipFee) - let deployedBlockNumber = ( - await g.resultifiedRetryWrapper( - rlnContract.deployedBlockNumber().call(), - "Failed to get the deployed block number", - ) - ).valueOr: - return err("failed to get the deployed block number: " & $error) + var deployedBlockNumber: Uint256 + g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"): + await rlnContract.deployedBlockNumber().call() debug "using rln storage", deployedBlockNumber, rlnContractAddress g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) @@ -826,9 +809,9 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock - let newEthRpc = await g.retryWrapper( - newWeb3(g.ethClientUrl), "Failed to reconnect with the Ethereum client" - ) + var newEthRpc: Web3 + g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"): + await newWeb3(g.ethClientUrl) newEthRpc.ondisconnect = ethRpc.ondisconnect g.ethRpc = some(newEthRpc) @@ -862,9 +845,9 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async, gcsafe.} = let ethRpc = g.ethRpc.get() - let syncing = await g.retryWrapper( - ethRpc.provider.eth_syncing(), "Failed to get the syncing status" - ) + var syncing: JsonNode + g.retryWrapper(syncing, "Failed to get the syncing status"): + await ethRpc.provider.eth_syncing() return syncing.getBool() method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = @@ -873,9 +856,9 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = if g.ethRpc.isNone(): return false - let currentBlock = cast[BlockNumber](await g.retryWrapper( - g.ethRpc.get().provider.eth_blockNumber(), "Failed to get the current block number" - )) + var currentBlock: BlockNumber + g.retryWrapper(currentBlock, "Failed to get the current block number"): + cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) if g.latestProcessedBlock < currentBlock: return false diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim index 7a8114ffdb..9ae000e3ce 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -11,6 +11,7 @@ proc new*(T: type RetryStrategy): RetryStrategy = return RetryStrategy(shouldRetry: true, retryDelay: 4000.millis, retryCount: 15) template retryWrapper*( + res: auto, retryStrategy: RetryStrategy, errStr: string, errCallback: OnFatalErrorHandler, @@ -22,7 +23,8 @@ template retryWrapper*( while shouldRetry and retryCount > 0: try: - return await fut + res = body + shouldRetry = false except: retryCount -= 1 exceptionMessage = getCurrentExceptionMsg() From 6675ec747ef524e5b855152985fe6fabda9d52dc Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:02:45 +0200 Subject: [PATCH 5/7] fix: remove resultified retry wrapper --- .../group_manager/on_chain/group_manager.nim | 9 --------- 1 file changed, 9 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 7a46e70a6d..6ec741313c 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -134,15 +134,6 @@ template retryWrapper( retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): body -proc resultifiedRetryWrapper[T]( - g: OnchainGroupManager, fut: Future[T], errStr: string -): Future[RlnRelayResult[T]] {.async.} = - try: - return - ok(await retryWrapper(RetryStrategy.new(), errStr, g.onFatalErrorAction, fut)) - except CatchableError: - return err(errStr & ": " & getCurrentExceptionMsg()) - proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] = try: let metadataSetRes = g.rlnInstance.setMetadata( From 97edfe15a64a4d93374abf30192dd5de47450f33 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:04:43 +0200 Subject: [PATCH 6/7] fix: cleanup --- .../group_manager/on_chain/group_manager.nim | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 6ec741313c..98a1ec8027 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -604,13 +604,12 @@ proc startListeningToEvents( proc batchAwaitBlockHandlingFuture( g: OnchainGroupManager, futs: seq[Future[bool]] -): Future[bool] {.async: (raises: [Exception]).} = +): Future[void] {.async: (raises: [Exception]).} = for fut in futs: try: - var res: bool - g.retryWrapper(res, "Failed to handle block"): + var handleBlockRes: bool + g.retryWrapper(handleBlockRes, "Failed to handle block"): await fut - discard res except CatchableError: raise newException( CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg() @@ -662,7 +661,7 @@ proc startOnchainSync( await sleepAsync(rpcDelay) futs.add(g.getAndHandleEvents(fromBlock, toBlock)) if futs.len >= maxFutures or toBlock == currentLatestBlock: - discard await g.batchAwaitBlockHandlingFuture(futs) + await g.batchAwaitBlockHandlingFuture(futs) futs = newSeq[Future[bool]]() fromBlock = toBlock + 1 except CatchableError: From cc8b9dac39107c7385513da4174cd50f73881d4f Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:06:02 +0200 Subject: [PATCH 7/7] fix: cleanup --- waku/waku_rln_relay/group_manager/on_chain/group_manager.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 98a1ec8027..dad3da9289 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -557,6 +557,8 @@ proc getAndHandleEvents( else: trace "rln metadata persisted", blockNumber = g.latestProcessedBlock + return true + proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = g.blockFetchingActive = false