Skip to content

Commit 8ab38be

Browse files
committed
support arm64
1 parent a55c797 commit 8ab38be

File tree

6 files changed

+49
-16
lines changed

6 files changed

+49
-16
lines changed

buffer_manager.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ type bufferList struct {
8787
// it points to the last free buffer, whose offset in bufferRegion
8888
tail *uint32
8989
capPerBuffer *uint32
90-
pushCount *uint64
91-
popCount *uint64
90+
counter *int32
9291
//underlying memory
9392
bufferRegion []byte
9493
bufferRegionOffsetInShm uint32
@@ -360,8 +359,7 @@ func createFreeBufferList(bufferNum, capPerBuffer uint32, mem []byte, offsetInMe
360359
head: (*uint32)(unsafe.Pointer(&mem[offsetInMem+8])),
361360
tail: (*uint32)(unsafe.Pointer(&mem[offsetInMem+12])),
362361
capPerBuffer: (*uint32)(unsafe.Pointer(&mem[offsetInMem+16])),
363-
pushCount: (*uint64)(unsafe.Pointer(&mem[offsetInMem+20])),
364-
popCount: (*uint64)(unsafe.Pointer(&mem[offsetInMem+28])),
362+
counter: (*int32)(unsafe.Pointer(&mem[offsetInMem+20])),
365363
bufferRegion: mem[offsetInMem+bufferListHeaderSize : offsetInMem+atLeastSize],
366364
bufferRegionOffsetInShm: offsetInMem + bufferListHeaderSize,
367365
offsetInShm: offsetInMem,
@@ -371,8 +369,7 @@ func createFreeBufferList(bufferNum, capPerBuffer uint32, mem []byte, offsetInMe
371369
*b.head = 0
372370
*b.tail = (bufferNum - 1) * (capPerBuffer + bufferHeaderSize)
373371
*b.capPerBuffer = capPerBuffer
374-
*b.pushCount = 0
375-
*b.popCount = 0
372+
*b.counter = 0
376373
internalLogger.infof("createFreeBufferList bufferNum:%d capPerBuffer:%d offsetInMem:%d needSize:%d bufferRegionLen:%d",
377374
bufferNum, capPerBuffer, offsetInMem, atLeastSize, len(b.bufferRegion))
378375

@@ -404,8 +401,7 @@ func mappingFreeBufferList(mem []byte, offset uint32) (*bufferList, error) {
404401
head: (*uint32)(unsafe.Pointer(&mem[offset+8])),
405402
tail: (*uint32)(unsafe.Pointer(&mem[offset+12])),
406403
capPerBuffer: (*uint32)(unsafe.Pointer(&mem[offset+16])),
407-
pushCount: (*uint64)(unsafe.Pointer(&mem[offset+20])),
408-
popCount: (*uint64)(unsafe.Pointer(&mem[offset+28])),
404+
counter: (*int32)(unsafe.Pointer(&mem[offset+24])),
409405
offsetInShm: offset,
410406
}
411407
needSize := countBufferListMemSize(*b.cap, *b.capPerBuffer)
@@ -433,7 +429,7 @@ func (b *bufferList) pop() (*bufferSlice, error) {
433429
h := bufferHeader(b.bufferRegion[oldHead : oldHead+bufferHeaderSize])
434430
h.clearFlag()
435431
h.setInUsed()
436-
atomic.AddUint64(b.popCount, 1)
432+
atomic.AddInt32(b.counter, 1)
437433
return newBufferSlice(h,
438434
b.bufferRegion[oldHead+bufferHeaderSize:oldHead+bufferHeaderSize+*b.capPerBuffer],
439435
oldHead+b.bufferRegionOffsetInShm, true), nil
@@ -459,7 +455,7 @@ func (b *bufferList) push(buffer *bufferSlice) {
459455
if atomic.CompareAndSwapUint32(b.tail, oldTail, newTail) {
460456
bufferHeader(b.bufferRegion[oldTail : oldTail+bufferHeaderSize]).linkNext(newTail)
461457
atomic.AddInt32(b.size, 1)
462-
atomic.AddUint64(b.pushCount, 1)
458+
atomic.AddInt32(b.counter, -1)
463459
return
464460
}
465461
}
@@ -610,7 +606,7 @@ func (b *bufferManager) checkBufferReturned() bool {
610606
if uint32(atomic.LoadInt32(l.size)) != atomic.LoadUint32(l.cap) {
611607
return false
612608
}
613-
if atomic.LoadUint64(l.pushCount) != atomic.LoadUint64(l.popCount) {
609+
if atomic.LoadInt32(l.counter) != 0 {
614610
return false
615611
}
616612
}

config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,19 @@ func VerifyConfig(config *Config) error {
111111
return fmt.Errorf("BufferSliceSizes's Size:%d couldn't greater than ShareMemoryBufferCap:%d",
112112
pair.Size, config.ShareMemoryBufferCap)
113113
}
114+
115+
if isArmArch() && pair.Size%4 != 0 {
116+
return fmt.Errorf("the SizePercentPair.Size must be a multiple of 4")
117+
}
114118
}
115119
if sum != 100 {
116120
return errors.New("the sum of BufferSliceSizes's Percent should be 100")
117121
}
118122

123+
if isArmArch() && config.QueueCap%8 != 0 {
124+
return fmt.Errorf("the QueueCap must be a multiple of 8")
125+
}
126+
119127
if config.ShareMemoryPathPrefix == "" || config.QueuePath == "" {
120128
return errors.New("buffer path or queue path could not be nil")
121129
}
@@ -124,5 +132,9 @@ func VerifyConfig(config *Config) error {
124132
return ErrOSNonSupported
125133
}
126134

135+
if runtime.GOARCH != "amd64" && runtime.GOARCH != "arm64" {
136+
return ErrArchNonSupported
137+
}
138+
127139
return nil
128140
}

errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ var (
6767
//ErrOSNonSupported means that shmipc couldn't work in current OS. (only support Linux now)
6868
ErrOSNonSupported = errors.New("shmipc just support linux OS now")
6969

70+
//ErrArchNonSupported means that shmipc only support amd64 and arm64
71+
ErrArchNonSupported = errors.New("shmipc just support amd64 or arm64 arch")
72+
7073
//ErrHotRestartInProgress was returned by Listener.HotRestart when the Session had under the hot restart state
7174
ErrHotRestartInProgress = errors.New("hot restart in progress, try again later")
7275

queue.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ func mappingQueueManagerMemfd(queuePathName string, memFd int) (*queueManager, e
126126
}
127127

128128
mappingSize := int(fileInfo.Size)
129+
//a queueManager have two queue, a queue's head and tail should align to 8 byte boundary
130+
if isArmArch() && mappingSize%16 != 0 {
131+
return nil, fmt.Errorf("the memory size of queue should be a multiple of 16")
132+
}
129133
mem, err := syscall.Mmap(memFd, 0, mappingSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
130134
if err != nil {
131135
return nil, err
@@ -152,6 +156,10 @@ func mappingQueueManager(shmPath string) (*queueManager, error) {
152156
}
153157
mappingSize := int(fileInfo.Size())
154158

159+
//a queueManager have two queue, a queue's head and tail should align to 8 byte boundary
160+
if isArmArch() && mappingSize%16 != 0 {
161+
return nil, fmt.Errorf("the memory size of queue should be a multiple of 16")
162+
}
155163
mem, err := syscall.Mmap(int(f.Fd()), 0, mappingSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
156164
if err != nil {
157165
return nil, err
@@ -181,6 +189,16 @@ func mappingQueueFromBytes(data []byte) *queue {
181189
cap := *(*uint32)(unsafe.Pointer(&data[0]))
182190
queueStartOffset := queueHeaderLength
183191
queueEndOffset := queueHeaderLength + cap*queueElementLen
192+
if isArmArch() {
193+
// align 8 byte boundary for head and tail
194+
return &queue{
195+
cap: int64(cap),
196+
workingFlag: (*uint32)(unsafe.Pointer(&data[4])),
197+
head: (*int64)(unsafe.Pointer(&data[8])),
198+
tail: (*int64)(unsafe.Pointer(&data[16])),
199+
queueBytesOnMemory: data[queueStartOffset:queueEndOffset],
200+
}
201+
}
184202
return &queue{
185203
cap: int64(cap),
186204
head: (*int64)(unsafe.Pointer(&data[4])),

stream_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func newClientServerWithNoCheck(conf *Config) (client *Session, server *Session)
5454
// Close 94.4%
5555
func TestStream_Close(t *testing.T) {
5656
c := testConf()
57-
c.QueueCap = 1
57+
c.QueueCap = 8
5858
client, server := newClientServerWithNoCheck(c)
5959
defer server.Close()
6060
defer client.Close()
@@ -312,14 +312,14 @@ func TestStream_HalfClose(t *testing.T) {
312312

313313
func TestStream_SendQueueFull(t *testing.T) {
314314
conf := testConf()
315-
conf.QueueCap = 1 // can only contain 1 queue elem(12B)
315+
conf.QueueCap = 8 // can only contain 1 queue elem(12B)
316316
client, server := testClientServerConfig(conf)
317317
defer client.Close()
318318
defer server.Close()
319319

320320
done := make(chan struct{})
321321
dataSize := 10
322-
mockDataLength := 50
322+
mockDataLength := 500
323323
mockData := make([][]byte, mockDataLength)
324324
for i := range mockData {
325325
mockData[i] = make([]byte, dataSize)
@@ -367,13 +367,13 @@ func TestStream_SendQueueFull(t *testing.T) {
367367

368368
func TestStream_SendQueueFullTimeout(t *testing.T) {
369369
conf := testConf()
370-
conf.QueueCap = 1 // can only contain 1 queue elem(12B)
370+
conf.QueueCap = 8 // can only contain 1 queue elem(12B)
371371
client, server := testClientServerConfig(conf)
372372
defer client.Close()
373373
defer server.Close()
374374

375375
dataSize := 10
376-
mockDataLength := 5
376+
mockDataLength := 500
377377
mockData := make([][]byte, mockDataLength)
378378
for i := range mockData {
379379
mockData[i] = make([]byte, dataSize)

util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,7 @@ func safeRemoveUdsFile(filename string) bool {
133133

134134
return true
135135
}
136+
137+
func isArmArch() bool {
138+
return runtime.GOARCH == "arm" || runtime.GOARCH == "arm64"
139+
}

0 commit comments

Comments
 (0)