diff options
-rw-r--r-- | cli/src/cli-cmd-volume.c | 37 | ||||
-rw-r--r-- | cli/src/cli-cmd.c | 45 | ||||
-rw-r--r-- | cli/src/cli-cmd.h | 2 | ||||
-rw-r--r-- | cli/src/cli.c | 50 | ||||
-rw-r--r-- | cli/src/cli.h | 10 | ||||
-rw-r--r-- | cli/src/cli3_1-cops.c | 21 | ||||
-rw-r--r-- | cli/src/input.c | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 6 |
8 files changed, 137 insertions, 35 deletions
diff --git a/cli/src/cli-cmd-volume.c b/cli/src/cli-cmd-volume.c index 445432ecd..cc8200bd2 100644 --- a/cli/src/cli-cmd-volume.c +++ b/cli/src/cli-cmd-volume.c @@ -144,7 +144,6 @@ cli_cmd_volume_start_cbk (struct cli_state *state, struct cli_cmd_word *word, call_frame_t *frame = NULL; char *volname = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_START_VOLUME]; frame = create_frame (THIS, THIS->ctx->pool); if (!frame) @@ -159,12 +158,14 @@ cli_cmd_volume_start_cbk (struct cli_state *state, struct cli_cmd_word *word, volname = (char *)words[2]; GF_ASSERT (volname); + proc = &cli_rpc_prog->proctable[GF1_CLI_START_VOLUME]; + if (proc->fn) { ret = proc->fn (frame, THIS, volname); } out: - if (ret && volname) + if (!proc && ret && volname) cli_out ("Starting Volume %s failed", volname); return ret; @@ -180,7 +181,6 @@ cli_cmd_volume_stop_cbk (struct cli_state *state, struct cli_cmd_word *word, call_frame_t *frame = NULL; char *volname = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_STOP_VOLUME]; frame = create_frame (THIS, THIS->ctx->pool); if (!frame) @@ -190,12 +190,14 @@ cli_cmd_volume_stop_cbk (struct cli_state *state, struct cli_cmd_word *word, volname = (char *)words[2]; GF_ASSERT (volname); + proc = &cli_rpc_prog->proctable[GF1_CLI_STOP_VOLUME]; + if (proc->fn) { ret = proc->fn (frame, THIS, volname); } out: - if (ret) + if (!proc && ret) cli_out ("Stopping Volume %s failed", volname); return ret; @@ -211,7 +213,6 @@ cli_cmd_volume_rename_cbk (struct cli_state *state, struct cli_cmd_word *word, call_frame_t *frame = NULL; dict_t *dict = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_RENAME_VOLUME]; frame = create_frame (THIS, THIS->ctx->pool); if (!frame) @@ -236,12 +237,14 @@ cli_cmd_volume_rename_cbk (struct cli_state *state, struct cli_cmd_word *word, if (ret) goto out; + proc = &cli_rpc_prog->proctable[GF1_CLI_RENAME_VOLUME]; + if (proc->fn) { ret = proc->fn (frame, THIS, dict); } out: - if (ret) { + if (!proc && ret) { char *volname = (char *) words[2]; if (dict) dict_destroy (dict); @@ -261,7 +264,6 @@ cli_cmd_volume_defrag_cbk (struct cli_state *state, struct cli_cmd_word *word, call_frame_t *frame = NULL; char *volname = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_DEFRAG_VOLUME]; frame = create_frame (THIS, THIS->ctx->pool); if (!frame) @@ -271,12 +273,14 @@ cli_cmd_volume_defrag_cbk (struct cli_state *state, struct cli_cmd_word *word, volname = (char *)words[2]; GF_ASSERT (volname); + proc = &cli_rpc_prog->proctable[GF1_CLI_DEFRAG_VOLUME]; + if (proc->fn) { ret = proc->fn (frame, THIS, volname); } out: - if (ret) + if (!proc && ret) cli_out ("Defrag of Volume %s failed", volname); return 0; @@ -293,7 +297,6 @@ cli_cmd_volume_set_cbk (struct cli_state *state, struct cli_cmd_word *word, char *volname = NULL; dict_t *dict = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_SET_VOLUME]; frame = create_frame (THIS, THIS->ctx->pool); if (!frame) @@ -310,12 +313,14 @@ cli_cmd_volume_set_cbk (struct cli_state *state, struct cli_cmd_word *word, goto out; //TODO: Build validation here + proc = &cli_rpc_prog->proctable[GF1_CLI_SET_VOLUME]; + if (proc->fn) { ret = proc->fn (frame, THIS, dict); } out: - if (ret) { + if (!proc && ret) { if (dict) dict_destroy (dict); cli_out ("Changing option on Volume %s failed", volname); @@ -335,8 +340,6 @@ cli_cmd_volume_add_brick_cbk (struct cli_state *state, call_frame_t *frame = NULL; dict_t *options = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_ADD_BRICK]; - frame = create_frame (THIS, THIS->ctx->pool); if (!frame) goto out; @@ -346,12 +349,14 @@ cli_cmd_volume_add_brick_cbk (struct cli_state *state, if (ret) goto out; + proc = &cli_rpc_prog->proctable[GF1_CLI_ADD_BRICK]; + if (proc->fn) { ret = proc->fn (frame, THIS, options); } out: - if (ret) { + if (!proc && ret) { char *volname = (char *) words[2]; cli_out ("Adding brick to Volume %s failed",volname ); } @@ -369,8 +374,6 @@ cli_cmd_volume_remove_brick_cbk (struct cli_state *state, call_frame_t *frame = NULL; dict_t *options = NULL; - proc = &cli_rpc_prog->proctable[GF1_CLI_REMOVE_BRICK]; - frame = create_frame (THIS, THIS->ctx->pool); if (!frame) goto out; @@ -380,12 +383,14 @@ cli_cmd_volume_remove_brick_cbk (struct cli_state *state, if (ret) goto out; + proc = &cli_rpc_prog->proctable[GF1_CLI_REMOVE_BRICK]; + if (proc->fn) { ret = proc->fn (frame, THIS, options); } out: - if (ret) { + if (!proc && ret) { char *volname = (char *) words[2]; cli_out ("Removing brick from Volume %s failed",volname ); } diff --git a/cli/src/cli-cmd.c b/cli/src/cli-cmd.c index 4ef8e86cd..043cb91d7 100644 --- a/cli/src/cli-cmd.c +++ b/cli/src/cli-cmd.c @@ -37,6 +37,11 @@ static int cmd_done; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t conn = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t conn_mutex = PTHREAD_MUTEX_INITIALIZER; + +int cli_op_ret = 0; +int connected = 1; int cli_cmd_process (struct cli_state *state, int argc, char **argv) @@ -178,6 +183,7 @@ cli_cmd_await_response () pthread_mutex_lock (&cond_mutex); { + cli_op_ret = 0; while (!cmd_done) { pthread_cond_wait (&cond, &cond_mutex); } @@ -187,15 +193,16 @@ cli_cmd_await_response () pthread_mutex_destroy (&cond_mutex); pthread_cond_destroy (&cond); - return 0; + return cli_op_ret; } int -cli_cmd_broadcast_response () +cli_cmd_broadcast_response (int32_t status) { pthread_mutex_lock (&cond_mutex); { cmd_done = 1; + cli_op_ret = status; pthread_cond_broadcast (&cond); } @@ -204,3 +211,37 @@ cli_cmd_broadcast_response () return 0; } +int32_t +cli_cmd_await_connected () +{ + pthread_mutex_init (&conn_mutex, NULL); + pthread_cond_init (&conn, NULL); + + pthread_mutex_lock (&conn_mutex); + { + while (!connected) { + pthread_cond_wait (&conn, &conn_mutex); + } + } + pthread_mutex_unlock (&conn_mutex); + + pthread_mutex_destroy (&conn_mutex); + pthread_cond_destroy (&conn); + + return 0; +} + +int32_t +cli_cmd_broadcast_connected () +{ + connected = 1; + gf_log ("", GF_LOG_NORMAL, "Connected"); + pthread_mutex_lock (&conn_mutex); + { + pthread_cond_broadcast (&conn); + } + + pthread_mutex_unlock (&conn_mutex); + + return 0; +} diff --git a/cli/src/cli-cmd.h b/cli/src/cli-cmd.h index 4a8381510..b08f5b3d5 100644 --- a/cli/src/cli-cmd.h +++ b/cli/src/cli-cmd.h @@ -44,5 +44,5 @@ void cli_cmd_tokens_destroy (char **tokens); int cli_cmd_await_response (); -int cli_cmd_broadcast_response (); +int cli_cmd_broadcast_response (int32_t status); #endif /* __CLI_CMD_H__ */ diff --git a/cli/src/cli.c b/cli/src/cli.c index d70c67f85..5f734d8e8 100644 --- a/cli/src/cli.c +++ b/cli/src/cli.c @@ -74,6 +74,7 @@ #include <fnmatch.h> +extern int connected; /* using argp for command line parsing */ static char gf_doc[] = ""; @@ -319,6 +320,41 @@ out: } int +cli_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, + void *data) +{ + xlator_t *this = NULL; + int ret = 0; + + this = data; + + switch (event) { + case RPC_CLNT_CONNECT: + { + + cli_cmd_broadcast_connected (); + gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_CONNECT"); + break; + } + + case RPC_CLNT_DISCONNECT: + { + gf_log (this->name, GF_LOG_TRACE, "got RPC_CLNT_DISCONNECT"); + connected = 0; + break; + } + + default: + gf_log (this->name, GF_LOG_TRACE, + "got some other RPC event %d", event); + ret = 0; + break; + } + + return ret; +} + +int parse_cmdline (int argc, char *argv[], struct cli_state *state) { int ret = 0; @@ -393,8 +429,10 @@ cli_rpc_init (struct cli_state *state) dict_t *options = NULL; int ret = -1; int port = CLI_GLUSTERD_PORT; + xlator_t *this = NULL; + this = THIS; cli_rpc_prog = &cli3_1_prog; options = dict_new (); if (!options) @@ -418,8 +456,11 @@ cli_rpc_init (struct cli_state *state) if (ret) goto out; - rpc = rpc_clnt_init (&rpc_cfg, options, THIS->ctx, THIS->name); + rpc = rpc_clnt_init (&rpc_cfg, options, this->ctx, this->name); + if (rpc) { + ret = rpc_clnt_register_notify (rpc, cli_rpc_notify, this); + } out: return rpc; } @@ -459,6 +500,10 @@ main (int argc, char *argv[]) if (ret) goto out; + global_rpc = cli_rpc_init (&state); + if (!global_rpc) + goto out; + state.ctx = ctx; global_state = &state; @@ -474,9 +519,6 @@ main (int argc, char *argv[]) if (ret) goto out; - global_rpc = cli_rpc_init (&state); - if (!global_rpc) - goto out; ret = cli_input_init (&state); if (ret) diff --git a/cli/src/cli.h b/cli/src/cli.h index 140a1af13..c31b4631a 100644 --- a/cli/src/cli.h +++ b/cli/src/cli.h @@ -167,4 +167,14 @@ cli_cmd_volume_replace_brick_parse (const char **words, int wordcount, dict_t **options); cli_local_t * cli_local_get (); + +int32_t +cli_cmd_await_connected (); + +int32_t +cli_cmd_broadcast_connected (); + +int +cli_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, + void *data); #endif /* __CLI_H__ */ diff --git a/cli/src/cli3_1-cops.c b/cli/src/cli3_1-cops.c index 6ebe6ddbe..8e6c76ae6 100644 --- a/cli/src/cli3_1-cops.c +++ b/cli/src/cli3_1-cops.c @@ -33,6 +33,7 @@ #include "protocol-common.h" extern rpc_clnt_prog_t *cli_rpc_prog; +extern int cli_op_ret; int gf_cli3_1_probe_cbk (struct rpc_req *req, struct iovec *iov, @@ -60,7 +61,7 @@ gf_cli3_1_probe_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -90,7 +91,7 @@ gf_cli3_1_deprobe_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -191,7 +192,7 @@ gf_cli3_1_list_friends_cbk (struct rpc_req *req, struct iovec *iov, ret = 0; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); if (ret) cli_out ("Command Execution Failed"); @@ -299,7 +300,7 @@ gf_cli3_1_get_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = 0; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); if (ret) cli_out ("Command Execution Failed"); @@ -341,7 +342,7 @@ gf_cli3_1_create_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -379,7 +380,7 @@ gf_cli3_1_delete_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); gf_log ("", GF_LOG_NORMAL, "Returning with %d", ret); return ret; } @@ -419,7 +420,7 @@ gf_cli3_1_start_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -458,7 +459,7 @@ gf_cli3_1_stop_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -497,7 +498,7 @@ gf_cli3_1_defrag_volume_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } @@ -582,7 +583,7 @@ gf_cli3_1_add_brick_cbk (struct rpc_req *req, struct iovec *iov, ret = rsp.op_ret; out: - cli_cmd_broadcast_response (); + cli_cmd_broadcast_response (ret); return ret; } diff --git a/cli/src/input.c b/cli/src/input.c index a577a0f4c..25a7cb62d 100644 --- a/cli/src/input.c +++ b/cli/src/input.c @@ -41,6 +41,7 @@ cli_batch (void *d) state = d; + cli_cmd_await_connected (); ret = cli_cmd_process (state, state->argc, state->argv); gf_log ("", GF_LOG_NORMAL, "Exiting with: %d", ret); exit (ret); diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 15173d050..f2c2736e0 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -643,6 +643,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) rpc_request_info_t *request_info = NULL; int ret = -1; struct rpc_req req = {0, }; + int cbk_ret = -1; conn = &clnt->conn; @@ -666,7 +667,8 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) "failed"); } - saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame); + cbk_ret = saved_frame->cbkfn (&req, req.rsp, req.rspcnt, + saved_frame->frame); if (ret == 0) { rpc_clnt_reply_deinit (&req); @@ -679,7 +681,7 @@ out: GF_FREE (saved_frame); } - return ret; + return cbk_ret; } |