From 0b6c8a0b8a6c3197f5200999b904efe1c515750c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 31 May 2024 16:35:56 +0800 Subject: [PATCH 1/3] fix rpc client panic cause by concurrent close Signed-off-by: crazycs520 --- internal/client/client.go | 3 ++- internal/client/client_test.go | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/client/client.go b/internal/client/client.go index 8772cb159..a0ab40c36 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -532,7 +532,8 @@ func (c *RPCClient) closeConns() { if !c.isClosed { c.isClosed = true // close all connections - for _, array := range c.conns { + for addr, array := range c.conns { + delete(c.conns, addr) array.Close() } } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 0ee82bb23..6098829e8 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -1047,3 +1047,23 @@ func TestFastFailWhenNoAvailableConn(t *testing.T) { require.Equal(t, "no available connections", err.Error()) require.Less(t, time.Since(start), timeout) } + +func TestConcurrentCloseConnPanic(t *testing.T) { + client := NewRPCClient() + addr := "127.0.0.1:6379" + _, err := client.getConnArray(addr, true) + assert.Nil(t, err) + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + err := client.Close() + assert.Nil(t, err) + }() + go func() { + defer wg.Done() + err := client.CloseAddr(addr) + assert.Nil(t, err) + }() + wg.Wait() +} From d9158cbdef7defb8f9686fe193b15d5874784abc Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 31 May 2024 16:52:28 +0800 Subject: [PATCH 2/3] address comment Signed-off-by: crazycs520 --- internal/client/client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index a0ab40c36..a63599267 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -532,10 +532,10 @@ func (c *RPCClient) closeConns() { if !c.isClosed { c.isClosed = true // close all connections - for addr, array := range c.conns { - delete(c.conns, addr) + for _, array := range c.conns { array.Close() } + c.conns = nil } c.Unlock() } @@ -861,6 +861,10 @@ func (c *RPCClient) CloseAddr(addr string) error { func (c *RPCClient) CloseAddrVer(addr string, ver uint64) error { c.Lock() + if c.isClosed { + c.Unlock() + return nil + } conn, ok := c.conns[addr] if ok { if conn.ver <= ver { From c86091c28be30d61a7a6173332fca1412d0fd157 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 31 May 2024 16:57:04 +0800 Subject: [PATCH 3/3] refine Signed-off-by: crazycs520 --- internal/client/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/client/client.go b/internal/client/client.go index a63599267..067daefe4 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -535,7 +535,6 @@ func (c *RPCClient) closeConns() { for _, array := range c.conns { array.Close() } - c.conns = nil } c.Unlock() }