Skip to content

[fix] Failed read entries after multiple decommissioning #4613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1218,14 +1218,52 @@ public void asyncCreateLedgerAdv(final long ledgerId,
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
// Delegates to the extended overload with keepUpdateMetadata=false, preserving this
// method's historical behavior (the handle does not follow later metadata rewrites).
asyncOpenLedger(lId, digestType, passwd, cb, ctx, false);
}

/**
* Open existing ledger asynchronously for reading.
*
* <p>Opening a ledger with this method invokes fencing and recovery on the ledger
* if the ledger has not been closed. Fencing will block all other clients from
* writing to the ledger. Recovery will make sure that the ledger is closed
* before reading from it.
*
* <p>Recovery also makes sure that any entries which reached one bookie, but not a
* quorum, will be replicated to a quorum of bookies. This occurs in cases where
* the writer of a ledger crashes after sending a write request to one bookie but
* before being able to send it to the rest of the bookies in the quorum.
*
* <p>If the ledger is already closed, neither fencing nor recovery will be applied.
*
* @see LedgerHandle#asyncClose
*
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
* @param cb
*          callback invoked when the open operation completes
* @param ctx
*          optional control object
* @param keepUpdateMetadata
*          whether to keep updating the ledger metadata on this handle if the auto-recovery
*          component modifies the ledger's ensemble after the ledger is opened
*/
public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx, boolean keepUpdateMetadata) {
closeLock.readLock().lock();
try {
if (closed) {
cb.openComplete(BKException.Code.ClientClosedException, null, ctx);
return;
}
new LedgerOpenOp(BookKeeper.this, clientStats,
lId, digestType, passwd, cb, ctx).initiate();
LedgerOpenOp ledgerOpenOp = new LedgerOpenOp(BookKeeper.this, clientStats,
lId, digestType, passwd, cb, ctx);
if (keepUpdateMetadata) {
ledgerOpenOp.initiateWithKeepUpdateMetadata();
} else {
ledgerOpenOp.initiate();
}
} finally {
closeLock.readLock().unlock();
}
Expand Down Expand Up @@ -1293,13 +1331,36 @@ public void asyncOpenLedgerNoRecovery(final long lId, final DigestType digestTyp
*/
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd)
throws BKException, InterruptedException {
// Delegates to the extended overload with keepUpdateMetadata=false, preserving this
// method's historical behavior (the handle does not follow later metadata rewrites).
return openLedger(lId, digestType, passwd, false);
}


/**
* Synchronous open ledger call.
*
* @see #asyncOpenLedger
* @param lId
* ledger identifier
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
* password
*
* @param keepUpdateMetadata
*          whether to keep updating the ledger metadata on this handle if the auto-recovery
*          component modifies the ledger's ensemble after the ledger is opened
* @return a handle to the open ledger
* @throws InterruptedException if the calling thread is interrupted while waiting for the open to complete
* @throws BKException if the ledger cannot be opened
*/
public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd, boolean keepUpdateMetadata)
        throws BKException, InterruptedException {
    // Bridge the asynchronous open into a synchronous call: the callback completes the
    // future, and we block on it below.
    CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
    SyncOpenCallback result = new SyncOpenCallback(future);

    // Exactly one asyncOpenLedger call: forward keepUpdateMetadata to the extended overload.
    // (The merged diff text contained both the old 5-arg call and the new 6-arg call; the
    // stale 5-arg call is removed here.)
    asyncOpenLedger(lId, digestType, passwd, result, null, keepUpdateMetadata);

    return SyncCallbackUtils.waitForResult(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ class LedgerOpenOp {
ReadOnlyLedgerHandle lh;
final byte[] passwd;
boolean doRecovery = true;
// The ledger metadata may be modified even after the ledger has been closed, because the auto-recovery
// component may rewrite the ledger's metadata. Keep receiving notifications from ZK to avoid the following
// issue: an open ledger handle in memory still accesses a bookie that has been decommissioned. The issue
// happens as follows:
// 1. A client service opens a read-only ledger handle for a ledger that has already been closed.
// 2. All bookies that host the ledger's data are decommissioned.
// 3. The auto-recovery component moves the data onto other, live bookie instances.
// 4. The ledger handle in client memory keeps connecting to the bookies of the original ensemble, and the
//    connection always fails.
// For a minimal modification, a new flag named "keepUpdateMetadata" is added; users can use the
// new API to create a read-only ledger handle that keeps its metadata up to date automatically.
boolean keepUpdateMetadata = false;
boolean administrativeOpen = false;
long startTime;
final OpStatsLogger openOpLogger;
Expand Down Expand Up @@ -126,6 +138,15 @@ public void initiateWithoutRecovery() {
initiate();
}

/**
 * Unlike {@link #initiate()}, this method keeps the handle's ledger metadata up to date if the
 * auto-recovery component modifies the ensemble after the ledger has been opened.
 */
public void initiateWithKeepUpdateMetadata() {
this.keepUpdateMetadata = true;
initiate();
}

private CompletableFuture<Void> closeLedgerHandleAsync() {
if (lh != null) {
return lh.closeAsync();
Expand Down Expand Up @@ -175,8 +196,19 @@ private void openWithMetadata(Versioned<LedgerMetadata> versionedMetadata) {

// get the ledger metadata back
try {
// The ledger metadata may be modified even after the ledger has been closed, because the
// auto-recovery component may rewrite the ledger's metadata. Keep receiving notifications from
// ZK to avoid the following issue: an open ledger handle in memory still accesses a bookie that
// has been decommissioned. The issue happens as follows:
// 1. A client service opens a read-only ledger handle for a ledger that has already been closed.
// 2. All bookies that host the ledger's data are decommissioned.
// 3. The auto-recovery component moves the data onto other, live bookie instances.
// 4. The ledger handle in client memory keeps connecting to the bookies of the original
//    ensemble, and the connection always fails.
// Therefore, a user who needs the metadata to be updated automatically should set
// "keepUpdateMetadata" to "true".
lh = new ReadOnlyLedgerHandle(bk.getClientCtx(), ledgerId, versionedMetadata, digestType,
passwd, !doRecovery);
passwd, !doRecovery || keepUpdateMetadata);
} catch (GeneralSecurityException e) {
LOG.error("Security exception while opening ledger: " + ledgerId, e);
openComplete(BKException.Code.DigestNotInitializedException, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,13 +634,13 @@ private Stat watchUrLedgerNode(final String znode,
@Override
public void process(WatchedEvent event) {
    // Deletion of the under-replicated-ledger znode signals that re-replication of
    // this ledger has completed.
    if (event.getType() == EventType.NodeDeleted) {
        LOG.info("Received Ledger replication completion. event : {}, path: {}, latchCount: {}",
                event.getType(), event.getPath(), latch.getCount());
        latch.countDown();
    }
    // Creation of the znode signals that the ledger was published as under-replicated.
    // (The merged diff text contained both the old string-concatenation log calls and the
    // new parameterized ones; the stale calls are removed here so each event logs once
    // and counts the latch down once.)
    if (event.getType() == EventType.NodeCreated) {
        LOG.info("Received urLedger publishing event: {}, path: {}, latchCount: {}",
                event.getType(), event.getPath(), latch.getCount());
        latch.countDown();
    }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests verifying the complete decommission flow: after every bookie in a
 * ledger's original ensemble has been replaced, a read-only handle that was opened with
 * {@code keepUpdateMetadata} can still read the ledger's entries.
 */
public class FullEnsembleDecommissionedTest extends BookKeeperClusterTestCase {
    private static final Logger LOG = LoggerFactory
            .getLogger(FullEnsembleDecommissionedTest.class);
    private static final byte[] PASSWD = "admin".getBytes();
    private static final byte[] data = "TESTDATA".getBytes();
    // Grace period (milliseconds) the replication worker waits before re-replicating an open ledger.
    private static final String openLedgerRereplicationGracePeriod = "3000";

    private DigestType digestType;
    private MetadataClientDriver metadataClientDriver;
    private LedgerManagerFactory mFactory;
    private LedgerUnderreplicationManager underReplicationManager;
    private LedgerManager ledgerManager;
    private OrderedScheduler scheduler;

    public FullEnsembleDecommissionedTest() throws Exception {
        // Start with two bookies so a single 2/2 ledger ensemble covers the whole cluster.
        super(2);

        baseConf.setLedgerManagerFactoryClassName(
                "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
        baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
        baseConf.setRwRereplicateBackoffMs(500);
        baseClientConf.setLedgerManagerFactoryClassName(
                "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
        this.digestType = DigestType.MAC;
        setAutoRecoveryEnabled(true);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

        scheduler = OrderedScheduler.newSchedulerBuilder()
                .name("test-scheduler")
                .numThreads(1)
                .build();

        metadataClientDriver = MetadataDrivers.getClientDriver(
                URI.create(baseClientConf.getMetadataServiceUri()));
        metadataClientDriver.initialize(
                baseClientConf,
                scheduler,
                NullStatsLogger.INSTANCE,
                Optional.empty());

        // Initialize the under-replication manager used by the auto-recovery machinery.
        mFactory = metadataClientDriver.getLedgerManagerFactory();
        underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        ledgerManager = mFactory.newLedgerManager();
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();

        if (null != underReplicationManager) {
            underReplicationManager.close();
            underReplicationManager = null;
        }
        if (null != ledgerManager) {
            ledgerManager.close();
            ledgerManager = null;
        }
        if (null != metadataClientDriver) {
            metadataClientDriver.close();
            metadataClientDriver = null;
        }
        if (null != scheduler) {
            scheduler.shutdown();
        }
    }

    /**
     * The purpose of this test:
     * 1. A client service opens a read-only ledger handle for a ledger that has been closed.
     * 2. All bookies that host the ledger are decommissioned.
     * 3. The auto-recovery component moves the data onto other, live bookie instances.
     * 4. Verify: the ledger handle in client memory keeps updating the ledger ensemble set,
     *    and a new read request succeeds.
     */
    @Test
    public void testOpenedLedgerHandleStillWorkAfterDecommissioning() throws Exception {
        LedgerHandle lh = bkc.createLedger(2, 2, digestType, PASSWD);
        assertEquals(2, lh.getLedgerMetadata().getAllEnsembles().get(0L).size());
        lh.addEntry(data);
        lh.close();
        List<BookieId> originalEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(0L);
        // Open with keepUpdateMetadata=true so the handle keeps following metadata changes.
        LedgerHandle readonlyLh = bkc.openLedger(lh.getId(), digestType, PASSWD, true);
        assertEquals(2, originalEnsemble.size());

        // Replace the first bookie of the original ensemble and wait for re-replication.
        startNewBookie();
        BookieServer newBookieServer3 = serverByIndex(lastBookieIndex());
        killBookie(originalEnsemble.get(0));
        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(0), newBookieServer3.getBookieId());

        // Replace the second bookie; after this, no bookie of the original ensemble is alive.
        startNewBookie();
        int newBookieIndex4 = lastBookieIndex();
        BookieServer newBookieServer4 = serverByIndex(newBookieIndex4);
        killBookie(originalEnsemble.get(1));
        waitAutoRecoveryFinished(lh.getId(), originalEnsemble.get(1), newBookieServer4.getBookieId());

        // The already-open read-only handle must eventually read the entry from the new ensemble.
        Awaitility.await().untilAsserted(() -> {
            LedgerEntries ledgerEntries = readonlyLh.read(0, 0);
            assertNotNull(ledgerEntries);
            byte[] entryBytes = ledgerEntries.getEntry(0L).getEntryBytes();
            assertEquals(new String(data), new String(entryBytes));
            ledgerEntries.close();
        });
        readonlyLh.close();
    }

    /**
     * Waits until the ledger's metadata no longer references {@code originalBookie} and every
     * ensemble contains {@code newBookie}, i.e. auto-recovery finished for that replacement.
     */
    private void waitAutoRecoveryFinished(long lId, BookieId originalBookie,
                                          BookieId newBookie) throws Exception {
        Awaitility.await().untilAsserted(() -> {
            LedgerHandle openLedger = bkc.openLedger(lId, digestType, PASSWD);
            NavigableMap<Long, ? extends List<BookieId>> map = openLedger.getLedgerMetadata().getAllEnsembles();
            try {
                for (Map.Entry<Long, ? extends List<BookieId>> entry : map.entrySet()) {
                    assertFalse(entry.getValue().contains(originalBookie));
                    assertTrue(entry.getValue().contains(newBookie));
                }
            } finally {
                openLedger.close();
            }
        });
    }
}
Loading