Skip to content

Commit

Permalink
refactor: use protobuffer for API storenode queries
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Oct 24, 2024
1 parent 4519f58 commit b6fac0f
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 43 deletions.
12 changes: 12 additions & 0 deletions waku/v2/api/common/storenode_requestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

type StorenodeRequestor interface {
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
}
32 changes: 19 additions & 13 deletions waku/v2/api/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"encoding/hex"
"errors"
"math"
"sync"
Expand All @@ -10,8 +11,12 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"google.golang.org/protobuf/proto"

"go.uber.org/zap"
)

Expand All @@ -25,7 +30,7 @@ type work struct {
}

type HistoryRetriever struct {
store Store
store common.StorenodeRequestor
logger *zap.Logger
historyProcessor HistoryProcessor
}
Expand All @@ -35,11 +40,7 @@ type HistoryProcessor interface {
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
}

type Store interface {
Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
}

func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
return &HistoryRetriever{
store: store,
logger: logger.Named("history-retriever"),
Expand Down Expand Up @@ -257,12 +258,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
requestID := protocol.GenerateRequestID()
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))

opts := []store.RequestOption{
store.WithPaging(false, limit),
store.WithRequestID(requestID),
store.WithPeer(peerID),
store.WithCursor(cursor)}

logger.Debug("store.query",
logging.Timep("startTime", criteria.TimeStart),
logging.Timep("endTime", criteria.TimeEnd),
Expand All @@ -271,8 +266,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
zap.String("cursor", hexutil.Encode(cursor)),
)

storeQueryRequest := &pb.StoreQueryRequest{
RequestId: hex.EncodeToString(requestID),
IncludeData: true,
PubsubTopic: &criteria.PubsubTopic,
ContentTopics: criteria.ContentTopicsList(),
TimeStart: criteria.TimeStart,
TimeEnd: criteria.TimeEnd,
PaginationCursor: cursor,
PaginationLimit: proto.Uint64(limit),
}

queryStart := time.Now()
result, err := hr.store.Query(ctx, criteria, opts...)
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
queryDuration := time.Since(queryStart)
if err != nil {
logger.Error("error querying storenode", zap.Error(err))
Expand Down
13 changes: 5 additions & 8 deletions waku/v2/api/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
proto_pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
Expand Down Expand Up @@ -91,22 +92,18 @@ func getInitialResponseKey(contentTopics []string) string {
return hex.EncodeToString(append([]byte("start"), []byte(contentTopics[0])...))
}

func (t *mockStore) Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) {
params := store.Parameters{}
for _, opt := range opts {
_ = opt(&params)
}
func (t *mockStore) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *pb.StoreQueryRequest) (common.StoreRequestResult, error) {
result := &mockResult{}
if params.Cursor() == nil {
initialResponse := getInitialResponseKey(criteria.ContentTopicsList())
if len(storeQueryRequest.GetPaginationCursor()) == 0 {
initialResponse := getInitialResponseKey(storeQueryRequest.GetContentTopics())
response := t.queryResponses[initialResponse]
if response.err != nil {
return nil, response.err
}
result.cursor = response.cursor
result.messages = response.messages
} else {
response := t.queryResponses[hex.EncodeToString(params.Cursor())]
response := t.queryResponses[hex.EncodeToString(storeQueryRequest.GetPaginationCursor())]
if response.err != nil {
return nil, response.err
}
Expand Down
12 changes: 4 additions & 8 deletions waku/v2/api/missing/default_requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor {
func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor {
return &defaultStorenodeRequestor{
store: store,
}
Expand All @@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
}

func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
return d.store.Query(ctx, store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
TimeStart: from,
TimeEnd: to,
}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
}
42 changes: 28 additions & 14 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand All @@ -31,17 +32,12 @@ type MessageTracker interface {
MessageExists(pb.MessageHash) (bool, error)
}

type StorenodeRequestor interface {
GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
}

// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
type MissingMessageVerifier struct {
ctx context.Context
params missingMessageVerifierParams

storenodeRequestor StorenodeRequestor
storenodeRequestor common.StorenodeRequestor
messageTracker MessageTracker

criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
Expand All @@ -54,7 +50,7 @@ type MissingMessageVerifier struct {
}

// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
options = append(defaultMissingMessagesVerifierOptions, options...)
params := missingMessageVerifierParams{}
for _, opt := range options {
Expand Down Expand Up @@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
)

result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
return m.storenodeRequestor.QueryWithCriteria(
storeQueryRequest := &storepb.StoreQueryRequest{
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
PubsubTopic: &interest.contentFilter.PubsubTopic,
ContentTopics: contentTopics[batchFrom:batchTo],
TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()),
PaginationLimit: proto.Uint64(messageFetchPageSize),
}

return m.storenodeRequestor.Query(
ctx,
interest.peerID,
messageFetchPageSize,
interest.contentFilter.PubsubTopic,
contentTopics[batchFrom:batchTo],
proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
proto.Int64(now.Add(-m.params.delay).UnixNano()),
storeQueryRequest,
)
}, logger, "retrieving history to check for missing messages")
if err != nil {
Expand Down Expand Up @@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)

var messageHashesBytes [][]byte
for _, m := range messageHashes {
messageHashesBytes = append(messageHashesBytes, m.Bytes())
}

storeQueryRequest := &storepb.StoreQueryRequest{
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
IncludeData: true,
MessageHashes: messageHashesBytes,
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
}

return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
Expand Down
29 changes: 29 additions & 0 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return result, nil
}

func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
err := storeRequest.Validate()
if err != nil {
return nil, err
}

var params Parameters
params.selectedPeer = peerID
if params.selectedPeer == "" {
return nil, ErrMustSelectPeer
}

response, err := s.queryFrom(ctx, storeRequest, &params)
if err != nil {
return nil, err
}

result := &resultImpl{
store: s,
messages: response.Messages,
storeRequest: storeRequest,
storeResponse: response,
peerID: params.selectedPeer,
cursor: response.PaginationCursor,
}

return result, nil
}

// Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
return s.Request(ctx, criteria, opts...)
Expand Down

0 comments on commit b6fac0f

Please sign in to comment.