3 #include <datatransfer.pb.h>
7 #include <condition_variable>
14 #include <boost/lockfree/spsc_queue.hpp>
15 #include "native_filesystem.h"
16 #include "rpcserver.h"
17 #include "socket_reads.h"
18 #include "socket_writes.h"
19 #include "zk_dn_client.h"
// Numeric operation codes, each named after a block-level operation.
// NOTE(review): the names and values match the Op codes of Hadoop's HDFS
// DataTransferProtocol; codes 81 and 84 do not appear in this excerpt —
// confirm against the full header before relying on the list being complete.
26 #define WRITE_BLOCK 80
28 #define READ_METADATA 82
29 #define REPLACE_BLOCK 83
31 #define BLOCK_CHECKSUM 85
32 #define TRANSFER_BLOCK 86
33 #define REQUEST_SHORT_CIRCUIT_FDS 87
34 #define RELEASE_SHORT_CIRCUIT_FDS 88
35 #define REQUEST_SHORT_CIRCUIT_SHM 89
36 #define BLOCK_GROUP_CHECKSUM 90
46 std::shared_ptr<nativefs::NativeFS> &fs,
47 std::shared_ptr<zkclient::ZkClientDn> &dn,
// Takes the caller's asio::io_service by reference and returns nothing.
// NOTE(review): presumably runs the accept loop that feeds handle_connection —
// confirm in data_transfer_server.cc.
52 void serve(asio::io_service &io_service);
65 bool replicate(uint64_t len, std::string ip, std::string xferport,
82 std::string data,
int &read_len);
// Block-level operations; each one reports its outcome as a bool.
// NOTE(review): what true/false mean for each call is defined in
// data_transfer_server.cc — confirm there.

// Takes the id of a block.
84 bool rmBlock(uint64_t block_id);
// Takes a block id and the data as a std::string passed by value.
92 bool writeBlock(uint64_t block_id, std::string data);
// Take no arguments. NOTE(review): presumably process pending replication /
// reconstruction work — confirm against the implementation.
93 bool poll_replicate();
95 bool poll_reconstruct();
// Atomic 32-bit unsigned counter, initialised to 0.
// NOTE(review): presumably counts in-flight transfers — confirm.
100 std::atomic<std::uint32_t> xmits{0};
// Shared-ownership handles to a nativefs::NativeFS and a zkclient::ZkClientDn.
101 std::shared_ptr<nativefs::NativeFS> fs;
102 std::shared_ptr<zkclient::ZkClientDn> dn;
// Mutex and condition variable; `mutable` allows m to be locked from const
// member functions.
104 mutable std::mutex m;
105 std::condition_variable cv;
// Header helpers: receive_header fills *version and *type through the
// supplied pointers; write_header takes the same two fields by value.
// Both return a bool outcome.
107 bool receive_header(tcp::socket &sock, uint16_t *version,
108 unsigned char *type);
109 bool write_header(tcp::socket &sock, uint16_t version,
unsigned char type);
// Takes the socket by value, whereas the process*Request functions borrow it
// by reference.
110 void handle_connection(tcp::socket sock);
111 void processWriteRequest(tcp::socket &sock);
112 void processReadRequest(tcp::socket &sock);
// Both take response_string by non-const reference.
// NOTE(review): presumably an output parameter receiving the serialized
// response — confirm in the implementation.
113 void buildBlockOpResponse(std::string &response_string);
115 void buildFailBlockOpResponse(std::string &response_string);
// The two uint64_t parameters are unnamed in this declaration; see the
// definition for their meaning.
117 bool writeFinalPacket(tcp::socket &sock, uint64_t, uint64_t);
118 template<
typename BufType>
120 const BufType &payload);
121 void synchronize(std::function<
void(
TransferServer &, tcp::socket &)> f,
// Writes one packet to sock in four steps: payload_len via
// rpcserver::write_int32, header_len via rpcserver::write_int16, the
// serialized header via rpcserver::write_proto, then the payload bytes.
// Returns true only if every step succeeds; the && chain stops at the first
// failing step.
// NOTE(review): the parameter line declaring p_head and the closing brace of
// this definition are absent from this excerpt.
126 template<
typename BufType>
127 bool TransferServer::writePacket(tcp::socket &sock,
129 const BufType &payload) {
130 std::string p_head_str;
131 p_head.SerializeToString(&p_head_str);
// NOTE(review): length() returns size_t; storing it in a uint16_t silently
// truncates any serialized header longer than 65535 bytes.
132 const uint16_t header_len = p_head_str.length();
// The advertised length is the payload size plus 4.
// NOTE(review): the extra 4 presumably accounts for the length field itself —
// confirm against the wire format.
136 const uint32_t payload_len = 4 + asio::buffer_size(payload);
// NOTE(review): write_some may transmit fewer bytes than the buffer holds, so
// a short write makes this return false with the payload only partially sent.
// asio::write loops until the whole buffer is written and is likely what is
// wanted here.
139 return (rpcserver::write_int32(sock, payload_len) &&
140 rpcserver::write_int16(sock, header_len) &&
141 rpcserver::write_proto(sock, p_head_str) &&
142 payload_len - 4 == sock.write_some(payload));
bool replicate(uint64_t len, std::string ip, std::string xferport, ExtendedBlockProto blockToTarget)
Definition: data_transfer_server.cc:548
Definition: data_transfer_server.h:43
Definition: hdfs.pb.h:314
bool writeBlock(uint64_t block_id, std::string data)
Definition: data_transfer_server.cc:620
bool remote_read(uint64_t len, std::string ip, std::string xferport, ExtendedBlockProto blockToTarget, std::string data, int &read_len)
Definition: data_transfer_server.cc:430