Skip to content

Commit

Permalink
Separation of node health and initialization state from rln_relay sta…
Browse files Browse the repository at this point in the history
…tus. Make (only) health endpoint avail early and install others in the last stage of node setup.
  • Loading branch information
NagyZoltanPeter committed Apr 22, 2024
1 parent e61e4ff commit 1d9a5ef
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 83 deletions.
75 changes: 48 additions & 27 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/factory/networks_config,
../../waku/factory/app
../../waku/factory/app,
../../waku/node/health_monitor

logScope:
topics = "wakunode main"
Expand Down Expand Up @@ -88,54 +89,74 @@ when isMainModule:
doInspectRlnDb(conf)
of noCommand:
case conf.clusterId
# cluster-id=0
of 0:
let clusterZeroConf = ClusterConf.ClusterZeroConf()
conf.pubsubTopics = clusterZeroConf.pubsubTopics
# TODO: Write some template to "merge" the configs
# cluster-id=1 (aka The Waku Network)
of 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
else:
conf.pubsubTopics = twnClusterConf.pubsubTopics

# Override configuration
conf.maxMessageSize = twnClusterConf.maxMessageSize
conf.clusterId = twnClusterConf.clusterId
conf.rlnRelay = twnClusterConf.rlnRelay
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
conf.discv5Discovery = twnClusterConf.discv5Discovery
conf.discv5BootstrapNodes =
conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
# cluster-id=0
of 0:
let clusterZeroConf = ClusterConf.ClusterZeroConf()
conf.pubsubTopics = clusterZeroConf.pubsubTopics
# TODO: Write some template to "merge" the configs
# cluster-id=1 (aka The Waku Network)
of 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
else:
discard
conf.pubsubTopics = twnClusterConf.pubsubTopics

# Override configuration
conf.maxMessageSize = twnClusterConf.maxMessageSize
conf.clusterId = twnClusterConf.clusterId
conf.rlnRelay = twnClusterConf.rlnRelay
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
conf.discv5Discovery = twnClusterConf.discv5Discovery
conf.discv5BootstrapNodes =
conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
else:
discard

info "Running nwaku node", version = app.git_version
logConfig(conf)

# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)

let restServerRes = startRestServerEsentials(nodeHealthMonitor, conf)
if restServerRes.isErr():
error "Starting REST server failed. . Continuing in current state.",
error = $restServerRes.error()

var wakunode2 = App.init(conf).valueOr:
error "App initialization failed", error = error
quit(QuitFailure)

nodeHealthMonitor.setNode(wakunode2.node)

wakunode2.startApp().isOkOr:
error "Starting app failed", error = error
quit(QuitFailure)

if conf.rest and not restServerRes.isErr():
wakunode2.restServer = restServerRes.value

wakunode2.setupMonitoringAndExternalInterfaces().isOkOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)

nodeHealthMonitor.setOverallHealth(HealthStatus.READY)

debug "Setting up shutdown hooks"
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.

proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
quit(QuitSuccess)

Expand Down
113 changes: 72 additions & 41 deletions waku/factory/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import
../../waku/node/waku_metrics,
../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/health_monitor,
../../waku/waku_api/message_cache,
../../waku/waku_api/handlers,
../../waku/waku_api/rest/server,
Expand Down Expand Up @@ -70,7 +71,7 @@ type

node: WakuNode

restServer: Option[WakuRestServerRef]
restServer*: Option[WakuRestServerRef]
metricsServer: Option[MetricsHttpServerRef]

AppResult*[T] = Result[T, string]
Expand Down Expand Up @@ -155,7 +156,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =

## Setup DiscoveryV5

proc setupDiscoveryV5(app: App): WakuDiscoveryV5 =
proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
let dynamicBootstrapEnrs =
app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())

Expand Down Expand Up @@ -281,12 +282,19 @@ proc startApp*(app: var App): AppResult[void] =

## Monitoring and external interfaces

proc startRestServer(
app: App, address: IpAddress, port: Port, conf: WakuNodeConf
): AppResult[WakuRestServerRef] =
# Used to register api endpoints that are not currently installed as keys,
# values are holding error messages to be returned to the client
var notInstalledTab: Table[string, string] = initTable[string, string]()
# Used to register api endpoints that are not currently installed as keys,
# values are holding error messages to be returned to the client
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
restServerNotInstalledTab = newTable[string, string]()

proc startRestServerEsentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
): AppResult[Option[WakuRestServerRef]] =
if not conf.rest:
return ok(none(WakuRestServerRef))

