use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};

use tokio::sync::{Mutex, RwLock, broadcast};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
    Transient,

    NonRetryable,

    RateLimited,

    ServerError,

    Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    pub MaxRetries:u32,

    pub InitialIntervalMs:u64,

    pub MaxIntervalMs:u64,

    pub BackoffMultiplier:f64,

    pub JitterFactor:f64,

    pub BudgetPerMinute:u32,

    pub ErrorClassification:HashMap<String, ErrorClass>,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        let mut ErrorClassification = HashMap::new();

        ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);

        ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);

        ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);

        ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);

        ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);

        ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);

        ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);

        ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);

        ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);

        ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);

        ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);

        Self {
            MaxRetries:3,

            InitialIntervalMs:1000,

            MaxIntervalMs:32000,

            BackoffMultiplier:2.0,

            JitterFactor:0.1,

            BudgetPerMinute:100,

            ErrorClassification,
        }
    }
}

#[derive(Debug, Clone)]
struct RetryBudget {
    Attempts:Vec<Instant>,

    MaxPerMinute:u32,
}

impl RetryBudget {
    fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

    fn can_retry(&mut self) -> bool {
        let Now = Instant::now();

        let OneMinuteAgo = Now - Duration::from_secs(60);

        self.Attempts.retain(|&attempt| attempt > OneMinuteAgo);

        if self.Attempts.len() < self.MaxPerMinute as usize {
            self.Attempts.push(Now);

            true
        } else {
            false
        }
    }
}

pub struct RetryManager {
    Policy:RetryPolicy,

    Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,

    EventTx:Arc<broadcast::Sender<RetryEvent>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
    pub Service:String,

    pub Attempt:u32,

    pub ErrorClass:ErrorClass,

    pub DelayMs:u64,

    pub Success:bool,

    pub ErrorMessage:Option<String>,
}

impl RetryManager {
    pub fn new(policy:RetryPolicy) -> Self {
        let (EventTx, _) = broadcast::channel(1000);

        Self {
            Policy:policy,

            Budgets:Arc::new(Mutex::new(HashMap::new())),

            EventTx:Arc::new(EventTx),
        }
    }

    pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }

    pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
        if Attempt == 0 {
            return Duration::from_millis(0);
        }

        let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
            .min(self.Policy.MaxIntervalMs as f64) as u64;

        let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;

        let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;

        let FinalDelay = BaseDelay + RandomJitter;

        Duration::from_millis(FinalDelay)
    }
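
    // Worked example (a sketch, using the default policy: InitialIntervalMs = 1000,
    // BackoffMultiplier = 2.0, MaxIntervalMs = 32000, JitterFactor = 0.1):
    //
    //   attempt 1 -> base 1000 * 2^0 = 1000 ms
    //   attempt 2 -> base 1000 * 2^1 = 2000 ms
    //   attempt 3 -> base 1000 * 2^2 = 4000 ms
    //   attempt 6 -> base 1000 * 2^5 = 32000 ms (hits the MaxIntervalMs cap)
    //
    // Each value then gains up to 10% random jitter, e.g. attempt 3 sleeps somewhere
    // in [4000, 4400) ms.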

    pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
        let ErrorClass = self
            .Policy
            .ErrorClassification
            .get(ErrorType)
            .copied()
            .unwrap_or(ErrorClass::Unknown);

        match ErrorClass {
            ErrorClass::RateLimited => {
                let delay = (attempt + 1) * 5000;

                Duration::from_millis(delay as u64)
            },

            ErrorClass::ServerError => {
                let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);

                Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
            },

            ErrorClass::Transient => {
                self.CalculateRetryDelay(attempt)
            },

            ErrorClass::NonRetryable | ErrorClass::Unknown => {
                Duration::from_millis(100)
            },
        }
    }

    pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
        let ErrorLower = ErrorMessage.to_lowercase();

        for (pattern, class) in &self.Policy.ErrorClassification {
            if ErrorLower.contains(pattern) {
                return *class;
            }
        }

        ErrorClass::Unknown
    }

    pub async fn CanRetry(&self, service:&str) -> bool {
        let mut budgets = self.Budgets.lock().await;

        let budget = budgets
            .entry(service.to_string())
            .or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));

        budget.can_retry()
    }

    pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }

    pub fn ValidatePolicy(&self) -> Result<(), String> {
        if self.Policy.MaxRetries == 0 {
            return Err("MaxRetries must be greater than 0".to_string());
        }

        if self.Policy.InitialIntervalMs == 0 {
            return Err("InitialIntervalMs must be greater than 0".to_string());
        }

        if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
            return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
        }

        if self.Policy.BackoffMultiplier <= 1.0 {
            return Err("BackoffMultiplier must be greater than 1.0".to_string());
        }

        if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
            return Err("JitterFactor must be between 0 and 1".to_string());
        }

        if self.Policy.BudgetPerMinute == 0 {
            return Err("BudgetPerMinute must be greater than 0".to_string());
        }

        Ok(())
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
    Closed,

    Open,

    HalfOpen,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    pub FailureThreshold:u32,

    pub SuccessThreshold:u32,

    pub TimeoutSecs:u64,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitEvent {
    pub name:String,

    pub FromState:CircuitState,

    pub ToState:CircuitState,

    pub timestamp:u64,

    pub reason:String,
}

pub struct CircuitBreaker {
    Name:String,

    State:Arc<RwLock<CircuitState>>,

    Config:CircuitBreakerConfig,

    FailureCount:Arc<RwLock<u32>>,

    SuccessCount:Arc<RwLock<u32>>,

    LastFailureTime:Arc<RwLock<Option<Instant>>>,

    EventTx:Arc<broadcast::Sender<CircuitEvent>>,

    StateTransitionCounter:Arc<RwLock<u32>>,
}

impl CircuitBreaker {
    pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
        let (EventTx, _) = broadcast::channel(1000);

        Self {
            Name:name.clone(),

            State:Arc::new(RwLock::new(CircuitState::Closed)),

            Config,

            FailureCount:Arc::new(RwLock::new(0)),

            SuccessCount:Arc::new(RwLock::new(0)),

            LastFailureTime:Arc::new(RwLock::new(None)),

            EventTx:Arc::new(EventTx),

            StateTransitionCounter:Arc::new(RwLock::new(0)),
        }
    }

    pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }

    pub async fn GetState(&self) -> CircuitState { *self.State.read().await }

    pub async fn ValidateState(&self) -> Result<(), String> {
        let state = *self.State.read().await;

        let failures = *self.FailureCount.read().await;

        let successes = *self.SuccessCount.read().await;

        match state {
            CircuitState::Closed => {
                if successes != 0 {
                    return Err(format!("Inconsistent state: Closed but has {} successes", successes));
                }

                if failures >= self.Config.FailureThreshold {
                    log::warn!(
                        "[CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
                        failures,
                        self.Config.FailureThreshold
                    );
                }
            },

            CircuitState::Open => {
                if failures < self.Config.FailureThreshold {
                    log::warn!(
                        "[CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
                        failures,
                        self.Config.FailureThreshold
                    );
                }
            },

            CircuitState::HalfOpen => {
                if successes >= self.Config.SuccessThreshold {
                    return Err(format!(
                        "Inconsistent state: HalfOpen but has {} successes (should be Closed)",
                        successes
                    ));
                }
            },
        }

        Ok(())
    }

    async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
        let CurrentState = self.GetState().await;

        if CurrentState == NewState {
            return Ok(());
        }

        match (CurrentState, NewState) {
            (CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {},

            (CircuitState::Open, CircuitState::HalfOpen) => {},

            (CircuitState::HalfOpen, CircuitState::Closed) => {},

            _ => {
                return Err(format!(
                    "Invalid state transition from {:?} to {:?} for {}",
                    CurrentState, NewState, self.Name
                ));
            },
        }

        let event = CircuitEvent {
            name:self.Name.clone(),

            FromState:CurrentState,

            ToState:NewState,

            timestamp:crate::Utility::CurrentTimestamp(),

            reason:reason.to_string(),
        };

        let _ = self.EventTx.send(event);

        *self.State.write().await = NewState;

        *self.StateTransitionCounter.write().await += 1;

        log::info!(
            "[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
            self.Name,
            CurrentState,
            NewState,
            reason
        );

        self.ValidateState().await.map_err(|e| {
            log::error!("[CircuitBreaker] State validation failed after transition: {}", e);

            e
        })?;

        Ok(())
    }

    pub async fn RecordSuccess(&self) {
        let state = self.GetState().await;

        match state {
            CircuitState::Closed => {
                *self.FailureCount.write().await = 0;
            },

            CircuitState::HalfOpen => {
                // Take the count inside a short scope so the write lock is released before
                // TransitionState re-reads the counters in ValidateState (holding it across
                // that call would deadlock on the RwLock).
                let ThresholdReached = {
                    let mut SuccessCount = self.SuccessCount.write().await;

                    *SuccessCount += 1;

                    *SuccessCount >= self.Config.SuccessThreshold
                };

                if ThresholdReached {
                    *self.SuccessCount.write().await = 0;

                    *self.FailureCount.write().await = 0;

                    let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
                }
            },

            _ => {},
        }
    }

    pub async fn RecordFailure(&self) {
        let State = self.GetState().await;

        *self.LastFailureTime.write().await = Some(Instant::now());

        match State {
            CircuitState::Closed => {
                // Same pattern as RecordSuccess: drop the counter lock before transitioning.
                let ThresholdReached = {
                    let mut FailureCount = self.FailureCount.write().await;

                    *FailureCount += 1;

                    *FailureCount >= self.Config.FailureThreshold
                };

                if ThresholdReached {
                    *self.SuccessCount.write().await = 0;

                    let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
                }
            },

            CircuitState::HalfOpen => {
                let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;

                *self.SuccessCount.write().await = 0;
            },

            _ => {},
        }
    }

    pub async fn AttemptRecovery(&self) -> bool {
        let state = self.GetState().await;

        if state != CircuitState::Open {
            return state == CircuitState::HalfOpen;
        }

        if let Some(last_failure) = *self.LastFailureTime.read().await {
            if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
                let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;

                *self.SuccessCount.write().await = 0;

                return true;
            }
        }

        false
    }

    pub async fn GetStatistics(&self) -> CircuitStatistics {
        CircuitStatistics {
            Name:self.Name.clone(),

            State:self.GetState().await,

            Failures:*self.FailureCount.read().await,

            Successes:*self.SuccessCount.read().await,

            StateTransitions:*self.StateTransitionCounter.read().await,

            LastFailureTime:*self.LastFailureTime.read().await,
        }
    }

    pub fn ValidateConfig(config:&CircuitBreakerConfig) -> Result<(), String> {
        if config.FailureThreshold == 0 {
            return Err("FailureThreshold must be greater than 0".to_string());
        }

        if config.SuccessThreshold == 0 {
            return Err("SuccessThreshold must be greater than 0".to_string());
        }

        if config.TimeoutSecs == 0 {
            return Err("TimeoutSecs must be greater than 0".to_string());
        }

        Ok(())
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct CircuitStatistics {
    pub Name:String,

    pub State:CircuitState,

    pub Failures:u32,

    pub Successes:u32,

    pub StateTransitions:u32,

    #[serde(skip_serializing)]
    pub LastFailureTime:Option<Instant>,
}

impl<'de> Deserialize<'de> for CircuitStatistics {
    fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>, {
        use serde::de::{self, Visitor};

        struct CircuitStatisticsVisitor;

        impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
            type Value = CircuitStatistics;

            fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct CircuitStatistics")
            }

            fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
            where
                A: de::MapAccess<'de>, {
                let mut Name = None;

                let mut State = None;

                let mut Failures = None;

                let mut Successes = None;

                let mut StateTransitions = None;

                // Accept both the snake_case keys used by external producers and the field
                // names emitted by the derived Serialize impl, so a serialize/deserialize
                // round trip does not fail on key casing.
                while let Some(key) = map.next_key::<String>()? {
                    match key.as_str() {
                        "name" | "Name" => Name = Some(map.next_value()?),

                        "state" | "State" => State = Some(map.next_value()?),

                        "failures" | "Failures" => Failures = Some(map.next_value()?),

                        "successes" | "Successes" => Successes = Some(map.next_value()?),

                        "state_transitions" | "StateTransitions" => StateTransitions = Some(map.next_value()?),

                        _ => {
                            map.next_value::<de::IgnoredAny>()?;
                        },
                    }
                }

                Ok(CircuitStatistics {
                    Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,

                    State:State.ok_or_else(|| de::Error::missing_field("state"))?,

                    Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,

                    Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,

                    StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,

                    LastFailureTime:None,
                })
            }
        }

        const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];

        Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
    }
}

impl Clone for CircuitBreaker {
    fn clone(&self) -> Self {
        Self {
            Name:self.Name.clone(),

            State:self.State.clone(),

            Config:self.Config.clone(),

            FailureCount:self.FailureCount.clone(),

            SuccessCount:self.SuccessCount.clone(),

            LastFailureTime:self.LastFailureTime.clone(),

            EventTx:self.EventTx.clone(),

            StateTransitionCounter:self.StateTransitionCounter.clone(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
    pub max_concurrent:usize,

    pub max_queue:usize,

    pub timeout_secs:u64,
}

impl Default for BulkheadConfig {
    fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatistics {
    pub name:String,

    pub current_concurrent:u32,

    pub current_queue:u32,

    pub max_concurrent:usize,

    pub max_queue:usize,

    pub total_rejected:u64,

    pub total_completed:u64,

    pub total_timed_out:u64,
}

pub struct BulkheadExecutor {
    name:String,

    semaphore:Arc<tokio::sync::Semaphore>,

    config:BulkheadConfig,

    current_requests:Arc<RwLock<u32>>,

    queue_size:Arc<RwLock<u32>>,

    total_rejected:Arc<RwLock<u64>>,

    total_completed:Arc<RwLock<u64>>,

    total_timed_out:Arc<RwLock<u64>>,
}

impl BulkheadExecutor {
    pub fn new(name:String, config:BulkheadConfig) -> Self {
        Self {
            name:name.clone(),

            semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),

            config,

            current_requests:Arc::new(RwLock::new(0)),

            queue_size:Arc::new(RwLock::new(0)),

            total_rejected:Arc::new(RwLock::new(0)),

            total_completed:Arc::new(RwLock::new(0)),

            total_timed_out:Arc::new(RwLock::new(0)),
        }
    }

    pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
        if config.max_concurrent == 0 {
            return Err("max_concurrent must be greater than 0".to_string());
        }

        if config.max_queue == 0 {
            return Err("max_queue must be greater than 0".to_string());
        }

        if config.timeout_secs == 0 {
            return Err("timeout_secs must be greater than 0".to_string());
        }

        Ok(())
    }

    pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
    where
        F: std::future::Future<Output = Result<R, String>>, {
        if self.config.timeout_secs == 0 {
            return Err("Bulkhead timeout must be greater than 0".to_string());
        }

        let queue = *self.queue_size.read().await;

        if queue >= self.config.max_queue as u32 {
            *self.total_rejected.write().await += 1;

            log::warn!("[Bulkhead] Queue full for {}, rejecting request", self.name);

            return Err("Bulkhead queue full".to_string());
        }

        *self.queue_size.write().await += 1;

        // Keep the permit bound for the duration of the call so the semaphore actually
        // bounds concurrency; it is released when `_Permit` is dropped at the end of
        // this function.
        let _Permit =
            match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
                .await
            {
                Ok(Ok(permit)) => {
                    *self.queue_size.write().await -= 1;

                    permit
                },

                Ok(Err(e)) => {
                    *self.queue_size.write().await -= 1;

                    return Err(format!("Bulkhead semaphore error: {}", e));
                },

                Err(_) => {
                    *self.queue_size.write().await -= 1;

                    *self.total_timed_out.write().await += 1;

                    log::warn!("[Bulkhead] Timeout waiting for permit for {}", self.name);

                    return Err("Bulkhead timeout waiting for permit".to_string());
                },
            };

        *self.current_requests.write().await += 1;

        let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;

        let execution_result:Result<R, String> = match execution_result {
            Ok(Ok(value)) => Ok(value),

            Ok(Err(e)) => Err(e),

            Err(_) => {
                *self.total_timed_out.write().await += 1;

                Err("Bulkhead execution timeout".to_string())
            },
        };

        *self.current_requests.write().await -= 1;

        if execution_result.is_ok() {
            *self.total_completed.write().await += 1;
        }

        execution_result
    }

    pub async fn GetLoad(&self) -> (u32, u32) {
        let current = *self.current_requests.read().await;

        let queue = *self.queue_size.read().await;

        (current, queue)
    }

    pub async fn GetStatistics(&self) -> BulkheadStatistics {
        BulkheadStatistics {
            name:self.name.clone(),

            current_concurrent:*self.current_requests.read().await,

            current_queue:*self.queue_size.read().await,

            max_concurrent:self.config.max_concurrent,

            max_queue:self.config.max_queue,

            total_rejected:*self.total_rejected.read().await,

            total_completed:*self.total_completed.read().await,

            total_timed_out:*self.total_timed_out.read().await,
        }
    }

    pub async fn GetUtilization(&self) -> f64 {
        let (current, _) = self.GetLoad().await;

        if self.config.max_concurrent == 0 {
            return 0.0;
        }

        (current as f64 / self.config.max_concurrent as f64) * 100.0
    }
}

impl Clone for BulkheadExecutor {
    fn clone(&self) -> Self {
        Self {
            name:self.name.clone(),

            semaphore:self.semaphore.clone(),

            config:self.config.clone(),

            current_requests:self.current_requests.clone(),

            queue_size:self.queue_size.clone(),

            total_rejected:self.total_rejected.clone(),

            total_completed:self.total_completed.clone(),

            total_timed_out:self.total_timed_out.clone(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct TimeoutManager {
    global_deadline:Option<Instant>,

    operation_timeout:Duration,
}

impl TimeoutManager {
    pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }

    pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
        Self { global_deadline:Some(global_deadline), operation_timeout }
    }

    pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
        if timeout.is_zero() {
            return Err("Timeout must be greater than 0".to_string());
        }

        if timeout.as_secs() > 3600 {
            return Err("Timeout cannot exceed 1 hour".to_string());
        }

        Ok(())
    }

    pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
        if timeout.is_zero() {
            return Err("Timeout must be greater than 0".to_string());
        }

        if timeout.as_secs() > 3600 {
            return Err("Timeout cannot exceed 1 hour".to_string());
        }

        Ok(timeout)
    }

    pub fn remaining(&self) -> Option<Duration> {
        self.global_deadline.map(|deadline| {
            deadline
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::from_secs(0))
        })
    }

    pub fn Remaining(&self) -> Option<Duration> {
        std::panic::catch_unwind(|| self.remaining()).unwrap_or_else(|e| {
            log::error!("[TimeoutManager] Panic in Remaining: {:?}", e);

            None
        })
    }

    pub fn effective_timeout(&self) -> Duration {
        match self.remaining() {
            Some(remaining) => self.operation_timeout.min(remaining),

            None => self.operation_timeout,
        }
    }

    pub fn EffectiveTimeout(&self) -> Duration {
        std::panic::catch_unwind(|| {
            let timeout = self.effective_timeout();

            match Self::ValidateTimeoutResult(timeout) {
                Ok(valid_timeout) => valid_timeout,

                Err(_) => Duration::from_secs(30),
            }
        })
        .unwrap_or_else(|e| {
            log::error!("[TimeoutManager] Panic in EffectiveTimeout: {:?}", e);

            Duration::from_secs(30)
        })
    }

    pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }

    pub fn IsExceeded(&self) -> bool {
        std::panic::catch_unwind(|| self.is_exceeded()).unwrap_or_else(|e| {
            log::error!("[TimeoutManager] Panic in IsExceeded: {:?}", e);

            true
        })
    }

    pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }

    pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
}

pub struct ResilienceOrchestrator {
    retry_manager:Arc<RetryManager>,

    circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,

    bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
}

impl ResilienceOrchestrator {
    pub fn new(retry_policy:RetryPolicy) -> Self {
        Self {
            retry_manager:Arc::new(RetryManager::new(retry_policy)),

            circuit_breakers:Arc::new(RwLock::new(HashMap::new())),

            bulkheads:Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
        let mut breakers = self.circuit_breakers.write().await;

        Arc::new(
            breakers
                .entry(service.to_string())
                .or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
                .clone(),
        )
    }

    pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
        let mut bulkheads = self.bulkheads.write().await;

        Arc::new(
            bulkheads
                .entry(service.to_string())
                .or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
                .clone(),
        )
    }

    pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
        let breakers = self.circuit_breakers.read().await;

        let mut stats = Vec::new();

        for breaker in breakers.values() {
            stats.push(breaker.GetStatistics().await);
        }

        stats
    }

    pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
        let bulkheads = self.bulkheads.read().await;

        let mut stats = Vec::new();

        for bulkhead in bulkheads.values() {
            stats.push(bulkhead.GetStatistics().await);
        }

        stats
    }

    pub async fn ExecuteResilient<F, R>(
        &self,

        service:&str,

        retry_policy:&RetryPolicy,

        circuit_config:CircuitBreakerConfig,

        bulkhead_config:BulkheadConfig,

        f:F,
    ) -> Result<R, String>
    where
        F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
        if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
            return Err(format!("Invalid circuit breaker config: {}", e));
        }

        if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
            return Err(format!("Invalid bulkhead config: {}", e));
        }

        let breaker = self.GetCircuitBreaker(service, circuit_config).await;

        let bulkhead = self.GetBulkhead(service, bulkhead_config).await;

        if breaker.GetState().await == CircuitState::Open {
            if !breaker.AttemptRecovery().await {
                return Err("Circuit breaker is open".to_string());
            }
        }

        let mut Attempt = 0;

        loop {
            let result = bulkhead.Execute(f()).await;

            match result {
                Ok(Value) => {
                    breaker.RecordSuccess().await;

                    let Event = RetryEvent {
                        Service:service.to_string(),

                        Attempt,

                        ErrorClass:ErrorClass::Unknown,

                        DelayMs:0,

                        Success:true,

                        ErrorMessage:None,
                    };

                    self.retry_manager.PublishRetryEvent(Event);

                    return Ok(Value);
                },

                Err(E) => {
                    let ErrorClass = self.retry_manager.ClassifyError(&E);

                    breaker.RecordFailure().await;

                    let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);

                    let Event = RetryEvent {
                        Service:service.to_string(),

                        Attempt,

                        ErrorClass,

                        DelayMs:Delay.as_millis() as u64,

                        Success:false,

                        ErrorMessage:Some(self.redact_sensitive_data(&E)),
                    };

                    self.retry_manager.PublishRetryEvent(Event);

                    if Attempt < retry_policy.MaxRetries
                        && ErrorClass != ErrorClass::NonRetryable
                        && self.retry_manager.CanRetry(service).await
                    {
                        log::debug!(
                            "[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
                            service,
                            Attempt + 1,
                            retry_policy.MaxRetries,
                            Delay,
                            self.redact_sensitive_data(&E)
                        );

                        tokio::time::sleep(Delay).await;

                        Attempt += 1;
                    } else {
                        return Err(E);
                    }
                },
            }
        }
    }

    fn redact_sensitive_data(&self, message:&str) -> String {
        let mut redacted = message.to_string();

        let patterns = vec![
            (r"(?i)password[=:]\S+", "password=[REDACTED]"),
            (r"(?i)token[=:]\S+", "token=[REDACTED]"),
            (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
            (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
            (
                r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
                "Authorization: Bearer [REDACTED]",
            ),
            (r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
            (r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
        ];

        for (pattern, replacement) in patterns {
            if let Ok(re) = regex::Regex::new(pattern) {
                redacted = re.replace_all(&redacted, replacement).to_string();
            }
        }

        redacted
    }

    pub fn ValidateConfigurations(
        &self,

        _RetryPolicy:&RetryPolicy,

        CircuitConfig:&CircuitBreakerConfig,

        BulkheadConfig:&BulkheadConfig,
    ) -> Result<(), String> {
        self.retry_manager.ValidatePolicy()?;

        CircuitBreaker::ValidateConfig(CircuitConfig)?;

        BulkheadExecutor::ValidateConfig(BulkheadConfig)?;

        TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;

        Ok(())
    }
}

impl Clone for ResilienceOrchestrator {
    fn clone(&self) -> Self {
        Self {
            retry_manager:self.retry_manager.clone(),

            circuit_breakers:self.circuit_breakers.clone(),

            bulkheads:self.bulkheads.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retry_delay_calculation() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        let delay_1 = manager.CalculateRetryDelay(1);

        let delay_2 = manager.CalculateRetryDelay(2);

        assert!(delay_2 >= delay_1);
    }
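
    // Added example (a minimal sketch, not from the original suite): attempt 0 gets no
    // delay, and large attempts are capped at MaxIntervalMs plus at most JitterFactor
    // worth of jitter.
    #[test]
    fn test_retry_delay_zero_and_cap() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy.clone());

        assert_eq!(manager.CalculateRetryDelay(0), Duration::from_millis(0));

        let capped = manager.CalculateRetryDelay(10);

        let max_with_jitter = policy.MaxIntervalMs + (policy.MaxIntervalMs as f64 * policy.JitterFactor) as u64;

        assert!(capped >= Duration::from_millis(policy.MaxIntervalMs));

        assert!(capped <= Duration::from_millis(max_with_jitter));
    }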

    #[test]
    fn test_adaptive_retry_delay() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

        let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

        assert!(rate_limit_delay >= transient_delay);
    }
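
    // Added example (a minimal sketch): NonRetryable and unclassified errors get the
    // short fixed delay, so callers fail fast instead of backing off.
    #[test]
    fn test_adaptive_delay_non_retryable() {
        let manager = RetryManager::new(RetryPolicy::default());

        assert_eq!(
            manager.CalculateAdaptiveRetryDelay("authentication_failed", 1),
            Duration::from_millis(100)
        );

        assert_eq!(
            manager.CalculateAdaptiveRetryDelay("totally_unknown_error", 3),
            Duration::from_millis(100)
        );
    }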

    #[test]
    fn test_error_classification() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

        // Classification is substring based, so the message must contain the configured
        // pattern keys (which use underscores).
        assert_eq!(manager.ClassifyError("rate_limit_exceeded: too many requests"), ErrorClass::RateLimited);

        assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

        assert_eq!(manager.ClassifyError("upstream returned internal_server_error"), ErrorClass::ServerError);
    }
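
    // Added example (a minimal sketch): classification is case-insensitive, and anything
    // that matches no known pattern falls back to Unknown.
    #[test]
    fn test_error_classification_fallback() {
        let manager = RetryManager::new(RetryPolicy::default());

        assert_eq!(
            manager.ClassifyError("Connection TIMEOUT while calling upstream"),
            ErrorClass::Transient
        );

        assert_eq!(manager.ClassifyError("disk quota exceeded"), ErrorClass::Unknown);
    }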

    #[test]
    fn test_policy_validation() {
        let policy = RetryPolicy::default();

        let manager = RetryManager::new(policy);

        assert!(manager.ValidatePolicy().is_ok());

        let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

        let invalid_manager = RetryManager::new(invalid_policy);

        assert!(invalid_manager.ValidatePolicy().is_err());
    }
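
    // Added example (a minimal sketch): the per-service retry budget denies further
    // attempts once the per-minute allowance is spent.
    #[tokio::test]
    async fn test_retry_budget_exhaustion() {
        let policy = RetryPolicy { BudgetPerMinute:2, ..Default::default() };

        let manager = RetryManager::new(policy);

        assert!(manager.CanRetry("svc").await);

        assert!(manager.CanRetry("svc").await);

        assert!(!manager.CanRetry("svc").await);
    }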

    #[tokio::test]
    async fn test_circuit_breaker_state_transitions() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // Recovery is only attempted once the configured timeout has elapsed.
        tokio::time::sleep(Duration::from_millis(1100)).await;

        assert!(breaker.AttemptRecovery().await);

        assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

        breaker.RecordSuccess().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);
    }
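
    // Added example (a minimal sketch): a failure observed while half-open trips the
    // breaker straight back to Open.
    #[tokio::test]
    async fn test_circuit_breaker_reopens_from_half_open() {
        let config = CircuitBreakerConfig { FailureThreshold:1, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        tokio::time::sleep(Duration::from_millis(1100)).await;

        assert!(breaker.AttemptRecovery().await);

        assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);
    }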

    #[tokio::test]
    async fn test_circuit_breaker_validation() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert!(breaker.ValidateState().await.is_ok());

        breaker.RecordFailure().await;

        breaker.RecordFailure().await;

        // Two failures trip the breaker open, which is still a self-consistent state.
        assert_eq!(breaker.GetState().await, CircuitState::Open);

        assert!(breaker.ValidateState().await.is_ok());
    }

    #[test]
    fn test_circuit_breaker_config_validation() {
        let valid_config = CircuitBreakerConfig::default();

        assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

        let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

        assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
    }
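
    // Added example (a minimal sketch): statistics for a fresh breaker reflect its
    // initial Closed state with zeroed counters.
    #[tokio::test]
    async fn test_circuit_breaker_statistics() {
        let breaker = CircuitBreaker::new("stats".to_string(), CircuitBreakerConfig::default());

        let stats = breaker.GetStatistics().await;

        assert_eq!(stats.Name, "stats");

        assert_eq!(stats.State, CircuitState::Closed);

        assert_eq!(stats.Failures, 0);

        assert_eq!(stats.Successes, 0);

        assert_eq!(stats.StateTransitions, 0);

        assert!(stats.LastFailureTime.is_none());
    }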

    #[tokio::test]
    async fn test_bulkhead_resource_isolation() {
        let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let (current, queue) = bulkhead.GetLoad().await;

        assert_eq!(current, 0);

        assert_eq!(queue, 0);

        let stats = bulkhead.GetStatistics().await;

        assert_eq!(stats.current_concurrent, 0);

        assert_eq!(stats.current_queue, 0);

        assert_eq!(stats.max_concurrent, 2);

        assert_eq!(stats.max_queue, 5);
    }
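
    // Added example (a minimal sketch): a successful execution passes its value through
    // and is counted as completed, with concurrency returning to zero afterwards.
    #[tokio::test]
    async fn test_bulkhead_execute_success() {
        let bulkhead = BulkheadExecutor::new("test".to_string(), BulkheadConfig::default());

        let result = bulkhead.Execute(async { Ok::<_, String>(42) }).await;

        assert_eq!(result, Ok(42));

        let stats = bulkhead.GetStatistics().await;

        assert_eq!(stats.total_completed, 1);

        assert_eq!(stats.current_concurrent, 0);
    }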

    #[tokio::test]
    async fn test_bulkhead_utilization() {
        let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let utilization = bulkhead.GetUtilization().await;

        assert_eq!(utilization, 0.0);
    }
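
    // Added example (a minimal sketch): errors from the wrapped future are propagated
    // unchanged and do not count as completions.
    #[tokio::test]
    async fn test_bulkhead_execute_error_propagation() {
        let bulkhead = BulkheadExecutor::new("test".to_string(), BulkheadConfig::default());

        let result:Result<i32, String> = bulkhead.Execute(async { Err("boom".to_string()) }).await;

        assert_eq!(result, Err("boom".to_string()));

        assert_eq!(bulkhead.GetStatistics().await.total_completed, 0);
    }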

    #[test]
    fn test_bulkhead_config_validation() {
        let valid_config = BulkheadConfig::default();

        assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

        let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

        assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
    }

    #[test]
    fn test_timeout_manager() {
        let manager = TimeoutManager::new(Duration::from_secs(30));

        assert!(!manager.IsExceeded());

        assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
    }
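
    // Added example (a minimal sketch): when the global deadline is closer than the
    // per-operation timeout, the effective timeout is clamped to the remaining time.
    #[test]
    fn test_timeout_manager_clamps_to_deadline() {
        let deadline = Instant::now() + Duration::from_secs(5);

        let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

        let effective = manager.EffectiveTimeout();

        assert!(effective <= Duration::from_secs(5));

        assert!(effective >= Duration::from_secs(4));
    }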

    #[test]
    fn test_timeout_manager_with_deadline() {
        let deadline = Instant::now() + Duration::from_secs(60);

        let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

        let remaining = manager.Remaining();

        assert!(remaining.is_some());

        assert!(remaining.unwrap() <= Duration::from_secs(60));
    }
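
    // Added example (a minimal sketch): secrets embedded in error strings are masked
    // before they surface in retry events or logs.
    #[test]
    fn test_redact_sensitive_data() {
        let orchestrator = ResilienceOrchestrator::new(RetryPolicy::default());

        let redacted = orchestrator.redact_sensitive_data("login failed: password=hunter2 token=abc123");

        assert!(!redacted.contains("hunter2"));

        assert!(!redacted.contains("abc123"));

        assert!(redacted.contains("[REDACTED]"));
    }

    // Added example (a minimal sketch): the happy path through ExecuteResilient returns
    // the wrapped value without retries; the closure return type is annotated so the
    // boxed future coerces to the expected trait object.
    #[tokio::test]
    async fn test_execute_resilient_success() {
        let orchestrator = ResilienceOrchestrator::new(RetryPolicy::default());

        let call = || -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>> {
            Box::pin(async { Ok("ok".to_string()) })
        };

        let result = orchestrator
            .ExecuteResilient(
                "svc",
                &RetryPolicy::default(),
                CircuitBreakerConfig::default(),
                BulkheadConfig::default(),
                call,
            )
            .await;

        assert_eq!(result, Ok("ok".to_string()));
    }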
}