summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/cluster/afr/src/afr.h2
-rw-r--r--xlators/cluster/afr/src/pump.c459
-rw-r--r--xlators/cluster/afr/src/pump.h32
3 files changed, 348 insertions, 145 deletions
diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h
index 3fa987ee8..284eb7a1b 100644
--- a/xlators/cluster/afr/src/afr.h
+++ b/xlators/cluster/afr/src/afr.h
@@ -245,6 +245,8 @@ typedef struct _afr_local {
int32_t inodelk_count;
int32_t entrylk_count;
+ dict_t *dict;
+
int (*up_down_flush_cbk) (call_frame_t *, xlator_t *);
/*
diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c
index e69c02845..b62b079aa 100644
--- a/xlators/cluster/afr/src/pump.c
+++ b/xlators/cluster/afr/src/pump.c
@@ -28,7 +28,47 @@
#include "afr-common.c"
-pump_state_t
+static int
+pump_mark_start_pending (xlator_t *this)
+{
+ afr_private_t *priv = NULL;
+ pump_private_t *pump_priv = NULL;
+
+ priv = this->private;
+ pump_priv = priv->pump_private;
+
+ pump_priv->pump_start_pending = 1;
+
+ return 0;
+}
+
+static int
+is_pump_start_pending (xlator_t *this)
+{
+ afr_private_t *priv = NULL;
+ pump_private_t *pump_priv = NULL;
+
+ priv = this->private;
+ pump_priv = priv->pump_private;
+
+ return (pump_priv->pump_start_pending);
+}
+
+static int
+pump_remove_start_pending (xlator_t *this)
+{
+ afr_private_t *priv = NULL;
+ pump_private_t *pump_priv = NULL;
+
+ priv = this->private;
+ pump_priv = priv->pump_private;
+
+ pump_priv->pump_start_pending = 0;
+
+ return 0;
+}
+
+static pump_state_t
pump_get_state ()
{
xlator_t *this = NULL;
@@ -59,16 +99,11 @@ pump_change_state (xlator_t *this, pump_state_t state)
pump_state_t state_old;
pump_state_t state_new;
- unsigned char * child_up = NULL;
- int i = 0;
-
priv = this->private;
pump_priv = priv->pump_private;
- child_up = priv->child_up;
-
- assert (pump_priv);
+ GF_ASSERT (pump_priv);
LOCK (&pump_priv->pump_state_lock);
{
@@ -77,48 +112,6 @@ pump_change_state (xlator_t *this, pump_state_t state)
pump_priv->pump_state = state;
- switch (pump_priv->pump_state) {
- case PUMP_STATE_RESUME:
- case PUMP_STATE_RUNNING:
- case PUMP_STATE_PAUSE:
- {
- priv->pump_loaded = _gf_true;
- i = 1;
-
- child_up[i] = 1;
-
- LOCK (&priv->lock);
- {
- priv->up_count++;
- }
- UNLOCK (&priv->lock);
-
- break;
- }
- case PUMP_STATE_ABORT:
- {
- priv->pump_loaded = _gf_false;
- i = 1;
-
- child_up[i] = 0;
-
- LOCK (&priv->lock);
- {
- priv->down_count++;
- }
- UNLOCK (&priv->lock);
-
- LOCK (&pump_priv->resume_path_lock);
- {
- pump_priv->number_files_pumped = 0;
- }
- UNLOCK (&pump_priv->resume_path_lock);
-
-
- break;
- }
-
- }
}
UNLOCK (&pump_priv->pump_state_lock);
@@ -338,67 +331,24 @@ is_pump_traversal_allowed (xlator_t *this, const char *path)
}
static int
-pump_update_file_stats (xlator_t *this, long source_blocks,
- long sink_blocks)
+pump_save_file_stats (xlator_t *this, const char *path)
{
afr_private_t *priv = NULL;
pump_private_t *pump_priv = NULL;
- priv = this->private;
+ priv = this->private;
pump_priv = priv->pump_private;
LOCK (&pump_priv->resume_path_lock);
{
- pump_priv->source_blocks = source_blocks;
- pump_priv->sink_blocks = sink_blocks;
- }
- UNLOCK (&pump_priv->resume_path_lock);
-
- return 0;
-}
-
-static int
-pump_save_file_stats (xlator_t *this)
-{
- afr_private_t *priv = NULL;
- struct statvfs source_buf = {0, };
- struct statvfs sink_buf = {0, };
- loc_t loc;
- int ret = -1;
-
- priv = this->private;
-
- assert (priv->root_inode);
-
- build_root_loc (priv->root_inode, &loc);
-
- ret = syncop_statfs (PUMP_SOURCE_CHILD (this),
- &loc, &source_buf);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "source statfs failed");
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "source statfs succeeded");
- }
+ pump_priv->number_files_pumped++;
-
- ret = syncop_statfs (PUMP_SOURCE_CHILD (this),
- &loc, &sink_buf);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "sink statfs failed");
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "sink statfs succeeded");
+ strncpy (pump_priv->current_file, path,
+ PATH_MAX);
}
-
- pump_update_file_stats (this,
- source_buf.f_blocks,
- sink_buf.f_blocks);
+ UNLOCK (&pump_priv->resume_path_lock);
return 0;
-
}
static int
pump_save_path (xlator_t *this, const char *path)
@@ -435,16 +385,6 @@ pump_save_path (xlator_t *this, const char *path)
"setxattr succeeded - saved path=%s", path);
gf_log (this->name, GF_LOG_DEBUG,
"Saving path for status info");
-
- LOCK (&pump_priv->resume_path_lock);
- {
- pump_priv->number_files_pumped++;
-
- strncpy (pump_priv->current_file, path,
- PATH_MAX);
- }
- UNLOCK (&pump_priv->resume_path_lock);
-
}
dict_unref (dict);
@@ -534,7 +474,7 @@ gf_pump_traverse_directory (loc_t *loc)
if (!IS_ENTRY_CWD(entry->d_name) &&
!IS_ENTRY_PARENT (entry->d_name)) {
pump_save_path (this, entry_loc.path);
- pump_save_file_stats (this);
+ pump_save_file_stats (this, entry_loc.path);
}
ret = pump_check_and_update_status (this);
@@ -726,19 +666,16 @@ pump_task_completion (int ret, void *data)
}
int
-pump_start (call_frame_t *frame, xlator_t *this)
+pump_start (call_frame_t *pump_frame, xlator_t *this)
{
afr_private_t *priv = NULL;
pump_private_t *pump_priv = NULL;
- call_frame_t *pump_frame = NULL;
int ret = -1;
priv = this->private;
pump_priv = priv->pump_private;
- pump_frame = copy_frame (frame);
-
if (!pump_frame->root->lk_owner)
pump_frame->root->lk_owner = PUMP_LK_OWNER;
@@ -782,6 +719,212 @@ is_pump_loaded (xlator_t *this)
}
+static int
+pump_start_synctask (xlator_t *this)
+{
+ call_frame_t *frame = NULL;
+ int ret = 0;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ pump_change_state (this, PUMP_STATE_RUNNING);
+
+ ret = pump_start (frame, this);
+
+out:
+ return ret;
+}
+
+int32_t
+pump_cmd_start_setxattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+
+{
+ afr_local_t *local = NULL;
+ int ret = 0;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate destination "
+ "brick connect");
+ ret = op_ret;
+ goto out;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Successfully initiated destination "
+ "brick connect");
+
+ pump_mark_start_pending (this);
+
+out:
+ local->op_ret = ret;
+ pump_command_reply (frame, this);
+
+ return 0;
+}
+
+static int
+pump_initiate_sink_connect (call_frame_t *frame, xlator_t *this)
+{
+ afr_local_t *local = NULL;
+ afr_private_t *priv = NULL;
+ dict_t *dict = NULL;
+ char *dst_brick = NULL;
+ loc_t loc;
+
+ int ret = 0;
+
+ priv = this->private;
+ local = frame->local;
+
+ GF_ASSERT (priv->root_inode);
+
+ build_root_loc (priv->root_inode, &loc);
+
+ ret = dict_get_str (local->dict, PUMP_CMD_START, &dst_brick);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not get destination brick value");
+ goto out;
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ GF_ASSERT (dst_brick);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Got destination brick as %s", dst_brick);
+
+ ret = dict_set_str (dict, CLIENT_CMD_CONNECT, dst_brick);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not inititiate destination brick "
+ "connect");
+ goto out;
+ }
+
+ STACK_WIND (frame,
+ pump_cmd_start_setxattr_cbk,
+ PUMP_SINK_CHILD(this),
+ PUMP_SINK_CHILD(this)->fops->setxattr,
+ &loc,
+ dict,
+ 0);
+
+ ret = 0;
+
+ dict_unref (dict);
+out:
+ return ret;
+}
+
+int32_t
+pump_cmd_abort_setxattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+
+{
+ afr_local_t *local = NULL;
+ int ret = 0;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate destination "
+ "brick disconnect");
+ ret = op_ret;
+ goto out;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Successfully initiated destination "
+ "brick disconnect");
+ ret = 0;
+
+out:
+ local->op_ret = ret;
+ pump_command_reply (frame, this);
+ return 0;
+}
+
+static int
+pump_initiate_sink_disconnect (call_frame_t *frame, xlator_t *this)
+{
+ afr_local_t *local = NULL;
+ afr_private_t *priv = NULL;
+ dict_t *dict = NULL;
+ loc_t loc;
+
+ int ret = 0;
+
+ priv = this->private;
+ local = frame->local;
+
+ GF_ASSERT (priv->root_inode);
+
+ build_root_loc (priv->root_inode, &loc);
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Out of memory");
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_str (dict, CLIENT_CMD_DISCONNECT, "jargon");
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not inititiate destination brick "
+ "disconnect");
+ goto out;
+ }
+
+ STACK_WIND (frame,
+ pump_cmd_abort_setxattr_cbk,
+ PUMP_SINK_CHILD(this),
+ PUMP_SINK_CHILD(this)->fops->setxattr,
+ &loc,
+ dict,
+ 0);
+
+ ret = 0;
+
+ dict_unref (dict);
+out:
+ return ret;
+}
+
+static int
+is_pump_aborted (xlator_t *this)
+{
+ pump_state_t state;
+
+ state = pump_get_state ();
+
+ return ((state == PUMP_STATE_ABORT));
+}
+
int32_t
pump_cmd_start_getxattr_cbk (call_frame_t *frame,
void *cookie,
@@ -795,14 +938,17 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame,
pump_state_t state;
int ret = 0;
+ int need_unwind = 0;
int dict_ret = -1;
local = frame->local;
if (op_ret < 0) {
gf_log (this->name, GF_LOG_DEBUG,
- "getxattr failed - changing pump state to RUNNING with '/'");
+ "getxattr failed - changing pump "
+ "state to RUNNING with '/'");
path = "/";
+ ret = op_ret;
} else {
gf_log (this->name, GF_LOG_TRACE,
"getxattr succeeded");
@@ -822,13 +968,22 @@ pump_cmd_start_getxattr_cbk (call_frame_t *frame,
}
pump_set_resume_path (this, path);
- pump_change_state (this, PUMP_STATE_RUNNING);
- ret = pump_start (frame, this);
+ if (is_pump_aborted (this))
+ /* We're re-starting pump afresh */
+ ret = pump_initiate_sink_connect (frame, this);
+ else {
+ /* We're re-starting pump from a previous
+ pause */
+ ret = pump_start_synctask (this);
+ need_unwind = 1;
+ }
out:
- local->op_ret = ret;
- pump_command_reply (frame, this);
+ if ((ret < 0) || (need_unwind == 1)) {
+ local->op_ret = ret;
+ pump_command_reply (frame, this);
+ }
return 0;
}
@@ -924,13 +1079,14 @@ pump_execute_start (call_frame_t *frame, xlator_t *this)
local = frame->local;
if (!priv->root_inode) {
- gf_log (this->name, GF_LOG_NORMAL,
- "Pump xlator cannot be started without an initial lookup");
+ gf_log (this->name, GF_LOG_ERROR,
+ "Pump xlator cannot be started without an initial "
+ "lookup");
ret = -1;
goto out;
}
- assert (priv->root_inode);
+ GF_ASSERT (priv->root_inode);
build_root_loc (priv->root_inode, &loc);
@@ -960,6 +1116,7 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame,
int32_t op_errno)
{
afr_local_t *local = NULL;
+ int ret = 0;
local = frame->local;
@@ -967,16 +1124,23 @@ pump_cmd_abort_removexattr_cbk (call_frame_t *frame,
gf_log (this->name, GF_LOG_ERROR,
"Aborting pump failed. Please remove xattr"
PUMP_PATH "of the source child's '/'");
- local->op_ret = -1;
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "remove xattr succeeded");
- local->op_ret = 0;
+ ret = op_ret;
+ goto out;
}
+ gf_log (this->name, GF_LOG_DEBUG,
+ "remove xattr succeeded");
+
+
pump_change_state (this, PUMP_STATE_ABORT);
+ ret = pump_initiate_sink_disconnect (frame, this);
+
+out:
+ if (ret < 0) {
+ local->op_ret = ret;
+ pump_command_reply (frame, this);
+ }
- pump_command_reply (frame, this);
return 0;
}
@@ -1000,7 +1164,7 @@ pump_execute_abort (call_frame_t *frame, xlator_t *this)
goto out;
}
- assert (priv->root_inode);
+ GF_ASSERT (priv->root_inode);
build_root_loc (priv->root_inode, &root_loc);
@@ -1446,6 +1610,8 @@ pump_command_reply (call_frame_t *frame, xlator_t *this)
gf_log (this->name, GF_LOG_NORMAL,
"Command succeeded");
+ dict_unref (local->dict);
+
AFR_STACK_UNWIND (setxattr,
frame,
local->op_ret,
@@ -1463,14 +1629,17 @@ pump_parse_command (call_frame_t *frame, xlator_t *this,
if (pump_command_start (this, dict)) {
frame->local = local;
+ local->dict = dict_ref (dict);
ret = pump_execute_start (frame, this);
} else if (pump_command_pause (this, dict)) {
frame->local = local;
+ local->dict = dict_ref (dict);
ret = pump_execute_pause (frame, this);
} else if (pump_command_abort (this, dict)) {
frame->local = local;
+ local->dict = dict_ref (dict);
ret = pump_execute_abort (frame, this);
}
return ret;
@@ -1566,19 +1735,47 @@ mem_acct_init (xlator_t *this)
return ret;
}
+static int
+is_xlator_pump_sink (xlator_t *child)
+{
+ return (child == PUMP_SINK_CHILD(THIS));
+}
+
+static int
+is_xlator_pump_source (xlator_t *child)
+{
+ return (child == PUMP_SOURCE_CHILD(THIS));
+}
+
int32_t
notify (xlator_t *this, int32_t event,
void *data, ...)
{
int ret = -1;
+ xlator_t *child_xl = NULL;
+
+ child_xl = (xlator_t *) data;
+
+ ret = afr_notify (this, event, data);
switch (event) {
case GF_EVENT_CHILD_DOWN:
- pump_change_state (this, PUMP_STATE_ABORT);
+ if (is_xlator_pump_source (child_xl))
+ pump_change_state (this, PUMP_STATE_ABORT);
break;
- }
- ret = afr_notify (this, event, data);
+ case GF_EVENT_CHILD_UP:
+ if (is_xlator_pump_sink (child_xl))
+ if (is_pump_start_pending (this)) {
+ ret = pump_start_synctask (this);
+ if (ret < 0)
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Could not start pump "
+ "synctask");
+ else
+ pump_remove_start_pending (this);
+ }
+ }
return ret;
}
diff --git a/xlators/cluster/afr/src/pump.h b/xlators/cluster/afr/src/pump.h
index 15799002b..e786fb0de 100644
--- a/xlators/cluster/afr/src/pump.h
+++ b/xlators/cluster/afr/src/pump.h
@@ -22,6 +22,10 @@
#include "syncop.h"
+/* FIXME: Needs to be defined in a common file */
+#define CLIENT_CMD_CONNECT "trusted.glusterfs.client-connect"
+#define CLIENT_CMD_DISCONNECT "trusted.glusterfs.client-disconnect"
+
#define PUMP_PID 696969
#define PUMP_LK_OWNER 696969
@@ -43,23 +47,23 @@
#define PUMP_SINK_CHILD(xl) (xl->children->next->xlator)
typedef enum {
- PUMP_STATE_RUNNING,
- PUMP_STATE_RESUME,
- PUMP_STATE_PAUSE,
- PUMP_STATE_ABORT,
+ PUMP_STATE_RUNNING, /* Pump is running and migrating files */
+ PUMP_STATE_RESUME, /* Pump is resuming from a previous pause */
+ PUMP_STATE_PAUSE, /* Pump is paused */
+ PUMP_STATE_ABORT, /* Pump is aborted */
} pump_state_t;
typedef struct _pump_private {
- struct syncenv *env;
- const char *resume_path;
- gf_lock_t resume_path_lock;
- gf_lock_t pump_state_lock;
- pump_state_t pump_state;
- long source_blocks;
- long sink_blocks;
- char current_file[PATH_MAX];
- uint64_t number_files_pumped;
- gf_boolean_t pump_finished;
+ struct syncenv *env; /* The env pointer to the pump synctask */
+ const char *resume_path; /* path to resume from the last pause */
+ gf_lock_t resume_path_lock; /* Synchronize resume_path changes */
+ gf_lock_t pump_state_lock; /* Synchronize pump_state changes */
+ pump_state_t pump_state; /* State of pump */
+ char current_file[PATH_MAX]; /* Current file being pumped */
+ uint64_t number_files_pumped; /* Number of files pumped */
+ gf_boolean_t pump_finished; /* Boolean to indicate pump termination */
+ char pump_start_pending; /* Boolean to mark start pending until
+ CHILD_UP */
} pump_private_t;
void