summaryrefslogtreecommitdiffstats
path: root/xlators/cluster
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2014-03-25 18:37:16 +0000
committerJeff Darcy <jdarcy@redhat.com>2014-04-22 14:33:31 +0000
commit90366df60fa3b9b2915a4a8d804c2e95ae1948e7 (patch)
treecd75a682bcfb5b83d7d476551b748a6d0af1556b /xlators/cluster
parent66909ef9da737371395e63fd2557f118f6672446 (diff)
nsr: fix more reconciliation resource leaks
Also fixed GF_CALLOC calls to use proper memory types instead of always gf_mt_recon_private_t, so we that we can use state dumps to see what's leaking. This in turn required solving some very "interesting" problems to do with xlator/GFAPI mixing and THIS (which is used within GF_CALLOC). Change-Id: I3f928c9ac89600649bb3934664a3c4f6c43937e5 Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/cluster')
-rw-r--r--xlators/cluster/nsr-recon/src/recon_driver.c390
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.c12
-rw-r--r--xlators/cluster/nsr-recon/src/recon_xlator.h16
3 files changed, 237 insertions, 181 deletions
diff --git a/xlators/cluster/nsr-recon/src/recon_driver.c b/xlators/cluster/nsr-recon/src/recon_driver.c
index e0327f81d..8c7622a02 100644
--- a/xlators/cluster/nsr-recon/src/recon_driver.c
+++ b/xlators/cluster/nsr-recon/src/recon_driver.c
@@ -183,6 +183,19 @@ _nsr_worker_log (const char *func, int line, char *member,
#endif
/*
+ * Recon Driver Calloc
+ *
+ * We need this because all of this messing about with gfapi from within a
+ * translator keeps scrambling THIS (only one reason it's a terrible idea) and
+ * we need THIS to have a value that represents our initialization with our
+ * memory types.
+ *
+ * Note that the macro requires "this" to be defined in the current scope.
+ */
+
+#define RD_CALLOC(x,y,z) ({THIS = this; GF_CALLOC(x,y,z); })
+
+/*
* This function gets the size of all the extended attributes for a file.
* This is used so that caller knows how much to allocate for key-value storage.
*
@@ -425,9 +438,9 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
char *morph_name = NULL, *ptr = NULL;
priv = this->private;
- my_name = GF_CALLOC (1,
+ my_name = RD_CALLOC (1,
strlen (priv->replica_group_members[0]) + 1,
- gf_mt_recon_private_t);
+ gf_mt_recon_member_name_t);
strcpy (my_name, priv->replica_group_members[0]);
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -456,7 +469,8 @@ nsr_recon_start_work(nsr_per_node_worker_t *ctx,
return -1;
}
- morph_name = GF_CALLOC (1, strlen (my_name) + 1, gf_mt_recon_private_t);
+ morph_name = RD_CALLOC (1, strlen (my_name) + 1,
+ gf_mt_recon_member_name_t);
strcpy (morph_name, my_name);
ptr = strchr (morph_name, '/');
@@ -570,148 +584,152 @@ control_worker_func_0(nsr_per_node_worker_t *ctx,
{
unsigned int index = ctx->index;
nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
- xlator_t *this = ctx->driver_ctx->this;
- nsr_recon_private_t *priv = this->private;
nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
+ nsr_recon_private_t *priv = this->private;
ctx->is_control = _gf_true;
- switch (work->req_id){
- case NSR_WORK_ID_INI:
- {
- break;
- }
- case NSR_WORK_ID_FINI:
- {
- break;
- }
- case NSR_WORK_ID_GET_LAST_TERM_INFO:
- {
- nsr_recon_last_term_info_t lt;
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- // term is stuffed inside work->index. overloading.
- int32_t term = work->index;
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get last term info for node %d with current term %d\n",index, term);
-
- // TBD - handle errors
- // This is called by the leader after it gets the current term.
- // Makes searching easier.
- nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, &lt);
- recon_info->last_term = lt.last_term;
- recon_info->commited_ops = lt.commited_ops;
- recon_info->last_index = lt.last_index;
- recon_info->first_index = lt.first_index;
-
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
- recon_info->last_term, recon_info->commited_ops,
- recon_info->first_index, recon_info->last_index);
- break;
- }
- case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
- {
- nsr_recon_last_term_info_t lt;
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- // term is stuffed inside work->index. overloading.
- int32_t term = work->index;
+ switch (work->req_id) {
+ case NSR_WORK_ID_INI:
+ {
+ break;
+ }
+ case NSR_WORK_ID_FINI:
+ {
+ break;
+ }
+ case NSR_WORK_ID_GET_LAST_TERM_INFO:
+ {
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // term is stuffed inside work->index. overloading.
+ int32_t term = work->index;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get term info for node %d for term %d\n",index, term);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get last term info for node %d with current term %d\n",index, term);
- // TBD - handle errors
- nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, &lt);
+ // TBD - handle errors
+ // This is called by the leader after it gets the current term.
+ // Makes searching easier.
+ nsr_recon_libchangelog_get_last_term_info(this, priv->changelog_base_path, term, &lt);
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
- recon_info->last_term = lt.last_term;
- recon_info->commited_ops = lt.commited_ops;
- recon_info->last_index = lt.last_index;
- recon_info->first_index = lt.first_index;
- nsr_worker_log(this->name, GF_LOG_INFO,
- "out of get term info for term %d. got ops %d with first %d and last %d \n",
- recon_info->last_term, recon_info->commited_ops,
- recon_info->first_index, recon_info->last_index);
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get last term info with current term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+ break;
+ }
+ case NSR_WORK_ID_GET_GIVEN_TERM_INFO:
+ {
+ nsr_recon_last_term_info_t lt;
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // term is stuffed inside work->index. overloading.
+ int32_t term = work->index;
- break;
- }
- case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
- {
- // For local resolution, the main driver thread does it.
- // SO there is no way we can have this message for this node.
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get term info for node %d for term %d\n",index, term);
- nsr_worker_log(this->name, GF_LOG_INFO,
- "this message should not be sent \n");
- break;
- }
- case NSR_WORK_ID_RESOLUTION_DO_WORK:
- {
+ // TBD - handle errors
+ nsr_recon_libchangelog_get_this_term_info(this,priv->changelog_base_path, term, &lt);
- nsr_worker_log(this->name, GF_LOG_INFO,
- "this message should not be sent \n");
- ctx->result = -1;
- break;
- }
- case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
- {
- nsr_reconciliator_info_t *recon_info = rw->recon_info;
- // first_index and last_index at 0 indicates empty log.
- // For non empty log, the first_index always starts at 1.
- uint32_t num = (dr->workers[index].recon_info->last_index -
- dr->workers[index].recon_info->first_index + 1);
- nsr_recon_record_details_t *rd;
- uint32_t i=0;
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
- index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
-
-
- // TBD - handle buffer allocation errors
- rd = GF_CALLOC(num,
- sizeof(nsr_recon_record_details_t),
- gf_mt_recon_private_t);
- if (rd == NULL) {
- ctx->result = -1;
- return;
- }
-
- recon_info->records = GF_CALLOC(num,
- sizeof(nsr_reconciliation_record_t),
- gf_mt_recon_private_t);
- if (recon_info->records == NULL) {
- ctx->result = -1;
- return;
- }
-
- // TBD - handle errors
- if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
- recon_info->last_term,
- recon_info->first_index,
- recon_info->last_index,
- rd) == _gf_false) {
- ctx->result = -1;
- return;
- }
-
- // The above function writes into rd from 0 to (num -1)
- // We need to take care of this whenever we deal with records
- for (i=0; i < num; i++) {
- ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
- memcpy(&(recon_info->records[i].rec),
- &(rd[i]),
- sizeof(nsr_recon_record_details_t));
- }
-
- GF_FREE(rd);
-
- nsr_worker_log(this->name, GF_LOG_INFO,
- "got reconciliation window records for node %d for term %d \n",
- index, recon_info->last_term);
- break;
- }
- }
+ recon_info->last_term = lt.last_term;
+ recon_info->commited_ops = lt.commited_ops;
+ recon_info->last_index = lt.last_index;
+ recon_info->first_index = lt.first_index;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "out of get term info for term %d. got ops %d with first %d and last %d \n",
+ recon_info->last_term, recon_info->commited_ops,
+ recon_info->first_index, recon_info->last_index);
+
+ break;
+ }
+ case NSR_WORK_ID_RECONCILIATOR_DO_WORK:
+ {
+ // For local resolution, the main driver thread does it.
+ // SO there is no way we can have this message for this node.
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this message should not be sent \n");
+ break;
+ }
+ case NSR_WORK_ID_RESOLUTION_DO_WORK:
+ {
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "this message should not be sent \n");
+ ctx->result = -1;
+ break;
+ }
+ case NSR_WORK_ID_GET_RECONCILATION_WINDOW:
+ {
+ nsr_reconciliator_info_t *recon_info = rw->recon_info;
+ // first_index and last_index at 0 indicates empty log.
+ // For non empty log, the first_index always starts at 1.
+ uint32_t num = (dr->workers[index].recon_info->last_index -
+ dr->workers[index].recon_info->first_index + 1);
+ nsr_recon_record_details_t *rd;
+ uint32_t i=0;
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "trying to get reconciliation window records for node %d for term %d with first %d last %d\n",
+ index, recon_info->last_term, recon_info->first_index, recon_info->last_index);
+
+
+ // TBD - handle buffer allocation errors
+ rd = RD_CALLOC(num,
+ sizeof(nsr_recon_record_details_t),
+ gf_mt_recon_record_details_t);
+ if (rd == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ recon_info->records = RD_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_record_t);
+ if (recon_info->records == NULL) {
+ ctx->result = -1;
+ return;
+ }
+
+ // TBD - handle errors
+ if (nsr_recon_libchangelog_get_records(this, priv->changelog_base_path,
+ recon_info->last_term,
+ recon_info->first_index,
+ recon_info->last_index,
+ rd) == _gf_false) {
+ ctx->result = -1;
+ return;
+ }
+
+ // The above function writes into rd from 0 to (num -1)
+ // We need to take care of this whenever we deal with records
+ for (i=0; i < num; i++) {
+ ENDIAN_CONVERSION_RD(rd[i], _gf_true); //ntohl
+ memcpy(&(recon_info->records[i].rec),
+ &(rd[i]),
+ sizeof(nsr_recon_record_details_t));
+ }
+
+ GF_FREE(rd);
+
+ nsr_worker_log(this->name, GF_LOG_INFO,
+ "got reconciliation window records for node %d for term %d \n",
+ index, recon_info->last_term);
+ break;
+ }
+
+ default:
+ nsr_worker_log (this->name, GF_LOG_ERROR,
+ "bad req id %u", work->req_id);
+ }
return;
}
@@ -756,6 +774,7 @@ control_worker_main_0(nsr_per_node_worker_t *ctx)
nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
list_del_init (&work->list);
+ GF_FREE(work);
nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
}
@@ -896,6 +915,7 @@ control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work)
unsigned int index = ctx->index;
nsr_replica_worker_t *rw = &(ctx->driver_ctx->workers[index]);
nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
nsr_recon_log_info_t li;
nsr_reconciliator_info_t *recon_info = rw->recon_info;
uint32_t i = 0;
@@ -924,14 +944,14 @@ control_worker_get_window (nsr_per_node_worker_t *ctx, nsr_recon_work_t *work)
}
// then read
- rd = GF_CALLOC(num,
+ rd = RD_CALLOC(num,
sizeof(nsr_recon_record_details_t),
gf_mt_recon_private_t);
if (rd == NULL) {
ctx->result = -1;
return;
}
- recon_info->records = GF_CALLOC(num,
+ recon_info->records = RD_CALLOC(num,
sizeof(nsr_reconciliation_record_t),
gf_mt_recon_private_t);
if (recon_info->records == NULL) {
@@ -1138,6 +1158,7 @@ control_worker_main(nsr_per_node_worker_t *ctx)
}
nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
list_del_init (&work->list);
+ GF_FREE(work);
nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
}
@@ -1217,9 +1238,10 @@ nsr_recon_driver_set_role(nsr_recon_driver_ctx_t *ctx,
uint32_t term)
{
nsr_role_work_t *rw;
+ xlator_t *this = ctx->this;
nsr_driver_log(this->name, GF_LOG_INFO, "set role called \n");
- rw = GF_CALLOC(1, sizeof (nsr_role_work_t), 0);
+ rw = RD_CALLOC(1, sizeof (nsr_role_work_t), gf_mt_recon_role_work_t);
memcpy(&rw->role, rr, sizeof(nsr_recon_role_t));
rw->term = term;
INIT_LIST_HEAD(&(rw->list));
@@ -1255,6 +1277,7 @@ nsr_recon_driver_get_role(int32_t *status,
uint8_t i=0, j=0;
nsr_recon_role_t *rr = &(rw->role);
nsr_reconciliator_info_t *tmp;
+ xlator_t *this = ctx->this;
// First make all the threads uninitialise
for (i = 0; i < ctx->replica_group_size; i++) {
@@ -1269,8 +1292,8 @@ nsr_recon_driver_get_role(int32_t *status,
case joiner:
// First set info this node
- tmp = GF_CALLOC (1, sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
+ tmp = RD_CALLOC (1, sizeof (nsr_reconciliator_info_t),
+ gf_mt_recon_reconciliator_info_t);
if (!tmp) {
*status = -1;
return 0;
@@ -1304,9 +1327,9 @@ nsr_recon_driver_get_role(int32_t *status,
// Allocate this here. This will get later
// filled when the leader tries to get last term
// information from all the nodes
- tmp = GF_CALLOC (1,
+ tmp = RD_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_reconciliator_info_t);
if (!tmp) {
*status = -1;
return 0;
@@ -1346,9 +1369,9 @@ nsr_recon_driver_get_role(int32_t *status,
nsr_driver_log(this->name, GF_LOG_INFO,
"nsr_recon_driver_get_role: this as reconciliator. found other server %s\n",
ctx->workers[j].name);
- tmp = GF_CALLOC (1,
+ tmp = RD_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_reconciliator_info_t);
if (!tmp) {
*status = -1;
return 0;
@@ -1384,9 +1407,9 @@ nsr_recon_driver_get_role(int32_t *status,
nsr_driver_log(this->name, GF_LOG_INFO,
"nsr_recon_driver_get_role: this as resolutor. found other server %s as reconciliator\n",
ctx->workers[j].name);
- tmp = GF_CALLOC (1,
+ tmp = RD_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_reconciliator_info_t);
if (!tmp) {
*status = -1;
return 0;
@@ -1404,9 +1427,9 @@ nsr_recon_driver_get_role(int32_t *status,
GF_ASSERT(ctx->reconciliator_index != 0);
break;
}
- tmp = GF_CALLOC (1,
+ tmp = RD_CALLOC (1,
sizeof (nsr_reconciliator_info_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_reconciliator_info_t);
if (!tmp) {
*status = -1;
return 0;
@@ -1449,14 +1472,15 @@ compute_resolution_work(nsr_recon_driver_ctx_t *ctx,
{
uint32_t i=0;
uint32_t num = (my_info->last_index - my_info->first_index + 1);
+ xlator_t *this = ctx->this;
if (invalidate) {
if (my_info->records) {
GF_FREE(my_info->records);
}
- my_info->records = GF_CALLOC(num,
- sizeof(nsr_reconciliation_record_t),
- gf_mt_recon_private_t);
+ my_info->records = RD_CALLOC(num,
+ sizeof(nsr_reconciliation_record_t),
+ gf_mt_recon_record_t);
}
for (i=0; i < num; i++) {
@@ -1524,6 +1548,7 @@ apply_record(nsr_per_node_worker_t *ctx,
{
struct glfs_fd *fd = NULL;
struct glfs_object *obj = NULL;
+ struct glfs_object *to_obj = NULL;
gf_boolean_t retval = _gf_false;
if (ri->rec.op == GF_FOP_WRITE) {
@@ -1784,8 +1809,6 @@ apply_record(nsr_per_node_worker_t *ctx,
} else if (ri->rec.op == GF_FOP_LINK) {
- struct glfs_object *to_obj = NULL;
-
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing hard link for file %s to file %s \n",
ri->rec.entry, ri->rec.gfid);
@@ -1810,8 +1833,6 @@ apply_record(nsr_per_node_worker_t *ctx,
} else if (ri->rec.op == GF_FOP_RENAME) {
- struct glfs_object *to_obj = NULL;
-
nsr_worker_log(this->name, GF_LOG_INFO,
"Doing rename for file %s to file %s \n",
ri->rec.entry, ri->rec.newloc);
@@ -1910,6 +1931,13 @@ err:
*/
glfs_h_close(obj);
}
+ if (to_obj) {
+ /*
+ * AFAICT fd operations do not borrow this reference, so we
+ * still need to drop it ourselves.
+ */
+ glfs_h_close(to_obj);
+ }
return retval;
}
@@ -1947,6 +1975,7 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_recon_work_t *work)
{
nsr_recon_driver_ctx_t *dr = ctx->driver_ctx;
+ xlator_t *this = dr->this;
nsr_reconciliation_record_t *ri = NULL;
nsr_recon_record_details_t *rd = NULL;
int wip = 0;
@@ -1958,7 +1987,6 @@ data_worker_func(nsr_per_node_worker_t *ctx,
char *t_b= NULL;
uint32_t num=0;
-
switch (work->req_id){
case NSR_WORK_ID_INI:
nsr_worker_log(this->name, GF_LOG_INFO,
@@ -2050,8 +2078,8 @@ data_worker_func(nsr_per_node_worker_t *ctx,
break;
}
- ri->work.data = GF_CALLOC (rd->len , sizeof(char),
- gf_mt_recon_private_t);
+ ri->work.data = RD_CALLOC (rd->len , sizeof(char),
+ gf_mt_recon_work_data_t);
if (glfs_read_with_xdata (fd, ri->work.data, rd->len,
0, dict) != rd->len) {
nsr_worker_log(this->name, GF_LOG_ERROR,
@@ -2059,9 +2087,6 @@ data_worker_func(nsr_per_node_worker_t *ctx,
rd->offset, rd->len);
break;
}
-
- glfs_close_with_xdata(fd, dict);
- glfs_h_close(obj);
break;
case GF_FOP_FTRUNCATE:
@@ -2127,8 +2152,8 @@ data_worker_func(nsr_per_node_worker_t *ctx,
rd->gfid);
break;
}
- ri->work.data = GF_CALLOC ((k_s + v_s) , sizeof(char),
- gf_mt_recon_private_t);
+ ri->work.data = RD_CALLOC ((k_s + v_s) , sizeof(char),
+ gf_mt_recon_work_data_t);
if (get_xattr(fd, t_b, ri->work.data, v_s, num, dict) == -1) {
nsr_worker_log(this->name, GF_LOG_ERROR,
"get xattr of gfid %s failed\n", rd->gfid);
@@ -2229,6 +2254,16 @@ data_worker_func(nsr_per_node_worker_t *ctx,
nsr_worker_log (this->name, GF_LOG_ERROR,
"unrecognized request id %u\n", work->req_id);
}
+
+ if (fd) {
+ glfs_close_with_xdata(fd, dict);
+ }
+ if (obj) {
+ glfs_h_close(obj);
+ }
+ if (dict) {
+ dict_unref(dict);
+ }
}
// thread for doing data work
@@ -2261,6 +2296,7 @@ data_worker_main(nsr_per_node_worker_t *ctx)
}
nsr_worker_log(this->name, GF_LOG_INFO,"deleting work item\n");
list_del_init (&work->list);
+ GF_FREE(work);
nsr_worker_log(this->name, GF_LOG_INFO,"finished deleting work item\n");
}
@@ -2270,13 +2306,16 @@ data_worker_main(nsr_per_node_worker_t *ctx)
//make recon work
static void
-recon_make_work(nsr_recon_work_t **work,
+recon_make_work(nsr_recon_driver_ctx_t *ctx,
+ nsr_recon_work_t **work,
nsr_recon_work_req_id_t req_id,
int32_t i)
{
+ xlator_t *this = ctx->this;
+
// TBD - change this to get from a static pool
// This cannot fail
- (*work) = GF_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_private_t);
+ (*work) = RD_CALLOC (1, sizeof (nsr_recon_work_t), gf_mt_recon_work_t);
(*work)->req_id = req_id;
(*work)->index = i;
(*work)->in_use = _gf_true;
@@ -2321,10 +2360,10 @@ create_worker_threads(nsr_recon_private_t *priv,
{
uint32_t i;
nsr_per_node_worker_t *worker = w;
-
+ xlator_t *this = ctx->this;
for (i=0; i < num; i++) {
- worker->id = GF_CALLOC(1, 10, gf_mt_recon_private_t);
+ worker->id = RD_CALLOC(1, 10, gf_mt_recon_id_t);
if (!worker->id) {
nsr_driver_log (priv->this->name, GF_LOG_ERROR, "memory allocation error \n");
return _gf_false;
@@ -2383,7 +2422,7 @@ send_and_wait(int32_t *result,
if (ctx->mode == NSR_SEQ) {
for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
- recon_make_work(&work, id, misc);
+ recon_make_work(ctx,&work, id, misc);
if (q == NSR_RECON_QUEUE_TO_CONTROL) {
if (i == 0)
control_worker_func_0(ctx->workers[0].control_worker, work);
@@ -2392,6 +2431,7 @@ send_and_wait(int32_t *result,
} else {
data_worker_func(ctx->workers[i].data_worker, work);
}
+ GF_FREE(work);
}
}
goto out;
@@ -2399,7 +2439,7 @@ send_and_wait(int32_t *result,
for (i=0; i < num; i++) {
if ((bm & (1 << i)) && ctx->workers[i].in_use) {
- recon_make_work(&work, id, misc);
+ recon_make_work(ctx,&work, id, misc);
atomic_inc(&(ctx->outstanding));
recon_queue_to_worker(ctx, work, i, q);
}
@@ -2921,7 +2961,7 @@ nsr_reconciliation_driver(void *arg)
driver_ctx = &priv->driver_thread_context;
(*driver_ctx) = GF_CALLOC (1,
sizeof (nsr_recon_driver_ctx_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_driver_ctx_t);
if (!driver_ctx) {
gf_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
return NULL;
@@ -2941,9 +2981,9 @@ nsr_reconciliation_driver(void *arg)
}
INIT_LIST_HEAD(&(ctx->role_head.list));
- ctx->workers = GF_CALLOC (replica_group_size,
+ ctx->workers = RD_CALLOC (replica_group_size,
sizeof(nsr_replica_worker_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_worker_t);
if (!ctx->workers) {
nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
return NULL;
@@ -2952,17 +2992,17 @@ nsr_reconciliation_driver(void *arg)
strcpy(ctx->workers[i].name, priv->replica_group_members[i]);
}
- control_s = GF_CALLOC (replica_group_size,
+ control_s = RD_CALLOC (replica_group_size,
sizeof(nsr_per_node_worker_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_per_node_worker_t);
if (!control_s) {
nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
return NULL;
}
- data_s = GF_CALLOC (replica_group_size,
+ data_s = RD_CALLOC (replica_group_size,
sizeof(nsr_per_node_worker_t),
- gf_mt_recon_private_t);
+ gf_mt_recon_per_node_worker_t);
if (!data_s) {
nsr_driver_log (this->name, GF_LOG_ERROR, "memory allocation error \n");
return NULL;
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.c b/xlators/cluster/nsr-recon/src/recon_xlator.c
index da389fccf..ffbf74296 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.c
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.c
@@ -300,7 +300,8 @@ gf_boolean_t nsr_recon_libchangelog_get_records(xlator_t *this, char *bp, int32_
"libchangelog_get_records called for term %d index from %d to %d \n",
term, first, last );
- orig = rb = GF_CALLOC(128, ((last - first) + 1), gf_mt_recon_private_t);
+ orig = rb = GF_CALLOC(128, ((last - first) + 1),
+ gf_mt_recon_changelog_buf_t);
sprintf(path,"%s/%s%d",bp,"TERM.",term);
fd = open(path, O_RDONLY);
@@ -558,7 +559,7 @@ nsr_recon_open (call_frame_t *frame, xlator_t *this,
nsr_recon_fd_t *rfd = NULL;
recon_main_log (this->name, GF_LOG_INFO, "nsr_recon_open called for path %s \n",loc->path );
- rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_private_t);
+ rfd = GF_CALLOC (1, sizeof (*rfd), gf_mt_recon_fd_t);
if (!rfd) {
op_ret = -1;
op_errno = ENOMEM;
@@ -896,10 +897,10 @@ init (xlator_t *this)
priv->replica_group_members = GF_CALLOC (priv->replica_group_size,
sizeof(char *),
- gf_mt_recon_private_t);
+ gf_mt_recon_members_list_t);
priv->replica_group_members[0] = GF_CALLOC (1,
strlen(local),
- gf_mt_recon_private_t);
+ gf_mt_recon_member_name_t);
if (!priv->replica_group_members || !(priv->replica_group_members[0])) {
gf_log (this->name, GF_LOG_ERROR,
"str allocation error\n");
@@ -912,7 +913,8 @@ init (xlator_t *this)
member = strtok(members, ",");
else
member = strtok(NULL, ",");
- priv->replica_group_members[i] = GF_CALLOC (1, strlen(member) + 1, gf_mt_recon_private_t);
+ priv->replica_group_members[i] = GF_CALLOC (1,
+ strlen(member) + 1, gf_mt_recon_member_name_t);
if (!priv->replica_group_members[i]) {
gf_log (this->name, GF_LOG_ERROR,
"str allocation error\n");
diff --git a/xlators/cluster/nsr-recon/src/recon_xlator.h b/xlators/cluster/nsr-recon/src/recon_xlator.h
index 57c44cca2..d9692a632 100644
--- a/xlators/cluster/nsr-recon/src/recon_xlator.h
+++ b/xlators/cluster/nsr-recon/src/recon_xlator.h
@@ -15,7 +15,21 @@
#include <pthread.h>
enum gf_dht_mem_types_ {
- gf_mt_recon_private_t = gf_common_mt_end + 1,
+ gf_mt_recon_changelog_buf_t = gf_common_mt_end + 1,
+ gf_mt_recon_driver_ctx_t,
+ gf_mt_recon_fd_t,
+ gf_mt_recon_id_t,
+ gf_mt_recon_member_name_t,
+ gf_mt_recon_members_list_t,
+ gf_mt_recon_per_node_worker_t,
+ gf_mt_recon_private_t,
+ gf_mt_recon_reconciliator_info_t,
+ gf_mt_recon_record_t,
+ gf_mt_recon_record_details_t,
+ gf_mt_recon_role_work_t,
+ gf_mt_recon_work_t,
+ gf_mt_recon_work_data_t,
+ gf_mt_recon_worker_t,
gf_mt_recon_end,
};