From 2c93f88eb1a78538d9deb1d37606d122392f4575 Mon Sep 17 00:00:00 2001 From: ian Date: Tue, 11 Nov 2025 11:48:53 +0800 Subject: [PATCH 1/2] refactor(cch): refactor lnd trackers - Define payment and invoice events to be sent to CCH for both fiber and lnd. - These events can be emitted from fiber store changes in a later PR. - Extract lnd trackers into its own module. - Limit concurrent lnd invoice trackers to 5, scheduling them at 5-minute intervals using a round-robin strategy. --- crates/fiber-lib/src/cch/actor.rs | 495 ++++------------ crates/fiber-lib/src/cch/error.rs | 4 +- crates/fiber-lib/src/cch/events.rs | 97 ++++ crates/fiber-lib/src/cch/mod.rs | 8 + crates/fiber-lib/src/cch/order.rs | 28 +- .../src/cch/tests/lnd_trackers_tests.rs | 185 ++++++ crates/fiber-lib/src/cch/tests/mod.rs | 1 + .../src/cch/trackers/lnd_trackers.rs | 535 ++++++++++++++++++ crates/fiber-lib/src/cch/trackers/mod.rs | 2 + crates/fiber-lib/src/rpc/README.md | 2 +- 10 files changed, 953 insertions(+), 404 deletions(-) create mode 100644 crates/fiber-lib/src/cch/events.rs create mode 100644 crates/fiber-lib/src/cch/tests/lnd_trackers_tests.rs create mode 100644 crates/fiber-lib/src/cch/tests/mod.rs create mode 100644 crates/fiber-lib/src/cch/trackers/lnd_trackers.rs create mode 100644 crates/fiber-lib/src/cch/trackers/mod.rs diff --git a/crates/fiber-lib/src/cch/actor.rs b/crates/fiber-lib/src/cch/actor.rs index 6cd2f8a51..95b9e1371 100644 --- a/crates/fiber-lib/src/cch/actor.rs +++ b/crates/fiber-lib/src/cch/actor.rs @@ -1,32 +1,33 @@ use anyhow::{anyhow, Context, Result}; use futures::StreamExt as _; use lightning_invoice::Bolt11Invoice; -use lnd_grpc_tonic_client::{ - create_invoices_client, create_router_client, invoicesrpc, lnrpc, routerrpc, InvoicesClient, - RouterClient, Uri, -}; -use ractor::{call, RpcReplyPort}; +use lnd_grpc_tonic_client::{invoicesrpc, lnrpc, routerrpc, Uri}; +use ractor::{call, port::OutputPortSubscriberTrait as _, OutputPort, RpcReplyPort}; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef}; use secp256k1::{PublicKey, Secp256k1, SecretKey}; use serde::Deserialize; use std::str::FromStr; +use std::sync::Arc; use tentacle::secio::SecioKeyPair; -use tokio::{select, time::sleep}; +use tokio::select; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::cch::order::CchInvoice; use crate::ckb::contracts::{get_script_by_contract, Contract}; use crate::fiber::channel::TlcNotification; use crate::fiber::hash_algorithm::HashAlgorithm; use crate::fiber::network::SendPaymentCommand; use crate::fiber::payment::PaymentStatus; use crate::fiber::types::{Hash256, Privkey}; +use crate::fiber::ASSUME_NETWORK_ACTOR_ALIVE; use crate::fiber::{NetworkActorCommand, NetworkActorMessage}; use crate::invoice::{CkbInvoice, Currency, InvoiceBuilder}; use crate::time::{Duration, SystemTime, UNIX_EPOCH}; -use super::error::CchDbError; -use super::{CchConfig, CchError, CchOrder, CchOrderStatus, CchOrdersDb}; +use super::{ + error::CchDbError, CchConfig, CchError, CchIncomingEvent, CchIncomingPaymentStatus, CchInvoice, + CchOrder, CchOrderStatus, CchOrdersDb, CchOutgoingPaymentStatus, LndConnectionInfo, + LndTrackerActor, LndTrackerArgs, LndTrackerMessage, +}; pub const BTC_PAYMENT_TIMEOUT_SECONDS: i32 = 60; pub const DEFAULT_ORDER_EXPIRY_SECONDS: u64 = 86400; // 24 hours @@ -37,20 +38,6 @@ pub async fn start_cch(args: CchArgs, root_actor: ActorCell) -> Result, - status: CchOrderStatus, -} - -#[derive(Debug)] -pub struct SettleReceiveBTCOrderEvent { - payment_hash: Hash256, - preimage: Option, - status: CchOrderStatus, -} - #[derive(Clone, Debug, Deserialize)] pub struct SendBTC { pub btc_pay_req: String, @@ -68,41 +55,15 @@ pub enum CchMessage { GetCchOrder(Hash256, RpcReplyPort>), - SettleSendBTCOrder(SettleSendBTCOrderEvent), - SettleReceiveBTCOrder(SettleReceiveBTCOrderEvent), + Event(CchIncomingEvent), PendingReceivedTlcNotification(TlcNotification), SettledTlcNotification(TlcNotification), } -#[derive(Clone)] -struct LndConnectionInfo { - uri: Uri, - cert: Option>, - macaroon: Option>, -} - -impl LndConnectionInfo { - async fn create_router_client( - &self, - ) -> Result { - create_router_client( - self.uri.clone(), - self.cert.as_deref(), - self.macaroon.as_deref(), - ) - .await - } - - async fn create_invoices_client( - &self, - ) -> Result { - create_invoices_client( - self.uri.clone(), - self.cert.as_deref(), - self.macaroon.as_deref(), - ) - .await +impl From for CchMessage { + fn from(value: CchIncomingEvent) -> Self { + CchMessage::Event(value) } } @@ -119,11 +80,11 @@ pub struct CchArgs { pub struct CchState { config: CchConfig, - tracker: TaskTracker, token: CancellationToken, network_actor: ActorRef, node_keypair: (PublicKey, SecretKey), lnd_connection: LndConnectionInfo, + lnd_tracker: ActorRef, orders_db: CchOrdersDb, } @@ -155,11 +116,7 @@ impl Actor for CchActor { ), None => None, }; - let lnd_connection = LndConnectionInfo { - uri: lnd_rpc_url, - cert, - macaroon, - }; + let lnd_connection = LndConnectionInfo::new(lnd_rpc_url, cert, macaroon); let private_key: Privkey = <[u8; 32]>::try_from(args.node_keypair.as_ref()) .expect("valid length for key") @@ -171,31 +128,35 @@ impl Actor for CchActor { private_key.into(), ); + let port = Arc::new(OutputPort::default()); + let lnd_tracker = LndTrackerActor::start( + LndTrackerArgs { + port: port.clone(), + lnd_connection: lnd_connection.clone(), + tracker: args.tracker, + token: args.token.clone(), + }, + myself.get_cell(), + ) + .await?; + myself.subscribe_to_port(&port); + let state = CchState { config: args.config, - tracker: args.tracker, token: args.token, network_actor: args.network_actor, orders_db: Default::default(), node_keypair, lnd_connection, + lnd_tracker, }; - let payments_tracker = LndPaymentsTracker::new( - myself.clone(), - state.lnd_connection.clone(), - state.token.clone(), - ); - state - .tracker - .spawn(async move { payments_tracker.run().await }); - Ok(state) } async fn handle( &self, - myself: ActorRef, + _myself: ActorRef, message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { @@ -209,7 +170,7 @@ impl Actor for CchActor { Ok(()) } CchMessage::ReceiveBTC(receive_btc, port) => { - let result = state.receive_btc(myself, receive_btc).await; + let result = state.receive_btc(receive_btc).await; if !port.is_closed() { // ignore error let _ = port.send(result); @@ -228,17 +189,10 @@ impl Actor for CchActor { } Ok(()) } - CchMessage::SettleSendBTCOrder(event) => { - tracing::debug!("settle_send_btc_order {:?}", event); - if let Err(err) = state.settle_send_btc_order(event).await { - tracing::error!("settle_send_btc_order failed: {}", err); - } - Ok(()) - } - CchMessage::SettleReceiveBTCOrder(event) => { - tracing::debug!("settle_receive_btc_order {:?}", event); - if let Err(err) = state.settle_receive_btc_order(event).await { - tracing::error!("settle_receive_btc_order failed: {}", err); + CchMessage::Event(event) => { + tracing::debug!("event {:?}", event); + if let Err(err) = state.handle_event(event).await { + tracing::error!("handle_event failed: {}", err); } Ok(()) } @@ -323,7 +277,7 @@ impl CchState { )) } }; - call!(self.network_actor, message).expect("call actor")?; + call!(self.network_actor, message).expect(ASSUME_NETWORK_ACTOR_ALIVE)?; let order = CchOrder { wrapped_btc_type_script, @@ -388,7 +342,8 @@ impl CchState { payment_result_opt = stream.next() => { tracing::debug!("[inbounding tlc] payment result: {:?}", payment_result_opt); if let Some(Ok(payment)) = payment_result_opt { - order.status = lnrpc::payment::PaymentStatus::try_from(payment.status)?.into(); + let status: CchOutgoingPaymentStatus = lnrpc::payment::PaymentStatus::try_from(payment.status)?.into(); + order.status = status.into(); self.orders_db .update_cch_order(order) .await?; @@ -421,7 +376,7 @@ impl CchState { let preimage = tlc_notification .tlc .payment_preimage - .ok_or(CchError::ReceiveBTCMissingPreimage)?; + .ok_or(CchError::SettledPaymentMissingPreimage)?; tracing::debug!("[settled tlc] preimage: {:#x}", preimage); @@ -446,34 +401,56 @@ impl CchState { Ok(()) } - async fn settle_send_btc_order(&mut self, event: SettleSendBTCOrderEvent) -> Result<()> { - let payment_hash = event.payment_hash; - - let mut order = match self.orders_db.get_cch_order(&event.payment_hash).await { + async fn handle_event(&mut self, event: CchIncomingEvent) -> Result<()> { + let order = match self.orders_db.get_cch_order(event.payment_hash()).await { Err(CchDbError::NotFound(_)) => return Ok(()), Err(err) => return Err(err.into()), - Ok(order) if !order.is_from_fiber_to_lightning() => return Ok(()), Ok(order) => order, }; - order.status = event.status; - if let Some(preimage) = event.preimage { - tracing::info!( - "SettleSendBTCOrder: payment_hash={}, status={:?}", - payment_hash, - event.status - ); - order.payment_preimage = Some(preimage); + match event { + CchIncomingEvent::InvoiceChanged { status, .. } => { + self.handle_invoice_changed_event(order, status).await + } + CchIncomingEvent::PaymentChanged { + payment_preimage, + status, + .. + } => { + self.handle_payment_changed_event(order, payment_preimage, status) + .await + } + } + } + + async fn handle_payment_changed_event( + &mut self, + mut order: CchOrder, + payment_preimage: Option, + status: CchOutgoingPaymentStatus, + ) -> Result<()> { + // When the order is not from fiber to lightning, the payment is sent to fiber. Since Fiber + // events are handled in `handle_pending_received_tlc_notification` and + // `handle_settled_tlc_notification`, skip the event here. + if !order.is_from_fiber_to_lightning() { + return Ok(()); + } + order.status = status.into(); + + if status == CchOutgoingPaymentStatus::Settled { + let payment_preimage = + payment_preimage.ok_or(CchError::SettledPaymentMissingPreimage)?; + order.payment_preimage = Some(payment_preimage); self.orders_db.update_cch_order(order.clone()).await?; let command = move |rpc_reply| -> NetworkActorMessage { NetworkActorMessage::Command(NetworkActorCommand::SettleInvoice( - payment_hash, - preimage, + order.payment_hash, + payment_preimage, rpc_reply, )) }; - call!(self.network_actor, command).expect("call actor")?; + call!(self.network_actor, command).expect(ASSUME_NETWORK_ACTOR_ALIVE)?; order.status = CchOrderStatus::Succeeded; } @@ -482,11 +459,47 @@ impl CchState { Ok(()) } - async fn receive_btc( + async fn handle_invoice_changed_event( &mut self, - myself: ActorRef, - receive_btc: ReceiveBTC, - ) -> Result { + mut order: CchOrder, + status: CchIncomingPaymentStatus, + ) -> Result<()> { + // When the order is from fiber to lightning, the invoice is created in fiber. Since Fiber + // events are handled in `handle_pending_received_tlc_notification` and + // `handle_settled_tlc_notification`, skip the event here. + if order.is_from_fiber_to_lightning() { + return Ok(()); + } + + order.status = status.into(); + if status == CchIncomingPaymentStatus::Accepted { + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::SendPayment( + SendPaymentCommand { + invoice: Some(order.outgoing_pay_req.clone()), + ..Default::default() + }, + rpc_reply, + )) + }; + + let payment_status = call!(self.network_actor, message) + .expect(ASSUME_NETWORK_ACTOR_ALIVE) + .map_err(|err| anyhow!("{}", err))? + .status; + + if payment_status == PaymentStatus::Failed { + order.status = CchOrderStatus::Failed; + } else { + order.status = CchOrderStatus::OutgoingInFlight; + } + } + + self.orders_db.update_cch_order(order.clone()).await?; + Ok(()) + } + + async fn receive_btc(&mut self, receive_btc: ReceiveBTC) -> Result { let invoice = CkbInvoice::from_str(&receive_btc.fiber_pay_req)?; let payment_hash = *invoice.payment_hash(); let amount_sats = invoice.amount().ok_or(CchError::CKBInvoiceMissingAmount)?; @@ -548,276 +561,10 @@ impl CchState { self.orders_db.insert_cch_order(order.clone()).await?; - let invoice_tracker = LndInvoiceTracker::new( - myself, - payment_hash, - self.lnd_connection.clone(), - self.token.clone(), - ); - self.tracker - .spawn(async move { invoice_tracker.run().await }); + self.lnd_tracker + .cast(LndTrackerMessage::TrackInvoice(payment_hash)) + .expect("cast message to LndTrackerActor"); Ok(order) } - - async fn settle_receive_btc_order(&mut self, event: SettleReceiveBTCOrderEvent) -> Result<()> { - let mut order = match self.orders_db.get_cch_order(&event.payment_hash).await { - Err(CchDbError::NotFound(_)) => return Ok(()), - Err(err) => return Err(err.into()), - Ok(order) if order.is_from_fiber_to_lightning() => return Ok(()), - Ok(order) => order, - }; - - order.status = event.status; - order.payment_preimage = event.preimage; - - if event.status == CchOrderStatus::IncomingAccepted { - let message = |rpc_reply| -> NetworkActorMessage { - NetworkActorMessage::Command(NetworkActorCommand::SendPayment( - SendPaymentCommand { - invoice: Some(order.outgoing_pay_req.clone()), - ..Default::default() - }, - rpc_reply, - )) - }; - - let payment_status = call!(self.network_actor, message) - .expect("call actor") - .map_err(|err| anyhow!("{}", err))? - .status; - - let mut order = order.clone(); - if payment_status == PaymentStatus::Failed { - order.status = CchOrderStatus::Failed; - } else { - order.status = CchOrderStatus::OutgoingInFlight; - } - } - - self.orders_db.update_cch_order(order.clone()).await?; - Ok(()) - } -} - -struct LndPaymentsTracker { - cch_actor: ActorRef, - lnd_connection: LndConnectionInfo, - token: CancellationToken, -} - -impl LndPaymentsTracker { - fn new( - cch_actor: ActorRef, - lnd_connection: LndConnectionInfo, - token: CancellationToken, - ) -> Self { - Self { - cch_actor, - lnd_connection, - token, - } - } - - async fn run(self) { - tracing::debug!( - target: "fnn::cch::actor::tracker::lnd_payments", - "will connect {}", - self.lnd_connection.uri - ); - - // TODO: clean up expired orders - loop { - select! { - result = self.run_inner() => { - match result { - Ok(_) => { - break; - } - Err(err) => { - tracing::error!( - target: "fnn::cch::actor::tracker::lnd_payments", - "Error tracking LND payments, retry 15 seconds later: {:?}", - err - ); - select! { - _ = sleep(Duration::from_secs(15)) => { - // continue - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return; - } - } - } - } - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return; - } - } - } - } - - async fn run_inner(&self) -> Result<()> { - let mut client = self.lnd_connection.create_router_client().await?; - let mut stream = client - .track_payments(routerrpc::TrackPaymentsRequest { - no_inflight_updates: true, - }) - .await? - .into_inner(); - - loop { - select! { - payment_opt = stream.next() => { - match payment_opt { - Some(Ok(payment)) => self.on_payment(payment).await?, - Some(Err(err)) => return Err(err.into()), - None => return Err(anyhow!("unexpected closed stream")), - } - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return Ok(()); - } - } - } - } - - async fn on_payment(&self, payment: lnrpc::Payment) -> Result<()> { - tracing::debug!(target: "fnn::cch::actor::tracker::lnd_payments", "payment: {:?}", payment); - let preimage = if !payment.payment_preimage.is_empty() { - Some(Hash256::from_str(&payment.payment_preimage)?) - } else { - None - }; - - let event = CchMessage::SettleSendBTCOrder(SettleSendBTCOrderEvent { - payment_hash: Hash256::from_str(&payment.payment_hash)?, - preimage, - status: lnrpc::payment::PaymentStatus::try_from(payment.status) - .map(Into::into) - .unwrap_or(CchOrderStatus::OutgoingInFlight), - }); - self.cch_actor.cast(event).map_err(Into::into) - } -} - -/// Subscribe single invoice. -/// -/// Lnd does not notify Accepted event in SubscribeInvoices rpc. -/// -/// -struct LndInvoiceTracker { - cch_actor: ActorRef, - payment_hash: Hash256, - lnd_connection: LndConnectionInfo, - token: CancellationToken, -} - -impl LndInvoiceTracker { - fn new( - cch_actor: ActorRef, - payment_hash: Hash256, - lnd_connection: LndConnectionInfo, - token: CancellationToken, - ) -> Self { - Self { - cch_actor, - payment_hash, - lnd_connection, - token, - } - } - - async fn run(self) { - tracing::debug!( - target: "fnn::cch::actor::tracker::lnd_invoice", - "will connect {}", - self.lnd_connection.uri - ); - loop { - select! { - result = self.run_inner() => { - match result { - Ok(_) => { - break; - } - Err(err) => { - tracing::error!( - target: "fnn::cch::actor::tracker::lnd_invoice", - "Error tracking LND invoices, retry 15 seconds later: {:?}", - err - ); - select! { - _ = sleep(Duration::from_secs(15)) => { - // continue - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return; - } - } - } - } - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return; - } - } - } - } - - async fn run_inner(&self) -> Result<()> { - let mut client = self.lnd_connection.create_invoices_client().await?; - // TODO: clean up expired orders - let mut stream = client - .subscribe_single_invoice(invoicesrpc::SubscribeSingleInvoiceRequest { - r_hash: self.payment_hash.into(), - }) - .await? - .into_inner(); - - loop { - select! { - invoice_opt = stream.next() => { - match invoice_opt { - Some(Ok(invoice)) => if self.on_invoice(invoice).await? { - return Ok(()); - }, - Some(Err(err)) => return Err(err.into()), - None => return Err(anyhow!("unexpected closed stream")), - } - } - _ = self.token.cancelled() => { - tracing::debug!("Cancellation received, shutting down cch service"); - return Ok(()); - } - } - } - } - - // Return true to quit the tracker - async fn on_invoice(&self, invoice: lnrpc::Invoice) -> Result { - tracing::debug!("[LndInvoiceTracker] invoice: {:?}", invoice); - let status = lnrpc::invoice::InvoiceState::try_from(invoice.state) - .map(Into::into) - .unwrap_or(CchOrderStatus::Pending); - let preimage = if !invoice.r_preimage.is_empty() { - Some(Hash256::try_from(invoice.r_preimage.as_slice())?) - } else { - None - }; - let event = CchMessage::SettleReceiveBTCOrder(SettleReceiveBTCOrderEvent { - payment_hash: Hash256::try_from(invoice.r_hash.as_slice())?, - preimage, - status, - }); - self.cch_actor.cast(event)?; - // Quit tracker when the status is final - Ok(status == CchOrderStatus::Succeeded || status == CchOrderStatus::Failed) - } } diff --git a/crates/fiber-lib/src/cch/error.rs b/crates/fiber-lib/src/cch/error.rs index fab440947..f896e343d 100644 --- a/crates/fiber-lib/src/cch/error.rs +++ b/crates/fiber-lib/src/cch/error.rs @@ -40,8 +40,8 @@ pub enum CchError { ReceiveBTCOrderAlreadyPaid, #[error("ReceiveBTC received payment amount is too small")] ReceiveBTCReceivedAmountTooSmall, - #[error("ReceiveBTC expected preimage but missing")] - ReceiveBTCMissingPreimage, + #[error("Expect preimage in settled payment but missing")] + SettledPaymentMissingPreimage, #[error("System time error: {0}")] SystemTimeError(#[from] SystemTimeError), #[error("JSON serialization error: {0}")] diff --git a/crates/fiber-lib/src/cch/events.rs b/crates/fiber-lib/src/cch/events.rs new file mode 100644 index 000000000..ac4343b46 --- /dev/null +++ b/crates/fiber-lib/src/cch/events.rs @@ -0,0 +1,97 @@ +use lnd_grpc_tonic_client::lnrpc; + +use crate::{cch::CchOrderStatus, fiber::types::Hash256}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CchIncomingPaymentStatus { + // The incoming payment is in-flight + InFlight = 0, + // Incoming payment TLCs have been accepted + Accepted = 1, + Settled = 2, + Failed = 3, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CchOutgoingPaymentStatus { + // The outgoing payment is in-flight + InFlight = 0, + Settled = 2, + Failed = 3, +} + +impl From for CchOrderStatus { + fn from(status: CchIncomingPaymentStatus) -> Self { + match status { + CchIncomingPaymentStatus::InFlight => CchOrderStatus::Pending, + CchIncomingPaymentStatus::Accepted => CchOrderStatus::IncomingAccepted, + CchIncomingPaymentStatus::Settled => CchOrderStatus::Succeeded, + CchIncomingPaymentStatus::Failed => CchOrderStatus::Failed, + } + } +} + +impl From for CchOrderStatus { + fn from(status: CchOutgoingPaymentStatus) -> Self { + match status { + CchOutgoingPaymentStatus::InFlight => CchOrderStatus::OutgoingInFlight, + CchOutgoingPaymentStatus::Settled => CchOrderStatus::OutgoingSettled, + CchOutgoingPaymentStatus::Failed => CchOrderStatus::Failed, + } + } +} + +/// Lnd invoice is the incoming part of a CCHOrder to receive BTC from Lightning to Fiber +impl From for CchIncomingPaymentStatus { + fn from(state: lnrpc::invoice::InvoiceState) -> Self { + use lnrpc::invoice::InvoiceState; + match state { + InvoiceState::Open => CchIncomingPaymentStatus::InFlight, + InvoiceState::Settled => CchIncomingPaymentStatus::Settled, + InvoiceState::Canceled => CchIncomingPaymentStatus::Failed, + InvoiceState::Accepted => CchIncomingPaymentStatus::Accepted, + } + } +} + +/// Lnd payment is the outgoing part of a CCHOrder to send BTC from Fiber to Lightning +impl From for CchOutgoingPaymentStatus { + fn from(status: lnrpc::payment::PaymentStatus) -> Self { + use lnrpc::payment::PaymentStatus; + match status { + PaymentStatus::Unknown => CchOutgoingPaymentStatus::InFlight, + PaymentStatus::InFlight => CchOutgoingPaymentStatus::InFlight, + PaymentStatus::Succeeded => CchOutgoingPaymentStatus::Settled, + PaymentStatus::Failed => CchOutgoingPaymentStatus::Failed, + PaymentStatus::Initiated => CchOutgoingPaymentStatus::InFlight, + } + } +} + +#[derive(Debug, Clone)] +pub enum CchIncomingEvent { + InvoiceChanged { + /// The payment hash of the invoice. + payment_hash: Hash256, + /// The preimage of the invoice. + payment_preimage: Option, + status: CchIncomingPaymentStatus, + }, + + PaymentChanged { + /// The payment hash of the invoice. + payment_hash: Hash256, + /// The preimage of the invoice. + payment_preimage: Option, + status: CchOutgoingPaymentStatus, + }, +} + +impl CchIncomingEvent { + pub fn payment_hash(&self) -> &Hash256 { + match self { + CchIncomingEvent::InvoiceChanged { payment_hash, .. } => payment_hash, + CchIncomingEvent::PaymentChanged { payment_hash, .. } => payment_hash, + } + } +} diff --git a/crates/fiber-lib/src/cch/mod.rs b/crates/fiber-lib/src/cch/mod.rs index 517b396d9..4cc4c4a7b 100644 --- a/crates/fiber-lib/src/cch/mod.rs +++ b/crates/fiber-lib/src/cch/mod.rs @@ -4,6 +4,11 @@ pub use actor::{start_cch, CchActor, CchArgs, CchMessage, ReceiveBTC, SendBTC}; mod error; pub use error::{CchError, CchResult}; +mod events; +pub use events::{CchIncomingEvent, CchIncomingPaymentStatus, CchOutgoingPaymentStatus}; +mod trackers; +pub use trackers::{LndConnectionInfo, LndTrackerActor, LndTrackerArgs, LndTrackerMessage}; + mod config; pub use config::{ CchConfig, DEFAULT_BTC_FINAL_TLC_EXPIRY_TIME, DEFAULT_CKB_FINAL_TLC_EXPIRY_DELTA, @@ -15,3 +20,6 @@ pub use order::{CchInvoice, CchOrder, CchOrderStatus}; mod orders_db; pub use orders_db::CchOrdersDb; + +#[cfg(test)] +pub mod tests; diff --git a/crates/fiber-lib/src/cch/order.rs b/crates/fiber-lib/src/cch/order.rs index 476adb19f..16aa46019 100644 --- a/crates/fiber-lib/src/cch/order.rs +++ b/crates/fiber-lib/src/cch/order.rs @@ -1,5 +1,4 @@ use lightning_invoice::Bolt11Invoice; -use lnd_grpc_tonic_client::lnrpc; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -15,7 +14,7 @@ use crate::{ #[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)] #[serde(rename_all = "snake_case")] pub enum CchOrderStatus { - /// Order is created and has not send out payments yet. + /// Order is created and has not received the incoming payment Pending = 0, /// HTLC in the incoming payment is accepted. IncomingAccepted = 1, @@ -29,31 +28,6 @@ pub enum CchOrderStatus { Failed = 5, } -/// Lnd payment is the outgoing part of a CCHOrder to send BTC from Fiber to Lightning -impl From for CchOrderStatus { - fn from(status: lnrpc::payment::PaymentStatus) -> Self { - use lnrpc::payment::PaymentStatus; - match status { - PaymentStatus::Succeeded => CchOrderStatus::OutgoingSettled, - PaymentStatus::Failed => CchOrderStatus::Failed, - _ => CchOrderStatus::OutgoingInFlight, - } - } -} - -/// Lnd invoice is the incoming part of a CCHOrder to receive BTC from Lightning to Fiber -impl From for CchOrderStatus { - fn from(state: lnrpc::invoice::InvoiceState) -> Self { - use lnrpc::invoice::InvoiceState; - match state { - InvoiceState::Accepted => CchOrderStatus::IncomingAccepted, - InvoiceState::Canceled => CchOrderStatus::Failed, - InvoiceState::Settled => CchOrderStatus::Succeeded, - _ => CchOrderStatus::Pending, - } - } -} - /// The generated proxy invoice for the incoming payment. /// /// The JSON representation: diff --git a/crates/fiber-lib/src/cch/tests/lnd_trackers_tests.rs b/crates/fiber-lib/src/cch/tests/lnd_trackers_tests.rs new file mode 100644 index 000000000..2dc083778 --- /dev/null +++ b/crates/fiber-lib/src/cch/tests/lnd_trackers_tests.rs @@ -0,0 +1,185 @@ +use std::sync::Arc; + +use crate::{ + cch::{LndConnectionInfo, LndTrackerActor, LndTrackerArgs, LndTrackerMessage}, + fiber::types::Hash256, +}; + +use ractor::{concurrency::Duration as RactorDuration, Actor, ActorRef, OutputPort}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +// Helper function to create test arguments +fn create_test_args() -> LndTrackerArgs { + let port = Arc::new(OutputPort::default()); + let tracker = TaskTracker::new(); + let token = CancellationToken::new(); + let lnd_connection = LndConnectionInfo { + // Tracker will keep running because this URI is unreachable + uri: "https://localhost:10009".parse().unwrap(), + cert: None, + macaroon: None, + }; + + LndTrackerArgs { + port, + lnd_connection, + token, + tracker, + } +} + +// Helper function to create a test payment hash +fn test_payment_hash(value: u8) -> Hash256 { + let mut bytes = [0u8; 32]; + bytes[0] = value; + Hash256::from(bytes) +} + +// Helper function to create a test `LndTrackerActor` (without spawning trackers) +async fn create_test_actor() -> (ActorRef, tokio::task::JoinHandle<()>) { + // Use spawn instead of spawn_linked to avoid needing a root actor + let args = create_test_args(); + let (actor_ref, actor_handle) = Actor::spawn(None, LndTrackerActor, args) + .await + .expect("Failed to spawn test actor"); + + (actor_ref, actor_handle) +} + +// Test completion decrements active_invoice_trackers counter +#[tokio::test] +async fn test_completion_decrements_counter() { + let (actor_ref, _handle) = create_test_actor().await; + let payment_hash = test_payment_hash(1); + + // Add invoice to queue (without processing to avoid LND calls) + actor_ref + .cast(LndTrackerMessage::TrackInvoice(payment_hash)) + .expect("Failed to send TrackInvoice"); + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Send completion message (simulating a tracker that finished) + actor_ref + .cast(LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash, + completed_successfully: true, + }) + .expect("Failed to send completion"); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Verify counter behavior (should handle completion gracefully) + let final_state = actor_ref + .call( + LndTrackerMessage::GetState, + Some(RactorDuration::from_millis(1000)), + ) + .await + .expect("Actor should be responsive after completion"); + + assert!(final_state.is_success()); + let final_state = final_state.unwrap(); + assert_eq!(final_state.invoice_queue_len, 0); + assert_eq!(final_state.active_invoice_trackers, 0); +} + +// Test completion triggers queue processing for waiting invoices +#[tokio::test] +async fn test_completion_triggers_queue_processing() { + let (actor_ref, _handle) = create_test_actor().await; + + // Add 6 invoices to queue + for i in 0..6 { + let payment_hash = test_payment_hash(i); + actor_ref + .cast(LndTrackerMessage::TrackInvoice(payment_hash)) + .expect("Failed to send TrackInvoice"); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Verify invoices are queued + let state_before = actor_ref + .call( + LndTrackerMessage::GetState, + Some(RactorDuration::from_millis(1000)), + ) + .await + .expect("Failed to get state") + .expect("Failed to get state"); + + assert_eq!( + state_before.invoice_queue_len, 1, + "Should have 1 invoice in queue" + ); + assert_eq!( + state_before.active_invoice_trackers, 5, + "Should have 5 active invoice trackers" + ); + + let completed_hash = test_payment_hash(1); + actor_ref + .cast(LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash: completed_hash, + completed_successfully: true, + }) + .expect("Failed to send completion"); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Verify actor is still responsive + let state_after = actor_ref + .call( + LndTrackerMessage::GetState, + Some(RactorDuration::from_millis(1000)), + ) + .await + .expect("Failed to get state") + .expect("Failed to get state"); + + assert_eq!( + state_after.invoice_queue_len, 0, + "Should have 0 invoices in queue" + ); + assert_eq!( + state_after.active_invoice_trackers, 5, + "Should have 5 active invoice trackers" + ); +} + +// Test timeout re-queues active invoices to end of queue +#[tokio::test] +async fn test_timeout_requeues_active_invoices() { + let (actor_ref, _handle) = create_test_actor().await; + let payment_hash = test_payment_hash(1); + + // Add invoice to queue (without processing to avoid LND calls) + actor_ref + .cast(LndTrackerMessage::TrackInvoice(payment_hash)) + .expect("Failed to send TrackInvoice"); + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Send completion message (simulating a tracker that finished) + actor_ref + .cast(LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash, + completed_successfully: false, + }) + .expect("Failed to send completion"); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Verify counter behavior (should handle completion gracefully) + let final_state = actor_ref + .call( + LndTrackerMessage::GetState, + Some(RactorDuration::from_millis(1000)), + ) + .await + .expect("Actor should be responsive after completion"); + + assert!(final_state.is_success()); + let final_state = final_state.unwrap(); + assert_eq!(final_state.invoice_queue_len, 0); + assert_eq!(final_state.active_invoice_trackers, 1); +} diff --git a/crates/fiber-lib/src/cch/tests/mod.rs b/crates/fiber-lib/src/cch/tests/mod.rs new file mode 100644 index 000000000..70bb4e4c6 --- /dev/null +++ b/crates/fiber-lib/src/cch/tests/mod.rs @@ -0,0 +1 @@ +mod lnd_trackers_tests; diff --git a/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs b/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs new file mode 100644 index 000000000..41c61cb06 --- /dev/null +++ b/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs @@ -0,0 +1,535 @@ +//! LND Payment and Invoice Tracker Actor +//! +//! This module implements `LndTrackerActor`, which manages concurrent tracking of +//! Lightning Network invoices and payments via LND (Lightning Network Daemon). +//! +//! ## Key Features +//! +//! - **Concurrent Tracking**: Tracks up to 5 invoices simultaneously to avoid overwhelming LND +//! - **Queue Management**: Maintains FIFO queue for pending invoice tracking requests +//! - **Timeout**: Re-queues active invoices after 5 minutes to prevent indefinite blocking +//! - **Completion Handling**: Properly cleans up when tracker tasks complete, timeout or fail +//! +//! ## Architecture +//! +//! The actor uses a message-passing model with two main message types: +//! - `TrackInvoice(Hash256)`: Adds invoice to tracking queue +//! - `InvoiceTrackerCompleted{...}`: Sent by spawned tracker tasks when they finish +//! +//! When a tracker task completes (successfully or with error), it ALWAYS sends +//! `InvoiceTrackerCompleted` back to the actor. The actor maintains two data structures: +//! - `invoice_queue`: VecDeque of pending invoice hashes +//! - `active_invoice_trackers`: Number of active invoice trackers +//! +//! When completion message arrives: +//! 1. Decrement `active_invoice_trackers` counter +//! 2. Re-queue if failed +//! 3. Dequeue invoices from the queue and start tracking + +use std::{collections::VecDeque, str::FromStr, sync::Arc, time::Duration}; + +use anyhow::{anyhow, Result}; +use futures::StreamExt as _; +use lnd_grpc_tonic_client::{ + create_invoices_client, create_router_client, invoicesrpc, lnrpc, routerrpc, InvoicesClient, + RouterClient, Uri, +}; +use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, OutputPort}; +use tokio::{select, time::sleep}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +use crate::{ + cch::{CchIncomingEvent, CchIncomingPaymentStatus, CchOutgoingPaymentStatus}, + fiber::types::Hash256, +}; + +const MAX_CONCURRENT_INVOICE_TRACKERS: usize = 5; +const INVOICE_TRACKING_TIMEOUT: Duration = Duration::from_secs(5 * 60); // 5 minutes + +/// LND connection information +/// +/// This struct contains the connection details for communicating with an LND node. +#[derive(Clone)] +pub struct LndConnectionInfo { + pub uri: Uri, + pub cert: Option>, + pub macaroon: Option>, +} + +impl LndConnectionInfo { + pub fn new(uri: Uri, cert: Option>, macaroon: Option>) -> Self { + Self { + uri, + cert, + macaroon, + } + } + + pub async fn create_router_client( + &self, + ) -> Result { + create_router_client( + self.uri.clone(), + self.cert.as_deref(), + self.macaroon.as_deref(), + ) + .await + } + + pub async fn create_invoices_client( + &self, + ) -> Result { + create_invoices_client( + self.uri.clone(), + self.cert.as_deref(), + self.macaroon.as_deref(), + ) + .await + } +} + +/// Message types for the LndTrackerActor +#[derive(Debug)] +pub enum LndTrackerMessage { + /// Track a new invoice + TrackInvoice(Hash256), + + /// Notification that an invoice tracker task has completed + /// + /// Sent by InvoiceTracker tasks when they terminate (either successfully + /// when invoice reaches final state, or due to error). + InvoiceTrackerCompleted { + payment_hash: Hash256, + completed_successfully: bool, + }, + + /// Get current state snapshot (for testing) + #[cfg(test)] + GetState(ractor::RpcReplyPort), +} + +/// Snapshot of actor state (for testing) +#[cfg(test)] +#[derive(Debug, Clone)] +pub struct StateSnapshot { + pub invoice_queue_len: usize, + pub active_invoice_trackers: usize, +} + +/// Arguments for starting the LndTrackerActor +pub struct LndTrackerArgs { + pub port: Arc>, + pub lnd_connection: LndConnectionInfo, + pub token: CancellationToken, + pub tracker: TaskTracker, +} + +/// State for the LndTrackerActor +pub struct LndTrackerState { + port: Arc>, + lnd_connection: LndConnectionInfo, + token: CancellationToken, + tracker: TaskTracker, + /// Queue of payment hashes waiting to be tracked + invoice_queue: VecDeque, + /// Number of currently active invoice trackers + active_invoice_trackers: usize, +} + +/// Ractor Actor to track LND payments and invoices +/// +/// This actor manages tracking of Lightning Network Daemon (LND) payments and invoices. +/// It provides the following features: +/// +/// ## Payment Tracking +/// - Automatically tracks all LND payments in the background +/// - Sends `CchIncomingEvent::PaymentChanged` events to the output port +/// +/// ## Invoice Tracking +/// - Supports tracking individual invoices via `LndTrackerMessage::TrackInvoice` +/// - Implements concurrency control: maximum 5 concurrent invoice connections +/// - Track invoices with a 5-minute timeout and automatically retry them later +/// - Queues additional invoices when concurrency limit is reached +/// +/// ## Example Usage +/// +/// ```rust,ignore +/// use std::sync::Arc; +/// use ractor::{ActorCell, OutputPort}; +/// use tokio_util::{sync::CancellationToken, task::TaskTracker}; +/// +/// // Create output port for events +/// let port = Arc::new(OutputPort::::default()); +/// +/// // Create connection info +/// let lnd_connection = LndConnectionInfo { +/// uri: "https://localhost:10009".parse().unwrap(), +/// cert: Some(cert_bytes), +/// macaroon: Some(macaroon_bytes), +/// }; +/// +/// // Start the actor +/// let args = LndTrackerArgs { +/// port: port.clone(), +/// lnd_connection, +/// token: CancellationToken::new(), +/// tracker: TaskTracker::new(), +/// }; +/// +/// let actor = LndTrackerActor::start(args, root_actor).await?; +/// +/// // Track an invoice +/// actor.cast(LndTrackerMessage::TrackInvoice(payment_hash))?; +/// ``` +#[derive(Default)] +pub struct LndTrackerActor; + +impl LndTrackerActor { + pub async fn start( + args: LndTrackerArgs, + root_actor: ActorCell, + ) -> Result> { + let (actor, _handle) = Actor::spawn_linked( + Some("lnd_tracker_actor".to_string()), + LndTrackerActor, + args, + root_actor, + ) + .await?; + Ok(actor) + } +} + +#[async_trait::async_trait] +impl Actor for LndTrackerActor { + type Msg = LndTrackerMessage; + type State = LndTrackerState; + type Arguments = LndTrackerArgs; + + async fn pre_start( + &self, + _myself: ActorRef, + args: Self::Arguments, + ) -> Result { + let state = LndTrackerState { + port: args.port.clone(), + lnd_connection: args.lnd_connection.clone(), + token: args.token.clone(), + tracker: args.tracker.clone(), + invoice_queue: VecDeque::new(), + active_invoice_trackers: 0, + }; + + // Start payment tracker in background + let payment_tracker = PaymentTracker { + port: args.port, + lnd_connection: args.lnd_connection, + token: args.token, + }; + + args.tracker.spawn(async move { + payment_tracker.run().await; + }); + + Ok(state) + } + + async fn handle( + &self, + myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + LndTrackerMessage::TrackInvoice(payment_hash) => { + state.invoice_queue.push_back(payment_hash); + state.process_invoice_queue(myself).await?; + Ok(()) + } + LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash, + completed_successfully, + } => { + tracing::debug!( + "Processing completion for payment_hash={}, success={}, active={}/{}", + payment_hash, + completed_successfully, + state.active_invoice_trackers, + MAX_CONCURRENT_INVOICE_TRACKERS + ); + state.active_invoice_trackers = state.active_invoice_trackers.saturating_sub(1); + // Re-queue failed tracker + if !completed_successfully { + state.invoice_queue.push_back(payment_hash); + } + + // Now that a slot is free, we can start tracking more invoices from the queue + state.process_invoice_queue(myself).await?; + + Ok(()) + } + + #[cfg(test)] + LndTrackerMessage::GetState(reply_port) => { + let snapshot = StateSnapshot { + invoice_queue_len: state.invoice_queue.len(), + active_invoice_trackers: state.active_invoice_trackers, + }; + let _ = reply_port.send(snapshot); + Ok(()) + } + } + } +} + +impl LndTrackerState { + async fn process_invoice_queue( + &mut self, + myself: ActorRef, + ) -> Result<(), ActorProcessingErr> { + // Process invoices from queue + while self.active_invoice_trackers < MAX_CONCURRENT_INVOICE_TRACKERS { + let Some(payment_hash) = self.invoice_queue.pop_front() else { + break; + }; + self.active_invoice_trackers += 1; + + let tracker = InvoiceTracker { + port: self.port.clone(), + lnd_connection: self.lnd_connection.clone(), + token: self.token.clone(), + payment_hash, + }; + + // Spawned Task Completion Flow: + // 1. Clone actor reference and payment hash before moving into async task + // 2. Spawn tracker in background (tokio::spawn) + // 3. Capture result from tracker.run() + // 4. ALWAYS send InvoiceTrackerCompleted message back to actor + // - This ensures we decrement counter and remove from queue + // - Even on error, the tracker has quit, so we must clean up + + let myself_clone = myself.clone(); + self.tracker.spawn(async move { + select! { + _ = sleep(INVOICE_TRACKING_TIMEOUT) => { + myself_clone.cast(LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash, + completed_successfully: false, + }).expect("cast LndTrackerMessage"); + } + result = tracker.run() => { + myself_clone.cast(LndTrackerMessage::InvoiceTrackerCompleted { + payment_hash, + completed_successfully: result.is_ok(), + }).expect("cast LndTrackerMessage"); + } + } + }); + + tracing::debug!( + "Started invoice tracker for payment_hash={}, active={}/{}", + payment_hash, + self.active_invoice_trackers, + MAX_CONCURRENT_INVOICE_TRACKERS + ); + } + + Ok(()) + } +} + +/// Internal struct for tracking payments +struct PaymentTracker { + port: Arc>, + lnd_connection: LndConnectionInfo, + token: CancellationToken, +} + +impl PaymentTracker { + async fn run(self) { + tracing::debug!("PaymentTracker: will connect {}", self.lnd_connection.uri); + + loop { + select! { + result = self.run_inner() => { + match result { + Ok(_) => { + break; + } + Err(err) => { + tracing::error!( + "Error tracking LND payments, retry 15 seconds later: {:?}", + err + ); + select! { + _ = sleep(Duration::from_secs(15)) => { + // continue + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down payment tracker"); + return; + } + } + } + } + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down payment tracker"); + return; + } + } + } + } + + async fn run_inner(&self) -> Result<()> { + let mut client = self.lnd_connection.create_router_client().await?; + let mut stream = client + .track_payments(routerrpc::TrackPaymentsRequest { + no_inflight_updates: true, + }) + .await? + .into_inner(); + + loop { + select! { + payment_opt = stream.next() => { + match payment_opt { + Some(Ok(payment)) => self.on_payment(payment).await?, + Some(Err(err)) => return Err(err.into()), + None => return Err(anyhow!("unexpected closed stream")), + } + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down payment tracker"); + return Ok(()); + } + } + } + } + + async fn on_payment(&self, payment: lnrpc::Payment) -> Result<()> { + tracing::debug!("payment: {:?}", payment); + let payment_preimage = if !payment.payment_preimage.is_empty() { + Some(Hash256::from_str(&payment.payment_preimage)?) + } else { + None + }; + use lnrpc::payment::PaymentStatus; + let status: CchOutgoingPaymentStatus = PaymentStatus::try_from(payment.status) + .unwrap_or(PaymentStatus::InFlight) + .into(); + + let event = CchIncomingEvent::PaymentChanged { + payment_hash: Hash256::from_str(&payment.payment_hash)?, + payment_preimage, + status, + }; + self.port.send(event); + Ok(()) + } +} + +/// Internal struct for tracking individual invoices +struct InvoiceTracker { + port: Arc>, + payment_hash: Hash256, + lnd_connection: LndConnectionInfo, + token: CancellationToken, +} + +impl InvoiceTracker { + async fn run(self) -> Result<()> { + tracing::debug!( + "InvoiceTracker: will connect {} for payment_hash={}", + self.lnd_connection.uri, + self.payment_hash + ); + + loop { + select! { + result = self.run_inner() => { + match result { + Ok(_) => { + tracing::debug!("InvoiceTracker completed successfully for payment_hash={}", self.payment_hash); + return Ok(()); + } + Err(err) => { + tracing::error!( + "Error tracking LND invoice {}, retry 15 seconds later: {:?}", + self.payment_hash, + err + ); + select! { + _ = sleep(Duration::from_secs(15)) => { + // continue + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down invoice tracker"); + return Err(anyhow!("Cancelled")); + } + } + } + } + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down invoice tracker"); + return Err(anyhow!("Cancelled")); + } + } + } + } + + async fn run_inner(&self) -> Result<()> { + let mut client = self.lnd_connection.create_invoices_client().await?; + let mut stream = client + .subscribe_single_invoice(invoicesrpc::SubscribeSingleInvoiceRequest { + r_hash: self.payment_hash.into(), + }) + .await? + .into_inner(); + + loop { + select! { + invoice_opt = stream.next() => { + match invoice_opt { + Some(Ok(invoice)) => if self.on_invoice(invoice).await? { + return Ok(()); + }, + Some(Err(err)) => return Err(err.into()), + None => return Err(anyhow!("unexpected closed stream")), + } + } + _ = self.token.cancelled() => { + tracing::debug!("Cancellation received, shutting down invoice tracker"); + return Ok(()); + } + } + } + } + + // Return true to quit the tracker + async fn on_invoice(&self, invoice: lnrpc::Invoice) -> Result { + tracing::debug!("[InvoiceTracker] invoice: {:?}", invoice); + let payment_preimage = if !invoice.r_preimage.is_empty() { + Some(Hash256::try_from(invoice.r_preimage.as_slice())?) + } else { + None + }; + use lnrpc::invoice::InvoiceState; + let status: CchIncomingPaymentStatus = InvoiceState::try_from(invoice.state) + .unwrap_or(InvoiceState::Open) + .into(); + + let event = CchIncomingEvent::InvoiceChanged { + payment_hash: Hash256::try_from(invoice.r_hash.as_slice())?, + payment_preimage, + status, + }; + self.port.send(event); + + // Quit tracker when the status is final + Ok(status == CchIncomingPaymentStatus::Settled + || status == CchIncomingPaymentStatus::Failed) + } +} diff --git a/crates/fiber-lib/src/cch/trackers/mod.rs b/crates/fiber-lib/src/cch/trackers/mod.rs new file mode 100644 index 000000000..6c4342706 --- /dev/null +++ b/crates/fiber-lib/src/cch/trackers/mod.rs @@ -0,0 +1,2 @@ +mod lnd_trackers; +pub use lnd_trackers::{LndConnectionInfo, LndTrackerActor, LndTrackerArgs, LndTrackerMessage}; diff --git a/crates/fiber-lib/src/rpc/README.md b/crates/fiber-lib/src/rpc/README.md index 2e98dfdec..96200f51e 100644 --- a/crates/fiber-lib/src/rpc/README.md +++ b/crates/fiber-lib/src/rpc/README.md @@ -1083,7 +1083,7 @@ The status of a cross-chain hub order, will update as the order progresses. #### Enum with values of -* `Pending` - Order is created and has not send out payments yet. +* `Pending` - Order is created and has not received the incoming payment * `IncomingAccepted` - HTLC in the incoming payment is accepted. * `OutgoingInFlight` - There's an outgoing payment in flight. * `OutgoingSettled` - The outgoing payment is settled. From 11de4239db21511edd5006572b41a4289684f439 Mon Sep 17 00:00:00 2001 From: ian Date: Thu, 13 Nov 2025 12:24:18 +0800 Subject: [PATCH 2/2] feat: add metrics for LND tracker Add gauges to monitor the LND tracker's internal state for better observability. - Track invoice queue length to monitor backlog size - Monitor count of active invoice trackers for capacity awareness - Conditionally compile metrics behind "metrics" feature flag --- crates/fiber-lib/src/cch/trackers/lnd_trackers.rs | 13 ++++++++++++- crates/fiber-lib/src/metrics.rs | 4 ++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs b/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs index 41c61cb06..4873694fe 100644 --- a/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs +++ b/crates/fiber-lib/src/cch/trackers/lnd_trackers.rs @@ -240,7 +240,7 @@ impl Actor for LndTrackerActor { message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - match message { + let res = match message { LndTrackerMessage::TrackInvoice(payment_hash) => { state.invoice_queue.push_back(payment_hash); state.process_invoice_queue(myself).await?; @@ -278,7 +278,18 @@ impl Actor for LndTrackerActor { let _ = reply_port.send(snapshot); Ok(()) } + }; + + // update metrics + #[cfg(feature = "metrics")] + { + metrics::gauge!(crate::metrics::CCH_LND_TRACKER_INVOICE_QUEUE_LEN) + .set(state.invoice_queue.len() as f64); + metrics::gauge!(crate::metrics::CCH_LND_TRACKER_ACTIVE_INVOICE_TRACKERS) + .set(state.active_invoice_trackers as f64); } + + res } } diff --git a/crates/fiber-lib/src/metrics.rs b/crates/fiber-lib/src/metrics.rs index 85c7dbfe6..45e0efcac 100644 --- a/crates/fiber-lib/src/metrics.rs +++ b/crates/fiber-lib/src/metrics.rs @@ -10,6 +10,10 @@ pub const INBOUND_PEER_COUNT: &str = "fiber.inbound_peer_count"; pub const OUTBOUND_PEER_COUNT: &str = "fiber.outbound_peer_count"; pub const DOWN_WITH_CHANNEL_PEER_COUNT: &str = "fiber.down_with_channel_peer_count"; +pub const CCH_LND_TRACKER_INVOICE_QUEUE_LEN: &str = "fiber.cch.lnd_tracker.invoice_queue_len"; +pub const CCH_LND_TRACKER_ACTIVE_INVOICE_TRACKERS: &str = + "fiber.cch.lnd_tracker.active_invoice_trackers"; + pub fn start_metrics(metrics_addr: &str) -> Result<(), Box> { let socket_addr = metrics_addr .to_socket_addrs()