use codec::RecvError;
use frame::{self, Frame, Kind, Reason};
use frame::{DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE};
use hpack;
use futures::*;
use bytes::BytesMut;
use std::io;
use tokio_io::AsyncRead;
use tokio_io::codec::length_delimited;
/// Header list size limit (16 MiB) that a new `FramedRead` starts with until
/// `set_max_header_list_size` overrides it.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 * 1024 * 1024;
/// Decodes HTTP/2 frames from a length-delimited byte stream.
///
/// Each chunk yielded by `inner` is handed to `decode_frame`; header blocks
/// that span several frames are buffered in `partial` until they are complete.
#[derive(Debug)]
pub struct FramedRead<T> {
/// Length-delimited reader that yields one chunk of bytes per frame.
inner: length_delimited::FramedRead<T>,
/// HPACK decoder used for HEADERS / PUSH_PROMISE / CONTINUATION payloads.
hpack: hpack::Decoder,
/// Limit passed to `load_hpack`; also bounds how much CONTINUATION data is
/// buffered once a header block is flagged as over size.
max_header_list_size: usize,
/// Header block still waiting for CONTINUATION frames, if any.
partial: Option<Partial>,
}
/// A header block whose END_HEADERS flag has not been seen yet.
#[derive(Debug)]
struct Partial {
/// The HEADERS or PUSH_PROMISE frame being assembled.
frame: Continuable,
/// Bytes left in the payload after the most recent `load_hpack` call; the
/// next CONTINUATION payload is appended here before decoding resumes.
buf: BytesMut,
}
/// The frame kinds whose header block may be extended by CONTINUATION frames.
#[derive(Debug)]
enum Continuable {
/// A HEADERS frame awaiting more header block fragments.
Headers(frame::Headers),
/// A PUSH_PROMISE frame awaiting more header block fragments.
PushPromise(frame::PushPromise),
}
impl<T> FramedRead<T> {
    /// Creates a frame decoder on top of a length-delimited reader.
    ///
    /// The decoder starts with an HPACK table of
    /// `DEFAULT_SETTINGS_HEADER_TABLE_SIZE`, a header list limit of
    /// `DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE`, and no pending header block.
    pub fn new(inner: length_delimited::FramedRead<T>) -> FramedRead<T> {
        FramedRead {
            inner,
            hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
            partial: None,
        }
    }