let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef
Expand All @@ -302,7 +310,7 @@ proc startRestServer(
paths[1]
else:
""
notInstalledTab.withValue(rootPath, errMsg):
restServerNotInstalledTab[].withValue(rootPath, errMsg):
return await request.respond(Http404, errMsg[], HttpTable.init())
do:
return await request.respond(
Expand All @@ -328,6 +336,8 @@ proc startRestServer(
else:
none(string)

let address = conf.restAddress
let port = Port(conf.restPort + conf.portsShift)
let server =
?newRestHttpServer(
address,
Expand All @@ -336,37 +346,64 @@ proc startRestServer(
requestErrorHandler = requestErrorHandler,
)

## Health REST API
installHealthApiHandler(server.router, nodeHealthMonitor)

restServerNotInstalledTab["admin"] =
"/admin endpoints are not available while initializing."
restServerNotInstalledTab["debug"] =
"/debug endpoints are not available while initializing."
restServerNotInstalledTab["relay"] =
"/relay endpoints are not available while initializing."
restServerNotInstalledTab["filter"] =
"/filter endpoints are not available while initializing."
restServerNotInstalledTab["lightpush"] =
"/lightpush endpoints are not available while initializing."
restServerNotInstalledTab["store"] =
"/store endpoints are not available while initializing."

server.start()
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"

ok(some(server))

proc startRestServerProtocolSupport(app: var App): AppResult[void] =
if not app.conf.rest or app.restServer.isNone():
## Maybe we don't need rest server at all, so it is ok.
return ok()

var router = app.restServer.get().router
## Admin REST API
if conf.restAdmin:
installAdminApiHandlers(server.router, app.node)
if app.conf.restAdmin:
installAdminApiHandlers(router, app.node)
else:
restServerNotInstalledTab["admin"] =
"/admin endpoints are not available. Please check your configuration: --rest-admin=true"

## Debug REST API
installDebugApiHandlers(server.router, app.node)

## Health REST API
installHealthApiHandler(server.router, app.node)
installDebugApiHandlers(router, app.node)

## Relay REST API
if conf.relay:
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
if app.conf.relay:
let cache = MessageCache.init(int(app.conf.restRelayCacheCapacity))

let handler = messageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
for pubsubTopic in app.conf.pubsubTopics:
cache.pubsubSubscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
for contentTopic in app.conf.contentTopics:
cache.contentSubscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))

installRelayApiHandlers(server.router, app.node, cache)
installRelayApiHandlers(router, app.node, cache)
else:
notInstalledTab["relay"] =
restServerNotInstalledTab["relay"] =
"/relay endpoints are not available. Please check your configuration: --relay"

## Filter REST API
if conf.filternode != "" and app.node.wakuFilterClient != nil:
if app.conf.filternode != "" and app.node.wakuFilterClient != nil:
let filterCache = MessageCache.init()

let filterDiscoHandler =
Expand All @@ -376,10 +413,10 @@ proc startRestServer(
none(DiscoveryHandler)

rest_filter_api.installFilterRestApiHandlers(
server.router, app.node, filterCache, filterDiscoHandler
router, app.node, filterCache, filterDiscoHandler
)
else:
notInstalledTab["filter"] =
restServerNotInstalledTab["filter"] =
"/filter endpoints are not available. Please check your configuration: --filternode"

## Store REST API
Expand All @@ -389,27 +426,25 @@ proc startRestServer(
else:
none(DiscoveryHandler)

installStoreApiHandlers(server.router, app.node, storeDiscoHandler)
installStoreApiHandlers(router, app.node, storeDiscoHandler)

## Light push API
if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil:
if app.conf.lightpushnode != "" and app.node.wakuLightpushClient != nil:
let lightDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
else:
none(DiscoveryHandler)

rest_lightpush_api.installLightPushRequestHandler(
server.router, app.node, lightDiscoHandler
router, app.node, lightDiscoHandler
)
else:
notInstalledTab["lightpush"] =
restServerNotInstalledTab["lightpush"] =
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"

server.start()
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"

ok(server)
info "REST services are installed"
ok()

proc startMetricsServer(
serverIp: IpAddress, serverPort: Port
Expand All @@ -434,15 +469,11 @@ proc startMetricsLogging(): AppResult[void] =
ok()

proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
if app.conf.rest:
let startRestServerRes = startRestServer(
app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf
)
if startRestServerRes.isErr():
error "Starting REST server failed. Continuing in current state.",
error = startRestServerRes.error
else:
app.restServer = some(startRestServerRes.value)
if app.conf.rest and app.restServer.isSome():
let restProtocolSupportRes = startRestServerProtocolSupport(app)
if restProtocolSupportRes.isErr():
error "Starting REST server protocol support failed. Continuing in current state.",
error = restProtocolSupportRes.error

if app.conf.metricsServer:
let startMetricsServerRes = startMetricsServer(
Expand Down
65 changes: 65 additions & 0 deletions waku/node/health_monitor.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import std/[options, sequtils, strutils, tables], chronos

import waku_node, ../waku_rln_relay

type
HealthStatus* = enum
INITIALIZING
SYNCHRONIZING
READY
NOT_READY
NOT_MOUNTED
SHUTTING_DOWN

HealtReport* = object
nodeHealth*: HealthStatus
protocolHealth*: Table[string, HealthStatus]

WakuNodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: Option[WakuNode]

proc `$`*(t: HealthStatus): string =
result =
case t
of INITIALIZING: "Initializing"
of SYNCHRONIZING: "Synchronizing"
of READY: "Ready"
of NOT_READY: "Not Ready"
of NOT_MOUNTED: "Not Mounted"
of SHUTTING_DOWN: "Shutting Down"

const FutIsReadyTimout = 5.seconds

proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealtReport] {.async.} =
result.nodeHealth = hm.nodeHealth

if hm.node.isSome(): ## and not hm.node.get().wakuRlnRelay == nil:
let getRlnRelayHealth = proc(): Future[HealthStatus] {.async.} =
let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
return HealthStatus.NOT_READY

try:
if not isReadyStateFut.completed():
return HealthStatus.NOT_READY
elif isReadyStateFut.read():
return HealthStatus.READY

return HealthStatus.SYNCHRONIZING
except:
error "exception reading state: " & getCurrentExceptionMsg()
return HealthStatus.NOT_READY

result.protocolHealth["Rln Relay"] = await getRlnRelayHealth()

proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) =
hm.node = some(node)

proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) =
hm.nodeHealth = health
Loading

0 comments on commit 1d9a5ef

Please sign in to comment.