1use std::{collections::HashMap, sync::Arc};
9
10use log::{debug, error, info, warn};
11use tonic::{Request, Response, Status};
12use futures::StreamExt;
13use tokio_stream::StreamExt as TokioStreamExt;
14use async_trait::async_trait;
15
16use crate::{
17 AirError,
18 ApplicationState::ApplicationState,
19 Authentication::AuthenticationService,
20 Downloader::DownloadManager,
21 Indexing::{
22 FileIndexer,
23 Store::QueryIndex::{SearchMode, SearchQuery},
24 },
25 Result,
26 Updates::UpdateManager,
27 Utility::CurrentTimestamp,
28 Vine::Generated::{
29 air as air_generated,
30 air::{
31 ApplyUpdateRequest,
32 ApplyUpdateResponse,
33 AuthenticationRequest,
34 AuthenticationResponse,
35 ConfigurationRequest,
36 ConfigurationResponse,
37 DownloadRequest,
38 DownloadResponse,
39 DownloadStreamRequest,
40 DownloadStreamResponse,
41 FileInfoRequest,
42 FileInfoResponse,
43 FileResult,
44 HealthCheckRequest,
45 HealthCheckResponse,
46 IndexRequest,
47 IndexResponse,
48 MetricsRequest,
49 MetricsResponse,
50 ResourceLimitsRequest,
51 ResourceLimitsResponse,
52 ResourceUsageRequest,
53 ResourceUsageResponse,
54 SearchRequest,
55 SearchResponse,
56 StatusRequest,
57 StatusResponse,
58 UpdateCheckRequest,
59 UpdateCheckResponse,
60 UpdateConfigurationRequest,
61 UpdateConfigurationResponse,
62 air_service_server::AirService,
63 },
64 },
65};
66
67#[derive(Clone)]
69pub struct AirVinegRPCService {
70 AppState:Arc<ApplicationState>,
72
73 AuthService:Arc<AuthenticationService>,
75
76 UpdateManager:Arc<UpdateManager>,
78
79 DownloadManager:Arc<DownloadManager>,
81
82 FileIndexer:Arc<FileIndexer>,
84
85 ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
87}
88
89#[derive(Debug, Clone)]
91struct ConnectionMetadata {
92 pub ClientId:String,
93 pub ClientVersion:String,
94 pub ProtocolVersion:u32,
95 pub LastRequestTime:u64,
96 pub RequestCount:u64,
97 pub ConnectionType:crate::ApplicationState::ConnectionType,
98}
99
100impl AirVinegRPCService {
101 pub fn new(
103 AppState:Arc<ApplicationState>,
104 AuthService:Arc<AuthenticationService>,
105 UpdateManager:Arc<UpdateManager>,
106 DownloadManager:Arc<DownloadManager>,
107 FileIndexer:Arc<FileIndexer>,
108 ) -> Self {
109 info!("[AirVinegRPCService] New instance created");
110
111 Self {
112 AppState,
113 AuthService,
114 UpdateManager,
115 DownloadManager,
116 FileIndexer,
117 ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
118 }
119 }
120
121 async fn TrackConnection<RequestType>(
123 &self,
124 Request:&tonic::Request<RequestType>,
125 ServiceName:&str,
126 ) -> std::result::Result<String, Status> {
127 let Metadata = Request.metadata();
128 let ConnectionId = Metadata
129 .get("connection-id")
130 .map(|v| v.to_str().unwrap_or_default().to_string())
131 .unwrap_or_else(|| crate::Utility::GenerateRequestId());
132
133 let ClientId = Metadata
134 .get("client-id")
135 .map(|v| v.to_str().unwrap_or_default().to_string())
136 .unwrap_or_else(|| "unknown".to_string());
137
138 let ClientVersion = Metadata
139 .get("client-version")
140 .map(|v| v.to_str().unwrap_or_default().to_string())
141 .unwrap_or_else(|| "unknown".to_string());
142
143 let ProtocolVersion = Metadata
144 .get("protocol-version")
145 .map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
146 .unwrap_or(1);
147
148 let mut Connections = self.ActiveConnections.write().await;
150 let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
151 ConnectionMetadata {
152 ClientId:ClientId.clone(),
153 ClientVersion:ClientVersion.clone(),
154 ProtocolVersion,
155 LastRequestTime:crate::Utility::CurrentTimestamp(),
156 RequestCount:0,
157 ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
158 }
159 });
160
161 ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
162 ConnectionMetadata.RequestCount += 1;
163
164 self.AppState
166 .RegisterConnection(
167 ConnectionId.clone(),
168 ClientId,
169 ClientVersion,
170 ProtocolVersion,
171 crate::ApplicationState::ConnectionType::MountainMain,
172 )
173 .await
174 .map_err(|e| Status::internal(e.to_string()))?;
175
176 Ok(ConnectionId)
177 }
178
179 fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
181 if ClientVersion > crate::ProtocolVersion {
182 return Err(Status::failed_precondition(format!(
183 "Client protocol version {} is newer than server version {}",
184 ClientVersion,
185 crate::ProtocolVersion
186 )));
187 }
188
189 if ClientVersion < crate::ProtocolVersion {
190 warn!(
191 "Client using older protocol version {} (server: {})",
192 ClientVersion,
193 crate::ProtocolVersion
194 );
195 }
196
197 Ok(())
198 }
199}
200
201#[async_trait]
202impl AirService for AirVinegRPCService {
203 async fn authenticate(
205 &self,
206 Request:Request<AuthenticationRequest>,
207 ) -> std::result::Result<Response<AuthenticationResponse>, Status> {
208 let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
210
211 let RequestData = Request.into_inner();
212 let request_id = RequestData.request_id.clone();
213
214 info!(
215 "[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
216 request_id, ConnectionId
217 );
218
219 self.AppState
220 .RegisterRequest(request_id.clone(), "authentication".to_string())
221 .await
222 .map_err(|e| Status::internal(e.to_string()))?;
223
224 if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
226 let ErrorMessage = "Invalid authentication parameters".to_string();
227 self.AppState
228 .UpdateRequestStatus(
229 &request_id,
230 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
231 None,
232 )
233 .await
234 .ok();
235
236 return Ok(Response::new(air_generated::AuthenticationResponse {
237 request_id,
238 success:false,
239 token:String::new(),
240 error:ErrorMessage,
241 }));
242 }
243
244 let username_for_log = RequestData.username.clone();
246 let password = RequestData.password;
247 let provider = RequestData.provider;
248
249 let result = self
250 .AuthService
251 .AuthenticateUser(RequestData.username, password, provider)
252 .await;
253
254 match result {
255 Ok(token) => {
256 self.AppState
257 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
258 .await
259 .ok();
260
261 info!(
263 "[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
264 username_for_log, ConnectionId
265 );
266
267 Ok(Response::new(air_generated::AuthenticationResponse {
268 request_id,
269 success:true,
270 token,
271 error:String::new(),
272 }))
273 },
274 Err(e) => {
275 self.AppState
276 .UpdateRequestStatus(
277 &request_id,
278 crate::ApplicationState::RequestState::Failed(e.to_string()),
279 None,
280 )
281 .await
282 .ok();
283
284 warn!(
286 "[AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
287 username_for_log, ConnectionId, e
288 );
289
290 Ok(Response::new(air_generated::AuthenticationResponse {
291 request_id,
292 success:false,
293 token:String::new(),
294 error:e.to_string(),
295 }))
296 },
297 }
298 }
299
300 async fn check_for_updates(
302 &self,
303 request:Request<UpdateCheckRequest>,
304 ) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
305 let RequestData = request.into_inner();
306 let request_id = RequestData.request_id.clone();
307
308 info!(
309 "[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
310 request_id, RequestData.current_version, RequestData.channel
311 );
312
313 self.AppState
314 .RegisterRequest(request_id.clone(), "updates".to_string())
315 .await
316 .map_err(|e| Status::internal(e.to_string()))?;
317
318 if RequestData.current_version.is_empty() {
320 let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
321 self.AppState
322 .UpdateRequestStatus(
323 &request_id,
324 crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
325 None,
326 )
327 .await
328 .ok();
329 return Err(Status::invalid_argument(ErrorMessage.to_string()));
330 }
331
332 let ValidChannels = ["stable", "beta", "nightly"];
334 let Channel = if RequestData.channel.is_empty() {
335 "stable".to_string()
336 } else {
337 RequestData.channel.clone()
338 };
339 if !ValidChannels.contains(&Channel.as_str()) {
340 let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
341 self.AppState
342 .UpdateRequestStatus(
343 &request_id,
344 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
345 None,
346 )
347 .await
348 .ok();
349 return Err(Status::invalid_argument(ErrorMessage));
350 }
351
352 let result = self.UpdateManager.CheckForUpdates().await;
354
355 match result {
356 Ok(UpdateInfo) => {
357 self.AppState
358 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
359 .await
360 .ok();
361
362 info!(
363 "[AirVinegRPCService] Update check successful - Available: {}",
364 UpdateInfo.is_some()
365 );
366
367 Ok(Response::new(air_generated::UpdateCheckResponse {
368 request_id,
369 update_available:UpdateInfo.is_some(),
370 version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
371 download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
372 release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
373 error:String::new(),
374 }))
375 },
376 Err(crate::AirError::Network(e)) => {
377 self.AppState
378 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
379 .await
380 .ok();
381 error!("[AirVinegRPCService] Network error during update check: {}", e);
382 Err(Status::unavailable(e))
383 },
384 Err(e) => {
385 self.AppState
386 .UpdateRequestStatus(
387 &request_id,
388 crate::ApplicationState::RequestState::Failed(e.to_string()),
389 None,
390 )
391 .await
392 .ok();
393 error!("[AirVinegRPCService] Update check failed: {}", e);
394 Ok(Response::new(air_generated::UpdateCheckResponse {
395 request_id,
396 update_available:false,
397 version:String::new(),
398 download_url:String::new(),
399 release_notes:String::new(),
400 error:e.to_string(),
401 }))
402 },
403 }
404 }
405
406 async fn download_file(
408 &self,
409 request:Request<DownloadRequest>,
410 ) -> std::result::Result<Response<DownloadResponse>, Status> {
411 let RequestData = request.into_inner();
412 let request_id = RequestData.request_id.clone();
413
414 info!(
415 "[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
416 request_id, RequestData.url
417 );
418
419 let download_request_id = if request_id.is_empty() {
421 crate::Utility::GenerateRequestId()
422 } else {
423 request_id.clone()
424 };
425
426 self.AppState
427 .RegisterRequest(download_request_id.clone(), "downloader".to_string())
428 .await
429 .map_err(|e| Status::internal(e.to_string()))?;
430
431 if RequestData.url.is_empty() {
433 let error_msg = "URL cannot be empty".to_string();
434 self.AppState
435 .UpdateRequestStatus(
436 &download_request_id,
437 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
438 None,
439 )
440 .await
441 .ok();
442 return Ok(Response::new(DownloadResponse {
443 request_id:download_request_id,
444 success:false,
445 file_path:String::new(),
446 file_size:0,
447 checksum:String::new(),
448 error:error_msg,
449 }));
450 }
451
452 if !match_url_scheme(&RequestData.url) {
454 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
455 self.AppState
456 .UpdateRequestStatus(
457 &download_request_id,
458 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
459 None,
460 )
461 .await
462 .ok();
463 return Ok(Response::new(DownloadResponse {
464 request_id:download_request_id,
465 success:false,
466 file_path:String::new(),
467 file_size:0,
468 checksum:String::new(),
469 error:error_msg,
470 }));
471 }
472
473 let DestinationPath = if RequestData.destination_path.is_empty() {
475 let config = &self.AppState.Configuration.Downloader;
477 config.CacheDirectory.clone()
478 } else {
479 RequestData.destination_path.clone()
480 };
481
482 let dest_path = std::path::Path::new(&DestinationPath);
484 if let Some(parent) = dest_path.parent() {
485 if !parent.exists() {
486 match tokio::fs::create_dir_all(parent).await {
487 Ok(_) => {
488 debug!("[AirVinegRPCService] Created destination directory: {}", parent.display());
489 },
490 Err(e) => {
491 let error_msg = format!("Failed to create destination directory: {}", e);
492 self.AppState
493 .UpdateRequestStatus(
494 &download_request_id,
495 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
496 None,
497 )
498 .await
499 .ok();
500 return Ok(Response::new(DownloadResponse {
501 request_id:download_request_id,
502 success:false,
503 file_path:String::new(),
504 file_size:0,
505 checksum:String::new(),
506 error:error_msg,
507 }));
508 },
509 }
510 }
511 }
512
513 let download_manager = self.DownloadManager.clone();
515 let AppState = self.AppState.clone();
516 let callback_request_id = download_request_id.clone();
517 let progress_callback = move |progress:f32| {
518 let state = AppState.clone();
519 let id = callback_request_id.clone();
520 tokio::spawn(async move {
521 let _ = state
522 .UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
523 .await;
524 });
525 };
526
527 let result = self
529 .download_file_with_retry(
530 &download_request_id,
531 RequestData.url.clone(),
532 DestinationPath,
533 RequestData.checksum,
534 Some(Box::new(progress_callback)),
535 )
536 .await;
537
538 match result {
539 Ok(file_info) => {
540 self.AppState
541 .UpdateRequestStatus(
542 &download_request_id,
543 crate::ApplicationState::RequestState::Completed,
544 Some(100.0),
545 )
546 .await
547 .ok();
548
549 info!(
550 "[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
551 download_request_id, file_info.size
552 );
553
554 Ok(Response::new(DownloadResponse {
555 request_id:download_request_id,
556 success:true,
557 file_path:file_info.path,
558 file_size:file_info.size,
559 checksum:file_info.checksum,
560 error:String::new(),
561 }))
562 },
563 Err(e) => {
564 self.AppState
565 .UpdateRequestStatus(
566 &download_request_id,
567 crate::ApplicationState::RequestState::Failed(e.to_string()),
568 None,
569 )
570 .await
571 .ok();
572
573 error!(
574 "[AirVinegRPCService] Download failed [ID: {}] - Error: {}",
575 download_request_id, e
576 );
577
578 Ok(Response::new(DownloadResponse {
579 request_id:download_request_id,
580 success:false,
581 file_path:String::new(),
582 file_size:0,
583 checksum:String::new(),
584 error:e.to_string(),
585 }))
586 },
587 }
588 }
589
590 async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
592 let RequestData = request.into_inner();
593 let request_id = RequestData.request_id;
594
595 info!(
596 "[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
597 request_id, RequestData.path
598 );
599
600 self.AppState
601 .RegisterRequest(request_id.clone(), "indexing".to_string())
602 .await
603 .map_err(|e| Status::internal(e.to_string()))?;
604
605 let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
606
607 match result {
608 Ok(index_info) => {
609 self.AppState
610 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
611 .await
612 .ok();
613
614 Ok(Response::new(air_generated::IndexResponse {
615 request_id,
616 success:true,
617 files_indexed:index_info.files_indexed,
618 total_size:index_info.total_size,
619 error:String::new(),
620 }))
621 },
622 Err(e) => {
623 self.AppState
624 .UpdateRequestStatus(
625 &request_id,
626 crate::ApplicationState::RequestState::Failed(e.to_string()),
627 None,
628 )
629 .await
630 .ok();
631
632 Ok(Response::new(air_generated::IndexResponse {
633 request_id,
634 success:false,
635 files_indexed:0,
636 total_size:0,
637 error:e.to_string(),
638 }))
639 },
640 }
641 }
642
643 async fn get_status(
645 &self,
646 request:Request<StatusRequest>,
647 ) -> std::result::Result<Response<StatusResponse>, Status> {
648 let RequestData = request.into_inner();
649
650 debug!("[AirVinegRPCService] Status request received");
651
652 let metrics = self.AppState.GetMetrics().await;
653 let resources = self.AppState.GetResourceUsage().await;
654
655 Ok(Response::new(air_generated::StatusResponse {
656 version:crate::VERSION.to_string(),
657 uptime_seconds:metrics.UptimeSeconds,
658 total_requests:metrics.TotalRequests,
659 successful_requests:metrics.SuccessfulRequests,
660 failed_requests:metrics.FailedRequests,
661 average_response_time:metrics.AverageResponseTime,
662 memory_usage_mb:resources.MemoryUsageMb,
663 cpu_usage_percent:resources.CPUUsagePercent,
664 active_requests:self.AppState.GetActiveRequestCount().await as u32,
665 }))
666 }
667
668 async fn health_check(
670 &self,
671 _request:Request<HealthCheckRequest>,
672 ) -> std::result::Result<Response<HealthCheckResponse>, Status> {
673 debug!("[AirVinegRPCService] Health check request received");
674
675 Ok(Response::new(air_generated::HealthCheckResponse {
676 healthy:true,
677 timestamp:CurrentTimestamp(),
678 }))
679 }
680
681 async fn download_update(
685 &self,
686 request:Request<DownloadRequest>,
687 ) -> std::result::Result<Response<DownloadResponse>, Status> {
688 let RequestData = request.into_inner();
689 let request_id = RequestData.request_id.clone();
690
691 info!(
692 "[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
693 request_id, RequestData.url, RequestData.destination_path
694 );
695
696 self.AppState
697 .RegisterRequest(request_id.clone(), "download_update".to_string())
698 .await
699 .map_err(|e| Status::internal(e.to_string()))?;
700
701 if RequestData.url.is_empty() {
703 let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
704 self.AppState
705 .UpdateRequestStatus(
706 &request_id,
707 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
708 None,
709 )
710 .await
711 .ok();
712 return Err(Status::invalid_argument(error_msg.to_string()));
713 }
714
715 if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
717 let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
718 self.AppState
719 .UpdateRequestStatus(
720 &request_id,
721 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
722 None,
723 )
724 .await
725 .ok();
726 return Err(Status::invalid_argument(error_msg.to_string()));
727 }
728
729 let destination = if RequestData.destination_path.is_empty() {
731 self.UpdateManager
733 .GetCacheDirectory()
734 .join("update-latest.bin")
735 .to_string_lossy()
736 .to_string()
737 } else {
738 let dest_path = std::path::Path::new(&RequestData.destination_path);
740 if let Some(parent) = dest_path.parent() {
741 if parent.as_os_str().is_empty() {
742 self.UpdateManager
744 .GetCacheDirectory()
745 .join(&RequestData.destination_path)
746 .to_string_lossy()
747 .to_string()
748 } else {
749 RequestData.destination_path.clone()
751 }
752 } else {
753 RequestData.destination_path.clone()
754 }
755 };
756
757 let dest_path = std::path::Path::new(&destination);
759 if let Some(parent) = dest_path.parent() {
760 if !parent.exists() {
761 return Err(Status::failed_precondition(format!(
762 "Destination directory does not exist: {}",
763 parent.display()
764 )));
765 }
766
767 if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
769 let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
770 self.AppState
771 .UpdateRequestStatus(
772 &request_id,
773 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
774 None,
775 )
776 .await
777 .ok();
778 return Err(Status::permission_denied(error_msg.to_string()));
779 }
780 let _ = std::fs::remove_file(parent.join(".write_test"));
782 }
783
784 let download_result = self
787 .DownloadManager
788 .DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
789 .await;
790
791 match download_result {
792 Ok(result) => {
793 self.AppState
794 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
795 .await
796 .ok();
797
798 info!(
799 "[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
800 result.path, result.size, result.checksum
801 );
802
803 Ok(Response::new(DownloadResponse {
804 request_id,
805 success:true,
806 file_path:result.path,
807 file_size:result.size,
808 checksum:result.checksum,
809 error:String::new(),
810 }))
811 },
812 Err(crate::AirError::Network(e)) => {
813 self.AppState
814 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
815 .await
816 .ok();
817 error!("[AirVinegRPCService] Download update network error: {}", e);
818 Err(Status::unavailable(e))
819 },
820 Err(crate::AirError::FileSystem(e)) => {
821 self.AppState
822 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
823 .await
824 .ok();
825 error!("[AirVinegRPCService] Download update filesystem error: {}", e);
826 Err(Status::internal(e))
827 },
828 Err(e) => {
829 self.AppState
830 .UpdateRequestStatus(
831 &request_id,
832 crate::ApplicationState::RequestState::Failed(e.to_string()),
833 None,
834 )
835 .await
836 .ok();
837 error!("[AirVinegRPCService] Download update failed: {}", e);
838 Ok(Response::new(DownloadResponse {
839 request_id,
840 success:false,
841 file_path:String::new(),
842 file_size:0,
843 checksum:String::new(),
844 error:e.to_string(),
845 }))
846 },
847 }
848 }
849
850 async fn apply_update(
852 &self,
853 request:Request<ApplyUpdateRequest>,
854 ) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
855 let RequestData = request.into_inner();
856 let request_id = RequestData.request_id.clone();
857
858 info!(
859 "[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
860 request_id, RequestData.version, RequestData.update_path
861 );
862
863 self.AppState
864 .RegisterRequest(request_id.clone(), "apply_update".to_string())
865 .await
866 .map_err(|e| Status::internal(e.to_string()))?;
867
868 if RequestData.version.is_empty() {
870 let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
871 self.AppState
872 .UpdateRequestStatus(
873 &request_id,
874 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
875 None,
876 )
877 .await
878 .ok();
879 return Err(Status::invalid_argument(error_msg.to_string()));
880 }
881
882 if RequestData.update_path.is_empty() {
884 let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
885 self.AppState
886 .UpdateRequestStatus(
887 &request_id,
888 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
889 None,
890 )
891 .await
892 .ok();
893 return Err(Status::invalid_argument(error_msg.to_string()));
894 }
895
896 let update_path = std::path::Path::new(&RequestData.update_path);
897
898 if !update_path.exists() {
900 let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
901 self.AppState
902 .UpdateRequestStatus(
903 &request_id,
904 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
905 None,
906 )
907 .await
908 .ok();
909 return Err(Status::not_found(error_msg.to_string()));
910 }
911
912 let metadata = match std::fs::metadata(update_path) {
914 Ok(m) => m,
915 Err(e) => {
916 let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
917 self.AppState
918 .UpdateRequestStatus(
919 &request_id,
920 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
921 None,
922 )
923 .await
924 .ok();
925 return Err(Status::internal(error_msg.to_string()));
926 },
927 };
928
929 if metadata.len() == 0 {
930 let error_msg = crate::AirError::Validation("Update file is empty".to_string());
931 self.AppState
932 .UpdateRequestStatus(
933 &request_id,
934 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
935 None,
936 )
937 .await
938 .ok();
939 return Err(Status::failed_precondition(error_msg.to_string()));
940 }
941
942 let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
944 if let Err(ref e) = rollback_backup_path {
945 warn!(
946 "[AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback capability.",
947 e
948 );
949 }
950
951 match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
953 Ok(true) => {
954 info!("[AirVinegRPCService] Update verification successful, preparing for installation");
955
956 self.AppState
957 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
958 .await
959 .ok();
960
961 let AppState = self.AppState.clone();
963 let version = RequestData.version.clone();
964 let self_clone = self.clone();
965
966 tokio::spawn(async move {
967 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
968 log::info!(
969 "[AirVinegRPCService] Initiating graceful shutdown for update version {}",
970 version
971 );
972
973 if let Err(e) = AppState.StopAllBackgroundTasks().await {
975 log::error!("[AirVinegRPCService] Failed to initiate graceful shutdown: {}", e);
976
977 log::warn!("[AirVinegRPCService] Rollback initiated due to graceful shutdown failure");
979 if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
980 log::error!("[AirVinegRPCService] Rollback failed: {}", rollback_error);
981 } else {
982 log::info!("[AirVinegRPCService] Rollback completed successfully");
983 }
984 }
985 });
986
987 Ok(Response::new(ApplyUpdateResponse {
988 request_id,
989 success:true,
990 error:String::new(),
991 }))
992 },
993 Ok(false) => {
994 let error_msg = "Update verification failed: checksum mismatch".to_string();
995 self.AppState
996 .UpdateRequestStatus(
997 &request_id,
998 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
999 None,
1000 )
1001 .await
1002 .ok();
1003 error!("[AirVinegRPCService] {}", error_msg);
1004
1005 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1007
1008 Err(Status::failed_precondition(error_msg))
1009 },
1010 Err(crate::AirError::FileSystem(e)) => {
1011 self.AppState
1012 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1013 .await
1014 .ok();
1015 error!("[AirVinegRPCService] Update verification filesystem error: {}", e);
1016
1017 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1019
1020 Err(Status::internal(e))
1021 },
1022 Err(e) => {
1023 self.AppState
1024 .UpdateRequestStatus(
1025 &request_id,
1026 crate::ApplicationState::RequestState::Failed(e.to_string()),
1027 None,
1028 )
1029 .await
1030 .ok();
1031 error!("[AirVinegRPCService] Update verification error: {}", e);
1032
1033 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1035
1036 Ok(Response::new(ApplyUpdateResponse {
1037 request_id,
1038 success:false,
1039 error:e.to_string(),
1040 }))
1041 },
1042 }
1043 }
1044
1045 type DownloadStreamStream =
1050 tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1051
1052 async fn download_stream(
1053 &self,
1054 request:Request<DownloadStreamRequest>,
1055 ) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1056 let RequestData = request.into_inner();
1057 let request_id = RequestData.request_id.clone();
1058
1059 info!(
1060 "[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1061 request_id, RequestData.url
1062 );
1063
1064 self.AppState
1065 .RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1066 .await
1067 .map_err(|e| Status::internal(e.to_string()))?;
1068
1069 if RequestData.url.is_empty() {
1071 let error_msg = "URL cannot be empty".to_string();
1072 self.AppState
1073 .UpdateRequestStatus(
1074 &request_id,
1075 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1076 None,
1077 )
1078 .await
1079 .ok();
1080 return Err(Status::invalid_argument(error_msg));
1081 }
1082
1083 if !match_url_scheme(&RequestData.url) {
1085 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1086 self.AppState
1087 .UpdateRequestStatus(
1088 &request_id,
1089 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1090 None,
1091 )
1092 .await
1093 .ok();
1094 return Err(Status::invalid_argument(error_msg));
1095 }
1096
1097 match self.validate_range_support(&RequestData.url).await {
1099 Ok(true) => {
1100 debug!("[AirVinegRPCService] URL supports range headers");
1101 },
1102 Ok(false) => {
1103 warn!("[AirVinegRPCService] URL does not support range headers, streaming may be inefficient");
1104 },
1105 Err(e) => {
1106 let error_msg = format!("Failed to validate range support: {}", e);
1107 self.AppState
1108 .UpdateRequestStatus(
1109 &request_id,
1110 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1111 None,
1112 )
1113 .await
1114 .ok();
1115 return Err(Status::internal(error_msg));
1116 },
1117 }
1118
1119 let (tx, rx) = tokio::sync::mpsc::channel(100);
1121
1122 let chunk_size = 8 * 1024 * 1024; let url = RequestData.url.clone();
1127 let headers = RequestData.headers;
1128 let download_request_id = request_id.clone();
1129 let download_manager = self.DownloadManager.clone();
1130 let AppState = self.AppState.clone();
1131
1132 tokio::spawn(async move {
1134 if tx
1136 .send(Ok(DownloadStreamResponse {
1137 request_id:download_request_id.clone(),
1138 chunk:vec![].into(),
1139 total_size:0,
1140 downloaded:0,
1141 completed:false,
1142 error:String::new(),
1143 }))
1144 .await
1145 .is_err()
1146 {
1147 log::warn!(
1148 "[AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1149 download_request_id
1150 );
1151 return;
1152 }
1153
1154 let client = reqwest::Client::builder()
1156 .pool_idle_timeout(std::time::Duration::from_secs(60))
1157 .pool_max_idle_per_host(5)
1158 .timeout(std::time::Duration::from_secs(300))
1159 .build();
1160
1161 if client.is_err() {
1162 let error = client.unwrap_err().to_string();
1163 let _ = tx
1164 .send(Ok(DownloadStreamResponse {
1165 request_id:download_request_id.clone(),
1166 chunk:vec![].into(),
1167 total_size:0,
1168 downloaded:0,
1169 completed:false,
1170 error:error.clone(),
1171 }))
1172 .await;
1173 AppState
1174 .UpdateRequestStatus(
1175 &download_request_id,
1176 crate::ApplicationState::RequestState::Failed(error),
1177 None,
1178 )
1179 .await
1180 .ok();
1181 return;
1182 }
1183
1184 let client = match client {
1185 Ok(client) => client,
1186 Err(e) => {
1187 let error = format!("Failed to create HTTP client: {}", e);
1188 let _ = tx.send(Err(Status::internal(error.clone())));
1189 AppState
1190 .UpdateRequestStatus(
1191 &download_request_id,
1192 crate::ApplicationState::RequestState::Failed(error),
1193 None,
1194 )
1195 .await
1196 .ok();
1197 return;
1198 },
1199 };
1200
1201 let mut total_size:u64 = 0;
1203 let mut total_downloaded:u64 = 0;
1204
1205 match client
1206 .get(&url)
1207 .headers({
1208 let mut map = reqwest::header::HeaderMap::new();
1209 for (key, value) in headers {
1210 if let (Ok(header_name), Ok(header_value)) = (
1211 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1212 reqwest::header::HeaderValue::from_str(&value),
1213 ) {
1214 map.insert(header_name, header_value);
1215 }
1216 }
1217 map
1218 })
1219 .send()
1220 .await
1221 {
1222 Ok(response) => {
1223 if !response.status().is_success() {
1224 let error = format!("Download failed with status: {}", response.status());
1225 let _ = tx
1226 .send(Ok(DownloadStreamResponse {
1227 request_id:download_request_id.clone(),
1228 chunk:vec![].into(),
1229 total_size:0,
1230 downloaded:0,
1231 completed:false,
1232 error:error.clone(),
1233 }))
1234 .await;
1235 AppState
1236 .UpdateRequestStatus(
1237 &download_request_id,
1238 crate::ApplicationState::RequestState::Failed(error),
1239 None,
1240 )
1241 .await
1242 .ok();
1243 return;
1244 }
1245
1246 total_size = response.content_length().unwrap_or(0);
1247 let response_tx = tx.clone();
1248 let response_id = download_request_id.clone();
1249
1250 let mut stream = response.bytes_stream();
1252 let mut buffer = Vec::with_capacity(chunk_size);
1253 let mut last_progress:f32 = 0.0;
1254
1255 while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1256 if AppState.IsRequestCancelled(&download_request_id).await {
1257 log::info!(
1258 "[AirVinegRPCService] Download cancelled by client [ID: {}]",
1259 download_request_id
1260 );
1261 AppState
1262 .UpdateRequestStatus(
1263 &download_request_id,
1264 crate::ApplicationState::RequestState::Cancelled,
1265 None,
1266 )
1267 .await
1268 .ok();
1269 return;
1270 }
1271
1272 match chunk_result {
1273 Ok(chunk) => {
1274 buffer.extend_from_slice(&chunk);
1275 total_downloaded += chunk.len() as u64;
1276
1277 if buffer.len() >= chunk_size {
1279 let chunk_checksum = calculate_chunk_checksum(&buffer);
1281
1282 let progress = if total_size > 0 {
1284 (total_downloaded as f32 / total_size as f32) * 100.0
1285 } else {
1286 0.0
1287 };
1288
1289 if progress - last_progress >= 5.0 {
1291 AppState
1292 .UpdateRequestStatus(
1293 &download_request_id,
1294 crate::ApplicationState::RequestState::InProgress,
1295 Some(progress),
1296 )
1297 .await
1298 .ok();
1299 last_progress = progress;
1300 }
1301
1302 if response_tx
1303 .send(Ok(DownloadStreamResponse {
1304 request_id:response_id.clone(),
1305 chunk:buffer.clone().into(),
1306 total_size,
1307 downloaded:total_downloaded,
1308 completed:false,
1309 error:String::new(),
1310 }))
1311 .await
1312 .is_err()
1313 {
1314 log::warn!(
1315 "[AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1316 download_request_id
1317 );
1318 AppState
1319 .UpdateRequestStatus(
1320 &download_request_id,
1321 crate::ApplicationState::RequestState::Failed(
1322 "Client disconnected".to_string(),
1323 ),
1324 None,
1325 )
1326 .await
1327 .ok();
1328 return;
1329 }
1330
1331 debug!(
1332 "[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1333 buffer.len(),
1334 download_request_id,
1335 progress
1336 );
1337
1338 buffer.clear();
1339 }
1340 },
1341 Err(e) => {
1342 let error = format!("Download error: {}", e);
1343 log::error!(
1344 "[AirVinegRPCService] Stream download failed [ID: {}]: {}",
1345 download_request_id,
1346 error
1347 );
1348
1349 let _ = response_tx
1350 .send(Ok(DownloadStreamResponse {
1351 request_id:response_id.clone(),
1352 chunk:vec![].into(),
1353 total_size,
1354 downloaded:total_downloaded,
1355 completed:false,
1356 error:error.clone(),
1357 }))
1358 .await;
1359
1360 AppState
1361 .UpdateRequestStatus(
1362 &download_request_id,
1363 crate::ApplicationState::RequestState::Failed(error),
1364 None,
1365 )
1366 .await
1367 .ok();
1368 return;
1369 },
1370 }
1371 }
1372
1373 if !buffer.is_empty() {
1375 let chunk_checksum = calculate_chunk_checksum(&buffer);
1376
1377 if tx
1378 .send(Ok(DownloadStreamResponse {
1379 request_id:download_request_id.clone(),
1380 chunk:buffer.into(),
1381 total_size,
1382 downloaded:total_downloaded,
1383 completed:false,
1384 error:String::new(),
1385 }))
1386 .await
1387 .is_err()
1388 {
1389 log::warn!(
1390 "[AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1391 download_request_id
1392 );
1393 return;
1394 }
1395 }
1396
1397 AppState
1399 .UpdateRequestStatus(
1400 &download_request_id,
1401 crate::ApplicationState::RequestState::Completed,
1402 Some(100.0),
1403 )
1404 .await
1405 .ok();
1406
1407 let _ = tx
1408 .send(Ok(DownloadStreamResponse {
1409 request_id,
1410 chunk:vec![].into(),
1411 total_size,
1412 downloaded:total_downloaded,
1413 completed:true,
1414 error:String::new(),
1415 }))
1416 .await;
1417
1418 info!(
1419 "[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1420 download_request_id, total_downloaded
1421 );
1422 },
1423 Err(e) => {
1424 let error = format!("Failed to start streaming download: {}", e);
1425 log::error!(
1426 "[AirVinegRPCService] Stream download error [ID: {}]: {}",
1427 download_request_id,
1428 error
1429 );
1430
1431 let _ = tx
1432 .send(Ok(DownloadStreamResponse {
1433 request_id:download_request_id.clone(),
1434 chunk:vec![].into(),
1435 total_size:0,
1436 downloaded:0,
1437 completed:false,
1438 error:error.clone(),
1439 }))
1440 .await;
1441
1442 AppState
1443 .UpdateRequestStatus(
1444 &download_request_id,
1445 crate::ApplicationState::RequestState::Failed(error),
1446 None,
1447 )
1448 .await
1449 .ok();
1450 },
1451 }
1452 });
1453
1454 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1455 }
1456
1457 async fn search_files(
1461 &self,
1462 request:Request<SearchRequest>,
1463 ) -> std::result::Result<Response<SearchResponse>, Status> {
1464 let RequestData = request.into_inner();
1465 let request_id = RequestData.request_id.clone();
1466
1467 debug!(
1468 "[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1469 RequestData.query, RequestData.path
1470 );
1471
1472 if RequestData.query.is_empty() {
1474 return Ok(Response::new(SearchResponse {
1475 request_id,
1476 results:vec![],
1477 total_results:0,
1478 error:"Search query cannot be empty".to_string(),
1479 }));
1480 }
1481
1482 let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1484 let search_path = path.as_deref();
1485
1486 match self
1487 .FileIndexer
1488 .SearchFiles(
1489 SearchQuery {
1490 query:RequestData.query.clone(),
1491 mode:SearchMode::Literal,
1492 case_sensitive:false,
1493 whole_word:false,
1494 regex:None,
1495 max_results:RequestData.max_results,
1496 page:1,
1497 },
1498 path,
1499 None,
1500 )
1501 .await
1502 {
1503 Ok(search_results) => {
1504 let mut file_results = Vec::new();
1506 for r in search_results {
1507 let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1509 (first_match.line_content.clone(), first_match.line_number)
1510 } else {
1511 (String::new(), 0)
1512 };
1513
1514 let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1516 metadata.size
1517 } else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1518 file_metadata.len()
1519 } else {
1520 0
1521 };
1522
1523 file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1524 }
1525
1526 info!("[AirVinegRPCService] Search completed: {} results found", file_results.len());
1527
1528 let result_count = file_results.len();
1529 Ok(Response::new(SearchResponse {
1530 request_id,
1531 results:file_results,
1532 total_results:result_count as u32,
1533 error:String::new(),
1534 }))
1535 },
1536 Err(e) => {
1537 error!("[AirVinegRPCService] Search failed: {}", e);
1538 Ok(Response::new(SearchResponse {
1539 request_id,
1540 results:vec![],
1541 total_results:0,
1542 error:e.to_string(),
1543 }))
1544 },
1545 }
1546 }
1547
1548 async fn get_file_info(
1550 &self,
1551 request:Request<FileInfoRequest>,
1552 ) -> std::result::Result<Response<FileInfoResponse>, Status> {
1553 let RequestData = request.into_inner();
1554 let request_id = RequestData.request_id.clone();
1555
1556 debug!("[AirVinegRPCService] Get file info request: {}", RequestData.path);
1557
1558 if RequestData.path.is_empty() {
1560 return Ok(Response::new(FileInfoResponse {
1561 request_id,
1562 exists:false,
1563 size:0,
1564 mime_type:String::new(),
1565 checksum:String::new(),
1566 modified_time:0,
1567 error:"Path cannot be empty".to_string(),
1568 }));
1569 }
1570
1571 use std::path::Path;
1573 let path = Path::new(&RequestData.path);
1574
1575 if !path.exists() {
1576 return Ok(Response::new(FileInfoResponse {
1577 request_id,
1578 exists:false,
1579 size:0,
1580 mime_type:String::new(),
1581 checksum:String::new(),
1582 modified_time:0,
1583 error:String::new(), }));
1585 }
1586
1587 match std::fs::metadata(path) {
1589 Ok(metadata) => {
1590 let modified_time = metadata
1591 .modified()
1592 .ok()
1593 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1594 .map(|d| d.as_secs())
1595 .unwrap_or(0);
1596
1597 let mime_type = self.detect_mime_type(path);
1599
1600 let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1602 log::warn!("[AirVinegRPCService] Failed to calculate checksum: {}", e);
1603 String::new()
1604 });
1605
1606 Ok(Response::new(FileInfoResponse {
1607 request_id,
1608 exists:true,
1609 size:metadata.len(),
1610 mime_type,
1611 checksum,
1612 modified_time,
1613 error:String::new(),
1614 }))
1615 },
1616 Err(e) => {
1617 error!("[AirVinegRPCService] Failed to get file metadata: {}", e);
1618 Ok(Response::new(FileInfoResponse {
1619 request_id,
1620 exists:false,
1621 size:0,
1622 mime_type:String::new(),
1623 checksum:String::new(),
1624 modified_time:0,
1625 error:e.to_string(),
1626 }))
1627 },
1628 }
1629 }
1630
1631 async fn get_metrics(
1635 &self,
1636 request:Request<MetricsRequest>,
1637 ) -> std::result::Result<Response<MetricsResponse>, Status> {
1638 let RequestData = request.into_inner();
1639 let request_id = RequestData.request_id.clone();
1640
1641 debug!("[AirVinegRPCService] Get metrics request: type='{}'", RequestData.metric_type);
1642
1643 let metrics = self.AppState.GetMetrics().await;
1644 let mut metrics_map = std::collections::HashMap::new();
1645
1646 if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1648 metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1649 metrics_map.insert("total_requests".to_string(), metrics.TotalRequests.to_string());
1650 metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequests.to_string());
1651 metrics_map.insert("failed_requests".to_string(), metrics.FailedRequests.to_string());
1652 metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1653 }
1654
1655 if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1657 metrics_map.insert(
1658 "ActiveRequests".to_string(),
1659 self.AppState.GetActiveRequestCount().await.to_string(),
1660 );
1661 }
1662
1663 Ok(Response::new(MetricsResponse {
1664 request_id,
1665 metrics:metrics_map,
1666 error:String::new(),
1667 }))
1668 }
1669
1670 async fn get_resource_usage(
1672 &self,
1673 request:Request<ResourceUsageRequest>,
1674 ) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1675 let RequestData = request.into_inner();
1676 let request_id = RequestData.request_id.clone();
1677
1678 debug!("[AirVinegRPCService] Get resource usage request");
1679
1680 let resources = self.AppState.GetResourceUsage().await;
1681
1682 Ok(Response::new(ResourceUsageResponse {
1683 request_id,
1684 memory_usage_mb:resources.MemoryUsageMb,
1685 cpu_usage_percent:resources.CPUUsagePercent,
1686 disk_usage_mb:resources.DiskUsageMb,
1687 network_usage_mbps:resources.NetworkUsageMbps,
1688 error:String::new(),
1689 }))
1690 }
1691
1692 async fn set_resource_limits(
1694 &self,
1695 request:Request<ResourceLimitsRequest>,
1696 ) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1697 let RequestData = request.into_inner();
1698 let request_id = RequestData.request_id.clone();
1699
1700 info!(
1701 "[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1702 RequestData.memory_limit_mb, RequestData.cpu_limit_percent, RequestData.disk_limit_mb
1703 );
1704
1705 if RequestData.memory_limit_mb == 0 {
1707 return Ok(Response::new(ResourceLimitsResponse {
1708 request_id,
1709 success:false,
1710 error:"Memory limit must be greater than 0".to_string(),
1711 }));
1712 }
1713
1714 if RequestData.cpu_limit_percent > 100 {
1715 return Ok(Response::new(ResourceLimitsResponse {
1716 request_id,
1717 success:false,
1718 error:"CPU limit cannot exceed 100%".to_string(),
1719 }));
1720 }
1721
1722 let result = self
1724 .AppState
1725 .SetResourceLimits(
1726 Some(RequestData.memory_limit_mb as u64),
1727 Some(RequestData.cpu_limit_percent as f64),
1728 Some(RequestData.disk_limit_mb as u64),
1729 )
1730 .await;
1731
1732 match result {
1733 Ok(_) => {
1734 Ok(Response::new(ResourceLimitsResponse {
1735 request_id,
1736 success:true,
1737 error:String::new(),
1738 }))
1739 },
1740 Err(e) => {
1741 Ok(Response::new(ResourceLimitsResponse {
1742 request_id,
1743 success:false,
1744 error:e.to_string(),
1745 }))
1746 },
1747 }
1748 }
1749
1750 async fn get_configuration(
1754 &self,
1755 request:Request<ConfigurationRequest>,
1756 ) -> std::result::Result<Response<ConfigurationResponse>, Status> {
1757 let RequestData = request.into_inner();
1758 let request_id = RequestData.request_id.clone();
1759
1760 debug!(
1761 "[AirVinegRPCService] Get configuration request: section='{}'",
1762 RequestData.section
1763 );
1764
1765 let config = self.AppState.GetConfiguration().await;
1767 let mut config_map = std::collections::HashMap::new();
1768
1769 match RequestData.section.as_str() {
1771 "grpc" => {
1772 config_map.insert("bind_address".to_string(), config.Grpc.BindAddress.clone());
1773 config_map.insert("max_connections".to_string(), config.Grpc.MaxConnections.to_string());
1774 config_map.insert("request_timeout_secs".to_string(), config.Grpc.RequestTimeoutSecs.to_string());
1775 },
1776 "authentication" => {
1777 config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
1778 config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
1779 config_map.insert(
1780 "token_expiration_hours".to_string(),
1781 config.Authentication.TokenExpirationHours.to_string(),
1782 );
1783 },
1784 "updates" => {
1785 config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
1786 config_map.insert(
1787 "check_interval_hours".to_string(),
1788 config.Updates.CheckIntervalHours.to_string(),
1789 );
1790 config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
1791 config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
1792 config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
1793 },
1794 "downloader" => {
1795 config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
1796 config_map.insert(
1797 "max_concurrent_downloads".to_string(),
1798 config.Downloader.MaxConcurrentDownloads.to_string(),
1799 );
1800 config_map.insert(
1801 "download_timeout_secs".to_string(),
1802 config.Downloader.DownloadTimeoutSecs.to_string(),
1803 );
1804 config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
1805 config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
1806 },
1807 "indexing" => {
1808 config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
1809 config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
1810 config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
1811 config_map.insert(
1812 "update_interval_minutes".to_string(),
1813 config.Indexing.UpdateIntervalMinutes.to_string(),
1814 );
1815 config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
1816 },
1817 _ => {
1818 config_map.insert("_grpc_enabled".to_string(), "true".to_string());
1820 },
1821 }
1822
1823 Ok(Response::new(ConfigurationResponse {
1824 request_id,
1825 configuration:config_map,
1826 error:String::new(),
1827 }))
1828 }
1829
1830 async fn update_configuration(
1832 &self,
1833 request:Request<UpdateConfigurationRequest>,
1834 ) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
1835 let RequestData = request.into_inner();
1836 let request_id = RequestData.request_id.clone();
1837
1838 info!(
1839 "[AirVinegRPCService] Update configuration request: section='{}'",
1840 RequestData.section
1841 );
1842
1843 if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
1845 {
1846 return Ok(Response::new(UpdateConfigurationResponse {
1847 request_id,
1848 success:false,
1849 error:"Invalid configuration section".to_string(),
1850 }));
1851 }
1852
1853 let result = self
1855 .AppState
1856 .UpdateConfiguration(RequestData.section, RequestData.updates)
1857 .await;
1858
1859 match result {
1860 Ok(_) => {
1861 Ok(Response::new(UpdateConfigurationResponse {
1862 request_id,
1863 success:true,
1864 error:String::new(),
1865 }))
1866 },
1867 Err(e) => {
1868 Ok(Response::new(UpdateConfigurationResponse {
1869 request_id,
1870 success:false,
1871 error:e.to_string(),
1872 }))
1873 },
1874 }
1875 }
1876}
1877
1878impl AirVinegRPCService {
1881 fn detect_mime_type(&self, path:&std::path::Path) -> String {
1883 match path.extension().and_then(|e| e.to_str()) {
1884 Some("rs") => "text/x-rust".to_string(),
1885 Some("ts") => "application/typescript".to_string(),
1886 Some("js") => "application/javascript".to_string(),
1887 Some("json") => "application/json".to_string(),
1888 Some("toml") => "application/toml".to_string(),
1889 Some("md") => "text/markdown".to_string(),
1890 Some("txt") => "text/plain".to_string(),
1891 Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
1892 Some("html") => "text/html".to_string(),
1893 Some("css") => "text/css".to_string(),
1894 Some("xml") => "application/xml".to_string(),
1895 Some("png") => "image/png".to_string(),
1896 Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
1897 Some("gif") => "image/gif".to_string(),
1898 Some("svg") => "image/svg+xml".to_string(),
1899 Some("pdf") => "application/pdf".to_string(),
1900 Some("zip") => "application/zip".to_string(),
1901 Some("tar") | Some("gz") => "application/x-tar".to_string(),
1902 Some("proto") => "application/x-protobuf".to_string(),
1903 _ => "application/octet-stream".to_string(),
1904 }
1905 }
1906
1907 async fn download_file_with_retry(
1910 &self,
1911 request_id:&str,
1912 url:String,
1913 DestinationPath:String,
1914 checksum:String,
1915 progress_callback:Option<Box<dyn Fn(f32) + Send>>,
1916 ) -> Result<crate::Downloader::DownloadResult> {
1917 let config = &self.AppState.Configuration.Downloader;
1918 let mut retries = 0;
1919
1920 loop {
1921 match self
1922 .DownloadManager
1923 .DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
1924 .await
1925 {
1926 Ok(file_info) => {
1927 if let Some(ref callback) = progress_callback {
1928 callback(100.0);
1929 }
1930 return Ok(file_info);
1931 },
1932 Err(e) => {
1933 if retries < config.MaxRetries as usize {
1934 retries += 1;
1935 let backoff_secs = 2u64.pow(retries as u32);
1936 warn!(
1937 "[AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - Backing \
1938 off {} seconds",
1939 request_id, retries, config.MaxRetries, e, backoff_secs
1940 );
1941
1942 if let Some(ref callback) = progress_callback {
1943 let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
1945 callback(progress);
1946 }
1947
1948 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
1949 } else {
1950 error!(
1951 "[AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
1952 config.MaxRetries, request_id, e
1953 );
1954 return Err(e);
1955 }
1956 },
1957 }
1958 }
1959 }
1960
1961 async fn validate_range_support(&self, url:&str) -> Result<bool> {
1963 let client = reqwest::Client::builder()
1964 .timeout(std::time::Duration::from_secs(10))
1965 .build()
1966 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
1967
1968 let response = client
1969 .head(url)
1970 .send()
1971 .await
1972 .map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
1973
1974 let accepts_ranges = response
1976 .headers()
1977 .get("accept-ranges")
1978 .map(|v| v.to_str().unwrap_or("none"))
1979 .unwrap_or("none");
1980
1981 Ok(accepts_ranges == "bytes")
1982 }
1983
1984 async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
1986 let cache_dir = self.UpdateManager.GetCacheDirectory();
1987 let rollback_dir = cache_dir.join("rollback");
1988
1989 if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
1991 return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
1992 }
1993
1994 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
1996 let marker_content = format!(
1997 "version={}\ntimestamp={}\nrollback_available=true",
1998 version,
1999 chrono::Utc::now().to_rfc3339()
2000 );
2001
2002 if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2003 return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2004 }
2005
2006 info!(
2007 "[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2008 version, backup_file
2009 );
2010
2011 Ok(())
2012 }
2013
2014 async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2016 let cache_dir = self.UpdateManager.GetCacheDirectory();
2017 let rollback_dir = cache_dir.join("rollback");
2018 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2019
2020 if backup_file.exists() {
2021 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2022 return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2023 }
2024 info!("[AirVinegRPCService] Rollback backup cleaned up for version {}", version);
2025 }
2026
2027 Ok(())
2028 }
2029
2030 async fn perform_rollback(&self, version:&str) -> Result<()> {
2032 let cache_dir = self.UpdateManager.GetCacheDirectory();
2033 let rollback_dir = cache_dir.join("rollback");
2034 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2035
2036 if !backup_file.exists() {
2037 return Err(AirError::FileSystem(format!(
2038 "Rollback backup not found for version {}",
2039 version
2040 )));
2041 }
2042
2043 log::info!("[AirVinegRPCService] Starting rollback for version {}", version);
2044
2045 let marker_content = tokio::fs::read_to_string(&backup_file)
2047 .await
2048 .map_err(|e| format!("Failed to read backup marker: {}", e))?;
2049
2050 let mut timestamp = None;
2052 let mut rollback_available = false;
2053
2054 for line in marker_content.lines() {
2055 if let Some(value) = line.strip_prefix("timestamp=") {
2056 timestamp = Some(value.to_string());
2057 } else if line == "rollback_available=true" {
2058 rollback_available = true;
2059 }
2060 }
2061
2062 if !rollback_available {
2063 return Err(AirError::Validation("Rollback not available for this version".to_string()));
2064 }
2065
2066 log::info!(
2073 "[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2074 version,
2075 timestamp
2076 );
2077
2078 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2080 log::warn!("[AirVinegRPCService] Failed to cleanup backup marker after rollback: {}", e);
2081 }
2082
2083 Ok(())
2084 }
2085}
2086
2087fn match_url_scheme(url:&str) -> bool {
2089 url.to_lowercase().starts_with("http://") || url.to_lowercase().starts_with("https://")
2090}
2091
2092fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2094 use sha2::{Digest, Sha256};
2095 let mut hasher = Sha256::new();
2096 hasher.update(chunk);
2097 format!("{:x}", hasher.finalize())
2098}
2099
2100async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2102 use sha2::{Digest, Sha256};
2103 use tokio::io::AsyncReadExt;
2104
2105 let mut file = tokio::fs::File::open(path)
2106 .await
2107 .map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2108
2109 let mut hasher = Sha256::new();
2110 let mut buffer = vec![0u8; 8192];
2111
2112 loop {
2113 let bytes_read = file
2114 .read(&mut buffer)
2115 .await
2116 .map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2117
2118 if bytes_read == 0 {
2119 break;
2120 }
2121
2122 hasher.update(&buffer[..bytes_read]);
2123 }
2124
2125 let result = hasher.finalize();
2126 Ok(format!("{:x}", result))
2127}