From cfde7eea82796554d45e488d2a3d60747be90b4d Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 28 Oct 2024 09:17:46 +0100 Subject: [PATCH] chore: Circuit relay (#3112) * undo apt install libpcre (not circuit-relay related.) * nat.nim: protect against possible exceptions when calling getExternalIP * new external CLI argument, isRelayClient * waku factory change to mount circuit hop proto by default * waku_node: move autonat_service to a separate module --- .github/workflows/ci.yml | 3 - .../liteprotocoltester/liteprotocoltester.nim | 2 +- apps/wakunode2/wakunode2.nim | 4 +- examples/wakustealthcommitments/node_spec.nim | 2 +- .../requests/node_lifecycle_request.nim | 2 +- tests/factory/test_node_factory.nim | 10 +- tests/test_peer_manager.nim | 20 +--- tests/test_waku_switch.nim | 4 +- tests/test_wakunode.nim | 6 +- tests/wakunode2/test_app.nim | 8 +- tests/wakunode_rest/test_rest_admin.nim | 11 +- waku/common/utils/nat.nim | 7 +- waku/discovery/autonat_service.nim | 36 ++++++ waku/discovery/waku_discv5.nim | 20 +++- waku/factory/builder.nim | 16 ++- waku/factory/external_config.nim | 10 ++ waku/factory/node_factory.nim | 19 ++- waku/factory/waku.nim | 113 ++++++++++++++++-- waku/node/waku_node.nim | 59 ++++----- waku/node/waku_switch.nim | 8 +- waku/waku_core/peers.nim | 54 ++++++++- waku/waku_relay/protocol.nim | 7 ++ 22 files changed, 304 insertions(+), 117 deletions(-) create mode 100644 waku/discovery/autonat_service.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a8085a159..b567059f83 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -111,9 +111,6 @@ jobs: run: | postgres_enabled=0 if [ ${{ runner.os }} == "Linux" ]; then - sudo apt-get update - sudo apt-get install -y libpcre3 libpcre3-dev - sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18 postgres_enabled=1 fi diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index 4d9f190d61..5f6ec4ee08 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -132,7 +132,7 @@ when isMainModule: error "Starting esential REST server failed.", error = $error quit(QuitFailure) - var wakuApp = Waku.init(wakuConf).valueOr: + var wakuApp = Waku.new(wakuConf).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 70703f8ecd..b6e94c7477 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -42,7 +42,7 @@ when isMainModule: error "failure while loading the configuration", error = error quit(QuitFailure) - ## Also called within Waku.init. The call to startRestServerEsentials needs the following line + ## Also called within Waku.new. The call to startRestServerEsentials needs the following line logging.setupLog(conf.logLevel, conf.logFormat) case conf.cmd @@ -66,7 +66,7 @@ when isMainModule: error "Starting esential REST server failed.", error = $error quit(QuitFailure) - var waku = Waku.init(confCopy).valueOr: + var waku = Waku.new(confCopy).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) diff --git a/examples/wakustealthcommitments/node_spec.nim b/examples/wakustealthcommitments/node_spec.nim index dabaa0f753..dbab8a3b22 100644 --- a/examples/wakustealthcommitments/node_spec.nim +++ b/examples/wakustealthcommitments/node_spec.nim @@ -48,7 +48,7 @@ proc setup*(): Waku = conf.rlnRelay = twnClusterConf.rlnRelay debug "Starting node" - var waku = Waku.init(conf).valueOr: + var waku = Waku.new(conf).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 841d47db23..2b2edf038f 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -59,7 +59,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = formattedString & ". expected type: " & $typeof(confValue) ) - let wakuRes = Waku.init(conf).valueOr: + let wakuRes = Waku.new(conf).valueOr: error "waku initialization failed", error = error return err("Failed setting up Waku: " & $error) diff --git a/tests/factory/test_node_factory.nim b/tests/factory/test_node_factory.nim index 0b68338e23..bc3dc0f80d 100644 --- a/tests/factory/test_node_factory.nim +++ b/tests/factory/test_node_factory.nim @@ -1,6 +1,6 @@ {.used.} -import testutils/unittests, chronos +import testutils/unittests, chronos, libp2p/protocols/connectivity/relay/relay import ../testlib/wakunode, waku/factory/node_factory, waku/waku_node @@ -8,7 +8,7 @@ suite "Node Factory": test "Set up a node based on default configurations": let conf = defaultTestWakuNodeConf() - let node = setupNode(conf).valueOr: + let node = setupNode(conf, relay = Relay.new()).valueOr: raiseAssert error check: @@ -23,7 +23,7 @@ suite "Node Factory": var conf = defaultTestWakuNodeConf() conf.store = true - let node = setupNode(conf).valueOr: + let node = setupNode(conf, relay = Relay.new()).valueOr: raiseAssert error check: @@ -35,7 +35,7 @@ test "Set up a node with Filter enabled": var conf = defaultTestWakuNodeConf() conf.filter = true - let node = setupNode(conf).valueOr: + let node = setupNode(conf, relay = Relay.new()).valueOr: raiseAssert error check: @@ -45,7 +45,7 @@ test "Set up a node with Filter enabled": test "Start a node based on default configurations": let conf = defaultTestWakuNodeConf() - let node = setupNode(conf).valueOr: + let node = setupNode(conf, relay = Relay.new()).valueOr: raiseAssert error assert not node.isNil(), "Node can't be nil" diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 31b04ebf43..bcc3821cab 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils, times, sugar], + std/[options, sequtils, times, sugar, net], stew/shims/net as stewNet, testutils/unittests, chronos, @@ -269,14 +269,9 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] node1 = newTestWakuNode( - generateSecp256k1Key(), - ValidIpAddress.init("127.0.0.1"), - Port(44048), - peerStorage = storage, - ) - node2 = newTestWakuNode( - generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023) + generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage ) + node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023)) node1.mountMetadata(0).expect("Mounted Waku Metadata") node2.mountMetadata(0).expect("Mounted Waku Metadata") @@ -344,14 +339,9 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] node1 = newTestWakuNode( - generateSecp256k1Key(), - ValidIpAddress.init("127.0.0.1"), - Port(44048), - peerStorage = storage, - ) - node2 = newTestWakuNode( - generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023) + generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage ) + node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023)) node1.mountMetadata(0).expect("Mounted Waku Metadata") node2.mountMetadata(0).expect("Mounted Waku Metadata") diff --git a/tests/test_waku_switch.nim b/tests/test_waku_switch.nim index 6cf734a332..1bf76e89c8 100644 --- a/tests/test_waku_switch.nim +++ b/tests/test_waku_switch.nim @@ -26,7 +26,7 @@ suite "Waku Switch": ## Given let sourceSwitch = newTestSwitch() - wakuSwitch = newWakuSwitch(rng = rng()) + wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new()) await sourceSwitch.start() await wakuSwitch.start() @@ -46,7 +46,7 @@ suite "Waku Switch": asyncTest "Waku Switch acts as circuit relayer": ## Setup let - wakuSwitch = newWakuSwitch(rng = rng()) + wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new()) sourceClient = RelayClient.new() destClient = RelayClient.new() sourceSwitch = newCircuitRelayClientSwitch(sourceClient) diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index 975070465a..dbb2d5d8db 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, strutils], + std/[sequtils, strutils, net], stew/byteutils, stew/shims/net as stewNet, testutils/unittests, @@ -169,7 +169,7 @@ suite "WakuNode": nodeKey = generateSecp256k1Key() bindIp = parseIpAddress("0.0.0.0") bindPort = Port(61006) - extIp = some(parseIpAddress("127.0.0.1")) + extIp = some(getPrimaryIPAddr()) extPort = some(Port(61008)) node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort) @@ -205,7 +205,7 @@ suite "WakuNode": nodeKey = generateSecp256k1Key() bindIp = parseIpAddress("0.0.0.0") bindPort = Port(61010) - extIp = some(parseIpAddress("127.0.0.1")) + extIp = some(getPrimaryIPAddr()) extPort = some(Port(61012)) domainName = "example.com" expectedDns4Addr = diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 04057b1a1f..67a6556c8b 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -19,7 +19,7 @@ suite "Wakunode2 - Waku": ## Given var conf = defaultTestWakuNodeConf() - let waku = Waku.init(conf).valueOr: + let waku = Waku.new(conf).valueOr: raiseAssert error ## When @@ -35,7 +35,7 @@ suite "Wakunode2 - Waku initialization": var conf = defaultTestWakuNodeConf() conf.peerPersistence = true - let waku = Waku.init(conf).valueOr: + let waku = Waku.new(conf).valueOr: raiseAssert error check: @@ -46,7 +46,7 @@ suite "Wakunode2 - Waku initialization": var conf = defaultTestWakuNodeConf() ## When - var waku = Waku.init(conf).valueOr: + var waku = Waku.new(conf).valueOr: raiseAssert error (waitFor startWaku(addr waku)).isOkOr: @@ -73,7 +73,7 @@ suite "Wakunode2 - Waku initialization": conf.tcpPort = Port(0) ## When - var waku = Waku.init(conf).valueOr: + var waku = Waku.new(conf).valueOr: raiseAssert error (waitFor startWaku(addr waku)).isOkOr: diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index b6c9336d08..51d1133957 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, strformat], + std/[sequtils, strformat, net], stew/shims/net, testutils/unittests, presto, @@ -38,12 +38,9 @@ suite "Waku v2 Rest API - Admin": var client {.threadvar.}: RestClientRef asyncSetup: - node1 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600)) - node2 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602)) - node3 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604)) + node1 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60600)) + node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60602)) + node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604)) await allFutures(node1.start(), node2.start(), node3.start()) await allFutures( diff --git a/waku/common/utils/nat.nim b/waku/common/utils/nat.nim index 893e8d8a82..975dcec377 100644 --- a/waku/common/utils/nat.nim +++ b/waku/common/utils/nat.nim @@ -39,7 +39,12 @@ proc setupNat*( warn "NAT already initialized, skipping as cannot be done multiple times" else: singletonNat = true - let extIp = getExternalIP(strategy) + var extIp = none(IpAddress) + try: + extIp = getExternalIP(strategy) + except Exception: + warn "exception in setupNat", error = getCurrentExceptionMsg() + if extIP.isSome(): endpoint.ip = some(extIp.get()) # RedirectPorts in considered a gcsafety violation diff --git a/waku/discovery/autonat_service.nim b/waku/discovery/autonat_service.nim new file mode 100644 index 0000000000..c4e2dd8edc --- /dev/null +++ b/waku/discovery/autonat_service.nim @@ -0,0 +1,36 @@ +import + chronos, + chronicles, + bearssl/rand, + libp2p/protocols/connectivity/autonat/client, + libp2p/protocols/connectivity/autonat/service, + libp2p/protocols/connectivity/autonat/core + +const AutonatCheckInterval = Opt.some(chronos.seconds(30)) + +proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = + ## AutonatService request other peers to dial us back + ## flagging us as Reachable or NotReachable. + ## minConfidence is used as threshold to determine the state. + ## If maxQueueSize > numPeersToAsk past samples are considered + ## in the calculation. + let autonatService = AutonatService.new( + autonatClient = AutonatClient.new(), + rng = rng, + scheduleInterval = AutonatCheckInterval, + askNewConnectedPeers = false, + numPeersToAsk = 3, + maxQueueSize = 3, + minConfidence = 0.7, + ) + + proc statusAndConfidenceHandler( + networkReachability: NetworkReachability, confidence: Opt[float] + ): Future[void] {.async.} = + if confidence.isSome(): + info "Peer reachability status", + networkReachability = networkReachability, confidence = confidence.get() + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + return autonatService diff --git a/waku/discovery/waku_discv5.nim b/waku/discovery/waku_discv5.nim index b2c2bb8e67..f01c6c50f4 100644 --- a/waku/discovery/waku_discv5.nim +++ b/waku/discovery/waku_discv5.nim @@ -108,6 +108,19 @@ proc new*( topicSubscriptionQueue: queue, ) +proc updateAnnouncedMultiAddress*( + wd: WakuDiscoveryV5, addresses: seq[MultiAddress] +): Result[void, string] = + let encodedAddrs = multiaddr.encodeMultiaddrs(addresses) + + wd.protocol.updateRecord([(MultiaddrEnrField, encodedAddrs)]).isOkOr: + return err("failed to update multiaddress in ENR: " & $error) + + debug "ENR updated successfully with new multiaddress", + enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record) + + return ok() + proc updateENRShards( wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool ): Result[void, string] = @@ -286,7 +299,9 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} = if subRes.isErr() and unsubRes.isErr(): continue - debug "ENR updated successfully" + debug "ENR updated successfully", + enrUri = wd.protocol.localNode.record.toUri(), + enr = $(wd.protocol.localNode.record) wd.predicate = shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords) @@ -314,7 +329,8 @@ proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises: asyncSpawn wd.subscriptionsListener() debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri() + info "Discv5: discoverable ENR ", + enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record) ok() diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 7e203fe72b..abd347b848 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -7,7 +7,9 @@ import libp2p/crypto/crypto, libp2p/builders, libp2p/nameresolving/nameresolver, - libp2p/transports/wstransport + libp2p/transports/wstransport, + libp2p/protocols/connectivity/relay/client, + libp2p/protocols/connectivity/relay/relay import ../waku_enr, ../discovery/waku_discv5, @@ -38,6 +40,7 @@ type switchSslSecureKey: Option[string] switchSslSecureCert: Option[string] switchSendSignedPeerRecord: Option[bool] + circuitRelay: Relay #Rate limit configs for non-relay req-resp protocols rateLimitSettings: Option[seq[string]] @@ -116,6 +119,9 @@ proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) = proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) = builder.rateLimitSettings = some(limits) +proc withCircuitRelay*(builder: var WakuNodeBuilder, circuitRelay: Relay) = + builder.circuitRelay = circuitRelay + ## Waku switch proc withSwitchConfiguration*( @@ -154,6 +160,12 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = if builder.record.isNone(): return err("node record is required") + let circuitRelay = + if builder.circuitRelay.isNil(): + Relay.new() + else: + builder.circuitRelay + var switch: Switch try: switch = newWakuSwitch( @@ -170,7 +182,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false), agentString = builder.switchAgentString, peerStoreCapacity = builder.peerStorageCapacity, - services = @[Service(getAutonatService(rng))], + circuitRelay = circuitRelay, ) except CatchableError: return err("failed to create switch: " & getCurrentExceptionMsg()) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 904dee509c..a06f06d0c9 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -245,6 +245,16 @@ type WakuNodeConf* = object name: "dns4-domain-name" .}: string + ## Circuit-relay config + isRelayClient* {. + desc: + """Set the node as a relay-client. +Set it to true for nodes that run behind a NAT or firewall and +hence would have reachability issues.""", + defaultValue: false, + name: "relay-client" + .}: bool + ## Relay config relay* {. desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay" diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index c6e0412271..0caa50259c 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -4,6 +4,7 @@ import chronos, libp2p/peerid, libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/dnsresolver, libp2p/crypto/crypto @@ -59,6 +60,7 @@ proc initNode( nodeKey: crypto.PrivateKey, record: enr.Record, peerStore: Option[WakuPeerStorage], + relay: Relay, dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[], ): Result[WakuNode, string] = ## Setup a basic Waku v2 node based on a supplied configuration @@ -103,6 +105,7 @@ proc initNode( maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement ) builder.withRateLimit(conf.rateLimits) + builder.withCircuitRelay(relay) node = ?builder.build().mapErr( @@ -438,21 +441,15 @@ proc startNode*( return ok() proc setupNode*( - conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext) + conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay ): Result[WakuNode, string] = - var nodeRng = - if rng.isSome(): - rng.get() - else: - crypto.newRng() - # Use provided key only if corresponding rng is also provided let key = - if conf.nodeKey.isSome() and rng.isSome(): + if conf.nodeKey.isSome(): conf.nodeKey.get() else: - warn "missing key or rng, generating new set" - crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr: + warn "missing key, generating new" + crypto.PrivateKey.random(Secp256k1, rng[]).valueOr: error "Failed to generate key", error = error return err("Failed to generate key: " & $error) @@ -479,7 +476,7 @@ proc setupNode*( debug "Initializing node" - let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr: + let node = initNode(conf, netConfig, rng, key, record, peerStore, relay).valueOr: error "Initializing node failed", error = error return err("Initializing node failed: " & error) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index e503ee8f0d..48d75cfd15 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -1,16 +1,22 @@ {.push raises: [].} import - std/options, + std/[options, sequtils], results, chronicles, chronos, + libp2p/protocols/connectivity/relay/relay, + libp2p/protocols/connectivity/relay/client, libp2p/wire, - libp2p/multicodec, libp2p/crypto/crypto, libp2p/protocols/pubsub/gossipsub, + libp2p/services/autorelayservice, + libp2p/services/hpservice, libp2p/peerid, + libp2p/discovery/discoverymngr, + libp2p/discovery/rendezvousinterface, eth/keys, + eth/p2p/discoveryv5/enr, presto, metrics, metrics/chronos_httpserver @@ -24,8 +30,10 @@ import ../waku_api/message_cache, ../waku_api/rest/server, ../waku_archive, + ../waku_relay/protocol, ../discovery/waku_dnsdisc, ../discovery/waku_discv5, + ../discovery/autonat_service, ../waku_enr/sharding, ../waku_rln_relay, ../waku_store, @@ -33,7 +41,8 @@ import ../factory/networks_config, ../factory/node_factory, ../factory/internal_config, - ../factory/external_config + ../factory/external_config, + ../waku_enr/multiaddr logScope: topics = "wakunode waku" @@ -41,7 +50,7 @@ logScope: # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" -type Waku* = object +type Waku* = ref object version: string conf: WakuNodeConf rng: ref HmacDrbgContext @@ -49,6 +58,7 @@ type Waku* = object wakuDiscv5*: WakuDiscoveryV5 dynamicBootstrapNodes: seq[RemotePeerInfo] + discoveryMngr: DiscoveryManager node*: WakuNode @@ -99,9 +109,43 @@ proc validateShards(conf: WakuNodeConf): Result[void, string] = return ok() +proc setupSwitchServices( + waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext +) = + proc onReservation(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} = + debug "circuit relay handler new reserve event", + addrs_before = $(waku.node.announcedAddresses), addrs = $addresses + + waku.node.announcedAddresses.setLen(0) ## remove previous addresses + waku.node.announcedAddresses.add(addresses) + debug "waku node announced addresses updated", + announcedAddresses = waku.node.announcedAddresses + + if not isNil(waku.wakuDiscv5): + waku.wakuDiscv5.updateAnnouncedMultiAddress(addresses).isOkOr: + error "failed to update announced multiaddress", error = $error + + let autonatService = getAutonatService(rng) + if conf.isRelayClient: + ## The node is considered to be behind a NAT or firewall and then it + ## should struggle to be reachable and establish connections to other nodes + const MaxNumRelayServers = 2 + let autoRelayService = AutoRelayService.new( + MaxNumRelayServers, RelayClient(circuitRelay), onReservation, rng + ) + let holePunchService = HPService.new(autonatService, autoRelayService) + waku.node.switch.services = @[Service(holePunchService)] + else: + waku.node.switch.services = @[Service(autonatService)] + ## Initialisation -proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = +proc newCircuitRelay(isRelayClient: bool): Relay = + if isRelayClient: + return RelayClient.new() + return Relay.new() + +proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = let rng = crypto.newRng() logging.setupLog(confCopy.logLevel, confCopy.logFormat) @@ -182,13 +226,16 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = "Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error ) - let nodeRes = setupNode(confCopy, some(rng)) + var relay = newCircuitRelay(confCopy.isRelayClient) + + let nodeRes = setupNode(confCopy, rng, relay) if nodeRes.isErr(): error "Failed setting up node", error = nodeRes.error return err("Failed setting up node: " & nodeRes.error) let node = nodeRes.get() + ## Delivery Monitor var deliveryMonitor: DeliveryMonitor if confCopy.reliabilityEnabled: if confCopy.storenode == "": @@ -212,6 +259,8 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = deliveryMonitor: deliveryMonitor, ) + waku.setupSwitchServices(confCopy, relay, rng) + ok(waku) proc getPorts( @@ -249,7 +298,10 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] = return ok(netConf) -proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] = +proc updateEnr(waku: ptr Waku): Result[void, string] = + let netConf: NetConfig = getRunningNetConfig(waku).valueOr: + return err("error calling updateNetConfig: " & $error) + let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr: return err("ENR setup failed: " & error) @@ -260,17 +312,42 @@ proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] = return ok() +proc updateAddressInENR(waku: ptr Waku): Result[void, string] = + let addresses: seq[MultiAddress] = waku[].node.announcedAddresses + let encodedAddrs = multiaddr.encodeMultiaddrs(addresses) + + ## First update the enr info contained in WakuNode + let keyBytes = waku[].key.getRawBytes().valueOr: + return err("failed to retrieve raw bytes from waku key: " & $error) + + let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr: + return err("failed to parse the private key: " & $error) + + let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)] + waku[].node.enr.update(parsedPk, enrFields).isOkOr: + return err("failed to update multiaddress in ENR updateAddressInENR: " & $error) + + debug "Waku node ENR updated successfully with new multiaddress", + enr = waku[].node.enr.toUri(), record = $(waku[].node.enr) + + ## Now update the ENR infor in discv5 + if not waku[].wakuDiscv5.isNil(): + waku[].wakuDiscv5.protocol.localNode.record = waku[].node.enr + let enr = waku[].wakuDiscv5.protocol.localNode.record + + debug "Waku discv5 ENR updated successfully with new multiaddress", + enr = enr.toUri(), record = $(enr) + + return ok() + proc updateWaku(waku: ptr Waku): Result[void, string] = if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0): - let netConf = getRunningNetConfig(waku).valueOr: - return err("error calling updateNetConfig: " & $error) - - updateEnr(waku, netConf).isOkOr: + updateEnr(waku).isOkOr: return err("error calling updateEnr: " & $error) - waku[].node.announcedAddresses = netConf.announcedAddresses + ?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node) - printNodeNetworkInfo(waku[].node) + ?updateAddressInENR(waku) return ok() @@ -297,6 +374,16 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() + ## libp2p DiscoveryManager + waku[].discoveryMngr = DiscoveryManager() + waku[].discoveryMngr.add( + RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes) + ) + if not isNil(waku[].node.wakuRelay): + for topic in waku[].node.wakuRelay.getSubscribedTopics(): + debug "advertise rendezvous namespace", topic + waku[].discoveryMngr.advertise(RdvNamespace(topic)) + return ok() # Waku shutdown diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index a91ad874d8..a592ebe7f5 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -50,7 +50,8 @@ import ../waku_rln_relay, ./config, ./peer_manager, - ../common/rate_limit/setting + ../common/rate_limit/setting, + ../discovery/autonat_service declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicHistogram waku_histogram_message_size, @@ -116,33 +117,6 @@ type contentTopicHandlers: Table[ContentTopic, TopicHandler] rateLimitSettings*: ProtocolRateLimitSettings -proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = - ## AutonatService request other peers to dial us back - ## flagging us as Reachable or NotReachable. - ## minConfidence is used as threshold to determine the state. - ## If maxQueueSize > numPeersToAsk past samples are considered - ## in the calculation. - let autonatService = AutonatService.new( - autonatClient = AutonatClient.new(), - rng = rng, - scheduleInterval = Opt.some(chronos.seconds(120)), - askNewConnectedPeers = false, - numPeersToAsk = 3, - maxQueueSize = 3, - minConfidence = 0.7, - ) - - proc statusAndConfidenceHandler( - networkReachability: NetworkReachability, confidence: Opt[float] - ): Future[void] {.gcsafe, async.} = - if confidence.isSome(): - info "Peer reachability status", - networkReachability = networkReachability, confidence = confidence.get() - - autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) - - return autonatService - proc new*( T: type WakuNode, netConfig: NetConfig, @@ -1291,11 +1265,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = return false -proc printNodeNetworkInfo*(node: WakuNode): void = +proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] = let peerInfo = node.switch.peerInfo var announcedStr = "" var listenStr = "" - var localIp = "" + var localIp = "0.0.0.0" try: localIp = $getPrimaryIPAddr() @@ -1304,20 +1278,34 @@ proc printNodeNetworkInfo*(node: WakuNode): void = info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs + ## Update the WakuNode addresses + var newAnnouncedAddresses = newSeq[MultiAddress](0) for address in node.announcedAddresses: - var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" + ## Replace "0.0.0.0" or "127.0.0.1" with the localIp + let newAddr = ($address).replace("0.0.0.0", localIp).replace("127.0.0.1", localIp) + let fulladdr = "[" & $newAddr & "/p2p/" & $peerInfo.peerId & "]" announcedStr &= fulladdr + let newMultiAddr = MultiAddress.init(newAddr).valueOr: + return err("error in updateAnnouncedAddrWithPrimaryIpAddr: " & $error) + newAnnouncedAddresses.add(newMultiAddr) + + node.announcedAddresses = newAnnouncedAddresses + + ## Update the Switch addresses + node.switch.peerInfo.addrs = newAnnouncedAddresses for transport in node.switch.transports: for address in transport.addrs: - var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" + let fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" listenStr &= fulladdr - ## XXX: this should be /ip4..., / stripped? - info "Listening on", full = listenStr, localIp = localIp + info "Listening on", + full = listenStr, localIp = localIp, switchAddress = $(node.switch.peerInfo.addrs) info "Announcing addresses", full = announcedStr info "DNS: discoverable ENR ", enr = node.enr.toUri() + return ok() + proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and ## all its mounted protocols. @@ -1357,7 +1345,8 @@ proc start*(node: WakuNode) {.async.} = node.started = true if not zeroPortPresent: - printNodeNetworkInfo(node) + updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr: + error "failed update announced addr", error = $error else: info "Listening port is dynamically allocated, address and ENR generation postponed" diff --git a/waku/node/waku_switch.nim b/waku/node/waku_switch.nim index db1dc51bb2..48d3612e33 100644 --- a/waku/node/waku_switch.nim +++ b/waku/node/waku_switch.nim @@ -9,6 +9,7 @@ import libp2p/crypto/crypto, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/rendezvous, + libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/nameresolver, libp2p/builders, libp2p/switch, @@ -76,8 +77,8 @@ proc newWakuSwitch*( secureCertPath: string = "", agentString = none(string), # defaults to nim-libp2p version peerStoreCapacity = none(int), # defaults to 1.25 maxConnections - services: seq[switch.Service] = @[], rendezvous: RendezVous = nil, + circuitRelay: Relay, ): Switch {.raises: [Defect, IOError, LPError].} = var b = SwitchBuilder .new() @@ -92,7 +93,7 @@ proc newWakuSwitch*( .withTcpTransport(transportFlags) .withNameResolver(nameResolver) .withSignedPeerRecord(sendSignedPeerRecord) - .withCircuitRelay() + .withCircuitRelay(circuitRelay) .withAutonat() if peerStoreCapacity.isSome(): @@ -114,9 +115,6 @@ proc newWakuSwitch*( else: b = b.withAddress(address) - if services.len > 0: - b = b.withServices(services) - if not rendezvous.isNil(): b = b.withRendezVous(rendezvous) diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 45a6914e23..6b828cd0ad 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -16,6 +16,7 @@ import libp2p/peerid, libp2p/peerinfo, libp2p/routing_record, + regex, json_serialization import ../waku_enr/capabilities @@ -110,7 +111,7 @@ proc init*( ## Parse -proc validWireAddr*(ma: MultiAddress): bool = +proc validWireAddr(ma: MultiAddress): bool = ## Check if wire Address is supported const ValidTransports = mapOr(TCP, WebSockets) return ValidTransports.match(ma) @@ -120,9 +121,44 @@ proc parsePeerInfo*(peer: RemotePeerInfo): Result[RemotePeerInfo, string] = ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo ok(peer) -proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] = - ## Parses a fully qualified peer multiaddr, in the - ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo +proc parsePeerInfoFromCircuitRelayAddr( + address: string +): Result[RemotePeerInfo, string] = + var match: RegexMatch2 + # Parse like: /ip4/162.19.247.156/tcp/60010/p2p/16Uiu2HAmCzWcYBCw3xKW8De16X9wtcbQrqD8x7CRRv4xpsFJ4oN8/p2p-circuit/p2p/16Uiu2HAm2eqzqp6xn32fzgGi8K4BuF88W4Xy6yxsmDcW8h1gj6ie + let maPattern = + re2"\/(ip4|ip6|dns|dnsaddr|dns4|dns6)\/[0-9a-fA-F:.]+\/(tcp|ws|wss)\/\d+\/p2p\/(.+)\/p2p-circuit\/p2p\/(.+)" + if not regex.match(address, maPattern, match): + return err("failed to parse ma: " & address) + + if match.captures.len != 4: + return err( + "failed parsing p2p-circuit addr, expected 4 regex capture groups: " & address & + " found: " & $(match.namedGroups.len) + ) + + let relayPeerId = address[match.group(2)] + let targetPeerIdStr = address[match.group(3)] + + discard PeerID.init(relayPeerId).valueOr: + return err("invalid relay peer id from p2p-circuit address: " & address) + let targetPeerId = PeerID.init(targetPeerIdStr).valueOr: + return err("invalid targetPeerId peer id from p2p-circuit address: " & address) + + let pattern = "/p2p-circuit" + let idx = address.find(pattern) + let wireAddr: MultiAddress = + if idx != -1: + # Extract everything from the start up to and including "/p2p-circuit" + let adr = address[0 .. (idx + pattern.len - 1)] + MultiAddress.init(adr).valueOr: + return err("could not create multiaddress from: " & adr) + else: + return err("could not find /p2p-circuit pattern in: " & address) + + return ok(RemotePeerInfo.init(targetPeerId, @[wireAddr])) + +proc parsePeerInfoFromRegularAddr(peer: MultiAddress): Result[RemotePeerInfo, string] = var p2pPart: MultiAddress var wireAddr = MultiAddress() for addrPart in peer.items(): @@ -163,6 +199,16 @@ proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] = return ok(RemotePeerInfo.init(peerId, @[wireAddr])) +proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] = + ## Parses a fully qualified peer multiaddr into dialable RemotePeerInfo + + let peerAddrStr = $peer + + if "p2p-circuit" in peerAddrStr: + return parsePeerInfoFromCircuitRelayAddr(peerAddrStr) + + return parsePeerInfoFromRegularAddr(peer) + proc parsePeerInfo*(peer: string): Result[RemotePeerInfo, string] = ## Parses a fully qualified peer multiaddr, in the ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 2d718c09c2..966f6a4849 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -502,3 +502,10 @@ proc getNumConnectedPeers*( ) return ok(peers.len) + +proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] = + ## Returns a seq containing the current list of subscribed topics + var topics: seq[PubsubTopic] + for t in w.validatorInserted.keys(): + topics.add(t) + return topics