Skip to content

Commit

Permalink
Merge pull request #13 from sagebind/fix/follow-response-redirects
Browse files Browse the repository at this point in the history
Fix basic redirect follow handling
  • Loading branch information
sagebind authored Sep 18, 2018
2 parents 02708b8 + 9dfbf33 commit dc06254
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 70 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ http = "0.1"
lazy_static = "1"
lazycell = "1.0"
log = "0.4"
regex = "1.0"
slab = "0.4"
withers_derive = "0.2"

Expand Down
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,37 @@ impl Client {
/// Sends an HTTP GET request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn get(&self, uri: &str) -> Result<Response<Body>, Error> {
pub fn get<U>(&self, uri: U) -> Result<Response<Body>, Error> where http::Uri: http::HttpTryFrom<U> {
let request = http::Request::get(uri).body(Body::default())?;
self.send(request)
}

/// Sends an HTTP HEAD request.
pub fn head(&self, uri: &str) -> Result<Response<Body>, Error> {
pub fn head<U>(&self, uri: U) -> Result<Response<Body>, Error> where http::Uri: http::HttpTryFrom<U> {
let request = http::Request::head(uri).body(Body::default())?;
self.send(request)
}

/// Sends an HTTP POST request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn post(&self, uri: &str, body: impl Into<Body>) -> Result<Response<Body>, Error> {
pub fn post<U>(&self, uri: U, body: impl Into<Body>) -> Result<Response<Body>, Error> where http::Uri: http::HttpTryFrom<U> {
let request = http::Request::post(uri).body(body)?;
self.send(request)
}

/// Sends an HTTP PUT request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn put(&self, uri: &str, body: impl Into<Body>) -> Result<Response<Body>, Error> {
pub fn put<U>(&self, uri: U, body: impl Into<Body>) -> Result<Response<Body>, Error> where http::Uri: http::HttpTryFrom<U> {
let request = http::Request::put(uri).body(body)?;
self.send(request)
}

/// Sends an HTTP DELETE request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn delete(&self, uri: &str) -> Result<Response<Body>, Error> {
pub fn delete<U>(&self, uri: U) -> Result<Response<Body>, Error> where http::Uri: http::HttpTryFrom<U> {
let request = http::Request::delete(uri).body(Body::default())?;
self.send(request)
}
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ impl From<curl::Error> for Error {
Error::SSLEngineError(error.extra_description().map(str::to_owned))
} else if error.is_operation_timedout() {
Error::Timeout
} else if error.is_too_many_redirects() {
Error::TooManyRedirects
} else {
Error::Curl(error.description().to_owned())
}
Expand Down
5 changes: 3 additions & 2 deletions src/internal/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,10 @@ impl Agent {
}

fn complete_request(&mut self, token: usize) -> Result<(), Error> {
debug!("request with token {} completed", token);
let handle = self.requests.remove(token);
let handle = self.multi.remove2(handle)?;
handle.get_ref().complete();
let mut handle = self.multi.remove2(handle)?;
handle.get_mut().complete();

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ascii;

pub mod agent;
pub mod notify;
pub mod parse;
pub mod request;

pub fn format_byte_string(bytes: impl AsRef<[u8]>) -> String {
Expand Down
28 changes: 28 additions & 0 deletions src/internal/parse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use http::{StatusCode, Version};
use http::header::*;
use regex::bytes::Regex;

lazy_static! {
static ref STATUS_LINE_REGEX: Regex = r#"^HTTP/(\d(?:\.\d)?) (\d{3})"#.parse().unwrap();
static ref HEADER_LINE_REGEX: Regex = r#"^([^:]+): *([^\r]*)\r\n$"#.parse().unwrap();
}

pub fn parse_status_line(line: &[u8]) -> Option<(Version, StatusCode)> {
STATUS_LINE_REGEX.captures(line).and_then(|captures| Some((
match &captures[1] {
b"HTTP/2" => Version::HTTP_2,
b"HTTP/1.1" => Version::HTTP_11,
b"HTTP/1.0" => Version::HTTP_10,
b"HTTP/0.9" => Version::HTTP_09,
_ => Version::default(),
},
StatusCode::from_bytes(&captures[2]).ok()?,
)))
}

pub fn parse_header(line: &[u8]) -> Option<(HeaderName, HeaderValue)> {
HEADER_LINE_REGEX.captures(line).and_then(|captures| Some((
HeaderName::from_bytes(&captures[1]).ok()?,
HeaderValue::from_bytes(&captures[2]).ok()?,
)))
}
131 changes: 77 additions & 54 deletions src/internal/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use log;
use options::*;
use std::io::{self, Read};
use std::mem;
use std::str::{self, FromStr};
use std::sync::atomic::*;
use std::sync::{Arc, Mutex};
use super::agent;
use super::format_byte_string;
use super::parse;

const STATUS_READY: usize = 0;
const STATUS_CLOSED: usize = 1;
Expand All @@ -27,16 +27,18 @@ pub fn create<B: Into<Body>>(request: Request<B>, default_options: &Options) ->
let (future_tx, future_rx) = oneshot::channel();
let (request_parts, request_body) = request.into_parts();

// If the request has options attached, use those options, otherwise use the default options.
let options = request_parts.extensions.get().unwrap_or(default_options);

let mut easy = curl::easy::Easy2::new(CurlHandler {
state: Arc::new(RequestState::default()),
state: Arc::new(RequestState::new(options.clone())),
future: Some(future_tx),
request_body: request_body.into(),
response: http::response::Builder::new(),
version: None,
status_code: None,
headers: http::HeaderMap::default(),
});

// If the request has options attached, use those options, otherwise use the default options.
let options = request_parts.extensions.get().unwrap_or(default_options);

easy.verbose(log_enabled!(log::Level::Trace))?;
easy.signal(false)?;
easy.buffer_size(options.buffer_size)?;
Expand Down Expand Up @@ -128,15 +130,29 @@ pub struct CurlRequest(pub curl::easy::Easy2<CurlHandler>);
/// Sends and receives data between curl and the outside world.
#[derive(Debug)]
pub struct CurlHandler {
/// Shared request state.
state: Arc<RequestState>,

/// Future that resolves when the response headers are received.
future: Option<oneshot::Sender<Result<Response<CurlResponseStream>, Error>>>,

/// A request body to send.
request_body: Body,
response: http::response::Builder,

/// Status code of the response.
status_code: Option<http::StatusCode>,

/// HTTP version of the response.
version: Option<http::Version>,

/// Response headers received so far.
headers: http::HeaderMap,
}

impl CurlHandler {
/// Mark the request as completed successfully.
pub fn complete(&self) {
pub fn complete(&mut self) {
self.ensure_future_is_completed();
self.state.close();
self.state.read_waker.wake();
}
Expand Down Expand Up @@ -182,74 +198,79 @@ impl CurlHandler {
}
}

/// Determine if curl is about to perform a redirect.
fn is_about_to_redirect(&self) -> bool {
self.state.options.redirect_policy != RedirectPolicy::None
&& self.status_code.filter(http::StatusCode::is_redirection).is_some()
&& self.headers.contains_key("Location")
}

/// Completes the associated future when headers have been received.
fn finalize_headers(&mut self) -> bool {
fn finalize_headers(&mut self) {
if self.is_about_to_redirect() {
debug!("preparing for redirect to {:?}", self.headers.get("Location"));

// It appears that curl will do a redirect, so instead of completing the future, just reset the response
// state.
self.status_code = None;
self.version = None;
self.headers.clear();

return;
}

self.ensure_future_is_completed();
}

fn ensure_future_is_completed(&mut self) {
if let Some(future) = self.future.take() {
let body = CurlResponseStream {
state: self.state.clone(),
};

let response = self.response.body(body).unwrap();
let mut builder = http::Response::builder();
builder.status(self.status_code.take().unwrap());
builder.version(self.version.take().unwrap());

for (name, values) in self.headers.drain() {
for value in values {
builder.header(&name, value);
}
}

let response = builder
.body(body)
.unwrap();

future.send(Ok(response)).is_ok()
} else {
warn!("headers already finalized");
false
future.send(Ok(response)).is_ok();
}
}
}

impl curl::easy::Handler for CurlHandler {
// Gets called by curl for each line of data in the HTTP request header.
fn header(&mut self, data: &[u8]) -> bool {
let line = match str::from_utf8(data) {
Ok(s) => s,
_ => return false,
};

// curl calls this function for all lines in the response not part of the response body, not just for headers.
// Curl calls this function for all lines in the response not part of the response body, not just for headers.
// We need to inspect the contents of the string in order to determine what it is and how to parse it, just as
// if we were reading from the socket of a HTTP/1.0 or HTTP/1.1 connection ourselves.

// Is this the status line?
if line.starts_with("HTTP/") {
let separator = match line.find(' ') {
Some(idx) => idx,
None => return false,
};

// Parse the HTTP protocol version.
let version = match &line[0..separator] {
"HTTP/2" => http::Version::HTTP_2,
"HTTP/1.1" => http::Version::HTTP_11,
"HTTP/1.0" => http::Version::HTTP_10,
"HTTP/0.9" => http::Version::HTTP_09,
_ => http::Version::default(),
};
self.response.version(version);

// Parse the status code.
let status_code = match http::StatusCode::from_str(&line[separator+1..separator+4]) {
Ok(s) => s,
_ => return false,
};
self.response.status(status_code);

if let Some((version, status)) = parse::parse_status_line(data) {
self.version = Some(version);
self.status_code = Some(status);
return true;
}

// Is this a header line?
if let Some(pos) = line.find(":") {
let (name, value) = line.split_at(pos);
let value = value[2..].trim();
self.response.header(name, value);

if let Some((name, value)) = parse::parse_header(data) {
self.headers.insert(name, value);
return true;
}

// Is this the end of the response header?
if line == "\r\n" {
return self.finalize_headers();
if data == b"\r\n" {
self.finalize_headers();
return true;
}

// Unknown header line we don't know how to parse.
Expand All @@ -275,6 +296,8 @@ impl curl::easy::Handler for CurlHandler {

// Gets called by curl when bytes from the response body are received.
fn write(&mut self, data: &[u8]) -> Result<usize, curl::easy::WriteError> {
trace!("received {} bytes of data", data.len());

if self.state.is_closed() {
debug!("aborting write, request is already closed");
return Ok(0);
Expand Down Expand Up @@ -390,6 +413,7 @@ impl AsyncRead for CurlResponseStream {
/// Holds the shared state of a request.
#[derive(Debug)]
struct RequestState {
options: Options,
status: AtomicUsize,
agent: AtomicLazyCell<agent::Handle>,
token: AtomicLazyCell<usize>,
Expand All @@ -398,9 +422,10 @@ struct RequestState {
read_waker: task::AtomicWaker,
}

impl Default for RequestState {
fn default() -> Self {
impl RequestState {
fn new(options: Options) -> Self {
Self {
options: options,
status: AtomicUsize::new(STATUS_READY),
agent: AtomicLazyCell::new(),
token: AtomicLazyCell::new(),
Expand All @@ -409,9 +434,7 @@ impl Default for RequestState {
read_waker: task::AtomicWaker::default(),
}
}
}

impl RequestState {
fn is_closed(&self) -> bool {
self.status.load(Ordering::SeqCst) == STATUS_CLOSED
}
Expand Down
11 changes: 6 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ extern crate lazycell;
extern crate log;
#[cfg(unix)]
extern crate nix;
extern crate regex;
extern crate slab;
#[macro_use]
extern crate withers_derive;
Expand Down Expand Up @@ -150,33 +151,33 @@ lazy_static! {
/// Sends an HTTP GET request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn get(uri: &str) -> Result<Response, Error> {
pub fn get<U>(uri: U) -> Result<Response, Error> where http::Uri: http::HttpTryFrom<U> {
DEFAULT_CLIENT.get(uri)
}

/// Sends an HTTP HEAD request.
pub fn head(uri: &str) -> Result<Response, Error> {
pub fn head<U>(uri: U) -> Result<Response, Error> where http::Uri: http::HttpTryFrom<U> {
DEFAULT_CLIENT.head(uri)
}

/// Sends an HTTP POST request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn post<B: Into<Body>>(uri: &str, body: B) -> Result<Response, Error> {
pub fn post<U>(uri: U, body: impl Into<Body>) -> Result<Response, Error> where http::Uri: http::HttpTryFrom<U> {
DEFAULT_CLIENT.post(uri, body)
}

/// Sends an HTTP PUT request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn put<B: Into<Body>>(uri: &str, body: B) -> Result<Response, Error> {
pub fn put<U>(uri: U, body: impl Into<Body>) -> Result<Response, Error> where http::Uri: http::HttpTryFrom<U> {
DEFAULT_CLIENT.put(uri, body)
}

/// Sends an HTTP DELETE request.
///
/// The response body is provided as a stream that may only be consumed once.
pub fn delete(uri: &str) -> Result<Response, Error> {
pub fn delete<U>(uri: U) -> Result<Response, Error> where http::Uri: http::HttpTryFrom<U> {
DEFAULT_CLIENT.delete(uri)
}

Expand Down
Loading

0 comments on commit dc06254

Please sign in to comment.