Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/workflows/dontime/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
if req.ExpiryTime().Before(timeoutCheck) {
// Request has been sitting in queue too long
p.store.RemoveRequest(req.WorkflowExecutionID)
req.SendTimeout(nil)
req.SendTimeout(context.Background())
continue
}

Expand All @@ -89,14 +89,16 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,

if req.SeqNum > numObservedDonTimes {
p.store.RemoveRequest(req.WorkflowExecutionID)
req.SendResponse(nil,
ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime())
req.SendResponse(ctx,
Response{
WorkflowExecutionID: req.WorkflowExecutionID,
SeqNum: req.SeqNum,
Timestamp: 0,
Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d",
req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes),
})
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you want to achieve with this cancel? The SendRepsonse is a sync function that either succeeds or fails. Calling cancel() afterwards does not seem to do anything.

Comment on lines +92 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: @pavel-raykov - you can use anonymous closures to reduce scope of deferred actions:

Suggested change
ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime())
req.SendResponse(ctx,
Response{
WorkflowExecutionID: req.WorkflowExecutionID,
SeqNum: req.SeqNum,
Timestamp: 0,
Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d",
req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes),
})
cancel()
func() {
ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime())
defer cancel()
req.SendResponse(ctx,
Response{
WorkflowExecutionID: req.WorkflowExecutionID,
SeqNum: req.SeqNum,
Timestamp: 0,
Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d",
req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes),
})
} ()

continue
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/workflows/dontime/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@ func (r *Request) ExpiryTime() time.Time {
return r.ExpiresAt
}

func (r *Request) SendResponse(_ context.Context, resp Response) {
func (r *Request) SendResponse(ctx context.Context, resp Response) {
select {
case r.CallbackCh <- resp:
close(r.CallbackCh)
default: // Don't block trying to send
default:
// Don't block if receiver not ready, but check if context is actually expired
select {
case <-ctx.Done():
// Context cancelled or deadline exceeded before send
default:
// Try once more without blocking
}
Comment on lines +33 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just check for an error instead of using the channel mechanism:

Suggested change
// Don't block if receiver not ready, but check if context is actually expired
select {
case <-ctx.Done():
// Context cancelled or deadline exceeded before send
default:
// Try once more without blocking
}
// Don't block if receiver not ready, but check if context is actually expired
ctx.Err()

But what is meant to happen here? Both cases are no-ops.

}
}

func (r *Request) SendTimeout(_ context.Context) {
func (r *Request) SendTimeout(ctx context.Context) {
timeoutResponse := Response{
WorkflowExecutionID: r.WorkflowExecutionID,
SeqNum: r.SeqNum,
Err: fmt.Errorf("timeout exceeded: could not process request before expiry, workflowExecutionID %s", r.WorkflowExecutionID),
}
r.SendResponse(nil, timeoutResponse)
r.SendResponse(ctx, timeoutResponse)
}

func (r *Request) Copy() *Request {
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflows/dontime/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account)
return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount}
}

func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
outcome := &pb.Outcome{}
if err := proto.Unmarshal(r.Report, outcome); err != nil {
t.lggr.Errorf("failed to unmarshal report")
Expand All @@ -51,7 +51,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
if len(donTimes.Timestamps) > request.SeqNum {
donTime := donTimes.Timestamps[request.SeqNum]
t.store.RemoveRequest(executionID) // Make space for next request before delivering
request.SendResponse(nil, Response{
request.SendResponse(ctx, Response{
WorkflowExecutionID: executionID,
SeqNum: request.SeqNum,
Timestamp: donTime,
Expand Down
Loading