RDFS
The Rice Comp413 2017 class's continuation of the work on the 2016 RDFS.
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Pages
server_http.h
1 #ifndef SERVER_HTTP_HPP
2 #define SERVER_HTTP_HPP
3 
4 #include "utility.h"
5 #include <functional>
6 #include <iostream>
7 #include <limits>
8 #include <map>
9 #include <mutex>
10 #include <sstream>
11 #include <thread>
12 #include <unordered_set>
13 
// Pick the Asio flavour and the matching error types.
// With USE_STANDALONE_ASIO defined, the standalone <asio.hpp> headers are paired with the std
// error facilities (no `asio` alias is declared in that branch); otherwise boost::asio is aliased
// to SimpleWeb::asio and paired with boost::system.
// In both branches SimpleWeb::make_error_code names the *namespace* that holds make_error_code(),
// which is why the code below writes make_error_code::make_error_code(...).
14 #ifdef USE_STANDALONE_ASIO
15 #include <asio.hpp>
16 #include <asio/steady_timer.hpp>
17 namespace SimpleWeb {
18  using error_code = std::error_code;
19  using errc = std::errc;
20  namespace make_error_code = std;
21 } // namespace SimpleWeb
22 #else
23 #include <boost/asio.hpp>
24 #include <boost/asio/steady_timer.hpp>
25 namespace SimpleWeb {
26  namespace asio = boost::asio;
27  using error_code = boost::system::error_code;
28  namespace errc = boost::system::errc;
29  namespace make_error_code = boost::system::errc;
30 } // namespace SimpleWeb
31 #endif
32 
33 // Late 2017 TODO: remove the following checks and always use std::regex
// SimpleWeb::regex is a namespace alias: boost when USE_BOOST_REGEX is defined, std otherwise.
// regex::regex, regex::smatch and regex::regex_match below therefore resolve to the chosen library.
34 #ifdef USE_BOOST_REGEX
35 #include <boost/regex.hpp>
36 namespace SimpleWeb {
37  namespace regex = boost;
38 }
39 #else
40 #include <regex>
41 namespace SimpleWeb {
42  namespace regex = std;
43 }
44 #endif
45 
46 namespace SimpleWeb {
47  template <class socket_type>
48  class Server;
49 
50  template <class socket_type>
51  class ServerBase {
52  protected:
53  class Session;
54 
55  public:
56  class Response : public std::enable_shared_from_this<Response>, public std::ostream {
57  friend class ServerBase<socket_type>;
58  friend class Server<socket_type>;
59 
60  asio::streambuf streambuf;
61 
62  std::shared_ptr<Session> session;
63  long timeout_content;
64 
65  Response(std::shared_ptr<Session> session, long timeout_content) noexcept : std::ostream(&streambuf), session(std::move(session)), timeout_content(timeout_content) {}
66 
67  template <typename size_type>
68  void write_header(const CaseInsensitiveMultimap &header, size_type size) {
69  bool content_length_written = false;
70  bool chunked_transfer_encoding = false;
71  for(auto &field : header) {
72  if(!content_length_written && case_insensitive_equal(field.first, "content-length"))
73  content_length_written = true;
74  else if(!chunked_transfer_encoding && case_insensitive_equal(field.first, "transfer-encoding") && case_insensitive_equal(field.second, "chunked"))
75  chunked_transfer_encoding = true;
76 
77  *this << field.first << ": " << field.second << "\r\n";
78  }
79  if(!content_length_written && !chunked_transfer_encoding && !close_connection_after_response)
80  *this << "Content-Length: " << size << "\r\n\r\n";
81  else
82  *this << "\r\n";
83  }
84 
85  public:
86  std::size_t size() noexcept {
87  return streambuf.size();
88  }
89 
91  void send(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
92  session->connection->set_timeout(timeout_content);
93  auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
94  asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) {
95  self->session->connection->cancel_timeout();
96  auto lock = self->session->connection->handler_runner->continue_lock();
97  if(!lock)
98  return;
99  if(callback)
100  callback(ec);
101  });
102  }
103 
105  void write(const char_type *ptr, std::streamsize n) {
106  std::ostream::write(ptr, n);
107  }
108 
110  void write(StatusCode status_code = StatusCode::success_ok, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
111  *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n";
112  write_header(header, 0);
113  }
114 
116  void write(StatusCode status_code, const std::string &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
117  *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n";
118  write_header(header, content.size());
119  if(!content.empty())
120  *this << content;
121  }
122 
124  void write(StatusCode status_code, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
125  *this << "HTTP/1.1 " << SimpleWeb::status_code(status_code) << "\r\n";
126  content.seekg(0, std::ios::end);
127  auto size = content.tellg();
128  content.seekg(0, std::ios::beg);
129  write_header(header, size);
130  if(size)
131  *this << content.rdbuf();
132  }
133 
135  void write(const std::string &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
136  write(StatusCode::success_ok, content, header);
137  }
138 
140  void write(std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
141  write(StatusCode::success_ok, content, header);
142  }
143 
145  void write(const CaseInsensitiveMultimap &header) {
146  write(StatusCode::success_ok, std::string(), header);
147  }
148 
153  bool close_connection_after_response = false;
154  };
155 
/// Input stream over a request's receive buffer. The asio::streambuf is owned elsewhere and held
/// here by reference. The constructor is private; ServerBase is a friend.
156  class Content : public std::istream {
157  friend class ServerBase<socket_type>;
158 
159  public:
/// Number of bytes currently held in the underlying stream buffer.
160  std::size_t size() noexcept {
161  return streambuf.size();
162  }
/// Convenience function to return std::string. The stream buffer is consumed.
/// Returns an empty string if anything throws while copying.
164  std::string string() noexcept {
165  try {
166  std::stringstream ss;
167  ss << rdbuf();
168  return ss.str();
169  }
170  catch(...) {
171  return std::string();
172  }
173  }
174 
175  private:
176  asio::streambuf &streambuf;
177  Content(asio::streambuf &streambuf) noexcept : std::istream(&streambuf), streambuf(streambuf) {}
178  };
179 
/// One HTTP request: request-line parts, header fields, a stream over the content, and the peer's
/// endpoint. The constructor is private; ServerBase, Server and Session are friends.
180  class Request {
181  friend class ServerBase<socket_type>;
182  friend class Server<socket_type>;
183  friend class Session;
184 
// Receive buffer for this request; constructed with max_request_streambuf_size as its maximum size
185  asio::streambuf streambuf;
186  std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint;
187 
188  Request(std::size_t max_request_streambuf_size, std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint) noexcept
189  : streambuf(max_request_streambuf_size), remote_endpoint(std::move(remote_endpoint)), content(streambuf) {}
190 
191  public:
// Filled in by RequestMessage::parse() in ServerBase::read_request_and_content()
192  std::string method, path, query_string, http_version;
193 
// Stream over `streambuf`
194  Content content;
195 
196  CaseInsensitiveMultimap header;
197 
// Set by ServerBase::find_resource() to the match of `path` against the selected resource regex
198  regex::smatch path_match;
199 
/// Remote address as a string, or an empty string if the conversion throws.
200  std::string remote_endpoint_address() noexcept {
201  try {
202  return remote_endpoint->address().to_string();
203  }
204  catch(...) {
205  return std::string();
206  }
207  }
208 
/// Remote port number.
209  unsigned short remote_endpoint_port() noexcept {
210  return remote_endpoint->port();
211  }
212 
/// Returns query keys with percent-decoded values.
214  CaseInsensitiveMultimap parse_query_string() noexcept {
215  return SimpleWeb::QueryString::parse(query_string);
216  }
217  };
218 
219  protected:
/// A client connection: the socket, an optional timeout timer, and the cached remote endpoint.
220  class Connection : public std::enable_shared_from_this<Connection> {
221  public:
/// Stores `handler_runner` and constructs the socket from the remaining arguments.
222  template <typename... Args>
223  Connection(std::shared_ptr<ScopeRunner> handler_runner, Args &&... args) noexcept : handler_runner(std::move(handler_runner)), socket(new socket_type(std::forward<Args>(args)...)) {}
224 
225  std::shared_ptr<ScopeRunner> handler_runner;
226 
227  std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
228  std::mutex socket_close_mutex;
229 
// Null while no timeout is armed
230  std::unique_ptr<asio::steady_timer> timer;
231 
// Filled in by Session's constructor if still null
232  std::shared_ptr<asio::ip::tcp::endpoint> remote_endpoint;
233 
/// Shuts down and closes the socket. Errors are collected in a local error_code and not reported.
234  void close() noexcept {
235  error_code ec;
236  std::unique_lock<std::mutex> lock(socket_close_mutex); // The following operations seem to need to run sequentially
237  socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
238  socket->lowest_layer().close(ec);
239  }
240 
/// Arms a timer that closes the connection after `seconds` seconds.
/// Passing 0 only discards any existing timer. Any previous timer object is replaced.
241  void set_timeout(long seconds) noexcept {
242  if(seconds == 0) {
243  timer = nullptr;
244  return;
245  }
246 
247  timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(socket->get_io_service()));
248  timer->expires_from_now(std::chrono::seconds(seconds));
// The handler owns a shared_ptr to this Connection, keeping it alive until the wait completes
249  auto self = this->shared_from_this();
250  timer->async_wait([self](const error_code &ec) {
// Close only when the wait finished without an error code
251  if(!ec)
252  self->close();
253  });
254  }
255 
/// Cancels the pending timer wait, if a timer exists. The cancel error code is not inspected.
256  void cancel_timeout() noexcept {
257  if(timer) {
258  error_code ec;
259  timer->cancel(ec);
260  }
261  }
262  };
263 
/// Pairs a Connection with a newly allocated Request.
264  class Session {
265  public:
/// If the connection has no cached remote endpoint yet, it is queried from the socket and stored
/// on the connection (the query's error code is not inspected). The Request is then created with
/// `max_request_streambuf_size` and that endpoint.
266  Session(std::size_t max_request_streambuf_size, std::shared_ptr<Connection> connection) noexcept : connection(std::move(connection)) {
267  if(!this->connection->remote_endpoint) {
268  error_code ec;
269  this->connection->remote_endpoint = std::make_shared<asio::ip::tcp::endpoint>(this->connection->socket->lowest_layer().remote_endpoint(ec));
270  }
271  request = std::shared_ptr<Request>(new Request(max_request_streambuf_size, this->connection->remote_endpoint));
272  }
273 
274  std::shared_ptr<Connection> connection;
275  std::shared_ptr<Request> request;
276  };
277 
278  public:
/// Server settings. The constructor is private (ServerBase is a friend); the instance to adjust is
/// the `config` member declared right after this class.
279  class Config {
280  friend class ServerBase<socket_type>;
281 
282  Config(unsigned short port) noexcept : port(port) {}
283 
284  public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
286  unsigned short port;
/// Number of threads that run io_service->run() when start() created the io_service itself;
/// the thread calling start() is one of them.
289  std::size_t thread_pool_size = 1;
/// Timeout in seconds armed while waiting for the end of a request's header section. 0 arms no timer.
291  long timeout_request = 5;
/// Timeout in seconds armed while reading request content and while a response is handled/sent.
/// 0 arms no timer.
293  long timeout_content = 300;
/// Maximum size given to each request's asio::streambuf. A request that fills the buffer is
/// answered with StatusCode::client_error_payload_too_large.
296  std::size_t max_request_streambuf_size = std::numeric_limits<std::size_t>::max();
/// Address to bind to, parsed with asio::ip::address::from_string(). If empty, start() builds the
/// endpoint from asio::ip::tcp::v4() instead.
299  std::string address;
/// Value passed to the acceptor's reuse_address socket option in start().
301  bool reuse_address = true;
302  };
/// Set before calling start().
304  Config config;
305 
306  private:
/// A regex that also keeps its pattern string, so that instances can be ordered (operator<
/// compares the pattern strings) and used as keys of the `resource` std::map.
/// The constructors are deliberately implicit so string literals and std::string can be used as keys.
307  class regex_orderable : public regex::regex {
308  std::string str;
309 
310  public:
311  regex_orderable(const char *regex_cstr) : regex::regex(regex_cstr), str(regex_cstr) {}
312  regex_orderable(std::string regex_str) : regex::regex(regex_str), str(std::move(regex_str)) {}
313  bool operator<(const regex_orderable &rhs) const noexcept {
314  return str < rhs.str;
315  }
316  };
317 
318  public:
/// Request handlers for specific paths: the outer key is the path regex, the inner key the request
/// method string. find_resource() walks this map in order and calls the first handler whose inner
/// map contains the request's method and whose regex matches the whole request path.
320  std::map<regex_orderable, std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>>> resource;
321 
/// Fallback handlers keyed by request method, used when nothing in `resource` matched.
322  std::map<std::string, std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)>> default_resource;
323 
/// Optional error callback, invoked with the affected request and an error code.
324  std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const error_code &)> on_error;
325 
/// Optional upgrade callback. When it is set and a request has an "Upgrade" header field,
/// find_resource() stops tracking the connection and passes its socket and the request to this
/// callback instead of looking up a handler.
326  std::function<void(std::unique_ptr<socket_type> &, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
327 
/// If you have your own asio::io_service, store its pointer here before running start().
329  std::shared_ptr<asio::io_service> io_service;
330 
/// Opens the listening socket and starts accepting connections.
/// If no io_service was supplied, one is created and this call runs it: thread_pool_size - 1
/// additional threads plus the calling thread execute io_service->run(), and the call returns only
/// after all of them have finished. With a caller-supplied io_service the function returns once
/// the first accept has been queued.
/// The acceptor calls used here (open, set_option, bind, listen) are the overloads without an
/// error_code parameter.
331  virtual void start() {
332  if(!io_service) {
333  io_service = std::make_shared<asio::io_service>();
334  internal_io_service = true;
335  }
336 
// Allow start() after a previous stop()
337  if(io_service->stopped())
338  io_service->reset();
339 
340  asio::ip::tcp::endpoint endpoint;
341  if(config.address.size() > 0)
342  endpoint = asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port);
343  else
344  endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port);
345 
346  if(!acceptor)
347  acceptor = std::unique_ptr<asio::ip::tcp::acceptor>(new asio::ip::tcp::acceptor(*io_service));
348  acceptor->open(endpoint.protocol());
349  acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address));
350  acceptor->bind(endpoint);
351  acceptor->listen();
352 
353  accept();
354 
355  if(internal_io_service) {
356  // If thread_pool_size>1, start io_service->run() in (thread_pool_size-1) threads for thread-pooling
357  threads.clear();
358  for(std::size_t c = 1; c < config.thread_pool_size; c++) {
359  threads.emplace_back([this]() {
360  this->io_service->run();
361  });
362  }
363 
364  // Main thread
365  if(config.thread_pool_size > 0)
366  io_service->run();
367 
368  // Wait for the rest of the threads, if any, to finish as well
369  for(auto &t : threads)
370  t.join();
371  }
372  }
373 
/// Stop accepting new requests, and close current connections.
/// Does nothing if start() never created the acceptor. The io_service is stopped only when it was
/// created by start(); a caller-supplied io_service is left running.
375  void stop() noexcept {
376  if(acceptor) {
377  error_code ec;
378  acceptor->close(ec);
379 
380  {
// `connections` holds raw pointers; the mutex also guards against the connection deleter
// (see create_connection) erasing entries concurrently
381  std::unique_lock<std::mutex> lock(*connections_mutex);
382  for(auto &connection : *connections)
383  connection->close();
384  connections->clear();
385  }
386 
387  if(internal_io_service)
388  io_service->stop();
389  }
390  }
391 
/// Stops the handler runner first, then stops the server via stop().
/// NOTE(review): ScopeRunner::stop() is defined in utility.h; presumably it waits for running
/// handlers and makes later continue_lock() calls fail — confirm there.
392  virtual ~ServerBase() noexcept {
393  handler_runner->stop();
394  stop();
395  }
396 
397  protected:
// True once start() has created the io_service itself (as opposed to a caller-supplied one)
398  bool internal_io_service = false;
399 
400  std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
// Extra worker threads started by start() when it owns the io_service
401  std::vector<std::thread> threads;
402 
// Raw pointers to the connections created by create_connection(), guarded by connections_mutex.
// Both are held through shared_ptr because the connection deleter keeps its own copies.
403  std::shared_ptr<std::unordered_set<Connection *>> connections;
404  std::shared_ptr<std::mutex> connections_mutex;
405 
406  std::shared_ptr<ScopeRunner> handler_runner;
407 
/// Initializes config with `port` and allocates the connection set, its mutex and the handler runner.
408  ServerBase(unsigned short port) noexcept : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()), handler_runner(new ScopeRunner()) {}
409 
/// Queues an asynchronous accept; implemented per socket type (see Server<HTTP>).
410  virtual void accept() = 0;
411 
/// Creates a Connection (forwarding `args` to the socket constructor) and registers its raw pointer
/// in `connections`. The returned shared_ptr has a custom deleter that removes the pointer from the
/// set, under the mutex, before deleting the object.
412  template <typename... Args>
413  std::shared_ptr<Connection> create_connection(Args &&... args) noexcept {
// Local copies so the deleter captures the shared_ptrs themselves rather than `this`;
// presumably so that it stays valid independently of the server object's lifetime
414  auto connections = this->connections;
415  auto connections_mutex = this->connections_mutex;
416  auto connection = std::shared_ptr<Connection>(new Connection(handler_runner, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
417  {
418  std::unique_lock<std::mutex> lock(*connections_mutex);
// The entry may already be gone: stop() clears the set and find_resource() erases upgraded connections
419  auto it = connections->find(connection);
420  if(it != connections->end())
421  connections->erase(it);
422  }
423  delete connection;
424  });
425  {
426  std::unique_lock<std::mutex> lock(*connections_mutex);
427  connections->emplace(connection.get());
428  }
429  return connection;
430  }
431 
432  void read_request_and_content(const std::shared_ptr<Session> &session) {
433  session->connection->set_timeout(config.timeout_request);
434  asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, std::size_t bytes_transferred) {
435  session->connection->cancel_timeout();
436  auto lock = session->connection->handler_runner->continue_lock();
437  if(!lock)
438  return;
439  if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
440  auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
441  response->write(StatusCode::client_error_payload_too_large);
442  response->send();
443  if(this->on_error)
444  this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
445  return;
446  }
447  if(!ec) {
448  // request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs:
449  // "After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter"
450  // The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the
451  // streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content).
452  std::size_t num_additional_bytes = session->request->streambuf.size() - bytes_transferred;
453 
454  if(!RequestMessage::parse(session->request->content, session->request->method, session->request->path,
455  session->request->query_string, session->request->http_version, session->request->header)) {
456  if(this->on_error)
457  this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
458  return;
459  }
460 
461  // If content, read that as well
462  auto it = session->request->header.find("Content-Length");
463  if(it != session->request->header.end()) {
464  unsigned long long content_length = 0;
465  try {
466  content_length = stoull(it->second);
467  }
468  catch(const std::exception &e) {
469  if(this->on_error)
470  this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error));
471  return;
472  }
473  if(content_length > num_additional_bytes) {
474  session->connection->set_timeout(config.timeout_content);
475  asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) {
476  session->connection->cancel_timeout();
477  auto lock = session->connection->handler_runner->continue_lock();
478  if(!lock)
479  return;
480  if(!ec) {
481  if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
482  auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
483  response->write(StatusCode::client_error_payload_too_large);
484  response->send();
485  if(this->on_error)
486  this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
487  return;
488  }
489  this->find_resource(session);
490  }
491  else if(this->on_error)
492  this->on_error(session->request, ec);
493  });
494  }
495  else
496  this->find_resource(session);
497  }
498  else
499  this->find_resource(session);
500  }
501  else if(this->on_error)
502  this->on_error(session->request, ec);
503  });
504  }
505 
/// Dispatches a fully read request.
/// 1. If on_upgrade is set and the request has an "Upgrade" header field, the connection is
///    removed from `connections` and its socket is handed to on_upgrade.
/// 2. Otherwise the first `resource` entry that has a handler for the request method and whose
///    regex matches the whole path is used; the match is stored in request->path_match.
/// 3. Otherwise the default_resource handler for the request method is used.
/// If none of these applies, no handler is called and no response is written here.
506  void find_resource(const std::shared_ptr<Session> &session) {
507  // Upgrade connection
508  if(on_upgrade) {
509  auto it = session->request->header.find("Upgrade");
510  if(it != session->request->header.end()) {
511  // remove connection from connections
512  {
513  std::unique_lock<std::mutex> lock(*connections_mutex);
// Note: this `it` shadows the header iterator above; it refers to the connection set
514  auto it = connections->find(session->connection.get());
515  if(it != connections->end())
516  connections->erase(it);
517  }
518 
519  on_upgrade(session->connection->socket, session->request);
520  return;
521  }
522  }
523  // Find path- and method-match, and call write_response
524  for(auto &regex_method : resource) {
// Look up the method first; the regex match is only attempted when a handler for the method exists
525  auto it = regex_method.second.find(session->request->method);
526  if(it != regex_method.second.end()) {
527  regex::smatch sm_res;
528  if(regex::regex_match(session->request->path, sm_res, regex_method.first)) {
529  session->request->path_match = std::move(sm_res);
530  write_response(session, it->second);
531  return;
532  }
533  }
534  }
535  auto it = default_resource.find(session->request->method);
536  if(it != default_resource.end())
537  write_response(session, it->second);
538  }
539 
/// Runs `resource_function` for the request and arranges for the response to be sent afterwards.
/// The Response is handed out in a shared_ptr with a custom deleter: when the last reference is
/// released, the deleter wraps the object in a new shared_ptr and calls send() on it, so the
/// object is really destroyed only after that second shared_ptr is released.
/// After a send without error, another request is read on the same connection unless
/// close_connection_after_response is set or the request carried "Connection: close";
/// "Connection: keep-alive" or an http_version comparing >= "1.1" triggers the next read.
/// If the handler throws a std::exception, on_error (when set) is called with
/// errc::operation_canceled.
540  void write_response(const std::shared_ptr<Session> &session,
541  std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Response>, std::shared_ptr<typename ServerBase<socket_type>::Request>)> &resource_function) {
542  session->connection->set_timeout(config.timeout_content);
543  auto response = std::shared_ptr<Response>(new Response(session, config.timeout_content), [this](Response *response_ptr) {
// Take ownership again so the Response stays alive for the asynchronous send
544  auto response = std::shared_ptr<Response>(response_ptr);
545  response->send([this, response](const error_code &ec) {
546  if(!ec) {
// Returning without starting a new read means this connection is not reused
547  if(response->close_connection_after_response)
548  return;
549 
550  auto range = response->session->request->header.equal_range("Connection");
551  for(auto it = range.first; it != range.second; it++) {
552  if(case_insensitive_equal(it->second, "close"))
553  return;
554  else if(case_insensitive_equal(it->second, "keep-alive")) {
555  auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection);
556  this->read_request_and_content(new_session);
557  return;
558  }
559  }
// No decisive Connection field: fall back to the HTTP version (std::string comparison)
560  if(response->session->request->http_version >= "1.1") {
561  auto new_session = std::make_shared<Session>(this->config.max_request_streambuf_size, response->session->connection);
562  this->read_request_and_content(new_session);
563  return;
564  }
565  }
566  else if(this->on_error)
567  this->on_error(response->session->request, ec);
568  });
569  });
570 
571  try {
572  resource_function(response, session->request);
573  }
574  catch(const std::exception &) {
575  if(on_error)
576  on_error(session->request, make_error_code::make_error_code(errc::operation_canceled));
577  return;
578  }
579  }
580  };
581 
/// Primary template; adds nothing to ServerBase. The usable server is provided by
/// specializations such as Server<HTTP> below.
582  template <class socket_type>
583  class Server : public ServerBase<socket_type> {};
584 
/// Socket type for plain (unencrypted) TCP connections.
585  using HTTP = asio::ip::tcp::socket;
586 
/// HTTP server over plain TCP sockets. The default port is 80.
587  template <>
588  class Server<HTTP> : public ServerBase<HTTP> {
589  public:
590  Server() noexcept : ServerBase<HTTP>::ServerBase(80) {}
591 
592  protected:
/// Queues one asynchronous accept. The completion handler queues the next accept (unless this one
/// was aborted), then either starts reading a request from the new connection or reports the
/// accept error to on_error.
593  void accept() override {
594  auto connection = create_connection(*io_service);
595 
596  acceptor->async_accept(*connection->socket, [this, connection](const error_code &ec) {
597  auto lock = connection->handler_runner->continue_lock();
598  if(!lock)
599  return;
600 
601  // Immediately start accepting a new connection (unless io_service has been stopped)
602  if(ec != asio::error::operation_aborted)
603  this->accept();
604 
// The session (and its Request) is created even on error, so on_error has a request to receive
605  auto session = std::make_shared<Session>(config.max_request_streambuf_size, connection);
606 
607  if(!ec) {
608  asio::ip::tcp::no_delay option(true);
// Local error_code (shadows the handler's `ec`); a failure to set the option is ignored
609  error_code ec;
610  session->connection->socket->set_option(option, ec);
611 
612  this->read_request_and_content(session);
613  }
614  else if(this->on_error)
615  this->on_error(session->request, ec);
616  });
617  }
618  };
619 } // namespace SimpleWeb
620 
621 #endif /* SERVER_HTTP_HPP */
void write(const char_type *ptr, std::streamsize n)
Write directly to stream buffer using std::ostream::write.
Definition: server_http.h:105
void write(const std::string &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: server_http.h:135
void write(StatusCode status_code=StatusCode::success_ok, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, potential header fields, and empty content.
Definition: server_http.h:110
void write(std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing success status line, header fields, and content. ...
Definition: server_http.h:140
Definition: server_http.h:56
void write(StatusCode status_code, std::istream &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: server_http.h:124
std::string address
Definition: server_http.h:299
Definition: server_http.h:48
Definition: server_http.h:264
Definition: server_http.h:51
Definition: server_http.h:180
std::shared_ptr< asio::io_service > io_service
If you have your own asio::io_service, store its pointer here before running start().
Definition: server_http.h:329
void stop() noexcept
Stop accepting new requests, and close current connections.
Definition: server_http.h:375
unsigned short port
Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
Definition: server_http.h:286
void write(const CaseInsensitiveMultimap &header)
Convenience function for writing success status line, and header fields.
Definition: server_http.h:145
void write(StatusCode status_code, const std::string &content, const CaseInsensitiveMultimap &header=CaseInsensitiveMultimap())
Convenience function for writing status line, header fields, and content.
Definition: server_http.h:116
static CaseInsensitiveMultimap parse(const std::string &query_string) noexcept
Returns query keys with percent-decoded values.
Definition: utility.h:100
Config config
Set before calling start().
Definition: server_http.h:304
CaseInsensitiveMultimap parse_query_string() noexcept
Returns query keys with percent-decoded values.
Definition: server_http.h:214
bool close_connection_after_response
Definition: server_http.h:153
Definition: server_http.h:156
void send(const std::function< void(const error_code &)> &callback=nullptr) noexcept
Use this function if you need to recursively send parts of a longer message.
Definition: server_http.h:91
Definition: server_http.h:279
static bool parse(std::istream &stream, std::string &method, std::string &path, std::string &query_string, std::string &version, CaseInsensitiveMultimap &header) noexcept
Parse request line and header fields.
Definition: utility.h:163
std::string string() noexcept
Convenience function to return std::string. The stream buffer is consumed.
Definition: server_http.h:164
Definition: server_http.h:220