Skip to content

Commit dd94572

Browse files
authored
Revert "Expire idle connections no longer acquired during lifetime (ClickHouse#945)" (ClickHouse#958)
This reverts commit 0d10a70.
1 parent a0b3148 commit dd94572

7 files changed

+5
-113
lines changed

clickhouse.go

-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
211211
if err != nil {
212212
return nil, err
213213
}
214-
go result.conn.closeAfterMaxLifeTime()
215214
return result.conn, nil
216215
}
217216

conn.go

+5-40
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"log"
2626
"net"
2727
"os"
28-
"sync"
2928
"syscall"
3029
"time"
3130

@@ -129,8 +128,6 @@ type connect struct {
129128
readTimeout time.Duration
130129
blockBufferSize uint8
131130
maxCompressionBuffer int
132-
133-
rwLock sync.Mutex
134131
}
135132

136133
func (c *connect) settings(querySettings Settings) []proto.Setting {
@@ -151,7 +148,8 @@ func (c *connect) settings(querySettings Settings) []proto.Setting {
151148
}
152149

153150
func (c *connect) isBad() bool {
154-
if c.isClosed() {
151+
switch {
152+
case c.closed:
155153
return true
156154
}
157155

@@ -165,48 +163,15 @@ func (c *connect) isBad() bool {
165163
return false
166164
}
167165

168-
// closeAfterMaxLifeTime closes the connection if it has been used for longer than ConnMaxLifeTime
169-
func (c *connect) closeAfterMaxLifeTime() {
170-
t := time.NewTimer(c.opt.ConnMaxLifetime)
171-
defer t.Stop()
172-
173-
// check if connection should be closed after duration of ConnMaxLifeTime
174-
// if connection is closed, return
175-
for {
176-
select {
177-
case <-t.C:
178-
c.close()
179-
return
180-
default:
181-
if c.isClosed() {
182-
return
183-
}
184-
185-
time.Sleep(time.Second)
186-
}
187-
}
188-
}
189-
190-
func (c *connect) isClosed() bool {
191-
c.rwLock.Lock()
192-
defer c.rwLock.Unlock()
193-
194-
return c.closed
195-
}
196-
197166
func (c *connect) close() error {
198-
c.rwLock.Lock()
199-
defer c.rwLock.Unlock()
200-
201167
if c.closed {
202168
return nil
203169
}
204-
205170
c.closed = true
206171
c.buffer = nil
207172
c.reader = nil
208173
if err := c.conn.Close(); err != nil {
209-
c.debugf("[close] %s", err)
174+
return err
210175
}
211176
return nil
212177
}
@@ -271,10 +236,10 @@ func (c *connect) sendData(block *proto.Block, name string) error {
271236
if err := c.flush(); err != nil {
272237
if errors.Is(err, syscall.EPIPE) {
273238
c.debugf("[send data] pipe is broken, closing connection")
274-
c.close()
239+
c.closed = true
275240
} else if errors.Is(err, io.EOF) {
276241
c.debugf("[send data] unexpected EOF, closing connection")
277-
c.close()
242+
c.closed = true
278243
} else {
279244
c.debugf("[send data] unexpected error: %v", err)
280245
}

conn_handshake.go

-6
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ import (
2626
)
2727

2828
func (c *connect) handshake(database, username, password string) error {
29-
c.rwLock.Lock()
30-
defer c.rwLock.Unlock()
31-
3229
defer c.buffer.Reset()
3330
c.debugf("[handshake] -> %s", proto.ClientHandshake{})
3431
// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
@@ -86,9 +83,6 @@ func (c *connect) handshake(database, username, password string) error {
8683
}
8784

8885
func (c *connect) sendAddendum() error {
89-
c.rwLock.Lock()
90-
defer c.rwLock.Unlock()
91-
9286
if c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY {
9387
c.buffer.PutString("") // todo quota key support
9488
}

conn_ping.go

-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ import (
2828
// Connection::ping
2929
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
3030
func (c *connect) ping(ctx context.Context) (err error) {
31-
c.rwLock.Lock()
32-
defer c.rwLock.Unlock()
33-
3431
// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
3532
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
3633
defer c.conn.SetReadDeadline(time.Time{})

conn_process.go

-5
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
6666
return ctx.Err()
6767
default:
6868
}
69-
c.rwLock.Lock()
7069
packet, err := c.reader.ReadByte()
71-
c.rwLock.Unlock()
7270
if err != nil {
7371
return err
7472
}
@@ -84,9 +82,6 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
8482
}
8583

8684
func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error {
87-
c.rwLock.Lock()
88-
defer c.rwLock.Unlock()
89-
9085
switch packet {
9186
case proto.ServerData, proto.ServerTotals, proto.ServerExtremes:
9287
block, err := c.readData(ctx, packet, true)

conn_send_query.go

-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import (
2424
// Connection::sendQuery
2525
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
2626
func (c *connect) sendQuery(body string, o *QueryOptions) error {
27-
c.rwLock.Lock()
28-
defer c.rwLock.Unlock()
29-
3027
c.debugf("[send query] compression=%q %s", c.compression, body)
3128
c.buffer.PutByte(proto.ClientQuery)
3229
q := proto.Query{

tests/conn_test.go

-55
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"fmt"
2424
"os"
2525
"strconv"
26-
"sync"
2726
"testing"
2827
"time"
2928

@@ -306,57 +305,3 @@ func TestEmptyDatabaseConfig(t *testing.T) {
306305
err = anotherConn.Ping(context.Background())
307306
require.NoError(t, err)
308307
}
309-
310-
func TestConnectionExpiresIdleConnection(t *testing.T) {
311-
runInDocker, _ := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
312-
if !runInDocker {
313-
t.Skip("Skip test in cloud environment. This test is not stable in cloud environment, due to race conditions.")
314-
}
315-
316-
// given
317-
ctx := context.Background()
318-
testEnv, err := GetTestEnvironment(testSet)
319-
require.NoError(t, err)
320-
321-
baseConn, err := testClientWithDefaultSettings(testEnv)
322-
require.NoError(t, err)
323-
324-
expectedConnections := getActiveConnections(t, baseConn)
325-
326-
// when the client is configured to expire idle connections after 1/10 of a second
327-
opts := clientOptionsFromEnv(testEnv, clickhouse.Settings{})
328-
opts.MaxIdleConns = 20
329-
opts.MaxOpenConns = 20
330-
opts.ConnMaxLifetime = time.Second / 10
331-
conn, err := clickhouse.Open(&opts)
332-
require.NoError(t, err)
333-
334-
// run 1000 queries in parallel
335-
var wg sync.WaitGroup
336-
const selectToRunAtOnce = 1000
337-
for i := 0; i < selectToRunAtOnce; i++ {
338-
wg.Add(1)
339-
go func() {
340-
defer wg.Done()
341-
r, err := conn.Query(ctx, "SELECT 1")
342-
require.NoError(t, err)
343-
344-
r.Close()
345-
}()
346-
}
347-
wg.Wait()
348-
349-
// then we expect that all connections will be closed when they are idle
350-
// retrying for 10 seconds to make sure that the connections are closed
351-
assert.Eventuallyf(t, func() bool {
352-
return getActiveConnections(t, baseConn) == expectedConnections
353-
}, time.Second*10, opts.ConnMaxLifetime, "expected connections to be reset back to %d", expectedConnections)
354-
}
355-
356-
func getActiveConnections(t *testing.T, client clickhouse.Conn) (conns int64) {
357-
ctx := context.Background()
358-
r := client.QueryRow(ctx, "SELECT sum(value) as conns FROM system.metrics WHERE metric LIKE '%Connection'")
359-
require.NoError(t, r.Err())
360-
require.NoError(t, r.Scan(&conns))
361-
return conns
362-
}

0 commit comments

Comments
 (0)