Skip to content

Commit 1328f74

Browse files
committed
restore concurrency limit and re-run benchmarks
1 parent e349d98 commit 1328f74

File tree

5 files changed

+58
-34
lines changed

5 files changed

+58
-34
lines changed

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,15 @@ goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
158158

159159
| Name | Iterations | ns/op | B/op | allocs/op |
160160
|-------------------------------------------|-----------:|---------:|-------:|-----------:|
161-
| **ErrGroup** | 6,253,089 | **177.8** | **24** | **1** |
162-
| GoPool | 4,754,223 | **255.9** | 80 | 2 |
163-
| ChannelsWithOutputAndErrChannel | 4,463,799 | 258.3 | **72** | 2 |
164-
| GoPoolWithDrainer | 4,570,286 | 262.8 | 118 | 3 |
165-
| ChannelsWithWaitGroup | 4,499,217 | 270.5 | 80 | 2 |
166-
| ChannelsWithErrGroup | 4,336,857 | 277.6 | 80 | 2 |
167-
| MutexWithErrGroup | 4,380,441 | 368.9 | 127 | 2 |
168-
169-
![Benchmark Comparison](go_async_benchmarks.png)
161+
| **ErrGroup** | 6,211,902 | **180.3** | **24** | **1** |
162+
| **GoPool** | 5,020,380 | **214.4** | 80 | 2 |
163+
| ChannelsWithOutputAndErrChannel | 4,426,651 | 260.6 | **72** | 2 |
164+
| AsyncPackageWithDrainer | 4,531,092 | 274.5 | 119 | 3 |
165+
| ChannelsWithWaitGroup | 4,480,616 | 271.5 | 80 | 2 |
166+
| ChannelsWithErrGroup | 4,336,473 | 279.1 | 80 | 2 |
167+
| MutexWithErrGroup | 2,842,214 | 420.6 | 135 | 2 |
168+
169+
![Benchmark Comparison](benchmark_chart.png)
170170

171171
Even though `go-pool` adds a small constant overhead compared to `errgroup` (≈100–130 ns per operation),
172172
it provides type safety, retries, automatic draining, and deterministic cleanup — all while staying within ~1.7× of native concurrency performance.
File renamed without changes.

benchmark_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package gopool_test
22

33
import (
4-
"os"
54
"sync"
65
"testing"
76

@@ -21,8 +20,6 @@ func SimulatedTask() error {
2120

2221
// BenchmarkAsyncPackage benchmarks the `go-async` package.
2322
func BenchmarkAsyncPackage(b *testing.B) {
24-
//disable internal limit on test
25-
os.Setenv("STAGE", "prod")
2623
// Create a Drain channel for async operations
2724
d := gopool.NewPool()
2825

@@ -37,13 +34,11 @@ func BenchmarkAsyncPackage(b *testing.B) {
3734
b.Fatal(err)
3835
}
3936
})
40-
os.Setenv("STAGE", "test")
4137
}
4238

4339
// BenchmarkAsyncPackageWithDrainer benchmarks the `go-async` package + drain logic.
4440
func BenchmarkAsyncPackageWithDrainer(b *testing.B) {
4541
//disable internal limit on test
46-
os.Setenv("STAGE", "prod")
4742
d := gopool.NewPool()
4843

4944
b.Run("GoPool", func(b *testing.B) {
@@ -61,7 +56,6 @@ func BenchmarkAsyncPackageWithDrainer(b *testing.B) {
6156
b.Fatal(err)
6257
}
6358
})
64-
os.Setenv("STAGE", "test")
6559
}
6660

6761
// BenchmarkErrGroup benchmarks the `errgroup.Group`.

pool.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,34 @@
11
package gopool
22

33
import (
4+
"context"
5+
"fmt"
46
"sync"
57
"time"
68
)
79

10+
type token struct{}
11+
12+
// Pool is a pool for N workers
813
type Pool struct {
914
wg sync.WaitGroup
1015
attempts uint
1116
sleep time.Duration
1217
errOnce sync.Once
1318
err error
19+
ctx context.Context
1420
cancel func(error)
21+
sem chan token
1522
}
1623

1724
// NewPool creates a pool
1825
func NewPool() *Pool {
19-
return &Pool{attempts: 1}
26+
return NewPoolWithContext(context.Background())
27+
}
28+
29+
func NewPoolWithContext(ctx context.Context) *Pool {
30+
ctx, cancel := context.WithCancelCause(ctx)
31+
return &Pool{attempts: 1, cancel: cancel, ctx: ctx}
2032
}
2133

