Skip to content

Commit 06f453d

Browse files
committed
use lower memory bandwidth data types
1 parent 12be04b commit 06f453d

File tree

2 files changed

+9
-12
lines changed

2 files changed

+9
-12
lines changed

benchmarks/simple/main.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,10 @@ func zenqProducer() {
8686
}
8787

8888
func zenqConsumer() {
89+
var data Payload
8990
for i := uint64(0); i < currSize; i++ {
90-
if data, ok := zq.Read(); ok {
91-
validatePayload(data)
92-
} else {
93-
panic("ZenQ is closed")
94-
}
95-
91+
data, _ = zq.Read()
92+
validatePayload(data)
9693
}
9794
}
9895

zenq.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ type (
7575
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
7676
globalState uint32
7777
indexMask uint32
78-
strideLength uintptr
78+
strideLength uint16
7979
contents unsafe.Pointer
8080
// memory pool refs for storing and leasing parking spots for goroutines
8181
alloc func() any
8282
free func(any)
83-
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil)) - unsafe.Sizeof(uintptr(0))]byte
83+
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 4*unsafe.Sizeof(unsafe.Pointer(nil))]byte
8484
selectFactory
8585
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
8686
}
@@ -104,7 +104,7 @@ func New[T any](size uint32) *ZenQ[T] {
104104
contents[idx].writeParker = NewThreadParker[T](unsafe.Pointer(n))
105105
}
106106
zenq := &ZenQ[T]{
107-
strideLength: unsafe.Sizeof(slot[T]{}),
107+
strideLength: uint16(unsafe.Sizeof(slot[T]{})),
108108
contents: unsafe.Pointer(&contents[0]),
109109
alloc: parkPool.Get,
110110
free: parkPool.Put,
@@ -150,7 +150,7 @@ direct_send:
150150
goto direct_send
151151
}
152152

153-
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))
153+
slot := (*slot[T])(unsafe.Pointer(uintptr((atomic.AddUint32(&self.writerIndex, 1)&self.indexMask))*uintptr(self.strideLength) + uintptr(self.contents)))
154154

155155
// CAS -> change slot_state to busy if slot_state == empty
156156
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {
@@ -176,7 +176,7 @@ direct_send:
176176

177177
// Read reads a value from the queue, you can once read once per object
178178
func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
179-
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))
179+
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*uintptr(self.strideLength) + uintptr(self.contents)))
180180

181181
// CAS -> change slot_state to busy if slot_state == committed
182182
for !atomic.CompareAndSwapUint32(&slot.state, SlotCommitted, SlotBusy) {
@@ -225,7 +225,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
225225
alreadyClosedForWrites = true
226226
return
227227
}
228-
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))
228+
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*uintptr(self.strideLength) + uintptr(self.contents)))
229229

230230
// CAS -> change slot_state to busy if slot_state == empty
231231
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {

0 commit comments

Comments
 (0)