diff --git a/Cargo.toml b/Cargo.toml
index bdfdadc..73d9f5d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,11 @@ predicates = "3.0"
 tempfile = "3.20"
 rstest = "0.25"
 
+[features]
+# Client/server mode (`deacon server`, `--use-server`); uses Unix domain sockets.
+server = []
+default = ["server"]
+
 [[test]]
 name = "cli_tests"
 path = "tests/cli_tests.rs"
diff --git a/src/filter.rs b/src/filter.rs
index b0a2cca..ce1b9d8 100644
--- a/src/filter.rs
+++ b/src/filter.rs
@@ -1,4 +1,4 @@
-use crate::{FilterConfig, index::load_minimizer_hashes};
+use crate::{FilterConfig, index::load_minimizer_hashes_cached};
 use anyhow::{Context, Result};
 use flate2::write::GzEncoder;
 use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
@@ -211,7 +211,7 @@ pub struct FilterSummary {
 #[derive(Clone)]
 struct FilterProcessor {
     // Minimizer matching parameters
-    minimizer_hashes: Arc>,
+    minimizer_hashes: &'static FxHashSet,
     kmer_length: u8,
     window_size: u8,
     abs_threshold: usize,
@@ -275,7 +275,7 @@ impl FilterProcessor {
         }
     }
     fn new(
-        minimizer_hashes: Arc>,
+        minimizer_hashes: &'static FxHashSet,
         kmer_length: u8,
         window_size: u8,
         config: &FilterProcessorConfig,
@@ -767,10 +767,11 @@ pub fn run(config: &FilterConfig) -> Result<()> {
 
     // Configure thread pool if nonzero
     if config.threads > 0 {
-        rayon::ThreadPoolBuilder::new()
+        // Deliberately ignored: building the global pool fails on the 2nd call in server mode.
+        let _ = rayon::ThreadPoolBuilder::new()
             .num_threads(config.threads)
             .build_global()
-            .context("Failed to initialize thread pool")?;
+            .context("Failed to initialize thread pool");
     }
 
     let mode = if config.deplete { "deplete" } else { "search" };
@@ -815,8 +816,7 @@ pub fn run(config: &FilterConfig) -> Result<()> {
     check_input_paths(config)?;
 
     // Load minimizer hashes and parse header
-    let (minimizer_hashes, header) = load_minimizer_hashes(&config.minimizers_path)?;
-    let minimizer_hashes = Arc::new(minimizer_hashes);
+    let (minimizer_hashes, header) = load_minimizer_hashes_cached(&config.minimizers_path)?;
 
     let kmer_length = header.kmer_length();
     let window_size = header.window_size();
diff --git a/src/index.rs b/src/index.rs
index 6a2fbbf..22f6c96 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
 use std::fs::File;
 use std::io::{self, BufReader, BufWriter, Write};
 use std::path::{Path, PathBuf};
+use std::sync::OnceLock;
 use std::time::Instant;
 
 use needletail::{parse_fastx_file, parse_fastx_stdin};
@@ -69,6 +70,35 @@ pub fn load_header_and_count>(path: &P) -> Result<(IndexHeader, u
     Ok((header, count))
 }
 
+static INDEX: OnceLock<(PathBuf, FxHashSet, IndexHeader)> = OnceLock::new();
+
+/// Load the index at `path` at most once per process and return `'static` references to it.
+///
+/// # Errors
+///
+/// Fails if the index cannot be loaded, or if a different path is already cached: the
+/// cache holds a single index for the lifetime of the process.
+pub fn load_minimizer_hashes_cached>(
+    path: &P,
+) -> Result<(&'static FxHashSet, &'static IndexHeader)> {
+    let requested: &Path = path.as_ref();
+    let (cached_path, minimizers, header) = match INDEX.get() {
+        Some(entry) => entry,
+        None => {
+            // Load before touching the cell so failures surface as `Err`, not as a panic.
+            let (m, h) = load_minimizer_hashes(path)?;
+            // If another thread filled the cell meanwhile, its entry wins and ours is dropped.
+            INDEX.get_or_init(|| (requested.to_owned(), m, h))
+        }
+    };
+    anyhow::ensure!(
+        cached_path.as_path() == requested,
+        "Currently, the server can only have one index loaded."
+    );
+
+    Ok((minimizers, header))
+}
+
 /// Load the hashes without spiking memory usage with an extra vec
 pub fn load_minimizer_hashes>(path: &P) -> Result<(FxHashSet, IndexHeader)> {
     let file =
diff --git a/src/main.rs b/src/main.rs
index 3d56a97..8fc9ed3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,17 +4,28 @@ use deacon::{
     DEFAULT_KMER_LENGTH, DEFAULT_WINDOW_SIZE, FilterConfig, IndexConfig, diff_index, index_info,
     union_index,
 };
+use serde::{Deserialize, Serialize};
 use std::path::PathBuf;
+#[cfg(feature = "server")]
+use std::os::unix::net::{UnixListener, UnixStream};
+#[cfg(feature = "server")]
+use std::io::{BufRead, BufReader, Write};
 
-#[derive(Parser)]
+#[derive(Parser, Serialize, Deserialize)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
     #[command(subcommand)]
     command: Commands,
+    #[arg(long)]
+    use_server: bool,
 }
 
-#[derive(Subcommand)]
+#[derive(Subcommand, Serialize, Deserialize)]
 enum Commands {
+    #[cfg(feature = "server")]
+    Server,
+    #[cfg(feature = "server")]
+    Exit,
     /// Build and compose minimizer indexes
     Index {
         #[command(subcommand)]
@@ -82,7 +93,7 @@ enum Commands {
     },
 }
 
-#[derive(Subcommand)]
+#[derive(Subcommand, Serialize, Deserialize)]
 enum IndexCommands {
     /// Index minimizers contained within a fastx file
     Build {
@@ -160,6 +171,17 @@ enum IndexCommands {
     },
 }
 
+#[cfg(feature = "server")]
+#[derive(Serialize, Deserialize)]
+enum Message {
+    /// client -> server
+    Command(Commands),
+    /// server -> client
+    Done,
+    /// server -> client: the command failed; carries the error text
+    Error(String),
+}
+
 fn main() -> Result<()> {
     // Check we have either AVX2 or NEON
     #[cfg(not(any(target_feature = "avx2", target_feature = "neon")))]
@@ -171,7 +193,108 @@ fn main() -> Result<()> {
 
     let cli = Cli::parse();
 
-    match &cli.command {
+    #[cfg(feature = "server")]
+    if matches!(cli.command, Commands::Server) {
+        anyhow::ensure!(
+            !cli.use_server,
+            "`deacon --use-server server` does not make sense."
+        );
+        return run_server();
+    }
+
+    #[cfg(feature = "server")]
+    if cli.use_server {
+        return send_to_server(cli.command);
+    }
+
+    process_command(&cli.command)?;
+
+    Ok(())
+}
+
+/// Path of the Unix domain socket shared by the server and its clients.
+#[cfg(feature = "server")]
+const SOCKET_PATH: &str = "deacon_server_socket";
+
+/// Serve client requests on [`SOCKET_PATH`] until an `Exit` command is received.
+#[cfg(feature = "server")]
+fn run_server() -> Result<()> {
+    use anyhow::Context as _;
+
+    let listener = UnixListener::bind(SOCKET_PATH)
+        .with_context(|| format!("Failed to bind `{SOCKET_PATH}` (stale socket file?)"))?;
+    let outcome = serve(&listener);
+    // The socket file outlives the listener and would make the next `bind` fail.
+    let _ = std::fs::remove_file(SOCKET_PATH);
+    outcome
+}
+
+/// Handle connections one at a time; a failed request is logged and the loop carries on.
+#[cfg(feature = "server")]
+fn serve(listener: &UnixListener) -> Result<()> {
+    for stream in listener.incoming() {
+        let stream = stream?;
+        match handle_connection(&stream) {
+            Ok(true) => {}
+            Ok(false) => break,
+            Err(e) => eprintln!("Failed to handle request: {e:#}"),
+        }
+        // Dropping `stream` here closes it, which marks the end of the reply for the client.
+    }
+    Ok(())
+}
+
+/// Read one NUL-terminated JSON request from `stream`, run it, and send back the reply.
+///
+/// Returns `Ok(false)` when the request was `Exit`, i.e. the server should stop.
+#[cfg(feature = "server")]
+fn handle_connection(stream: &UnixStream) -> Result<bool> {
+    let mut request = Vec::new();
+    BufReader::new(stream).read_until(0, &mut request)?;
+    // `read_until` also returns at EOF, so a missing terminator means the client hung up.
+    anyhow::ensure!(request.pop() == Some(0), "Client disconnected mid-request.");
+
+    let request: Message = serde_json::from_slice(&request)?;
+    let (reply, keep_running) = match request {
+        Message::Command(Commands::Exit) => (Message::Done, false),
+        // A failing command is reported to the client rather than stopping the server.
+        Message::Command(command) => match process_command(&command) {
+            Ok(()) => (Message::Done, true),
+            Err(e) => (Message::Error(format!("{e:#}")), true),
+        },
+        Message::Done | Message::Error(_) => {
+            anyhow::bail!("Server should only receive `Command` messages.")
+        }
+    };
+    serde_json::to_writer(stream, &reply)?;
+    Ok(keep_running)
+}
+
+/// Send `command` to a running server and wait until it reports completion.
+#[cfg(feature = "server")]
+fn send_to_server(command: Commands) -> Result<()> {
+    use anyhow::Context as _;
+
+    let mut stream = UnixStream::connect(SOCKET_PATH)
+        .with_context(|| format!("Cannot reach `{SOCKET_PATH}`; is the server running?"))?;
+    serde_json::to_writer(&stream, &Message::Command(command))?;
+    // `write` may write nothing at all; `write_all` guarantees the terminator is sent.
+    stream.write_all(b"\0")?;
+    stream.flush()?;
+    let reply: Message = serde_json::from_reader(BufReader::new(stream))?;
+    match reply {
+        Message::Done => Ok(()),
+        Message::Error(e) => anyhow::bail!("Server failed to run the command: {e}"),
+        Message::Command(_) => anyhow::bail!("The client only expects to receive replies."),
+    }
+}
+
+fn process_command(command: &Commands) -> Result<(), anyhow::Error> {
+    match &command {
+        #[cfg(feature = "server")]
+        Commands::Server => unreachable!(),
+        #[cfg(feature = "server")]
+        Commands::Exit => panic!("Use `deacon --use-server Exit` to stop the server."),
         Commands::Index { command } => match command {
             IndexCommands::Build {
                 input,