summaryrefslogtreecommitdiffstats
path: root/xlators/cluster
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2014-02-11 05:58:18 -0800
committerGerrit Code Review <review@dev.gluster.org>2014-02-11 05:58:18 -0800
commit6f9bec28ea3a488111cda974ee6959bd86094f31 (patch)
tree87e12b0281a1c70b7b98167f9767c514c3886894 /xlators/cluster
parent86ca56114f5aa2d2e5358ca9f62a43b049c84246 (diff)
parenta3388532787ec42a8cd3fbb4cc88b84fcf0b5cc3 (diff)
Merge "nsr-recon: bring data_worker_fun in line with coding standard"
Diffstat (limited to 'xlators/cluster')
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c473
1 files changed, 245 insertions, 228 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index 084b7f5ce..00e79b9d3 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -1776,262 +1776,279 @@ static void
data_worker_func(nsr_per_node_worker_t *ctx,
nsr_recon_work_t *work)
{
- nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
- nsr_reconciliation_record_t *ri = NULL;
- nsr_recon_record_details_t *rd = NULL;
- glfs_fd_t *fd = NULL;
- int wip = 0;
+ nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ nsr_reconciliation_record_t *ri = NULL;
+ nsr_recon_record_details_t *rd = NULL;
+ int wip = 0;
+ dict_t * dict = NULL;
+ struct glfs_fd *fd = NULL;
+ struct glfs_object *obj = NULL;
+ uuid_t gfid;
+ uint32_t k_s = 0, v_s = 0;
+ char *t_b= NULL;
+ uint32_t num=0;
- switch (work->req_id){
- case NSR_WORK_ID_INI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started data ini \n");
- nsr_recon_start_work(ctx, _gf_false);
+ switch (work->req_id){
+ case NSR_WORK_ID_INI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data ini \n");
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished data ini \n");
- break;
- }
- case NSR_WORK_ID_FINI:
- {
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started data fini \n");
+ nsr_recon_start_work(ctx, _gf_false);
- nsr_recon_end_work(ctx, _gf_false);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data ini \n");
+ break;
+ case NSR_WORK_ID_FINI:
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started data fini \n");
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished data fini \n");
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
- {
- dict_t * dict = NULL;
- // first_index always starts with 1 but records starts at 0.
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
-
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
+ nsr_recon_end_work(ctx, _gf_false);
- if (rd->op == GF_FOP_WRITE) {
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished data fini \n");
+ break;
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_READ:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
- // record already copied.
- // copy data to this node's info.
- struct glfs_fd *fd = NULL;
- struct glfs_object *obj = NULL;
- uuid_t gfid;
+ switch (rd->op) {
+ case GF_FOP_WRITE:
- uuid_parse(ri->rec.gfid, gfid);
+ // record already copied.
+ // copy data to this node's info.
- nsr_worker_log(this->name, GF_LOG_INFO,
- "started recon read for file %s at offset %d at len %d\n",
- ri->rec.gfid, rd->offset, rd->len);
-
- obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
- if (obj == NULL) {
- GF_ASSERT(obj != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "creating of handle failed\n");
- goto read_out;
- }
+ uuid_parse(ri->rec.gfid, gfid);
- // The file has probably got deleted.
- fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
- if (fd == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "opening of file failed\n");
- goto read_out;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "started recon read for file %s at offset %d at len %d\n",
+ ri->rec.gfid, rd->offset, rd->len);
+
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(obj != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
- if (glfs_lseek_with_xdata(fd, rd->offset, SEEK_SET, dict) != rd->offset) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "lseek of file failed to offset %d\n", rd->offset);
- goto read_out;
- }
+ // The file has probably got deleted.
+ fd = glfs_h_open_with_xdata (ctx->fs, obj, O_RDONLY,
+ dict);
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
- ri->work.data = GF_CALLOC(rd->len , sizeof(char), gf_mt_recon_private_t);
- if (glfs_read_with_xdata(fd, ri->work.data, rd->len, 0, dict) != rd->len) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "read of file failed to offset %d for bytes %d\n", rd->offset, rd->len);
- goto read_out;
- }
+ if (glfs_lseek_with_xdata (fd, rd->offset, SEEK_SET,
+ dict) != rd->offset) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "lseek of file failed to offset %d\n",
+ rd->offset);
+ break;
+ }
- glfs_close_with_xdata(fd, dict);
- glfs_h_close(obj);
+ ri->work.data = GF_CALLOC (rd->len , sizeof(char),
+ gf_mt_recon_private_t);
+ if (glfs_read_with_xdata (fd, ri->work.data, rd->len,
+ 0, dict) != rd->len) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "read of file failed to offset %d for bytes %d\n",
+ rd->offset, rd->len);
+ break;
+ }
- } else if (rd->op == GF_FOP_FTRUNCATE) {
- } else if (rd->op == GF_FOP_SYMLINK) {
- } else if ((rd->op == GF_FOP_RMDIR) || (rd->op == GF_FOP_UNLINK) ||
- (rd->op == GF_FOP_MKNOD) || (rd->op == GF_FOP_CREATE) ||
- (rd->op == GF_FOP_LINK) || (rd->op == GF_FOP_MKDIR)) {
- } else if (rd->op == GF_FOP_RENAME) {
- } else if ((rd->op == GF_FOP_FREMOVEXATTR) ||
- (rd->op == GF_FOP_REMOVEXATTR) ||
- (rd->op == GF_FOP_SETXATTR) ||
- (rd->op == GF_FOP_FSETXATTR)) {
+ glfs_close_with_xdata(fd, dict);
+ glfs_h_close(obj);
+ break;
- struct glfs_fd *fd = NULL;
- struct glfs_object *obj = NULL;
- uuid_t gfid;
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_SYMLINK:
+ case GF_FOP_RMDIR:
+ case GF_FOP_UNLINK:
+ case GF_FOP_MKNOD:
+ case GF_FOP_CREATE:
+ case GF_FOP_LINK:
+ case GF_FOP_MKDIR:
+ case GF_FOP_RENAME:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
- uuid_parse(ri->rec.gfid, gfid);
+ case GF_FOP_FREMOVEXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_FSETXATTR:
+ uuid_parse(ri->rec.gfid, gfid);
- // This is for all the set attribute/extended attributes commands.
- // Get all the attributes from the source and fill it in the buffer
- // as a NULL seperated key and value which are in turn seperated by
- // NULL.
- uint32_t k_s = 0, v_s = 0;
- char *t_b= NULL;
- uint32_t num=0;
+ // This is for all the set attribute/extended
+ // attributes commands. Get all the attributes from
+ // the source and fill it in the buffer as a NULL
+ // seperated key and value which are in turn seperated
+ // by NULL.
- nsr_worker_log(this->name, GF_LOG_INFO,
- "doing getattr for gfid %s \n",
- ri->rec.gfid);
-
- obj = glfs_h_create_from_handle(ctx->fs, gfid, GFAPI_HANDLE_LENGTH, NULL);
- if (obj == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "creating of handle failed\n");
- goto read_out;
- }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "doing getattr for gfid %s \n",
+ ri->rec.gfid);
- if (obj->inode->ia_type == IA_IFDIR)
- fd = glfs_h_opendir_with_xdata(ctx->fs, obj, dict);
- else
- fd = glfs_h_open_with_xdata(ctx->fs, obj, O_RDONLY, dict);
+ obj = glfs_h_create_from_handle (ctx->fs, gfid,
+ GFAPI_HANDLE_LENGTH,
+ NULL);
+ if (obj == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "creating of handle failed\n");
+ break;
+ }
- if (fd == NULL) {
- GF_ASSERT(fd != NULL);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "opening of file failed\n");
- goto read_out;
- }
+ if (obj->inode->ia_type == IA_IFDIR)
+ fd = glfs_h_opendir_with_xdata (ctx->fs, obj,
+ dict);
+ else
+ fd = glfs_h_open_with_xdata (ctx->fs, obj,
+ O_RDONLY, dict);
+
+ if (fd == NULL) {
+ GF_ASSERT(fd != NULL);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "opening of file failed\n");
+ break;
+ }
- if(get_xattr_total_size(fd, &t_b, &k_s, &v_s, &num, dict) == -1) {
- if (t_b) free(t_b);
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "list of xattr of gfid %s failed\n", rd->gfid);
- goto read_out;
- }
- ri->work.data = GF_CALLOC((k_s + v_s) , sizeof(char), gf_mt_recon_private_t);
- get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
- ri->work.num = num;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished getattr for gfid %s \n",
- ri->rec.gfid);
- free(t_b);
- goto read_out;
+ if (get_xattr_total_size (fd, &t_b, &k_s, &v_s, &num,
+ dict) == -1) {
+ if (t_b) free(t_b);
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "list of xattr of gfid %s failed\n",
+ rd->gfid);
+ break;
+ }
+ ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
+ gf_mt_recon_private_t);
+ get_xattr(fd, t_b, ri->work.data, v_s, num, dict);
+ ri->work.num = num;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished getattr for gfid %s \n",
+ ri->rec.gfid);
+ free(t_b);
+ break;
- } else if ((rd->op == GF_FOP_FSETATTR) ||
- (rd->op == GF_FOP_SETATTR)) {
+ case GF_FOP_FSETATTR:
+ case GF_FOP_SETATTR:
+ //TBD - to get the actual attrbutes and fill
+ // mode, uid, gid, size, atime, mtime
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unimplemented fop %u\n", rd->op);
+ break;
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized fop %u\n", rd->op);
- //TBD - to get the actual attrbutes and fill
- // mode, uid, gid, size, atime, mtime
- }
-read_out:
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished recon read for gfid %s at offset %d for %d bytes \n",
- rd->gfid, rd->offset, rd->len);
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
- {
- dict_t * dict = NULL;
- // first_index always starts with 1 but records starts at 0.
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
+ }
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon read for gfid %s at offset %d for %d bytes \n",
+ rd->gfid, rd->offset, rd->len);
+ break;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "got recon commit for index %d that has gfid %s \n",
- wip, rd->gfid);
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- apply_record(ctx, ri, dict);
-commit_out:
- nsr_worker_log(this->name, GF_LOG_INFO,
- "finished recon commit for gfid %s \n",
- rd->gfid);
- break;
- }
- case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
- {
- dict_t * dict = NULL;
- dict = dict_new ();
- if (!dict) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "failed allocating for dictionary\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
- if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
- GF_ASSERT(0);
- nsr_worker_log(this->name, GF_LOG_ERROR,
- "error setting term in dict\n");
- goto commit_out;
- }
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_COMMIT:
+ // first_index always starts with 1 but records starts at 0.
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
- // Increment work index with the start index
- wip = work->index - (dr->workers[0].recon_info->first_index);
- ri = &(dr->workers[0].recon_info->records[wip]);
- rd = &(ri->rec);
- //fd = glfs_open(ctx->fs, rd->gfid, O_RDONLY); //TBD - using gfid
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got recon commit for index %d that has gfid %s \n",
+ wip, rd->gfid);
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ apply_record(ctx, ri, dict);
- glfs_fsync_with_xdata(fd, dict);
- break;
- }
- }
- return;
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "finished recon commit for gfid %s \n",
+ rd->gfid);
+ break;
+
+ case NSR_WORK_ID_SINGLE_RECONCILIATION_FLUSH:
+ dict = dict_new ();
+ if (!dict) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "failed allocating for dictionary\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_TERM_XATTR,ri->work.term)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+ if (dict_set_int32(dict,RECON_INDEX_XATTR,ri->work.index)) {
+ GF_ASSERT(0);
+ nsr_worker_log(this->name, GF_LOG_ERROR,
+ "error setting term in dict\n");
+ break;
+ }
+
+ // Increment work index with the start index
+ wip = work->index - (dr->workers[0].recon_info->first_index);
+ ri = &(dr->workers[0].recon_info->records[wip]);
+ rd = &(ri->rec);
+
+ glfs_fsync_with_xdata(fd, dict);
+ break;
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "unrecognized request id %u\n", work->req_id);
+ }
}
// thread for doing data work