Skip to content

Commit c318151

Browse files
authored
Merge pull request #6 from scylladb/calculate-delay-since-last-window
stream_batch: do not include processing time when sleeping
2 parents 0bde107 + 65810c2 commit c318151

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

stream_batch.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ outer:
9696
var err error
9797
var hadRows bool
9898

99+
windowProcessingStartTime := time.Now()
100+
99101
if compareTimeuuid(wnd.begin, wnd.end) < 0 {
100102
var iter *changeRowIterator
101103
iter, err = crq.queryRange(wnd.begin, wnd.end)
@@ -130,7 +132,10 @@ outer:
130132
delay = sbr.config.Advanced.PostEmptyQueryDelay
131133
}
132134

133-
delayUntil := time.Now().Add(delay)
135+
delayUntil := windowProcessingStartTime.Add(delay)
136+
if time.Until(delayUntil) < time.Duration(0) {
137+
sbr.config.Logger.Printf("the stream can't keep up! the next poll was supposed to happen %v ago", -time.Until(delayUntil))
138+
}
134139

135140
if sbr.reachedEndOfTheGeneration(wnd.begin) {
136141
break outer
@@ -141,7 +146,7 @@ outer:
141146
select {
142147
case <-ctx.Done():
143148
return ctx.Err()
144-
case <-time.After(delayUntil.Sub(time.Now())):
149+
case <-time.After(time.Until(delayUntil)):
145150
break delay
146151
case <-sbr.interruptCh:
147152
if sbr.reachedEndOfTheGeneration(wnd.begin) {

0 commit comments

Comments
 (0)