Skip to content

Commit d458f99

Browse files
committed
Scan phase of scans batches writes
1 parent 1d0bf36 commit d458f99

File tree

2 files changed

+102
-33
lines changed

2 files changed

+102
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Changed
1111
- **Database connection handling**: Refactored to use R2D2 connection pool for improved concurrency and resource management
12+
- **Batched writes during the scan phase**: Database writes are now committed in batched transactions rather than in a separate transaction for each item
1213

1314
### Fixed
1415
- **Settings page**: Active configuration values now readable in light mode with proper background colors

src/scanner.rs

Lines changed: 101 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,79 @@ use std::{cmp, fs};
4040

4141
pub struct Scanner {}
4242

43+
/// Batch size for database write operations during scanning
44+
const SCAN_BATCH_SIZE: usize = 100;
45+
4346
/// Context passed through recursive directory scanning to avoid large parameter lists
4447
struct ScanContext<'a> {
4548
conn: &'a Connection,
4649
scan: &'a Scan,
4750
reporter: &'a Arc<ProgressReporter>,
4851
interrupt_token: &'a Arc<AtomicBool>,
52+
batch_count: usize,
53+
}
54+
55+
impl<'a> ScanContext<'a> {
56+
fn new(
57+
conn: &'a Connection,
58+
scan: &'a Scan,
59+
reporter: &'a Arc<ProgressReporter>,
60+
interrupt_token: &'a Arc<AtomicBool>,
61+
) -> Self {
62+
Self {
63+
conn,
64+
scan,
65+
reporter,
66+
interrupt_token,
67+
batch_count: 0,
68+
}
69+
}
70+
71+
fn execute_batch_write<F, T>(&mut self, f: F) -> Result<T, FsPulseError>
72+
where
73+
F: FnOnce(&Connection) -> Result<T, FsPulseError>,
74+
{
75+
// Start transaction on first write
76+
if self.batch_count == 0 {
77+
self.conn
78+
.execute("BEGIN IMMEDIATE", [])
79+
.map_err(FsPulseError::DatabaseError)?;
80+
}
81+
82+
let result = f(self.conn)?;
83+
self.batch_count += 1;
84+
85+
// Auto-flush at batch size
86+
if self.batch_count >= SCAN_BATCH_SIZE {
87+
self.flush()?;
88+
}
89+
90+
Ok(result)
91+
}
92+
93+
fn flush(&mut self) -> Result<(), FsPulseError> {
94+
if self.batch_count > 0 {
95+
self.conn
96+
.execute("COMMIT", [])
97+
.map_err(FsPulseError::DatabaseError)?;
98+
self.batch_count = 0;
99+
}
100+
Ok(())
101+
}
102+
}
103+
104+
impl<'a> Drop for ScanContext<'a> {
105+
fn drop(&mut self) {
106+
// If we still have unflushed writes, we're in an error scenario
107+
// (normal path explicitly calls flush()). Rollback to maintain data integrity.
108+
if self.batch_count > 0 {
109+
error!(
110+
"ScanContext dropped with {} unflushed writes - rolling back transaction",
111+
self.batch_count
112+
);
113+
let _ = self.conn.execute("ROLLBACK", []);
114+
}
115+
}
49116
}
50117

