Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ predicates = "3.0"
tempfile = "3.20"
rstest = "0.25"

[features]
# `deacon server` feature.
server = []
default = ["server"]

[[test]]
name = "cli_tests"
path = "tests/cli_tests.rs"
Expand Down
14 changes: 7 additions & 7 deletions src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{FilterConfig, index::load_minimizer_hashes};
use crate::{FilterConfig, index::load_minimizer_hashes_cached};
use anyhow::{Context, Result};
use flate2::write::GzEncoder;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
Expand Down Expand Up @@ -211,7 +211,7 @@ pub struct FilterSummary {
#[derive(Clone)]
struct FilterProcessor {
// Minimizer matching parameters
minimizer_hashes: Arc<FxHashSet<u64>>,
minimizer_hashes: &'static FxHashSet<u64>,
kmer_length: u8,
window_size: u8,
abs_threshold: usize,
Expand Down Expand Up @@ -275,7 +275,7 @@ impl FilterProcessor {
}
}
fn new(
minimizer_hashes: Arc<FxHashSet<u64>>,
minimizer_hashes: &'static FxHashSet<u64>,
kmer_length: u8,
window_size: u8,
config: &FilterProcessorConfig,
Expand Down Expand Up @@ -767,10 +767,11 @@ pub fn run(config: &FilterConfig) -> Result<()> {

// Configure thread pool if nonzero
if config.threads > 0 {
rayon::ThreadPoolBuilder::new()
// error is OK here when we initialize a 2nd time in server mode.
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(config.threads)
.build_global()
.context("Failed to initialize thread pool")?;
.context("Failed to initialize thread pool");
}

let mode = if config.deplete { "deplete" } else { "search" };
Expand Down Expand Up @@ -815,8 +816,7 @@ pub fn run(config: &FilterConfig) -> Result<()> {
check_input_paths(config)?;

// Load minimizer hashes and parse header
let (minimizer_hashes, header) = load_minimizer_hashes(&config.minimizers_path)?;
let minimizer_hashes = Arc::new(minimizer_hashes);
let (minimizer_hashes, header) = load_minimizer_hashes_cached(&config.minimizers_path)?;

let kmer_length = header.kmer_length();
let window_size = header.window_size();
Expand Down
19 changes: 19 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::time::Instant;

use needletail::{parse_fastx_file, parse_fastx_stdin};
Expand Down Expand Up @@ -69,6 +70,24 @@ pub fn load_header_and_count<P: AsRef<Path>>(path: &P) -> Result<(IndexHeader, u
Ok((header, count))
}

static INDEX: OnceLock<(PathBuf, FxHashSet<u64>, IndexHeader)> = OnceLock::new();

pub fn load_minimizer_hashes_cached<P: AsRef<Path>>(
path: &P,
) -> Result<(&'static FxHashSet<u64>, &'static IndexHeader)> {
let (p, minimizers, header) = INDEX.get_or_init(|| {
let (m, h) = load_minimizer_hashes(path).unwrap();
(path.as_ref().to_owned(), m, h)
});
assert_eq!(
p,
path.as_ref(),
"Currently, the server can only have one index loaded."
);

Ok((minimizers, header))
}

/// Load the hashes without spiking memory usage with an extra vec
pub fn load_minimizer_hashes<P: AsRef<Path>>(path: &P) -> Result<(FxHashSet<u64>, IndexHeader)> {
let file =
Expand Down
91 changes: 87 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,28 @@ use deacon::{
DEFAULT_KMER_LENGTH, DEFAULT_WINDOW_SIZE, FilterConfig, IndexConfig, diff_index, index_info,
union_index,
};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[cfg(feature = "server")]
use std::os::unix::net::{UnixListener, UnixStream};
#[cfg(feature = "server")]
use std::io::{Read, Write};

#[derive(Parser)]
#[derive(Parser, Serialize, Deserialize)]
#[command(author, version, about, long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
#[arg(long)]
use_server: bool,
}

#[derive(Subcommand)]
#[derive(Subcommand, Serialize, Deserialize)]
enum Commands {
#[cfg(feature = "server")]
Server,
#[cfg(feature = "server")]
Exit,
/// Build and compose minimizer indexes
Index {
#[command(subcommand)]
Expand Down Expand Up @@ -82,7 +93,7 @@ enum Commands {
},
}

#[derive(Subcommand)]
#[derive(Subcommand, Serialize, Deserialize)]
enum IndexCommands {
/// Index minimizers contained within a fastx file
Build {
Expand Down Expand Up @@ -160,6 +171,15 @@ enum IndexCommands {
},
}

#[cfg(feature = "server")]
#[derive(Serialize, Deserialize)]
enum Message {
/// client -> server
Command(Commands),
/// server -> client
Done,
}

fn main() -> Result<()> {
// Check we have either AVX2 or NEON
#[cfg(not(any(target_feature = "avx2", target_feature = "neon")))]
Expand All @@ -171,7 +191,70 @@ fn main() -> Result<()> {

let cli = Cli::parse();

match &cli.command {
#[cfg(feature = "server")]
if matches!(cli.command, Commands::Server) {
assert!(
!cli.use_server,
"`deacon --use server Server` does not make sense."
);
let listener = UnixListener::bind("deacon_server_socket")?;
for stream in listener.incoming() {
let mut stream = stream.unwrap();
let mut message = vec![];
let mut buf = vec![0; 10000];
loop {
let len = stream.read(&mut buf)?;
let buf = &buf[..len];
message.extend_from_slice(buf);
if buf.contains(&0) {
assert_eq!(buf.last(), Some(&0));
message.pop();
break;
}
}
let message: Message = serde_json::from_slice(&message).unwrap();
match message {
Message::Command(Commands::Exit) => {
serde_json::to_writer(stream, &Message::Done)?;
break;
}
Message::Command(commands) => {
process_command(&commands)?;
serde_json::to_writer(stream, &Message::Done)?;
}
Message::Done => unreachable!("Server should not receive `Done` messages."),
}
}

return Ok(());
}

#[cfg(feature = "server")]
if cli.use_server {
let mut stream = UnixStream::connect("deacon_server_socket")?;
serde_json::to_writer(&stream, &Message::Command(cli.command))?;
stream.write(b"\0")?;
stream.flush()?;
let message: Message = serde_json::from_reader(stream).unwrap();
match message {
Message::Done => {}
_ => unreachable!("The client only expects to receive `Done` messages."),
}

return Ok(());
}

process_command(&cli.command)?;

Ok(())
}

fn process_command(command: &Commands) -> Result<(), anyhow::Error> {
match &command {
#[cfg(feature = "server")]
Commands::Server => unreachable!(),
#[cfg(feature = "server")]
Commands::Exit => panic!("Use `deacon --use-server Exit` to stop the server."),
Commands::Index { command } => match command {
IndexCommands::Build {
input,
Expand Down