Skip to content

Commit

Permalink
move timeout before doFlush
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 6, 2024
1 parent 6152b0a commit 17de402
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 14 deletions.
7 changes: 5 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
policyAcks = append(policyAcks, event.ActionId)
policyIdxs = append(policyIdxs, n)
}
log.Debug().Msg("ack policy change")
// Set OK status, this can be overwritten in case of the errors later when the policy change events acked
setResult(n, http.StatusOK)
span.End()
Expand Down Expand Up @@ -365,14 +366,14 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag

// Save action result document
if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil {
zlog.Error().Err(err).Msg("create action result")
zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("create action result")
return err
}

if action.Type == TypeUpgrade {
event, _ := ev.AsUpgradeEvent()
if err := ack.handleUpgrade(ctx, zlog, agent, event); err != nil {
zlog.Error().Err(err).Msg("handle upgrade event")
zlog.Error().Err(err).Str(logger.AgentID, agent.Agent.ID).Str(logger.ActionID, action.Id).Msg("handle upgrade event")
return err
}
}
Expand Down Expand Up @@ -634,6 +635,8 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
zlog.Info().
Str("lastReportedVersion", agent.Agent.Version).
Str("upgradedAt", now).
Str(logger.AgentID, agent.Agent.ID).
Str(logger.ActionID, event.ActionId).
Msg("ack upgrade")

return nil
Expand Down
19 changes: 11 additions & 8 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,18 +346,14 @@ func (b *Bulker) Run(ctx context.Context) error {
var itemCnt int
var byteCnt int

doFlush := func() error {

// deadline prevents bulker being blocked on flush
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
doFlush := func(flushCtx context.Context) error {

for i := range queues {
q := &queues[i]
if q.pending > 0 {

// Pass queue structure by value
if err := b.flushQueue(ctx, w, *q); err != nil {
if err := b.flushQueue(flushCtx, w, *q); err != nil {
return err
}

Expand Down Expand Up @@ -409,7 +405,10 @@ func (b *Bulker) Run(ctx context.Context) error {
Int("byteCnt", byteCnt).
Msg("Flush on threshold")

err = doFlush()
// deadline prevents bulker being blocked on flush
flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
err = doFlush(flushCtx)

stopTimer(timer)
}
Expand All @@ -420,7 +419,11 @@ func (b *Bulker) Run(ctx context.Context) error {
Int("itemCnt", itemCnt).
Int("byteCnt", byteCnt).
Msg("Flush on timer")
err = doFlush()

// deadline prevents bulker being blocked on flush
flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
err = doFlush(flushCtx)

case <-ctx.Done():
err = ctx.Err()
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/bulk/opApiKey.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error {

res, err := req.Do(ctx, b.es)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch")
zerolog.Ctx(ctx).Error().Err(err).Msg("flushUpdateAPIKey: Error sending bulk API Key update request to Elasticsearch")
return err
}
if res.Body != nil {
Expand All @@ -229,7 +229,7 @@ func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error {
return parseError(res, zerolog.Ctx(ctx))
}

zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey")
zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("flushUpdateAPIKey: API Keys updated.")

responses[responseIdx] = res.StatusCode
for _, id := range idsInBatch {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {

res, err := req.Do(ctx, b.es)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do")
zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("flushBulk: Fail BulkRequest req.Do")
return err
}

Expand All @@ -220,7 +220,7 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error {
}

if res.IsError() {
zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result")
zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("flushBulk: Fail BulkRequest result")
return parseError(res, zerolog.Ctx(ctx))
}

Expand Down

0 comments on commit 17de402

Please sign in to comment.