diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index d6617d92..960eee09 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -37,6 +37,9 @@ vi.mock("@filoz/synapse-sdk", async (importOriginal) => { }; }); +const fetchMock = vi.fn(async () => new Response(null, { status: 200 })); +vi.stubGlobal("fetch", fetchMock); + vi.mock("filecoin-pin", () => ({ executeUpload: vi.fn(), cleanupSynapseService: vi.fn(), @@ -195,6 +198,7 @@ describe("DealService", () => { afterEach(() => { vi.clearAllMocks(); + fetchMock.mockImplementation(async () => new Response(null, { status: 200 })); }); describe("createDeal", () => { @@ -222,7 +226,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -247,6 +251,9 @@ describe("DealService", () => { dealRepoMock.create.mockReturnValue(mockDeal); mockStorageProviderRepository.findOne.mockResolvedValue({}); + mockSynapseInstance = { ...mockSynapseInstance, client: {} } as unknown as Synapse; + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(mockProviderInfo); + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance); }); it("processes the full deal lifecycle successfully", async () => { @@ -1060,6 +1067,7 @@ describe("DealService", () => { }).compile(); const testService = module.get(DealService); + vi.spyOn(testService as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance); return testService; }; @@ -1195,7 +1203,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1262,7 +1270,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1433,16 +1441,86 @@ describe("DealService", () => { }); describe("isDataSetLive", () => { - it("rethrows non-terminal probe errors instead of classifying as terminated", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - await expect(service.isDataSetLive(1n)).rejects.toThrow("ECONNREFUSED"); + const providerInfo: PDPProviderEx = { + id: 101n, + serviceProvider: "0xprovider", + payee: "0x100", + name: "Test Provider", + description: "Test Provider", + isActive: true, + isApproved: true, + pdp: { + serviceURL: "https://sp.example", + minPieceSizeInBytes: 0n, + maxPieceSizeInBytes: 100n, + storagePricePerTibPerDay: 1n, + minProvingPeriodInEpochs: 1n, + location: "location", + paymentTokenAddress: "0x100", + ipniPiece: true, + ipniIpfs: true, + }, + }; + + beforeEach(() => { + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); + const synapseMock = { client: {} } as unknown as Synapse; + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); + }); + + it("returns true when both probes report live", async () => { + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); }); - it("returns false only on the known not-live error message", async () => { + it("returns false when FWSS validateDataSet reports not live", async () => { mockWarmStorageService.validateDataSet.mockRejectedValueOnce( new Error("Data set 1 does not exist or is not live"), ); - await expect(service.isDataSetLive(1n)).resolves.toBe(false); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("treats SP HTTP 409 with a different body as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP non-409 responses as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP network errors as live", async () => { + fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); + }); + + it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("posts an empty JSON body to the SP addPieces endpoint", async () => { + await service.isDataSetLive("0xprovider", 42n); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; + expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); + expect(init.method).toBe("POST"); + expect(init.body).toBe("{}"); }); }); @@ -1457,7 +1535,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "u", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1476,6 +1554,7 @@ describe("DealService", () => { } as any; const uploadPayload = { carData: Uint8Array.from([1]), rootCid: CID.parse(mockRootCid) }; const synapseMock = { + client: {}, storage: { createContext: vi.fn().mockResolvedValue({ dataSetId: 9n }), }, @@ -1483,6 +1562,8 @@ describe("DealService", () => { const deal = Object.assign(new Deal(), { id: "deal-skip", spAddress: "0xProvider" }); dealRepoMock.create.mockReturnValue(deal); mockStorageProviderRepository.findOne.mockResolvedValue({ providerId: 1, isApproved: true }); + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); mockWarmStorageService.validateDataSet.mockRejectedValueOnce( new Error("Data set 9 does not exist or is not live"), ); @@ -1525,7 +1606,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 74b0055c..2edf4026 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -43,6 +43,8 @@ type UploadPayload = { rootCid: CID; }; +const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; + type UploadResultSummary = { pieceCid: string; pieceId?: number; @@ -363,7 +365,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { // pdpEndEpoch=0; createContext returns it and the next add-pieces path // would fail. See #379. if (storage.dataSetId !== undefined) { - const live = await this.isDataSetLive(storage.dataSetId, signal); + const live = await this.isDataSetLive(providerAddress, storage.dataSetId, signal); if (!live) { preUploadTerminated = true; throw new DealJobTerminatedDataSetError(storage.dataSetId); @@ -675,12 +677,12 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Determines whether a provider's dataset slot is `missing`, `live`, or - * `terminated`. Resolves the dataset via createContext (FWSS pdpEndEpoch=0 - * filter) and then probes PDP liveness via WarmStorage.validateDataSet. + * Classifies a provider's dataset slot as `missing`, `live`, or `terminated`. + * Resolves the dataset via createContext, then composes the liveness probes + * documented on `isDataSetLive`. * - * `terminated` means FWSS still considers the set provisionable but PDP has - * marked it dead (e.g. unrecoverable proving failure). See #379. + * `terminated` means either FWSS or Curio reports the set as dead. See #379 + * and the SP-HTTP probe rationale in `isDataSetLive`. */ async getDataSetProvisioningStatus( providerAddress: string, @@ -707,19 +709,44 @@ export class DealService implements OnModuleInit, OnModuleDestroy { return { status: "missing" }; } const dataSetId = context.dataSetId; - const isLive = await this.isDataSetLive(dataSetId, signal); + const isLive = await this.isDataSetLive(providerAddress, dataSetId, signal); return isLive ? { status: "live", dataSetId } : { status: "terminated", dataSetId }; } /** - * PDP-liveness probe via WarmStorage.validateDataSet. + * Composite PDP-liveness check. Runs two independent probes: + * + * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via + * multicall and additionally verifies the listener is this WarmStorage + * contract, so it covers chain-side liveness fully. + * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's + * `unrecoverable_proving_failure_epoch` state, which precedes chain + * propagation and is the only signal observable when the SP refuses + * addPieces but chain still reports the set as live. * - * Returns true if PDP reports the dataset live and FWSS-managed. - * Returns false ONLY for the known terminal error string ("does not exist or is not live"). - * Re-throws any other error (RPC failure, "not managed", unknown) so callers do not - * misclassify a transient probe failure as a terminated dataset. + * A positive-terminated signal from either probe wins: if any settled result + * is `false`, returns `false` even when the other probe threw a transient + * error. Otherwise rethrows the first rejection so a probe outage is not + * silently misclassified as live. The SP HTTP probe never throws on + * transient errors (returns `true` on non-409 responses, including network + * errors and auth failures), since HTTP 409 with Curio's terminated body is + * the only signal that endpoint emits. */ - async isDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { + async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const settled = await Promise.allSettled([ + this.probeFwssDataSetLive(dataSetId, signal), + this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), + ]); + if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { + return false; + } + const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); + if (rejection) throw rejection.reason; + return true; + } + + protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { signal?.throwIfAborted(); const { warmStorageService } = this.walletSdkService.getWalletServices(); try { @@ -735,6 +762,61 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } } + /** + * Probes the SP's Curio addPieces endpoint with an empty body. Curio's + * handler checks `unrecoverable_proving_failure_epoch` before body + * validation and returns HTTP 409 with "Data set has been terminated due to + * unrecoverable proving failure" when the dataset is marked terminated. + * + * Returns `false` only on `409` paired with that body text. The body + * substring is a guard against blast radius: if a future Curio reuses `409` + * for a non-terminal conflict, the probe stays conservative rather than + * triggering destructive repair. Any other response (including a `409` with + * a different body, `400` for empty pieces, `404`, `5xx`, and network + * errors) is treated as live. + */ + protected async probeSpHttpDataSetLive( + providerAddress: string, + dataSetId: bigint, + signal?: AbortSignal, + ): Promise { + signal?.throwIfAborted(); + const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); + if (!providerInfo) { + throw new Error(`Provider ${providerAddress} not found in registry`); + } + const serviceURL = providerInfo.pdp?.serviceURL; + if (!serviceURL) { + throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); + } + const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); + const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); + const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; + try { + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "{}", + signal: probeSignal, + }); + if (res.status !== 409) return true; + const body = await res.text(); + return !/unrecoverable proving failure/i.test(body); + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + event: "dataset_sp_liveness_probe_failed", + message: "SP HTTP liveness probe failed; treating dataset as live", + providerAddress, + providerId: providerInfo.id, + dataSetId: dataSetId.toString(), + serviceURL, + error: toStructuredError(error), + }); + return true; + } + } + /** * Repair a PDP-terminated dataset (FWSS may or may not have flipped pdpEndEpoch). *