From de163b34fd4ea714b5c5d4c1df0dfe35ea948a8d Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Tue, 17 Dec 2024 14:44:30 -0800 Subject: [PATCH 1/4] Fixed buffered race condition --- core/framing/buffered.go | 65 ++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/core/framing/buffered.go b/core/framing/buffered.go index 62835d1..ab5dda5 100644 --- a/core/framing/buffered.go +++ b/core/framing/buffered.go @@ -12,30 +12,31 @@ import ( // bufferedFrame is basic frame implementation. type bufferedFrame struct { - inner *common.ByteBuff - refs int32 + innerPtr atomic.Pointer[common.ByteBuff] + refs atomic.Int32 } func newBufferedFrame(inner *common.ByteBuff) *bufferedFrame { - return &bufferedFrame{ - inner: inner, - refs: 1, - } + frame := &bufferedFrame{} + frame.innerPtr.Store(inner) + frame.refs.Store(1) + return frame } func (f *bufferedFrame) IncRef() int32 { - return atomic.AddInt32(&f.refs, 1) + return f.refs.Add(1) } func (f *bufferedFrame) RefCnt() int32 { - return atomic.LoadInt32(&f.refs) + return f.refs.Load() } func (f *bufferedFrame) Header() core.FrameHeader { - if f.inner == nil { + inner := f.innerPtr.Load() + if inner == nil { panic("frame has been released!") } - b := f.inner.Bytes() + b := inner.Bytes() _ = b[core.FrameHeaderLen-1] var h core.FrameHeader copy(h[:], b) @@ -43,52 +44,70 @@ func (f *bufferedFrame) Header() core.FrameHeader { } func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool { - if f.inner == nil { + inner := f.innerPtr.Load() + if inner == nil { panic("frame has been released!") } - n := binary.BigEndian.Uint16(f.inner.Bytes()[4:6]) + n := binary.BigEndian.Uint16(inner.Bytes()[4:6]) return core.FrameFlag(n&0x03FF)&flag == flag } func (f *bufferedFrame) StreamID() uint32 { - if f.inner == nil { + inner := f.innerPtr.Load() + if inner == nil { panic("frame has been released!") } - return binary.BigEndian.Uint32(f.inner.Bytes()[:4]) + return binary.BigEndian.Uint32(inner.Bytes()[:4]) } // Release releases resource. func (f *bufferedFrame) Release() { - if f != nil && f.inner != nil && atomic.AddInt32(&f.refs, -1) == 0 { - common.ReturnByteBuff(f.inner) - f.inner = nil + if f == nil { + return + } + refs := f.refs.Add(-1) + if refs > 0 { + return + } + inner := f.innerPtr.Load() + if inner != nil { + swapped := f.innerPtr.CompareAndSwap(inner, nil) + if swapped { + common.ReturnByteBuff(inner) + } } } // Body returns frame body. func (f *bufferedFrame) Body() []byte { - if f.inner == nil { + inner := f.innerPtr.Load() + if inner == nil { return nil } - b := f.inner.Bytes() + b := inner.Bytes() _ = b[core.FrameHeaderLen-1] return b[core.FrameHeaderLen:] } // Len returns length of frame. func (f *bufferedFrame) Len() int { - if f.inner == nil { + inner := f.innerPtr.Load() + if inner == nil { return 0 } - return f.inner.Len() + return inner.Len() } // WriteTo write frame to writer. func (f *bufferedFrame) WriteTo(w io.Writer) (n int64, err error) { - if f == nil || f.inner == nil { + if f == nil { + return + } + inner := f.innerPtr.Load() + if inner == nil { return } - n, err = f.inner.WriteTo(w) + n, err = inner.WriteTo(w) return } From 7faa77bb97c20b05ae0af7e792e83ee5c78d87c3 Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Thu, 26 Dec 2024 12:50:08 -0700 Subject: [PATCH 2/4] Fix go 1.18 compatibility --- core/framing/buffered.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/framing/buffered.go b/core/framing/buffered.go index ab5dda5..42af6a3 100644 --- a/core/framing/buffered.go +++ b/core/framing/buffered.go @@ -3,11 +3,11 @@ package framing import ( "encoding/binary" "io" - "sync/atomic" "github.com/rsocket/rsocket-go/core" "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/internal/u24" + "go.uber.org/atomic" ) // bufferedFrame is basic frame implementation. From d73c16cace9ae1199988c44ca1ed4c1031aa186f Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Thu, 26 Dec 2024 13:05:25 -0700 Subject: [PATCH 3/4] Bump go.uber.org/atomic to v1.10.0 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 77bf617..2d0032b 100644 --- a/go.mod +++ b/go.mod @@ -11,5 +11,5 @@ require ( github.com/pkg/profile v1.5.0 github.com/stretchr/testify v1.7.1 github.com/urfave/cli/v2 v2.3.0 - go.uber.org/atomic v1.7.0 + go.uber.org/atomic v1.10.0 ) diff --git a/go.sum b/go.sum index d81f9a6..db6ef36 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= From 853645519cc896ee166653543d1e73401d86b7de Mon Sep 17 00:00:00 2001 From: Jack Chistyakov Date: Thu, 26 Dec 2024 13:20:15 -0700 Subject: [PATCH 4/4] Use unsafe atomic pointer --- core/framing/buffered.go | 26 ++++++++++++++------------ go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/core/framing/buffered.go b/core/framing/buffered.go index 42af6a3..5a0dd7c 100644 --- a/core/framing/buffered.go +++ b/core/framing/buffered.go @@ -3,22 +3,24 @@ package framing import ( "encoding/binary" "io" + "sync/atomic" + "unsafe" "github.com/rsocket/rsocket-go/core" "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/internal/u24" - "go.uber.org/atomic" + uberatomic "go.uber.org/atomic" ) // bufferedFrame is basic frame implementation. type bufferedFrame struct { - innerPtr atomic.Pointer[common.ByteBuff] - refs atomic.Int32 + innerPtr unsafe.Pointer + refs uberatomic.Int32 } func newBufferedFrame(inner *common.ByteBuff) *bufferedFrame { frame := &bufferedFrame{} - frame.innerPtr.Store(inner) + atomic.StorePointer(&frame.innerPtr, unsafe.Pointer(inner)) frame.refs.Store(1) return frame } @@ -32,7 +34,7 @@ func (f *bufferedFrame) RefCnt() int32 { } func (f *bufferedFrame) Header() core.FrameHeader { - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { panic("frame has been released!") } @@ -44,7 +46,7 @@ func (f *bufferedFrame) Header() core.FrameHeader { } func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool { - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { panic("frame has been released!") } @@ -53,7 +55,7 @@ func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool { } func (f *bufferedFrame) StreamID() uint32 { - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { panic("frame has been released!") } @@ -69,9 +71,9 @@ func (f *bufferedFrame) Release() { if refs > 0 { return } - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner != nil { - swapped := f.innerPtr.CompareAndSwap(inner, nil) + swapped := atomic.CompareAndSwapPointer(&f.innerPtr, unsafe.Pointer(inner), unsafe.Pointer(nil)) if swapped { common.ReturnByteBuff(inner) } @@ -80,7 +82,7 @@ func (f *bufferedFrame) Release() { // Body returns frame body. func (f *bufferedFrame) Body() []byte { - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { return nil } @@ -91,7 +93,7 @@ func (f *bufferedFrame) Body() []byte { // Len returns length of frame. func (f *bufferedFrame) Len() int { - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { return 0 } @@ -103,7 +105,7 @@ func (f *bufferedFrame) WriteTo(w io.Writer) (n int64, err error) { if f == nil { return } - inner := f.innerPtr.Load() + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) if inner == nil { return } diff --git a/go.mod b/go.mod index 2d0032b..77bf617 100644 --- a/go.mod +++ b/go.mod @@ -11,5 +11,5 @@ require ( github.com/pkg/profile v1.5.0 github.com/stretchr/testify v1.7.1 github.com/urfave/cli/v2 v2.3.0 - go.uber.org/atomic v1.10.0 + go.uber.org/atomic v1.7.0 ) diff --git a/go.sum b/go.sum index db6ef36..d81f9a6 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=