feat: cache operator state #1696
base: master
Conversation
Sam recently updated the golang version in the repo to 1.24. If you haven't done so yet, I suggest merging the latest master into your branch.
disperser/controller/dispatcher.go
Outdated
```go
go func() {
	ticker := time.NewTicker(d.OnchainStateRefreshInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := d.refreshOnchainState(ctx); err != nil {
				d.logger.Error("failed to refresh onchain state", "err", err)
			}
		}
	}
}()
```
I suggest moving this to its own function, e.g. `func (d *Dispatcher) stateFetcher()`, then calling `go d.stateFetcher()` in the `Start()` method.
disperser/controller/dispatcher.go
Outdated
```go
defer func() {
	d.metrics.reportGetOperatorStateLatency(time.Since(start))
}()
```
Suggested change:

```diff
-defer func() {
-	d.metrics.reportGetOperatorStateLatency(time.Since(start))
-}()
+defer d.metrics.reportGetOperatorStateLatency(time.Since(start))
```
disperser/controller/dispatcher.go
Outdated
```go
if time.Since(cachedOnChainState.LastRefreshed) > d.OnchainStateRefreshInterval {
	d.logger.Warn("cached onchain state is outdated, manually fetching it")
	err := d.refreshOnchainState(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to refresh onchain state: %w", err)
	}
	// reload the fresh snapshot
	cachedOnChainState = d.cachedOnchainState.Load().(*CachedOnchainState)
}
```
The way this is now, there may be a race where the background thread and this thread both attempt to fetch the data at approximately the same time.
Perhaps there could be two configuration values:
- `OnchainStateRefreshInterval`, which is used by the background goroutine
- a new setting, `BatchMetadataMaxAge`, which is used by this block of code. By default, it should be larger than `OnchainStateRefreshInterval` (perhaps 2x or 5x as long).

It's ok to use slightly out-of-date chain state for new batches. Using chain state from five or ten minutes ago doesn't really hurt anything.
The end result is that this block will not trigger unless the background thread is unable to fetch the chain state for a long time. If that is happening, I'd also expect this thread to fail at its task, and we'd start failing to create new batches. Creating no new batches is probably the correct behavior if we cease being able to get recent operator state for an extremely long period of time.
I also suggest the following special behavior for these settings:
- If `OnchainStateRefreshInterval` is 0, then this code should not attempt to fetch any information on a background goroutine. In this case, it will fall back to fetching the chain state on the batch creation thread.
- If `BatchMetadataMaxAge` (i.e. the new setting I suggest above) is 0, then it should always fetch the onchain state every time it creates a new batch.
The advantage of doing this is that we can fall back to the old pattern if we detect issues with the new pattern in the future. Additionally, if there are legacy tests that break with the new threading pattern, you can configure the code to pass with those legacy tests.
disperser/controller/dispatcher.go
Outdated
```diff
 // Get a batch of blobs to dispatch
 // This also writes a batch header and blob inclusion info for each blob in metadata store
-batchData, err := d.NewBatch(ctx, referenceBlockNumber)
+batchData, err := d.NewBatch(ctx, cachedOnChainState)
```
`cachedOnChainState` is only used by the `NewBatch()` method. Would it be possible to fetch the chain state inside `NewBatch()` instead of passing it in? Perhaps you could even encapsulate the block of code that fetches `cachedOnChainState` into a helper method.
`HandleBatch()` is already a very long and complicated method, so extracting code helps to reduce its complexity.
disperser/controller/dispatcher.go
Outdated
```go
// Store updated cache atomically
d.cachedOnchainState.Store(cached)
d.logger.Debug("refreshed onchain state", "blockNumber", currentBlockNumber, "referenceBlockNumber", referenceBlockNumber, "quorumIDs", quorumIDs)
```
Nit, this line is longer than 120 characters, could you wrap it?
LGTM! I can't approve this change since I technically opened the PR, so you will probably have to press "approve" on it, or find somebody else to also do a review.
We are not going to merge this PR. @cody-littley is going to open a new PR for this. Keeping it open in draft for now, for reference.
Why are these changes needed?
Cache batch metadata, instead of fetching it each time a new batch is created.