Commit 602843d

Merge pull request #297 from ziggie1984/sync-pruned-node

query: fix retry query case.

2 parents 43f5a58 + a2d891c

File tree

3 files changed: +245 −56 lines changed


query/worker.go

Lines changed: 23 additions & 5 deletions
@@ -24,11 +24,12 @@ var (
 // queryJob is the internal struct that wraps the Query to work on, in
 // addition to some information about the query.
 type queryJob struct {
-	tries      uint8
-	index      uint64
-	timeout    time.Duration
-	encoding   wire.MessageEncoding
-	cancelChan <-chan struct{}
+	tries              uint8
+	index              uint64
+	timeout            time.Duration
+	encoding           wire.MessageEncoding
+	cancelChan         <-chan struct{}
+	internalCancelChan <-chan struct{}
 	*Request
 }


@@ -128,6 +129,16 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
 			// result will be sent back.
 			break

+		case <-job.internalCancelChan:
+			log.Tracef("Worker %v found job with index %v "+
+				"already internally canceled (batch timed out)",
+				peer.Addr(), job.Index())
+
+			// We break to the below loop, where we'll check the
+			// internal cancel channel again and the ErrJobCanceled
+			// result will be sent back.
+			break
+
 		// We received a non-canceled query job, send it to the peer.
 		default:
 			log.Tracef("Worker %v queuing job %T with index %v",
@@ -214,6 +225,13 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
 				jobErr = ErrJobCanceled
 				break Loop

+			case <-job.internalCancelChan:
+				log.Tracef("Worker %v job %v internally "+
+					"canceled", peer.Addr(), job.Index())
+
+				jobErr = ErrJobCanceled
+				break Loop
+
 			case <-quit:
 				return
 			}
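
A note on the pattern the worker relies on here: receiving from a closed channel succeeds immediately, so a single select can observe both the caller's cancelChan and the work manager's internalCancelChan, once before the query is sent to a peer and again while waiting for the response. Below is a minimal, self-contained sketch of that double check; the job struct and runJob function are illustrative stand-ins, not the actual neutrino API.

	package main

	import (
		"errors"
		"fmt"
	)

	var errJobCanceled = errors.New("job canceled")

	// job mirrors the two cancel channels from the diff: cancelChan is
	// controlled by the caller of the query, internalCancelChan is
	// closed by the work manager when the whole batch times out.
	type job struct {
		cancelChan         <-chan struct{}
		internalCancelChan <-chan struct{}
	}

	// runJob performs the same pre-flight check as the worker: if
	// either channel is already closed, the job is reported as
	// canceled instead of being sent to a peer.
	func runJob(j *job) error {
		select {
		case <-j.cancelChan:
			return errJobCanceled
		case <-j.internalCancelChan:
			return errJobCanceled
		default:
			// Neither channel is closed; proceed with the query.
		}

		// ... send the query to a peer and wait for the response ...
		return nil
	}

	func main() {
		internal := make(chan struct{})
		j := &job{
			cancelChan:         make(chan struct{}),
			internalCancelChan: internal,
		}

		fmt.Println(runJob(j)) // <nil>, nothing canceled yet

		// Simulate the batch timeout: the work manager closes the
		// internal channel, and every job sharing it now aborts.
		close(internal)
		fmt.Println(runJob(j)) // job canceled
	}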

query/workmanager.go

Lines changed: 76 additions & 51 deletions
@@ -183,6 +183,7 @@ func (w *peerWorkManager) workDispatcher() {
 		timeout    <-chan time.Time
 		rem        int
 		errChan    chan error
+		cancelChan chan struct{}
 	}

 	// We set up a batch index counter to keep track of batches that still
@@ -309,7 +310,20 @@ Loop:
 			// turns out to be an error.
 			batchNum := currentQueries[result.job.index]
 			delete(currentQueries, result.job.index)
-			batch := currentBatches[batchNum]
+
+			// In case the batch is already canceled we return
+			// early.
+			batch, ok := currentBatches[batchNum]
+			if !ok {
+				log.Warnf("Query(%d) result from peer %v "+
+					"discarded with retries %d, because "+
+					"batch already canceled: %v",
+					result.job.index,
+					result.peer.Addr(),
+					result.job.tries, result.err)
+
+				continue Loop
+			}

 			switch {
 			// If the query ended because it was canceled, drop it.
@@ -322,30 +336,34 @@ Loop:
 				// was canceled, forward the error on the
 				// batch's error channel. We do this since a
 				// cancellation applies to the whole batch.
-				if batch != nil {
-					batch.errChan <- result.err
-					delete(currentBatches, batchNum)
+				batch.errChan <- result.err
+				delete(currentBatches, batchNum)

-					log.Debugf("Canceled batch %v",
-						batchNum)
-					continue Loop
-				}
+				log.Debugf("Canceled batch %v", batchNum)
+				continue Loop

 			// If the query ended with any other error, put it back
 			// into the work queue if it has not reached the
 			// maximum number of retries.
 			case result.err != nil:
-				// Punish the peer for the failed query.
-				w.cfg.Ranking.Punish(result.peer.Addr())
+				// Refresh peer rank on disconnect.
+				if result.err == ErrPeerDisconnected {
+					w.cfg.Ranking.ResetRanking(
+						result.peer.Addr(),
+					)
+				} else {
+					// Punish the peer for the failed query.
+					w.cfg.Ranking.Punish(result.peer.Addr())
+				}

-				if batch != nil && !batch.noRetryMax {
+				if !batch.noRetryMax {
 					result.job.tries++
 				}

 				// Check if this query has reached its maximum
 				// number of retries. If so, remove it from the
 				// batch and don't reschedule it.
-				if batch != nil && !batch.noRetryMax &&
+				if !batch.noRetryMax &&
 					result.job.tries >= batch.maxRetries {

 					log.Warnf("Query(%d) from peer %v "+
@@ -380,11 +398,6 @@ Loop:
 					result.job.timeout = newTimeout
 				}

-				// Refresh peer rank on disconnect.
-				if result.err == ErrPeerDisconnected {
-					w.cfg.Ranking.ResetRanking(result.peer.Addr())
-				}
-
 				heap.Push(work, result.job)
 				currentQueries[result.job.index] = batchNum

@@ -396,42 +409,47 @@ Loop:

 				// Decrement the number of queries remaining in
 				// the batch.
-				if batch != nil {
-					batch.rem--
-					log.Tracef("Remaining jobs for batch "+
-						"%v: %v ", batchNum, batch.rem)
-
-					// If this was the last query in flight
-					// for this batch, we can notify that
-					// it finished, and delete it.
-					if batch.rem == 0 {
-						batch.errChan <- nil
-						delete(currentBatches, batchNum)
-
-						log.Tracef("Batch %v done",
-							batchNum)
-						continue Loop
-					}
+				batch.rem--
+				log.Tracef("Remaining jobs for batch "+
+					"%v: %v ", batchNum, batch.rem)
+
+				// If this was the last query in flight
+				// for this batch, we can notify that
+				// it finished, and delete it.
+				if batch.rem == 0 {
+					batch.errChan <- nil
+					delete(currentBatches, batchNum)
+
+					log.Tracef("Batch %v done",
+						batchNum)
+					continue Loop
 				}
 			}

 			// If the total timeout for this batch has passed,
 			// return an error.
-			if batch != nil {
-				select {
-				case <-batch.timeout:
-					batch.errChan <- ErrQueryTimeout
-					delete(currentBatches, batchNum)
+			select {
+			case <-batch.timeout:
+				batch.errChan <- ErrQueryTimeout
+				delete(currentBatches, batchNum)
+
+				// When deleting the particular batch
+				// number we need to make sure to cancel
+				// all queued and ongoing queryJobs
+				// to not waste resources when the batch
+				// call is already canceled.
+				if batch.cancelChan != nil {
+					close(batch.cancelChan)
+				}

-					log.Warnf("Query(%d) failed with "+
-						"error: %v. Timing out.",
-						result.job.index, result.err)
+				log.Warnf("Query(%d) failed with "+
+					"error: %v. Timing out.",
+					result.job.index, result.err)

-					log.Debugf("Batch %v timed out",
-						batchNum)
+				log.Warnf("Batch %v timed out",
+					batchNum)

-				default:
-				}
+			default:
 			}

 			// A new batch of queries where scheduled.
@@ -442,13 +460,17 @@ Loop:
 			log.Debugf("Adding new batch(%d) of %d queries to "+
 				"work queue", batchIndex, len(batch.requests))

+			// Internal cancel channel of a batch request.
+			cancelChan := make(chan struct{})
+
 			for _, q := range batch.requests {
 				heap.Push(work, &queryJob{
-					index:      queryIndex,
-					timeout:    minQueryTimeout,
-					encoding:   batch.options.encoding,
-					cancelChan: batch.options.cancelChan,
-					Request:    q,
+					index:              queryIndex,
+					timeout:            minQueryTimeout,
+					encoding:           batch.options.encoding,
+					cancelChan:         batch.options.cancelChan,
+					internalCancelChan: cancelChan,
+					Request:            q,
 				})
 				currentQueries[queryIndex] = batchIndex
 				queryIndex++
@@ -457,9 +479,12 @@ Loop:
 			currentBatches[batchIndex] = &batchProgress{
 				noRetryMax: batch.options.noRetryMax,
 				maxRetries: batch.options.numRetries,
-				timeout:    time.After(batch.options.timeout),
+				timeout: time.After(
+					batch.options.timeout,
+				),
 				rem:        len(batch.requests),
 				errChan:    batch.errChan,
+				cancelChan: cancelChan,
 			}
 			batchIndex++
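
The reason the dispatcher closes batch.cancelChan rather than sending on it: a close acts as a broadcast that every queued and in-flight queryJob sharing the channel observes, while a single send would wake exactly one receiver. A minimal sketch of that fan-out, using illustrative names rather than the library's own:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		// One cancel channel shared by every job in the batch,
		// exactly as the dispatcher wires internalCancelChan into
		// each queryJob it pushes onto the work queue.
		cancelChan := make(chan struct{})

		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				// Each worker blocks until the batch is
				// canceled; close() releases all of them at
				// once.
				<-cancelChan
				fmt.Printf("job %d: batch canceled, giving up\n", id)
			}(i)
		}

		// Batch timeout path: delete the batch, then close the
		// channel so no queued or ongoing job keeps consuming
		// resources.
		close(cancelChan)
		wg.Wait()
	}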