51118
impl Scanner {
@@ -146,8 +213,7 @@ impl Scanner {
146213

147214
// Handle the subdirectory with its computed size
148215
let returned_size = Scanner::handle_scan_item(
149-
ctx.conn,
150-
ctx.scan,
216+
ctx,
151217
ItemType::Directory,
152218
&item.path(),
153219
&item_metadata,
@@ -175,8 +241,7 @@ impl Scanner {
175241
};
176242

177243
let returned_size = Scanner::handle_scan_item(
178-
ctx.conn,
179-
ctx.scan,
244+
ctx,
180245
item_type,
181246
&item.path(),
182247
&item_metadata,
@@ -200,17 +265,18 @@ impl Scanner {
200265
let root_path_buf = PathBuf::from(root.root_path());
201266

202267
// Create scanning context
203-
let mut ctx = ScanContext {
204-
conn: &conn,
205-
scan,
206-
reporter: &reporter,
207-
interrupt_token,
208-
};
268+
let mut ctx = ScanContext::new(&conn, scan, &reporter, interrupt_token);
209269

210270
// Recursively scan the root directory and get the total size
211271
// Note: We don't store the root directory itself as an item in the database
212272
let total_size = Scanner::scan_directory_recursive(&mut ctx, &root_path_buf)?;
213273

274+
// Flush any remaining batched writes
275+
ctx.flush()?;
276+
277+
// Drop ctx to release the immutable borrow of scan before we mutably borrow it
278+
drop(ctx);
279+
214280
// The total_size column is set on the scan row before advancing to the next phase
215281
// This means it doesn't have to be computed or set later in the scan, but it does need
216282
// to be set to NULL if the scan ends in stoppage or error
@@ -466,16 +532,15 @@ impl Scanner {
466532
}
467533

468534
fn handle_scan_item(
469-
conn: &Connection,
470-
scan: &Scan,
535+
ctx: &mut ScanContext,
471536
item_type: ItemType,
472537
path: &Path,
473538
metadata: &Metadata,
474539
computed_size: Option<i64>,
475540
) -> Result<i64, FsPulseError> {
476541
// load the item
477542
let path_str = path.to_string_lossy();
478-
let existing_item = Item::get_by_root_path_type(conn, scan.root_id(), &path_str, item_type)?;
543+
let existing_item = Item::get_by_root_path_type(ctx.conn, ctx.scan.root_id(), &path_str, item_type)?;
479544

480545
let mod_date = metadata
481546
.modified()
@@ -494,15 +559,15 @@ impl Scanner {
494559
// such as when an item was seen as a file, the scan was resumed and the item has
495560
// changed into a directory. In this case, we'll still traverse the children within
496561
// the resumed scan and a tree report will look odd.
497-
if existing_item.last_scan() == scan.scan_id() {
562+
if existing_item.last_scan() == ctx.scan.scan_id() {
498563
return Ok(existing_item.size().unwrap_or(0));
499564
}
500565

501566
let meta_change = existing_item.mod_date() != mod_date || existing_item.size() != size;
502567

503568
if existing_item.is_ts() {
504569
// Rehydrate a tombstone
505-
Database::immediate_transaction(conn, |c| {
570+
ctx.execute_batch_write(|c| {
506571
let rows_updated = c.execute(
507572
"UPDATE items SET
508573
is_ts = 0,
@@ -519,7 +584,7 @@ impl Scanner {
519584
mod_date,
520585
size,
521586
ValidationState::Unknown.as_i64(),
522-
scan.scan_id(),
587+
ctx.scan.scan_id(),
523588
existing_item.item_id(),
524589
),
525590
)?;
@@ -548,7 +613,7 @@ impl Scanner {
548613
VALUES
549614
(?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?)",
550615
(
551-
scan.scan_id(),
616+
ctx.scan.scan_id(),
552617
existing_item.item_id(),
553618
ChangeType::Add.as_i64(),
554619
existing_item.mod_date(),
@@ -564,14 +629,14 @@ impl Scanner {
564629
Ok(())
565630
})?;
566631
} else if meta_change {
567-
Database::immediate_transaction(conn, |c| {
632+
ctx.execute_batch_write(|c| {
568633
let rows_updated = c.execute(
569634
"UPDATE items SET
570635
mod_date = ?,
571636
size = ?,
572637
last_scan = ?
573638
WHERE item_id = ?",
574-
(mod_date, size, scan.scan_id(), existing_item.item_id()),
639+
(mod_date, size, ctx.scan.scan_id(), existing_item.item_id()),
575640
)?;
576641
if rows_updated == 0 {
577642
return Err(FsPulseError::Error(format!(
@@ -592,7 +657,7 @@ impl Scanner {
592657
size_new)
593658
VALUES (?, ?, ?, 1, ?, ?, ?, ?)",
594659
(
595-
scan.scan_id(),
660+
ctx.scan.scan_id(),
596661
existing_item.item_id(),
597662
ChangeType::Modify.as_i64(),
598663
meta_change.then_some(existing_item.mod_date()),
@@ -606,28 +671,31 @@ impl Scanner {
606671
})?;
607672
} else {
608673
// No change - just update last_scan
609-
let rows_updated = conn.execute(
610-
"UPDATE items SET last_scan = ? WHERE item_id = ?",
611-
(scan.scan_id(), existing_item.item_id()),
612-
)?;
674+
ctx.execute_batch_write(|c| {
675+
let rows_updated = c.execute(
676+
"UPDATE items SET last_scan = ? WHERE item_id = ?",
677+
(ctx.scan.scan_id(), existing_item.item_id()),
678+
)?;
613679

614-
if rows_updated == 0 {
615-
return Err(FsPulseError::Error(format!(
616-
"Item Id {} not found for update",
617-
existing_item.item_id()
618-
)));
619-
}
680+
if rows_updated == 0 {
681+
return Err(FsPulseError::Error(format!(
682+
"Item Id {} not found for update",
683+
existing_item.item_id()
684+
)));
685+
}
686+
Ok(())
687+
})?;
620688
}
621689
} else {
622690
// Item is new, insert into items and changes tables
623-
Database::immediate_transaction(conn, |c| {
691+
ctx.execute_batch_write(|c| {
624692
c.execute("INSERT INTO items (root_id, item_path, item_type, mod_date, size, val, last_scan) VALUES (?, ?, ?, ?, ?, ?, ?)",
625-
(scan.root_id(), &path_str, item_type.as_i64(), mod_date, size, ValidationState::Unknown.as_i64(), scan.scan_id()))?;
693+
(ctx.scan.root_id(), &path_str, item_type.as_i64(), mod_date, size, ValidationState::Unknown.as_i64(), ctx.scan.scan_id()))?;
626694

627695
let item_id: i64 = c.query_row("SELECT last_insert_rowid()", [], |row| row.get(0))?;
628696

629697
c.execute("INSERT INTO changes (scan_id, item_id, change_type, is_undelete, mod_date_new, size_new, hash_change, val_change) VALUES (?, ?, ?, 0, ?, ?, 0, 0)",
630-
(scan.scan_id(), item_id, ChangeType::Add.as_i64(), mod_date, size))?;
698+
(ctx.scan.scan_id(), item_id, ChangeType::Add.as_i64(), mod_date, size))?;
631699

632700
Ok(())
633701
})?;

0 commit comments

Comments
 (0)