Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed buffered race condition #136

Merged
merged 4 commits into from
Jan 5, 2025

Conversation

echistyakov
Copy link
Contributor

@echistyakov echistyakov commented Dec 17, 2024

Fixing data race condition in buffered.go

Motivation:

I discovered this race condition after running a stress test with race flag against the Facebook RSocket Server/Client:

==================
WARNING: DATA RACE
Write at 0x00c0011a2430 by goroutine 3192:
  github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).Release()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/core/framing/buffered.go:64 +0xa6
  github.com/rsocket/rsocket-go/core/framing.(*PayloadFrame).Release()
      <autogenerated>:1 +0x3e
  github.com/rsocket/rsocket-go/internal/common.TryRelease()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/common/common.go:25 +0xba
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).RequestResponse.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:258 +0x7a
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).handleFinally()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:208 +0x1c9
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnComplete()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:190 +0x65
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).Request()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:178 +0x2ae
  github.com/rsocket/rsocket-go/rx/mono.blockSubscriber.OnSubscribe()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/block_subscriber.go:69 +0x1f2
  github.com/rsocket/rsocket-go/rx/mono.(*blockSubscriber).OnSubscribe()
      <autogenerated>:1 +0x84
  github.com/jjeffcaii/reactor-go/mono.(*processorSubscriber).OnSubscribe()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:229 +0x12f
  github.com/jjeffcaii/reactor-go/mono.(*processor).SubscribeWith.func1()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:119 +0x64
  github.com/panjf2000/ants/v2.(*goWorker).run.func1()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:67 +0x130

Previous read at 0x00c0011a2430 by goroutine 1945:
  github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).Release()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/core/framing/buffered.go:62 +0x4d
  github.com/rsocket/rsocket-go/core/framing.(*PayloadFrame).Release()
      <autogenerated>:1 +0x3e
  github.com/rsocket/rsocket-go/internal/common.TryRelease()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/common/common.go:25 +0xba
  github.com/rsocket/rsocket-go/internal/socket.requestResponseCallback.stopWithError()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/callback.go:35 +0x8a
  github.com/rsocket/rsocket-go/internal/socket.(*requestResponseCallback).stopWithError()
      <autogenerated>:1 +0x6d
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).destroyHandler()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:171 +0x121
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).Close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:135 +0x2a8
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).close.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:86 +0x64
  sync.(*Once).doSlow()
      third-party/go/1.23.4/linux_amd64/src/sync/once.go:76 +0xe1
  sync.(*Once).Do()
      third-party/go/1.23.4/linux_amd64/src/sync/once.go:67 +0x44
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:85 +0xb6
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).Close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:81 +0x3c
  github.com/rsocket/rsocket-go/internal/socket.(*simpleClientSocket).Close()
      <autogenerated>:1 +0x3e
  thrift/lib/go/thrift.(*rsocketClient).Close()
      fbcode/thrift/lib/go/thrift/rocket_rsocket_client.go:171 +0x81
  thrift/lib/go/thrift.(*rocketClient).Close()
      fbcode/thrift/lib/go/thrift/rocket_client.go:166 +0x57
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Close()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:109 +0xc5
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Close()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:60 +0x4b
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Close()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:79 +0x46
  thrift/lib/go/thrift/stress.runStressTest.func4.deferwrap1()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:126 +0x33
  runtime.deferreturn()
      third-party/go/1.23.4/linux_amd64/src/runtime/panic.go:605 +0x5d
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:78 +0xa1

Goroutine 3192 (running) created at:
  github.com/panjf2000/ants/v2.(*goWorker).run()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:48 +0xc4
  github.com/panjf2000/ants/v2.(*Pool).retrieveWorker()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:348 +0x384
  github.com/panjf2000/ants/v2.(*Pool).Submit()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:222 +0x67
  github.com/jjeffcaii/reactor-go/scheduler.(*elasticScheduler).Do()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/scheduler/elastic.go:34 +0x56
  github.com/jjeffcaii/reactor-go/mono.(*processor).SubscribeWith()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/processor.go:118 +0x232
  github.com/jjeffcaii/reactor-go/mono.(*wrapper).SubscribeWith()
      <autogenerated>:1 +0x72
  github.com/rsocket/rsocket-go/rx/mono.toBlock()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:169 +0x141
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).BlockUnsafe()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:114 +0x5a
  github.com/rsocket/rsocket-go/rx/mono.(*oneshotProxy).Block()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_oneshot.go:126 +0x78
  thrift/lib/go/thrift.(*rsocketClient).RequestResponse()
      fbcode/thrift/lib/go/thrift/rocket_rsocket_client.go:148 +0x15d
  thrift/lib/go/thrift.(*rocketClient).Flush()
      fbcode/thrift/lib/go/thrift/rocket_client.go:123 +0x436
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).sendMsg()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:61 +0x2f0
  github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Call()
      fbcode/thrift/lib/go/thrift/types/serial_channel.go:118 +0x14f
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:87 +0x114
  github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Echo()
      fbcode/thrift/lib/go/thrift/dummy/svcs.go:95 +0x6c
  thrift/lib/go/thrift/stress.runStressTest.func4()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:127 +0x2cd
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:78 +0xa1

Goroutine 1945 (finished) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:75 +0x15c
  thrift/lib/go/thrift/stress.runStressTest()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:151 +0x77e
  thrift/lib/go/thrift/stress.TestServerStress.func1()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:45 +0x2b
  testing.tRunner()
      third-party/go/1.23.4/linux_amd64/src/testing/testing.go:1690 +0x226
  testing.(*T).Run.gowrap1()
      third-party/go/1.23.4/linux_amd64/src/testing/testing.go:1743 +0x44
==================

It is possible for two separate goroutines to race on the inner field which can result in the following issues:

  1. A nil value of buffer being placed backed into the global pool (which later could result in a nil dereference panic).
  2. Buffer not being returned back to the global pool (which would make the implementation a little bit slower - because a buffer would be lost from the global pool forever and garbage collected).

Modifications:

To fix the issue - I made the inner field (the buffer) always accessed via atomic operations (atomic Store/Load/CompareAndSwap) to ensure that reads/writes to the inner field are atomic.

Result:

With this fix - the data race goes away.

Copy link
Member

@jjeffcaii jjeffcaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ensure codes be compatible with go1.18, thanks.

Copy link
Member

@jjeffcaii jjeffcaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jjeffcaii jjeffcaii merged commit 099cb5b into rsocket:master Jan 5, 2025
2 checks passed
@echistyakov echistyakov deleted the fix-buffered-race-condition branch January 6, 2025 17:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants