AirLibrary/Indexing/Background/
StartWatcher.rs

1//! # StartWatcher
2//!
3//! ## File: Indexing/Background/StartWatcher.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides background task management for the File Indexer service,
8//! handling file watching startup and periodic indexing tasks.
9//!
10//! ## Primary Responsibility
11//!
12//! Start and manage background file watcher and periodic indexing tasks
13//! for the indexing service.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File watcher initialization and lifecycle management
18//! - Periodic background re-indexing
19//! - Watcher event debouncing
20//! - Background task cleanup
21//!
22//! ## Dependencies
23//!
24//! **External Crates:**
25//! - `notify` - File system watching
26//! - `tokio` - Async runtime for background tasks
27//!
28//! **Internal Modules:**
29//! - `crate::Result` - Error handling type
30//! - `crate::AirError` - Error types
31//! - `crate::ApplicationState::ApplicationState` - Application state
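//! - `crate::Configuration::IndexingConfig` - Indexing configuration
//! - `crate::Indexing::State::CreateState` - `FileIndex`, the shared index type
//! - `crate::Indexing::Watch::WatchFile` - Event debouncing, path filtering and watch-path validation
//! - `crate::Indexing::Scan::ScanDirectory` - Directory scanning for periodic re-indexing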
34//!
35//! ## Dependents
36//!
37//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
38//!
39//! ## VSCode Pattern Reference
40//!
41//! Inspired by VSCode's background services in
42//! `src/vs/workbench/services/search/common/`
43//!
44//! ## Security Considerations
45//!
46//! - Path validation before watching
47//! - Watch path limits enforcement
48//! - Permission checking on watch paths
49//!
50//! ## Performance Considerations
51//!
52//! - Event debouncing prevents excessive re-indexing
53//! - Parallel processing of file changes
54//! - Efficient background task scheduling
55//!
56//! ## Error Handling Strategy
57//!
58//! Background tasks log errors and continue running, ensuring
59//! temporary failures don't stop the indexing service.
60//!
61//! ## Thread Safety
62//!
63//! Background tasks use Arc for shared state and async/await
64//! for safe concurrent operations.
65
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69	sync::{Mutex, RwLock, Semaphore},
70	task::JoinHandle,
71};
72
73use crate::{
74	AirError,
75	ApplicationState::ApplicationState,
76	Configuration::IndexingConfig,
77	Indexing::State::CreateState::{FileIndex, SymbolLocation},
78	Result,
79};
80
/// Number of permits given to `BackgroundIndexerContext::indexing_semaphore`
/// by `BackgroundIndexerContext::new`, i.e. the upper bound on concurrently
/// running watch-event processing operations that acquire it.
const MAX_WATCH_PROCESSORS:usize = 5;
83
84/// Background indexer context containing shared state
85pub struct BackgroundIndexerContext {
86	/// Application state reference
87	pub app_state:Arc<ApplicationState>,
88	/// File index
89	pub file_index:Arc<RwLock<FileIndex>>,
90	/// Corruption detected flag
91	pub corruption_detected:Arc<Mutex<bool>>,
92	/// File watcher (optional)
93	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
94	/// Semaphore for limiting parallel operations
95	pub indexing_semaphore:Arc<Semaphore>,
96	/// Debounced event handler
97	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
98}
99
100impl BackgroundIndexerContext {
101	pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
102		Self {
103			app_state,
104			file_index,
105			corruption_detected:Arc::new(Mutex::new(false)),
106			file_watcher:Arc::new(Mutex::new(None)),
107			indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
108			debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
109		}
110	}
111}
112
113/// Start file watcher for incremental indexing
114///
115/// Monitors file system changes and updates index in real-time.
116/// This enables:
117/// - Real-time search updates
118/// - Automatic reindexing of changed files
119/// - Removal of deleted files from index
120pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
121	use notify::{RecursiveMode, Watcher};
122
123	let index = context.file_index.clone();
124	let corruption_flag = context.corruption_detected.clone();
125	let config = context.app_state.Configuration.Indexing.clone();
126	let debounced_handler = context.debounced_handler.clone();
127
128	// Create and start the watcher
129	let mut watcher:notify::RecommendedWatcher = Watcher::new(
130		move |res:std::result::Result<notify::Event, notify::Error>| {
131			if let Ok(event) = res {
132				// Check corruption flag before processing events
133				if *corruption_flag.blocking_lock() {
134					log::warn!("[StartWatcher] Skipping file event - index marked as corrupted");
135					return;
136				}
137
138				let index = index.clone();
139				let debounced_handler = debounced_handler.clone();
140				let config_clone = config.clone();
141
142				tokio::spawn(async move {
143					// Convert event to change type and add to debounced handler
144					if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
145						for path in &event.paths {
146							if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
147								path,
148								&crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
149							) {
150								debounced_handler.AddChange(path.clone(), change_type).await;
151							}
152						}
153					}
154				});
155			}
156		},
157		notify::Config::default(),
158	)
159	.map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
160
161	// Watch all specified paths
162	for path in &paths {
163		if path.exists() {
164			match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
165				Ok(()) => {
166					watcher
167						.watch(path, notify::RecursiveMode::Recursive)
168						.map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
169					log::info!("[StartWatcher] Watching path: {}", path.display());
170				},
171				Err(e) => {
172					log::warn!("[StartWatcher] Skipping invalid watch path {}: {}", path.display(), e);
173				},
174			}
175		}
176	}
177
178	*context.file_watcher.lock().await = Some(watcher);
179
180	log::info!("[StartWatcher] File watcher started successfully for {} paths", paths.len());
181
182	Ok(())
183}
184
185/// Start the debounce processor task
186pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
187	tokio::spawn(async move {
188		log::info!("[StartWatcher] Debounce processor started");
189
190		let interval = Duration::from_millis(100); // Process every 100ms
191		// Debounce age cutoff
192		let debounce_cutoff = Duration::from_millis(500);
193
194		loop {
195			tokio::time::sleep(interval).await;
196			{
197				// Check corruption flag
198				if *context.corruption_detected.lock().await {
199					log::warn!("[StartWatcher] Index corrupted, pausing debounce processing");
200					tokio::time::sleep(Duration::from_secs(5)).await;
201					continue;
202				}
203
204				// Process pending changes
205				let config = context.app_state.Configuration.Indexing.clone();
206
207				match context
208					.debounced_handler
209					.ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
210					.await
211				{
212					Ok(changes) => {
213						if !changes.is_empty() {
214							log::debug!("[StartWatcher] Processed {} debounced changes", changes.len());
215						}
216					},
217					Err(e) => {
218						log::error!("[StartWatcher] Failed to process pending changes: {}", e);
219					},
220				}
221			}
222		}
223	})
224}
225
226/// Start background tasks for periodic indexing
227pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
228	let config = &context.app_state.Configuration.Indexing;
229
230	if !config.Enabled {
231		log::info!("[StartWatcher] Background indexing disabled in configuration");
232		return Err(AirError::Configuration("Background indexing is disabled".to_string()));
233	}
234
235	let handle = tokio::spawn(BackgroundTask(context));
236
237	log::info!("[StartWatcher] Background tasks started");
238
239	Ok(handle)
240}
241
242/// Stop background tasks
243pub async fn StopBackgroundTasks(context:&BackgroundIndexerContext) {
244	log::info!("[StartWatcher] Stopping background tasks");
245	// Tasks are cancelled when the task handle is dropped
246}
247
248/// Stop file watcher
249pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
250	if let Some(watcher) = context.file_watcher.lock().await.take() {
251		drop(watcher);
252		log::info!("[StartWatcher] File watcher stopped");
253	}
254}
255
256/// Background task for periodic indexing
257async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
258	let config = context.app_state.Configuration.Indexing.clone();
259
260	let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
261	let mut interval = tokio::time::interval(interval);
262
263	log::info!(
264		"[StartWatcher] Background indexing configured for {} minute intervals",
265		config.UpdateIntervalMinutes
266	);
267
268	loop {
269		interval.tick().await;
270		{
271			// Check corruption flag
272			if *context.corruption_detected.lock().await {
273				log::warn!("[StartWatcher] Index corrupted, skipping background update");
274				continue;
275			}
276
277			log::info!("[StartWatcher] Running periodic background index...");
278
279			// Re-index configured directories
280			let directories = config.IndexDirectory.clone();
281			if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
282			{
283				log::error!("[StartWatcher] Background indexing failed: {}", e);
284			}
285		}
286	}
287}
288
289/// Get watcher status
290pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
291	let is_running = context.file_watcher.lock().await.is_some();
292	let pending_count = context.debounced_handler.PendingCount().await;
293	let is_corrupted = *context.corruption_detected.lock().await;
294
295	WatcherStatus { is_running, pending_count, is_corrupted }
296}
297
/// Point-in-time snapshot of the file watcher, as produced by `GetWatcherStatus`.
///
/// Plain data, so it is `Copy` and comparable. `Default` describes a stopped,
/// idle, healthy watcher.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct WatcherStatus {
	/// `true` while a watcher is stored in the context.
	pub is_running:bool,
	/// Value of the debounced handler's `PendingCount()` at snapshot time.
	pub pending_count:usize,
	/// `true` when the index has been flagged as corrupted.
	pub is_corrupted:bool,
}
305
306/// Start all background components (watcher and tasks)
307pub async fn StartAll(
308	context:Arc<BackgroundIndexerContext>,
309	watch_paths:Vec<PathBuf>,
310) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
311	let watcher_handle = if config_watch_enabled(&context) {
312		match StartFileWatcher(&context, watch_paths).await {
313			Ok(()) => {
314				// Start debounce processor
315				Some(StartDebounceProcessor(context.clone()))
316			},
317			Err(e) => {
318				log::error!("[StartWatcher] Failed to start file watcher: {}", e);
319				None
320			},
321		}
322	} else {
323		None
324	};
325
326	let background_handle = match StartBackgroundTasks(context.clone()).await {
327		Ok(handle) => Some(handle),
328		Err(e) => {
329			log::warn!("[StartWatcher] Failed to start background tasks: {}", e);
330			None
331		},
332	};
333
334	Ok((watcher_handle, background_handle))
335}
336
337/// Stop all background components
338pub async fn StopAll(context:&BackgroundIndexerContext) {
339	StopBackgroundTasks(context).await;
340	StopFileWatcher(context).await;
341}
342
343/// Check if watching is enabled in configuration
344fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }