/* Copyright (c) 2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include #include #include #include #include "call-stub.h" #include "defaults.h" #include "xlator.h" #include "recon_driver.h" #include "recon_xlator.h" typedef struct _nsr_recon_fd_s { int32_t term; nsr_recon_driver_state_t state; uint32_t first_index; uint32_t last_index; call_frame_t *frame; } nsr_recon_fd_t; #if defined(NSR_DEBUG) void _recon_main_log (const char *func, int line, char *member, FILE *fp, char *fmt, ...) { va_list ap; char *buf = NULL; int retval; if (!fp) { fp = recon_create_log(member,"recon-main-log"); if (!fp) { return; } } va_start(ap,fmt); retval = vasprintf(&buf,fmt,ap); if (buf) { fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); free(buf); } va_end(ap); } #endif // Given fd, get back the NSR based fd context. static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) { uint64_t tmp = 0; int32_t ret = -1; if ((ret = fd_ctx_get(fd, this, &tmp)) != 0) { return ret; } else { *rfd = (nsr_recon_fd_t *)tmp; return 0; } } // Add the frame in q after associating with term // term usage tbd static void put_frame(nsr_recon_private_t *priv, call_frame_t *frame, uint32_t term) { xlator_t *this = priv->this; recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term); priv->frame = frame; return; } // get the frame from the queue given the term // term usage tbd static void get_frame(nsr_recon_private_t *priv, call_frame_t **frame, uint32_t term) { if (frame != NULL) *frame = priv->frame; priv->frame = NULL; return; } // check if there are outstanding frames static gf_boolean_t is_frame(nsr_recon_private_t *priv) { return((priv->frame != NULL) ? _gf_true : _gf_false); } #define ENTRY_SIZE 128 long get_entry_count (char *path) { int fd; struct stat buf; unsigned long entries = -1; long min; /* last entry not known to be empty */ long max; /* first entry known to be empty */ long curr; char entry[ENTRY_SIZE]; void *err_label = &&done; fd = open(path,O_RDONLY); if (fd < 0) { goto *err_label; } err_label = &&close_fd; if (fstat(fd,&buf) < 0) { goto *err_label; } min = 0; max = buf.st_size / ENTRY_SIZE; printf("max = %ld\n",max); while ((min+1) < max) { curr = (min + max) / 2; printf("trying entry %ld\n",curr); if (lseek(fd,curr*ENTRY_SIZE,SEEK_SET) < 0) { goto *err_label; } if (read(fd,entry,sizeof(entry)) != sizeof(entry)) { goto *err_label; } if ((entry[0] == '_') && (entry[1] == 'P')) { min = curr; } else { max = curr; } } entries = max; close_fd: close(fd); done: return entries; } // Get the term info for the term number specified void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) { char path[PATH_MAX]; long entries; bzero(lt, sizeof(nsr_recon_last_term_info_t)); lt->last_term = term; sprintf(path,"%s/%s%d",bp,"TERM.",term); entries = get_entry_count(path); if (entries > 1) { /* The first entry is actually a header. */ lt->first_index = 1; /* * This seems wrong, because it means that last_index*128 will * be exactly at EOF and commited_ops will be one greater than * it should be. Maybe some other code makes the exact * opposite mistake to compensate. */ lt->last_index = lt->commited_ops = (int)entries; } recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n", term, lt->first_index, lt->last_index, lt->commited_ops); return; } // Given the term number, find the last term in the changelogs void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) { uint32_t t = term; struct stat buf; char path[PATH_MAX]; bzero(lt, sizeof(nsr_recon_last_term_info_t)); while(t) { // journal file is of type TERM-1.jnl sprintf(path,"%s/%s%d",bp,"TERM.",t); if (!stat(path, &buf)) { nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt); recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t); return; } t--; } recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term); return; } // Return back the frame stored against the term void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno) { call_frame_t *old_frame = NULL; xlator_t *this = priv->this; get_frame(priv, &old_frame, term); if (old_frame) { recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); // first return the original write for which this ack was sent STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL); } else { recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); } } typedef enum records_type_t { fop_gfid_pgfid_oldloc_newloc = 1, fop_gfid_pgfid_entry = fop_gfid_pgfid_oldloc_newloc + 1, fop_gfid = fop_gfid_pgfid_entry + 1 , fop_gfid_offset = fop_gfid + 1, fop_gfid_offset_len = fop_gfid_offset + 1, } records_type_t; // Get the backend ./glusterfs/xx/xx/<...> path static void get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path) { strcpy(path, priv->base_dir); strcat(path, "/.glusterfs/"); strncat(path,gfid,2); strcat(path,"/"); strncat(path,gfid+2,2); strcat(path,"/"); strcat(path,gfid); } // Get the link to which backend points to static gf_boolean_t get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) { char lp[PATH_MAX]; xlator_t *this = priv->this; get_gfid_path(priv,gfid, lp); if (readlink(lp, path, 255) == -1) { GF_ASSERT(0); recon_main_log(priv->this, GF_LOG_ERROR, "cannot get readlink for %s\n",lp); return _gf_false; } return _gf_true; } // Get the list of changelog records given a term , first and last index. // // TBD: rewrite this hideous ball of mud in at least the following ways: // // (1) Break out the code for handling a single record into a separate // function, to make error handling easier and reduce "indentation // creep" so the code's readable. // // (2) Change all of the fop_xxx_yyy nonsense to OR together values // like FOP_HAS_FIELD_XXX and FOP_HAS_FIELD_YYY, to reduce code // duplication and facilitate the addition of new fields. // // (3) Stop making so many assumptions about the underlying formats. // The code as it is won't even work for the existing binary format, // let alone as changelog evolves over time. // // Really, 90% of this code should just GO AWAY in favor of using // libgfchangelog, enhanced as necessary to support our needs. gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) { // do a mmap; seek into the first and read all records till last. // TBD - right now all records are pseudo holes but mark them as fills. // TBD - pseudo hole to be implemented when actual fsync gets done on data. char *rb = NULL, *orig = NULL; char path[PATH_MAX]; int fd; uint32_t index = 0; recon_main_log (this->name, GF_LOG_INFO, "libchangelog_get_records called for term %d index from %d to %d \n", term, first, last ); orig = rb = GF_CALLOC(128, ((last - first) + 1), gf_mt_recon_private_t); sprintf(path,"%s/%s%d",bp,"TERM.",term); fd = open(path, O_RDONLY); if (fd == -1) { return _gf_false; } else { char *start = NULL; nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; if (first == 0) lseek(fd, 128, SEEK_SET); else lseek(fd, first * 128, SEEK_SET); if (read(fd, rb, (last - first + 1) * 128) == -1) { return _gf_false; } start = rb; index = first; do { recon_main_log (this->name, GF_LOG_INFO, "libchangelog_get_records start inspecting records at index %d \n", index ); if (!strncmp(start, "_PRE_", 5)) { char op_str[4]; uint32_t i=0, opcode = 0; records_type_t type; start += 5; // increment by the NULLs after the PRE start += 4; // now we have the opcode i = 0; while (*start != 0) { op_str[i++] = (*start); start++; } op_str[i] = '\0'; opcode = strtoul(op_str, NULL, 10); recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records: got opcode %d @index %d\n", opcode, index); if ((opcode == GF_FOP_RENAME)) { type = fop_gfid_pgfid_oldloc_newloc; } else if ((opcode == GF_FOP_UNLINK) || (opcode == GF_FOP_RMDIR) || (opcode == GF_FOP_LINK) || (opcode == GF_FOP_MKDIR) || (opcode == GF_FOP_SYMLINK) || (opcode == GF_FOP_MKNOD) || (opcode == GF_FOP_CREATE)) { type = fop_gfid_pgfid_entry; } else if ((opcode == GF_FOP_FSETATTR) || (opcode == GF_FOP_SETATTR) || (opcode == GF_FOP_FREMOVEXATTR) || (opcode == GF_FOP_REMOVEXATTR) || (opcode == GF_FOP_SETXATTR) || (opcode == GF_FOP_FSETXATTR)) { type = fop_gfid; } else if ((opcode == GF_FOP_TRUNCATE) || (opcode == GF_FOP_FTRUNCATE)) { type = fop_gfid_offset; } else if (opcode == GF_FOP_WRITE) { type = fop_gfid_offset_len; } else { recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records:got no proper opcode %d @index %d\n", opcode, index); //GF_ASSERT(0); // make this as a hole. // TBD - check this logic later. maybe we should raise alarm here because // this means that changelog is corrupted. We are not handling changelog // corruptions as of now. rec->type = NSR_LOG_HOLE; goto finish; } // TBD - handle psuedo holes once that logic is in. rec->type = NSR_LOG_FILL; recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records:got type %d at index %d \n", rec->type, index); rec->op = opcode; // Now get the gfid and parse it // before that increment the pointer start++; for (i=0; i < 36; i++) { rec->gfid[i] = (*start); start++; } rec->gfid[i] = '\0'; if (opcode == GF_FOP_SYMLINK) { // the symlink would have been removed. Hence ignore this. // TBD - have an uniform error policy in case of such cases. // Right now we are handling some on the source and some on the destination. if(get_link_using_gfid(this->private, rec->gfid, rec->link_path) == _gf_false) { rec->type = NSR_LOG_HOLE; goto finish; } } GF_ASSERT(*start == 0); start ++; i = 0; // If type is fop_gfid_offset+_len, get offset if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) { char offset_str[128]; while(*start != 0) { offset_str[i++] = *start; start ++; } offset_str[i] = '\0'; // get over the 0 start++; rec->offset = strtoul(offset_str, NULL, 10); recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index); } i = 0; if (type == fop_gfid_offset_len) { char len_str[128]; while(*start != 0) { len_str[i++] = *start; start ++; } len_str[i] = '\0'; // get over the 0 start++; rec->len = strtoul(len_str, NULL, 10); recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records:got length %d @index %d \n", rec->len, index); } i = 0; if (type == fop_gfid_pgfid_entry) { // first get the gfid and then the path for (i=0; i < 36; i++) { rec->pargfid[i] = (*start); start++; } rec->pargfid[i] = '\0'; GF_ASSERT(*start == '/'); start ++; i = 0; while(*start != 0) { rec->entry[i++] = *start; start ++; } rec->entry[i] = '\0'; // get over the 0 start++; /* * Having to add this as a special case * is awful. See the function header * comment for the real solution. */ if (opcode == GF_FOP_CREATE) { rec->mode = 0; while (*start != '\0') { rec->mode *= 10; rec->mode += *start - '0'; ++start; } ++start; } recon_main_log (this->name, GF_LOG_ERROR, "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index); } i = 0; if (type == fop_gfid_pgfid_oldloc_newloc) { // first get the source and then the destination // source stuff gets stored in pargfid/entry for (i=0; i < 36; i++) { rec->pargfid[i] = (*start); start++; } rec->pargfid[i] = '\0'; GF_ASSERT(*start == '/'); start ++; i=0; while(*start != 0) { rec->entry[i++] = *start; start ++; } rec->entry[i] = '\0'; // get over the 0 start++; // dst stuff gets stored in gfid/newloc for (i=0; i < 36; i++) { rec->gfid[i] = (*start); start++; } rec->gfid[i] = '\0'; GF_ASSERT(*start == '/'); start ++; i = 0; while(*start != 0) { rec->newloc[i++] = *start; start ++; } rec->newloc[i] = '\0'; // get over the 0 start++; } ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl } finish: if (index == last) break; index++; rb += 128; start = rb; rec++; } while(1); } GF_FREE(orig); close(fd); recon_main_log (this->name, GF_LOG_INFO, "libchangelog_get_records finsihed inspecting records for term %d \n", term); return _gf_true; } int32_t nsr_recon_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) { int32_t op_ret = 0; int32_t op_errno = 0; nsr_recon_fd_t *rfd = NULL; recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path ); rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_private_t); if (!rfd) { op_ret = -1; op_errno = ENOMEM; } op_ret = fd_ctx_set (fd, this, (uint64_t)(long)rfd); if (op_ret) { op_ret = -1; op_errno = EINVAL; } recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path ); STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL); return 0; } int32_t nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, dict_t *xdata) { nsr_recon_fd_t *rfd = NULL; nsr_recon_private_t *priv = NULL; int32_t op_ret = 0; int32_t op_errno = 0; int32_t ret = 0; ret = this_fd_ctx_get (fd, this, &rfd); if (ret < 0) { return -1; } priv = (nsr_recon_private_t *)this->private; recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); GF_ASSERT(count == 1); switch (offset) { // client(brick, leader) writes the role of the node case nsr_recon_xlator_sector_1 : { nsr_recon_role_t rr; memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr)); ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role); if ((rr.role != leader) && (rr.role != reconciliator) && (rr.role != resolutor) && (rr.role != joiner)) { recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cannot set state \n"); STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); } GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); // Check if already a role play is going on. If yes return with EAGAIN. // Ideally we should check if we have got a higher term number while // servicing a lower term number; if so abort the older one. // However the abort infrastructure needs to be sketched properly; TBD. if (is_frame(priv) == _gf_true) { recon_main_log (this->name, GF_LOG_ERROR, "nsr_recon_writev set_role - already role play \n"); STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN, NULL, NULL, NULL); } else { // Store the stack frame so that when the actual job gets finished // we send the response back to the brick. put_frame(priv, frame, rr.current_term); if (nsr_recon_driver_set_role(priv->driver_thread_context, &rr, rr.current_term) == _gf_false) { get_frame(priv, NULL, rr.current_term); recon_main_log (this->name, GF_LOG_ERROR, "nsr_recon_writev set_role - cannot seem to set role \n"); STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); } else { recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev set_role - set role succesfully \n"); } } break; } // client(reconciliator) writes how much it needs for the read case nsr_recon_xlator_sector_2 : { nsr_recon_log_info_t li; memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li)); ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n", li.term, li.first_index, li.last_index); rfd->term = li.term; rfd->last_index = li.last_index; rfd->first_index = li.first_index; STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, NULL, NULL, NULL); break; } // client(reconciliator) writes term for which it needs info case nsr_recon_xlator_sector_3 : { int32_t term; memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); term = ntohl(term); //ntohl recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev - setting term info for term info. term=%d\n", term); rfd->term = term; STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, NULL, NULL, NULL); break; } // client(reconciliator) writes current term so that it gets last term info later case nsr_recon_xlator_sector_4 : { int32_t term; memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); term = ntohl(term); //ntohl recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev - setting term info for last term info given current term=%d\n", term); rfd->term = term; STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, NULL, NULL, NULL); break; } default: { recon_main_log (this->name, GF_LOG_ERROR, "nsr_recon_writev called with wrong offset\n"); STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); break; } } return 0; } int nsr_recon_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) { nsr_recon_fd_t *rfd = NULL; int32_t op_errno = 0; // copied stuff from quick-read.c and posix.c struct iobuf *iobuf = NULL; struct iobref *iobref = NULL; struct iovec iov = {0, }; int32_t ret = -1; nsr_recon_private_t *priv = NULL; iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); if (!iobuf) { op_errno = ENOMEM; goto out; } iobref = iobref_new (); if (!iobref) { op_errno = ENOMEM; goto out; } iobref_add (iobref, iobuf); ret = this_fd_ctx_get (fd, this, &rfd); if (ret < 0) { op_errno = -ret; goto out; } priv = (nsr_recon_private_t *)this->private; recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset ); switch (offset) { // client(leader) reads from here to get info for this term on this node // invole libchagelog to get the information case nsr_recon_xlator_sector_3 : { nsr_recon_last_term_info_t lt; GF_ASSERT(size == sizeof(lt)); nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, <); recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n", rfd->term, lt.commited_ops, lt.first_index, lt.last_index); ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl memcpy(iobuf->ptr, <, size); goto out; } // client(reconciliator) reads individual record information case nsr_recon_xlator_sector_2 : { uint32_t num = (rfd->last_index - rfd->first_index + 1); recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv - expected size %lu got size %lu\n", (num * sizeof(nsr_recon_record_details_t)), size); GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t))); bzero(iobuf->ptr, size); recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv - getting records for term=%d from %d to %d\n", rfd->term, rfd->first_index, rfd->last_index); nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr); goto out; } // read last term info case nsr_recon_xlator_sector_4 : { nsr_recon_last_term_info_t lt; GF_ASSERT(size == sizeof(lt)); nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, <); recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n", rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index); ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl memcpy(iobuf->ptr, <, size); goto out; } default: { recon_main_log (this->name, GF_LOG_ERROR, "nsr_recon_readv called with wrong offset\n"); op_errno = -1; break; } } out: if (op_errno == 0) { iov.iov_base = iobuf->ptr; ret = iov.iov_len = size; } STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL); if (iobref) iobref_unref (iobref); if (iobuf) iobuf_unref (iobuf); return 0; } int nsr_recon_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { struct iatt buf = {0, }; // dirty hack to set root as regular but seems to work. buf.ia_type = IA_IFREG; recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n"); STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &buf, NULL, NULL); return 0; } int32_t nsr_recon_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL); return 0; } int32_t mem_acct_init (xlator_t *this) { int ret = -1; GF_VALIDATE_OR_GOTO ("recon", this, out); ret = xlator_mem_acct_init (this, gf_mt_recon_end + 1); if (ret != 0) { gf_log (this->name, GF_LOG_ERROR, "Memory accounting init" "failed"); return ret; } out: return ret; } int32_t init (xlator_t *this) { nsr_recon_private_t *priv = NULL; char *local, *members; unsigned int i=0; priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t); if (!priv) { gf_log (this->name, GF_LOG_ERROR, "priv allocation error\n"); return -1; } GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err); GF_OPTION_INIT ("vol-name", priv->volname, str, err); if (!priv->volname) { gf_log (this->name, GF_LOG_ERROR, "missing volname option (required)"); return -1; } GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err); if (!priv->changelog_base_path) { gf_log (this->name, GF_LOG_ERROR, "missing changelog directory option (required)"); return -1; } GF_OPTION_INIT ("base-dir", priv->base_dir, str, err); if (!priv->base_dir) { gf_log (this->name, GF_LOG_ERROR, "missing brick base directory option (required)"); return -1; } GF_OPTION_INIT ("replica-group-members", members, str, err); if (!members) { gf_log (this->name, GF_LOG_ERROR, "missing membership option (required)"); return -1; } GF_OPTION_INIT ("local-member", local, str, err); if (!local) { gf_log (this->name, GF_LOG_ERROR, "missing local member option (required)"); return -1; } priv->replica_group_members = GF_CALLOC (priv->replica_group_size, sizeof(char *), gf_mt_recon_private_t); priv->replica_group_members[0] = GF_CALLOC (1, strlen(local), gf_mt_recon_private_t); if (!priv->replica_group_members || !(priv->replica_group_members[0])) { gf_log (this->name, GF_LOG_ERROR, "str allocation error\n"); return -1; } strcpy(priv->replica_group_members[0], local); for (i=1; i < priv->replica_group_size; i++) { char *member; if (i == 1) member = strtok(members, ","); else member = strtok(NULL, ","); priv->replica_group_members[i] = GF_CALLOC (1, strlen(member) + 1, gf_mt_recon_private_t); if (!priv->replica_group_members[i]) { gf_log (this->name, GF_LOG_ERROR, "str allocation error\n"); return -1; } strcpy(priv->replica_group_members[i], member); } priv->this = this; this->private = (void *)priv; priv->fp = recon_create_log (priv->replica_group_members[0], "recon-main-log"); if (!priv->fp) return -1; recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n"); if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) { recon_main_log (this->name, GF_LOG_ERROR, "pthread creation error \n"); return -1; } INIT_LIST_HEAD(&(priv->list)); return 0; err: return -1; } void fini (xlator_t *this) { nsr_recon_private_t *priv = NULL; void *ret = NULL; priv = (nsr_recon_private_t *)this->private; pthread_cancel(priv->thread_id); pthread_join(priv->thread_id, &ret); } struct xlator_fops fops = { .open = nsr_recon_open, .readv = nsr_recon_readv, .writev = nsr_recon_writev, .lookup = nsr_recon_lookup, .flush = nsr_recon_flush }; struct xlator_cbks cbks = { }; struct volume_options options[] = { { .key = {"replica-group-size"}, .type = GF_OPTION_TYPE_INT, .min = 2, .max = INT_MAX, .default_value = "2", .description = "Number of bricks in replica group. can be derived but putting it here for testing." }, { .key = {"vol-name"}, .type = GF_OPTION_TYPE_STR, .description = "volume name" }, { .key = {"local-member"}, .type = GF_OPTION_TYPE_STR, .description = "member(brick) for which this translator is responsible." }, { .key = {"replica-group-members"}, .type = GF_OPTION_TYPE_STR, .description = "Comma seperated member names other than local." }, { .key = {"changelog-dir"}, .type = GF_OPTION_TYPE_STR, .description = "Base directory where per term changelogs are maintained." }, { .key = {"base-dir"}, .type = GF_OPTION_TYPE_STR, .description = "Base directory for this brick. This should go away once we fix gfid based lookups" }, { .key = {NULL} }, };