Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -59,6 +60,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1339,6 +1341,35 @@ public static Object[] flatArray(Object... arguments) {
return list.toArray();
}

/**
* Schedules the provided operation to be retried after the specified delay.
*
* @param operation Operation.
* @param delay Delay.
* @param unit Time unit of the delay.
* @param executor Executor to schedule the retry in.
* @return Future that is completed when the operation is successful or failed with an exception.
*/
public static <T> CompletableFuture<T> scheduleRetry(
Callable<CompletableFuture<T>> operation,
long delay,
TimeUnit unit,
ScheduledExecutorService executor
) {
CompletableFuture<T> future = new CompletableFuture<>();

executor.schedule(() -> operation.call()
.whenComplete((res, e) -> {
if (e == null) {
future.complete(res);
} else {
future.completeExceptionally(e);
}
}), delay, unit);

return future;
}

private static CompletableFuture<Void> startAsync(ComponentContext componentContext, Stream<? extends IgniteComponent> components) {
return allOf(components
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -77,6 +80,8 @@
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.awaitility.Awaitility;
import org.hamcrest.CustomMatcher;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -1094,6 +1099,29 @@ public static UUID deriveUuidFrom(String str) {
return new UUID(str.hashCode(), new StringBuilder(str).reverse().toString().hashCode());
}

/**
* Converts a result set to a list of rows.
*
* @param resultSet Result set to convert.
* @return List of rows.
*/
public static List<List<Object>> getAllResultSet(ResultSet<SqlRow> resultSet) {
List<List<Object>> res = new ArrayList<>();

while (resultSet.hasNext()) {
SqlRow sqlRow = resultSet.next();

ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
for (int i = 0; i < sqlRow.columnCount(); i++) {
row.add(sqlRow.value(i));
}

res.add(row);
}

return res;
}

/**
* Non-concurrent executor service for test purposes.
*
Expand Down Expand Up @@ -1132,4 +1160,114 @@ public void execute(Runnable command) {
}
};
}

/**
* Non-concurrent scheduled executor service for test purposes. Uses CompletableFuture#delayedExecutor.
*
* @return Executor service.
*/
public static ScheduledExecutorService testSyncScheduledExecutorService() {
return new ScheduledExecutorService() {
private final ExecutorService delegate = testSyncExecutorService();

@Override
public void execute(@NotNull Runnable command) {
delegate.execute(command);
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public @NotNull List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public @NotNull <T> Future<T> submit(@NotNull Callable<T> task) {
return delegate.submit(task);
}

@Override
public @NotNull <T> Future<T> submit(@NotNull Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public @NotNull Future<?> submit(@NotNull Runnable task) {
return delegate.submit(task);
}

@Override
public @NotNull <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public @NotNull <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout,
@NotNull TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public @NotNull <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
throws ExecutionException, InterruptedException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

@Override
public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
CompletableFuture.delayedExecutor(delay, unit).execute(command);
return null;
}

@Override
public @NotNull <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit) {
CompletableFuture.delayedExecutor(delay, unit).execute(() -> {
try {
callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

return null;
}

@Override
public @NotNull ScheduledFuture<?> scheduleAtFixedRate(@NotNull Runnable command, long initialDelay, long period,
@NotNull TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public @NotNull ScheduledFuture<?> scheduleWithFixedDelay(@NotNull Runnable command, long initialDelay, long delay,
@NotNull TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented.");
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ public void testCompactionDuringRebalancing() throws InterruptedException {
);
}

private void sql(String sql) {
cluster.doInSession(0, session -> {
executeUpdate(sql, session);
});
}

private static Set<String> dataNodes(IgniteImpl ignite, int zoneId, HybridTimestamp ts) {
CompletableFuture<Set<String>> dataNodesBeforeStopFut = ignite
.distributionZoneManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ private static Set<String> currentDataNodes(IgniteImpl node, int zoneId) {
return nodeFut.join();
}

private void sql(String sql) {
cluster.aliveNode().sql().execute(sql);
}

private CompletableFuture<?> sqlAsync(String sql) {
return cluster.aliveNode().sql().executeAsync(sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@

package org.apache.ignite.internal.index;

import static java.lang.Thread.sleep;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.ClusterPerClassIntegrationTest.isIndexAvailable;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.index.IndexBuildTestUtils.INDEX_NAME;
import static org.apache.ignite.internal.index.IndexBuildTestUtils.TABLE_NAME;
import static org.apache.ignite.internal.index.IndexBuildTestUtils.createTestTable;
import static org.apache.ignite.internal.index.WriteIntentSwitchControl.disableWriteIntentSwitchExecution;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.tx.Transaction;
Expand All @@ -51,8 +54,17 @@ void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() {
cluster.restartNode(txCoordinatorOrdinal);

createIndex(INDEX_NAME);

// Allow cleanup to be completed after some time. This is required because transaction abortion (that is triggered by
// write intent resolution that is done in index build task) is completed only after successful txn cleanup, and the index
// can't become available before building is completed.
runAsync(() -> {
sleep(5_000);
runningNodesIter().forEach(IgniteImpl::stopDroppingMessages);
});

await("Index did not become available in time")
.atMost(10, SECONDS)
.atMost(30, SECONDS)
.until(() -> isIndexAvailable(unwrapIgniteImpl(cluster.aliveNode()), INDEX_NAME));

verifyNoNodesHaveAnythingInIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
Expand All @@ -30,7 +31,6 @@

import java.nio.file.Path;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -571,23 +571,6 @@ protected static void sqlScript(String query, Object... args) {
sql.executeScript(query, args);
}

private static List<List<Object>> getAllResultSet(ResultSet<SqlRow> resultSet) {
List<List<Object>> res = new ArrayList<>();

while (resultSet.hasNext()) {
SqlRow sqlRow = resultSet.next();

ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
for (int i = 0; i < sqlRow.columnCount(); i++) {
row.add(sqlRow.value(i));
}

res.add(row);
}

return res;
}

/**
* Looks up a node by a consistent ID, {@code null} if absent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.internal.ConfigTemplates.NODE_BOOTSTRAP_CFG_TEMPLATE;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;

import java.nio.file.Path;
import java.util.List;
Expand All @@ -38,6 +39,8 @@
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.testframework.junit.DumpThreadsOnTimeout;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -183,6 +186,13 @@ protected final Stream<Ignite> runningNodes() {
return cluster.runningNodes();
}

/**
* Returns nodes that are started and not stopped. This can include knocked out nodes.
*/
protected final Iterable<IgniteImpl> runningNodesIter() {
return cluster.runningNodes().map(node -> unwrapIgniteImpl(node))::iterator;
}

/**
* Restarts a node by index.
*
Expand Down Expand Up @@ -257,6 +267,16 @@ protected final IgniteImpl anyNode() {
return runningNodes().map(TestWrappers::unwrapIgniteImpl).findFirst().orElseThrow();
}

protected final List<List<Object>> sql(String sql) {
return sql(null, sql);
}

protected final List<List<Object>> sql(Transaction tx, String sql) {
try (ResultSet<SqlRow> rs = anyNode().sql().execute(tx, sql)) {
return getAllResultSet(rs);
}
}

/** Cluster configuration that aggressively increases low watermark to speed up data cleanup in tests. */
public static String aggressiveLowWatermarkIncreaseClusterConfig() {
return "{\n"
Expand Down
Loading