File tree 7 files changed +29
-30
lines changed
7 files changed +29
-30
lines changed Original file line number Diff line number Diff line change @@ -20,7 +20,7 @@ const (
20
20
)
21
21
22
22
func BenchmarkResponseTimesRing (b * testing.B ) {
23
- var ring = New {Size : 8192 , MaxBatch : 127 }.SPSC ()
23
+ var ring = New {Size : 8192 , BatchSize : 127 }.SPSC ()
24
24
var wg sync.WaitGroup
25
25
wg .Add (2 )
26
26
var diffs = make ([]int64 , (b .N / sampleTimes )+ 1 )
Original file line number Diff line number Diff line change 1
1
package onering
2
2
3
3
// default max batch size for batched consumers
4
- const DefaultMaxBatch = 1023
4
+ const DefaultMaxBatch = ( 1 << 8 ) - 1
Original file line number Diff line number Diff line change @@ -12,8 +12,8 @@ type MPMC struct {
12
12
multi
13
13
}
14
14
15
- func (r * MPMC ) init (size uint32 ) {
16
- r .multi .init (size )
15
+ func (r * MPMC ) init (n * New ) {
16
+ r .multi .init (n )
17
17
for i := range r .seq {
18
18
r .seq [i ] = - int64 (i )
19
19
}
Original file line number Diff line number Diff line change @@ -8,11 +8,6 @@ type MPSC struct {
8
8
multi
9
9
}
10
10
11
- func (r * MPSC ) init (size uint32 ) {
12
- r .multi .init (size )
13
- r .rc = 1
14
- }
15
-
16
11
func (r * MPSC ) Get (i interface {}) bool {
17
12
var (
18
13
rc = r .rc
Original file line number Diff line number Diff line change @@ -43,30 +43,27 @@ type New struct {
43
43
// Size (Capacity) of the queue
44
44
Size uint32
45
45
// Maximum number of batched messages
46
- MaxBatch int32
46
+ BatchSize uint32
47
47
}
48
48
49
49
// SPSC constructs a Single Producer/Single Consumer queue
50
50
func (n New ) SPSC () Queue {
51
51
var spsc = new (SPSC )
52
- spsc .init (n .Size )
53
- spsc .maxbatch = n .BatchSize ()
52
+ spsc .init (& n )
54
53
return spsc
55
54
}
56
55
57
56
// MPSC constructs a Multi-Producer/Single Consumer queue
58
57
func (n New ) MPSC () Queue {
59
58
var mpsc = new (MPSC )
60
- mpsc .init (n .Size )
61
- mpsc .maxbatch = n .BatchSize ()
59
+ mpsc .init (& n )
62
60
return mpsc
63
61
}
64
62
65
63
// SPMC constructs a Single Producer/Multi-Consumer queue
66
64
func (n New ) SPMC () Queue {
67
65
var spmc = new (SPMC )
68
- spmc .init (n .Size )
69
- spmc .maxbatch = n .BatchSize ()
66
+ spmc .init (& n )
70
67
return spmc
71
68
}
72
69
@@ -75,18 +72,10 @@ func (n New) SPMC() Queue {
75
72
// However it will not implement many of the optimizations available to other queue types
76
73
func (n New ) MPMC () Queue {
77
74
var mpmc = new (MPMC )
78
- mpmc .init (n .Size )
79
- mpmc .maxbatch = n .BatchSize ()
75
+ mpmc .init (& n )
80
76
return mpmc
81
77
}
82
78
83
- func (n * New ) BatchSize () int64 {
84
- if n .MaxBatch > 0 {
85
- return int64 (n .MaxBatch )
86
- }
87
- return DefaultMaxBatch
88
- }
89
-
90
79
//type Waiter interface {
91
80
// Wait()
92
81
// Signal()
Original file line number Diff line number Diff line change 1
1
package onering
2
2
3
3
import (
4
- "math/bits"
5
4
"runtime"
6
5
"sync/atomic"
7
6
"unsafe"
@@ -24,9 +23,15 @@ type ring struct {
24
23
done int32
25
24
}
26
25
27
- func (r * ring ) init (size uint32 ) {
28
- r .data = make ([]unsafe.Pointer , 1 << uint ( 32 - bits . LeadingZeros32 ( size - 1 ) ))
26
+ func (r * ring ) init (n * New ) {
27
+ r .data = make ([]unsafe.Pointer , roundUp2 ( n . Size ))
29
28
r .mask = int64 (len (r .data ) - 1 )
29
+
30
+ var bs = n .BatchSize
31
+ if bs == 0 {
32
+ bs = DefaultMaxBatch
33
+ }
34
+ r .maxbatch = int64 (roundUp2 (bs ) - 1 )
30
35
}
31
36
32
37
func (r * ring ) Close () {
@@ -54,8 +59,8 @@ type multi struct {
54
59
seq []int64
55
60
}
56
61
57
- func (c * multi ) init (size uint32 ) {
58
- c .ring .init (size )
62
+ func (c * multi ) init (n * New ) {
63
+ c .ring .init (n )
59
64
c .size = int64 (len (c .data ))
60
65
c .seq = make ([]int64 , len (c .data ))
61
66
c .wp = 1 // just to avoid 0-awkwardness with seq
Original file line number Diff line number Diff line change @@ -28,3 +28,13 @@ func inject(i interface{}, ptr unsafe.Pointer) {
28
28
var v = (* unsafe .Pointer )((* iface )(unsafe .Pointer (& i )).d )
29
29
* v = ptr
30
30
}
31
+
32
+ func roundUp2 (v uint32 ) uint32 {
33
+ v --
34
+ v |= v >> 1
35
+ v |= v >> 2
36
+ v |= v >> 4
37
+ v |= v >> 8
38
+ v |= v >> 16
39
+ return v + 1
40
+ }
You can’t perform that action at this time.
0 commit comments