diff --git a/chaos/db.ts b/chaos/db.ts
index 81f62803f..0fd201d12 100644
--- a/chaos/db.ts
+++ b/chaos/db.ts
@@ -5,6 +5,7 @@ export type DbChaosConfig = {
   minLockTime: number;
   maxLockTime: number;
   lockInterval: number;
+  impactedWorkerPercentage: number; // 0-100: percentage of workers whose databases are locked on each run
 };
 
 export class DbChaos implements ChaosProvider {
@@ -17,7 +18,8 @@ export class DbChaos implements ChaosProvider {
   }
 
   start(workers: WorkerManager): Promise<void> {
-    const { minLockTime, maxLockTime, lockInterval } = this.config;
+    const { minLockTime, maxLockTime, lockInterval, impactedWorkerPercentage } =
+      this.config;
     console.log(
       `Starting DB Chaos: Locking for ${minLockTime}ms - ${maxLockTime}ms
@@ -25,6 +27,10 @@ export class DbChaos implements ChaosProvider {
     );
     this.interval = setInterval(() => {
      for (const worker of workers.getAll()) {
+        // Skip this worker unless it falls within the impacted percentage
+        if (Math.random() * 100 > impactedWorkerPercentage) {
+          continue;
+        }
         const duration = Math.floor(
           minLockTime + Math.random() * (maxLockTime - minLockTime),
         );
diff --git a/chaos/streams.ts b/chaos/streams.ts
index 2549361d7..5008fe36d 100644
--- a/chaos/streams.ts
+++ b/chaos/streams.ts
@@ -1,14 +1,29 @@
 import type { ChaosProvider } from "@chaos/provider";
-import { typeofStream } from "@workers/main";
+import { typeofStream, type WorkerClient } from "@workers/main";
 import type { WorkerManager } from "@workers/manager";
 
+export type StreamsConfig = {
+  cloned: boolean;
+};
+
 export class StreamsChaos implements ChaosProvider {
-  workers?: WorkerManager;
-  start(workers: WorkerManager) {
+  workers?: WorkerClient[];
+  config: StreamsConfig;
+
+  constructor(config: StreamsConfig) {
+    this.config = config;
+  }
+
+  async start(workers: WorkerManager) {
     console.log("Starting StreamsChaos");
-    this.workers = workers;
-    for (const worker of workers.getAll()) {
-      worker.worker.startStream(typeofStream.Message);
+    let allWorkers = workers.getAll().map((w) => w.worker);
+    if (this.config.cloned) {
+      allWorkers = await Promise.all(allWorkers.map((w) => w.clone()));
+    }
+
+    this.workers = allWorkers;
+    for (const worker of allWorkers) {
+      worker.startStream(typeofStream.Message);
     }
 
     return Promise.resolve();
@@ -16,8 +31,8 @@ export class StreamsChaos implements ChaosProvider {
 
   stop() {
     if (this.workers) {
-      for (const worker of this.workers.getAll()) {
-        worker.worker.stopStreams();
+      for (const worker of this.workers) {
+        worker.stopStreams();
       }
     }
 
diff --git a/forks/README.md b/forks/README.md
index 59faa51d8..3ffd545ee 100644
--- a/forks/README.md
+++ b/forks/README.md
@@ -25,7 +25,18 @@ XMTP_ENV=production
 
 ### Running locally
 
 Before running this suite locally you _must_ run `yarn gen update:local` to pre-populate the database with inboxes to add and remove from the group. Otherwise add/remove member operations will fail, which will not increase the epoch or trigger forks.
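+
+For example, a typical first local session with the flags introduced in this change might look like the following (a sketch; it assumes the local multinode containers are already up):
+
+```bash
+# One-time setup: pre-populate the local database with inboxes
+yarn gen update:local
+
+# Short fork-detection run with database chaos and background streams enabled
+yarn fork --count 10 --env local --db-chaos-level low --with-background-streams
+```
+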
-### Fork generation through send testing
+### Fork generation through parallel operations
 
 The main approach creates intentional conflicts by running parallel operations on shared groups:
 
@@ -33,7 +33,7 @@ The main approach creates intentional conflicts by running parallel operations o
 - Add X workers as super admins to each group
 - Loop each group until epoch Y:
   - Choose random worker and syncAll conversations
-  - Run between 2 random operations:
+  - Run parallel operations:
     - Update group name
     - Send message (random message)
     - Add member (random inboxId)
@@ -45,23 +45,25 @@ The main approach creates intentional conflicts by running parallel operations o
 
 ### Parameters
 
-- **groupCount**: `5` - Number of groups to create in parallel
-- **nodeBindings**: `3.x.x` - Node SDK version to use
-- **parallelOperations**: `1` - How many operations to perform in parallel
-- **enabledOperations**: Operations configuration - enable/disable specific operations
-  - `updateName`: true, // updates the name of the group
-  - `sendMessage`: false, // sends a message to the group
-  - `addMember`: true, // adds a random member to the group
-  - `removeMember`: true, // removes a random member from the group
-  - `createInstallation`: true, // creates a new installation for a random worker
-- **workerNames**: Random workers (`random1`, `random2`, ..., `random10`)
-- **targetEpoch**: `100n` - The target epoch to stop the test (epochs are when performing forks to the group)
-- **network**: `process.env.XMTP_ENV` - Network environment setting
-- **randomInboxIdsCount**: `30` - How many inboxIds to use randomly in the add/remove operations
-- **installationCount**: `5` - How many installations to use randomly in the createInstallation operations
-- **typeofStreamForTest**: `typeofStream.None` - No streams started by default (configured on-demand)
-- **typeOfSyncForTest**: `typeOfSync.None` - No automatic syncing (configured on-demand)
-- **streams**: `false` - Enable message streams on all workers (configured via `--streams` CLI flag)
+Default configuration values (can be overridden via CLI flags):
+
+- **groupCount**: `5` - Number of groups to create in parallel (override with `--group-count`)
+- **nodeBindings**: Latest version - Node SDK version to use
+- **parallelOperations**: `5` - How many operations to perform in parallel (override with `--parallel-operations`)
+- **epochRotationOperations**: Operations that rotate epochs:
+  - `updateName`: true - Updates the name of the group
+  - `addMember`: true - Adds a random member to the group
+  - `removeMember`: true - Removes a random member from the group
+- **otherOperations**: Additional operations:
+  - `sendMessage`: true - Sends a message to the group
+  - `createInstallation`: false - Creates a new installation for a random worker
+  - `sync`: false - Syncs the group
+- **workerNames**: Random workers (`random1`, `random2`, `random3`, `random4`, `random5`)
+- **targetEpoch**: `20` - The target epoch to stop the test (override with `--target-epoch`)
+- **network**: `dev` - Network environment setting (override with `--env`)
+- **randomInboxIdsCount**: `50` - How many inboxIds to use randomly in the add/remove operations
+- **installationCount**: `2` - How many installations to use randomly in the createInstallation operations
+- **backgroundStreams**: `false` - Enable message streams on all workers (enable with `--with-background-streams`)
 
 ### Test setup in local network
 
@@ -73,7 +75,7 @@ The main approach creates intentional conflicts by running parallel operations o
 yarn local-update
 
 # Process that runs the test 100 times and exports forks logs
-yarn test forks --attempts 100 --env local --log warn --file --forks
+yarn fork --count 100 --env local
 ```
 
 ## CLI Usage
 
@@ -90,21 +92,50 @@
 yarn fork --count 50
 
 # Clean all raw logs before starting
 yarn fork --clean-all
 
+# Keep logs that don't contain fork content
+yarn fork --no-remove-non-matching
+
 # Run on a specific environment
 yarn fork --count 200 --env local
 
 # Enable message streams on all workers
-yarn fork --streams
+yarn fork --with-background-streams
+
+# Configure test parameters
+yarn fork --group-count 10 --parallel-operations 3 --target-epoch 50
+
+# Set log level for test runner
+yarn fork --log-level debug
 
 # Show help
 yarn fork --help
 ```
 
+### CLI Options
+
+- `--count`: Number of times to run the fork detection process (default: 100)
+- `--clean-all`: Clean all raw logs before starting (default: false)
+- `--remove-non-matching`: Remove logs that don't contain fork content (default: true)
+- `--no-remove-non-matching`: Keep logs that don't contain fork content
+- `--env`: XMTP environment - `local`, `dev`, or `production` (default: `XMTP_ENV` if set, otherwise `dev`)
+- `--network-chaos-level`: Network chaos level - `none`, `low`, `medium`, or `high` (default: `none`)
+- `--db-chaos-level`: Database chaos level - `none`, `low`, `medium`, or `high` (default: `none`)
+- `--with-background-streams`: Enable message streams on all workers (default: false)
+- `--log-level`: Log level for test runner - `debug`, `info`, `warn`, `error` (default: `warn`)
+- `--group-count`: Number of groups to run the test against (default: 5)
+- `--parallel-operations`: Number of parallel operations run on each group (default: 5)
+- `--target-epoch`: Target epoch to stop the test at (default: 20)
+
+### Statistics Output
+
 The CLI provides statistics including:
 
 - Total runs and forks detected
+- Runs with forks vs. runs without forks
+- Runs with errors
 - Fork detection rate
 - Average forks per run
+- Average forks per run (with forks only)
 
 ### Network Chaos Testing
 
@@ -113,7 +144,7 @@ The fork test can inject network chaos (latency, jitter, packet loss) to simulat
 
 **Requirements:**
 - Network chaos requires `--env local`
 - Multinode Docker containers must be running (`./multinode/up`)
-- Must be run on linux with `tc` and `iptables` commands available. Will not work on MacOS.
+- Must be run on Linux with `tc` and `iptables` commands available. It will not work on macOS.
 - Requires `sudo` access
 
 **Chaos Levels:**
 
@@ -122,19 +153,19 @@ The fork test can inject network chaos (latency, jitter, packet loss) to simulat
 |--------|-------------|--------------|-------------|----------|
 | low    | 50-150ms    | 0-50ms       | 0-2%        | 15s      |
 | medium | 100-300ms   | 0-75ms       | 0-3.5%      | 10s      |
-| high   | 100-500ms   | 0-100ms      | 0-5%        | 10s      |
+| high   | 0-500ms     | 50-200ms     | 0-25%       | 10s      |
 
 **Usage:**
 
 ```bash
-# Run with default (medium) chaos
-yarn fork --env local --chaos-enabled
+# Run with low network chaos
+yarn fork --env local --network-chaos-level low
 
-# Run with high chaos level
-yarn fork --env local --chaos-enabled --chaos-level high
+# Run with high network chaos level
+yarn fork --env local --network-chaos-level high
 
-# Run 50 iterations with low chaos
-yarn fork --count 50 --env local --chaos-enabled --chaos-level low
+# Run 50 iterations with medium network chaos
+yarn fork --count 50 --env local --network-chaos-level medium
 ```
 
 **How it works:**
@@ -146,10 +177,49 @@ yarn fork --count 50 --env local --chaos-enabled --chaos-level low
 
 **Example output:**
 ```
 NETWORK CHAOS PARAMETERS
-chaosEnabled: true
-chaosLevel: high
-  delay: 100-500ms
-  jitter: 0-100ms
-  packetLoss: 0-5%
+  delay: 0-500ms
+  jitter: 50-200ms
+  packetLoss: 0-25%
   interval: 10000ms
 ```
+
+### Database Chaos Testing
+
+The fork test can inject database chaos by temporarily locking database files to simulate database contention and I/O issues.
+
+**Chaos Levels:**
+
+| Level  | Lock Duration | Interval | Impacted Workers |
+|--------|---------------|----------|------------------|
+| low    | 50-250ms      | 10s      | 20%              |
+| medium | 100-2000ms    | 15s      | 40%              |
+| high   | 500-2000ms    | 5s       | 60%              |
+
+**Usage:**
+
+```bash
+# Run with low database chaos
+yarn fork --db-chaos-level low
+
+# Run with high database chaos level
+yarn fork --db-chaos-level high
+
+# Run 50 iterations with medium database chaos
+yarn fork --count 50 --db-chaos-level medium
+
+# Combine network and database chaos
+yarn fork --env local --network-chaos-level medium --db-chaos-level medium
+```
+
+**How it works:**
+1. Periodically locks the database files of a random subset of workers (the level's impacted-worker percentage)
+2. Lock duration is randomized within the preset range
+3. Workers experience database busy/locked errors during operations
+4. Cleans up and waits for all locks to complete when the test finishes
+
+**Example output:**
+```
+DATABASE CHAOS PARAMETERS
+  lockDuration: 500-2000ms
+  interval: 5000ms
+```
diff --git a/forks/cli.ts b/forks/cli.ts
index 4917fe783..a17d14fb4 100644
--- a/forks/cli.ts
+++ b/forks/cli.ts
@@ -50,7 +50,11 @@ function buildRuntimeConfig(options: ForkOptions): RuntimeConfig {
     network: (options.env || "dev") as "local" | "dev" | "production",
     networkChaos: resolveNetworkChaosConfig(options.networkChaosLevel),
     dbChaos: resolveDbChaosConfig(options.dbChaosLevel),
-    backgroundStreams: options.withBackgroundStreams,
+    backgroundStreams: options.withBackgroundStreams
+      ? {
+          cloned: true,
+        }
+      : null,
   };
 }
 
@@ -129,7 +133,7 @@ async function runForkDetection(options: ForkOptions): Promise<void> {
     const success = runForkTest(options, runtimeConfig);
     if (!success) {
       stats.runsWithErrors++;
-      console.log(`❌ Error in run ${i}/${options.count}`);
+      console.info(`❌ Error in run ${i}/${options.count}`);
     }
 
     // Clean and analyze fork logs after the test (suppress output)
diff --git a/forks/config.ts b/forks/config.ts
index 746dda9d0..636f69551 100644
--- a/forks/config.ts
+++ b/forks/config.ts
@@ -1,5 +1,6 @@
 import { type DbChaosConfig } from "@chaos/db";
 import type { NetworkChaosConfig } from "@chaos/network";
+import type { StreamsConfig } from "@chaos/streams";
 import { getActiveVersion, type XmtpEnv } from "@helpers/versions";
 
 export const NODE_VERSION = getActiveVersion().nodeBindings; // default to latest version, can be overridden with --nodeBindings=3.1.1
@@ -11,6 +12,11 @@ export const workerNames = [
   "random3",
   "random4",
   "random5",
+  "random1",
+  "random2",
+  "random3",
+  "random4",
+  "random5",
 ] as string[];
 
 // Operations configuration - enable/disable specific operations
@@ -20,9 +26,9 @@ export const epochRotationOperations = {
   removeMember: true, // removes a random member from the group
 };
 export const otherOperations = {
-  createInstallation: false, // creates a new installation for a random worker
+  createInstallation: true, // creates a new installation for a random worker
   sendMessage: true, // sends a message to the group
-  sync: true, // syncs the group
+  sync: false, // syncs the group
 };
 export const randomInboxIdsCount = 50; // How many inboxIds to use randomly in the add/remove operations
 export const installationCount = 2; // How many installations to use randomly in the createInstallation operations
@@ -74,17 +80,20 @@ export const dbChaosPresets: Record<
   low: {
     minLockTime: 50,
     maxLockTime: 250,
-    lockInterval: 10000, // 20 seconds
+    lockInterval: 10000, // 10 seconds
+    impactedWorkerPercentage: 20,
   },
   medium: {
     minLockTime: 100,
     maxLockTime: 2000,
     lockInterval: 15000, // 15 seconds
+    impactedWorkerPercentage: 40,
   },
   high: {
     minLockTime: 500,
     maxLockTime: 2000,
     lockInterval: 5000, // 5 seconds
+    impactedWorkerPercentage: 60,
   },
 };
 
@@ -130,7 +139,7 @@ export type RuntimeConfig = {
   network: XmtpEnv; // XMTP network
   networkChaos: NetworkChaosConfig | null; // Network chaos configuration
   dbChaos: DbChaosConfig | null; // Database chaos configuration
-  backgroundStreams: boolean; //
+  backgroundStreams: StreamsConfig | null; // Background streams configuration (null = disabled)
 };
 
 export function getConfigFromEnv(): RuntimeConfig {
@@ -161,7 +170,9 @@ export function printConfig(config: RuntimeConfig): void {
   console.info(`randomInboxIdsCount: ${randomInboxIdsCount}`);
   console.info(`installationCount: ${installationCount}`);
   console.info(`testName: ${testName}`);
-  console.info(`backgroundStreams: ${config.backgroundStreams}`);
+  console.info(
+    `backgroundStreams: ${config.backgroundStreams ? `enabled (cloned client instances: ${config.backgroundStreams.cloned})` : "disabled"}`,
+  );
 
   if (config.networkChaos) {
     console.info("\nNETWORK CHAOS PARAMETERS");
diff --git a/forks/forks.test.ts b/forks/forks.test.ts
index 9c3ee41bf..4baf2838a 100644
--- a/forks/forks.test.ts
+++ b/forks/forks.test.ts
@@ -32,8 +32,14 @@ const {
 } = getConfigFromEnv();
 
 const createOperations = (worker: Worker, groupID: string) => {
-  const getGroup = () =>
-    worker.client.conversations.getConversationById(groupID) as Promise<Group>;
+  const getGroup = async () => {
+    const group =
+      await worker.client.conversations.getConversationById(groupID);
+    if (!group) {
+      throw new Error(`Group ${groupID} not found`);
+    }
+    return group as Group;
+  };
 
   return {
     updateName: () =>
@@ -89,7 +95,7 @@ const startChaos = async (workers: WorkerManager): Promise<ChaosProvider[]> => {
   }
 
   if (backgroundStreams) {
-    chaosProviders.push(new StreamsChaos());
+    chaosProviders.push(new StreamsChaos(backgroundStreams));
   }
 
   // Start all chaos providers
@@ -141,8 +147,9 @@ describe(testName, () => {
       const groupOperationPromises = groupIDs.map(
         async (groupID, groupIndex) => {
           let currentEpoch = 0n;
+          let numConsecutiveFailures = 0;
 
-          while (currentEpoch < targetEpoch) {
+          while (currentEpoch < targetEpoch && numConsecutiveFailures < 5) {
             const parallelOperationsArray = Array.from(
               { length: parallelOperations },
               () =>
@@ -191,20 +198,30 @@ describe(testName, () => {
                   }
                 })(),
             );
+            const results = await Promise.allSettled(parallelOperationsArray);
+            for (const result of results) {
+              if (result.status === "rejected") {
+                console.error(
+                  `Group ${groupIndex + 1} operation failed:`,
+                  result.reason,
+                );
+              }
+            }
+
             try {
-              await Promise.all(parallelOperationsArray);
+              await workers.checkForksForGroup(groupID);
+              const group = await workers
+                .getCreator()
+                .client.conversations.getConversationById(groupID);
+              if (!group) {
+                throw new Error("Could not find group");
+              }
+              currentEpoch = (await group.debugInfo()).epoch;
+              numConsecutiveFailures = 0;
             } catch (e) {
               console.error(`Group ${groupIndex + 1} operation failed:`, e);
+              numConsecutiveFailures++;
             }
-
-            await workers.checkForksForGroup(groupID);
-            const group = await workers
-              .getCreator()
-              .client.conversations.getConversationById(groupID);
-            if (!group) {
-              throw new Error("Could not find group");
-            }
-            currentEpoch = (await group.debugInfo()).epoch;
           }
 
           return { groupIndex, finalEpoch: currentEpoch };
@@ -212,7 +229,6 @@ describe(testName, () => {
       );
 
       await Promise.all(groupOperationPromises);
-      await workers.checkForks();
     } catch (e: any) {
       const msg = `Error during fork testing: ${e}`;
       console.error(msg);
@@ -223,6 +239,8 @@ describe(testName, () => {
       for (const chaosProvider of chaosProviders) {
         await chaosProvider.stop();
       }
+      // Check for forks one last time, with all chaos turned off to ensure the check can succeed.
+      await workers.checkForks();
     }
   });
 });
diff --git a/helpers/versions.ts b/helpers/versions.ts
index 25784529c..fb71af570 100644
--- a/helpers/versions.ts
+++ b/helpers/versions.ts
@@ -249,6 +249,8 @@ export const regressionClient = async (
       loggingLevel,
       apiUrl,
       appVersion: APP_VERSION,
+      historySyncUrl: "http://localhost:5558",
+      disableDeviceSync: false,
       codecs: [new ReactionCodec(), new ReplyCodec()],
     });
   } catch (error) {
diff --git a/multinode/docker-compose.yml b/multinode/docker-compose.yml
index d72854e3a..2dbebb0d6 100644
--- a/multinode/docker-compose.yml
+++ b/multinode/docker-compose.yml
@@ -117,6 +117,12 @@ services:
     image: ghcr.io/xmtp/mls-validation-service:main
     platform: linux/amd64
 
+  history-server:
+    image: ghcr.io/xmtp/message-history-server:main
+    platform: linux/amd64
+    ports:
+      - 5558:5558
+
   db:
     image: postgres:13
     environment:
diff --git a/workers/main.ts b/workers/main.ts
index e5aa135b8..639dbeadb 100644
--- a/workers/main.ts
+++ b/workers/main.ts
@@ -113,6 +113,9 @@ interface IWorkerClient {
 
   // Properties
   readonly currentFolder: string;
+
+  // Clone Management
+  clone(): Promise<WorkerClient>;
 }
 
 // Worker thread code as a string
@@ -1254,4 +1257,42 @@ export class WorkerClient extends Worker implements IWorkerClient {
       installationId: newInstallationId,
     };
   }
+
+  /**
+   * Creates a clone of this worker client with a separate underlying client instance.
+   * The clone has the same configuration but a new name: ${original_worker_name}_clone
+   * @returns A new WorkerClient instance with a separate client
+   */
+  async clone(): Promise<WorkerClient> {
+    console.debug(`[${this.nameId}] Creating clone of worker`);
+
+    // Create the clone name
+    const cloneName = `${this.name}_clone`;
+
+    // Create a WorkerBase object with the same properties but the new name
+    const cloneWorkerBase: WorkerBase = {
+      name: cloneName,
+      sdk: this.sdk,
+      folder: this.folder,
+      walletKey: this.walletKey,
+      encryptionKey: this.encryptionKeyHex,
+    };
+
+    // Create a new WorkerClient instance with the same configuration
+    const clonedWorker = new WorkerClient(
+      cloneWorkerBase,
+      this.env,
+      {}, // Use default worker options
+      this.apiUrl,
+    );
+
+    // Initialize the cloned worker to create its client instance
+    await clonedWorker.initialize();
+
+    console.debug(
+      `[${this.nameId}] Successfully created clone: ${clonedWorker.nameId}`,
+    );
+
+    return clonedWorker;
+  }
 }
diff --git a/workers/manager.ts b/workers/manager.ts
index 6fbe69ebb..7f2286dee 100644
--- a/workers/manager.ts
+++ b/workers/manager.ts
@@ -516,7 +516,7 @@ export async function getWorkers(
       (v) => v.nodeBindings,
     );
   }
-  let workerPromises: Promise<Worker>[] = [];
+  const successfulResults: Worker[] = [];
   let descriptors: string[] = [];
 
   // Handle different input types
@@ -528,68 +528,24 @@ export async function getWorkers(
         : getFixedNames(workers)
       : workers;
     descriptors = names;
-    workerPromises = descriptors.map((descriptor) =>
-      manager.createWorker(
-        descriptor,
-        sdkVersions[Math.floor(Math.random() * sdkVersions.length)],
-      ),
-    );
+    for (const descriptor of descriptors) {
+      successfulResults.push(
+        await manager.createWorker(
+          descriptor,
+          sdkVersions[Math.floor(Math.random() * sdkVersions.length)],
+        ),
+      );
+    }
   } else {
     // Record input - apply versioning if requested
     let entries = Object.entries(workers);
     descriptors = entries.map(([descriptor]) => descriptor);
-    workerPromises = entries.map(([descriptor, apiUrl]) =>
-      manager.createWorker(descriptor, sdkVersions[0], apiUrl),
-    );
-  }
-
-  // Only use progress bar if there are more than 50 workers
-  const useProgressBar = workerPromises.length > 50;
-  let progressBar: ProgressBar | undefined;
-
-  if (useProgressBar) {
-    progressBar = new ProgressBar(
-      workerPromises.length,
-      `Initializing ${workerPromises.length} workers...`,
-    );
-    // Show initial progress immediately
-    progressBar.update(0);
-  }
-
-  // Track all workers in parallel and update progress as each completes
-  let completedCount = 0;
-  const results = await Promise.allSettled(
-    workerPromises.map(async (workerPromise) => {
-      try {
-        const worker = await workerPromise;
-        completedCount++;
-        if (useProgressBar && progressBar) {
-          progressBar.update(completedCount);
-        }
-        return worker;
-      } catch (error) {
-        completedCount++;
-        if (useProgressBar && progressBar) {
-          progressBar.update(completedCount);
-        }
-        throw error;
-      }
-    }),
-  );
-
-  // Check for any failures
-  const failedResults = results.filter(
-    (result) => result.status === "rejected",
-  );
-  if (failedResults.length > 0) {
-    throw failedResults[0].reason;
+    for (const [descriptor, apiUrl] of entries) {
+      successfulResults.push(
+        await manager.createWorker(descriptor, sdkVersions[0], apiUrl),
+      );
+    }
   }
-
-  // Extract successful results
-  const successfulResults = results.map(
-    (result) => (result as PromiseFulfilledResult<Worker>).value,
-  );
 
   // Add all successful workers to the manager
   for (const worker of successfulResults) {
@@ -618,12 +625,12 @@ function getNextFolderName(): string {
   const dataPath = path.resolve(process.cwd(), ".data");
   let folder = "a";
   if (fs.existsSync(dataPath)) {
-    const existingFolders = fs
-      .readdirSync(dataPath)
-      .filter((f) => /^[a-z]$/.test(f));
-    folder = String.fromCharCode(
-      "a".charCodeAt(0) + (existingFolders.length % 26),
-    );
+    // Random 32-character folder name so repeated runs don't collide
+    const chars = "abcdefghijklmnopqrstuvwxyz0123456789";
+    folder = Array.from(
+      { length: 32 },
+      () => chars[Math.floor(Math.random() * chars.length)],
+    ).join("");
   }
   return folder;
 }
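For reference, the chaos pieces in this diff compose roughly as follows. This is an illustrative sketch, not code from the change: `startAllChaos` is a hypothetical helper, and it assumes `DbChaos` receives its config through its constructor (only `StreamsChaos`'s constructor is shown above).

```typescript
import { DbChaos, type DbChaosConfig } from "@chaos/db";
import type { ChaosProvider } from "@chaos/provider";
import { StreamsChaos } from "@chaos/streams";
import type { WorkerManager } from "@workers/manager";

// Mirrors the "medium" preset from forks/config.ts
const dbConfig: DbChaosConfig = {
  minLockTime: 100,
  maxLockTime: 2000,
  lockInterval: 15000, // 15 seconds
  impactedWorkerPercentage: 40, // lock ~40% of workers on each tick
};

// Hypothetical helper: start DB and stream chaos against an initialized WorkerManager.
async function startAllChaos(workers: WorkerManager): Promise<ChaosProvider[]> {
  const providers: ChaosProvider[] = [
    new DbChaos(dbConfig), // assumes the constructor takes DbChaosConfig
    new StreamsChaos({ cloned: true }), // streams from cloned client instances
  ];
  for (const provider of providers) {
    await provider.start(workers);
  }
  return providers; // callers should stop() each provider when the test ends
}
```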