use std::borrow::Cow;
use std::error::Error as StdError;
use std::fmt;
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Stream, Sink, AsyncSink, StartSend};
use tokio_buf::SizeHint;
use h2;
use http::HeaderMap;
use common::Never;
use super::internal::{FullDataArg, FullDataRet};
use super::{Chunk, Payload};
use upgrade::OnUpgrade;
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
extra: Option<Box<Extra>>,
}
enum Kind {
Once(Option<Chunk>),
Chan {
content_length: Option<u64>,
abort_rx: oneshot::Receiver<()>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
},
H2 {
content_length: Option<u64>,
recv: h2::RecvStream,
},
Wrapped(Box<dyn Stream<Item = Chunk, Error = Box<dyn StdError + Send + Sync>> + Send>),
}
struct Extra {
delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,
}
type DelayEofUntil = oneshot::Receiver<Never>;
enum DelayEof {
NotEof(DelayEofUntil),
Eof(DelayEofUntil),
}
#[must_use = "Sender does nothing unless sent on"]
#[derive(Debug)]
pub struct Sender {
abort_tx: oneshot::Sender<()>,
tx: BodySender,
}
impl Body {
#[inline]
pub fn empty() -> Body {
Body::new(Kind::Once(None))
}
#[inline]
pub fn channel() -> (Sender, Body) {
Self::new_channel(None)
}
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel();
let tx = Sender {
abort_tx: abort_tx,
tx: tx,
};
let rx = Body::new(Kind::Chan {
content_length,
abort_rx,
rx,
});
(tx, rx)
}
pub fn wrap_stream<S>(stream: S) -> Body
where
S: Stream + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Chunk: From<S::Item>,
{
let mapped = stream.map(Chunk::from).map_err(Into::into);
Body::new(Kind::Wrapped(Box::new(mapped)))
}
pub fn on_upgrade(self) -> OnUpgrade {
self
.extra
.map(|ex| ex.on_upgrade)
.unwrap_or_else(OnUpgrade::none)
}
fn new(kind: Kind) -> Body {
Body {
kind: kind,
extra: None,
}
}
pub(crate) fn h2(recv: h2::RecvStream, content_length: Option<u64>) -> Self {
Body::new(Kind::H2 {
content_length,
recv,
})
}
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
let extra = self.extra_mut();
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
extra.on_upgrade = upgrade;
}
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
}
fn take_delayed_eof(&mut self) -> Option<DelayEof> {
self
.extra
.as_mut()
.and_then(|extra| extra.delayed_eof.take())
}
fn extra_mut(&mut self) -> &mut Extra {
self
.extra
.get_or_insert_with(|| Box::new(Extra {
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
}))
}
fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.take_delayed_eof() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::NotReady) => {
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
},
Err(e) => Err(e),
}
},
Some(DelayEof::Eof(mut delay)) => {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
}
},
None => self.poll_inner(),
}
}
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.kind {
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Chan {
content_length: ref mut len,
ref mut rx,
ref mut abort_rx,
} => {
if let Ok(Async::Ready(())) = abort_rx.poll() {
return Err(::Error::new_body_write_aborted());
}
match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => {
if let Some(ref mut len) = *len {
debug_assert!(*len >= chunk.len() as u64);
*len = *len - chunk.len() as u64;
}
Ok(Async::Ready(Some(chunk)))
}
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
Kind::H2 {
recv: ref mut h2, ..
} => h2
.poll()
.map(|async| {
async.map(|opt| {
opt.map(|bytes| {
let _ = h2.release_capacity().release_capacity(bytes.len());
Chunk::from(bytes)
})
})
})
.map_err(::Error::new_body),
Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body),
}
}
}
impl Default for Body {
#[inline]
fn default() -> Body {
Body::empty()
}
}
impl Payload for Body {
type Data = Chunk;
type Error = ::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
self.poll_eof()
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
match self.kind {
Kind::H2 {
recv: ref mut h2, ..
} => h2.poll_trailers().map_err(::Error::new_h2),
_ => Ok(Async::Ready(None)),
}
}
fn is_end_stream(&self) -> bool {
match self.kind {
Kind::Once(ref val) => val.is_none(),
Kind::Chan { content_length, .. } => content_length == Some(0),
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
Kind::Wrapped(..) => false,
}
}
fn content_length(&self) -> Option<u64> {
match self.kind {
Kind::Once(Some(ref val)) => Some(val.len() as u64),
Kind::Once(None) => Some(0),
Kind::Wrapped(..) => None,
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => content_length,
}
}
#[doc(hidden)]
fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet<Self::Data> {
match self.kind {
Kind::Once(ref mut val) => FullDataRet(val.take()),
_ => FullDataRet(None),
}
}
}
impl ::http_body::Body for Body {
type Data = Chunk;
type Error = ::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
<Self as Payload>::poll_data(self)
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
<Self as Payload>::poll_trailers(self)
}
fn is_end_stream(&self) -> bool {
<Self as Payload>::is_end_stream(self)
}
fn size_hint(&self) -> SizeHint {
let mut hint = SizeHint::default();
let content_length = <Self as Payload>::content_length(self);
if let Some(size) = content_length {
hint.set_upper(size);
hint.set_lower(size)
}
hint
}
}
impl Stream for Body {
type Item = Chunk;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_data()
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
#[derive(Debug)]
struct Streaming;
#[derive(Debug)]
struct Empty;
#[derive(Debug)]
struct Once<'a>(&'a Chunk);
let mut builder = f.debug_tuple("Body");
match self.kind {
Kind::Once(None) => builder.field(&Empty),
Kind::Once(Some(ref chunk)) => builder.field(&Once(chunk)),
_ => builder.field(&Streaming),
};
builder.finish()
}
}
impl Sender {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.abort_tx.poll_cancel() {
Ok(Async::Ready(())) | Err(_) => return Err(::Error::new_closed()),
Ok(Async::NotReady) => (),
}
self.tx.poll_ready().map_err(|_| ::Error::new_closed())
}
pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
self.tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}
pub fn abort(self) {
let _ = self.abort_tx.send(());
}
pub(crate) fn send_error(&mut self, err: ::Error) {
let _ = self.tx.try_send(Err(err));
}
}
impl Sink for Sender {
type SinkItem = Chunk;
type SinkError = ::Error;
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
Ok(Async::Ready(()))
}
fn start_send(&mut self, msg: Chunk) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.poll_ready()? {
Async::Ready(_) => {
self.send_data(msg).map_err(|_| ::Error::new_closed())?;
Ok(AsyncSink::Ready)
}
Async::NotReady => Ok(AsyncSink::NotReady(msg)),
}
}
}
impl From<Chunk> for Body {
#[inline]
fn from(chunk: Chunk) -> Body {
if chunk.is_empty() {
Body::empty()
} else {
Body::new(Kind::Once(Some(chunk)))
}
}
}
impl
From<Box<dyn Stream<Item = Chunk, Error = Box<dyn StdError + Send + Sync>> + Send + 'static>>
for Body
{
#[inline]
fn from(
stream: Box<
dyn Stream<Item = Chunk, Error = Box<dyn StdError + Send + Sync>> + Send + 'static,
>,
) -> Body {
Body::new(Kind::Wrapped(stream))
}
}
impl From<Bytes> for Body {
#[inline]
fn from(bytes: Bytes) -> Body {
Body::from(Chunk::from(bytes))
}
}
impl From<Vec<u8>> for Body {
#[inline]
fn from(vec: Vec<u8>) -> Body {
Body::from(Chunk::from(vec))
}
}
impl From<&'static [u8]> for Body {
#[inline]
fn from(slice: &'static [u8]) -> Body {
Body::from(Chunk::from(slice))
}
}
impl From<Cow<'static, [u8]>> for Body {
#[inline]
fn from(cow: Cow<'static, [u8]>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o),
}
}
}
impl From<String> for Body {
#[inline]
fn from(s: String) -> Body {
Body::from(Chunk::from(s.into_bytes()))
}
}
impl From<&'static str> for Body {
#[inline]
fn from(slice: &'static str) -> Body {
Body::from(Chunk::from(slice.as_bytes()))
}
}
impl From<Cow<'static, str>> for Body {
#[inline]
fn from(cow: Cow<'static, str>) -> Body {
match cow {
Cow::Borrowed(b) => Body::from(b),
Cow::Owned(o) => Body::from(o),
}
}
}
#[test]
fn test_body_stream_concat() {
let body = Body::from("hello world");
let total = body.concat2().wait().unwrap();
assert_eq!(total.as_ref(), b"hello world");
}