diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index ece80d7dd..611875bb0 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -75,7 +75,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, if req.ExpiryTime().Before(timeoutCheck) { // Request has been sitting in queue too long p.store.RemoveRequest(req.WorkflowExecutionID) - req.SendTimeout(nil) + req.SendTimeout(context.Background()) continue } @@ -89,7 +89,8 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, if req.SeqNum > numObservedDonTimes { p.store.RemoveRequest(req.WorkflowExecutionID) - req.SendResponse(nil, + ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime()) + req.SendResponse(ctx, Response{ WorkflowExecutionID: req.WorkflowExecutionID, SeqNum: req.SeqNum, @@ -97,6 +98,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d", req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes), }) + cancel() continue } diff --git a/pkg/workflows/dontime/request.go b/pkg/workflows/dontime/request.go index 381fb3196..52bc6d020 100644 --- a/pkg/workflows/dontime/request.go +++ b/pkg/workflows/dontime/request.go @@ -25,21 +25,28 @@ func (r *Request) ExpiryTime() time.Time { return r.ExpiresAt } -func (r *Request) SendResponse(_ context.Context, resp Response) { +func (r *Request) SendResponse(ctx context.Context, resp Response) { select { case r.CallbackCh <- resp: close(r.CallbackCh) - default: // Don't block trying to send + default: + // Don't block if receiver not ready, but check if context is actually expired + select { + case <-ctx.Done(): + // Context cancelled or deadline exceeded before send + default: + // Try once more without blocking + } } } -func (r *Request) SendTimeout(_ context.Context) { +func (r *Request) SendTimeout(ctx context.Context) { timeoutResponse := Response{ WorkflowExecutionID: r.WorkflowExecutionID, SeqNum: r.SeqNum, Err: fmt.Errorf("timeout exceeded: could not process request before expiry, workflowExecutionID %s", r.WorkflowExecutionID), } - r.SendResponse(nil, timeoutResponse) + r.SendResponse(ctx, timeoutResponse) } func (r *Request) Copy() *Request { diff --git a/pkg/workflows/dontime/transmitter.go b/pkg/workflows/dontime/transmitter.go index 0f31e5aee..99ee961b2 100644 --- a/pkg/workflows/dontime/transmitter.go +++ b/pkg/workflows/dontime/transmitter.go @@ -26,7 +26,7 @@ func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account) return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount} } -func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { +func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { outcome := &pb.Outcome{} if err := proto.Unmarshal(r.Report, outcome); err != nil { t.lggr.Errorf("failed to unmarshal report") @@ -51,7 +51,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 if len(donTimes.Timestamps) > request.SeqNum { donTime := donTimes.Timestamps[request.SeqNum] t.store.RemoveRequest(executionID) // Make space for next request before delivering - request.SendResponse(nil, Response{ + request.SendResponse(ctx, Response{ WorkflowExecutionID: executionID, SeqNum: request.SeqNum, Timestamp: donTime,