Skip to content

Commit 135cc55

Browse files
Handle nil control connection when reconnecting (#142)
1 parent 38957fa commit 135cc55

File tree

7 files changed

+74
-5
lines changed

7 files changed

+74
-5
lines changed

CHANGELOG/CHANGELOG-2.3.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Changelog for the ZDM Proxy, new PRs should update the `unreleased` section.
44

55
When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries.
66

7+
## v2.3.3 - 2025-04-29
8+
9+
* [#142](https://github.com/datastax/zdm-proxy/pull/142): Handle nil control connection when reconnecting
10+
711
## v2.3.2 - 2025-04-14
812

913
* [#139](https://github.com/datastax/zdm-proxy/pull/139): Ignore forwarding CQL requests for DSE Insights Client to target cluster

RELEASE_NOTES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,21 @@ Build artifacts are available at [Docker Hub](https://hub.docker.com/repository/
66

77
For additional details on the changes included in a specific release, see the associated CHANGELOG-x.x.md file.
88

9+
## v2.3.3 - 2025-04-29
10+
11+
Fix race condition that caused a crash while reconnecting the control connection
12+
13+
[Changelog](CHANGELOG/CHANGELOG-2.3.md#v233---2025-04-29)
14+
915
## v2.3.2 - 2025-04-14
1016

1117
Ignore forwarding CQL requests for DSE Insights Client to target cluster
1218

1319
Upgrade software dependencies to resolve vulnerabilities:
1420
- GoLang to 1.24.2
1521

22+
[Changelog](CHANGELOG/CHANGELOG-2.3.md#v232---2025-04-14)
23+
1624
## v2.3.1 - 2024-11-08
1725

1826
Upgrade software dependencies to resolve vulnerabilities:

integration-tests/connect_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,31 @@ func TestGoCqlConnect(t *testing.T) {
4646
require.Equal(t, "fake", iter.Columns()[0].Name)
4747
}
4848

49+
func TestCannotConnectWithoutControlConnection(t *testing.T) {
50+
c := setup.NewTestConfig("", "")
51+
testSetup, err := setup.NewSimulacronTestSetupWithSessionAndNodesAndConfig(t, true, false, 1, c, nil)
52+
require.Nil(t, err)
53+
defer testSetup.Cleanup()
54+
55+
// try to force a scenario where the control connection is reconnecting (cqlConn is nil)
56+
// when a new client handler is being created
57+
// controlConn.Open() triggers a cqlConn.Close() on the existing connection (making it nil) before it opens a new one
58+
go func() {
59+
_, err := testSetup.Proxy.GetOriginControlConn().Open(false, context.Background())
60+
if err != nil {
61+
t.Logf("err while opening cc in the background: %v", err)
62+
}
63+
}()
64+
65+
for i := 0; i < 1000; i++ {
66+
// connect to proxy as a "client"
67+
client := cqlClient.NewCqlClient("127.0.0.1:14002", nil)
68+
conn, err := client.ConnectAndInit(context.Background(), primitive.ProtocolVersion4, 0)
69+
require.Nil(t, err)
70+
_ = conn.Close()
71+
}
72+
}
73+
4974
// Simulacron-based test to make sure that we can handle invalid protocol error and downgrade
5075
// used protocol on control connection. ORIGIN and TARGET are using the same C* version
5176
func TestControlConnectionProtocolVersionNegotiation(t *testing.T) {

proxy/launch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
// TODO: to be managed externally
16-
const ZdmVersionString = "2.3.2"
16+
const ZdmVersionString = "2.3.3"
1717

1818
var displayVersion = flag.Bool("version", false, "display the ZDM proxy version and exit")
1919
var configFile = flag.String("config", "", "specify path to ZDM configuration file")

proxy/pkg/zdmproxy/clienthandler.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,15 @@ func NewClientHandler(
159159
}
160160
}
161161

162+
originCCProtoVer, err := originControlConn.LoadProtoVersion()
163+
if err != nil {
164+
return nil, fmt.Errorf("failed to load protocol version from %v: %w", common.ClusterTypeOrigin, err)
165+
}
166+
targetCCProtoVer, err := targetControlConn.LoadProtoVersion()
167+
if err != nil {
168+
return nil, fmt.Errorf("failed to load protocol version from %v: %w", common.ClusterTypeTarget, err)
169+
}
170+
162171
nodeMetrics, err := metricHandler.GetNodeMetrics(originEndpointId, targetEndpointId, asyncEndpointId)
163172
if err != nil {
164173
return nil, fmt.Errorf("failed to create node metrics: %w", err)
@@ -169,8 +178,6 @@ func NewClientHandler(
169178
requestsDoneCtx, requestsDoneCancelFn := context.WithCancel(context.Background())
170179

171180
// Initialize stream id processors to manage the ids sent to the clusters
172-
originCCProtoVer := originControlConn.cqlConn.GetProtocolVersion()
173-
targetCCProtoVer := targetControlConn.cqlConn.GetProtocolVersion()
174181
// Calculate maximum number of stream IDs. Take the oldest protocol version negotiated between two clusters
175182
// and apply limit defined in proxy configuration. If origin or target cluster are still running protocol V2,
176183
// we will limit maximum number of stream IDs to 127 on both clusters. Logic is based on Java driver version 3.x.

proxy/pkg/zdmproxy/controlconn.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type ControlConn struct {
5555
protocolEventSubscribers map[ProtocolEventObserver]interface{}
5656
authEnabled *atomic.Value
5757
metricsHandler *metrics.MetricHandler
58+
controlConnProtoVersion *atomic.Value
5859
}
5960

6061
const ProxyVirtualRack = "rack0"
@@ -102,6 +103,7 @@ func NewControlConn(ctx context.Context, defaultPort int, connConfig ConnectionC
102103
protocolEventSubscribers: map[ProtocolEventObserver]interface{}{},
103104
authEnabled: authEnabled,
104105
metricsHandler: metricsHandler,
106+
controlConnProtoVersion: &atomic.Value{},
105107
}
106108
}
107109

@@ -378,7 +380,7 @@ func (cc *ControlConn) connAndNegotiateProtoVer(endpoint Endpoint, initialProtoV
378380
cc.connConfig.GetClusterType(), endpoint.GetEndpointIdentifier(), err)
379381
return nil, err
380382
}
381-
newConn := NewCqlConnection(endpoint, tcpConn, cc.username, cc.password, ccReadTimeout, ccWriteTimeout, cc.conf, protoVer)
383+
newConn := NewCqlConnection(cc, endpoint, tcpConn, cc.username, cc.password, ccReadTimeout, ccWriteTimeout, cc.conf, protoVer)
382384
err = newConn.InitializeContext(protoVer, ctx)
383385
var respErr *ResponseError
384386
if err != nil && errors.As(err, &respErr) && respErr.IsProtocolError() && strings.Contains(err.Error(), "Invalid or unsupported protocol version") {
@@ -717,6 +719,26 @@ func (cc *ControlConn) RemoveObserver(observer ProtocolEventObserver) {
717719
delete(cc.protocolEventSubscribers, observer)
718720
}
719721

722+
func (cc *ControlConn) StoreProtoVersion(protoVersion primitive.ProtocolVersion) {
723+
cc.controlConnProtoVersion.Store(protoVersion)
724+
}
725+
726+
func (cc *ControlConn) LoadProtoVersion() (primitive.ProtocolVersion, error) {
727+
var protoVersion primitive.ProtocolVersion
728+
protoVersionAny := cc.controlConnProtoVersion.Load()
729+
if protoVersionAny == nil {
730+
return protoVersion, errors.New("protocol version could not be retrieved")
731+
}
732+
733+
var ok bool
734+
protoVersion, ok = protoVersionAny.(primitive.ProtocolVersion)
735+
if ok {
736+
return protoVersion, nil
737+
}
738+
739+
panic("invalid type for protocol version")
740+
}
741+
720742
func computeAssignedHosts(index int, count int, orderedHosts []*Host) []*Host {
721743
i := 0
722744
assignedHosts := make([]*Host, 0)

proxy/pkg/zdmproxy/cqlconn.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type CqlConnection interface {
4747

4848
// Not thread safe
4949
type cqlConn struct {
50+
controlConn *ControlConn
5051
readTimeout time.Duration
5152
writeTimeout time.Duration
5253
endpoint Endpoint
@@ -82,12 +83,13 @@ func (c *cqlConn) String() string {
8283
}
8384

8485
func NewCqlConnection(
85-
endpoint Endpoint, conn net.Conn,
86+
controlConn *ControlConn, endpoint Endpoint, conn net.Conn,
8687
username string, password string,
8788
readTimeout time.Duration, writeTimeout time.Duration,
8889
conf *config.Config, protoVer primitive.ProtocolVersion) CqlConnection {
8990
ctx, cFn := context.WithCancel(context.Background())
9091
cqlConn := &cqlConn{
92+
controlConn: controlConn,
9193
readTimeout: readTimeout,
9294
writeTimeout: writeTimeout,
9395
endpoint: endpoint,
@@ -257,6 +259,7 @@ func (c *cqlConn) InitializeContext(version primitive.ProtocolVersion, ctx conte
257259
}
258260

259261
c.protocolVersion.Store(version)
262+
c.controlConn.StoreProtoVersion(version)
260263
c.initialized = true
261264
c.authEnabled = authEnabled
262265
return nil

0 commit comments

Comments
 (0)