Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,28 +523,32 @@ impl Daemon {
// buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
#[trace]
fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
    // Collect inside the daemon's own thread pool so the per-thread
    // DAEMON_INSTANCE connections (see requests_iter) belong to this
    // daemon instance rather than the global rayon pool.
    self.rpc_threads
        .install(|| self.requests_iter(method, params_list).collect())
}

// Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
// Errors are included in the iterator and do not terminate other pending requests.
//
// IMPORTANT: The returned parallel iterator must be collected inside self.rpc_threads.install()
// to ensure it runs on the daemon's own thread pool, not the global rayon pool. This is necessary
// because the per-thread DAEMON_INSTANCE thread-locals would otherwise be shared across different
// daemon instances in the same process (e.g. during parallel tests).
#[trace]
fn requests_iter<'a>(
&'a self,
method: &'a str,
params_list: Vec<Value>,
) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
self.rpc_threads.install(move || {
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(&method, &params)
})
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(&method, &params)
})
})
}
Expand Down Expand Up @@ -647,20 +651,22 @@ impl Daemon {
.map(|txhash| json!([txhash, /*verbose=*/ false]))
.collect();

self.requests_iter("getrawtransaction", params_list)
.zip(txids)
.filter_map(|(res, txid)| match res {
Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
// Ignore 'tx not found' errors
Err(Error(ErrorKind::RpcError(code, _, _), _))
if code == RPC_INVALID_ADDRESS_OR_KEY =>
{
None
}
// Terminate iteration if any other errors are encountered
Err(e) => Some(Err(e)),
})
.collect()
self.rpc_threads.install(|| {
self.requests_iter("getrawtransaction", params_list)
.zip(txids)
.filter_map(|(res, txid)| match res {
Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
// Ignore 'tx not found' errors
Err(Error(ErrorKind::RpcError(code, _, _), _))
if code == RPC_INVALID_ADDRESS_OR_KEY =>
{
None
}
// Terminate iteration if any other errors are encountered
Err(e) => Some(Err(e)),
})
.collect()
})
}

#[trace]
Expand Down Expand Up @@ -773,11 +779,12 @@ impl Daemon {

result.append(&mut headers);

info!("downloaded {}/{} block headers ({:.0}%)",
info!(
"downloaded {}/{} block headers ({:.0}%)",
result.len(),
tip_height,
result.len() as f32 / tip_height as f32 * 100.0);

result.len() as f32 / tip_height as f32 * 100.0
);
}

let mut blockhash = *DEFAULT_BLOCKHASH;
Expand Down
45 changes: 33 additions & 12 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use noded::bitcoincore_rpc::{self, RpcApi};

use electrs::{
chain::{Address, BlockHash, Network, Txid},
config::Config,
config::{Config, RpcLogging},
daemon::Daemon,
electrum::RPC as ElectrumRPC,
metrics::Metrics,
new_index::{ChainQuery, FetchFrom, Indexer, Mempool, Query, Store},
rest,
signal::Waiter,
};
use electrs::config::RpcLogging;

pub struct TestRunner {
config: Arc<Config>,
Expand Down Expand Up @@ -290,23 +289,33 @@ impl bitcoincore_rpc::RpcApi for TestRunner {

// Spin up a TestRunner plus a REST server bound to its configured address.
// Returns the server handle, the address, and the runner (kept alive by the caller).
pub fn init_rest_tester() -> Result<(rest::Handle, net::SocketAddr, TestRunner)> {
    let tester = TestRunner::new()?;
    let addr = tester.config.http_addr;
    let rest_server = rest::start(Arc::clone(&tester.config), Arc::clone(&tester.query));
    // Block until the server actually accepts connections, so callers can
    // issue requests immediately without racing server startup.
    wait_for_tcp(addr, "REST");
    Ok((rest_server, addr, tester))
}
// Spin up a TestRunner plus an Electrum RPC server bound to its configured address.
// Returns the server handle, the address, and the runner (kept alive by the caller).
pub fn init_electrum_tester() -> Result<(ElectrumRPC, net::SocketAddr, TestRunner)> {
    let tester = TestRunner::new()?;
    let addr = tester.config.electrum_rpc_addr;
    let electrum_server = ElectrumRPC::start(
        Arc::clone(&tester.config),
        Arc::clone(&tester.query),
        &tester.metrics,
        Arc::clone(&tester.salt_rwlock),
    );
    // Block until the server actually accepts connections, so callers can
    // issue requests immediately without racing server startup.
    wait_for_tcp(addr, "Electrum");
    Ok((electrum_server, addr, tester))
}

// Poll `addr` until a TCP connection succeeds, logging success, or panic
// after ~5 seconds (50 attempts, 100ms apart) if the server never comes up.
fn wait_for_tcp(addr: net::SocketAddr, name: &str) {
    let mut attempts_left = 50;
    while attempts_left > 0 {
        match net::TcpStream::connect(addr) {
            Ok(_) => {
                log::info!("{} server running on {}", name, addr);
                return;
            }
            Err(_) => {
                attempts_left -= 1;
                std::thread::sleep(std::time::Duration::from_millis(100));
            }
        }
    }
    panic!("{} server failed to start on {}", name, addr);
}

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -348,9 +357,21 @@ fn init_log() -> StdErrLog {
}

// Pick a random available localhost port for a test server to bind.
fn rand_available_addr() -> net::SocketAddr {
    use std::collections::HashSet;
    use std::sync::Mutex;

    // Track ports already handed out by this process so parallel tests never
    // receive the same port twice, even after the probe listener below is
    // dropped and the OS could re-assign its port.
    lazy_static::lazy_static! {
        static ref USED_PORTS: Mutex<HashSet<u16>> = Mutex::new(HashSet::new());
    }

    loop {
        let mut used = USED_PORTS.lock().unwrap();
        // Bind to port 0 so the OS picks a free port; dropping the listener
        // frees the port for the caller to bind.
        // NOTE(review): an unrelated process could still grab the port in the
        // window before the caller binds it — inherent to this approach.
        let socket = net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        if used.insert(addr.port()) {
            return addr;
        }
    }
}

error_chain::error_chain! {
Expand Down
Loading