1use crate::{BoxError, Error};
4use bytes::Bytes;
5use futures_util::stream::Stream;
6use futures_util::TryStream;
7use http_body::{Body as _, Frame};
8use http_body_util::BodyExt;
9use pin_project_lite::pin_project;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use sync_wrapper::SyncWrapper;
13
14type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
15
16fn boxed<B>(body: B) -> BoxBody
17where
18 B: http_body::Body<Data = Bytes> + Send + 'static,
19 B::Error: Into<BoxError>,
20{
21 try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
22}
23
/// Attempt to turn a value of type `K` into a value of type `T` by value.
///
/// Returns `Ok` holding the very same value when `K` and `T` are the same
/// concrete type, and `Err` handing the untouched input back otherwise.
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    // Park the value in an `Option` so it can be moved out through a
    // `&mut dyn Any` once the type check has succeeded.
    let mut slot = Some(k);
    let erased: &mut dyn std::any::Any = &mut slot;
    match erased.downcast_mut::<Option<T>>() {
        // `Option<K>` is `Option<T>`; the slot was filled just above, so
        // `take()` always yields `Some`.
        Some(same_type) => Ok(same_type.take().unwrap()),
        // Types differ: the slot is untouched and still holds the input.
        None => Err(slot.unwrap()),
    }
}
36
37#[derive(Debug)]
39pub struct Body(BoxBody);
40
41impl Body {
42 pub fn new<B>(body: B) -> Self
44 where
45 B: http_body::Body<Data = Bytes> + Send + 'static,
46 B::Error: Into<BoxError>,
47 {
48 try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
49 }
50
51 pub fn empty() -> Self {
53 Self::new(http_body_util::Empty::new())
54 }
55
56 pub fn from_stream<S>(stream: S) -> Self
60 where
61 S: TryStream + Send + 'static,
62 S::Ok: Into<Bytes>,
63 S::Error: Into<BoxError>,
64 {
65 Self::new(StreamBody {
66 stream: SyncWrapper::new(stream),
67 })
68 }
69
70 pub fn into_data_stream(self) -> BodyDataStream {
77 BodyDataStream { inner: self }
78 }
79}
80
81impl Default for Body {
82 fn default() -> Self {
83 Self::empty()
84 }
85}
86
87impl From<()> for Body {
88 fn from(_: ()) -> Self {
89 Self::empty()
90 }
91}
92
// Generates `impl From<T> for Body` for a buffer-like type `T` by routing the
// value through `http_body_util::Full` and then `Body::new`.
macro_rules! body_from_impl {
    ($source:ty) => {
        impl From<$source> for Body {
            fn from(payload: $source) -> Self {
                let full = http_body_util::Full::<Bytes>::from(payload);
                Self::new(full)
            }
        }
    };
}
102
103body_from_impl!(&'static [u8]);
104body_from_impl!(std::borrow::Cow<'static, [u8]>);
105body_from_impl!(Vec<u8>);
106
107body_from_impl!(&'static str);
108body_from_impl!(std::borrow::Cow<'static, str>);
109body_from_impl!(String);
110
111body_from_impl!(Bytes);
112
113impl http_body::Body for Body {
114 type Data = Bytes;
115 type Error = Error;
116
117 #[inline]
118 fn poll_frame(
119 mut self: Pin<&mut Self>,
120 cx: &mut Context<'_>,
121 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
122 Pin::new(&mut self.0).poll_frame(cx)
123 }
124
125 #[inline]
126 fn size_hint(&self) -> http_body::SizeHint {
127 self.0.size_hint()
128 }
129
130 #[inline]
131 fn is_end_stream(&self) -> bool {
132 self.0.is_end_stream()
133 }
134}
135
136#[derive(Debug)]
140pub struct BodyDataStream {
141 inner: Body,
142}
143
144impl Stream for BodyDataStream {
145 type Item = Result<Bytes, Error>;
146
147 #[inline]
148 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 loop {
150 match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
151 Some(frame) => match frame.into_data() {
152 Ok(data) => return Poll::Ready(Some(Ok(data))),
153 Err(_frame) => {}
154 },
155 None => return Poll::Ready(None),
156 }
157 }
158 }
159}
160
161impl http_body::Body for BodyDataStream {
162 type Data = Bytes;
163 type Error = Error;
164
165 #[inline]
166 fn poll_frame(
167 mut self: Pin<&mut Self>,
168 cx: &mut Context<'_>,
169 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
170 Pin::new(&mut self.inner).poll_frame(cx)
171 }
172
173 #[inline]
174 fn is_end_stream(&self) -> bool {
175 self.inner.is_end_stream()
176 }
177
178 #[inline]
179 fn size_hint(&self) -> http_body::SizeHint {
180 self.inner.size_hint()
181 }
182}
183
184pin_project! {
185 struct StreamBody<S> {
186 #[pin]
187 stream: SyncWrapper<S>,
188 }
189}
190
191impl<S> http_body::Body for StreamBody<S>
192where
193 S: TryStream,
194 S::Ok: Into<Bytes>,
195 S::Error: Into<BoxError>,
196{
197 type Data = Bytes;
198 type Error = Error;
199
200 fn poll_frame(
201 self: Pin<&mut Self>,
202 cx: &mut Context<'_>,
203 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
204 let stream = self.project().stream.get_pin_mut();
205 match futures_util::ready!(stream.try_poll_next(cx)) {
206 Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
207 Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
208 None => Poll::Ready(None),
209 }
210 }
211}
212
// Checks both outcomes of `try_downcast`: mismatched and matching types.
#[test]
fn test_try_downcast() {
    // `u32` is not `i32`, so the input must come back in `Err`.
    let mismatched: Result<i32, u32> = try_downcast(5_u32);
    assert_eq!(mismatched, Err(5_u32));

    // Same type on both sides: the value comes back in `Ok`.
    let matched: Result<i32, i32> = try_downcast(5_i32);
    assert_eq!(matched, Ok(5_i32));
}