Skip to content

Commit 8d3a9ba

Browse files
feat: add "pagination" to http-based event log poller (#131)
Modify the HTTP log poller to fetch logs in pages (chunks of blocks). Instead of trying to fetch all the logs from the RPC in one go (which fails if there are too many blocks from the specified "initial block" to the last one), we now fetch logs in sequential "pages" of N blocks. For instance, say we're using a page size of 1000: ``` # before GetLogs(FromBlock: 0, ToBlock: "<CurrentBlock>") # after GetLogs(FromBlock: 0, ToBlock: 1000) GetLogs(FromBlock: 1000, ToBlock: 2000) GetLogs(FromBlock: 2000, ToBlock: 3000) ... GetLogs(FromBlock: 99000, ToBlock: "<CurrentBlock>") ``` The page size is controlled by a new flag "event-listener-poll-size" (or the corresponding environment variable "EVENT_LISTENER_POLL_SIZE"). The recommended value based on tests with Monad is 1000.
1 parent 895c0e4 commit 8d3a9ba

File tree

6 files changed

+121
-25
lines changed

6 files changed

+121
-25
lines changed

cmd/start.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func startCommand() *cobra.Command {
2020

2121
nodeURL, privateKey, timelockAddress, callProxyAddress string
2222
fromBlock, pollPeriod, eventListenerPollPeriod int64
23+
eventListenerPollSize uint64
2324
dryRun bool
2425
)
2526

@@ -42,6 +43,7 @@ func startCommand() *cobra.Command {
4243
startCmd.Flags().Int64Var(&fromBlock, "from-block", timelockConf.FromBlock, "Start watching from this block")
4344
startCmd.Flags().Int64Var(&pollPeriod, "poll-period", timelockConf.PollPeriod, "Poll period in seconds")
4445
startCmd.Flags().Int64Var(&eventListenerPollPeriod, "event-listener-poll-period", timelockConf.EventListenerPollPeriod, "Event Listener poll period in seconds")
46+
startCmd.Flags().Uint64Var(&eventListenerPollSize, "event-listener-poll-size", timelockConf.EventListenerPollSize, "Number of entries to fetch when polling logs")
4547
startCmd.Flags().BoolVar(&dryRun, "dry-run", timelockConf.DryRun, "Enable \"dry run\" mode -- monitor events but don't trigger any calls")
4648

4749
return &startCmd
@@ -91,13 +93,18 @@ func startTimelock(cmd *cobra.Command) {
9193
slog.Fatalf("value of poll-period not set: %s", err.Error())
9294
}
9395

96+
eventListenerPollSize, err := cmd.Flags().GetUint64("event-listener-poll-size")
97+
if err != nil {
98+
slog.Fatalf("value of event-listener-poll-size not set: %s", err.Error())
99+
}
100+
94101
dryRun, err := cmd.Flags().GetBool("dry-run")
95102
if err != nil {
96103
slog.Fatalf("value of dry-run not set: %s", err.Error())
97104
}
98105

99106
tWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey,
100-
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, dryRun, slog)
107+
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, eventListenerPollSize, dryRun, slog)
101108
if err != nil {
102109
slog.Fatalf("error creating the timelock-worker: %s", err.Error())
103110
}

pkg/cli/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Config struct {
1919
FromBlock int64 `mapstructure:"FROM_BLOCK"`
2020
PollPeriod int64 `mapstructure:"POLL_PERIOD"`
2121
EventListenerPollPeriod int64 `mapstructure:"EVENT_LISTENER_POLL_PERIOD"`
22+
EventListenerPollSize uint64 `mapstructure:"EVENT_LISTENER_POLL_SIZE"`
2223
DryRun bool `mapstructure:"DRY_RUN"`
2324
}
2425

@@ -83,6 +84,15 @@ func NewTimelockCLI() (*Config, error) {
8384
c.EventListenerPollPeriod = int64(pp)
8485
}
8586

