Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24242 Add RO/RW Transaction Timeout configuration schema #5093

Merged
merged 21 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
68f1fa8
IGNITE-24242 Add RO/RW Transaction Timeout configuration schema
PakhomovAlexander Jan 21, 2025
2e7c24c
Review fixes: javadoc and configurations naming
PakhomovAlexander Jan 23, 2025
c4fcc59
Use suppliers for ro/rw timeouts in TableManager, so if the config ar…
PakhomovAlexander Jan 24, 2025
03a0274
Use timeout from TransactionOptions rather than from configuration.
PakhomovAlexander Jan 24, 2025
7dcb643
Fix config in test
PakhomovAlexander Jan 24, 2025
1e21f92
Fix test
PakhomovAlexander Jan 27, 2025
a34ded1
Update modules/transactions/src/main/java/org/apache/ignite/internal/…
PakhomovAlexander Jan 27, 2025
9b6203a
Merge branch 'IGNITE-24242' of github.com:unisonteam/ignite-3 into IG…
PakhomovAlexander Jan 27, 2025
17f37e4
Use TimeUnit for defaults in configuration
PakhomovAlexander Jan 27, 2025
4340ec3
Fix comment in constructor
PakhomovAlexander Jan 27, 2025
db2fcef
Fix test, add todo
PakhomovAlexander Jan 27, 2025
71ac52d
Fix checkstyle
PakhomovAlexander Jan 27, 2025
ba756bc
Add todos for the next ticket
PakhomovAlexander Jan 27, 2025
c05476d
Change default timeout for rw transactions to make tests pass
PakhomovAlexander Jan 28, 2025
fa7f5df
fix
PakhomovAlexander Jan 28, 2025
cc9cb48
Fix
PakhomovAlexander Jan 28, 2025
ae00d30
Change default RW timeout to make test pass
PakhomovAlexander Jan 29, 2025
e1ee039
Fix test
PakhomovAlexander Jan 29, 2025
2f1e409
Update modules/transactions/src/main/java/org/apache/ignite/internal/…
PakhomovAlexander Jan 30, 2025
c0b0e40
IGNITE-24242 Add RO/RW Transaction Timeout configuration schema
PakhomovAlexander Jan 30, 2025
2f13beb
Fix checkstyle
PakhomovAlexander Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionT
public boolean isFinishingOrFinished() {
return false;
}

