1 #ifndef SERVER_HTTP_HPP
2 #define SERVER_HTTP_HPP
12 #include <unordered_set>
14 #ifdef USE_STANDALONE_ASIO
16 #include <asio/steady_timer.hpp>
18 using error_code = std::error_code;
19 using errc = std::errc;
20 namespace make_error_code = std;
23 #include <boost/asio.hpp>
24 #include <boost/asio/steady_timer.hpp>
26 namespace asio = boost::asio;
27 using error_code = boost::system::error_code;
28 namespace errc = boost::system::errc;
29 namespace make_error_code = boost::system::errc;
34 #ifdef USE_BOOST_REGEX
35 #include <boost/regex.hpp>
37 namespace regex = boost;
42 namespace regex = std;
47 template <
class socket_type>
50 template <
class socket_type>
56 class Response :
public std::enable_shared_from_this<Response>,
public std::ostream {
58 friend class Server<socket_type>;
60 asio::streambuf streambuf;
62 std::shared_ptr<Session> session;
65 Response(std::shared_ptr<Session> session,
long timeout_content) noexcept : std::ostream(&streambuf), session(std::move(session)), timeout_content(timeout_content) {}
67 template <
typename size_type>
68 void write_header(
const CaseInsensitiveMultimap &header, size_type size) {
69 bool content_length_written =
false;
70 bool chunked_transfer_encoding =
false;
71 for(
auto &field : header) {
72 if(!content_length_written && case_insensitive_equal(field.first,
"content-length"))
73 content_length_written =
true;
74 else if(!chunked_transfer_encoding && case_insensitive_equal(field.first,
"transfer-encoding") && case_insensitive_equal(field.second,
"chunked"))
75 chunked_transfer_encoding =
true;
77 *
this << field.first <<
": " << field.second <<
"\r\n";
80 *
this <<
"Content-Length: " << size <<
"\r\n\r\n";
86 std::size_t size() noexcept {
87 return streambuf.size();
91 void send(
const std::function<
void(
const error_code &)> &callback =
nullptr) noexcept {
92 session->connection->set_timeout(timeout_content);
93 auto self = this->shared_from_this();
94 asio::async_write(*session->connection->socket, streambuf, [
self, callback](
const error_code &ec, std::size_t ) {
95 self->session->connection->cancel_timeout();
96 auto lock =
self->session->connection->handler_runner->continue_lock();
105 void write(
const char_type *ptr, std::streamsize n) {
106 std::ostream::write(ptr, n);
110 void write(StatusCode status_code = StatusCode::success_ok,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
111 *
this <<
"HTTP/1.1 " << SimpleWeb::status_code(status_code) <<
"\r\n";
112 write_header(header, 0);
116 void write(StatusCode status_code,
const std::string &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
117 *
this <<
"HTTP/1.1 " << SimpleWeb::status_code(status_code) <<
"\r\n";
118 write_header(header, content.size());
124 void write(StatusCode status_code, std::istream &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
125 *
this <<
"HTTP/1.1 " << SimpleWeb::status_code(status_code) <<
"\r\n";
126 content.seekg(0, std::ios::end);
127 auto size = content.tellg();
128 content.seekg(0, std::ios::beg);
129 write_header(header, size);
131 *
this << content.rdbuf();
135 void write(
const std::string &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
136 write(StatusCode::success_ok, content, header);
140 void write(std::istream &content,
const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
141 write(StatusCode::success_ok, content, header);
145 void write(
const CaseInsensitiveMultimap &header) {
146 write(StatusCode::success_ok, std::string(), header);
153 bool close_connection_after_response =
false;
160 std::size_t size() noexcept {
161 return streambuf.size();
166 std::stringstream ss;
171 return std::string();
176 asio::streambuf &streambuf;
177 Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {}
182 friend class Server<socket_type>;
185 asio::streambuf streambuf;
186 std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint;
188 Request(std::size_t max_request_streambuf_size, std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint) noexcept
189 : streambuf(max_request_streambuf_size), remote_endpoint(std::move(remote_endpoint)), content(streambuf) {}
192 std::string method, path, query_string, http_version;
196 CaseInsensitiveMultimap header;
198 regex::smatch path_match;
200 std::string remote_endpoint_address() noexcept {
202 return remote_endpoint->address().to_string();
205 return std::string();
209 unsigned short remote_endpoint_port() noexcept {
210 return remote_endpoint->port();
220 class Connection :
public std::enable_shared_from_this<Connection> {
222 template <
typename... Args>
223 Connection(std::shared_ptr<ScopeRunner> handler_runner, Args &&... args) noexcept : handler_runner(std::move(handler_runner)), socket(
new socket_type(std::forward<Args>(args)...)) {}
225 std::shared_ptr<ScopeRunner> handler_runner;
227 std::unique_ptr<socket_type> socket;
228 std::mutex socket_close_mutex;
230 std::unique_ptr<asio::steady_timer> timer;
232 std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint;
234 void close() noexcept {
236 std::unique_lock<std::mutex> lock(socket_close_mutex);
237 socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
238 socket->lowest_layer().close(ec);
241 void set_timeout(
long seconds) noexcept {
247 timer = std::unique_ptr<asio::steady_timer>(
new asio::steady_timer(socket->get_io_service()));
248 timer->expires_from_now(std::chrono::seconds(seconds));
249 auto self = this->shared_from_this();
250 timer->async_wait([
self](
const error_code &ec) {
256 void cancel_timeout() noexcept {
266 Session(std::size_t max_request_streambuf_size, std::shared_ptr<Connection> connection) noexcept : connection(std::move(connection)) {
267 if(!this->connection->remote_endpoint) {
269 this->connection->remote_endpoint = std::make_shared<asio::ip::tcp::endpoint>(this->connection->socket->lowest_layer().remote_endpoint(ec));
271 request = std::shared_ptr<Request>(
new Request(max_request_streambuf_size, this->connection->remote_endpoint));
274 std::shared_ptr<Connection> connection;
275 std::shared_ptr<Request> request;
282 Config(
unsigned short port) noexcept : port(port) {}
289 std::size_t thread_pool_size = 1;
291 long timeout_request = 5;
293 long timeout_content = 300;
296 std::size_t max_request_streambuf_size = std::numeric_limits<std::size_t>::max();
301 bool reuse_address =
true;
307 class regex_orderable :
public regex::regex {
311 regex_orderable(
const char *regex_cstr) : regex::regex(regex_cstr), str(regex_cstr) {}
312 regex_orderable(std::string regex_str) : regex::regex(regex_str), str(std::move(regex_str)) {}
313 bool operator<(
const regex_orderable &rhs)
const noexcept {
314 return str < rhs.str;
320 std::map<regex_orderable, std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>>> resource;
322 std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>> default_resource;
324 std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>,
const error_code &)> on_error;
326 std::function<void(std::unique_ptr<socket_type> &, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
331 virtual void start() {
333 io_service = std::make_shared<asio::io_service>();
334 internal_io_service =
true;
337 if(io_service->stopped())
340 asio::ip::tcp::endpoint endpoint;
341 if(config.address.size() > 0)
342 endpoint = asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port);
344 endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port);
347 acceptor = std::unique_ptr<asio::ip::tcp::acceptor>(
new asio::ip::tcp::acceptor(*io_service));
348 acceptor->open(endpoint.protocol());
349 acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address));
350 acceptor->bind(endpoint);
355 if(internal_io_service) {
358 for(std::size_t c = 1; c < config.thread_pool_size; c++) {
359 threads.emplace_back([
this]() {
360 this->io_service->run();
365 if(config.thread_pool_size > 0)
369 for(
auto &t : threads)
381 std::unique_lock<std::mutex> lock(*connections_mutex);
382 for(
auto &connection : *connections)
384 connections->clear();
387 if(internal_io_service)
393 handler_runner->
stop();
398 bool internal_io_service =
false;
400 std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
401 std::vector<std::thread> threads;
403 std::shared_ptr<std::unordered_set<Connection *>> connections;
404 std::shared_ptr<std::mutex> connections_mutex;
406 std::shared_ptr<ScopeRunner> handler_runner;
408 ServerBase(
unsigned short port) noexcept : config(port), connections(
new std::unordered_set<Connection *>()), connections_mutex(
new std::mutex()), handler_runner(
new ScopeRunner()) {}
410 virtual void accept() = 0;
412 template <
typename... Args>
413 std::shared_ptr<Connection> create_connection(Args &&... args) noexcept {
414 auto connections = this->connections;
415 auto connections_mutex = this->connections_mutex;
416 auto connection = std::shared_ptr<Connection>(
new Connection(handler_runner, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
418 std::unique_lock<std::mutex> lock(*connections_mutex);
419 auto it = connections->find(connection);
420 if(it != connections->end())
421 connections->erase(it);
426 std::unique_lock<std::mutex> lock(*connections_mutex);
427 connections->emplace(connection.get());
432 void read_request_and_content(
const std::shared_ptr<Session> &session) {
433 session->connection->set_timeout(config.timeout_request);
434 asio::async_read_until(*session->connection->socket, session->request->streambuf,
"\r\n\r\n", [
this, session](
const error_code &ec, std::size_t bytes_transferred) {
435 session->connection->cancel_timeout();
436 auto lock = session->connection->handler_runner->continue_lock();
439 if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
440 auto response = std::shared_ptr<Response>(
new Response(session, this->config.timeout_content));
441 response->write(StatusCode::client_error_payload_too_large);
444 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
452 std::size_t num_additional_bytes = session->request->streambuf.size() - bytes_transferred;
454 if(!
RequestMessage::parse(session->request->content, session->request->method, session->request->path,
455 session->request->query_string, session->request->http_version, session->request->header)) {
457 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
462 auto it = session->request->header.find(
"Content-Length");
463 if(it != session->request->header.end()) {
464 unsigned long long content_length = 0;
466 content_length = stoull(it->second);
468 catch(
const std::exception &e) {
470 this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
473 if(content_length > num_additional_bytes) {
474 session->connection->set_timeout(config.timeout_content);
475 asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [
this, session](
const error_code &ec, std::size_t ) {
476 session->connection->cancel_timeout();
477 auto lock = session->connection->handler_runner->continue_lock();
481 if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
482 auto response = std::shared_ptr<Response>(
new Response(session, this->config.timeout_content));
483 response->write(StatusCode::client_error_payload_too_large);
486 this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
489 this->find_resource(session);
491 else if(this->on_error)
492 this->on_error(session->request, ec);
496 this->find_resource(session);
499 this->find_resource(session);
501 else if(this->on_error)
502 this->on_error(session->request, ec);
506 void find_resource(
const std::shared_ptr<Session> &session) {
509 auto it = session->request->header.find(
"Upgrade");
510 if(it != session->request->header.end()) {
513 std::unique_lock<std::mutex> lock(*connections_mutex);
514 auto it = connections->find(session->connection.get());
515 if(it != connections->end())
516 connections->erase(it);
519 on_upgrade(session->connection->socket, session->request);
524 for(
auto ®ex_method : resource) {
525 auto it = regex_method.second.find(session->request->method);
526 if(it != regex_method.second.end()) {
527 regex::smatch sm_res;
528 if(regex::regex_match(session->request->path, sm_res, regex_method.first)) {
529 session->request->path_match = std::move(sm_res);
530 write_response(session, it->second);
535 auto it = default_resource.find(session->request->method);
536 if(it != default_resource.end())
537 write_response(session, it->second);
540 void write_response(
const std::shared_ptr<Session> &session,
541 std::function<
void(std::shared_ptr<
typename ServerBase<socket_type>::Response>, std::shared_ptr<
typename ServerBase<socket_type>::Request>)> &resource_function) {
542 session->connection->set_timeout(config.timeout_content);
543 auto response = std::shared_ptr<Response>(
new Response(session, config.timeout_content), [
this](Response *response_ptr) {
544 auto response = std::shared_ptr<Response>(response_ptr);
545 response->send([
this, response](
const error_code &ec) {
547 if(response->close_connection_after_response)
550 auto range = response->session->request->header.equal_range(
"Connection");
551 for(
auto it = range.first; it != range.second; it++) {
552 if(case_insensitive_equal(it->second,
"close"))
554 else if(case_insensitive_equal(it->second,
"keep-alive")) {
555 auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection);
556 this->read_request_and_content(new_session);
560 if(response->session->request->http_version >=
"1.1") {
561 auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection);
562 this->read_request_and_content(new_session);
566 else if(this->on_error)
567 this->on_error(response->session->request, ec);
572 resource_function(response, session->request);
574 catch(
const std::exception &) {
576 on_error(session->request, make_error_code::make_error_code(errc::operation_canceled));
582 template <
class socket_type>
583 class Server :
public ServerBase<socket_type> {};
585 using HTTP = asio::ip::tcp::socket;
593 void accept()
override {
594 auto connection = create_connection(*io_service);
596 acceptor->async_accept(*connection->socket, [
this, connection](
const error_code &ec) {
597 auto lock = connection->handler_runner->continue_lock();
602 if(ec != asio::error::operation_aborted)
605 auto session = std::make_shared<Session>(config.max_request_streambuf_size, connection);
608 asio::ip::tcp::no_delay option(
true);
610 session->connection->socket->set_option(option, ec);
612 this->read_request_and_content(session);
614 else if(this->on_error)
615 this->on_error(session->request, ec);
void write(const char_type *ptr, std::streamsize n)
Write directly to stream buffer using std::ostream::write.
Definition: server_http.h:105
void write(const std::string &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: server_http.h:135
void write(StatusCode status_code=StatusCode::success_ok, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, potential header fields, and empty content.
Definition: server_http.h:110
void write(std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: server_http.h:140
Definition: server_http.h:56
void write(StatusCode status_code, std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: server_http.h:124
std::string address
Definition: server_http.h:299
Definition: server_http.h:48
Definition: server_http.h:264
Definition: server_http.h:51
Definition: server_http.h:180
std::shared_ptr< asio::io_service > io_service
If you have your own asio::io_service, store its pointer here before running start().
Definition: server_http.h:329
void stop() noexcept
Stop accepting new requests, and close current connections.
Definition: server_http.h:375
unsigned short port
Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
Definition: server_http.h:286
void write(const CaseInsensitiveMultimap &header)
Convenience function for writing success status line, and header fields.
Definition: server_http.h:145
void write(StatusCode status_code, const std::string &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: server_http.h:116
static CaseInsensitiveMultimap parse(const std::string &query_string) noexcept
Returns query keys with percent-decoded values.
Definition: utility.h:100
Config config
Set before calling start().
Definition: server_http.h:304
CaseInsensitiveMultimap parse_query_string() noexcept
Returns query keys with percent-decoded values.
Definition: server_http.h:214
bool close_connection_after_response
Definition: server_http.h:153
Definition: server_http.h:156
void send(const std::function< void(const error_code &)> &callback=nullptr) noexcept
Use this function if you need to recursively send parts of a longer message.
Definition: server_http.h:91
Definition: server_http.h:279
static bool parse(std::istream &stream, std::string &method, std::string &path, std::string &query_string, std::string &version, CaseInsensitiveMultimap &header) noexcept
Parse request line and header fields.
Definition: utility.h:163
std::string string() noexcept
Convenience function to return std::string. The stream buffer is consumed.
Definition: server_http.h:164
Definition: server_http.h:220