Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 15, 2024
1 parent 0c12367 commit 01880d3
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 24 deletions.
2 changes: 1 addition & 1 deletion waku/v2/api/common/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

type Result interface {
type StoreRequestResult interface {
Cursor() []byte
IsComplete() bool
PeerID() peer.ID
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/api/missing/default_requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type defaultStorenodeRequestor struct {
store *store.WakuStore
}

func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error) {
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
}

func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error) {
func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
return d.store.Query(ctx, store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
TimeStart: from,
TimeEnd: to,
}, store.WithPeer(peerID), store.WithPaging(false, 100), store.IncludeData(false))
}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
}
17 changes: 9 additions & 8 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

const maxContentTopicsPerRequest = 10
const maxMsgHashesPerRequest = 50
const messageFetchPageSize = 100

// MessageTracker should keep track of messages it has seen before and
// provide a way to determine whether a message exists or not. This
Expand All @@ -31,8 +32,8 @@ type MessageTracker interface {
}

type StorenodeRequestor interface {
GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.Result, error)
QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.Result, error)
GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
}

// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
Expand Down Expand Up @@ -183,7 +184,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
}
}

func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.Result, error), logger *zap.Logger, logMsg string) (common.Result, error) {
func (m *MissingMessageVerifier) storeQueryWithRetry(ctx context.Context, queryFunc func(ctx context.Context) (common.StoreRequestResult, error), logger *zap.Logger, logMsg string) (common.StoreRequestResult, error) {
retry := true
count := 1
for retry && count <= m.params.maxAttemptsToRetrieveHistory {
Expand Down Expand Up @@ -217,11 +218,11 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
logging.Epoch("to", now),
)

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
return m.storenodeRequestor.QueryWithCriteria(
ctx,
interest.peerID,
100,
messageFetchPageSize,
interest.contentFilter.PubsubTopic,
contentTopics[batchFrom:batchTo],
proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
Expand Down Expand Up @@ -252,7 +253,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
missingHashes = append(missingHashes, hash)
}

result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -291,7 +292,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
defer utils.LogOnPanic()
defer wg.Wait()

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)
Expand All @@ -312,7 +313,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
}
}

result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.Result, error) {
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}
Expand Down
26 changes: 21 additions & 5 deletions waku/v2/api/publish/default_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,49 @@ package publish

import (
"context"
"errors"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)

var ErrRelayNotAvailable = errors.New("relay is not available")
var ErrLightpushNotAvailable = errors.New("lightpush is not available")

func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher {
return &defaultPublisher{
lightPush: lightpush,
lightpush: lightpush,
relay: relay,
}
}

type defaultPublisher struct {
lightPush *lightpush.WakuLightPush
lightpush *lightpush.WakuLightPush
relay *relay.WakuRelay
}

func (d *defaultPublisher) RelayListPeers(pubsubTopic string) []peer.ID {
return d.relay.PubSub().ListPeers(pubsubTopic)
func (d *defaultPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
if d.relay == nil {
return nil, ErrRelayNotAvailable
}

return d.relay.PubSub().ListPeers(pubsubTopic), nil
}

func (d *defaultPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
if d.relay == nil {
return pb.MessageHash{}, ErrRelayNotAvailable
}

return d.relay.Publish(ctx, message, relay.WithPubSubTopic(pubsubTopic))
}

func (d *defaultPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) {
return d.lightPush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers))
if d.lightpush == nil {
return pb.MessageHash{}, ErrLightpushNotAvailable
}

return d.lightpush.Publish(ctx, message, lightpush.WithPubSubTopic(pubsubTopic), lightpush.WithMaxPeers(maxPeers))
}
2 changes: 1 addition & 1 deletion waku/v2/api/publish/default_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type defaultStorenodeMessageVerifier struct {
store *store.WakuStore
}

func (d *defaultStorenodeMessageVerifier) MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
var opts []store.RequestOption
opts = append(opts, store.WithRequestID(requestID))
opts = append(opts, store.WithPeer(peerID))
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/publish/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ISentCheck interface {

type StorenodeMessageVerifier interface {
// MessagesExist returns a list of the messages it found from a list of message hashes
MessagesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

// MessageSentCheck tracks the outgoing messages and check against store node
Expand Down Expand Up @@ -228,7 +228,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c

queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
defer cancel()
result, err := m.messageVerifier.MessagesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
if err != nil {
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
return []common.Hash{}
Expand Down
11 changes: 7 additions & 4 deletions waku/v2/api/publish/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (pm PublishMethod) String() string {

type Publisher interface {
// RelayListPeers returns the list of peers for a pubsub topic
RelayListPeers(pubsubTopic string) []peer.ID
RelayListPeers(pubsubTopic string) ([]peer.ID, error)

// RelayPublish publishes a message via WakuRelay
RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error)
Expand Down Expand Up @@ -128,9 +128,12 @@ func (ms *MessageSender) Send(req *Request) error {
return err
}
case Relay:
peerCnt := len(ms.publisher.RelayListPeers(req.envelope.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic())
peers, err := ms.publisher.RelayListPeers(req.envelope.PubsubTopic())
if err != nil {
return err
}
logger.Info("publishing message via relay", zap.Int("peerCnt", len(peers)))
_, err = ms.publisher.RelayPublish(req.ctx, req.envelope.Message(), req.envelope.PubsubTopic())
if err != nil {
return err
}
Expand Down

0 comments on commit 01880d3

Please sign in to comment.