diff options
Diffstat (limited to 'xlators/cluster/afr/src/pump.c')
| -rw-r--r-- | xlators/cluster/afr/src/pump.c | 1605 |
1 files changed, 1196 insertions, 409 deletions
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c index 5b31d389b..a7f72fb30 100644 --- a/xlators/cluster/afr/src/pump.c +++ b/xlators/cluster/afr/src/pump.c @@ -1,25 +1,17 @@ /* - Copyright (c) 2007-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #include <unistd.h> #include <sys/time.h> #include <stdlib.h> +#include <fnmatch.h> #ifndef _CONFIG_H #define _CONFIG_H @@ -27,6 +19,16 @@ #endif #include "afr-common.c" +#include "defaults.c" +#include "glusterfs.h" + +static uint64_t pump_pid = 0; +static inline void +pump_fill_loc_info (loc_t *loc, struct iatt *iatt, struct iatt *parent) +{ + afr_update_loc_gfids (loc, iatt, parent); + uuid_copy (loc->inode->gfid, iatt->ia_gfid); +} static int pump_mark_start_pending (xlator_t *this) @@ -134,92 +136,24 @@ pump_set_resume_path (xlator_t *this, const char *path) priv = this->private; pump_priv = priv->pump_private; - assert (pump_priv); + GF_ASSERT (pump_priv); LOCK (&pump_priv->resume_path_lock); { - pump_priv->resume_path = strdup (path); - if (!pump_priv->resume_path) - ret = -1; + strncpy (pump_priv->resume_path, path, strlen (path) + 1); } UNLOCK (&pump_priv->resume_path_lock); return ret; } -static void -build_child_loc (loc_t *parent, loc_t *child, char *path, char *name) -{ - child->path = path; - child->name = name; - - child->parent = inode_ref (parent->inode); - child->inode = inode_new (parent->inode->table); -} - -static char * -build_file_path (loc_t *loc, gf_dirent_t *entry) -{ - xlator_t *this = NULL; - char *file_path = NULL; - int pathlen = 0; - int total_size = 0; - - this = THIS; - - pathlen = STRLEN_0 (loc->path); - - if (IS_ROOT_PATH (loc->path)) { - total_size = pathlen + entry->d_len; - file_path = GF_CALLOC (1, total_size, gf_afr_mt_char); - if (!file_path) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - return NULL; - } - - gf_log (this->name, GF_LOG_TRACE, - "constructing file path of size=%d" - "pathlen=%d, d_len=%d", - total_size, pathlen, - entry->d_len); - - snprintf(file_path, total_size, "%s%s", loc->path, entry->d_name); - - } else { - total_size = pathlen + entry->d_len + 1; /* for the extra '/' in the path */ - file_path = GF_CALLOC (1, total_size + 1, gf_afr_mt_char); - if (!file_path) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); - return NULL; - } - - gf_log (this->name, GF_LOG_TRACE, - "constructing file path of size=%d" - "pathlen=%d, d_len=%d", - total_size, pathlen, - entry->d_len); - - snprintf(file_path, total_size, "%s/%s", loc->path, entry->d_name); - } - - gf_log (this->name, GF_LOG_TRACE, - "path=%s and d_name=%s", loc->path, entry->d_name); - gf_log (this->name, GF_LOG_TRACE, - "constructed file_path=%s of size=%d", file_path, total_size); - - return file_path; -} - static int pump_save_path (xlator_t *this, const char *path) { afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; pump_state_t state; dict_t *dict = NULL; - loc_t loc; + loc_t loc = {0}; int dict_ret = 0; int ret = -1; @@ -228,29 +162,30 @@ pump_save_path (xlator_t *this, const char *path) return 0; priv = this->private; - pump_priv = priv->pump_private; - assert (priv->root_inode); + GF_ASSERT (priv->root_inode); - build_root_loc (priv->root_inode, &loc); + afr_build_root_loc (this, &loc); dict = dict_new (); dict_ret = dict_set_str (dict, PUMP_PATH, (char *)path); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", path, PUMP_PATH); ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, + gf_log (this->name, GF_LOG_INFO, "setxattr failed - could not save path=%s", path); } else { gf_log (this->name, GF_LOG_DEBUG, "setxattr succeeded - saved path=%s", path); - gf_log (this->name, GF_LOG_DEBUG, - "Saving path for status info"); } dict_unref (dict); + loc_wipe (&loc); return 0; } @@ -313,15 +248,9 @@ pump_get_resume_path (xlator_t *this) static int pump_update_resume_state (xlator_t *this, const char *path) { - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - pump_state_t state; const char *resume_path = NULL; - priv = this->private; - pump_priv = priv->pump_private; - state = pump_get_state (); if (state == PUMP_STATE_RESUME) { @@ -349,16 +278,10 @@ pump_update_resume_state (xlator_t *this, const char *path) static gf_boolean_t is_pump_traversal_allowed (xlator_t *this, const char *path) { - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - pump_state_t state; const char *resume_path = NULL; gf_boolean_t ret = _gf_true; - priv = this->private; - pump_priv = priv->pump_private; - state = pump_get_state (); if (state == PUMP_STATE_RESUME) { @@ -401,38 +324,33 @@ pump_save_file_stats (xlator_t *this, const char *path) static int gf_pump_traverse_directory (loc_t *loc) { - xlator_t *this = NULL; - afr_private_t *priv = NULL; - fd_t *fd = NULL; - - off_t offset = 0; - loc_t entry_loc; - gf_dirent_t *entry = NULL; - gf_dirent_t *tmp = NULL; - gf_dirent_t entries; - - struct iatt iatt, parent; - dict_t *xattr_rsp; - - int source = 0; - - char *file_path = NULL; - int ret = 0; + xlator_t *this = NULL; + fd_t *fd = NULL; + off_t offset = 0; + loc_t entry_loc = {0}; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + gf_dirent_t entries; + struct iatt iatt = {0}; + struct iatt parent = {0}; + dict_t *xattr_rsp = NULL; + int ret = 0; + gf_boolean_t is_directory_empty = _gf_true; + gf_boolean_t free_entries = _gf_false; INIT_LIST_HEAD (&entries.list); this = THIS; - priv = this->private; - assert (loc->inode); + GF_ASSERT (loc->inode); - fd = fd_create (loc->inode, PUMP_PID); + fd = fd_create (loc->inode, pump_pid); if (!fd) { gf_log (this->name, GF_LOG_ERROR, "Failed to create fd for %s", loc->path); goto out; } - ret = syncop_opendir (priv->children[source], loc, fd); + ret = syncop_opendir (this, loc, fd); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "opendir failed on %s", loc->path); @@ -443,7 +361,8 @@ gf_pump_traverse_directory (loc_t *loc) "pump opendir on %s returned=%d", loc->path, ret); - while (syncop_readdirp (priv->children[source], fd, 131072, offset, &entries)) { + while (syncop_readdirp (this, fd, 131072, offset, NULL, &entries)) { + free_entries = _gf_true; if (list_empty (&entries.list)) { gf_log (this->name, GF_LOG_TRACE, @@ -455,93 +374,94 @@ gf_pump_traverse_directory (loc_t *loc) gf_log (this->name, GF_LOG_DEBUG, "found readdir entry=%s", entry->d_name); - file_path = build_file_path (loc, entry); - if (!file_path) { - gf_log (this->name, GF_LOG_DEBUG, - "file path construction failed"); - goto out; - } - - build_child_loc (loc, &entry_loc, file_path, entry->d_name); - - ret = syncop_lookup (this, &entry_loc, NULL, - &iatt, &xattr_rsp, &parent); - - entry_loc.ino = iatt.ia_ino; - entry_loc.inode->ino = iatt.ia_ino; - - gf_log (this->name, GF_LOG_DEBUG, - "lookup %s => %"PRId64, - entry_loc.path, - iatt.ia_ino); - - pump_update_resume_state (this, entry_loc.path); - - if (!IS_ENTRY_CWD(entry->d_name) && - !IS_ENTRY_PARENT (entry->d_name)) { - pump_save_path (this, entry_loc.path); - pump_save_file_stats (this, entry_loc.path); + offset = entry->d_off; + if (uuid_is_null (entry->d_stat.ia_gfid)) { + gf_log (this->name, GF_LOG_WARNING, "%s/%s: No " + "gfid present skipping", + loc->path, entry->d_name); + continue; } - - ret = pump_check_and_update_status (this); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "Pump beginning to exit out"); + loc_wipe (&entry_loc); + ret = afr_build_child_loc (this, &entry_loc, loc, + entry->d_name); + if (ret) goto out; - } - - gf_log (this->name, GF_LOG_TRACE, - "type of file=%d, IFDIR=%d", - iatt.ia_type, IA_IFDIR); - if (IA_ISDIR (iatt.ia_type) && !IS_ENTRY_CWD(entry->d_name) && + if (!IS_ENTRY_CWD (entry->d_name) && !IS_ENTRY_PARENT (entry->d_name)) { - if (is_pump_traversal_allowed (this, entry_loc.path)) { - gf_log (this->name, GF_LOG_TRACE, - "entering dir=%s", - entry->d_name); - gf_pump_traverse_directory (&entry_loc); - } - } - offset = entry->d_off; - loc_wipe (&entry_loc); + is_directory_empty = _gf_false; + gf_log (this->name, GF_LOG_DEBUG, + "lookup %s => %"PRId64, + entry_loc.path, + iatt.ia_ino); + + ret = syncop_lookup (this, &entry_loc, NULL, + &iatt, &xattr_rsp, &parent); + + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "%s: lookup failed", + entry_loc.path); + continue; + } + pump_fill_loc_info (&entry_loc, &iatt, + &parent); + + pump_update_resume_state (this, entry_loc.path); + + pump_save_path (this, entry_loc.path); + pump_save_file_stats (this, entry_loc.path); + + ret = pump_check_and_update_status (this); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Pump beginning to exit out"); + goto out; + } + + if (IA_ISDIR (iatt.ia_type)) { + if (is_pump_traversal_allowed (this, entry_loc.path)) { + gf_log (this->name, GF_LOG_TRACE, + "entering dir=%s", + entry->d_name); + gf_pump_traverse_directory (&entry_loc); + } + } + } } gf_dirent_free (&entries); + free_entries = _gf_false; gf_log (this->name, GF_LOG_TRACE, "offset incremented to %d", (int32_t ) offset); } -out: - return 0; - -} + ret = syncop_close (fd); + if (ret < 0) + gf_log (this->name, GF_LOG_DEBUG, "closing the fd failed"); -void -build_root_loc (inode_t *inode, loc_t *loc) -{ - loc->path = "/"; - loc->name = ""; - loc->inode = inode; - loc->ino = 1; - loc->inode->ino = 1; + if (is_directory_empty && IS_ROOT_PATH (loc->path)) { + pump_change_state (this, PUMP_STATE_RUNNING); + gf_log (this->name, GF_LOG_INFO, "Empty source brick. " + "Nothing to be done."); + } +out: + if (entry_loc.path) + loc_wipe (&entry_loc); + if (free_entries) + gf_dirent_free (&entries); + return 0; } static int pump_update_resume_path (xlator_t *this) { - afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; - const char *resume_path = NULL; - priv = this->private; - pump_priv = priv->pump_private; - resume_path = pump_get_resume_path (this); if (resume_path) { @@ -560,6 +480,39 @@ pump_update_resume_path (xlator_t *this) return 0; } +static int32_t +pump_xattr_cleaner (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + afr_private_t *priv = NULL; + loc_t loc = {0}; + int i = 0; + int ret = 0; + int source = 0; + int sink = 1; + + priv = this->private; + + afr_build_root_loc (this, &loc); + + ret = syncop_removexattr (priv->children[source], &loc, + PUMP_PATH); + + ret = syncop_removexattr (priv->children[sink], &loc, + PUMP_SINK_COMPLETE); + + for (i = 0; i < priv->child_count; i++) { + ret = syncop_removexattr (priv->children[i], &loc, + PUMP_SOURCE_COMPLETE); + if (ret) + gf_log (this->name, GF_LOG_DEBUG, "removexattr " + "failed with %s", strerror (errno)); + } + + loc_wipe (&loc); + return pump_command_reply (frame, this); +} + static int pump_complete_migration (xlator_t *this) { @@ -567,16 +520,16 @@ pump_complete_migration (xlator_t *this) pump_private_t *pump_priv = NULL; dict_t *dict = NULL; pump_state_t state; - loc_t loc; + loc_t loc = {0}; int dict_ret = 0; int ret = -1; priv = this->private; pump_priv = priv->pump_private; - assert (priv->root_inode); + GF_ASSERT (priv->root_inode); - build_root_loc (priv->root_inode, &loc); + afr_build_root_loc (this, &loc); dict = dict_new (); @@ -588,6 +541,10 @@ pump_complete_migration (xlator_t *this) pump_priv->pump_finished = _gf_true; dict_ret = dict_set_str (dict, PUMP_SOURCE_COMPLETE, "jargon"); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", + loc.path, PUMP_SOURCE_COMPLETE); ret = syncop_setxattr (PUMP_SOURCE_CHILD (this), &loc, dict, 0); if (ret < 0) { @@ -595,44 +552,97 @@ pump_complete_migration (xlator_t *this) "setxattr failed - while notifying source complete"); } dict_ret = dict_set_str (dict, PUMP_SINK_COMPLETE, "jargon"); + if (dict_ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set the key %s", + loc.path, PUMP_SINK_COMPLETE); ret = syncop_setxattr (PUMP_SINK_CHILD (this), &loc, dict, 0); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "setxattr failed - while notifying sink complete"); } + + pump_save_path (this, "/"); + + } else if (state == PUMP_STATE_ABORT) { + gf_log (this->name, GF_LOG_DEBUG, "Starting cleanup " + "of pump internal xattrs"); + call_resume (pump_priv->cleaner); } + loc_wipe (&loc); return 0; } static int +pump_lookup_sink (loc_t *loc) +{ + xlator_t *this = NULL; + struct iatt iatt, parent; + dict_t *xattr_rsp; + dict_t *xattr_req = NULL; + int ret = 0; + + this = THIS; + + xattr_req = dict_new (); + + ret = afr_set_root_gfid (xattr_req); + if (ret) + goto out; + + ret = syncop_lookup (PUMP_SINK_CHILD (this), loc, + xattr_req, &iatt, &xattr_rsp, &parent); + + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "Lookup on sink child failed"); + goto out; + } + +out: + if (xattr_req) + dict_unref (xattr_req); + + return ret; +} + +static int pump_task (void *data) { xlator_t *this = NULL; afr_private_t *priv = NULL; - loc_t loc; + loc_t loc = {0}; struct iatt iatt, parent; - dict_t *xattr_rsp; + dict_t *xattr_rsp = NULL; + dict_t *xattr_req = NULL; int ret = -1; this = THIS; priv = this->private; - assert (priv->root_inode); + GF_ASSERT (priv->root_inode); - build_root_loc (priv->root_inode, &loc); + afr_build_root_loc (this, &loc); + xattr_req = dict_new (); + if (!xattr_req) { + gf_log (this->name, GF_LOG_DEBUG, + "Out of memory"); + ret = -1; + goto out; + } - ret = syncop_lookup (this, &loc, NULL, + afr_set_root_gfid (xattr_req); + ret = syncop_lookup (this, &loc, xattr_req, &iatt, &xattr_rsp, &parent); gf_log (this->name, GF_LOG_TRACE, - "lookup: ino=%"PRId64", path=%s", - loc.ino, - loc.path); + "lookup: path=%s gfid=%s", + loc.path, uuid_utoa (loc.inode->gfid)); ret = pump_check_and_update_status (this); if (ret < 0) { @@ -641,30 +651,37 @@ pump_task (void *data) pump_update_resume_path (this); + afr_set_root_gfid (xattr_req); + ret = pump_lookup_sink (&loc); + if (ret) { + pump_update_resume_path (this); + goto out; + } + gf_pump_traverse_directory (&loc); pump_complete_migration (this); out: + if (xattr_req) + dict_unref (xattr_req); + + loc_wipe (&loc); return 0; } static int -pump_task_completion (int ret, void *data) +pump_task_completion (int ret, call_frame_t *sync_frame, void *data) { xlator_t *this = NULL; - call_frame_t *frame = NULL; afr_private_t *priv = NULL; - pump_private_t *pump_priv = NULL; this = THIS; - frame = (call_frame_t *) data; - priv = this->private; - pump_priv = priv->pump_private; inode_unref (priv->root_inode); + STACK_DESTROY (sync_frame->root); gf_log (this->name, GF_LOG_DEBUG, "Pump xlator exiting"); @@ -682,21 +699,25 @@ pump_start (call_frame_t *pump_frame, xlator_t *this) priv = this->private; pump_priv = priv->pump_private; - if (!pump_frame->root->lk_owner) - pump_frame->root->lk_owner = PUMP_LK_OWNER; + afr_set_lk_owner (pump_frame, this, pump_frame->root); + pump_pid = (uint64_t) (unsigned long)pump_frame->root; - ret = synctask_new (pump_priv->env, pump_task, pump_task_completion, - pump_frame); - - if (ret != -1) { - gf_log (this->name, GF_LOG_TRACE, - "setting pump as started"); - } else { - gf_log (this->name, GF_LOG_DEBUG, + ret = synctask_new (pump_priv->env, pump_task, + pump_task_completion, + pump_frame, NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, "starting pump failed"); pump_change_state (this, PUMP_STATE_ABORT); + goto out; } + gf_log (this->name, GF_LOG_DEBUG, + "setting pump as started lk_owner: %s %"PRIu64, + lkowner_utoa (&pump_frame->root->lk_owner), pump_pid); + + priv->use_afr_in_pump = 1; +out: return ret; } @@ -727,9 +748,10 @@ pump_cmd_start_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno) + int32_t op_errno, dict_t *xdata) { + call_frame_t *prev = NULL; afr_local_t *local = NULL; int ret = 0; @@ -749,6 +771,11 @@ pump_cmd_start_setxattr_cbk (call_frame_t *frame, pump_mark_start_pending (this); + /* send the PARENT_UP as pump is ready now */ + prev = cookie; + if (prev && prev->this) + prev->this->notify (prev->this, GF_EVENT_PARENT_UP, this); + out: local->op_ret = ret; pump_command_reply (frame, this); @@ -762,8 +789,9 @@ pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) afr_local_t *local = NULL; afr_private_t *priv = NULL; dict_t *dict = NULL; - char *dst_brick = NULL; - loc_t loc; + data_t *data = NULL; + char *clnt_cmd = NULL; + loc_t loc = {0}; int ret = 0; @@ -772,10 +800,11 @@ pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) GF_ASSERT (priv->root_inode); - build_root_loc (priv->root_inode, &loc); + afr_build_root_loc (this, &loc); - ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick); - if (ret < 0) { + data = data_ref (dict_get (local->dict, RB_PUMP_CMD_START)); + if (!data) { + ret = -1; gf_log (this->name, GF_LOG_ERROR, "Could not get destination brick value"); goto out; @@ -783,17 +812,22 @@ pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) dict = dict_new (); if (!dict) { - gf_log (this->name, GF_LOG_ERROR, - "Out of memory"); ret = -1; goto out; } - GF_ASSERT (dst_brick); - gf_log (this->name, GF_LOG_DEBUG, - "Got destination brick as %s", dst_brick); + clnt_cmd = GF_CALLOC (1, data->len+1, gf_common_mt_char); + if (!clnt_cmd) { + ret = -1; + goto out; + } + + memcpy (clnt_cmd, data->data, data->len); + clnt_cmd[data->len] = '\0'; + gf_log (this->name, GF_LOG_DEBUG, "Got destination brick %s\n", + clnt_cmd); - ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick); + ret = dict_set_dynstr (dict, CLIENT_CMD_CONNECT, clnt_cmd); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, "Could not inititiate destination brick " @@ -807,12 +841,21 @@ pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) PUMP_SINK_CHILD(this)->fops->setxattr, &loc, dict, - 0); + 0, NULL); ret = 0; - dict_unref (dict); out: + if (dict) + dict_unref (dict); + + if (data) + data_unref (data); + + if (ret && clnt_cmd) + GF_FREE (clnt_cmd); + + loc_wipe (&loc); return ret; } @@ -832,7 +875,7 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame, xlator_t *this, int32_t op_ret, int32_t op_errno, - dict_t *dict) + dict_t *dict, dict_t *xdata) { afr_local_t *local = NULL; char *path = NULL; @@ -876,6 +919,8 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame, else { /* We're re-starting pump from a previous pause */ + gf_log (this->name, GF_LOG_DEBUG, + "about to start synctask"); ret = pump_start_synctask (this); need_unwind = 1; } @@ -897,6 +942,7 @@ pump_execute_status (call_frame_t *frame, xlator_t *this) uint64_t number_files = 0; char filename[PATH_MAX]; + char summary[PATH_MAX+256]; char *dict_str = NULL; int32_t op_ret = 0; @@ -925,28 +971,35 @@ pump_execute_status (call_frame_t *frame, xlator_t *this) } if (pump_priv->pump_finished) { - snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Migration complete ", - number_files); + snprintf (summary, PATH_MAX+256, + "no_of_files=%"PRIu64, number_files); } else { - snprintf (dict_str, PATH_MAX + 256, "Number of files migrated = %"PRIu64" Current file= %s ", - number_files, filename); + snprintf (summary, PATH_MAX+256, + "no_of_files=%"PRIu64":current_file=%s", + number_files, filename); } + snprintf (dict_str, PATH_MAX+256, "status=%d:%s", + (pump_priv->pump_finished)?1:0, summary); dict = dict_new (); - ret = dict_set_str (dict, PUMP_CMD_STATUS, dict_str); + ret = dict_set_dynstr (dict, RB_PUMP_CMD_STATUS, dict_str); if (ret < 0) { gf_log (this->name, GF_LOG_DEBUG, - "dict_set_str returned negative value"); + "dict_set_dynstr returned negative value"); + } else { + dict_str = NULL; } op_ret = 0; out: - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); + + if (dict) + dict_unref (dict); - dict_unref (dict); GF_FREE (dict_str); return 0; @@ -974,7 +1027,7 @@ pump_execute_start (call_frame_t *frame, xlator_t *this) afr_local_t *local = NULL; int ret = 0; - loc_t loc; + loc_t loc = {0}; priv = this->private; local = frame->local; @@ -989,14 +1042,14 @@ pump_execute_start (call_frame_t *frame, xlator_t *this) GF_ASSERT (priv->root_inode); - build_root_loc (priv->root_inode, &loc); + afr_build_root_loc (this, &loc); STACK_WIND (frame, pump_cmd_start_getxattr_cbk, PUMP_SOURCE_CHILD(this), PUMP_SOURCE_CHILD(this)->fops->getxattr, &loc, - PUMP_PATH); + PUMP_PATH, NULL); ret = 0; @@ -1006,22 +1059,98 @@ out: pump_command_reply (frame, this); } + loc_wipe (&loc); return 0; } +static int +pump_cleanup_helper (void *data) { + call_frame_t *frame = data; + + pump_xattr_cleaner (frame, 0, frame->this, 0, 0, NULL); + + return 0; +} + +static int +pump_cleanup_done (int ret, call_frame_t *sync_frame, void *data) +{ + STACK_DESTROY (sync_frame->root); + + return 0; +} + +int +pump_execute_commit (call_frame_t *frame, xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + afr_local_t *local = NULL; + call_frame_t *sync_frame = NULL; + int ret = 0; + + priv = this->private; + pump_priv = priv->pump_private; + local = frame->local; + + local->op_ret = 0; + if (pump_priv->pump_finished) { + pump_change_state (this, PUMP_STATE_COMMIT); + sync_frame = create_frame (this, this->ctx->pool); + ret = synctask_new (pump_priv->env, pump_cleanup_helper, + pump_cleanup_done, sync_frame, frame); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " + "synctask for cleaning up xattrs."); + } + + } else { + gf_log (this->name, GF_LOG_ERROR, "Commit can't proceed. " + "Migration in progress"); + local->op_ret = -1; + local->op_errno = EINPROGRESS; + pump_command_reply (frame, this); + } + + return 0; +} int pump_execute_abort (call_frame_t *frame, xlator_t *this) { - afr_private_t *priv = NULL; - afr_local_t *local = NULL; + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + afr_local_t *local = NULL; + call_frame_t *sync_frame = NULL; + int ret = 0; - priv = this->private; - local = frame->local; + priv = this->private; + pump_priv = priv->pump_private; + local = frame->local; pump_change_state (this, PUMP_STATE_ABORT); + LOCK (&pump_priv->resume_path_lock); + { + pump_priv->number_files_pumped = 0; + pump_priv->current_file[0] = '\0'; + } + UNLOCK (&pump_priv->resume_path_lock); + local->op_ret = 0; - pump_command_reply (frame, this); + if (pump_priv->pump_finished) { + sync_frame = create_frame (this, this->ctx->pool); + ret = synctask_new (pump_priv->env, pump_cleanup_helper, + pump_cleanup_done, sync_frame, frame); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, "Couldn't create " + "synctask for cleaning up xattrs."); + } + + } else { + pump_priv->cleaner = fop_setxattr_cbk_stub (frame, + pump_xattr_cleaner, + 0, 0, NULL); + } return 0; } @@ -1033,7 +1162,7 @@ pump_command_status (xlator_t *this, dict_t *dict) int dict_ret = -1; int ret = _gf_true; - dict_ret = dict_get_str (dict, PUMP_CMD_STATUS, &cmd); + dict_ret = dict_get_str (dict, RB_PUMP_CMD_STATUS, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump status command"); @@ -1057,7 +1186,7 @@ pump_command_pause (xlator_t *this, dict_t *dict) int dict_ret = -1; int ret = _gf_true; - dict_ret = dict_get_str (dict, PUMP_CMD_PAUSE, &cmd); + dict_ret = dict_get_str (dict, RB_PUMP_CMD_PAUSE, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump pause command"); @@ -1075,13 +1204,37 @@ out: } gf_boolean_t +pump_command_commit (xlator_t *this, dict_t *dict) +{ + char *cmd = NULL; + int dict_ret = -1; + int ret = _gf_true; + + dict_ret = dict_get_str (dict, RB_PUMP_CMD_COMMIT, &cmd); + if (dict_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Not a pump commit command"); + ret = _gf_false; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Hit a pump command - commit"); + ret = _gf_true; + +out: + return ret; + +} + +gf_boolean_t pump_command_abort (xlator_t *this, dict_t *dict) { char *cmd = NULL; int dict_ret = -1; int ret = _gf_true; - dict_ret = dict_get_str (dict, PUMP_CMD_ABORT, &cmd); + dict_ret = dict_get_str (dict, RB_PUMP_CMD_ABORT, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump abort command"); @@ -1105,7 +1258,7 @@ pump_command_start (xlator_t *this, dict_t *dict) int dict_ret = -1; int ret = _gf_true; - dict_ret = dict_get_str (dict, PUMP_CMD_START, &cmd); + dict_ret = dict_get_str (dict, RB_PUMP_CMD_START, &cmd); if (dict_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, "Not a pump start command"); @@ -1127,7 +1280,7 @@ struct _xattr_key { struct list_head list; }; -static void +static int __gather_xattr_keys (dict_t *dict, char *key, data_t *value, void *data) { @@ -1139,13 +1292,14 @@ __gather_xattr_keys (dict_t *dict, char *key, data_t *value, xkey = GF_CALLOC (1, sizeof (*xkey), gf_afr_mt_xattr_key); if (!xkey) - return; + return -1; xkey->key = key; INIT_LIST_HEAD (&xkey->list); list_add_tail (&xkey->list, list); } + return 0; } static void @@ -1173,16 +1327,17 @@ __filter_xattrs (dict_t *dict) int32_t pump_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, - dict_t *dict) + dict_t *dict, dict_t *xdata) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - xlator_t ** children = NULL; + afr_private_t *priv = NULL; + afr_local_t *local = NULL; + xlator_t **children = NULL; + int unwind = 1; + int32_t *last_index = NULL; + int32_t next_call_child = -1; + int32_t read_child = -1; + int32_t *fresh_children = NULL; - int unwind = 1; - int last_tried = -1; - int this_try = -1; - int read_child = -1; priv = this->private; children = priv->children; @@ -1192,25 +1347,22 @@ pump_getxattr_cbk (call_frame_t *frame, void *cookie, read_child = (long) cookie; if (op_ret == -1) { - retry: - last_tried = local->cont.getxattr.last_tried; - - if (all_tried (last_tried, priv->child_count)) { - goto out; - } - this_try = ++local->cont.getxattr.last_tried; - - if (this_try == read_child) { - goto retry; - } + last_index = &local->cont.getxattr.last_index; + fresh_children = local->fresh_children; + next_call_child = afr_next_call_child (fresh_children, + local->child_up, + priv->child_count, + last_index, read_child); + if (next_call_child < 0) + goto out; unwind = 0; STACK_WIND_COOKIE (frame, pump_getxattr_cbk, (void *) (long) read_child, - children[this_try], - children[this_try]->fops->getxattr, + children[next_call_child], + children[next_call_child]->fops->getxattr, &local->loc, - local->cont.getxattr.name); + local->cont.getxattr.name, NULL); } out: @@ -1218,7 +1370,7 @@ out: if (op_ret >= 0 && dict) __filter_xattrs (dict); - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict); + AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL); } return 0; @@ -1226,17 +1378,15 @@ out: int32_t pump_getxattr (call_frame_t *frame, xlator_t *this, - loc_t *loc, const char *name) + loc_t *loc, const char *name, dict_t *xdata) { afr_private_t * priv = NULL; xlator_t ** children = NULL; int call_child = 0; - afr_local_t * local = NULL; - - int read_child = -1; - - int32_t op_ret = -1; - int32_t op_errno = 0; + afr_local_t *local = NULL; + int32_t ret = -1; + int32_t op_errno = 0; + uint64_t read_child = 0; VALIDATE_OR_GOTO (frame, out); @@ -1247,9 +1397,21 @@ pump_getxattr (call_frame_t *frame, xlator_t *this, VALIDATE_OR_GOTO (priv->children, out); children = priv->children; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_getxattr_cbk, + FIRST_CHILD (this), + (FIRST_CHILD (this))->fops->getxattr, + loc, name, xdata); + return 0; + } - ALLOC_OR_GOTO (local, afr_local_t, out); - frame->local = local; + + AFR_LOCAL_ALLOC_OR_GOTO (frame->local, out); + local = frame->local; + + ret = afr_local_init (local, priv, &op_errno); + if (ret < 0) + goto out; if (name) { if (!strncmp (name, AFR_XATTR_PREFIX, @@ -1259,34 +1421,33 @@ pump_getxattr (call_frame_t *frame, xlator_t *this, goto out; } - if (!strcmp (name, PUMP_CMD_STATUS)) { + if (!strcmp (name, RB_PUMP_CMD_STATUS)) { gf_log (this->name, GF_LOG_DEBUG, "Hit pump command - status"); pump_execute_status (frame, this); - op_ret = 0; + ret = 0; goto out; } } - read_child = afr_read_child (this, loc->inode); - - if (read_child >= 0) { - call_child = read_child; - - local->cont.getxattr.last_tried = -1; - } else { - call_child = afr_first_up_child (priv); - - if (call_child == -1) { - op_errno = ENOTCONN; - gf_log (this->name, GF_LOG_DEBUG, - "no child is up"); - goto out; - } - - local->cont.getxattr.last_tried = call_child; + local->fresh_children = GF_CALLOC (priv->child_count, + sizeof (*local->fresh_children), + gf_afr_mt_int32_t); + if (!local->fresh_children) { + ret = -1; + op_errno = ENOMEM; + goto out; } + read_child = afr_inode_get_read_ctx (this, loc->inode, local->fresh_children); + ret = afr_get_call_child (this, local->child_up, read_child, + local->fresh_children, + &call_child, + &local->cont.getxattr.last_index); + if (ret < 0) { + op_errno = -ret; + goto out; + } loc_copy (&local->loc, loc); if (name) local->cont.getxattr.name = gf_strdup (name); @@ -1294,13 +1455,12 @@ pump_getxattr (call_frame_t *frame, xlator_t *this, STACK_WIND_COOKIE (frame, pump_getxattr_cbk, (void *) (long) call_child, children[call_child], children[call_child]->fops->getxattr, - loc, name); + loc, name, xdata); - op_ret = 0; + ret = 0; out: - if (op_ret == -1) { - AFR_STACK_UNWIND (getxattr, frame, op_ret, op_errno, NULL); - } + if (ret < 0) + AFR_STACK_UNWIND (getxattr, frame, -1, op_errno, NULL, NULL); return 0; } @@ -1308,11 +1468,9 @@ static int afr_setxattr_unwind (call_frame_t *frame, xlator_t *this) { afr_local_t * local = NULL; - afr_private_t * priv = NULL; call_frame_t *main_frame = NULL; local = frame->local; - priv = this->private; LOCK (&frame->lock); { @@ -1324,14 +1482,14 @@ afr_setxattr_unwind (call_frame_t *frame, xlator_t *this) if (main_frame) { AFR_STACK_UNWIND (setxattr, main_frame, - local->op_ret, local->op_errno) + local->op_ret, local->op_errno, NULL); } return 0; } static int afr_setxattr_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno) + int32_t op_ret, int32_t op_errno, dict_t *xdata) { afr_local_t * local = NULL; afr_private_t * priv = NULL; @@ -1383,7 +1541,7 @@ afr_setxattr_wind (call_frame_t *frame, xlator_t *this) local = frame->local; priv = this->private; - call_count = afr_up_children_count (priv->child_count, local->child_up); + call_count = afr_up_children_count (local->child_up, priv->child_count); if (call_count == 0) { local->transaction.resume (frame, this); @@ -1400,7 +1558,7 @@ afr_setxattr_wind (call_frame_t *frame, xlator_t *this) priv->children[i]->fops->setxattr, &local->loc, local->cont.setxattr.dict, - local->cont.setxattr.flags); + local->cont.setxattr.flags, NULL); if (!--call_count) break; @@ -1428,11 +1586,9 @@ pump_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno) + int32_t op_errno, dict_t *xdata) { - STACK_UNWIND (frame, - op_ret, - op_errno); + AFR_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); return 0; } @@ -1444,18 +1600,16 @@ pump_command_reply (call_frame_t *frame, xlator_t *this) local = frame->local; if (local->op_ret < 0) - gf_log (this->name, GF_LOG_NORMAL, + gf_log (this->name, GF_LOG_INFO, "Command failed"); else - gf_log (this->name, GF_LOG_NORMAL, + gf_log (this->name, GF_LOG_INFO, "Command succeeded"); - dict_unref (local->dict); - AFR_STACK_UNWIND (setxattr, frame, local->op_ret, - local->op_errno); + local->op_errno, NULL); return 0; } @@ -1481,41 +1635,54 @@ pump_parse_command (call_frame_t *frame, xlator_t *this, frame->local = local; local->dict = dict_ref (dict); ret = pump_execute_abort (frame, this); + + } else if (pump_command_commit (this, dict)) { + frame->local = local; + local->dict = dict_ref (dict); + ret = pump_execute_commit (frame, this); } return ret; } int pump_setxattr (call_frame_t *frame, xlator_t *this, - loc_t *loc, dict_t *dict, int32_t flags) + loc_t *loc, dict_t *dict, int32_t flags, dict_t *xdata) { afr_private_t * priv = NULL; afr_local_t * local = NULL; call_frame_t *transaction_frame = NULL; - int ret = -1; - - int op_ret = -1; int op_errno = 0; VALIDATE_OR_GOTO (frame, out); VALIDATE_OR_GOTO (this, out); VALIDATE_OR_GOTO (this->private, out); + GF_IF_INTERNAL_XATTR_GOTO ("trusted.glusterfs.pump*", dict, + op_errno, out); + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_setxattr_cbk, + FIRST_CHILD (this), + (FIRST_CHILD (this))->fops->setxattr, + loc, dict, flags, xdata); + return 0; + } + - ALLOC_OR_GOTO (local, afr_local_t, out); + AFR_LOCAL_ALLOC_OR_GOTO (local, out); - ret = AFR_LOCAL_INIT (local, priv); + ret = afr_local_init (local, priv, &op_errno); if (ret < 0) { - op_errno = -ret; + afr_local_cleanup (local, this); goto out; - } + } ret = pump_parse_command (frame, this, local, dict); if (ret >= 0) { - op_ret = 0; + ret = 0; goto out; } @@ -1523,6 +1690,9 @@ pump_setxattr (call_frame_t *frame, xlator_t *this, if (!transaction_frame) { gf_log (this->name, GF_LOG_ERROR, "Out of memory."); + op_errno = ENOMEM; + ret = -1; + afr_local_cleanup (local, this); goto out; } @@ -1545,17 +1715,608 @@ pump_setxattr (call_frame_t *frame, xlator_t *this, afr_transaction (transaction_frame, this, AFR_METADATA_TRANSACTION); - op_ret = 0; + ret = 0; out: - if (op_ret == -1) { + if (ret < 0) { if (transaction_frame) AFR_STACK_DESTROY (transaction_frame); - AFR_STACK_UNWIND (setxattr, frame, op_ret, op_errno); + AFR_STACK_UNWIND (setxattr, frame, -1, op_errno, NULL); } return 0; } +/* Defaults */ +static int32_t +pump_lookup (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + dict_t *xattr_req) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_lookup_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->lookup, + loc, + xattr_req); + return 0; + } + + afr_lookup (frame, this, loc, xattr_req); + return 0; +} + + +static int32_t +pump_truncate (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + off_t offset, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, + offset, xdata); + return 0; + } + + afr_truncate (frame, this, loc, offset, xdata); + return 0; +} + + +static int32_t +pump_ftruncate (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + off_t offset, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_ftruncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, + fd, + offset, xdata); + return 0; + } + + afr_ftruncate (frame, this, fd, offset, xdata); + return 0; +} + + + + +int +pump_mknod (call_frame_t *frame, xlator_t *this, + loc_t *loc, mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_mknod_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mknod, + loc, mode, rdev, umask, xdata); + return 0; + } + afr_mknod (frame, this, loc, mode, rdev, umask, xdata); + return 0; + +} + + + +int +pump_mkdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, mode_t mode, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_mkdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->mkdir, + loc, mode, umask, xdata); + return 0; + } + afr_mkdir (frame, this, loc, mode, umask, xdata); + return 0; + +} + + +static int32_t +pump_unlink (call_frame_t *frame, + xlator_t *this, + loc_t *loc, int xflag, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_unlink_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->unlink, + loc, xflag, xdata); + return 0; + } + afr_unlink (frame, this, loc, xflag, xdata); + return 0; + +} + + +static int +pump_rmdir (call_frame_t *frame, xlator_t *this, + loc_t *loc, int flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + + priv = this->private; + + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_rmdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rmdir, + loc, flags, xdata); + return 0; + } + + afr_rmdir (frame, this, loc, flags, xdata); + return 0; + +} + + + +int +pump_symlink (call_frame_t *frame, xlator_t *this, + const char *linkpath, loc_t *loc, mode_t umask, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_symlink_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->symlink, + linkpath, loc, umask, xdata); + return 0; + } + afr_symlink (frame, this, linkpath, loc, umask, xdata); + return 0; + +} + + +static int32_t +pump_rename (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_rename_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->rename, + oldloc, newloc, xdata); + return 0; + } + afr_rename (frame, this, oldloc, newloc, xdata); + return 0; + +} + + +static int32_t +pump_link (call_frame_t *frame, + xlator_t *this, + loc_t *oldloc, + loc_t *newloc, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_link_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->link, + oldloc, newloc, xdata); + return 0; + } + afr_link (frame, this, oldloc, newloc, xdata); + return 0; + +} + + +static int32_t +pump_create (call_frame_t *frame, xlator_t *this, + loc_t *loc, int32_t flags, mode_t mode, + mode_t umask, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, default_create_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->create, + loc, flags, mode, umask, fd, xdata); + return 0; + } + afr_create (frame, this, loc, flags, mode, umask, fd, xdata); + return 0; + +} + + +static int32_t +pump_open (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_open_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->open, + loc, flags, fd, xdata); + return 0; + } + afr_open (frame, this, loc, flags, fd, xdata); + return 0; + +} + + +static int32_t +pump_writev (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t off, uint32_t flags, + struct iobref *iobref, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_writev_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->writev, + fd, + vector, + count, + off, flags, + iobref, xdata); + return 0; + } + + afr_writev (frame, this, fd, vector, count, off, flags, iobref, xdata); + return 0; +} + + +static int32_t +pump_flush (call_frame_t *frame, + xlator_t *this, + fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_flush_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd, xdata); + return 0; + } + afr_flush (frame, this, fd, xdata); + return 0; + +} + + +static int32_t +pump_fsync (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, + flags, xdata); + return 0; + } + afr_fsync (frame, this, fd, flags, xdata); + return 0; + +} + + +static int32_t +pump_opendir (call_frame_t *frame, + xlator_t *this, + loc_t *loc, fd_t *fd, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_opendir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->opendir, + loc, fd, xdata); + return 0; + } + afr_opendir (frame, this, loc, fd, xdata); + return 0; + +} + + +static int32_t +pump_fsyncdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t flags, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsyncdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsyncdir, + fd, + flags, xdata); + return 0; + } + afr_fsyncdir (frame, this, fd, flags, xdata); + return 0; + +} + + +static int32_t +pump_xattrop (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + gf_xattrop_flags_t flags, + dict_t *dict, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_xattrop_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->xattrop, + loc, + flags, + dict, xdata); + return 0; + } + afr_xattrop (frame, this, loc, flags, dict, xdata); + return 0; + +} + +static int32_t +pump_fxattrop (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + gf_xattrop_flags_t flags, + dict_t *dict, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fxattrop_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fxattrop, + fd, + flags, + dict, xdata); + return 0; + } + afr_fxattrop (frame, this, fd, flags, dict, xdata); + return 0; + +} + + +static int32_t +pump_removexattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + const char *name, dict_t *xdata) +{ + afr_private_t *priv = NULL; + int op_errno = -1; + + VALIDATE_OR_GOTO (this, out); + + GF_IF_NATIVE_XATTR_GOTO ("trusted.glusterfs.pump*", + name, op_errno, out); + + op_errno = 0; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_removexattr_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->removexattr, + loc, + name, xdata); + return 0; + } + afr_removexattr (frame, this, loc, name, xdata); + + out: + if (op_errno) + AFR_STACK_UNWIND (removexattr, frame, -1, op_errno, NULL); + return 0; + +} + + + +static int32_t +pump_readdir (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t off, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_readdir_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdir, + fd, size, off, xdata); + return 0; + } + afr_readdir (frame, this, fd, size, off, xdata); + return 0; + +} + + +static int32_t +pump_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, + size_t size, off_t off, dict_t *dict) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_readdirp_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readdirp, + fd, size, off, dict); + return 0; + } + afr_readdirp (frame, this, fd, size, off, dict); + return 0; + +} + + + +static int32_t +pump_releasedir (xlator_t *this, + fd_t *fd) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (priv->use_afr_in_pump) + afr_releasedir (this, fd); + return 0; + +} + +static int32_t +pump_release (xlator_t *this, + fd_t *fd) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (priv->use_afr_in_pump) + afr_release (this, fd); + return 0; + +} + +static int32_t +pump_forget (xlator_t *this, inode_t *inode) +{ + afr_private_t *priv = NULL; + + priv = this->private; + if (priv->use_afr_in_pump) + afr_forget (this, inode); + + return 0; +} + +static int32_t +pump_setattr (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + struct iatt *stbuf, + int32_t valid, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_setattr_cbk, + FIRST_CHILD (this), + FIRST_CHILD (this)->fops->setattr, + loc, stbuf, valid, xdata); + return 0; + } + afr_setattr (frame, this, loc, stbuf, valid, xdata); + return 0; + +} + + +static int32_t +pump_fsetattr (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iatt *stbuf, + int32_t valid, dict_t *xdata) +{ + afr_private_t *priv = NULL; + priv = this->private; + if (!priv->use_afr_in_pump) { + STACK_WIND (frame, + default_fsetattr_cbk, + FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fsetattr, + fd, stbuf, valid, xdata); + return 0; + } + afr_fsetattr (frame, this, fd, stbuf, valid, xdata); + return 0; + +} + + +/* End of defaults */ + + int32_t mem_acct_init (xlator_t *this) { @@ -1596,7 +2357,7 @@ notify (xlator_t *this, int32_t event, child_xl = (xlator_t *) data; - ret = afr_notify (this, event, data); + ret = afr_notify (this, event, data, NULL); switch (event) { case GF_EVENT_CHILD_DOWN: @@ -1607,6 +2368,8 @@ notify (xlator_t *this, int32_t event, case GF_EVENT_CHILD_UP: if (is_xlator_pump_sink (child_xl)) if (is_pump_start_pending (this)) { + gf_log (this->name, GF_LOG_DEBUG, + "about to start synctask"); ret = pump_start_synctask (this); if (ret < 0) gf_log (this->name, GF_LOG_DEBUG, @@ -1629,7 +2392,7 @@ init (xlator_t *this) xlator_list_t * trav = NULL; int i = 0; int ret = -1; - int op_errno = 0; + GF_UNUSED int op_errno = 0; int source_child = 0; @@ -1645,25 +2408,42 @@ init (xlator_t *this) "Volume is dangling."); } - ALLOC_OR_GOTO (this->private, afr_private_t, out); + this->private = GF_CALLOC (1, sizeof (afr_private_t), + gf_afr_mt_afr_private_t); + if (!this->private) + goto out; priv = this->private; + LOCK_INIT (&priv->lock); + LOCK_INIT (&priv->read_child_lock); + //lock recovery is not done in afr + pthread_mutex_init (&priv->mutex, NULL); + INIT_LIST_HEAD (&priv->saved_fds); + + child_count = xlator_subvolume_count (this); + if (child_count != 2) { + gf_log (this->name, GF_LOG_ERROR, + "There should be exactly 2 children - one source " + "and one sink"); + return -1; + } + priv->child_count = child_count; priv->read_child = source_child; priv->favorite_child = source_child; priv->background_self_heal_count = 0; - priv->data_self_heal = 1; + priv->data_self_heal = "on"; priv->metadata_self_heal = 1; priv->entry_self_heal = 1; - priv->data_self_heal_algorithm = ""; - priv->data_self_heal_window_size = 16; priv->data_change_log = 1; priv->metadata_change_log = 1; priv->entry_change_log = 1; + priv->use_afr_in_pump = 1; + priv->sh_readdir_size = 65536; /* Locking options */ @@ -1672,31 +2452,9 @@ init (xlator_t *this) and the sink. */ - priv->data_lock_server_count = 2; - priv->metadata_lock_server_count = 2; - priv->entry_lock_server_count = 2; - priv->strict_readdir = _gf_false; - trav = this->children; - while (trav) { - child_count++; - trav = trav->next; - } - priv->wait_count = 1; - - if (child_count != 2) { - gf_log (this->name, GF_LOG_ERROR, - "There should be exactly 2 children - one source " - "and one sink"); - return -1; - } - priv->child_count = child_count; - - LOCK_INIT (&priv->lock); - LOCK_INIT (&priv->read_child_lock); - priv->child_up = GF_CALLOC (sizeof (unsigned char), child_count, gf_afr_mt_char); if (!priv->child_up) { @@ -1730,8 +2488,8 @@ init (xlator_t *this) while (i < child_count) { priv->children[i] = trav->xlator; - ret = asprintf (&priv->pending_key[i], "%s.%s", AFR_XATTR_PREFIX, - trav->xlator->name); + ret = gf_asprintf (&priv->pending_key[i], "%s.%s", AFR_XATTR_PREFIX, + trav->xlator->name); if (-1 == ret) { gf_log (this->name, GF_LOG_ERROR, "asprintf failed to set pending key"); @@ -1743,9 +2501,22 @@ init (xlator_t *this) i++; } + ret = gf_asprintf (&priv->sh_domain, "%s-self-heal", this->name); + if (-1 == ret) { + op_errno = ENOMEM; + goto out; + } + priv->first_lookup = 1; priv->root_inode = NULL; + priv->last_event = GF_CALLOC (child_count, sizeof (*priv->last_event), + gf_afr_mt_int32_t); + if (!priv->last_event) { + ret = -ENOMEM; + goto out; + } + pump_priv = GF_CALLOC (1, sizeof (*pump_priv), gf_afr_mt_pump_priv); if (!pump_priv) { @@ -1767,7 +2538,7 @@ init (xlator_t *this) goto out; } - pump_priv->env = syncenv_new (0); + pump_priv->env = this->ctx->env; if (!pump_priv->env) { gf_log (this->name, GF_LOG_ERROR, "Could not create new sync-environment"); @@ -1775,6 +2546,15 @@ init (xlator_t *this) goto out; } + /* keep more local here as we may need them for self-heal etc */ + this->local_pool = mem_pool_new (afr_local_t, 128); + if (!this->local_pool) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "failed to create local_t's memory pool"); + goto out; + } + priv->pump_private = pump_priv; pump_change_state (this, PUMP_STATE_ABORT); @@ -1787,56 +2567,62 @@ out: int fini (xlator_t *this) { + afr_private_t * priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + this->private = NULL; + if (!priv) + goto out; + + pump_priv = priv->pump_private; + if (!pump_priv) + goto afr_priv; + + GF_FREE (pump_priv->resume_path); + LOCK_DESTROY (&pump_priv->resume_path_lock); + LOCK_DESTROY (&pump_priv->pump_state_lock); + GF_FREE (pump_priv); +afr_priv: + afr_priv_destroy (priv); +out: return 0; } struct xlator_fops fops = { - .lookup = afr_lookup, - .open = afr_open, - .lk = afr_lk, - .flush = afr_flush, - .statfs = afr_statfs, - .fsync = afr_fsync, - .fsyncdir = afr_fsyncdir, - .xattrop = afr_xattrop, - .fxattrop = afr_fxattrop, - .inodelk = afr_inodelk, - .finodelk = afr_finodelk, - .entrylk = afr_entrylk, - .fentrylk = afr_fentrylk, - - /* inode read */ - .access = afr_access, - .stat = afr_stat, - .fstat = afr_fstat, - .readlink = afr_readlink, - .getxattr = pump_getxattr, - .readv = afr_readv, + .lookup = pump_lookup, + .open = pump_open, + .flush = pump_flush, + .fsync = pump_fsync, + .fsyncdir = pump_fsyncdir, + .xattrop = pump_xattrop, + .fxattrop = pump_fxattrop, + .getxattr = pump_getxattr, /* inode write */ - .writev = afr_writev, - .truncate = afr_truncate, - .ftruncate = afr_ftruncate, + .writev = pump_writev, + .truncate = pump_truncate, + .ftruncate = pump_ftruncate, .setxattr = pump_setxattr, - .setattr = afr_setattr, - .fsetattr = afr_fsetattr, - .removexattr = afr_removexattr, + .setattr = pump_setattr, + .fsetattr = pump_fsetattr, + .removexattr = pump_removexattr, /* dir read */ - .opendir = afr_opendir, - .readdir = afr_readdir, - .readdirp = afr_readdirp, + .opendir = pump_opendir, + .readdir = pump_readdir, + .readdirp = pump_readdirp, /* dir write */ - .create = afr_create, - .mknod = afr_mknod, - .mkdir = afr_mkdir, - .unlink = afr_unlink, - .rmdir = afr_rmdir, - .link = afr_link, - .symlink = afr_symlink, - .rename = afr_rename, + .create = pump_create, + .mknod = pump_mknod, + .mkdir = pump_mkdir, + .unlink = pump_unlink, + .rmdir = pump_rmdir, + .link = pump_link, + .symlink = pump_symlink, + .rename = pump_rename, }; struct xlator_dumpops dumpops = { @@ -1845,8 +2631,9 @@ struct xlator_dumpops dumpops = { struct xlator_cbks cbks = { - .release = afr_release, - .releasedir = afr_releasedir, + .release = pump_release, + .releasedir = pump_releasedir, + .forget = pump_forget, }; struct volume_options options[] = { |
