Skip to content

Commit e349d98

Browse files
committed
reduce 1 alloc and update benchmarks
1 parent 7752eb0 commit e349d98

File tree

7 files changed

+114
-167
lines changed

7 files changed

+114
-167
lines changed

README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ task := gopool.NewTask(func(t gopool.Args[User]) error {
6969
}).DrainTo(output)
7070

7171
pool := gopool.NewPool()
72-
defer gopool.Close()
7372

7473
gopool.Go(task).Wait()
7574
results := output.Drain()
@@ -157,15 +156,15 @@ type Retryable interface { WithRetry(attempts uint, sleep time.Duration) Worker
157156
goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
158157
```
159158

160-
| Name | Time per op (ns) | Allocs per op | Bytes per op |
161-
|----------------------------------------------|----------------|---------------|--------------|
162-
| ErrGroup-32 | 178.7 | 1 | 24 B |
163-
| ChannelsWithOutputAndErrChannel-32 | 259.9 | 2 | 72 B |
164-
| ChannelsWithWaitGroup-32 | 272.8 | 2 | 80 B |
165-
| MutexWithErrGroup-32 | 270.9 | 2 | 102 B |
166-
| GoPoolWithDrainer-32 | 277.5 | 4 | 162 B |
167-
| ChannelsWithErrGroup-32 | 279.5 | 2 | 80 B |
168-
| GoPool-32 | 297.4 | 3 | 96 B |
159+
| Name | Iterations | ns/op | B/op | allocs/op |
160+
|-------------------------------------------|-----------:|---------:|-------:|-----------:|
161+
| **ErrGroup** | 6,253,089 | **177.8** | **24** | **1** |
162+
| GoPool | 4,754,223 | **255.9** | 80 | 2 |
163+
| ChannelsWithOutputAndErrChannel | 4,463,799 | 258.3 | **72** | 2 |
164+
| GoPoolWithDrainer | 4,570,286 | 262.8 | 118 | 3 |
165+
| ChannelsWithWaitGroup | 4,499,217 | 270.5 | 80 | 2 |
166+
| ChannelsWithErrGroup | 4,336,857 | 277.6 | 80 | 2 |
167+
| MutexWithErrGroup | 4,380,441 | 368.9 | 127 | 2 |
169168

170169
![Benchmark Comparison](go_async_benchmarks.png)
171170

@@ -196,7 +195,6 @@ it provides type safety, retries, automatic draining, and deterministic cleanup
196195

197196
### General
198197

199-
- Graceful Shutdown — always call `gopool.Close()` or defer it for safe cleanup.
200198
- Thread Safety — never access internal slices or channels directly.
201199
- Non-blocking design — use `Drain()` or wait for pool completion instead of manual `close()` calls.
202200

benchmark_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ func BenchmarkAsyncPackage(b *testing.B) {
2525
os.Setenv("STAGE", "prod")
2626
// Create a Drain channel for async operations
2727
d := gopool.NewPool()
28-
defer d.Close()
2928

30-
b.Run("AsyncPackage", func(b *testing.B) {
29+
b.Run("GoPool", func(b *testing.B) {
3130
for i := 0; i < b.N; i++ {
3231
d.Go(gopool.NewTask(func(arg gopool.Args[int]) error {
3332
return SimulatedTask()
@@ -46,9 +45,8 @@ func BenchmarkAsyncPackageWithDrainer(b *testing.B) {
4645
//disable internal limit on test
4746
os.Setenv("STAGE", "prod")
4847
d := gopool.NewPool()
49-
defer d.Close()
5048

51-
b.Run("AsyncPackage", func(b *testing.B) {
49+
b.Run("GoPool", func(b *testing.B) {
5250
o := gopool.NewDrainer[int]()
5351
// Create a Drain channel for async operations
5452
for i := 0; i < b.N; i++ {

drain.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,43 @@ import (
44
"sync"
55
)
66

7-
// Drain collects values dynamically without pre-allocation (unbuffered)
7+
// Drain collects values safely with minimal allocations.
// It is a thread-safe accumulator: Send appends under the mutex and
// broadcasts on cond so waiters can observe newly pushed values.
type Drain[T any] struct {
	mu     sync.Mutex // guards values
	values []T        // accumulated values; Drain() returns a snapshot
	cond   *sync.Cond // broadcast on every Send; bound to mu in NewDrainer
}
1313

14-
// NewDrainer creates an unbuffered, thread-safe Drainer
14+
// NewDrainer creates a Drain
1515
func NewDrainer[T any]() *Drain[T] {
16-
d := &Drain[T]{}
16+
d := &Drain[T]{values: make([]T, 0, 4)} // small preallocation
1717
d.cond = sync.NewCond(&d.mu)
1818
return d
1919
}
2020

21-
// Send appends a value safely and notifies Drain()
21+
// Send appends a value with minimal allocations
2222
func (d *Drain[T]) Send(v T) {
2323
d.mu.Lock()
24-
d.values = append(d.values, v)
25-
d.cond.Broadcast() // wake any goroutines waiting in Drain()
24+
if cap(d.values) == len(d.values) {
25+
newCap := cap(d.values)*2 + 1
26+
newSlice := make([]T, len(d.values), newCap)
27+
copy(newSlice, d.values)
28+
d.values = newSlice
29+
}
30+
d.values = d.values[:len(d.values)+1]
31+
d.values[len(d.values)-1] = v
32+
d.cond.Broadcast()
2633
d.mu.Unlock()
2734
}
2835

29-
// Count returns the number of items pushed so far
36+
// Count returns current length
3037
func (d *Drain[T]) Count() int {
3138
d.mu.Lock()
3239
defer d.mu.Unlock()
3340
return len(d.values)
3441
}
3542

36-
// Drain returns a snapshot of all values currently pushed
37-
// Since total is unknown, this returns current state immediately
43+
// Drain returns a snapshot of all values
3844
func (d *Drain[T]) Drain() []T {
3945
d.mu.Lock()
4046
defer d.mu.Unlock()

pool.go

Lines changed: 42 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,115 +1,67 @@
11
package gopool
22

33
import (
4-
"context"
5-
"fmt"
6-
"math/rand"
74
"sync"
85
"time"
96
)
107

11-
type token struct{}
12-
13-
// Pool is an wrapper for errgroup.Group
148
// Pool runs Workers concurrently with an optional pool-wide retry policy.
type Pool struct {
	wg       sync.WaitGroup // tracks in-flight tasks; Wait blocks on it
	attempts uint           // total tries per task (NewPool defaults to 1)
	sleep    time.Duration  // initial delay between retries; doubled each retry
	errOnce  sync.Once      // ensures only the first task error is recorded
	err      error          // first error observed; returned by Wait
	cancel   func(error)    // optional hook invoked once with the first error
}
2916

30-
// Go runs the provided async Tasks, handling them generically
31-
func (p *Pool) Go(tasks ...Worker) *Pool {
32-
for i := 0; i < len(tasks); i++ {
33-
if p.sem != nil {
34-
p.sem <- token{}
35-
}
36-
37-
p.wg.Add(1)
38-
go func(w Worker) {
39-
defer p.done()
40-
41-
currentSleep := p.sleep
42-
for i := uint(0); i < p.attempts; i++ {
43-
err := w.Execute()
44-
if err == nil {
45-
break
46-
}
17+
// NewPool creates a pool
18+
func NewPool() *Pool {
19+
return &Pool{attempts: 1}
20+
}
4721

48-
if i+1 < p.attempts {
49-
jitter := time.Duration(rand.Int63n(int64(currentSleep) / 2))
50-
time.Sleep(currentSleep + jitter)
51-
currentSleep *= 2
52-
continue
53-
}
22+
// WithRetry sets pool-wide retry config
23+
func (p *Pool) WithRetry(attempts uint, sleep time.Duration) *Pool {
24+
p.attempts = attempts
25+
p.sleep = sleep
26+
return p
27+
}
5428

55-
p.errOnce.Do(func() {
56-
p.err = err
57-
if p.cancel != nil {
58-
p.cancel(p.err)
59-
}
60-
})
61-
}
62-
}(tasks[i])
29+
// Go executes tasks concurrently
30+
func (p *Pool) Go(tasks ...Worker) *Pool {
31+
for i := range tasks {
32+
task := tasks[i] // copy by value internally
33+
p.wg.Add(1)
34+
go executeTask(p, task)
6335
}
64-
6536
return p
6637
}
6738

68-
func (g *Pool) done() {
69-
if g.sem != nil {
70-
<-g.sem
39+
// executeTask is standalone to avoid closure allocation
40+
func executeTask(p *Pool, w Worker) {
41+
defer p.wg.Done()
42+
currentSleep := p.sleep
43+
for i := uint(0); i < p.attempts; i++ {
44+
err := w.Execute()
45+
if err == nil {
46+
return
47+
}
48+
if i+1 < p.attempts {
49+
jitter := currentSleep / 2
50+
time.Sleep(currentSleep + jitter)
51+
currentSleep *= 2
52+
continue
53+
}
54+
p.errOnce.Do(func() {
55+
p.err = err
56+
if p.cancel != nil {
57+
p.cancel(err)
58+
}
59+
})
7160
}
72-
g.wg.Done()
7361
}
7462

75-
// Wait waits until all workers are done, and then gracefully shuts down them
63+
// Wait waits for all tasks to finish, then returns the first error
// recorded (nil if every task succeeded).
// NOTE(review): the unsynchronized read of p.err appears safe because
// each errOnce.Do happens before the corresponding wg.Done in the same
// goroutine, and all Done calls happen before wg.Wait returns.
func (p *Pool) Wait() error {
	p.wg.Wait()
	return p.err
}
83-
84-
// NewPool creates a new Promise group and allows to run asynchronously
85-
func NewPool() *Pool {
86-
return NewPoolWithContext(context.Background())
87-
}
88-
89-
// NewPoolWithContext creates a new Promise group and allows to run asynchronously with provided context
90-
func NewPoolWithContext(ctx context.Context) *Pool {
91-
ctx, cancel := context.WithCancelCause(ctx)
92-
return &Pool{cancel: cancel, ctx: ctx, attempts: 1}
93-
}
94-
95-
// Close closes channels and contexts in use to prevent memory leaks
96-
func (g *Pool) Close() {
97-
g.ctx.Done()
98-
}
99-
100-
// WithLimit returns an Pool that will run asynchronously with a limit of Tasks
101-
func (p *Pool) WithLimit(limit uint) *Pool {
102-
if len(p.sem) != 0 {
103-
panic(fmt.Errorf("unable to modify limit while %v goroutines are still active", len(p.sem)))
104-
}
105-
p.sem = make(chan token, limit)
106-
107-
return p
108-
}
109-
110-
// WithRetry returns an Pool that will run asynchronously with a limit of retries
111-
func (g *Pool) WithRetry(attempts uint, sleep time.Duration) *Pool {
112-
g.attempts = attempts
113-
g.sleep = sleep
114-
return g
115-
}

pool_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ func TestConcurrentClient(t *testing.T) {
3838

3939
pool := gopool.NewPool()
4040
err := pool.Go(requests...).Wait()
41-
defer pool.Close()
4241

4342
t.Run("No errors", func(t *testing.T) {
4443
assert.NoError(t, err)
@@ -106,7 +105,7 @@ func TestConcurrentClientWithRetry(t *testing.T) {
106105

107106
pool := gopool.NewPool()
108107
err := pool.Go(requests...).Wait()
109-
defer pool.Close()
108+
110109
t.Run("6 requests done", func(t *testing.T) {
111110
assert.Equal(t, 6, numInvocations)
112111
})
@@ -142,7 +141,7 @@ func TestConcurrentClientWithRetryFailure(t *testing.T) {
142141
}
143142

144143
pool := gopool.NewPool()
145-
defer pool.Close()
144+
146145
err := pool.Go(requests...).Wait()
147146
t.Run("6 requests done", func(t *testing.T) {
148147
assert.Equal(t, 6, numInvocations)
@@ -177,8 +176,9 @@ func TestConcurrentClientWithAllRetry(t *testing.T) {
177176
}),
178177
}
179178

180-
pool := gopool.NewPool().WithRetry(3, 100*time.Millisecond)
181-
defer pool.Close()
179+
pool := gopool.NewPool()
180+
pool.WithRetry(3, 100*time.Millisecond)
181+
182182
err := pool.Go(requests...).Wait()
183183
t.Run("5 requests done", func(t *testing.T) {
184184
assert.Equal(t, 5, numInvocations)
@@ -206,7 +206,7 @@ func TestConcurrentClientWithTaskChannel(t *testing.T) {
206206
}
207207

208208
pool := gopool.NewPool()
209-
defer pool.Close()
209+
210210
// Run the Task(s)
211211
err := pool.Go(requests...).Wait()
212212

@@ -251,7 +251,7 @@ func TestConcurrentClientWith2WorkersameChannel(t *testing.T) {
251251
}
252252

253253
pool := gopool.NewPool()
254-
defer pool.Close()
254+
255255
// Run the Task(s)
256256
err := pool.Go(requests...).Wait()
257257

@@ -298,7 +298,7 @@ func TestConcurrentClientWith2TaskDiffTypes(t *testing.T) {
298298
}
299299

300300
pool := gopool.NewPool()
301-
defer pool.Close()
301+
302302
// Run the Task(s)
303303
err := pool.Go(requests...).Wait()
304304

@@ -347,7 +347,7 @@ func TestConcurrentClientWith2TaskDiffTypes1Output(t *testing.T) {
347347
}
348348

349349
pool := gopool.NewPool()
350-
defer pool.Close()
350+
351351
// Run the Task(s)
352352
err := pool.Go(requests...).Wait()
353353

@@ -382,7 +382,7 @@ func TestConcurrentClientWith2TaskDiffTypes1Output1Input(t *testing.T) {
382382
return nil
383383
}
384384

385-
tFunc2 := func(t gopool.Args[typeB]) error {
385+
tFunc2 := func(t gopool.Args[*typeB]) error {
386386
numInvocations++
387387

388388
// update
@@ -396,7 +396,7 @@ func TestConcurrentClientWith2TaskDiffTypes1Output1Input(t *testing.T) {
396396
}
397397

398398
pool := gopool.NewPool()
399-
defer pool.Close()
399+
400400
// Run the Task(s)
401401
err := pool.Go(requests...).Wait()
402402

0 commit comments

Comments
 (0)