From 1b786ab213aea1c442e5758b070a5de4d59b8289 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 24 Sep 2024 12:05:22 -0300 Subject: [PATCH] take subscription options in pool.SubMany* --- pool.go | 77 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 18 deletions(-) diff --git a/pool.go b/pool.go index d91954c..ff132eb 100644 --- a/pool.go +++ b/pool.go @@ -173,16 +173,32 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan RelayEvent { - return pool.subMany(ctx, urls, filters, true) +func (pool *SimplePool) SubMany( + ctx context.Context, + urls []string, + filters Filters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subMany(ctx, urls, filters, true, opts) } // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { - return pool.subMany(ctx, urls, filters, false) +func (pool *SimplePool) SubManyNonUnique( + ctx context.Context, + urls []string, + filters Filters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subMany(ctx, urls, filters, false, opts) } -func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { +func (pool *SimplePool) subMany( + ctx context.Context, + urls []string, + filters Filters, + unique bool, + opts []SubscriptionOption, +) chan RelayEvent { ctx, cancel := context.WithCancel(ctx) _ = cancel // do this so `go vet` will stop complaining events := make(chan RelayEvent) @@ -228,7 +244,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt hasAuthed = false subscribe: - sub, err = relay.Subscribe(ctx, filters) + sub, err = relay.Subscribe(ctx, filters, opts...) if err != nil { goto reconnect } @@ -313,16 +329,32 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE -func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan RelayEvent { - return pool.subManyEose(ctx, urls, filters, true) +func (pool *SimplePool) SubManyEose( + ctx context.Context, + urls []string, + filters Filters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subManyEose(ctx, urls, filters, true, opts) } // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { - return pool.subManyEose(ctx, urls, filters, false) +func (pool *SimplePool) SubManyEoseNonUnique( + ctx context.Context, + urls []string, + filters Filters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.subManyEose(ctx, urls, filters, false, opts) } -func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { +func (pool *SimplePool) subManyEose( + ctx context.Context, + urls []string, + filters Filters, + unique bool, + opts []SubscriptionOption, +) chan RelayEvent { ctx, cancel := context.WithCancel(ctx) events := make(chan RelayEvent) @@ -349,7 +381,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters hasAuthed := false subscribe: - sub, err := relay.Subscribe(ctx, filters) + sub, err := relay.Subscribe(ctx, filters, opts...) if sub == nil { debugLogf("error subscribing to %s with %v: %s", relay, filters, err) return @@ -416,13 +448,14 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F func (pool *SimplePool) batchedSubMany( ctx context.Context, dfs []DirectedFilters, - subFn func(context.Context, []string, Filters, bool) chan RelayEvent, + subFn func(context.Context, []string, Filters, bool, []SubscriptionOption) chan RelayEvent, + opts []SubscriptionOption, ) chan RelayEvent { res := make(chan RelayEvent) for _, df := range dfs { go func(df DirectedFilters) { - for ie := range subFn(ctx, []string{df.Relay}, df.Filters, true) { + for ie := range subFn(ctx, []string{df.Relay}, df.Filters, true, opts) { res <- ie } }(df) @@ -432,11 +465,19 @@ func (pool *SimplePool) batchedSubMany( } // BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. -func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { - return pool.batchedSubMany(ctx, dfs, pool.subMany) +func (pool *SimplePool) BatchedSubMany( + ctx context.Context, + dfs []DirectedFilters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.batchedSubMany(ctx, dfs, pool.subMany, opts) } // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. -func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { - return pool.batchedSubMany(ctx, dfs, pool.subManyEose) +func (pool *SimplePool) BatchedSubManyEose( + ctx context.Context, + dfs []DirectedFilters, + opts ...SubscriptionOption, +) chan RelayEvent { + return pool.batchedSubMany(ctx, dfs, pool.subManyEose, opts) }