Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ private static InternalTransaction startImplicitTx(
TxManager txManager,
boolean readOnly
) {
return txManager.beginImplicit(tsTracker, readOnly);
return txManager.beginImplicit(tsTracker, readOnly, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
}

@Override
public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) {
public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly, String txLabel) {
return begin(timestampTracker, true, readOnly, InternalTxOptions.defaults());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp c
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? null : old.txLabel()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,11 @@ default boolean isWrite() {
* @return {@code True} to disable the delayed ack optimization.
*/
boolean skipDelayedAck();

/**
* Transaction label.
*
* @return Transaction label.
*/
@Nullable String txLabel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void markFinished(
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? null : old.txLabel()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public QueryTransactionWrapper getOrStartSqlManaged(boolean readOnly, boolean im

if (tx == null) {
if (implicit) {
transaction = txManager.beginImplicit(observableTimeTracker, readOnly);
transaction = txManager.beginImplicit(observableTimeTracker, readOnly, null);
} else {
transaction = txManager.beginExplicit(observableTimeTracker, readOnly, InternalTxOptions.defaults());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,10 @@ private CompletableFuture<?> processRequest(ReplicaRequest request, ReplicaPrima
req.commitPartitionId().asReplicationGroupId(),
null,
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? req.txLabel() : old.txLabel()
));
}
}
Expand Down Expand Up @@ -766,7 +769,10 @@ private CompletableFuture<?> processOperationRequest(
req.commitPartitionId().asReplicationGroupId(),
null,
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? req.txLabel() : old.txLabel()
));

var opId = new OperationId(senderId, req.timestamp().longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ public class InternalTxOptions {
/** Transaction timeout. 0 means 'use default timeout'. */
private final long timeoutMillis;

/** Transaction label. */
@Nullable
private final String txLabel;

/**
* Read timestamp of the transaction. If {@code null} - the most recent available timestamp will be calculated based on the current
* node's clock.
*/
@Nullable
private final HybridTimestamp readTimestamp;

private InternalTxOptions(TxPriority priority, long timeoutMillis, @Nullable HybridTimestamp readTimestamp) {
private InternalTxOptions(TxPriority priority, long timeoutMillis, @Nullable HybridTimestamp readTimestamp, @Nullable String txLabel) {
this.priority = priority;
this.timeoutMillis = timeoutMillis;
this.readTimestamp = readTimestamp;
this.txLabel = txLabel;
}

public static Builder builder() {
Expand All @@ -73,6 +78,10 @@ public long timeoutMillis() {
return readTimestamp;
}

public @Nullable String txLabel() {
return txLabel;
}

/** Builder for InternalTxOptions. */
public static class Builder {
private TxPriority priority = TxPriority.NORMAL;
Expand All @@ -86,6 +95,9 @@ public static class Builder {
@Nullable
private HybridTimestamp readTimestamp = null;

@Nullable
private String txLabel = null;

public Builder priority(TxPriority priority) {
this.priority = priority;
return this;
Expand All @@ -101,8 +113,13 @@ public Builder readTimestamp(HybridTimestamp readTimestamp) {
return this;
}

public Builder txLabel(@Nullable String txLabel) {
this.txLabel = txLabel;
return this;
}

public InternalTxOptions build() {
return new InternalTxOptions(priority, timeoutMillis, readTimestamp);
return new InternalTxOptions(priority, timeoutMillis, readTimestamp, txLabel);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface TxManager extends IgniteComponent {
* @return The transaction.
*/
default InternalTransaction beginImplicitRw(HybridTimestampTracker timestampTracker) {
return beginImplicit(timestampTracker, false);
return beginImplicit(timestampTracker, false, null);
}

/**
Expand All @@ -56,7 +56,7 @@ default InternalTransaction beginImplicitRw(HybridTimestampTracker timestampTrac
* @return The transaction.
*/
default InternalTransaction beginImplicitRo(HybridTimestampTracker timestampTracker) {
return beginImplicit(timestampTracker, true);
return beginImplicit(timestampTracker, true, null);
}

/**
Expand All @@ -65,9 +65,10 @@ default InternalTransaction beginImplicitRo(HybridTimestampTracker timestampTrac
* @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only
* transaction execution. The tracker is also used to determine the read timestamp for read-only transactions.
* @param readOnly {@code true} in order to start a read snapshot transaction, {@code false} in order to start read-write one.
* @param txLabel Transaction label.
* @return The transaction.
*/
InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly);
InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly, @Nullable String txLabel);

/**
* Starts an explicit read-write transaction coordinated by a local node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ public class TxStateMetaAbandoned extends TxStateMeta {
*
* @param txCoordinatorId Transaction coordinator id.
* @param commitPartitionId Commit partition replication group ID.
* @param txLabel Transaction label.
*/
public TxStateMetaAbandoned(
UUID txCoordinatorId,
ReplicationGroupId commitPartitionId
ReplicationGroupId commitPartitionId,
String txLabel
) {
super(ABANDONED, txCoordinatorId, commitPartitionId, null, null, null);
super(ABANDONED, txCoordinatorId, commitPartitionId, null, null, null, null, null, txLabel);

this.lastAbandonedMarkerTs = FastTimestamps.coarseCurrentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.ignite.internal.tx;

import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateMetaFinishingMessage;
import org.apache.ignite.internal.tx.message.TxStateMetaMessage;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -44,11 +42,15 @@ public class TxStateMetaFinishing extends TxStateMeta {
* @param txCoordinatorId Transaction coordinator id.
* @param commitPartitionId Commit partition id.
* @param isFinishingDueToTimeout {@code true} if transaction is finishing due to timeout, {@code false} otherwise.
* @param txLabel Transaction label.
*/
public TxStateMetaFinishing(
@Nullable UUID txCoordinatorId, @Nullable ReplicationGroupId commitPartitionId, @Nullable Boolean isFinishingDueToTimeout
@Nullable UUID txCoordinatorId,
@Nullable ReplicationGroupId commitPartitionId,
@Nullable Boolean isFinishingDueToTimeout,
@Nullable String txLabel
) {
super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null, null, isFinishingDueToTimeout);
super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null, null, null, null, isFinishingDueToTimeout, txLabel);
}

/**
Expand All @@ -66,22 +68,11 @@ public CompletableFuture<TransactionMeta> txFinishFuture() {
}

@Override
public TxStateMetaFinishingMessage toTransactionMetaMessage(
public TxStateMetaMessage toTransactionMetaMessage(
ReplicaMessagesFactory replicaMessagesFactory,
TxMessagesFactory txMessagesFactory
) {
ReplicationGroupId commitPartitionId = commitPartitionId();

return txMessagesFactory.txStateMetaFinishingMessage()
.txState(txState())
.txCoordinatorId(txCoordinatorId())
.commitPartitionId(
commitPartitionId == null ? null : toReplicationGroupIdMessage(replicaMessagesFactory, commitPartitionId)
)
.commitTimestamp(commitTimestamp())
.initialVacuumObservationTimestamp(initialVacuumObservationTimestamp())
.cleanupCompletionTimestamp(cleanupCompletionTimestamp())
.build();
throw new AssertionError("This state shouldn't be transferred over the network.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public CompletableFuture<Transaction> beginAsync(@Nullable TransactionOptions op
* @return The started transaction.
*/
public InternalTransaction beginImplicit(boolean readOnly) {
return txManager.beginImplicit(observableTimestampTracker, readOnly);
return txManager.beginImplicit(observableTimestampTracker, readOnly, null);
}

@TestOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, ReplicationGroup
oldMeta == null ? null : oldMeta.tx(),
oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(),
cleanupCompletionTimestamp,
oldMeta == null ? null : oldMeta.isFinishedDueToTimeout()
oldMeta == null ? null : oldMeta.isFinishedDueToTimeout(),
oldMeta == null ? null : oldMeta.txLabel()
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,15 @@ private CompletableFuture<Boolean> primaryReplicaExpiredListener(PrimaryReplicaE
}

@Override
public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) {
public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly, String txLabel) {
if (readOnly) {
return new ReadOnlyImplicitTransactionImpl(timestampTracker, clockService.current());
}

HybridTimestamp beginTimestamp = createBeginTimestampWithIncrementRwTxCounter();
var tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, true, InternalTxOptions.defaults());

txStateVolatileStorage.initialize(tx);
txStateVolatileStorage.initialize(tx, txLabel);
txMetrics.onTransactionStarted();

return tx;
Expand All @@ -459,7 +459,7 @@ public InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker
tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, false, txOptions);
}

txStateVolatileStorage.initialize(tx);
txStateVolatileStorage.initialize(tx, txOptions.txLabel());
txMetrics.onTransactionStarted();

return tx;
Expand Down Expand Up @@ -607,7 +607,10 @@ public void finishFull(
old == null ? null : old.commitPartitionId(),
ts,
old == null ? null : old.tx(),
timeoutExceeded
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
timeoutExceeded,
old == null ? null : old.txLabel()
));

txMetrics.onReadWriteTransactionFinished(txId, finalState == COMMITTED);
Expand Down Expand Up @@ -648,7 +651,10 @@ public CompletableFuture<Void> finish(
commitPartition,
commitTimestamp(commitIntent),
old == null ? null : old.tx(),
timeout
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
timeout,
old == null ? null : old.txLabel()
));

txMetrics.onReadWriteTransactionFinished(txId, commitIntent);
Expand All @@ -670,7 +676,7 @@ public CompletableFuture<Void> finish(

TxStateMetaFinishing finishingStateMeta =
txMeta == null
? new TxStateMetaFinishing(null, commitPartition, timeout)
? new TxStateMetaFinishing(null, commitPartition, timeout, null)
: txMeta.finishing(timeout);

TxStateMeta stateMeta = updateTxMeta(txId, oldMeta -> finishingStateMeta);
Expand Down Expand Up @@ -755,6 +761,7 @@ private CompletableFuture<Void> prepareFinish(
return txCleanupRequestSender.cleanup(null, groups, verifiedCommit, commitTimestamp, txId)
.thenAccept(ignored -> {
// Don't keep useless state.
TxStateMeta previous = txStateVolatileStorage.state(txId);
txStateVolatileStorage.updateMeta(txId, old -> null);

TxStateMeta meta = new TxStateMeta(
Expand All @@ -765,7 +772,8 @@ private CompletableFuture<Void> prepareFinish(
null,
null,
System.currentTimeMillis(),
null
null,
previous == null ? null : previous.txLabel()
);

txFinishFuture.complete(meta);
Expand Down Expand Up @@ -843,7 +851,8 @@ private CompletableFuture<Void> durableFinish(
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? null : old.txLabel()
)
);

Expand Down Expand Up @@ -911,7 +920,8 @@ private CompletableFuture<Void> sendFinishRequest(
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp(),
old == null ? null : old.isFinishedDueToTimeout()
old == null ? null : old.isFinishedDueToTimeout(),
old == null ? null : old.txLabel()
));

assert isFinalState(updatedMeta.txState()) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ public void stop() {
* Initializes the meta state for a created transaction.
*
* @param tx Transaction object.
* @param txLabel Transaction label.
*/
public void initialize(InternalTransaction tx) {
TxStateMeta previous = txStateMap.put(tx.id(), new TxStateMeta(PENDING, tx.coordinatorId(), null, null, tx, null));
public void initialize(InternalTransaction tx, @Nullable String txLabel) {
TxStateMeta previous = txStateMap.put(
tx.id(),
new TxStateMeta(PENDING, tx.coordinatorId(), null, null, tx, null, null, null, txLabel)
);

assert previous == null : "Transaction state has already defined [txId=" + tx.id() + ", state=" + previous.txState() + ']';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ default TxStateMetaAbandoned asTxStateMetaAbandoned() {

return new TxStateMetaAbandoned(
txCoordinatorId(),
commitPartitionId == null ? null : commitPartitionId.asReplicationGroupId()
commitPartitionId == null ? null : commitPartitionId.asReplicationGroupId(),
txLabel()
);
}

Expand Down
Loading