diff options
Diffstat (limited to 'xlators/cluster/nsr-recon/src/recon_xlator.c')
-rw-r--r-- | xlators/cluster/nsr-recon/src/recon_xlator.c | 1010 |
1 files changed, 1010 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c new file mode 100644 index 000000000..272c35dc2 --- /dev/null +++ b/xlators/cluster/nsr-recon/src/recon_xlator.c @@ -0,0 +1,1010 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <sys/types.h> +#include <fcntl.h> +#include <string.h> +#include <unistd.h> + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" + +#include "recon_driver.h" +#include "recon_xlator.h" + +typedef struct _nsr_recon_fd_s { + int32_t term; + nsr_recon_driver_state_t state; + uint32_t first_index; + uint32_t last_index; + call_frame_t *frame; +} nsr_recon_fd_t; + +#if defined(NSR_DEBUG) + +void +_recon_main_log (const char *func, int line, char *member, FILE *fp, + char *fmt, ...) +{ + va_list ap; + char *buf = NULL; + int retval; + + if (!fp) { + fp = recon_create_log(member,"recon-main-log"); + if (!fp) { + return; + } + } + + va_start(ap,fmt); + retval = vasprintf(&buf,fmt,ap); + if (buf) { + fprintf(fp,"[%s:%d] %.*s\n",func,line,retval,buf); + free(buf); + } + va_end(ap); +} + +#endif + +// Given fd, get back the NSR based fd context. +static int32_t this_fd_ctx_get(fd_t *fd, xlator_t *this, nsr_recon_fd_t **rfd) +{ + uint64_t tmp = 0; + int32_t ret = -1; + + if ((ret = fd_ctx_get(fd, this, &tmp)) != 0) { + return ret; + } else { + *rfd = (nsr_recon_fd_t *)tmp; + return 0; + } +} + +// Add the frame in q after associating with term +// term usage tbd +static void put_frame(nsr_recon_private_t *priv, + call_frame_t *frame, + uint32_t term) +{ + xlator_t *this = priv->this; + recon_main_log (this->name, GF_LOG_INFO, "adding frame for term %d \n", term); + priv->frame = frame; + return; +} + +// get the frame from the queue given the term +// term usage tbd +static void get_frame(nsr_recon_private_t *priv, + call_frame_t **frame, + uint32_t term) +{ + if (frame != NULL) + *frame = priv->frame; + priv->frame = NULL; + return; +} + +// check if there are outstanding frames +static gf_boolean_t is_frame(nsr_recon_private_t *priv) +{ + return((priv->frame != NULL) ? _gf_true : _gf_false); +} + +#define ENTRY_SIZE 128 + +long +get_entry_count (char *path) +{ + int fd; + struct stat buf; + unsigned long entries = -1; + long min; /* last entry not known to be empty */ + long max; /* first entry known to be empty */ + long curr; + char entry[ENTRY_SIZE]; + void *err_label = &&done; + + fd = open(path,O_RDONLY); + if (fd < 0) { + goto *err_label; + } + err_label = &&close_fd; + + if (fstat(fd,&buf) < 0) { + goto *err_label; + } + + min = 0; + max = buf.st_size / ENTRY_SIZE; + printf("max = %ld\n",max); + + while ((min+1) < max) { + curr = (min + max) / 2; + printf("trying entry %ld\n",curr); + if (lseek(fd,curr*ENTRY_SIZE,SEEK_SET) < 0) { + goto *err_label; + } + if (read(fd,entry,sizeof(entry)) != sizeof(entry)) { + goto *err_label; + } + if ((entry[0] == '_') && (entry[1] == 'P')) { + min = curr; + } + else { + max = curr; + } + } + + entries = max; + +close_fd: + close(fd); +done: + return entries; +} + +// Get the term info for the term number specified +void nsr_recon_libchangelog_get_this_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + char path[PATH_MAX]; + long entries; + + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + lt->last_term = term; + sprintf(path,"%s/%s%d",bp,"TERM.",term); + entries = get_entry_count(path); + if (entries > 1) { + /* The first entry is actually a header. */ + lt->first_index = 1; + /* + * This seems wrong, because it means that last_index*128 will + * be exactly at EOF and commited_ops will be one greater than + * it should be. Maybe some other code makes the exact + * opposite mistake to compensate. + */ + lt->last_index = lt->commited_ops = (int)entries; + } + recon_main_log (this->name, GF_LOG_INFO, "for term=%d got first_index=%d last_index=%d commited_ops=%d\n", + term, lt->first_index, lt->last_index, lt->commited_ops); + return; +} + +// Given the term number, find the last term in the changelogs +void nsr_recon_libchangelog_get_last_term_info(xlator_t *this, char *bp, int32_t term, nsr_recon_last_term_info_t *lt) +{ + uint32_t t = term; + struct stat buf; + char path[PATH_MAX]; + bzero(lt, sizeof(nsr_recon_last_term_info_t)); + while(t) { + // journal file is of type TERM-1.jnl + sprintf(path,"%s/%s%d",bp,"TERM.",t); + if (!stat(path, &buf)) { + nsr_recon_libchangelog_get_this_term_info(this, bp, t, lt); + recon_main_log (this->name, GF_LOG_INFO, "got last term given current term %d as %d\n", term, t); + return; + } + t--; + } + recon_main_log (this->name, GF_LOG_INFO, "got no last term given current term %d \n", term); + + return; +} + +// Return back the frame stored against the term +void nsr_recon_return_back(nsr_recon_private_t *priv, uint32_t term, int32_t status, int32_t op_errno) +{ + call_frame_t *old_frame = NULL; + xlator_t *this = priv->this; + + get_frame(priv, &old_frame, term); + if (old_frame) { + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev returns old frame \n"); + // first return the original write for which this ack was sent + STACK_UNWIND_STRICT (writev, old_frame, status, op_errno, NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_ERROR, "EIII---nsr_recon_writev cnnot return old frame \n"); + } +} + +typedef enum records_type_t { + fop_gfid_pgfid_oldloc_newloc = 1, + fop_gfid_pgfid_entry = fop_gfid_pgfid_oldloc_newloc + 1, + fop_gfid = fop_gfid_pgfid_entry + 1 , + fop_gfid_offset = fop_gfid + 1, + fop_gfid_offset_len = fop_gfid_offset + 1, +} records_type_t; + +// Get the backend ./glusterfs/xx/xx/<...> path +static void +get_gfid_path(nsr_recon_private_t *priv, char *gfid, char *path) +{ + strcpy(path, priv->base_dir); + strcat(path, "/.glusterfs/"); + strncat(path,gfid,2); + strcat(path,"/"); + strncat(path,gfid+2,2); + strcat(path,"/"); + strcat(path,gfid); +} + + +// Get the link to which backend points to +static gf_boolean_t +get_link_using_gfid(nsr_recon_private_t *priv, char *gfid, char *path) +{ + char lp[PATH_MAX]; + xlator_t *this = priv->this; + get_gfid_path(priv,gfid, lp); + if (readlink(lp, path, 255) == -1) { + GF_ASSERT(0); + recon_main_log(priv->this, GF_LOG_ERROR, + "cannot get readlink for %s\n",lp); + return _gf_false; + } + return _gf_true; +} + +// Get the list of changelog records given a term , first and last index. +// +// TBD: rewrite this hideous ball of mud in at least the following ways: +// +// (1) Break out the code for handling a single record into a separate +// function, to make error handling easier and reduce "indentation +// creep" so the code's readable. +// +// (2) Change all of the fop_xxx_yyy nonsense to OR together values +// like FOP_HAS_FIELD_XXX and FOP_HAS_FIELD_YYY, to reduce code +// duplication and facilitate the addition of new fields. +// +// (3) Stop making so many assumptions about the underlying formats. +// The code as it is won't even work for the existing binary format, +// let alone as changelog evolves over time. +// +// Really, 90% of this code should just GO AWAY in favor of using +// libgfchangelog, enhanced as necessary to support our needs. + +/* + * Use this macro to skip over a field we're not using yet. + * NB: the body is a null statement on purpose + * TBD: all instances of this should be removed eventually! + */ +#define SKIP_FIELD do /* nothing */ ; while (*(start++) != '\0') + +#define SKIP_OVER +gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_t term, uint32_t first, uint32_t last, void *buf) +{ + // do a mmap; seek into the first and read all records till last. + // TBD - right now all records are pseudo holes but mark them as fills. + // TBD - pseudo hole to be implemented when actual fsync gets done on data. + char *rb = NULL, *orig = NULL; + char path[PATH_MAX]; + int fd; + uint32_t index = 0; + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records called for term %d index from %d to %d \n", + term, first, last ); + + orig = rb = GF_CALLOC(128, ((last - first) + 1), + gf_mt_recon_changelog_buf_t); + + sprintf(path,"%s/%s%d",bp,"TERM.",term); + fd = open(path, O_RDONLY); + if (fd == -1) { + return _gf_false; + } else { + char *start = NULL; + nsr_recon_record_details_t * rec = (nsr_recon_record_details_t *)buf; + + if (first == 0) + lseek(fd, 128, SEEK_SET); + else + lseek(fd, first * 128, SEEK_SET); + if (read(fd, rb, (last - first + 1) * 128) == -1) { + return _gf_false; + } + start = rb; + index = first; + do { + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records start inspecting records at index %d \n", + index ); + if (!strncmp(start, "_PRE_", 5)) { + uint32_t i; + uint32_t opcode = 0; + records_type_t type; + + start += 5; + // increment by the NULLs after the PRE + start += 4; + SKIP_FIELD; // real index + // now we have the opcode + while (*start != '\0') { + opcode *= 10; + opcode += (*(start++) - '0'); + } + ++start; + recon_main_log (this->name, GF_LOG_ERROR, + "libchangelog_get_records: got opcode %d @index %d\n", opcode, index); + if ((opcode == GF_FOP_RENAME)) { + type = fop_gfid_pgfid_oldloc_newloc; + } else if ((opcode == GF_FOP_UNLINK) || + (opcode == GF_FOP_RMDIR) || + (opcode == GF_FOP_LINK) || + (opcode == GF_FOP_MKDIR) || + (opcode == GF_FOP_SYMLINK) || + (opcode == GF_FOP_MKNOD) || + (opcode == GF_FOP_CREATE)) { + type = fop_gfid_pgfid_entry; + } else if ((opcode == GF_FOP_FSETATTR) || + (opcode == GF_FOP_SETATTR) || + (opcode == GF_FOP_FREMOVEXATTR) || + (opcode == GF_FOP_REMOVEXATTR) || + (opcode == GF_FOP_SETXATTR) || + (opcode == GF_FOP_FSETXATTR)) { + type = fop_gfid; + } else if ((opcode == GF_FOP_TRUNCATE) || + (opcode == GF_FOP_FTRUNCATE)) { + type = fop_gfid_offset; + } else if (opcode == GF_FOP_WRITE) { + type = fop_gfid_offset_len; + } else { + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got no proper opcode %d @index %d\n", + opcode, index); + //GF_ASSERT(0); + // make this as a hole. + // TBD - check this logic later. maybe we should raise alarm here because + // this means that changelog is corrupted. We are not handling changelog + // corruptions as of now. + rec->type = NSR_LOG_HOLE; + goto finish; + } + // TBD - handle psuedo holes once that logic is in. + rec->type = NSR_LOG_FILL; + recon_main_log (this->name, GF_LOG_ERROR, + "libchangelog_get_records:got type %d at index %d \n", + rec->type, index); + rec->op = opcode; + + // Now get the gfid and parse it + // before that increment the pointer + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + + GF_ASSERT(*start == 0); + start ++; + + if (opcode == GF_FOP_SYMLINK) { + i = 0; + do { + if (i >= 256) { + goto finish; + } + rec->link_path[i++] = *start; + } while (*(start++) != '\0'); + } + + i = 0; + // If type is fop_gfid_offset+_len, get offset + if ((type == fop_gfid_offset) || (type == fop_gfid_offset_len)) { + char offset_str[128]; + while(*start != 0) { + offset_str[i++] = *start; + start ++; + } + offset_str[i] = '\0'; + // get over the 0 + start++; + rec->offset = strtoul(offset_str, NULL, 10); + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got offset %d @index %d \n", rec->offset, index); + + } + i = 0; + if (type == fop_gfid_offset_len) { + char len_str[128]; + while(*start != 0) { + len_str[i++] = *start; + start ++; + } + len_str[i] = '\0'; + // get over the 0 + start++; + rec->len = strtoul(len_str, NULL, 10); + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got length %d @index %d \n", rec->len, index); + } + i = 0; + if (type == fop_gfid_pgfid_entry) { + switch (opcode) { + case GF_FOP_CREATE: + case GF_FOP_MKDIR: + case GF_FOP_MKNOD: + SKIP_FIELD; // mode + break; + /* TBD: handle GF_FOP_SYMLINK target */ + default: + ; + } + SKIP_FIELD; // uid + SKIP_FIELD; // gid + if (opcode == GF_FOP_MKNOD) { + SKIP_FIELD; // dev + } + // first get the gfid and then the path + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i = 0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + /* + * Having to add this as a special case + * is awful. See the function header + * comment for the real solution. + */ + if (opcode == GF_FOP_CREATE) { + rec->mode = 0; + while (*start != '\0') { + rec->mode *= 10; + rec->mode += *start + - '0'; + ++start; + } + ++start; + } + recon_main_log (this->name, + GF_LOG_ERROR, + "libchangelog_get_records:got entry %s @index %d \n", rec->entry, index); + + } + i = 0; + if (type == fop_gfid_pgfid_oldloc_newloc) { + + // first get the source and then the destination + // source stuff gets stored in pargfid/entry + for (i=0; i < 36; i++) { + rec->pargfid[i] = (*start); + start++; + } + rec->pargfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + + i=0; + while(*start != 0) { + rec->entry[i++] = *start; + start ++; + } + rec->entry[i] = '\0'; + // get over the 0 + start++; + + // dst stuff gets stored in gfid/newloc + for (i=0; i < 36; i++) { + rec->gfid[i] = (*start); + start++; + } + rec->gfid[i] = '\0'; + GF_ASSERT(*start == '/'); + start ++; + i = 0; + while(*start != 0) { + rec->newloc[i++] = *start; + start ++; + } + rec->newloc[i] = '\0'; + // get over the 0 + start++; + + } + ENDIAN_CONVERSION_RD((*rec), _gf_false); //htonl + } +finish: + if (index == last) + break; + index++; + rb += 128; + start = rb; + rec++; + } while(1); + } + GF_FREE(orig); + close(fd); + + recon_main_log (this->name, GF_LOG_INFO, + "libchangelog_get_records finsihed inspecting records for term %d \n", + term); + return _gf_true; +} + +int32_t +nsr_recon_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) +{ + int32_t op_ret = 0; + int32_t op_errno = 0; + nsr_recon_fd_t *rfd = NULL; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path ); + rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_fd_t); + if (!rfd) { + op_ret = -1; + op_errno = ENOMEM; + } + + op_ret = fd_ctx_set (fd, this, (uint64_t)(long)rfd); + if (op_ret) { + op_ret = -1; + op_errno = EINVAL; + } + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open returns with %d for path %s \n",op_ret,loc->path ); + STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, NULL); + return 0; +} + +int32_t +nsr_recon_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + nsr_recon_private_t *priv = NULL; + int32_t op_ret = 0; + int32_t op_errno = 0; + int32_t ret = 0; + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + return -1; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called for offset %d \n",(unsigned int)offset ); + GF_ASSERT(count == 1); + switch (offset) { + // client(brick, leader) writes the role of the node + case nsr_recon_xlator_sector_1 : + { + nsr_recon_role_t rr; + memcpy((void *)&rr, (void *)vector[0].iov_base, sizeof(rr)); + ENDIAN_CONVERSION_RR(rr, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_writev called to set role %d\n", rr.role); + if ((rr.role != leader) && + (rr.role != reconciliator) && + (rr.role != resolutor) && + (rr.role != joiner)) { + recon_main_log (this->name, GF_LOG_ERROR, + "EIII---nsr_recon_writev cannot set state \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } + + GF_ASSERT(rr.num <= MAXIMUM_REPLICA_STRENGTH); + + // Check if already a role play is going on. If yes return with EAGAIN. + // Ideally we should check if we have got a higher term number while + // servicing a lower term number; if so abort the older one. + // However the abort infrastructure needs to be sketched properly; TBD. + if (is_frame(priv) == _gf_true) { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - already role play \n"); + STACK_UNWIND_STRICT (writev, frame, -1, EAGAIN, + NULL, NULL, NULL); + } else { + + // Store the stack frame so that when the actual job gets finished + // we send the response back to the brick. + put_frame(priv, frame, rr.current_term); + if (nsr_recon_driver_set_role(priv->driver_thread_context, + &rr, + rr.current_term) == _gf_false) { + get_frame(priv, NULL, rr.current_term); + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev set_role - cannot seem to set role \n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + } else { + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev set_role - set role succesfully \n"); + } + } + break; + } + // client(reconciliator) writes how much it needs for the read + case nsr_recon_xlator_sector_2 : + { + nsr_recon_log_info_t li; + memcpy((void *)&li, (void *)vector[0].iov_base, sizeof(li)); + ENDIAN_CONVERSION_LI(li, _gf_true); //ntohl + + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for reconcilation info. term=%d, first_index=%d,start_index=%d \n", + li.term, li.first_index, li.last_index); + rfd->term = li.term; + rfd->last_index = li.last_index; + rfd->first_index = li.first_index; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes term for which it needs info + case nsr_recon_xlator_sector_3 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for term info. term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + // client(reconciliator) writes current term so that it gets last term info later + case nsr_recon_xlator_sector_4 : + { + int32_t term; + + memcpy((void *)&term, (void *)vector[0].iov_base, sizeof(term)); + term = ntohl(term); //ntohl + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_writev - setting term info for last term info given current term=%d\n", + term); + rfd->term = term; + STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, + NULL, NULL, NULL); + break; + } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_writev called with wrong offset\n"); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, + NULL, NULL, NULL); + break; + } + } + + return 0; +} + +int +nsr_recon_readv (call_frame_t *frame, xlator_t *this, + fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata) +{ + nsr_recon_fd_t *rfd = NULL; + int32_t op_errno = 0; + // copied stuff from quick-read.c and posix.c + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + struct iovec iov = {0, }; + int32_t ret = -1; + nsr_recon_private_t *priv = NULL; + + iobuf = iobuf_get2 (this->ctx->iobuf_pool, size); + if (!iobuf) { + op_errno = ENOMEM; + goto out; + } + + iobref = iobref_new (); + if (!iobref) { + op_errno = ENOMEM; + goto out; + } + + iobref_add (iobref, iobuf); + + ret = this_fd_ctx_get (fd, this, &rfd); + if (ret < 0) { + op_errno = -ret; + goto out; + } + priv = (nsr_recon_private_t *)this->private; + + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_readv called for offset %d \n",(unsigned int)offset ); + switch (offset) { + // client(leader) reads from here to get info for this term on this node + // invole libchagelog to get the information + case nsr_recon_xlator_sector_3 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting term info for term=%d, ops=%d, first=%d, last=%d\n", + rfd->term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + // client(reconciliator) reads individual record information + case nsr_recon_xlator_sector_2 : + { + uint32_t num = (rfd->last_index - rfd->first_index + 1); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - expected size %lu got size %lu\n", + (num * sizeof(nsr_recon_record_details_t)), size); + + GF_ASSERT(size == (num * sizeof(nsr_recon_record_details_t))); + bzero(iobuf->ptr, size); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting records for term=%d from %d to %d\n", + rfd->term, rfd->first_index, rfd->last_index); + nsr_recon_libchangelog_get_records(this, priv->changelog_base_path, + rfd->term, rfd->first_index, rfd->last_index, iobuf->ptr); + goto out; + } + // read last term info + case nsr_recon_xlator_sector_4 : + { + nsr_recon_last_term_info_t lt; + GF_ASSERT(size == sizeof(lt)); + nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, rfd->term, <); + recon_main_log (this->name, GF_LOG_INFO, + "nsr_recon_readv - getting last term info given current term=%d. last term = %d ops=%d, first=%d, last=%d\n", + rfd->term, lt.last_term, lt.commited_ops, lt.first_index, lt.last_index); + ENDIAN_CONVERSION_LT(lt, _gf_false); //htonl + memcpy(iobuf->ptr, <, size); + goto out; + } + default: + { + recon_main_log (this->name, GF_LOG_ERROR, + "nsr_recon_readv called with wrong offset\n"); + op_errno = -1; + break; + } + } + +out: + if (op_errno == 0) { + iov.iov_base = iobuf->ptr; + ret = iov.iov_len = size; + } + + STACK_UNWIND_STRICT (readv, frame, ret, op_errno, &iov, 1, NULL, iobref , NULL); + + if (iobref) + iobref_unref (iobref); + if (iobuf) + iobuf_unref (iobuf); + return 0; +} + +int +nsr_recon_lookup (call_frame_t *frame, xlator_t *this, + loc_t *loc, dict_t *xdata) +{ + struct iatt buf = {0, }; + // dirty hack to set root as regular but seems to work. + buf.ia_type = IA_IFREG; + recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_lookup called \n"); + + STACK_UNWIND_STRICT (lookup, frame, 0, 0, this->itable->root, &buf, NULL, NULL); + return 0; +} + + +int32_t +nsr_recon_flush (call_frame_t *frame, xlator_t *this, + fd_t *fd, dict_t *xdata) +{ + STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL); + return 0; +} + + +int32_t +mem_acct_init (xlator_t *this) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("recon", this, out); + + ret = xlator_mem_acct_init (this, gf_mt_recon_end + 1); + + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "Memory accounting init" "failed"); + return ret; + } +out: + return ret; +} + + +int32_t +init (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + char *local, *members; + unsigned int i=0; + + priv = GF_CALLOC (1, sizeof (*priv), gf_mt_recon_private_t); + if (!priv) { + gf_log (this->name, GF_LOG_ERROR, + "priv allocation error\n"); + return -1; + } + GF_OPTION_INIT ("replica-group-size", priv->replica_group_size, uint32, err); + GF_OPTION_INIT ("vol-name", priv->volname, str, err); + if (!priv->volname) { + gf_log (this->name, GF_LOG_ERROR, + "missing volname option (required)"); + return -1; + } + GF_OPTION_INIT ("changelog-dir", priv->changelog_base_path, str, err); + if (!priv->changelog_base_path) { + gf_log (this->name, GF_LOG_ERROR, + "missing changelog directory option (required)"); + return -1; + } + GF_OPTION_INIT ("base-dir", priv->base_dir, str, err); + if (!priv->base_dir) { + gf_log (this->name, GF_LOG_ERROR, + "missing brick base directory option (required)"); + return -1; + } + GF_OPTION_INIT ("replica-group-members", members, str, err); + if (!members) { + gf_log (this->name, GF_LOG_ERROR, + "missing membership option (required)"); + return -1; + } + GF_OPTION_INIT ("local-member", local, str, err); + if (!local) { + gf_log (this->name, GF_LOG_ERROR, + "missing local member option (required)"); + return -1; + } + + priv->replica_group_members = GF_CALLOC (priv->replica_group_size, + sizeof(char *), + gf_mt_recon_members_list_t); + priv->replica_group_members[0] = GF_CALLOC (1, + strlen(local), + gf_mt_recon_member_name_t); + if (!priv->replica_group_members || !(priv->replica_group_members[0])) { + gf_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[0], local); + for (i=1; i < priv->replica_group_size; i++) { + char *member; + if (i == 1) + member = strtok(members, ","); + else + member = strtok(NULL, ","); + priv->replica_group_members[i] = GF_CALLOC (1, + strlen(member) + 1, gf_mt_recon_member_name_t); + if (!priv->replica_group_members[i]) { + gf_log (this->name, GF_LOG_ERROR, + "str allocation error\n"); + return -1; + } + strcpy(priv->replica_group_members[i], member); + } + + + priv->this = this; + this->private = (void *)priv; + + priv->fp = recon_create_log (priv->replica_group_members[0], "recon-main-log"); + if (!priv->fp) + return -1; + + recon_main_log (this->name, GF_LOG_INFO, "creating reconciliation driver \n"); + + if (pthread_create(&priv->thread_id, NULL, nsr_reconciliation_driver, priv)) { + recon_main_log (this->name, GF_LOG_ERROR, + "pthread creation error \n"); + return -1; + } + + INIT_LIST_HEAD(&(priv->list)); + + + return 0; + +err: + return -1; +} + + +void +fini (xlator_t *this) +{ + nsr_recon_private_t *priv = NULL; + void *ret = NULL; + + priv = (nsr_recon_private_t *)this->private; + + pthread_cancel(priv->thread_id); + pthread_join(priv->thread_id, &ret); +} + + +struct xlator_fops fops = { + .open = nsr_recon_open, + .readv = nsr_recon_readv, + .writev = nsr_recon_writev, + .lookup = nsr_recon_lookup, + .flush = nsr_recon_flush +}; + +struct xlator_cbks cbks = { +}; + +struct volume_options options[] = { + { .key = {"replica-group-size"}, + .type = GF_OPTION_TYPE_INT, + .min = 2, + .max = INT_MAX, + .default_value = "2", + .description = "Number of bricks in replica group. can be derived but putting it here for testing." + }, + { + .key = {"vol-name"}, + .type = GF_OPTION_TYPE_STR, + .description = "volume name" + }, + { + .key = {"local-member"}, + .type = GF_OPTION_TYPE_STR, + .description = "member(brick) for which this translator is responsible." + }, + { + .key = {"replica-group-members"}, + .type = GF_OPTION_TYPE_STR, + .description = "Comma seperated member names other than local." + }, + { + .key = {"changelog-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory where per term changelogs are maintained." + }, + { + .key = {"base-dir"}, + .type = GF_OPTION_TYPE_STR, + .description = "Base directory for this brick. This should go away once we fix gfid based lookups" + }, + { .key = {NULL} }, +}; |