diff --git a/internal/socket/subscriber_request_response.go b/internal/socket/subscriber_request_response.go index 3cc4d4f..0a60963 100644 --- a/internal/socket/subscriber_request_response.go +++ b/internal/socket/subscriber_request_response.go @@ -37,7 +37,7 @@ type requestResponseSubscriber struct { dc *DuplexConnection sid uint32 receiving fragmentation.HeaderAndPayload - sndCnt int32 + sndCnt atomic.Int32 } func borrowRequestResponseSubscriber(dc *DuplexConnection, sid uint32, receiving fragmentation.HeaderAndPayload) rx.Subscriber { @@ -55,13 +55,13 @@ func returnRequestResponseSubscriber(s rx.Subscriber) { } actual.dc = nil actual.receiving = nil - actual.sndCnt = 0 + actual.sndCnt.Store(0) globalRequestResponseSubscriberPool.put(actual) } func (r *requestResponseSubscriber) OnNext(next payload.Payload) { r.dc.sendPayload(r.sid, next, core.FlagNext|core.FlagComplete) - atomic.AddInt32(&r.sndCnt, 1) + r.sndCnt.Add(1) } func (r *requestResponseSubscriber) OnError(err error) { @@ -73,7 +73,7 @@ func (r *requestResponseSubscriber) OnError(err error) { } func (r *requestResponseSubscriber) OnComplete() { - if atomic.AddInt32(&r.sndCnt, 1) == 1 { + if r.sndCnt.Add(1) == 1 { r.dc.sendPayload(r.sid, payload.Empty(), core.FlagComplete) } r.dc.unregister(r.sid)