Skip to content

Commit 70a45d8

Browse files
authored
fix(coprocessor): port DB and TFHE worker fixes from release 0.10.x (#1396)
* fix(coprocessor): fix missing downgrade of scheduler error in tfhe-worker (#1352)
* fix(coprocessor): update tfhe-worker query for work (#1357)
1 parent da3aafb commit 70a45d8

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed
Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
CREATE INDEX IF NOT EXISTS idx_computations_created_at
2+
ON computations USING BTREE (created_at)
3+
WHERE is_completed = false;

coprocessor/fhevm-engine/scheduler/src/dfg/scheduler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,8 @@ fn execute_partition(
346346
for (h, i) in tx_inputs.iter_mut() {
347347
if i.is_none() {
348348
let Some(Ok(ct)) = res.get(h) else {
349-
warn!(target: "scheduler", {transaction_id = ?tid },
350-
"Missing input to compute transaction - skipping");
349+
warn!(target: "scheduler", {transaction_id = ?hex::encode(tid) },
350+
"Missing input to compute transaction - skipping");
351351
for nidx in dfg.graph.node_identifiers() {
352352
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
353353
error!(target: "scheduler", {index = ?nidx.index() }, "Wrong dataflow graph index");
@@ -375,7 +375,7 @@ fn execute_partition(
375375
if let Err(e) =
376376
re_randomise_transaction_inputs(tx_inputs, &tid, cid, gpu_idx, cpk.clone())
377377
{
378-
error!(target: "scheduler", {transaction_id = ?tid, error = ?e },
378+
error!(target: "scheduler", {transaction_id = ?hex::encode(tid), error = ?e },
379379
"Error while re-randomising inputs");
380380
for nidx in dfg.graph.node_identifiers() {
381381
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
@@ -401,7 +401,7 @@ fn execute_partition(
401401
// If re-randomisation is not available (e.g., on GPU),
402402
// only decompress ciphertexts
403403
if let Err(e) = decompress_transaction_inputs(tx_inputs, &tid, gpu_idx, cpk.clone()) {
404-
error!(target: "scheduler", {transaction_id = ?tid, error = ?e },
404+
error!(target: "scheduler", {transaction_id = ?hex::encode(tid), error = ?e },
405405
"Error while decompressing inputs");
406406
for nidx in dfg.graph.node_identifiers() {
407407
let Some(node) = dfg.graph.node_weight_mut(nidx) else {
@@ -499,7 +499,7 @@ fn try_execute_node(
499499
cts.push(i);
500500
} else {
501501
// That should not be possible as we called the checker.
502-
error!(target: "scheduler", { handle = ?node.result_handle }, "Computation missing inputs");
502+
error!(target: "scheduler", { handle = ?hex::encode(&node.result_handle) }, "Computation missing inputs");
503503
return Err(SchedulerError::MissingInputs.into());
504504
}
505505
}

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,19 @@ async fn query_for_work<'a>(
303303
WITH selected_computations AS (
304304
(
305305
SELECT DISTINCT
306-
c.transaction_id
306+
c_creation_order.transaction_id
307+
FROM (
308+
SELECT transaction_id
309+
FROM computations
310+
WHERE is_completed = FALSE
311+
AND is_error = FALSE
312+
AND is_allowed = TRUE
313+
ORDER BY created_at
314+
LIMIT $1
315+
) as c_creation_order
316+
UNION ALL
317+
SELECT DISTINCT
318+
c_schedule_order.transaction_id
307319
FROM (
308320
SELECT transaction_id
309321
FROM computations
@@ -312,7 +324,7 @@ WITH selected_computations AS (
312324
AND is_allowed = TRUE
313325
ORDER BY schedule_order
314326
LIMIT $1
315-
) as c
327+
) as c_schedule_order
316328
)
317329
)
318330
-- Acquire all computations from this transaction set
@@ -548,6 +560,11 @@ async fn upload_transaction_graph_results<'a>(
548560
CoprocessorError::SchedulerError(SchedulerError::MissingInputs)
549561
) {
550562
uncomputable.push((result.handle.clone(), result.transaction_id.clone()));
563+
// Make sure we don't mark this as an error since this simply means that the
564+
// inputs weren't available when we tried scheduling these operations.
565+
// Setting them as uncomputable will postpone them with an exponential backoff
566+
// and they will be retried later.
567+
continue;
551568
}
552569
}
553570
set_computation_error(

0 commit comments

Comments
 (0)