-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[close #776] Fix prewrite & commit log info to debug(#776) #777
base: master
Are you sure you want to change the base?
Changes from 3 commits
aedfd66
2be0907
6757dce
bc40a34
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ | |
import org.tikv.common.ReadOnlyPDClient; | ||
import org.tikv.common.TiConfiguration; | ||
import org.tikv.common.exception.GrpcException; | ||
import org.tikv.common.exception.KeyException; | ||
import org.tikv.common.exception.RegionException; | ||
import org.tikv.common.exception.TiClientInternalException; | ||
import org.tikv.common.exception.TiKVException; | ||
|
@@ -181,7 +180,6 @@ public ClientRPCResult commit( | |
// TODO: check this logic to see are we satisfied? | ||
private boolean retryableException(Exception e) { | ||
return e instanceof TiClientInternalException | ||
|| e instanceof KeyException | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete this will be a break change. Some KeyException also need the retry, why do you want to exclude this?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok,I review it again |
||
|| e instanceof RegionException | ||
|| e instanceof StatusRuntimeException; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,17 +20,40 @@ | |
import static org.junit.Assert.fail; | ||
|
||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
import org.junit.After; | ||
import org.junit.Assert; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.Mockito; | ||
import org.powermock.api.mockito.PowerMockito; | ||
import org.powermock.core.classloader.annotations.PowerMockIgnore; | ||
import org.powermock.core.classloader.annotations.PrepareForTest; | ||
import org.powermock.modules.junit4.PowerMockRunner; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.tikv.BaseTxnKVTest; | ||
import org.tikv.common.BytePairWrapper; | ||
import org.tikv.common.TiConfiguration; | ||
import org.tikv.common.TiSession; | ||
import org.tikv.common.exception.KeyException; | ||
import org.tikv.common.util.BackOffer; | ||
import org.tikv.common.util.ConcreteBackOffer; | ||
|
||
@RunWith(PowerMockRunner.class) | ||
@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class}) | ||
@PowerMockIgnore({"javax.net.ssl.*"}) | ||
public class TwoPhaseCommitterTest extends BaseTxnKVTest { | ||
|
||
private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class); | ||
private static final int WRITE_BUFFER_SIZE = 32 * 1024; | ||
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024; | ||
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000; | ||
|
@@ -76,4 +99,170 @@ public void autoClosableTest() throws Exception { | |
executorService)) {} | ||
Assert.assertTrue(executorService.isShutdown()); | ||
} | ||
|
||
@Test | ||
public void prewriteWriteConflictFastFailTest() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this test has nothing to do with this pr, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is based on the new constructor method added in this PR #775. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this test for verify write conflict will aways retry ,but need fast fail There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you explain why this test was added? Or what scenes/feature this test for |
||
|
||
int WRITE_BUFFER_SIZE = 32; | ||
String primaryKey = RandomStringUtils.randomAlphabetic(3); | ||
AtomicLong failCount = new AtomicLong(); | ||
ExecutorService executorService = | ||
Executors.newFixedThreadPool( | ||
WRITE_BUFFER_SIZE, | ||
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); | ||
CountDownLatch latch = new CountDownLatch(2); | ||
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; | ||
new Thread( | ||
() -> { | ||
long startTS = session.getTimestamp().getVersion(); | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
try { | ||
TwoPhaseCommitter twoPhaseCommitter = | ||
new TwoPhaseCommitter( | ||
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); | ||
List<BytePairWrapper> pairs = | ||
Arrays.asList( | ||
new BytePairWrapper( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), | ||
primaryKey.getBytes(StandardCharsets.UTF_8))); | ||
twoPhaseCommitter.prewriteSecondaryKeys( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); | ||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); | ||
|
||
long commitTs = session.getTimestamp().getVersion(); | ||
|
||
twoPhaseCommitter.commitPrimaryKey( | ||
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); | ||
} catch (Exception e) { | ||
logger.error(e.getMessage(), e); | ||
failCount.incrementAndGet(); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}) | ||
.start(); | ||
|
||
Thread.sleep(10); | ||
new Thread( | ||
() -> { | ||
long startTS = session.getTimestamp().getVersion(); | ||
try { | ||
TwoPhaseCommitter twoPhaseCommitter = | ||
new TwoPhaseCommitter( | ||
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); | ||
List<BytePairWrapper> pairs = | ||
Arrays.asList( | ||
new BytePairWrapper( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), | ||
primaryKey.getBytes(StandardCharsets.UTF_8))); | ||
twoPhaseCommitter.prewriteSecondaryKeys( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); | ||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); | ||
|
||
long commitTs = session.getTimestamp().getVersion(); | ||
|
||
twoPhaseCommitter.commitPrimaryKey( | ||
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); | ||
} catch (Exception e) { | ||
logger.error(e.getMessage(), e); | ||
failCount.incrementAndGet(); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}) | ||
.start(); | ||
latch.await(); | ||
Assert.assertEquals(1, failCount.get()); | ||
} | ||
|
||
@Test | ||
public void prewriteWriteConflictLongNoFailTest() throws Exception { | ||
|
||
int WRITE_BUFFER_SIZE = 32; | ||
String primaryKey = RandomStringUtils.randomAlphabetic(3); | ||
AtomicLong failCount = new AtomicLong(); | ||
ExecutorService executorService = | ||
Executors.newFixedThreadPool( | ||
WRITE_BUFFER_SIZE, | ||
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); | ||
CountDownLatch latch = new CountDownLatch(2); | ||
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000; | ||
|
||
new Thread( | ||
() -> { | ||
long startTS = session.getTimestamp().getVersion(); | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
try { | ||
session = PowerMockito.spy(session); | ||
TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient()); | ||
PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class)) | ||
.thenReturn(true); | ||
PowerMockito.doReturn(kvClient).when(session).createTxnClient(); | ||
|
||
TwoPhaseCommitter twoPhaseCommitter = | ||
new TwoPhaseCommitter( | ||
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); | ||
List<BytePairWrapper> pairs = | ||
Arrays.asList( | ||
new BytePairWrapper( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), | ||
primaryKey.getBytes(StandardCharsets.UTF_8))); | ||
twoPhaseCommitter.prewriteSecondaryKeys( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); | ||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); | ||
|
||
long commitTs = session.getTimestamp().getVersion(); | ||
|
||
twoPhaseCommitter.commitPrimaryKey( | ||
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); | ||
} catch (Exception e) { | ||
logger.error(e.getMessage(), e); | ||
failCount.incrementAndGet(); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}) | ||
.start(); | ||
|
||
Thread.sleep(10); | ||
new Thread( | ||
() -> { | ||
long startTS = session.getTimestamp().getVersion(); | ||
try { | ||
TwoPhaseCommitter twoPhaseCommitter = | ||
new TwoPhaseCommitter( | ||
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService); | ||
List<BytePairWrapper> pairs = | ||
Arrays.asList( | ||
new BytePairWrapper( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), | ||
primaryKey.getBytes(StandardCharsets.UTF_8))); | ||
twoPhaseCommitter.prewriteSecondaryKeys( | ||
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000); | ||
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000); | ||
|
||
long commitTs = session.getTimestamp().getVersion(); | ||
|
||
twoPhaseCommitter.commitPrimaryKey( | ||
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs); | ||
} catch (Exception e) { | ||
logger.error(e.getMessage(), e); | ||
failCount.incrementAndGet(); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}) | ||
.start(); | ||
latch.await(); | ||
Assert.assertEquals(1, failCount.get()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to print error msg here?
Have you tried
getKeyErr()
? it should return the error and you should be able to get the error msg fromgetKeyErr().toString()