Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 91 additions & 10 deletions apps/backend/src/deal/deal.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ vi.mock("@filoz/synapse-sdk", async (importOriginal) => {
};
});

// Replace the global `fetch` with a mock that, by default, resolves to an
// empty HTTP 200 response. Individual tests override it per call.
const fetchMock = vi.fn(async () => {
  return new Response(null, { status: 200 });
});
vi.stubGlobal("fetch", fetchMock);

vi.mock("filecoin-pin", () => ({
executeUpload: vi.fn(),
cleanupSynapseService: vi.fn(),
Expand Down Expand Up @@ -195,6 +198,7 @@ describe("DealService", () => {

afterEach(() => {
  vi.clearAllMocks();
  // Re-install the default empty-200 implementation on the fetch mock so the
  // next test starts from the same baseline.
  const respondOk = async () => new Response(null, { status: 200 });
  fetchMock.mockImplementation(respondOk);
});

describe("createDeal", () => {
Expand Down Expand Up @@ -222,7 +226,7 @@ describe("DealService", () => {
isActive: true,
isApproved: true,
pdp: {
serviceURL: "service url",
serviceURL: "https://sp.example",
minPieceSizeInBytes: 0n,
maxPieceSizeInBytes: 100n,
storagePricePerTibPerDay: 1n,
Expand All @@ -247,6 +251,9 @@ describe("DealService", () => {

dealRepoMock.create.mockReturnValue(mockDeal);
mockStorageProviderRepository.findOne.mockResolvedValue({});
mockSynapseInstance = { ...mockSynapseInstance, client: {} } as unknown as Synapse;
vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(mockProviderInfo);
vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance);
});

it("processes the full deal lifecycle successfully", async () => {
Expand Down Expand Up @@ -1060,6 +1067,7 @@ describe("DealService", () => {
}).compile();

const testService = module.get<DealService>(DealService);
vi.spyOn(testService as any, "createSynapseInstance").mockResolvedValue(mockSynapseInstance);

return testService;
};
Expand Down Expand Up @@ -1195,7 +1203,7 @@ describe("DealService", () => {
isActive: true,
isApproved: true,
pdp: {
serviceURL: "service url",
serviceURL: "https://sp.example",
minPieceSizeInBytes: 0n,
maxPieceSizeInBytes: 100n,
storagePricePerTibPerDay: 1n,
Expand Down Expand Up @@ -1262,7 +1270,7 @@ describe("DealService", () => {
isActive: true,
isApproved: true,
pdp: {
serviceURL: "service url",
serviceURL: "https://sp.example",
minPieceSizeInBytes: 0n,
maxPieceSizeInBytes: 100n,
storagePricePerTibPerDay: 1n,
Expand Down Expand Up @@ -1433,16 +1441,86 @@ describe("DealService", () => {
});

describe("isDataSetLive", () => {
  // Registry entry handed back by the mocked getProviderInfo; its
  // pdp.serviceURL is the base the URL assertion in the last test expects.
  const providerInfo: PDPProviderEx = {
    id: 101n,
    serviceProvider: "0xprovider",
    payee: "0x100",
    name: "Test Provider",
    description: "Test Provider",
    isActive: true,
    isApproved: true,
    pdp: {
      serviceURL: "https://sp.example",
      minPieceSizeInBytes: 0n,
      maxPieceSizeInBytes: 100n,
      storagePricePerTibPerDay: 1n,
      minProvingPeriodInEpochs: 1n,
      location: "location",
      paymentTokenAddress: "0x100",
      ipniPiece: true,
      ipniIpfs: true,
    },
  };

  beforeEach(() => {
    vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo);
    const synapseMock = { client: {} } as unknown as Synapse;
    vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock);
  });

  it("returns true when both probes report live", async () => {
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
  });

  it("returns false when FWSS validateDataSet reports not live", async () => {
    mockWarmStorageService.validateDataSet.mockRejectedValueOnce(
      new Error("Data set 1 does not exist or is not live"),
    );
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
  });

  it("returns false when SP HTTP probe returns 409 with the terminated body", async () => {
    fetchMock.mockResolvedValueOnce(
      new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }),
    );
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
  });

  it("treats SP HTTP 409 with a different body as live", async () => {
    fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 }));
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
  });

  it("treats SP HTTP non-409 responses as live", async () => {
    fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 }));
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
  });

  it("treats SP HTTP network errors as live", async () => {
    fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED"));
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
  });

  it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => {
    mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545"));
    await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED");
  });

  it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => {
    mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545"));
    fetchMock.mockResolvedValueOnce(
      new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }),
    );
    await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
  });

  it("posts an empty JSON body to the SP addPieces endpoint", async () => {
    await service.isDataSetLive("0xprovider", 42n);
    expect(fetchMock).toHaveBeenCalledTimes(1);
    // fetchMock is declared with a zero-argument implementation, so the
    // recorded call tuple has to be widened before it can be destructured.
    const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit];
    expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces");
    expect(init.method).toBe("POST");
    expect(init.body).toBe("{}");
  });
});

Expand All @@ -1457,7 +1535,7 @@ describe("DealService", () => {
isActive: true,
isApproved: true,
pdp: {
serviceURL: "u",
serviceURL: "https://sp.example",
minPieceSizeInBytes: 0n,
maxPieceSizeInBytes: 100n,
storagePricePerTibPerDay: 1n,
Expand All @@ -1476,13 +1554,16 @@ describe("DealService", () => {
} as any;
const uploadPayload = { carData: Uint8Array.from([1]), rootCid: CID.parse(mockRootCid) };
const synapseMock = {
client: {},
storage: {
createContext: vi.fn().mockResolvedValue({ dataSetId: 9n }),
},
} as unknown as Synapse;
const deal = Object.assign(new Deal(), { id: "deal-skip", spAddress: "0xProvider" });
dealRepoMock.create.mockReturnValue(deal);
mockStorageProviderRepository.findOne.mockResolvedValue({ providerId: 1, isApproved: true });
vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo);
vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock);
mockWarmStorageService.validateDataSet.mockRejectedValueOnce(
new Error("Data set 9 does not exist or is not live"),
);
Expand Down Expand Up @@ -1525,7 +1606,7 @@ describe("DealService", () => {
isActive: true,
isApproved: true,
pdp: {
serviceURL: "service url",
serviceURL: "https://sp.example",
minPieceSizeInBytes: 0n,
maxPieceSizeInBytes: 100n,
storagePricePerTibPerDay: 1n,
Expand Down
108 changes: 95 additions & 13 deletions apps/backend/src/deal/deal.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type UploadPayload = {
rootCid: CID;
};

/** Timeout, in milliseconds, passed to `AbortSignal.timeout` for the SP HTTP liveness probe request. */
const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000;

type UploadResultSummary = {
pieceCid: string;
pieceId?: number;
Expand Down Expand Up @@ -363,7 +365,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
// pdpEndEpoch=0; createContext returns it and the next add-pieces path
// would fail. See #379.
if (storage.dataSetId !== undefined) {
const live = await this.isDataSetLive(storage.dataSetId, signal);
const live = await this.isDataSetLive(providerAddress, storage.dataSetId, signal);
if (!live) {
preUploadTerminated = true;
throw new DealJobTerminatedDataSetError(storage.dataSetId);
Expand Down Expand Up @@ -675,12 +677,12 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
}

/**
* Classifies a provider's dataset slot as `missing`, `live`, or `terminated`.
* Resolves the dataset via createContext (FWSS pdpEndEpoch=0 filter), then
* composes the liveness probes documented on `isDataSetLive`.
*
* `terminated` means either FWSS or Curio reports the set as dead (e.g.
* unrecoverable proving failure), even if FWSS still considers the set
* provisionable. See #379 and the SP-HTTP probe rationale in `isDataSetLive`.
*/
async getDataSetProvisioningStatus(
providerAddress: string,
Expand All @@ -707,19 +709,44 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
return { status: "missing" };
}
const dataSetId = context.dataSetId;
const isLive = await this.isDataSetLive(dataSetId, signal);
const isLive = await this.isDataSetLive(providerAddress, dataSetId, signal);
return isLive ? { status: "live", dataSetId } : { status: "terminated", dataSetId };
}

/**
 * Composite PDP-liveness check. Starts two independent probes and waits for
 * both to settle:
 *
 * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via
 *   multicall and additionally verifies the listener is this WarmStorage
 *   contract, so it covers chain-side liveness fully.
 * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's
 *   `unrecoverable_proving_failure_epoch` state, which precedes chain
 *   propagation and is the only signal observable when the SP refuses
 *   addPieces but chain still reports the set as live.
 *
 * A positive-terminated signal from either probe wins: if any settled result
 * is `false`, returns `false` even when the other probe threw a transient
 * error. Otherwise rethrows the first rejection (FWSS probe first, then SP
 * HTTP probe) so a probe outage is not silently misclassified as live.
 *
 * The SP HTTP probe resolves to `true` for every HTTP outcome other than a
 * 409 carrying Curio's terminated body, including network errors and
 * timeouts. It only rejects when the provider or its PDP serviceURL cannot be
 * resolved, or when the caller's `signal` is aborted.
 *
 * @param providerAddress - Address used to look up the provider's PDP serviceURL.
 * @param dataSetId - Dataset to probe.
 * @param signal - Optional caller abort signal; checked up front and forwarded to both probes.
 * @returns `false` if either probe reports the dataset terminated, `true` if neither does and neither rejected.
 * @throws The first probe rejection when no probe reported `false`, or the abort reason if `signal` is already aborted.
 */
async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise<boolean> {
  signal?.throwIfAborted();
  const settled = await Promise.allSettled([
    this.probeFwssDataSetLive(dataSetId, signal),
    this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal),
  ]);
  // An explicit "terminated" verdict outranks a failure of the other probe.
  if (settled.some((r) => r.status === "fulfilled" && r.value === false)) {
    return false;
  }
  // No terminated verdict: surface a probe failure rather than reporting live.
  const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected");
  if (rejection) throw rejection.reason;
  return true;
}

protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise<boolean> {
signal?.throwIfAborted();
const { warmStorageService } = this.walletSdkService.getWalletServices();
try {
Expand All @@ -735,6 +762,61 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
}
}

/**
 * Probes the SP's Curio addPieces endpoint with an empty body. Curio's
 * handler checks `unrecoverable_proving_failure_epoch` before body
 * validation and returns HTTP 409 with "Data set has been terminated due to
 * unrecoverable proving failure" when the dataset is marked terminated.
 *
 * Returns `false` only on `409` paired with that body text. The body
 * substring is a guard against blast radius: if a future Curio reuses `409`
 * for a non-terminal conflict, the probe stays conservative rather than
 * triggering destructive repair. Any other response (including a `409` with
 * a different body, `400` for empty pieces, `404`, `5xx`, and network
 * errors or the probe's own timeout) is treated as live.
 *
 * @param providerAddress - Address used to look up the provider in the registry.
 * @param dataSetId - Dataset whose addPieces endpoint is probed.
 * @param signal - Optional caller abort signal, combined with the probe timeout.
 * @returns `false` only for HTTP 409 whose body matches the terminated message; `true` otherwise.
 * @throws Error if the provider is unknown or has no PDP serviceURL; TypeError
 *   if the serviceURL is not a valid absolute URL; the underlying error if
 *   the caller's `signal` is aborted.
 */
protected async probeSpHttpDataSetLive(
  providerAddress: string,
  dataSetId: bigint,
  signal?: AbortSignal,
): Promise<boolean> {
  signal?.throwIfAborted();
  const providerInfo = this.walletSdkService.getProviderInfo(providerAddress);
  if (!providerInfo) {
    throw new Error(`Provider ${providerAddress} not found in registry`);
  }
  const serviceURL = providerInfo.pdp?.serviceURL;
  if (!serviceURL) {
    throw new Error(`Provider ${providerAddress} has no PDP serviceURL`);
  }
  // Relative URL resolution replaces the last path segment of a base that
  // lacks a trailing slash ("https://sp/api" + "pdp/..." -> "https://sp/pdp/...").
  // Normalising the base keeps any path prefix the provider registered.
  const baseURL = serviceURL.endsWith("/") ? serviceURL : `${serviceURL}/`;
  const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, baseURL);
  const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS);
  const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal;
  try {
    const res = await fetch(url, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: "{}",
      signal: probeSignal,
    });
    if (res.status !== 409) {
      // The body is irrelevant here; cancel it so the connection is released
      // promptly instead of waiting for garbage collection. A cancel failure
      // must not change the verdict or emit a misleading probe warning.
      await res.body?.cancel().catch(() => undefined);
      return true;
    }
    const body = await res.text();
    return !/unrecoverable proving failure/i.test(body);
  } catch (error) {
    // A caller-initiated abort is propagated; only the probe's own failures
    // (network error, timeout) are downgraded to "live".
    if (signal?.aborted) throw error;
    this.logger.warn({
      event: "dataset_sp_liveness_probe_failed",
      message: "SP HTTP liveness probe failed; treating dataset as live",
      providerAddress,
      providerId: providerInfo.id,
      dataSetId: dataSetId.toString(),
      serviceURL,
      error: toStructuredError(error),
    });
    return true;
  }
}

/**
* Repair a PDP-terminated dataset (FWSS may or may not have flipped pdpEndEpoch).
*
Expand Down