From aedfd66b88f1d0a6a00345c5972e4423c5e82a19 Mon Sep 17 00:00:00 2001 From: "b.tian" Date: Wed, 17 Jan 2024 11:33:50 +0800 Subject: [PATCH 1/2] fix prewrite & commit log info to debug(#776) --- .../java/org/tikv/txn/TwoPhaseCommitter.java | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java index 550b7de47ac..b91a103b118 100644 --- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java +++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java @@ -227,8 +227,9 @@ private void doCommitPrimaryKeyWithRetry(BackOffer backOffer, ByteString key, lo this.doCommitPrimaryKeyWithRetry(backOffer, key, commitTs); } } - - LOG.info("commit primary key {} successfully", KeyUtils.formatBytes(key)); + if (commitResult.isSuccess() && LOG.isDebugEnabled()) { + LOG.debug("commit primary key {} successfully", KeyUtils.formatBytes(key)); + } } /** @@ -397,11 +398,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry( BatchKeys batchKeys, Map mutations) throws TiBatchWriteException { - LOG.debug( - "start prewrite secondary key, row={}, size={}KB, regionId={}", - batchKeys.getKeys().size(), - batchKeys.getSizeInKB(), - batchKeys.getRegion().getId()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "start prewrite secondary key, row={}, size={}KB, regionId={}", + batchKeys.getKeys().size(), + batchKeys.getSizeInKB(), + batchKeys.getRegion().getId()); + } List keyList = batchKeys.getKeys(); int batchSize = keyList.size(); @@ -445,11 +448,13 @@ private void doPrewriteSecondaryKeySingleBatchWithRetry( throw new TiBatchWriteException(errorMsg, e); } } - LOG.debug( - "prewrite secondary key successfully, row={}, size={}KB, regionId={}", - batchKeys.getKeys().size(), - batchKeys.getSizeInKB(), - batchKeys.getRegion().getId()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "prewrite secondary key successfully, row={}, size={}KB, regionId={}", + batchKeys.getKeys().size(), + batchKeys.getSizeInKB(), + batchKeys.getRegion().getId()); + } } private void appendBatchBySize( @@ -592,11 +597,13 @@ private void doCommitSecondaryKeysWithRetry( private void doCommitSecondaryKeySingleBatchWithRetry( BackOffer backOffer, BatchKeys batchKeys, long commitTs) throws TiBatchWriteException { - LOG.info( - "start commit secondary key, row={}, size={}KB, regionId={}", - batchKeys.getKeys().size(), - batchKeys.getSizeInKB(), - batchKeys.getRegion().getId()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "start commit secondary key, row={}, size={}KB, regionId={}", + batchKeys.getKeys().size(), + batchKeys.getSizeInKB(), + batchKeys.getRegion().getId()); + } List keysCommit = batchKeys.getKeys(); ByteString[] keys = new ByteString[keysCommit.size()]; keysCommit.toArray(keys); @@ -612,11 +619,13 @@ private void doCommitSecondaryKeySingleBatchWithRetry( LOG.warn(error); throw new TiBatchWriteException("commit secondary key error", commitResult.getException()); } - LOG.info( - "commit {} rows successfully, size={}KB, regionId={}", - batchKeys.getKeys().size(), - batchKeys.getSizeInKB(), - batchKeys.getRegion().getId()); + if (commitResult.isSuccess() && LOG.isDebugEnabled()) { + LOG.debug( + "commit {} rows successfully, size={}KB, regionId={}", + batchKeys.getKeys().size(), + batchKeys.getSizeInKB(), + batchKeys.getRegion().getId()); + } } private GroupKeyResult groupKeysByRegion(ByteString[] keys, int size, BackOffer backOffer) From 2be0907330136043cd62d27df1bbac7edb41679d Mon Sep 17 00:00:00 2001 From: "b.tian" Date: Wed, 17 Jan 2024 11:36:58 +0800 Subject: [PATCH 2/2] fix KeyException no retry & no detail error msg(#778) --- pom.xml | 12 ++ .../tikv/common/exception/KeyException.java | 2 +- .../tikv/common/region/RegionStoreClient.java | 2 +- src/main/java/org/tikv/txn/TxnKVClient.java | 2 - .../org/tikv/txn/TwoPhaseCommitterTest.java | 189 ++++++++++++++++++ 5 files changed, 203 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index dd34f060b3f..1ba5ddf30cb 100644 --- a/pom.xml +++ b/pom.xml @@ -265,6 +265,18 @@ simpleclient_pushgateway 0.10.0 + + org.powermock + powermock-module-junit4 + 2.0.2 + test + + + org.powermock + powermock-api-mockito2 + 2.0.2 + test + diff --git a/src/main/java/org/tikv/common/exception/KeyException.java b/src/main/java/org/tikv/common/exception/KeyException.java index 22ddda982b9..048ac3b35fe 100644 --- a/src/main/java/org/tikv/common/exception/KeyException.java +++ b/src/main/java/org/tikv/common/exception/KeyException.java @@ -31,7 +31,7 @@ public KeyException(String errMsg) { } public KeyException(Kvrpcpb.KeyError keyErr) { - super("Key exception occurred"); + super("Key exception occurred " + keyErr.toString()); this.keyErr = keyErr; } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 22607b2bdb1..3228650c0ff 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -519,7 +519,7 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp, lo Lock lock = new Lock(err.getLocked(), codec); locks.add(lock); } else { - throw new KeyException(err.toString()); + throw new KeyException(err); } } if (isSuccess) { diff --git a/src/main/java/org/tikv/txn/TxnKVClient.java b/src/main/java/org/tikv/txn/TxnKVClient.java index 7806c56496e..9c5f4c9da3a 100644 --- a/src/main/java/org/tikv/txn/TxnKVClient.java +++ b/src/main/java/org/tikv/txn/TxnKVClient.java @@ -27,7 +27,6 @@ import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.TiConfiguration; import org.tikv.common.exception.GrpcException; -import org.tikv.common.exception.KeyException; import org.tikv.common.exception.RegionException; import org.tikv.common.exception.TiClientInternalException; import org.tikv.common.exception.TiKVException; @@ -181,7 +180,6 @@ public ClientRPCResult commit( // TODO: check this logic to see are we satisfied? private boolean retryableException(Exception e) { return e instanceof TiClientInternalException - || e instanceof KeyException || e instanceof RegionException || e instanceof StatusRuntimeException; } diff --git a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java index 02a530ffbc6..ecb150b5673 100644 --- a/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java +++ b/src/test/java/org/tikv/txn/TwoPhaseCommitterTest.java @@ -20,17 +20,40 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.tikv.BaseTxnKVTest; +import org.tikv.common.BytePairWrapper; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.exception.KeyException; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +@RunWith(PowerMockRunner.class) +@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class}) +@PowerMockIgnore({"javax.net.ssl.*"}) public class TwoPhaseCommitterTest extends BaseTxnKVTest { + + private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class); private static final int WRITE_BUFFER_SIZE = 32 * 1024; private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024; private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000; @@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception { executorService)) {} Assert.assertTrue(executorService.isShutdown()); } + + @Test + public void prewriteWriteConflictFastFailTest() throws Exception { + + int WRITE_BUFFER_SIZE = 32; + String primaryKey = RandomStringUtils.randomAlphabetic(3); + AtomicLong failCount = new AtomicLong(); + ExecutorService executorService = + Executors.newFixedThreadPool( + WRITE_BUFFER_SIZE, + new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + CountDownLatch latch = new CountDownLatch(2); + int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + + Thread.sleep(10); + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + latch.await(); + Assert.assertEquals(1, failCount.get()); + } + + @Test + public void prewriteWriteConflictLongNoFailTest() throws Exception { + + int WRITE_BUFFER_SIZE = 32; + String primaryKey = RandomStringUtils.randomAlphabetic(3); + AtomicLong failCount = new AtomicLong(); + ExecutorService executorService = + Executors.newFixedThreadPool( + WRITE_BUFFER_SIZE, + new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + CountDownLatch latch = new CountDownLatch(2); + int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; + + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + try { + session = PowerMockito.spy(session); + TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient()); + PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class)) + .thenReturn(true); + PowerMockito.doReturn(kvClient).when(session).createTxnClient(); + + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + + Thread.sleep(10); + new Thread( + () -> { + long startTS = session.getTimestamp().getVersion(); + try { + TwoPhaseCommitter twoPhaseCommitter = + new TwoPhaseCommitter( + session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); + List pairs = + Arrays.asList( + new BytePairWrapper( + primaryKey.getBytes(StandardCharsets.UTF_8), + primaryKey.getBytes(StandardCharsets.UTF_8))); + twoPhaseCommitter.prewriteSecondaryKeys( + primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); + + long commitTs = session.getTimestamp().getVersion(); + + twoPhaseCommitter.commitPrimaryKey( + backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); + } catch (Exception e) { + logger.error(e.getMessage(), e); + failCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }) + .start(); + latch.await(); + Assert.assertEquals(1, failCount.get()); + } }