diff options
Diffstat (limited to 'xlators/cluster/afr-v1/src/pump.c')
-rw-r--r-- | xlators/cluster/afr-v1/src/pump.c | 2663 |
1 files changed, 2663 insertions, 0 deletions
diff --git a/xlators/cluster/afr-v1/src/pump.c b/xlators/cluster/afr-v1/src/pump.c new file mode 100644 index 000000000..987696e55 --- /dev/null +++ b/xlators/cluster/afr-v1/src/pump.c @@ -0,0 +1,2663 @@ +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <unistd.h> +#include <sys/time.h> +#include <stdlib.h> +#include <fnmatch.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "afr-common.c" +#include "defaults.c" +#include "glusterfs.h" + +static uint64_t pump_pid = 0; +static inline void +pump_fill_loc_info (loc_t *loc, struct iatt *iatt, struct iatt *parent) +{ + afr_update_loc_gfids (loc, iatt, parent); + uuid_copy (loc->inode->gfid, iatt->ia_gfid); +} + +static int +pump_mark_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + pump_priv->pump_start_pending = 1; + + return 0; +} + +static int +is_pump_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + return (pump_priv->pump_start_pending); +} + +static int +pump_remove_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + pump_priv->pump_start_pending = 0; + + return 0; +} + +static pump_state_t +pump_get_state () +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t ret; + + this = THIS; + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->pump_state_lock); + { + ret = pump_priv->pump_state; + } + UNLOCK (&pump_priv->pump_state_lock); + + return ret; +} + +int +pump_change_state (xlator_t *this, pump_state_t state) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t state_old; + pump_state_t state_new; + + + priv = this->private; + pump_priv = priv->pump_private; + + GF_ASSERT (pump_priv); + + LOCK (&pump_priv->pump_state_lock); + { + state_old = pump_priv->pump_state; + state_new = state; + + pump_priv->pump_state = state; + + } + UNLOCK (&pump_priv->pump_state_lock); + + gf_log (this->name, GF_LOG_DEBUG, + "Pump changing state from %d to %d", + state_old, + state_new); + + return 0; +} + +static int +pump_set_resume_path (xlator_t *this, const char *path) +{ + int ret = 0; + + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + GF_ASSERT (pump_priv); + + LOCK (&pump_priv->resume_path_lock); + { + strncpy (pump_priv->resume_path, path, strlen (path) + 1); + } + UNLOCK (&pump_priv->resume_path_lock); + + return ret; +} + +static int +pump_save_path (xlator_t *this, const char *path) +{ + afr_private_t *priv = NULL; + pump_state_t state; + dict_t *dict = NULL; + loc_t loc = {0}; + int dict_ret = 0; + int ret = -1; + + state = pump_get_state (); + if (state == PUMP_STATE_RESUME) + return 0; + + priv = this->private; + + GF_ASSERT (priv->root_inode); + + afr_build_root_loc (this, &loc); + + dict = dict_new (); + dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", path, PUMP_PATH); + + ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); + + if (ret < 0) { + gf_log (this->name, GF_LOG_INFO, + "setxattr failed - could not save path=%s", path); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr succeeded - saved path=%s", path); + } + + dict_unref (dict); + + loc_wipe (&loc); + return 0; +} + +static int +pump_check_and_update_status (xlator_t *this) +{ + pump_state_t state; + int ret = -1; + + state = pump_get_state (); + + switch (state) { + + case PUMP_STATE_RESUME: + case PUMP_STATE_RUNNING: + { + ret = 0; + break; + } + case PUMP_STATE_PAUSE: + { + ret = -1; + break; + } + case PUMP_STATE_ABORT: + { + pump_save_path (this, "/"); + ret = -1; + break; + } + default: + { + gf_log (this->name, GF_LOG_DEBUG, + "Unknown pump state"); + ret = -1; + break; + } + + } + + return ret; +} + +static const char * +pump_get_resume_path (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + const char *resume_path = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + resume_path = pump_priv->resume_path; + + return resume_path; +} + +static int +pump_update_resume_state (xlator_t *this, const char *path) +{ + pump_state_t state; + const char *resume_path = NULL; + + state = pump_get_state (); + + if (state == PUMP_STATE_RESUME) { + resume_path = pump_get_resume_path (this); + if (strcmp (resume_path, "/") == 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Reached the resume path (/). Proceeding to change state" + " to running"); + pump_change_state (this, PUMP_STATE_RUNNING); + } else if (strcmp (resume_path, path) == 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Reached the resume path. Proceeding to change state" + " to running"); + pump_change_state (this, PUMP_STATE_RUNNING); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "Not yet hit the resume path:res-path=%s,path=%s", + resume_path, path); + } + } + + return 0; +} + +static gf_boolean_t +is_pump_traversal_allowed (xlator_t *this, const char *path) +{ + pump_state_t state; + const char *resume_path = NULL; + gf_boolean_t ret = _gf_true; + + state = pump_get_state (); + + if (state == PUMP_STATE_RESUME) { + resume_path = pump_get_resume_path (this); + if (strstr (resume_path, path)) { + gf_log (this->name, GF_LOG_DEBUG, + "On the right path to resumption path"); + ret = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "Not the right path to resuming=> ignoring traverse"); + ret = _gf_false; + } + } + + return ret; +} + +static int +pump_save_file_stats (xlator_t *this, const char *path) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->number_files_pumped++; + + strncpy (pump_priv->current_file, path, + PATH_MAX); + } + UNLOCK (&pump_priv->resume_path_lock); + + return 0; +} + +static int +gf_pump_traverse_directory (loc_t *loc) +{ + xlator_t *this = NULL; + fd_t *fd = NULL; + off_t offset = 0; + loc_t entry_loc = {0}; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + gf_dirent_t entries; + struct iatt iatt = {0}; + struct iatt parent = {0}; + dict_t *xattr_rsp = NULL; + int ret = 0; + gf_boolean_t is_directory_empty = _gf_true; + gf_boolean_t free_entries = _gf_false; + + INIT_LIST_HEAD (&entries.list); + this = THIS; + + GF_ASSERT (loc->inode); + + fd = fd_create (loc->inode, pump_pid); + if (!fd) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to create fd for %s", loc->path); + goto out; + } + + ret = syncop_opendir (this, loc, fd); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "opendir failed on %s", loc->path); + goto out; + } + + gf_log (this->name, GF_LOG_TRACE, + "pump opendir on %s returned=%d", + loc->path, ret); + + while (syncop_readdirp (this, fd, 131072, offset, NULL, &entries)) { + free_entries = _gf_true; + + if (list_empty (&entries.list)) { + gf_log (this->name, GF_LOG_TRACE, + "no more entries in directory"); + goto out; + } + + list_for_each_entry_safe (entry, tmp, &entries.list, list) { + gf_log (this->name, GF_LOG_DEBUG, + "found readdir entry=%s", entry->d_name); + + offset = entry->d_off; + if (uuid_is_null (entry->d_stat.ia_gfid)) { + gf_log (this->name, GF_LOG_WARNING, "%s/%s: No " + "gfid present skipping", + loc->path, entry->d_name); + continue; + } + loc_wipe (&entry_loc); + ret = afr_build_child_loc (this, &entry_loc, loc, + entry->d_name); + if (ret) + goto out; + + if (!IS_ENTRY_CWD (entry->d_name) && + !IS_ENTRY_PARENT (entry->d_name)) { + + is_directory_empty = _gf_false; + gf_log (this->name, GF_LOG_DEBUG, + "lookup %s => %"PRId64, + entry_loc.path, + iatt.ia_ino); + + ret = syncop_lookup (this, &entry_loc, NULL, + &iatt, &xattr_rsp, &parent); + + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "%s: lookup failed", + entry_loc.path); + continue; + } + pump_fill_loc_info (&entry_loc, &iatt, + &parent); + + pump_update_resume_state (this, entry_loc.path); + + pump_save_path (this, entry_loc.path); + pump_save_file_stats (this, entry_loc.path); + + ret = pump_check_and_update_status (this); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump beginning to exit out"); + goto out; + } + + if (IA_ISDIR (iatt.ia_type)) { + if (is_pump_traversal_allowed (this, entry_loc.path)) { + gf_log (this->name, GF_LOG_TRACE, + "entering dir=%s", + entry->d_name); + gf_pump_traverse_directory (&entry_loc); + } + } + } + } + + gf_dirent_free (&entries); + free_entries = _gf_false; + gf_log (this->name, GF_LOG_TRACE, + "offset incremented to %d", + (int32_t ) offset); + + } + + ret = syncop_close (fd); + if (ret < 0) + gf_log (this->name, GF_LOG_DEBUG, "closing the fd failed"); + + if (is_directory_empty && IS_ROOT_PATH (loc->path)) { + pump_change_state (this, PUMP_STATE_RUNNING); + gf_log (this->name, GF_LOG_INFO, "Empty source brick. " + "Nothing to be done."); + } + +out: + if (entry_loc.path) + loc_wipe (&entry_loc); + if (free_entries) + gf_dirent_free (&entries); + return 0; +} + +static int +pump_update_resume_path (xlator_t *this) +{ + const char *resume_path = NULL; + + resume_path = pump_get_resume_path (this); + + if (resume_path) { + gf_log (this->name, GF_LOG_DEBUG, + "Found a path to resume from: %s", + resume_path); + + }else { + gf_log (this->name, GF_LOG_DEBUG, + "Did not find a path=> setting to '/'"); + pump_set_resume_path (this, "/"); + } + + pump_change_state (this, PUMP_STATE_RESUME); + + return 0; +} + +static int32_t +pump_xattr_cleaner (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + afr_private_t *priv = NULL; + loc_t loc = {0}; + int i = 0; + int ret = 0; + int source = 0; + int sink = 1; + + priv = this->private; + + afr_build_root_loc (this, &loc); + + ret = syncop_removexattr (priv->children[source], &loc, + PUMP_PATH, 0); + + ret = syncop_removexattr (priv->children[sink], &loc, + PUMP_SINK_COMPLETE, 0); + + for (i = 0; i < priv->child_count; i++) { + ret = syncop_removexattr (priv->children[i], &loc, + PUMP_SOURCE_COMPLETE, 0); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, "removexattr " + "failed with %s", strerror (-ret)); + } + } + + loc_wipe (&loc); + return pump_command_reply (frame, this); +} + +static int +pump_complete_migration (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + dict_t *dict = NULL; + pump_state_t state; + loc_t loc = {0}; + int dict_ret = 0; + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + GF_ASSERT (priv->root_inode); + + afr_build_root_loc (this, &loc); + + dict = dict_new (); + + state = pump_get_state (); + if (state == PUMP_STATE_RUNNING) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump finished pumping"); + + pump_priv->pump_finished = _gf_true; + + dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", + loc.path, PUMP_SOURCE_COMPLETE); + + ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr failed - while notifying source complete"); + } + dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", + loc.path, PUMP_SINK_COMPLETE); + + ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr failed - while notifying sink complete"); + } + + pump_save_path (this, "/"); + + } else if (state == PUMP_STATE_ABORT) { + gf_log (this->name, GF_LOG_DEBUG, "Starting cleanup " + "of pump internal xattrs"); + call_resume (pump_priv->cleaner); + } + + loc_wipe (&loc); + return 0; +} + +static int +pump_lookup_sink (loc_t *loc) +{ + xlator_t *this = NULL; + struct iatt iatt, parent; + dict_t *xattr_rsp; + dict_t *xattr_req = NULL; + int ret = 0; + + this = THIS; + + xattr_req = dict_new (); + + ret = afr_set_root_gfid (xattr_req); + if (ret) + goto out; + + ret = syncop_lookup (PUMP_SINK_CHILD (this), loc, + xattr_req, &iatt, &xattr_rsp, &parent); + + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "Lookup on sink child failed"); + ret = -1; + goto out; + } + +out: + if (xattr_req) + dict_unref (xattr_req); + + return ret; +} + +static int +pump_task (void *data) +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + + + loc_t loc = {0}; + struct iatt iatt, parent; + dict_t *xattr_rsp = NULL; + dict_t *xattr_req = NULL; + + int ret = -1; + + this = THIS; + priv = this->private; + + GF_ASSERT (priv->root_inode); + + afr_build_root_loc (this, &loc); + xattr_req = dict_new (); + if (!xattr_req) { + gf_log (this->name, GF_LOG_DEBUG, + "Out of memory"); + ret = -1; + goto out; + } + + afr_set_root_gfid (xattr_req); + ret = syncop_lookup (this, &loc, xattr_req, + &iatt, &xattr_rsp, &parent); + + gf_log (this->name, GF_LOG_TRACE, + "lookup: path=%s gfid=%s", + loc.path, uuid_utoa (loc.inode->gfid)); + + ret = pump_check_and_update_status (this); + if (ret < 0) { + goto out; + } + + pump_update_resume_path (this); + + afr_set_root_gfid (xattr_req); + ret = pump_lookup_sink (&loc); + if (ret) { + pump_update_resume_path (this); + goto out; + } + + gf_pump_traverse_directory (&loc); + + pump_complete_migration (this); +out: + if (xattr_req) + dict_unref (xattr_req); + + loc_wipe (&loc); + return 0; +} + + +static int +pump_task_completion (int ret, call_frame_t *sync_frame, void *data) +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + + this = THIS; + + priv = this->private; + + inode_unref (priv->root_inode); + STACK_DESTROY (sync_frame->root); + + gf_log (this->name, GF_LOG_DEBUG, + "Pump xlator exiting"); + return 0; +} + +int +pump_start (call_frame_t *pump_frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + afr_set_lk_owner (pump_frame, this, pump_frame->root); + pump_pid = (uint64_t) (unsigned long)pump_frame->root; + + ret = synctask_new (pump_priv->env, pump_task, + pump_task_completion, + pump_frame, NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "starting pump failed"); + pump_change_state (this, PUMP_STATE_ABORT); + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "setting pump as started lk_owner: %s %"PRIu64, + lkowner_utoa (&pump_frame->root->lk_owner), pump_pid); + + priv->use_afr_in_pump = 1; +out: + return ret; +} + +static int +pump_start_synctask (xlator_t *this) +{ + call_frame_t *frame = NULL; + int ret = 0; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + ret = -1; + goto out; + } + + pump_change_state (this, PUMP_STATE_RUNNING); + + ret = pump_start (frame, this); + +out: + return ret; +} + +int32_t +pump_cmd_start_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, dict_t *xdata) + +{ + call_frame_t *prev = NULL; + afr_local_t *local = NULL; + int ret = 0; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not initiate destination " + "brick connect"); + ret = op_ret; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Successfully initiated destination " + "brick connect"); + + pump_mark_start_pending (this); + + /* send the PARENT_UP as pump is ready now */ + prev = cookie; + if (prev && prev->this) + prev->this->notify (prev->this, GF_EVENT_PARENT_UP, this); + +out: + local->op_ret = ret; + pump_command_reply (frame, this); + + return 0; +} + +static int +pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + afr_private_t *priv = NULL; + dict_t *dict = NULL; + data_t *data = NULL; + char *clnt_cmd = NULL; + loc_t loc = {0}; + + int ret = 0; + + priv = this->private; + local = frame->local; + + GF_ASSERT (priv->root_inode); + + afr_build_root_loc (this, &loc); + + data = data_ref (dict_get (local->dict, RB_PUMP_CMD_START)); + if (!data) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "Could not get destination brick value"); + goto out; + } + + dict = dict_new (); + if (!dict) { + ret = -1; + goto out; + } + + clnt_cmd = GF_CALLOC (1, data->len+1, gf_common_mt_char); + if (!clnt_cmd) { + ret = -1; + goto out; + } + + memcpy (clnt_cmd, data->data, data->len); + clnt_cmd[data->len] = '\0'; + gf_log (this->name, GF_LOG_DEBUG, "Got destination brick %s\n", + clnt_cmd); + + ret = dict_set_dynstr (dict, CLIENT_CMD_CONNECT, clnt_cmd); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not inititiate destination brick " + "connect"); + goto out; + } + + STACK_WIND (frame, + pump_cmd_start_setxattr_cbk, + PUMP_SINK_CHILD(this), + PUMP_SINK_CHILD(this)->fops->setxattr, + &loc, + dict, + 0, NULL); + + ret = 0; + +out: + if (dict) + dict_unref (dict); + + if (data) + data_unref (data); + + if (ret && clnt_cmd) + GF_FREE (clnt_cmd); + + loc_wipe (&loc); + return ret; +} + +static int +is_pump_aborted (xlator_t *this) +{ + pump_state_t state; + + state = pump_get_state (); + + return ((state == PUMP_STATE_ABORT)); +} + +int32_t +pump_cmd_start_getxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *dict, dict_t *xdata) +{ + afr_local_t *local = NULL; + char *path = NULL; + + pump_state_t state; + int ret = 0; + int need_unwind = 0; + int dict_ret = -1; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "getxattr failed - changing pump " + "state to RUNNING with '/'"); + path = "/"; + ret = op_ret; + } else { + gf_log (this->name, GF_LOG_TRACE, + "getxattr succeeded"); + + dict_ret = dict_get_str (dict, PUMP_PATH, &path); + if (dict_ret < 0) + path = "/"; + } + + state = pump_get_state (); + if ((state == PUMP_STATE_RUNNING) || + (state == PUMP_STATE_RESUME)) { + gf_log (this->name, GF_LOG_ERROR, + "Pump is already started"); + ret = -1; + goto out; + } + + pump_set_resume_path (this, path); + + if (is_pump_aborted (this)) + /* We're re-starting pump afresh */ + ret = pump_initiate_sink_connect (frame, this); + else { + /* We're re-starting pump from a previous + pause */ + gf_log (this->name, GF_LOG_DEBUG, + "about to start synctask"); + ret = pump_start_synctask (this); + need_unwind = 1; + } + +out: + if ((ret < 0) || (need_unwind == 1)) { + local->op_ret = ret; + pump_command_reply (frame, this); + } + return 0; +} + +int +pump_execute_status (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + uint64_t number_files = 0; + + char filename[PATH_MAX]; + char summary[PATH_MAX+256]; + char *dict_str = NULL; + + int32_t op_ret = 0; + int32_t op_errno = 0; + + dict_t *dict = NULL; + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->resume_path_lock); + { + number_files = pump_priv->number_files_pumped; + strncpy (filename, pump_priv->current_file, PATH_MAX); + } + UNLOCK (&pump_priv->resume_path_lock); + + dict_str = GF_CALLOC (1, PATH_MAX + 256, gf_afr_mt_char); + if (!dict_str) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } + + if (pump_priv->pump_finished) { + snprintf (summary, PATH_MAX+256, + "no_of_files=%"PRIu64, number_files); + } else { + snprintf (summary, PATH_MAX+256, + "no_of_files=%"PRIu64":current_file=%s", + number_files, filename); + } + snprintf (dict_str, PATH_MAX+256, "status=%d:%s", + (pump_priv->pump_finished)?1:0, summary); + + dict = dict_new (); + + ret = dict_set_dynstr (dict, RB_PUMP_CMD_STATUS, dict_str); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "dict_set_dynstr returned negative value"); + } else { + dict_str = NULL; + } + + op_ret = 0; + +out: + + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); + + if (dict) + dict_unref (dict); + + GF_FREE (dict_str); + + return 0; +} + +int +pump_execute_pause (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + + local = frame->local; + + pump_change_state (this, PUMP_STATE_PAUSE); + + local->op_ret = 0; + pump_command_reply (frame, this); + + return 0; +} + +int +pump_execute_start (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + + int ret = 0; + loc_t loc = {0}; + + priv = this->private; + local = frame->local; + + if (!priv->root_inode) { + gf_log (this->name, GF_LOG_ERROR, + "Pump xlator cannot be started without an initial " + "lookup"); + ret = -1; + goto out; + } + + GF_ASSERT (priv->root_inode); + + afr_build_root_loc (this, &loc); + + STACK_WIND (frame, + pump_cmd_start_getxattr_cbk, + PUMP_SOURCE_CHILD(this), + PUMP_SOURCE_CHILD(this)->fops->getxattr, + &loc, + PUMP_PATH, NULL); + + ret = 0; + +out: + if (ret < 0) { + local->op_ret = ret; + pump_command_reply (frame, this); + } + + loc_wipe (&loc); + return 0; +} + +static int +pump_cleanup_helper (void *data) { + call_frame_t *frame = data; + + pump_xattr_cleaner (frame, 0, frame->this, 0, 0, NULL); + + return 0; +} + +static int +pump_cleanup_done (int ret, call_frame_t *sync_frame, void *data) +{ + STACK_DESTROY (sync_frame->root); + + return 0; +} + +int +pump_execute_commit (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + afr_local_t *local = NULL; + call_frame_t *sync_frame = NULL; + int ret = 0; + + priv = this->private; + pump_priv = priv->pump_private; + local = frame->local; + + local->op_ret = 0; + if (pump_priv->pump_finished) { + pump_change_state (this, PUMP_STATE_COMMIT); + sync_frame = create_frame (this, this->ctx->pool); + ret = synctask_new (pump_priv->env, pump_cleanup_helper, + pump_cleanup_done, sync_frame, frame); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " + "synctask for cleaning up xattrs."); + } + + } else { + gf_log (this->name, GF_LOG_ERROR, "Commit can't proceed. " + "Migration in progress"); + local->op_ret = -1; + local->op_errno = EINPROGRESS; + pump_command_reply (frame, this); + } + + return 0; +} +int +pump_execute_abort (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + afr_local_t *local = NULL; + call_frame_t *sync_frame = NULL; + int ret = 0; + + priv = this->private; + pump_priv = priv->pump_private; + local = frame->local; + + pump_change_state (this, PUMP_STATE_ABORT); + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->number_files_pumped = 0; + pump_priv->current_file[0] = '\0'; + } + UNLOCK (&pump_priv->resume_path_lock); + + local->op_ret = 0; + if (pump_priv->pump_finished) { + sync_frame = create_frame (this, this->ctx->pool); + ret = synctask_new (pump_priv->env, pump_cleanup_helper, + pump_cleanup_done, sync_frame, frame); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " + "synctask for cleaning up xattrs."); + } + + } else { + pump_priv->cleaner = fop_setxattr_cbk_stub (frame, + pump_xattr_cleaner, + 0, 0, NULL); + } + + return 0; +} + +gf_boolean_t +pump_command_status (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_STATUS, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump status command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - status"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_pause (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_PAUSE, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump pause command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - pause"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_commit (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_COMMIT, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump commit command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - commit"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_abort (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_ABORT, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump abort command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - abort"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_start (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_START, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump start command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - start"); + ret = _gf_true; + +out: + return ret; + +} + +struct _xattr_key { + char *key; + struct list_head list; +}; + +static int +__gather_xattr_keys (dict_t *dict, char *key, data_t *value, + void *data) +{ + struct list_head * list = data; + struct _xattr_key * xkey = NULL; + + if (!strncmp (key, AFR_XATTR_PREFIX, + strlen (AFR_XATTR_PREFIX))) { + + xkey = GF_CALLOC (1, sizeof (*xkey), gf_afr_mt_xattr_key); + if (!xkey) + return -1; + + xkey->key = key; + INIT_LIST_HEAD (&xkey->list); + + list_add_tail (&xkey->list, list); + } + return 0; +} + +static void +__filter_xattrs (dict_t *dict) +{ + struct list_head keys; + + struct _xattr_key *key; + struct _xattr_key *tmp; + + INIT_LIST_HEAD (&keys); + + dict_foreach (dict, __gather_xattr_keys, + (void *) &keys); + + list_for_each_entry_safe (key, tmp, &keys, list) { + dict_del (dict, key->key); + + list_del_init (&key->list); + + GF_FREE (key); + } +} + +int32_t +pump_getxattr_cbk (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + dict_t *dict, dict_t *xdata) +{ + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + xlator_t **children = NULL; + int unwind = 1; + int32_t *last_index = NULL; + int32_t next_call_child = -1; + int32_t read_child = -1; + int32_t *fresh_children = NULL; + + + priv = this->private; + children = priv->children; + + local = frame->local; + + read_child = (long) cookie; + + if (op_ret == -1) { + last_index = &local->cont.getxattr.last_index; + fresh_children = local->fresh_children; + next_call_child = afr_next_call_child (fresh_children, + local->child_up, + priv->child_count, + last_index, read_child); + if (next_call_child < 0) + goto out; + + unwind = 0; + STACK_WIND_COOKIE (frame, pump_getxattr_cbk, + (void *) (long) read_child, + children[next_call_child], + children[next_call_child]->fops->getxattr, + &local->loc, + local->cont.getxattr.name, NULL); + } + +out: + if (unwind) { + if (op_ret >= 0 && dict) + __filter_xattrs (dict); + + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); + } + + return 0; +} + +int32_t +pump_getxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name, dict_t *xdata) +{ + afr_private_t * priv = NULL; + xlator_t ** children = NULL; + int call_child = 0; + afr_local_t *local = NULL; + int32_t ret = -1; + int32_t op_errno = 0; + uint64_t read_child = 0; + + + VALIDATE_OR_GOTO (frame, out); + VALIDATE_OR_GOTO (this, out); + VALIDATE_OR_GOTO (this->private, out); + + priv = this->private; + VALIDATE_OR_GOTO (priv->children, out); + + children = priv->children; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_getxattr_cbk, + FIRST_CHILD (this), + (FIRST_CHILD (this))->fops->getxattr, + loc, name, xdata); + return 0; + } + + + AFR_LOCAL_ALLOC_OR_GOTO (frame->local, out); + local = frame->local; + + ret = afr_local_init (local, priv, &op_errno); + if (ret < 0) + goto out; + + if (name) { + if (!strncmp (name, AFR_XATTR_PREFIX, + strlen (AFR_XATTR_PREFIX))) { + + op_errno = ENODATA; + goto out; + } + + if (!strcmp (name, RB_PUMP_CMD_STATUS)) { + gf_log (this->name, GF_LOG_DEBUG, + "Hit pump command - status"); + pump_execute_status (frame, this); + ret = 0; + goto out; + } + } + + local->fresh_children = GF_CALLOC (priv->child_count, + sizeof (*local->fresh_children), + gf_afr_mt_int32_t); + if (!local->fresh_children) { + ret = -1; + op_errno = ENOMEM; + goto out; + } + + read_child = afr_inode_get_read_ctx (this, loc->inode, local->fresh_children); + ret = afr_get_call_child (this, local->child_up, read_child, + local->fresh_children, + &call_child, + &local->cont.getxattr.last_index); + if (ret < 0) { + op_errno = -ret; + goto out; + } + loc_copy (&local->loc, loc); + if (name) + local->cont.getxattr.name = gf_strdup (name); + + STACK_WIND_COOKIE (frame, pump_getxattr_cbk, + (void *) (long) call_child, + children[call_child], children[call_child]->fops->getxattr, + loc, name, xdata); + + ret = 0; +out: + if (ret < 0) + AFR_STACK_UNWIND (getxattr, frame, -1, op_errno, NULL, NULL); + return 0; +} + +static int +afr_setxattr_unwind (call_frame_t *frame, xlator_t *this) +{ + afr_local_t * local = NULL; + call_frame_t *main_frame = NULL; + + local = frame->local; + + LOCK (&frame->lock); + { + if (local->transaction.main_frame) + main_frame = local->transaction.main_frame; + local->transaction.main_frame = NULL; + } + UNLOCK (&frame->lock); + + if (main_frame) { + AFR_STACK_UNWIND (setxattr, main_frame, + local->op_ret, local->op_errno, NULL); + } + return 0; +} + +static int +afr_setxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + afr_local_t * local = NULL; + afr_private_t * priv = NULL; + + int call_count = -1; + int need_unwind = 0; + + local = frame->local; + priv = this->private; + + LOCK (&frame->lock); + { + if (op_ret != -1) { + if (local->success_count == 0) { + local->op_ret = op_ret; + } + local->success_count++; + + if (local->success_count == priv->child_count) { + need_unwind = 1; + } + } + + local->op_errno = op_errno; + } + UNLOCK (&frame->lock); + + if (need_unwind) + local->transaction.unwind (frame, this); + + call_count = afr_frame_return (frame); + + if (call_count == 0) { + local->transaction.resume (frame, this); + } + + return 0; +} + +static int +afr_setxattr_wind (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + afr_private_t *priv = NULL; + + int call_count = -1; + int i = 0; + + local = frame->local; + priv = this->private; + + call_count = afr_up_children_count (local->child_up, priv->child_count); + + if (call_count == 0) { + local->transaction.resume (frame, this); + return 0; + } + + local->call_count = call_count; + + for (i = 0; i < priv->child_count; i++) { + if (local->child_up[i]) { + STACK_WIND_COOKIE (frame, afr_setxattr_wind_cbk, + (void *) (long) i, + priv->children[i], + priv->children[i]->fops->setxattr, + &local->loc, + local->cont.setxattr.dict, + local->cont.setxattr.flags, NULL); + + if (!--call_count) + break; + } + } + + return 0; +} + + +static int +afr_setxattr_done (call_frame_t *frame, xlator_t *this) +{ + afr_local_t * local = frame->local; + + local->transaction.unwind (frame, this); + + AFR_STACK_DESTROY (frame); + + return 0; +} + +int32_t +pump_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, dict_t *xdata) +{ + AFR_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + return 0; +} + +int +pump_command_reply (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + + local = frame->local; + + if (local->op_ret < 0) + gf_log (this->name, GF_LOG_INFO, + "Command failed"); + else + gf_log (this->name, GF_LOG_INFO, + "Command succeeded"); + + AFR_STACK_UNWIND (setxattr, + frame, + local->op_ret, + local->op_errno, NULL); + + return 0; +} + +int +pump_parse_command (call_frame_t *frame, xlator_t *this, + afr_local_t *local, dict_t *dict) +{ + + int ret = -1; + + if (pump_command_start (this, dict)) { + frame->local = local; + local->dict = dict_ref (dict); + ret = pump_execute_start (frame, this); + + } else if (pump_command_pause (this, dict)) { + frame->local = local; + local->dict = dict_ref (dict); + ret = pump_execute_pause (frame, this); + + } else if (pump_command_abort (this, dict)) { + frame->local = local; + local->dict = dict_ref (dict); + ret = pump_execute_abort (frame, this); + + } else if (pump_command_commit (this, dict)) { + frame->local = local; + local->dict = dict_ref (dict); + ret = pump_execute_commit (frame, this); + } + return ret; +} + +int +pump_setxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, dict_t *dict, int32_t flags, dict_t *xdata) +{ + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + call_frame_t *transaction_frame = NULL; + int ret = -1; + int op_errno = 0; + + VALIDATE_OR_GOTO (frame, out); + VALIDATE_OR_GOTO (this, out); + VALIDATE_OR_GOTO (this->private, out); + + GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.pump*", dict, + op_errno, out); + + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_setxattr_cbk, + FIRST_CHILD (this), + (FIRST_CHILD (this))->fops->setxattr, + loc, dict, flags, xdata); + return 0; + } + + + AFR_LOCAL_ALLOC_OR_GOTO (local, out); + + ret = afr_local_init (local, priv, &op_errno); + if (ret < 0) { + afr_local_cleanup (local, this); + mem_put (local); + goto out; + } + + ret = pump_parse_command (frame, this, + local, dict); + if (ret >= 0) { + ret = 0; + goto out; + } + + transaction_frame = copy_frame (frame); + if (!transaction_frame) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + ret = -1; + afr_local_cleanup (local, this); + goto out; + } + + transaction_frame->local = local; + + local->op_ret = -1; + + local->cont.setxattr.dict = dict_ref (dict); + local->cont.setxattr.flags = flags; + + local->transaction.fop = afr_setxattr_wind; + local->transaction.done = afr_setxattr_done; + local->transaction.unwind = afr_setxattr_unwind; + + loc_copy (&local->loc, loc); + + local->transaction.main_frame = frame; + local->transaction.start = LLONG_MAX - 1; + local->transaction.len = 0; + + afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION); + + ret = 0; +out: + if (ret < 0) { + if (transaction_frame) + AFR_STACK_DESTROY (transaction_frame); + AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL); + } + + return 0; +} + +/* Defaults */ +static int32_t +pump_lookup (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + dict_t *xattr_req) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_lookup_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, + loc, + xattr_req); + return 0; + } + + afr_lookup (frame, this, loc, xattr_req); + return 0; +} + + +static int32_t +pump_truncate (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + off_t offset, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, + offset, xdata); + return 0; + } + + afr_truncate (frame, this, loc, offset, xdata); + return 0; +} + + +static int32_t +pump_ftruncate (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + off_t offset, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_ftruncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, + fd, + offset, xdata); + return 0; + } + + afr_ftruncate (frame, this, fd, offset, xdata); + return 0; +} + + + + +int +pump_mknod (call_frame_t *frame, xlator_t *this, + loc_t *loc, mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_mknod_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mknod, + loc, mode, rdev, umask, xdata); + return 0; + } + afr_mknod (frame, this, loc, mode, rdev, umask, xdata); + return 0; + +} + + + +int +pump_mkdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_mkdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mkdir, + loc, mode, umask, xdata); + return 0; + } + afr_mkdir (frame, this, loc, mode, umask, xdata); + return 0; + +} + + +static int32_t +pump_unlink (call_frame_t *frame, + xlator_t *this, + loc_t *loc, int xflag, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_unlink_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, + loc, xflag, xdata); + return 0; + } + afr_unlink (frame, this, loc, xflag, xdata); + return 0; + +} + + +static int +pump_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + + priv = this->private; + + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_rmdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rmdir, + loc, flags, xdata); + return 0; + } + + afr_rmdir (frame, this, loc, flags, xdata); + return 0; + +} + + + +int +pump_symlink (call_frame_t *frame, xlator_t *this, + const char *linkpath, loc_t *loc, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_symlink_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->symlink, + linkpath, loc, umask, xdata); + return 0; + } + afr_symlink (frame, this, linkpath, loc, umask, xdata); + return 0; + +} + + +static int32_t +pump_rename (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_rename_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rename, + oldloc, newloc, xdata); + return 0; + } + afr_rename (frame, this, oldloc, newloc, xdata); + return 0; + +} + + +static int32_t +pump_link (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_link_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->link, + oldloc, newloc, xdata); + return 0; + } + afr_link (frame, this, oldloc, newloc, xdata); + return 0; + +} + + +static int32_t +pump_create (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, mode_t mode, + mode_t umask, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_create_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->create, + loc, flags, mode, umask, fd, xdata); + return 0; + } + afr_create (frame, this, loc, flags, mode, umask, fd, xdata); + return 0; + +} + + +static int32_t +pump_open (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_open_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->open, + loc, flags, fd, xdata); + return 0; + } + afr_open (frame, this, loc, flags, fd, xdata); + return 0; + +} + + +static int32_t +pump_writev (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t off, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_writev_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, + fd, + vector, + count, + off, flags, + iobref, xdata); + return 0; + } + + afr_writev (frame, this, fd, vector, count, off, flags, iobref, xdata); + return 0; +} + + +static int32_t +pump_flush (call_frame_t *frame, + xlator_t *this, + fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_flush_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd, xdata); + return 0; + } + afr_flush (frame, this, fd, xdata); + return 0; + +} + + +static int32_t +pump_fsync (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, + flags, xdata); + return 0; + } + afr_fsync (frame, this, fd, flags, xdata); + return 0; + +} + + +static int32_t +pump_opendir (call_frame_t *frame, + xlator_t *this, + loc_t *loc, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_opendir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->opendir, + loc, fd, xdata); + return 0; + } + afr_opendir (frame, this, loc, fd, xdata); + return 0; + +} + + +static int32_t +pump_fsyncdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsyncdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsyncdir, + fd, + flags, xdata); + return 0; + } + afr_fsyncdir (frame, this, fd, flags, xdata); + return 0; + +} + + +static int32_t +pump_xattrop (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + gf_xattrop_flags_t flags, + dict_t *dict, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_xattrop_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->xattrop, + loc, + flags, + dict, xdata); + return 0; + } + afr_xattrop (frame, this, loc, flags, dict, xdata); + return 0; + +} + +static int32_t +pump_fxattrop (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + gf_xattrop_flags_t flags, + dict_t *dict, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fxattrop_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fxattrop, + fd, + flags, + dict, xdata); + return 0; + } + afr_fxattrop (frame, this, fd, flags, dict, xdata); + return 0; + +} + + +static int32_t +pump_removexattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + const char *name, dict_t *xdata) +{ + afr_private_t *priv = NULL; + int op_errno = -1; + + VALIDATE_OR_GOTO (this, out); + + GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.pump*", + name, op_errno, out); + + op_errno = 0; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_removexattr_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, + loc, + name, xdata); + return 0; + } + afr_removexattr (frame, this, loc, name, xdata); + + out: + if (op_errno) + AFR_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL); + return 0; + +} + + + +static int32_t +pump_readdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t off, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_readdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdir, + fd, size, off, xdata); + return 0; + } + afr_readdir (frame, this, fd, size, off, xdata); + return 0; + +} + + +static int32_t +pump_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, + size_t size, off_t off, dict_t *dict) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_readdirp_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdirp, + fd, size, off, dict); + return 0; + } + afr_readdirp (frame, this, fd, size, off, dict); + return 0; + +} + + + +static int32_t +pump_releasedir (xlator_t *this, + fd_t *fd) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (priv->use_afr_in_pump) + afr_releasedir (this, fd); + return 0; + +} + +static int32_t +pump_release (xlator_t *this, + fd_t *fd) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (priv->use_afr_in_pump) + afr_release (this, fd); + return 0; + +} + +static int32_t +pump_forget (xlator_t *this, inode_t *inode) +{ + afr_private_t *priv = NULL; + + priv = this->private; + if (priv->use_afr_in_pump) + afr_forget (this, inode); + + return 0; +} + +static int32_t +pump_setattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + struct iatt *stbuf, + int32_t valid, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_setattr_cbk, + FIRST_CHILD (this), + FIRST_CHILD (this)->fops->setattr, + loc, stbuf, valid, xdata); + return 0; + } + afr_setattr (frame, this, loc, stbuf, valid, xdata); + return 0; + +} + + +static int32_t +pump_fsetattr (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iatt *stbuf, + int32_t valid, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsetattr_cbk, + FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fsetattr, + fd, stbuf, valid, xdata); + return 0; + } + afr_fsetattr (frame, this, fd, stbuf, valid, xdata); + return 0; + +} + + +/* End of defaults */ + + +int32_t +mem_acct_init (xlator_t *this) +{ + int ret = -1; + + if (!this) + return ret; + + ret = xlator_mem_acct_init (this, gf_afr_mt_end + 1); + + if (ret != 0) { + gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" + "failed"); + return ret; + } + + return ret; +} + +static int +is_xlator_pump_sink (xlator_t *child) +{ + return (child == PUMP_SINK_CHILD(THIS)); +} + +static int +is_xlator_pump_source (xlator_t *child) +{ + return (child == PUMP_SOURCE_CHILD(THIS)); +} + +int32_t +notify (xlator_t *this, int32_t event, + void *data, ...) +{ + int ret = -1; + xlator_t *child_xl = NULL; + + child_xl = (xlator_t *) data; + + ret = afr_notify (this, event, data, NULL); + + switch (event) { + case GF_EVENT_CHILD_DOWN: + if (is_xlator_pump_source (child_xl)) + pump_change_state (this, PUMP_STATE_ABORT); + break; + + case GF_EVENT_CHILD_UP: + if (is_xlator_pump_sink (child_xl)) + if (is_pump_start_pending (this)) { + gf_log (this->name, GF_LOG_DEBUG, + "about to start synctask"); + ret = pump_start_synctask (this); + if (ret < 0) + gf_log (this->name, GF_LOG_DEBUG, + "Could not start pump " + "synctask"); + else + pump_remove_start_pending (this); + } + } + + return ret; +} + +int32_t +init (xlator_t *this) +{ + afr_private_t * priv = NULL; + pump_private_t *pump_priv = NULL; + int child_count = 0; + xlator_list_t * trav = NULL; + int i = 0; + int ret = -1; + GF_UNUSED int op_errno = 0; + + int source_child = 0; + + if (!this->children) { + gf_log (this->name, GF_LOG_ERROR, + "pump translator needs a source and sink" + "subvolumes defined."); + return -1; + } + + if (!this->parents) { + gf_log (this->name, GF_LOG_WARNING, + "Volume is dangling."); + } + + priv = GF_CALLOC (1, sizeof (afr_private_t), gf_afr_mt_afr_private_t); + if (!priv) + goto out; + + LOCK_INIT (&priv->lock); + LOCK_INIT (&priv->read_child_lock); + //lock recovery is not done in afr + pthread_mutex_init (&priv->mutex, NULL); + INIT_LIST_HEAD (&priv->saved_fds); + + child_count = xlator_subvolume_count (this); + if (child_count != 2) { + gf_log (this->name, GF_LOG_ERROR, + "There should be exactly 2 children - one source " + "and one sink"); + return -1; + } + priv->child_count = child_count; + + priv->read_child = source_child; + priv->favorite_child = source_child; + priv->background_self_heal_count = 0; + + priv->data_self_heal = "on"; + priv->metadata_self_heal = 1; + priv->entry_self_heal = 1; + + priv->data_self_heal_window_size = 16; + + priv->data_change_log = 1; + priv->metadata_change_log = 1; + priv->entry_change_log = 1; + priv->use_afr_in_pump = 1; + priv->sh_readdir_size = 65536; + + /* Locking options */ + + /* Lock server count infact does not matter. Locks are held + on all subvolumes, in this case being the source + and the sink. + */ + + priv->strict_readdir = _gf_false; + priv->wait_count = 1; + priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, + gf_afr_mt_char); + if (!priv->child_up) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + priv->children = GF_CALLOC (sizeof (xlator_t *), child_count, + gf_afr_mt_xlator_t); + if (!priv->children) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + priv->pending_key = GF_CALLOC (sizeof (*priv->pending_key), + child_count, + gf_afr_mt_char); + if (!priv->pending_key) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + trav = this->children; + i = 0; + while (i < child_count) { + priv->children[i] = trav->xlator; + + ret = gf_asprintf (&priv->pending_key[i], "%s.%s", + AFR_XATTR_PREFIX, + trav->xlator->name); + if (-1 == ret) { + gf_log (this->name, GF_LOG_ERROR, + "asprintf failed to set pending key"); + op_errno = ENOMEM; + goto out; + } + + trav = trav->next; + i++; + } + + ret = gf_asprintf (&priv->sh_domain, "%s-self-heal", this->name); + if (-1 == ret) { + op_errno = ENOMEM; + goto out; + } + + priv->first_lookup = 1; + priv->root_inode = NULL; + + priv->last_event = GF_CALLOC (child_count, sizeof (*priv->last_event), + gf_afr_mt_int32_t); + if (!priv->last_event) { + ret = -ENOMEM; + goto out; + } + + pump_priv = GF_CALLOC (1, sizeof (*pump_priv), + gf_afr_mt_pump_priv); + if (!pump_priv) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto out; + } + + LOCK_INIT (&pump_priv->resume_path_lock); + LOCK_INIT (&pump_priv->pump_state_lock); + + pump_priv->resume_path = GF_CALLOC (1, PATH_MAX, + gf_afr_mt_char); + if (!pump_priv->resume_path) { + gf_log (this->name, GF_LOG_ERROR, "Out of memory"); + ret = -1; + goto out; + } + + pump_priv->env = this->ctx->env; + if (!pump_priv->env) { + gf_log (this->name, GF_LOG_ERROR, + "Could not create new sync-environment"); + ret = -1; + goto out; + } + + /* keep more local here as we may need them for self-heal etc */ + this->local_pool = mem_pool_new (afr_local_t, 128); + if (!this->local_pool) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "failed to create local_t's memory pool"); + goto out; + } + + priv->pump_private = pump_priv; + pump_priv = NULL; + + this->private = priv; + priv = NULL; + + pump_change_state (this, PUMP_STATE_ABORT); + + ret = 0; +out: + + if (pump_priv) { + GF_FREE (pump_priv->resume_path); + LOCK_DESTROY (&pump_priv->resume_path_lock); + LOCK_DESTROY (&pump_priv->pump_state_lock); + GF_FREE (pump_priv); + } + + if (priv) { + GF_FREE (priv->child_up); + GF_FREE (priv->children); + GF_FREE (priv->pending_key); + GF_FREE (priv->last_event); + LOCK_DESTROY (&priv->lock); + LOCK_DESTROY (&priv->read_child_lock); + GF_FREE (priv); + } + + return ret; +} + +int +fini (xlator_t *this) +{ + afr_private_t * priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + this->private = NULL; + if (!priv) + goto out; + + pump_priv = priv->pump_private; + if (!pump_priv) + goto afr_priv; + + GF_FREE (pump_priv->resume_path); + LOCK_DESTROY (&pump_priv->resume_path_lock); + LOCK_DESTROY (&pump_priv->pump_state_lock); + GF_FREE (pump_priv); +afr_priv: + afr_priv_destroy (priv); +out: + return 0; +} + + +struct xlator_fops fops = { + .lookup = pump_lookup, + .open = pump_open, + .flush = pump_flush, + .fsync = pump_fsync, + .fsyncdir = pump_fsyncdir, + .xattrop = pump_xattrop, + .fxattrop = pump_fxattrop, + .getxattr = pump_getxattr, + + /* inode write */ + .writev = pump_writev, + .truncate = pump_truncate, + .ftruncate = pump_ftruncate, + .setxattr = pump_setxattr, + .setattr = pump_setattr, + .fsetattr = pump_fsetattr, + .removexattr = pump_removexattr, + + /* dir read */ + .opendir = pump_opendir, + .readdir = pump_readdir, + .readdirp = pump_readdirp, + + /* dir write */ + .create = pump_create, + .mknod = pump_mknod, + .mkdir = pump_mkdir, + .unlink = pump_unlink, + .rmdir = pump_rmdir, + .link = pump_link, + .symlink = pump_symlink, + .rename = pump_rename, +}; + +struct xlator_dumpops dumpops = { + .priv = afr_priv_dump, +}; + + +struct xlator_cbks cbks = { + .release = pump_release, + .releasedir = pump_releasedir, + .forget = pump_forget, +}; + +struct volume_options options[] = { + { .key = {NULL} }, +}; |