From 4bab8f63d43e398124757ecad9326dfa2e9b5282 Mon Sep 17 00:00:00 2001 From: Kevin Wooten Date: Sun, 3 Feb 2019 16:52:47 -0700 Subject: [PATCH] Fix server connection listener being collected early (#381) There was no hard reference to the `ServerConnection.Listener` created in `BasicContext` whic caused it to be collected early. This adds a hard reference inside `BasicContext` to keep it alive. This caused a failure in LeakTest due to a mix of self references from anonymous classes and weak references; though the exact cause was mysterious. The anonymous classes were for caching and all moved into a new `CacheMap` class and weak references were used when back connections were required to fix the LeakTest and simplify code. --- .../postgres/jdbc/PGDirectConnection.java | 69 ++++++------------- .../postgres/system/BasicContext.java | 12 +++- .../impossibl/postgres/utils/CacheMap.java | 64 +++++++++++++++++ .../com/impossibl/postgres/jdbc/LeakTest.java | 3 - 4 files changed, 95 insertions(+), 53 deletions(-) create mode 100644 driver/src/main/java/com/impossibl/postgres/utils/CacheMap.java diff --git a/driver/src/main/java/com/impossibl/postgres/jdbc/PGDirectConnection.java b/driver/src/main/java/com/impossibl/postgres/jdbc/PGDirectConnection.java index 4082ebb3b..3bc8e77d8 100644 --- a/driver/src/main/java/com/impossibl/postgres/jdbc/PGDirectConnection.java +++ b/driver/src/main/java/com/impossibl/postgres/jdbc/PGDirectConnection.java @@ -49,6 +49,7 @@ import com.impossibl.postgres.types.SharedRegistry; import com.impossibl.postgres.types.Type; import com.impossibl.postgres.utils.BlockingReadTimeoutException; +import com.impossibl.postgres.utils.CacheMap; import static com.impossibl.postgres.jdbc.ErrorUtils.chainWarnings; import static com.impossibl.postgres.jdbc.ErrorUtils.makeSQLException; @@ -118,10 +119,8 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -140,6 +139,7 @@ import static java.sql.ResultSet.TYPE_FORWARD_ONLY; import static java.sql.Statement.RETURN_GENERATED_KEYS; import static java.util.Arrays.asList; +import static java.util.Collections.synchronizedMap; import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -227,37 +227,19 @@ public void run() { final int descriptionCacheSize = getSetting(DESCRIPTION_CACHE_SIZE); if (descriptionCacheSize > 0) { - this.descriptionCache = Collections.synchronizedMap(new LinkedHashMap(descriptionCacheSize + 1, 1.1f, true) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > descriptionCacheSize; - } - }); + this.descriptionCache = synchronizedMap(new CacheMap<>(descriptionCacheSize, 1.1f, true)); } final int statementCacheSize = getSetting(PREPARED_STATEMENT_CACHE_SIZE); if (statementCacheSize > 0) { - preparedStatementCache = Collections.synchronizedMap(new LinkedHashMap(statementCacheSize + 1, 1.1f, true) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - if (size() > statementCacheSize) { - try { - PGStatement.dispose(PGDirectConnection.this, eldest.getValue().name); - } - catch (SQLException e) { - // Ignore... - } - return true; - } - else { - return false; - } + WeakReference weakThis = new WeakReference<>(this); + preparedStatementCache = synchronizedMap(new CacheMap<>(statementCacheSize, 1.1f, true, eldest -> { + try { + PGStatement.dispose(weakThis.get(), eldest.getValue().name); + } + catch (SQLException ignored) { } - }); + })); } final int statementCacheThreshold = getSetting(PREPARED_STATEMENT_CACHE_THRESHOLD); @@ -270,14 +252,7 @@ protected boolean removeEldestEntry(Map.Entry 0) { synchronized (PGDirectConnection.class) { if (parsedSqlCache == null) { - parsedSqlCache = Collections.synchronizedMap(new LinkedHashMap(sqlCacheSize + 1, 1.1f, true) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > sqlCacheSize; - } - }); + parsedSqlCache = synchronizedMap(new CacheMap<>(sqlCacheSize, 1.1f, true)); } } } @@ -290,7 +265,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { this.housekeeper = housekeeper; if (this.housekeeper != null) - this.cleanupKey = this.housekeeper.add(this, new Cleanup(serverConnection, activeStatements, getSetting(DATABASE_URL))); + this.cleanupKey = this.housekeeper.add(this, new Cleanup(getServerConnection(), activeStatements, getSetting(DATABASE_URL))); else this.cleanupKey = null; } @@ -317,7 +292,7 @@ private void applySettings(Settings settings) throws IOException { public TransactionStatus getTransactionStatus() throws SQLException { try { - return serverConnection.getTransactionStatus(); + return getServerConnection().getTransactionStatus(); } catch (ClosedChannelException e) { internalClose(); @@ -496,8 +471,8 @@ interface QueryResultFunction { T execute(QueryResultFunction function) throws SQLException { try { - if (!autoCommit && serverConnection.getTransactionStatus() == Idle) { - serverConnection.getRequestExecutor().lazyExecute("TB"); + if (!autoCommit && getTransactionStatus() == Idle) { + getRequestExecutor().lazyExecute("TB"); } return function.query(networkTimeout); @@ -517,7 +492,7 @@ T execute(QueryResultFunction function) throws SQLException { } catch (IOException e) { - if (!serverConnection.isConnected()) { + if (!getServerConnection().isConnected()) { internalClose(); } @@ -537,13 +512,13 @@ T executeTimed(Long executionTimeout, QueryResultFunction function) throw // This ensures we don't cancel a request _after_ this one // by mistake. - synchronized (serverConnection.getRequestExecutor()) { + synchronized (getRequestExecutor()) { // Schedule task to run at execution timeout - ExecutionTimerTask task = new CancelRequestTask(serverConnection.getRemoteAddress(), getKeyData()); + ExecutionTimerTask task = new CancelRequestTask(getServerConnection().getRemoteAddress(), getKeyData()); - ScheduledFuture taskHandle = serverConnection.getIOExecutor().schedule(task, executionTimeout, MILLISECONDS); + ScheduledFuture taskHandle = getServerConnection().getIOExecutor().schedule(task, executionTimeout, MILLISECONDS); try { @@ -721,7 +696,7 @@ protected void connectionClosed() { */ @Override public boolean isServerMinimumVersion(int major, int minor) { - return serverConnection.getServerInfo().getVersion().isMinimum(major, minor); + return getServerConnection().getServerInfo().getVersion().isMinimum(major, minor); } @Override @@ -1355,7 +1330,7 @@ public NClob createNClob() throws SQLException { @Override public boolean isClosed() { - return !serverConnection.isConnected(); + return !getServerConnection().isConnected(); } @Override @@ -1375,7 +1350,7 @@ public void abort(Executor executor) { return; // Save socket address (as shutdown might erase it) - SocketAddress serverAddress = serverConnection.getRemoteAddress(); + SocketAddress serverAddress = getServerConnection().getRemoteAddress(); //Shutdown socket (also guarantees no more commands begin execution) ChannelFuture shutdown = shutdown(); diff --git a/driver/src/main/java/com/impossibl/postgres/system/BasicContext.java b/driver/src/main/java/com/impossibl/postgres/system/BasicContext.java index 67b7e1b49..bfa31ae24 100644 --- a/driver/src/main/java/com/impossibl/postgres/system/BasicContext.java +++ b/driver/src/main/java/com/impossibl/postgres/system/BasicContext.java @@ -161,6 +161,7 @@ public Type load(String name) throws IOException { protected Registry registry; protected Map> typeMap; protected Charset charset; + protected Settings settings; private TimeZone timeZone; private ZoneId timeZoneId; private DateTimeFormat dateFormat; @@ -169,8 +170,8 @@ public Type load(String name) throws IOException { private NumberFormat integerFormatter; private DecimalFormat decimalFormatter; private DecimalFormat currencyFormatter; - protected Settings settings; - protected ServerConnection serverConnection; + private ServerConnection serverConnection; + private ServerConnectionListener serverConnectionListener; private Map utilQueries; @@ -182,7 +183,8 @@ public BasicContext(SocketAddress address, Settings settings) throws IOException this.dateFormat = new ISODateFormat(); this.timeFormat = new ISOTimeFormat(); this.timestampFormat = new ISOTimestampFormat(); - this.serverConnection = ServerConnectionFactory.getDefault().connect(this, address, new ServerConnectionListener()); + this.serverConnectionListener = new ServerConnectionListener(); + this.serverConnection = ServerConnectionFactory.getDefault().connect(this, address, serverConnectionListener); this.utilQueries = new HashMap<>(); } @@ -210,6 +212,10 @@ public ByteBufAllocator getAllocator() { return serverConnection.getAllocator(); } + protected ServerConnection getServerConnection() { + return serverConnection; + } + @Override public Registry getRegistry() { return registry; diff --git a/driver/src/main/java/com/impossibl/postgres/utils/CacheMap.java b/driver/src/main/java/com/impossibl/postgres/utils/CacheMap.java new file mode 100644 index 000000000..deb10f2ca --- /dev/null +++ b/driver/src/main/java/com/impossibl/postgres/utils/CacheMap.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2013, impossibl.com + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of impossibl.com nor the names of its contributors may + * be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.impossibl.postgres.utils; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Consumer; + +public class CacheMap extends LinkedHashMap { + + private int maxSize; + private Consumer> evictionHandler; + + public CacheMap(int maxSize, float loadFactor, boolean accessOrder) { + super(maxSize + 1, loadFactor, accessOrder); + this.maxSize = maxSize; + } + + public CacheMap(int maxSize, float loadFactor, boolean accessOrder, Consumer> evictionHandler) { + super(maxSize + 1, loadFactor, accessOrder); + this.maxSize = maxSize; + this.evictionHandler = evictionHandler; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + if (size() > maxSize) { + if (evictionHandler != null) { + evictionHandler.accept(eldest); + } + return true; + } + else { + return false; + } + } + +} diff --git a/driver/src/test/java/com/impossibl/postgres/jdbc/LeakTest.java b/driver/src/test/java/com/impossibl/postgres/jdbc/LeakTest.java index ccda34db5..f70c78889 100644 --- a/driver/src/test/java/com/impossibl/postgres/jdbc/LeakTest.java +++ b/driver/src/test/java/com/impossibl/postgres/jdbc/LeakTest.java @@ -28,7 +28,6 @@ */ package com.impossibl.postgres.jdbc; -import java.lang.ref.WeakReference; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -54,14 +53,12 @@ @RunWith(JUnit4.class) public class LeakTest { - WeakReference connRef; Connection conn; ResourceLeakDetector.Level savedLevel; @Before public void before() throws Exception { conn = TestUtil.openDB(); - connRef = new WeakReference<>(conn); savedLevel = ResourceLeakDetector.getLevel(); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); getHousekeeper().setLogLeakedReferences(false);