Sync flush time #300
Conversation
Force-pushed from 403a844 to 3cbf5ac
Force-pushed from f550b92 to 865562c
Force-pushed from 5e8b0ce to f14994a
Force-pushed from f14994a to 2a1d7d6
go.mod (Outdated)

```diff
@@ -105,7 +105,7 @@ require (
 )

 // Using a fork of the Alertmanager with Alerting Squad specific changes.
-replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20250305143719-fa9fa7096626
+replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20250306181800-10368d39a559
```
should be updated once the AM PR is merged
go.sum (Outdated)

```diff
@@ -194,8 +194,8 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
 github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
-github.com/grafana/prometheus-alertmanager v0.25.1-0.20250305143719-fa9fa7096626 h1:QsMYtDseSPq8hXvoNtA64unFiawJaE5kryizcMsVZWg=
-github.com/grafana/prometheus-alertmanager v0.25.1-0.20250305143719-fa9fa7096626/go.mod h1:FGdGvhI40Dq+CTQaSzK9evuve774cgOUdGfVO04OXkw=
+github.com/grafana/prometheus-alertmanager v0.25.1-0.20250306181800-10368d39a559 h1:E7az+c68g6E5X0mt8ZRm9+wZ530EWSQUTHgVZ1AKBRQ=
```
likewise
```go
sync:         true,
entries:      []*nflogpb.Entry{},
pipelineTime: now,
expectedErr:  false,
```
WDYT about actually including the expected error here, or at least a string we can check against?
yeah, I got a bit lazy on these tests tbh 🙈
added proper errors now and comparing against those 👍
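As a minimal sketch of the pattern agreed on here: carry the expected sentinel error in the test case and assert with `require.ErrorIs` instead of a bare `expectedErr` bool. The `errSentinel` value and the `exec` closure are placeholders; in the real test the case would drive `SyncFlushStage.Exec` and compare against errors such as `ErrMissingGroupInterval`.

```go
package stages

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/require"
)

// errSentinel stands in for a real sentinel error defined by the stage.
var errSentinel = errors.New("missing group interval")

func TestExpectedErrorPattern(t *testing.T) {
	cases := []struct {
		name        string
		exec        func() error // stands in for running the stage with this case's inputs
		expectedErr error
	}{
		{
			name:        "missing group interval",
			exec:        func() error { return errSentinel },
			expectedErr: errSentinel,
		},
		{
			name:        "happy path",
			exec:        func() error { return nil },
			expectedErr: nil,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.exec()
			if tc.expectedErr != nil {
				// Compare against the sentinel error rather than a bool flag.
				require.ErrorIs(t, err, tc.expectedErr)
				return
			}
			require.NoError(t, err)
		})
	}
}
```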
```go
if sfs.sync {
	select {
	case <-time.After(wait):
```
I wonder if we should update the context with a new "Now" based on the wait.
That's a good point, maybe we should... I'm not sure what the impact on the other stages would be, but if we're simulating a flush delay, I guess it would be the right thing.
cc @yuri-tceretian since it would affect the extra dedup specifically
No, the timeNow must be the time when the snapshot of the aggrGroup was taken. This step runs after that and does not take a new snapshot.
I think that after it wakes up it needs to run some kind of "coordination step", but without the warning log, and proceed only if there wasn't any state from the future. Otherwise, it should immediately exit the pipeline and start waiting for group_interval.
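For illustration, a minimal Go sketch of the kind of post-wake "coordination step" being suggested. Everything here is an assumption for the sake of the example: `syncFlushStage`, `nflogQuerier`, and the `PipelineTime` field are stand-ins for the PR's `SyncFlushStage` and for the nflog entry that grafana/prometheus-alertmanager#101 extends with a pipeline time; none of these names are the actual code.

```go
package stages

import (
	"context"
	"time"
)

// entry mirrors only the part of the nflog entry this sketch needs.
type entry struct {
	// PipelineTime is the pipeline time recorded by the previous flush
	// (hypothetical field name).
	PipelineTime time.Time
}

// nflogQuerier stands in for nflog.Log.Query filtered by group key and receiver.
type nflogQuerier interface {
	Query(gkey string) ([]*entry, error)
}

type syncFlushStage struct {
	nflog            nflogQuerier
	lastPipelineTime time.Time // snapshot time this pipeline run is based on
}

// waitAndRecheck waits out the computed delay, then re-queries the notification
// log. If another instance recorded a newer pipeline time while we were waiting
// ("state from the future"), it reports that this pipeline run should exit and
// let the aggregation group fire again after group_interval.
func (s *syncFlushStage) waitAndRecheck(ctx context.Context, gkey string, wait time.Duration) (proceed bool, err error) {
	select {
	case <-time.After(wait):
	case <-ctx.Done():
		return false, ctx.Err()
	}

	entries, err := s.nflog.Query(gkey)
	if err != nil {
		return false, err
	}
	for _, e := range entries {
		if e.PipelineTime.After(s.lastPipelineTime) {
			return false, nil // someone else already flushed; skip notifying
		}
	}
	return true, nil
}
```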
Co-authored-by: William Wernert <[email protected]>
```go
	return ctx, nil, ErrMissingGroupInterval
}

entries, err := sfs.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(sfs.recv))
```
I think this will break when a user adds a new integration to the receiver: basically, it will delay the old integrations while not delaying the new one.
```diff
@@ -892,6 +901,9 @@ func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*
 	Idx: uint32(integrations[i].Index()),
 }
 var s notify.MultiStage
+if stage := stages.NewSyncFlushStage(notificationLog, recv, syncAct, syncMargin); stage != nil {
```
I think this stage should run before the Fanout stage, perhaps right after meshStage or right after inhibitionStage. This will guarantee that the pipeline is delayed correctly.
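A rough Go sketch of the ordering being suggested: apply the sync-flush delay once for the whole receiver, between the mesh/inhibition stages and the per-integration fanout, rather than inside each integration's MultiStage. The function name and parameters are illustrative assumptions, not the PR's actual createReceiverStage code.

```go
package notifier

import (
	"github.com/prometheus/alertmanager/notify"
)

// buildReceiverPipeline sketches the suggested stage order for one receiver.
func buildReceiverPipeline(meshStage, inhibitionStage, syncFlush notify.Stage, fanout notify.FanoutStage) notify.Stage {
	pipeline := notify.MultiStage{meshStage, inhibitionStage}
	if syncFlush != nil {
		// Delaying here means every integration of the receiver flushes at
		// (roughly) the same synchronized time.
		pipeline = append(pipeline, syncFlush)
	}
	return append(pipeline, fanout)
}
```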
Context
The idea is to have all AM instances flush alerts at the same (or a similar) time. This should reduce the number of duplicated notifications and help reduce escalations.
The strategy is to add a new stage to the notification pipeline. In grafana/prometheus-alertmanager#101, we're adding the pipeline time to the notification log entry so that it can be retrieved in the next flush. We then compare the current pipeline time with the expected next flush (the previous pipeline time plus the group wait, allowing for a margin). If the current execution time is close to the expected next flush, we execute immediately; otherwise, we wait for `nextFlush - currentPipelineTime`. A rough sketch of this timing decision follows.
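The sketch below is illustrative only; names such as `computeWait`, `groupWait`, `syncMargin`, and `lastPipelineTime` are assumptions and not the PR's actual identifiers.

```go
package stages

import (
	"context"
	"time"
)

// computeWait decides how long the pipeline should be delayed.
// lastPipelineTime is the pipeline time stored in the notification log entry
// by the previous flush (added in grafana/prometheus-alertmanager#101).
func computeWait(currentPipelineTime, lastPipelineTime time.Time, groupWait, syncMargin time.Duration) time.Duration {
	nextFlush := lastPipelineTime.Add(groupWait)

	// Already within the margin of the expected next flush (or past it):
	// flush immediately.
	if !currentPipelineTime.Before(nextFlush.Add(-syncMargin)) {
		return 0
	}
	// Otherwise, wait out the remainder so all instances flush at a similar
	// time: nextFlush - currentPipelineTime.
	return nextFlush.Sub(currentPipelineTime)
}

// waitForSync blocks until the computed wait elapses or the pipeline context
// is cancelled.
func waitForSync(ctx context.Context, wait time.Duration) error {
	if wait <= 0 {
		return nil
	}
	select {
	case <-time.After(wait):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```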
Depends on
Related PRs