diff options
Diffstat (limited to 'xlators/cluster/afr/src/pump.c')
| -rw-r--r-- | xlators/cluster/afr/src/pump.c | 1854 |
1 files changed, 0 insertions, 1854 deletions
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c deleted file mode 100644 index 5b31d389b28..00000000000 --- a/xlators/cluster/afr/src/pump.c +++ /dev/null @@ -1,1854 +0,0 @@ -/* - Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - -#include <unistd.h> -#include <sys/time.h> -#include <stdlib.h> - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "afr-common.c" - -static int -pump_mark_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - pump_priv->pump_start_pending = 1; - - return 0; -} - -static int -is_pump_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - return (pump_priv->pump_start_pending); -} - -static int -pump_remove_start_pending (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - pump_priv->pump_start_pending = 0; - - return 0; -} - -static pump_state_t -pump_get_state () -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t ret; - - this = THIS; - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->pump_state_lock); - { - ret = pump_priv->pump_state; - } - UNLOCK (&pump_priv->pump_state_lock); - - return ret; -} - -int -pump_change_state (xlator_t *this, pump_state_t state) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t state_old; - pump_state_t state_new; - - - priv = this->private; - pump_priv = priv->pump_private; - - GF_ASSERT (pump_priv); - - LOCK (&pump_priv->pump_state_lock); - { - state_old = pump_priv->pump_state; - state_new = state; - - pump_priv->pump_state = state; - - } - UNLOCK (&pump_priv->pump_state_lock); - - gf_log (this->name, GF_LOG_DEBUG, - "Pump changing state from %d to %d", - state_old, - state_new); - - return 0; -} - -static int -pump_set_resume_path (xlator_t *this, const char *path) -{ - int ret = 0; - - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - assert (pump_priv); - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->resume_path = strdup (path); - if (!pump_priv->resume_path) - ret = -1; - } - UNLOCK (&pump_priv->resume_path_lock); - - return ret; -} - -static void -build_child_loc (loc_t *parent, loc_t *child, char *path, char *name) -{ - child->path = path; - child->name = name; - - child->parent = inode_ref (parent->inode); - child->inode = inode_new (parent->inode->table); -} - -static char * -build_file_path (loc_t *loc, gf_dirent_t *entry) -{ - xlator_t *this = NULL; - char *file_path = NULL; - int pathlen = 0; - int total_size = 0; - - this = THIS; - - pathlen = STRLEN_0 (loc->path); - - if (IS_ROOT_PATH (loc->path)) { - total_size = pathlen + entry->d_len; - file_path = GF_CALLOC (1, total_size, gf_afr_mt_char); - if (!file_path) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - return NULL; - } - - gf_log (this->name, GF_LOG_TRACE, - "constructing file path of size=%d" - "pathlen=%d, d_len=%d", - total_size, pathlen, - entry->d_len); - - snprintf(file_path, total_size, "%s%s", loc->path, entry->d_name); - - } else { - total_size = pathlen + entry->d_len + 1; /* for the extra '/' in the path */ - file_path = GF_CALLOC (1, total_size + 1, gf_afr_mt_char); - if (!file_path) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - return NULL; - } - - gf_log (this->name, GF_LOG_TRACE, - "constructing file path of size=%d" - "pathlen=%d, d_len=%d", - total_size, pathlen, - entry->d_len); - - snprintf(file_path, total_size, "%s/%s", loc->path, entry->d_name); - } - - gf_log (this->name, GF_LOG_TRACE, - "path=%s and d_name=%s", loc->path, entry->d_name); - gf_log (this->name, GF_LOG_TRACE, - "constructed file_path=%s of size=%d", file_path, total_size); - - return file_path; -} - -static int -pump_save_path (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - pump_state_t state; - dict_t *dict = NULL; - loc_t loc; - int dict_ret = 0; - int ret = -1; - - state = pump_get_state (); - if (state == PUMP_STATE_RESUME) - return 0; - - priv = this->private; - pump_priv = priv->pump_private; - - assert (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - dict = dict_new (); - dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); - - ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); - - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "setxattr failed - could not save path=%s", path); - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setxattr succeeded - saved path=%s", path); - gf_log (this->name, GF_LOG_DEBUG, - "Saving path for status info"); - } - - dict_unref (dict); - - return 0; -} - -static int -pump_check_and_update_status (xlator_t *this) -{ - pump_state_t state; - int ret = -1; - - state = pump_get_state (); - - switch (state) { - - case PUMP_STATE_RESUME: - case PUMP_STATE_RUNNING: - { - ret = 0; - break; - } - case PUMP_STATE_PAUSE: - { - ret = -1; - break; - } - case PUMP_STATE_ABORT: - { - pump_save_path (this, "/"); - ret = -1; - break; - } - default: - { - gf_log (this->name, GF_LOG_DEBUG, - "Unknown pump state"); - ret = -1; - break; - } - - } - - return ret; -} - -static const char * -pump_get_resume_path (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - const char *resume_path = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - resume_path = pump_priv->resume_path; - - return resume_path; -} - -static int -pump_update_resume_state (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t state; - const char *resume_path = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - state = pump_get_state (); - - if (state == PUMP_STATE_RESUME) { - resume_path = pump_get_resume_path (this); - if (strcmp (resume_path, "/") == 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Reached the resume path (/). Proceeding to change state" - " to running"); - pump_change_state (this, PUMP_STATE_RUNNING); - } else if (strcmp (resume_path, path) == 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Reached the resume path. Proceeding to change state" - " to running"); - pump_change_state (this, PUMP_STATE_RUNNING); - } else { - gf_log (this->name, GF_LOG_DEBUG, - "Not yet hit the resume path:res-path=%s,path=%s", - resume_path, path); - } - } - - return 0; -} - -static gf_boolean_t -is_pump_traversal_allowed (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - pump_state_t state; - const char *resume_path = NULL; - gf_boolean_t ret = _gf_true; - - priv = this->private; - pump_priv = priv->pump_private; - - state = pump_get_state (); - - if (state == PUMP_STATE_RESUME) { - resume_path = pump_get_resume_path (this); - if (strstr (resume_path, path)) { - gf_log (this->name, GF_LOG_DEBUG, - "On the right path to resumption path"); - ret = _gf_true; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "Not the right path to resuming=> ignoring traverse"); - ret = _gf_false; - } - } - - return ret; -} - -static int -pump_save_file_stats (xlator_t *this, const char *path) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->number_files_pumped++; - - strncpy (pump_priv->current_file, path, - PATH_MAX); - } - UNLOCK (&pump_priv->resume_path_lock); - - return 0; -} - -static int -gf_pump_traverse_directory (loc_t *loc) -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - fd_t *fd = NULL; - - off_t offset = 0; - loc_t entry_loc; - gf_dirent_t *entry = NULL; - gf_dirent_t *tmp = NULL; - gf_dirent_t entries; - - struct iatt iatt, parent; - dict_t *xattr_rsp; - - int source = 0; - - char *file_path = NULL; - int ret = 0; - - INIT_LIST_HEAD (&entries.list); - this = THIS; - priv = this->private; - - assert (loc->inode); - - fd = fd_create (loc->inode, PUMP_PID); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to create fd for %s", loc->path); - goto out; - } - - ret = syncop_opendir (priv->children[source], loc, fd); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "opendir failed on %s", loc->path); - goto out; - } - - gf_log (this->name, GF_LOG_TRACE, - "pump opendir on %s returned=%d", - loc->path, ret); - - while (syncop_readdirp (priv->children[source], fd, 131072, offset, &entries)) { - - if (list_empty (&entries.list)) { - gf_log (this->name, GF_LOG_TRACE, - "no more entries in directory"); - goto out; - } - - list_for_each_entry_safe (entry, tmp, &entries.list, list) { - gf_log (this->name, GF_LOG_DEBUG, - "found readdir entry=%s", entry->d_name); - - file_path = build_file_path (loc, entry); - if (!file_path) { - gf_log (this->name, GF_LOG_DEBUG, - "file path construction failed"); - goto out; - } - - build_child_loc (loc, &entry_loc, file_path, entry->d_name); - - ret = syncop_lookup (this, &entry_loc, NULL, - &iatt, &xattr_rsp, &parent); - - entry_loc.ino = iatt.ia_ino; - entry_loc.inode->ino = iatt.ia_ino; - - gf_log (this->name, GF_LOG_DEBUG, - "lookup %s => %"PRId64, - entry_loc.path, - iatt.ia_ino); - - pump_update_resume_state (this, entry_loc.path); - - if (!IS_ENTRY_CWD(entry->d_name) && - !IS_ENTRY_PARENT (entry->d_name)) { - pump_save_path (this, entry_loc.path); - pump_save_file_stats (this, entry_loc.path); - } - - ret = pump_check_and_update_status (this); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Pump beginning to exit out"); - goto out; - } - - gf_log (this->name, GF_LOG_TRACE, - "type of file=%d, IFDIR=%d", - iatt.ia_type, IA_IFDIR); - - if (IA_ISDIR (iatt.ia_type) && !IS_ENTRY_CWD(entry->d_name) && - !IS_ENTRY_PARENT (entry->d_name)) { - if (is_pump_traversal_allowed (this, entry_loc.path)) { - gf_log (this->name, GF_LOG_TRACE, - "entering dir=%s", - entry->d_name); - gf_pump_traverse_directory (&entry_loc); - } - } - - offset = entry->d_off; - loc_wipe (&entry_loc); - } - - gf_dirent_free (&entries); - gf_log (this->name, GF_LOG_TRACE, - "offset incremented to %d", - (int32_t ) offset); - - } - -out: - return 0; - -} - -void -build_root_loc (inode_t *inode, loc_t *loc) -{ - loc->path = "/"; - loc->name = ""; - loc->inode = inode; - loc->ino = 1; - loc->inode->ino = 1; - -} - -static int -pump_update_resume_path (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - const char *resume_path = NULL; - - priv = this->private; - pump_priv = priv->pump_private; - - resume_path = pump_get_resume_path (this); - - if (resume_path) { - gf_log (this->name, GF_LOG_DEBUG, - "Found a path to resume from: %s", - resume_path); - - }else { - gf_log (this->name, GF_LOG_DEBUG, - "Did not find a path=> setting to '/'"); - pump_set_resume_path (this, "/"); - } - - pump_change_state (this, PUMP_STATE_RESUME); - - return 0; -} - -static int -pump_complete_migration (xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - dict_t *dict = NULL; - pump_state_t state; - loc_t loc; - int dict_ret = 0; - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - assert (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - dict = dict_new (); - - state = pump_get_state (); - if (state == PUMP_STATE_RUNNING) { - gf_log (this->name, GF_LOG_DEBUG, - "Pump finished pumping"); - - pump_priv->pump_finished = _gf_true; - - dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); - - ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "setxattr failed - while notifying source complete"); - } - dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); - - ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "setxattr failed - while notifying sink complete"); - } - } - - return 0; -} - -static int -pump_task (void *data) -{ - xlator_t *this = NULL; - afr_private_t *priv = NULL; - - - loc_t loc; - struct iatt iatt, parent; - dict_t *xattr_rsp; - - int ret = -1; - - this = THIS; - priv = this->private; - - assert (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - ret = syncop_lookup (this, &loc, NULL, - &iatt, &xattr_rsp, &parent); - - gf_log (this->name, GF_LOG_TRACE, - "lookup: ino=%"PRId64", path=%s", - loc.ino, - loc.path); - - ret = pump_check_and_update_status (this); - if (ret < 0) { - goto out; - } - - pump_update_resume_path (this); - - gf_pump_traverse_directory (&loc); - - pump_complete_migration (this); -out: - return 0; -} - - -static int -pump_task_completion (int ret, void *data) -{ - xlator_t *this = NULL; - call_frame_t *frame = NULL; - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - this = THIS; - - frame = (call_frame_t *) data; - - priv = this->private; - pump_priv = priv->pump_private; - - inode_unref (priv->root_inode); - - gf_log (this->name, GF_LOG_DEBUG, - "Pump xlator exiting"); - return 0; -} - -int -pump_start (call_frame_t *pump_frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - if (!pump_frame->root->lk_owner) - pump_frame->root->lk_owner = PUMP_LK_OWNER; - - ret = synctask_new (pump_priv->env, pump_task, pump_task_completion, - pump_frame); - - if (ret != -1) { - gf_log (this->name, GF_LOG_TRACE, - "setting pump as started"); - } else { - gf_log (this->name, GF_LOG_DEBUG, - "starting pump failed"); - pump_change_state (this, PUMP_STATE_ABORT); - } - - return ret; -} - -static int -pump_start_synctask (xlator_t *this) -{ - call_frame_t *frame = NULL; - int ret = 0; - - frame = create_frame (this, this->ctx->pool); - if (!frame) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - ret = -1; - goto out; - } - - pump_change_state (this, PUMP_STATE_RUNNING); - - ret = pump_start (frame, this); - -out: - return ret; -} - -int32_t -pump_cmd_start_setxattr_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno) - -{ - afr_local_t *local = NULL; - int ret = 0; - - local = frame->local; - - if (op_ret < 0) { - gf_log (this->name, GF_LOG_ERROR, - "Could not initiate destination " - "brick connect"); - ret = op_ret; - goto out; - } - - gf_log (this->name, GF_LOG_DEBUG, - "Successfully initiated destination " - "brick connect"); - - pump_mark_start_pending (this); - -out: - local->op_ret = ret; - pump_command_reply (frame, this); - - return 0; -} - -static int -pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - afr_private_t *priv = NULL; - dict_t *dict = NULL; - char *dst_brick = NULL; - loc_t loc; - - int ret = 0; - - priv = this->private; - local = frame->local; - - GF_ASSERT (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick); - if (ret < 0) { - gf_log (this->name, GF_LOG_ERROR, - "Could not get destination brick value"); - goto out; - } - - dict = dict_new (); - if (!dict) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - ret = -1; - goto out; - } - - GF_ASSERT (dst_brick); - gf_log (this->name, GF_LOG_DEBUG, - "Got destination brick as %s", dst_brick); - - ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick); - if (ret < 0) { - gf_log (this->name, GF_LOG_ERROR, - "Could not inititiate destination brick " - "connect"); - goto out; - } - - STACK_WIND (frame, - pump_cmd_start_setxattr_cbk, - PUMP_SINK_CHILD(this), - PUMP_SINK_CHILD(this)->fops->setxattr, - &loc, - dict, - 0); - - ret = 0; - - dict_unref (dict); -out: - return ret; -} - -static int -is_pump_aborted (xlator_t *this) -{ - pump_state_t state; - - state = pump_get_state (); - - return ((state == PUMP_STATE_ABORT)); -} - -int32_t -pump_cmd_start_getxattr_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - dict_t *dict) -{ - afr_local_t *local = NULL; - char *path = NULL; - - pump_state_t state; - int ret = 0; - int need_unwind = 0; - int dict_ret = -1; - - local = frame->local; - - if (op_ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "getxattr failed - changing pump " - "state to RUNNING with '/'"); - path = "/"; - ret = op_ret; - } else { - gf_log (this->name, GF_LOG_TRACE, - "getxattr succeeded"); - - dict_ret = dict_get_str (dict, PUMP_PATH, &path); - if (dict_ret < 0) - path = "/"; - } - - state = pump_get_state (); - if ((state == PUMP_STATE_RUNNING) || - (state == PUMP_STATE_RESUME)) { - gf_log (this->name, GF_LOG_ERROR, - "Pump is already started"); - ret = -1; - goto out; - } - - pump_set_resume_path (this, path); - - if (is_pump_aborted (this)) - /* We're re-starting pump afresh */ - ret = pump_initiate_sink_connect (frame, this); - else { - /* We're re-starting pump from a previous - pause */ - ret = pump_start_synctask (this); - need_unwind = 1; - } - -out: - if ((ret < 0) || (need_unwind == 1)) { - local->op_ret = ret; - pump_command_reply (frame, this); - } - return 0; -} - -int -pump_execute_status (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - - uint64_t number_files = 0; - - char filename[PATH_MAX]; - char *dict_str = NULL; - - int32_t op_ret = 0; - int32_t op_errno = 0; - - dict_t *dict = NULL; - int ret = -1; - - priv = this->private; - pump_priv = priv->pump_private; - - LOCK (&pump_priv->resume_path_lock); - { - number_files = pump_priv->number_files_pumped; - strncpy (filename, pump_priv->current_file, PATH_MAX); - } - UNLOCK (&pump_priv->resume_path_lock); - - dict_str = GF_CALLOC (1, PATH_MAX + 256, gf_afr_mt_char); - if (!dict_str) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - op_ret = -1; - op_errno = ENOMEM; - goto out; - } - - if (pump_priv->pump_finished) { - snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Migration complete ", - number_files); - } else { - snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Current file= %s ", - number_files, filename); - } - - dict = dict_new (); - - ret = dict_set_str (dict, PUMP_CMD_STATUS, dict_str); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "dict_set_str returned negative value"); - } - - op_ret = 0; - -out: - - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); - - dict_unref (dict); - GF_FREE (dict_str); - - return 0; -} - -int -pump_execute_pause (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - - local = frame->local; - - pump_change_state (this, PUMP_STATE_PAUSE); - - local->op_ret = 0; - pump_command_reply (frame, this); - - return 0; -} - -int -pump_execute_start (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - afr_local_t *local = NULL; - - int ret = 0; - loc_t loc; - - priv = this->private; - local = frame->local; - - if (!priv->root_inode) { - gf_log (this->name, GF_LOG_ERROR, - "Pump xlator cannot be started without an initial " - "lookup"); - ret = -1; - goto out; - } - - GF_ASSERT (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - STACK_WIND (frame, - pump_cmd_start_getxattr_cbk, - PUMP_SOURCE_CHILD(this), - PUMP_SOURCE_CHILD(this)->fops->getxattr, - &loc, - PUMP_PATH); - - ret = 0; - -out: - if (ret < 0) { - local->op_ret = ret; - pump_command_reply (frame, this); - } - - return 0; -} - -int -pump_execute_abort (call_frame_t *frame, xlator_t *this) -{ - afr_private_t *priv = NULL; - afr_local_t *local = NULL; - - priv = this->private; - local = frame->local; - - pump_change_state (this, PUMP_STATE_ABORT); - - local->op_ret = 0; - pump_command_reply (frame, this); - - return 0; -} - -gf_boolean_t -pump_command_status (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, PUMP_CMD_STATUS, &cmd); - if (dict_ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Not a pump status command"); - ret = _gf_false; - goto out; - } - - gf_log (this->name, GF_LOG_DEBUG, - "Hit a pump command - status"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_pause (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, PUMP_CMD_PAUSE, &cmd); - if (dict_ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Not a pump pause command"); - ret = _gf_false; - goto out; - } - - gf_log (this->name, GF_LOG_DEBUG, - "Hit a pump command - pause"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_abort (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, PUMP_CMD_ABORT, &cmd); - if (dict_ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Not a pump abort command"); - ret = _gf_false; - goto out; - } - - gf_log (this->name, GF_LOG_DEBUG, - "Hit a pump command - abort"); - ret = _gf_true; - -out: - return ret; - -} - -gf_boolean_t -pump_command_start (xlator_t *this, dict_t *dict) -{ - char *cmd = NULL; - int dict_ret = -1; - int ret = _gf_true; - - dict_ret = dict_get_str (dict, PUMP_CMD_START, &cmd); - if (dict_ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Not a pump start command"); - ret = _gf_false; - goto out; - } - - gf_log (this->name, GF_LOG_DEBUG, - "Hit a pump command - start"); - ret = _gf_true; - -out: - return ret; - -} - -struct _xattr_key { - char *key; - struct list_head list; -}; - -static void -__gather_xattr_keys (dict_t *dict, char *key, data_t *value, - void *data) -{ - struct list_head * list = data; - struct _xattr_key * xkey = NULL; - - if (!strncmp (key, AFR_XATTR_PREFIX, - strlen (AFR_XATTR_PREFIX))) { - - xkey = GF_CALLOC (1, sizeof (*xkey), gf_afr_mt_xattr_key); - if (!xkey) - return; - - xkey->key = key; - INIT_LIST_HEAD (&xkey->list); - - list_add_tail (&xkey->list, list); - } -} - -static void -__filter_xattrs (dict_t *dict) -{ - struct list_head keys; - - struct _xattr_key *key; - struct _xattr_key *tmp; - - INIT_LIST_HEAD (&keys); - - dict_foreach (dict, __gather_xattr_keys, - (void *) &keys); - - list_for_each_entry_safe (key, tmp, &keys, list) { - dict_del (dict, key->key); - - list_del_init (&key->list); - - GF_FREE (key); - } -} - -int32_t -pump_getxattr_cbk (call_frame_t *frame, void *cookie, - xlator_t *this, int32_t op_ret, int32_t op_errno, - dict_t *dict) -{ - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - xlator_t ** children = NULL; - - int unwind = 1; - int last_tried = -1; - int this_try = -1; - int read_child = -1; - - priv = this->private; - children = priv->children; - - local = frame->local; - - read_child = (long) cookie; - - if (op_ret == -1) { - retry: - last_tried = local->cont.getxattr.last_tried; - - if (all_tried (last_tried, priv->child_count)) { - goto out; - } - this_try = ++local->cont.getxattr.last_tried; - - if (this_try == read_child) { - goto retry; - } - - unwind = 0; - STACK_WIND_COOKIE (frame, pump_getxattr_cbk, - (void *) (long) read_child, - children[this_try], - children[this_try]->fops->getxattr, - &local->loc, - local->cont.getxattr.name); - } - -out: - if (unwind) { - if (op_ret >= 0 && dict) - __filter_xattrs (dict); - - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); - } - - return 0; -} - -int32_t -pump_getxattr (call_frame_t *frame, xlator_t *this, - loc_t *loc, const char *name) -{ - afr_private_t * priv = NULL; - xlator_t ** children = NULL; - int call_child = 0; - afr_local_t * local = NULL; - - int read_child = -1; - - int32_t op_ret = -1; - int32_t op_errno = 0; - - - VALIDATE_OR_GOTO (frame, out); - VALIDATE_OR_GOTO (this, out); - VALIDATE_OR_GOTO (this->private, out); - - priv = this->private; - VALIDATE_OR_GOTO (priv->children, out); - - children = priv->children; - - ALLOC_OR_GOTO (local, afr_local_t, out); - frame->local = local; - - if (name) { - if (!strncmp (name, AFR_XATTR_PREFIX, - strlen (AFR_XATTR_PREFIX))) { - - op_errno = ENODATA; - goto out; - } - - if (!strcmp (name, PUMP_CMD_STATUS)) { - gf_log (this->name, GF_LOG_DEBUG, - "Hit pump command - status"); - pump_execute_status (frame, this); - op_ret = 0; - goto out; - } - } - - read_child = afr_read_child (this, loc->inode); - - if (read_child >= 0) { - call_child = read_child; - - local->cont.getxattr.last_tried = -1; - } else { - call_child = afr_first_up_child (priv); - - if (call_child == -1) { - op_errno = ENOTCONN; - gf_log (this->name, GF_LOG_DEBUG, - "no child is up"); - goto out; - } - - local->cont.getxattr.last_tried = call_child; - } - - loc_copy (&local->loc, loc); - if (name) - local->cont.getxattr.name = gf_strdup (name); - - STACK_WIND_COOKIE (frame, pump_getxattr_cbk, - (void *) (long) call_child, - children[call_child], children[call_child]->fops->getxattr, - loc, name); - - op_ret = 0; -out: - if (op_ret == -1) { - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, NULL); - } - return 0; -} - -static int -afr_setxattr_unwind (call_frame_t *frame, xlator_t *this) -{ - afr_local_t * local = NULL; - afr_private_t * priv = NULL; - call_frame_t *main_frame = NULL; - - local = frame->local; - priv = this->private; - - LOCK (&frame->lock); - { - if (local->transaction.main_frame) - main_frame = local->transaction.main_frame; - local->transaction.main_frame = NULL; - } - UNLOCK (&frame->lock); - - if (main_frame) { - AFR_STACK_UNWIND (setxattr, main_frame, - local->op_ret, local->op_errno) - } - return 0; -} - -static int -afr_setxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) -{ - afr_local_t * local = NULL; - afr_private_t * priv = NULL; - - int call_count = -1; - int need_unwind = 0; - - local = frame->local; - priv = this->private; - - LOCK (&frame->lock); - { - if (op_ret != -1) { - if (local->success_count == 0) { - local->op_ret = op_ret; - } - local->success_count++; - - if (local->success_count == priv->child_count) { - need_unwind = 1; - } - } - - local->op_errno = op_errno; - } - UNLOCK (&frame->lock); - - if (need_unwind) - local->transaction.unwind (frame, this); - - call_count = afr_frame_return (frame); - - if (call_count == 0) { - local->transaction.resume (frame, this); - } - - return 0; -} - -static int -afr_setxattr_wind (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - afr_private_t *priv = NULL; - - int call_count = -1; - int i = 0; - - local = frame->local; - priv = this->private; - - call_count = afr_up_children_count (priv->child_count, local->child_up); - - if (call_count == 0) { - local->transaction.resume (frame, this); - return 0; - } - - local->call_count = call_count; - - for (i = 0; i < priv->child_count; i++) { - if (local->child_up[i]) { - STACK_WIND_COOKIE (frame, afr_setxattr_wind_cbk, - (void *) (long) i, - priv->children[i], - priv->children[i]->fops->setxattr, - &local->loc, - local->cont.setxattr.dict, - local->cont.setxattr.flags); - - if (!--call_count) - break; - } - } - - return 0; -} - - -static int -afr_setxattr_done (call_frame_t *frame, xlator_t *this) -{ - afr_local_t * local = frame->local; - - local->transaction.unwind (frame, this); - - AFR_STACK_DESTROY (frame); - - return 0; -} - -int32_t -pump_setxattr_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno) -{ - STACK_UNWIND (frame, - op_ret, - op_errno); - return 0; -} - -int -pump_command_reply (call_frame_t *frame, xlator_t *this) -{ - afr_local_t *local = NULL; - - local = frame->local; - - if (local->op_ret < 0) - gf_log (this->name, GF_LOG_NORMAL, - "Command failed"); - else - gf_log (this->name, GF_LOG_NORMAL, - "Command succeeded"); - - dict_unref (local->dict); - - AFR_STACK_UNWIND (setxattr, - frame, - local->op_ret, - local->op_errno); - - return 0; -} - -int -pump_parse_command (call_frame_t *frame, xlator_t *this, - afr_local_t *local, dict_t *dict) -{ - - int ret = -1; - - if (pump_command_start (this, dict)) { - frame->local = local; - local->dict = dict_ref (dict); - ret = pump_execute_start (frame, this); - - } else if (pump_command_pause (this, dict)) { - frame->local = local; - local->dict = dict_ref (dict); - ret = pump_execute_pause (frame, this); - - } else if (pump_command_abort (this, dict)) { - frame->local = local; - local->dict = dict_ref (dict); - ret = pump_execute_abort (frame, this); - } - return ret; -} - -int -pump_setxattr (call_frame_t *frame, xlator_t *this, - loc_t *loc, dict_t *dict, int32_t flags) -{ - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - call_frame_t *transaction_frame = NULL; - - int ret = -1; - - int op_ret = -1; - int op_errno = 0; - - VALIDATE_OR_GOTO (frame, out); - VALIDATE_OR_GOTO (this, out); - VALIDATE_OR_GOTO (this->private, out); - - priv = this->private; - - ALLOC_OR_GOTO (local, afr_local_t, out); - - ret = AFR_LOCAL_INIT (local, priv); - if (ret < 0) { - op_errno = -ret; - goto out; - } - - ret = pump_parse_command (frame, this, - local, dict); - if (ret >= 0) { - op_ret = 0; - goto out; - } - - transaction_frame = copy_frame (frame); - if (!transaction_frame) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory."); - goto out; - } - - transaction_frame->local = local; - - local->op_ret = -1; - - local->cont.setxattr.dict = dict_ref (dict); - local->cont.setxattr.flags = flags; - - local->transaction.fop = afr_setxattr_wind; - local->transaction.done = afr_setxattr_done; - local->transaction.unwind = afr_setxattr_unwind; - - loc_copy (&local->loc, loc); - - local->transaction.main_frame = frame; - local->transaction.start = LLONG_MAX - 1; - local->transaction.len = 0; - - afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION); - - op_ret = 0; -out: - if (op_ret == -1) { - if (transaction_frame) - AFR_STACK_DESTROY (transaction_frame); - AFR_STACK_UNWIND (setxattr, frame, op_ret, op_errno); - } - - return 0; -} - -int32_t -mem_acct_init (xlator_t *this) -{ - int ret = -1; - - if (!this) - return ret; - - ret = xlator_mem_acct_init (this, gf_afr_mt_end + 1); - - if (ret != 0) { - gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); - return ret; - } - - return ret; -} - -static int -is_xlator_pump_sink (xlator_t *child) -{ - return (child == PUMP_SINK_CHILD(THIS)); -} - -static int -is_xlator_pump_source (xlator_t *child) -{ - return (child == PUMP_SOURCE_CHILD(THIS)); -} - -int32_t -notify (xlator_t *this, int32_t event, - void *data, ...) -{ - int ret = -1; - xlator_t *child_xl = NULL; - - child_xl = (xlator_t *) data; - - ret = afr_notify (this, event, data); - - switch (event) { - case GF_EVENT_CHILD_DOWN: - if (is_xlator_pump_source (child_xl)) - pump_change_state (this, PUMP_STATE_ABORT); - break; - - case GF_EVENT_CHILD_UP: - if (is_xlator_pump_sink (child_xl)) - if (is_pump_start_pending (this)) { - ret = pump_start_synctask (this); - if (ret < 0) - gf_log (this->name, GF_LOG_DEBUG, - "Could not start pump " - "synctask"); - else - pump_remove_start_pending (this); - } - } - - return ret; -} - -int32_t -init (xlator_t *this) -{ - afr_private_t * priv = NULL; - pump_private_t *pump_priv = NULL; - int child_count = 0; - xlator_list_t * trav = NULL; - int i = 0; - int ret = -1; - int op_errno = 0; - - int source_child = 0; - - if (!this->children) { - gf_log (this->name, GF_LOG_ERROR, - "pump translator needs a source and sink" - "subvolumes defined."); - return -1; - } - - if (!this->parents) { - gf_log (this->name, GF_LOG_WARNING, - "Volume is dangling."); - } - - ALLOC_OR_GOTO (this->private, afr_private_t, out); - - priv = this->private; - - priv->read_child = source_child; - priv->favorite_child = source_child; - priv->background_self_heal_count = 0; - - priv->data_self_heal = 1; - priv->metadata_self_heal = 1; - priv->entry_self_heal = 1; - - priv->data_self_heal_algorithm = ""; - - priv->data_self_heal_window_size = 16; - - priv->data_change_log = 1; - priv->metadata_change_log = 1; - priv->entry_change_log = 1; - - /* Locking options */ - - /* Lock server count infact does not matter. Locks are held - on all subvolumes, in this case being the source - and the sink. - */ - - priv->data_lock_server_count = 2; - priv->metadata_lock_server_count = 2; - priv->entry_lock_server_count = 2; - - priv->strict_readdir = _gf_false; - - trav = this->children; - while (trav) { - child_count++; - trav = trav->next; - } - - priv->wait_count = 1; - - if (child_count != 2) { - gf_log (this->name, GF_LOG_ERROR, - "There should be exactly 2 children - one source " - "and one sink"); - return -1; - } - priv->child_count = child_count; - - LOCK_INIT (&priv->lock); - LOCK_INIT (&priv->read_child_lock); - - priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, - gf_afr_mt_char); - if (!priv->child_up) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory."); - op_errno = ENOMEM; - goto out; - } - - priv->children = GF_CALLOC (sizeof (xlator_t *), child_count, - gf_afr_mt_xlator_t); - if (!priv->children) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory."); - op_errno = ENOMEM; - goto out; - } - - priv->pending_key = GF_CALLOC (sizeof (*priv->pending_key), - child_count, - gf_afr_mt_char); - if (!priv->pending_key) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory."); - op_errno = ENOMEM; - goto out; - } - - trav = this->children; - i = 0; - while (i < child_count) { - priv->children[i] = trav->xlator; - - ret = asprintf (&priv->pending_key[i], "%s.%s", AFR_XATTR_PREFIX, - trav->xlator->name); - if (-1 == ret) { - gf_log (this->name, GF_LOG_ERROR, - "asprintf failed to set pending key"); - op_errno = ENOMEM; - goto out; - } - - trav = trav->next; - i++; - } - - priv->first_lookup = 1; - priv->root_inode = NULL; - - pump_priv = GF_CALLOC (1, sizeof (*pump_priv), - gf_afr_mt_pump_priv); - if (!pump_priv) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - op_errno = ENOMEM; - goto out; - } - - LOCK_INIT (&pump_priv->resume_path_lock); - LOCK_INIT (&pump_priv->pump_state_lock); - - pump_priv->resume_path = GF_CALLOC (1, PATH_MAX, - gf_afr_mt_char); - if (!pump_priv->resume_path) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - ret = -1; - goto out; - } - - pump_priv->env = syncenv_new (0); - if (!pump_priv->env) { - gf_log (this->name, GF_LOG_ERROR, - "Could not create new sync-environment"); - ret = -1; - goto out; - } - - priv->pump_private = pump_priv; - - pump_change_state (this, PUMP_STATE_ABORT); - - ret = 0; -out: - return ret; -} - -int -fini (xlator_t *this) -{ - return 0; -} - - -struct xlator_fops fops = { - .lookup = afr_lookup, - .open = afr_open, - .lk = afr_lk, - .flush = afr_flush, - .statfs = afr_statfs, - .fsync = afr_fsync, - .fsyncdir = afr_fsyncdir, - .xattrop = afr_xattrop, - .fxattrop = afr_fxattrop, - .inodelk = afr_inodelk, - .finodelk = afr_finodelk, - .entrylk = afr_entrylk, - .fentrylk = afr_fentrylk, - - /* inode read */ - .access = afr_access, - .stat = afr_stat, - .fstat = afr_fstat, - .readlink = afr_readlink, - .getxattr = pump_getxattr, - .readv = afr_readv, - - /* inode write */ - .writev = afr_writev, - .truncate = afr_truncate, - .ftruncate = afr_ftruncate, - .setxattr = pump_setxattr, - .setattr = afr_setattr, - .fsetattr = afr_fsetattr, - .removexattr = afr_removexattr, - - /* dir read */ - .opendir = afr_opendir, - .readdir = afr_readdir, - .readdirp = afr_readdirp, - - /* dir write */ - .create = afr_create, - .mknod = afr_mknod, - .mkdir = afr_mkdir, - .unlink = afr_unlink, - .rmdir = afr_rmdir, - .link = afr_link, - .symlink = afr_symlink, - .rename = afr_rename, -}; - -struct xlator_dumpops dumpops = { - .priv = afr_priv_dump, -}; - - -struct xlator_cbks cbks = { - .release = afr_release, - .releasedir = afr_releasedir, -}; - -struct volume_options options[] = { - { .key = {NULL} }, -}; |
