RDFS
The Rice COMP 413 2017 class's continuation of the 2016 RDFS work.
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Pages
zk_nn_client.h
1 // Copyright 2017 Rice University, COMP 413 2017
2 
3 #ifndef ZOOKEEPER_INCLUDE_ZK_NN_CLIENT_H_
4 #define ZOOKEEPER_INCLUDE_ZK_NN_CLIENT_H_
5 
6 #define MIN_XMITS 'x'
7 #define MAX_FREE_SPACE 'f'
8 
9 #include "zk_client_common.h"
10 #include <google/protobuf/message.h>
11 #include <queue>
12 #include <string>
13 #include <vector>
14 #include <utility>
15 #include "hdfs.pb.h"
16 #include "ClientNamenodeProtocol.pb.h"
17 #include "erasurecoding.pb.h"
18 #include <ConfigReader.h>
19 #include "util.h"
20 #include "LRUCache.h"
21 
22 #define MAX_USERNAME_LEN 256
23 
24 namespace zkclient {
25 
/// Lifecycle state of a file znode, held in FileZNode::under_construction.
/// The values are the same implicit 0, 1, 2 as before; they are written out
/// because FileZNode appears to be stored as a raw byte image
/// (file_znode_struct_to_vec), so renumbering would reinterpret stored data.
enum class FileStatus : int {
  UnderConstruction = 0,
  FileComplete = 1,
  UnderDestruction = 2
};
31 
35 typedef struct {
36  uint32_t replication; // the block replication factor.
37  bool isEC; // 1 if EC file. 0 if replication based.
38  uint64_t blocksize;
39  // 1 for under construction, 0 for complete
40  zkclient::FileStatus under_construction;
41  int filetype; // 0 or 1 for dir, 2 for file, 3 for symlinks (not supported)
42  std::uint64_t length;
43 // https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/fs/
44 // FileSystem.html#setOwner(org.apache.hadoop.fs.Path,
45 // java.lang.String,
46 // java.lang.String)
47  std::uint64_t access_time;
48  std::uint64_t modification_time;
49  char owner[MAX_USERNAME_LEN]; // the client who created the file
50  char group[MAX_USERNAME_LEN];
51  char permissions[20][MAX_USERNAME_LEN]; // max 20 users can view the file.
52  int perm_length; // number of slots filled in permissions
53  int permission_number;
54  std::uint64_t last_block_id;
55 } FileZNode;
56 
/// Lease-holder record (presumably one per file, stored under
/// LeaseZookeeperPath).
struct LeaseInfo {
  // Name of the client holding the lease; this is the same string as the
  // clientName field of RenewLeaseRequestProto.
  std::string clientName;
};
65 
/// Per-client timestamp record.
struct ClientInfo {
  uint64_t timestamp;  // value obtained from std::time()
};
72 
73 struct TargetDN {
74  char policy;
75  std::string dn_id;
76  uint64_t free_bytes; // free space on disk
77  uint32_t num_xmits; // current number of xmits
78 
79  TargetDN(std::string id, int bytes, int xmits, char policy) : policy(policy),
80  dn_id(id),
81  free_bytes(bytes),
82  num_xmits(xmits) {
83  }
84 
85  bool operator<(const struct TargetDN &other) const {
86  // If storage policy is 'x' for xmits, choose the min xmits node
87  if (policy == MIN_XMITS) {
88  if (num_xmits == other.num_xmits) {
89  return free_bytes < other.free_bytes;
90  }
91  return num_xmits > other.num_xmits;
92 
93  // Default policy is choose the node with the most free space
94  } else {
95  if (free_bytes == other.free_bytes) {
96  return num_xmits > other.num_xmits;
97  }
98  return free_bytes < other.free_bytes;
99  }
100  }
101 };
102 
152 
156 class ZkNnClient : public ZkClientCommon {
157  public:
158  char policy;
159  const char* EC_REPLICATION = "replication";
160  const char* DEFAULT_EC_POLICY = "RS-6-3-1024k"; // the default policy.
161  uint32_t DEFAULT_EC_CELLSIZE = 1024*1024; // the default cell size is 64kb.
162  uint32_t DEFAULT_EC_ID = 1;
163  uint32_t REPLICATION_EC_ID = 63;
164  const uint32_t DEFAULT_DATA_UNITS = 6;
165  const uint32_t DEFAULT_PARITY_UNITS = 3;
166  const char* DEFAULT_EC_CODEC_NAME = "rs";
167  std::string DEFAULT_STORAGE_ID = "1"; // the default storage id.
168  std::string REPLICATION_STORAGE_ID = "63";
169  /*
170  * TODO(2017) For some reason, these fields can't be properly
171  * reclaimed/destructed in some cases.
172  */
173  ECSchemaProto DEFAULT_EC_SCHEMA;
174  ErasureCodingPolicyProto RS_SOLOMON_PROTO;
175  ErasureCodingPolicyProto REPLICATION_PROTO;
176  ECSchemaProto REPLICATION_1_2_SCHEMA;
177 
178 
179  enum class ListingResponse {
180  Ok, // 0
181  FileDoesNotExist, // 1
182  FailedChildRetrieval, // 2
183  FileAccessRestricted // 3
184  };
185 
186  enum class DeleteResponse {
187  Ok,
188  FileDoesNotExist,
189  FileUnderConstruction,
190  FileIsDirectoryMismatch,
191  FailedChildRetrieval,
192  FailedBlockRetrieval,
193  FailedDataNodeRetrieval,
194  FailedZookeeperOp,
195  FileAccessRestricted
196  };
197 
198  enum class GetFileInfoResponse {
199  Ok,
200  FileDoesNotExist,
201  FailedReadZnode,
202  FileAccessRestricted
203  };
204 
205  enum class MkdirResponse {
206  Ok,
207  FailedZnodeCreation
208  };
209 
210  enum class CreateResponse {
211  Ok,
212  FileAlreadyExists,
213  FailedMkdir,
214  FailedCreateZnode
215  };
216 
217  enum class RenameResponse {
218  Ok,
219  FileDoesNotExist,
220  RenameOpsFailed,
221  InvalidType,
222  MultiOpFailed,
223  FileAccessRestricted
224  };
225 
226  enum class ErasureCodingPoliciesResponse {
227  Ok
228  };
229 
230  enum class ErasureCodingPolicyResponse {
231  Ok,
232  FileDoesNotExist
233  };
234 
235  enum class SetErasureCodingPolicyResponse {
236  Ok,
237  FileDoesNotExist,
238  FailedZookeeperOp
239  };
240 
241  explicit ZkNnClient(std::string zkIpAndAddress)
242  : ZkClientCommon(zkIpAndAddress),
243  cache(new lru::Cache<std::string,
244  std::shared_ptr<GetListingResponseProto>>(64, 10)) {
245  mkdir_helper("/", false);
246  populateDefaultECProto();
247  }
248 
257  explicit ZkNnClient(std::shared_ptr<ZKWrapper> zk_in,
258  bool secureMode = false)
259  : ZkClientCommon(zk_in),
260  cache(new lru::Cache<std::string,
261  std::shared_ptr<GetListingResponseProto>>(64, 10)) {
262  mkdir_helper("/", false);
263  isSecureMode = secureMode;
264  populateDefaultECProto();
265  }
266  void register_watches();
267 
268 
272  uint64_t current_time_ms();
273 
277  uint64_t get_client_lease_timestamp(std::string client_name);
278 
284 
285  void recover_lease(RecoverLeaseRequestProto &req,
293  GetFileInfoResponse get_info(GetFileInfoRequestProto &req,
295  std::string client_name = "default");
296 
303  CreateResponse create_file(CreateRequestProto &request,
304  CreateResponseProto &response);
305 
315  std::string client_name = "default");
322  MkdirResponse mkdir(MkdirsRequestProto &req,
323  MkdirsResponseProto &res);
324 
332  DeleteResponse destroy(DeleteRequestProto &req,
333  DeleteResponseProto &res,
334  std::string client_name = "default");
335 
343  void complete(CompleteRequestProto &req,
345  std::string client_name = "default");
346 
354  RenameResponse rename(RenameRequestProto &req,
355  RenameResponseProto &res,
356  std::string client_name = "default");
357 
365  ListingResponse get_listing(GetListingRequestProto &req,
367  std::string client_name = "default");
377  std::string client_name = "default");
378 
383  const std::string &path,
384  FileZNode &znode_data);
385 
386  void set_node_policy(char policy);
387 
388  char get_node_policy();
389 
394  ErasureCodingPoliciesResponse get_erasure_coding_policies(
397 
401  ErasureCodingPolicyResponse get_erasure_coding_policy_of_path(
404 
408  SetErasureCodingPolicyResponse set_erasure_coding_policy_of_path(
411 
412 // bool modifyAclEntries(ModifyAclEntriesRequestProto req,
413 // ModifyAclEntriesResponseProto res);
422 
432  std::string client_name = "default");
443  std::string client_name = "default");
444  /*
445  * Sets the owner of the file.
446  * @param req SetOwnerRequestProto
447  * @param res SetOwnerResponseProto
448  * @return boolean indicating whether operation succeeded or not
449  */
451 
463  bool add_block(const std::string &fileName,
464  u_int64_t &block_id,
465  std::vector<std::string> &dataNodes,
466  uint32_t replication_factor);
467 
478  bool add_block_group(const std::string &filePath,
479  u_int64_t &block_group_id,
480  std::vector<std::string> &dataNodes,
481  std::vector<char> &blockIndices,
482  uint32_t total_num_storage_blocks);
483 
491  const std::string &fileName,
492  u_int64_t &block_group_id);
493 
500  u_int64_t generate_storage_block_id(
501  uint64_t block_group_id,
502  uint64_t index_within_group);
507  u_int64_t generate_block_group_id();
508 
515  u_int64_t get_block_group_id(u_int64_t storage_block_id);
516 
522  u_int64_t get_index_within_block_group(u_int64_t storage_block_id);
523 
533  std::string client_name = "default");
534 
535  bool previousBlockComplete(uint64_t prev_id);
539  bool file_exists(const std::string &path);
540 
544  bool get_block_size(const u_int64_t &block_id, uint64_t &blocksize);
545 
546  // this is public because we have not member functions in this file
547  static const std::string CLASS_NAME;
548 
549  bool find_live_datanodes(const uint64_t blockId, int error_code,
550  std::vector<std::string> &live_data_nodes);
551 
552  bool find_datanode_for_block(std::vector<std::string> &datanodes,
553  std::vector<std::string> &excluded_dns,
554  const u_int64_t blockId,
555  uint32_t replication_factor,
556  uint64_t blocksize);
557 
558  bool find_all_datanodes_with_block(const uint64_t &block_uuid,
559  std::vector<std::string> &rdatanodes,
560  int &error_code);
561 
562  bool rename_ops_for_file(const std::string &src, const std::string &dst,
563  std::vector<std::shared_ptr<ZooOp>> &ops);
564  bool rename_ops_for_dir(const std::string &src, const std::string &dst,
565  std::vector<std::shared_ptr<ZooOp>> &ops);
566 
572  bool check_acks();
573 
574 // get locations given src, offset, and length
575  void get_block_locations(const std::string &src,
576  google::protobuf::uint64 offset,
577  google::protobuf::uint64 length,
578  LocatedBlocksProto *blocks,
579  std::string client_name = "default");
580 
585  void update_block_for_pipeline(UpdateBlockForPipelineRequestProto &req,
587  bool process_request(std::string client_name, std::string file_path,
588  AppendRequestProto &req);
589  bool get_primary_block_info(std::string file_path,
590  AppendRequestProto &req,
591  AppendResponseProto &res);
592  bool check_lease(std::string client_name, std::string file_path);
593 
597  void read_file_znode(FileZNode &znode_data, const std::string &path);
598 
599  bool cache_contains(const std::string &path);
600 
601  int cache_size();
602 
608  std::string find_parent(const std::string &path);
609 
610  private:
611  bool set_located_block(LocatedBlockProto* locatedBlockProto,
612  uint64_t block_id, uint64_t block_size);
613 
614  bool set_file_status(std::string file_path,
615  HdfsFileStatusProto* hdfsFileStatusProto);
616 
617  void recover_lease_helper(RecoverLeaseRequestProto &req,
619 
620  void renew_lease_helper(RenewLeaseRequestProto &req,
622 
626  bool sort_by_xmits(const std::vector<std::string> &unsorted_dn_ids,
627  std::vector<std::string> &sorted_dn_ids);
628 
633  void set_file_info(HdfsFileStatusProto *fs,
634  const std::string &path,
635  FileZNode &node);
639  std::string ClientZookeeperPath(const std::string & clientname);
643  std::string LeaseZookeeperPath(const std::string & hadoopPath);
644 
649  std::string ZookeeperBlocksPath(const std::string &hadoopPath);
650 
655  std::string ZookeeperFilePath(const std::string &hadoopPath);
656 
661 
666  bool create_file_znode(const std::string &path, FileZNode *znode_data);
667 
671  void set_mkdir_znode(FileZNode *znode_data);
677  MkdirResponse mkdir_helper(const std::string &path, bool create_parent);
678 
683  void file_znode_struct_to_vec(FileZNode *znode_data,
684  std::vector<std::uint8_t> &data);
685  template <class T>
686  void znode_data_to_vec(T *znode_data, std::vector<std::uint8_t> &data);
687  template <class T>
688  void read_znode_data(T &znode_data, const std::string &path);
689 
693  void delete_node_wrapper(std::string &path,
694  DeleteResponseProto &response);
695 
696  DeleteResponse destroy_helper(const std::string &path,
697  std::vector<std::shared_ptr<ZooOp>> &ops);
703  bool recover_ec_blocks(const std::vector<std::string> &to_ec_recover,
704  int error_code);
705 
706 
712  bool replicate_blocks(const std::vector<std::string> &to_replicate,
713  int error_code);
714 
719  int ms_since_creation(std::string &path);
720 
724 // bool updateLocatedBlockProto(LocatedBlockProto* location,
725 // uint64_t block_id);
726 
732  bool buildDatanodeInfoProto(DatanodeInfoProto *dn_info,
733  const std::string &data_node);
734 
738  bool buildTokenProto(hadoop::common::TokenProto *token);
739 
743  bool buildExtendedBlockProto(ExtendedBlockProto *eb,
744  const std::uint64_t &block_id,
745  const uint64_t &block_size);
746 
751  static void watcher_health(zhandle_t *zzh, int type, int state,
752  const char *path, void *watcherCtx);
753 
757  static void watcher_health_child(zhandle_t *zzh, int type, int state,
758  const char *path, void *watcherCtx);
759 
760  static void watcher_listing(zhandle_t *zzh, int type, int state,
761  const char *path, void *watcherCtx);
762 
766  bool lease_expired(std::string lease_holder_client);
767 
774  bool blockDeleted(uint64_t uuid, std::string id);
775 
779  void populateDefaultECProto();
780 
788  bool checkAccess(std::string username, FileZNode &znode_data);
789 
790  const int UNDER_CONSTRUCTION = 1;
791  const int FILE_COMPLETE = 0;
792  const int UNDER_DESTRUCTION = 2;
793 
794  const int IS_FILE = 2;
795  const int IS_DIR = 1;
796 // TODO(2016): Should eventually be read from a conf file
797 // in millisecons, 10 minute timeout when waiting for
798 // replication acknowledgements
799  const int ACK_TIMEOUT = 600000;
800 
801 // Boolean indicating whether zk_nn is in secure mode
802  bool isSecureMode = false;
803  const uint64_t EXPIRATION_TIME =
804  2 * 60 * 60 * 1000; // 2 hours in milliseconds.
805 
806  /* TODO(2017) This needs to be reclaimed properly. */
808 };
809 
810 } // namespace zkclient
811 
812 #endif // ZOOKEEPER_INCLUDE_ZK_NN_CLIENT_H_
uint64_t get_client_lease_timestamp(std::string client_name)
Definition: zk_nn_client.cc:3180
Definition: ClientNamenodeProtocol.pb.h:4767
Definition: ClientNamenodeProtocol.pb.h:12580
Definition: ClientNamenodeProtocol.pb.h:4337
void complete(CompleteRequestProto &req, CompleteResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1289
Definition: LRUCache.h:209
Definition: ClientNamenodeProtocol.pb.h:415
bool add_block(AddBlockRequestProto &req, AddBlockResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:825
Definition: ClientNamenodeProtocol.pb.h:2128
Definition: ClientNamenodeProtocol.pb.h:5364
ErasureCodingPolicyResponse get_erasure_coding_policy_of_path(GetErasureCodingPolicyRequestProto &req, GetErasureCodingPolicyResponseProto &res)
Definition: zk_nn_client.cc:2015
Definition: ClientNamenodeProtocol.pb.h:8436
Definition: ClientNamenodeProtocol.pb.h:1917
Definition: zk_client_common.h:14
Definition: ClientNamenodeProtocol.pb.h:5266
bool rename_ops_for_file(const std::string &src, const std::string &dst, std::vector< std::shared_ptr< ZooOp >> &ops)
Definition: zk_nn_client.cc:2639
u_int64_t generate_storage_block_id(uint64_t block_group_id, uint64_t index_within_group)
Definition: zk_nn_client.cc:2461
Definition: zk_nn_client.h:69
uint32_t get_total_num_storage_blocks(const std::string &fileName, u_int64_t &block_group_id)
Definition: ClientNamenodeProtocol.pb.h:1115
bool abandon_block(AbandonBlockRequestProto &req, AbandonBlockResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:941
Definition: ClientNamenodeProtocol.pb.h:5563
Definition: ClientNamenodeProtocol.pb.h:11431
Definition: ClientNamenodeProtocol.pb.h:2000
ZkNnClient(std::shared_ptr< ZKWrapper > zk_in, bool secureMode=false)
Definition: zk_nn_client.h:257
Definition: ClientNamenodeProtocol.pb.h:2432
u_int64_t get_block_group_id(u_int64_t storage_block_id)
Definition: zk_nn_client.cc:2479
Definition: ClientNamenodeProtocol.pb.h:2211
Definition: ClientNamenodeProtocol.pb.h:1804
Definition: hdfs.pb.h:1380
bool file_exists(const std::string &path)
Definition: zk_nn_client.cc:322
Definition: hdfs.pb.h:3859
Definition: ConfigReader.h:18
u_int64_t generate_block_group_id()
Definition: zk_nn_client.cc:2471
Definition: erasurecoding.pb.h:53
Definition: hdfs.pb.h:3304
Definition: zk_nn_client.h:60
Definition: ClientNamenodeProtocol.pb.h:2349
Definition: erasurecoding.pb.h:607
Definition: erasurecoding.pb.h:249
uint64_t current_time_ms()
Definition: zk_nn_client.cc:3095
Definition: zk_nn_client.h:35
Definition: ClientNamenodeProtocol.pb.h:294
Definition: ClientNamenodeProtocol.pb.h:4226
bool append_file(AppendRequestProto &req, AppendResponseProto &res)
Definition: zk_nn_client.cc:396
bool set_owner(SetOwnerRequestProto &req, SetOwnerResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:785
Definition: erasurecoding.pb.h:332
void renew_lease(RenewLeaseRequestProto &req, RenewLeaseResponseProto &res)
Definition: zk_nn_client.cc:630
Definition: hdfs.pb.h:314
GetFileInfoResponse get_info(GetFileInfoRequestProto &req, GetFileInfoResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1044
ErasureCodingPoliciesResponse get_erasure_coding_policies(GetErasureCodingPoliciesRequestProto &req, GetErasureCodingPoliciesResponseProto &res)
Definition: zk_nn_client.cc:1995
RenameResponse rename(RenameRequestProto &req, RenameResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1562
Definition: ClientNamenodeProtocol.pb.h:2600
Definition: ClientNamenodeProtocol.pb.h:3924
Definition: ClientNamenodeProtocol.pb.h:8338
Definition: ClientNamenodeProtocol.pb.h:12693
Definition: ClientNamenodeProtocol.pb.h:5447
Definition: erasurecoding.pb.h:705
Definition: ClientNamenodeProtocol.pb.h:989
bool set_permission(SetPermissionRequestProto &req, SetPermissionResponseProto &res)
Definition: zk_nn_client.cc:2253
Definition: hdfs.pb.h:2354
ListingResponse get_listing(GetListingRequestProto &req, GetListingResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1706
Definition: Security.pb.h:50
Definition: ClientNamenodeProtocol.pb.h:3119
DeleteResponse destroy(DeleteRequestProto &req, DeleteResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1412
void set_file_info_content(ContentSummaryProto *status, const std::string &path, FileZNode &znode_data)
Definition: zk_nn_client.cc:2163
Definition: ClientNamenodeProtocol.pb.h:11333
std::string find_parent(const std::string &path)
Definition: zk_nn_client.cc:1072
void get_content(GetContentSummaryRequestProto &req, GetContentSummaryResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:2136
void get_block_locations(GetBlockLocationsRequestProto &req, GetBlockLocationsResponseProto &res, std::string client_name="default")
Definition: zk_nn_client.cc:1806
bool add_block_group(const std::string &filePath, u_int64_t &block_group_id, std::vector< std::string > &dataNodes, std::vector< char > &blockIndices, uint32_t total_num_storage_blocks)
Definition: zk_nn_client.cc:2370
Definition: hdfs.pb.h:3585
Definition: ClientNamenodeProtocol.pb.h:2981
Definition: ClientNamenodeProtocol.pb.h:4553
Definition: ClientNamenodeProtocol.pb.h:894
Definition: ClientNamenodeProtocol.pb.h:4646
void read_file_znode(FileZNode &znode_data, const std::string &path)
Definition: zk_nn_client.cc:711
u_int64_t get_index_within_block_group(u_int64_t storage_block_id)
Definition: zk_nn_client.cc:2484
Definition: ClientNamenodeProtocol.pb.h:688
Definition: hdfs.pb.h:3719
Definition: zk_nn_client.h:73
Definition: ClientNamenodeProtocol.pb.h:3808
bool get_block_size(const u_int64_t &block_id, uint64_t &blocksize)
Definition: zk_nn_client.cc:330
Definition: ClientNamenodeProtocol.pb.h:4430
MkdirResponse mkdir(MkdirsRequestProto &req, MkdirsResponseProto &res)
Definition: zk_nn_client.cc:1656
Definition: erasurecoding.pb.h:166
CreateResponse create_file(CreateRequestProto &request, CreateResponseProto &response)
Definition: zk_nn_client.cc:1488
bool check_acks()
Definition: zk_nn_client.cc:2852
SetErasureCodingPolicyResponse set_erasure_coding_policy_of_path(SetErasureCodingPolicyRequestProto &req, SetErasureCodingPolicyResponseProto &res)
Definition: zk_nn_client.cc:2046
Definition: zk_nn_client.h:156
Definition: hdfs.pb.h:838