diff options
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 270 | 
1 files changed, 242 insertions, 28 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index 2b1c88c3cc3..a8f58e0255d 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -80,6 +80,7 @@ out:  /* Defined in glusterd-rpc-ops.c */  extern struct rpc_clnt_program gd_mgmt_prog; +extern struct rpc_clnt_program gd_brick_prog;  int32_t  gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, @@ -312,6 +313,137 @@ out:  /*TODO: Need to add syncop for brick ops*/  int32_t +gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, +                        int count, void *myframe) +{ +        struct syncargs        *args  = NULL; +        gd1_mgmt_brick_op_rsp  rsp   = {0,}; +        int                     ret   = -1; +        call_frame_t           *frame = NULL; + +        frame = myframe; +        args = frame->local; +        frame->local = NULL; + +        /* initialize */ +        args->op_ret   = -1; +        args->op_errno = EINVAL; + +        if (-1 == req->rpc_status) { +                args->op_errno = ENOTCONN; +                goto out; +        } + +        ret = xdr_to_generic (*iov, &rsp, +                              (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp); +        if (ret < 0) +                goto out; + +        if (rsp.output.output_len) { +                args->dict  = dict_new (); +                if (!args->dict) { +                        ret = -1; +                        args->op_errno = ENOMEM; +                        goto out; +                } + +                ret = dict_unserialize (rsp.output.output_val, +                                        rsp.output.output_len, +                                        &args->dict); +                if (ret < 0) +                        goto out; +        } + +        args->op_ret = rsp.op_ret; +        args->op_errno = rsp.op_errno; +        args->errstr = gf_strdup (rsp.op_errstr); + +out: +        if (strcmp (rsp.op_errstr, "") != 0) +                free (rsp.op_errstr); +        free (rsp.output.output_val); +        STACK_DESTROY (frame->root); + +        __wake (args); + +        return 0; +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ +        if (!req) +                return; + +        if (strcmp (req->name, "") != 0) +                GF_FREE (req->name); +        GF_FREE (req->input.input_val); +        GF_FREE (req); +} + +int +gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode, +                         int op, dict_t *dict_out, dict_t *op_ctx, +                         char **errstr) +{ +        struct syncargs        args = {0, }; +        gd1_mgmt_brick_op_req  *req  = NULL; +        int                    ret  = 0; +        xlator_t               *this = NULL; + +        this = THIS; +        args.op_ret = -1; +        args.op_errno = ENOTCONN; + +        if ((pnode->type == GD_NODE_NFS) || +            ((pnode->type == GD_NODE_SHD) && +            (op == GD_OP_STATUS_VOLUME))) { +                ret = glusterd_node_op_build_payload +                        (op, &req, dict_out); + +        } else { +                ret = glusterd_brick_op_build_payload +                        (op, pnode->node, &req, dict_out); + +        } + +        if (ret) +                goto out; + +        GD_SYNCOP (rpc, (&args), gd_syncop_brick_op_cbk, +                   req, &gd_brick_prog, req->op, +                   xdr_gd1_mgmt_brick_op_req); + +        if (args.errstr && errstr) +                *errstr = args.errstr; +        else +                GF_FREE (args.errstr); + +        if (GD_OP_STATUS_VOLUME == op) { +                ret = dict_set_int32 (args.dict, "index", pnode->index); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "Error setting index on brick status" +                                " rsp dict"); +                        args.op_ret = -1; +                        goto out; +                } +        } +        if (args.op_ret == 0) +                glusterd_handle_node_rsp (dict_out, pnode->node, op, +                                          args.dict, op_ctx, errstr, +                                          pnode->type); + +out: +        errno = args.op_errno; +        if (args.dict) +                dict_unref (args.dict); +        gd_brick_op_req_free (req); +        return args.op_ret; + +} + +int32_t  gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,                           int count, void *myframe)  { @@ -416,7 +548,7 @@ static int  glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp,                                 char *op_errstr)  { -        int ret = -1; +        int ret = 0;          switch (op) {          case GD_OP_REPLACE_BRICK: @@ -473,19 +605,23 @@ out:  void  gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)  { -        int                  ret      = -1; -        dict_t              *req_dict     = NULL; -        dict_t              *rsp_dict = NULL; -        glusterd_peerinfo_t *peerinfo = NULL; -        glusterd_peerinfo_t *tmp = NULL; -        glusterd_conf_t     *conf     = NULL; -        uuid_t               tmp_uuid = {0,}; -        glusterd_op_t        op       = 0; -        int32_t              tmp_op   = 0; -        gf_boolean_t         local_locked = _gf_false; -        char                 *op_errstr = NULL; -        xlator_t             *this = NULL; -        char                 *hostname = NULL; +        int                         ret             = -1; +        dict_t                      *req_dict       = NULL; +        dict_t                      *rsp_dict       = NULL; +        glusterd_peerinfo_t         *peerinfo       = NULL; +        glusterd_peerinfo_t         *tmp            = NULL; +        glusterd_conf_t             *conf           = NULL; +        uuid_t                      tmp_uuid        = {0,}; +        glusterd_op_t               op              = 0; +        int32_t                     tmp_op          = 0; +        gf_boolean_t                local_locked    = _gf_false; +        char                        *op_errstr      = NULL; +        glusterd_pending_node_t     *pending_node   = NULL; +        rpc_clnt_t                  *rpc            = NULL; +        int                         brick_count     = 0; +        struct list_head            selected        = {0}; +        xlator_t                    *this           = NULL; +        char                        *hostname       = NULL;          this = THIS;          GF_ASSERT (this); @@ -501,11 +637,6 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)          op = tmp_op; -        ret = -1; -        rsp_dict = dict_new (); -        if (!rsp_dict) -                goto out; -          /*  Lock everything */          ret = glusterd_lock (MY_UUID);          if (ret) { @@ -554,12 +685,30 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)          }          /* stage op */ +        ret = -1; +        rsp_dict = dict_new (); +        if (!rsp_dict) +                goto out; +          ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict);          if (ret) {                  hostname = "localhost";                  goto stage_done;          } +        if (op == GD_OP_REPLACE_BRICK) { +                ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, +                                                     op_errstr); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, "%s", +                                (op_errstr)? op_errstr: "Failed to aggregate " +                                "response from node/brick"); +                        goto out; +                } +        } +        dict_unref (rsp_dict); +        rsp_dict = NULL; +          list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {                  ret = gd_syncop_mgmt_stage_op (peerinfo->rpc,                                                 MY_UUID, tmp_uuid, @@ -570,13 +719,19 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)                          goto stage_done;                  } -                if (op == GD_OP_REPLACE_BRICK) -                        (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx, +                if (op == GD_OP_REPLACE_BRICK) { +                        ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,                                                                rsp_dict,                                                                op_errstr); - -                if (rsp_dict) -                        dict_unref (rsp_dict); +                        if (ret) { +                                gf_log (this->name, GF_LOG_ERROR, "%s", +                                        (op_errstr)? op_errstr: "Failed to " +                                        "aggregate response from node/brick"); +                                goto out; +                        } +                } +                dict_unref (rsp_dict); +                rsp_dict = NULL;          }  stage_done: @@ -589,12 +744,64 @@ stage_done:                  goto out;          } +        /*brick op */ +        INIT_LIST_HEAD (&selected); +        ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, &selected); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "%s", +                       (op_errstr)? op_errstr: "Brick op failed. Check " +                       "glusterd log file for more details."); +                goto out; +        } + +        brick_count = 0; +        list_for_each_entry (pending_node, &selected, list) { +                rpc = glusterd_pending_node_get_rpc (pending_node); +                if (!rpc) { +                        if (pending_node->type == GD_NODE_REBALANCE) { +                                ret = 0; +                                glusterd_defrag_volume_node_rsp (req_dict, +                                                                 NULL, op_ctx); +                                goto out; +                        } + +                        ret = -1; +                        gf_log (this->name, GF_LOG_ERROR, "Brick Op failed " +                                "due to rpc failure."); +                        goto out; +                } +                ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict, +                                               op_ctx, &op_errstr); +                if (ret) +                        goto out; + +                brick_count++; +        } + +        gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks", +                brick_count); +          /* commit op */ +        rsp_dict = dict_new (); +        if (!rsp_dict) { +                ret = -1; +                goto out; +        } +          ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict);          if (ret) {                  hostname = "localhost";                  goto commit_done;          } +        ret =  glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "%s", +                        (op_errstr)? op_errstr: "Failed to aggregate response " +                        "from node/brick"); +                goto out; +        } +        dict_unref (rsp_dict); +        rsp_dict = NULL;          list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {                  ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, @@ -605,10 +812,16 @@ stage_done:                          hostname = peerinfo->hostname;                          goto commit_done;                  } -                (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, -                                                      op_errstr); -                if (rsp_dict) -                        dict_unref (rsp_dict); +                ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, +                                                     op_errstr); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, "%s", +                                (op_errstr)? op_errstr: "Failed to aggregate " +                                "response from node/brick"); +                        goto out; +                } +                dict_unref (rsp_dict); +                rsp_dict = NULL;          }  commit_done:          if (ret) { @@ -665,6 +878,7 @@ glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op,          }          gd_sync_task_begin (dict, req); +        ret = 0;  out:          if (dict)                  dict_unref (dict);  | 
