tor_hsclient/
state.rs

1//! Implement a cache for onion descriptors and the facility to remember a bit
2//! about onion service history.
3
4use std::fmt::Debug;
5use std::mem;
6use std::panic::AssertUnwindSafe;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::time::{Duration, Instant};
9
10use futures::task::{SpawnError, SpawnExt as _};
11use futures::FutureExt as _;
12
13use async_trait::async_trait;
14use educe::Educe;
15use either::Either::{self, *};
16use postage::stream::Stream as _;
17use tracing::{debug, error, trace};
18
19use safelog::DisplayRedacted as _;
20use tor_basic_utils::define_accessor_trait;
21use tor_circmgr::isolation::Isolation;
22use tor_error::{debug_report, error_report, internal, Bug, ErrorReport as _};
23use tor_hscrypto::pk::HsId;
24use tor_netdir::NetDir;
25use tor_rtcompat::Runtime;
26
27use crate::isol_map;
28use crate::{ConnError, HsClientConnector, HsClientSecretKeys};
29
// Key type for entries in `Services::records`.
//
// A `TableIndex` is obtained from `index_or_insert_with` and later handed to
// `by_index_mut` to find the same table entry again (possibly from another task).
slotmap_careful::new_key_type! {
    struct TableIndex;
}
33
/// Configuration, currently just some retry parameters
///
/// `Services` keeps this in an `Arc`, and a clone of that `Arc` is handed to
/// every `MockableConnectorData::connect` call.
#[derive(Default, Debug)]
// This is not really public.
// It has to be `pub` because it appears in one of the methods in `MockableConnectorData`.
// That has to be because that trait is a bound on a parameter for `HsClientConnector`.
// `Config` is not re-exported.  (This is isomorphic to the trait sealing pattern.)
//
// This means that this struct cannot live in the crate root, so we put it here.
pub struct Config {
    /// Retry parameters
    pub(crate) retry: tor_circmgr::CircuitTiming,
}
46
// NOTE(review): `define_accessor_trait!` is defined in `tor_basic_utils`; presumably it
// expands to a trait with one accessor per listed field (here `circuit_timing`) -- confirm
// against that macro's documentation.
define_accessor_trait! {
    /// Configuration for an HS client connector
    ///
    /// If the HS client connector gains new configurabilities, this trait will gain additional
    /// supertraits, as an API break.
    ///
    /// Prefer to use `TorClientConfig`, which will always implement this trait.
    //
    // This arrangement is very like that for `CircMgrConfig`.
    pub trait HsClientConnectorConfig {
        circuit_timing: tor_circmgr::CircuitTiming,
    }
}
60
/// Number of times we're willing to iterate round the state machine loop
///
/// **Not** the number of retries of failed descriptor downloads, circuits, etc.
///
/// The state machine loop is a condition variable loop.
/// It repeatedly transforms the [`ServiceState`] to try to get to `Open`,
/// converting stale data to `Closed` and `Closed` to `Working`, and so on.
/// This ought only to go forwards so in principle we could use an infinite loop.
/// But if we have a logic error, we want to crash eventually.
/// The `rechecks` counter is for detecting such a situation.
///
/// The budget is a single `0..MAX_RECHECKS` iterator, created in
/// `get_or_launch_connection` and consumed by `obtain_circuit_or_continuation_info`;
/// once it is exhausted the connection attempt fails with an internal error.
///
/// This is fairly arbitrary, but we shouldn't get anywhere near it.
///
/// Note that this is **not** a number of operational retries
/// of fallible retriable operations.
/// Such retries are handled in [`connect.rs`](crate::connect).
const MAX_RECHECKS: u32 = 10;
78
/// How long we keep an open HS circuit after it was last used
/// (our equivalent of C Tor `MaxCircuitDirtiness`)
///
/// The circuit expiry task adds this to the `last_used` time of an `Open` entry
/// to decide when to drop our reference to the circuit.
///
/// As per
///    <https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914433>
///
/// And C Tor's `tor(1)`, which says:
///
/// > MaxCircuitDirtiness NUM
/// >
/// > Feel free to reuse a circuit that was first used at most NUM
/// > seconds ago, but never attach a new stream to a circuit that is
/// > too old.  For hidden services, this applies to the last time a
/// > circuit was used, not the first.  Circuits with streams
/// > constructed with SOCKS authentication via SocksPorts that have
/// > KeepAliveIsolateSOCKSAuth also remain alive for
/// > MaxCircuitDirtiness seconds after carrying the last such
/// > stream. (Default: 10 minutes)
///
/// However, we're not entirely sure this is the right behaviour.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
///
// TODO SPEC: Explain C Tor `MaxCircuitDirtiness` behaviour
//
// TODO HS CFG: This should be configurable somehow
const RETAIN_CIRCUIT_AFTER_LAST_USE: Duration = Duration::from_secs(10 * 60);
104
/// How long to retain cached data about a hidden service
///
/// This is simply to reclaim space, not for correctness.
/// So we only check this during housekeeping, not operation.
///
/// The starting point for this interval is the last time we used the data,
/// or a circuit derived from it.
///
/// Housekeeping only ever discards entries that are `Closed`;
/// entries that are `Open` or `Working` are always kept.
///
/// Note that this is a *maximum* for the length of time we will retain a descriptor;
/// HS descriptors' lifetimes (as declared in the descriptor) *are* honoured;
/// but that's done by the code in `connect.rs`, not here.
///
/// We're not sure this is the right value.
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/916>
//
// TODO SPEC: State how long IPT and descriptor data should be retained after use
//
// TODO HS CFG: Perhaps this should be configurable somehow?
const RETAIN_DATA_AFTER_LAST_USE: Duration = Duration::from_secs(48 * 3600 /*hours*/);
124
/// Hidden services, our connections to them, and history of connections, etc.
///
/// Table containing state of our ideas about services.
/// Data structure is keyed (indexed) by:
///  * `HsId`, hidden service identity
///  * any secret keys we are to use
///  * circuit isolation
///
/// We treat different values for any of the above as completely independent,
/// except that we try isolation joining (narrowing) if everything else matches.
///
/// In other words,
///  * Two HS connection requests cannot share state and effort
///    (descriptor downloads, descriptors, intro pt history)
///    unless the restricted discovery keys to be used are the same.
///  * This criterion is checked before looking at isolations,
///    which may further restrict sharing:
///    Two HS connection requests will only share state subject to isolations.
///
/// Here "state and effort" includes underlying circuits such as hsdir circuits,
/// since each HS connection state will use `launch_specific_isolated` for those.
#[derive(Default, Debug)]
pub(crate) struct Services<D: MockableConnectorData> {
    /// The actual records of our connections/attempts for each service,
    /// as separated by secret keys and isolation
    records: isol_map::MultikeyIsolatedMap<TableIndex, HsId, HsClientSecretKeys, ServiceState<D>>,

