RDFS
The Rice COMP 413 2017 class's continuation of the work on the 2016 RDFS.
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Pages
zk_dn_client.h
1 // Copyright 2017 Rice University, COMP 413 2017
2 
3 #ifndef ZOOKEEPER_INCLUDE_ZK_DN_CLIENT_H_
4 #define ZOOKEEPER_INCLUDE_ZK_DN_CLIENT_H_
5 
6 #include "zk_client_common.h"
7 #include <google/protobuf/message.h>
8 #include <atomic>
9 #include <string>
10 #include "hdfs.pb.h"
11 
12 class TransferServer;
13 
14 namespace zkclient {
15 
// DataNodeId: plain aggregate pairing an address string with an IPC port.
// Presumably the key that identifies one datanode (see ZkClientDn's
// build_datanode_id, which takes a DataNodeId) - confirm against the .cc file.
16 typedef struct {
17  std::string ip;  // address of the datanode, held as text
18  uint32_t ipcPort;  // IPC port number
19 } DataNodeId;
20 
21 // TODO(2016): Store hostname in payload as a vararg?
22 typedef struct {
23  uint32_t ipcPort;
24  uint32_t xferPort;
25  uint64_t disk_bytes; // total space on disk
26  uint64_t free_bytes; // free space on disk
27  uint32_t xmits; // current number of xmits
29 
30 typedef struct {
31  char ipPort[256];
33 
// BlockZNode: single-field record carrying a block's size.
// NOTE(review): the name suggests this is the payload stored in a block's
// ZooKeeper znode; the serialization is not visible in this header - confirm.
38 typedef struct {
39  uint64_t block_size; // size of the block in bytes
40 } BlockZNode;
41 
// ZkClientDn: ZooKeeper client class declared in zk_dn_client.h, derived from
// ZkClientCommon. Only declarations appear here; the listing's
// cross-references place the definitions in zk_dn_client.cc.
// NOTE(review): std::shared_ptr is used below, but <memory> is not among this
// header's direct includes; presumably it arrives transitively via
// zk_client_common.h - confirm.
46 class ZkClientDn : public ZkClientCommon {
47  public:
// Constructs a client from an ip string, a ZooKeeper address string, the
// total disk space, and the IPC / transfer ports (defaults 50020 / 50010).
56  ZkClientDn(const std::string &ip,
57  const std::string &zkIpAndAddress,
58  uint64_t total_disk_space,
59  const uint32_t ipcPort = 50020,
60  const uint32_t xferPort = 50010);
61 
// Overload taking a shared ZKWrapper in place of the address string; the
// remaining parameters and defaults match the constructor above.
62  ZkClientDn(const std::string &ip,
63  std::shared_ptr<ZKWrapper>,
64  uint64_t total_disk_space,
65  const uint32_t ipcPort = 50020,
66  const uint32_t xferPort = 50010);
67  ~ZkClientDn();
68 
// Takes the same ip / disk-space / port values as the constructors.
// Presumably publishes this datanode's record to ZooKeeper - confirm in
// zk_dn_client.cc.
72  void registerDataNode(const std::string &ip,
73  uint64_t total_disk_space,
74  const uint32_t ipcPort,
75  const uint32_t xferPort);
76 
// Takes a block uuid and its size in bytes. The meaning of the bool result
// is not visible here; presumably true on success - confirm.
84  bool blockReceived(uint64_t uuid, uint64_t size_bytes);
85 
// Same parameter shape as blockReceived; by name, reports a changed size for
// an existing block - confirm.
86  bool blockSizeUpdated(uint64_t uuid, uint64_t size_bytes);
87 
// Takes the current free space and xmits count (compare the free_bytes and
// xmits fields of the payload struct earlier in this header).
88  bool sendStats(uint64_t free_space, uint32_t xmits);
89 
// Accepts a shared TransferServer handle by non-const reference; presumably
// stored in the private `server` member - confirm.
93  void setTransferServer(std::shared_ptr<TransferServer> &server);
94 
// Takes a datanode name and a block id; the name suggests it enqueues the
// pair on a replication queue - confirm.
101  bool push_dn_on_repq(std::string dn_name, uint64_t blockid);
102 
// Pollers for the replication, delete and reconstruct queues. What the bool
// results signify is not shown in this header.
103  bool poll_replication_queue();
104 
105  bool poll_delete_queue();
106 
110  bool poll_reconstruct_queue();
111 
// Returns a string identifier for this datanode (presumably produced by
// build_datanode_id from data_node_id - confirm).
112  std::string get_datanode_id();
113 
114  private:
115  std::shared_ptr<TransferServer> server;  // shared handle to the transfer server
116 
124  void processDeleteQueue();
125 
// Produces the string form of a DataNodeId; the exact format lives in the
// .cc file.
126  std::string build_datanode_id(DataNodeId data_node_id);
127 
128  DataNodeId data_node_id;  // id record held by this client
129  DataNodePayload data_node_payload;  // payload record held by this client
130 
131  static const std::string CLASS_NAME;  // defined out of line; purpose not visible here
132 
// Takes a queue name and a watcher callback whose parameter list is
// (zhandle_t *, int, int, const char *, void *), the same shape as
// thisDNDeleteQueueWatcher below.
139  void initWorkQueue(std::string queueName, void (*watchFuncPtr)(zhandle_t *,
140  int,
141  int,
142  const char *,
143  void *));
144 
// Command handlers keyed by a path string; note that only the reconstruct
// variant returns a bool.
148  void handleReplicateCmds(const std::string &path);
149 
153  bool handleReconstructCmds(const std::string &path);
154 
// Static watcher callback; its signature matches the function-pointer type
// accepted by initWorkQueue.
155  static void thisDNDeleteQueueWatcher(zhandle_t *zzh,
156  int type,
157  int state,
158  const char *path,
159  void *watcherCtx);
160 
// All three arguments are non-const references; presumably `datanode` and
// `error_code` are outputs - confirm against the definition.
164  bool find_datanode_with_block(uint64_t &block_uuid,
165  std::string &datanode,
166  int &error_code);
167 
// Takes a pointer to an ExtendedBlockProto plus a block id and size;
// presumably fills *eb from them - confirm.
171  bool buildExtendedBlockProto(hadoop::hdfs::ExtendedBlockProto *eb,
172  const std::uint64_t &block_id,
173  const uint64_t &block_size);
174 };
175 } // namespace zkclient
176 
177 #endif // ZOOKEEPER_INCLUDE_ZK_DN_CLIENT_H_
bool push_dn_on_repq(std::string dn_name, uint64_t blockid)
Definition: zk_dn_client.cc:298
Definition: zk_client_common.h:14
bool blockReceived(uint64_t uuid, uint64_t size_bytes)
Definition: zk_dn_client.cc:40
Definition: data_transfer_server.h:43
Definition: zk_dn_client.h:30
Definition: hdfs.pb.h:314
Definition: zk_dn_client.h:46
Definition: zk_dn_client.h:38
Definition: zk_dn_client.h:22
Definition: zk_dn_client.h:16
bool poll_reconstruct_queue()
Definition: zk_dn_client.cc:335
void registerDataNode(const std::string &ip, uint64_t total_disk_space, const uint32_t ipcPort, const uint32_t xferPort)
Definition: zk_dn_client.cc:204
void setTransferServer(std::shared_ptr< TransferServer > &server)
Definition: zk_dn_client.cc:319
ZkClientDn(const std::string &ip, const std::string &zkIpAndAddress, uint64_t total_disk_space, const uint32_t ipcPort=50020, const uint32_t xferPort=50010)
Definition: zk_dn_client.cc:32