/* Copyright (c) 2008-2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include #include #include #include #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include "afr-common.c" #include "defaults.c" #include "glusterfs.h" #include "pump.h" static int afr_set_dict_gfid (dict_t *dict, uuid_t gfid) { int ret = 0; uuid_t *pgfid = NULL; GF_ASSERT (gfid); pgfid = GF_CALLOC (1, sizeof (uuid_t), gf_common_mt_char); if (!pgfid) { ret = -1; goto out; } uuid_copy (*pgfid, gfid); ret = dict_set_dynptr (dict, "gfid-req", pgfid, sizeof (uuid_t)); if (ret) gf_log (THIS->name, GF_LOG_ERROR, "gfid set failed"); out: if (ret && pgfid) GF_FREE (pgfid); return ret; } static int afr_set_root_gfid (dict_t *dict) { uuid_t gfid; int ret = 0; memset (gfid, 0, 16); gfid[15] = 1; ret = afr_set_dict_gfid (dict, gfid); return ret; } static int afr_build_child_loc (xlator_t *this, loc_t *child, loc_t *parent, char *name) { int ret = -1; uuid_t pargfid = {0}; if (!child) goto out; if (!uuid_is_null (parent->inode->gfid)) uuid_copy (pargfid, parent->inode->gfid); else if (!uuid_is_null (parent->gfid)) uuid_copy (pargfid, parent->gfid); if (uuid_is_null (pargfid)) goto out; if (strcmp (parent->path, "/") == 0) ret = gf_asprintf ((char **)&child->path, "/%s", name); else ret = gf_asprintf ((char **)&child->path, "%s/%s", parent->path, name); if (-1 == ret) { gf_log (this->name, GF_LOG_ERROR, "asprintf failed while setting child path"); } child->name = strrchr (child->path, '/'); if (child->name) child->name++; child->parent = inode_ref (parent->inode); child->inode = inode_new (parent->inode->table); uuid_copy (child->pargfid, pargfid); if (!child->inode) { ret = -1; goto out; } ret = 0; out: if ((ret == -1) && child) loc_wipe (child); return ret; } static void afr_build_root_loc (xlator_t *this, loc_t *loc) { afr_private_t *priv = NULL; priv = this->private; loc->path = gf_strdup ("/"); loc->name = ""; loc->inode = inode_ref (priv->root_inode); uuid_copy (loc->gfid, loc->inode->gfid); } static void afr_update_loc_gfids (loc_t *loc, struct iatt *buf, struct iatt *postparent) { GF_ASSERT (loc); GF_ASSERT (buf); uuid_copy (loc->gfid, buf->ia_gfid); if (postparent) uuid_copy (loc->pargfid, postparent->ia_gfid); } static uint64_t pump_pid = 0; static inline void pump_fill_loc_info (loc_t *loc, struct iatt *iatt, struct iatt *parent) { afr_update_loc_gfids (loc, iatt, parent); uuid_copy (loc->inode->gfid, iatt->ia_gfid); } static int pump_mark_start_pending (xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; pump_priv = priv->pump_private; pump_priv->pump_start_pending = 1; return 0; } static int is_pump_start_pending (xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; pump_priv = priv->pump_private; return (pump_priv->pump_start_pending); } static int pump_remove_start_pending (xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; pump_priv = priv->pump_private; pump_priv->pump_start_pending = 0; return 0; } static pump_state_t pump_get_state () { xlator_t *this = NULL; afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; pump_state_t ret; this = THIS; priv = this->private; pump_priv = priv->pump_private; LOCK (&pump_priv->pump_state_lock); { ret = pump_priv->pump_state; } UNLOCK (&pump_priv->pump_state_lock); return ret; } int pump_change_state (xlator_t *this, pump_state_t state) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; pump_state_t state_old; pump_state_t state_new; priv = this->private; pump_priv = priv->pump_private; GF_ASSERT (pump_priv); LOCK (&pump_priv->pump_state_lock); { state_old = pump_priv->pump_state; state_new = state; pump_priv->pump_state = state; } UNLOCK (&pump_priv->pump_state_lock); gf_log (this->name, GF_LOG_DEBUG, "Pump changing state from %d to %d", state_old, state_new); return 0; } static int pump_set_resume_path (xlator_t *this, const char *path) { int ret = 0; afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; pump_priv = priv->pump_private; GF_ASSERT (pump_priv); LOCK (&pump_priv->resume_path_lock); { strncpy (pump_priv->resume_path, path, strlen (path) + 1); } UNLOCK (&pump_priv->resume_path_lock); return ret; } static int pump_save_path (xlator_t *this, const char *path) { afr_private_t *priv = NULL; pump_state_t state; dict_t *dict = NULL; loc_t loc = {0}; int dict_ret = 0; int ret = -1; state = pump_get_state (); if (state == PUMP_STATE_RESUME) return 0; priv = this->private; GF_ASSERT (priv->root_inode); afr_build_root_loc (this, &loc); dict = dict_new (); dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); if (dict_ret) gf_log (this->name, GF_LOG_WARNING, "%s: failed to set the key %s", path, PUMP_PATH); ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); if (ret < 0) { gf_log (this->name, GF_LOG_INFO, "setxattr failed - could not save path=%s", path); } else { gf_log (this->name, GF_LOG_DEBUG, "setxattr succeeded - saved path=%s", path); } dict_unref (dict); loc_wipe (&loc); return 0; } static int pump_check_and_update_status (xlator_t *this) { pump_state_t state; int ret = -1; state = pump_get_state (); switch (state) { case PUMP_STATE_RESUME: case PUMP_STATE_RUNNING: { ret = 0; break; } case PUMP_STATE_PAUSE: { ret = -1; break; } case PUMP_STATE_ABORT: { pump_save_path (this, "/"); ret = -1; break; } default: { gf_log (this->name, GF_LOG_DEBUG, "Unknown pump state"); ret = -1; break; } } return ret; } static const char * pump_get_resume_path (xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; const char *resume_path = NULL; priv = this->private; pump_priv = priv->pump_private; resume_path = pump_priv->resume_path; return resume_path; } static int pump_update_resume_state (xlator_t *this, const char *path) { pump_state_t state; const char *resume_path = NULL; state = pump_get_state (); if (state == PUMP_STATE_RESUME) { resume_path = pump_get_resume_path (this); if (strcmp (resume_path, "/") == 0) { gf_log (this->name, GF_LOG_DEBUG, "Reached the resume path (/). Proceeding to change state" " to running"); pump_change_state (this, PUMP_STATE_RUNNING); } else if (strcmp (resume_path, path) == 0) { gf_log (this->name, GF_LOG_DEBUG, "Reached the resume path. Proceeding to change state" " to running"); pump_change_state (this, PUMP_STATE_RUNNING); } else { gf_log (this->name, GF_LOG_DEBUG, "Not yet hit the resume path:res-path=%s,path=%s", resume_path, path); } } return 0; } static gf_boolean_t is_pump_traversal_allowed (xlator_t *this, const char *path) { pump_state_t state; const char *resume_path = NULL; gf_boolean_t ret = _gf_true; state = pump_get_state (); if (state == PUMP_STATE_RESUME) { resume_path = pump_get_resume_path (this); if (strstr (resume_path, path)) { gf_log (this->name, GF_LOG_DEBUG, "On the right path to resumption path"); ret = _gf_true; } else { gf_log (this->name, GF_LOG_DEBUG, "Not the right path to resuming=> ignoring traverse"); ret = _gf_false; } } return ret; } static int pump_save_file_stats (xlator_t *this, const char *path) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; pump_priv = priv->pump_private; LOCK (&pump_priv->resume_path_lock); { pump_priv->number_files_pumped++; strncpy (pump_priv->current_file, path, PATH_MAX); } UNLOCK (&pump_priv->resume_path_lock); return 0; } static int gf_pump_traverse_directory (loc_t *loc) { xlator_t *this = NULL; fd_t *fd = NULL; off_t offset = 0; loc_t entry_loc = {0}; gf_dirent_t *entry = NULL; gf_dirent_t *tmp = NULL; gf_dirent_t entries; struct iatt iatt = {0}; struct iatt parent = {0}; dict_t *xattr_rsp = NULL; int ret = 0; gf_boolean_t is_directory_empty = _gf_true; gf_boolean_t free_entries = _gf_false; INIT_LIST_HEAD (&entries.list); this = THIS; GF_ASSERT (loc->inode); fd = fd_create (loc->inode, pump_pid); if (!fd) { gf_log (this->name, GF_LOG_ERROR, "Failed to create fd for %s", loc->path); goto out; } ret = syncop_opendir (this, loc, fd); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "opendir failed on %s", loc->path); goto out; } gf_log (this->name, GF_LOG_TRACE, "pump opendir on %s returned=%d", loc->path, ret); while (syncop_readdirp (this, fd, 131072, offset, NULL, &entries)) { free_entries = _gf_true; if (list_empty (&entries.list)) { gf_log (this->name, GF_LOG_TRACE, "no more entries in directory"); goto out; } list_for_each_entry_safe (entry, tmp, &entries.list, list) { gf_log (this->name, GF_LOG_DEBUG, "found readdir entry=%s", entry->d_name); offset = entry->d_off; if (uuid_is_null (entry->d_stat.ia_gfid)) { gf_log (this->name, GF_LOG_WARNING, "%s/%s: No " "gfid present skipping", loc->path, entry->d_name); continue; } loc_wipe (&entry_loc); ret = afr_build_child_loc (this, &entry_loc, loc, entry->d_name); if (ret) goto out; if ((strcmp (entry->d_name, ".") == 0) || (strcmp (entry->d_name, "..") == 0)) continue; is_directory_empty = _gf_false; gf_log (this->name, GF_LOG_DEBUG, "lookup %s => %"PRId64, entry_loc.path, iatt.ia_ino); ret = syncop_lookup (this, &entry_loc, NULL, &iatt, &xattr_rsp, &parent); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s: lookup failed", entry_loc.path); continue; } ret = afr_selfheal_name (this, loc->gfid, entry->d_name, NULL); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s: name self-heal failed (%s/%s)", entry_loc.path, uuid_utoa (loc->gfid), entry->d_name); continue; } ret = afr_selfheal (this, iatt.ia_gfid); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s: self-heal failed (%s)", entry_loc.path, uuid_utoa (iatt.ia_gfid)); continue; } pump_fill_loc_info (&entry_loc, &iatt, &parent); pump_update_resume_state (this, entry_loc.path); pump_save_path (this, entry_loc.path); pump_save_file_stats (this, entry_loc.path); ret = pump_check_and_update_status (this); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Pump beginning to exit out"); goto out; } if (IA_ISDIR (iatt.ia_type)) { if (is_pump_traversal_allowed (this, entry_loc.path)) { gf_log (this->name, GF_LOG_TRACE, "entering dir=%s", entry->d_name); gf_pump_traverse_directory (&entry_loc); } } } gf_dirent_free (&entries); free_entries = _gf_false; gf_log (this->name, GF_LOG_TRACE, "offset incremented to %d", (int32_t ) offset); } ret = syncop_close (fd); if (ret < 0) gf_log (this->name, GF_LOG_DEBUG, "closing the fd failed"); if (is_directory_empty && (strcmp (loc->path, "/") == 0)) { pump_change_state (this, PUMP_STATE_RUNNING); gf_log (this->name, GF_LOG_INFO, "Empty source brick. " "Nothing to be done."); } out: if (entry_loc.path) loc_wipe (&entry_loc); if (free_entries) gf_dirent_free (&entries); return 0; } static int pump_update_resume_path (xlator_t *this) { const char *resume_path = NULL; resume_path = pump_get_resume_path (this); if (resume_path) { gf_log (this->name, GF_LOG_DEBUG, "Found a path to resume from: %s", resume_path); }else { gf_log (this->name, GF_LOG_DEBUG, "Did not find a path=> setting to '/'"); pump_set_resume_path (this, "/"); } pump_change_state (this, PUMP_STATE_RESUME); return 0; } static int32_t pump_xattr_cleaner (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { afr_private_t *priv = NULL; loc_t loc = {0}; int i = 0; int ret = 0; int source = 0; int sink = 1; priv = this->private; afr_build_root_loc (this, &loc); ret = syncop_removexattr (priv->children[source], &loc, PUMP_PATH, 0); ret = syncop_removexattr (priv->children[sink], &loc, PUMP_SINK_COMPLETE, 0); for (i = 0; i < priv->child_count; i++) { ret = syncop_removexattr (priv->children[i], &loc, PUMP_SOURCE_COMPLETE, 0); if (ret) { gf_log (this->name, GF_LOG_DEBUG, "removexattr " "failed with %s", strerror (-ret)); } } loc_wipe (&loc); return pump_command_reply (frame, this); } static int pump_complete_migration (xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; dict_t *dict = NULL; pump_state_t state; loc_t loc = {0}; int dict_ret = 0; int ret = -1; priv = this->private; pump_priv = priv->pump_private; GF_ASSERT (priv->root_inode); afr_build_root_loc (this, &loc); dict = dict_new (); state = pump_get_state (); if (state == PUMP_STATE_RUNNING) { gf_log (this->name, GF_LOG_DEBUG, "Pump finished pumping"); pump_priv->pump_finished = _gf_true; dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); if (dict_ret) gf_log (this->name, GF_LOG_WARNING, "%s: failed to set the key %s", loc.path, PUMP_SOURCE_COMPLETE); ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "setxattr failed - while notifying source complete"); } dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); if (dict_ret) gf_log (this->name, GF_LOG_WARNING, "%s: failed to set the key %s", loc.path, PUMP_SINK_COMPLETE); ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "setxattr failed - while notifying sink complete"); } pump_save_path (this, "/"); } else if (state == PUMP_STATE_ABORT) { gf_log (this->name, GF_LOG_DEBUG, "Starting cleanup " "of pump internal xattrs"); call_resume (pump_priv->cleaner); } loc_wipe (&loc); return 0; } static int pump_lookup_sink (loc_t *loc) { xlator_t *this = NULL; struct iatt iatt, parent; dict_t *xattr_rsp; dict_t *xattr_req = NULL; int ret = 0; this = THIS; xattr_req = dict_new (); ret = afr_set_root_gfid (xattr_req); if (ret) goto out; ret = syncop_lookup (PUMP_SINK_CHILD (this), loc, xattr_req, &iatt, &xattr_rsp, &parent); if (ret) { gf_log (this->name, GF_LOG_DEBUG, "Lookup on sink child failed"); ret = -1; goto out; } out: if (xattr_req) dict_unref (xattr_req); return ret; } static int pump_task (void *data) { xlator_t *this = NULL; afr_private_t *priv = NULL; loc_t loc = {0}; struct iatt iatt, parent; dict_t *xattr_rsp = NULL; dict_t *xattr_req = NULL; int ret = -1; this = THIS; priv = this->private; GF_ASSERT (priv->root_inode); afr_build_root_loc (this, &loc); xattr_req = dict_new (); if (!xattr_req) { gf_log (this->name, GF_LOG_DEBUG, "Out of memory"); ret = -1; goto out; } afr_set_root_gfid (xattr_req); ret = syncop_lookup (this, &loc, xattr_req, &iatt, &xattr_rsp, &parent); gf_log (this->name, GF_LOG_TRACE, "lookup: path=%s gfid=%s", loc.path, uuid_utoa (loc.inode->gfid)); ret = pump_check_and_update_status (this); if (ret < 0) { goto out; } pump_update_resume_path (this); afr_set_root_gfid (xattr_req); ret = pump_lookup_sink (&loc); if (ret) { pump_update_resume_path (this); goto out; } gf_pump_traverse_directory (&loc); pump_complete_migration (this); out: if (xattr_req) dict_unref (xattr_req); loc_wipe (&loc); return 0; } static int pump_task_completion (int ret, call_frame_t *sync_frame, void *data) { xlator_t *this = NULL; afr_private_t *priv = NULL; this = THIS; priv = this->private; inode_unref (priv->root_inode); STACK_DESTROY (sync_frame->root); gf_log (this->name, GF_LOG_DEBUG, "Pump xlator exiting"); return 0; } int pump_start (call_frame_t *pump_frame, xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; int ret = -1; priv = this->private; pump_priv = priv->pump_private; afr_set_lk_owner (pump_frame, this, pump_frame->root); pump_pid = (uint64_t) (unsigned long)pump_frame->root; ret = synctask_new (pump_priv->env, pump_task, pump_task_completion, pump_frame, NULL); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "starting pump failed"); pump_change_state (this, PUMP_STATE_ABORT); goto out; } gf_log (this->name, GF_LOG_DEBUG, "setting pump as started lk_owner: %s %"PRIu64, lkowner_utoa (&pump_frame->root->lk_owner), pump_pid); priv->use_afr_in_pump = 1; out: return ret; } static int pump_start_synctask (xlator_t *this) { call_frame_t *frame = NULL; int ret = 0; frame = create_frame (this, this->ctx->pool); if (!frame) { gf_log (this->name, GF_LOG_ERROR, "Out of memory"); ret = -1; goto out; } pump_change_state (this, PUMP_STATE_RUNNING); ret = pump_start (frame, this); out: return ret; } int32_t pump_cmd_start_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { call_frame_t *prev = NULL; afr_local_t *local = NULL; int ret = 0; local = frame->local; if (op_ret < 0) { gf_log (this->name, GF_LOG_ERROR, "Could not initiate destination " "brick connect"); ret = op_ret; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Successfully initiated destination " "brick connect"); pump_mark_start_pending (this); /* send the PARENT_UP as pump is ready now */ prev = cookie; if (prev && prev->this) prev->this->notify (prev->this, GF_EVENT_PARENT_UP, this); out: local->op_ret = ret; pump_command_reply (frame, this); return 0; } static int pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) { afr_local_t *local = NULL; afr_private_t *priv = NULL; dict_t *dict = NULL; data_t *data = NULL; char *clnt_cmd = NULL; loc_t loc = {0}; int ret = 0; priv = this->private; local = frame->local; GF_ASSERT (priv->root_inode); afr_build_root_loc (this, &loc); data = data_ref (dict_get (local->dict, RB_PUMP_CMD_START)); if (!data) { ret = -1; gf_log (this->name, GF_LOG_ERROR, "Could not get destination brick value"); goto out; } dict = dict_new (); if (!dict) { ret = -1; goto out; } clnt_cmd = GF_CALLOC (1, data->len+1, gf_common_mt_char); if (!clnt_cmd) { ret = -1; goto out; } memcpy (clnt_cmd, data->data, data->len); clnt_cmd[data->len] = '\0'; gf_log (this->name, GF_LOG_DEBUG, "Got destination brick %s\n", clnt_cmd); ret = dict_set_dynstr (dict, CLIENT_CMD_CONNECT, clnt_cmd); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, "Could not inititiate destination brick " "connect"); goto out; } STACK_WIND (frame, pump_cmd_start_setxattr_cbk, PUMP_SINK_CHILD(this), PUMP_SINK_CHILD(this)->fops->setxattr, &loc, dict, 0, NULL); ret = 0; out: if (dict) dict_unref (dict); if (data) data_unref (data); if (ret && clnt_cmd) GF_FREE (clnt_cmd); loc_wipe (&loc); return ret; } static int is_pump_aborted (xlator_t *this) { pump_state_t state; state = pump_get_state (); return ((state == PUMP_STATE_ABORT)); } int32_t pump_cmd_start_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata) { afr_local_t *local = NULL; char *path = NULL; pump_state_t state; int ret = 0; int need_unwind = 0; int dict_ret = -1; local = frame->local; if (op_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "getxattr failed - changing pump " "state to RUNNING with '/'"); path = "/"; ret = op_ret; } else { gf_log (this->name, GF_LOG_TRACE, "getxattr succeeded"); dict_ret = dict_get_str (dict, PUMP_PATH, &path); if (dict_ret < 0) path = "/"; } state = pump_get_state (); if ((state == PUMP_STATE_RUNNING) || (state == PUMP_STATE_RESUME)) { gf_log (this->name, GF_LOG_ERROR, "Pump is already started"); ret = -1; goto out; } pump_set_resume_path (this, path); if (is_pump_aborted (this)) /* We're re-starting pump afresh */ ret = pump_initiate_sink_connect (frame, this); else { /* We're re-starting pump from a previous pause */ gf_log (this->name, GF_LOG_DEBUG, "about to start synctask"); ret = pump_start_synctask (this); need_unwind = 1; } out: if ((ret < 0) || (need_unwind == 1)) { local->op_ret = ret; pump_command_reply (frame, this); } return 0; } int pump_execute_status (call_frame_t *frame, xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; uint64_t number_files = 0; char filename[PATH_MAX]; char summary[PATH_MAX+256]; char *dict_str = NULL; int32_t op_ret = 0; int32_t op_errno = 0; dict_t *dict = NULL; int ret = -1; priv = this->private; pump_priv = priv->pump_private; LOCK (&pump_priv->resume_path_lock); { number_files = pump_priv->number_files_pumped; strncpy (filename, pump_priv->current_file, PATH_MAX); } UNLOCK (&pump_priv->resume_path_lock); dict_str = GF_CALLOC (1, PATH_MAX + 256, gf_afr_mt_char); if (!dict_str) { gf_log (this->name, GF_LOG_ERROR, "Out of memory"); op_ret = -1; op_errno = ENOMEM; goto out; } if (pump_priv->pump_finished) { snprintf (summary, PATH_MAX+256, "no_of_files=%"PRIu64, number_files); } else { snprintf (summary, PATH_MAX+256, "no_of_files=%"PRIu64":current_file=%s", number_files, filename); } snprintf (dict_str, PATH_MAX+256, "status=%d:%s", (pump_priv->pump_finished)?1:0, summary); dict = dict_new (); ret = dict_set_dynstr (dict, RB_PUMP_CMD_STATUS, dict_str); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "dict_set_dynstr returned negative value"); } else { dict_str = NULL; } op_ret = 0; out: AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); if (dict) dict_unref (dict); GF_FREE (dict_str); return 0; } int pump_execute_pause (call_frame_t *frame, xlator_t *this) { afr_local_t *local = NULL; local = frame->local; pump_change_state (this, PUMP_STATE_PAUSE); local->op_ret = 0; pump_command_reply (frame, this); return 0; } int pump_execute_start (call_frame_t *frame, xlator_t *this) { afr_private_t *priv = NULL; afr_local_t *local = NULL; int ret = 0; loc_t loc = {0}; priv = this->private; local = frame->local; if (!priv->root_inode) { gf_log (this->name, GF_LOG_ERROR, "Pump xlator cannot be started without an initial " "lookup"); ret = -1; goto out; } GF_ASSERT (priv->root_inode); afr_build_root_loc (this, &loc); STACK_WIND (frame, pump_cmd_start_getxattr_cbk, PUMP_SOURCE_CHILD(this), PUMP_SOURCE_CHILD(this)->fops->getxattr, &loc, PUMP_PATH, NULL); ret = 0; out: if (ret < 0) { local->op_ret = ret; pump_command_reply (frame, this); } loc_wipe (&loc); return 0; } static int pump_cleanup_helper (void *data) { call_frame_t *frame = data; pump_xattr_cleaner (frame, 0, frame->this, 0, 0, NULL); return 0; } static int pump_cleanup_done (int ret, call_frame_t *sync_frame, void *data) { STACK_DESTROY (sync_frame->root); return 0; } int pump_execute_commit (call_frame_t *frame, xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; afr_local_t *local = NULL; call_frame_t *sync_frame = NULL; int ret = 0; priv = this->private; pump_priv = priv->pump_private; local = frame->local; local->op_ret = 0; if (pump_priv->pump_finished) { pump_change_state (this, PUMP_STATE_COMMIT); sync_frame = create_frame (this, this->ctx->pool); ret = synctask_new (pump_priv->env, pump_cleanup_helper, pump_cleanup_done, sync_frame, frame); if (ret) { gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " "synctask for cleaning up xattrs."); } } else { gf_log (this->name, GF_LOG_ERROR, "Commit can't proceed. " "Migration in progress"); local->op_ret = -1; local->op_errno = EINPROGRESS; pump_command_reply (frame, this); } return 0; } int pump_execute_abort (call_frame_t *frame, xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; afr_local_t *local = NULL; call_frame_t *sync_frame = NULL; int ret = 0; priv = this->private; pump_priv = priv->pump_private; local = frame->local; pump_change_state (this, PUMP_STATE_ABORT); LOCK (&pump_priv->resume_path_lock); { pump_priv->number_files_pumped = 0; pump_priv->current_file[0] = '\0'; } UNLOCK (&pump_priv->resume_path_lock); local->op_ret = 0; if (pump_priv->pump_finished) { sync_frame = create_frame (this, this->ctx->pool); ret = synctask_new (pump_priv->env, pump_cleanup_helper, pump_cleanup_done, sync_frame, frame); if (ret) { gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " "synctask for cleaning up xattrs."); } } else { pump_priv->cleaner = fop_setxattr_cbk_stub (frame, pump_xattr_cleaner, 0, 0, NULL); } return 0; } gf_boolean_t pump_command_status (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; dict_ret = dict_get_str (dict, RB_PUMP_CMD_STATUS, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump status command"); ret = _gf_false; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Hit a pump command - status"); ret = _gf_true; out: return ret; } gf_boolean_t pump_command_pause (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; dict_ret = dict_get_str (dict, RB_PUMP_CMD_PAUSE, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump pause command"); ret = _gf_false; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Hit a pump command - pause"); ret = _gf_true; out: return ret; } gf_boolean_t pump_command_commit (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; dict_ret = dict_get_str (dict, RB_PUMP_CMD_COMMIT, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump commit command"); ret = _gf_false; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Hit a pump command - commit"); ret = _gf_true; out: return ret; } gf_boolean_t pump_command_abort (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; dict_ret = dict_get_str (dict, RB_PUMP_CMD_ABORT, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump abort command"); ret = _gf_false; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Hit a pump command - abort"); ret = _gf_true; out: return ret; } gf_boolean_t pump_command_start (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; dict_ret = dict_get_str (dict, RB_PUMP_CMD_START, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump start command"); ret = _gf_false; goto out; } gf_log (this->name, GF_LOG_DEBUG, "Hit a pump command - start"); ret = _gf_true; out: return ret; } int pump_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { afr_private_t *priv = NULL; int op_errno = 0; int ret = 0; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_getxattr_cbk, FIRST_CHILD (this), (FIRST_CHILD (this))->fops->getxattr, loc, name, xdata); return 0; } if (name) { if (!strncmp (name, AFR_XATTR_PREFIX, strlen (AFR_XATTR_PREFIX))) { op_errno = ENODATA; ret = -1; goto out; } if (!strcmp (name, RB_PUMP_CMD_STATUS)) { gf_log (this->name, GF_LOG_DEBUG, "Hit pump command - status"); pump_execute_status (frame, this); goto out; } } afr_getxattr (frame, this, loc, name, xdata); out: if (ret < 0) AFR_STACK_UNWIND (getxattr, frame, -1, op_errno, NULL, NULL); return 0; } int pump_command_reply (call_frame_t *frame, xlator_t *this) { afr_local_t *local = NULL; local = frame->local; if (local->op_ret < 0) gf_log (this->name, GF_LOG_INFO, "Command failed"); else gf_log (this->name, GF_LOG_INFO, "Command succeeded"); AFR_STACK_UNWIND (setxattr, frame, local->op_ret, local->op_errno, NULL); return 0; } int pump_parse_command (call_frame_t *frame, xlator_t *this, dict_t *dict, int *op_errno_p) { afr_local_t *local = NULL; int ret = -1; int op_errno = 0; if (pump_command_start (this, dict)) { local = AFR_FRAME_INIT (frame, op_errno); if (!local) goto out; local->dict = dict_ref (dict); ret = pump_execute_start (frame, this); } else if (pump_command_pause (this, dict)) { local = AFR_FRAME_INIT (frame, op_errno); if (!local) goto out; local->dict = dict_ref (dict); ret = pump_execute_pause (frame, this); } else if (pump_command_abort (this, dict)) { local = AFR_FRAME_INIT (frame, op_errno); if (!local) goto out; local->dict = dict_ref (dict); ret = pump_execute_abort (frame, this); } else if (pump_command_commit (this, dict)) { local = AFR_FRAME_INIT (frame, op_errno); if (!local) goto out; local->dict = dict_ref (dict); ret = pump_execute_commit (frame, this); } out: if (op_errno_p) *op_errno_p = op_errno; return ret; } int pump_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, int32_t flags, dict_t *xdata) { afr_private_t *priv = NULL; int ret = -1; int op_errno = 0; GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.pump*", dict, op_errno, out); priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_setxattr_cbk, FIRST_CHILD (this), (FIRST_CHILD (this))->fops->setxattr, loc, dict, flags, xdata); return 0; } ret = pump_parse_command (frame, this, dict, &op_errno); if (ret >= 0) goto out; afr_setxattr (frame, this, loc, dict, flags, xdata); ret = 0; out: if (ret < 0) { AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL); } return 0; } /* Defaults */ static int32_t pump_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_lookup_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup, loc, xattr_req); return 0; } afr_lookup (frame, this, loc, xattr_req); return 0; } static int32_t pump_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_truncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; } afr_truncate (frame, this, loc, offset, xdata); return 0; } static int32_t pump_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_ftruncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; } afr_ftruncate (frame, this, fd, offset, xdata); return 0; } int pump_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_mknod_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata); return 0; } afr_mknod (frame, this, loc, mode, rdev, umask, xdata); return 0; } int pump_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_mkdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata); return 0; } afr_mkdir (frame, this, loc, mode, umask, xdata); return 0; } static int32_t pump_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_unlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata); return 0; } afr_unlink (frame, this, loc, xflag, xdata); return 0; } static int pump_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_rmdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata); return 0; } afr_rmdir (frame, this, loc, flags, xdata); return 0; } int pump_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, loc_t *loc, mode_t umask, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_symlink_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, xdata); return 0; } afr_symlink (frame, this, linkpath, loc, umask, xdata); return 0; } static int32_t pump_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_rename_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); return 0; } afr_rename (frame, this, oldloc, newloc, xdata); return 0; } static int32_t pump_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_link_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; } afr_link (frame, this, oldloc, newloc, xdata); return 0; } static int32_t pump_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_create_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd, xdata); return 0; } afr_create (frame, this, loc, flags, mode, umask, fd, xdata); return 0; } static int32_t pump_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, fd_t *fd, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_open_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; } afr_open (frame, this, loc, flags, fd, xdata); return 0; } static int32_t pump_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t off, uint32_t flags, struct iobref *iobref, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_writev_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags, iobref, xdata); return 0; } afr_writev (frame, this, fd, vector, count, off, flags, iobref, xdata); return 0; } static int32_t pump_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_flush_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd, xdata); return 0; } afr_flush (frame, this, fd, xdata); return 0; } static int32_t pump_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_fsync_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, fd, flags, xdata); return 0; } afr_fsync (frame, this, fd, flags, xdata); return 0; } static int32_t pump_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_opendir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->opendir, loc, fd, xdata); return 0; } afr_opendir (frame, this, loc, fd, xdata); return 0; } static int32_t pump_fsyncdir (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_fsyncdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsyncdir, fd, flags, xdata); return 0; } afr_fsyncdir (frame, this, fd, flags, xdata); return 0; } static int32_t pump_xattrop (call_frame_t *frame, xlator_t *this, loc_t *loc, gf_xattrop_flags_t flags, dict_t *dict, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_xattrop_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->xattrop, loc, flags, dict, xdata); return 0; } afr_xattrop (frame, this, loc, flags, dict, xdata); return 0; } static int32_t pump_fxattrop (call_frame_t *frame, xlator_t *this, fd_t *fd, gf_xattrop_flags_t flags, dict_t *dict, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_fxattrop_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fxattrop, fd, flags, dict, xdata); return 0; } afr_fxattrop (frame, this, fd, flags, dict, xdata); return 0; } static int32_t pump_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { afr_private_t *priv = NULL; int op_errno = -1; VALIDATE_OR_GOTO (this, out); GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.pump*", name, op_errno, out); op_errno = 0; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_removexattr_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->removexattr, loc, name, xdata); return 0; } afr_removexattr (frame, this, loc, name, xdata); out: if (op_errno) AFR_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL); return 0; } static int32_t pump_readdir (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t off, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_readdir_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata); return 0; } afr_readdir (frame, this, fd, size, off, xdata); return 0; } static int32_t pump_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t off, dict_t *dict) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_readdirp_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, fd, size, off, dict); return 0; } afr_readdirp (frame, this, fd, size, off, dict); return 0; } static int32_t pump_releasedir (xlator_t *this, fd_t *fd) { afr_private_t *priv = NULL; priv = this->private; if (priv->use_afr_in_pump) afr_releasedir (this, fd); return 0; } static int32_t pump_release (xlator_t *this, fd_t *fd) { afr_private_t *priv = NULL; priv = this->private; if (priv->use_afr_in_pump) afr_release (this, fd); return 0; } static int32_t pump_forget (xlator_t *this, inode_t *inode) { afr_private_t *priv = NULL; priv = this->private; if (priv->use_afr_in_pump) afr_forget (this, inode); return 0; } static int32_t pump_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, int32_t valid, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_setattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } afr_setattr (frame, this, loc, stbuf, valid, xdata); return 0; } static int32_t pump_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, int32_t valid, dict_t *xdata) { afr_private_t *priv = NULL; priv = this->private; if (!priv->use_afr_in_pump) { STACK_WIND (frame, default_fsetattr_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } afr_fsetattr (frame, this, fd, stbuf, valid, xdata); return 0; } /* End of defaults */ int32_t mem_acct_init (xlator_t *this) { int ret = -1; if (!this) return ret; ret = xlator_mem_acct_init (this, gf_afr_mt_end + 1); if (ret != 0) { gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" "failed"); return ret; } return ret; } static int is_xlator_pump_sink (xlator_t *child) { return (child == PUMP_SINK_CHILD(THIS)); } static int is_xlator_pump_source (xlator_t *child) { return (child == PUMP_SOURCE_CHILD(THIS)); } int32_t notify (xlator_t *this, int32_t event, void *data, ...) { int ret = -1; xlator_t *child_xl = NULL; child_xl = (xlator_t *) data; ret = afr_notify (this, event, data, NULL); switch (event) { case GF_EVENT_CHILD_DOWN: if (is_xlator_pump_source (child_xl)) pump_change_state (this, PUMP_STATE_ABORT); break; case GF_EVENT_CHILD_UP: if (is_xlator_pump_sink (child_xl)) if (is_pump_start_pending (this)) { gf_log (this->name, GF_LOG_DEBUG, "about to start synctask"); ret = pump_start_synctask (this); if (ret < 0) gf_log (this->name, GF_LOG_DEBUG, "Could not start pump " "synctask"); else pump_remove_start_pending (this); } } return ret; } int32_t init (xlator_t *this) { afr_private_t * priv = NULL; pump_private_t *pump_priv = NULL; int child_count = 0; xlator_list_t * trav = NULL; int i = 0; int ret = -1; GF_UNUSED int op_errno = 0; int source_child = 0; if (!this->children) { gf_log (this->name, GF_LOG_ERROR, "pump translator needs a source and sink" "subvolumes defined."); return -1; } if (!this->parents) { gf_log (this->name, GF_LOG_WARNING, "Volume is dangling."); } priv = GF_CALLOC (1, sizeof (afr_private_t), gf_afr_mt_afr_private_t); if (!priv) goto out; LOCK_INIT (&priv->lock); child_count = xlator_subvolume_count (this); if (child_count != 2) { gf_log (this->name, GF_LOG_ERROR, "There should be exactly 2 children - one source " "and one sink"); return -1; } priv->child_count = child_count; priv->read_child = source_child; priv->favorite_child = source_child; priv->background_self_heal_count = 0; priv->data_self_heal = "on"; priv->metadata_self_heal = 1; priv->entry_self_heal = 1; priv->data_self_heal_window_size = 16; priv->data_change_log = 1; priv->metadata_change_log = 1; priv->entry_change_log = 1; priv->use_afr_in_pump = 1; priv->sh_readdir_size = 65536; /* Locking options */ /* Lock server count infact does not matter. Locks are held on all subvolumes, in this case being the source and the sink. */ priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, gf_afr_mt_char); if (!priv->child_up) { gf_log (this->name, GF_LOG_ERROR, "Out of memory."); op_errno = ENOMEM; goto out; } priv->children = GF_CALLOC (sizeof (xlator_t *), child_count, gf_afr_mt_xlator_t); if (!priv->children) { gf_log (this->name, GF_LOG_ERROR, "Out of memory."); op_errno = ENOMEM; goto out; } priv->pending_key = GF_CALLOC (sizeof (*priv->pending_key), child_count, gf_afr_mt_char); if (!priv->pending_key) { gf_log (this->name, GF_LOG_ERROR, "Out of memory."); op_errno = ENOMEM; goto out; } trav = this->children; i = 0; while (i < child_count) { priv->children[i] = trav->xlator; ret = gf_asprintf (&priv->pending_key[i], "%s.%s", AFR_XATTR_PREFIX, trav->xlator->name); if (-1 == ret) { gf_log (this->name, GF_LOG_ERROR, "asprintf failed to set pending key"); op_errno = ENOMEM; goto out; } trav = trav->next; i++; } ret = gf_asprintf (&priv->sh_domain, "%s-self-heal", this->name); if (-1 == ret) { op_errno = ENOMEM; goto out; } priv->root_inode = NULL; priv->last_event = GF_CALLOC (child_count, sizeof (*priv->last_event), gf_afr_mt_int32_t); if (!priv->last_event) { ret = -ENOMEM; goto out; } pump_priv = GF_CALLOC (1, sizeof (*pump_priv), gf_afr_mt_pump_priv); if (!pump_priv) { gf_log (this->name, GF_LOG_ERROR, "Out of memory"); op_errno = ENOMEM; goto out; } LOCK_INIT (&pump_priv->resume_path_lock); LOCK_INIT (&pump_priv->pump_state_lock); pump_priv->resume_path = GF_CALLOC (1, PATH_MAX, gf_afr_mt_char); if (!pump_priv->resume_path) { gf_log (this->name, GF_LOG_ERROR, "Out of memory"); ret = -1; goto out; } pump_priv->env = this->ctx->env; if (!pump_priv->env) { gf_log (this->name, GF_LOG_ERROR, "Could not create new sync-environment"); ret = -1; goto out; } /* keep more local here as we may need them for self-heal etc */ this->local_pool = mem_pool_new (afr_local_t, 128); if (!this->local_pool) { ret = -1; gf_log (this->name, GF_LOG_ERROR, "failed to create local_t's memory pool"); goto out; } priv->pump_private = pump_priv; pump_priv = NULL; this->private = priv; priv = NULL; pump_change_state (this, PUMP_STATE_ABORT); ret = 0; out: if (pump_priv) { GF_FREE (pump_priv->resume_path); LOCK_DESTROY (&pump_priv->resume_path_lock); LOCK_DESTROY (&pump_priv->pump_state_lock); GF_FREE (pump_priv); } if (priv) { GF_FREE (priv->child_up); GF_FREE (priv->children); GF_FREE (priv->pending_key); GF_FREE (priv->last_event); LOCK_DESTROY (&priv->lock); GF_FREE (priv); } return ret; } int fini (xlator_t *this) { afr_private_t * priv = NULL; pump_private_t *pump_priv = NULL; priv = this->private; this->private = NULL; if (!priv) goto out; pump_priv = priv->pump_private; if (!pump_priv) goto afr_priv; GF_FREE (pump_priv->resume_path); LOCK_DESTROY (&pump_priv->resume_path_lock); LOCK_DESTROY (&pump_priv->pump_state_lock); GF_FREE (pump_priv); afr_priv: afr_priv_destroy (priv); out: return 0; } struct xlator_fops fops = { .lookup = pump_lookup, .open = pump_open, .flush = pump_flush, .fsync = pump_fsync, .fsyncdir = pump_fsyncdir, .xattrop = pump_xattrop, .fxattrop = pump_fxattrop, .getxattr = pump_getxattr, /* inode write */ .writev = pump_writev, .truncate = pump_truncate, .ftruncate = pump_ftruncate, .setxattr = pump_setxattr, .setattr = pump_setattr, .fsetattr = pump_fsetattr, .removexattr = pump_removexattr, /* dir read */ .opendir = pump_opendir, .readdir = pump_readdir, .readdirp = pump_readdirp, /* dir write */ .create = pump_create, .mknod = pump_mknod, .mkdir = pump_mkdir, .unlink = pump_unlink, .rmdir = pump_rmdir, .link = pump_link, .symlink = pump_symlink, .rename = pump_rename, }; struct xlator_dumpops dumpops = { .priv = afr_priv_dump, }; struct xlator_cbks cbks = { .release = pump_release, .releasedir = pump_releasedir, .forget = pump_forget, }; struct volume_options options[] = { { .key = {NULL} }, };