hyper/body/
incoming.rs

1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11    any(feature = "http1", feature = "http2"),
12    any(feature = "client", feature = "server")
13))]
14use futures_util::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
17#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22    any(feature = "http1", feature = "http2"),
23    any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
// Sending half of the data channel. Every item is either a chunk of body
// bytes or an error that should be surfaced to whoever reads the body.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
// Sending half of the one-shot channel used to hand trailers to the body.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;
35
/// A stream of `Bytes`, used when receiving bodies from the network.
///
/// Note that users should not instantiate this struct directly. When working
/// with the hyper client, `Incoming` is returned to you in responses.
/// Similarly, when operating with the hyper server, it is provided within
/// requests.
///
/// # Examples
///
/// ```rust,ignore
/// async fn echo(
///    req: Request<hyper::body::Incoming>,
/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
///    // Here, you can process `Incoming`
/// }
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct Incoming {
    // Which source the frames are read from; see `Kind`.
    kind: Kind,
}
55
// The concrete source of frames behind an `Incoming`.
enum Kind {
    // A body with no frames: polling it reports end-of-stream right away.
    Empty,
    // A body fed through in-process channels by a `Sender`.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        // Length still expected; reduced by the size of every chunk yielded.
        content_length: DecodedLength,
        // Set to `WANT_READY` each time the body is polled, which lets a
        // waiting `Sender` know the body is being read.
        want_tx: watch::Sender,
        // Receives data chunks (or an error) from the `Sender`.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        // Receives the trailers, polled once the data channel has finished.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    // A body read from an HTTP/2 stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        // Length still expected; reduced by the size of every chunk yielded.
        content_length: DecodedLength,
        // Becomes `true` once the stream reports no more data, after which
        // only trailers are polled.
        data_done: bool,
        // Notified about every data chunk and about received trailers.
        ping: ping::Recorder,
        // The underlying `h2` receive stream.
        recv: h2::RecvStream,
    },
    // A body whose frames come from a `crate::ffi::UserBody`.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75
