Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { CodexAdapter } from "../src/provider/Services/CodexAdapter.ts";
import { ProviderService } from "../src/provider/Services/ProviderService.ts";
import { AnalyticsService } from "../src/telemetry/Services/AnalyticsService.ts";
import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointReactor.ts";
import { ColdStartLifecycleLive } from "../src/orchestration/Layers/ColdStartLifecycle.ts";
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
Expand Down Expand Up @@ -305,6 +306,7 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provideMerge(runtimeServicesLayer),
);
const orchestrationReactorLayer = OrchestrationReactorLive.pipe(
Layer.provideMerge(ColdStartLifecycleLive),
Layer.provideMerge(runtimeIngestionLayer),
Layer.provideMerge(providerCommandReactorLayer),
Layer.provideMerge(checkpointReactorLayer),
Expand Down
252 changes: 252 additions & 0 deletions apps/server/src/orchestration/Layers/ColdStartLifecycle.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";

import * as NodeServices from "@effect/platform-node/NodeServices";
import type { ProviderSession } from "@t3tools/contracts";
import {
CommandId,
DEFAULT_PROVIDER_INTERACTION_MODE,
EventId,
ProjectId,
ThreadId,
TurnId,
} from "@t3tools/contracts";
import { derivePendingUserInputs } from "@t3tools/shared/pendingUserInput";
import { Effect, Layer, ManagedRuntime } from "effect";
import { afterEach, describe, expect, it } from "vitest";

import { ServerConfig } from "../../config.ts";
import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts";
import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts";
import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts";
import {
ProviderService,
type ProviderServiceShape,
} from "../../provider/Services/ProviderService.ts";
import { ColdStartLifecycle } from "../Services/ColdStartLifecycle.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { ColdStartLifecycleLive } from "./ColdStartLifecycle.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";

const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value);
const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value);

describe("ColdStartLifecycle", () => {
let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | ColdStartLifecycle,
unknown
> | null = null;
const createdStateDirs = new Set<string>();

afterEach(async () => {
if (runtime) {
await runtime.dispose();
}
runtime = null;
for (const stateDir of createdStateDirs) {
fs.rmSync(stateDir, { recursive: true, force: true });
}
createdStateDirs.clear();
});

async function createHarness(input?: {
readonly stateDir?: string;
readonly sessions?: ReadonlyArray<ProviderSession>;
}) {
const now = new Date().toISOString();
const stateDir =
input?.stateDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-cold-start-"));
createdStateDirs.add(stateDir);
const sessions = [...(input?.sessions ?? [])];

const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never;
const providerService: ProviderServiceShape = {
startSession: () => unsupported(),
sendTurn: () => unsupported(),
interruptTurn: () => unsupported(),
respondToRequest: () => unsupported(),
respondToUserInput: () => unsupported(),
stopSession: () => unsupported(),
listSessions: () => Effect.succeed(sessions),
getCapabilities: () => unsupported(),
rollbackConversation: () => unsupported(),
streamEvents: unsupported(),
};

const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Layer.provide(SqlitePersistenceMemory),
);
const layer = ColdStartLifecycleLive.pipe(
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(Layer.succeed(ProviderService, providerService)),
Layer.provideMerge(ServerConfig.layerTest(process.cwd(), stateDir)),
Layer.provideMerge(NodeServices.layer),
);
runtime = ManagedRuntime.make(layer);

const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const coldStartLifecycle = await runtime.runPromise(Effect.service(ColdStartLifecycle));

await Effect.runPromise(
engine.dispatch({
type: "project.create",
commandId: CommandId.makeUnsafe("cmd-project-create"),
projectId: asProjectId("project-1"),
title: "Provider Project",
workspaceRoot: "/tmp/provider-project",
defaultModel: "gpt-5-codex",
createdAt: now,
}),
);
await Effect.runPromise(
engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-thread-create"),
threadId: ThreadId.makeUnsafe("thread-1"),
projectId: asProjectId("project-1"),
title: "Thread",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
}),
);

return {
engine,
coldStartLifecycle,
};
}

it("expires stale pending user-input requests on cold boot", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
const testRuntime = runtime;
if (!testRuntime) {
throw new Error("Expected runtime to be initialized");
}

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.activity.append",
commandId: CommandId.makeUnsafe("cmd-user-input-requested"),
threadId: ThreadId.makeUnsafe("thread-1"),
activity: {
id: EventId.makeUnsafe("evt-user-input-requested"),
tone: "info",
kind: "user-input.requested",
summary: "User input requested",
payload: {
requestId: "user-input-request-1",
questions: [
{
id: "sandbox_mode",
header: "Sandbox",
question: "Which mode should be used?",
options: [
{
label: "workspace-write",
description: "Allow workspace writes only",
},
],
},
],
},
turnId: asTurnId("turn-1"),
createdAt: now,
},
createdAt: now,
}),
);

await testRuntime.runPromise(harness.coldStartLifecycle.run);

const readModel = await testRuntime.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.makeUnsafe("thread-1"));
const expiredActivity = thread?.activities.find(
(activity) => activity.kind === "user-input.expired",
);

expect(expiredActivity?.summary).toBe("Pending question expired after app restart");
expect(expiredActivity?.turnId).toBe("turn-1");
expect(expiredActivity?.payload).toMatchObject({
requestId: "user-input-request-1",
reason: "server-restart",
});
expect(derivePendingUserInputs(thread?.activities ?? [])).toEqual([]);
});

