From 8af22dee8a53ca9da62c3893454ae29e007da13b Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Thu, 18 Sep 2025 19:32:52 +0200 Subject: [PATCH 1/6] Add a `server` subcommand and `--use-server` flag to send commands there via unix sockets --- src/main.rs | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3d56a97..f56f6df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,17 +4,21 @@ use deacon::{ DEFAULT_KMER_LENGTH, DEFAULT_WINDOW_SIZE, FilterConfig, IndexConfig, diff_index, index_info, union_index, }; -use std::path::PathBuf; +use serde::{Deserialize, Serialize}; +use std::{os::unix::net::{UnixListener, UnixStream}, path::PathBuf}; -#[derive(Parser)] +#[derive(Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None)] struct Cli { #[command(subcommand)] command: Commands, + #[arg(long)] + use_server: bool, } -#[derive(Subcommand)] +#[derive(Subcommand, Serialize, Deserialize)] enum Commands { + Server, /// Build and compose minimizer indexes Index { #[command(subcommand)] @@ -82,7 +86,7 @@ enum Commands { }, } -#[derive(Subcommand)] +#[derive(Subcommand, Serialize, Deserialize)] enum IndexCommands { /// Index minimizers contained within a fastx file Build { @@ -171,7 +175,27 @@ fn main() -> Result<()> { let cli = Cli::parse(); - match &cli.command { + if matches!(cli.command, Commands::Server) { + let listener = UnixListener::bind("deacon_server_socket")?; + for stream in listener.incoming() { + let command: Commands = serde_json::from_reader(stream.unwrap()).unwrap(); + process_command(&command)?; + } + } else { + if cli.use_server { + let stream = UnixStream::connect("deacon_server_socket")?; + serde_json::to_writer(stream, &cli.command)?; + } else { + process_command(&cli.command)?; + } + } + + Ok(()) +} + +fn process_command(command: &Commands) -> Result<(), anyhow::Error> { + match &command { + Commands::Server => panic!(), Commands::Index { command } => match command { IndexCommands::Build { input, From 07cb82a89e9d3ce53d44da0416789e2ce0aaf41f Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Thu, 18 Sep 2025 19:59:34 +0200 Subject: [PATCH 2/6] Reuse index between filter operations --- src/filter.rs | 14 +++++++------- src/index.rs | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/filter.rs b/src/filter.rs index b0a2cca..ce1b9d8 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,4 +1,4 @@ -use crate::{FilterConfig, index::load_minimizer_hashes}; +use crate::{FilterConfig, index::load_minimizer_hashes_cached}; use anyhow::{Context, Result}; use flate2::write::GzEncoder; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; @@ -211,7 +211,7 @@ pub struct FilterSummary { #[derive(Clone)] struct FilterProcessor { // Minimizer matching parameters - minimizer_hashes: Arc>, + minimizer_hashes: &'static FxHashSet, kmer_length: u8, window_size: u8, abs_threshold: usize, @@ -275,7 +275,7 @@ impl FilterProcessor { } } fn new( - minimizer_hashes: Arc>, + minimizer_hashes: &'static FxHashSet, kmer_length: u8, window_size: u8, config: &FilterProcessorConfig, @@ -767,10 +767,11 @@ pub fn run(config: &FilterConfig) -> Result<()> { // Configure thread pool if nonzero if config.threads > 0 { - rayon::ThreadPoolBuilder::new() + // error is OK here when we initialize a 2nd time in server mode. + let _ = rayon::ThreadPoolBuilder::new() .num_threads(config.threads) .build_global() - .context("Failed to initialize thread pool")?; + .context("Failed to initialize thread pool"); } let mode = if config.deplete { "deplete" } else { "search" }; @@ -815,8 +816,7 @@ pub fn run(config: &FilterConfig) -> Result<()> { check_input_paths(config)?; // Load minimizer hashes and parse header - let (minimizer_hashes, header) = load_minimizer_hashes(&config.minimizers_path)?; - let minimizer_hashes = Arc::new(minimizer_hashes); + let (minimizer_hashes, header) = load_minimizer_hashes_cached(&config.minimizers_path)?; let kmer_length = header.kmer_length(); let window_size = header.window_size(); diff --git a/src/index.rs b/src/index.rs index 6a2fbbf..d7e8cd2 100644 --- a/src/index.rs +++ b/src/index.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use std::fs::File; use std::io::{self, BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; +use std::sync::OnceLock; use std::time::Instant; use needletail::{parse_fastx_file, parse_fastx_stdin}; @@ -69,6 +70,20 @@ pub fn load_header_and_count>(path: &P) -> Result<(IndexHeader, u Ok((header, count)) } +static INDEX: OnceLock<(PathBuf, FxHashSet, IndexHeader)> = OnceLock::new(); + +pub fn load_minimizer_hashes_cached>( + path: &P, +) -> Result<(&'static FxHashSet, &'static IndexHeader)> { + let (p, minimizers, header) = INDEX.get_or_init(|| { + let (m, h) = load_minimizer_hashes(path).unwrap(); + (path.as_ref().to_owned(), m, h) + }); + assert_eq!(p, path.as_ref(), "Currently, the server can only have one index loaded."); + + Ok((minimizers, header)) +} + /// Load the hashes without spiking memory usage with an extra vec pub fn load_minimizer_hashes>(path: &P) -> Result<(FxHashSet, IndexHeader)> { let file = From 5e1f8c385a8973ecb828a9980b4dfbb47b1fff64 Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Thu, 18 Sep 2025 20:37:37 +0200 Subject: [PATCH 3/6] Make child-process wait for server; add exit command --- src/main.rs | 43 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index f56f6df..6679413 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,12 @@ use deacon::{ union_index, }; use serde::{Deserialize, Serialize}; -use std::{os::unix::net::{UnixListener, UnixStream}, path::PathBuf}; +use std::{ + io::{Read, Write}, + os::unix::net::{UnixListener, UnixStream}, + path::PathBuf, + process::exit, +}; #[derive(Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None)] @@ -19,6 +24,8 @@ struct Cli { #[derive(Subcommand, Serialize, Deserialize)] enum Commands { Server, + Exit, + Done, /// Build and compose minimizer indexes Index { #[command(subcommand)] @@ -178,13 +185,37 @@ fn main() -> Result<()> { if matches!(cli.command, Commands::Server) { let listener = UnixListener::bind("deacon_server_socket")?; for stream in listener.incoming() { - let command: Commands = serde_json::from_reader(stream.unwrap()).unwrap(); - process_command(&command)?; + let mut stream = stream.unwrap(); + let mut message = vec![]; + let mut buf = vec![0; 10000]; + loop { + let len = stream.read(&mut buf)?; + let buf = &buf[..len]; + message.extend_from_slice(buf); + if buf.contains(&0) { + assert_eq!(buf.last(), Some(&0)); + message.pop(); + break; + } + } + let command: Commands = serde_json::from_slice(&message).unwrap(); + let do_exit = matches!(command, Commands::Exit); + if !do_exit { + process_command(&command)?; + } + serde_json::to_writer(stream, &Commands::Done)?; + if do_exit { + exit(0); + } } } else { if cli.use_server { - let stream = UnixStream::connect("deacon_server_socket")?; - serde_json::to_writer(stream, &cli.command)?; + let mut stream = UnixStream::connect("deacon_server_socket")?; + serde_json::to_writer(&stream, &cli.command)?; + stream.write(b"\0")?; + stream.flush()?; + let command: Commands = serde_json::from_reader(stream).unwrap(); + assert!(matches!(command, Commands::Done)); } else { process_command(&cli.command)?; } @@ -196,6 +227,8 @@ fn main() -> Result<()> { fn process_command(command: &Commands) -> Result<(), anyhow::Error> { match &command { Commands::Server => panic!(), + Commands::Done => panic!(), + Commands::Exit => exit(0), Commands::Index { command } => match command { IndexCommands::Build { input, From 43f9a4b0b46b25cd77faabc6da0a0c627023e596 Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Thu, 18 Sep 2025 21:29:38 +0200 Subject: [PATCH 4/6] Separate Message type --- src/index.rs | 6 +++++- src/main.rs | 47 +++++++++++++++++++++++++++++++---------------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/src/index.rs b/src/index.rs index d7e8cd2..22f6c96 100644 --- a/src/index.rs +++ b/src/index.rs @@ -79,7 +79,11 @@ pub fn load_minimizer_hashes_cached>( let (m, h) = load_minimizer_hashes(path).unwrap(); (path.as_ref().to_owned(), m, h) }); - assert_eq!(p, path.as_ref(), "Currently, the server can only have one index loaded."); + assert_eq!( + p, + path.as_ref(), + "Currently, the server can only have one index loaded." + ); Ok((minimizers, header)) } diff --git a/src/main.rs b/src/main.rs index 6679413..30ae38a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,6 @@ use std::{ io::{Read, Write}, os::unix::net::{UnixListener, UnixStream}, path::PathBuf, - process::exit, }; #[derive(Parser, Serialize, Deserialize)] @@ -25,7 +24,6 @@ struct Cli { enum Commands { Server, Exit, - Done, /// Build and compose minimizer indexes Index { #[command(subcommand)] @@ -171,6 +169,14 @@ enum IndexCommands { }, } +#[derive(Serialize, Deserialize)] +enum Message { + /// client -> server + Command(Commands), + /// server -> client + Done, +} + fn main() -> Result<()> { // Check we have either AVX2 or NEON #[cfg(not(any(target_feature = "avx2", target_feature = "neon")))] @@ -183,6 +189,10 @@ fn main() -> Result<()> { let cli = Cli::parse(); if matches!(cli.command, Commands::Server) { + assert!( + !cli.use_server, + "`deacon --use server Server` does not make sense." + ); let listener = UnixListener::bind("deacon_server_socket")?; for stream in listener.incoming() { let mut stream = stream.unwrap(); @@ -198,24 +208,30 @@ fn main() -> Result<()> { break; } } - let command: Commands = serde_json::from_slice(&message).unwrap(); - let do_exit = matches!(command, Commands::Exit); - if !do_exit { - process_command(&command)?; - } - serde_json::to_writer(stream, &Commands::Done)?; - if do_exit { - exit(0); + let message: Message = serde_json::from_slice(&message).unwrap(); + match message { + Message::Command(Commands::Exit) => { + serde_json::to_writer(stream, &Message::Done)?; + break; + } + Message::Command(commands) => { + process_command(&commands)?; + serde_json::to_writer(stream, &Message::Done)?; + } + Message::Done => unreachable!("Server should not receive `Done` messages."), } } } else { if cli.use_server { let mut stream = UnixStream::connect("deacon_server_socket")?; - serde_json::to_writer(&stream, &cli.command)?; + serde_json::to_writer(&stream, &Message::Command(cli.command))?; stream.write(b"\0")?; stream.flush()?; - let command: Commands = serde_json::from_reader(stream).unwrap(); - assert!(matches!(command, Commands::Done)); + let message: Message = serde_json::from_reader(stream).unwrap(); + match message { + Message::Done => {} + _ => unreachable!("The client only expects to receive `Done` messages."), + } } else { process_command(&cli.command)?; } @@ -226,9 +242,8 @@ fn main() -> Result<()> { fn process_command(command: &Commands) -> Result<(), anyhow::Error> { match &command { - Commands::Server => panic!(), - Commands::Done => panic!(), - Commands::Exit => exit(0), + Commands::Server => unreachable!(), + Commands::Exit => panic!("Use `deacon --use-server Exit` to stop the server."), Commands::Index { command } => match command { IndexCommands::Build { input, From f6f31bdd4f0d0ac320d81b807543c94b400a0c33 Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Fri, 19 Sep 2025 11:44:10 +0200 Subject: [PATCH 5/6] Put server behind a feature flag --- Cargo.toml | 5 +++++ src/main.rs | 47 +++++++++++++++++++++++++++++------------------ 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bdfdadc..0417f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,11 @@ predicates = "3.0" tempfile = "3.20" rstest = "0.25" +[features] +# `deacon server` feature. +server = [] +default = [] + [[test]] name = "cli_tests" path = "tests/cli_tests.rs" diff --git a/src/main.rs b/src/main.rs index 30ae38a..8fc9ed3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,11 +5,11 @@ use deacon::{ union_index, }; use serde::{Deserialize, Serialize}; -use std::{ - io::{Read, Write}, - os::unix::net::{UnixListener, UnixStream}, - path::PathBuf, -}; +use std::path::PathBuf; +#[cfg(feature = "server")] +use std::os::unix::net::{UnixListener, UnixStream}; +#[cfg(feature = "server")] +use std::io::{Read, Write}; #[derive(Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None)] @@ -22,7 +22,9 @@ struct Cli { #[derive(Subcommand, Serialize, Deserialize)] enum Commands { + #[cfg(feature = "server")] Server, + #[cfg(feature = "server")] Exit, /// Build and compose minimizer indexes Index { @@ -169,6 +171,7 @@ enum IndexCommands { }, } +#[cfg(feature = "server")] #[derive(Serialize, Deserialize)] enum Message { /// client -> server @@ -188,6 +191,7 @@ fn main() -> Result<()> { let cli = Cli::parse(); + #[cfg(feature = "server")] if matches!(cli.command, Commands::Server) { assert!( !cli.use_server, @@ -221,28 +225,35 @@ fn main() -> Result<()> { Message::Done => unreachable!("Server should not receive `Done` messages."), } } - } else { - if cli.use_server { - let mut stream = UnixStream::connect("deacon_server_socket")?; - serde_json::to_writer(&stream, &Message::Command(cli.command))?; - stream.write(b"\0")?; - stream.flush()?; - let message: Message = serde_json::from_reader(stream).unwrap(); - match message { - Message::Done => {} - _ => unreachable!("The client only expects to receive `Done` messages."), - } - } else { - process_command(&cli.command)?; + + return Ok(()); + } + + #[cfg(feature = "server")] + if cli.use_server { + let mut stream = UnixStream::connect("deacon_server_socket")?; + serde_json::to_writer(&stream, &Message::Command(cli.command))?; + stream.write(b"\0")?; + stream.flush()?; + let message: Message = serde_json::from_reader(stream).unwrap(); + match message { + Message::Done => {} + _ => unreachable!("The client only expects to receive `Done` messages."), } + + return Ok(()); } + process_command(&cli.command)?; + Ok(()) } fn process_command(command: &Commands) -> Result<(), anyhow::Error> { match &command { + #[cfg(feature = "server")] Commands::Server => unreachable!(), + #[cfg(feature = "server")] Commands::Exit => panic!("Use `deacon --use-server Exit` to stop the server."), Commands::Index { command } => match command { IndexCommands::Build { From dee0a589ad80a80c100d0ca9ab28f3f1335e257a Mon Sep 17 00:00:00 2001 From: Ragnar Groot Koerkamp Date: Fri, 26 Sep 2025 16:15:11 +0200 Subject: [PATCH 6/6] server feature is on by default --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0417f5e..73d9f5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ rstest = "0.25" [features] # `deacon server` feature. server = [] -default = [] +default = ["server"] [[test]] name = "cli_tests"