1use std::{
80 sync::{
81 Arc,
82 atomic::{AtomicI64, AtomicU64, Ordering},
83 },
84 time::{Duration, Instant},
85};
86
87use serde::{Deserialize, Serialize};
88use log::{debug, info, warn};
89
90use crate::{AirError, Result};
91
92struct MetricGuard {
94 current:u64,
95 max:u64,
96}
97
98impl MetricGuard {
99 fn new(current:u64, max:u64) -> Self { Self { current, max } }
100
101 fn increment(&mut self) -> bool {
103 if self.current < self.max.saturating_sub(1) {
104 self.current += 1;
105 true
106 } else {
107 warn!("[Metrics] Metric overflow detected, wrapping around");
108 self.current = 0;
109 true
110 }
111 }
112}
113
114#[derive(Debug)]
116struct AggregationValidator {
117 last_timestamp:Instant,
118 validation_window:Duration,
119}
120
121impl AggregationValidator {
122 fn new(validation_window_secs:u64) -> Self {
123 Self {
124 last_timestamp:Instant::now(),
125 validation_window:Duration::from_secs(validation_window_secs),
126 }
127 }
128
129 fn validate(&mut self) -> std::result::Result<(), String> {
131 let now = Instant::now();
132 if now.duration_since(self.last_timestamp) > self.validation_window {
133 warn!("[Metrics] Aggregation outside validation window, resetting");
134 self.last_timestamp = now;
135 Ok(())
136 } else {
137 Ok(())
138 }
139 }
140}
141
/// Collector of request, error, resource and feature metrics for the Air daemon.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` is shallow: all
/// clones share and update the same underlying values. Counters and gauges are
/// atomics; only the per-error-type map and the aggregation validator sit
/// behind a `std::sync::Mutex`.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
	/// Requests recorded via `RecordRequestSuccess` or `RecordRequestFailure`.
	requests_total:Arc<AtomicU64>,
	/// Requests recorded via `RecordRequestSuccess`.
	requests_successful:Arc<AtomicU64>,
	/// Requests recorded via `RecordRequestFailure`.
	requests_failed:Arc<AtomicU64>,
	/// Sum of recorded request latencies, in whole (truncated) milliseconds.
	request_latency_sum_ms:Arc<AtomicU64>,
	/// Number of latency samples that went into `request_latency_sum_ms`.
	request_latency_count:Arc<AtomicU64>,
	/// Smallest latency seen, in ms; initialised to `u64::MAX` so the first sample replaces it.
	request_latency_min_ms:Arc<AtomicU64>,
	/// Largest latency seen, in ms; initialised to 0.
	request_latency_max_ms:Arc<AtomicU64>,

	/// Errors recorded, one per `RecordRequestFailure` call.
	errors_total:Arc<AtomicU64>,
	/// Failure counts keyed by the redacted, lowercased error type.
	/// Not included in `MetricsData` or in the Prometheus export.
	errors_by_type:Arc<std::sync::Mutex<std::collections::HashMap<String, u64>>>,

	/// Last reported memory usage in bytes (gauge). Stored signed although callers pass `u64`.
	memory_usage_bytes:Arc<AtomicI64>,
	/// Last reported CPU usage, stored as an integer number of hundredths of a percent (gauge).
	cpu_usage_percent:Arc<AtomicU64>,
	/// Last reported number of active connections (gauge).
	active_connections:Arc<AtomicU64>,
	/// Last reported number of active threads (gauge).
	threads_active:Arc<AtomicU64>,

	/// Authentication attempts, successful or not.
	authentication_operations:Arc<AtomicU64>,
	/// Authentication attempts reported as unsuccessful.
	authentication_failures:Arc<AtomicU64>,
	/// Downloads recorded, regardless of outcome.
	downloads_total:Arc<AtomicU64>,
	/// Downloads recorded as successful.
	downloads_completed:Arc<AtomicU64>,
	/// Downloads recorded as failed.
	downloads_failed:Arc<AtomicU64>,
	/// Bytes reported across all recorded downloads, including failed ones.
	downloads_bytes_total:Arc<AtomicU64>,
	/// Indexing operations recorded.
	indexing_operations:Arc<AtomicU64>,
	/// Entry count passed to the most recent `RecordIndexingOperation` call (gauge, overwritten each call).
	indexing_entries:Arc<AtomicI64>,
	/// Update checks performed.
	updates_checked:Arc<AtomicU64>,
	/// Incremented when an update check reports that updates are available.
	/// NOTE(review): the name says "applied" but the writer counts availability — confirm intent.
	updates_applied:Arc<AtomicU64>,

	/// Aggregation-window validator consulted each time a request is recorded.
	aggregator:Arc<std::sync::Mutex<AggregationValidator>>,
}
180
181impl MetricsCollector {
182 pub fn new() -> Result<Self> {
184 info!("[Metrics] MetricsCollector initialized successfully");
185
186 Ok(Self {
187 requests_total:Arc::new(AtomicU64::new(0)),
188 requests_successful:Arc::new(AtomicU64::new(0)),
189 requests_failed:Arc::new(AtomicU64::new(0)),
190 request_latency_sum_ms:Arc::new(AtomicU64::new(0)),
191 request_latency_count:Arc::new(AtomicU64::new(0)),
192 request_latency_min_ms:Arc::new(AtomicU64::new(u64::MAX)),
193 request_latency_max_ms:Arc::new(AtomicU64::new(0)),
194 errors_total:Arc::new(AtomicU64::new(0)),
195 errors_by_type:Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
196 memory_usage_bytes:Arc::new(AtomicI64::new(0)),
197 cpu_usage_percent:Arc::new(AtomicU64::new(0)),
198 active_connections:Arc::new(AtomicU64::new(0)),
199 threads_active:Arc::new(AtomicU64::new(0)),
200 authentication_operations:Arc::new(AtomicU64::new(0)),
201 authentication_failures:Arc::new(AtomicU64::new(0)),
202 downloads_total:Arc::new(AtomicU64::new(0)),
203 downloads_completed:Arc::new(AtomicU64::new(0)),
204 downloads_failed:Arc::new(AtomicU64::new(0)),
205 downloads_bytes_total:Arc::new(AtomicU64::new(0)),
206 indexing_operations:Arc::new(AtomicU64::new(0)),
207 indexing_entries:Arc::new(AtomicI64::new(0)),
208 updates_checked:Arc::new(AtomicU64::new(0)),
209 updates_applied:Arc::new(AtomicU64::new(0)),
210 aggregator:Arc::new(std::sync::Mutex::new(AggregationValidator::new(3600))),
211 })
212 }
213
214 pub fn ValidateAggregation(&self) -> Result<()> {
216 match self.aggregator.lock() {
217 Ok(mut validator) => validator.validate().map_err(|e| AirError::Internal(e)),
218 Err(_) => {
219 warn!("[Metrics] Failed to acquire aggregation validator lock");
220 Ok(())
221 },
222 }
223 }
224
225 pub fn RecordRequestSuccess(&self, LatencySeconds:f64) {
227 self.ValidateAggregation();
228
229 let LatencyMs = (LatencySeconds * 1000.0) as u64;
230
231 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
233 let _ = self.requests_successful.fetch_add(1, Ordering::Relaxed);
234
235 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
237 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
238
239 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
241
242 debug!("[Metrics] Recorded successful request with latency: {:.3}s", LatencySeconds);
243 }
244
245 pub fn RecordRequestFailure(&self, ErrorType:&str, LatencySeconds:f64) {
247 self.ValidateAggregation();
248
249 let LatencyMs = (LatencySeconds * 1000.0) as u64;
250
251 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
253 let _ = self.requests_failed.fetch_add(1, Ordering::Relaxed);
254 let _ = self.errors_total.fetch_add(1, Ordering::Relaxed);
255
256 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
258 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
259
260 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
262
263 let RedactedError = self.RedactErrorType(ErrorType);
265 let RedactedErrorClone = RedactedError.clone();
266 if let Ok(mut error_map) = self.errors_by_type.lock() {
267 *error_map.entry(RedactedError).or_insert(0) += 1;
268 }
269
270 debug!(
271 "[Metrics] Recorded failed request: {}, latency: {:.3}s",
272 RedactedErrorClone, LatencySeconds
273 );
274 }
275
276 pub fn UpdateResourceMetrics(&self, MemoryBytes:u64, CPUPercent:f64, ActiveConns:u64, ActiveThreads:u64) {
278 self.memory_usage_bytes.store(MemoryBytes as i64, Ordering::Relaxed);
279 self.cpu_usage_percent.store((CPUPercent * 100.0) as u64, Ordering::Relaxed);
280 self.active_connections.store(ActiveConns, Ordering::Relaxed);
281 self.threads_active.store(ActiveThreads, Ordering::Relaxed);
282
283 debug!(
284 "[Metrics] Updated resource metrics - Memory: {}B, CPU: {:.1}%, Connections: {}, Threads: {}",
285 MemoryBytes, CPUPercent, ActiveConns, ActiveThreads
286 );
287 }
288
289 pub fn RecordAuthenticationOperation(&self, Success:bool) {
291 let _ = self.authentication_operations.fetch_add(1, Ordering::Relaxed);
292 if !Success {
293 let _ = self.authentication_failures.fetch_add(1, Ordering::Relaxed);
294 }
295 }
296
297 pub fn RecordDownload(&self, Success:bool, Bytes:u64) {
299 let _ = self.downloads_total.fetch_add(1, Ordering::Relaxed);
300 let _ = self.downloads_bytes_total.fetch_add(Bytes, Ordering::Relaxed);
301
302 if Success {
303 let _ = self.downloads_completed.fetch_add(1, Ordering::Relaxed);
304 } else {
305 let _ = self.downloads_failed.fetch_add(1, Ordering::Relaxed);
306 }
307 }
308
309 pub fn RecordIndexingOperation(&self, EntriesIndexed:u64) {
311 let _ = self.indexing_operations.fetch_add(1, Ordering::Relaxed);
312 self.indexing_entries.store(EntriesIndexed as i64, Ordering::Relaxed);
313 }
314
315 pub fn RecordUpdateCheck(&self, UpdatesAvailable:bool) {
317 let _ = self.updates_checked.fetch_add(1, Ordering::Relaxed);
318 if UpdatesAvailable {
319 let _ = self.updates_applied.fetch_add(1, Ordering::Relaxed);
320 }
321 }
322
323 fn RedactErrorType(&self, ErrorType:&str) -> String {
325 let Redacted = ErrorType.to_lowercase();
326
327 if Redacted.contains("password") || Redacted.contains("token") || Redacted.contains("secret") {
329 return "sensitive_error".to_string();
330 }
331
332 Redacted
333 }
334
335 pub fn ExportMetrics(&self) -> Result<String> {
337 let metrics_data = self.GetMetricsData();
338
339 let mut output = String::new();
340 output.push_str("# HELP air_requests_total Total number of requests processed by Air daemon\n");
341 output.push_str("# TYPE air_requests_total counter\n");
342 output.push_str(&format!("air_requests_total {}\n", metrics_data.requests_total));
343
344 output.push_str("# HELP air_requests_successful Total number of successful requests\n");
345 output.push_str("# TYPE air_requests_successful counter\n");
346 output.push_str(&format!("air_requests_successful {}\n", metrics_data.requests_successful));
347
348 output.push_str("# HELP air_requests_failed Total number of failed requests\n");
349 output.push_str("# TYPE air_requests_failed counter\n");
350 output.push_str(&format!("air_requests_failed {}\n", metrics_data.requests_failed));
351
352 output.push_str("# HELP air_errors_total Total number of errors encountered\n");
353 output.push_str("# TYPE air_errors_total counter\n");
354 output.push_str(&format!("air_errors_total {}\n", metrics_data.errors_total));
355
356 output.push_str("# HELP air_memory_usage_bytes Memory usage in bytes\n");
357 output.push_str("# TYPE air_memory_usage_bytes gauge\n");
358 output.push_str(&format!("air_memory_usage_bytes {}\n", metrics_data.memory_bytes));
359
360 output.push_str("# HELP air_cpu_usage_percent CPU usage in hundredths of a percent\n");
361 output.push_str("# TYPE air_cpu_usage_percent gauge\n");
362 output.push_str(&format!("air_cpu_usage_percent {}\n", metrics_data.cpu_percent));
363
364 output.push_str("# HELP air_active_connections Number of active connections\n");
365 output.push_str("# TYPE air_active_connections gauge\n");
366 output.push_str(&format!("air_active_connections {}\n", metrics_data.active_connections));
367
368 output.push_str("# HELP air_threads_active Number of active threads\n");
369 output.push_str("# TYPE air_threads_active gauge\n");
370 output.push_str(&format!("air_threads_active {}\n", metrics_data.active_threads));
371
372 output.push_str("# HELP air_authentication_operations_total Total authentication operations\n");
373 output.push_str("# TYPE air_authentication_operations_total counter\n");
374 output.push_str(&format!(
375 "air_authentication_operations_total {}\n",
376 metrics_data.authentication_operations
377 ));
378
379 output.push_str("# HELP air_authentication_failures_total Total authentication failures\n");
380 output.push_str("# TYPE air_authentication_failures_total counter\n");
381 output.push_str(&format!(
382 "air_authentication_failures_total {}\n",
383 metrics_data.authentication_failures
384 ));
385
386 output.push_str("# HELP air_downloads_total Total downloads initiated\n");
387 output.push_str("# TYPE air_downloads_total counter\n");
388 output.push_str(&format!("air_downloads_total {}\n", metrics_data.downloads_total));
389
390 output.push_str("# HELP air_downloads_completed_total Total downloads completed successfully\n");
391 output.push_str("# TYPE air_downloads_completed_total counter\n");
392 output.push_str(&format!("air_downloads_completed_total {}\n", metrics_data.downloads_completed));
393
394 output.push_str("# HELP air_downloads_failed_total Total downloads failed\n");
395 output.push_str("# TYPE air_downloads_failed_total counter\n");
396 output.push_str(&format!("air_downloads_failed_total {}\n", metrics_data.downloads_failed));
397
398 output.push_str("# HELP air_downloads_bytes_total Total bytes downloaded\n");
399 output.push_str("# TYPE air_downloads_bytes_total counter\n");
400 output.push_str(&format!("air_downloads_bytes_total {}\n", metrics_data.downloads_bytes));
401
402 output.push_str("# HELP air_indexing_operations_total Total indexing operations\n");
403 output.push_str("# TYPE air_indexing_operations_total counter\n");
404 output.push_str(&format!("air_indexing_operations_total {}\n", metrics_data.indexing_operations));
405
406 output.push_str("# HELP air_indexing_entries Number of indexed entries\n");
407 output.push_str("# TYPE air_indexing_entries gauge\n");
408 output.push_str(&format!("air_indexing_entries {}\n", metrics_data.indexing_entries));
409
410 output.push_str("# HELP air_updates_checked_total Total update checks performed\n");
411 output.push_str("# TYPE air_updates_checked_total counter\n");
412 output.push_str(&format!("air_updates_checked_total {}\n", metrics_data.updates_checked));
413
414 output.push_str("# HELP air_updates_applied_total Total updates applied\n");
415 output.push_str("# TYPE air_updates_applied_total counter\n");
416 output.push_str(&format!("air_updates_applied_total {}\n", metrics_data.updates_applied));
417
418 Ok(output)
419 }
420
421 pub fn GetMetricsData(&self) -> MetricsData {
423 let latency_avg = if self.request_latency_count.load(Ordering::Relaxed) > 0 {
424 self.request_latency_sum_ms.load(Ordering::Relaxed) as f64
425 / self.request_latency_count.load(Ordering::Relaxed) as f64
426 } else {
427 0.0
428 };
429
430 MetricsData {
431 timestamp:crate::Utility::CurrentTimestamp(),
432 requests_total:self.requests_total.load(Ordering::Relaxed),
433 requests_successful:self.requests_successful.load(Ordering::Relaxed),
434 requests_failed:self.requests_failed.load(Ordering::Relaxed),
435 errors_total:self.errors_total.load(Ordering::Relaxed),
436 memory_bytes:self.memory_usage_bytes.load(Ordering::Relaxed).max(0) as u64,
437 cpu_percent:self.cpu_usage_percent.load(Ordering::Relaxed) as f64 / 100.0,
438 active_connections:self.active_connections.load(Ordering::Relaxed),
439 active_threads:self.threads_active.load(Ordering::Relaxed),
440 authentication_operations:self.authentication_operations.load(Ordering::Relaxed),
441 authentication_failures:self.authentication_failures.load(Ordering::Relaxed),
442 downloads_total:self.downloads_total.load(Ordering::Relaxed),
443 downloads_completed:self.downloads_completed.load(Ordering::Relaxed),
444 downloads_failed:self.downloads_failed.load(Ordering::Relaxed),
445 downloads_bytes:self.downloads_bytes_total.load(Ordering::Relaxed),
446 indexing_operations:self.indexing_operations.load(Ordering::Relaxed),
447 indexing_entries:self.indexing_entries.load(Ordering::Relaxed).max(0) as u64,
448 updates_checked:self.updates_checked.load(Ordering::Relaxed),
449 updates_applied:self.updates_applied.load(Ordering::Relaxed),
450 latency_avg_ms:latency_avg,
451 latency_min_ms:self.request_latency_min_ms.load(Ordering::Relaxed),
452 latency_max_ms:self.request_latency_max_ms.load(Ordering::Relaxed),
453 }
454 }
455
456 #[cfg(test)]
458 pub fn Reset(&self) {
459 self.requests_total.store(0, Ordering::Relaxed);
460 self.requests_successful.store(0, Ordering::Relaxed);
461 self.requests_failed.store(0, Ordering::Relaxed);
462 self.request_latency_sum_ms.store(0, Ordering::Relaxed);
463 self.request_latency_count.store(0, Ordering::Relaxed);
464 self.request_latency_min_ms.store(u64::MAX, Ordering::Relaxed);
465 self.request_latency_max_ms.store(0, Ordering::Relaxed);
466 self.errors_total.store(0, Ordering::Relaxed);
467 self.memory_usage_bytes.store(0, Ordering::Relaxed);
468 self.cpu_usage_percent.store(0, Ordering::Relaxed);
469 self.active_connections.store(0, Ordering::Relaxed);
470 self.threads_active.store(0, Ordering::Relaxed);
471 self.authentication_operations.store(0, Ordering::Relaxed);
472 self.authentication_failures.store(0, Ordering::Relaxed);
473 self.downloads_total.store(0, Ordering::Relaxed);
474 self.downloads_completed.store(0, Ordering::Relaxed);
475 self.downloads_failed.store(0, Ordering::Relaxed);
476 self.downloads_bytes_total.store(0, Ordering::Relaxed);
477 self.indexing_operations.store(0, Ordering::Relaxed);
478 self.indexing_entries.store(0, Ordering::Relaxed);
479 self.updates_checked.store(0, Ordering::Relaxed);
480 self.updates_applied.store(0, Ordering::Relaxed);
481 }
482}
483
/// Folds `Value` into a running minimum (`MinMetric`) and maximum (`MaxMetric`).
///
/// Each bound is updated independently with an atomic read-modify-write, so a
/// sample that is both the new minimum and the new maximum updates both cells.
/// The very first sample against an initial `u64::MAX` / `0` pair is one such case.
/// `Relaxed` ordering suffices: the two cells are independent statistics and
/// nothing else synchronises through them.
fn MinMaxUpdate(MinMetric:&AtomicU64, MaxMetric:&AtomicU64, Value:u64) {
	MinMetric.fetch_min(Value, Ordering::Relaxed);
	MaxMetric.fetch_max(Value, Ordering::Relaxed);
}
505
506impl Default for MetricsCollector {
507 fn default() -> Self { Self::new().expect("Failed to create MetricsCollector") }
508}
509
/// Point-in-time snapshot of a `MetricsCollector`, produced by
/// `MetricsCollector::GetMetricsData` and serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsData {
	/// Value of `crate::Utility::CurrentTimestamp()` when the snapshot was taken
	/// (NOTE(review): units not visible here — confirm seconds vs. milliseconds).
	pub timestamp:u64,
	/// Total requests recorded (successful plus failed).
	pub requests_total:u64,
	/// Requests recorded as successful.
	pub requests_successful:u64,
	/// Requests recorded as failed.
	pub requests_failed:u64,
	/// Errors recorded.
	pub errors_total:u64,
	/// Last reported memory usage in bytes; negative stored values are reported as 0.
	pub memory_bytes:u64,
	/// Last reported CPU usage in percent (the collector stores hundredths and divides by 100).
	pub cpu_percent:f64,
	/// Last reported number of active connections.
	pub active_connections:u64,
	/// Last reported number of active threads.
	pub active_threads:u64,
	/// Authentication attempts recorded.
	pub authentication_operations:u64,
	/// Authentication attempts recorded as failed.
	pub authentication_failures:u64,
	/// Downloads recorded.
	pub downloads_total:u64,
	/// Downloads recorded as successful.
	pub downloads_completed:u64,
	/// Downloads recorded as failed.
	pub downloads_failed:u64,
	/// Bytes reported across all recorded downloads.
	pub downloads_bytes:u64,
	/// Indexing operations recorded.
	pub indexing_operations:u64,
	/// Entry count from the most recent indexing operation; negative stored values are reported as 0.
	pub indexing_entries:u64,
	/// Update checks recorded.
	pub updates_checked:u64,
	/// Counter named "applied" (see the collector's `RecordUpdateCheck` for what actually increments it).
	pub updates_applied:u64,
	/// Mean request latency in milliseconds; 0.0 when no latency has been recorded.
	pub latency_avg_ms:f64,
	/// Smallest recorded request latency in milliseconds.
	pub latency_min_ms:u64,
	/// Largest recorded request latency in milliseconds.
	pub latency_max_ms:u64,
}
536
537impl MetricsData {
538 pub fn SuccessRate(&self) -> f64 {
540 if self.requests_total == 0 {
541 return 100.0;
542 }
543 (self.requests_successful as f64 / self.requests_total as f64) * 100.0
544 }
545
546 pub fn DownloadSuccessRate(&self) -> f64 {
548 if self.downloads_total == 0 {
549 return 100.0;
550 }
551 (self.downloads_completed as f64 / self.downloads_total as f64) * 100.0
552 }
553
554 pub fn ErrorRate(&self) -> f64 {
556 if self.requests_total == 0 {
557 return 0.0;
558 }
559 (self.errors_total as f64 / self.requests_total as f64) * 100.0
560 }
561}
562
563static METRICS_INSTANCE:std::sync::OnceLock<MetricsCollector> = std::sync::OnceLock::new();
565
566pub fn GetMetrics() -> &'static MetricsCollector { METRICS_INSTANCE.get_or_init(|| MetricsCollector::default()) }
568
569pub fn InitializeMetrics() -> Result<()> {
571 let _collector = GetMetrics();
572 info!("[Metrics] Global metrics collector initialized");
573 Ok(())
574}