2234
// WithRetry sets pool-wide retry config
@@ -30,6 +42,9 @@ func (p *Pool) WithRetry(attempts uint, sleep time.Duration) *Pool {
3042
func (p *Pool) Go(tasks ...Worker) *Pool {
3143
for i := range tasks {
3244
task := tasks[i] // copy by value internally
45+
if p.sem != nil {
46+
p.sem <- token{}
47+
}
3348
p.wg.Add(1)
3449
go executeTask(p, task)
3550
}
@@ -39,6 +54,10 @@ func (p *Pool) Go(tasks ...Worker) *Pool {
3954
// executeTask is standalone to avoid closure allocation
4055
func executeTask(p *Pool, w Worker) {
4156
defer p.wg.Done()
57+
if p.sem != nil {
58+
<-p.sem
59+
}
60+
4261
currentSleep := p.sleep
4362
for i := uint(0); i < p.attempts; i++ {
4463
err := w.Execute()
@@ -63,5 +82,21 @@ func executeTask(p *Pool, w Worker) {
6382
// Wait waits for all tasks to finish
6483
func (p *Pool) Wait() error {
6584
p.wg.Wait()
85+
if p.cancel != nil {
86+
p.cancel(p.err)
87+
}
6688
return p.err
6789
}
90+
91+
// WithLimit stablishes limitted concurrency behavior
92+
func (p *Pool) WithLimit(n int) *Pool {
93+
if n < 0 {
94+
p.sem = nil
95+
return p
96+
}
97+
if len(p.sem) != 0 {
98+
panic(fmt.Errorf("can't modify limit while %v goroutines still active", len(p.sem)))
99+
}
100+
p.sem = make(chan token, n)
101+
return p
102+
}

pool_test.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,13 @@ package gopool_test
22

33
import (
44
"fmt"
5-
"os"
65
"testing"
76
"time"
87

98
gopool "github.com/rubengp99/go-pool"
109
"github.com/stretchr/testify/assert"
1110
)
1211

13-
func init() {
14-
os.Setenv("STAGE", "test")
15-
}
16-
1712
type typeA struct {
1813
value string
1914
}
@@ -36,7 +31,7 @@ func TestConcurrentClient(t *testing.T) {
3631
gopool.NewTask(tFunc),
3732
}
3833

39-
pool := gopool.NewPool()
34+
pool := gopool.NewPool().WithLimit(1)
4035
err := pool.Go(requests...).Wait()
4136

4237
t.Run("No errors", func(t *testing.T) {
@@ -65,7 +60,7 @@ func TestConcurrentClientWithError(t *testing.T) {
6560
gopool.NewTask(tFunc),
6661
}
6762

68-
pool := gopool.NewPool()
63+
pool := gopool.NewPool().WithLimit(1)
6964
err := pool.Go(requests...).Wait()
7065

7166
t.Run("errors", func(t *testing.T) {
@@ -103,7 +98,7 @@ func TestConcurrentClientWithRetry(t *testing.T) {
10398
}).WithRetry(3, 100*time.Millisecond),
10499
}
105100

106-
pool := gopool.NewPool()
101+
pool := gopool.NewPool().WithLimit(1)
107102
err := pool.Go(requests...).Wait()
108103

109104
t.Run("6 requests done", func(t *testing.T) {
@@ -140,7 +135,7 @@ func TestConcurrentClientWithRetryFailure(t *testing.T) {
140135
}).WithRetry(3, 100*time.Millisecond),
141136
}
142137

143-
pool := gopool.NewPool()
138+
pool := gopool.NewPool().WithLimit(1)
144139

145140
err := pool.Go(requests...).Wait()
146141
t.Run("6 requests done", func(t *testing.T) {
@@ -168,15 +163,15 @@ func TestConcurrentClientWithAllRetry(t *testing.T) {
168163
gopool.NewTask(func(t gopool.Args[any]) error {
169164
numInvocations++
170165

171-
if numInvocations > 1 {
166+
if numInvocations > 2 {
172167
return nil
173168
}
174169

175170
return fmt.Errorf("bye 2")
176171
}),
177172
}
178173

179-
pool := gopool.NewPool()
174+
pool := gopool.NewPool().WithLimit(1)
180175
pool.WithRetry(3, 100*time.Millisecond)
181176

182177
err := pool.Go(requests...).Wait()
@@ -205,7 +200,7 @@ func TestConcurrentClientWithTaskChannel(t *testing.T) {
205200
gopool.NewTask(tFunc).DrainTo(output),
206201
}
207202

208-
pool := gopool.NewPool()
203+
pool := gopool.NewPool().WithLimit(1)
209204

210205
// Run the Task(s)
211206
err := pool.Go(requests...).Wait()
@@ -250,7 +245,7 @@ func TestConcurrentClientWith2WorkersameChannel(t *testing.T) {
250245
gopool.NewTask(tFunc2).DrainTo(output),
251246
}
252247

253-
pool := gopool.NewPool()
248+
pool := gopool.NewPool().WithLimit(1)
254249

255250
// Run the Task(s)
256251
err := pool.Go(requests...).Wait()
@@ -268,8 +263,8 @@ func TestConcurrentClientWith2WorkersameChannel(t *testing.T) {
268263

269264
t.Run("results drained", func(t *testing.T) {
270265
if assert.Equal(t, 2, len(results)) {
271-
assert.Equal(t, "hello-world!", results[1].value)
272-
assert.Equal(t, "hello-world!2", results[0].value)
266+
assert.Equal(t, "hello-world!", results[0].value)
267+
assert.Equal(t, "hello-world!2", results[1].value)
273268
}
274269
})
275270
}
@@ -297,7 +292,7 @@ func TestConcurrentClientWith2TaskDiffTypes(t *testing.T) {
297292
gopool.NewTask(tFunc2).DrainTo(output2),
298293
}
299294

300-
pool := gopool.NewPool()
295+
pool := gopool.NewPool().WithLimit(1)
301296

302297
// Run the Task(s)
303298
err := pool.Go(requests...).Wait()
@@ -346,7 +341,7 @@ func TestConcurrentClientWith2TaskDiffTypes1Output(t *testing.T) {
346341
gopool.NewTask(tFunc2),
347342
}
348343

349-
pool := gopool.NewPool()
344+
pool := gopool.NewPool().WithLimit(1)
350345

351346
// Run the Task(s)
352347
err := pool.Go(requests...).Wait()
@@ -395,7 +390,7 @@ func TestConcurrentClientWith2TaskDiffTypes1Output1Input(t *testing.T) {
395390
gopool.NewTask(tFunc2).WithInput(&initial),
396391
}
397392

398-
pool := gopool.NewPool()
393+
pool := gopool.NewPool().WithLimit(1)
399394

400395
// Run the Task(s)
401396
err := pool.Go(requests...).Wait()

0 commit comments

Comments
 (0)