1use crate::pending::{GuardStatus, RequestId};
7use crate::GuardMgrInner;
8
9use futures::{channel::mpsc, stream::StreamExt};
10#[cfg(test)]
11use oneshot_fused_workaround as oneshot;
12use tor_proto::ClockSkew;
13
14use std::sync::{Mutex, Weak};
15
/// A message consumed by the [`report_status_events`] task.
#[derive(Debug)]
pub(crate) enum Msg {
    /// A status report: a request id, a guard status, and an optional
    /// clock-skew value.
    ///
    /// `report_status_events` forwards the three fields, in this order, to
    /// `GuardMgrInner::handle_msg`.
    Status(RequestId, GuardStatus, Option<ClockSkew>),
    /// Test-only message: the receiving task replies by sending `()` on the
    /// enclosed sender, ignoring the result of that send.
    #[cfg(test)]
    Ping(oneshot::Sender<()>),
}
28
29pub(crate) async fn report_status_events(
38 runtime: impl tor_rtcompat::SleepProvider,
39 inner: Weak<Mutex<GuardMgrInner>>,
40 mut events: mpsc::UnboundedReceiver<Msg>,
41) {
42 loop {
43 match events.next().await {
44 Some(Msg::Status(id, status, skew)) => {
45 if let Some(inner) = inner.upgrade() {
47 let mut inner = inner.lock().expect("Poisoned lock");
48 inner.handle_msg(id, status, skew, &runtime);
49 } else {
50 return;
52 }
53 }
54 #[cfg(test)]
55 Some(Msg::Ping(sender)) => {
56 let _ignore = sender.send(());
57 }
58 None => return,
60 }
61 }
63}
64
65pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
74 runtime: R,
75 inner: Weak<Mutex<GuardMgrInner>>,
76) {
77 loop {
78 let delay = if let Some(inner) = inner.upgrade() {
79 let mut inner = inner.lock().expect("Poisoned lock");
80 let wallclock = runtime.wallclock();
81 let now = runtime.now();
82 inner.run_periodic_events(wallclock, now)
83 } else {
84 return;
86 };
87 runtime.sleep(delay).await;
88 }
89}
90
91pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
94 runtime: RT,
95 inner: Weak<Mutex<GuardMgrInner>>,
96 netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
97) {
98 use tor_netdir::DirEvent;
99
100 let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
101 Some(s) => s,
102 None => return,
103 };
104
105 while let Some(event) = event_stream.next().await {
106 match event {
107 DirEvent::NewConsensus | DirEvent::NewDescriptors => {
108 if let Some(inner) = inner.upgrade() {
109 let mut inner = inner.lock().expect("Poisoned lock");
110 inner.update(runtime.wallclock(), runtime.now());
111 } else {
112 return;
113 }
114 }
115 _ => {}
116 }
117 }
118}
119
/// Background task: call `GuardMgrInner::update` every time the bridge
/// descriptor provider emits `BridgeDescEvent::SomethingChanged`.
///
/// The task finishes when `bridge_desc_provider` cannot be upgraded at
/// startup, when the event stream ends, or when an event arrives and `inner`
/// can no longer be upgraded.
///
/// # Panics
///
/// Panics with `"Poisoned lock"` if the mutex is poisoned.
#[cfg(feature = "bridge-client")]
pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
    runtime: RT,
    inner: Weak<Mutex<GuardMgrInner>>,
    bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
) {
    use crate::bridge::BridgeDescEvent as E;

    // Only the stream is retained; the strong reference to the provider is
    // released as soon as the stream has been obtained.
    let mut desc_events = if let Some(provider) = bridge_desc_provider.upgrade() {
        provider.events()
    } else {
        return;
    };

    while let Some(event) = desc_events.next().await {
        // No wildcard arm: a newly added event variant must be handled here
        // explicitly or the build fails.
        match event {
            E::SomethingChanged => match inner.upgrade() {
                Some(mgr) => {
                    let mut guard = mgr.lock().expect("Poisoned lock");
                    guard.update(runtime.wallclock(), runtime.now());
                }
                None => return,
            },
        }
    }
}