diff options
| author | Raghavendra G <rgowdapp@redhat.com> | 2017-04-25 10:43:07 +0530 | 
|---|---|---|
| committer | Jeff Darcy <jeff@pl.atyp.us> | 2017-07-18 10:45:29 +0000 | 
| commit | 2e72b24707f1886833db0b09e48b3f48b8d68d37 (patch) | |
| tree | cb1f83ed4ac84e5390d5bbc54d0bfb649fcead7e /rpc/rpc-lib/src | |
| parent | 3b069f4d7e2140c1cad8c875a4397a1c90f99551 (diff) | |
program/GF-DUMP: Shield ping processing from traffic to Glusterfs
Program
Since poller thread bears the brunt of execution till the request is
handed over to io-threads, poller thread experiencies lock
contention(s) in the control flow till io-threads, which slows it
down. This delay invariably affects reading ping requests from network
and responding to them, resulting in increased ping latencies, which
sometimes results in a ping-timer-expiry on client leading to
disconnect of transport. So, this patch aims to free up poller thread
from executing code of Glusterfs Program. We do this by making
* Glusterfs Program registering itself asking rpcsvc to execute its
  actors in its own threads.
* GF-DUMP Program registering itself asking rpcsvc to _NOT_ execute
  its actors in its own threads. Otherwise program's ownthreads become
  bottleneck in processing ping traffic. This means that poller thread
  reads a ping packet, invokes its actor and hands the response msg to
  transport queue.
Change-Id: I526268c10bdd5ef93f322a4f95385137550a6a49
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
BUG: 1421938
Reviewed-on: https://review.gluster.org/17105
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Reviewed-by: Jeff Darcy <jeff@pl.atyp.us>
Diffstat (limited to 'rpc/rpc-lib/src')
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 89 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 19 | 
2 files changed, 103 insertions, 5 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index ce4e2bf0dc2..16d76a159e8 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -303,6 +303,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req)                  goto err;          } +        req->ownthread = program->ownthread;          req->synctask = program->synctask;          err = SUCCESS; @@ -410,6 +411,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,          req->progver = rpc_call_progver (callmsg);          req->procnum = rpc_call_progproc (callmsg);          req->trans = rpc_transport_ref (trans); +        gf_client_ref (req->trans->xl_private);          req->count = msg->count;          req->msg[0] = progmsg;          req->iobref = iobref_ref (msg->iobref); @@ -425,6 +427,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,          req->trans_private = msg->private;          INIT_LIST_HEAD (&req->txlist); +        INIT_LIST_HEAD (&req->request_list);          req->payloadsize = 0;          /* By this time, the data bytes for the auth scheme would have already @@ -575,7 +578,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,          rpcsvc_request_t       *req            = NULL;          int                     ret            = -1;          uint16_t                port           = 0; -        gf_boolean_t            is_unix        = _gf_false; +        gf_boolean_t            is_unix        = _gf_false, empty = _gf_false;          gf_boolean_t            unprivileged   = _gf_false;          drc_cached_op_t        *reply          = NULL;          rpcsvc_drc_globals_t   *drc            = NULL; @@ -691,6 +694,20 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,                                              (synctask_fn_t) actor_fn,                                              rpcsvc_check_and_reply_error, NULL,                                              req); +                } else if (req->ownthread) { +                        pthread_mutex_lock (&req->prog->queue_lock); +                        { +                                empty = list_empty (&req->prog->request_queue); + +                                list_add_tail (&req->request_list, +                                               &req->prog->request_queue); + +                                if (empty) +                                        pthread_cond_signal (&req->prog->queue_cond); +                        } +                        pthread_mutex_unlock (&req->prog->queue_lock); + +                        ret = 0;                  } else {                          ret = actor_fn (req);                  } @@ -1570,6 +1587,12 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)                  " Ver: %d, Port: %d", prog->progname, prog->prognum,                  prog->progver, prog->progport); +        if (prog->ownthread) { +                prog->alive = _gf_false; +                ret = 0; +                goto out; +        } +          pthread_mutex_lock (&svc->rpclock);          {                  list_del_init (&prog->program); @@ -1834,6 +1857,55 @@ out:          return ret;  } +void * +rpcsvc_request_handler (void *arg) +{ +        rpcsvc_program_t *program = arg; +        rpcsvc_request_t *req     = NULL; +        rpcsvc_actor_t   *actor   = NULL; +        gf_boolean_t      done    = _gf_false; +        int               ret     = 0; + +        if (!program) +                return NULL; + +        while (1) { +                pthread_mutex_lock (&program->queue_lock); +                { +                        if (!program->alive +                            && list_empty (&program->request_queue)) { +                                done = 1; +                                goto unlock; +                        } + +                        while (list_empty (&program->request_queue)) +                                pthread_cond_wait (&program->queue_cond, +                                                   &program->queue_lock); + +                        req = list_entry (program->request_queue.next, +                                          typeof (*req), request_list); + +                        list_del_init (&req->request_list); +                } +        unlock: +                pthread_mutex_unlock (&program->queue_lock); + +                if (done) +                        break; + +                THIS = req->svc->xl; + +                actor = rpcsvc_program_actor (req); + +                ret = actor->actor (req); + +                if (ret != 0) { +                        rpcsvc_check_and_reply_error (ret, NULL, req); +                } +        } + +        return NULL; +}  int  rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) @@ -1875,6 +1947,21 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)          memcpy (newprog, program, sizeof (*program));          INIT_LIST_HEAD (&newprog->program); +        INIT_LIST_HEAD (&newprog->request_queue); +        pthread_mutex_init (&newprog->queue_lock, NULL); +        pthread_cond_init (&newprog->queue_cond, NULL); + +        newprog->alive = _gf_true; + +        /* make sure synctask gets priority over ownthread */ +        if (newprog->synctask) +                newprog->ownthread = _gf_false; + +        if (newprog->ownthread) { +                gf_thread_create (&newprog->thread, NULL, +                                  rpcsvc_request_handler, +                                  newprog); +        }          pthread_mutex_lock (&svc->rpclock);          { diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index cf3e5906de1..73507b6538b 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -233,9 +233,10 @@ struct rpcsvc_request {           */          rpcsvc_auth_data_t      verf; -	/* Execute this request's actor function as a synctask? */ -	gf_boolean_t            synctask; +	/* Execute this request's actor function in ownthread of program?*/ +	gf_boolean_t            ownthread; +        gf_boolean_t            synctask;          /* Container for a RPC program wanting to store a temp           * request-specific item.           */ @@ -246,6 +247,9 @@ struct rpcsvc_request {          /* pointer to cached reply for use in DRC */          drc_cached_op_t         *reply; + +        /* request queue in rpcsvc */ +        struct list_head         request_list;  };  #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -396,11 +400,18 @@ struct rpcsvc_program {           */          int                     min_auth; -	/* Execute actor function as a synctask? */ -	gf_boolean_t            synctask; +	/* Execute actor function in program's own thread? This will reduce */ +        /* the workload on poller threads */ +	gf_boolean_t            ownthread; +        gf_boolean_t            alive; +        gf_boolean_t            synctask;          /* list member to link to list of registered services with rpcsvc */          struct list_head        program; +        struct list_head        request_queue; +        pthread_mutex_t         queue_lock; +        pthread_cond_t          queue_cond; +        pthread_t               thread;  };  typedef struct rpcsvc_cbk_program {  | 
