Skip to content

Commit

Permalink
Update status in all input lifecycle phases
Browse files Browse the repository at this point in the history
Now also cover the following phases:

- pipeline creation
- sanitizers creation
- input setup
  • Loading branch information
zmoog committed Nov 6, 2024
1 parent 5212494 commit c631779
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/azureeventhub/v1_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (in *eventHubInputV1) Run(
// Create pipelineClient for publishing events.
in.pipelineClient, err = createPipelineClient(pipeline)
if err != nil {
inputContext.UpdateStatus(status.Failed, err.Error())
return fmt.Errorf("failed to create pipeline pipelineClient: %w", err)
}
defer in.pipelineClient.Close()
Expand All @@ -86,6 +87,7 @@ func (in *eventHubInputV1) Run(
// Set up new and legacy sanitizers, if any.
sanitizers, err := newSanitizers(in.config.Sanitizers, in.config.LegacySanitizeOptions)
if err != nil {
inputContext.UpdateStatus(status.Failed, err.Error())
return fmt.Errorf("failed to create sanitizers: %w", err)
}

Expand All @@ -102,6 +104,8 @@ func (in *eventHubInputV1) Run(
// in preparation for the main run loop.
err = in.setup(ctx)
if err != nil {
in.log.Errorw("error setting up input", "error", err)
inputContext.UpdateStatus(status.Failed, err.Error())
return err
}

Expand All @@ -113,7 +117,7 @@ func (in *eventHubInputV1) Run(
return err
}

inputContext.UpdateStatus(status.Stopped, "")
inputContext.UpdateStatus(status.Stopping, "")
return nil
}

Expand Down

0 comments on commit c631779

Please sign in to comment.