Skip to content

Commit

Permalink
feat: add traceFunction call to metrics
Browse files Browse the repository at this point in the history
Allow tracing method calls using a metrics implementation.

```js
const libp2p = await createLibp2p()
const context = libp2p.metrics?.createTraceContext()

for await (const foo of libp2p.contentRouting.findProviders(cid, {
  context,
  signal: AbortSignal.timeout(20_000)
}) {
  //...
}
```

Adds tracing support to `libp2p.contentRouting.*` and
`libp2p.peerRouting.*` to start with, other methods can have it added
when necessary.
  • Loading branch information
achingbrain committed Dec 19, 2024
1 parent 7caee9f commit 73887f2
Show file tree
Hide file tree
Showing 15 changed files with 24,334 additions and 12 deletions.
4 changes: 2 additions & 2 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { AbortOptions, RoutingOptions } from '../index.js'
import type { RoutingOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'

Expand Down Expand Up @@ -50,7 +50,7 @@ export interface ContentRouting {
* provide content corresponding to the passed CID, call this function to no
* longer remind them.
*/
cancelReprovide (key: CID, options?: AbortOptions): Promise<void>
cancelReprovide (key: CID, options?: RoutingOptions): Promise<void>

/**
* Find the providers of the passed CID.
Expand Down
11 changes: 10 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,13 +753,22 @@ export interface LoggerOptions {
log: Logger
}

/**
* An object that includes a context object that is passed onwards.
*
* This is used by metrics method tracing to link function calls together.
*/
export interface ContextOptions {
context?: any
}

/**
* When a routing operation involves reading values, these options allow
* controlling where the values are read from. By default libp2p will check
* local caches but may not use the network if a valid local value is found,
* these options allow tuning that behaviour.
*/
export interface RoutingOptions extends AbortOptions, ProgressOptions {
export interface RoutingOptions extends AbortOptions, ProgressOptions, ContextOptions {
/**
* Pass `false` to not use the network
*
Expand Down
57 changes: 57 additions & 0 deletions packages/interface/src/metrics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,61 @@ export interface Metrics {
* method on the returned summary group object
*/
registerSummaryGroup: ((name: string, options?: SummaryOptions) => SummaryGroup) & ((name: string, options: CalculatedSummaryOptions<Record<string, number>>) => void)

/**
* Wrap a function for tracing purposes.
*
* All functions wrapped like this should accept a final optional options arg.
*
* In order to pass an execution context along to create a multi-layered
* trace, the index of the options arg must be specified.
*/
traceFunction <F extends (...args: any[]) => AsyncIterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => Iterator<any>> (name: string, fn: F, options?: TraceGeneratorFunctionOptions<Parameters<F>, ReturnType<F>, YieldType<ReturnType<F>>>): F
traceFunction <F extends (...args: any[]) => any = (...args: any[]) => any> (name: string, fn: F, options?: TraceFunctionOptions<Parameters<F>, ReturnType<F>>): F

/**
* Creates a tracing context that can be used to trace a method call
*/
createTraceContext(): any
}

/**
* Infer the yielded type of an (async)iterable
*/
type YieldType<T extends AsyncIterator<any> | Iterator<any>> = T extends AsyncIterator<infer Y> ? Y : T extends Iterator<infer Y, any, any> ? Y : never

export type TraceAttributes = Record<string, number | string | boolean | number[] | string[] | boolean[]>

export interface TraceFunctionOptions<A, B> {
/**
* To construct a trace that spans multiple method invocations, it's necessary
* to pass the trace context onwards as part of the options object.
*
* Specify the index of the options object in the args array here.
*
* @default 0
*/
optionsIndex?: number

/**
* Set attributes on the trace by modifying the passed attributes object.
*/
getAttributesFromArgs?(args: A, attributes: TraceAttributes): TraceAttributes

/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs`
* and/or `appendAttributesFromYieldedValue` (if defined)
*/
getAttributesFromReturnValue?(value: B, attributes: TraceAttributes): TraceAttributes
}

export interface TraceGeneratorFunctionOptions<A, B, C = any> extends TraceFunctionOptions<A, B> {
/**
* Set attributes on the trace by modifying the passed attributes object. The
* object will have previously been passed to `appendAttributesFromArgs` (if
* defined)
*/
getAttributesFromYieldedValue? (value: C, attributes: TraceAttributes, index: number): TraceAttributes
}
24,006 changes: 24,006 additions & 0 deletions packages/kad-dht/benchmark/peer-distance/fixtures.js

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions packages/kad-dht/benchmark/peer-distance/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/* eslint-disable no-console
import Benchmark from 'benchmark'
import { PeerDistanceList } from '../../dist/src/peer-distance-list.js'
import { generateKeyPair } from '@libp2p/crypto/keys'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { convertPeerId } from '../../dist/src/utils.js'
const peers = 6000
for (let i = 0; i < peers; i++) {
const key = await generateKeyPair('Ed25519')
const id = peerIdFromPrivateKey(key)
console.info(`{
peerId: peerIdFromString('${id.toString()}'),
kadId: Uint8Array.from([${(await convertPeerId(id)).join(', ')}])
}, `)
}
*/



/* eslint-disable no-console */
import Benchmark from 'benchmark'
import { PeerDistanceList } from '../../dist/src/peer-distance-list.js'
import { convertBuffer } from '../../dist/src/utils.js'
import { key, peers } from './fixtures.js'

const suite = new Benchmark.Suite('peer distance list')

suite.add('current', async (d) => {
const kadId = await convertBuffer(key)
const list = new PeerDistanceList(kadId, 20)

for (const peer of peers) {
list.addWitKadId({ id: peer.peerId, multiaddrs: [] }, peer.kadId)
}

d.resolve()
}, { defer: true })

async function main () {
suite
.on('cycle', (event) => console.log(String(event.target)))
.on('complete', function () {
console.log('fastest is ' + this.filter('fastest').map('name'))
})
.run({ async: true })
}

main()
.catch(err => {
console.error(err)
process.exit(1)
})
14 changes: 14 additions & 0 deletions packages/kad-dht/benchmark/peer-distance/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "libp2p-kad-dht-peer-distance-list-benchmarks",
"version": "0.0.0",
"private": true,
"type": "module",
"scripts": {
"start": "node ."
},
"license": "MIT",
"dependencies": {
"@types/benchmark": "^2.1.5",
"benchmark": "^2.1.4"
}
}
17 changes: 15 additions & 2 deletions packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export class ContentFetching {
this.peerRouting = peerRouting
this.queryManager = queryManager
this.network = network

this.get = components.metrics?.traceFunction('libp2p.kadDHT.get', this.get.bind(this), {
optionsIndex: 1
}) ?? this.get
this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), {
optionsIndex: 2
}) ?? this.put
}

/**
Expand Down Expand Up @@ -145,7 +152,10 @@ export class ContentFetching {

// put record to the closest peers
yield * pipe(
this.peerRouting.getClosestPeers(key, { signal: options.signal }),
this.peerRouting.getClosestPeers(key, {
...options,
signal: options.signal
}),
(source) => map(source, (event) => {
return async () => {
if (event.name !== 'FINAL_PEER') {
Expand Down Expand Up @@ -252,7 +262,10 @@ export class ContentFetching {
const self = this // eslint-disable-line @typescript-eslint/no-this-alias

const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, { signal })) {
for await (const event of self.peerRouting.getValueOrPeers(peer, key, {
...options,
signal
})) {
yield event

if (event.name === 'PEER_RESPONSE' && (event.record != null)) {
Expand Down
23 changes: 23 additions & 0 deletions packages/kad-dht/src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ export class ContentRouting {
this.queryManager = queryManager
this.routingTable = routingTable
this.providers = providers

this.findProviders = components.metrics?.traceFunction('libp2p.kadDHT.findProviders', this.findProviders.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PROVIDER') {
attrs.providers ??= []
attrs.providers.push(...event.providers.map(info => info.id.toString()))
}

return attrs
}
}) ?? this.findProviders
this.provide = components.metrics?.traceFunction('libp2p.kadDHT.provide', this.provide.bind(this), {
optionsIndex: 1,
getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => {
if (event.name === 'PEER_RESPONSE' && event.messageName === 'ADD_PROVIDER') {
attrs.providers ??= []
attrs.providers.push(event.from.toString())
}

return attrs
}
}) ?? this.provide
}

/**
Expand Down
55 changes: 55 additions & 0 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,61 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_requests_total`),
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_errors_total`)
}

this.sendRequest = components.metrics?.traceFunction('libp2p.kadDHT.sendRequest', this.sendRequest.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}
}) ?? this.sendRequest
this.sendMessage = components.metrics?.traceFunction('libp2p.kadDHT.sendMessage', this.sendMessage.bind(this), {
optionsIndex: 2,
getAttributesFromArgs ([to, message], attrs) {
return {
...attrs,
to: to.toString(),
'message type': `${message.type}`
}
},
getAttributesFromYieldedValue: (event, attrs) => {
if (event.name === 'PEER_RESPONSE') {
if (event.providers.length > 0) {
event.providers.forEach((value, index) => {
attrs[`providers-${index}`] = value.id.toString()
})
}

if (event.closer.length > 0) {
event.closer.forEach((value, index) => {
attrs[`closer-${index}`] = value.id.toString()
})
}
}

return attrs
}
}) ?? this.sendMessage
}

/**
Expand Down
10 changes: 9 additions & 1 deletion packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import type { Network } from '../network.js'
import type { QueryManager, QueryOptions } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface'

export interface PeerRoutingComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
metrics?: Metrics
}

export interface PeerRoutingInit {
Expand Down Expand Up @@ -55,6 +56,13 @@ export class PeerRouting {
this.peerStore = components.peerStore
this.peerId = components.peerId
this.log = components.logger.forComponent(`${init.logPrefix}:peer-routing`)

this.findPeer = components.metrics?.traceFunction('libp2p.kadDHT.findPeer', this.findPeer.bind(this), {
optionsIndex: 1
}) ?? this.findPeer
this.getClosestPeers = components.metrics?.traceFunction('libp2p.kadDHT.getClosestPeers', this.getClosestPeers.bind(this), {
optionsIndex: 1
}) ?? this.getClosestPeers
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import { timeOperationMethod } from './utils.js'
import type { OperationMetrics } from './kad-dht.js'
import type { PeerRouting } from './peer-routing/index.js'
import type { RoutingTable } from './routing-table/index.js'
import type { ComponentLogger, Logger, PeerId, Startable } from '@libp2p/interface'
import type { ComponentLogger, Logger, Metrics, PeerId, Startable } from '@libp2p/interface'
import type { DeferredPromise } from 'p-defer'

export interface QuerySelfInit {
logPrefix: string
peerRouting: PeerRouting
Expand All @@ -28,6 +27,7 @@ export interface QuerySelfInit {
export interface QuerySelfComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class QueryManager implements Startable {
// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
...options,
key,
startingPeer: peer,
ourPeerId: this.peerId,
Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu

try {
for await (const event of query({
...options,
key,
peer,
signal: compoundSignal,
Expand Down
Loading

0 comments on commit 73887f2

Please sign in to comment.