@Override
public long timeout() {
return 10_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10_000 is used for the same purpose in multiple places. Should we extract it somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next ticket will change this. Would you mind leaving it as is for now?

}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ private static class TestInternalTableImpl extends InternalTableImpl {
timestampTracker,
mock(PlacementDriver.class),
mock(TransactionInflights.class),
3_000,
0,
null,
mock(StreamerReceiverRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public boolean isFinishingOrFinished() {
return commitFut.isDone() || rollbackFut.isDone();
}

@Override
public long timeout() {
// Fixed stub value for this implementation; presumably milliseconds, matching the
// InternalTransaction#timeout() contract ("timeout in millis") — TODO confirm.
return 10_000;
}

@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.table.TxAbstractTest;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.util.CollectionUtils;
Expand Down Expand Up @@ -205,7 +206,7 @@ public void testImplicitTransactionTimeout() {

assertFalse(implicitOpFut.isDone());

assertThat(implicitOpFut, willThrow(TransactionException.class));
assertThat(implicitOpFut, willThrow(TransactionException.class, 40_000, TimeUnit.SECONDS));

assertNull(rv.get(null, makeKey(1)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ public NetworkMessage clone() {
observableTimestampTracker,
new TestPlacementDriver(clusterNode),
transactionInflights,
3_000,
0,
null,
mock(StreamerReceiverRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {

private final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;

private long implicitTransactionTimeout;

private int attemptsObtainLock;

@Nullable
Expand Down Expand Up @@ -679,7 +677,6 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {

partitionReplicatorNodeRecovery.start();

implicitTransactionTimeout = txCfg.implicitTransactionTimeout().value();
attemptsObtainLock = txCfg.attemptsObtainLock().value();

executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryReplicaExpired);
Expand Down Expand Up @@ -1628,7 +1625,6 @@ private TableImpl createTableImpl(
observableTimestampTracker,
executorInclinedPlacementDriver,
transactionInflights,
implicitTransactionTimeout,
attemptsObtainLock,
this::streamerFlushExecutor,
Objects.requireNonNull(streamerReceiverRunner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public class InternalTableImpl implements InternalTable {
/** Replica messages factory. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

public static final int DEFAULT_RW_TIMEOUT = 10_000;

/** Partitions. */
private final int partitions;

Expand Down Expand Up @@ -198,9 +200,6 @@ public class InternalTableImpl implements InternalTable {
/** Map update guarded by {@link #updatePartitionMapsMux}. */
private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> storageIndexTrackerByPartitionId = emptyMap();

/** Implicit transaction timeout. */
private final long implicitTransactionTimeout;

/** Attempts to take lock. */
private final int attemptsObtainLock;

Expand All @@ -218,7 +217,6 @@ public class InternalTableImpl implements InternalTable {
* @param clockService A hybrid logical clock service.
* @param placementDriver Placement driver.
* @param transactionInflights Transaction inflights.
* @param implicitTransactionTimeout Implicit transaction timeout.
* @param attemptsObtainLock Attempts to take lock.
*/
public InternalTableImpl(
Expand All @@ -234,7 +232,6 @@ public InternalTableImpl(
HybridTimestampTracker observableTimestampTracker,
PlacementDriver placementDriver,
TransactionInflights transactionInflights,
long implicitTransactionTimeout,
int attemptsObtainLock,
Supplier<ScheduledExecutorService> streamerFlushExecutor,
StreamerReceiverRunner streamerReceiverRunner
Expand All @@ -251,7 +248,6 @@ public InternalTableImpl(
this.observableTimestampTracker = observableTimestampTracker;
this.placementDriver = placementDriver;
this.transactionInflights = transactionInflights;
this.implicitTransactionTimeout = implicitTransactionTimeout;
this.attemptsObtainLock = attemptsObtainLock;
this.streamerFlushExecutor = streamerFlushExecutor;
this.streamerReceiverRunner = streamerReceiverRunner;
Expand Down Expand Up @@ -368,9 +364,12 @@ private <R> CompletableFuture<R> enlistInTx(
return postEnlist(fut, false, actualTx, actualTx.implicit()).handle((r, e) -> {
if (e != null) {
if (actualTx.implicit()) {
// TODO: IGNITE-24244
long timeout = actualTx.isReadOnly() ? actualTx.timeout() : DEFAULT_RW_TIMEOUT;

long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs;

if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
if (canRetry(e, ts, timeout)) {
return enlistInTx(row, null, fac, noWriteChecker, ts);
}
}
Expand All @@ -382,6 +381,10 @@ private <R> CompletableFuture<R> enlistInTx(
}).thenCompose(identity());
}

/**
 * Decides whether a failed operation may be retried.
 *
 * <p>A retry is allowed only when {@code exceptionAllowsImplicitTxRetry} accepts the failure and the time elapsed
 * since {@code ts}, as measured by {@code coarseCurrentTimeMillis()}, is still strictly below {@code timeout}.
 *
 * @param e Failure produced by the previous attempt.
 * @param ts Start time of the first attempt (callers pass the transaction's physical start timestamp).
 * @param timeout Retry budget, in the same unit as {@code ts} (presumably milliseconds).
 * @return {@code true} if another attempt may be made, {@code false} otherwise.
 */
private static boolean canRetry(Throwable e, long ts, long timeout) {
    // Check the exception first so the clock is only read for retriable failures.
    if (!exceptionAllowsImplicitTxRetry(e)) {
        return false;
    }

    long elapsed = coarseCurrentTimeMillis() - ts;

    return elapsed < timeout;
}

/**
* Enlists a single row into a transaction.
*
Expand Down Expand Up @@ -486,9 +489,12 @@ private <T> CompletableFuture<T> enlistInTx(
return postEnlist(fut, actualTx.implicit() && !singlePart, actualTx, full).handle((r, e) -> {
if (e != null) {
if (actualTx.implicit()) {
// TODO: IGNITE-24244
long timeout = actualTx.isReadOnly() ? actualTx.timeout() : 3_000;

long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs;

if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
if (canRetry(e, ts, timeout)) {
return enlistInTx(keyRows, null, fac, reducer, noOpChecker, ts);
}
}
Expand Down Expand Up @@ -1185,9 +1191,12 @@ private CompletableFuture<Void> updateAllWithRetry(
// Will be finished in one RTT.
return postEnlist(fut, false, tx, true).handle((r, e) -> {
if (e != null) {
// TODO: IGNITE-24244
long timeout = tx.isReadOnly() ? tx.timeout() : 3_000;

long ts = (txStartTs == null) ? tx.startTimestamp().getPhysical() : txStartTs;

if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
if (canRetry(e, ts, timeout)) {
return updateAllWithRetry(rows, deleted, partition, ts);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ void setUp(
placementDriver,
new TransactionInflights(placementDriver, clockService),
0,
0,
() -> null,
mock(StreamerReceiverRunner.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ void testUpdatePartitionTrackers() {
HybridTimestampTracker.atomicTracker(null),
mock(PlacementDriver.class),
mock(TransactionInflights.class),
3_000,
0,
null,
mock(StreamerReceiverRunner.class)
Expand Down Expand Up @@ -122,7 +121,6 @@ void testRowBatchByPartitionId() {
HybridTimestampTracker.atomicTracker(null),
mock(PlacementDriver.class),
mock(TransactionInflights.class),
3_000,
0,
null,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,6 @@ public TableViewInternal startTable(String tableName, SchemaDescriptor schemaDes
timestampTracker,
placementDriver,
clientTransactionInflights,
500,
0,
null,
mock(StreamerReceiverRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ public DummyInternalTableImpl(
tracker,
placementDriver,
transactionInflights,
3_000,
0,
null,
mock(StreamerReceiverRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ protected void customizeInitParameters(InitParametersBuilder builder) {

builder.clusterConfiguration("ignite {"
+ " transaction: {"
+ " implicitTransactionTimeout: 30000,"
+ " readOnlyTimeout: 30000,"
+ " readWriteTimeout: 30000,"
+ " txnResourceTtl: 2"
+ " },"
+ " replication: {"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@ public interface InternalTransaction extends Transaction {
* @return Whether the transaction is finishing or finished
*/
boolean isFinishingOrFinished();

/**
* Returns the transaction timeout in millis.
*
* @return The transaction timeout.
*/
long timeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,20 @@
*/
@Config
public class TransactionConfigurationSchema {
/** Default checking transaction interval. */
public static final long DEFAULT_ABANDONED_CHECK_TS = 5_000;

/** How often abandoned transactions are searched for (milliseconds). */
@Range(min = 0)
@Value(hasDefault = true)
public final long abandonedCheckTs = DEFAULT_ABANDONED_CHECK_TS;
public final long abandonedCheckTs = 5_000;

/** Default transaction timeout (milliseconds). */
/** Default timeout for read-only transactions. */
@Range(min = 1)
@Value(hasDefault = true)
public final long timeout = 10_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still looks like a breaking change. Let's check with @ptupitsyn whether we are allowed to remove a configuration property outright.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. But why should we consult Pavel about changing a property we added only a couple of weeks ago?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because 3.0.0 already contains the transaction.timeout config property. If we just remove it, our next release could break existing configurations that set it.

public final long readOnlyTimeout = TimeUnit.MINUTES.toMillis(10);

/** Timeout for implicit transactions (milliseconds). */
/** Default timeout for read-write transactions. */
@Range(min = 1)
@Value(hasDefault = true)
public final long implicitTransactionTimeout = 3_000;
public final long readWriteTimeout = TimeUnit.SECONDS.toMillis(30);

/** A transaction tries to take lock several times until it throws an exception {@link org.apache.ignite.tx.TransactionException}. */
@Range(min = 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti
/** Implicit transaction flag. */
private final boolean implicit;

/** Transaction timeout. */
protected final long timeout;

/**
* The constructor.
*
Expand All @@ -63,19 +66,22 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti
* @param observableTsTracker Observation timestamp tracker.
* @param coordinatorId Transaction coordinator inconsistent ID.
* @param implicit True for an implicit transaction, false for an ordinary one.
* @param timeout Transaction timeout in milliseconds.
*/
public IgniteAbstractTransactionImpl(
TxManager txManager,
HybridTimestampTracker observableTsTracker,
UUID id,
UUID coordinatorId,
boolean implicit
boolean implicit,
long timeout
) {
this.txManager = txManager;
this.observableTsTracker = observableTsTracker;
this.id = id;
this.coordinatorId = coordinatorId;
this.implicit = implicit;
this.timeout = timeout;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -145,4 +151,10 @@ private static Throwable tryToCopyExceptionWithCause(ExecutionException exceptio

return copy;
}

/** {@inheritDoc} */
@Override
public long timeout() {
// Returns the value supplied to the constructor; the field is final, so it never changes afterwards.
return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public boolean isFinishingOrFinished() {
return transaction.isFinishingOrFinished();
}

@Override
public long timeout() {
// Pure delegation: the wrapper reports the timeout of the transaction it wraps.
return transaction.timeout();
}

@Override
public <T> T unwrap(Class<T> classToUnwrap) {
return classToUnwrap.cast(transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
UUID id,
UUID txCoordinatorId,
boolean implicit,
long timeout,
HybridTimestamp readTimestamp,
CompletableFuture<Void> txFuture
) {
super(txManager, observableTsTracker, id, txCoordinatorId, implicit);
super(txManager, observableTsTracker, id, txCoordinatorId, implicit, timeout);

this.readTimestamp = readTimestamp;
this.txFuture = txFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ public ReadWriteTransactionImpl(
HybridTimestampTracker observableTsTracker,
UUID id,
UUID txCoordinatorId,
boolean implicit
boolean implicit,
long timeout
) {
super(txManager, observableTsTracker, id, txCoordinatorId, implicit);
super(txManager, observableTsTracker, id, txCoordinatorId, implicit, timeout);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,11 @@ private InternalTransaction beginBusy(
if (!readOnly) {
txStateVolatileStorage.initialize(txId, localNodeId);

return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit);
// TODO: RW timeouts will be supported in https://issues.apache.org/jira/browse/IGNITE-24244
// long timeout = options.timeoutMillis() == 0 ? txConfig.readWriteTimeout().value() : options.timeoutMillis();
long timeout = 3_000;

return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout);
} else {
return beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options);
}
Expand Down Expand Up @@ -447,7 +451,11 @@ private ReadOnlyTransactionImpl beginReadOnlyTransaction(
try {
CompletableFuture<Void> txFuture = new CompletableFuture<>();

var transaction = new ReadOnlyTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, readTimestamp, txFuture);
long timeout = options.timeoutMillis() == 0 ? defaultReadOnlyTransactionTimeoutMillis() : options.timeoutMillis();

var transaction = new ReadOnlyTransactionImpl(
this, timestampTracker, txId, localNodeId, implicit, timeout, readTimestamp, txFuture
);

// Implicit transactions are finished as soon as their operation/query is finished, they cannot be abandoned, so there is
// no need to register them.
Expand Down Expand Up @@ -475,7 +483,7 @@ private ReadOnlyTransactionImpl beginReadOnlyTransaction(
}

private long roExpirationPhysicalTimeFor(HybridTimestamp beginTimestamp, InternalTxOptions options) {
long effectiveTimeoutMillis = options.timeoutMillis() == 0 ? defaultTransactionTimeoutMillis() : options.timeoutMillis();
long effectiveTimeoutMillis = options.timeoutMillis() == 0 ? defaultReadOnlyTransactionTimeoutMillis() : options.timeoutMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This calculation is repeated at least twice. Let's extract it to a method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next ticket will change this. Would you mind leaving it as is for now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with this as long as it doesn't get forgotten :)

return sumWithSaturation(beginTimestamp.getPhysical(), effectiveTimeoutMillis);
}

Expand All @@ -493,8 +501,8 @@ private static long sumWithSaturation(long a, long b) {
}
}

private long defaultTransactionTimeoutMillis() {
return txConfig.timeout().value();
private long defaultReadOnlyTransactionTimeoutMillis() {
return txConfig.readOnlyTimeout().value();
}

/**
Expand Down
Loading