it("skips cold-start stale-input expiration when live provider sessions already exist", async () => {
const harness = await createHarness({
sessions: [
{
provider: "codex",
status: "running",
runtimeMode: "approval-required",
threadId: ThreadId.makeUnsafe("thread-1"),
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
],
});
const now = new Date().toISOString();
const testRuntime = runtime;
if (!testRuntime) {
throw new Error("Expected runtime to be initialized");
}

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.activity.append",
commandId: CommandId.makeUnsafe("cmd-user-input-requested-live-session"),
threadId: ThreadId.makeUnsafe("thread-1"),
activity: {
id: EventId.makeUnsafe("evt-user-input-requested-live-session"),
tone: "info",
kind: "user-input.requested",
summary: "User input requested",
payload: {
requestId: "user-input-request-live-session",
questions: [
{
id: "sandbox_mode",
header: "Sandbox",
question: "Which mode should be used?",
options: [
{
label: "workspace-write",
description: "Allow workspace writes only",
},
],
},
],
},
turnId: asTurnId("turn-live"),
createdAt: now,
},
createdAt: now,
}),
);

await testRuntime.runPromise(harness.coldStartLifecycle.run);

const readModel = await testRuntime.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.makeUnsafe("thread-1"));

expect(thread?.activities.some((activity) => activity.kind === "user-input.expired")).toBe(
false,
);
expect(derivePendingUserInputs(thread?.activities ?? [])).toEqual([
expect.objectContaining({
requestId: "user-input-request-live-session",
}),
]);
});
});
88 changes: 88 additions & 0 deletions apps/server/src/orchestration/Layers/ColdStartLifecycle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { CommandId, EventId, type OrchestrationThreadActivity, type ThreadId, type TurnId } from "@t3tools/contracts";
import { derivePendingUserInputs } from "@t3tools/shared/pendingUserInput";
import { Cause, Effect, Layer } from "effect";

import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { ColdStartLifecycle, type ColdStartLifecycleShape } from "../Services/ColdStartLifecycle.ts";

const serverCommandId = (tag: string): CommandId =>
CommandId.makeUnsafe(`server:${tag}:${crypto.randomUUID()}`);

const appendExpiredUserInputActivity = (input: {
readonly orchestrationEngine: typeof OrchestrationEngineService.Service;
readonly threadId: ThreadId;
readonly requestId: string;
readonly turnId: TurnId | null;
readonly createdAt: string;
}) =>
input.orchestrationEngine.dispatch({
type: "thread.activity.append",
commandId: serverCommandId("expire-stale-user-input"),
threadId: input.threadId,
activity: {
id: EventId.makeUnsafe(`server:user-input-expired:${crypto.randomUUID()}`),
tone: "info",
kind: "user-input.expired",
summary: "Pending question expired after app restart",
payload: {
requestId: input.requestId,
reason: "server-restart",
detail:
"This pending question could not be resumed after app restart. Re-run the action if you still want to answer it.",
},
turnId: input.turnId,
createdAt: input.createdAt,
} satisfies OrchestrationThreadActivity,
createdAt: input.createdAt,
});

const expireStalePendingUserInputsOnColdBoot = (input: {
readonly orchestrationEngine: typeof OrchestrationEngineService.Service;
}) =>
Effect.gen(function* () {
const readModel = yield* input.orchestrationEngine.getReadModel();

for (const thread of readModel.threads) {
const pendingPrompts = derivePendingUserInputs(thread.activities);
for (const prompt of pendingPrompts) {
const expiredAt = new Date().toISOString();
yield* appendExpiredUserInputActivity({
orchestrationEngine: input.orchestrationEngine,
threadId: thread.id,
requestId: prompt.requestId,
turnId: prompt.turnId,
createdAt: expiredAt,
}).pipe(
Effect.catchCause((cause) =>
Effect.logWarning("failed to expire stale pending user-input request on cold boot", {
threadId: thread.id,
requestId: prompt.requestId,
cause: Cause.pretty(cause),
}),
),
);
}
}
});

const makeColdStartLifecycle = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;
const providerService = yield* ProviderService;

const run: ColdStartLifecycleShape["run"] = Effect.gen(function* () {
const activeSessions = yield* providerService.listSessions();
if (activeSessions.length > 0) {
return;
}

const tasks = [expireStalePendingUserInputsOnColdBoot({ orchestrationEngine })];
yield* Effect.forEach(tasks, (task) => task).pipe(Effect.asVoid);
});

return {
run,
} satisfies ColdStartLifecycleShape;
});

export const ColdStartLifecycleLive = Layer.effect(ColdStartLifecycle, makeColdStartLifecycle);
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import { afterEach, describe, expect, it } from "vitest";

import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
import { ColdStartLifecycle } from "../Services/ColdStartLifecycle.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts";
Expand All @@ -22,6 +23,13 @@ describe("OrchestrationReactor", () => {

runtime = ManagedRuntime.make(
Layer.effect(OrchestrationReactor, makeOrchestrationReactor).pipe(
Layer.provideMerge(
Layer.succeed(ColdStartLifecycle, {
run: Effect.sync(() => {
started.push("cold-start-lifecycle");
}),
}),
),
Layer.provideMerge(
Layer.succeed(ProviderRuntimeIngestionService, {
start: Effect.sync(() => {
Expand Down Expand Up @@ -54,6 +62,7 @@ describe("OrchestrationReactor", () => {
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));

expect(started).toEqual([
"cold-start-lifecycle",
"provider-runtime-ingestion",
"provider-command-reactor",
"checkpoint-reactor",
Expand Down
Loading