Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.testframework;

import static java.lang.Thread.sleep;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -1099,6 +1098,21 @@ public static UUID deriveUuidFrom(String str) {
return new UUID(str.hashCode(), new StringBuilder(str).reverse().toString().hashCode());
}

/**
 * Pauses the calling thread for the given amount of time.
 *
 * <p>If the thread gets interrupted while sleeping, its interrupt flag is restored and the {@link InterruptedException}
 * is rethrown wrapped into a {@link RuntimeException}.
 *
 * @param millis Time to sleep in milliseconds.
 */
public static void sleep(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException interruption) {
        // Re-arm the interrupt flag first, so that code up the stack can still observe the interruption.
        Thread.currentThread().interrupt();

        throw new RuntimeException(interruption);
    }
}

/**
* Converts a result set to a list of rows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1681,11 +1681,11 @@ private <T> CompletableFuture<T> appendTxCommand(

return inBusyLockAsync(busyLock, () ->
resolveWriteIntentReadability(writeIntent, ts)
.thenApply(writeIntentReadable ->
.thenApply(wiResolutionResult ->
inBusyLock(busyLock, () -> {
metrics.onRead(true, true);

if (writeIntentReadable) {
if (wiResolutionResult.writeIntentReadable) {
return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
} else {
for (ReadResult wi : writeIntents) {
Expand Down Expand Up @@ -3372,16 +3372,15 @@ private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForRe
) {
return inBusyLockAsync(busyLock, () ->
resolveWriteIntentReadability(readResult, timestamp)
.thenApply(writeIntentReadable ->
.thenApply(wiResolutionResult ->
inBusyLock(busyLock, () -> {
if (writeIntentReadable) {
if (wiResolutionResult.writeIntentReadable) {
// Even though this readResult is still a write intent entry in the storage
// (therefore it contains txId), we already know it relates to a committed transaction
// and will be cleaned up by an asynchronous task
// started in scheduleTransactionRowAsyncCleanup().
// So it's safe to assume that this is the latest committed entry.
HybridTimestamp commitTimestamp =
txManager.stateMeta(readResult.transactionId()).commitTimestamp();
HybridTimestamp commitTimestamp = wiResolutionResult.transactionMeta.commitTimestamp();

return new TimedBinaryRow(readResult.binaryRow(), commitTimestamp);
}
Expand Down Expand Up @@ -3451,10 +3450,12 @@ private void scheduleAsyncWriteIntentSwitch(UUID txId, RowId rowId, TransactionM
*
* @param writeIntent Write intent to resolve.
* @param timestamp Timestamp.
* @return The future completes with {@code true} when the transaction is committed and commit time <= read time, {@code false}
* otherwise (when the transaction is either in progress, or aborted, or committed and commit time > read time).
* @return Write intent resolution result, see {@link WriteIntentResolutionResult}.
*/
private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, @Nullable HybridTimestamp timestamp) {
private CompletableFuture<WriteIntentResolutionResult> resolveWriteIntentReadability(
ReadResult writeIntent,
@Nullable HybridTimestamp timestamp
) {
UUID txId = writeIntent.transactionId();

HybridTimestamp now = clockService.current();
Expand All @@ -3463,19 +3464,35 @@ private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writ
? null
: replicaMeta.getStartTime().longValue();

ZonePartitionId commitPartitionId = new ZonePartitionId(writeIntent.commitZoneId(), writeIntent.commitPartitionId());

return transactionStateResolver.resolveTxState(
txId,
new ZonePartitionId(writeIntent.commitZoneId(), writeIntent.commitPartitionId()),
commitPartitionId,
timestamp,
currentConsistencyToken,
replicationGroupId
)
.thenApply(transactionMeta -> {
boolean writeIntentReadable = canReadFromWriteIntent(txId, txManager, transactionMeta, timestamp);

if (isFinalState(transactionMeta.txState())) {
scheduleAsyncWriteIntentSwitch(txId, writeIntent.rowId(), transactionMeta);
} else {
LOG.info(
"Received non-final transaction state after tx state resolution [txId={}, groupId={}, txMeta={}, "
+ "timestamp={}, commitPartId={}, currentConsistencyToken={}, writeIntentReadable={}].",
txId,
replicationGroupId,
transactionMeta,
timestamp,
commitPartitionId,
currentConsistencyToken,
writeIntentReadable
);
}

return canReadFromWriteIntent(txId, txManager, transactionMeta, timestamp);
return new WriteIntentResolutionResult(writeIntentReadable, transactionMeta);
});
}

Expand All @@ -3494,6 +3511,7 @@ assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING || txMeta.t
: format("Unexpected state defined by write intent resolution [{}, txMeta={}].",
formatTxInfo(txId, txManager, false), txMeta);

// TODO IGNITE-27494 double check UNKNOWN state works correctly here.
if (txMeta.txState() == COMMITTED) {
boolean readLatest = timestamp == null;

Expand Down Expand Up @@ -3925,4 +3943,21 @@ private int tableVersionByTs(HybridTimestamp ts) {
public void cleanupLocally(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
// Forwards the transaction outcome (commit flag and nullable commit timestamp) to the storage update handler's
// switchWriteIntents call for the given transaction.
// NOTE(review): the trailing null is the handler's fourth argument; its meaning is not visible in this chunk - confirm.
storageUpdateHandler.switchWriteIntents(txId, commit, commitTimestamp, null);
}

/**
 * Holder for the outcome of a write intent resolution: the readability verdict and the transaction meta.
 */
private static class WriteIntentResolutionResult {
    /** Transaction meta. */
    private final TransactionMeta transactionMeta;

    /**
     * Readability verdict. When the write intent is resolved by a read-only transaction, the read timestamp is taken into
     * account: the value is {@code true} when the transaction is committed and {@code commit time <= read time}, and
     * {@code false} otherwise (the transaction is in progress, or aborted, or committed with
     * {@code commit time > read time}).
     */
    private final boolean writeIntentReadable;

    /**
     * Constructor.
     *
     * @param readable Whether the write intent can be read.
     * @param meta Transaction meta.
     */
    public WriteIntentResolutionResult(boolean readable, TransactionMeta meta) {
        transactionMeta = meta;
        writeIntentReadable = readable;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.ignite.tx.distributed;

import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runInExecutor;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionIdForTuple;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.table;
Expand All @@ -30,11 +35,14 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
Expand All @@ -52,13 +60,20 @@
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Test for transaction abort on coordinator when write-intent resolution happens after primary replica expiration.
Expand Down Expand Up @@ -103,8 +118,18 @@ protected int[] cmgMetastoreNodes() {
return new int[]{0, 1, 2};
}

@Test
public void test() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCoordinatorAbortsTransaction(boolean withThinClient) throws Exception {
IgniteClient client = IgniteClient.builder()
.addresses(getClientAddresses(runningNodes().collect(toList())).toArray(new String[0]))
.operationTimeout(15_000)
.build();

if (withThinClient) {
await().atMost(3, TimeUnit.SECONDS).until(() -> client.connections().size() == initialNodes());
}

IgniteImpl firstPrimaryNode = anyNode();
IgniteImpl coordinatorNode = findNode(n -> !n.name().equals(firstPrimaryNode.name()));

Expand All @@ -114,17 +139,32 @@ public void test() throws Exception {
RecordView<Tuple> view = coordinatorNode.tables().table(TABLE_NAME).recordView();

Transaction txx = coordinatorNode.transactions().begin();
Tuple tuple0 = findTupleToBeHostedOnNode(coordinatorNode, TABLE_NAME, txx, INITIAL_TUPLE, NEXT_TUPLE, true);
Tuple tuple = findTupleToBeHostedOnNode(firstPrimaryNode, TABLE_NAME, txx, INITIAL_TUPLE, NEXT_TUPLE, true);
int partId = partitionIdForTuple(firstPrimaryNode, TABLE_NAME, tuple, txx);
var groupId = new ZonePartitionId(zoneId(firstPrimaryNode, TABLE_NAME), partId);
log.info("Test: groupId: " + groupId);
view.upsert(txx, tuple0);
view.upsert(txx, tuple);

txx.commit();

Transaction tx0 = coordinatorNode.transactions().begin();
log.info("Test: unfinished tx id: " + txId(tx0));
view.upsert(tx0, tuple);
// Unfinished transaction.
RecordView<Tuple> unfinishedTxView = withThinClient
? client.tables().table(TABLE_NAME).recordView()
: coordinatorNode.tables().table(TABLE_NAME).recordView();
IgniteTransactions unfinishedTxTransactions = withThinClient
? client.transactions()
: coordinatorNode.transactions();

Transaction tx0 = unfinishedTxTransactions.begin();

if (!withThinClient) {
log.info("Test: unfinished tx id: " + txId(tx0));
}

unfinishedTxView.upsert(tx0, tuple0);
unfinishedTxView.upsert(tx0, tuple);
// Don't commit or rollback tx0.

// Wait for replication of write intent.
Expand Down Expand Up @@ -164,6 +204,88 @@ public void test() throws Exception {
assertEquals(newTuple, actual);
}

/**
 * Checks that a new transaction can overwrite a key that still carries a write intent of an already finished transaction
 * while the cleanup / write intent switch messages are dropped and the transaction state kept on the new primary replica
 * has been reset to an obsolete {@code PENDING} state.
 *
 * @throws Exception If failed.
 */
@Test
public void testWriteIntentResolutionUsesCorrectStateAndCommitTimestamp() throws Exception {
    // Coordinator node will also be the commit partition primary node.
    IgniteImpl coordinatorNode = anyNode();
    IgniteImpl firstPrimaryNode = findNode(n -> !n.name().equals(coordinatorNode.name()));
    IgniteImpl secondPrimaryNode = findNode(
            n -> !n.name().equals(coordinatorNode.name()) && !n.name().equals(firstPrimaryNode.name())
    );

    log.info("Test: coordinatorNode: {}, coordinatorNode id: {}", coordinatorNode.name(), coordinatorNode.id());
    log.info("Test: firstPrimaryNode: {}, firstPrimaryNode id: {}", firstPrimaryNode.name(), firstPrimaryNode.id());
    log.info("Test: secondPrimaryNode: {}, secondPrimaryNode id: {}", secondPrimaryNode.name(), secondPrimaryNode.id());

    RecordView<Tuple> view = coordinatorNode.tables().table(TABLE_NAME).recordView();

    // Seed two keys with committed values: tuple1 is looked up for coordinatorNode, tuple2 for firstPrimaryNode.
    Transaction txx = coordinatorNode.transactions().begin();
    Tuple tuple1 = findTupleToBeHostedOnNode(coordinatorNode, TABLE_NAME, txx, INITIAL_TUPLE, NEXT_TUPLE, true);
    Tuple tuple2 = findTupleToBeHostedOnNode(firstPrimaryNode, TABLE_NAME, txx, INITIAL_TUPLE, NEXT_TUPLE, true);
    int partId1 = partitionIdForTuple(coordinatorNode, TABLE_NAME, tuple1, txx);
    var groupId1 = new ZonePartitionId(zoneId(firstPrimaryNode, TABLE_NAME), partId1);
    int partId2 = partitionIdForTuple(coordinatorNode, TABLE_NAME, tuple2, txx);
    var groupId2 = new ZonePartitionId(zoneId(firstPrimaryNode, TABLE_NAME), partId2);
    log.info("Test: groupId1: {}", groupId1);
    log.info("Test: groupId2: {}", groupId2);
    view.upsert(txx, tuple1);
    view.upsert(txx, tuple2);

    txx.commit();

    // On every running node: drop tx cleanup / write intent switch messages addressed to secondPrimaryNode.
    cluster.runningNodes().forEach(node -> {
        unwrapIgniteImpl(node).dropMessages((dest, msg) -> {
            boolean wiSwitch = msg instanceof TxCleanupMessage || msg instanceof WriteIntentSwitchReplicaRequestBase;
            return wiSwitch && secondPrimaryNode.name().equals(dest);
        });
    });

    // On coordinatorNode and firstPrimaryNode: drop these messages regardless of the destination.
    // NOTE(review): presumably dropMessages() replaces the predicate installed above on these two nodes - confirm.
    coordinatorNode.dropMessages((dest, msg) -> msg instanceof TxCleanupMessage || msg instanceof WriteIntentSwitchReplicaRequest);
    firstPrimaryNode.dropMessages((dest, msg) -> msg instanceof TxCleanupMessage || msg instanceof WriteIntentSwitchReplicaRequest);

    Transaction tx0 = coordinatorNode.transactions().begin();
    UUID tx0Id = txId(tx0);
    log.info("Test: cleanup unfinished tx id: {}", tx0Id);
    view.upsert(tx0, tuple1);
    view.upsert(tx0, tuple2);

    // The commit future is not awaited; instead, the test waits below for a final state to appear on the coordinator.
    tx0.commitAsync();

    await().atMost(5, TimeUnit.SECONDS)
            .until(() -> txFinishedStateOnNode(coordinatorNode, tx0Id));

    // Wait for replication of write intent.
    Tuple keyTuple = Tuple.create().set("key", tuple2.longValue("key"));
    await().atMost(5, TimeUnit.SECONDS)
            .until(() -> checkWriteIntentInStorageOnAllNodes(partId2, keyTuple));

    transferPrimary(runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(toList()), groupId2, secondPrimaryNode.name());

    await().atMost(5, TimeUnit.SECONDS)
            .until(() -> txFinishedStateOnNode(secondPrimaryNode, tx0Id));

    Transaction tx = coordinatorNode.transactions().begin();
    log.info("Test: new tx: {}", txId(tx));
    log.info("Test: upsert");

    Tuple newTuple = Tuple.create().set("key", tuple2.longValue("key")).set("val", "v");

    // Tx cleanup is blocked but tx state could be propagated if second tuple's primary node is commit partition's backup.
    // Transfer it back to pending, imitating obsolete transaction state on secondPrimaryNode.
    secondPrimaryNode.txManager().updateTxMeta(tx0Id, old -> null);
    secondPrimaryNode.txManager().updateTxMeta(tx0Id, old -> TxStateMeta.builder(PENDING).build());

    // The key is expected to still carry the write intent of tx0 (its cleanup is blocked), so this upsert has to get
    // through the write intent resolution first.
    // NOTE(review): presumably, if the resolution relied on the obsolete PENDING state set above, the upsert would fail
    // (e.g. with TxIdMismatchException) - confirm.
    view.upsert(tx, newTuple);

    coordinatorNode.stopDroppingMessages();
    firstPrimaryNode.stopDroppingMessages();

    tx.commit();

    // Check that new value is written successfully.
    Tuple actual = view.get(null, keyTuple);
    assertEquals(newTuple, actual);
}

private boolean checkWriteIntentInStorageOnAllNodes(int partId, Tuple key) {
return cluster.runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
Expand Down Expand Up @@ -195,4 +317,22 @@ private boolean checkWriteIntentInStorage(IgniteImpl node, int partId, Tuple key
return false;
});
}

/**
 * Tells whether the given node holds a final state for the transaction.
 *
 * @param node Node whose transaction manager is queried.
 * @param txId Transaction id.
 * @return {@code true} if the node has a state meta for the transaction and that state is final, {@code false} otherwise.
 */
private static boolean txFinishedStateOnNode(IgniteImpl node, UUID txId) {
    TxStateMeta stateMeta = node.txManager().stateMeta(txId);

    if (stateMeta == null) {
        return false;
    }

    return isFinalState(stateMeta.txState());
}

/**
 * Builds loopback client addresses ({@code 127.0.0.1:port}) from the client ports of the given nodes.
 *
 * @param nodes Nodes to take the client ports from.
 * @return Addresses, one per node.
 */
private static List<String> getClientAddresses(List<Ignite> nodes) {
    List<Integer> ports = getClientPorts(nodes);

    return ports.stream()
            .map(port -> "127.0.0.1" + ":" + port)
            .collect(toList());
}

/**
 * Collects the client address port of every given node.
 *
 * @param nodes Nodes to inspect.
 * @return Client ports, in the order of the given nodes.
 */
private static List<Integer> getClientPorts(List<Ignite> nodes) {
    return nodes.stream()
            .map(TestWrappers::unwrapIgniteImpl)
            .map(igniteImpl -> igniteImpl.clientAddress().port())
            .collect(toList());
}
}
Loading