diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index ee1089ae16..df11d74c8e 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -18,7 +18,8 @@ import ../../waku/common/logging, ../../waku/factory/external_config, ../../waku/factory/networks_config, - ../../waku/factory/app + ../../waku/factory/app, + ../../waku/node/health_monitor logScope: topics = "wakunode main" @@ -88,54 +89,74 @@ when isMainModule: doInspectRlnDb(conf) of noCommand: case conf.clusterId - # cluster-id=0 - of 0: - let clusterZeroConf = ClusterConf.ClusterZeroConf() - conf.pubsubTopics = clusterZeroConf.pubsubTopics - # TODO: Write some template to "merge" the configs - # cluster-id=1 (aka The Waku Network) - of 1: - let twnClusterConf = ClusterConf.TheWakuNetworkConf() - if len(conf.shards) != 0: - conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) - else: - conf.pubsubTopics = twnClusterConf.pubsubTopics - - # Override configuration - conf.maxMessageSize = twnClusterConf.maxMessageSize - conf.clusterId = twnClusterConf.clusterId - conf.rlnRelay = twnClusterConf.rlnRelay - conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress - conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic - conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold - conf.discv5Discovery = twnClusterConf.discv5Discovery - conf.discv5BootstrapNodes = - conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes - conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec - conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + # cluster-id=0 + of 0: + let clusterZeroConf = ClusterConf.ClusterZeroConf() + conf.pubsubTopics = clusterZeroConf.pubsubTopics + # TODO: Write some template to "merge" the configs + # cluster-id=1 (aka The Waku Network) + of 1: + let twnClusterConf = ClusterConf.TheWakuNetworkConf() + if len(conf.shards) != 0: + conf.pubsubTopics = 
conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) else: - discard + conf.pubsubTopics = twnClusterConf.pubsubTopics + + # Override configuration + conf.maxMessageSize = twnClusterConf.maxMessageSize + conf.clusterId = twnClusterConf.clusterId + conf.rlnRelay = twnClusterConf.rlnRelay + conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress + conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic + conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold + conf.discv5Discovery = twnClusterConf.discv5Discovery + conf.discv5BootstrapNodes = + conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes + conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec + conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + else: + discard info "Running nwaku node", version = app.git_version logConfig(conf) + # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it + # It will always be called from main thread anyway. + # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety + var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor + nodeHealthMonitor = WakuNodeHealthMonitor() + nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING) + + let restServerRes = startRestServerEsentials(nodeHealthMonitor, conf) + if restServerRes.isErr(): + error "Starting REST server failed. . 
Continuing in current state.", + error = $restServerRes.error() + var wakunode2 = App.init(conf).valueOr: error "App initialization failed", error = error quit(QuitFailure) + nodeHealthMonitor.setNode(wakunode2.node) + wakunode2.startApp().isOkOr: error "Starting app failed", error = error quit(QuitFailure) + if conf.rest and not restServerRes.isErr(): + wakunode2.restServer = restServerRes.value + wakunode2.setupMonitoringAndExternalInterfaces().isOkOr: error "Starting monitoring and external interfaces failed", error = error quit(QuitFailure) + nodeHealthMonitor.setOverallHealth(HealthStatus.READY) + debug "Setting up shutdown hooks" ## Setup shutdown hooks for this process. ## Stop node gracefully on shutdown. proc asyncStopper(node: App) {.async: (raises: [Exception]).} = + nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) await node.stop() quit(QuitSuccess) diff --git a/waku/factory/app.nim b/waku/factory/app.nim index 1838110f1d..4d04a1a45c 100644 --- a/waku/factory/app.nim +++ b/waku/factory/app.nim @@ -29,6 +29,7 @@ import ../../waku/node/waku_metrics, ../../waku/node/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, + ../../waku/node/health_monitor, ../../waku/waku_api/message_cache, ../../waku/waku_api/handlers, ../../waku/waku_api/rest/server, @@ -70,7 +71,7 @@ type node: WakuNode - restServer: Option[WakuRestServerRef] + restServer*: Option[WakuRestServerRef] metricsServer: Option[MetricsHttpServerRef] AppResult*[T] = Result[T, string] @@ -155,7 +156,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = ## Setup DiscoveryV5 -proc setupDiscoveryV5(app: App): WakuDiscoveryV5 = +proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = let dynamicBootstrapEnrs = app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get()) @@ -281,12 +282,19 @@ proc startApp*(app: var App): AppResult[void] = ## Monitoring and external interfaces -proc startRestServer( - app: App, address: IpAddress, port: Port, 
conf: WakuNodeConf -): AppResult[WakuRestServerRef] = - # Used to register api endpoints that are not currently installed as keys, - # values are holding error messages to be returned to the client - var notInstalledTab: Table[string, string] = initTable[string, string]() +# Used to register api endpoints that are not currently installed as keys, +# values are holding error messages to be returned to the client +# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it +# It will always be called from main thread anyway. +# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety +var restServerNotInstalledTab {.threadvar.}: TableRef[string, string] +restServerNotInstalledTab = newTable[string, string]() + +proc startRestServerEsentials*( + nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf +): AppResult[Option[WakuRestServerRef]] = + if not conf.rest: + return ok(none(WakuRestServerRef)) let requestErrorHandler: RestRequestErrorHandler = proc( error: RestRequestError, request: HttpRequestRef @@ -302,7 +310,7 @@ proc startRestServer( paths[1] else: "" - notInstalledTab.withValue(rootPath, errMsg): + restServerNotInstalledTab[].withValue(rootPath, errMsg): return await request.respond(Http404, errMsg[], HttpTable.init()) do: return await request.respond( @@ -328,6 +336,8 @@ proc startRestServer( else: none(string) + let address = conf.restAddress + let port = Port(conf.restPort + conf.portsShift) let server = ?newRestHttpServer( address, @@ -336,37 +346,64 @@ proc startRestServer( requestErrorHandler = requestErrorHandler, ) + ## Health REST API + installHealthApiHandler(server.router, nodeHealthMonitor) + + restServerNotInstalledTab["admin"] = + "/admin endpoints are not available while initializing." + restServerNotInstalledTab["debug"] = + "/debug endpoints are not available while initializing." + restServerNotInstalledTab["relay"] = + "/relay endpoints are not available while initializing." 
+ restServerNotInstalledTab["filter"] = + "/filter endpoints are not available while initializing." + restServerNotInstalledTab["lightpush"] = + "/lightpush endpoints are not available while initializing." + restServerNotInstalledTab["store"] = + "/store endpoints are not available while initializing." + + server.start() + info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" + + ok(some(server)) + +proc startRestServerProtocolSupport(app: var App): AppResult[void] = + if not app.conf.rest or app.restServer.isNone(): + ## Maybe we don't need rest server at all, so it is ok. + return ok() + + var router = app.restServer.get().router ## Admin REST API - if conf.restAdmin: - installAdminApiHandlers(server.router, app.node) + if app.conf.restAdmin: + installAdminApiHandlers(router, app.node) + else: + restServerNotInstalledTab["admin"] = + "/admin endpoints are not available. Please check your configuration: --rest-admin=true" ## Debug REST API - installDebugApiHandlers(server.router, app.node) - - ## Health REST API - installHealthApiHandler(server.router, app.node) + installDebugApiHandlers(router, app.node) ## Relay REST API - if conf.relay: - let cache = MessageCache.init(int(conf.restRelayCacheCapacity)) + if app.conf.relay: + let cache = MessageCache.init(int(app.conf.restRelayCacheCapacity)) let handler = messageCacheHandler(cache) - for pubsubTopic in conf.pubsubTopics: + for pubsubTopic in app.conf.pubsubTopics: cache.pubsubSubscribe(pubsubTopic) app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) - for contentTopic in conf.contentTopics: + for contentTopic in app.conf.contentTopics: cache.contentSubscribe(contentTopic) app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) - installRelayApiHandlers(server.router, app.node, cache) + installRelayApiHandlers(router, app.node, cache) else: - notInstalledTab["relay"] = + restServerNotInstalledTab["relay"] = "/relay endpoints are not available. 
Please check your configuration: --relay" ## Filter REST API - if conf.filternode != "" and app.node.wakuFilterClient != nil: + if app.conf.filternode != "" and app.node.wakuFilterClient != nil: let filterCache = MessageCache.init() let filterDiscoHandler = @@ -376,10 +413,10 @@ proc startRestServer( none(DiscoveryHandler) rest_filter_api.installFilterRestApiHandlers( - server.router, app.node, filterCache, filterDiscoHandler + router, app.node, filterCache, filterDiscoHandler ) else: - notInstalledTab["filter"] = + restServerNotInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode" ## Store REST API @@ -389,10 +426,10 @@ proc startRestServer( else: none(DiscoveryHandler) - installStoreApiHandlers(server.router, app.node, storeDiscoHandler) + installStoreApiHandlers(router, app.node, storeDiscoHandler) ## Light push API - if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: + if app.conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: let lightDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush)) @@ -400,16 +437,14 @@ proc startRestServer( none(DiscoveryHandler) rest_lightpush_api.installLightPushRequestHandler( - server.router, app.node, lightDiscoHandler + router, app.node, lightDiscoHandler ) else: - notInstalledTab["lightpush"] = + restServerNotInstalledTab["lightpush"] = "/lightpush endpoints are not available. 
Please check your configuration: --lightpushnode" - server.start() - info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" - - ok(server) + info "REST services are installed" + ok() proc startMetricsServer( serverIp: IpAddress, serverPort: Port @@ -434,15 +469,11 @@ proc startMetricsLogging(): AppResult[void] = ok() proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = - if app.conf.rest: - let startRestServerRes = startRestServer( - app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf - ) - if startRestServerRes.isErr(): - error "Starting REST server failed. Continuing in current state.", - error = startRestServerRes.error - else: - app.restServer = some(startRestServerRes.value) + if app.conf.rest and app.restServer.isSome(): + let restProtocolSupportRes = startRestServerProtocolSupport(app) + if restProtocolSupportRes.isErr(): + error "Starting REST server protocol support failed. Continuing in current state.", + error = restProtocolSupportRes.error if app.conf.metricsServer: let startMetricsServerRes = startMetricsServer( diff --git a/waku/node/health_monitor.nim b/waku/node/health_monitor.nim new file mode 100644 index 0000000000..755441f8fe --- /dev/null +++ b/waku/node/health_monitor.nim @@ -0,0 +1,65 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, sequtils, strutils, tables], chronos + +import waku_node, ../waku_rln_relay + +type + HealthStatus* = enum + INITIALIZING + SYNCHRONIZING + READY + NOT_READY + NOT_MOUNTED + SHUTTING_DOWN + + HealtReport* = object + nodeHealth*: HealthStatus + protocolHealth*: Table[string, HealthStatus] + + WakuNodeHealthMonitor* = ref object + nodeHealth: HealthStatus + node: Option[WakuNode] + +proc `$`*(t: HealthStatus): string = + result = + case t + of INITIALIZING: "Initializing" + of SYNCHRONIZING: "Synchronizing" + of READY: "Ready" + of NOT_READY: "Not Ready" + of 
NOT_MOUNTED: "Not Mounted" + of SHUTTING_DOWN: "Shutting Down" + +const FutIsReadyTimout = 5.seconds + +proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealtReport] {.async.} = + result.nodeHealth = hm.nodeHealth + + if hm.node.isSome() and hm.node.get().wakuRlnRelay != nil: # skip the RLN probe when RLN relay is not mounted (nil), otherwise isReady() is called on nil + let getRlnRelayHealth = proc(): Future[HealthStatus] {.async.} = + let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady() + if not await isReadyStateFut.withTimeout(FutIsReadyTimout): + return HealthStatus.NOT_READY + + try: + if not isReadyStateFut.completed(): + return HealthStatus.NOT_READY + elif isReadyStateFut.read(): + return HealthStatus.READY + + return HealthStatus.SYNCHRONIZING + except: + error "exception reading state: " & getCurrentExceptionMsg() + return HealthStatus.NOT_READY + + result.protocolHealth["Rln Relay"] = await getRlnRelayHealth() + +proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) = + hm.node = some(node) + +proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) = + hm.nodeHealth = health diff --git a/waku/waku_api/rest/health/handlers.nim b/waku/waku_api/rest/health/handlers.nim index ea5d658b6a..53d16f1cd2 100644 --- a/waku/waku_api/rest/health/handlers.nim +++ b/waku/waku_api/rest/health/handlers.nim @@ -11,26 +11,25 @@ logScope: const ROUTE_HEALTH* = "/health" -const FutIsReadyTimout = 5.seconds - -proc installHealthApiHandler*(router: var RestRouter, node: WakuNode) = - ## /health endpoint provides information about node readiness to caller. - ## Currently it is restricted to checking RLN (if mounted) proper setup - ## TODO: Leter to extend it to a broader information about each subsystem state - ## report. Rest response to change to JSON structure that can hold exact detailed - ## information. 
+const FitHealthReportTimeout = 5.seconds +proc installHealthApiHandler*( + router: var RestRouter, nodeHealthMonitor: WakuNodeHealthMonitor +) = router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse: - let isReadyStateFut = node.isReady() - if not await isReadyStateFut.withTimeout(FutIsReadyTimout): + let healthReportFut = nodeHealthMonitor.getNodeHealthReport() + if not await healthReportFut.withTimeout(FitHealthReportTimeout): return RestApiResponse.internalServerError("Health check timed out") - var msg = "Node is healthy" + var msg = "" var status = Http200 try: - if not isReadyStateFut.read(): - msg = "Node is not ready" + if healthReportFut.completed(): + let healthReport = healthReportFut.read() + msg = $healthReport + else: + msg = "Health check failed" status = Http503 except: msg = "exception reading state: " & getCurrentExceptionMsg() diff --git a/waku/waku_node.nim b/waku/waku_node.nim index 54536f91d7..f1c6471117 100644 --- a/waku/waku_node.nim +++ b/waku/waku_node.nim @@ -1,3 +1,7 @@ -import ./node/config, ./node/waku_switch as switch, ./node/waku_node as node +import + ./node/config, + ./node/waku_switch as switch, + ./node/waku_node as node, + ./node/health_monitor as health_monitor -export config, switch, node +export config, switch, node, health_monitor