RDFS
The Rice COMP 413 2017 class's continuation of the work on the 2016 RDFS.
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Pages
data_transfer_server.h
1 // Copyright 2017 Rice University, COMP 413 2017
2 
3 #include <datatransfer.pb.h>
4 #include <hdfs.pb.h>
5 
6 #include <atomic>
7 #include <condition_variable>
8 #include <functional>
9 #include <mutex>
10 #include <queue>
11 #include <string>
12 
13 #include <asio.hpp>
14 #include <boost/lockfree/spsc_queue.hpp>
15 #include "native_filesystem.h"
16 #include "rpcserver.h"
17 #include "socket_reads.h"
18 #include "socket_writes.h"
19 #include "zk_dn_client.h"
20 
21 #pragma once
22 
23 /*
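24  * Operation types: opcodes identifying the kind of data transfer request (presumably the `type` byte handled by receive_header/write_header -- confirm).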
25  */
26 #define WRITE_BLOCK 80
27 #define READ_BLOCK 81
28 #define READ_METADATA 82
29 #define REPLACE_BLOCK 83
30 #define COPY_BLOCK 84
31 #define BLOCK_CHECKSUM 85
32 #define TRANSFER_BLOCK 86
33 #define REQUEST_SHORT_CIRCUIT_FDS 87
34 #define RELEASE_SHORT_CIRCUIT_FDS 88
35 #define REQUEST_SHORT_CIRCUIT_SHM 89
36 #define BLOCK_GROUP_CHECKSUM 90
37 #define CUSTOM 127
38 
39 using asio::ip::tcp;
42 
44  public:
45  TransferServer(int port,
46  std::shared_ptr<nativefs::NativeFS> &fs,
47  std::shared_ptr<zkclient::ZkClientDn> &dn,
48  int max_xmits = 10);
49 
50  TransferServer(const TransferServer &other) {}
51 
52  void serve(asio::io_service &io_service);
53  bool sendStats();
54 
65  bool replicate(uint64_t len, std::string ip, std::string xferport,
66  ExtendedBlockProto blockToTarget);
67 
79  bool remote_read(uint64_t len, std::string ip,
80  std::string xferport,
81  ExtendedBlockProto blockToTarget,
82  std::string data, int &read_len);
83 
84  bool rmBlock(uint64_t block_id);
85 
92  bool writeBlock(uint64_t block_id, std::string data);
93  bool poll_replicate();
94  bool poll_delete();
95  bool poll_reconstruct();
96 
97  private:
98  int max_xmits;
99  int port;
100  std::atomic<std::uint32_t> xmits{0};
101  std::shared_ptr<nativefs::NativeFS> fs;
102  std::shared_ptr<zkclient::ZkClientDn> dn;
103 
104  mutable std::mutex m;
105  std::condition_variable cv;
106 
107  bool receive_header(tcp::socket &sock, uint16_t *version,
108  unsigned char *type);
109  bool write_header(tcp::socket &sock, uint16_t version, unsigned char type);
110  void handle_connection(tcp::socket sock);
111  void processWriteRequest(tcp::socket &sock);
112  void processReadRequest(tcp::socket &sock);
113  void buildBlockOpResponse(std::string &response_string);
114  void ackPacket(tcp::socket &sock, PacketHeaderProto &p_head);
115  void buildFailBlockOpResponse(std::string &response_string);
116 
117  bool writeFinalPacket(tcp::socket &sock, uint64_t, uint64_t);
118  template<typename BufType>
119  bool writePacket(tcp::socket &sock, PacketHeaderProto p_head,
120  const BufType &payload);
121  void synchronize(std::function<void(TransferServer &, tcp::socket &)> f,
122  tcp::socket &sock);
123 };
124 
125 // Templated method to be generic across any asio buffer type.
126 template<typename BufType>
127 bool TransferServer::writePacket(tcp::socket &sock,
128  PacketHeaderProto p_head,
129  const BufType &payload) {
130  std::string p_head_str;
131  p_head.SerializeToString(&p_head_str);
132  const uint16_t header_len = p_head_str.length();
133  // Add 4 to account for the size of uint32_t.
134  // Also add in |cksums| u4s; these are a part of the payload and thus the
135  // payload length
136  const uint32_t payload_len = 4 + asio::buffer_size(payload);
137  // Write payload length, header length, header, payload.
138 
139  return (rpcserver::write_int32(sock, payload_len) &&
140  rpcserver::write_int16(sock, header_len) &&
141  rpcserver::write_proto(sock, p_head_str) &&
142  payload_len - 4 == sock.write_some(payload));
143 }
bool replicate(uint64_t len, std::string ip, std::string xferport, ExtendedBlockProto blockToTarget)
Definition: data_transfer_server.cc:548
Definition: datatransfer.pb.h:2542
Definition: data_transfer_server.h:43
Definition: hdfs.pb.h:314
bool writeBlock(uint64_t block_id, std::string data)
Definition: data_transfer_server.cc:620
bool remote_read(uint64_t len, std::string ip, std::string xferport, ExtendedBlockProto blockToTarget, std::string data, int &read_len)
Definition: data_transfer_server.cc:430