Commit ca7cece
Indexing: auto-rescan on FSEvents channel overflow
- Increase `cmdr-fsevent-stream` internal channel from 1,024 to 32,768 batches
- Add `Arc<AtomicBool>` overflow flag set on `try_send` failure (logs once, then silent)
- `DriveWatcher` exposes `overflow_flag()` for passing to async event loop tasks
- Both `run_live_event_loop` and `run_replay_event_loop` check the flag every 1s flush tick — on overflow, emit `WatcherChannelOverflow` rescan notification, drain channel, exit
- Add `WatcherChannelOverflow` variant to `RescanReason` enum and frontend toast map
1 parent b74ed39 commit ca7cece
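The "set a flag on `try_send` failure, log once, then stay silent" behavior described above can be sketched in miniature with std's bounded channel. This is a hypothetical standalone version for illustration, not the vendored cmdr-fsevent-stream code; the `OverflowSender` name is invented:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Wraps a bounded sender; when the channel is full, drops the batch and
/// latches a shared overflow flag, logging only on the first drop.
struct OverflowSender<T> {
    tx: SyncSender<T>,
    overflow: Arc<AtomicBool>,
}

impl<T> OverflowSender<T> {
    fn send(&self, batch: T) {
        if let Err(TrySendError::Full(_)) = self.tx.try_send(batch) {
            // swap returns the previous value: false only on the first overflow
            if !self.overflow.swap(true, Ordering::Relaxed) {
                eprintln!("event channel overflowed; dropping batches until rescan");
            }
        }
    }
}
```

Because the flag only latches (it is never cleared by the sender), a consumer polling it on a timer sees the overflow even if it happened between ticks.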

4 files changed: +108 −8 lines

apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 3 additions & 3 deletions
@@ -118,17 +118,17 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b
 
 **Subtree aggregation uses scoped queries**: `scoped_get_children_stats_by_id` and `scoped_get_child_dir_ids` in `aggregator.rs` use recursive CTEs scoped to the target subtree, not full-table scans. This keeps subtree aggregation O(subtree_size) regardless of total DB size.
 
-**Bounded buffers prevent OOM**: All buffers have capacity limits. Reconciler buffer: 500K events (overflow triggers full rescan). Writer channel: 20K messages (bounded `sync_channel`, backpressure). Replay `affected_paths`: 50K entries (overflow emits full refresh). Replay `pending_rescans`: 1K entries (overflow triggers full rescan). Replay event count: 1M events max (overflow falls back to full scan). Memory watchdog: warns at 8 GB, stops indexing at 16 GB. The index is a disposable cache, so dropping events and rescanning is always safe.
+**Bounded buffers prevent OOM**: All buffers have capacity limits. FSEvents channel: 32K batches (bounded `try_send` in cmdr-fsevent-stream; overflow sets atomic flag, triggers rescan). Reconciler buffer: 500K events (overflow triggers full rescan). Writer channel: 20K messages (bounded `sync_channel`, backpressure). Replay `affected_paths`: 50K entries (overflow emits full refresh). Replay `pending_rescans`: 1K entries (overflow triggers full rescan). Replay event count: 1M events max (overflow falls back to full scan). Memory watchdog: warns at 8 GB, stops indexing at 16 GB. The index is a disposable cache, so dropping events and rescanning is always safe.
 
 **Disposable cache pattern**: The index DB is a cache, not a source of truth. Any corruption or error triggers delete+rebuild. No user-facing errors for DB issues.
 
-**cmdr-fsevent-stream fork (macOS only)**: Our fork of `fsevent-stream` (v0.3.0) provides direct access to FSEvents event IDs, `sinceWhen` replay, and `MustScanSubDirs` flags. Only used on macOS. On Linux, the `notify` crate (inotify backend) provides recursive directory watching with `RecursiveMode::Recursive`.
+**cmdr-fsevent-stream fork (macOS only)**: Vendored in `crates/fsevent-stream/` (forked from `fsevent-stream` v0.3.0). Provides direct access to FSEvents event IDs, `sinceWhen` replay, and `MustScanSubDirs` flags. Only used on macOS. On Linux, the `notify` crate (inotify backend) provides recursive directory watching with `RecursiveMode::Recursive`.
 
 **Linux inotify watch limits**: Default `fs.inotify.max_user_watches` is ~8192. The `notify` crate's recursive mode adds one inotify watch per directory. Power users with large directory trees may hit this limit; the workaround is `sysctl fs.inotify.max_user_watches=524288`. The watcher gracefully handles watch errors without crashing.
 
 **APFS firmlinks**: Scan from `/` only, skip `/System/Volumes/Data`. Normalize all paths via firmlink prefix map so DB lookups work regardless of how the user navigated to a path.
 
-**Rescan notification system (`RescanReason` enum)**: Every code path that falls back to a full rescan emits an `index-rescan-notification` event with a `RescanReason` variant and human-readable details. The frontend maps each reason to a user-friendly toast message. Seven reasons: `StaleIndex` (pre-check gap), `JournalGap` (in-loop gap), `ReplayOverflow` (>1M events), `TooManySubdirRescans` (>1K MustScanSubDirs), `WatcherStartFailed`, `ReconcilerBufferOverflow` (>500K buffered events during scan), `IncompletePreviousScan` (has data but no `scan_completed_at`). The pre-check in `resume_or_scan()` catches stale indexes before starting the FSEvents stream, preventing the cmdr-fsevent-stream channel (1024 capacity, `try_send`) from being overwhelmed.
+**Rescan notification system (`RescanReason` enum)**: Every code path that falls back to a full rescan emits an `index-rescan-notification` event with a `RescanReason` variant and human-readable details. The frontend maps each reason to a user-friendly toast message. Eight reasons: `StaleIndex` (pre-check gap), `JournalGap` (in-loop gap), `ReplayOverflow` (>1M events), `TooManySubdirRescans` (>1K MustScanSubDirs), `WatcherStartFailed`, `ReconcilerBufferOverflow` (>500K buffered events during scan), `IncompletePreviousScan` (has data but no `scan_completed_at`), `WatcherChannelOverflow` (FSEvents channel full, events dropped). The pre-check in `resume_or_scan()` catches stale indexes before starting the FSEvents stream, preventing the cmdr-fsevent-stream channel (32K capacity, `try_send`) from being overwhelmed.
 
 ## Gotchas
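The frontend toast map is keyed by snake_case strings (`incomplete_previous_scan`, `watcher_channel_overflow`), which implies the `RescanReason` variants serialize to snake_case. A hand-rolled sketch of that mapping, assuming the real code derives it automatically (e.g. via serde's `rename_all = "snake_case"`); this standalone version is illustrative only:

```rust
/// Mirror of the eight `RescanReason` variants described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RescanReason {
    StaleIndex,
    JournalGap,
    ReplayOverflow,
    TooManySubdirRescans,
    WatcherStartFailed,
    ReconcilerBufferOverflow,
    IncompletePreviousScan,
    WatcherChannelOverflow,
}

impl RescanReason {
    /// snake_case key the frontend toast map is indexed by.
    fn as_key(self) -> &'static str {
        match self {
            RescanReason::StaleIndex => "stale_index",
            RescanReason::JournalGap => "journal_gap",
            RescanReason::ReplayOverflow => "replay_overflow",
            RescanReason::TooManySubdirRescans => "too_many_subdir_rescans",
            RescanReason::WatcherStartFailed => "watcher_start_failed",
            RescanReason::ReconcilerBufferOverflow => "reconciler_buffer_overflow",
            RescanReason::IncompletePreviousScan => "incomplete_previous_scan",
            RescanReason::WatcherChannelOverflow => "watcher_channel_overflow",
        }
    }
}
```

An exhaustive `match` like this forces the key list to be updated whenever a variant is added, which is exactly the failure mode this commit's new variant exercises.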

apps/desktop/src-tauri/src/indexing/mod.rs

Lines changed: 79 additions & 1 deletion
@@ -273,6 +273,8 @@ pub enum RescanReason {
     ReconcilerBufferOverflow,
     /// Previous scan didn't complete (app crashed or was force-quit).
     IncompletePreviousScan,
+    /// FSEvents channel overflowed — events were dropped.
+    WatcherChannelOverflow,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -469,8 +471,10 @@ impl IndexManager {
         let (event_tx, event_rx) = tokio::sync::mpsc::channel(WATCHER_CHANNEL_CAPACITY);
         let current_id = watcher::current_event_id();
 
+        let watcher_overflow: Option<Arc<AtomicBool>>;
         match DriveWatcher::start(&self.volume_root, since_event_id, event_tx) {
             Ok(watcher) => {
+                watcher_overflow = Some(watcher.overflow_flag());
                 self.drive_watcher = Some(watcher);
                 log::debug!("DriveWatcher started for replay (sinceWhen={since_event_id}, current={current_id})");
             }
@@ -523,6 +527,7 @@ impl IndexManager {
             },
             fallback_tx,
             micro_scans,
+            watcher_overflow,
         )
         .await;
 
@@ -597,12 +602,16 @@ impl IndexManager {
         let (event_tx, event_rx) = tokio::sync::mpsc::channel(WATCHER_CHANNEL_CAPACITY);
         let scan_start_event_id = watcher::current_event_id();
 
+        // watcher_overflow is None if the watcher failed to start (non-fatal).
+        let watcher_overflow: Option<Arc<AtomicBool>>;
         match DriveWatcher::start(&self.volume_root, 0, event_tx) {
             Ok(watcher) => {
+                watcher_overflow = Some(watcher.overflow_flag());
                 self.drive_watcher = Some(watcher);
                 log::debug!("DriveWatcher started (scan_start_event_id={scan_start_event_id})");
             }
             Err(e) => {
+                watcher_overflow = None;
                 // Watcher failure is non-fatal: scan works without it, just no live updates
                 log::warn!("Failed to start DriveWatcher (scan will proceed without watcher): {e}");
             }
@@ -665,6 +674,7 @@ impl IndexManager {
         let micro_scans = self.micro_scans.clone();
         let scanning = Arc::clone(&self.scanning);
         let live_event_task_slot = Arc::clone(&self.live_event_task);
+        let watcher_overflow_flag = watcher_overflow;
         tauri::async_runtime::spawn(async move {
             // Wait for scan to complete
             let join_result = tokio::task::spawn_blocking(move || join_handle.join()).await;
@@ -716,6 +726,22 @@ impl IndexManager {
                 );
             }
 
+            // Check if the FSEvents channel overflowed (events dropped
+            // before reaching the forward task). If so, our buffered events
+            // are incomplete — the reconciler replay will miss changes.
+            // We still proceed (the scan data itself is fine), but log a
+            // warning. The live event loop will detect the overflow flag
+            // and trigger a rescan at that point, since a fresh scan is
+            // the only way to recover from dropped events.
+            if let Some(ref flag) = watcher_overflow_flag {
+                if flag.load(Ordering::Relaxed) {
+                    log::info!(
+                        "FSEvents channel overflowed during scan — some watcher \
+                         events were dropped. Live event loop will trigger a rescan."
+                    );
+                }
+            }
+
             // Emit scan-complete first, then start the flushing phase.
             // Order matters: the frontend's scan-complete handler calls
             // resetAggregation(), so the saving_entries event must come
@@ -794,8 +820,13 @@ impl IndexManager {
         // Step 5: Start live event processing loop
         let writer_live = writer.clone();
         let app_live = app.clone();
+        let volume_id_live = volume_id.clone();
+        let overflow_live = watcher_overflow_flag.clone();
         let handle = tauri::async_runtime::spawn(async move {
-            run_live_event_loop(event_rx, reconciler, writer_live, app_live).await;
+            run_live_event_loop(
+                event_rx, reconciler, writer_live, app_live,
+                volume_id_live, overflow_live,
+            ).await;
         });
 
         // Store the handle so shutdown() can wait for it to drain
@@ -1118,6 +1149,8 @@ async fn run_live_event_loop(
     mut reconciler: EventReconciler,
     writer: IndexWriter,
     app: AppHandle,
+    volume_id: String,
+    watcher_overflow: Option<Arc<AtomicBool>>,
 ) {
     log::debug!("Live event processing started");
 
@@ -1177,6 +1210,28 @@
             }
         }
         _ = flush_interval.tick() => {
+            // Check if the FSEvents channel overflowed — events were dropped
+            // between FSEvents and our forward task. The only safe recovery is
+            // a full rescan.
+            if let Some(ref flag) = watcher_overflow {
+                if flag.load(Ordering::Relaxed) {
+                    emit_rescan_notification(
+                        &app,
+                        &volume_id,
+                        RescanReason::WatcherChannelOverflow,
+                        format!(
+                            "The filesystem watcher's event channel overflowed after \
+                             {event_count} live events. Some file changes were lost."
+                        ),
+                    );
+                    // Drain and discard remaining events — they're a partial
+                    // picture and processing them before a rescan is pointless.
+                    event_rx.close();
+                    while event_rx.recv().await.is_some() {}
+                    break;
+                }
+            }
+
             process_live_batch(
                 &mut pending_events, &mut reconciler, &conn,
                 &writer, &mut pending_paths,
@@ -1272,6 +1327,7 @@ async fn run_replay_event_loop(
     config: ReplayConfig,
     fallback_tx: tokio::sync::oneshot::Sender<()>,
     micro_scans: MicroScanManager,
+    watcher_overflow: Option<Arc<AtomicBool>>,
 ) -> Result<(), String> {
     let ReplayConfig {
         volume_id,
@@ -1584,6 +1640,28 @@
             }
         }
         _ = flush_interval.tick() => {
+            // Check if the FSEvents channel overflowed
+            if let Some(ref flag) = watcher_overflow {
+                if flag.load(Ordering::Relaxed) {
+                    emit_rescan_notification(
+                        &app,
+                        &volume_id,
+                        RescanReason::WatcherChannelOverflow,
+                        format!(
+                            "The filesystem watcher's event channel overflowed after \
+                             {event_count} replay + {live_count} live events. Some file \
+                             changes were lost."
+                        ),
+                    );
+                    if let Some(tx) = fallback_tx.take() {
+                        let _ = tx.send(());
+                    }
+                    event_rx.close();
+                    while event_rx.recv().await.is_some() {}
+                    return Ok(());
+                }
+            }
+
             process_live_batch(
                 &mut live_pending_events, &mut reconciler, &conn,
                 &writer, &mut live_pending_paths,
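Both loops end the overflow path the same way: close the channel, drain it dry, then exit so the rescan starts from a clean slate. A synchronous analogue of that drain step using std's mpsc instead of tokio (`drain_pending` is an invented name for illustration):

```rust
use std::sync::mpsc::{sync_channel, Receiver};

/// Discard everything currently buffered in the channel, returning the count.
/// In the real code this runs after `event_rx.close()`, so the async `recv`
/// terminates once in-flight sends finish; with std mpsc we simply empty the
/// queue with non-blocking `try_recv`.
fn drain_pending<T>(rx: &Receiver<T>) -> usize {
    let mut dropped = 0;
    while rx.try_recv().is_ok() {
        dropped += 1;
    }
    dropped
}
```

Draining before exit matters even though the events are discarded: it releases the batches' memory immediately and unblocks any sender still waiting on channel capacity.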

apps/desktop/src-tauri/src/indexing/watcher.rs

Lines changed: 24 additions & 4 deletions
@@ -12,14 +12,13 @@
 //! `DriveWatcher::start` returns `WatcherError::StreamCreate` and
 //! `current_event_id` returns `0`.
 
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
 #[cfg(target_os = "macos")]
 use std::path::Path;
-#[cfg(any(target_os = "macos", target_os = "linux"))]
-use std::sync::Arc;
 #[cfg(target_os = "macos")]
 use std::sync::atomic::AtomicU64;
-#[cfg(any(target_os = "macos", target_os = "linux"))]
-use std::sync::atomic::{AtomicBool, Ordering};
 #[cfg(target_os = "macos")]
 use std::time::Duration;
 
@@ -109,6 +108,8 @@ pub struct DriveWatcher {
     running: Arc<AtomicBool>,
     /// Last processed event ID (atomically updated as events arrive).
     last_event_id: Arc<AtomicU64>,
+    /// Set to `true` when the FSEvents channel overflows and events are dropped.
+    overflow: Arc<AtomicBool>,
     /// Handle to abort the FSEvents run loop thread.
     handler: Option<EventStreamHandler>,
     /// Task that reads the event stream and forwards events.
@@ -149,6 +150,8 @@ impl DriveWatcher {
         )
         .map_err(WatcherError::Io)?;
 
+        let overflow = event_stream.overflow_flag();
+
         log::debug!("DriveWatcher started on {} (sinceWhen={since_when})", root.display());
 
         // Spawn a task to read the async event stream and forward events.
@@ -178,6 +181,7 @@ impl DriveWatcher {
         Ok(Self {
             running,
             last_event_id,
+            overflow,
             handler: Some(handler),
             forward_task: Some(forward_task),
         })
@@ -212,6 +216,11 @@ impl DriveWatcher {
     pub fn is_running(&self) -> bool {
         self.running.load(Ordering::Relaxed)
     }
+
+    /// Returns a shared handle to the overflow flag for passing to async tasks.
+    pub fn overflow_flag(&self) -> Arc<AtomicBool> {
+        Arc::clone(&self.overflow)
+    }
 }
 
 #[cfg(target_os = "macos")]
@@ -327,6 +336,13 @@ impl DriveWatcher {
     pub fn is_running(&self) -> bool {
         self.running.load(Ordering::Relaxed)
     }
+
+    /// Returns a shared handle to the overflow flag for passing to async tasks.
+    /// Linux never overflows (backpressure via `blocking_send`), but the API is
+    /// cross-platform.
+    pub fn overflow_flag(&self) -> Arc<AtomicBool> {
+        Arc::new(AtomicBool::new(false))
+    }
 }
 
 #[cfg(target_os = "linux")]
@@ -434,6 +450,10 @@ impl DriveWatcher {
     pub fn is_running(&self) -> bool {
         false
     }
+
+    pub fn overflow_flag(&self) -> Arc<AtomicBool> {
+        Arc::new(AtomicBool::new(false))
+    }
 }
 
 // ── Helpers ──────────────────────────────────────────────────────────
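On macOS, `overflow_flag()` hands out clones of one shared `Arc<AtomicBool>`, so a store from the watcher's forward task is visible to the event loops holding clones. A minimal sketch of that handoff (hypothetical `Watcher` struct, not the real `DriveWatcher`):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// The watcher owns the flag; async tasks hold clones of the same
/// allocation, so one store is observed by every holder.
struct Watcher {
    overflow: Arc<AtomicBool>,
}

impl Watcher {
    /// Cheap clone of the shared handle (bumps the Arc refcount only).
    fn overflow_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.overflow)
    }
}
```

Note that the Linux and fallback stubs in the diff return a fresh `Arc` on each call rather than a shared one; that works only because those platforms never set the flag, so every handle stays permanently `false`.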

apps/desktop/src/lib/indexing/index-state.svelte.ts

Lines changed: 2 additions & 0 deletions
@@ -79,6 +79,8 @@ const rescanReasonToMessage: Record<string, string> = {
     'Heavy filesystem activity overwhelmed the event buffer. Running a fresh scan to stay accurate.',
   incomplete_previous_scan:
     "The previous scan didn't finish (the app may have been closed). Restarting the scan from scratch.",
+  watcher_channel_overflow:
+    'A burst of filesystem activity overflowed the watcher channel. Running a fresh scan to stay accurate.',
 }
 
 // Event listener cleanup handles
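Since `rescanReasonToMessage` is keyed by backend-supplied strings, a reason emitted by a newer backend than the frontend would yield `undefined` on direct indexing. A defensive lookup sketch (hypothetical `toastMessage` helper with a trimmed map; not part of this diff):

```typescript
// Trimmed copy of the map for illustration; the real one has eight entries.
const rescanReasonToMessage: Record<string, string> = {
  watcher_channel_overflow:
    'A burst of filesystem activity overflowed the watcher channel. Running a fresh scan to stay accurate.',
};

// Fall back to a generic message for reason strings the frontend doesn't know yet.
function toastMessage(reason: string): string {
  return (
    rescanReasonToMessage[reason] ??
    'The index needed a fresh scan to stay accurate.'
  );
}
```

The fallback keeps toasts working across version skew between the Rust backend and the bundled frontend.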
