diff options
Diffstat (limited to 'xlators/cluster/afr/src/pump.c')
-rw-r--r-- | xlators/cluster/afr/src/pump.c | 459 |
1 files changed, 328 insertions, 131 deletions
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c index e69c02845..b62b079aa 100644 --- a/xlators/cluster/afr/src/pump.c +++ b/xlators/cluster/afr/src/pump.c @@ -28,7 +28,47 @@ #include "afr-common.c" -pump_state_t +static int +pump_mark_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + pump_priv->pump_start_pending = 1; + + return 0; +} + +static int +is_pump_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + return (pump_priv->pump_start_pending); +} + +static int +pump_remove_start_pending (xlator_t *this) +{ + afr_private_t *priv = NULL; + pump_private_t *pump_priv = NULL; + + priv = this->private; + pump_priv = priv->pump_private; + + pump_priv->pump_start_pending = 0; + + return 0; +} + +static pump_state_t pump_get_state () { xlator_t *this = NULL; @@ -59,16 +99,11 @@ pump_change_state (xlator_t *this, pump_state_t state) pump_state_t state_old; pump_state_t state_new; - unsigned char * child_up = NULL; - int i = 0; - priv = this->private; pump_priv = priv->pump_private; - child_up = priv->child_up; - - assert (pump_priv); + GF_ASSERT (pump_priv); LOCK (&pump_priv->pump_state_lock); { @@ -77,48 +112,6 @@ pump_change_state (xlator_t *this, pump_state_t state) pump_priv->pump_state = state; - switch (pump_priv->pump_state) { - case PUMP_STATE_RESUME: - case PUMP_STATE_RUNNING: - case PUMP_STATE_PAUSE: - { - priv->pump_loaded = _gf_true; - i = 1; - - child_up[i] = 1; - - LOCK (&priv->lock); - { - priv->up_count++; - } - UNLOCK (&priv->lock); - - break; - } - case PUMP_STATE_ABORT: - { - priv->pump_loaded = _gf_false; - i = 1; - - child_up[i] = 0; - - LOCK (&priv->lock); - { - priv->down_count++; - } - UNLOCK (&priv->lock); - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->number_files_pumped = 0; - } - UNLOCK (&pump_priv->resume_path_lock); - - - break; - } - - } } UNLOCK (&pump_priv->pump_state_lock); @@ -338,67 +331,24 @@ is_pump_traversal_allowed (xlator_t *this, const char *path) } static int -pump_update_file_stats (xlator_t *this, long source_blocks, - long sink_blocks) +pump_save_file_stats (xlator_t *this, const char *path) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; - priv = this->private; + priv = this->private; pump_priv = priv->pump_private; LOCK (&pump_priv->resume_path_lock); { - pump_priv->source_blocks = source_blocks; - pump_priv->sink_blocks = sink_blocks; - } - UNLOCK (&pump_priv->resume_path_lock); - - return 0; -} - -static int -pump_save_file_stats (xlator_t *this) -{ - afr_private_t *priv = NULL; - struct statvfs source_buf = {0, }; - struct statvfs sink_buf = {0, }; - loc_t loc; - int ret = -1; - - priv = this->private; - - assert (priv->root_inode); - - build_root_loc (priv->root_inode, &loc); - - ret = syncop_statfs (PUMP_SOURCE_CHILD (this), - &loc, &source_buf); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "source statfs failed"); - } else { - gf_log (this->name, GF_LOG_DEBUG, - "source statfs succeeded"); - } + pump_priv->number_files_pumped++; - - ret = syncop_statfs (PUMP_SOURCE_CHILD (this), - &loc, &sink_buf); - if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, - "sink statfs failed"); - } else { - gf_log (this->name, GF_LOG_DEBUG, - "sink statfs succeeded"); + strncpy (pump_priv->current_file, path, + PATH_MAX); } - - pump_update_file_stats (this, - source_buf.f_blocks, - sink_buf.f_blocks); + UNLOCK (&pump_priv->resume_path_lock); return 0; - } static int pump_save_path (xlator_t *this, const char *path) @@ -435,16 +385,6 @@ pump_save_path (xlator_t *this, const char *path) "setxattr succeeded - saved path=%s", path); gf_log (this->name, GF_LOG_DEBUG, "Saving path for status info"); - - LOCK (&pump_priv->resume_path_lock); - { - pump_priv->number_files_pumped++; - - strncpy (pump_priv->current_file, path, - PATH_MAX); - } - UNLOCK (&pump_priv->resume_path_lock); - } dict_unref (dict); @@ -534,7 +474,7 @@ gf_pump_traverse_directory (loc_t *loc) if (!IS_ENTRY_CWD(entry->d_name) && !IS_ENTRY_PARENT (entry->d_name)) { pump_save_path (this, entry_loc.path); - pump_save_file_stats (this); + pump_save_file_stats (this, entry_loc.path); } ret = pump_check_and_update_status (this); @@ -726,19 +666,16 @@ pump_task_completion (int ret, void *data) } int -pump_start (call_frame_t *frame, xlator_t *this) +pump_start (call_frame_t *pump_frame, xlator_t *this) { afr_private_t *priv = NULL; pump_private_t *pump_priv = NULL; - call_frame_t *pump_frame = NULL; int ret = -1; priv = this->private; pump_priv = priv->pump_private; - pump_frame = copy_frame (frame); - if (!pump_frame->root->lk_owner) pump_frame->root->lk_owner = PUMP_LK_OWNER; @@ -782,6 +719,212 @@ is_pump_loaded (xlator_t *this) } +static int +pump_start_synctask (xlator_t *this) +{ + call_frame_t *frame = NULL; + int ret = 0; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + ret = -1; + goto out; + } + + pump_change_state (this, PUMP_STATE_RUNNING); + + ret = pump_start (frame, this); + +out: + return ret; +} + +int32_t +pump_cmd_start_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) + +{ + afr_local_t *local = NULL; + int ret = 0; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not initiate destination " + "brick connect"); + ret = op_ret; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Successfully initiated destination " + "brick connect"); + + pump_mark_start_pending (this); + +out: + local->op_ret = ret; + pump_command_reply (frame, this); + + return 0; +} + +static int +pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + afr_private_t *priv = NULL; + dict_t *dict = NULL; + char *dst_brick = NULL; + loc_t loc; + + int ret = 0; + + priv = this->private; + local = frame->local; + + GF_ASSERT (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not get destination brick value"); + goto out; + } + + dict = dict_new (); + if (!dict) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + ret = -1; + goto out; + } + + GF_ASSERT (dst_brick); + gf_log (this->name, GF_LOG_DEBUG, + "Got destination brick as %s", dst_brick); + + ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not inititiate destination brick " + "connect"); + goto out; + } + + STACK_WIND (frame, + pump_cmd_start_setxattr_cbk, + PUMP_SINK_CHILD(this), + PUMP_SINK_CHILD(this)->fops->setxattr, + &loc, + dict, + 0); + + ret = 0; + + dict_unref (dict); +out: + return ret; +} + +int32_t +pump_cmd_abort_setxattr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) + +{ + afr_local_t *local = NULL; + int ret = 0; + + local = frame->local; + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not initiate destination " + "brick disconnect"); + ret = op_ret; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, + "Successfully initiated destination " + "brick disconnect"); + ret = 0; + +out: + local->op_ret = ret; + pump_command_reply (frame, this); + return 0; +} + +static int +pump_initiate_sink_disconnect (call_frame_t *frame, xlator_t *this) +{ + afr_local_t *local = NULL; + afr_private_t *priv = NULL; + dict_t *dict = NULL; + loc_t loc; + + int ret = 0; + + priv = this->private; + local = frame->local; + + GF_ASSERT (priv->root_inode); + + build_root_loc (priv->root_inode, &loc); + + dict = dict_new (); + if (!dict) { + gf_log (this->name, GF_LOG_ERROR, + "Out of memory"); + ret = -1; + goto out; + } + + ret = dict_set_str (dict, CLIENT_CMD_DISCONNECT, "jargon"); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "Could not inititiate destination brick " + "disconnect"); + goto out; + } + + STACK_WIND (frame, + pump_cmd_abort_setxattr_cbk, + PUMP_SINK_CHILD(this), + PUMP_SINK_CHILD(this)->fops->setxattr, + &loc, + dict, + 0); + + ret = 0; + + dict_unref (dict); +out: + return ret; +} + +static int +is_pump_aborted (xlator_t *this) +{ + pump_state_t state; + + state = pump_get_state (); + + return ((state == PUMP_STATE_ABORT)); +} + int32_t pump_cmd_start_getxattr_cbk (call_frame_t *frame, void *cookie, @@ -795,14 +938,17 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame, pump_state_t state; int ret = 0; + int need_unwind = 0; int dict_ret = -1; local = frame->local; if (op_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, - "getxattr failed - changing pump state to RUNNING with '/'"); + "getxattr failed - changing pump " + "state to RUNNING with '/'"); path = "/"; + ret = op_ret; } else { gf_log (this->name, GF_LOG_TRACE, "getxattr succeeded"); @@ -822,13 +968,22 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame, } pump_set_resume_path (this, path); - pump_change_state (this, PUMP_STATE_RUNNING); - ret = pump_start (frame, this); + if (is_pump_aborted (this)) + /* We're re-starting pump afresh */ + ret = pump_initiate_sink_connect (frame, this); + else { + /* We're re-starting pump from a previous + pause */ + ret = pump_start_synctask (this); + need_unwind = 1; + } out: - local->op_ret = ret; - pump_command_reply (frame, this); + if ((ret < 0) || (need_unwind == 1)) { + local->op_ret = ret; + pump_command_reply (frame, this); + } return 0; } @@ -924,13 +1079,14 @@ pump_execute_start (call_frame_t *frame, xlator_t *this) local = frame->local; if (!priv->root_inode) { - gf_log (this->name, GF_LOG_NORMAL, - "Pump xlator cannot be started without an initial lookup"); + gf_log (this->name, GF_LOG_ERROR, + "Pump xlator cannot be started without an initial " + "lookup"); ret = -1; goto out; } - assert (priv->root_inode); + GF_ASSERT (priv->root_inode); build_root_loc (priv->root_inode, &loc); @@ -960,6 +1116,7 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame, int32_t op_errno) { afr_local_t *local = NULL; + int ret = 0; local = frame->local; @@ -967,16 +1124,23 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame, gf_log (this->name, GF_LOG_ERROR, "Aborting pump failed. Please remove xattr" PUMP_PATH "of the source child's '/'"); - local->op_ret = -1; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "remove xattr succeeded"); - local->op_ret = 0; + ret = op_ret; + goto out; } + gf_log (this->name, GF_LOG_DEBUG, + "remove xattr succeeded"); + + pump_change_state (this, PUMP_STATE_ABORT); + ret = pump_initiate_sink_disconnect (frame, this); + +out: + if (ret < 0) { + local->op_ret = ret; + pump_command_reply (frame, this); + } - pump_command_reply (frame, this); return 0; } @@ -1000,7 +1164,7 @@ pump_execute_abort (call_frame_t *frame, xlator_t *this) goto out; } - assert (priv->root_inode); + GF_ASSERT (priv->root_inode); build_root_loc (priv->root_inode, &root_loc); @@ -1446,6 +1610,8 @@ pump_command_reply (call_frame_t *frame, xlator_t *this) gf_log (this->name, GF_LOG_NORMAL, "Command succeeded"); + dict_unref (local->dict); + AFR_STACK_UNWIND (setxattr, frame, local->op_ret, @@ -1463,14 +1629,17 @@ pump_parse_command (call_frame_t *frame, xlator_t *this, if (pump_command_start (this, dict)) { frame->local = local; + local->dict = dict_ref (dict); ret = pump_execute_start (frame, this); } else if (pump_command_pause (this, dict)) { frame->local = local; + local->dict = dict_ref (dict); ret = pump_execute_pause (frame, this); } else if (pump_command_abort (this, dict)) { frame->local = local; + local->dict = dict_ref (dict); ret = pump_execute_abort (frame, this); } return ret; @@ -1566,19 +1735,47 @@ mem_acct_init (xlator_t *this) return ret; } +static int +is_xlator_pump_sink (xlator_t *child) +{ + return (child == PUMP_SINK_CHILD(THIS)); +} + +static int +is_xlator_pump_source (xlator_t *child) +{ + return (child == PUMP_SOURCE_CHILD(THIS)); +} + int32_t notify (xlator_t *this, int32_t event, void *data, ...) { int ret = -1; + xlator_t *child_xl = NULL; + + child_xl = (xlator_t *) data; + + ret = afr_notify (this, event, data); switch (event) { case GF_EVENT_CHILD_DOWN: - pump_change_state (this, PUMP_STATE_ABORT); + if (is_xlator_pump_source (child_xl)) + pump_change_state (this, PUMP_STATE_ABORT); break; - } - ret = afr_notify (this, event, data); + case GF_EVENT_CHILD_UP: + if (is_xlator_pump_sink (child_xl)) + if (is_pump_start_pending (this)) { + ret = pump_start_synctask (this); + if (ret < 0) + gf_log (this->name, GF_LOG_DEBUG, + "Could not start pump " + "synctask"); + else + pump_remove_start_pending (this); + } + } return ret; } |