diff options
| author | Krishnan Parthasarathi <kparthas@redhat.com> | 2013-02-20 14:44:23 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2013-02-26 09:06:12 -0800 | 
| commit | 5e6dfce0b0d55d96b5bdad6a693fdb2826c20b92 (patch) | |
| tree | 8521f9b3a8e5603b3c3c76efa4ca07210884acba | |
| parent | dc43e7dd9934925f8cb96762c33be23ccb63528a (diff) | |
glusterd: Increasing throughput of synctask based mgmt ops.
Change-Id: Ibd963f78707b157fc4c9729aa87206cfd5ecfe81
BUG: 913662
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4570
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
| -rw-r--r-- | libglusterfs/src/syncop.h | 1 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-mem-types.h | 3 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 699 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.h | 22 | 
4 files changed, 387 insertions, 338 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index ba0440cd790..aec07f36050 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -114,6 +114,7 @@ struct syncargs {          uuid_t              uuid;          char               *errstr;          dict_t             *dict; +        pthread_mutex_t     lock_dict;          /* do not touch */          struct synctask    *task; diff --git a/xlators/mgmt/glusterd/src/glusterd-mem-types.h b/xlators/mgmt/glusterd/src/glusterd-mem-types.h index a13236da1fe..98216e28ab0 100644 --- a/xlators/mgmt/glusterd/src/glusterd-mem-types.h +++ b/xlators/mgmt/glusterd/src/glusterd-mem-types.h @@ -65,7 +65,8 @@ typedef enum gf_gld_mem_types_ {          gf_gld_mt_charptr                       = gf_common_mt_end + 49,          gf_gld_mt_hooks_stub_t                  = gf_common_mt_end + 50,          gf_gld_mt_hooks_priv_t                  = gf_common_mt_end + 51, -        gf_gld_mt_end                           = gf_common_mt_end + 52, +        gf_gld_mt_mop_commit_req_t              = gf_common_mt_end + 52, +        gf_gld_mt_end                           = gf_common_mt_end + 53,  } gf_gld_mem_types_t;  #endif diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index a01753f3a29..b943d8e9487 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -8,7 +8,6 @@     cases as published by the Free Software Foundation.  */  /* rpc related syncops */ -#include "syncop.h"  #include "rpc-clnt.h"  #include "protocol-common.h"  #include "xdr-generic.h" @@ -19,6 +18,57 @@  #include "glusterd-op-sm.h"  #include "glusterd-utils.h" +static void +gd_collate_errors (struct syncargs *args, int op_ret, int op_errno, +                   char *op_errstr) +{ +        if (args->op_ret) +                return; +        args->op_ret = op_ret; +        args->op_errno = op_errno; +        if (op_ret && op_errstr && strcmp (op_errstr, "")) +                args->errstr = gf_strdup (op_errstr); +} + +static void +gd_syncargs_init (struct syncargs *args, dict_t *op_ctx) +{ +        args->dict = op_ctx; +        pthread_mutex_init (&args->lock_dict, NULL); +} + +static void +gd_stage_op_req_free (gd1_mgmt_stage_op_req *req) +{ +        if (!req) +                return; + +        GF_FREE (req->buf.buf_val); +        GF_FREE (req); +} + +static void +gd_commit_op_req_free (gd1_mgmt_commit_op_req *req) +{ +        if (!req) +                return; + +        GF_FREE (req->buf.buf_val); +        GF_FREE (req); +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ +        if (!req) +                return; + +        if (strcmp (req->name, "") != 0) +                GF_FREE (req->name); +        GF_FREE (req->input.input_val); +        GF_FREE (req); +} +  int  gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,                            void *cookie, rpc_clnt_prog_t *prog, @@ -82,23 +132,84 @@ out:  extern struct rpc_clnt_program gd_mgmt_prog;  extern struct rpc_clnt_program gd_brick_prog; +static int +glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) +{ +        int ret = 0; + +        switch (op) { +        case GD_OP_REPLACE_BRICK: +                ret = glusterd_rb_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; +        break; + +        case GD_OP_SYNC_VOLUME: +                ret = glusterd_sync_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; +        break; + +        case GD_OP_PROFILE_VOLUME: +                ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; +        break; + +        case GD_OP_GSYNC_SET: +                ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); +                if (ret) +                        goto out; +        break; + +        case GD_OP_STATUS_VOLUME: +                ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); +                if (ret) +                        goto out; +        break; + +        case GD_OP_REBALANCE: +        case GD_OP_DEFRAG_BRICK_VOLUME: +                ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; +        break; + +        case GD_OP_HEAL_VOLUME: +                ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; + +        break; + +        case GD_OP_QUOTA: +        case GD_OP_CLEARLOCKS_VOLUME: +                ret = glusterd_use_rsp_dict (aggr, rsp); +                if (ret) +                        goto out; + +        break; + +        default: +        break; +        } +out: +        return ret; +} +  int32_t  gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,                           int count, void *myframe)  { -        struct syncargs           *args  = NULL; -        gd1_mgmt_cluster_lock_rsp  rsp   = {{0},}; -        int                        ret   = -1; -        call_frame_t              *frame = NULL; +        int                         ret         = -1; +        struct syncargs             *args       = NULL; +        gd1_mgmt_cluster_lock_rsp   rsp         = {{0},}; +        call_frame_t                *frame      = NULL; -        frame = myframe; -        args = frame->local; +        frame  = myframe; +        args   = frame->local;          frame->local = NULL; -        /* initialize */ -        args->op_ret   = -1; -        args->op_errno = EINVAL; -          if (-1 == req->rpc_status) {                  args->op_errno = ENOTCONN;                  goto out; @@ -106,64 +217,49 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,          ret = xdr_to_generic (*iov, &rsp,                                (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp); -        if (ret < 0) { +        if (ret < 0)                  goto out; -        } - -        args->op_ret = rsp.op_ret; -        args->op_errno = rsp.op_errno; +        gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);          uuid_copy (args->uuid, rsp.uuid);  out:          STACK_DESTROY (frame->root); - -        __wake (args); - +        synctask_barrier_wake(args);          return 0;  }  int -gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args, +                     uuid_t my_uuid, uuid_t recv_uuid)  { -        struct syncargs           args = {0, }; +        int                       ret = -1;          gd1_mgmt_cluster_lock_req req  = {{0},};          uuid_copy (req.uuid, my_uuid); - -        args.op_ret = -1; -        args.op_errno = ENOTCONN; - -        GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk, -                   &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK, -                   xdr_gd1_mgmt_cluster_lock_req); - -        if (!args.op_ret) -                uuid_copy (recv_uuid, args.uuid); - -        errno = args.op_errno; -        return args.op_ret; - +        ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, +                                        GLUSTERD_MGMT_CLUSTER_LOCK, +                                        gd_syncop_mgmt_lock_cbk, +                                        (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); +        if (ret) +                synctask_barrier_wake(args); +        return ret;  }  int32_t  gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,                             int count, void *myframe)  { -        struct syncargs             *args  = NULL; -        gd1_mgmt_cluster_unlock_rsp  rsp   = {{0},}; -        int                          ret   = -1; -        call_frame_t              *frame = NULL; +        int                         ret         = -1; +        struct syncargs             *args       = NULL; +        gd1_mgmt_cluster_unlock_rsp rsp         = {{0},}; +        call_frame_t                *frame      = NULL;          frame = myframe; -        args = frame->local; +        args  = frame->local;          frame->local = NULL; -        /* initialize */ -        args->op_ret   = -1; -        args->op_errno = EINVAL; -          if (-1 == req->rpc_status) {                  args->op_errno = ENOTCONN;                  goto out; @@ -171,154 +267,143 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,          ret = xdr_to_generic (*iov, &rsp,                                (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp); -        if (ret < 0) { +        if (ret < 0)                  goto out; -        } - -        args->op_ret = rsp.op_ret; -        args->op_errno = rsp.op_errno; +        gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);          uuid_copy (args->uuid, rsp.uuid);  out:          STACK_DESTROY (frame->root); - -        __wake (args); - +        synctask_barrier_wake(args);          return 0;  }  int -gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args, +                       uuid_t my_uuid, uuid_t recv_uuid)  { -        struct syncargs             args = {0, }; -        gd1_mgmt_cluster_unlock_req req  = {{0},}; +        int                         ret     = -1; +        gd1_mgmt_cluster_unlock_req req     = {{0},};          uuid_copy (req.uuid, my_uuid); - -        args.op_ret = -1; -        args.op_errno = ENOTCONN; - -        GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk, -                   &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK, -                   xdr_gd1_mgmt_cluster_unlock_req); - -        if (!args.op_ret) -                uuid_copy (recv_uuid, args.uuid); - -        errno = args.op_errno; -        return args.op_ret; - +        ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, +                                        GLUSTERD_MGMT_CLUSTER_UNLOCK, +                                        gd_syncop_mgmt_unlock_cbk, +                                        (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); +        if (ret) +                synctask_barrier_wake(args); +        return ret;  }  int32_t  gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,                          int count, void *myframe)  { -        struct syncargs       *args  = NULL; -        gd1_mgmt_stage_op_rsp  rsp   = {{0},}; -        int                    ret   = -1; -        call_frame_t              *frame = NULL; - +        int                         ret         = -1; +        gd1_mgmt_stage_op_rsp       rsp         = {{0},}; +        struct syncargs             *args       = NULL; +        xlator_t                    *this       = NULL; +        dict_t                      *rsp_dict   = NULL; +        call_frame_t                *frame      = NULL; + +        this  = THIS;          frame = myframe; -        args = frame->local; +        args  = frame->local;          frame->local = NULL; -        /* initialize */ -        args->op_ret   = -1; -        args->op_errno = EINVAL; -          if (-1 == req->rpc_status) { +                args->op_ret   = -1;                  args->op_errno = ENOTCONN;                  goto out;          }          ret = xdr_to_generic (*iov, &rsp,                                (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp); -        if (ret < 0) { +        if (ret < 0)                  goto out; -        }          if (rsp.dict.dict_len) {                  /* Unserialize the dictionary */ -                args->dict  = dict_new (); +                rsp_dict  = dict_new ();                  ret = dict_unserialize (rsp.dict.dict_val,                                          rsp.dict.dict_len, -                                        &args->dict); +                                        &rsp_dict);                  if (ret < 0) {                          GF_FREE (rsp.dict.dict_val);                          goto out;                  } else { -                        args->dict->extra_stdfree = rsp.dict.dict_val; +                        rsp_dict->extra_stdfree = rsp.dict.dict_val;                  }          } -        args->op_ret = rsp.op_ret; -        args->op_errno = rsp.op_errno; - +        gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);          uuid_copy (args->uuid, rsp.uuid); - -        args->errstr = gf_strdup (rsp.op_errstr); +        if (rsp.op == GD_OP_REPLACE_BRICK) { +                pthread_mutex_lock (&args->lock_dict); +                { +                        ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, +                                                             rsp_dict); +                        if (ret) +                                gf_log (this->name, GF_LOG_ERROR, "%s", +                                        "Failed to aggregate response from " +                                        " node/brick"); +                } +                pthread_mutex_unlock (&args->lock_dict); +        }  out: -        STACK_DESTROY (frame->root); - -        __wake (args); +        if (rsp_dict) +                dict_unref (rsp_dict); +        STACK_DESTROY (frame->root); +        synctask_barrier_wake(args);          return 0;  }  int -gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, -                         int op, dict_t *dict_out, dict_t **dict_in, -                         char **errstr) +gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args, +                         uuid_t my_uuid, uuid_t recv_uuid, int op, +                         dict_t *dict_out, dict_t *op_ctx)  { -        struct syncargs       args = {0, }; -        gd1_mgmt_stage_op_req req  = {{0},}; -        int                   ret  = 0; +        gd1_mgmt_stage_op_req *req  = NULL; +        int                   ret  = -1; -        uuid_copy (req.uuid, my_uuid); -        req.op = op; +        req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t); +        if (!req) +                goto out; -        args.op_ret = -1; -        args.op_errno = ENOTCONN; +        uuid_copy (req->uuid, my_uuid); +        req->op = op;          ret = dict_allocate_and_serialize (dict_out, -                                           &req.buf.buf_val, &req.buf.buf_len); +                                           &req->buf.buf_val, &req->buf.buf_len);          if (ret)                  goto out; -        GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk, -                   &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP, -                   xdr_gd1_mgmt_stage_op_req); - -        if (args.errstr && errstr) -                *errstr = args.errstr; -        else GF_FREE (args.errstr); - -        if (args.dict && dict_in) -                *dict_in = args.dict; -        else if (args.dict) -                dict_unref (args.dict); - -        uuid_copy (recv_uuid, args.uuid); +        ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, +                                        GLUSTERD_MGMT_STAGE_OP, +                                        gd_syncop_stage_op_cbk, +                                        (xdrproc_t) xdr_gd1_mgmt_stage_op_req);  out: -        errno = args.op_errno; -        return args.op_ret; +        gd_stage_op_req_free (req); +        if (ret) +                synctask_barrier_wake(args); + +        return ret;  } -/*TODO: Need to add syncop for brick ops*/  int32_t  gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov,                          int count, void *myframe)  {          struct syncargs        *args  = NULL;          gd1_mgmt_brick_op_rsp  rsp   = {0,}; -        int                     ret   = -1; +        int                    ret   = -1;          call_frame_t           *frame = NULL;          frame = myframe; @@ -362,25 +447,13 @@ out:          if (strcmp (rsp.op_errstr, "") != 0)                  free (rsp.op_errstr);          free (rsp.output.output_val); -        STACK_DESTROY (frame->root); +        STACK_DESTROY (frame->root);          __wake (args);          return 0;  } -static void -gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) -{ -        if (!req) -                return; - -        if (strcmp (req->name, "") != 0) -                GF_FREE (req->name); -        GF_FREE (req->input.input_val); -        GF_FREE (req); -} -  int  gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,                           int op, dict_t *dict_out, dict_t *op_ctx, @@ -447,19 +520,18 @@ int32_t  gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,                           int count, void *myframe)  { -        struct syncargs        *args  = NULL; -        gd1_mgmt_commit_op_rsp  rsp   = {{0},}; -        int                     ret   = -1; -        call_frame_t           *frame = NULL; - +        int                         ret         = -1; +        gd1_mgmt_commit_op_rsp      rsp         = {{0},}; +        struct syncargs             *args       = NULL; +        xlator_t                    *this       = NULL; +        dict_t                      *rsp_dict   = NULL; +        call_frame_t                *frame      = NULL; + +        this  = THIS;          frame = myframe; -        args = frame->local; +        args  = frame->local;          frame->local = NULL; -        /* initialize */ -        args->op_ret   = -1; -        args->op_errno = EINVAL; -          if (-1 == req->rpc_status) {                  args->op_errno = ENOTCONN;                  goto out; @@ -473,146 +545,80 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,          if (rsp.dict.dict_len) {                  /* Unserialize the dictionary */ -                args->dict  = dict_new (); +                rsp_dict  = dict_new ();                  ret = dict_unserialize (rsp.dict.dict_val,                                          rsp.dict.dict_len, -                                        &args->dict); +                                        &rsp_dict);                  if (ret < 0) {                          GF_FREE (rsp.dict.dict_val);                          goto out;                  } else { -                        args->dict->extra_stdfree = rsp.dict.dict_val; +                        rsp_dict->extra_stdfree = rsp.dict.dict_val;                  }          } -        args->op_ret = rsp.op_ret; -        args->op_errno = rsp.op_errno; - +        gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);          uuid_copy (args->uuid, rsp.uuid); - -        args->errstr = gf_strdup (rsp.op_errstr); - +        pthread_mutex_lock (&args->lock_dict); +        { +                ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, +                                                     rsp_dict); +                if (ret) +                        gf_log (this->name, GF_LOG_ERROR, "%s", +                                "Failed to aggregate response from " +                                " node/brick"); +        } +        pthread_mutex_unlock (&args->lock_dict);  out: -        STACK_DESTROY (frame->root); +        if (rsp_dict) +                dict_unref (rsp_dict); -        __wake (args); +        STACK_DESTROY (frame->root); +        synctask_barrier_wake(args);          return 0;  }  int -gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, -                          int op, dict_t *dict_out, dict_t **dict_in, -                          char **errstr) +gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args, +                          uuid_t my_uuid, uuid_t recv_uuid, +                          int op, dict_t *dict_out, dict_t *op_ctx)  { -        struct syncargs        args = {0, }; -        gd1_mgmt_commit_op_req req  = {{0},}; -        int                    ret  = 0; +        gd1_mgmt_commit_op_req *req  = NULL; +        int                    ret  = -1; -        uuid_copy (req.uuid, my_uuid); -        req.op = op; +        req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t); +        if (!req) +                goto out; -        args.op_ret = -1; -        args.op_errno = ENOTCONN; +        uuid_copy (req->uuid, my_uuid); +        req->op = op;          ret = dict_allocate_and_serialize (dict_out, -                                           &req.buf.buf_val, &req.buf.buf_len); +                                           &req->buf.buf_val, &req->buf.buf_len);          if (ret)                  goto out; -        GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk, -                   &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP, -                   xdr_gd1_mgmt_commit_op_req); - -        if (args.errstr && errstr) -                *errstr = args.errstr; -        else GF_FREE (args.errstr); - -        if (args.dict && dict_in) -                *dict_in = args.dict; -        else if (args.dict) -                dict_unref (args.dict); - -        uuid_copy (recv_uuid, args.uuid); - +        ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, +                                        GLUSTERD_MGMT_COMMIT_OP , +                                        gd_syncop_commit_op_cbk, +                                        (xdrproc_t) xdr_gd1_mgmt_commit_op_req);  out: -        errno = args.op_errno; -        return args.op_ret; - -} - - -static int -glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) -{ -        int ret = 0; - -        switch (op) { -        case GD_OP_REPLACE_BRICK: -                ret = glusterd_rb_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; -        break; - -        case GD_OP_SYNC_VOLUME: -                ret = glusterd_sync_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; -        break; - -        case GD_OP_PROFILE_VOLUME: -                ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; -        break; - -        case GD_OP_GSYNC_SET: -                ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); -                if (ret) -                        goto out; -        break; - -        case GD_OP_STATUS_VOLUME: -                ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); -                if (ret) -                        goto out; -        break; - -        case GD_OP_REBALANCE: -        case GD_OP_DEFRAG_BRICK_VOLUME: -                ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; -        break; - -        case GD_OP_HEAL_VOLUME: -                ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; - -        break; - -        case GD_OP_QUOTA: -        case GD_OP_CLEARLOCKS_VOLUME: -                ret = glusterd_use_rsp_dict (aggr, rsp); -                if (ret) -                        goto out; - -        break; +        gd_commit_op_req_free (req); +        if (ret) +                synctask_barrier_wake(args); -        default: -        break; -        } -out:          return ret;  } -void + +int  gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)  {          glusterd_peerinfo_t *peerinfo = NULL; +        int                 npeers      = 0;          list_for_each_entry (peerinfo, peers, uuid_list) {                  if (!peerinfo->connected) @@ -621,29 +627,42 @@ gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)                          continue;                  list_add_tail (&peerinfo->op_peers_list, xact_peers); +                npeers++;          } +        return npeers;  }  int -gd_lock_op_phase (struct list_head *peers, char **op_errstr) +gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, +                  char **op_errstr, int npeers)  { -        glusterd_peerinfo_t *peerinfo   = NULL; -        uuid_t              peer_uuid   = {0};          int                 ret         = -1; +        int                 peer_cnt    = 0; +        uuid_t              peer_uuid   = {0};          xlator_t            *this       = NULL; +        glusterd_peerinfo_t *peerinfo   = NULL; +        struct syncargs     args        = {0}; + +        if (!npeers) { +                ret = 0; +                goto out; +        }          this = THIS; +        synctask_barrier_init((&args)); +        peer_cnt = 0;          list_for_each_entry (peerinfo, peers, op_peers_list) { -                ret = gd_syncop_mgmt_lock (peerinfo->rpc, -                                           MY_UUID, peer_uuid); -                if (ret) { -                        gf_asprintf (op_errstr, "Another transaction could be " -                                     "in progress. Please try again after " -                                     "sometime."); -                        gf_log (this->name, GF_LOG_ERROR, "Failed to acquire " -                                "lock on peer %s", peerinfo->hostname); -                        goto out; -                } +                gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid); +                peer_cnt++; +        } +        synctask_barrier_wait((&args), peer_cnt); +        ret = args.op_ret; +        if (ret) { +                gf_asprintf (op_errstr, "Another transaction could be " +                             "in progress. Please try again after " +                             "sometime."); +                gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock"); +                goto out;          }          ret = 0; @@ -653,14 +672,17 @@ out:  int  gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, -                   dict_t *req_dict, char **op_errstr) +                   dict_t *req_dict, char **op_errstr, int npeers)  { -        dict_t              *rsp_dict       = NULL;          int                 ret             = -1; +        int                 peer_cnt           = 0; +        dict_t              *rsp_dict       = NULL;          char                *hostname       = NULL;          xlator_t            *this           = NULL;          glusterd_peerinfo_t *peerinfo       = NULL;          uuid_t              tmp_uuid        = {0}; +        char                *errstr         = NULL; +        struct syncargs     args            = {0};          this = THIS;          rsp_dict = dict_new (); @@ -684,30 +706,6 @@ gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,          dict_unref (rsp_dict);          rsp_dict = NULL; -        list_for_each_entry (peerinfo, peers, op_peers_list) { -                ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, -                                               MY_UUID, tmp_uuid, -                                               op, req_dict, &rsp_dict, -                                               op_errstr); -                if (ret) { -                        hostname = peerinfo->hostname; -                        goto stage_done; -                } - -                if (op == GD_OP_REPLACE_BRICK) { -                        ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, -                                                             rsp_dict); -                        if (ret) { -                                gf_log (this->name, GF_LOG_ERROR, "%s", -                                        "Failed to aggregate response from " -                                        " node/brick"); -                                goto out; -                        } -                } -                dict_unref (rsp_dict); -                rsp_dict = NULL; -        } -  stage_done:          if (ret) {                  gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, @@ -717,6 +715,26 @@ stage_done:                          gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname);                  goto out;          } + +        if (!npeers) { +                ret = 0; +                goto out; +        } + +        gd_syncargs_init (&args, op_ctx); +        synctask_barrier_init((&args)); +        peer_cnt = 0; +        list_for_each_entry (peerinfo, peers, op_peers_list) { +                ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args, +                                               MY_UUID, tmp_uuid, +                                               op, req_dict, op_ctx); +                peer_cnt++; +        } +        synctask_barrier_wait((&args), peer_cnt); +        ret = args.op_ret; +        if (dict_get_str (op_ctx, "errstr", &errstr) == 0) +                *op_errstr = gf_strdup (errstr); +  out:          if (rsp_dict)                  dict_unref (rsp_dict); @@ -725,14 +743,17 @@ out:  int  gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, -                    dict_t *req_dict, char **op_errstr) +                    dict_t *req_dict, char **op_errstr, int npeers)  {          dict_t              *rsp_dict       = NULL; +        int                 peer_cnt           = -1;          int                 ret             = -1;          char                *hostname       = NULL;          glusterd_peerinfo_t *peerinfo       = NULL;          xlator_t            *this           = NULL;          uuid_t              tmp_uuid        = {0}; +        char                *errstr         = NULL; +        struct syncargs     args            = {0};          this = THIS;          rsp_dict = dict_new (); @@ -756,25 +777,6 @@ gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,          dict_unref (rsp_dict);          rsp_dict = NULL; -        list_for_each_entry (peerinfo, peers, op_peers_list) { -                ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, -                                                MY_UUID, tmp_uuid, -                                                op, req_dict, &rsp_dict, -                                                op_errstr); -                if (ret) { -                        hostname = peerinfo->hostname; -                        goto commit_done; -                } -                ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); -                if (ret) { -                        gf_log (this->name, GF_LOG_ERROR, "%s", -                                "Failed to aggregate " -                                "response from node/brick"); -                        goto out; -                } -                dict_unref (rsp_dict); -                rsp_dict = NULL; -        }  commit_done:          if (ret) {                  gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, @@ -788,6 +790,24 @@ commit_done:                   glusterd_op_modify_op_ctx (op, op_ctx);           } +        if (!npeers) { +                ret = 0; +                goto out; +        } +        gd_syncargs_init (&args, op_ctx); +        synctask_barrier_init((&args)); +        peer_cnt = 0; +        list_for_each_entry (peerinfo, peers, op_peers_list) { +                ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args, +                                                MY_UUID, tmp_uuid, +                                                op, req_dict, op_ctx); +                peer_cnt++; +        } +        synctask_barrier_wait((&args), peer_cnt); +        ret = args.op_ret; +        if (dict_get_str (op_ctx, "errstr", &errstr) == 0) +                *op_errstr = gf_strdup (errstr); +  out:          if (rsp_dict)                  dict_unref (rsp_dict); @@ -795,20 +815,40 @@ out:  }  int -gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret, -                 rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr) +gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret, +                    rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr, +                    int npeers)  { -        glusterd_peerinfo_t *peerinfo = NULL; -        glusterd_peerinfo_t *tmp = NULL; -        uuid_t              tmp_uuid = {0}; +        glusterd_peerinfo_t *peerinfo   = NULL; +        glusterd_peerinfo_t *tmp        = NULL; +        uuid_t              tmp_uuid    = {0}; +        int                 peer_cnt       = 0; +        int                 ret         = -1; +        xlator_t            *this       = NULL; +        struct syncargs     args        = {0}; + +        if (!npeers) { +                ret = 0; +                goto out; +        } +        this = THIS; +        synctask_barrier_init((&args)); +        peer_cnt = 0;          list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) { -                gd_syncop_mgmt_unlock (peerinfo->rpc, -                                       MY_UUID, tmp_uuid); +                gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid);                  list_del_init (&peerinfo->op_peers_list); +                peer_cnt++; +        } +        synctask_barrier_wait((&args), peer_cnt); +        ret = args.op_ret; +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "Failed to unlock " +                        "on some peer(s)");          } -        glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr); +out: +        glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr);          glusterd_op_clear_op (op);          glusterd_unlock (MY_UUID); @@ -816,6 +856,17 @@ gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret,  }  int +gd_get_brick_count (struct list_head *bricks) +{ +        glusterd_pending_node_t *pending_node = NULL; +        int                     npeers        = 0; +        list_for_each_entry (pending_node, bricks, list) { +                npeers++; +        } +        return npeers; +} + +int  gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr)  {          glusterd_pending_node_t *pending_node = NULL; @@ -885,13 +936,13 @@ void  gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)  {          int                         ret             = -1; +        int                         npeers          = 0;          dict_t                      *req_dict       = NULL;          glusterd_conf_t             *conf           = NULL;          glusterd_op_t               op              = 0;          int32_t                     tmp_op          = 0;          char                        *op_errstr      = NULL;          xlator_t                    *this           = NULL; -        gf_boolean_t                local_locked    = _gf_false;          this = THIS;          GF_ASSERT (this); @@ -913,16 +964,15 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)                               "Please try again after sometime.");                  goto out;          } -        local_locked = _gf_true;          /* storing op globally to access in synctask code paths           * This is still acceptable, as we are performing this under           * the 'cluster' lock*/          glusterd_op_set_op  (op);          INIT_LIST_HEAD (&conf->xaction_peers); -        gd_build_peers_list  (&conf->peers, &conf->xaction_peers); +        npeers = gd_build_peers_list  (&conf->peers, &conf->xaction_peers); -        ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr); +        ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers);          if (ret)                  goto out; @@ -936,7 +986,7 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)          }          ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, -                                 &op_errstr); +                                 &op_errstr, npeers);          if (ret)                  goto out; @@ -945,15 +995,14 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)                  goto out;          ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, -                                  &op_errstr); +                                  &op_errstr, npeers);          if (ret)                  goto out;          ret = 0;  out: -        if (local_locked) -                (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, -                                           op_ctx, op_errstr); +        (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, +                                   op_ctx, op_errstr, npeers);          if (req_dict)                  dict_unref (req_dict); diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.h b/xlators/mgmt/glusterd/src/glusterd-syncop.h index 268f3bf2357..658ed4e2a28 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.h +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.h @@ -12,7 +12,6 @@  #include "syncop.h" -  #define GD_SYNC_OPCODE_KEY "sync-mgmt-operation"  /* gd_syncop_* */ @@ -36,15 +35,14 @@ int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,                                 xdrproc_t xdrproc); -int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, -                          uuid_t recv_uuid); -int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, -                            uuid_t recv_uuid); -int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, -                              uuid_t recv_uuid, int op, dict_t *dict_out, -                              dict_t **dict_in, char **errstr); -int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, -                               uuid_t recv_uuid, int op, dict_t *dict_out, -                               dict_t **dict_in, char **errstr); - +int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *arg, +                         uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *arg, +                           uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *arg, +                             uuid_t my_uuid, uuid_t recv_uuid, int op, +                             dict_t *dict_out, dict_t *op_ctx); +int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *arg, +                              uuid_t my_uuid, uuid_t recv_uuid, int op, +                              dict_t *dict_out, dict_t *op_ctx);  #endif /* __RPC_SYNCOP_H */  | 
