@@ -52,9 +52,9 @@ const (
52
52
53
53
type (
54
54
slot [T any ] struct {
55
- State uint32
56
- WriteParker * ThreadParker [T ]
57
- Item T
55
+ state uint32
56
+ writeParker * ThreadParker [T ]
57
+ item T
58
58
}
59
59
60
60
selectFactory struct {
@@ -101,7 +101,7 @@ func New[T any](size uint32) *ZenQ[T] {
101
101
for idx := uint32 (0 ); idx < queueSize ; idx ++ {
102
102
n := parkPool .Get ().(* parkSpot [T ])
103
103
n .threadPtr , n .next = nil , nil
104
- contents [idx ].WriteParker = NewThreadParker [T ](unsafe .Pointer (n ))
104
+ contents [idx ].writeParker = NewThreadParker [T ](unsafe .Pointer (n ))
105
105
}
106
106
zenq := & ZenQ [T ]{
107
107
strideLength : unsafe .Sizeof (slot [T ]{}),
@@ -153,14 +153,14 @@ direct_send:
153
153
slot := (* slot [T ])(unsafe .Pointer (uintptr (atomic .AddUint32 (& self .writerIndex , 1 )& self .indexMask )* self .strideLength + uintptr (self .contents )))
154
154
155
155
// CAS -> change slot_state to busy if slot_state == empty
156
- for ! atomic .CompareAndSwapUint32 (& slot .State , SlotEmpty , SlotBusy ) {
157
- switch atomic .LoadUint32 (& slot .State ) {
156
+ for ! atomic .CompareAndSwapUint32 (& slot .state , SlotEmpty , SlotBusy ) {
157
+ switch atomic .LoadUint32 (& slot .state ) {
158
158
case SlotBusy :
159
159
wait ()
160
160
case SlotCommitted :
161
161
n := self .alloc ().(* parkSpot [T ])
162
162
n .threadPtr , n .next , n .value = GetG (), nil , value
163
- slot .WriteParker .Park (unsafe .Pointer (n ))
163
+ slot .writeParker .Park (unsafe .Pointer (n ))
164
164
mcall (fast_park )
165
165
return
166
166
case SlotEmpty :
@@ -169,8 +169,8 @@ direct_send:
169
169
return
170
170
}
171
171
}
172
- slot .Item = value
173
- atomic .StoreUint32 (& slot .State , SlotCommitted )
172
+ slot .item = value
173
+ atomic .StoreUint32 (& slot .state , SlotCommitted )
174
174
return
175
175
}
176
176
@@ -179,13 +179,13 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
179
179
slot := (* slot [T ])(unsafe .Pointer (uintptr (atomic .AddUint32 (& self .readerIndex , 1 )& self .indexMask )* self .strideLength + uintptr (self .contents )))
180
180
181
181
// CAS -> change slot_state to busy if slot_state == committed
182
- for ! atomic .CompareAndSwapUint32 (& slot .State , SlotCommitted , SlotBusy ) {
183
- switch atomic .LoadUint32 (& slot .State ) {
182
+ for ! atomic .CompareAndSwapUint32 (& slot .state , SlotCommitted , SlotBusy ) {
183
+ switch atomic .LoadUint32 (& slot .state ) {
184
184
case SlotBusy :
185
185
wait ()
186
186
case SlotEmpty :
187
187
var freeable * parkSpot [T ]
188
- if data , queueOpen , freeable = slot .WriteParker .Ready (); queueOpen {
188
+ if data , queueOpen , freeable = slot .writeParker .Ready (); queueOpen {
189
189
if freeable != nil {
190
190
self .free (freeable )
191
191
}
@@ -199,7 +199,7 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
199
199
return
200
200
}
201
201
case SlotClosed :
202
- if atomic .CompareAndSwapUint32 (& slot .State , SlotClosed , SlotEmpty ) {
202
+ if atomic .CompareAndSwapUint32 (& slot .state , SlotClosed , SlotEmpty ) {
203
203
atomic .StoreUint32 (& self .globalState , StateFullyClosed )
204
204
}
205
205
queueOpen = false
@@ -208,8 +208,8 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
208
208
continue
209
209
}
210
210
}
211
- data , queueOpen = slot .Item , true
212
- atomic .StoreUint32 (& slot .State , SlotEmpty )
211
+ data , queueOpen = slot .item , true
212
+ atomic .StoreUint32 (& slot .state , SlotEmpty )
213
213
return
214
214
}
215
215
@@ -228,8 +228,8 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
228
228
slot := (* slot [T ])(unsafe .Pointer (uintptr (atomic .AddUint32 (& self .writerIndex , 1 )& self .indexMask )* self .strideLength + uintptr (self .contents )))
229
229
230
230
// CAS -> change slot_state to busy if slot_state == empty
231
- for ! atomic .CompareAndSwapUint32 (& slot .State , SlotEmpty , SlotBusy ) {
232
- switch atomic .LoadUint32 (& slot .State ) {
231
+ for ! atomic .CompareAndSwapUint32 (& slot .state , SlotEmpty , SlotBusy ) {
232
+ switch atomic .LoadUint32 (& slot .state ) {
233
233
case SlotBusy , SlotCommitted :
234
234
mcall (gosched_m )
235
235
case SlotEmpty :
@@ -239,7 +239,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
239
239
}
240
240
}
241
241
// Closing commit
242
- atomic .StoreUint32 (& slot .State , SlotClosed )
242
+ atomic .StoreUint32 (& slot .state , SlotClosed )
243
243
return
244
244
}
245
245
@@ -293,9 +293,10 @@ func (self *ZenQ[T]) Reset() {
293
293
// Unsafe to be called from multiple goroutines
294
294
func (self * ZenQ [T ]) Dump () {
295
295
fmt .Printf ("writerIndex: %3d, readerIndex: %3d\n contents:-\n \n " , self .writerIndex , self .readerIndex )
296
- // for idx := range self.contents {
297
- // fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item)
298
- // }
296
+ for idx := uintptr (0 ); idx <= uintptr (self .indexMask ); idx ++ {
297
+ slot := (* slot [T ])(unsafe .Pointer (uintptr (self .contents ) + idx * unsafe .Sizeof (slot [T ]{})))
298
+ fmt .Printf ("Slot -> %#v\n " , * slot )
299
+ }
299
300
}
300
301
301
302
// selectSender is an auxillary thread which remains parked by default
0 commit comments