Skip to content

Commit c9fdc2e

Browse files
authored
fix(FileStore): Stop keep-alive thread on cancellation token. (#4666)
Signed-off-by: Graham King <[email protected]>
1 parent 71f94ed commit c9fdc2e

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

lib/runtime/src/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl DistributedRuntime {
111111
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
112112
kv::Manager::etcd(etcd_client)
113113
}
114-
kv::Selector::File(root) => kv::Manager::file(root),
114+
kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
115115
kv::Selector::Memory => kv::Manager::memory(),
116116
};
117117

lib/runtime/src/storage/kv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,8 @@ impl Manager {
265265
Self::new(KeyValueStoreEnum::Etcd(EtcdStore::new(etcd_client)))
266266
}
267267

268-
pub fn file<P: Into<PathBuf>>(root: P) -> Self {
269-
Self::new(KeyValueStoreEnum::File(FileStore::new(root)))
268+
pub fn file<P: Into<PathBuf>>(cancel_token: CancellationToken, root: P) -> Self {
269+
Self::new(KeyValueStoreEnum::File(FileStore::new(cancel_token, root)))
270270
}
271271

272272
fn new(s: KeyValueStoreEnum) -> Manager {

lib/runtime/src/storage/kv/file.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use async_trait::async_trait;
2020
use futures::StreamExt;
2121
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
2222
use parking_lot::Mutex;
23+
use tokio_util::sync::CancellationToken;
2324

2425
use super::{Bucket, Key, KeyValue, Store, StoreError, StoreOutcome, WatchEvent};
2526

@@ -33,6 +34,7 @@ const MIN_KEEP_ALIVE: Duration = Duration::from_secs(1);
3334
/// Treat as a singleton
3435
#[derive(Clone)]
3536
pub struct FileStore {
37+
cancel_token: CancellationToken,
3638
root: PathBuf,
3739
connection_id: u64,
3840
/// Directories we may have created files in, for shutdown cleanup and keep-alive.
@@ -41,8 +43,9 @@ pub struct FileStore {
4143
}
4244

4345
impl FileStore {
44-
pub(super) fn new<P: Into<PathBuf>>(root_dir: P) -> Self {
46+
pub(super) fn new<P: Into<PathBuf>>(cancel_token: CancellationToken, root_dir: P) -> Self {
4547
let fs = FileStore {
48+
cancel_token,
4649
root: root_dir.into(),
4750
connection_id: rand::random::<u64>(),
4851
active_dirs: Arc::new(Mutex::new(HashMap::new())),
@@ -52,16 +55,27 @@ impl FileStore {
5255
fs
5356
}
5457

55-
/// Keep our files alive and delete expired keys. Does not return.
56-
/// We run this in a real thread so it doesn't get delayed by tokio runtime load.
57-
/// It doesn't need any cleanup so we don't use cancellation token.
58-
fn expiry_thread(&self) -> ! {
58+
/// Keep our files alive and delete expired keys.
59+
///
60+
/// Does not return until cancellation token cancelled. On shutdown the process will
61+
/// often exit before we detect cancellation. That's fine.
62+
/// We run this in a real thread so it doesn't get delayed by tokio runtime under heavy load.
63+
fn expiry_thread(&self) {
5964
loop {
6065
let ttl = self.shortest_ttl();
6166
let keep_alive_interval = cmp::max(ttl / 3, MIN_KEEP_ALIVE);
6267

68+
// Check before and after the sleep
69+
if self.cancel_token.is_cancelled() {
70+
break;
71+
}
72+
6373
thread::sleep(keep_alive_interval);
6474

75+
if self.cancel_token.is_cancelled() {
76+
break;
77+
}
78+
6579
self.keep_alive();
6680
if let Err(err) = self.delete_expired_files() {
6781
tracing::error!(error = %err, "FileStore delete_expired_files");
@@ -469,13 +483,16 @@ fn to_fs_err<E: std::error::Error>(err: E) -> StoreError {
469483
mod tests {
470484
use std::collections::HashSet;
471485

486+
use tokio_util::sync::CancellationToken;
487+
472488
use crate::storage::kv::{Bucket as _, FileStore, Key, Store as _};
473489

474490
#[tokio::test]
475491
async fn test_entries_full_path() {
476492
let t = tempfile::tempdir().unwrap();
477493

478-
let m = FileStore::new(t.path());
494+
let cancel_token = CancellationToken::new();
495+
let m = FileStore::new(cancel_token.clone(), t.path());
479496
let bucket = m.get_or_create_bucket("v1/tests", None).await.unwrap();
480497
let _ = bucket
481498
.insert(&Key::new("key1/multi/part".to_string()), "value1".into(), 0)
@@ -487,6 +504,7 @@ mod tests {
487504
.unwrap();
488505
let entries = bucket.entries().await.unwrap();
489506
let keys: HashSet<Key> = entries.into_keys().collect();
507+
cancel_token.cancel(); // stop the background thread
490508

491509
assert!(keys.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
492510
assert!(keys.contains(&Key::new("v1/tests/key2".to_string())));

0 commit comments

Comments
 (0)