1use std::{collections::HashMap, sync::Arc};
94
95use serde::{Deserialize, Serialize};
96use tokio::sync::{Mutex, RwLock};
97use systemstat::{Platform, System};
98
99use crate::{AirError, Configuration::AirConfiguration, Result, Utility};
100
101#[derive(Debug)]
103pub struct ApplicationState {
104 pub Configuration:Arc<AirConfiguration>,
106
107 pub ServiceStatus:Arc<RwLock<HashMap<String, ServiceStatus>>>,
109
110 pub ActiveRequests:Arc<Mutex<HashMap<String, RequestStatus>>>,
112
113 pub Metrics:Arc<RwLock<PerformanceMetrics>>,
115
116 pub Resources:Arc<RwLock<ResourceUsage>>,
118
119 pub Connections:Arc<RwLock<HashMap<String, ConnectionInfo>>>,
121
122 pub BackgroundTasks:Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub enum ServiceStatus {
129 Starting,
130 Running,
131 Stopping,
132 Stopped,
133 Error(String),
134}
135
136#[derive(Debug, Clone)]
138pub struct RequestStatus {
139 pub RequestId:String,
140 pub Service:String,
141 pub StartedAt:u64,
142 pub Status:RequestState,
143 pub Progress:Option<f32>,
144}
145
146#[derive(Debug, Clone)]
148pub enum RequestState {
149 Pending,
150 InProgress,
151 Completed,
152 Failed(String),
153 Cancelled,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct PerformanceMetrics {
159 pub TotalRequests:u64,
160 pub SuccessfulRequests:u64,
161 pub FailedRequests:u64,
162 pub AverageResponseTime:f64,
163 pub UptimeSeconds:u64,
164 pub LastUpdated:u64,
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct ResourceUsage {
170 pub MemoryUsageMb:f64,
171 pub CPUUsagePercent:f64,
172 pub DiskUsageMb:f64,
173 pub NetworkUsageMbps:f64,
174 pub LastUpdated:u64,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct ConnectionInfo {
180 pub ConnectionId:String,
181 pub ClientId:String,
182 pub ClientVersion:String,
183 pub ProtocolVersion:u32,
184 pub LastHeartbeat:u64,
185 pub IsActive:bool,
186 pub ConnectionType:ConnectionType,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
191pub enum ConnectionType {
192 MountainMain,
193 MountainWorker,
194 Cocoon,
195 Wind,
196 External,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct ConnectionHealthReport {
202 pub TotalConnections:usize,
203 pub HealthyConnections:usize,
204 pub StaleConnections:usize,
205 pub ConnectionsByType:HashMap<String, usize>,
206 pub LastChecked:u64,
207}
208
209impl ApplicationState {
210 pub async fn New(Configuration:Arc<AirConfiguration>) -> Result<Self> {
212 let State = Self {
213 Configuration,
214 ServiceStatus:Arc::new(RwLock::new(HashMap::new())),
215 ActiveRequests:Arc::new(Mutex::new(HashMap::new())),
216 Metrics:Arc::new(RwLock::new(PerformanceMetrics {
217 TotalRequests:0,
218 SuccessfulRequests:0,
219 FailedRequests:0,
220 AverageResponseTime:0.0,
221 UptimeSeconds:0,
222 LastUpdated:Utility::CurrentTimestamp(),
223 })),
224 Resources:Arc::new(RwLock::new(ResourceUsage {
225 MemoryUsageMb:0.0,
226 CPUUsagePercent:0.0,
227 DiskUsageMb:0.0,
228 NetworkUsageMbps:0.0,
229 LastUpdated:Utility::CurrentTimestamp(),
230 })),
231 Connections:Arc::new(RwLock::new(HashMap::new())),
232 BackgroundTasks:Arc::new(Mutex::new(Vec::new())),
233 };
234
235 State.InitializeServiceStatus().await?;
237
238 Ok(State)
239 }
240
241 async fn InitializeServiceStatus(&self) -> Result<()> {
243 let mut Status = self.ServiceStatus.write().await;
244
245 Status.insert("authentication".to_string(), ServiceStatus::Starting);
246 Status.insert("updates".to_string(), ServiceStatus::Starting);
247 Status.insert("downloader".to_string(), ServiceStatus::Starting);
248 Status.insert("indexing".to_string(), ServiceStatus::Starting);
249 Status.insert("grpc".to_string(), ServiceStatus::Starting);
250 Status.insert("connections".to_string(), ServiceStatus::Starting);
251
252 Ok(())
253 }
254
255 pub async fn RegisterConnection(
258 &self,
259 ConnectionId:String,
260 ClientId:String,
261 ClientVersion:String,
262 ProtocolVersion:u32,
263 ConnectionType:ConnectionType,
264 ) -> Result<()> {
265 if ConnectionId.is_empty() {
267 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
268 }
269
270 if ClientId.is_empty() {
272 return Err(AirError::Configuration("Client ID cannot be empty".to_string()));
273 }
274
275 if ProtocolVersion == 0 {
277 return Err(AirError::Configuration("Protocol version must be greater than 0".to_string()));
278 }
279
280 let mut Connections = self.Connections.write().await;
281
282 if Connections.contains_key(&ConnectionId) {
284 return Err(AirError::Configuration(format!("Connection {} already exists", ConnectionId)));
285 }
286
287 if matches!(ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) {
289 let ClientConnCount = Connections
291 .values()
292 .filter(|c| {
293 c.ClientId == ClientId
294 && matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker)
295 })
296 .count();
297
298 const MAX_CONN_PER_CLIENT:usize = 10;
299 if ClientConnCount >= MAX_CONN_PER_CLIENT {
300 return Err(AirError::ResourceLimit(format!(
301 "Client {} exceeds maximum connection limit ({})",
302 ClientId, MAX_CONN_PER_CLIENT
303 )));
304 }
305 }
306
307 Connections.insert(
308 ConnectionId.clone(),
309 ConnectionInfo {
310 ConnectionId:ConnectionId.clone(),
311 ClientId:ClientId.clone(),
312 ClientVersion,
313 ProtocolVersion,
314 LastHeartbeat:Utility::CurrentTimestamp(),
315 IsActive:true,
316 ConnectionType:ConnectionType.clone(),
317 },
318 );
319
320 log::info!("Connection registered: {} - {} ({:?})", ConnectionId, ClientId, ConnectionType);
321 Ok(())
322 }
323
324 pub async fn UpdateHeartbeat(&self, ConnectionId:&str) -> Result<()> {
327 if ConnectionId.is_empty() {
328 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
329 }
330
331 let mut Connections = self.Connections.write().await;
332
333 if let Some(Connection) = Connections.get_mut(ConnectionId) {
334 let CurrentTime = Utility::CurrentTimestamp();
335 const MAX_HEARTBEAT_INTERVAL:u64 = 120000; if CurrentTime - Connection.LastHeartbeat > MAX_HEARTBEAT_INTERVAL {
339 log::warn!(
340 "Long heartbeat interval for connection {}: {}ms",
341 ConnectionId,
342 CurrentTime - Connection.LastHeartbeat
343 );
344 }
345
346 Connection.LastHeartbeat = CurrentTime;
347 Connection.IsActive = true;
348
349 log::debug!(
350 "Heartbeat updated for connection: {} (client: {})",
351 ConnectionId,
352 Connection.ClientId
353 );
354 } else {
355 return Err(AirError::Internal(format!("Connection {} not found", ConnectionId)));
356 }
357
358 Ok(())
359 }
360
361 pub async fn RemoveConnection(&self, ConnectionId:&str) -> Result<()> {
364 if ConnectionId.is_empty() {
365 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
366 }
367
368 let mut Connections = self.Connections.write().await;
369
370 if let Some(Connection) = Connections.remove(ConnectionId) {
371 log::info!(
372 "Connection removed: {} (client: {}, type: {:?})",
373 ConnectionId,
374 Connection.ClientId,
375 Connection.ConnectionType
376 );
377
378 } else {
383 log::warn!("Attempted to remove non-existent connection: {}", ConnectionId);
384 }
385
386 Ok(())
387 }
388
389 pub async fn GetActiveConnectionCount(&self) -> usize {
391 let Connections = self.Connections.read().await;
392 Connections.values().filter(|c| c.IsActive).count()
393 }
394
395 pub async fn GetConnectionCountByType(&self, ConnectionType:ConnectionType) -> usize {
397 let Connections = self.Connections.read().await;
398 Connections
399 .values()
400 .filter(|c| c.ConnectionType == ConnectionType && c.IsActive)
401 .count()
402 }
403
404 pub async fn GetConnectionsByType(&self, ConnectionType:ConnectionType) -> Vec<ConnectionInfo> {
406 let Connections = self.Connections.read().await;
407 Connections
408 .values()
409 .filter(|c| c.ConnectionType == ConnectionType)
410 .cloned()
411 .collect()
412 }
413
414 pub async fn GetNextMountainConnection(&self) -> Result<ConnectionInfo> {
417 let Connections = self.Connections.read().await;
418
419 let MountainConnections:Vec<_> = Connections
420 .values()
421 .filter(|c| {
422 matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) && c.IsActive
423 })
424 .collect();
425
426 if MountainConnections.is_empty() {
427 return Err(AirError::ServiceUnavailable(
428 "No active Mountain connections available".to_string(),
429 ));
430 }
431
432 let Selected = MountainConnections[0].clone();
438
439 Ok(Selected)
440 }
441
442 pub async fn CleanupStaleConnections(&self, TimeoutSeconds:u64) -> Result<usize> {
446 let mut Connections = self.Connections.write().await;
447 let CurrentTime = Utility::CurrentTimestamp();
448 let TimeoutMs = TimeoutSeconds * 1000;
449
450 let mut RemovedCount = 0;
451 let mut RemovedByType:HashMap<String, usize> = HashMap::new();
452
453 Connections.retain(|Id, Connection| {
454 if CurrentTime - Connection.LastHeartbeat > TimeoutMs {
455 log::warn!(
456 "Removing stale connection: {} - {} ({:?}) - idle: {}ms",
457 Id,
458 Connection.ClientId,
459 Connection.ConnectionType,
460 CurrentTime - Connection.LastHeartbeat
461 );
462
463 *RemovedByType.entry(format!("{:?}", Connection.ConnectionType)).or_insert(0) += 1;
464
465 RemovedCount += 1;
466 false
467 } else {
468 true
469 }
470 });
471
472 if RemovedCount > 0 {
473 log::info!("Cleaned up {} stale connections", RemovedCount);
474 for (ConnType, Count) in RemovedByType {
475 log::info!(" - {} connections: {}", ConnType, Count);
476 }
477 }
478
479 Ok(RemovedCount)
480 }
481
482 pub async fn RegisterBackgroundTask(&self, Task:tokio::task::JoinHandle<()>) -> Result<()> {
484 let mut Tasks = self.BackgroundTasks.lock().await;
485 Tasks.push(Task);
486 log::debug!("Background task registered. Total tasks: {}", Tasks.len());
487 Ok(())
488 }
489
490 pub async fn StopAllBackgroundTasks(&self) -> Result<()> {
492 let mut Tasks = self.BackgroundTasks.lock().await;
493
494 let TaskCount = Tasks.len();
495 log::info!("Stopping {} background tasks", TaskCount);
496
497 for Task in Tasks.drain(..) {
499 Task.abort();
500 }
501
502 log::info!("Stopped all {} background tasks", TaskCount);
503 Ok(())
504 }
505
506 pub async fn UpdateServiceStatus(&self, Service:&str, Status:ServiceStatus) -> Result<()> {
508 if Service.is_empty() {
509 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
510 }
511
512 let mut ServiceStatus = self.ServiceStatus.write().await;
513 let StatusClone = Status.clone();
514 ServiceStatus.insert(Service.to_string(), Status);
515 log::debug!("Service status updated: {} -> {:?}", Service, StatusClone);
516 Ok(())
517 }
518
519 pub async fn GetServiceStatus(&self, Service:&str) -> Option<ServiceStatus> {
521 let ServiceStatus = self.ServiceStatus.read().await;
522 ServiceStatus.get(Service).cloned()
523 }
524
525 pub async fn GetAllServiceStatuses(&self) -> HashMap<String, ServiceStatus> {
527 let ServiceStatus = self.ServiceStatus.read().await;
528 ServiceStatus.clone()
529 }
530
531 pub async fn RegisterRequest(&self, RequestId:String, Service:String) -> Result<()> {
533 if RequestId.is_empty() {
534 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
535 }
536
537 if Service.is_empty() {
538 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
539 }
540
541 let mut Requests = self.ActiveRequests.lock().await;
542
543 if Requests.contains_key(&RequestId) {
545 return Err(AirError::Configuration(format!("Request {} already exists", RequestId)));
546 }
547
548 Requests.insert(
549 RequestId.clone(),
550 RequestStatus {
551 RequestId:RequestId.clone(),
552 Service,
553 StartedAt:Utility::CurrentTimestamp(),
554 Status:RequestState::Pending,
555 Progress:None,
556 },
557 );
558
559 log::debug!("Request registered: {}", RequestId);
560 Ok(())
561 }
562
563 pub async fn UpdateRequestStatus(&self, RequestId:&str, Status:RequestState, Progress:Option<f32>) -> Result<()> {
565 if RequestId.is_empty() {
566 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
567 }
568
569 if let Some(p) = Progress {
571 if !(0.0..=1.0).contains(&p) {
572 return Err(AirError::Configuration("Progress must be between 0.0 and 1.0".to_string()));
573 }
574 }
575
576 let mut Requests = self.ActiveRequests.lock().await;
577
578 if let Some(Request) = Requests.get_mut(RequestId) {
579 Request.Status = Status;
580 Request.Progress = Progress;
581 } else {
582 return Err(AirError::Internal(format!("Request {} not found", RequestId)));
583 }
584
585 Ok(())
586 }
587
588 pub async fn RemoveRequest(&self, RequestId:&str) -> Result<()> {
590 if RequestId.is_empty() {
591 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
592 }
593
594 let mut requests = self.ActiveRequests.lock().await;
595
596 if requests.remove(RequestId).is_some() {
597 log::debug!("Request removed: {}", RequestId);
598 }
599
600 Ok(())
601 }
602
603 pub async fn UpdateMetrics(&self, Success:bool, ResponseTime:u64) -> Result<()> {
605 let mut Metrics = self.Metrics.write().await;
606
607 Metrics.TotalRequests += 1;
608 if Success {
609 Metrics.SuccessfulRequests += 1;
610 } else {
611 Metrics.FailedRequests += 1;
612 }
613
614 let Alpha = 0.1; Metrics.AverageResponseTime = Alpha * (ResponseTime as f64) + (1.0 - Alpha) * Metrics.AverageResponseTime;
617
618 Metrics.LastUpdated = Utility::CurrentTimestamp();
619
620 Ok(())
621 }
622
623 pub async fn UpdateResourceUsage(&self) -> Result<()> {
625 let Sys = System::new();
626
627 let MemoryUsage = if let Ok(Memory) = Sys.memory() {
629 (Memory.total.as_u64() - Memory.free.as_u64()) as f64 / 1024.0 / 1024.0
630 } else {
631 log::warn!("Failed to get memory usage");
632 0.0
633 };
634
635 let CPUUsage = if let Ok(CPU) = Sys.cpu_load_aggregate() {
637 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
638 if let Ok(CPU) = CPU.done() {
639 (CPU.user + CPU.nice + CPU.system) as f64 * 100.0
640 } else {
641 log::warn!("Failed to get CPU usage after sampling");
642 0.0
643 }
644 } else {
645 log::warn!("Failed to start CPU load sampling");
646 0.0
647 };
648
649 let mut Resources = self.Resources.write().await;
651 Resources.MemoryUsageMb = MemoryUsage;
652 Resources.CPUUsagePercent = CPUUsage;
653 Resources.LastUpdated = Utility::CurrentTimestamp();
654
655 Ok(())
656 }
657
658 pub async fn GetMetrics(&self) -> PerformanceMetrics {
660 let metrics = self.Metrics.read().await;
661 metrics.clone()
662 }
663
664 pub async fn GetResourceUsage(&self) -> ResourceUsage {
666 let Resources = self.Resources.read().await;
667 Resources.clone()
668 }
669
670 pub async fn GetActiveRequestCount(&self) -> usize {
672 let Requests = self.ActiveRequests.lock().await;
673 Requests.len()
674 }
675
676 pub async fn IsRequestCancelled(&self, RequestId:&str) -> bool {
678 let Requests = self.ActiveRequests.lock().await;
679 if let Some(Request) = Requests.get(RequestId) {
680 matches!(Request.Status, RequestState::Cancelled)
681 } else {
682 false
683 }
684 }
685
686 pub async fn GetConfiguration(&self) -> Arc<AirConfiguration> { self.Configuration.clone() }
688
689 pub async fn UpdateConfiguration(
691 &self,
692 Section:String,
693 Updates:std::collections::HashMap<String, String>,
694 ) -> Result<()> {
695 log::info!("[ApplicationState] Updating configuration section: {}", Section);
696
697 if Section.is_empty() {
699 return Err(AirError::Configuration("Configuration section cannot be empty".to_string()));
700 }
701
702 if Updates.is_empty() {
704 return Err(AirError::Configuration("Configuration updates cannot be empty".to_string()));
705 }
706
707 match Section.as_str() {
715 "grpc" => {
716 log::info!("Updating gRPC configuration: {:?}", Updates);
717 },
718 "updates" => {
719 log::info!("Updating updates configuration: {:?}", Updates);
720 },
721 "downloader" => {
722 log::info!("Updating downloader configuration: {:?}", Updates);
723 },
724 "indexing" => {
725 log::info!("Updating indexing configuration: {:?}", Updates);
726 },
727 "daemon" => {
728 log::info!("Updating daemon configuration: {:?}", Updates);
729 },
730 _ => {
731 return Err(AirError::Configuration(format!("Unknown configuration section: {}", Section)));
732 },
733 }
734
735 Ok(())
736 }
737
738 pub async fn SetResourceLimits(
740 &self,
741 MemoryLimitMb:Option<u64>,
742 CPULimitPercent:Option<f64>,
743 DiskLimitMb:Option<u64>,
744 ) -> Result<()> {
745 log::info!(
746 "[ApplicationState] Setting resource limits memory={:?}, CPU={:?}, disk={:?}",
747 MemoryLimitMb,
748 CPULimitPercent,
749 DiskLimitMb
750 );
751
752 if let Some(CPU) = CPULimitPercent {
754 if !(0.0..=100.0).contains(&CPU) {
755 return Err(AirError::ResourceLimit("CPU limit must be between 0 and 100".to_string()));
756 }
757 }
758
759 if let Some(Memory) = MemoryLimitMb {
761 if Memory == 0 {
762 return Err(AirError::ResourceLimit("Memory limit must be greater than 0".to_string()));
763 }
764 }
765
766 if let Some(Disk) = DiskLimitMb {
768 if Disk == 0 {
769 return Err(AirError::ResourceLimit("Disk limit must be greater than 0".to_string()));
770 }
771 }
772
773 if MemoryLimitMb.is_some() {
783 log::info!("Memory limit set: {} MB", MemoryLimitMb.unwrap());
784 }
785 if CPULimitPercent.is_some() {
786 log::info!("CPU limit set: {}%", CPULimitPercent.unwrap());
787 }
788 if DiskLimitMb.is_some() {
789 log::info!("Disk limit set: {} MB", DiskLimitMb.unwrap());
790 }
791
792 Ok(())
793 }
794
795 pub async fn CheckResourceLimits(&self) -> Result<bool> {
797 let _Resources = self.Resources.read().await;
798
799 Ok(false)
803 }
804
805 pub async fn GetConnectionHealthReport(&self) -> ConnectionHealthReport {
807 let Connections = self.Connections.read().await;
808 let CurrentTime = Utility::CurrentTimestamp();
809
810 let mut Healthy = 0;
811 let mut Stale = 0;
812 let mut ByType:HashMap<String, usize> = HashMap::new();
813
814 for Connection in Connections.values() {
815 let IsStale = CurrentTime - Connection.LastHeartbeat > 120000; if IsStale {
818 Stale += 1;
819 } else if Connection.IsActive {
820 Healthy += 1;
821 }
822
823 *ByType.entry(format!("{:?}", Connection.ConnectionType)).or_insert(0) += 1;
824 }
825
826 ConnectionHealthReport {
827 TotalConnections:Connections.len(),
828 HealthyConnections:Healthy,
829 StaleConnections:Stale,
830 ConnectionsByType:ByType,
831 LastChecked:CurrentTime,
832 }
833 }
834}