    /// Configuration
    ///
    /// `Arc` so that it can be shared with individual hs connector tasks
    config: Arc<Config>,
}
156
/// Entry in the 2nd-level lookup array
//
// NOTE(review): presumably this is what `by_index_mut` hands back; the code in this
// file reaches the contained `ServiceState` with `&mut **record` -- confirm in `isol_map`.
#[allow(dead_code)] // This alias is here for documentation if nothing else
type ServiceRecord<D> = isol_map::Record<HsClientSecretKeys, ServiceState<D>>;
160
/// Value in the `Services` data structure
///
/// State and history of our connections, including connection to any connection task.
///
/// `last_used` is used to expire data eventually.
//
// TODO unify this with channels and circuits.  See arti#778.
#[derive(Educe)]
#[educe(Debug)]
enum ServiceState<D: MockableConnectorData> {
    /// We don't have a circuit
    Closed {
        /// The state
        data: D,
        /// Last time we touched this, including reuse
        last_used: Instant,
    },
    /// We have an open circuit, which we can (hopefully) just use
    Open {
        /// The state
        data: D,
        /// The circuit
        #[educe(Debug(ignore))]
        circuit: Arc<D::ClientCirc>,
        /// Last time we touched this, including reuse
        ///
        /// This is set when we created the circuit, and updated when we
        /// hand out this circuit again in response to a new request.
        ///
        /// We believe this mirrors C Tor behaviour;
        /// see [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
        last_used: Instant,
        /// We have a task that will close the circuit when required
        ///
        /// This field serves to require construction sites of Open
        /// to demonstrate that there *is* an expiry task.
        /// In the future, it may also serve to cancel old expiry tasks.
        circuit_expiry_task: CircuitExpiryTask,
    },
    /// We have a task trying to find the service and establish the circuit
    ///
    /// CachedData is owned by the task.
    Working {
        /// Signals instances of `get_or_launch_connection` when the task completes
        barrier_recv: postage::barrier::Receiver,
        /// Where the task will store the error.
        ///
        /// Lock hierarchy: this lock is "inside" the big lock on `Services`.
        error: Arc<Mutex<Option<ConnError>>>,
    },
    /// Dummy value for use with temporary mem replace
    ///
    /// This is only meant to exist momentarily while a value is moved out of the entry.
    /// Code that finds an entry in this state treats it as a bug.
    Dummy,
}
214
215impl<D: MockableConnectorData> ServiceState<D> {
216    /// Make a new (blank) `ServiceState::Closed`
217    fn blank(runtime: &impl Runtime) -> Self {
218        ServiceState::Closed {
219            data: D::default(),
220            last_used: runtime.now(),
221        }
222    }
223}
224
/// "Continuation" return type from `obtain_circuit_or_continuation_info`
///
/// Holds clones of the `error` and `barrier_recv` fields of a `ServiceState::Working`.
/// The caller waits on the barrier, and afterwards looks in the error slot
/// to see whether the connection task failed.
type Continuation = (Arc<Mutex<Option<ConnError>>>, postage::barrier::Receiver);
227
/// Represents a task which is waiting to see when the circuit needs to be expired
///
/// This is currently an empty token type: it carries no handle to the task.
///
/// TODO: Replace this with a task handle that cancels the task when dropped.
/// Until then, if the circuit is closed before then, the expiry task will
/// uselessly wake up some time later.
#[derive(Debug)] // Not Clone
struct CircuitExpiryTask {}
// impl Drop already, partly to allow explicit drop(CircuitExpiryTask) without clippy complaint
impl Drop for CircuitExpiryTask {
    // Deliberately does nothing (for now); see the TODO on the type.
    fn drop(&mut self) {}
}
239
240/// Obtain a circuit from the `Services` table, or return a continuation
241///
242/// This is the workhorse function for `get_or_launch_connection`.
243///
244/// `get_or_launch_connection`, together with `obtain_circuit_or_continuation_info`,
245/// form a condition variable loop:
246///
247/// We check to see if we have a circuit.  If so, we return it.
248/// Otherwise, we make sure that a circuit is being constructed,
249/// and then go into a condvar wait;
250/// we'll be signaled when the construction completes.
251///
252/// So the connection task we spawn does not return the circuit, or error,
253/// via an inter-task stream.
254/// It stores it in the data structure and wakes up all the client tasks.
255/// (This means there is only one success path for the client task code.)
256///
257/// There are some wrinkles:
258///
259/// ### Existence of this as a separate function
260///
261/// The usual structure for a condition variable loop would be something like this:
262///
263/// ```rust,ignore
264/// loop {
265///    test state and maybe break;
266///    cv.wait(guard).await; // consumes guard, unlocking after enqueueing us as a waiter
267///    guard = lock();
268/// }
269/// ```
270///
271/// However, Rust does not currently understand that the mutex is not
272/// actually a captured variable held across an await point,
273/// when the variable is consumed before the await, and re-stored afterwards.
274/// As a result, the async future becomes erroneously `!Send`:
275/// <https://github.com/rust-lang/rust/issues/104883>.
276/// We want the unstable feature `-Zdrop-tracking`:
277/// <https://github.com/rust-lang/rust/issues/97331>.
278///
279/// Instead, to convince the compiler, we must use a scope-based drop of the mutex guard.
280/// That means converting the "test state and maybe break" part into a sub-function.
281/// That's what this function is.
282///
283/// It returns `Right` if the loop should be exited, returning the circuit to the caller.
284/// It returns `Left` if the loop needs to do a condition variable wait.
285///
286/// ### We're using a barrier as a condition variable
287///
288/// We want to be signaled when the task exits.  Indeed, *only* when it exits.
289/// This functionality is most conveniently in a `postage::barrier`.
290///
291/// ### Nested loops
292///
293/// Sometimes we want to go round again *without* unlocking.
294/// Sometimes we must unlock and wait and relock.
295///
296/// The drop tracking workaround (see above) means we have to do these two
297/// in separate scopes.
298/// So there are two nested loops: one here, and one in `get_or_launch_connection`.
299/// They both use the same backstop rechecks counter.
300fn obtain_circuit_or_continuation_info<D: MockableConnectorData>(
301    connector: &HsClientConnector<impl Runtime, D>,
302    netdir: &Arc<NetDir>,
303    hsid: &HsId,
304    secret_keys: &HsClientSecretKeys,
305    table_index: TableIndex,
306    rechecks: &mut impl Iterator,
307    mut guard: MutexGuard<'_, Services<D>>,
308) -> Result<Either<Continuation, Arc<D::ClientCirc>>, ConnError> {
309    let blank_state = || ServiceState::blank(&connector.runtime);
310
311    for _recheck in rechecks {
312        let record = guard
313            .records
314            .by_index_mut(table_index)
315            .ok_or_else(|| internal!("guard table entry vanished!"))?;
316        let state = &mut **record;
317
318        trace!("HS conn state: {state:?}");
319
320        let (data, barrier_send) = match state {
321            ServiceState::Open {
322                data: _,
323                circuit,
324                last_used,
325                circuit_expiry_task: _,
326            } => {
327                let now = connector.runtime.now();
328                if !D::circuit_is_ok(circuit) {
329                    // Well that's no good, we need a fresh one, but keep the data
330                    let data = match mem::replace(state, ServiceState::Dummy) {
331                        ServiceState::Open {
332                            data,
333                            last_used: _,
334                            circuit: _,
335                            circuit_expiry_task: _,
336                        } => data,
337                        _ => panic!("state changed between matches"),
338                    };
339                    *state = ServiceState::Closed {
340                        data,
341                        last_used: now,
342                    };
343                    continue;
344                }
345                *last_used = now;
346                // No need to tell expiry task about revised expiry time;
347                // it will see the new last_used when it wakes up at the old expiry time.
348
349                return Ok::<_, ConnError>(Right(circuit.clone()));
350            }
351            ServiceState::Working {
352                barrier_recv,
353                error,
354            } => {
355                if !matches!(
356                    barrier_recv.try_recv(),
357                    Err(postage::stream::TryRecvError::Pending)
358                ) {
359                    // This information is stale; the task no longer exists.
360                    // We want information from a fresh task.
361                    *state = blank_state();
362                    continue;
363                }
364                let barrier_recv = barrier_recv.clone();
365
366                // This clone of the error field Arc<Mutex<..>> allows us to collect errors
367                // which happened due to the currently-running task, which we have just
368                // found exists.  Ie, it will see errors that occurred after we entered
369                // `get_or_launch`.  Stale errors, from previous tasks, were cleared above.
370                let error = error.clone();
371
372                // Wait for the task to complete (at which point it drops the barrier)
373                return Ok(Left((error, barrier_recv)));
374            }
375            ServiceState::Closed { .. } => {
376                let (barrier_send, barrier_recv) = postage::barrier::channel();
377                let data = match mem::replace(
378                    state,
379                    ServiceState::Working {
380                        barrier_recv,
381                        error: Arc::new(Mutex::new(None)),
382                    },
383                ) {
384                    ServiceState::Closed { data, .. } => data,
385                    _ => panic!("state changed between matches"),
386                };
387                (data, barrier_send)
388            }
389            ServiceState::Dummy => {
390                *state = blank_state();
391                return Err(internal!("HS connector found dummy state").into());
392            }
393        };
394
395        // Make a connection
396        let runtime = &connector.runtime;
397        let connector = (*connector).clone();
398        let config = guard.config.clone();
399        let netdir = netdir.clone();
400        let secret_keys = secret_keys.clone();
401        let hsid = *hsid;
402        let connect_future = async move {
403            let mut data = data;
404
405            let got = AssertUnwindSafe(D::connect(
406                &connector,
407                netdir,
408                config,
409                hsid,
410                &mut data,
411                secret_keys,
412            ))
413            .catch_unwind()
414            .await
415            .unwrap_or_else(|_| {
416                data = D::default();
417                Err(internal!("hidden service connector task panicked!").into())
418            });
419            let now = connector.runtime.now();
420            let last_used = now;
421
422            let got = got.and_then(|circuit| {
423                let circuit_expiry_task = ServiceState::spawn_circuit_expiry_task(
424                    &connector,
425                    hsid,
426                    table_index,
427                    last_used,
428                    now,
429                )
430                .map_err(|cause| ConnError::Spawn {
431                    spawning: "circuit expiry task",
432                    cause: cause.into(),
433                })?;
434                Ok((circuit, circuit_expiry_task))
435            });
436
437            let got_error = got.as_ref().map(|_| ()).map_err(Clone::clone);
438
439            // block for handling inability to store
440            let stored = async {
441                let mut guard = connector.services()?;
442                let record = guard
443                    .records
444                    .by_index_mut(table_index)
445                    .ok_or_else(|| internal!("HS table entry removed while task running"))?;
446                // Always match this, so we check what we're overwriting
447                let state = &mut **record;
448                let error_store = match state {
449                    ServiceState::Working { error, .. } => error,
450                    _ => return Err(internal!("HS task found state other than Working")),
451                };
452
453                match got {
454                    Ok((circuit, circuit_expiry_task)) => {
455                        *state = ServiceState::Open {
456                            data,
457                            circuit,
458                            last_used,
459                            circuit_expiry_task,
460                        }
461                    }
462                    Err(error) => {
463                        let mut error_store = error_store
464                            .lock()
465                            .map_err(|_| internal!("Working error poisoned, cannot store error"))?;
466                        *error_store = Some(error);
467                    }
468                };
469
470                Ok(())
471            }
472            .await;
473
474            match (got_error, stored) {
475                (Ok::<(), ConnError>(()), Ok::<(), Bug>(())) => {}
476                (Err(got_error), Ok(())) => {
477                    debug_report!(
478                        got_error,
479                        "HS connection failure for {}",
480                        hsid.display_redacted()
481                    );
482                }
483                (Ok(()), Err(bug)) => {
484                    error_report!(
485                        bug,
486                        "internal error storing built HS circuit for {}",
487                        hsid.display_redacted()
488                    );
489                }
490                (Err(got_error), Err(bug)) => {
491                    // We're reporting two errors, so we'll construct the event
492                    // manually.
493                    error!(
494                        "internal error storing HS connection error for {}: {}; {}",
495                        hsid.display_redacted(),
496                        got_error.report(),
497                        bug.report(),
498                    );
499                }
500            };
501            drop(barrier_send);
502        };
503        runtime
504            .spawn_obj(Box::new(connect_future).into())
505            .map_err(|cause| ConnError::Spawn {
506                spawning: "connection task",
507                cause: cause.into(),
508            })?;
509    }
510
511    Err(internal!("HS connector state management malfunction (exceeded MAX_RECHECKS").into())
512}
513
514impl<D: MockableConnectorData> Services<D> {
515    /// Create a new empty `Services`
516    pub(crate) fn new(config: Config) -> Self {
517        Services {
518            records: Default::default(),
519            config: Arc::new(config),
520        }
521    }
522
523    /// Connect to a hidden service
524    // We *do* drop guard.  There is *one* await point, just after drop(guard).
525    pub(crate) async fn get_or_launch_connection(
526        connector: &HsClientConnector<impl Runtime, D>,
527        netdir: &Arc<NetDir>,
528        hs_id: HsId,
529        isolation: Box<dyn Isolation>,
530        secret_keys: HsClientSecretKeys,
531    ) -> Result<Arc<D::ClientCirc>, ConnError> {
532        let blank_state = || ServiceState::blank(&connector.runtime);
533
534        let mut rechecks = 0..MAX_RECHECKS;
535
536        let mut obtain = |table_index, guard| {
537            obtain_circuit_or_continuation_info(
538                connector,
539                netdir,
540                &hs_id,
541                &secret_keys,
542                table_index,
543                &mut rechecks,
544                guard,
545            )
546        };
547
548        let mut got;
549        let table_index;
550        {
551            let mut guard = connector.services()?;
552            let services = &mut *guard;
553
554            trace!("HS conn get_or_launch: {hs_id:?} {isolation:?} {secret_keys:?}");
555            //trace!("HS conn services: {services:?}");
556
557            table_index =
558                services
559                    .records
560                    .index_or_insert_with(&hs_id, &secret_keys, isolation, blank_state);
561
562            let guard = guard;
563            got = obtain(table_index, guard);
564        }
565        loop {
566            // The parts of this loop which run after a `Left` is returned
567            // logically belong in the case in `obtain_circuit_or_continuation_info`
568            // for `ServiceState::Working`, where that function decides we need to wait.
569            // This code has to be out here to help the compiler's drop tracking.
570            {
571                // Block to scope the acquisition of `error`, a guard
572                // for the mutex-protected error field in the state,
573                // and, for neatness, barrier_recv.
574
575                let (error, mut barrier_recv) = match got? {
576                    Right(ret) => return Ok(ret),
577                    Left(continuation) => continuation,
578                };
579
580                barrier_recv.recv().await;
581
582                let error = error
583                    .lock()
584                    .map_err(|_| internal!("Working error poisoned"))?;
585                if let Some(error) = &*error {
586                    return Err(error.clone());
587                }
588            }
589
590            let guard = connector.services()?;
591
592            got = obtain(table_index, guard);
593        }
594    }
595
596    /// Perform housekeeping - delete data we aren't interested in any more
597    pub(crate) fn run_housekeeping(&mut self, now: Instant) {
598        self.expire_old_data(now);
599    }
600
601    /// Delete data we aren't interested in any more
602    fn expire_old_data(&mut self, now: Instant) {
603        self.records
604            .retain(|hsid, record, _table_index| match &**record {
605                ServiceState::Closed { data: _, last_used } => {
606                    let Some(expiry_time) = last_used.checked_add(RETAIN_DATA_AFTER_LAST_USE)
607                    else {
608                        return false;
609                    };
610                    now <= expiry_time
611                }
612                ServiceState::Open { .. } | ServiceState::Working { .. } => true,
613                ServiceState::Dummy => {
614                    error!(
615                        "bug: found dummy data during HS housekeeping, for {}",
616                        hsid.display_redacted()
617                    );
618                    false
619                }
620            });
621    }
622}
623
624impl<D: MockableConnectorData> ServiceState<D> {
625    /// Spawn a task that will drop our reference to the rendezvous circuit
626    /// at `table_index` when it has gone too long without any use.
627    ///
628    /// According to [`RETAIN_CIRCUIT_AFTER_LAST_USE`].
629    //
630    // As it happens, this function is always called with `last_used` equal to `now`,
631    // but we pass separate arguments for clarity.
632    fn spawn_circuit_expiry_task(
633        connector: &HsClientConnector<impl Runtime, D>,
634        hsid: HsId,
635        table_index: TableIndex,
636        last_used: Instant,
637        now: Instant,
638    ) -> Result<CircuitExpiryTask, SpawnError> {
639        /// Returns the duration until expiry, or `None` if it should expire now
640        fn calculate_expiry_wait(last_used: Instant, now: Instant) -> Option<Duration> {
641            let expiry = last_used
642                .checked_add(RETAIN_CIRCUIT_AFTER_LAST_USE)
643                .or_else(|| {
644                    error!("bug: time overflow calculating HS circuit expiry, killing circuit!");
645                    None
646                })?;
647            let wait = expiry.checked_duration_since(now).unwrap_or_default();
648            if wait == Duration::ZERO {
649                return None;
650            }
651            Some(wait)
652        }
653
654        let mut maybe_wait = calculate_expiry_wait(last_used, now);
655        let () = connector.runtime.spawn({
656            let connector = connector.clone();
657            async move {
658                // This loop is slightly odd.  The wait ought naturally to be at the end,
659                // but that would mean a useless re-lock and re-check right after creation,
660                // or jumping into the middle of the loop.
661                loop {
662                    if let Some(yes_wait) = maybe_wait {
663                        connector.runtime.sleep(yes_wait).await;
664                    }
665                    // If it's None, we can't rely on that to say we should expire it,
666                    // since that information crossed a time when we didn't hold the lock.
667
668                    let Ok(mut guard) = connector.services() else {
669                        break;
670                    };
671                    let Some(record) = guard.records.by_index_mut(table_index) else {
672                        break;
673                    };
674                    let state = &mut **record;
675                    let last_used = match state {
676                        ServiceState::Closed { .. } => break,
677                        ServiceState::Open { last_used, .. } => *last_used,
678                        ServiceState::Working { .. } => break, // someone else will respawn
679                        ServiceState::Dummy => break,          // someone else will (report and) fix
680                    };
681                    maybe_wait = calculate_expiry_wait(last_used, connector.runtime.now());
682                    if maybe_wait.is_none() {
683                        match mem::replace(state, ServiceState::Dummy) {
684                            ServiceState::Open {
685                                data,
686                                circuit,
687                                last_used,
688                                circuit_expiry_task,
689                            } => {
690                                debug!("HS connection expires: {hsid:?}");
691                                drop(circuit);
692                                drop(circuit_expiry_task); // that's us
693                                *state = ServiceState::Closed { data, last_used };
694                                break;
695                            }
696                            _ => panic!("state now {state:?} even though we just saw it Open"),
697                        }
698                    }
699                }
700            }
701        })?;
702        Ok(CircuitExpiryTask {})
703    }
704}
705
/// Mocking for actual HS connection work, to let us test the `Services` state machine
///
/// A value of the implementing type is the per-service data stored in each table entry.
//
// Does *not* mock circmgr, chanmgr, etc. - those won't be used by the tests, since our
// `connect` won't call them.  But mocking them pollutes many types with `R` and is
// generally tiresome.  So let's not.  Instead the tests can make dummy ones.
//
// This trait is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
#[async_trait]
pub trait MockableConnectorData: Default + Debug + Send + Sync + 'static {
    /// Client circuit
    type ClientCirc: Sync + Send + 'static;

    /// Mock state
    type MockGlobalState: Clone + Sync + Send + 'static;

    /// Connect
    ///
    /// Called from the connection task spawned by `obtain_circuit_or_continuation_info`.
    ///
    /// `data` is the per-service data taken out of the table entry for the duration
    /// of the attempt.
    /// On success it is stored back into the table, alongside the returned circuit.
    /// On failure only the error is recorded.
    async fn connect<R: Runtime>(
        connector: &HsClientConnector<R, Self>,
        netdir: Arc<NetDir>,
        config: Arc<Config>,
        hsid: HsId,
        data: &mut Self,
        secret_keys: HsClientSecretKeys,
    ) -> Result<Arc<Self::ClientCirc>, ConnError>;

    /// Is circuit OK?  Ie, not `.is_closing()`.
    ///
    /// Consulted whenever an `Open` table entry is about to be reused.
    /// If this returns `false`, the entry becomes `Closed` (keeping its data)
    /// and a new connection attempt is made.
    fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool;
}
735
736#[cfg(test)]
737pub(crate) mod test {
738    // @@ begin test lint list maintained by maint/add_warning @@
739    #![allow(clippy::bool_assert_comparison)]
740    #![allow(clippy::clone_on_copy)]
741    #![allow(clippy::dbg_macro)]
742    #![allow(clippy::mixed_attributes_style)]
743    #![allow(clippy::print_stderr)]
744    #![allow(clippy::print_stdout)]
745    #![allow(clippy::single_char_pattern)]
746    #![allow(clippy::unwrap_used)]
747    #![allow(clippy::unchecked_duration_subtraction)]
748    #![allow(clippy::useless_vec)]
749    #![allow(clippy::needless_pass_by_value)]
750    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
751    use super::*;
752    use crate::*;
753    use futures::{poll, SinkExt};
754    use std::fmt;
755    use std::task::Poll::{self, *};
756    use tokio::pin;
757    use tokio_crate as tokio;
758    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
759    use tor_proto::memquota::ToplevelAccount;
760    use tor_rtcompat::{test_with_one_runtime, SleepProvider};
761    use tor_rtmock::MockRuntime;
762    use tracing_test::traced_test;
763
764    use ConnError as E;
765
    /// Mock per-service data, standing in for the real connector data in these tests
    #[derive(Debug, Default)]
    struct MockData {
        /// Incremented each time the mock `connect()` is called with this data
        connect_called: usize,
    }
770
    /// Type indicating what our `connect()` should return; it always makes a fresh MockCirc
    ///
    ///  * `Pending`: `connect()` keeps waiting for the next value.
    ///  * `Ready(Ok(()))`: `connect()` succeeds, returning a new `MockCirc`.
    ///  * `Ready(Err(e))`: `connect()` fails with `e`.
    type MockGive = Poll<Result<(), E>>;
773
    /// `MockableConnectorData::MockGlobalState` for `MockData`.
    ///
    /// Stored in the connector's `mock_for_state` field by `mk_hsconn`,
    /// and read from there by the mock `connect()`.
    #[derive(Debug, Clone)]
    struct MockGlobalState {
        // things will appear here when we have more sophisticated tests
        /// Receiving end of the watch channel through which a test tells
        /// `connect()` what to do; see `MockGive`.
        give: postage::watch::Receiver<MockGive>,
    }
779
    /// Stand-in for a circuit, as returned by the mock `connect()`.
    ///
    /// Clones share the same `ok` allocation, and therefore compare equal
    /// under the `PartialEq` impl below.
    #[derive(Clone, Educe)]
    #[educe(Debug)]
    struct MockCirc {
        /// What `circuit_is_ok` reports for this circuit; starts out `true`.
        ///
        /// The address of this `Arc` also serves as the circuit's identity
        /// for `PartialEq`.
        #[educe(Debug(method = "debug_arc_mutex"))]
        ok: Arc<Mutex<bool>>,
        /// Value of `MockData::connect_called` at the time this circuit was made.
        connect_called: usize,
    }
787
788    fn debug_arc_mutex(val: &Arc<Mutex<impl Debug>>, f: &mut fmt::Formatter) -> fmt::Result {
789        write!(f, "@{:?}", Arc::as_ptr(val))?;
790        let guard = val.lock();
791        let guard = guard.or_else(|g| {
792            write!(f, ",POISON")?;
793            Ok::<_, fmt::Error>(g.into_inner())
794        })?;
795        write!(f, " ")?;
796        Debug::fmt(&*guard, f)
797    }
798
799    impl PartialEq for MockCirc {
800        fn eq(&self, other: &MockCirc) -> bool {
801            Arc::ptr_eq(&self.ok, &other.ok)
802        }
803    }
804
805    impl MockCirc {
806        fn new(connect_called: usize) -> Self {
807            let ok = Arc::new(Mutex::new(true));
808            MockCirc { ok, connect_called }
809        }
810    }
811
812    #[async_trait]
813    impl MockableConnectorData for MockData {
814        type ClientCirc = MockCirc;
815        type MockGlobalState = MockGlobalState;
816
817        async fn connect<R: Runtime>(
818            connector: &HsClientConnector<R, MockData>,
819            _netdir: Arc<NetDir>,
820            _config: Arc<Config>,
821            _hsid: HsId,
822            data: &mut MockData,
823            _secret_keys: HsClientSecretKeys,
824        ) -> Result<Arc<Self::ClientCirc>, E> {
825            data.connect_called += 1;
826            let make = {
827                let connect_called = data.connect_called;
828                move |()| Arc::new(MockCirc::new(connect_called))
829            };
830            let mut give = connector.mock_for_state.give.clone();
831            if let Ready(ret) = &*give.borrow() {
832                return ret.clone().map(make);
833            }
834            loop {
835                match give.recv().await.expect("EOF on mock_global_state stream") {
836                    Pending => {}
837                    Ready(ret) => return ret.map(make),
838                }
839            }
840        }
841
842        fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
843            *circuit.ok.lock().unwrap()
844        }
845    }
846
847    /// Makes a non-empty `HsClientSecretKeys`, containing (somehow) `kk`
848    fn mk_keys(kk: u8) -> HsClientSecretKeys {
849        let mut ss = [0_u8; 32];
850        ss[0] = kk;
851        let keypair = tor_llcrypto::pk::ed25519::Keypair::from_bytes(&ss);
852        let mut b = HsClientSecretKeysBuilder::default();
853        #[allow(deprecated)]
854        b.ks_hsc_intro_auth(keypair.into());
855        b.build().unwrap()
856    }
857
858    fn mk_hsconn<R: Runtime>(
859        runtime: R,
860    ) -> (
861        HsClientConnector<R, MockData>,
862        HsClientSecretKeys,
863        postage::watch::Sender<MockGive>,
864    ) {
865        let chanmgr = tor_chanmgr::ChanMgr::new(
866            runtime.clone(),
867            &Default::default(),
868            tor_chanmgr::Dormancy::Dormant,
869            &Default::default(),
870            ToplevelAccount::new_noop(),
871        );
872        let guardmgr = tor_guardmgr::GuardMgr::new(
873            runtime.clone(),
874            tor_persist::TestingStateMgr::new(),
875            &tor_guardmgr::TestConfig::default(),
876        )
877        .unwrap();
878
879        let circmgr = Arc::new(
880            tor_circmgr::CircMgr::new(
881                &tor_circmgr::TestConfig::default(),
882                tor_persist::TestingStateMgr::new(),
883                &runtime,
884                Arc::new(chanmgr),
885                &guardmgr,
886            )
887            .unwrap(),
888        );
889        let circpool = Arc::new(HsCircPool::new(&circmgr));
890        let (give_send, give) = postage::watch::channel_with(Ready(Ok(())));
891        let mock_for_state = MockGlobalState { give };
892        #[allow(clippy::let_and_return)] // we'll probably add more in this function
893        let hscc = HsClientConnector {
894            runtime,
895            circpool,
896            services: Default::default(),
897            mock_for_state,
898        };
899        let keys = HsClientSecretKeysBuilder::default().build().unwrap();
900        (hscc, keys, give_send)
901    }
902
903    #[allow(clippy::unnecessary_wraps)]
904    fn mk_isol(s: &str) -> Option<NarrowableIsolation> {
905        Some(NarrowableIsolation(s.into()))
906    }
907
908    async fn launch_one(
909        hsconn: &HsClientConnector<impl Runtime, MockData>,
910        id: u8,
911        secret_keys: &HsClientSecretKeys,
912        isolation: Option<NarrowableIsolation>,
913    ) -> Result<Arc<MockCirc>, ConnError> {
914        let netdir = tor_netdir::testnet::construct_netdir()
915            .unwrap_if_sufficient()
916            .unwrap();
917        let netdir = Arc::new(netdir);
918
919        let hs_id = {
920            let mut hs_id = [0_u8; 32];
921            hs_id[0] = id;
922            hs_id.into()
923        };
924        #[allow(clippy::redundant_closure)] // srsly, that would be worse
925        let isolation = isolation.unwrap_or_default().into();
926        Services::get_or_launch_connection(hsconn, &netdir, hs_id, isolation, secret_keys.clone())
927            .await
928    }
929
930    #[derive(Default, Debug, Clone)]
931    // TODO move this to tor-circmgr under a test feature?
932    pub(crate) struct NarrowableIsolation(pub(crate) String);
933    impl tor_circmgr::isolation::IsolationHelper for NarrowableIsolation {
934        fn compatible_same_type(&self, other: &Self) -> bool {
935            self.join_same_type(other).is_some()
936        }
937        fn join_same_type(&self, other: &Self) -> Option<Self> {
938            Some(if self.0.starts_with(&other.0) {
939                self.clone()
940            } else if other.0.starts_with(&self.0) {
941                other.clone()
942            } else {
943                return None;
944            })
945        }
946    }
947
948    #[test]
949    #[traced_test]
950    fn simple() {
951        test_with_one_runtime!(|runtime| async {
952            let (hsconn, keys, _give_send) = mk_hsconn(runtime);
953
954            let circuit = launch_one(&hsconn, 0, &keys, None).await.unwrap();
955            eprintln!("{:?}", circuit);
956        });
957    }
958
959    #[test]
960    #[traced_test]
961    fn expiry() {
962        MockRuntime::test_with_various(|runtime| async move {
963            // This is the amount by which we adjust clock advances to make sure we
964            // hit more or less than a particular value, to avoid edge cases and
965            // cope with real time advancing too.
966            // This does *not* represent an actual delay to real test runs.
967            const TIMEOUT_SLOP: Duration = Duration::from_secs(10);
968
969            let (hsconn, keys, _give_send) = mk_hsconn(runtime.clone());
970
971            let advance = |duration| {
972                let hsconn = hsconn.clone();
973                let runtime = &runtime;
974                async move {
975                    // let expiry task get going and choose its expiry (wakeup) time
976                    runtime.progress_until_stalled().await;
977                    // TODO: Make this use runtime.advance_by() when that's not very slow
978                    runtime.mock_sleep().advance(duration);
979                    // let expiry task run
980                    runtime.progress_until_stalled().await;
981                    hsconn.services().unwrap().run_housekeeping(runtime.now());
982                }
983            };
984
985            // make circuit1
986            let circuit1 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
987
988            // expire it
989            advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;
990
991            // make circuit2 (a)
992            let circuit2a = launch_one(&hsconn, 0, &keys, None).await.unwrap();
993            assert_ne!(circuit1, circuit2a);
994
995            // nearly expire it, then reuse it
996            advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
997            let circuit2b = launch_one(&hsconn, 0, &keys, None).await.unwrap();
998            assert_eq!(circuit2a, circuit2b);
999
1000            // nearly expire it again, then reuse it
1001            advance(RETAIN_CIRCUIT_AFTER_LAST_USE - TIMEOUT_SLOP).await;
1002            let circuit2c = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1003            assert_eq!(circuit2a, circuit2c);
1004
1005            // actually expire it
1006            advance(RETAIN_CIRCUIT_AFTER_LAST_USE + TIMEOUT_SLOP).await;
1007            let circuit3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1008            assert_ne!(circuit2c, circuit3);
1009            assert_eq!(circuit3.connect_called, 3);
1010
1011            advance(RETAIN_DATA_AFTER_LAST_USE + Duration::from_secs(10)).await;
1012            let circuit4 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
1013            assert_eq!(circuit4.connect_called, 1);
1014        });
1015    }
1016
    /// Checks which requests end up sharing a circuit.
    ///
    /// Concurrent and subsequent requests for the same service, keys and
    /// compatible isolation get the same circuit; a different service,
    /// different keys, or incompatible isolation get a different one.
    #[test]
    #[traced_test]
    fn coalesce() {
        test_with_one_runtime!(|runtime| async {
            let (hsconn, keys, mut give_send) = mk_hsconn(runtime);

            // Make the mock `connect()` wait until we say otherwise.
            give_send.send(Pending).await.unwrap();

            // First request: cannot complete while the mock is waiting.
            let c1f = launch_one(&hsconn, 0, &keys, None);
            pin!(c1f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
            }

            // c2f will find Working
            let c2f = launch_one(&hsconn, 0, &keys, None);
            pin!(c2f);
            for _ in 0..10 {
                assert!(poll!(&mut c1f).is_pending());
                assert!(poll!(&mut c2f).is_pending());
            }

            // Let the mock `connect()` succeed.
            give_send.send(Ready(Ok(()))).await.unwrap();

            // Both pending requests get the same circuit.
            let c1 = c1f.await.unwrap();
            let c2 = c2f.await.unwrap();
            assert_eq!(c1, c2);

            // c3 will find Open
            let c3 = launch_one(&hsconn, 0, &keys, None).await.unwrap();
            assert_eq!(c1, c3);

            // A different service id gets a different circuit...
            assert_ne!(c1, launch_one(&hsconn, 1, &keys, None).await.unwrap());
            // ... and so do different secret keys for the same service.
            assert_ne!(
                c1,
                launch_one(&hsconn, 0, &mk_keys(42), None).await.unwrap()
            );

            let c_isol_1 = launch_one(&hsconn, 0, &keys, mk_isol("a")).await.unwrap();
            assert_eq!(c1, c_isol_1); // We can reuse, but now we've narrowed the isol

            // "b" is not compatible with the isolation narrowed to "a" above.
            let c_isol_2 = launch_one(&hsconn, 0, &keys, mk_isol("b")).await.unwrap();
            assert_ne!(c1, c_isol_2);
        });
    }
1062}