/// The sending half of a channel-backed [`Incoming`], created through
/// `Incoming::new_channel()`.
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// When the sender is dropped without sending an error, the receiving
/// `Incoming` simply reaches its end once the buffered data has been read.
/// To make the reader observe a failure instead (e.g. in the case of an error
/// during asynchronous processing), call `Sender::send_error()` before
/// dropping the sender.
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
    // Observes whether the body has been polled yet (see `poll_want`).
    want_rx: watch::Receiver,
    // Carries data chunks and errors to the body.
    data_tx: BodySender,
    // Carries the trailers; `None` once a send has been attempted.
    trailers_tx: Option<TrailersSender>,
}
96
// Want-signal value meaning the body has not been polled yet; while the
// signal holds this value, `Sender::poll_want` reports `Poll::Pending`.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
// Want-signal value meaning the sender may proceed. It is the initial value
// for channels created without a wanter and is stored on every body poll.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101
102impl Incoming {
103    /// Create a `Body` stream with an associated sender half.
104    ///
105    /// Useful when wanting to stream chunks from another thread.
106    #[inline]
107    #[cfg(test)]
108    pub(crate) fn channel() -> (Sender, Incoming) {
109        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
110    }
111
112    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
113    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
114        let (data_tx, data_rx) = mpsc::channel(0);
115        let (trailers_tx, trailers_rx) = oneshot::channel();
116
117        // If wanter is true, `Sender::poll_ready()` won't becoming ready
118        // until the `Body` has been polled for data once.
119        let want = if wanter { WANT_PENDING } else { WANT_READY };
120
121        let (want_tx, want_rx) = watch::channel(want);
122
123        let tx = Sender {
124            want_rx,
125            data_tx,
126            trailers_tx: Some(trailers_tx),
127        };
128        let rx = Incoming::new(Kind::Chan {
129            content_length,
130            want_tx,
131            data_rx,
132            trailers_rx,
133        });
134
135        (tx, rx)
136    }
137
138    fn new(kind: Kind) -> Incoming {
139        Incoming { kind }
140    }
141
142    #[allow(dead_code)]
143    pub(crate) fn empty() -> Incoming {
144        Incoming::new(Kind::Empty)
145    }
146
147    #[cfg(feature = "ffi")]
148    pub(crate) fn ffi() -> Incoming {
149        Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
150    }
151
152    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
153    pub(crate) fn h2(
154        recv: h2::RecvStream,
155        mut content_length: DecodedLength,
156        ping: ping::Recorder,
157    ) -> Self {
158        // If the stream is already EOS, then the "unknown length" is clearly
159        // actually ZERO.
160        if !content_length.is_exact() && recv.is_end_stream() {
161            content_length = DecodedLength::ZERO;
162        }
163
164        Incoming::new(Kind::H2 {
165            data_done: false,
166            ping,
167            content_length,
168            recv,
169        })
170    }
171
172    #[cfg(feature = "ffi")]
173    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
174        match self.kind {
175            Kind::Ffi(ref mut body) => return body,
176            _ => {
177                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
178            }
179        }
180
181        match self.kind {
182            Kind::Ffi(ref mut body) => body,
183            _ => unreachable!(),
184        }
185    }
186}
187
188impl Body for Incoming {
189    type Data = Bytes;
190    type Error = crate::Error;
191
192    fn poll_frame(
193        #[cfg_attr(
194            not(all(
195                any(feature = "http1", feature = "http2"),
196                any(feature = "client", feature = "server")
197            )),
198            allow(unused_mut)
199        )]
200        mut self: Pin<&mut Self>,
201        #[cfg_attr(
202            not(all(
203                any(feature = "http1", feature = "http2"),
204                any(feature = "client", feature = "server")
205            )),
206            allow(unused_variables)
207        )]
208        cx: &mut Context<'_>,
209    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
210        match self.kind {
211            Kind::Empty => Poll::Ready(None),
212            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
213            Kind::Chan {
214                content_length: ref mut len,
215                ref mut data_rx,
216                ref mut want_tx,
217                ref mut trailers_rx,
218            } => {
219                want_tx.send(WANT_READY);
220
221                if !data_rx.is_terminated() {
222                    if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
223                        len.sub_if(chunk.len() as u64);
224                        return Poll::Ready(Some(Ok(Frame::data(chunk))));
225                    }
226                }
227
228                // check trailers after data is terminated
229                match ready!(Pin::new(trailers_rx).poll(cx)) {
230                    Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
231                    Err(_) => Poll::Ready(None),
232                }
233            }
234            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
235            Kind::H2 {
236                ref mut data_done,
237                ref ping,
238                recv: ref mut h2,
239                content_length: ref mut len,
240            } => {
241                if !*data_done {
242                    match ready!(h2.poll_data(cx)) {
243                        Some(Ok(bytes)) => {
244                            let _ = h2.flow_control().release_capacity(bytes.len());
245                            len.sub_if(bytes.len() as u64);
246                            ping.record_data(bytes.len());
247                            return Poll::Ready(Some(Ok(Frame::data(bytes))));
248                        }
249                        Some(Err(e)) => {
250                            return match e.reason() {
251                                // These reasons should cause the body reading to stop, but not fail it.
252                                // The same logic as for `Read for H2Upgraded` is applied here.
253                                Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
254                                    Poll::Ready(None)
255                                }
256                                _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
257                            };
258                        }
259                        None => {
260                            *data_done = true;
261                            // fall through to trailers
262                        }
263                    }
264                }
265
266                // after data, check trailers
267                match ready!(h2.poll_trailers(cx)) {
268                    Ok(t) => {
269                        ping.record_non_data();
270                        Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
271                    }
272                    Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
273                }
274            }
275
276            #[cfg(feature = "ffi")]
277            Kind::Ffi(ref mut body) => body.poll_data(cx),
278        }
279    }
280
281    fn is_end_stream(&self) -> bool {
282        match self.kind {
283            Kind::Empty => true,
284            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
285            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
286            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
287            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
288            #[cfg(feature = "ffi")]
289            Kind::Ffi(..) => false,
290        }
291    }
292
293    fn size_hint(&self) -> SizeHint {
294        #[cfg(all(
295            any(feature = "http1", feature = "http2"),
296            any(feature = "client", feature = "server")
297        ))]
298        fn opt_len(decoded_length: DecodedLength) -> SizeHint {
299            if let Some(content_length) = decoded_length.into_opt() {
300                SizeHint::with_exact(content_length)
301            } else {
302                SizeHint::default()
303            }
304        }
305
306        match self.kind {
307            Kind::Empty => SizeHint::with_exact(0),
308            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
309            Kind::Chan { content_length, .. } => opt_len(content_length),
310            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
311            Kind::H2 { content_length, .. } => opt_len(content_length),
312            #[cfg(feature = "ffi")]
313            Kind::Ffi(..) => SizeHint::default(),
314        }
315    }
316}
317
318impl fmt::Debug for Incoming {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        #[cfg(any(
321            all(
322                any(feature = "http1", feature = "http2"),
323                any(feature = "client", feature = "server")
324            ),
325            feature = "ffi"
326        ))]
327        #[derive(Debug)]
328        struct Streaming;
329        #[derive(Debug)]
330        struct Empty;
331
332        let mut builder = f.debug_tuple("Body");
333        match self.kind {
334            Kind::Empty => builder.field(&Empty),
335            #[cfg(any(
336                all(
337                    any(feature = "http1", feature = "http2"),
338                    any(feature = "client", feature = "server")
339                ),
340                feature = "ffi"
341            ))]
342            _ => builder.field(&Streaming),
343        };
344
345        builder.finish()
346    }
347}
348
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Polls whether another chunk can be sent.
    ///
    /// The sender is ready only after the want signal allows it (see
    /// `poll_want`) and the data channel has room. A closed data channel is
    /// reported as a "closed" error.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // First make sure the receiving end has asked for data.
        match self.poll_want(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        }

        let channel_state = self.data_tx.poll_ready(cx);
        channel_state.map_err(|_| crate::Error::new_closed())
    }

    /// Translates the current want signal into a poll result.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        match self.want_rx.load(cx) {
            WANT_PENDING => Poll::Pending,
            WANT_READY => Poll::Ready(Ok(())),
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            other => unreachable!("want_rx value: {}", other),
        }
    }

    /// Waits until `poll_ready` reports readiness (or an error).
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        let readiness = futures_util::future::poll_fn(|cx| self.poll_ready(cx));
        readiness.await
    }

    /// Waits for the channel to be ready and then sends one data chunk.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            Err(_) => Err(crate::Error::new_closed()),
        }
    }

    /// Sends the trailers on the trailers channel.
    ///
    /// Fails with a "closed" error if a trailers send was already attempted
    /// or if the receiving side is gone.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        let trailers_tx = self
            .trailers_tx
            .take()
            .ok_or_else(|| crate::Error::new_closed())?;

        match trailers_tx.send(trailers) {
            Ok(()) => Ok(()),
            Err(_) => Err(crate::Error::new_closed()),
        }
    }

    /// Try to send data on this channel.
    ///
    /// # Errors
    ///
    /// Returns `Err(Bytes)`, handing the chunk back, if the channel could not
    /// (currently) accept another `Bytes`.
    ///
    /// # Note
    ///
    /// This is mostly useful for when trying to send from some other thread
    /// that doesn't have an async context. If in an async context, prefer
    /// `send_data()` instead.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            // The rejected item is the `Ok(chunk)` passed in just above.
            Err(rejected) => Err(rejected.into_inner().expect("just sent Ok")),
        }
    }

    /// Try to send the trailers without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(None)` if a trailers send was already attempted, and
    /// `Err(Some(trailers))`, handing the trailers back, if the send failed.
    #[cfg(feature = "http1")]
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            Some(trailers_tx) => trailers_tx.send(trailers).map_err(Some),
            None => Err(None),
        }
    }

    /// Consumes the sender, sending a "body write aborted" error first.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let aborted = crate::Error::new_body_write_aborted();
        self.send_error(aborted);
    }

    /// Sends an error to the body; a failure to deliver it is ignored.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        // Send through a clone so the send works even if the buffer is full.
        let mut error_tx = self.data_tx.clone();
        let _ = error_tx.try_send(Err(err));
    }
}
439
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    /// Formats the sender as `Sender(Closed)` when the want signal reports
    /// closure and as `Sender(Open)` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        let mut tuple = f.debug_tuple("Sender");
        if self.want_rx.peek() == watch::CLOSED {
            tuple.field(&Closed);
        } else {
            tuple.field(&Open);
        }

        tuple.finish()
    }
}
457
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
    use http_body_util::BodyExt;

    /// Guards against *accidentally* growing `Incoming` or `Sender` by too
    /// much.
    #[test]
    fn test_size_of() {
        let body_expected_size = mem::size_of::<u64>() * 5;
        let body_size = mem::size_of::<Incoming>();
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        // NOTE: equality of `Incoming` and `Option<Incoming>` sizes is
        // currently not asserted.

        let sender_size = mem::size_of::<Sender>();
        assert_eq!(sender_size, mem::size_of::<usize>() * 5, "Sender");

        assert_eq!(
            sender_size,
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// The size hint reflects the declared length of each kind of body.
    #[test]
    fn size_hint() {
        fn eq(body: Incoming, b: SizeHint, note: &str) {
            let actual = body.size_hint();
            assert_eq!(actual.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(actual.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

        let (_tx, chunked) = Incoming::channel();
        eq(chunked, SizeHint::new(), "channel");

        let (_tx, sized) = Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false);
        eq(sized, SizeHint::with_exact(4), "channel with length");
    }

    /// Aborting the sender surfaces a "body write aborted" error.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (sender, mut body) = Incoming::channel();

        sender.abort();

        let err = body.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered when a chunk already occupies the
    /// buffer.
    #[cfg(all(not(miri), feature = "http1"))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut sender, mut body) = Incoming::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");
        // The buffer is full now, yet the abort can still be sent.
        sender.abort();

        let first_frame = body.frame().await.expect("item 1").expect("chunk 1");
        let chunk1 = first_frame.into_data().unwrap();
        assert_eq!(chunk1, "chunk 1");

        let err = body.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// Exactly one chunk fits into the channel before it reports being full.
    #[cfg(feature = "http1")]
    #[test]
    fn channel_buffers_one() {
        let (mut sender, _body) = Incoming::channel();

        sender.try_send_data("chunk 1".into()).expect("send 1");

        // The buffer is full now, so the second chunk is handed back.
        let chunk2 = sender
            .try_send_data("chunk 2".into())
            .expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// A body whose sender was dropped right away yields no frames.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut body) = Incoming::channel();

        let frame = body.frame().await;
        assert!(frame.is_none());
    }

    /// Without a wanter the sender is ready from the start.
    #[test]
    fn channel_ready() {
        let (mut sender, _body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(sender_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter the sender only becomes ready after the body was polled.
    #[test]
    fn channel_wanter() {
        let (mut sender, mut body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());
        let mut body_frame = tokio_test::task::spawn(body.frame());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(body_frame.poll().is_pending(), "poll rx.data");
        assert!(sender_ready.is_woken(), "rx poll wakes tx");

        assert!(
            sender_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees a "closed"
    /// error.
    #[test]
    fn channel_notices_closure() {
        let (mut sender, body) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut sender_ready = tokio_test::task::spawn(sender.ready());

        assert!(
            sender_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(body);
        assert!(sender_ready.is_woken(), "dropping rx wakes tx");

        match sender_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            other => panic!("tx poll ready unexpected: {:?}", other),
        }
    }
}