Skip to content

Commit

Permalink
Fixed buffered race condition (#136)
Browse files Browse the repository at this point in the history
* Fixed buffered race condition

* Fix go 1.18 compatibility

* Bump go.uber.org/atomic to v1.10.0

* Use unsafe atomic pointer
  • Loading branch information
echistyakov authored Jan 5, 2025
1 parent f3f8376 commit 099cb5b
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions core/framing/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,91 +4,112 @@ import (
"encoding/binary"
"io"
"sync/atomic"
"unsafe"

"github.com/rsocket/rsocket-go/core"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/internal/u24"
uberatomic "go.uber.org/atomic"
)

// bufferedFrame is basic frame implementation.
type bufferedFrame struct {
inner *common.ByteBuff
refs int32
innerPtr unsafe.Pointer
refs uberatomic.Int32
}

func newBufferedFrame(inner *common.ByteBuff) *bufferedFrame {
return &bufferedFrame{
inner: inner,
refs: 1,
}
frame := &bufferedFrame{}
atomic.StorePointer(&frame.innerPtr, unsafe.Pointer(inner))
frame.refs.Store(1)
return frame
}

func (f *bufferedFrame) IncRef() int32 {
return atomic.AddInt32(&f.refs, 1)
return f.refs.Add(1)
}

func (f *bufferedFrame) RefCnt() int32 {
return atomic.LoadInt32(&f.refs)
return f.refs.Load()
}

func (f *bufferedFrame) Header() core.FrameHeader {
if f.inner == nil {
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
panic("frame has been released!")
}
b := f.inner.Bytes()
b := inner.Bytes()
_ = b[core.FrameHeaderLen-1]
var h core.FrameHeader
copy(h[:], b)
return h
}

func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool {
if f.inner == nil {
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
panic("frame has been released!")
}
n := binary.BigEndian.Uint16(f.inner.Bytes()[4:6])
n := binary.BigEndian.Uint16(inner.Bytes()[4:6])
return core.FrameFlag(n&0x03FF)&flag == flag
}

func (f *bufferedFrame) StreamID() uint32 {
if f.inner == nil {
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
panic("frame has been released!")
}
return binary.BigEndian.Uint32(f.inner.Bytes()[:4])
return binary.BigEndian.Uint32(inner.Bytes()[:4])
}

// Release releases resource.
func (f *bufferedFrame) Release() {
if f != nil && f.inner != nil && atomic.AddInt32(&f.refs, -1) == 0 {
common.ReturnByteBuff(f.inner)
f.inner = nil
if f == nil {
return
}
refs := f.refs.Add(-1)
if refs > 0 {
return
}
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner != nil {
swapped := atomic.CompareAndSwapPointer(&f.innerPtr, unsafe.Pointer(inner), unsafe.Pointer(nil))
if swapped {
common.ReturnByteBuff(inner)
}
}
}

// Body returns frame body.
func (f *bufferedFrame) Body() []byte {
if f.inner == nil {
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
return nil
}
b := f.inner.Bytes()
b := inner.Bytes()
_ = b[core.FrameHeaderLen-1]
return b[core.FrameHeaderLen:]
}

// Len returns length of frame.
func (f *bufferedFrame) Len() int {
if f.inner == nil {
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
return 0
}
return f.inner.Len()
return inner.Len()
}

// WriteTo write frame to writer.
func (f *bufferedFrame) WriteTo(w io.Writer) (n int64, err error) {
if f == nil || f.inner == nil {
if f == nil {
return
}
inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr))
if inner == nil {
return
}
n, err = f.inner.WriteTo(w)
n, err = inner.WriteTo(w)
return
}

Expand Down

0 comments on commit 099cb5b

Please sign in to comment.