diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c78f8fbbca1..0c7dbf78367 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -1218,14 +1218,52 @@ public void asyncCreateLedgerAdv(final long ledgerId,
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
+ asyncOpenLedger(lId, digestType, passwd, cb, ctx, false);
+ }
+
+ /**
+ * Open existing ledger asynchronously for reading.
+ *
+ * <p>Opening a ledger with this method invokes fencing and recovery on the ledger
+ * if the ledger has not been closed. Fencing will block all other clients from
+ * writing to the ledger. Recovery will make sure that the ledger is closed
+ * before reading from it.
+ *
+ * <p>Recovery also makes sure that any entries which reached one bookie, but not a
+ * quorum, will be replicated to a quorum of bookies. This occurs in cases where
+ * the writer of a ledger crashes after sending a write request to one bookie but
+ * before being able to send it to the rest of the bookies in the quorum.
+ *
+ * <p>If the ledger is already closed, neither fencing nor recovery will be applied.
+ *
+ * @see LedgerHandle#asyncClose
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param ctx
+ * optional control object
+ * @param keepUpdateMetadata
+ * whether to keep the ledger metadata updated if the auto-recovery component modifies the ledger's ensemble
+ */
+ public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
+ final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) {
closeLock.readLock().lock();
try {
if (closed) {
cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
- new LedgerOpenOp(BookKeeper.this, clientStats,
- lId, digestType, passwd, cb, ctx).initiate();
+ LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats,
+ lId, digestType, passwd, cb, ctx);
+ if (keepUpdateMetadata) {
+ ledgerOpenOp.initiateWithKeepUpdateMetadata();
+ } else {
+ ledgerOpenOp.initiate();
+ }
} finally {
closeLock.readLock().unlock();
}
@@ -1293,13 +1331,36 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp
*/
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd)
throws BKException, InterruptedException {
+ return openLedger(lId, digestType, passwd, false);
+ }
+
+
+ /**
+ * Synchronous open ledger call.
+ *
+ * @see #asyncOpenLedger
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ *
+ * @param keepUpdateMetadata
+ * whether to keep the ledger metadata updated if the auto-recovery component modifies the ledger's ensemble
+ * @return a handle to the open ledger
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata)
+ throws BKException, InterruptedException {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
SyncOpenCallback result = new SyncOpenCallback(future);
/*
* Calls async open ledger
*/
- asyncOpenLedger(lId, digestType, passwd, result, null);
+ asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata);
return SyncCallbackUtils.waitForResult(future);
}
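
For context, here is a minimal client-side sketch of how the new overload introduced above might be used. It is not part of the patch: the class name, metadata service URI, ledger id, digest type and password below are placeholders, and the read path simply mirrors the API exercised by the test added further down.

import java.nio.charset.StandardCharsets;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.conf.ClientConfiguration;

public class KeepUpdateMetadataExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings and ledger id; adjust for a real cluster.
        ClientConfiguration conf = new ClientConfiguration()
                .setMetadataServiceUri("zk+hierarchical://localhost:2181/ledgers");
        long ledgerId = 42L;

        try (BookKeeper bk = new BookKeeper(conf)) {
            // keepUpdateMetadata = true: the read-only handle keeps following metadata changes,
            // so an ensemble rewritten by auto-recovery is picked up by the open handle.
            LedgerHandle readOnlyLh = bk.openLedger(
                    ledgerId, DigestType.MAC, "admin".getBytes(StandardCharsets.UTF_8), true);
            // Assumes the ledger holds at least one entry.
            LedgerEntries entries = readOnlyLh.read(0, readOnlyLh.getLastAddConfirmed());
            System.out.println(new String(entries.getEntry(0L).getEntryBytes(), StandardCharsets.UTF_8));
            entries.close();
            readOnlyLh.close();
        }
    }
}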
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 943aa8cd2a9..1101a7d4e1a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -57,6 +57,18 @@ class LedgerOpenOp {
ReadOnlyLedgerHandle lh;
final byte[] passwd;
boolean doRecovery = true;
+ // The ledger metadata may be modified even after the ledger has been closed, because the auto-recovery
+ // component may rewrite the ledger's metadata. Keeping the client registered for metadata notifications from
+ // ZK avoids the following issue: an open ledger handle in memory keeps accessing a bookie that has already
+ // been decommissioned. The issue this flag solves happens as follows:
+ // 1. A client service opens a read-only handle on a ledger that has already been closed.
+ // 2. All bookies in the ledger's ensemble are then decommissioned.
+ // 3. The auto-recovery component moves the data to other, live bookies.
+ // 4. The ledger handle in the client's memory keeps connecting to the bookies of the original ensemble, and
+ //    those connections always fail.
+ // To keep the change minimal, a new flag named "keepUpdateMetadata" is introduced; users can call the new
+ // API to get a read-only ledger handle that keeps its metadata up to date automatically.
+ boolean keepUpdateMetadata = false;
boolean administrativeOpen = false;
long startTime;
final OpStatsLogger openOpLogger;
@@ -126,6 +138,15 @@ public void initiateWithoutRecovery() {
initiate();
}
+ /**
+ * Unlike {@link #initiate()}, this method keeps the ledger metadata updated once the auto-recovery
+ * component modifies the ensemble.
+ */
+ public void initiateWithKeepUpdateMetadata() {
+ this.keepUpdateMetadata = true;
+ initiate();
+ }
+
private CompletableFuture<Void> closeLedgerHandleAsync() {
if (lh != null) {
return lh.closeAsync();
@@ -175,8 +196,19 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {
// get the ledger metadata back
try {
+ // The ledger metadata may be modified even after the ledger has been closed, because the auto-recovery
+ // component may rewrite the ledger's metadata. Keeping the client registered for metadata notifications
+ // from ZK avoids the following issue: an open ledger handle in memory keeps accessing a bookie that has
+ // already been decommissioned. The issue this flag solves happens as follows:
+ // 1. A client service opens a read-only handle on a ledger that has already been closed.
+ // 2. All bookies in the ledger's ensemble are then decommissioned.
+ // 3. The auto-recovery component moves the data to other, live bookies.
+ // 4. The ledger handle in the client's memory keeps connecting to the bookies of the original ensemble,
+ //    and those connections always fail.
+ // Therefore, a user who needs the metadata to be updated automatically sets "keepUpdateMetadata"
+ // to "true".
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType,
- passwd, !doRecovery);
+ passwd, !doRecovery || keepUpdateMetadata);
} catch (GeneralSecurityException e) {
LOG.error("Security exception while opening ledger: " + ledgerId, e);
openComplete(BKException.Code.DigestNotInitializedException, null);
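
The functional change in LedgerOpenOp reduces to the last argument passed to the ReadOnlyLedgerHandle constructor. Below is a minimal sketch of the intent, under the assumption that this final boolean controls whether the handle keeps watching metadata updates; the local variable name is illustrative and not part of the patch.

// Assumption: the final ReadOnlyLedgerHandle constructor argument enables metadata watching.
boolean watchMetadata = !doRecovery      // no-recovery opens already watched metadata before this patch
        || keepUpdateMetadata;           // new flag: keep watching even when fencing/recovery is requested
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType,
        passwd, watchMetadata);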
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index 20b5e6a0c61..c412dffc07f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -634,13 +634,13 @@ private Stat watchUrLedgerNode(final String znode,
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted) {
- LOG.info("Received Ledger rereplication completion event :"
- + event.getType());
+ LOG.info("Received Ledger replication completion. event : {}, path: {}, latchCount: {}",
+ event.getType(), event.getPath(), latch.getCount());
latch.countDown();
}
if (event.getType() == EventType.NodeCreated) {
- LOG.info("Received urLedger publishing event :"
- + event.getType());
+ LOG.info("Received urLedger publishing event: {}, path: {}, latchCount: {}",
+ event.getType(), event.getPath(), latch.getCount());
latch.countDown();
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
new file mode 100644
index 00000000000..9c2d221d328
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/FullEnsembleDecommissionedTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test verifying that an open ledger handle keeps working after its full ensemble is decommissioned.
+ */
+public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(FullEnsembleDecommissionedTest.class);
+ private static final byte[] PASSWD = "admin".getBytes();
+ private static final byte[] data = "TESTDATA".getBytes();
+ private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds
+
+ private DigestType digestType;
+ private MetadataClientDriver metadataClientDriver;
+ private LedgerManagerFactory mFactory;
+ private LedgerUnderreplicationManager underReplicationManager;
+ private LedgerManager ledgerManager;
+ private OrderedScheduler scheduler;
+
+ public FullEnsembleDecommissionedTest() throws Exception {
+ super(2);
+
+ baseConf.setLedgerManagerFactoryClassName(
+ "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+ baseConf.setRwRereplicateBackoffMs(500);
+ baseClientConf.setLedgerManagerFactoryClassName(
+ "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ this.digestType = DigestType.MAC;
+ setAutoRecoveryEnabled(true);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+ scheduler = OrderedScheduler.newSchedulerBuilder()
+ .name("test-scheduler")
+ .numThreads(1)
+ .build();
+
+ metadataClientDriver = MetadataDrivers.getClientDriver(
+ URI.create(baseClientConf.getMetadataServiceUri()));
+ metadataClientDriver.initialize(
+ baseClientConf,
+ scheduler,
+ NullStatsLogger.INSTANCE,
+ Optional.empty());
+
+ // initialize urReplicationManager
+ mFactory = metadataClientDriver.getLedgerManagerFactory();
+ underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+ ledgerManager = mFactory.newLedgerManager();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ if (null != underReplicationManager) {
+ underReplicationManager.close();
+ underReplicationManager = null;
+ }
+ if (null != ledgerManager) {
+ ledgerManager.close();
+ ledgerManager = null;
+ }
+ if (null != metadataClientDriver) {
+ metadataClientDriver.close();
+ metadataClientDriver = null;
+ }
+ if (null != scheduler) {
+ scheduler.shutdown();
+ }
+ }
+
+ /**
+ * The purpose of this test:
+ * 1. A client service opens a read-only handle on a ledger that has already been closed.
+ * 2. All bookies in the ledger's ensemble are decommissioned.
+ * 3. The auto-recovery component moves the data to other, live bookies.
+ * 4. Verify: the ledger handle in the client's memory keeps its ensemble set up to date, and a new read
+ * request still works.
+ */
+ @Test
+ public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
+ assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).size() == 2);
+ lh.addEntry(data);
+ lh.close();
List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L);
+ LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true);
+ assertTrue(originalEnsemble.size() == 2);
+
+ startNewBookie();
+ BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
+ killBookie(originalEnsemble.get(0));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId());
+
+ startNewBookie();
+ int newBookieIndex4 = lastBookieIndex();
+ BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
+ killBookie(originalEnsemble.get(1));
+ waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId());
+
+ Awaitility.await().untilAsserted(() -> {
+ LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
+ assertNotNull(ledgerEntries);
+ byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
+ assertEquals(new String(data), new String(entryBytes));
+ ledgerEntries.close();
+ });
+ readonlyLh.close();
+ }
+
+ private void waitAutoRecoveryFinished(long lId, BookieId originalBookie,
+ BookieId newBookie) throws Exception {
+ Awaitility.await().untilAsserted(() -> {
+ LedgerHandle openLedger = bkc.openLedger(lId, digestType, PASSWD);
NavigableMap<Long, ? extends List<BookieId>> map = openLedger.getLedgerMetadata().getAllEnsembles();
+ try {
for (Map.Entry<Long, ? extends List<BookieId>> entry : map.entrySet()) {
+ assertFalse(entry.getValue().contains(originalBookie));
+ assertTrue(entry.getValue().contains(newBookie));
+ }
+ } finally {
+ openLedger.close();
+ }
+ });
+ }
+}