tor_proto/tunnel/reactor/circuit/circhop.rs
1//! Module exposing structures relating to the reactor's view of a circuit's hops.
2
3use super::CircuitCmd;
4use super::{CloseStreamBehavior, SendRelayCell, SEND_WINDOW_INIT};
5use crate::circuit::HopSettings;
6use crate::congestion::sendme;
7use crate::congestion::CongestionControl;
8use crate::crypto::cell::HopNum;
9use crate::stream::queue::StreamQueueSender;
10use crate::stream::{
11 AnyCmdChecker, DrainRateRequest, StreamFlowControl, StreamRateLimit, StreamStatus,
12};
13use crate::tunnel::circuit::StreamMpscReceiver;
14use crate::tunnel::streammap::{
15 self, EndSentStreamEnt, OpenStreamEnt, ShouldSendEnd, StreamEntMut,
16};
17use crate::tunnel::TunnelScopedCircId;
18use crate::util::notify::NotifySender;
19use crate::{Error, Result};
20
21use futures::stream::FuturesUnordered;
22use futures::Stream;
23use postage::watch;
24use safelog::sensitive as sv;
25use tor_cell::chancell::BoxedCellBody;
26use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
27use tor_cell::relaycell::msg::AnyRelayMsg;
28use tor_cell::relaycell::{
29 AnyRelayMsgOuter, RelayCellDecoder, RelayCellDecoderResult, RelayCellFormat, RelayCmd,
30 RelayMsg, StreamId, UnparsedRelayMsg,
31};
32
33use tor_error::{internal, Bug};
34use tracing::{trace, warn};
35
36use std::num::NonZeroU32;
37use std::pin::Pin;
38use std::result::Result as StdResult;
39use std::sync::{Arc, Mutex};
40use std::task::Poll;
41
42#[cfg(test)]
43use tor_cell::relaycell::msg::SendmeTag;
44
45/// Represents the reactor's view of a circuit's hop.
46#[derive(Default)]
47pub(crate) struct CircHopList {
48 /// The list of hops.
49 hops: Vec<CircHop>,
50}
51
52impl CircHopList {
53 /// Return a reference to the hop corresponding to `hopnum`, if there is one.
54 pub(super) fn hop(&self, hopnum: HopNum) -> Option<&CircHop> {
55 self.hops.get(Into::<usize>::into(hopnum))
56 }
57
58 /// Return a mutable reference to the hop corresponding to `hopnum`, if there is one.
59 pub(super) fn get_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
60 self.hops.get_mut(Into::<usize>::into(hopnum))
61 }
62
63 /// Append the specified hop.
64 pub(crate) fn push(&mut self, hop: CircHop) {
65 self.hops.push(hop);
66 }
67
68 /// Returns `true` if the list contains no [`CircHop`]s.
69 pub(crate) fn is_empty(&self) -> bool {
70 self.hops.is_empty()
71 }
72
73 /// Returns the number of hops in the list.
74 pub(crate) fn len(&self) -> usize {
75 self.hops.len()
76 }
77
78 /// Returns a [`Stream`] of [`CircuitCmd`] to poll from the main loop.
79 ///
80 /// The iterator contains at most one [`CircuitCmd`] for each hop,
81 /// representing the instructions for handling the ready-item, if any,
82 /// of its highest priority stream.
83 ///
84 /// IMPORTANT: this stream locks the stream map mutexes of each `CircHop`!
85 /// To avoid contention, never create more than one
86 /// [`ready_streams_iterator`](Self::ready_streams_iterator)
87 /// stream at a time!
88 ///
89 /// This is cancellation-safe.
90 pub(super) fn ready_streams_iterator(
91 &self,
92 exclude: Option<HopNum>,
93 ) -> impl Stream<Item = Result<CircuitCmd>> {
94 self.hops
95 .iter()
96 .enumerate()
97 .filter_map(|(i, hop)| {
98 let hop_num = HopNum::from(i as u8);
99
100 if exclude == Some(hop_num) {
101 // We must skip polling this hop
102 return None;
103 }
104
105 if !hop.ccontrol().can_send() {
106 // We can't send anything on this hop that counts towards SENDME windows.
107 //
108 // In theory we could send messages that don't count towards
109 // windows (like `RESOLVE`), and process end-of-stream
110 // events (to send an `END`), but it's probably not worth
111 // doing an O(N) iteration over flow-control-ready streams
112 // to see if that's the case.
113 //
114 // This *doesn't* block outgoing flow-control messages (e.g.
115 // SENDME), which are initiated via the control-message
116 // channel, handled above.
117 //
118 // TODO: Consider revisiting. OTOH some extra throttling when circuit-level
119 // congestion control has "bottomed out" might not be so bad, and the
120 // alternatives have complexity and/or performance costs.
121 return None;
122 }
123
124 let hop_map = Arc::clone(&self.hops[i].map);
125 Some(futures::future::poll_fn(move |cx| {
126 // Process an outbound message from the first ready stream on
127 // this hop. The stream map implements round robin scheduling to
128 // ensure fairness across streams.
129 // TODO: Consider looping here to process multiple ready
130 // streams. Need to be careful though to balance that with
131 // continuing to service incoming and control messages.
132 let mut hop_map = hop_map.lock().expect("lock poisoned");
133 let Some((sid, msg)) = hop_map.poll_ready_streams_iter(cx).next() else {
134 // No ready streams for this hop.
135 return Poll::Pending;
136 };
137
138 if msg.is_none() {
139 return Poll::Ready(Ok(CircuitCmd::CloseStream {
140 hop: hop_num,
141 sid,
142 behav: CloseStreamBehavior::default(),
143 reason: streammap::TerminateReason::StreamTargetClosed,
144 }));
145 };
146 let msg = hop_map.take_ready_msg(sid).expect("msg disappeared");
147
148 #[allow(unused)] // unused in non-debug builds
149 let Some(StreamEntMut::Open(s)) = hop_map.get_mut(sid) else {
150 panic!("Stream {sid} disappeared");
151 };
152
153 debug_assert!(
154 s.can_send(&msg),
155 "Stream {sid} produced a message it can't send: {msg:?}"
156 );
157
158 let cell = SendRelayCell {
159 hop: hop_num,
160 early: false,
161 cell: AnyRelayMsgOuter::new(Some(sid), msg),
162 };
163 Poll::Ready(Ok(CircuitCmd::Send(cell)))
164 }))
165 })
166 .collect::<FuturesUnordered<_>>()
167 }
168
169 /// Returns true if there are any streams on this circuit
170 ///
171 /// Important: this function locks the stream map of its each of the [`CircHop`]s
172 /// in this circuit, so it must **not** be called from any function where the
173 /// stream map lock is held (such as [`ready_streams_iterator`](Self::ready_streams_iterator).
174 pub(super) fn has_streams(&self) -> bool {
175 self.hops
176 .iter()
177 .any(|hop| hop.map.lock().expect("lock poisoned").n_open_streams() > 0)
178 }
179
180 /// Return the number of streams currently open on this circuit.
181 pub(crate) fn n_open_streams(&self) -> usize {
182 self.hops
183 .iter()
184 .map(|hop| hop.n_open_streams())
185 // No need to worry about overflow; max streams per hop is U16_MAX
186 .sum()
187 }
188}
189
190/// Represents the reactor's view of a single hop.
191pub(crate) struct CircHop {
192 /// The unique ID of the circuit. Used for logging.
193 unique_id: TunnelScopedCircId,
194 /// Hop number in the path.
195 hop_num: HopNum,
196 /// Map from stream IDs to streams.
197 ///
198 /// We store this with the reactor instead of the circuit, since the
199 /// reactor needs it for every incoming cell on a stream, whereas
200 /// the circuit only needs it when allocating new streams.
201 ///
202 /// NOTE: this is behind a mutex because the reactor polls the `StreamMap`s
203 /// of all hops concurrently, in a [`FuturesUnordered`]. Without the mutex,
204 /// this wouldn't be possible, because it would mean holding multiple
205 /// mutable references to `self` (the reactor). Note, however,
206 /// that there should never be any contention on this mutex:
207 /// we never create more than one
208 /// [`ready_streams_iterator`](CircHopList::ready_streams_iterator) stream
209 /// at a time, and we never clone/lock the hop's `StreamMap` outside of it.
210 ///
211 /// Additionally, the stream map of the last hop (join point) of a conflux tunnel
212 /// is shared with all the circuits in the tunnel.
213 map: Arc<Mutex<streammap::StreamMap>>,
214 /// Congestion control object.
215 ///
216 /// This object is also in charge of handling circuit level SENDME logic for this hop.
217 ccontrol: CongestionControl,
218 /// Decodes relay cells received from this hop.
219 inbound: RelayCellDecoder,
220 /// Format to use for relay cells.
221 //
222 // When we have packed/fragmented cells, this may be replaced by a RelayCellEncoder.
223 relay_format: RelayCellFormat,
224
225 /// Remaining permitted incoming relay cells from this hop, plus 1.
226 ///
227 /// (In other words, `None` represents no limit,
228 /// `Some(1)` represents an exhausted limit,
229 /// and `Some(n)` means that n-1 more cells may be received.)
230 ///
231 /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
232 n_incoming_cells_permitted: Option<NonZeroU32>,
233
234 /// Remaining permitted outgoing relay cells from this hop, plus 1.
235 ///
236 /// If this ever decrements from Some(1), then the circuit must be torn down with an error.
237 n_outgoing_cells_permitted: Option<NonZeroU32>,
238}
239
240impl CircHop {
241 /// Create a new hop.
242 pub(super) fn new(
243 unique_id: TunnelScopedCircId,
244 hop_num: HopNum,
245 settings: &HopSettings,
246 ) -> Self {
247 /// Convert a limit from the form used in a HopSettings to that used here.
248 /// (The format we use here is more compact.)
249 fn cvt(limit: u32) -> NonZeroU32 {
250 // See "known limitations" comment on n_incoming_cells_permitted.
251 limit
252 .saturating_add(1)
253 .try_into()
254 .expect("Adding one left it as zero?")
255 }
256 let relay_format = settings.relay_crypt_protocol().relay_cell_format();
257 CircHop {
258 unique_id,
259 hop_num,
260 map: Arc::new(Mutex::new(streammap::StreamMap::new())),
261 ccontrol: CongestionControl::new(&settings.ccontrol),
262 inbound: RelayCellDecoder::new(relay_format),
263 relay_format,
264 n_incoming_cells_permitted: settings.n_incoming_cells_permitted.map(cvt),
265 n_outgoing_cells_permitted: settings.n_outgoing_cells_permitted.map(cvt),
266 }
267 }
268
269 /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
270 /// `message` to the provided hop.
271 pub(crate) fn begin_stream(
272 &mut self,
273 message: AnyRelayMsg,
274 sender: StreamQueueSender,
275 rx: StreamMpscReceiver<AnyRelayMsg>,
276 rate_limit_updater: watch::Sender<StreamRateLimit>,
277 drain_rate_requester: NotifySender<DrainRateRequest>,
278 cmd_checker: AnyCmdChecker,
279 ) -> Result<(SendRelayCell, StreamId)> {
280 let flow_ctrl = self.build_flow_ctrl(rate_limit_updater, drain_rate_requester)?;
281 let r =
282 self.map
283 .lock()
284 .expect("lock poisoned")
285 .add_ent(sender, rx, flow_ctrl, cmd_checker)?;
286 let cell = AnyRelayMsgOuter::new(Some(r), message);
287 Ok((
288 SendRelayCell {
289 hop: self.hop_num,
290 early: false,
291 cell,
292 },
293 r,
294 ))
295 }
296
297 /// Close the stream associated with `id` because the stream was
298 /// dropped.
299 ///
300 /// If we have not already received an END cell on this stream, send one.
301 /// If no END cell is specified, an END cell with the reason byte set to
302 /// REASON_MISC will be sent.
303 pub(super) fn close_stream(
304 &mut self,
305 id: StreamId,
306 message: CloseStreamBehavior,
307 why: streammap::TerminateReason,
308 ) -> Result<Option<SendRelayCell>> {
309 let should_send_end = self.map.lock().expect("lock poisoned").terminate(id, why)?;
310 trace!(
311 circ_id = %self.unique_id,
312 stream_id = %id,
313 should_send_end = ?should_send_end,
314 "Ending stream",
315 );
316 // TODO: I am about 80% sure that we only send an END cell if
317 // we didn't already get an END cell. But I should double-check!
318 if let (ShouldSendEnd::Send, CloseStreamBehavior::SendEnd(end_message)) =
319 (should_send_end, message)
320 {
321 let end_cell = AnyRelayMsgOuter::new(Some(id), end_message.into());
322 let cell = SendRelayCell {
323 hop: self.hop_num,
324 early: false,
325 cell: end_cell,
326 };
327
328 return Ok(Some(cell));
329 }
330 Ok(None)
331 }
332
333 /// Check if we should send an XON message.
334 ///
335 /// If we should, then returns the XON message that should be sent.
336 pub(crate) fn maybe_send_xon(
337 &mut self,
338 rate: XonKbpsEwma,
339 id: StreamId,
340 ) -> Result<Option<Xon>> {
341 // the call below will return an error if XON/XOFF aren't supported,
342 // so we check for support here
343 if !self.ccontrol.uses_xon_xoff() {
344 return Ok(None);
345 }
346
347 let mut map = self.map.lock().expect("lock poisoned");
348 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
349 // stream went away
350 return Ok(None);
351 };
352
353 ent.maybe_send_xon(rate)
354 }
355
356 /// Check if we should send an XOFF message.
357 ///
358 /// If we should, then returns the XOFF message that should be sent.
359 pub(super) fn maybe_send_xoff(&mut self, id: StreamId) -> Result<Option<Xoff>> {
360 // the call below will return an error if XON/XOFF aren't supported,
361 // so we check for support here
362 if !self.ccontrol.uses_xon_xoff() {
363 return Ok(None);
364 }
365
366 let mut map = self.map.lock().expect("lock poisoned");
367 let Some(StreamEntMut::Open(ent)) = map.get_mut(id) else {
368 // stream went away
369 return Ok(None);
370 };
371
372 ent.maybe_send_xoff()
373 }
374
375 /// Return the format that is used for relay cells sent to this hop.
376 ///
377 /// For the most part, this format isn't necessary to interact with a CircHop;
378 /// it becomes relevant when we are deciding _what_ we can encode for the hop.
379 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
380 self.relay_format
381 }
382
/// Expose the congestion controller's send window and expected SENDME tags.
///
/// Only available in tests; simply forwards to `CongestionControl`.
#[cfg(test)]
pub(crate) fn send_window_and_expected_tags(&self) -> (u32, Vec<SendmeTag>) {
    let Self { ccontrol, .. } = self;
    ccontrol.send_window_and_expected_tags()
}
388
389 /// Return the number of open streams on this hop.
390 ///
391 /// WARNING: because this locks the stream map mutex,
392 /// it should never be called from a context where that mutex is already locked.
393 pub(crate) fn n_open_streams(&self) -> usize {
394 self.map.lock().expect("lock poisoned").n_open_streams()
395 }
396
397 /// Return a reference to our CongestionControl object.
398 pub(crate) fn ccontrol(&self) -> &CongestionControl {
399 &self.ccontrol
400 }
401
402 /// Return a mutable reference to our CongestionControl object.
403 pub(crate) fn ccontrol_mut(&mut self) -> &mut CongestionControl {
404 &mut self.ccontrol
405 }
406
407 /// Return the RelayCellFormat.
408 pub(crate) fn relay_format(&self) -> RelayCellFormat {
409 self.relay_format
410 }
411
412 /// Take capacity to send `msg`.
413 ///
414 /// See [`OpenStreamEnt::take_capacity_to_send`].
415 //
416 // TODO prop340: This should take a cell or similar, not a message.
417 pub(crate) fn take_capacity_to_send<M: RelayMsg>(
418 &mut self,
419 stream_id: StreamId,
420 msg: &M,
421 ) -> Result<()> {
422 let mut hop_map = self.map.lock().expect("lock poisoned");
423 let Some(StreamEntMut::Open(ent)) = hop_map.get_mut(stream_id) else {
424 warn!(
425 circ_id = %self.unique_id,
426 stream_id = %stream_id,
427 "sending a relay cell for non-existent or non-open stream!",
428 );
429 return Err(Error::CircProto(format!(
430 "tried to send a relay cell on non-open stream {}",
431 sv(stream_id),
432 )));
433 };
434
435 ent.take_capacity_to_send(msg)
436 }
437
/// Register a stream in this hop's map under the caller-chosen `stream_id`.
///
/// The flow control handler for the new stream is built from
/// `rate_limit_updater` and `drain_rate_requester`.
///
/// Fails if the flow control handler cannot be built or if the stream map
/// rejects the entry.
#[cfg(feature = "hs-service")]
pub(super) fn add_ent_with_id(
    &self,
    sink: StreamQueueSender,
    rx: StreamMpscReceiver<AnyRelayMsg>,
    rate_limit_updater: watch::Sender<StreamRateLimit>,
    drain_rate_requester: NotifySender<DrainRateRequest>,
    stream_id: StreamId,
    cmd_checker: AnyCmdChecker,
) -> Result<()> {
    let mut streams = self.map.lock().expect("lock poisoned");
    let flow_ctrl = self.build_flow_ctrl(rate_limit_updater, drain_rate_requester)?;
    streams.add_ent_with_id(sink, rx, flow_ctrl, stream_id, cmd_checker)?;

    Ok(())
}
460
/// Record that an END message (or other message indicating the end of
/// the stream) was received on the stream `stream_id`.
///
/// See [`StreamMap::ending_msg_received`](super::streammap::StreamMap::ending_msg_received).
#[cfg(feature = "hs-service")]
pub(super) fn ending_msg_received(&self, stream_id: StreamId) -> Result<()> {
    self.map
        .lock()
        .expect("lock poisoned")
        .ending_msg_received(stream_id)?;

    Ok(())
}
473
474 /// Parse a RELAY or RELAY_EARLY cell body.
475 ///
476 /// Requires that the cryptographic checks on the message have already been
477 /// performed
478 pub(super) fn decode(&mut self, cell: BoxedCellBody) -> Result<RelayCellDecoderResult> {
479 self.inbound
480 .decode(cell)
481 .map_err(|e| Error::from_bytes_err(e, "relay cell"))
482 }
483
484 /// Handle `msg`, delivering it to the stream with the specified `streamid` if appropriate.
485 ///
486 /// Returns back the provided `msg`, if the message is an incoming stream request
487 /// that needs to be handled by the calling code.
488 ///
489 // TODO: the above is a bit of a code smell -- we should try to avoid passing the msg
490 // back and forth like this.
491 pub(super) fn handle_msg(
492 &self,
493 cell_counts_toward_windows: bool,
494 streamid: StreamId,
495 msg: UnparsedRelayMsg,
496 ) -> Result<Option<UnparsedRelayMsg>> {
497 let mut hop_map = self.map.lock().expect("lock poisoned");
498 match hop_map.get_mut(streamid) {
499 Some(StreamEntMut::Open(ent)) => {
500 // Can't have a stream level SENDME when congestion control is enabled.
501 let message_closes_stream =
502 Self::deliver_msg_to_stream(streamid, ent, cell_counts_toward_windows, msg)?;
503
504 if message_closes_stream {
505 hop_map.ending_msg_received(streamid)?;
506 }
507 }
508 #[cfg(feature = "hs-service")]
509 Some(StreamEntMut::EndSent(_))
510 if matches!(
511 msg.cmd(),
512 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
513 ) =>
514 {
515 // If the other side is sending us a BEGIN but hasn't yet acknowledged our END
516 // message, just remove the old stream from the map and stop waiting for a
517 // response
518 hop_map.ending_msg_received(streamid)?;
519 return Ok(Some(msg));
520 }
521 Some(StreamEntMut::EndSent(EndSentStreamEnt { half_stream, .. })) => {
522 // We sent an end but maybe the other side hasn't heard.
523
524 match half_stream.handle_msg(msg)? {
525 StreamStatus::Open => {}
526 StreamStatus::Closed => {
527 hop_map.ending_msg_received(streamid)?;
528 }
529 }
530 }
531 #[cfg(feature = "hs-service")]
532 None if matches!(
533 msg.cmd(),
534 RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
535 ) =>
536 {
537 return Ok(Some(msg));
538 }
539 _ => {
540 // No stream wants this message, or ever did.
541 return Err(Error::CircProto(
542 "Cell received on nonexistent stream!?".into(),
543 ));
544 }
545 }
546
547 Ok(None)
548 }
549
550 /// Builds the reactor's flow control handler for a new stream.
551 // TODO: remove the `Result` once we remove the "flowctl-cc" feature
552 #[cfg_attr(feature = "flowctl-cc", expect(clippy::unnecessary_wraps))]
553 fn build_flow_ctrl(
554 &self,
555 rate_limit_updater: watch::Sender<StreamRateLimit>,
556 drain_rate_requester: NotifySender<DrainRateRequest>,
557 ) -> Result<StreamFlowControl> {
558 if self.ccontrol.uses_stream_sendme() {
559 let window = sendme::StreamSendWindow::new(SEND_WINDOW_INIT);
560 Ok(StreamFlowControl::new_window_based(window))
561 } else {
562 cfg_if::cfg_if! {
563 if #[cfg(feature = "flowctl-cc")] {
564 Ok(StreamFlowControl::new_xon_xoff_based(rate_limit_updater, drain_rate_requester))
565 } else {
566 Err(internal!(
567 "`CongestionControl` doesn't use sendmes, but 'flowctl-cc' feature not enabled",
568 ).into())
569 }
570 }
571 }
572 }
573
574 /// Deliver `msg` to the specified open stream entry `ent`.
575 fn deliver_msg_to_stream(
576 streamid: StreamId,
577 ent: &mut OpenStreamEnt,
578 cell_counts_toward_windows: bool,
579 msg: UnparsedRelayMsg,
580 ) -> Result<bool> {
581 use tor_async_utils::SinkTrySend as _;
582 use tor_async_utils::SinkTrySendError as _;
583
584 // The stream for this message exists, and is open.
585
586 // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
587 // else we'd never notice them if the stream isn't reading.
588 //
589 // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
590 // possible
591 match msg.cmd() {
592 RelayCmd::SENDME => {
593 ent.put_for_incoming_sendme(msg)?;
594 return Ok(false);
595 }
596 RelayCmd::XON => {
597 ent.handle_incoming_xon(msg)?;
598 return Ok(false);
599 }
600 RelayCmd::XOFF => {
601 ent.handle_incoming_xoff(msg)?;
602 return Ok(false);
603 }
604 _ => {}
605 }
606
607 let message_closes_stream = ent.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
608
609 if let Err(e) = Pin::new(&mut ent.sink).try_send(msg) {
610 if e.is_full() {
611 // If we get here, we either have a logic bug (!), or an attacker
612 // is sending us more cells than we asked for via congestion control.
613 return Err(Error::CircProto(format!(
614 "Stream sink would block; received too many cells on stream ID {}",
615 sv(streamid),
616 )));
617 }
618 if e.is_disconnected() && cell_counts_toward_windows {
619 // the other side of the stream has gone away; remember
620 // that we received a cell that we couldn't queue for it.
621 //
622 // Later this value will be recorded in a half-stream.
623 ent.dropped += 1;
624 }
625 }
626
627 Ok(message_closes_stream)
628 }
629
630 /// Get the stream map of this hop.
631 pub(crate) fn stream_map(&self) -> &Arc<Mutex<streammap::StreamMap>> {
632 &self.map
633 }
634
635 /// Set the stream map of this hop to `map`.
636 ///
637 /// Returns an error if the existing stream map of the hop has any open stream.
638 pub(crate) fn set_stream_map(
639 &mut self,
640 map: Arc<Mutex<streammap::StreamMap>>,
641 ) -> StdResult<(), Bug> {
642 if self.n_open_streams() != 0 {
643 return Err(internal!("Tried to discard existing open streams?!"));
644 }
645
646 self.map = map;
647
648 Ok(())
649 }
650
651 /// Decrement the limit of outbound cells that may be sent to this hop; give
652 /// an error if it would reach zero.
653 pub(crate) fn decrement_outbound_cell_limit(&mut self) -> Result<()> {
654 try_decrement_cell_limit(&mut self.n_outgoing_cells_permitted)
655 .map_err(|_| Error::ExcessOutboundCells)
656 }
657
658 /// Decrement the limit of inbound cells that may be received from this hop; give
659 /// an error if it would reach zero.
660 pub(crate) fn decrement_inbound_cell_limit(&mut self) -> Result<()> {
661 try_decrement_cell_limit(&mut self.n_incoming_cells_permitted)
662 .map_err(|_| Error::ExcessInboundCells)
663 }
664}
665
/// Consume one unit of a "remaining cells, plus 1" limit.
///
/// * `None` (no limit): left untouched, returns `Ok(())`.
/// * `Some(1)` (limit exhausted): left untouched, returns `Err(())`.
/// * `Some(n)` with `n > 1`: becomes `Some(n - 1)`, returns `Ok(())`.
#[inline]
fn try_decrement_cell_limit(val: &mut Option<NonZeroU32>) -> StdResult<(), ()> {
    let Some(remaining) = val else {
        // No limit configured.
        return Ok(());
    };

    // `NonZeroU32::new` yields `None` exactly when the decremented value is
    // zero, i.e. when the stored value was 1.
    match NonZeroU32::new(remaining.get() - 1) {
        Some(next) => {
            *remaining = next;
            Ok(())
        }
        None => Err(()),
    }
}
683}