diff options
Diffstat (limited to 'rpc/rpc-lib/src')
| -rw-r--r-- | rpc/rpc-lib/src/libgfrpc.sym | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 131 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 8 | 
3 files changed, 123 insertions, 17 deletions
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym index 89923b22192..7d878abfd4d 100644 --- a/rpc/rpc-lib/src/libgfrpc.sym +++ b/rpc/rpc-lib/src/libgfrpc.sym @@ -53,6 +53,7 @@ rpcsvc_transport_connect  rpcsvc_transport_getpeeraddr  rpcsvc_unregister_notify  rpcsvc_volume_allowed +rpcsvc_ownthread_reconf  rpc_transport_count  rpc_transport_connect  rpc_transport_disconnect diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 8766da47b7b..34e7563e163 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1970,33 +1970,98 @@ rpcsvc_request_handler (void *arg)                                  goto unlock;                          } -                        while (list_empty (&program->request_queue)) +                        while (list_empty (&program->request_queue) && +                               (program->threadcount <= +                                        program->eventthreadcount)) {                                  pthread_cond_wait (&program->queue_cond,                                                     &program->queue_lock); +                        } -                        req = list_entry (program->request_queue.next, -                                          typeof (*req), request_list); - -                        list_del_init (&req->request_list); +                        if (program->threadcount > program->eventthreadcount) { +                                done = 1; +                                program->threadcount--; + +                                gf_log (GF_RPCSVC, GF_LOG_INFO, +                                        "program '%s' thread terminated; " +                                        "total count:%d", +                                        program->progname, +                                        program->threadcount); +                        } else if (!list_empty (&program->request_queue)) { +                                req = list_entry (program->request_queue.next, +                                                  typeof (*req), request_list); + +                                list_del_init (&req->request_list); +                        }                  }          unlock:                  pthread_mutex_unlock (&program->queue_lock); +                if (req) { +                        THIS = req->svc->xl; +                        actor = rpcsvc_program_actor (req); +                        ret = actor->actor (req); + +                        if (ret != 0) { +                                rpcsvc_check_and_reply_error (ret, NULL, req); +                        } +                        req = NULL; +                } +                  if (done)                          break; +        } -                THIS = req->svc->xl; +        return NULL; +} -                actor = rpcsvc_program_actor (req); +int +rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program) +{ +        int                ret  = 0, delta = 0, creates = 0; -                ret = actor->actor (req); +        if (!program || !svc) +                goto out; -                if (ret != 0) { -                        rpcsvc_check_and_reply_error (ret, NULL, req); +        pthread_mutex_lock (&program->queue_lock); +        { +                delta = program->eventthreadcount - program->threadcount; + +                if (delta >= 0) { +                        while (delta--) { +                                ret = gf_thread_create (&program->thread, NULL, +                                                        rpcsvc_request_handler, +                                                        program, "rpcrqhnd"); +                                if (!ret) { +                                        program->threadcount++; +                                        creates++; +                                } +                        } + +                        if (creates) { +                                gf_log (GF_RPCSVC, GF_LOG_INFO, +                                        "spawned %d threads for program '%s'; " +                                        "total count:%d", +                                        creates, +                                        program->progname, +                                        program->threadcount); +                        } +                } else { +                        gf_log (GF_RPCSVC, GF_LOG_INFO, +                                "terminating %d threads for program '%s'", +                                -delta, program->progname); + +                        /* this signal is to just wake up the threads so they +                         * test for the change in eventthreadcount and kill +                         * themselves until the program thread count becomes +                         * equal to the event thread count +                         */ +                        pthread_cond_broadcast (&program->queue_cond);                  }          } +        pthread_mutex_unlock (&program->queue_lock); -        return NULL; +out: +        return creates;  }  int @@ -2004,6 +2069,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,                           gf_boolean_t add_to_head)  {          int               ret                = -1; +        int               creates            = -1;          rpcsvc_program_t *newprog            = NULL;          char              already_registered = 0; @@ -2051,12 +2117,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,                  newprog->ownthread = _gf_false;          if (newprog->ownthread) { -                ret = gf_thread_create (&newprog->thread, NULL, -                                        rpcsvc_request_handler, -                                        newprog, "rpcsvcrh"); -                if (ret != 0) { -                        gf_log (GF_RPCSVC, GF_LOG_ERROR, -                                "error creating request handler thread"); +                newprog->eventthreadcount = 1; +                creates = rpcsvc_spawn_threads (svc, newprog); + +                if (creates < 1) { +                        goto out;                  }          } @@ -2924,6 +2989,38 @@ out:          return ret;  } +/* During reconfigure, Make sure to call this function after event-threads are + * reconfigured as programs' threadcount will be made equal to event threads. + */ + +int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount) +{ +        int ret = -1; +        rpcsvc_program_t *program = NULL; + +        if (!svc) { +                ret = 0; +                goto out; +        } + +        pthread_rwlock_wrlock (&svc->rpclock); +        { +                list_for_each_entry (program, &svc->programs, program) { +                        if (program->ownthread) { +                                program->eventthreadcount = +                                        new_eventthreadcount; +                                rpcsvc_spawn_threads (svc, program); +                        } +                } +        } +        pthread_rwlock_unlock (&svc->rpclock); + +        ret = 0; +out: +        return ret; +} +  rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {          [GF_DUMP_NULL]      = {"NULL",     GF_DUMP_NULL,     NULL,        NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 878eea28999..ec76b659965 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -422,6 +422,12 @@ struct rpcsvc_program {          pthread_mutex_t         queue_lock;          pthread_cond_t          queue_cond;          pthread_t               thread; +        int                     threadcount; +        /* eventthreadcount is just a readonly copy of the actual value +         * owned by the event sub-system +         * It is used to control the scaling of rpcsvc_request_handler threads +         */ +        int                     eventthreadcount;  };  typedef struct rpcsvc_cbk_program { @@ -639,4 +645,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen);  rpcsvc_vector_sizer  rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,                                   uint32_t progver, int procnum); +extern int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount);  #endif  | 
