From 34791c844b128ebfe6dbe98ecbbf9d3b01c18250 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 14 May 2026 12:54:14 -0400 Subject: [PATCH 1/4] fix(deal): combine FWSS, PDPVerifier, and SP-HTTP probes for dataset liveness `isDataSetLive` returns true only when all three signals agree: WarmStorage.validateDataSet, PDPVerifier.dataSetLive, and an unauthenticated POST to the SP's `/pdp/data-sets/{id}/pieces` endpoint. Curio returns HTTP 409 on that endpoint when `unrecoverable_proving_failure_epoch` is set, which is the only signal observable when a dataset is dead on the SP but chain still reports it as live. Chain probes rethrow on transient errors so transient outages don't get misclassified as termination. The SP HTTP probe treats any non-409 response (including auth failures and network errors) as live. Threads `providerAddress` through `isDataSetLive` so the SP probe can resolve the serviceURL from the provider registry. --- apps/backend/src/deal/deal.service.spec.ts | 97 +++++++++++++++++--- apps/backend/src/deal/deal.service.ts | 100 ++++++++++++++++++--- 2 files changed, 174 insertions(+), 23 deletions(-) diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index d6617d92..c3793c9a 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -37,6 +37,14 @@ vi.mock("@filoz/synapse-sdk", async (importOriginal) => { }; }); +const pdpVerifierDataSetLiveMock = vi.fn().mockResolvedValue(true); +vi.mock("@filoz/synapse-core/pdp-verifier", () => ({ + dataSetLive: (...args: unknown[]) => pdpVerifierDataSetLiveMock(...args), +})); + +const fetchMock = vi.fn(async () => new Response(null, { status: 200 })); +vi.stubGlobal("fetch", fetchMock); + vi.mock("filecoin-pin", () => ({ executeUpload: vi.fn(), cleanupSynapseService: vi.fn(), @@ -195,6 +203,8 @@ describe("DealService", () => { afterEach(() => { vi.clearAllMocks(); + pdpVerifierDataSetLiveMock.mockResolvedValue(true); + fetchMock.mockImplementation(async () => new Response(null, { status: 200 })); }); describe("createDeal", () => { @@ -222,7 +232,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -247,6 +257,9 @@ describe("DealService", () => { dealRepoMock.create.mockReturnValue(mockDeal); mockStorageProviderRepository.findOne.mockResolvedValue({}); + mockSynapseInstance = { ...mockSynapseInstance, client: {} } as unknown as Synapse; + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(mockProviderInfo); + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance); }); it("processes the full deal lifecycle successfully", async () => { @@ -1060,6 +1073,7 @@ describe("DealService", () => { }).compile(); const testService = module.get(DealService); + vi.spyOn(testService as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance); return testService; }; @@ -1195,7 +1209,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1262,7 +1276,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1433,16 +1447,76 @@ describe("DealService", () => { }); describe("isDataSetLive", () => { - it("rethrows non-terminal probe errors instead of classifying as terminated", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - await expect(service.isDataSetLive(1n)).rejects.toThrow("ECONNREFUSED"); + const providerInfo: PDPProviderEx = { + id: 101n, + serviceProvider: "0xprovider", + payee: "0x100", + name: "Test Provider", + description: "Test Provider", + isActive: true, + isApproved: true, + pdp: { + serviceURL: "https://sp.example", + minPieceSizeInBytes: 0n, + maxPieceSizeInBytes: 100n, + storagePricePerTibPerDay: 1n, + minProvingPeriodInEpochs: 1n, + location: "location", + paymentTokenAddress: "0x100", + ipniPiece: true, + ipniIpfs: true, + }, + }; + + beforeEach(() => { + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); + const synapseMock = { client: {} } as unknown as Synapse; + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); }); - it("returns false only on the known not-live error message", async () => { + it("returns true when all three probes report live", async () => { + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("returns false when FWSS validateDataSet reports not live", async () => { mockWarmStorageService.validateDataSet.mockRejectedValueOnce( new Error("Data set 1 does not exist or is not live"), ); - await expect(service.isDataSetLive(1n)).resolves.toBe(false); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("returns false when PDPVerifier reports not live", async () => { + pdpVerifierDataSetLiveMock.mockResolvedValueOnce(false); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("returns false when SP HTTP probe returns 409", async () => { + fetchMock.mockResolvedValueOnce(new Response("terminated", { status: 409 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("treats SP HTTP non-409 responses as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP network errors as live", async () => { + fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); + }); + + it("posts an empty JSON body to the SP addPieces endpoint", async () => { + await service.isDataSetLive("0xprovider", 42n); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [calledUrl, init] = fetchMock.mock.calls[0] as [URL, RequestInit]; + expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); + expect(init.method).toBe("POST"); + expect(init.body).toBe("{}"); }); }); @@ -1457,7 +1531,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "u", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, @@ -1476,6 +1550,7 @@ describe("DealService", () => { } as any; const uploadPayload = { carData: Uint8Array.from([1]), rootCid: CID.parse(mockRootCid) }; const synapseMock = { + client: {}, storage: { createContext: vi.fn().mockResolvedValue({ dataSetId: 9n }), }, @@ -1483,6 +1558,8 @@ describe("DealService", () => { const deal = Object.assign(new Deal(), { id: "deal-skip", spAddress: "0xProvider" }); dealRepoMock.create.mockReturnValue(deal); mockStorageProviderRepository.findOne.mockResolvedValue({ providerId: 1, isApproved: true }); + vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); + vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); mockWarmStorageService.validateDataSet.mockRejectedValueOnce( new Error("Data set 9 does not exist or is not live"), ); @@ -1525,7 +1602,7 @@ describe("DealService", () => { isActive: true, isApproved: true, pdp: { - serviceURL: "service url", + serviceURL: "https://sp.example", minPieceSizeInBytes: 0n, maxPieceSizeInBytes: 100n, storagePricePerTibPerDay: 1n, diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 74b0055c..371056a3 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { setTimeout as setTimeoutAsync } from "node:timers/promises"; +import { dataSetLive as pdpVerifierDataSetLive } from "@filoz/synapse-core/pdp-verifier"; import { METADATA_KEYS, SIZE_CONSTANTS, Synapse } from "@filoz/synapse-sdk"; import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; @@ -43,6 +44,8 @@ type UploadPayload = { rootCid: CID; }; +const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; + type UploadResultSummary = { pieceCid: string; pieceId?: number; @@ -363,7 +366,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { // pdpEndEpoch=0; createContext returns it and the next add-pieces path // would fail. See #379. if (storage.dataSetId !== undefined) { - const live = await this.isDataSetLive(storage.dataSetId, signal); + const live = await this.isDataSetLive(providerAddress, storage.dataSetId, signal); if (!live) { preUploadTerminated = true; throw new DealJobTerminatedDataSetError(storage.dataSetId); @@ -675,12 +678,12 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Determines whether a provider's dataset slot is `missing`, `live`, or - * `terminated`. Resolves the dataset via createContext (FWSS pdpEndEpoch=0 - * filter) and then probes PDP liveness via WarmStorage.validateDataSet. + * Classifies a provider's dataset slot as `missing`, `live`, or `terminated`. + * Resolves the dataset via createContext, then composes the three liveness + * probes documented on `isDataSetLive`. * - * `terminated` means FWSS still considers the set provisionable but PDP has - * marked it dead (e.g. unrecoverable proving failure). See #379. + * `terminated` means at least one of FWSS / PDPVerifier / Curio reports the + * set as dead. See #379 and the SP-HTTP probe rationale in `isDataSetLive`. */ async getDataSetProvisioningStatus( providerAddress: string, @@ -707,19 +710,37 @@ export class DealService implements OnModuleInit, OnModuleDestroy { return { status: "missing" }; } const dataSetId = context.dataSetId; - const isLive = await this.isDataSetLive(dataSetId, signal); + const isLive = await this.isDataSetLive(providerAddress, dataSetId, signal); return isLive ? { status: "live", dataSetId } : { status: "terminated", dataSetId }; } /** - * PDP-liveness probe via WarmStorage.validateDataSet. + * Composite PDP-liveness check. Runs three independent probes and returns + * true only when all agree the dataset is live. + * + * - FWSS `validateDataSet` (chain): catches FWSS-side termination. + * - PDPVerifier `dataSetLive` (chain): catches PDPVerifier-side termination. + * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's + * `unrecoverable_proving_failure_epoch` state, which precedes chain + * propagation and is the only signal observable when the SP refuses + * addPieces but chain still reports the set as live. * - * Returns true if PDP reports the dataset live and FWSS-managed. - * Returns false ONLY for the known terminal error string ("does not exist or is not live"). - * Re-throws any other error (RPC failure, "not managed", unknown) so callers do not - * misclassify a transient probe failure as a terminated dataset. + * Chain probes rethrow on transient errors so callers do not misclassify a + * probe outage as termination. The SP HTTP probe treats any non-409 response + * (including network errors and auth failures) as live, since 409 is the + * sole positive-terminated signal Curio emits on this endpoint. */ - async isDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { + async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const [fwssLive, pdpLive, spLive] = await Promise.all([ + this.probeFwssDataSetLive(dataSetId, signal), + this.probePdpVerifierDataSetLive(dataSetId, signal), + this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), + ]); + return fwssLive && pdpLive && spLive; + } + + protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { signal?.throwIfAborted(); const { warmStorageService } = this.walletSdkService.getWalletServices(); try { @@ -735,6 +756,59 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } } + protected async probePdpVerifierDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const synapse = this.sharedSynapse ?? (await this.createSynapseInstance()); + return awaitWithAbort(pdpVerifierDataSetLive(synapse.client, { dataSetId }), signal); + } + + /** + * Probes the SP's Curio addPieces endpoint with an empty body. Curio's + * handler checks `unrecoverable_proving_failure_epoch` before body + * validation and returns HTTP 409 when the dataset is marked terminated. + * Any other response (400 for empty pieces, 404, 5xx, network error) is + * treated as live, since none signal positive termination. + */ + protected async probeSpHttpDataSetLive( + providerAddress: string, + dataSetId: bigint, + signal?: AbortSignal, + ): Promise { + signal?.throwIfAborted(); + const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); + if (!providerInfo) { + throw new Error(`Provider ${providerAddress} not found in registry`); + } + const serviceURL = providerInfo.pdp?.serviceURL; + if (!serviceURL) { + throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); + } + const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); + const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); + const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; + try { + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "{}", + signal: probeSignal, + }); + return res.status !== 409; + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + event: "dataset_sp_liveness_probe_failed", + message: "SP HTTP liveness probe failed; treating dataset as live", + providerAddress, + providerId: providerInfo.id, + dataSetId: dataSetId.toString(), + serviceURL, + error: toStructuredError(error), + }); + return true; + } + } + /** * Repair a PDP-terminated dataset (FWSS may or may not have flipped pdpEndEpoch). * From feedae8b1e8f127c2c96a6073b5a8ed6a17e1c4f Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 14 May 2026 13:27:47 -0400 Subject: [PATCH 2/4] fix(deal): preserve SP 409 termination signal across transient probe errors `Promise.all` previously let a transient chain-probe rejection mask a conclusive SP HTTP 409. Switch to `Promise.allSettled` and return false when any probe positively reports terminated; only rethrow when no probe reported termination. Also require the SP HTTP probe to match `unrecoverable proving failure` in the response body in addition to HTTP 409. Defends against a future Curio reusing 409 for a non-terminal conflict, which would otherwise trigger destructive `terminateDataSet` + deal cleanup. Tests cover both the SP-409-rescues-transient-chain-error path and the 409-with-different-body-treated-as-live path. --- apps/backend/src/deal/deal.service.spec.ts | 24 +++++++++++-- apps/backend/src/deal/deal.service.ts | 39 +++++++++++++++------- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index c3793c9a..7e2c04af 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -1490,11 +1490,18 @@ describe("DealService", () => { await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); }); - it("returns false when SP HTTP probe returns 409", async () => { - fetchMock.mockResolvedValueOnce(new Response("terminated", { status: 409 })); + it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); }); + it("treats SP HTTP 409 with a different body as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + it("treats SP HTTP non-409 responses as live", async () => { fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); @@ -1510,6 +1517,19 @@ describe("DealService", () => { await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); }); + it("rethrows PDPVerifier dataSetLive errors when no probe reports terminated", async () => { + pdpVerifierDataSetLiveMock.mockRejectedValueOnce(new Error("rpc timeout")); + await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("rpc timeout"); + }); + + it("returns false when SP reports terminated even if PDPVerifier RPC throws", async () => { + pdpVerifierDataSetLiveMock.mockRejectedValueOnce(new Error("rpc timeout")); + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + it("posts an empty JSON body to the SP addPieces endpoint", async () => { await service.isDataSetLive("0xprovider", 42n); expect(fetchMock).toHaveBeenCalledTimes(1); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 371056a3..641264d4 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -715,8 +715,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Composite PDP-liveness check. Runs three independent probes and returns - * true only when all agree the dataset is live. + * Composite PDP-liveness check. Runs three independent probes: * * - FWSS `validateDataSet` (chain): catches FWSS-side termination. * - PDPVerifier `dataSetLive` (chain): catches PDPVerifier-side termination. @@ -725,19 +724,27 @@ export class DealService implements OnModuleInit, OnModuleDestroy { * propagation and is the only signal observable when the SP refuses * addPieces but chain still reports the set as live. * - * Chain probes rethrow on transient errors so callers do not misclassify a - * probe outage as termination. The SP HTTP probe treats any non-409 response - * (including network errors and auth failures) as live, since 409 is the - * sole positive-terminated signal Curio emits on this endpoint. + * A positive-terminated signal from any probe wins: if any settled result is + * `false`, returns `false` even when another probe threw a transient error. + * Otherwise rethrows the first rejection so a probe outage is not silently + * misclassified as live. The SP HTTP probe never throws on transient errors + * (returns `true` on non-409 responses, including network errors and auth + * failures), since HTTP 409 with Curio's terminated body is the only signal + * that endpoint emits. */ async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { signal?.throwIfAborted(); - const [fwssLive, pdpLive, spLive] = await Promise.all([ + const settled = await Promise.allSettled([ this.probeFwssDataSetLive(dataSetId, signal), this.probePdpVerifierDataSetLive(dataSetId, signal), this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), ]); - return fwssLive && pdpLive && spLive; + if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { + return false; + } + const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); + if (rejection) throw rejection.reason; + return true; } protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { @@ -765,9 +772,15 @@ export class DealService implements OnModuleInit, OnModuleDestroy { /** * Probes the SP's Curio addPieces endpoint with an empty body. Curio's * handler checks `unrecoverable_proving_failure_epoch` before body - * validation and returns HTTP 409 when the dataset is marked terminated. - * Any other response (400 for empty pieces, 404, 5xx, network error) is - * treated as live, since none signal positive termination. + * validation and returns HTTP 409 with "Data set has been terminated due to + * unrecoverable proving failure" when the dataset is marked terminated. + * + * Returns `false` only on `409` paired with that body text. The body + * substring is a guard against blast radius: if a future Curio reuses `409` + * for a non-terminal conflict, the probe stays conservative rather than + * triggering destructive repair. Any other response (including a `409` with + * a different body, `400` for empty pieces, `404`, `5xx`, and network + * errors) is treated as live. */ protected async probeSpHttpDataSetLive( providerAddress: string, @@ -793,7 +806,9 @@ export class DealService implements OnModuleInit, OnModuleDestroy { body: "{}", signal: probeSignal, }); - return res.status !== 409; + if (res.status !== 409) return true; + const body = await res.text(); + return !/unrecoverable proving failure/i.test(body); } catch (error) { if (signal?.aborted) throw error; this.logger.warn({ From f6c908e059245b6fcd682e34fbc811c2915d9b24 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 14 May 2026 13:52:01 -0400 Subject: [PATCH 3/4] fix(deal): cast fetch mock call tuple via unknown for strict tuple typecheck --- apps/backend/src/deal/deal.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 7e2c04af..46a023dc 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -1533,7 +1533,7 @@ describe("DealService", () => { it("posts an empty JSON body to the SP addPieces endpoint", async () => { await service.isDataSetLive("0xprovider", 42n); expect(fetchMock).toHaveBeenCalledTimes(1); - const [calledUrl, init] = fetchMock.mock.calls[0] as [URL, RequestInit]; + const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); expect(init.method).toBe("POST"); expect(init.body).toBe("{}"); From ef6417ae2c6edcfa7549536acf88f333ce963bd1 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 18 May 2026 11:50:42 -0400 Subject: [PATCH 4/4] refactor(deal): drop redundant PDPVerifier probe --- apps/backend/src/deal/deal.service.spec.ts | 22 ++----------- apps/backend/src/deal/deal.service.ts | 37 +++++++++------------- 2 files changed, 18 insertions(+), 41 deletions(-) diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 46a023dc..960eee09 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -37,11 +37,6 @@ vi.mock("@filoz/synapse-sdk", async (importOriginal) => { }; }); -const pdpVerifierDataSetLiveMock = vi.fn().mockResolvedValue(true); -vi.mock("@filoz/synapse-core/pdp-verifier", () => ({ - dataSetLive: (...args: unknown[]) => pdpVerifierDataSetLiveMock(...args), -})); - const fetchMock = vi.fn(async () => new Response(null, { status: 200 })); vi.stubGlobal("fetch", fetchMock); @@ -203,7 +198,6 @@ describe("DealService", () => { afterEach(() => { vi.clearAllMocks(); - pdpVerifierDataSetLiveMock.mockResolvedValue(true); fetchMock.mockImplementation(async () => new Response(null, { status: 200 })); }); @@ -1474,7 +1468,7 @@ describe("DealService", () => { vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); }); - it("returns true when all three probes report live", async () => { + it("returns true when both probes report live", async () => { await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); }); @@ -1485,11 +1479,6 @@ describe("DealService", () => { await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); }); - it("returns false when PDPVerifier reports not live", async () => { - pdpVerifierDataSetLiveMock.mockResolvedValueOnce(false); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { fetchMock.mockResolvedValueOnce( new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), @@ -1517,13 +1506,8 @@ describe("DealService", () => { await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); }); - it("rethrows PDPVerifier dataSetLive errors when no probe reports terminated", async () => { - pdpVerifierDataSetLiveMock.mockRejectedValueOnce(new Error("rpc timeout")); - await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("rpc timeout"); - }); - - it("returns false when SP reports terminated even if PDPVerifier RPC throws", async () => { - pdpVerifierDataSetLiveMock.mockRejectedValueOnce(new Error("rpc timeout")); + it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); fetchMock.mockResolvedValueOnce( new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), ); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 641264d4..2edf4026 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -1,6 +1,5 @@ import { randomUUID } from "node:crypto"; import { setTimeout as setTimeoutAsync } from "node:timers/promises"; -import { dataSetLive as pdpVerifierDataSetLive } from "@filoz/synapse-core/pdp-verifier"; import { METADATA_KEYS, SIZE_CONSTANTS, Synapse } from "@filoz/synapse-sdk"; import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; @@ -679,11 +678,11 @@ export class DealService implements OnModuleInit, OnModuleDestroy { /** * Classifies a provider's dataset slot as `missing`, `live`, or `terminated`. - * Resolves the dataset via createContext, then composes the three liveness - * probes documented on `isDataSetLive`. + * Resolves the dataset via createContext, then composes the liveness probes + * documented on `isDataSetLive`. * - * `terminated` means at least one of FWSS / PDPVerifier / Curio reports the - * set as dead. See #379 and the SP-HTTP probe rationale in `isDataSetLive`. + * `terminated` means either FWSS or Curio reports the set as dead. See #379 + * and the SP-HTTP probe rationale in `isDataSetLive`. */ async getDataSetProvisioningStatus( providerAddress: string, @@ -715,28 +714,28 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Composite PDP-liveness check. Runs three independent probes: + * Composite PDP-liveness check. Runs two independent probes: * - * - FWSS `validateDataSet` (chain): catches FWSS-side termination. - * - PDPVerifier `dataSetLive` (chain): catches PDPVerifier-side termination. + * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via + * multicall and additionally verifies the listener is this WarmStorage + * contract, so it covers chain-side liveness fully. * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's * `unrecoverable_proving_failure_epoch` state, which precedes chain * propagation and is the only signal observable when the SP refuses * addPieces but chain still reports the set as live. * - * A positive-terminated signal from any probe wins: if any settled result is - * `false`, returns `false` even when another probe threw a transient error. - * Otherwise rethrows the first rejection so a probe outage is not silently - * misclassified as live. The SP HTTP probe never throws on transient errors - * (returns `true` on non-409 responses, including network errors and auth - * failures), since HTTP 409 with Curio's terminated body is the only signal - * that endpoint emits. + * A positive-terminated signal from either probe wins: if any settled result + * is `false`, returns `false` even when the other probe threw a transient + * error. Otherwise rethrows the first rejection so a probe outage is not + * silently misclassified as live. The SP HTTP probe never throws on + * transient errors (returns `true` on non-409 responses, including network + * errors and auth failures), since HTTP 409 with Curio's terminated body is + * the only signal that endpoint emits. */ async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { signal?.throwIfAborted(); const settled = await Promise.allSettled([ this.probeFwssDataSetLive(dataSetId, signal), - this.probePdpVerifierDataSetLive(dataSetId, signal), this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), ]); if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { @@ -763,12 +762,6 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } } - protected async probePdpVerifierDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { - signal?.throwIfAborted(); - const synapse = this.sharedSynapse ?? (await this.createSynapseInstance()); - return awaitWithAbort(pdpVerifierDataSetLive(synapse.client, { dataSetId }), signal); - } - /** * Probes the SP's Curio addPieces endpoint with an empty body. Curio's * handler checks `unrecoverable_proving_failure_epoch` before body