tor_proto/tunnel/reactor/circuit/
circhop.rs

1//! Module exposing structures relating to the reactor's view of a circuit's hops.
2
3use super::CircuitCmd;
4use super::{CloseStreamBehavior, SendRelayCell, SEND_WINDOW_INIT};
5use crate::circuit::HopSettings;
6use crate::congestion::sendme;
7use crate::congestion::CongestionControl;
8use crate::crypto::cell::HopNum;
9use crate::stream::queue::StreamQueueSender;
10use crate::stream::{
11    AnyCmdChecker, DrainRateRequest, StreamFlowControl, StreamRateLimit, StreamStatus,
12};
13use crate::tunnel::circuit::StreamMpscReceiver;
14use crate::tunnel::streammap::{
15    self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
16};
17use crate::tunnel::TunnelScopedCircId;
18use crate::util::notify::NotifySender;
19use crate::{Error, Result};
20
21use futures::stream::FuturesUnordered;
22use futures::Stream;
23use postage::watch;
24use safelog::sensitive as sv;
25use tor_cell::chancell::BoxedCellBody;
26use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
27use tor_cell::relaycell::msg::AnyRelayMsg;
28use tor_cell::relaycell::{
29    AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
30    RelayMsg, StreamId, UnparsedRelayMsg,
31};
32
33use tor_error::{internal, Bug};
34use tracing::{trace, warn};
35
36use std::num::NonZeroU32;
37use std::pin::Pin;
38use std::result::Result as StdResult;
39use std::sync::{Arc, Mutex};
40use std::task::Poll;
41
42#[cfg(test)]
43use tor_cell::relaycell::msg::SendmeTag;
44
45/// Represents the reactor's view of a circuit's hop.
46#[derive(Default)]
47pub(crate) struct CircHopList {
48    /// The list of hops.
49    hops: Vec<CircHop>,
50}
51
52impl CircHopList {
53    /// Return a reference to the hop corresponding to `hopnum`, if there is one.
54    pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
55        self.hops.get(Into::<usize>::into(hopnum))
56    }
57
58    /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
59    pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
60        self.hops.get_mut(Into::<usize>::into(hopnum))
61    }
62
63    /// Append the specified hop.
64    pub(crate) fn push(&mut self, hop: CircHop) {
65        self.hops.push(hop);
66    }
67
68    /// Returns `true` if the list contains no [`CircHop`]s.
69    pub(crate) fn is_empty(&self) -> bool {
70        self.hops.is_empty()
71    }
72
73    /// Returns the number of hops in the list.
74    pub(crate) fn len(&self) -> usize {
75        self.hops.len()
76    }
77
78    /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
79    ///
80    /// The iterator contains at most one [`CircuitCmd`] for each hop,
81    /// representing the instructions for handling the ready-item, if any,
82    /// of its highest priority stream.
83    ///
84    /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
85    /// To avoid contention, never create more than one
86    /// [`ready_streams_iterator`](Self::ready_streams_iterator)
87    /// stream at a time!
88    ///
89    /// This is cancellation-safe.
90    pub(super) fn ready_streams_iterator(
91        &self,
92        exclude: Option<HopNum>,
93    ) -> impl Stream<Item = Result<CircuitCmd>> {
94        self.hops
95            .iter()
96            .enumerate()
97            .filter_map(|(i, hop)| {
98                let hop_num = HopNum::from(i as u8);
99
100                if exclude == Some(hop_num) {
101                    // We must skip polling this hop
102                    return None;
103                }
104
105                if !hop.ccontrol().can_send() {
106                    // We can't send anything on this hop that counts towards SENDME windows.
107                    //
108                    // In theory we could send messages that don't count towards
109                    // windows (like `RESOLVE`), and process end-of-stream
110                    // events (to send an `END`), but it's probably not worth
111                    // doing an O(N) iteration over flow-control-ready streams
112                    // to see if that's the case.
113                    //
114                    // This *doesn't* block outgoing flow-control messages (e.g.
115                    // SENDME), which are initiated via the control-message
116                    // channel, handled above.
117                    //
118                    // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
119                    // congestion control has "bottomed out" might not be so bad, and the
120                    // alternatives have complexity and/or performance costs.
121                    return None;
122                }
123
124                let hop_map = Arc::clone(&self.hops[i].map);
125                Some(futures::future::poll_fn(move |cx| {
126                    // Process an outbound message from the first ready stream on
127                    // this hop. The stream map implements round robin scheduling to
128                    // ensure fairness across streams.
129                    // TODO: Consider looping here to process multiple ready
130                    // streams. Need to be careful though to balance that with
131                    // continuing to service incoming and control messages.
132                    let mut hop_map = hop_map.lock().expect("lock poisoned");
133                    let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
134                        // No ready streams for this hop.
135                        return Poll::Pending;
136                    };
137
138                    if msg.is_none() {
139                        return Poll::Ready(Ok(CircuitCmd::CloseStream {
140                            hop: hop_num,
141                            sid,
142                            behav: CloseStreamBehavior::default(),
143                            reason: streammap::TerminateReason::StreamTargetClosed,
144                        }));
145                    };
146                    let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
147
148                    #[allow(unused)] // unused in non-debug builds
149                    let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
150                        panic!("Stream {sid} disappeared");
151                    };
152
153                    debug_assert!(
154                        s.can_send(&msg),
155                        "Stream {sid} produced a message it can't send: {msg:?}"
156                    );
157
158                    let cell = SendRelayCell {
159                        hop: hop_num,
160                        early: false,
161                        cell: AnyRelayMsgOuter::new(Some(sid), msg),
162                    };
163                    Poll::Ready(Ok(CircuitCmd::Send(cell)))
164                }))
165            })
166            .collect::<FuturesUnordered<_>>()
167    }
168
169    /// Returns true if there are any streams on this circuit
170    ///
171    /// Important: this function locks the stream map of its each of the [`CircHop`]s
172    /// in this circuit, so it must **not** be called from any function where the
173    /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
174    pub(super) fn has_streams(&self) -> bool {
175        self.hops
176            .iter()
177            .any(|hop| hop.map.lock().expect("lock poisoned").n_open_streams() > 0)
178    }
179
180    /// Return the number of streams currently open on this circuit.
181    pub(crate) fn n_open_streams(&self) -> usize {
182        self.hops
183            .iter()
184            .map(|hop| hop.n_open_streams())
185            // No need to worry about overflow; max streams per hop is U16_MAX
186            .sum()
187    }
188}
189
190/// Represents the reactor's view of a single hop.
191pub(crate) struct CircHop {
192    /// The unique ID of the circuit. Used for logging.
193    unique_id: TunnelScopedCircId,
194    /// Hop number in the path.
195    hop_num: HopNum,
196    /// Map from stream IDs to streams.
197    ///
198    /// We store this with the reactor instead of the circuit, since the
199    /// reactor needs it for every incoming cell on a stream, whereas
200    /// the circuit only needs it when allocating new streams.
201    ///
202    /// NOTE: this is behind a mutex because the reactor polls the `StreamMap`s
203    /// of all hops concurrently, in a [`FuturesUnordered`]. Without the mutex,
204    /// this wouldn't be possible, because it would mean holding multiple
205    /// mutable references to `self` (the reactor). Note, however,
206    /// that there should never be any contention on this mutex:
207    /// we never create more than one
208    /// [`ready_streams_iterator`](CircHopList::ready_streams_iterator) stream
209    /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
210    ///
211    /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
212    /// is shared with all the circuits in the tunnel.
213    map: Arc<Mutex<streammap::StreamMap>>,
214    /// Congestion control object.
215    ///
216    /// This object is also in charge of handling circuit level SENDME logic for this hop.
217    ccontrol: CongestionControl,
218    /// Decodes relay cells received from this hop.
219    inbound: RelayCellDecoder,
220    /// Format to use for relay cells.
221    //
222    // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
223    relay_format: RelayCellFormat,
224
225    /// Remaining permitted incoming relay cells from this hop, plus 1.
226    ///
227    /// (In other words, `None` represents no limit,
228    /// `Some(1)` represents an exhausted limit,
229    /// and `Some(n)` means that n-1 more cells may be received.)
230    ///
231    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
232    n_incoming_cells_permitted: Option<NonZeroU32>,
233
234    /// Remaining permitted outgoing relay cells from this hop, plus 1.
235    ///
236    /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
237    n_outgoing_cells_permitted: Option<NonZeroU32>,
238}
239
240impl CircHop {
241    /// Create a new hop.
242    pub(super) fn new(
243        unique_id: TunnelScopedCircId,
244        hop_num: HopNum,
245        settings: &HopSettings,
246    ) -> Self {
247        /// Convert a limit from the form used in a HopSettings to that used here.
248        /// (The format we use here is more compact.)
249        fn cvt(limit: u32) -> NonZeroU32 {
250            // See "known limitations" comment on n_incoming_cells_permitted.
251            limit
252                .saturating_add(1)
253                .try_into()
254                .expect("Adding one left it as zero?")
255        }
256        let relay_format = settings.relay_crypt_protocol().relay_cell_format();
257        CircHop {
258            unique_id,
259            hop_num,
260            map: Arc::new(Mutex::new(streammap::StreamMap::new())),
261            ccontrol: CongestionControl::new(&settings.ccontrol),
262            inbound: RelayCellDecoder::new(relay_format),
263            relay_format,
264            n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
265            n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
266        }
267    }
268
269    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
270    /// `message` to the provided hop.
271    pub(crate) fn begin_stream(
272        &mut self,
273        message: AnyRelayMsg,
274        sender: StreamQueueSender,
275        rx: StreamMpscReceiver<AnyRelayMsg>,
276        rate_limit_updater: watch::Sender<StreamRateLimit>,
277        drain_rate_requester: NotifySender<DrainRateRequest>,
278        cmd_checker: AnyCmdChecker,
279    ) -> Result<(SendRelayCell, StreamId)> {
280        let flow_ctrl = self.build_flow_ctrl(rate_limit_updater, drain_rate_requester)?;
281        let r =
282            self.map
283                .lock()
284                .expect("lock poisoned")
285                .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
286        let cell = AnyRelayMsgOuter::new(Some(r), message);
287        Ok((
288            SendRelayCell {
289                hop: self.hop_num,
290                early: false,
291                cell,
292            },
293            r,
294        ))
295    }
296
297    /// Close the stream associated with `id` because the stream was
298    /// dropped.
299    ///
300    /// If we have not already received an END cell on this stream, send one.
301    /// If no END cell is specified, an END cell with the reason byte set to
302    /// REASON_MISC will be sent.
303    pub(super) fn close_stream(
304        &mut self,
305        id: StreamId,
306        message: CloseStreamBehavior,
307        why: streammap::TerminateReason,
308    ) -> Result<Option<SendRelayCell>> {
309        let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
310        trace!(
311            circ_id = %self.unique_id,
312            stream_id = %id,
313            should_send_end = ?should_send_end,
314            "Ending stream",
315        );
316        // TODO: I am about 80% sure that we only send an END cell if
317        // we didn't already get an END cell.  But I should double-check!
318        if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
319            (should_send_end, message)
320        {
321            let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
322            let cell = SendRelayCell {
323                hop: self.hop_num,
324                early: false,
325                cell: end_cell,
326            };
327
328            return Ok(Some(cell));
329        }
330        Ok(None)
331    }
332
333    /// Check if we should send an XON message.
334    ///
335    /// If we should, then returns the XON message that should be sent.
336    pub(crate) fn maybe_send_xon(
337        &mut self,
338        rate: XonKbpsEwma,
339        id: StreamId,
340    ) -> Result<Option<Xon>> {
341        // the call below will return an error if XON/XOFF aren't supported,
342        // so we check for support here
343        if !self.ccontrol.uses_xon_xoff() {
344            return Ok(None);
345        }
346
347        let mut map = self.map.lock().expect("lock poisoned");
348        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
349            // stream went away
350            return Ok(None);
351        };
352
353        ent.maybe_send_xon(rate)
354    }
355
356    /// Check if we should send an XOFF message.
357    ///
358    /// If we should, then returns the XOFF message that should be sent.
359    pub(super) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
360        // the call below will return an error if XON/XOFF aren't supported,
361        // so we check for support here
362        if !self.ccontrol.uses_xon_xoff() {
363            return Ok(None);
364        }
365
366        let mut map = self.map.lock().expect("lock poisoned");
367        let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
368            // stream went away
369            return Ok(None);
370        };
371
372        ent.maybe_send_xoff()
373    }
374
375    /// Return the format that is used for relay cells sent to this hop.
376    ///
377    /// For the most part, this format isn't necessary to interact with a CircHop;
378    /// it becomes relevant when we are deciding _what_ we can encode for the hop.
379    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
380        self.relay_format
381    }
382
383    /// Delegate to CongestionControl, for testing purposes
384    #[cfg(test)]
385    pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
386        self.ccontrol.send_window_and_expected_tags()
387    }
388
389    /// Return the number of open streams on this hop.
390    ///
391    /// WARNING: because this locks the stream map mutex,
392    /// it should never be called from a context where that mutex is already locked.
393    pub(crate) fn n_open_streams(&self) -> usize {
394        self.map.lock().expect("lock poisoned").n_open_streams()
395    }
396
397    /// Return a reference to our CongestionControl object.
398    pub(crate) fn ccontrol(&self) -> &CongestionControl {
399        &self.ccontrol
400    }
401
402    /// Return a mutable reference to our CongestionControl object.
403    pub(crate) fn ccontrol_mut(&mut self) -> &mut CongestionControl {
404        &mut self.ccontrol
405    }
406
407    /// Return the RelayCellFormat.
408    pub(crate) fn relay_format(&self) -> RelayCellFormat {
409        self.relay_format
410    }
411
412    /// Take capacity to send `msg`.
413    ///
414    /// See [`OpenStreamEnt::take_capacity_to_send`].
415    //
416    // TODO prop340: This should take a cell or similar, not a message.
417    pub(crate) fn take_capacity_to_send<M: RelayMsg>(
418        &mut self,
419        stream_id: StreamId,
420        msg: &M,
421    ) -> Result<()> {
422        let mut hop_map = self.map.lock().expect("lock poisoned");
423        let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
424            warn!(
425                circ_id = %self.unique_id,
426                stream_id = %stream_id,
427                "sending a relay cell for non-existent or non-open stream!",
428            );
429            return Err(Error::CircProto(format!(
430                "tried to send a relay cell on non-open stream {}",
431                sv(stream_id),
432            )));
433        };
434
435        ent.take_capacity_to_send(msg)
436    }
437
438    /// Add an entry to this map using the specified StreamId.
439    #[cfg(feature = "hs-service")]
440    pub(super) fn add_ent_with_id(
441        &self,
442        sink: StreamQueueSender,
443        rx: StreamMpscReceiver<AnyRelayMsg>,
444        rate_limit_updater: watch::Sender<StreamRateLimit>,
445        drain_rate_requester: NotifySender<DrainRateRequest>,
446        stream_id: StreamId,
447        cmd_checker: AnyCmdChecker,
448    ) -> Result<()> {
449        let mut hop_map = self.map.lock().expect("lock poisoned");
450        hop_map.add_ent_with_id(
451            sink,
452            rx,
453            self.build_flow_ctrl(rate_limit_updater, drain_rate_requester)?,
454            stream_id,
455            cmd_checker,
456        )?;
457
458        Ok(())
459    }
460
461    /// Note that we received an END message (or other message indicating the end of
462    /// the stream) on the stream with `id`.
463    ///
464    /// See [`StreamMap::ending_msg_received`](super::streammap::StreamMap::ending_msg_received).
465    #[cfg(feature = "hs-service")]
466    pub(super) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
467        let mut hop_map = self.map.lock().expect("lock poisoned");
468
469        hop_map.ending_msg_received(stream_id)?;
470
471        Ok(())
472    }
473
474    /// Parse a RELAY or RELAY_EARLY cell body.
475    ///
476    /// Requires that the cryptographic checks on the message have already been
477    /// performed
478    pub(super) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
479        self.inbound
480            .decode(cell)
481            .map_err(|e| Error::from_bytes_err(e, "relay cell"))
482    }
483
484    /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
485    ///
486    /// Returns back the provided `msg`, if the message is an incoming stream request
487    /// that needs to be handled by the calling code.
488    ///
489    // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
490    // back and forth like this.
491    pub(super) fn handle_msg(
492        &self,
493        cell_counts_toward_windows: bool,
494        streamid: StreamId,
495        msg: UnparsedRelayMsg,
496    ) -> Result<Option<UnparsedRelayMsg>> {
497        let mut hop_map = self.map.lock().expect("lock poisoned");
498        match hop_map.get_mut(streamid) {
499            Some(StreamEntMut::Open(ent)) => {
500                // Can't have a stream level SENDME when congestion control is enabled.
501                let message_closes_stream =
502                    Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
503
504                if message_closes_stream {
505                    hop_map.ending_msg_received(streamid)?;
506                }
507            }
508            #[cfg(feature = "hs-service")]
509            Some(StreamEntMut::EndSent(_))
510                if matches!(
511                    msg.cmd(),
512                    RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
513                ) =>
514            {
515                // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
516                // message, just remove the old stream from the map and stop waiting for a
517                // response
518                hop_map.ending_msg_received(streamid)?;
519                return Ok(Some(msg));
520            }
521            Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
522                // We sent an end but maybe the other side hasn't heard.
523
524                match half_stream.handle_msg(msg)? {
525                    StreamStatus::Open => {}
526                    StreamStatus::Closed => {
527                        hop_map.ending_msg_received(streamid)?;
528                    }
529                }
530            }
531            #[cfg(feature = "hs-service")]
532            None if matches!(
533                msg.cmd(),
534                RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
535            ) =>
536            {
537                return Ok(Some(msg));
538            }
539            _ => {
540                // No stream wants this message, or ever did.
541                return Err(Error::CircProto(
542                    "Cell received on nonexistent stream!?".into(),
543                ));
544            }
545        }
546
547        Ok(None)
548    }
549
550    /// Builds the reactor's flow control handler for a new stream.
551    // TODO: remove the `Result` once we remove the "flowctl-cc" feature
552    #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
553    fn build_flow_ctrl(
554        &self,
555        rate_limit_updater: watch::Sender<StreamRateLimit>,
556        drain_rate_requester: NotifySender<DrainRateRequest>,
557    ) -> Result<StreamFlowControl> {
558        if self.ccontrol.uses_stream_sendme() {
559            let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
560            Ok(StreamFlowControl::new_window_based(window))
561        } else {
562            cfg_if::cfg_if! {
563                if #[cfg(feature = "flowctl-cc")] {
564                    Ok(StreamFlowControl::new_xon_xoff_based(rate_limit_updater, drain_rate_requester))
565                } else {
566                    Err(internal!(
567                        "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
568                    ).into())
569                }
570            }
571        }
572    }
573
574    /// Deliver `msg` to the specified open stream entry `ent`.
575    fn deliver_msg_to_stream(
576        streamid: StreamId,
577        ent: &mut OpenStreamEnt,
578        cell_counts_toward_windows: bool,
579        msg: UnparsedRelayMsg,
580    ) -> Result<bool> {
581        use tor_async_utils::SinkTrySend as _;
582        use tor_async_utils::SinkTrySendError as _;
583
584        // The stream for this message exists, and is open.
585
586        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
587        // else we'd never notice them if the stream isn't reading.
588        //
589        // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
590        // possible
591        match msg.cmd() {
592            RelayCmd::SENDME => {
593                ent.put_for_incoming_sendme(msg)?;
594                return Ok(false);
595            }
596            RelayCmd::XON => {
597                ent.handle_incoming_xon(msg)?;
598                return Ok(false);
599            }
600            RelayCmd::XOFF => {
601                ent.handle_incoming_xoff(msg)?;
602                return Ok(false);
603            }
604            _ => {}
605        }
606
607        let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
608
609        if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
610            if e.is_full() {
611                // If we get here, we either have a logic bug (!), or an attacker
612                // is sending us more cells than we asked for via congestion control.
613                return Err(Error::CircProto(format!(
614                    "Stream sink would block; received too many cells on stream ID {}",
615                    sv(streamid),
616                )));
617            }
618            if e.is_disconnected() && cell_counts_toward_windows {
619                // the other side of the stream has gone away; remember
620                // that we received a cell that we couldn't queue for it.
621                //
622                // Later this value will be recorded in a half-stream.
623                ent.dropped += 1;
624            }
625        }
626
627        Ok(message_closes_stream)
628    }
629
630    /// Get the stream map of this hop.
631    pub(crate) fn stream_map(&self) -> &Arc<Mutex<streammap::StreamMap>> {
632        &self.map
633    }
634
635    /// Set the stream map of this hop to `map`.
636    ///
637    /// Returns an error if the existing stream map of the hop has any open stream.
638    pub(crate) fn set_stream_map(
639        &mut self,
640        map: Arc<Mutex<streammap::StreamMap>>,
641    ) -> StdResult<(), Bug> {
642        if self.n_open_streams() != 0 {
643            return Err(internal!("Tried to discard existing open streams?!"));
644        }
645
646        self.map = map;
647
648        Ok(())
649    }
650
651    /// Decrement the limit of outbound cells that may be sent to this hop; give
652    /// an error if it would reach zero.
653    pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
654        try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
655            .map_err(|_| Error::ExcessOutboundCells)
656    }
657
658    /// Decrement the limit of inbound cells that may be received from this hop; give
659    /// an error if it would reach zero.
660    pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
661        try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
662            .map_err(|_| Error::ExcessInboundCells)
663    }
664}
665
/// Use up one unit of a cell limit stored in "remaining, plus one" form.
///
/// * If `val` is `None` there is no limit: returns `Ok(())` and leaves `val` alone.
/// * If `val` is `Some(1)` the limit is exhausted: returns `Err(())` and
///   leaves `val` alone.
/// * Otherwise `val` is decremented by one and `Ok(())` is returned.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    let Some(remaining) = val else {
        // No limit configured, so there is nothing to count down.
        return Ok(());
    };

    // `remaining` is at least 1, so the subtraction cannot underflow.
    // A result of zero means the limit was already used up; `NonZeroU32::new`
    // reports exactly that case as `None`, so no panicking conversion is needed.
    match NonZeroU32::new(remaining.get() - 1) {
        Some(decremented) => {
            *remaining = decremented;
            Ok(())
        }
        None => Err(()),
    }
}
683}