Skip to content

Commit 7672fca

Browse files
committed
pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met
This PR ensures that the batcher flushes on shutdown, even if the pending length is less than the min batch size specified. Sending events is preferred to dropping, even if limits are not obeyed.
1 parent fb4e4b9 commit 7672fca

File tree

2 files changed

+59
-9
lines changed

2 files changed

+59
-9
lines changed

pubsub/batcher/batcher.go

+31-9
Original file line numberDiff line numberDiff line change
@@ -200,26 +200,40 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
200200
if b.nHandlers < b.opts.MaxHandlers {
201201
// If we can start a handler, do so with the item just added and any others that are pending.
202202
batch := b.nextBatch()
203-
if batch != nil {
204-
b.wg.Add(1)
205-
go func() {
206-
b.callHandler(batch)
207-
b.wg.Done()
208-
}()
209-
b.nHandlers++
210-
}
203+
b.handleBatch(batch)
211204
}
212205
// If we can't start a handler, then one of the currently running handlers will
213206
// take our item.
214207
return c
215208
}
216209

210+
func (b *Batcher) handleBatch(batch []waiter) {
211+
if batch == nil || len(batch) == 0 {
212+
return
213+
}
214+
215+
b.wg.Add(1)
216+
go func() {
217+
b.callHandler(batch)
218+
b.wg.Done()
219+
}()
220+
b.nHandlers++
221+
}
222+
217223
// nextBatch returns the batch to process, and updates b.pending.
218224
// It returns nil if there's no batch ready for processing.
219225
// b.mu must be held.
220226
func (b *Batcher) nextBatch() []waiter {
221227
if len(b.pending) < b.opts.MinBatchSize {
222-
return nil
228+
// We handle minimum batch sizes depending on specific
229+
// situations.
230+
// XXX: If we allow max batch lifetimes, handle that here.
231+
if b.shutdown == false {
232+
// If we're not shutting down, respect minimums. If we're
233+
// shutting down, though, we ignore minimums to flush the
234+
// entire batch.
235+
return nil
236+
}
223237
}
224238

225239
if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {
@@ -283,5 +297,13 @@ func (b *Batcher) Shutdown() {
283297
b.mu.Lock()
284298
b.shutdown = true
285299
b.mu.Unlock()
300+
301+
// On shutdown, ensure that we attempt to flush any pending items
302+
// if there's a minimum batch size.
303+
if b.nHandlers < b.opts.MaxHandlers {
304+
batch := b.nextBatch()
305+
b.handleBatch(batch)
306+
}
307+
286308
b.wg.Wait()
287309
}

pubsub/batcher/batcher_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,34 @@ func TestMinBatchSize(t *testing.T) {
171171
}
172172
}
173173

174+
// TestMinBatchSizeFlushesOnShutdown ensures that Shutdown() flushes batches, even if
175+
// the pending count is less than the minimum batch size.
176+
func TestMinBatchSizeFlushesOnShutdown(t *testing.T) {
177+
var got [][]int
178+
179+
batchSize := 3
180+
181+
b := batcher.New(reflect.TypeOf(int(0)), &batcher.Options{MinBatchSize: batchSize}, func(items interface{}) error {
182+
got = append(got, items.([]int))
183+
return nil
184+
})
185+
for i := 0; i < (batchSize - 1); i++ {
186+
b.AddNoWait(i)
187+
}
188+
189+
// Ensure that we've received nothing
190+
if len(got) > 0 {
191+
t.Errorf("got batch unexpectedly: %+v", got)
192+
}
193+
194+
b.Shutdown()
195+
196+
want := [][]int{{0, 1}}
197+
if !cmp.Equal(got, want) {
198+
t.Errorf("got %+v, want %+v on shutdown", got, want)
199+
}
200+
}
201+
174202
func TestSaturation(t *testing.T) {
175203
// Verify that under high load the maximum number of handlers are running.
176204
ctx := context.Background()

0 commit comments

Comments
 (0)