Skip to content

Commit 939f72b

Browse files
committed
Clone workers in streams
1 parent 6ee6e0f commit 939f72b

File tree

5 files changed

+92
-16
lines changed

5 files changed

+92
-16
lines changed

chaos/streams.ts

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,38 @@
11
import type { ChaosProvider } from "@chaos/provider";
2-
import { typeofStream } from "@workers/main";
2+
import { typeofStream, type WorkerClient } from "@workers/main";
33
import type { WorkerManager } from "@workers/manager";
44

5+
export type StreamsConfig = {
6+
cloned: boolean;
7+
};
8+
59
export class StreamsChaos implements ChaosProvider {
6-
workers?: WorkerManager;
7-
start(workers: WorkerManager) {
10+
workers?: WorkerClient[];
11+
config: StreamsConfig;
12+
13+
constructor(config: StreamsConfig) {
14+
this.config = config;
15+
}
16+
17+
async start(workers: WorkerManager) {
818
console.log("Starting StreamsChaos");
9-
this.workers = workers;
10-
for (const worker of workers.getAll()) {
11-
worker.worker.startStream(typeofStream.Message);
19+
let allWorkers = workers.getAll().map((w) => w.worker);
20+
if (this.config.cloned) {
21+
allWorkers = await Promise.all(allWorkers.map((w) => w.clone()));
22+
}
23+
24+
this.workers = allWorkers;
25+
for (const worker of allWorkers) {
26+
worker.startStream(typeofStream.Message);
1227
}
1328

1429
return Promise.resolve();
1530
}
1631

1732
stop() {
1833
if (this.workers) {
19-
for (const worker of this.workers.getAll()) {
20-
worker.worker.stopStreams();
34+
for (const worker of this.workers) {
35+
worker.stopStreams();
2136
}
2237
}
2338

forks/cli.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ function buildRuntimeConfig(options: ForkOptions): RuntimeConfig {
5050
network: (options.env || "dev") as "local" | "dev" | "production",
5151
networkChaos: resolveNetworkChaosConfig(options.networkChaosLevel),
5252
dbChaos: resolveDbChaosConfig(options.dbChaosLevel),
53-
backgroundStreams: options.withBackgroundStreams,
53+
backgroundStreams: options.withBackgroundStreams
54+
? {
55+
cloned: true,
56+
}
57+
: null,
5458
};
5559
}
5660

forks/config.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type DbChaosConfig } from "@chaos/db";
22
import type { NetworkChaosConfig } from "@chaos/network";
3+
import type { StreamsConfig } from "@chaos/streams";
34
import { getActiveVersion, type XmtpEnv } from "@helpers/versions";
45

56
export const NODE_VERSION = getActiveVersion().nodeBindings; // default to latest version, can be overridden with --nodeBindings=3.1.1
@@ -22,7 +23,7 @@ export const epochRotationOperations = {
2223
export const otherOperations = {
2324
createInstallation: false, // creates a new installation for a random worker
2425
sendMessage: true, // sends a message to the group
25-
sync: true, // syncs the group
26+
sync: false, // syncs the group
2627
};
2728
export const randomInboxIdsCount = 50; // How many inboxIds to use randomly in the add/remove operations
2829
export const installationCount = 2; // How many installations to use randomly in the createInstallation operations
@@ -130,7 +131,7 @@ export type RuntimeConfig = {
130131
network: XmtpEnv; // XMTP network
131132
networkChaos: NetworkChaosConfig | null; // Network chaos configuration
132133
dbChaos: DbChaosConfig | null; // Database chaos configuration
133-
backgroundStreams: boolean; //
134+
backgroundStreams: StreamsConfig | null; //
134135
};
135136

136137
export function getConfigFromEnv(): RuntimeConfig {
@@ -161,7 +162,9 @@ export function printConfig(config: RuntimeConfig): void {
161162
console.info(`randomInboxIdsCount: ${randomInboxIdsCount}`);
162163
console.info(`installationCount: ${installationCount}`);
163164
console.info(`testName: ${testName}`);
164-
console.info(`backgroundStreams: ${config.backgroundStreams}`);
165+
console.info(
166+
`backgroundStreams: ${config.backgroundStreams ? "enabled" : "disabled"}. From separate client instances: ${config.backgroundStreams?.cloned}`,
167+
);
165168

166169
if (config.networkChaos) {
167170
console.info("\nNETWORK CHAOS PARAMETERS");

forks/forks.test.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,13 @@ const {
3232
} = getConfigFromEnv();
3333

3434
const createOperations = (worker: Worker, groupID: string) => {
35-
const getGroup = () =>
36-
worker.client.conversations.getConversationById(groupID) as Promise<Group>;
35+
const getGroup = () => {
36+
const group = worker.client.conversations.getConversationById(groupID);
37+
if (!group) {
38+
throw new Error(`Group ${groupID} not found`);
39+
}
40+
return group as Promise<Group>;
41+
};
3742

3843
return {
3944
updateName: () =>
@@ -89,7 +94,7 @@ const startChaos = async (workers: WorkerManager): Promise<ChaosProvider[]> => {
8994
}
9095

9196
if (backgroundStreams) {
92-
chaosProviders.push(new StreamsChaos());
97+
chaosProviders.push(new StreamsChaos(backgroundStreams));
9398
}
9499

95100
// Start all chaos providers
@@ -192,7 +197,15 @@ describe(testName, () => {
192197
})(),
193198
);
194199
try {
195-
await Promise.all(parallelOperationsArray);
200+
const results = await Promise.allSettled(parallelOperationsArray);
201+
for (const result of results) {
202+
if (result.status === "rejected") {
203+
console.error(
204+
`Group ${groupIndex + 1} operation failed:`,
205+
result.reason,
206+
);
207+
}
208+
}
196209
} catch (e) {
197210
console.error(`Group ${groupIndex + 1} operation failed:`, e);
198211
}

workers/main.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ interface IWorkerClient {
113113

114114
// Properties
115115
readonly currentFolder: string;
116+
117+
// Clone Management
118+
clone(): Promise<WorkerClient>;
116119
}
117120

118121
// Worker thread code as a string
@@ -1254,4 +1257,42 @@ export class WorkerClient extends Worker implements IWorkerClient {
12541257
installationId: newInstallationId,
12551258
};
12561259
}
1260+
1261+
/**
1262+
* Creates a clone of this worker client with a separate underlying client instance
1263+
* The clone will have the same configuration but a new name: ${original_worker_name}_clone
1264+
* @returns A new WorkerClient instance with separate client
1265+
*/
1266+
async clone(): Promise<WorkerClient> {
1267+
console.debug(`[${this.nameId}] Creating clone of worker`);
1268+
1269+
// Create the clone name
1270+
const cloneName = `${this.name}_clone`;
1271+
1272+
// Create a WorkerBase object with the same properties but new name
1273+
const cloneWorkerBase: WorkerBase = {
1274+
name: cloneName,
1275+
sdk: this.sdk,
1276+
folder: this.folder,
1277+
walletKey: this.walletKey,
1278+
encryptionKey: this.encryptionKeyHex,
1279+
};
1280+
1281+
// Create a new WorkerClient instance with the same configuration
1282+
const clonedWorker = new WorkerClient(
1283+
cloneWorkerBase,
1284+
this.env,
1285+
{}, // Use default worker options
1286+
this.apiUrl,
1287+
);
1288+
1289+
// Initialize the cloned worker to create its client instance
1290+
await clonedWorker.initialize();
1291+
1292+
console.debug(
1293+
`[${this.nameId}] Successfully created clone: ${clonedWorker.nameId}`,
1294+
);
1295+
1296+
return clonedWorker;
1297+
}
12571298
}

0 commit comments

Comments
 (0)