diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index 8fd760a971..1a6bae812b 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -212,7 +212,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { b.handleBatch(batch) } - if batch == nil && len(b.pending) > 0 { + if batch == nil && len(b.pending) > 0 && b.opts.BatchTimeout > 0 { // If the batch size timeout is zero, this is one of the first items to // be added to the batch under the minimum batch size. Record when this // happens so that .nextBatch() can grab the batch on timeout. @@ -225,6 +225,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) { go func() { <-time.After(b.opts.BatchTimeout) + b.batchTimeoutRunning = 0 batch = b.nextBatch() if batch != nil { b.handleBatch(batch) @@ -255,23 +256,8 @@ func (b *Batcher) handleBatch(batch []waiter) { // It returns nil if there's no batch ready for processing. // b.mu must be held. func (b *Batcher) nextBatch() []waiter { - if len(b.pending) < b.opts.MinBatchSize { - // We handle minimum batch sizes depending on specific - // situations. - if time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout { - // If we're within the max batch lifetime, respect minimum batch - // sizes and return nil. - return nil - } - if b.shutdown == false { - // If we're not shutting down, respect minimums. If we're - // shutting down, though, we ignore minimums to flush the - // entire batch. - return nil - } - // At this point, either we're shutting down or we've we've waited - // too long for the minimum size to be met. We're going to proceed - // with flushing the batch. + if len(b.pending) < b.opts.MinBatchSize && b.respectMinBatchSize() { + return nil } if len(b.pending) < b.opts.MinBatchSize { @@ -307,6 +293,23 @@ func (b *Batcher) nextBatch() []waiter { return batch } +func (b *Batcher) respectMinBatchSize() bool { + // We handle minimum batch sizes depending on specific + // situations. + if b.shutdown { + // If we're shutting down, do not respect minimums. This takes priority. + return false + } + if b.opts.BatchTimeout > 0 && time.Since(b.batchSizeTimeout) >= b.opts.BatchTimeout { + // If we have a maximum wait before sending batches below the minimum, and we've + // waited longer than that period, do not respect minimum batches and send! + return false + } + // At this point, either we're not shutting down and we're not forcing a batch + // due to timeouts. Respect the batch size. + return true +} + func (b *Batcher) callHandler(batch []waiter) { for batch != nil {