    /// Decodes a single frame (head plus payload) from `bytes`.
    ///
    /// Returns `Ok(Some(frame))` when a complete frame was decoded and
    /// `Ok(None)` when the bytes were consumed without producing a frame:
    /// either a header block that still needs CONTINUATION frames (it is
    /// stored in `self.partial`), or a frame of unknown kind.
    ///
    /// # Errors
    ///
    /// * `RecvError::Connection(PROTOCOL_ERROR)` when a frame fails to load,
    ///   when a non-CONTINUATION frame interrupts a pending header block, or
    ///   when a CONTINUATION frame is unexpected or on the wrong stream.
    /// * `RecvError::Connection(COMPRESSION_ERROR)` when HPACK decoding of a
    ///   header block fails, whether in the initial frame or in a
    ///   CONTINUATION frame, or when buffered CONTINUATION data for an
    ///   over-size block exceeds `max_header_list_size`.
    /// * `RecvError::Stream { reason: PROTOCOL_ERROR, .. }` for an invalid
    ///   dependency ID or a malformed header block.
    fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, RecvError> {
        use self::RecvError::*;

        trace!("decoding frame from {}B", bytes.len());

        // Parse the frame head.
        let head = frame::Head::parse(&bytes);

        // While a header block is pending, only CONTINUATION may follow.
        if self.partial.is_some() && head.kind() != Kind::Continuation {
            proto_err!(conn: "expected CONTINUATION, got {:?}", head.kind());
            return Err(Connection(Reason::PROTOCOL_ERROR));
        }

        let kind = head.kind();

        trace!("    -> kind={:?}", kind);

        // Shared decoding for HEADERS and PUSH_PROMISE: load the frame, run
        // HPACK over the payload, and either yield the frame or park it in
        // `self.partial` until END_HEADERS arrives.
        macro_rules! header_block {
            ($frame:ident, $head:ident, $bytes:ident) => ({
                // Drop the frame head.
                let _ = $bytes.split_to(frame::HEADER_LEN);

                // Parse the frame without the header block.
                let (mut frame, mut payload) = match frame::$frame::load($head, $bytes) {
                    Ok(res) => res,
                    Err(frame::Error::InvalidDependencyId) => {
                        proto_err!(stream: "invalid HEADERS dependency ID");
                        // A stream cannot depend on itself; this is a
                        // stream-level error rather than a connection one.
                        return Err(Stream {
                            id: $head.stream_id(),
                            reason: Reason::PROTOCOL_ERROR,
                        });
                    },
                    Err(e) => {
                        proto_err!(conn: "failed to load frame; err={:?}", e);
                        return Err(Connection(Reason::PROTOCOL_ERROR));
                    }
                };

                let is_end_headers = frame.is_end_headers();

                match frame.load_hpack(&mut payload, self.max_header_list_size, &mut self.hpack) {
                    Ok(_) => {},
                    // Running out of input is fine while more fragments are
                    // still expected.
                    Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {},
                    Err(frame::Error::Hpack(e)) => {
                        debug!("connection error COMPRESSION_ERROR -- {:?};", e);
                        return Err(Connection(Reason::COMPRESSION_ERROR));
                    },
                    Err(frame::Error::MalformedMessage) => {
                        let id = $head.stream_id();
                        proto_err!(stream: "malformed header block; stream={:?}", id);
                        return Err(Stream {
                            id,
                            reason: Reason::PROTOCOL_ERROR,
                        });
                    },
                    Err(e) => {
                        proto_err!(conn: "failed HPACK decoding; err={:?}", e);
                        return Err(Connection(Reason::PROTOCOL_ERROR));
                    }
                }

                if is_end_headers {
                    frame.into()
                } else {
                    trace!("loaded partial header block");
                    // Defer returning the frame until END_HEADERS is seen.
                    self.partial = Some(Partial {
                        frame: Continuable::$frame(frame),
                        buf: payload,
                    });

                    return Ok(None);
                }
            });
        }

        let frame = match kind {
            Kind::Settings => {
                let res = frame::Settings::load(head, &bytes[frame::HEADER_LEN..]);

                res.map_err(|e| {
                    proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::Ping => {
                let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]);

                res.map_err(|e| {
                    proto_err!(conn: "failed to load PING frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::WindowUpdate => {
                let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]);

                res.map_err(|e| {
                    proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::Data => {
                // Drop the frame head; the rest is handed over as the payload.
                let _ = bytes.split_to(frame::HEADER_LEN);
                let res = frame::Data::load(head, bytes.freeze());

                res.map_err(|e| {
                    proto_err!(conn: "failed to load DATA frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::Headers => {
                header_block!(Headers, head, bytes)
            },
            Kind::Reset => {
                let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]);

                res.map_err(|e| {
                    proto_err!(conn: "failed to load RESET frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::GoAway => {
                let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]);

                res.map_err(|e| {
                    proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
                    Connection(Reason::PROTOCOL_ERROR)
                })?.into()
            },
            Kind::PushPromise => {
                header_block!(PushPromise, head, bytes)
            },
            Kind::Priority => {
                if head.stream_id() == 0 {
                    // PRIORITY on stream 0 is a connection-level error.
                    proto_err!(conn: "invalid stream ID 0");
                    return Err(Connection(Reason::PROTOCOL_ERROR));
                }

                match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
                    Ok(frame) => frame.into(),
                    Err(frame::Error::InvalidDependencyId) => {
                        let id = head.stream_id();
                        proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
                        return Err(Stream {
                            id,
                            reason: Reason::PROTOCOL_ERROR,
                        });
                    },
                    Err(e) => {
                        proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
                        return Err(Connection(Reason::PROTOCOL_ERROR));
                    }
                }
            },
            Kind::Continuation => {
                // Bit 0x4 of the flags is tested as the END_HEADERS marker.
                let is_end_headers = (head.flag() & 0x4) == 0x4;

                // Taking the partial clears it on every early return below.
                let mut partial = match self.partial.take() {
                    Some(partial) => partial,
                    None => {
                        proto_err!(conn: "received unexpected CONTINUATION frame");
                        return Err(Connection(Reason::PROTOCOL_ERROR));
                    }
                };

                // The stream identifiers must match.
                if partial.frame.stream_id() != head.stream_id() {
                    proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
                    return Err(Connection(Reason::PROTOCOL_ERROR));
                }

                // Extend the buffer with this frame's payload.
                if partial.buf.is_empty() {
                    // Nothing left over: reuse this frame's buffer directly
                    // instead of copying.
                    partial.buf = bytes.split_off(frame::HEADER_LEN);
                } else {
                    if partial.frame.is_over_size() {
                        // Bound how much data is buffered for a block that is
                        // already flagged as over size. `bytes` still includes
                        // the frame head here, so the check is conservative
                        // by `HEADER_LEN` bytes.
                        if partial.buf.len() + bytes.len() > self.max_header_list_size {
                            proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
                            return Err(Connection(Reason::COMPRESSION_ERROR));
                        }
                    }
                    partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
                }

                match partial.frame.load_hpack(&mut partial.buf, self.max_header_list_size, &mut self.hpack) {
                    Ok(_) => {},
                    // Running out of input is fine while more fragments are
                    // still expected.
                    Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {},
                    // Any other HPACK failure is a header block decoding
                    // error and is reported as COMPRESSION_ERROR, matching
                    // the HEADERS / PUSH_PROMISE path above.
                    Err(frame::Error::Hpack(e)) => {
                        proto_err!(conn: "failed HPACK decoding; err={:?}", e);
                        return Err(Connection(Reason::COMPRESSION_ERROR));
                    },
                    Err(frame::Error::MalformedMessage) => {
                        let id = head.stream_id();
                        proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
                        return Err(Stream {
                            id,
                            reason: Reason::PROTOCOL_ERROR,
                        });
                    },
                    Err(e) => {
                        proto_err!(conn: "failed HPACK decoding; err={:?}", e);
                        return Err(Connection(Reason::PROTOCOL_ERROR));
                    },
                }

                if is_end_headers {
                    partial.frame.into()
                } else {
                    // Still incomplete: put the partial back and wait.
                    self.partial = Some(partial);
                    return Ok(None);
                }
            },
            Kind::Unknown => {
                // Frames of unknown kind are dropped without yielding anything.
                return Ok(None);
            },
        };

        Ok(Some(frame))
    }

    /// Returns a shared reference to the underlying I/O object.
    pub fn get_ref(&self) -> &T {
        self.inner.get_ref()
    }

    /// Returns a mutable reference to the underlying I/O object.
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }

    /// Returns the current maximum frame length of the inner reader.
    #[cfg(feature = "unstable")]
    #[inline]
    pub fn max_frame_size(&self) -> usize {
        self.inner.max_frame_length()
    }

    /// Updates the maximum frame length of the inner reader.
    ///
    /// # Panics
    ///
    /// Panics if `val` is outside
    /// `DEFAULT_MAX_FRAME_SIZE..=MAX_MAX_FRAME_SIZE`.
    #[inline]
    pub fn set_max_frame_size(&mut self, val: usize) {
        assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize);
        self.inner.set_max_frame_length(val)
    }

    /// Updates the header list size limit used when decoding header blocks.
    #[inline]
    pub fn set_max_header_list_size(&mut self, val: usize) {
        self.max_header_list_size = val;
    }

    /// Forwards a new header table size to the HPACK decoder via
    /// `queue_size_update`.
    pub fn set_max_header_table_size(&mut self, val: usize) {
        self.hpack.queue_size_update(val);
    }
}
impl<T> Stream for FramedRead<T>
where
T: AsyncRead,
{
type Item = Frame;
type Error = RecvError;
fn poll(&mut self) -> Poll<Option<Frame>, Self::Error> {
loop {
trace!("poll");
let bytes = match try_ready!(self.inner.poll().map_err(map_err)) {
Some(bytes) => bytes,
None => return Ok(Async::Ready(None)),
};
trace!("poll; bytes={}B", bytes.len());
if let Some(frame) = self.decode_frame(bytes)? {
debug!("received; frame={:?}", frame);
return Ok(Async::Ready(Some(frame)));
}
}
}
}
/// Converts an I/O error from the length-delimited reader into a `RecvError`.
///
/// An `InvalidData` error whose inner error is `FrameTooBig` becomes a
/// connection-level `FRAME_SIZE_ERROR`; every other error goes through the
/// `Into<RecvError>` conversion unchanged.
fn map_err(err: io::Error) -> RecvError {
    use tokio_io::codec::length_delimited::FrameTooBig;

    let frame_too_big = err.kind() == io::ErrorKind::InvalidData
        && err.get_ref().map_or(false, |inner| inner.is::<FrameTooBig>());

    if frame_too_big {
        RecvError::Connection(Reason::FRAME_SIZE_ERROR)
    } else {
        err.into()
    }
}
impl Continuable {
    /// Stream identifier of the wrapped frame.
    fn stream_id(&self) -> frame::StreamId {
        match self {
            &Continuable::Headers(ref headers) => headers.stream_id(),
            &Continuable::PushPromise(ref push) => push.stream_id(),
        }
    }

    /// Forwards to the wrapped frame's `is_over_size`.
    fn is_over_size(&self) -> bool {
        match self {
            &Continuable::Headers(ref headers) => headers.is_over_size(),
            &Continuable::PushPromise(ref push) => push.is_over_size(),
        }
    }

    /// Forwards to the wrapped frame's `load_hpack` with the same arguments.
    fn load_hpack(
        &mut self,
        src: &mut BytesMut,
        max_header_list_size: usize,
        decoder: &mut hpack::Decoder,
    ) -> Result<(), frame::Error> {
        match self {
            &mut Continuable::Headers(ref mut headers) => {
                headers.load_hpack(src, max_header_list_size, decoder)
            }
            &mut Continuable::PushPromise(ref mut push) => {
                push.load_hpack(src, max_header_list_size, decoder)
            }
        }
    }
}
impl<T> From<Continuable> for Frame<T> {
    /// Converts a completed header block into a `Frame`, calling
    /// `set_end_headers` on the wrapped frame first.
    fn from(cont: Continuable) -> Self {
        let frame: Frame<T> = match cont {
            Continuable::Headers(mut block) => {
                block.set_end_headers();
                block.into()
            }
            Continuable::PushPromise(mut promise) => {
                promise.set_end_headers();
                promise.into()
            }
        };
        frame
    }
}