Skip to content

Commit

Permalink
subpub: measure DHT latency and export via prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
altergui committed Oct 11, 2023
1 parent 7e13e19 commit 594e995
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion subpub/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@ import (
discrouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
discutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/metrics"
)

// Metrics exported via prometheus
var (
dhtLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "file",
Name: "peers_dht_latency",
Help: "The time it takes FindPeers to discover peers",
})
)

// setupDiscovery creates a DHT discovery service and attaches it to the libp2p Host.
// This lets us automatically discover peers and connect to them.
func (s *SubPub) setupDiscovery(ctx context.Context) {

// Set a function as stream handler. This function is called when a peer
// initiates a connection and starts a stream with this peer.
if !s.OnlyDiscover {
Expand All @@ -29,6 +39,8 @@ func (s *SubPub) setupDiscovery(ctx context.Context) {
s.routing = discrouting.NewRoutingDiscovery(s.node.DHT)
discutil.Advertise(ctx, s.routing, s.Topic)

metrics.Register(dhtLatency)

// Discover new peers periodically
go func() { // this spawns a single background task per instance
for {
Expand All @@ -46,6 +58,7 @@ func (s *SubPub) setupDiscovery(ctx context.Context) {
}

func (s *SubPub) discover(ctx context.Context) {
dhtLatencyTimer := prometheus.NewTimer(dhtLatency)
// Now, look for others who have announced.
// This is like your friend telling you the location to meet you.
log.Debugf("looking for peers in topic %s", s.Topic)
Expand All @@ -69,6 +82,8 @@ func (s *SubPub) discover(ctx context.Context) {
continue
}
// new peer; let's connect to it
// first update the latency metrics
dhtLatencyTimer.ObserveDuration()
connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
if err := s.node.PeerHost.Connect(connectCtx, peer); err != nil {
cancel()
Expand Down

0 comments on commit 594e995

Please sign in to comment.