87+
if os.Getenv("EVENT_LISTENER_POLL_SIZE") != "" {
88+
pp, err := strconv.Atoi(os.Getenv("EVENT_LISTENER_POLL_SIZE"))
89+
if err != nil {
90+
return nil, fmt.Errorf("unable to parse EVENT_LISTENER_POLL_SIZE value: %w", err)
91+
}
92+
93+
c.EventListenerPollSize = uint64(pp) //nolint:gosec
94+
}
95+
8696
if os.Getenv("DRY_RUN") != "" {
8797
trueValues := []string{"true", "yes", "on", "enabled", "1"}
8898
c.DryRun = slices.Contains(trueValues, strings.ToLower(os.Getenv("DRY_RUN")))

pkg/timelock/const_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ var (
1616
testFromBlock = big.NewInt(0)
1717
testPollPeriod = 5
1818
testEventListenerPollPeriod = 1
19+
testEventListenerPollSize = uint64(10)
1920
testDryRun = false
2021
testLogger = lo.Must(logger.NewLogger("info", "human")).Sugar()
2122
)

pkg/timelock/timelock.go

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Worker struct {
3636
fromBlock *big.Int
3737
pollPeriod int64
3838
listenerPollPeriod int64
39+
pollSize uint64
3940
dryRun bool
4041
logger *zap.SugaredLogger
4142
privateKey *ecdsa.PrivateKey
@@ -50,7 +51,7 @@ var validNodeUrlSchemes = []string{"http", "https", "ws", "wss"}
5051
// It's a singleton, so further executions will retrieve the same timelockWorker.
5152
func NewTimelockWorker(
5253
nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
53-
pollPeriod int64, listenerPollPeriod int64, dryRun bool, logger *zap.SugaredLogger,
54+
pollPeriod int64, listenerPollPeriod int64, pollSize uint64, dryRun bool, logger *zap.SugaredLogger,
5455
) (*Worker, error) {
5556
// Sanity check on each provided variable before allocating more resources.
5657
u, err := url.ParseRequestURI(nodeURL)
@@ -78,6 +79,10 @@ func NewTimelockWorker(
7879
return nil, fmt.Errorf("event-listener-poll-period must be a positive non-zero integer: got %d", listenerPollPeriod)
7980
}
8081

82+
if slices.Contains(httpSchemes, u.Scheme) && pollSize == 0 {
83+
return nil, fmt.Errorf("event-listener-poll-size must be a positive non-zero integer: got %d", pollSize)
84+
}
85+
8186
if fromBlock.Int64() < big.NewInt(0).Int64() {
8287
return nil, fmt.Errorf("from block can't be a negative number (minimum value 0): got %d", fromBlock.Int64())
8388
}
@@ -127,6 +132,7 @@ func NewTimelockWorker(
127132
fromBlock: fromBlock,
128133
pollPeriod: pollPeriod,
129134
listenerPollPeriod: listenerPollPeriod,
135+
pollSize: pollSize,
130136
dryRun: dryRun,
131137
logger: logger,
132138
privateKey: privateKeyECDSA,
@@ -193,10 +199,11 @@ func (tw *Worker) Listen(ctx context.Context) error {
193199
}
194200

195201
// setupFilterQuery returns an ethereum.FilterQuery initialized to watch the Timelock contract.
196-
func (tw *Worker) setupFilterQuery(fromBlock *big.Int) ethereum.FilterQuery {
202+
func (tw *Worker) setupFilterQuery(fromBlock, toBlock *big.Int) ethereum.FilterQuery {
197203
return ethereum.FilterQuery{
198204
Addresses: tw.address,
199205
FromBlock: fromBlock,
206+
ToBlock: toBlock,
200207
}
201208
}
202209

@@ -216,7 +223,7 @@ func (tw *Worker) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan
216223

217224
// subscribeNewLogs subscribes to a Timelock contract and emit logs through the channel it returns.
218225
func (tw *Worker) subscribeNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
219-
query := tw.setupFilterQuery(tw.fromBlock)
226+
query := tw.setupFilterQuery(tw.fromBlock, nil)
220227
logCh := make(chan types.Log)
221228
done := make(chan struct{})
222229

@@ -291,7 +298,7 @@ func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan type
291298
defer ticker.Stop()
292299

293300
for {
294-
lastBlock = tw.fetchAndDispatchLogs(ctx, logCh, lastBlock)
301+
lastBlock = tw.fetchAndDispatchLogs(ctx, logCh, lastBlock, nil)
295302

296303
select {
297304
case <-ticker.C:
@@ -311,7 +318,7 @@ func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan type
311318
// retrieveHistoricalLogs returns a types.Log channel and retrieves all the historical events of a given contract.
312319
// Once all the logs have been sent into the channel the function returns and the channel is closed.
313320
func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
314-
query := tw.setupFilterQuery(tw.fromBlock)
321+
query := tw.setupFilterQuery(tw.fromBlock, nil)
315322
logCh := make(chan types.Log)
316323
done := make(chan struct{})
317324

@@ -352,30 +359,48 @@ func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{},
352359
return done, logCh, nil
353360
}
354361

355-
func (tw *Worker) fetchAndDispatchLogs(ctx context.Context, logCh chan types.Log, lastBlock *big.Int) *big.Int {
356-
query := tw.setupFilterQuery(lastBlock)
362+
func (tw *Worker) fetchAndDispatchLogs(
363+
ctx context.Context, logCh chan types.Log, fromBlock, currentChainBlock *big.Int,
364+
) *big.Int {
365+
if currentChainBlock == nil {
366+
blockNumber, err := tw.ethClient.BlockNumber(ctx)
367+
if err != nil {
368+
tw.logger.With("error", err).Error("unable to fetch current block number from eth client")
369+
}
370+
currentChainBlock = new(big.Int).SetUint64(blockNumber)
371+
}
372+
toBlock := new(big.Int).SetUint64(min(currentChainBlock.Uint64(), fromBlock.Uint64()+tw.pollSize))
373+
374+
query := tw.setupFilterQuery(fromBlock, toBlock)
375+
tw.logger.Debugf("fetching logs from block %v to block %v", query.FromBlock, query.ToBlock)
357376
logs, err := tw.ethClient.FilterLogs(ctx, query)
358377
if err != nil {
359378
tw.logger.With("error", err).Error("unable to fetch logs from eth client")
360379
SetReadyStatus(HealthStatusError) // FIXME(gustavogama-cll): wait for N errors before setting status
361380

362-
return lastBlock
381+
return fromBlock
363382
}
364-
tw.logger.Debugf("fetched %d log entries starting from block %d", len(logs), lastBlock)
383+
384+
tw.logger.Debugf("fetched %d log entries from block %d to block %d", len(logs), fromBlock, toBlock)
365385
SetReadyStatus(HealthStatusOK)
366386

367387
for _, log := range logs {
368-
lastBlock = new(big.Int).SetUint64(max(lastBlock.Uint64(), log.BlockNumber+1))
369388
select {
370389
case logCh <- log:
371390
tw.logger.With("log", log).Debug("dispatching log")
372391
case <-ctx.Done():
373392
tw.logger.Debug("stopped while dispatching logs: incomplete retrieval.")
374-
break
393+
return toBlock
375394
}
376395
}
377396

378-
return lastBlock
397+
if toBlock.Cmp(currentChainBlock) < 0 {
398+
// we haven't reached the current block; re-run same procedure with
399+
// the 'toBlock` as the start block
400+
return tw.fetchAndDispatchLogs(ctx, logCh, toBlock, currentChainBlock)
401+
}
402+
403+
return toBlock
379404
}
380405

381406
// processLogs is implemented as a fan-in for all the logs channels, merging all the data and handling logs sequentially.
@@ -512,4 +537,5 @@ func (tw *Worker) startLog() {
512537
tw.logger.Infof("\tStarting from block: %v", tw.fromBlock)
513538
tw.logger.Infof("\tPoll Period: %v", time.Duration(tw.pollPeriod*int64(time.Second)).String())
514539
tw.logger.Infof("\tEvent Listener Poll Period: %v", time.Duration(tw.listenerPollPeriod*int64(time.Second)).String())
540+
tw.logger.Infof("\tEvent Listener Poll # Logs%v", tw.pollSize)
515541
}

pkg/timelock/timelock_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313

1414
func newTestTimelockWorker(
1515
t *testing.T, nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
16-
pollPeriod int64, eventListenerPollPeriod int64, dryRun bool, logger *zap.SugaredLogger,
16+
pollPeriod int64, eventListenerPollPeriod int64, eventListenerPollSize uint64, dryRun bool,
17+
logger *zap.SugaredLogger,
1718
) *Worker {
1819
assert.NotEmpty(t, nodeURL, "nodeURL is empty. Are environment variabes in const_test.go set?")
1920
assert.NotEmpty(t, timelockAddress, "nodeURL is empty. Are environment variabes in const_test.go set?")
@@ -24,7 +25,7 @@ func newTestTimelockWorker(
2425
assert.NotNil(t, logger, "logger is nil. Are environment variabes in const_test.go set?")
2526

2627
tw, err := NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey, fromBlock,
27-
pollPeriod, eventListenerPollPeriod, dryRun, logger)
28+
pollPeriod, eventListenerPollPeriod, eventListenerPollSize, dryRun, logger)
2829
require.NoError(t, err)
2930
require.NotNil(t, tw)
3031

@@ -44,6 +45,7 @@ func TestNewTimelockWorker(t *testing.T) {
4445
fromBlock *big.Int
4546
pollPeriod int64
4647
eventListenerPollPeriod int64
48+
eventListenerPollSize uint64
4749
dryRun bool
4850
logger *zap.SugaredLogger
4951
}
@@ -55,6 +57,7 @@ func TestNewTimelockWorker(t *testing.T) {
5557
fromBlock: big.NewInt(1),
5658
pollPeriod: 900,
5759
eventListenerPollPeriod: 60,
60+
eventListenerPollSize: 1000,
5861
dryRun: false,
5962
logger: zap.NewNop().Sugar(),
6063
}
@@ -108,6 +111,11 @@ func TestNewTimelockWorker(t *testing.T) {
108111
setup: func(a *argsT) { a.eventListenerPollPeriod = -1 },
109112
wantErr: "event-listener-poll-period must be a positive non-zero integer: got -1",
110113
},
114+
{
115+
name: "failure - bad event listener poll size",
116+
setup: func(a *argsT) { a.eventListenerPollSize = 0 },
117+
wantErr: "event-listener-poll-size must be a positive non-zero integer: got 0",
118+
},
111119
}
112120
for _, tt := range tests {
113121
t.Run(tt.name, func(t *testing.T) {
@@ -118,7 +126,7 @@ func TestNewTimelockWorker(t *testing.T) {
118126

119127
got, err := NewTimelockWorker(args.nodeURL, args.timelockAddress, args.callProxyAddress,
120128
args.privateKey, args.fromBlock, args.pollPeriod, args.eventListenerPollPeriod,
121-
args.dryRun, args.logger)
129+
args.eventListenerPollSize, args.dryRun, args.logger)
122130

123131
if tt.wantErr == "" {
124132
require.NoError(t, err)
@@ -136,7 +144,8 @@ func TestWorker_startLog(t *testing.T) {
136144
rpcURL := runRPCServer(t)
137145

138146
testWorker := newTestTimelockWorker(t, rpcURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
139-
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
147+
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testEventListenerPollSize,
148+
testDryRun, testLogger)
140149

141150
tests := []struct {
142151
name string

tests/integration/timelock_test.go

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package integration
33
import (
44
"context"
55
"math/big"
6+
"strings"
67
"testing"
78
"time"
89

@@ -65,7 +66,7 @@ func (s *integrationTestSuite) TestTimelockWorkerListen() {
6566
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)
6667

6768
go runTimelockWorker(s.T(), sctx, tt.url, timelockAddress.String(), callProxyAddress.String(),
68-
account.HexPrivateKey, big.NewInt(0), int64(60), int64(1), true, logger)
69+
account.HexPrivateKey, big.NewInt(0), int64(60), int64(1), uint64(10), true, logger)
6970

7071
UpdateDelay(s.T(), ctx, transactor, backend, timelockContract, big.NewInt(10))
7172

@@ -123,7 +124,7 @@ func (s *integrationTestSuite) TestTimelockWorkerDryRun() {
123124
s.Require().EventuallyWithT(func(t *assert.CollectT) {
124125
assertLogMessage(t, logs, "scheduling operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
125126
assertLogMessage(t, logs, "scheduled operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
126-
}, 2*time.Second, 100*time.Millisecond)
127+
}, 2*time.Second, 100*time.Millisecond, logMessages(logs))
127128
},
128129
},
129130
}
@@ -139,7 +140,7 @@ func (s *integrationTestSuite) TestTimelockWorkerDryRun() {
139140
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), tctx, transactor, backend, timelockAddress)
140141

141142
go runTimelockWorker(s.T(), tctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
142-
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), tt.dryRun, logger)
143+
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(10), tt.dryRun, logger)
143144

144145
ScheduleBatch(s.T(), tctx, transactor, backend, timelockContract, calls, [32]byte{}, [32]byte{}, big.NewInt(1))
145146

@@ -168,7 +169,7 @@ func (s *integrationTestSuite) TestTimelockWorkerCancelledEvent() {
168169
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)
169170

170171
go runTimelockWorker(s.T(), ctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
171-
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), false, logger)
172+
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(10), false, logger)
172173

173174
calls := []contracts.RBACTimelockCall{{
174175
Target: common.HexToAddress("0x000000000000000000000000000000000000000"),
@@ -189,16 +190,50 @@ func (s *integrationTestSuite) TestTimelockWorkerCancelledEvent() {
189190
assertLogMessage(s.T(), logs, "de-scheduled operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
190191
}
191192

193+
func (s *integrationTestSuite) TestTimelockWorkerPollSize() {
194+
// --- arrange ---
195+
ctx, cancel := context.WithCancel(s.Ctx)
196+
defer cancel()
197+
198+
account := NewTestAccount(s.T())
199+
_, err := s.GethContainer.CreateAccount(ctx, account.HexAddress, account.HexPrivateKey, 1)
200+
s.Require().NoError(err)
201+
s.Logf("new account created: %v", account)
202+
203+
gethURL := s.GethContainer.HTTPConnStr(s.T(), ctx)
204+
backend := NewRPCBackend(s.T(), ctx, gethURL)
205+
transactor := s.KeyedTransactor(account.PrivateKey, nil)
206+
logger, logs := timelockTests.NewTestLogger()
207+
208+
timelockAddress, _, _, _ := DeployTimelock(s.T(), ctx, transactor, backend,
209+
account.Address, big.NewInt(1))
210+
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)
211+
212+
time.Sleep(1*time.Second) // wait for a few blocks before starting the timelock worker service
213+
214+
// --- act ---
215+
go runTimelockWorker(s.T(), ctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
216+
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(2), false, logger)
217+
218+
// --- assert ---
219+
s.Require().EventuallyWithT(func(collect *assert.CollectT) {
220+
assertLogMessage(collect, logs, "fetching logs from block 0 to block 2")
221+
assertLogMessage(collect, logs, "fetching logs from block 2 to block 4")
222+
assertLogMessage(collect, logs, "fetching logs from block 4 to block 6")
223+
}, 2*time.Second, 100*time.Millisecond, logMessages(logs))
224+
}
225+
192226
// ----- helpers -----
193227

194228
func runTimelockWorker(
195229
t *testing.T, ctx context.Context, nodeURL, timelockAddress, callProxyAddress, privateKey string,
196-
fromBlock *big.Int, pollPeriod int64, listenerPollPeriod int64, dryRun bool, logger *zap.Logger,
230+
fromBlock *big.Int, pollPeriod int64, listenerPollPeriod int64, listenerPollSize uint64,
231+
dryRun bool, logger *zap.Logger,
197232
) {
198-
t.Logf("TimelockWorker.Listen(%v, %v, %v, %v, %v, %v, %v)", nodeURL, timelockAddress,
199-
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod)
233+
t.Logf("TimelockWorker.Listen(%v, %v, %v, %v, %v, %v, %v, %v)", nodeURL, timelockAddress,
234+
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, listenerPollSize)
200235
timelockWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress,
201-
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, dryRun, logger.Sugar())
236+
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, listenerPollSize, dryRun, logger.Sugar())
202237
require.NoError(t, err)
203238
require.NotNil(t, timelockWorker)
204239

@@ -209,3 +244,11 @@ func runTimelockWorker(
209244
func assertLogMessage(t assert.TestingT, logs *observer.ObservedLogs, message string) {
210245
assert.Equal(t, logs.FilterMessage(message).Len(), 1)
211246
}
247+
248+
func logMessages(logs *observer.ObservedLogs) string {
249+
m := make([]string, 0, logs.Len())
250+
for _, entry := range logs.All() {
251+
m = append(m, entry.Message)
252+
}
253+
return "LOGS:\n" + strings.Join(m, "\n")
254+
}

0 commit comments

Comments
 (0)