1 #ifndef CLIENT_HTTP_HPP
2 #define CLIENT_HTTP_HPP
8 #include <unordered_set>
11 #ifdef USE_STANDALONE_ASIO
13 #include <asio/steady_timer.hpp>
15 using error_code = std::error_code;
16 using errc = std::errc;
17 using system_error = std::system_error;
18 namespace make_error_code = std;
19 using string_view =
const std::string &;
22 #include <boost/asio.hpp>
23 #include <boost/asio/steady_timer.hpp>
24 #include <boost/utility/string_ref.hpp>
26 namespace asio = boost::asio;
27 using error_code = boost::system::error_code;
28 namespace errc = boost::system::errc;
29 using system_error = boost::system::system_error;
30 namespace make_error_code = boost::system::errc;
31 using string_view = boost::string_ref;
36 template <
class socket_type>
39 template <
class socket_type>
46 std::size_t size() noexcept {
47 return streambuf.size();
62 asio::streambuf &streambuf;
63 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {}
68 friend class Client<socket_type>;
70 asio::streambuf streambuf;
72 Response(std::size_t max_response_streambuf_size) noexcept : streambuf(max_response_streambuf_size), content(streambuf) {}
75 std::string http_version, status_code;
79 CaseInsensitiveMultimap header;
101 class Connection :
public std::enable_shared_from_this<Connection> {
103 template <
typename... Args>
104 Connection(std::shared_ptr<ScopeRunner> handler_runner,
long timeout, Args &&... args) noexcept
105 : handler_runner(std::move(handler_runner)), timeout(timeout), socket(
new socket_type(std::forward<Args>(args)...)) {}
107 std::shared_ptr<ScopeRunner> handler_runner;
110 std::unique_ptr<socket_type> socket;
112 bool attempt_reconnect =
true;
114 std::unique_ptr<asio::steady_timer> timer;
116 void set_timeout(
long seconds = 0) noexcept {
123 timer = std::unique_ptr<asio::steady_timer>(
new asio::steady_timer(socket->get_io_service()));
124 timer->expires_from_now(std::chrono::seconds(seconds));
125 auto self = this->shared_from_this();
126 timer->async_wait([
self](
const error_code &ec) {
129 self->socket->lowest_layer().cancel(ec);
134 void cancel_timeout() noexcept {
144 Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection, std::unique_ptr<asio::streambuf> request_streambuf) noexcept
145 : connection(std::move(connection)), request_streambuf(std::move(request_streambuf)), response(
new Response(max_response_streambuf_size)) {}
147 std::shared_ptr<Connection> connection;
148 std::unique_ptr<asio::streambuf> request_streambuf;
149 std::shared_ptr<Response> response;
150 std::function<void(const std::shared_ptr<Connection> &,
const error_code &)> callback;
164 std::shared_ptr<Response>
request(
const std::string &method,
const std::string &path = std::string(
"/"),
165 string_view content =
"",
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
166 std::shared_ptr<Response> response;
168 request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_,
const error_code &ec_) {
169 response = response_;
174 std::unique_lock<std::mutex> lock(concurrent_synchronous_requests_mutex);
175 ++concurrent_synchronous_requests;
179 std::unique_lock<std::mutex> lock(concurrent_synchronous_requests_mutex);
180 --concurrent_synchronous_requests;
181 if(!concurrent_synchronous_requests)
186 throw system_error(ec);
194 std::shared_ptr<Response>
request(
const std::string &method,
const std::string &path, std::istream &content,
195 const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
196 std::shared_ptr<Response> response;
198 request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_,
const error_code &ec_) {
199 response = response_;
204 std::unique_lock<std::mutex> lock(concurrent_synchronous_requests_mutex);
205 ++concurrent_synchronous_requests;
209 std::unique_lock<std::mutex> lock(concurrent_synchronous_requests_mutex);
210 --concurrent_synchronous_requests;
211 if(!concurrent_synchronous_requests)
216 throw system_error(ec);
223 void request(
const std::string &method,
const std::string &path, string_view content,
const CaseInsensitiveMultimap &header,
224 std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback_) {
226 auto response = session->response;
227 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>,
const error_code &)>>(std::move(request_callback_));
228 session->callback = [
this, response, request_callback](
const std::shared_ptr<Connection> &connection,
const error_code &ec) {
230 std::unique_lock<std::mutex> lock(this->connections_mutex);
231 connection->in_use =
false;
234 std::size_t unused_connections = 0;
235 for(
auto it = this->connections.begin(); it != this->connections.end();) {
236 if(ec && connection == *it)
237 it = this->connections.erase(it);
238 else if((*it)->in_use)
241 ++unused_connections;
242 if(unused_connections > 1)
243 it = this->connections.erase(it);
250 if(*request_callback)
251 (*request_callback)(response, ec);
254 std::ostream write_stream(session->request_streambuf.get());
255 if(content.size() > 0)
256 write_stream <<
"Content-Length: " << content.size() <<
"\r\n";
257 write_stream <<
"\r\n"
265 void request(
const std::string &method,
const std::string &path, string_view content,
266 std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback) {
267 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
271 void request(
const std::string &method,
const std::string &path,
272 std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback) {
273 request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
277 void request(
const std::string &method, std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback) {
278 request(method, std::string(
"/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
282 void request(
const std::string &method,
const std::string &path, std::istream &content,
const CaseInsensitiveMultimap &header,
283 std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback_) {
285 auto response = session->response;
286 auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>,
const error_code &)>>(std::move(request_callback_));
287 session->callback = [
this, response, request_callback](
const std::shared_ptr<Connection> &connection,
const error_code &ec) {
289 std::unique_lock<std::mutex> lock(this->connections_mutex);
290 connection->in_use =
false;
293 std::size_t unused_connections = 0;
294 for(
auto it = this->connections.begin(); it != this->connections.end();) {
295 if(ec && connection == *it)
296 it = this->connections.erase(it);
297 else if((*it)->in_use)
300 ++unused_connections;
301 if(unused_connections > 1)
302 it = this->connections.erase(it);
309 if(*request_callback)
310 (*request_callback)(response, ec);
313 content.seekg(0, std::ios::end);
314 auto content_length = content.tellg();
315 content.seekg(0, std::ios::beg);
316 std::ostream write_stream(session->request_streambuf.get());
317 if(content_length > 0)
318 write_stream <<
"Content-Length: " << content_length <<
"\r\n";
319 write_stream <<
"\r\n";
320 if(content_length > 0)
321 write_stream << content.rdbuf();
327 void request(
const std::string &method,
const std::string &path, std::istream &content,
328 std::function<
void(std::shared_ptr<Response>,
const error_code &)> &&request_callback) {
329 request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
334 std::unique_lock<std::mutex> lock(connections_mutex);
335 for(
auto it = connections.begin(); it != connections.end();) {
337 (*it)->socket->lowest_layer().cancel(ec);
338 it = connections.erase(it);
343 handler_runner->stop();
348 bool internal_io_service =
false;
353 std::unique_ptr<asio::ip::tcp::resolver::query> query;
355 std::unordered_set<std::shared_ptr<Connection>> connections;
356 std::mutex connections_mutex;
358 std::shared_ptr<ScopeRunner> handler_runner;
360 std::size_t concurrent_synchronous_requests = 0;
361 std::mutex concurrent_synchronous_requests_mutex;
363 ClientBase(
const std::string &host_port,
unsigned short default_port) noexcept : handler_runner(
new ScopeRunner()) {
364 auto parsed_host_port = parse_host_port(host_port, default_port);
365 host = parsed_host_port.first;
366 port = parsed_host_port.second;
369 std::shared_ptr<Connection> get_connection() noexcept {
370 std::shared_ptr<Connection> connection;
371 std::unique_lock<std::mutex> lock(connections_mutex);
374 io_service = std::make_shared<asio::io_service>();
375 internal_io_service =
true;
378 for(
auto it = connections.begin(); it != connections.end(); ++it) {
379 if(!(*it)->in_use && !connection) {
385 connection = create_connection();
386 connections.emplace(connection);
388 connection->attempt_reconnect =
true;
389 connection->in_use =
true;
393 query = std::unique_ptr<asio::ip::tcp::resolver::query>(
new asio::ip::tcp::resolver::query(host, std::to_string(port)));
396 query = std::unique_ptr<asio::ip::tcp::resolver::query>(
new asio::ip::tcp::resolver::query(proxy_host_port.first, std::to_string(proxy_host_port.second)));
403 virtual std::shared_ptr<Connection> create_connection() noexcept = 0;
404 virtual
void connect(const std::shared_ptr<Session> &) = 0;
406 std::unique_ptr<asio::streambuf> create_request_header(const std::
string &method, const std::
string &path, const CaseInsensitiveMultimap &header)
const {
407 auto corrected_path = path;
408 if(corrected_path ==
"")
409 corrected_path =
"/";
410 if(!
config.
proxy_server.empty() && std::is_same<socket_type, asio::ip::tcp::socket>::value)
411 corrected_path =
"http://" + host +
':' + std::to_string(port) + corrected_path;
413 std::unique_ptr<asio::streambuf> streambuf(
new asio::streambuf());
414 std::ostream write_stream(streambuf.get());
415 write_stream << method <<
" " << corrected_path <<
" HTTP/1.1\r\n";
416 write_stream <<
"Host: " << host <<
"\r\n";
417 for(
auto &h : header)
418 write_stream << h.first <<
": " << h.second <<
"\r\n";
422 std::pair<std::string, unsigned short> parse_host_port(
const std::string &host_port,
unsigned short default_port)
const noexcept {
423 std::pair<std::string, unsigned short> parsed_host_port;
424 std::size_t host_end = host_port.find(
':');
425 if(host_end == std::string::npos) {
426 parsed_host_port.first = host_port;
427 parsed_host_port.second = default_port;
430 parsed_host_port.first = host_port.substr(0, host_end);
431 parsed_host_port.second =
static_cast<unsigned short>(stoul(host_port.substr(host_end + 1)));
433 return parsed_host_port;
// Sends the serialized request held in session->request_streambuf over the
// connection's socket using asio::async_write.
// set_timeout() is called on the connection before the write is started and
// cancel_timeout() is the first thing the completion handler does; the handler
// then obtains handler_runner->continue_lock().
// NOTE(review): this listing omits some original lines (the embedded numbers
// jump from 440 to 446), so the branching that leads to the callback below is
// not visible here — confirm against the full file.
436 void write(
const std::shared_ptr<Session> &session) {
437 session->connection->set_timeout();
438 asio::async_write(*session->connection->socket, session->request_streambuf->data(), [
this, session](
const error_code &ec, std::size_t ) {
439 session->connection->cancel_timeout();
440 auto lock = session->connection->handler_runner->continue_lock();
// Hands the connection and the write's error_code to the session callback.
446 session->callback(session->connection, ec);
// Reads and dispatches the HTTP response for `session`.
//
// Visible flow:
//   1. Read from the socket into response->streambuf until "\r\n\r\n"
//      (end of the header section), guarded by set_timeout()/cancel_timeout().
//   2. Report errc::message_size when the streambuf has reached max_size().
//   3. Parse status line and header fields with ResponseMessage::parse;
//      report errc::protocol_error when parsing fails.
//   4. Read the body according to, in order: a Content-Length header, a
//      "Transfer-Encoding: chunked" header (delegates to read_chunked), or
//      read-until-EOF for HTTP versions below 1.1 / "Connection: close".
//   5. On a read error with attempt_reconnect set (and the error is not
//      operation_aborted), replace the connection and call connect() again.
// Every outcome is delivered through session->callback(connection, error_code).
//
// NOTE(review): this listing omits some original lines (braces, else/return
// statements — see the gaps in the embedded line numbers), so the exact
// nesting of the branches below must be confirmed against the full file.
450 void read(
const std::shared_ptr<Session> &session) {
451 session->connection->set_timeout();
452 asio::async_read_until(*session->connection->socket, session->response->streambuf,
"\r\n\r\n", [
this, session](
const error_code &ec, std::size_t bytes_transferred) {
453 session->connection->cancel_timeout();
454 auto lock = session->connection->handler_runner->continue_lock();
// A full streambuf means the header section did not fit in the configured limit.
457 if((!ec || ec == asio::error::not_found) && session->response->streambuf.size() == session->response->streambuf.max_size()) {
458 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
462 session->connection->attempt_reconnect =
true;
// async_read_until may read past the delimiter; these bytes already belong to the body.
463 std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred;
465 if(!
ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) {
466 session->callback(session->connection, make_error_code::make_error_code(errc::protocol_error));
470 auto header_it = session->response->header.find(
"Content-Length");
471 if(header_it != session->response->header.end()) {
// NOTE(review): stoull throws on a non-numeric Content-Length value and no
// handler is visible in this listing — confirm how that case is handled.
472 auto content_length = stoull(header_it->second);
// Only the part of the body that has not been buffered yet is read.
473 if(content_length > num_additional_bytes) {
474 session->connection->set_timeout();
475 asio::async_read(*session->connection->socket, session->response->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [
this, session](
const error_code &ec, std::size_t ) {
476 session->connection->cancel_timeout();
477 auto lock = session->connection->handler_runner->continue_lock();
481 if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
482 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
485 session->callback(session->connection, ec);
488 session->callback(session->connection, ec);
492 session->callback(session->connection, ec);
494 else if((header_it = session->response->header.find(
"Transfer-Encoding")) != session->response->header.end() && header_it->second ==
"chunked") {
// Decoded chunk data is accumulated in a temporary buffer by read_chunked.
495 auto tmp_streambuf = std::make_shared<asio::streambuf>();
496 this->read_chunked(session, tmp_streambuf);
// The response header that announces a closing connection is "Connection"
// (value "close"); in that case, or for HTTP versions below 1.1, the body is
// read until the peer closes the socket.
498 else if(session->response->http_version <
"1.1" || ((header_it = session->response->header.find(
"Connection")) != session->response->header.end() && header_it->second ==
"close")) {
499 session->connection->set_timeout();
500 asio::async_read(*session->connection->socket, session->response->streambuf, [
this, session](
const error_code &ec, std::size_t ) {
501 session->connection->cancel_timeout();
502 auto lock = session->connection->handler_runner->continue_lock();
506 if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
507 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
510 session->callback(session->connection, ec);
// End-of-file is the expected way for a read-until-close body to finish, so it is reported as success.
513 session->callback(session->connection, ec == asio::error::eof ? error_code() : ec);
517 session->callback(session->connection, ec);
// Reconnect path: the failed connection is removed from the pool and replaced
// by a fresh one whose attempt_reconnect is false, so only one retry is made.
520 if(session->connection->attempt_reconnect && ec != asio::error::operation_aborted) {
521 std::unique_lock<std::mutex> lock(connections_mutex);
522 auto it = connections.find(session->connection);
523 if(it != connections.end()) {
524 connections.erase(it);
525 session->connection = create_connection();
526 session->connection->attempt_reconnect =
false;
527 session->connection->in_use =
true;
528 connections.emplace(session->connection);
530 this->connect(session);
534 session->callback(session->connection, ec);
538 session->callback(session->connection, ec);
// Reads one chunk of a "Transfer-Encoding: chunked" response body and calls
// itself again for further chunks.
//
// session        - the session whose response is being read.
// tmp_streambuf  - accumulates the decoded chunk data across calls; its
//                  contents are written into session->response->streambuf
//                  before the final callback.
//
// NOTE(review): this listing omits some original lines (braces, else/return,
// try/catch — see the gaps in the embedded line numbers), so the exact nesting
// of the branches below must be confirmed against the full file.
543 void read_chunked(
const std::shared_ptr<Session> &session,
const std::shared_ptr<asio::streambuf> &tmp_streambuf) {
// The decoded body collected so far has already reached the configured limit.
544 if(tmp_streambuf->size() >= config.max_response_streambuf_size) {
545 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
// Per-chunk read buffer, capped at whatever is left of the configured limit.
549 auto chunked_streambuf = std::make_shared<asio::streambuf>(config.max_response_streambuf_size - tmp_streambuf->size());
// Move bytes already sitting in the response streambuf into the chunk buffer.
551 if(session->response->streambuf.size() > 0) {
552 std::ostream chunked_stream(chunked_streambuf.get());
553 chunked_stream << &session->response->streambuf;
555 session->connection->set_timeout();
// Read up to the end of the next line ("\r\n"), which is parsed below as the chunk length.
556 asio::async_read_until(*session->connection->socket, *chunked_streambuf,
"\r\n", [
this, session, chunked_streambuf, tmp_streambuf](
const error_code &ec, std::size_t bytes_transferred) {
557 session->connection->cancel_timeout();
558 auto lock = session->connection->handler_runner->continue_lock();
561 if((!ec || ec == asio::error::not_found) && chunked_streambuf->size() == chunked_streambuf->max_size()) {
562 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
// NOTE(review): `line` is not declared on any line visible in this listing.
567 std::istream chunked_stream(chunked_streambuf.get());
568 getline(chunked_stream, line);
// Subtract what getline took from the buffer: the stored characters plus the '\n' it discarded.
569 bytes_transferred -= line.size() + 1;
571 unsigned long length;
// The chunk length is parsed as a hexadecimal number.
573 length = stoul(line, 0, 16);
// NOTE(review): presumably reached when the length line cannot be parsed — the enclosing handler is not visible.
576 session->callback(session->connection, make_error_code::make_error_code(errc::protocol_error));
// Bytes that are buffered beyond the length line.
580 auto num_additional_bytes = chunked_streambuf->size() - bytes_transferred;
// Runs once the chunk data is buffered: copies `length` bytes into
// tmp_streambuf, drops two further characters, then either reads the next
// chunk or finishes the response.
582 auto post_process = [
this, session, chunked_streambuf, tmp_streambuf, length]() {
583 std::istream chunked_stream(chunked_streambuf.get());
584 std::ostream tmp_stream(tmp_streambuf.get());
// NOTE(review): `length` comes from the peer's chunk-size line; no upper bound
// on it is visible here before the allocation — confirm one is enforced.
586 std::unique_ptr<char[]> buffer(
new char[length]);
587 chunked_stream.read(buffer.get(),
static_cast<std::streamsize
>(length));
588 tmp_stream.write(buffer.get(),
static_cast<std::streamsize
>(length));
// Two characters are discarded — presumably the "\r\n" that terminates the chunk data.
592 chunked_stream.get();
593 chunked_stream.get();
// NOTE(review): the condition selecting between reading another chunk and
// finishing is not visible in this listing.
596 this->read_chunked(session, tmp_streambuf);
// Finished: write the accumulated body into the response streambuf.
598 if(tmp_streambuf->size() > 0) {
599 std::ostream response_stream(&session->response->streambuf);
600 response_stream << tmp_streambuf.get();
603 session->callback(session->connection, ec);
// Chunk data plus two more bytes are needed; read only what is not buffered yet.
607 if((2 + length) > num_additional_bytes) {
608 session->connection->set_timeout();
609 asio::async_read(*session->connection->socket, *chunked_streambuf, asio::transfer_exactly(2 + length - num_additional_bytes), [
this, session, chunked_streambuf, post_process](
const error_code &ec, std::size_t ) {
610 session->connection->cancel_timeout();
611 auto lock = session->connection->handler_runner->continue_lock();
615 if(chunked_streambuf->size() == chunked_streambuf->max_size()) {
616 session->callback(session->connection, make_error_code::make_error_code(errc::message_size));
622 session->callback(session->connection, ec);
629 session->callback(session->connection, ec);
// Primary Client template: publicly derives from ClientBase<socket_type> and
// declares no members of its own.
634 template <
class socket_type>
635 class Client :
public ClientBase<socket_type> {};
637 using HTTP = asio::ip::tcp::socket;
645 std::shared_ptr<Connection> create_connection() noexcept
override {
646 return std::make_shared<Connection>(handler_runner, config.timeout, *io_service);
// Establishes the TCP connection for `session` and then starts the request.
//
// When the socket's lowest layer is not open, the endpoint described by
// *query is resolved asynchronously, asio::async_connect is run on the
// resolved iterator, the TCP no_delay option is set on the socket, and
// write(session) is called. The resolve and connect steps are each bracketed
// by set_timeout(config.timeout_connect) / cancel_timeout().
//
// NOTE(review): this listing omits some original lines (see the gaps in the
// embedded line numbers). The branch taken when the socket is already open
// and the conditions guarding the two callback calls at the end are not
// visible here — confirm against the full file.
649 void connect(
const std::shared_ptr<Session> &session)
override {
650 if(!session->connection->socket->lowest_layer().is_open()) {
// The resolver is held in a shared_ptr captured by the handlers below.
651 auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
652 session->connection->set_timeout(config.timeout_connect);
653 resolver->async_resolve(*query, [
this, session, resolver](
const error_code &ec, asio::ip::tcp::resolver::iterator it) {
654 session->connection->cancel_timeout();
655 auto lock = session->connection->handler_runner->continue_lock();
659 session->connection->set_timeout(config.timeout_connect);
660 asio::async_connect(*session->connection->socket, it, [
this, session, resolver](
const error_code &ec, asio::ip::tcp::resolver::iterator ) {
661 session->connection->cancel_timeout();
662 auto lock = session->connection->handler_runner->continue_lock();
666 asio::ip::tcp::no_delay option(
true);
// set_option is called with an error_code argument.
668 session->connection->socket->set_option(option, ec);
669 this->write(session);
672 session->callback(session->connection, ec);
676 session->callback(session->connection, ec);
void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback_)
Asynchronous request where setting and/or running Client's io_service is required.
Definition: client_http.h:282
std::string proxy_server
Set proxy server (server:port)
Definition: client_http.h:97
std::shared_ptr< Response > request(const std::string &method, const std::string &path=std::string("/"), string_view content="", const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Definition: client_http.h:164
void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback_)
Definition: client_http.h:223
static bool parse(std::istream &stream, std::string &version, std::string &status_code, CaseInsensitiveMultimap &header) noexcept
Parse status line and header fields.
Definition: utility.h:212
void request(const std::string &method, const std::string &path, string_view content, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback)
Definition: client_http.h:265
Definition: client_http.h:66
Definition: client_http.h:37
Definition: client_http.h:101
std::string string() noexcept
Convenience function to return std::string. The stream buffer is consumed.
Definition: client_http.h:50
Config config
Set before calling request.
Definition: client_http.h:155
std::shared_ptr< asio::io_service > io_service
Definition: client_http.h:159
Definition: client_http.h:82
long timeout_connect
Set connect timeout in seconds. Default value: 0 (Config::timeout is then used instead).
Definition: client_http.h:92
void request(const std::string &method, const std::string &path, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback)
Asynchronous request where setting and/or running Client's io_service is required.
Definition: client_http.h:271
void stop() noexcept
Close connections.
Definition: client_http.h:333
Definition: client_http.h:42
std::size_t max_response_streambuf_size
Definition: client_http.h:95
void request(const std::string &method, const std::string &path, std::istream &content, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback)
Asynchronous request where setting and/or running Client's io_service is required.
Definition: client_http.h:327
Definition: client_http.h:142
void request(const std::string &method, std::function< void(std::shared_ptr< Response >, const error_code &)> &&request_callback)
Asynchronous request where setting and/or running Client's io_service is required.
Definition: client_http.h:277
Definition: client_http.h:40
long timeout
Set timeout on requests in seconds. Default value: 0 (no timeout).
Definition: client_http.h:90
std::shared_ptr< Response > request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Definition: client_http.h:194