Skip to content

Commit

Permalink
Fix server connection listener being collected early (#381)
Browse files Browse the repository at this point in the history
There was no hard reference to the `ServerConnection.Listener` created in `BasicContext` whic caused it to be collected early.

This adds a hard reference inside `BasicContext` to keep it alive. This caused a failure in LeakTest due to a mix of self references from anonymous classes and weak references; though the exact cause was mysterious.

The anonymous classes were for caching and all moved into a new `CacheMap` class and weak references were used when back connections were required to fix the LeakTest and simplify code.
  • Loading branch information
kdubb authored Feb 3, 2019
1 parent c7bf108 commit 4bab8f6
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.impossibl.postgres.types.SharedRegistry;
import com.impossibl.postgres.types.Type;
import com.impossibl.postgres.utils.BlockingReadTimeoutException;
import com.impossibl.postgres.utils.CacheMap;

import static com.impossibl.postgres.jdbc.ErrorUtils.chainWarnings;
import static com.impossibl.postgres.jdbc.ErrorUtils.makeSQLException;
Expand Down Expand Up @@ -118,10 +119,8 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -140,6 +139,7 @@
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
import static java.sql.Statement.RETURN_GENERATED_KEYS;
import static java.util.Arrays.asList;
import static java.util.Collections.synchronizedMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -227,37 +227,19 @@ public void run() {

final int descriptionCacheSize = getSetting(DESCRIPTION_CACHE_SIZE);
if (descriptionCacheSize > 0) {
this.descriptionCache = Collections.synchronizedMap(new LinkedHashMap<StatementCacheKey, StatementDescription>(descriptionCacheSize + 1, 1.1f, true) {
private static final long serialVersionUID = 1L;

@Override
protected boolean removeEldestEntry(Map.Entry<StatementCacheKey, StatementDescription> eldest) {
return size() > descriptionCacheSize;
}
});
this.descriptionCache = synchronizedMap(new CacheMap<>(descriptionCacheSize, 1.1f, true));
}

final int statementCacheSize = getSetting(PREPARED_STATEMENT_CACHE_SIZE);
if (statementCacheSize > 0) {
preparedStatementCache = Collections.synchronizedMap(new LinkedHashMap<StatementCacheKey, PreparedStatementDescription>(statementCacheSize + 1, 1.1f, true) {
private static final long serialVersionUID = 1L;

@Override
protected boolean removeEldestEntry(Map.Entry<StatementCacheKey, PreparedStatementDescription> eldest) {
if (size() > statementCacheSize) {
try {
PGStatement.dispose(PGDirectConnection.this, eldest.getValue().name);
}
catch (SQLException e) {
// Ignore...
}
return true;
}
else {
return false;
}
WeakReference<PGDirectConnection> weakThis = new WeakReference<>(this);
preparedStatementCache = synchronizedMap(new CacheMap<>(statementCacheSize, 1.1f, true, eldest -> {
try {
PGStatement.dispose(weakThis.get(), eldest.getValue().name);
}
catch (SQLException ignored) {
}
});
}));
}

final int statementCacheThreshold = getSetting(PREPARED_STATEMENT_CACHE_THRESHOLD);
Expand All @@ -270,14 +252,7 @@ protected boolean removeEldestEntry(Map.Entry<StatementCacheKey, PreparedStateme
if (sqlCacheSize > 0) {
synchronized (PGDirectConnection.class) {
if (parsedSqlCache == null) {
parsedSqlCache = Collections.synchronizedMap(new LinkedHashMap<String, SQLText>(sqlCacheSize + 1, 1.1f, true) {
private static final long serialVersionUID = 1L;

@Override
protected boolean removeEldestEntry(Map.Entry<String, SQLText> eldest) {
return size() > sqlCacheSize;
}
});
parsedSqlCache = synchronizedMap(new CacheMap<>(sqlCacheSize, 1.1f, true));
}
}
}
Expand All @@ -290,7 +265,7 @@ protected boolean removeEldestEntry(Map.Entry<String, SQLText> eldest) {

this.housekeeper = housekeeper;
if (this.housekeeper != null)
this.cleanupKey = this.housekeeper.add(this, new Cleanup(serverConnection, activeStatements, getSetting(DATABASE_URL)));
this.cleanupKey = this.housekeeper.add(this, new Cleanup(getServerConnection(), activeStatements, getSetting(DATABASE_URL)));
else
this.cleanupKey = null;
}
Expand All @@ -317,7 +292,7 @@ private void applySettings(Settings settings) throws IOException {

public TransactionStatus getTransactionStatus() throws SQLException {
try {
return serverConnection.getTransactionStatus();
return getServerConnection().getTransactionStatus();
}
catch (ClosedChannelException e) {
internalClose();
Expand Down Expand Up @@ -496,8 +471,8 @@ interface QueryResultFunction<T> {
<T> T execute(QueryResultFunction<T> function) throws SQLException {

try {
if (!autoCommit && serverConnection.getTransactionStatus() == Idle) {
serverConnection.getRequestExecutor().lazyExecute("TB");
if (!autoCommit && getTransactionStatus() == Idle) {
getRequestExecutor().lazyExecute("TB");
}

return function.query(networkTimeout);
Expand All @@ -517,7 +492,7 @@ <T> T execute(QueryResultFunction<T> function) throws SQLException {
}
catch (IOException e) {

if (!serverConnection.isConnected()) {
if (!getServerConnection().isConnected()) {
internalClose();
}

Expand All @@ -537,13 +512,13 @@ <T> T executeTimed(Long executionTimeout, QueryResultFunction<T> function) throw
// This ensures we don't cancel a request _after_ this one
// by mistake.

synchronized (serverConnection.getRequestExecutor()) {
synchronized (getRequestExecutor()) {

// Schedule task to run at execution timeout

ExecutionTimerTask task = new CancelRequestTask(serverConnection.getRemoteAddress(), getKeyData());
ExecutionTimerTask task = new CancelRequestTask(getServerConnection().getRemoteAddress(), getKeyData());

ScheduledFuture<?> taskHandle = serverConnection.getIOExecutor().schedule(task, executionTimeout, MILLISECONDS);
ScheduledFuture<?> taskHandle = getServerConnection().getIOExecutor().schedule(task, executionTimeout, MILLISECONDS);

try {

Expand Down Expand Up @@ -721,7 +696,7 @@ protected void connectionClosed() {
*/
@Override
public boolean isServerMinimumVersion(int major, int minor) {
return serverConnection.getServerInfo().getVersion().isMinimum(major, minor);
return getServerConnection().getServerInfo().getVersion().isMinimum(major, minor);
}

@Override
Expand Down Expand Up @@ -1355,7 +1330,7 @@ public NClob createNClob() throws SQLException {

@Override
public boolean isClosed() {
return !serverConnection.isConnected();
return !getServerConnection().isConnected();
}

@Override
Expand All @@ -1375,7 +1350,7 @@ public void abort(Executor executor) {
return;

// Save socket address (as shutdown might erase it)
SocketAddress serverAddress = serverConnection.getRemoteAddress();
SocketAddress serverAddress = getServerConnection().getRemoteAddress();

//Shutdown socket (also guarantees no more commands begin execution)
ChannelFuture shutdown = shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public Type load(String name) throws IOException {
protected Registry registry;
protected Map<String, Class<?>> typeMap;
protected Charset charset;
protected Settings settings;
private TimeZone timeZone;
private ZoneId timeZoneId;
private DateTimeFormat dateFormat;
Expand All @@ -169,8 +170,8 @@ public Type load(String name) throws IOException {
private NumberFormat integerFormatter;
private DecimalFormat decimalFormatter;
private DecimalFormat currencyFormatter;
protected Settings settings;
protected ServerConnection serverConnection;
private ServerConnection serverConnection;
private ServerConnectionListener serverConnectionListener;
private Map<String, QueryDescription> utilQueries;


Expand All @@ -182,7 +183,8 @@ public BasicContext(SocketAddress address, Settings settings) throws IOException
this.dateFormat = new ISODateFormat();
this.timeFormat = new ISOTimeFormat();
this.timestampFormat = new ISOTimestampFormat();
this.serverConnection = ServerConnectionFactory.getDefault().connect(this, address, new ServerConnectionListener());
this.serverConnectionListener = new ServerConnectionListener();
this.serverConnection = ServerConnectionFactory.getDefault().connect(this, address, serverConnectionListener);
this.utilQueries = new HashMap<>();
}

Expand Down Expand Up @@ -210,6 +212,10 @@ public ByteBufAllocator getAllocator() {
return serverConnection.getAllocator();
}

protected ServerConnection getServerConnection() {
return serverConnection;
}

@Override
public Registry getRegistry() {
return registry;
Expand Down
64 changes: 64 additions & 0 deletions driver/src/main/java/com/impossibl/postgres/utils/CacheMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2013, impossibl.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of impossibl.com nor the names of its contributors may
* be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.impossibl.postgres.utils;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class CacheMap<K, V> extends LinkedHashMap<K, V> {

private int maxSize;
private Consumer<Map.Entry<K, V>> evictionHandler;

public CacheMap(int maxSize, float loadFactor, boolean accessOrder) {
super(maxSize + 1, loadFactor, accessOrder);
this.maxSize = maxSize;
}

public CacheMap(int maxSize, float loadFactor, boolean accessOrder, Consumer<Map.Entry<K, V>> evictionHandler) {
super(maxSize + 1, loadFactor, accessOrder);
this.maxSize = maxSize;
this.evictionHandler = evictionHandler;
}

@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxSize) {
if (evictionHandler != null) {
evictionHandler.accept(eldest);
}
return true;
}
else {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
*/
package com.impossibl.postgres.jdbc;

import java.lang.ref.WeakReference;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -54,14 +53,12 @@
@RunWith(JUnit4.class)
public class LeakTest {

WeakReference<Connection> connRef;
Connection conn;
ResourceLeakDetector.Level savedLevel;

@Before
public void before() throws Exception {
conn = TestUtil.openDB();
connRef = new WeakReference<>(conn);
savedLevel = ResourceLeakDetector.getLevel();
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
getHousekeeper().setLogLeakedReferences(false);
Expand Down

0 comments on commit 4bab8f6

Please sign in to comment.