Skip to content

Commit 2fb5370

Browse files
committed
refactor(cch): refactor lnd trackers
- Define payment and invoice events to be sent to CCH for both fiber and lnd. - These events can be emitted from fiber store changes in a later PR. - Extract lnd trackers into its own module. - Limit concurrent lnd invoice trackers to 5, scheduling them at 5-minute intervals using a round-robin strategy.
1 parent e39b267 commit 2fb5370

File tree

9 files changed

+949
-401
lines changed

9 files changed

+949
-401
lines changed

crates/fiber-lib/src/cch/actor.rs

Lines changed: 115 additions & 372 deletions
Large diffs are not rendered by default.

crates/fiber-lib/src/cch/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ pub enum CchError {
4040
ReceiveBTCOrderAlreadyPaid,
4141
#[error("ReceiveBTC received payment amount is too small")]
4242
ReceiveBTCReceivedAmountTooSmall,
43-
#[error("ReceiveBTC expected preimage but missing")]
44-
ReceiveBTCMissingPreimage,
43+
#[error("Expect preimage in settled payment but missing")]
44+
SettledPaymentMissingPreimage,
4545
#[error("System time error: {0}")]
4646
SystemTimeError(#[from] SystemTimeError),
4747
#[error("JSON serialization error: {0}")]

crates/fiber-lib/src/cch/events.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use lnd_grpc_tonic_client::lnrpc;
2+
3+
use crate::{cch::CchOrderStatus, fiber::types::Hash256};
4+
5+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6+
pub enum CchIncomingPaymentStatus {
7+
// The income payment is in-flight
8+
InFlight = 0,
9+
// Incoming payment TLCs have been accepted
10+
Accepted = 1,
11+
Settled = 2,
12+
Failed = 3,
13+
}
14+
15+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16+
pub enum CchOutgoingPaymentStatus {
17+
// The outgoing payment is in-flight
18+
InFlight = 0,
19+
Settled = 2,
20+
Failed = 3,
21+
}
22+
23+
impl From<CchIncomingPaymentStatus> for CchOrderStatus {
24+
fn from(status: CchIncomingPaymentStatus) -> Self {
25+
match status {
26+
CchIncomingPaymentStatus::InFlight => CchOrderStatus::Pending,
27+
CchIncomingPaymentStatus::Accepted => CchOrderStatus::IncomingAccepted,
28+
CchIncomingPaymentStatus::Settled => CchOrderStatus::Succeeded,
29+
CchIncomingPaymentStatus::Failed => CchOrderStatus::Failed,
30+
}
31+
}
32+
}
33+
34+
impl From<CchOutgoingPaymentStatus> for CchOrderStatus {
35+
fn from(status: CchOutgoingPaymentStatus) -> Self {
36+
match status {
37+
CchOutgoingPaymentStatus::InFlight => CchOrderStatus::OutgoingInFlight,
38+
CchOutgoingPaymentStatus::Settled => CchOrderStatus::OutgoingSettled,
39+
CchOutgoingPaymentStatus::Failed => CchOrderStatus::Failed,
40+
}
41+
}
42+
}
43+
44+
/// Lnd invoice is the incoming part of a CCHOrder to receive BTC from Lightning to Fiber
45+
impl From<lnrpc::invoice::InvoiceState> for CchIncomingPaymentStatus {
46+
fn from(state: lnrpc::invoice::InvoiceState) -> Self {
47+
use lnrpc::invoice::InvoiceState;
48+
match state {
49+
InvoiceState::Open => CchIncomingPaymentStatus::InFlight,
50+
InvoiceState::Settled => CchIncomingPaymentStatus::Settled,
51+
InvoiceState::Canceled => CchIncomingPaymentStatus::Failed,
52+
InvoiceState::Accepted => CchIncomingPaymentStatus::Accepted,
53+
}
54+
}
55+
}
56+
57+
/// Lnd payment is the outgoing part of a CCHOrder to send BTC from Fiber to Lightning
58+
impl From<lnrpc::payment::PaymentStatus> for CchOutgoingPaymentStatus {
59+
fn from(status: lnrpc::payment::PaymentStatus) -> Self {
60+
use lnrpc::payment::PaymentStatus;
61+
match status {
62+
PaymentStatus::Unknown => CchOutgoingPaymentStatus::InFlight,
63+
PaymentStatus::InFlight => CchOutgoingPaymentStatus::InFlight,
64+
PaymentStatus::Succeeded => CchOutgoingPaymentStatus::Settled,
65+
PaymentStatus::Failed => CchOutgoingPaymentStatus::Failed,
66+
PaymentStatus::Initiated => CchOutgoingPaymentStatus::InFlight,
67+
}
68+
}
69+
}
70+
71+
#[derive(Debug, Clone)]
72+
pub enum CchIncomingEvent {
73+
InvoiceChanged {
74+
/// The payment hash of the invoice.
75+
payment_hash: Hash256,
76+
/// The preimage of the invoice.
77+
payment_preimage: Option<Hash256>,
78+
status: CchIncomingPaymentStatus,
79+
},
80+
81+
PaymentChanged {
82+
/// The payment hash of the invoice.
83+
payment_hash: Hash256,
84+
/// The preimage of the invoice.
85+
payment_preimage: Option<Hash256>,
86+
status: CchOutgoingPaymentStatus,
87+
},
88+
}
89+
90+
impl CchIncomingEvent {
91+
pub fn payment_hash(&self) -> &Hash256 {
92+
match self {
93+
CchIncomingEvent::InvoiceChanged { payment_hash, .. } => payment_hash,
94+
CchIncomingEvent::PaymentChanged { payment_hash, .. } => payment_hash,
95+
}
96+
}
97+
}

crates/fiber-lib/src/cch/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ pub use actor::{start_cch, CchActor, CchArgs, CchMessage, ReceiveBTC, SendBTC};
44
mod error;
55
pub use error::{CchError, CchResult};
66

7+
mod events;
8+
pub use events::{CchIncomingEvent, CchIncomingPaymentStatus, CchOutgoingPaymentStatus};
9+
mod trackers;
10+
pub use trackers::{LndConnectionInfo, LndTrackerActor, LndTrackerArgs, LndTrackerMessage};
11+
712
mod config;
813
pub use config::{
914
CchConfig, DEFAULT_BTC_FINAL_TLC_EXPIRY_TIME, DEFAULT_CKB_FINAL_TLC_EXPIRY_DELTA,
@@ -15,3 +20,6 @@ pub use order::{CchInvoice, CchOrder, CchOrderStatus};
1520

1621
mod orders_db;
1722
pub use orders_db::CchOrdersDb;
23+
24+
#[cfg(test)]
25+
pub mod tests;

crates/fiber-lib/src/cch/order.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use lightning_invoice::Bolt11Invoice;
2-
use lnd_grpc_tonic_client::lnrpc;
32
use serde::{Deserialize, Serialize};
43
use serde_with::{serde_as, DisplayFromStr};
54

@@ -15,7 +14,7 @@ use crate::{
1514
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)]
1615
#[serde(rename_all = "snake_case")]
1716
pub enum CchOrderStatus {
18-
/// Order is created and has not send out payments yet.
17+
/// Order is created and has not received the incoming payment
1918
Pending = 0,
2019
/// HTLC in the incoming payment is accepted.
2120
IncomingAccepted = 1,
@@ -29,31 +28,6 @@ pub enum CchOrderStatus {
2928
Failed = 5,
3029
}
3130

32-
/// Lnd payment is the outgoing part of a CCHOrder to send BTC from Fiber to Lightning
33-
impl From<lnrpc::payment::PaymentStatus> for CchOrderStatus {
34-
fn from(status: lnrpc::payment::PaymentStatus) -> Self {
35-
use lnrpc::payment::PaymentStatus;
36-
match status {
37-
PaymentStatus::Succeeded => CchOrderStatus::OutgoingSettled,
38-
PaymentStatus::Failed => CchOrderStatus::Failed,
39-
_ => CchOrderStatus::OutgoingInFlight,
40-
}
41-
}
42-
}
43-
44-
/// Lnd invoice is the incoming part of a CCHOrder to receive BTC from Lightning to Fiber
45-
impl From<lnrpc::invoice::InvoiceState> for CchOrderStatus {
46-
fn from(state: lnrpc::invoice::InvoiceState) -> Self {
47-
use lnrpc::invoice::InvoiceState;
48-
match state {
49-
InvoiceState::Accepted => CchOrderStatus::IncomingAccepted,
50-
InvoiceState::Canceled => CchOrderStatus::Failed,
51-
InvoiceState::Settled => CchOrderStatus::Succeeded,
52-
_ => CchOrderStatus::Pending,
53-
}
54-
}
55-
}
56-
5731
/// The generated proxy invoice for the incoming payment.
5832
///
5933
/// The JSON representation:
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use std::sync::Arc;
2+
3+
use crate::{
4+
cch::{LndConnectionInfo, LndTrackerActor, LndTrackerArgs, LndTrackerMessage},
5+
fiber::types::Hash256,
6+
};
7+
8+
use ractor::{concurrency::Duration as RactorDuration, Actor, ActorRef, OutputPort};
9+
use tokio_util::{sync::CancellationToken, task::TaskTracker};
10+
11+
// Helper function to create test arguments
12+
fn create_test_args() -> LndTrackerArgs {
13+
let port = Arc::new(OutputPort::default());
14+
let tracker = TaskTracker::new();
15+
let token = CancellationToken::new();
16+
let lnd_connection = LndConnectionInfo {
17+
// Tracker will keep running because this URI is unreachable
18+
uri: "https://localhost:10009".parse().unwrap(),
19+
cert: None,
20+
macaroon: None,
21+
};
22+
23+
LndTrackerArgs {
24+
port,
25+
lnd_connection,
26+
token,
27+
tracker,
28+
}
29+
}
30+
31+
// Helper function to create a test payment hash
32+
fn test_payment_hash(value: u8) -> Hash256 {
33+
let mut bytes = [0u8; 32];
34+
bytes[0] = value;
35+
Hash256::from(bytes)
36+
}
37+
38+
// Helper function to create a test `LndTrackerActor` (without spawning trackers)
39+
async fn create_test_actor() -> (ActorRef<LndTrackerMessage>, tokio::task::JoinHandle<()>) {
40+
// Use spawn instead of spawn_linked to avoid needing a root actor
41+
let args = create_test_args();
42+
let (actor_ref, actor_handle) =
43+
Actor::spawn(Some("test_lnd_tracker".to_string()), LndTrackerActor, args)
44+
.await
45+
.expect("Failed to spawn test actor");
46+
47+
(actor_ref, actor_handle)
48+
}
49+
50+
// Test completion decrements active_invoice_trackers counter
51+
#[tokio::test]
52+
async fn test_completion_decrements_counter() {
53+
let (actor_ref, _handle) = create_test_actor().await;
54+
let payment_hash = test_payment_hash(1);
55+
56+
// Add invoice to queue (without processing to avoid LND calls)
57+
actor_ref
58+
.cast(LndTrackerMessage::TrackInvoice(payment_hash))
59+
.expect("Failed to send TrackInvoice");
60+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
61+
62+
// Send completion message (simulating a tracker that finished)
63+
actor_ref
64+
.cast(LndTrackerMessage::InvoiceTrackerCompleted {
65+
payment_hash,
66+
completed_successfully: true,
67+
})
68+
.expect("Failed to send completion");
69+
70+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
71+
72+
// Verify counter behavior (should handle completion gracefully)
73+
let final_state = actor_ref
74+
.call(
75+
|reply_port| LndTrackerMessage::GetState(reply_port),
76+
Some(RactorDuration::from_millis(1000)),
77+
)
78+
.await
79+
.expect("Actor should be responsive after completion");
80+
81+
assert!(final_state.is_success());
82+
let final_state = final_state.unwrap();
83+
assert_eq!(final_state.invoice_queue_len, 0);
84+
assert_eq!(final_state.active_invoice_trackers, 0);
85+
}
86+
87+
// Test completion triggers queue processing for waiting invoices
88+
#[tokio::test]
89+
async fn test_completion_triggers_queue_processing() {
90+
let (actor_ref, _handle) = create_test_actor().await;
91+
92+
// Add 6 invoices to queue
93+
for i in 0..6 {
94+
let payment_hash = test_payment_hash(i);
95+
actor_ref
96+
.cast(LndTrackerMessage::TrackInvoice(payment_hash))
97+
.expect("Failed to send TrackInvoice");
98+
}
99+
100+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
101+
102+
// Verify invoices are queued
103+
let state_before = actor_ref
104+
.call(
105+
|reply_port| LndTrackerMessage::GetState(reply_port),
106+
Some(RactorDuration::from_millis(1000)),
107+
)
108+
.await
109+
.expect("Failed to get state")
110+
.expect("Failed to get state");
111+
112+
assert_eq!(
113+
state_before.invoice_queue_len, 1,
114+
"Should have 1 invoice in queue"
115+
);
116+
assert_eq!(
117+
state_before.active_invoice_trackers, 5,
118+
"Should have 5 active invoice trackers"
119+
);
120+
121+
let completed_hash = test_payment_hash(1);
122+
actor_ref
123+
.cast(LndTrackerMessage::InvoiceTrackerCompleted {
124+
payment_hash: completed_hash,
125+
completed_successfully: true,
126+
})
127+
.expect("Failed to send completion");
128+
129+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
130+
131+
// Verify actor is still responsive
132+
let state_after = actor_ref
133+
.call(
134+
|reply_port| LndTrackerMessage::GetState(reply_port),
135+
Some(RactorDuration::from_millis(1000)),
136+
)
137+
.await
138+
.expect("Failed to get state")
139+
.expect("Failed to get state");
140+
141+
assert_eq!(
142+
state_after.invoice_queue_len, 0,
143+
"Should have 0 invoices in queue"
144+
);
145+
assert_eq!(
146+
state_after.active_invoice_trackers, 5,
147+
"Should have 5 active invoice trackers"
148+
);
149+
}
150+
151+
// Test timeout re-queues active invoices to end of queue
152+
#[tokio::test]
153+
async fn test_timeout_requeues_active_invoices() {
154+
let (actor_ref, _handle) = create_test_actor().await;
155+
let payment_hash = test_payment_hash(1);
156+
157+
// Add invoice to queue (without processing to avoid LND calls)
158+
actor_ref
159+
.cast(LndTrackerMessage::TrackInvoice(payment_hash))
160+
.expect("Failed to send TrackInvoice");
161+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
162+
163+
// Send completion message (simulating a tracker that finished)
164+
actor_ref
165+
.cast(LndTrackerMessage::InvoiceTrackerCompleted {
166+
payment_hash,
167+
completed_successfully: false,
168+
})
169+
.expect("Failed to send completion");
170+
171+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
172+
173+
// Verify counter behavior (should handle completion gracefully)
174+
let final_state = actor_ref
175+
.call(
176+
|reply_port| LndTrackerMessage::GetState(reply_port),
177+
Some(RactorDuration::from_millis(1000)),
178+
)
179+
.await
180+
.expect("Actor should be responsive after completion");
181+
182+
assert!(final_state.is_success());
183+
let final_state = final_state.unwrap();
184+
assert_eq!(final_state.invoice_queue_len, 0);
185+
assert_eq!(final_state.active_invoice_trackers, 1);
186+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod lnd_trackers_tests;

0 commit comments

Comments
 (0)