diff options
Diffstat (limited to 'xlators/cluster/afr/src/pump.c')
-rw-r--r-- | xlators/cluster/afr/src/pump.c | 2472 |
1 files changed, 0 insertions, 2472 deletions
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c deleted file mode 100644 index d322a9d67b5..00000000000 --- a/xlators/cluster/afr/src/pump.c +++ /dev/null @@ -1,2472 +0,0 @@ -/* - Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> - This file is part of GlusterFS. - - This file is licensed to you under your choice of the GNU Lesser - General Public License, version 3 or any later version (LGPLv3 or - later), or the GNU General Public License, version 2 (GPLv2), in all - cases as published by the Free Software Foundation. -*/ - -#include <unistd.h> -#include <sys/time.h> -#include <stdlib.h> -#include <fnmatch.h> - -#include "afr-common.c" -#include "defaults.h" -#include "glusterfs.h" -#include "pump.h" -#include "afr-messages.h" - - -static int -afr_set_dict_gfid (dict_t *dict, uuid_t gfid) -{ - int ret = 0; - uuid_t *pgfid = NULL; - - GF_ASSERT (gfid); - - pgfid = GF_CALLOC (1, sizeof (uuid_t), gf_common_mt_char); - if (!pgfid) { - ret = -1; - goto out; - } - - gf_uuid_copy (*pgfid, gfid); - - ret = dict_set_dynptr (dict, "gfid-req", pgfid, sizeof (uuid_t)); - if (ret) - gf_msg (THIS->name, GF_LOG_ERROR, -ret, - AFR_MSG_DICT_SET_FAILED, "gfid set failed"); - -out: - if (ret && pgfid) - GF_FREE (pgfid); - return ret; -} - -static int -afr_set_root_gfid (dict_t *dict) -{ - uuid_t gfid; - int ret = 0; - - memset (gfid, 0, 16); - gfid[15] = 1; - - ret = afr_set_dict_gfid (dict, gfid); - - return ret; -} - -static int -afr_build_child_loc (xlator_t *this, loc_t *child, loc_t *parent, char *name) -{ - int ret = -1; - uuid_t pargfid = {0}; - - if (!child) - goto out; - - if (!gf_uuid_is_null (parent->inode->gfid)) - gf_uuid_copy (pargfid, parent->inode->gfid); - else if (!gf_uuid_is_null (parent->gfid)) - gf_uuid_copy (pargfid, parent->gfid); - - if (gf_uuid_is_null (pargfid)) - goto out; - - if (strcmp (parent->path, "/") == 0) - ret = gf_asprintf ((char **)&child->path, "/%s", name); - else - ret = gf_asprintf ((char **)&child->path, "%s/%s", parent->path, - name); - - if (-1 == ret) { - } - - child->name = strrchr (child->path, '/'); - if (child->name) - child->name++; - - child->parent = inode_ref (parent->inode); - child->inode = inode_new (parent->inode->table); - gf_uuid_copy (child->pargfid, pargfid); - - if (!child->inode) { - ret = -1; - goto out; - } - - ret = 0; -out: - if ((ret == -1) && child) - loc_wipe (child); - - return ret; -} - -static void -afr_build_root_loc (xlator_t *this, loc_t *loc) -{ - afr_private_t *priv = NULL; - - priv = this->private; - loc->path = gf_strdup ("/"); - loc->name = ""; - loc->inode = inode_ref (priv->root_inode); - gf_uuid_copy (loc->gfid, loc->inode->gfid); -} - -static void -afr_update_loc_gfids (loc_t *loc, struct iatt *buf, struct iatt *postparent) -{ - GF_ASSERT (loc); - GF_ASSERT (buf); - - gf_uuid_copy (loc->gfid, buf->ia_gfid); - if (postparent) - gf_uuid_copy (loc->pargfid, postparent->ia_gfid); -} - -static uint64_t pump_pid = 0; -static void -pump_fill_loc_info (loc_t *loc, struct iatt *iatt, struct iatt *parent) -{ - afr_update_loc_gfids (loc, iatt, parent); - gf_uuid_copy (loc->inode->gfid, iatt->ia_gfid); -} - -static int -pump_mark_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - pump_priv->pump_start_pending = 1; - - return 0; -} - -static int -is_pump_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - return (pump_priv->pump_start_pending); -} - -static int -pump_remove_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - pump_priv->pump_start_pending = 0; - - return 0; -} - -static pump_state_t -pump_get_state () -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t ret; - - this = THIS; - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->pump_state_lock); - { - ret = pump_priv->pump_state; - } - UNLOCK (&pump_priv->pump_state_lock); - - return ret; -} - -int -pump_change_state (xlator_t *this, pump_state_t state) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t state_old; - pump_state_t state_new; - - - priv = this->private; - pump_priv = priv->pump_private; - - GF_ASSERT (pump_priv); - - LOCK (&pump_priv->pump_state_lock); - { - state_old = pump_priv->pump_state; - state_new = state; - - pump_priv->pump_state = state; - - } - UNLOCK (&pump_priv->pump_state_lock); - - gf_msg_debug (this->name, 0, - "Pump changing state from %d to %d", - state_old, state_new); - - return 0; -} - -static int -pump_set_resume_path (xlator_t *this, const char *path) -{ - int ret = 0; - - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - GF_ASSERT (pump_priv); - - LOCK (&pump_priv->resume_path_lock); - { - strncpy (pump_priv->resume_path, path, strlen (path) + 1); - } - UNLOCK (&pump_priv->resume_path_lock); - - return ret; -} - -static int -pump_save_path (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_state_t state; - dict_t *dict = NULL; - loc_t loc = {0}; - int dict_ret = 0; - int ret = -1; - - state = pump_get_state (); - if (state == PUMP_STATE_RESUME) - return 0; - - priv = this->private; - - GF_ASSERT (priv->root_inode); - - afr_build_root_loc (this, &loc); - - dict = dict_new (); - dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); - if (dict_ret) - gf_msg (this->name, GF_LOG_WARNING, - -dict_ret, AFR_MSG_DICT_SET_FAILED, - "%s: failed to set the key %s", path, PUMP_PATH); - - ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0, NULL, - NULL); - - if (ret < 0) { - gf_msg (this->name, GF_LOG_INFO, -ret, AFR_MSG_INFO_COMMON, - "setxattr failed - could not save path=%s", path); - } else { - gf_msg_debug (this->name, 0, - "setxattr succeeded - saved path=%s", path); - } - - dict_unref (dict); - - loc_wipe (&loc); - return 0; -} - -static int -pump_check_and_update_status (xlator_t *this) -{ - pump_state_t state; - int ret = -1; - - state = pump_get_state (); - - switch (state) { - - case PUMP_STATE_RESUME: - case PUMP_STATE_RUNNING: - { - ret = 0; - break; - } - case PUMP_STATE_PAUSE: - { - ret = -1; - break; - } - case PUMP_STATE_ABORT: - { - pump_save_path (this, "/"); - ret = -1; - break; - } - default: - { - gf_msg_debug (this->name, 0, - "Unknown pump state"); - ret = -1; - break; - } - - } - - return ret; -} - -static const char * -pump_get_resume_path (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - const char *resume_path = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - resume_path = pump_priv->resume_path; - - return resume_path; -} - -static int -pump_update_resume_state (xlator_t *this, const char *path) -{ - pump_state_t state; - const char *resume_path = NULL; - - state = pump_get_state (); - - if (state == PUMP_STATE_RESUME) { - resume_path = pump_get_resume_path (this); - if (strcmp (resume_path, "/") == 0) { - gf_msg_debug (this->name, 0, - "Reached the resume path (/). Proceeding to change state" - " to running"); - - pump_change_state (this, PUMP_STATE_RUNNING); - } else if (strcmp (resume_path, path) == 0) { - gf_msg_debug (this->name, 0, - "Reached the resume path. Proceeding to change state" - " to running"); - - pump_change_state (this, PUMP_STATE_RUNNING); - } else { - gf_msg_debug (this->name, 0, - "Not yet hit the resume path:res-path=%s,path=%s", - resume_path, path); - } - } - - return 0; -} - -static gf_boolean_t -is_pump_traversal_allowed (xlator_t *this, const char *path) -{ - pump_state_t state; - const char *resume_path = NULL; - gf_boolean_t ret = _gf_true; - - state = pump_get_state (); - - if (state == PUMP_STATE_RESUME) { - resume_path = pump_get_resume_path (this); - if (strstr (resume_path, path)) { - gf_msg_debug (this->name, 0, - "On the right path to resumption path"); - ret = _gf_true; - } else { - gf_msg_debug (this->name, 0, - "Not the right path to resuming=> ignoring traverse"); - ret = _gf_false; - } - } - - return ret; -} - -static int -pump_save_file_stats (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->number_files_pumped++; - - strncpy (pump_priv->current_file, path, - PATH_MAX); - } - UNLOCK (&pump_priv->resume_path_lock); - - return 0; -} - -static int -gf_pump_traverse_directory (loc_t *loc) -{ - xlator_t *this = NULL; - fd_t *fd = NULL; - off_t offset = 0; - loc_t entry_loc = {0}; - gf_dirent_t *entry = NULL; - gf_dirent_t *tmp = NULL; - gf_dirent_t entries; - struct iatt iatt = {0}; - struct iatt parent = {0}; - dict_t *xattr_rsp = NULL; - int ret = 0; - gf_boolean_t is_directory_empty = _gf_true; - gf_boolean_t free_entries = _gf_false; - - INIT_LIST_HEAD (&entries.list); - this = THIS; - - GF_ASSERT (loc->inode); - - fd = fd_create (loc->inode, pump_pid); - if (!fd) { - gf_msg (this->name, GF_LOG_ERROR, 0, AFR_MSG_FD_CREATE_FAILED, - "Failed to create fd for %s", loc->path); - goto out; - } - - ret = syncop_opendir (this, loc, fd, NULL, NULL); - if (ret < 0) { - gf_msg_debug (this->name, 0, - "opendir failed on %s", loc->path); - goto out; - } - - gf_msg_trace (this->name, 0, - "pump opendir on %s returned=%d", - loc->path, ret); - - while (syncop_readdirp (this, fd, 131072, offset, &entries, NULL, - NULL)) { - free_entries = _gf_true; - - if (list_empty (&entries.list)) { - gf_msg_trace (this->name, 0, - "no more entries in directory"); - goto out; - } - - list_for_each_entry_safe (entry, tmp, &entries.list, list) { - gf_msg_debug (this->name, 0, - "found readdir entry=%s", entry->d_name); - - offset = entry->d_off; - if (gf_uuid_is_null (entry->d_stat.ia_gfid)) { - gf_msg (this->name, GF_LOG_WARNING, 0, - AFR_MSG_GFID_NULL, "%s/%s: No " - "gfid present skipping", - loc->path, entry->d_name); - continue; - } - loc_wipe (&entry_loc); - ret = afr_build_child_loc (this, &entry_loc, loc, - entry->d_name); - if (ret) - goto out; - - if ((strcmp (entry->d_name, ".") == 0) || - (strcmp (entry->d_name, "..") == 0)) - continue; - - is_directory_empty = _gf_false; - gf_msg_debug (this->name, 0, - "lookup %s => %"PRId64, - entry_loc.path, - iatt.ia_ino); - - ret = syncop_lookup (this, &entry_loc, &iatt, &parent, - NULL, &xattr_rsp); - - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, - -ret, AFR_MSG_INFO_COMMON, - "%s: lookup failed", entry_loc.path); - continue; - } - - ret = afr_selfheal_name (this, loc->gfid, entry->d_name, - NULL); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - AFR_MSG_SELF_HEAL_FAILED, - "%s: name self-heal failed (%s/%s)", - entry_loc.path, uuid_utoa (loc->gfid), - entry->d_name); - continue; - } - - ret = afr_selfheal (this, iatt.ia_gfid); - if (ret < 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - AFR_MSG_SELF_HEAL_FAILED, - "%s: self-heal failed (%s)", - entry_loc.path, - uuid_utoa (iatt.ia_gfid)); - continue; - } - - pump_fill_loc_info (&entry_loc, &iatt, &parent); - - pump_update_resume_state (this, entry_loc.path); - - pump_save_path (this, entry_loc.path); - pump_save_file_stats (this, entry_loc.path); - - ret = pump_check_and_update_status (this); - if (ret < 0) { - gf_msg_debug (this->name, 0, - "Pump beginning to exit out"); - goto out; - } - - if (IA_ISDIR (iatt.ia_type)) { - if (is_pump_traversal_allowed (this, entry_loc.path)) { - gf_msg_trace (this->name, 0, - "entering dir=%s", - entry->d_name); - gf_pump_traverse_directory (&entry_loc); - } - } - } - - gf_dirent_free (&entries); - free_entries = _gf_false; - gf_msg_trace (this->name, 0, "offset incremented to %d", - (int32_t) offset); - - } - - ret = syncop_close (fd); - if (ret < 0) - gf_msg_debug (this->name, 0, "closing the fd failed"); - - if (is_directory_empty && (strcmp (loc->path, "/") == 0)) { - pump_change_state (this, PUMP_STATE_RUNNING); - gf_msg (this->name, GF_LOG_INFO, 0, AFR_MSG_INFO_COMMON, - "Empty source brick. Nothing to be done."); - } - -out: - if (entry_loc.path) - loc_wipe (&entry_loc); - if (free_entries) - gf_dirent_free (&entries); - return 0; -} - -static int -pump_update_resume_path (xlator_t *this) -{ - const char *resume_path = NULL; - - resume_path = pump_get_resume_path (this); - - if (resume_path) { - gf_msg_debug (this->name, 0, - "Found a path to resume from: %s", - resume_path); - - }else { - gf_msg_debug (this->name, 0, - "Did not find a path=> setting to '/'"); - pump_set_resume_path (this, "/"); - } - - pump_change_state (this, PUMP_STATE_RESUME); - - return 0; -} - -static int32_t -pump_xattr_cleaner (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ - afr_private_t *priv = NULL; - loc_t loc = {0}; - int i = 0; - int ret = 0; - int source = 0; - int sink = 1; - - priv = this->private; - - afr_build_root_loc (this, &loc); - - ret = syncop_removexattr (priv->children[source], &loc, - PUMP_PATH, 0, NULL); - - ret = syncop_removexattr (priv->children[sink], &loc, - PUMP_SINK_COMPLETE, 0, NULL); - - for (i = 0; i < priv->child_count; i++) { - ret = syncop_removexattr (priv->children[i], &loc, - PUMP_SOURCE_COMPLETE, 0, NULL); - if (ret) { - gf_msg_debug (this->name, 0, "removexattr " - "failed with %s", strerror (-ret)); - } - } - - loc_wipe (&loc); - return pump_command_reply (frame, this); -} - -static int -pump_complete_migration (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - dict_t *dict = NULL; - pump_state_t state; - loc_t loc = {0}; - int dict_ret = 0; - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - GF_ASSERT (priv->root_inode); - - afr_build_root_loc (this, &loc); - - dict = dict_new (); - - state = pump_get_state (); - if (state == PUMP_STATE_RUNNING) { - gf_msg_debug (this->name, 0, - "Pump finished pumping"); - - pump_priv->pump_finished = _gf_true; - - dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); - if (dict_ret) - gf_msg (this->name, GF_LOG_WARNING, -dict_ret, - AFR_MSG_DICT_SET_FAILED, - "%s: failed to set the key %s", - loc.path, PUMP_SOURCE_COMPLETE); - - ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0, - NULL, NULL); - if (ret < 0) { - gf_msg_debug (this->name, 0, - "setxattr failed - while " - "notifying source complete"); - } - dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); - if (dict_ret) - gf_msg (this->name, GF_LOG_WARNING, -dict_ret, - AFR_MSG_DICT_SET_FAILED, - "%s: failed to set the key %s", - loc.path, PUMP_SINK_COMPLETE); - - ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0, - NULL, NULL); - if (ret < 0) { - gf_msg_debug (this->name, 0, - "setxattr failed - while " - "notifying sink complete"); - } - - pump_save_path (this, "/"); - - } else if (state == PUMP_STATE_ABORT) { - gf_msg_debug (this->name, 0, "Starting cleanup " - "of pump internal xattrs"); - call_resume (pump_priv->cleaner); - } - - loc_wipe (&loc); - return 0; -} - -static int -pump_lookup_sink (loc_t *loc) -{ - xlator_t *this = NULL; - struct iatt iatt, parent; - dict_t *xattr_rsp; - dict_t *xattr_req = NULL; - int ret = 0; - - this = THIS; - - xattr_req = dict_new (); - - ret = afr_set_root_gfid (xattr_req); - if (ret) - goto out; - - ret = syncop_lookup (PUMP_SINK_CHILD (this), loc, &iatt, &parent, - xattr_req, &xattr_rsp); - - if (ret) { - gf_msg_debug (this->name, 0, - "Lookup on sink child failed"); - ret = -1; - goto out; - } - -out: - if (xattr_req) - dict_unref (xattr_req); - - return ret; -} - -static int -pump_task (void *data) -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - - - loc_t loc = {0}; - struct iatt iatt, parent; - dict_t *xattr_rsp = NULL; - dict_t *xattr_req = NULL; - - int ret = -1; - - this = THIS; - priv = this->private; - - GF_ASSERT (priv->root_inode); - - afr_build_root_loc (this, &loc); - xattr_req = dict_new (); - if (!xattr_req) { - gf_msg_debug (this->name, ENOMEM, - "Out of memory"); - ret = -1; - goto out; - } - - afr_set_root_gfid (xattr_req); - ret = syncop_lookup (this, &loc, &iatt, &parent, - xattr_req, &xattr_rsp); - - gf_msg_trace (this->name, 0, - "lookup: path=%s gfid=%s", - loc.path, uuid_utoa (loc.inode->gfid)); - - ret = pump_check_and_update_status (this); - if (ret < 0) { - goto out; - } - - pump_update_resume_path (this); - - afr_set_root_gfid (xattr_req); - ret = pump_lookup_sink (&loc); - if (ret) { - pump_update_resume_path (this); - goto out; - } - - gf_pump_traverse_directory (&loc); - - pump_complete_migration (this); -out: - if (xattr_req) - dict_unref (xattr_req); - - loc_wipe (&loc); - return 0; -} - - -static int -pump_task_completion (int ret, call_frame_t *sync_frame, void *data) -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - - this = THIS; - - priv = this->private; - - inode_unref (priv->root_inode); - STACK_DESTROY (sync_frame->root); - - gf_msg_debug (this->name, 0, - "Pump xlator exiting"); - return 0; -} - -int -pump_start (call_frame_t *pump_frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - afr_set_lk_owner (pump_frame, this, pump_frame->root); - pump_pid = (uint64_t) (unsigned long)pump_frame->root; - - ret = synctask_new (pump_priv->env, pump_task, - pump_task_completion, - pump_frame, NULL); - if (ret == -1) { - goto out; - } - - gf_msg_debug (this->name, 0, - "setting pump as started lk_owner: %s %"PRIu64, - lkowner_utoa (&pump_frame->root->lk_owner), pump_pid); - - priv->use_afr_in_pump = 1; -out: - return ret; -} - -static int -pump_start_synctask (xlator_t *this) -{ - call_frame_t *frame = NULL; - int ret = 0; - - frame = create_frame (this, this->ctx->pool); - if (!frame) { - ret = -1; - goto out; - } - - pump_change_state (this, PUMP_STATE_RUNNING); - - ret = pump_start (frame, this); - -out: - return ret; -} - -int32_t -pump_cmd_start_setxattr_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, dict_t *xdata) - -{ - call_frame_t *prev = NULL; - afr_local_t *local = NULL; - int ret = 0; - - local = frame->local; - - if (op_ret < 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - AFR_MSG_INFO_COMMON, - "Could not initiate destination " - "brick connect"); - ret = op_ret; - goto out; - } - - gf_msg_debug (this->name, 0, - "Successfully initiated destination " - "brick connect"); - - pump_mark_start_pending (this); - - /* send the PARENT_UP as pump is ready now */ - prev = cookie; - if (prev && prev->this) - prev->this->notify (prev->this, GF_EVENT_PARENT_UP, this); - -out: - local->op_ret = ret; - pump_command_reply (frame, this); - - return 0; -} - -static int -pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - afr_private_t *priv = NULL; - dict_t *dict = NULL; - data_t *data = NULL; - char *clnt_cmd = NULL; - loc_t loc = {0}; - - int ret = 0; - - priv = this->private; - local = frame->local; - - GF_ASSERT (priv->root_inode); - - afr_build_root_loc (this, &loc); - - data = data_ref (dict_get (local->dict, RB_PUMP_CMD_START)); - if (!data) { - ret = -1; - gf_msg (this->name, GF_LOG_ERROR, ENOMEM, - AFR_MSG_DICT_GET_FAILED, - "Could not get destination brick value"); - goto out; - } - - dict = dict_new (); - if (!dict) { - ret = -1; - goto out; - } - - clnt_cmd = GF_CALLOC (1, data->len+1, gf_common_mt_char); - if (!clnt_cmd) { - ret = -1; - goto out; - } - - memcpy (clnt_cmd, data->data, data->len); - clnt_cmd[data->len] = '\0'; - gf_msg_debug (this->name, 0, "Got destination brick %s\n", - clnt_cmd); - - ret = dict_set_dynstr (dict, CLIENT_CMD_CONNECT, clnt_cmd); - if (ret < 0) { - gf_msg (this->name, GF_LOG_ERROR, -ret, - AFR_MSG_DICT_SET_FAILED, - "Could not inititiate destination brick " - "connect"); - goto out; - } - - STACK_WIND (frame, - pump_cmd_start_setxattr_cbk, - PUMP_SINK_CHILD(this), - PUMP_SINK_CHILD(this)->fops->setxattr, - &loc, - dict, - 0, NULL); - - ret = 0; - -out: - if (dict) - dict_unref (dict); - - if (data) - data_unref (data); - - if (ret && clnt_cmd) - GF_FREE (clnt_cmd); - - loc_wipe (&loc); - return ret; -} - -static int -is_pump_aborted (xlator_t *this) -{ - pump_state_t state; - - state = pump_get_state (); - - return ((state == PUMP_STATE_ABORT)); -} - -int32_t -pump_cmd_start_getxattr_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - dict_t *dict, dict_t *xdata) -{ - afr_local_t *local = NULL; - char *path = NULL; - - pump_state_t state; - int ret = 0; - int need_unwind = 0; - int dict_ret = -1; - - local = frame->local; - - if (op_ret < 0) { - gf_msg_debug (this->name, 0, - "getxattr failed - changing pump " - "state to RUNNING with '/'"); - path = "/"; - ret = op_ret; - } else { - gf_msg_trace (this->name, 0, - "getxattr succeeded"); - - dict_ret = dict_get_str (dict, PUMP_PATH, &path); - if (dict_ret < 0) - path = "/"; - } - - state = pump_get_state (); - if ((state == PUMP_STATE_RUNNING) || - (state == PUMP_STATE_RESUME)) { - gf_msg (this->name, GF_LOG_ERROR, - 0, AFR_MSG_PUMP_XLATOR_ERROR, - "Pump is already started"); - ret = -1; - goto out; - } - - pump_set_resume_path (this, path); - - if (is_pump_aborted (this)) - /* We're re-starting pump afresh */ - ret = pump_initiate_sink_connect (frame, this); - else { - /* We're re-starting pump from a previous - pause */ - gf_msg_debug (this->name, 0, - "about to start synctask"); - ret = pump_start_synctask (this); - need_unwind = 1; - } - -out: - if ((ret < 0) || (need_unwind == 1)) { - local->op_ret = ret; - pump_command_reply (frame, this); - } - return 0; -} - -int -pump_execute_status (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - uint64_t number_files = 0; - - char filename[PATH_MAX]; - char summary[PATH_MAX+256]; - char *dict_str = NULL; - - int32_t op_ret = 0; - int32_t op_errno = 0; - - dict_t *dict = NULL; - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->resume_path_lock); - { - number_files = pump_priv->number_files_pumped; - strncpy (filename, pump_priv->current_file, PATH_MAX); - } - UNLOCK (&pump_priv->resume_path_lock); - - dict_str = GF_CALLOC (1, PATH_MAX + 256, gf_afr_mt_char); - if (!dict_str) { - op_ret = -1; - op_errno = ENOMEM; - goto out; - } - - if (pump_priv->pump_finished) { - snprintf (summary, PATH_MAX+256, - "no_of_files=%"PRIu64, number_files); - } else { - snprintf (summary, PATH_MAX+256, - "no_of_files=%"PRIu64":current_file=%s", - number_files, filename); - } - snprintf (dict_str, PATH_MAX+256, "status=%d:%s", - (pump_priv->pump_finished)?1:0, summary); - - dict = dict_new (); - - ret = dict_set_dynstr (dict, RB_PUMP_CMD_STATUS, dict_str); - if (ret < 0) { - gf_msg_debug (this->name, 0, - "dict_set_dynstr returned negative value"); - } else { - dict_str = NULL; - } - - op_ret = 0; - -out: - - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); - - if (dict) - dict_unref (dict); - - GF_FREE (dict_str); - - return 0; -} - -int -pump_execute_pause (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - - local = frame->local; - - pump_change_state (this, PUMP_STATE_PAUSE); - - local->op_ret = 0; - pump_command_reply (frame, this); - - return 0; -} - -int -pump_execute_start (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - afr_local_t *local = NULL; - - int ret = 0; - loc_t loc = {0}; - - priv = this->private; - local = frame->local; - - if (!priv->root_inode) { - gf_msg (this->name, GF_LOG_ERROR, - 0, AFR_MSG_PUMP_XLATOR_ERROR, - "Pump xlator cannot be started without an initial " - "lookup"); - ret = -1; - goto out; - } - - GF_ASSERT (priv->root_inode); - - afr_build_root_loc (this, &loc); - - STACK_WIND (frame, - pump_cmd_start_getxattr_cbk, - PUMP_SOURCE_CHILD(this), - PUMP_SOURCE_CHILD(this)->fops->getxattr, - &loc, - PUMP_PATH, NULL); - - ret = 0; - -out: - if (ret < 0) { - local->op_ret = ret; - pump_command_reply (frame, this); - } - - loc_wipe (&loc); - return 0; -} - -static int -pump_cleanup_helper (void *data) { - call_frame_t *frame = data; - - pump_xattr_cleaner (frame, 0, frame->this, 0, 0, NULL); - - return 0; -} - -static int -pump_cleanup_done (int ret, call_frame_t *sync_frame, void *data) -{ - STACK_DESTROY (sync_frame->root); - - return 0; -} - -int -pump_execute_commit (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - afr_local_t *local = NULL; - call_frame_t *sync_frame = NULL; - int ret = 0; - - priv = this->private; - pump_priv = priv->pump_private; - local = frame->local; - - local->op_ret = 0; - if (pump_priv->pump_finished) { - pump_change_state (this, PUMP_STATE_COMMIT); - sync_frame = create_frame (this, this->ctx->pool); - ret = synctask_new (pump_priv->env, pump_cleanup_helper, - pump_cleanup_done, sync_frame, frame); - if (ret) { - gf_msg_debug (this->name, 0, "Couldn't create " - "synctask for cleaning up xattrs."); - } - - } else { - gf_msg (this->name, GF_LOG_ERROR, EINPROGRESS, - AFR_MSG_MIGRATION_IN_PROGRESS, - "Commit can't proceed. Migration in progress"); - local->op_ret = -1; - local->op_errno = EINPROGRESS; - pump_command_reply (frame, this); - } - - return 0; -} -int -pump_execute_abort (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - afr_local_t *local = NULL; - call_frame_t *sync_frame = NULL; - int ret = 0; - - priv = this->private; - pump_priv = priv->pump_private; - local = frame->local; - - pump_change_state (this, PUMP_STATE_ABORT); - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->number_files_pumped = 0; - pump_priv->current_file[0] = '\0'; - } - UNLOCK (&pump_priv->resume_path_lock); - - local->op_ret = 0; - if (pump_priv->pump_finished) { - sync_frame = create_frame (this, this->ctx->pool); - ret = synctask_new (pump_priv->env, pump_cleanup_helper, - pump_cleanup_done, sync_frame, frame); - if (ret) { - gf_msg_debug (this->name, 0, "Couldn't create " - "synctask for cleaning up xattrs."); - } - - } else { - pump_priv->cleaner = fop_setxattr_cbk_stub (frame, - pump_xattr_cleaner, - 0, 0, NULL); - } - - return 0; -} - -gf_boolean_t -pump_command_status (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, RB_PUMP_CMD_STATUS, &cmd); - if (dict_ret < 0) { - gf_msg_debug (this->name, 0, - "Not a pump status command"); - ret = _gf_false; - goto out; - } - - gf_msg_debug (this->name, 0, - "Hit a pump command - status"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_pause (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, RB_PUMP_CMD_PAUSE, &cmd); - if (dict_ret < 0) { - gf_msg_debug (this->name, 0, - "Not a pump pause command"); - ret = _gf_false; - goto out; - } - - gf_msg_debug (this->name, 0, - "Hit a pump command - pause"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_commit (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, RB_PUMP_CMD_COMMIT, &cmd); - if (dict_ret < 0) { - gf_msg_debug (this->name, 0, - "Not a pump commit command"); - ret = _gf_false; - goto out; - } - - gf_msg_debug (this->name, 0, - "Hit a pump command - commit"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_abort (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, RB_PUMP_CMD_ABORT, &cmd); - if (dict_ret < 0) { - gf_msg_debug (this->name, 0, - "Not a pump abort command"); - ret = _gf_false; - goto out; - } - - gf_msg_debug (this->name, 0, - "Hit a pump command - abort"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_start (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, RB_PUMP_CMD_START, &cmd); - if (dict_ret < 0) { - gf_msg_debug (this->name, 0, - "Not a pump start command"); - ret = _gf_false; - goto out; - } - - gf_msg_debug (this->name, 0, - "Hit a pump command - start"); - ret = _gf_true; - -out: - return ret; - -} - -int -pump_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - const char *name, dict_t *xdata) -{ - afr_private_t *priv = NULL; - int op_errno = 0; - int ret = 0; - - priv = this->private; - - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_getxattr_cbk, - FIRST_CHILD (this), - (FIRST_CHILD (this))->fops->getxattr, - loc, name, xdata); - return 0; - } - - if (name) { - if (!strncmp (name, AFR_XATTR_PREFIX, - strlen (AFR_XATTR_PREFIX))) { - - op_errno = ENODATA; - ret = -1; - goto out; - } - - if (!strcmp (name, RB_PUMP_CMD_STATUS)) { - gf_msg_debug (this->name, 0, - "Hit pump command - status"); - pump_execute_status (frame, this); - goto out; - } - } - - afr_getxattr (frame, this, loc, name, xdata); - -out: - if (ret < 0) - AFR_STACK_UNWIND (getxattr, frame, -1, op_errno, NULL, NULL); - return 0; -} - -int -pump_command_reply (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - - local = frame->local; - - if (local->op_ret < 0) - gf_msg (this->name, GF_LOG_INFO, - 0, AFR_MSG_INFO_COMMON, - "Command failed"); - else - gf_msg (this->name, GF_LOG_INFO, - 0, AFR_MSG_INFO_COMMON, - "Command succeeded"); - - AFR_STACK_UNWIND (setxattr, - frame, - local->op_ret, - local->op_errno, NULL); - - return 0; -} - -int -pump_parse_command (call_frame_t *frame, xlator_t *this, dict_t *dict, - int *op_errno_p) -{ - afr_local_t *local = NULL; - int ret = -1; - int op_errno = 0; - - if (pump_command_start (this, dict)) { - local = AFR_FRAME_INIT (frame, op_errno); - if (!local) - goto out; - local->dict = dict_ref (dict); - ret = pump_execute_start (frame, this); - - } else if (pump_command_pause (this, dict)) { - local = AFR_FRAME_INIT (frame, op_errno); - if (!local) - goto out; - local->dict = dict_ref (dict); - ret = pump_execute_pause (frame, this); - - } else if (pump_command_abort (this, dict)) { - local = AFR_FRAME_INIT (frame, op_errno); - if (!local) - goto out; - local->dict = dict_ref (dict); - ret = pump_execute_abort (frame, this); - - } else if (pump_command_commit (this, dict)) { - local = AFR_FRAME_INIT (frame, op_errno); - if (!local) - goto out; - local->dict = dict_ref (dict); - ret = pump_execute_commit (frame, this); - } -out: - if (op_errno_p) - *op_errno_p = op_errno; - return ret; -} - -int -pump_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, - int32_t flags, dict_t *xdata) -{ - afr_private_t *priv = NULL; - int ret = -1; - int op_errno = 0; - - GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.pump*", dict, op_errno, out); - - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_setxattr_cbk, - FIRST_CHILD (this), - (FIRST_CHILD (this))->fops->setxattr, - loc, dict, flags, xdata); - return 0; - } - - ret = pump_parse_command (frame, this, dict, &op_errno); - if (ret >= 0) - goto out; - - afr_setxattr (frame, this, loc, dict, flags, xdata); - - ret = 0; -out: - if (ret < 0) { - AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL); - } - - return 0; -} - -/* Defaults */ -static int32_t -pump_lookup (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - dict_t *xattr_req) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_lookup_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->lookup, - loc, - xattr_req); - return 0; - } - - afr_lookup (frame, this, loc, xattr_req); - return 0; -} - - -static int32_t -pump_truncate (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - off_t offset, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, - offset, xdata); - return 0; - } - - afr_truncate (frame, this, loc, offset, xdata); - return 0; -} - - -static int32_t -pump_ftruncate (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - off_t offset, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_ftruncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, - fd, - offset, xdata); - return 0; - } - - afr_ftruncate (frame, this, fd, offset, xdata); - return 0; -} - - - - -int -pump_mknod (call_frame_t *frame, xlator_t *this, - loc_t *loc, mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_mknod_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->mknod, - loc, mode, rdev, umask, xdata); - return 0; - } - afr_mknod (frame, this, loc, mode, rdev, umask, xdata); - return 0; - -} - - - -int -pump_mkdir (call_frame_t *frame, xlator_t *this, - loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_mkdir_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->mkdir, - loc, mode, umask, xdata); - return 0; - } - afr_mkdir (frame, this, loc, mode, umask, xdata); - return 0; - -} - - -static int32_t -pump_unlink (call_frame_t *frame, - xlator_t *this, - loc_t *loc, int xflag, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_unlink_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->unlink, - loc, xflag, xdata); - return 0; - } - afr_unlink (frame, this, loc, xflag, xdata); - return 0; - -} - - -static int -pump_rmdir (call_frame_t *frame, xlator_t *this, - loc_t *loc, int flags, dict_t *xdata) -{ - afr_private_t *priv = NULL; - - priv = this->private; - - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_rmdir_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->rmdir, - loc, flags, xdata); - return 0; - } - - afr_rmdir (frame, this, loc, flags, xdata); - return 0; - -} - - - -int -pump_symlink (call_frame_t *frame, xlator_t *this, - const char *linkpath, loc_t *loc, mode_t umask, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_symlink_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->symlink, - linkpath, loc, umask, xdata); - return 0; - } - afr_symlink (frame, this, linkpath, loc, umask, xdata); - return 0; - -} - - -static int32_t -pump_rename (call_frame_t *frame, - xlator_t *this, - loc_t *oldloc, - loc_t *newloc, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_rename_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->rename, - oldloc, newloc, xdata); - return 0; - } - afr_rename (frame, this, oldloc, newloc, xdata); - return 0; - -} - - -static int32_t -pump_link (call_frame_t *frame, - xlator_t *this, - loc_t *oldloc, - loc_t *newloc, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_link_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->link, - oldloc, newloc, xdata); - return 0; - } - afr_link (frame, this, oldloc, newloc, xdata); - return 0; - -} - - -static int32_t -pump_create (call_frame_t *frame, xlator_t *this, - loc_t *loc, int32_t flags, mode_t mode, - mode_t umask, fd_t *fd, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, default_create_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->create, - loc, flags, mode, umask, fd, xdata); - return 0; - } - afr_create (frame, this, loc, flags, mode, umask, fd, xdata); - return 0; - -} - - -static int32_t -pump_open (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags, fd_t *fd, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_open_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->open, - loc, flags, fd, xdata); - return 0; - } - afr_open (frame, this, loc, flags, fd, xdata); - return 0; - -} - - -static int32_t -pump_writev (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iovec *vector, - int32_t count, - off_t off, uint32_t flags, - struct iobref *iobref, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_writev_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->writev, - fd, - vector, - count, - off, flags, - iobref, xdata); - return 0; - } - - afr_writev (frame, this, fd, vector, count, off, flags, iobref, xdata); - return 0; -} - - -static int32_t -pump_flush (call_frame_t *frame, - xlator_t *this, - fd_t *fd, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_flush_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, - fd, xdata); - return 0; - } - afr_flush (frame, this, fd, xdata); - return 0; - -} - - -static int32_t -pump_fsync (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t flags, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_fsync_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsync, - fd, - flags, xdata); - return 0; - } - afr_fsync (frame, this, fd, flags, xdata); - return 0; - -} - - -static int32_t -pump_opendir (call_frame_t *frame, - xlator_t *this, - loc_t *loc, fd_t *fd, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_opendir_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->opendir, - loc, fd, xdata); - return 0; - } - afr_opendir (frame, this, loc, fd, xdata); - return 0; - -} - - -static int32_t -pump_fsyncdir (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t flags, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_fsyncdir_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsyncdir, - fd, - flags, xdata); - return 0; - } - afr_fsyncdir (frame, this, fd, flags, xdata); - return 0; - -} - - -static int32_t -pump_xattrop (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - gf_xattrop_flags_t flags, - dict_t *dict, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_xattrop_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->xattrop, - loc, - flags, - dict, xdata); - return 0; - } - afr_xattrop (frame, this, loc, flags, dict, xdata); - return 0; - -} - -static int32_t -pump_fxattrop (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - gf_xattrop_flags_t flags, - dict_t *dict, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_fxattrop_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fxattrop, - fd, - flags, - dict, xdata); - return 0; - } - afr_fxattrop (frame, this, fd, flags, dict, xdata); - return 0; - -} - - -static int32_t -pump_removexattr (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - const char *name, dict_t *xdata) -{ - afr_private_t *priv = NULL; - int op_errno = -1; - - VALIDATE_OR_GOTO (this, out); - - GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.pump*", - name, op_errno, out); - - op_errno = 0; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_removexattr_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->removexattr, - loc, - name, xdata); - return 0; - } - afr_removexattr (frame, this, loc, name, xdata); - - out: - if (op_errno) - AFR_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL); - return 0; - -} - - - -static int32_t -pump_readdir (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - size_t size, - off_t off, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_readdir_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readdir, - fd, size, off, xdata); - return 0; - } - afr_readdir (frame, this, fd, size, off, xdata); - return 0; - -} - - -static int32_t -pump_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, - size_t size, off_t off, dict_t *dict) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_readdirp_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readdirp, - fd, size, off, dict); - return 0; - } - afr_readdirp (frame, this, fd, size, off, dict); - return 0; - -} - - - -static int32_t -pump_releasedir (xlator_t *this, - fd_t *fd) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (priv->use_afr_in_pump) - afr_releasedir (this, fd); - return 0; - -} - -static int32_t -pump_release (xlator_t *this, - fd_t *fd) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (priv->use_afr_in_pump) - afr_release (this, fd); - return 0; - -} - -static int32_t -pump_forget (xlator_t *this, inode_t *inode) -{ - afr_private_t *priv = NULL; - - priv = this->private; - if (priv->use_afr_in_pump) - afr_forget (this, inode); - - return 0; -} - -static int32_t -pump_setattr (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - struct iatt *stbuf, - int32_t valid, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_setattr_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->setattr, - loc, stbuf, valid, xdata); - return 0; - } - afr_setattr (frame, this, loc, stbuf, valid, xdata); - return 0; - -} - - -static int32_t -pump_fsetattr (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iatt *stbuf, - int32_t valid, dict_t *xdata) -{ - afr_private_t *priv = NULL; - priv = this->private; - if (!priv->use_afr_in_pump) { - STACK_WIND (frame, - default_fsetattr_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsetattr, - fd, stbuf, valid, xdata); - return 0; - } - afr_fsetattr (frame, this, fd, stbuf, valid, xdata); - return 0; - -} - - -/* End of defaults */ - - -int32_t -mem_acct_init (xlator_t *this) -{ - int ret = -1; - - if (!this) - return ret; - - ret = xlator_mem_acct_init (this, gf_afr_mt_end + 1); - - if (ret != 0) { - return ret; - } - - return ret; -} - -static int -is_xlator_pump_sink (xlator_t *child) -{ - return (child == PUMP_SINK_CHILD(THIS)); -} - -static int -is_xlator_pump_source (xlator_t *child) -{ - return (child == PUMP_SOURCE_CHILD(THIS)); -} - -int32_t -notify (xlator_t *this, int32_t event, - void *data, ...) -{ - int ret = -1; - xlator_t *child_xl = NULL; - - child_xl = (xlator_t *) data; - - ret = afr_notify (this, event, data, NULL); - - switch (event) { - case GF_EVENT_CHILD_DOWN: - if (is_xlator_pump_source (child_xl)) - pump_change_state (this, PUMP_STATE_ABORT); - break; - - case GF_EVENT_CHILD_UP: - if (is_xlator_pump_sink (child_xl)) - if (is_pump_start_pending (this)) { - gf_msg_debug (this->name, 0, - "about to start synctask"); - ret = pump_start_synctask (this); - if (ret < 0) - gf_msg_debug (this->name, 0, - "Could not start pump " - "synctask"); - else - pump_remove_start_pending (this); - } - } - - return ret; -} - -int32_t -init (xlator_t *this) -{ - afr_private_t * priv = NULL; - pump_private_t *pump_priv = NULL; - int child_count = 0; - xlator_list_t * trav = NULL; - int i = 0; - int ret = -1; - GF_UNUSED int op_errno = 0; - - int source_child = 0; - - if (!this->children) { - gf_msg (this->name, GF_LOG_ERROR, - 0, AFR_MSG_CHILD_MISCONFIGURED, - "pump translator needs a source and sink" - "subvolumes defined."); - return -1; - } - - if (!this->parents) { - gf_msg (this->name, GF_LOG_WARNING, 0, - AFR_MSG_VOL_MISCONFIGURED, "Volume is dangling."); - } - - priv = GF_CALLOC (1, sizeof (afr_private_t), gf_afr_mt_afr_private_t); - if (!priv) - goto out; - - LOCK_INIT (&priv->lock); - - child_count = xlator_subvolume_count (this); - if (child_count != 2) { - gf_msg (this->name, GF_LOG_ERROR, - 0, AFR_MSG_CHILD_MISCONFIGURED, - "There should be exactly 2 children - one source " - "and one sink"); - LOCK_DESTROY (&priv->lock); - GF_FREE (priv); - return -1; - } - priv->child_count = child_count; - - priv->read_child = source_child; - priv->favorite_child = source_child; - priv->background_self_heal_count = 0; - - priv->data_self_heal = "on"; - priv->metadata_self_heal = 1; - priv->entry_self_heal = 1; - - priv->data_self_heal_window_size = 16; - - priv->data_change_log = 1; - priv->metadata_change_log = 1; - priv->entry_change_log = 1; - priv->use_afr_in_pump = 1; - priv->sh_readdir_size = 65536; - - /* Locking options */ - - /* Lock server count infact does not matter. Locks are held - on all subvolumes, in this case being the source - and the sink. - */ - - priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, - gf_afr_mt_char); - if (!priv->child_up) { - op_errno = ENOMEM; - goto out; - } - - priv->children = GF_CALLOC (sizeof (xlator_t *), child_count, - gf_afr_mt_xlator_t); - if (!priv->children) { - op_errno = ENOMEM; - goto out; - } - - priv->pending_key = GF_CALLOC (sizeof (*priv->pending_key), - child_count, - gf_afr_mt_char); - if (!priv->pending_key) { - op_errno = ENOMEM; - goto out; - } - - trav = this->children; - i = 0; - while (i < child_count) { - priv->children[i] = trav->xlator; - - ret = gf_asprintf (&priv->pending_key[i], "%s.%s", - AFR_XATTR_PREFIX, - trav->xlator->name); - if (-1 == ret) { - op_errno = ENOMEM; - goto out; - } - - trav = trav->next; - i++; - } - - ret = gf_asprintf (&priv->sh_domain, "%s-self-heal", this->name); - if (-1 == ret) { - op_errno = ENOMEM; - goto out; - } - - priv->root_inode = NULL; - - priv->last_event = GF_CALLOC (child_count, sizeof (*priv->last_event), - gf_afr_mt_int32_t); - if (!priv->last_event) { - ret = -ENOMEM; - goto out; - } - - pump_priv = GF_CALLOC (1, sizeof (*pump_priv), - gf_afr_mt_pump_priv); - if (!pump_priv) { - op_errno = ENOMEM; - goto out; - } - - LOCK_INIT (&pump_priv->resume_path_lock); - LOCK_INIT (&pump_priv->pump_state_lock); - - pump_priv->resume_path = GF_CALLOC (1, PATH_MAX, - gf_afr_mt_char); - if (!pump_priv->resume_path) { - ret = -1; - goto out; - } - - pump_priv->env = this->ctx->env; - if (!pump_priv->env) { - ret = -1; - goto out; - } - - /* keep more local here as we may need them for self-heal etc */ - this->local_pool = mem_pool_new (afr_local_t, 128); - if (!this->local_pool) { - ret = -1; - goto out; - } - - priv->pump_private = pump_priv; - pump_priv = NULL; - - this->private = priv; - priv = NULL; - - pump_change_state (this, PUMP_STATE_ABORT); - - ret = 0; -out: - - if (pump_priv) { - GF_FREE (pump_priv->resume_path); - LOCK_DESTROY (&pump_priv->resume_path_lock); - LOCK_DESTROY (&pump_priv->pump_state_lock); - GF_FREE (pump_priv); - } - - if (priv) { - GF_FREE (priv->child_up); - GF_FREE (priv->children); - GF_FREE (priv->pending_key); - GF_FREE (priv->last_event); - LOCK_DESTROY (&priv->lock); - GF_FREE (priv); - } - - return ret; -} - -int -fini (xlator_t *this) -{ - afr_private_t * priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - this->private = NULL; - if (!priv) - goto out; - - pump_priv = priv->pump_private; - if (!pump_priv) - goto afr_priv; - - GF_FREE (pump_priv->resume_path); - LOCK_DESTROY (&pump_priv->resume_path_lock); - LOCK_DESTROY (&pump_priv->pump_state_lock); - GF_FREE (pump_priv); -afr_priv: - afr_priv_destroy (priv); -out: - return 0; -} - - -struct xlator_fops fops = { - .lookup = pump_lookup, - .open = pump_open, - .flush = pump_flush, - .fsync = pump_fsync, - .fsyncdir = pump_fsyncdir, - .xattrop = pump_xattrop, - .fxattrop = pump_fxattrop, - .getxattr = pump_getxattr, - - /* inode write */ - .writev = pump_writev, - .truncate = pump_truncate, - .ftruncate = pump_ftruncate, - .setxattr = pump_setxattr, - .setattr = pump_setattr, - .fsetattr = pump_fsetattr, - .removexattr = pump_removexattr, - - /* dir read */ - .opendir = pump_opendir, - .readdir = pump_readdir, - .readdirp = pump_readdirp, - - /* dir write */ - .create = pump_create, - .mknod = pump_mknod, - .mkdir = pump_mkdir, - .unlink = pump_unlink, - .rmdir = pump_rmdir, - .link = pump_link, - .symlink = pump_symlink, - .rename = pump_rename, -}; - -struct xlator_dumpops dumpops = { - .priv = afr_priv_dump, -}; - - -struct xlator_cbks cbks = { - .release = pump_release, - .releasedir = pump_releasedir, - .forget = pump_forget, -}; - -struct volume_options options[] = { - { .key = {NULL} }, -}; |