1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11 any(feature = "http1", feature = "http2"),
12 any(feature = "client", feature = "server")
13))]
14use futures_util::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_util::{stream::FusedStream, Stream}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22 any(feature = "http1", feature = "http2"),
23 any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
31#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
32type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
33#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
34type TrailersSender = oneshot::Sender<HeaderMap>;
35
36#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53 kind: Kind,
54}
55
56enum Kind {
57 Empty,
58 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
59 Chan {
60 content_length: DecodedLength,
61 want_tx: watch::Sender,
62 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
63 trailers_rx: oneshot::Receiver<HeaderMap>,
64 },
65 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
66 H2 {
67 content_length: DecodedLength,
68 data_done: bool,
69 ping: ping::Recorder,
70 recv: h2::RecvStream,
71 },
72 #[cfg(feature = "ffi")]
73 Ffi(crate::ffi::UserBody),
74}
75
76#[must_use = "Sender does nothing unless sent on"]
90#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
91pub(crate) struct Sender {
92 want_rx: watch::Receiver,
93 data_tx: BodySender,
94 trailers_tx: Option<TrailersSender>,
95}
96
97#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
98const WANT_PENDING: usize = 1;
99#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
100const WANT_READY: usize = 2;
101
102impl Incoming {
103 #[inline]
107 #[cfg(test)]
108 pub(crate) fn channel() -> (Sender, Incoming) {
109 Self::new_channel(DecodedLength::CHUNKED, false)
110 }
111
112 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
113 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
114 let (data_tx, data_rx) = mpsc::channel(0);
115 let (trailers_tx, trailers_rx) = oneshot::channel();
116
117 let want = if wanter { WANT_PENDING } else { WANT_READY };
120
121 let (want_tx, want_rx) = watch::channel(want);
122
123 let tx = Sender {
124 want_rx,
125 data_tx,
126 trailers_tx: Some(trailers_tx),
127 };
128 let rx = Incoming::new(Kind::Chan {
129 content_length,
130 want_tx,
131 data_rx,
132 trailers_rx,
133 });
134
135 (tx, rx)
136 }
137
138 fn new(kind: Kind) -> Incoming {
139 Incoming { kind }
140 }
141
142 #[allow(dead_code)]
143 pub(crate) fn empty() -> Incoming {
144 Incoming::new(Kind::Empty)
145 }
146
147 #[cfg(feature = "ffi")]
148 pub(crate) fn ffi() -> Incoming {
149 Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
150 }
151
152 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
153 pub(crate) fn h2(
154 recv: h2::RecvStream,
155 mut content_length: DecodedLength,
156 ping: ping::Recorder,
157 ) -> Self {
158 if !content_length.is_exact() && recv.is_end_stream() {
161 content_length = DecodedLength::ZERO;
162 }
163
164 Incoming::new(Kind::H2 {
165 data_done: false,
166 ping,
167 content_length,
168 recv,
169 })
170 }
171
172 #[cfg(feature = "ffi")]
173 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
174 match self.kind {
175 Kind::Ffi(ref mut body) => return body,
176 _ => {
177 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
178 }
179 }
180
181 match self.kind {
182 Kind::Ffi(ref mut body) => body,
183 _ => unreachable!(),
184 }
185 }
186}
187
188impl Body for Incoming {
189 type Data = Bytes;
190 type Error = crate::Error;
191
192 fn poll_frame(
193 #[cfg_attr(
194 not(all(
195 any(feature = "http1", feature = "http2"),
196 any(feature = "client", feature = "server")
197 )),
198 allow(unused_mut)
199 )]
200 mut self: Pin<&mut Self>,
201 #[cfg_attr(
202 not(all(
203 any(feature = "http1", feature = "http2"),
204 any(feature = "client", feature = "server")
205 )),
206 allow(unused_variables)
207 )]
208 cx: &mut Context<'_>,
209 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
210 match self.kind {
211 Kind::Empty => Poll::Ready(None),
212 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
213 Kind::Chan {
214 content_length: ref mut len,
215 ref mut data_rx,
216 ref mut want_tx,
217 ref mut trailers_rx,
218 } => {
219 want_tx.send(WANT_READY);
220
221 if !data_rx.is_terminated() {
222 if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
223 len.sub_if(chunk.len() as u64);
224 return Poll::Ready(Some(Ok(Frame::data(chunk))));
225 }
226 }
227
228 match ready!(Pin::new(trailers_rx).poll(cx)) {
230 Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
231 Err(_) => Poll::Ready(None),
232 }
233 }
234 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
235 Kind::H2 {
236 ref mut data_done,
237 ref ping,
238 recv: ref mut h2,
239 content_length: ref mut len,
240 } => {
241 if !*data_done {
242 match ready!(h2.poll_data(cx)) {
243 Some(Ok(bytes)) => {
244 let _ = h2.flow_control().release_capacity(bytes.len());
245 len.sub_if(bytes.len() as u64);
246 ping.record_data(bytes.len());
247 return Poll::Ready(Some(Ok(Frame::data(bytes))));
248 }
249 Some(Err(e)) => {
250 return match e.reason() {
251 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
254 Poll::Ready(None)
255 }
256 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
257 };
258 }
259 None => {
260 *data_done = true;
261 }
263 }
264 }
265
266 match ready!(h2.poll_trailers(cx)) {
268 Ok(t) => {
269 ping.record_non_data();
270 Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
271 }
272 Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
273 }
274 }
275
276 #[cfg(feature = "ffi")]
277 Kind::Ffi(ref mut body) => body.poll_data(cx),
278 }
279 }
280
281 fn is_end_stream(&self) -> bool {
282 match self.kind {
283 Kind::Empty => true,
284 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
285 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
286 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
287 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
288 #[cfg(feature = "ffi")]
289 Kind::Ffi(..) => false,
290 }
291 }
292
293 fn size_hint(&self) -> SizeHint {
294 #[cfg(all(
295 any(feature = "http1", feature = "http2"),
296 any(feature = "client", feature = "server")
297 ))]
298 fn opt_len(decoded_length: DecodedLength) -> SizeHint {
299 if let Some(content_length) = decoded_length.into_opt() {
300 SizeHint::with_exact(content_length)
301 } else {
302 SizeHint::default()
303 }
304 }
305
306 match self.kind {
307 Kind::Empty => SizeHint::with_exact(0),
308 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
309 Kind::Chan { content_length, .. } => opt_len(content_length),
310 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
311 Kind::H2 { content_length, .. } => opt_len(content_length),
312 #[cfg(feature = "ffi")]
313 Kind::Ffi(..) => SizeHint::default(),
314 }
315 }
316}
317
318impl fmt::Debug for Incoming {
319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320 #[cfg(any(
321 all(
322 any(feature = "http1", feature = "http2"),
323 any(feature = "client", feature = "server")
324 ),
325 feature = "ffi"
326 ))]
327 #[derive(Debug)]
328 struct Streaming;
329 #[derive(Debug)]
330 struct Empty;
331
332 let mut builder = f.debug_tuple("Body");
333 match self.kind {
334 Kind::Empty => builder.field(&Empty),
335 #[cfg(any(
336 all(
337 any(feature = "http1", feature = "http2"),
338 any(feature = "client", feature = "server")
339 ),
340 feature = "ffi"
341 ))]
342 _ => builder.field(&Streaming),
343 };
344
345 builder.finish()
346 }
347}
348
349#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
350impl Sender {
351 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
353 ready!(self.poll_want(cx)?);
355 self.data_tx
356 .poll_ready(cx)
357 .map_err(|_| crate::Error::new_closed())
358 }
359
360 fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
361 match self.want_rx.load(cx) {
362 WANT_READY => Poll::Ready(Ok(())),
363 WANT_PENDING => Poll::Pending,
364 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
365 unexpected => unreachable!("want_rx value: {}", unexpected),
366 }
367 }
368
369 #[cfg(test)]
370 async fn ready(&mut self) -> crate::Result<()> {
371 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
372 }
373
374 #[cfg(test)]
376 #[allow(unused)]
377 pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
378 self.ready().await?;
379 self.data_tx
380 .try_send(Ok(chunk))
381 .map_err(|_| crate::Error::new_closed())
382 }
383
384 #[allow(unused)]
386 pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
387 let tx = match self.trailers_tx.take() {
388 Some(tx) => tx,
389 None => return Err(crate::Error::new_closed()),
390 };
391 tx.send(trailers).map_err(|_| crate::Error::new_closed())
392 }
393
394 #[cfg(feature = "http1")]
407 pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
408 self.data_tx
409 .try_send(Ok(chunk))
410 .map_err(|err| err.into_inner().expect("just sent Ok"))
411 }
412
413 #[cfg(feature = "http1")]
414 pub(crate) fn try_send_trailers(
415 &mut self,
416 trailers: HeaderMap,
417 ) -> Result<(), Option<HeaderMap>> {
418 let tx = match self.trailers_tx.take() {
419 Some(tx) => tx,
420 None => return Err(None),
421 };
422
423 tx.send(trailers).map_err(Some)
424 }
425
426 #[cfg(test)]
427 pub(crate) fn abort(mut self) {
428 self.send_error(crate::Error::new_body_write_aborted());
429 }
430
431 pub(crate) fn send_error(&mut self, err: crate::Error) {
432 let _ = self
433 .data_tx
434 .clone()
436 .try_send(Err(err));
437 }
438}
439
440#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
441impl fmt::Debug for Sender {
442 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443 #[derive(Debug)]
444 struct Open;
445 #[derive(Debug)]
446 struct Closed;
447
448 let mut builder = f.debug_tuple("Sender");
449 match self.want_rx.peek() {
450 watch::CLOSED => builder.field(&Closed),
451 _ => builder.field(&Open),
452 };
453
454 builder.finish()
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use std::mem;
461 use std::task::Poll;
462
463 use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
464 use http_body_util::BodyExt;
465
466 #[test]
467 fn test_size_of() {
468 let body_size = mem::size_of::<Incoming>();
472 let body_expected_size = mem::size_of::<u64>() * 5;
473 assert!(
474 body_size <= body_expected_size,
475 "Body size = {} <= {}",
476 body_size,
477 body_expected_size,
478 );
479
480 assert_eq!(
483 mem::size_of::<Sender>(),
484 mem::size_of::<usize>() * 5,
485 "Sender"
486 );
487
488 assert_eq!(
489 mem::size_of::<Sender>(),
490 mem::size_of::<Option<Sender>>(),
491 "Option<Sender>"
492 );
493 }
494
495 #[test]
496 fn size_hint() {
497 fn eq(body: Incoming, b: SizeHint, note: &str) {
498 let a = body.size_hint();
499 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
500 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
501 }
502
503 eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
504
505 eq(Incoming::channel().1, SizeHint::new(), "channel");
506
507 eq(
508 Incoming::new_channel(DecodedLength::new(4), false).1,
509 SizeHint::with_exact(4),
510 "channel with length",
511 );
512 }
513
514 #[cfg(not(miri))]
515 #[tokio::test]
516 async fn channel_abort() {
517 let (tx, mut rx) = Incoming::channel();
518
519 tx.abort();
520
521 let err = rx.frame().await.unwrap().unwrap_err();
522 assert!(err.is_body_write_aborted(), "{:?}", err);
523 }
524
525 #[cfg(all(not(miri), feature = "http1"))]
526 #[tokio::test]
527 async fn channel_abort_when_buffer_is_full() {
528 let (mut tx, mut rx) = Incoming::channel();
529
530 tx.try_send_data("chunk 1".into()).expect("send 1");
531 tx.abort();
533
534 let chunk1 = rx
535 .frame()
536 .await
537 .expect("item 1")
538 .expect("chunk 1")
539 .into_data()
540 .unwrap();
541 assert_eq!(chunk1, "chunk 1");
542
543 let err = rx.frame().await.unwrap().unwrap_err();
544 assert!(err.is_body_write_aborted(), "{:?}", err);
545 }
546
547 #[cfg(feature = "http1")]
548 #[test]
549 fn channel_buffers_one() {
550 let (mut tx, _rx) = Incoming::channel();
551
552 tx.try_send_data("chunk 1".into()).expect("send 1");
553
554 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
556 assert_eq!(chunk2, "chunk 2");
557 }
558
559 #[cfg(not(miri))]
560 #[tokio::test]
561 async fn channel_empty() {
562 let (_, mut rx) = Incoming::channel();
563
564 assert!(rx.frame().await.is_none());
565 }
566
567 #[test]
568 fn channel_ready() {
569 let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, false);
570
571 let mut tx_ready = tokio_test::task::spawn(tx.ready());
572
573 assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
574 }
575
576 #[test]
577 fn channel_wanter() {
578 let (mut tx, mut rx) =
579 Incoming::new_channel(DecodedLength::CHUNKED, true);
580
581 let mut tx_ready = tokio_test::task::spawn(tx.ready());
582 let mut rx_data = tokio_test::task::spawn(rx.frame());
583
584 assert!(
585 tx_ready.poll().is_pending(),
586 "tx isn't ready before rx has been polled"
587 );
588
589 assert!(rx_data.poll().is_pending(), "poll rx.data");
590 assert!(tx_ready.is_woken(), "rx poll wakes tx");
591
592 assert!(
593 tx_ready.poll().is_ready(),
594 "tx is ready after rx has been polled"
595 );
596 }
597
598 #[test]
599 fn channel_notices_closure() {
600 let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);
601
602 let mut tx_ready = tokio_test::task::spawn(tx.ready());
603
604 assert!(
605 tx_ready.poll().is_pending(),
606 "tx isn't ready before rx has been polled"
607 );
608
609 drop(rx);
610 assert!(tx_ready.is_woken(), "dropping rx wakes tx");
611
612 match tx_ready.poll() {
613 Poll::Ready(Err(ref e)) if e.is_closed() => (),
614 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
615 }
616 }
617}