From acdeed002d30209e0a058c2df0346d4f16c08994 Mon Sep 17 00:00:00 2001 From: Pavan Sondur Date: Fri, 6 Aug 2010 05:31:45 +0000 Subject: add pump xlator and changes for replace-brick Signed-off-by: Pavan Vilas Sondur Signed-off-by: Anand V. Avati BUG: 1235 (Bug for all pump/migrate commits) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=1235 --- xlators/cluster/afr/src/pump.c | 1817 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1817 insertions(+) create mode 100644 xlators/cluster/afr/src/pump.c (limited to 'xlators/cluster/afr/src/pump.c') diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c new file mode 100644 index 00000000000..e69c028450b --- /dev/null +++ b/xlators/cluster/afr/src/pump.c @@ -0,0 +1,1817 @@ +/* + Copyright (c) 2007-2009 Gluster, Inc. + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + . +*/ + +#include +#include +#include + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "afr-common.c" + +pump_state_t +pump_get_state () +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t ret; + + this = THIS; + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->pump_state_lock); + { + ret = pump_priv->pump_state; + } + UNLOCK (&pump_priv->pump_state_lock); + + return ret; +} + +int +pump_change_state (xlator_t *this, pump_state_t state) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t state_old; + pump_state_t state_new; + + unsigned char * child_up = NULL; + int i = 0; + + + priv = this->private; + pump_priv = priv->pump_private; + + child_up = priv->child_up; + + assert (pump_priv); + + LOCK (&pump_priv->pump_state_lock); + { + state_old = pump_priv->pump_state; + state_new = state; + + pump_priv->pump_state = state; + + switch (pump_priv->pump_state) { + case PUMP_STATE_RESUME: + case PUMP_STATE_RUNNING: + case PUMP_STATE_PAUSE: + { + priv->pump_loaded = _gf_true; + i = 1; + + child_up[i] = 1; + + LOCK (&priv->lock); + { + priv->up_count++; + } + UNLOCK (&priv->lock); + + break; + } + case PUMP_STATE_ABORT: + { + priv->pump_loaded = _gf_false; + i = 1; + + child_up[i] = 0; + + LOCK (&priv->lock); + { + priv->down_count++; + } + UNLOCK (&priv->lock); + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->number_files_pumped = 0; + } + UNLOCK (&pump_priv->resume_path_lock); + + + break; + } + + } + } + UNLOCK (&pump_priv->pump_state_lock); + + gf_log (this->name, GF_LOG_DEBUG, + "Pump changing state from %d to %d", + state_old, + state_new); + + return 0; +} + +static int +pump_set_resume_path (xlator_t *this, const char *path) +{ + int ret = 0; + + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + assert (pump_priv); + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->resume_path = strdup (path); + if (!pump_priv->resume_path) + ret = -1; + } + UNLOCK (&pump_priv->resume_path_lock); + + return ret; +} + +static void +build_child_loc (loc_t *parent, loc_t *child, char *path, char *name) +{ + child->path = path; + child->name = name; + + child->parent = inode_ref (parent->inode); + child->inode = inode_new (parent->inode->table); +} + +static char * +build_file_path (loc_t *loc, gf_dirent_t *entry) +{ + xlator_t *this = NULL; + char *file_path = NULL; + int pathlen = 0; + int total_size = 0; + + this = THIS; + + pathlen = STRLEN_0 (loc->path); + + if (IS_ROOT_PATH (loc->path)) { + total_size = pathlen + entry->d_len; + file_path = GF_CALLOC (1, total_size, gf_afr_mt_char); + if (!file_path) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + return NULL; + } + + gf_log (this->name, GF_LOG_TRACE, + "constructing file path of size=%d" + "pathlen=%d, d_len=%d", + total_size, pathlen, + entry->d_len); + + snprintf(file_path, total_size, "%s%s", loc->path, entry->d_name); + + } else { + total_size = pathlen + entry->d_len + 1; /* for the extra '/' in the path */ + file_path = GF_CALLOC (1, total_size + 1, gf_afr_mt_char); + if (!file_path) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + return NULL; + } + + gf_log (this->name, GF_LOG_TRACE, + "constructing file path of size=%d" + "pathlen=%d, d_len=%d", + total_size, pathlen, + entry->d_len); + + snprintf(file_path, total_size, "%s/%s", loc->path, entry->d_name); + } + + gf_log (this->name, GF_LOG_TRACE, + "path=%s and d_name=%s", loc->path, entry->d_name); + gf_log (this->name, GF_LOG_TRACE, + "constructed file_path=%s of size=%d", file_path, total_size); + + return file_path; +} + +static int +pump_check_and_update_status (xlator_t *this) +{ + pump_state_t state; + int ret = -1; + + state = pump_get_state (); + + switch (state) { + + case PUMP_STATE_RESUME: + case PUMP_STATE_RUNNING: + { + ret = 0; + break; + } + case PUMP_STATE_PAUSE: + case PUMP_STATE_ABORT: + { + ret = -1; + break; + } + default: + { + gf_log (this->name, GF_LOG_DEBUG, + "Unknown pump state"); + ret = -1; + break; + } + + } + + return ret; +} + +static const char * +pump_get_resume_path (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + const char *resume_path = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + resume_path = pump_priv->resume_path; + + return resume_path; +} + +static int +pump_update_resume_state (xlator_t *this, const char *path) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t state; + const char *resume_path = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + state = pump_get_state (); + + if (state == PUMP_STATE_RESUME) { + resume_path = pump_get_resume_path (this); + if (strcmp (resume_path, "/") == 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Reached the resume path (/). Proceeding to change state" + " to running"); + pump_change_state (this, PUMP_STATE_RUNNING); + } else if (strcmp (resume_path, path) == 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Reached the resume path. Proceeding to change state" + " to running"); + pump_change_state (this, PUMP_STATE_RUNNING); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "Not yet hit the resume path:res-path=%s,path=%s", + resume_path, path); + } + } + + return 0; +} + +static gf_boolean_t +is_pump_traversal_allowed (xlator_t *this, const char *path) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + pump_state_t state; + const char *resume_path = NULL; + gf_boolean_t ret = _gf_true; + + priv = this->private; + pump_priv = priv->pump_private; + + state = pump_get_state (); + + if (state == PUMP_STATE_RESUME) { + resume_path = pump_get_resume_path (this); + if (strstr (resume_path, path)) { + gf_log (this->name, GF_LOG_DEBUG, + "On the right path to resumption path"); + ret = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "Not the right path to resuming=> ignoring traverse"); + ret = _gf_false; + } + } + + return ret; +} + +static int +pump_update_file_stats (xlator_t *this, long source_blocks, + long sink_blocks) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->source_blocks = source_blocks; + pump_priv->sink_blocks = sink_blocks; + } + UNLOCK (&pump_priv->resume_path_lock); + + return 0; +} + +static int +pump_save_file_stats (xlator_t *this) +{ + afr_private_t *priv = NULL; + struct statvfs source_buf = {0, }; + struct statvfs sink_buf = {0, }; + loc_t loc; + int ret = -1; + + priv = this->private; + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + ret = syncop_statfs (PUMP_SOURCE_CHILD (this), + &loc, &source_buf); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "source statfs failed"); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "source statfs succeeded"); + } + + + ret = syncop_statfs (PUMP_SOURCE_CHILD (this), + &loc, &sink_buf); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "sink statfs failed"); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "sink statfs succeeded"); + } + + pump_update_file_stats (this, + source_buf.f_blocks, + sink_buf.f_blocks); + + return 0; + +} +static int +pump_save_path (xlator_t *this, const char *path) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + pump_state_t state; + dict_t *dict = NULL; + loc_t loc; + int dict_ret = 0; + int ret = -1; + + state = pump_get_state (); + if (state != PUMP_STATE_RUNNING) + return 0; + + priv = this->private; + pump_priv = priv->pump_private; + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + dict = dict_new (); + dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); + +// ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); + ret = 0; + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr failed - could not save path=%s", path); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr succeeded - saved path=%s", path); + gf_log (this->name, GF_LOG_DEBUG, + "Saving path for status info"); + + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->number_files_pumped++; + + strncpy (pump_priv->current_file, path, + PATH_MAX); + } + UNLOCK (&pump_priv->resume_path_lock); + + } + + dict_unref (dict); + + return 0; +} + +static int +gf_pump_traverse_directory (loc_t *loc) +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + fd_t *fd = NULL; + + off_t offset = 0; + loc_t entry_loc; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + gf_dirent_t entries; + + struct iatt iatt, parent; + dict_t *xattr_rsp; + + int source = 0; + + char *file_path = NULL; + int ret = 0; + + INIT_LIST_HEAD (&entries.list); + this = THIS; + priv = this->private; + + assert (loc->inode); + + fd = fd_create (loc->inode, PUMP_PID); + if (!fd) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to create fd for %s", loc->path); + goto out; + } + + ret = syncop_opendir (priv->children[source], loc, fd); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "opendir failed on %s", loc->path); + goto out; + } + + gf_log (this->name, GF_LOG_TRACE, + "pump opendir on %s returned=%d", + loc->path, ret); + + while (syncop_readdirp (priv->children[source], fd, 131072, offset, &entries)) { + + if (list_empty (&entries.list)) { + gf_log (this->name, GF_LOG_TRACE, + "no more entries in directory"); + goto out; + } + + list_for_each_entry_safe (entry, tmp, &entries.list, list) { + gf_log (this->name, GF_LOG_DEBUG, + "found readdir entry=%s", entry->d_name); + + file_path = build_file_path (loc, entry); + if (!file_path) { + gf_log (this->name, GF_LOG_DEBUG, + "file path construction failed"); + goto out; + } + + build_child_loc (loc, &entry_loc, file_path, entry->d_name); + + ret = syncop_lookup (this, &entry_loc, NULL, + &iatt, &xattr_rsp, &parent); + + entry_loc.ino = iatt.ia_ino; + entry_loc.inode->ino = iatt.ia_ino; + + gf_log (this->name, GF_LOG_DEBUG, + "lookup %s => %"PRId64, + entry_loc.path, + iatt.ia_ino); + + pump_update_resume_state (this, entry_loc.path); + + if (!IS_ENTRY_CWD(entry->d_name) && + !IS_ENTRY_PARENT (entry->d_name)) { + pump_save_path (this, entry_loc.path); + pump_save_file_stats (this); + } + + ret = pump_check_and_update_status (this); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump beginning to exit out"); + goto out; + } + + gf_log (this->name, GF_LOG_TRACE, + "type of file=%d, IFDIR=%d", + iatt.ia_type, IA_IFDIR); + + if (IA_ISDIR (iatt.ia_type) && !IS_ENTRY_CWD(entry->d_name) && + !IS_ENTRY_PARENT (entry->d_name)) { + if (is_pump_traversal_allowed (this, entry_loc.path)) { + gf_log (this->name, GF_LOG_TRACE, + "entering dir=%s", + entry->d_name); + gf_pump_traverse_directory (&entry_loc); + } + } + + offset = entry->d_off; + loc_wipe (&entry_loc); + } + + gf_dirent_free (&entries); + gf_log (this->name, GF_LOG_TRACE, + "offset incremented to %d", + (int32_t ) offset); + + } + +out: + return 0; + +} + +void +build_root_loc (inode_t *inode, loc_t *loc) +{ + loc->path = "/"; + loc->name = ""; + loc->inode = inode; + loc->ino = 1; + loc->inode->ino = 1; + +} + +static int +pump_update_resume_path (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + const char *resume_path = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + resume_path = pump_get_resume_path (this); + + if (resume_path) { + gf_log (this->name, GF_LOG_DEBUG, + "Found a path to resume from: %s", + resume_path); + + }else { + gf_log (this->name, GF_LOG_DEBUG, + "Did not find a path=> setting to '/'"); + pump_set_resume_path (this, "/"); + } + + pump_change_state (this, PUMP_STATE_RESUME); + + return 0; +} + +static int +pump_complete_migration (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + dict_t *dict = NULL; + pump_state_t state; + loc_t loc; + int dict_ret = 0; + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + dict = dict_new (); + + state = pump_get_state (); + if (state == PUMP_STATE_RUNNING) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump finished pumping"); + + pump_priv->pump_finished = _gf_true; + + dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); + + ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr failed - while notifying source complete"); + } + dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); + + ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "setxattr failed - while notifying sink complete"); + } + } + + return 0; +} + +static int +pump_task (void *data) +{ + xlator_t *this = NULL; + afr_private_t *priv = NULL; + + + loc_t loc; + struct iatt iatt, parent; + dict_t *xattr_rsp; + + int ret = -1; + + this = THIS; + priv = this->private; + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + ret = syncop_lookup (this, &loc, NULL, + &iatt, &xattr_rsp, &parent); + + gf_log (this->name, GF_LOG_TRACE, + "lookup: ino=%"PRId64", path=%s", + loc.ino, + loc.path); + + ret = pump_check_and_update_status (this); + if (ret < 0) { + goto out; + } + + pump_update_resume_path (this); + + gf_pump_traverse_directory (&loc); + + pump_complete_migration (this); +out: + return 0; +} + + +static int +pump_task_completion (int ret, void *data) +{ + xlator_t *this = NULL; + call_frame_t *frame = NULL; + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + this = THIS; + + frame = (call_frame_t *) data; + + priv = this->private; + pump_priv = priv->pump_private; + + inode_unref (priv->root_inode); + + gf_log (this->name, GF_LOG_DEBUG, + "Pump xlator exiting"); + return 0; +} + +int +pump_start (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + call_frame_t *pump_frame = NULL; + + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + pump_frame = copy_frame (frame); + + if (!pump_frame->root->lk_owner) + pump_frame->root->lk_owner = PUMP_LK_OWNER; + + ret = synctask_new (pump_priv->env, pump_task, pump_task_completion, + pump_frame); + + if (ret != -1) { + gf_log (this->name, GF_LOG_TRACE, + "setting pump as started"); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "starting pump failed"); + pump_change_state (this, PUMP_STATE_ABORT); + } + + return ret; +} + +gf_boolean_t +is_pump_loaded (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + gf_boolean_t ret; + + priv = this->private; + pump_priv = priv->pump_private; + + if (priv->pump_loaded) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump is already started"); + ret = _gf_true; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "Pump is not started"); + ret = _gf_false; + } + + return ret; + +} + +int32_t +pump_cmd_start_getxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + dict_t *dict) +{ + afr_local_t *local = NULL; + char *path = NULL; + + pump_state_t state; + int ret = 0; + int dict_ret = -1; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "getxattr failed - changing pump state to RUNNING with '/'"); + path = "/"; + } else { + gf_log (this->name, GF_LOG_TRACE, + "getxattr succeeded"); + + dict_ret = dict_get_str (dict, PUMP_PATH, &path); + if (dict_ret < 0) + path = "/"; + } + + state = pump_get_state (); + if ((state == PUMP_STATE_RUNNING) || + (state == PUMP_STATE_RESUME)) { + gf_log (this->name, GF_LOG_ERROR, + "Pump is already started"); + ret = -1; + goto out; + } + + pump_set_resume_path (this, path); + pump_change_state (this, PUMP_STATE_RUNNING); + + ret = pump_start (frame, this); + +out: + local->op_ret = ret; + pump_command_reply (frame, this); + return 0; +} + +int +pump_execute_status (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + uint64_t number_files = 0; + + char filename[PATH_MAX]; + char *dict_str = NULL; + + int32_t op_ret = 0; + int32_t op_errno = 0; + + dict_t *dict = NULL; + int ret = -1; + + priv = this->private; + pump_priv = priv->pump_private; + + LOCK (&pump_priv->resume_path_lock); + { + number_files = pump_priv->number_files_pumped; + strncpy (filename, pump_priv->current_file, PATH_MAX); + } + UNLOCK (&pump_priv->resume_path_lock); + + dict_str = GF_CALLOC (1, PATH_MAX + 256, gf_afr_mt_char); + if (!dict_str) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } + + if (pump_priv->pump_finished) { + snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Migration complete ", + number_files); + } else { + snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Current file= %s ", + number_files, filename); + } + + dict = dict_new (); + + ret = dict_set_str (dict, PUMP_CMD_STATUS, dict_str); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "dict_set_str returned negative value"); + } + + op_ret = 0; + +out: + + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); + + dict_unref (dict); + GF_FREE (dict_str); + + return 0; +} + +int +pump_execute_pause (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + + local = frame->local; + + pump_change_state (this, PUMP_STATE_PAUSE); + + local->op_ret = 0; + pump_command_reply (frame, this); + + return 0; +} + +int +pump_execute_start (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + + int ret = 0; + loc_t loc; + + priv = this->private; + local = frame->local; + + if (!priv->root_inode) { + gf_log (this->name, GF_LOG_NORMAL, + "Pump xlator cannot be started without an initial lookup"); + ret = -1; + goto out; + } + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + STACK_WIND (frame, + pump_cmd_start_getxattr_cbk, + PUMP_SOURCE_CHILD(this), + PUMP_SOURCE_CHILD(this)->fops->getxattr, + &loc, + PUMP_PATH); + + ret = 0; + +out: + if (ret < 0) { + local->op_ret = ret; + pump_command_reply (frame, this); + } + + return 0; +} + +int32_t +pump_cmd_abort_removexattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + afr_local_t *local = NULL; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Aborting pump failed. Please remove xattr" + PUMP_PATH "of the source child's '/'"); + local->op_ret = -1; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "remove xattr succeeded"); + local->op_ret = 0; + } + + pump_change_state (this, PUMP_STATE_ABORT); + + pump_command_reply (frame, this); + return 0; +} + +int +pump_execute_abort (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + pump_private_t *pump_priv = NULL; + loc_t root_loc; + int ret = 0; + + priv = this->private; + local = frame->local; + pump_priv = priv->pump_private; + + if (!priv->pump_loaded) { + gf_log (this->name, GF_LOG_ERROR, + "Trying to abort pump xlator which is not loaded"); + ret = -1; + goto out; + } + + assert (priv->root_inode); + + build_root_loc (priv->root_inode, &root_loc); + + STACK_WIND (frame, + pump_cmd_abort_removexattr_cbk, + PUMP_SOURCE_CHILD (this), + PUMP_SOURCE_CHILD (this)->fops->removexattr, + &root_loc, + PUMP_PATH); + + ret = 0; + +out: + if (ret < 0) { + local->op_ret = ret; + pump_command_reply (frame, this); + } + + return 0; +} + +gf_boolean_t +pump_command_status (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, PUMP_CMD_STATUS, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump status command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - status"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_pause (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, PUMP_CMD_PAUSE, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump pause command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - pause"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_abort (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, PUMP_CMD_ABORT, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump abort command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - abort"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t +pump_command_start (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, PUMP_CMD_START, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump start command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - start"); + ret = _gf_true; + +out: + return ret; + +} + +struct _xattr_key { + char *key; + struct list_head list; +}; + +static void +__gather_xattr_keys (dict_t *dict, char *key, data_t *value, + void *data) +{ + struct list_head * list = data; + struct _xattr_key * xkey = NULL; + + if (!strncmp (key, AFR_XATTR_PREFIX, + strlen (AFR_XATTR_PREFIX))) { + + xkey = GF_CALLOC (1, sizeof (*xkey), gf_afr_mt_xattr_key); + if (!xkey) + return; + + xkey->key = key; + INIT_LIST_HEAD (&xkey->list); + + list_add_tail (&xkey->list, list); + } +} + +static void +__filter_xattrs (dict_t *dict) +{ + struct list_head keys; + + struct _xattr_key *key; + struct _xattr_key *tmp; + + INIT_LIST_HEAD (&keys); + + dict_foreach (dict, __gather_xattr_keys, + (void *) &keys); + + list_for_each_entry_safe (key, tmp, &keys, list) { + dict_del (dict, key->key); + + list_del_init (&key->list); + + GF_FREE (key); + } +} + +int32_t +pump_getxattr_cbk (call_frame_t *frame, void *cookie, + xlator_t *this, int32_t op_ret, int32_t op_errno, + dict_t *dict) +{ + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + xlator_t ** children = NULL; + + int unwind = 1; + int last_tried = -1; + int this_try = -1; + int read_child = -1; + + priv = this->private; + children = priv->children; + + local = frame->local; + + read_child = (long) cookie; + + if (op_ret == -1) { + retry: + last_tried = local->cont.getxattr.last_tried; + + if (all_tried (last_tried, priv->child_count)) { + goto out; + } + this_try = ++local->cont.getxattr.last_tried; + + if (this_try == read_child) { + goto retry; + } + + unwind = 0; + STACK_WIND_COOKIE (frame, pump_getxattr_cbk, + (void *) (long) read_child, + children[this_try], + children[this_try]->fops->getxattr, + &local->loc, + local->cont.getxattr.name); + } + +out: + if (unwind) { + if (op_ret >= 0 && dict) + __filter_xattrs (dict); + + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); + } + + return 0; +} + +int32_t +pump_getxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, const char *name) +{ + afr_private_t * priv = NULL; + xlator_t ** children = NULL; + int call_child = 0; + afr_local_t * local = NULL; + + int read_child = -1; + + int32_t op_ret = -1; + int32_t op_errno = 0; + + + VALIDATE_OR_GOTO (frame, out); + VALIDATE_OR_GOTO (this, out); + VALIDATE_OR_GOTO (this->private, out); + + priv = this->private; + VALIDATE_OR_GOTO (priv->children, out); + + children = priv->children; + + ALLOC_OR_GOTO (local, afr_local_t, out); + frame->local = local; + + if (name) { + if (!strncmp (name, AFR_XATTR_PREFIX, + strlen (AFR_XATTR_PREFIX))) { + + op_errno = ENODATA; + goto out; + } + + if (!strcmp (name, PUMP_CMD_STATUS)) { + gf_log (this->name, GF_LOG_DEBUG, + "Hit pump command - status"); + pump_execute_status (frame, this); + op_ret = 0; + goto out; + } + } + + read_child = afr_read_child (this, loc->inode); + + if (read_child >= 0) { + call_child = read_child; + + local->cont.getxattr.last_tried = -1; + } else { + call_child = afr_first_up_child (priv); + + if (call_child == -1) { + op_errno = ENOTCONN; + gf_log (this->name, GF_LOG_DEBUG, + "no child is up"); + goto out; + } + + local->cont.getxattr.last_tried = call_child; + } + + loc_copy (&local->loc, loc); + if (name) + local->cont.getxattr.name = gf_strdup (name); + + STACK_WIND_COOKIE (frame, pump_getxattr_cbk, + (void *) (long) call_child, + children[call_child], children[call_child]->fops->getxattr, + loc, name); + + op_ret = 0; +out: + if (op_ret == -1) { + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, NULL); + } + return 0; +} + +static int +afr_setxattr_unwind (call_frame_t *frame, xlator_t *this) +{ + afr_local_t * local = NULL; + afr_private_t * priv = NULL; + call_frame_t *main_frame = NULL; + + local = frame->local; + priv = this->private; + + LOCK (&frame->lock); + { + if (local->transaction.main_frame) + main_frame = local->transaction.main_frame; + local->transaction.main_frame = NULL; + } + UNLOCK (&frame->lock); + + if (main_frame) { + AFR_STACK_UNWIND (setxattr, main_frame, + local->op_ret, local->op_errno) + } + return 0; +} + +static int +afr_setxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno) +{ + afr_local_t * local = NULL; + afr_private_t * priv = NULL; + + int call_count = -1; + int need_unwind = 0; + + local = frame->local; + priv = this->private; + + LOCK (&frame->lock); + { + if (op_ret != -1) { + if (local->success_count == 0) { + local->op_ret = op_ret; + } + local->success_count++; + + if (local->success_count == priv->child_count) { + need_unwind = 1; + } + } + + local->op_errno = op_errno; + } + UNLOCK (&frame->lock); + + if (need_unwind) + local->transaction.unwind (frame, this); + + call_count = afr_frame_return (frame); + + if (call_count == 0) { + local->transaction.resume (frame, this); + } + + return 0; +} + +static int +afr_setxattr_wind (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + afr_private_t *priv = NULL; + + int call_count = -1; + int i = 0; + + local = frame->local; + priv = this->private; + + call_count = afr_up_children_count (priv->child_count, local->child_up); + + if (call_count == 0) { + local->transaction.resume (frame, this); + return 0; + } + + local->call_count = call_count; + + for (i = 0; i < priv->child_count; i++) { + if (local->child_up[i]) { + STACK_WIND_COOKIE (frame, afr_setxattr_wind_cbk, + (void *) (long) i, + priv->children[i], + priv->children[i]->fops->setxattr, + &local->loc, + local->cont.setxattr.dict, + local->cont.setxattr.flags); + + if (!--call_count) + break; + } + } + + return 0; +} + + +static int +afr_setxattr_done (call_frame_t *frame, xlator_t *this) +{ + afr_local_t * local = frame->local; + + local->transaction.unwind (frame, this); + + AFR_STACK_DESTROY (frame); + + return 0; +} + +int32_t +pump_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + STACK_UNWIND (frame, + op_ret, + op_errno); + return 0; +} + +int +pump_command_reply (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + + local = frame->local; + + if (local->op_ret < 0) + gf_log (this->name, GF_LOG_NORMAL, + "Command failed"); + else + gf_log (this->name, GF_LOG_NORMAL, + "Command succeeded"); + + AFR_STACK_UNWIND (setxattr, + frame, + local->op_ret, + local->op_errno); + + return 0; +} + +int +pump_parse_command (call_frame_t *frame, xlator_t *this, + afr_local_t *local, dict_t *dict) +{ + + int ret = -1; + + if (pump_command_start (this, dict)) { + frame->local = local; + ret = pump_execute_start (frame, this); + + } else if (pump_command_pause (this, dict)) { + frame->local = local; + ret = pump_execute_pause (frame, this); + + } else if (pump_command_abort (this, dict)) { + frame->local = local; + ret = pump_execute_abort (frame, this); + } + return ret; +} + +int +pump_setxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, dict_t *dict, int32_t flags) +{ + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + call_frame_t *transaction_frame = NULL; + + int ret = -1; + + int op_ret = -1; + int op_errno = 0; + + VALIDATE_OR_GOTO (frame, out); + VALIDATE_OR_GOTO (this, out); + VALIDATE_OR_GOTO (this->private, out); + + priv = this->private; + + ALLOC_OR_GOTO (local, afr_local_t, out); + + ret = AFR_LOCAL_INIT (local, priv); + if (ret < 0) { + op_errno = -ret; + goto out; + } + + ret = pump_parse_command (frame, this, + local, dict); + if (ret >= 0) { + op_ret = 0; + goto out; + } + + transaction_frame = copy_frame (frame); + if (!transaction_frame) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + goto out; + } + + transaction_frame->local = local; + + local->op_ret = -1; + + local->cont.setxattr.dict = dict_ref (dict); + local->cont.setxattr.flags = flags; + + local->transaction.fop = afr_setxattr_wind; + local->transaction.done = afr_setxattr_done; + local->transaction.unwind = afr_setxattr_unwind; + + loc_copy (&local->loc, loc); + + local->transaction.main_frame = frame; + local->transaction.start = LLONG_MAX - 1; + local->transaction.len = 0; + + afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION); + + op_ret = 0; +out: + if (op_ret == -1) { + if (transaction_frame) + AFR_STACK_DESTROY (transaction_frame); + AFR_STACK_UNWIND (setxattr, frame, op_ret, op_errno); + } + + return 0; +} + +int32_t +mem_acct_init (xlator_t *this) +{ + int ret = -1; + + if (!this) + return ret; + + ret = xlator_mem_acct_init (this, gf_afr_mt_end + 1); + + if (ret != 0) { + gf_log(this->name, GF_LOG_ERROR, "Memory accounting init" + "failed"); + return ret; + } + + return ret; +} + +int32_t +notify (xlator_t *this, int32_t event, + void *data, ...) +{ + int ret = -1; + + switch (event) { + case GF_EVENT_CHILD_DOWN: + pump_change_state (this, PUMP_STATE_ABORT); + break; + } + + ret = afr_notify (this, event, data); + + return ret; +} + +int32_t +init (xlator_t *this) +{ + afr_private_t * priv = NULL; + pump_private_t *pump_priv = NULL; + int child_count = 0; + xlator_list_t * trav = NULL; + int i = 0; + int ret = -1; + int op_errno = 0; + + int source_child = 0; + + if (!this->children) { + gf_log (this->name, GF_LOG_ERROR, + "pump translator needs a source and sink" + "subvolumes defined."); + return -1; + } + + if (!this->parents) { + gf_log (this->name, GF_LOG_WARNING, + "Volume is dangling."); + } + + ALLOC_OR_GOTO (this->private, afr_private_t, out); + + priv = this->private; + + priv->read_child = source_child; + priv->favorite_child = source_child; + priv->background_self_heal_count = 0; + + priv->data_self_heal = 1; + priv->metadata_self_heal = 1; + priv->entry_self_heal = 1; + + priv->data_self_heal_algorithm = ""; + + priv->data_self_heal_window_size = 16; + + priv->data_change_log = 1; + priv->metadata_change_log = 1; + priv->entry_change_log = 1; + + /* Locking options */ + + /* Lock server count infact does not matter. Locks are held + on all subvolumes, in this case being the source + and the sink. + */ + + priv->data_lock_server_count = 2; + priv->metadata_lock_server_count = 2; + priv->entry_lock_server_count = 2; + + priv->strict_readdir = _gf_false; + + trav = this->children; + while (trav) { + child_count++; + trav = trav->next; + } + + priv->wait_count = 1; + + if (child_count != 2) { + gf_log (this->name, GF_LOG_ERROR, + "There should be exactly 2 children - one source " + "and one sink"); + return -1; + } + priv->child_count = child_count; + + LOCK_INIT (&priv->lock); + LOCK_INIT (&priv->read_child_lock); + + priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, + gf_afr_mt_char); + if (!priv->child_up) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + priv->children = GF_CALLOC (sizeof (xlator_t *), child_count, + gf_afr_mt_xlator_t); + if (!priv->children) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + priv->pending_key = GF_CALLOC (sizeof (*priv->pending_key), + child_count, + gf_afr_mt_char); + if (!priv->pending_key) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory."); + op_errno = ENOMEM; + goto out; + } + + trav = this->children; + i = 0; + while (i < child_count) { + priv->children[i] = trav->xlator; + + ret = asprintf (&priv->pending_key[i], "%s.%s", AFR_XATTR_PREFIX, + trav->xlator->name); + if (-1 == ret) { + gf_log (this->name, GF_LOG_ERROR, + "asprintf failed to set pending key"); + op_errno = ENOMEM; + goto out; + } + + trav = trav->next; + i++; + } + + priv->first_lookup = 1; + priv->root_inode = NULL; + + pump_priv = GF_CALLOC (1, sizeof (*pump_priv), + gf_afr_mt_pump_priv); + if (!pump_priv) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + op_errno = ENOMEM; + goto out; + } + + LOCK_INIT (&pump_priv->resume_path_lock); + LOCK_INIT (&pump_priv->pump_state_lock); + + pump_priv->resume_path = GF_CALLOC (1, PATH_MAX, + gf_afr_mt_char); + if (!pump_priv->resume_path) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + ret = -1; + goto out; + } + + pump_priv->env = syncenv_new (0); + if (!pump_priv->env) { + gf_log (this->name, GF_LOG_ERROR, + "Could not create new sync-environment"); + ret = -1; + goto out; + } + + priv->pump_private = pump_priv; + + pump_change_state (this, PUMP_STATE_ABORT); + + ret = 0; +out: + return ret; +} + +int +fini (xlator_t *this) +{ + return 0; +} + + +struct xlator_fops fops = { + .lookup = afr_lookup, + .open = afr_open, + .lk = afr_lk, + .flush = afr_flush, + .statfs = afr_statfs, + .fsync = afr_fsync, + .fsyncdir = afr_fsyncdir, + .xattrop = afr_xattrop, + .fxattrop = afr_fxattrop, + .inodelk = afr_inodelk, + .finodelk = afr_finodelk, + .entrylk = afr_entrylk, + .fentrylk = afr_fentrylk, + + /* inode read */ + .access = afr_access, + .stat = afr_stat, + .fstat = afr_fstat, + .readlink = afr_readlink, + .getxattr = pump_getxattr, + .readv = afr_readv, + + /* inode write */ + .writev = afr_writev, + .truncate = afr_truncate, + .ftruncate = afr_ftruncate, + .setxattr = pump_setxattr, + .setattr = afr_setattr, + .fsetattr = afr_fsetattr, + .removexattr = afr_removexattr, + + /* dir read */ + .opendir = afr_opendir, + .readdir = afr_readdir, + .readdirp = afr_readdirp, + + /* dir write */ + .create = afr_create, + .mknod = afr_mknod, + .mkdir = afr_mkdir, + .unlink = afr_unlink, + .rmdir = afr_rmdir, + .link = afr_link, + .symlink = afr_symlink, + .rename = afr_rename, +}; + +struct xlator_dumpops dumpops = { + .priv = afr_priv_dump, +}; + + +struct xlator_cbks cbks = { + .release = afr_release, + .releasedir = afr_releasedir, +}; + +struct volume_options options[] = { + { .key = {NULL} }, +}; -- cgit