From 214288ee0029602040fd7e156eb617d46d731402 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 16 Nov 2024 15:05:12 -0300 Subject: [PATCH] wip --- envelopes.go | 13 ++++++++++++- nip45/{ => hyperloglog}/helpers.go | 2 +- nip45/{ => hyperloglog}/hll.go | 2 +- nip45/{ => hyperloglog}/hll_test.go | 2 +- pool.go | 19 +++++++++++++++++++ relay.go | 16 ++++++++++++---- subscription.go | 4 ++-- 7 files changed, 48 insertions(+), 10 deletions(-) rename nip45/{ => hyperloglog}/helpers.go (95%) rename nip45/{ => hyperloglog}/hll.go (98%) rename nip45/{ => hyperloglog}/hll_test.go (99%) diff --git a/envelopes.go b/envelopes.go index eec494c..8f401a4 100644 --- a/envelopes.go +++ b/envelopes.go @@ -2,6 +2,7 @@ package nostr import ( "bytes" + "encoding/hex" "encoding/json" "fmt" "strconv" @@ -142,7 +143,8 @@ func (v ReqEnvelope) MarshalJSON() ([]byte, error) { type CountEnvelope struct { SubscriptionID string Filters - Count *int64 + Count *int64 + HyperLogLog []byte } func (_ CountEnvelope) Label() string { return "COUNT" } @@ -161,9 +163,11 @@ func (v *CountEnvelope) UnmarshalJSON(data []byte) error { var countResult struct { Count *int64 `json:"count"` + HLL string `json:"hll"` } if err := json.Unmarshal([]byte(arr[2].Raw), &countResult); err == nil && countResult.Count != nil { v.Count = countResult.Count + v.HyperLogLog, _ = hex.DecodeString(countResult.HLL) return nil } @@ -189,6 +193,13 @@ func (v CountEnvelope) MarshalJSON() ([]byte, error) { if v.Count != nil { w.RawString(`,{"count":`) w.RawString(strconv.FormatInt(*v.Count, 10)) + if v.HyperLogLog != nil { + w.RawString(`,"hll":"`) + hllHex := make([]byte, 0, 512) + hex.Encode(hllHex, v.HyperLogLog) + w.Buffer.AppendBytes(hllHex) + w.RawString(`"`) + } w.RawString(`}`) } else { for _, filter := range v.Filters { diff --git a/nip45/helpers.go b/nip45/hyperloglog/helpers.go similarity index 95% rename from nip45/helpers.go rename to nip45/hyperloglog/helpers.go index 208d521..88aba53 100644 --- a/nip45/helpers.go +++ b/nip45/hyperloglog/helpers.go @@ -1,4 +1,4 @@ -package nip45 +package hyperloglog import ( "math" diff --git a/nip45/hll.go b/nip45/hyperloglog/hll.go similarity index 98% rename from nip45/hll.go rename to nip45/hyperloglog/hll.go index a77135e..b0ff9b1 100644 --- a/nip45/hll.go +++ b/nip45/hyperloglog/hll.go @@ -1,4 +1,4 @@ -package nip45 +package hyperloglog import ( "encoding/binary" diff --git a/nip45/hll_test.go b/nip45/hyperloglog/hll_test.go similarity index 99% rename from nip45/hll_test.go rename to nip45/hyperloglog/hll_test.go index 993bc92..8215a73 100644 --- a/nip45/hll_test.go +++ b/nip45/hyperloglog/hll_test.go @@ -1,4 +1,4 @@ -package nip45 +package hyperloglog import ( "encoding/hex" diff --git a/pool.go b/pool.go index a2004f2..d2ccc9a 100644 --- a/pool.go +++ b/pool.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/nbd-wtf/go-nostr/nip45/hyperloglog" "github.com/puzpuzpuz/xsync/v3" ) @@ -468,6 +469,24 @@ func (pool *SimplePool) subManyEose( return events } +// CountMany aggregates count results from multiple relays using HyperLogLog +func (pool *SimplePool) CountMany( + ctx context.Context, + urls []string, + filter Filter, + opts []SubscriptionOption, +) int { + hll := hyperloglog.New() + + for _, url := range urls { + go func(nm string) { + relay, err := pool.EnsureRelay(url) + }(NormalizeURL(url)) + } + + return int(hll.Count()) +} + // QuerySingle returns the first event returned by the first relay, cancels everything else. func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent { ctx, cancel := context.WithCancel(ctx) diff --git a/relay.go b/relay.go index 5825689..ea89455 100644 --- a/relay.go +++ b/relay.go @@ -273,7 +273,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error } case *CountEnvelope: if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { - subscription.countResult <- *env.Count + subscription.countResult <- *env } case *OKEnvelope: if okCallback, exist := r.okCallbacks.Load(env.EventID); exist { @@ -478,11 +478,19 @@ func (r *Relay) QuerySync(ctx context.Context, filter Filter) ([]*Event, error) } func (r *Relay) Count(ctx context.Context, filters Filters, opts ...SubscriptionOption) (int64, error) { + v, err := r.countInternal(ctx, filters, opts...) + if err != nil { + return 0, err + } + return *v.Count, nil +} + +func (r *Relay) countInternal(ctx context.Context, filters Filters, opts ...SubscriptionOption) (CountEnvelope, error) { sub := r.PrepareSubscription(ctx, filters, opts...) - sub.countResult = make(chan int64) + sub.countResult = make(chan CountEnvelope) if err := sub.Fire(); err != nil { - return 0, err + return CountEnvelope{}, err } defer sub.Unsub() @@ -499,7 +507,7 @@ func (r *Relay) Count(ctx context.Context, filters Filters, opts ...Subscription case count := <-sub.countResult: return count, nil case <-ctx.Done(): - return 0, ctx.Err() + return CountEnvelope{}, ctx.Err() } } } diff --git a/subscription.go b/subscription.go index d4c1af4..1adf62d 100644 --- a/subscription.go +++ b/subscription.go @@ -15,7 +15,7 @@ type Subscription struct { Filters Filters // for this to be treated as a COUNT and not a REQ this must be set - countResult chan int64 + countResult chan CountEnvelope // the Events channel emits all EVENTs that come in a Subscription // will be closed when the subscription ends @@ -152,7 +152,7 @@ func (sub *Subscription) Fire() error { if sub.countResult == nil { reqb, _ = ReqEnvelope{sub.id, sub.Filters}.MarshalJSON() } else { - reqb, _ = CountEnvelope{sub.id, sub.Filters, nil}.MarshalJSON() + reqb, _ = CountEnvelope{sub.id, sub.Filters, nil, nil}.MarshalJSON() } sub.live.Store(true)