-rw-r--r--  glusterfsd/src/Makefile.am            |   1
-rw-r--r--  glusterfsd/src/glusterfsd-mgmt.c      |  16
-rw-r--r--  glusterfsd/src/glusterfsd.h           |   2
-rw-r--r--  libglusterfs/src/event-poll.c         |   7
-rw-r--r--  rpc/rpc-lib/src/libgfrpc.sym          |   1
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.c              | 131
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.h              |   8
-rw-r--r--  xlators/protocol/server/src/server.c  |  10
8 files changed, 153 insertions, 23 deletions
diff --git a/glusterfsd/src/Makefile.am b/glusterfsd/src/Makefile.am
index eb92e66e989..3286e639bcf 100644
--- a/glusterfsd/src/Makefile.am
+++ b/glusterfsd/src/Makefile.am
@@ -25,6 +25,7 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) \
 	-I$(top_srcdir)/rpc/xdr/src \
 	-I$(top_builddir)/rpc/xdr/src \
 	-I$(top_srcdir)/xlators/nfs/server/src \
+	-I$(top_srcdir)/xlators/protocol/server/src \
 	-I$(top_srcdir)/api/src
 
 AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 355b25f8aab..fa824de9996 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -33,6 +33,7 @@
 #include "xlator.h"
 #include "syscall.h"
 #include "monitoring.h"
+#include "server.h"
 
 static gf_boolean_t is_mgmt_rpc_reconnect = _gf_false;
 int need_emancipate = 0;
@@ -185,12 +186,15 @@ glusterfs_terminate_response_send (rpcsvc_request_t *req, int op_ret)
 }
 
 void
-glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr)
+glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this)
 {
         struct event_pool       *pool           = ctx->event_pool;
+        server_conf_t           *conf           = this->private;
+        int                      thread_count   = pool->eventthreadcount;
 
         pool->auto_thread_count += incr;
-        (void) event_reconfigure_threads (pool, pool->eventthreadcount+incr);
+        (void) event_reconfigure_threads (pool, thread_count+incr);
+        rpcsvc_ownthread_reconf (conf->rpc, pool->eventthreadcount);
 }
 
 int
@@ -842,6 +846,7 @@ glusterfs_handle_attach (rpcsvc_request_t *req)
         xlator_t                *nextchild      = NULL;
         glusterfs_graph_t       *newgraph       = NULL;
         glusterfs_ctx_t         *ctx            = NULL;
+        xlator_t                *protocol_server = NULL;
 
         GF_ASSERT (req);
         this = THIS;
@@ -879,7 +884,12 @@ glusterfs_handle_attach (rpcsvc_request_t *req)
                                                 nextchild->name);
                                         goto out;
                                 }
-                                glusterfs_autoscale_threads (this->ctx, 1);
+                                /* we need a protocol/server xlator as
+                                 * nextchild
+                                 */
+                                protocol_server = this->ctx->active->first;
+                                glusterfs_autoscale_threads (this->ctx, 1,
+                                                             protocol_server);
                         }
                 } else {
                         gf_log (this->name, GF_LOG_WARNING,
diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h
index 9a0281e78a4..516eef864c0 100644
--- a/glusterfsd/src/glusterfsd.h
+++ b/glusterfsd/src/glusterfsd.h
@@ -126,7 +126,7 @@ int glusterfs_volume_top_read_perf (uint32_t blk_size, uint32_t blk_count,
                                     char *brick_path, double *throughput,
                                     double *time);
 void
-glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr);
+glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this);
 extern glusterfs_ctx_t *glusterfsd_ctx;
 
 #endif /* __GLUSTERFSD_H__ */
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 3bffc4784d7..b1aca826759 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -173,6 +173,13 @@ event_pool_new_poll (int count, int eventthreadcount)
                         "thread count (%d) ignored", eventthreadcount);
         }
 
+        /* although, eventhreadcount for poll implementaiton is always
+         * going to be 1, eventthreadcount needs to be set to 1 so that
+         * rpcsvc_request_handler() thread scaling works flawlessly in
+         * both epoll and poll models
+         */
+        event_pool->eventthreadcount = 1;
+
         return event_pool;
 }
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index 89923b22192..7d878abfd4d 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -53,6 +53,7 @@ rpcsvc_transport_connect
 rpcsvc_transport_getpeeraddr
 rpcsvc_unregister_notify
 rpcsvc_volume_allowed
+rpcsvc_ownthread_reconf
 rpc_transport_count
 rpc_transport_connect
 rpc_transport_disconnect
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 8766da47b7b..34e7563e163 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1970,33 +1970,98 @@ rpcsvc_request_handler (void *arg)
                                 goto unlock;
                         }
 
-                        while (list_empty (&program->request_queue))
+                        while (list_empty (&program->request_queue) &&
+                               (program->threadcount <=
+                                        program->eventthreadcount)) {
                                 pthread_cond_wait (&program->queue_cond,
                                                    &program->queue_lock);
+                        }
 
-                        req = list_entry (program->request_queue.next,
-                                          typeof (*req), request_list);
-
-                        list_del_init (&req->request_list);
+                        if (program->threadcount > program->eventthreadcount) {
+                                done = 1;
+                                program->threadcount--;
+
+                                gf_log (GF_RPCSVC, GF_LOG_INFO,
+                                        "program '%s' thread terminated; "
+                                        "total count:%d",
+                                        program->progname,
+                                        program->threadcount);
+                        } else if (!list_empty (&program->request_queue)) {
+                                req = list_entry (program->request_queue.next,
+                                                  typeof (*req), request_list);
+
+                                list_del_init (&req->request_list);
+                        }
                 }
         unlock:
                 pthread_mutex_unlock (&program->queue_lock);
 
+                if (req) {
+                        THIS = req->svc->xl;
+                        actor = rpcsvc_program_actor (req);
+                        ret = actor->actor (req);
+
+                        if (ret != 0) {
+                                rpcsvc_check_and_reply_error (ret, NULL, req);
+                        }
+                        req = NULL;
+                }
+
                 if (done)
                         break;
+        }
 
-                THIS = req->svc->xl;
+        return NULL;
+}
 
-                actor = rpcsvc_program_actor (req);
+int
+rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program)
+{
+        int                ret  = 0, delta = 0, creates = 0;
 
-                ret = actor->actor (req);
+        if (!program || !svc)
+                goto out;
 
-                if (ret != 0) {
-                        rpcsvc_check_and_reply_error (ret, NULL, req);
+        pthread_mutex_lock (&program->queue_lock);
+        {
+                delta = program->eventthreadcount - program->threadcount;
+
+                if (delta >= 0) {
+                        while (delta--) {
+                                ret = gf_thread_create (&program->thread, NULL,
+                                                        rpcsvc_request_handler,
+                                                        program, "rpcrqhnd");
+                                if (!ret) {
+                                        program->threadcount++;
+                                        creates++;
+                                }
+                        }
+
+                        if (creates) {
+                                gf_log (GF_RPCSVC, GF_LOG_INFO,
+                                        "spawned %d threads for program '%s'; "
+                                        "total count:%d",
+                                        creates,
+                                        program->progname,
+                                        program->threadcount);
+                        }
+                } else {
+                        gf_log (GF_RPCSVC, GF_LOG_INFO,
+                                "terminating %d threads for program '%s'",
+                                -delta, program->progname);
+
+                        /* this signal is to just wake up the threads so they
+                         * test for the change in eventthreadcount and kill
+                         * themselves until the program thread count becomes
+                         * equal to the event thread count
+                         */
+                        pthread_cond_broadcast (&program->queue_cond);
                 }
         }
+        pthread_mutex_unlock (&program->queue_lock);
 
-        return NULL;
+out:
+        return creates;
 }
 
 int
@@ -2004,6 +2069,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,
                          gf_boolean_t add_to_head)
 {
         int               ret                = -1;
+        int               creates            = -1;
         rpcsvc_program_t *newprog            = NULL;
         char              already_registered = 0;
 
@@ -2051,12 +2117,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,
                 newprog->ownthread = _gf_false;
 
         if (newprog->ownthread) {
-                ret = gf_thread_create (&newprog->thread, NULL,
-                                        rpcsvc_request_handler,
-                                        newprog, "rpcsvcrh");
-                if (ret != 0) {
-                        gf_log (GF_RPCSVC, GF_LOG_ERROR,
-                                "error creating request handler thread");
+                newprog->eventthreadcount = 1;
+                creates = rpcsvc_spawn_threads (svc, newprog);
+
+                if (creates < 1) {
+                        goto out;
                 }
         }
 
@@ -2924,6 +2989,38 @@ out:
         return ret;
 }
 
+/* During reconfigure, Make sure to call this function after event-threads are
+ * reconfigured as programs' threadcount will be made equal to event threads.
+ */
+
+int
+rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount)
+{
+        int ret = -1;
+        rpcsvc_program_t *program = NULL;
+
+        if (!svc) {
+                ret = 0;
+                goto out;
+        }
+
+        pthread_rwlock_wrlock (&svc->rpclock);
+        {
+                list_for_each_entry (program, &svc->programs, program) {
+                        if (program->ownthread) {
+                                program->eventthreadcount =
+                                        new_eventthreadcount;
+                                rpcsvc_spawn_threads (svc, program);
+                        }
+                }
+        }
+        pthread_rwlock_unlock (&svc->rpclock);
+
+        ret = 0;
+out:
+        return ret;
+}
+
 rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
         [GF_DUMP_NULL]      = {"NULL",     GF_DUMP_NULL,     NULL,        NULL, 0, DRC_NA},
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 878eea28999..ec76b659965 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -422,6 +422,12 @@ struct rpcsvc_program {
         pthread_mutex_t         queue_lock;
         pthread_cond_t          queue_cond;
         pthread_t               thread;
+        int                     threadcount;
+        /* eventthreadcount is just a readonly copy of the actual value
+         * owned by the event sub-system
+         * It is used to control the scaling of rpcsvc_request_handler threads
+         */
+        int                     eventthreadcount;
 };
 
 typedef struct rpcsvc_cbk_program {
@@ -639,4 +645,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
 rpcsvc_vector_sizer
 rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
                                  uint32_t progver, int procnum);
+extern int
+rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount);
 #endif
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 7bd276cdece..30546e87b8d 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -843,6 +843,12 @@ do_rpc:
         if (ret)
                 goto out;
 
+        /* rpcsvc thread reconfigure should be after events thread
+         * reconfigure
+         */
+        new_nthread =
+        ((struct event_pool *)(this->ctx->event_pool))->eventthreadcount;
+        ret = rpcsvc_ownthread_reconf (rpc_conf, new_nthread);
 out:
         THIS = oldTHIS;
         gf_msg_debug ("", 0, "returning %d", ret);
@@ -1499,9 +1505,9 @@ server_notify (xlator_t *this, int32_t event, void *data, ...)
                                 (*trav_p) = (*trav_p)->next;
                         glusterfs_mgmt_pmap_signout (ctx,
                                                      victim->name);
-                        glusterfs_autoscale_threads (THIS->ctx, -1);
+                        /* we need the protocol/server xlator here as 'this' */
+                        glusterfs_autoscale_threads (ctx, -1, this);
                         default_notify (victim, GF_EVENT_CLEANUP, data);
-
                 }
                 break;

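The ordering rule spelled out in the new comments ("rpcsvc thread reconfigure should be after events thread reconfigure") can be summarized in a short sketch: resize the event threads first, then call rpcsvc_ownthread_reconf() so every ownthread program's handler count converges on the event thread count. The sketch below is illustrative only and is not part of this patch; the wrapper name reconfigure_event_threads() is an assumption, while event_reconfigure_threads(), rpcsvc_ownthread_reconf(), struct event_pool, glusterfs_ctx_t and rpcsvc_t come from the code above.

#include "glusterfs.h"   /* glusterfs_ctx_t */
#include "event.h"       /* struct event_pool, event_reconfigure_threads() */
#include "rpcsvc.h"      /* rpcsvc_t, rpcsvc_ownthread_reconf() */

/* Illustrative sketch (hypothetical helper, not in the patch): the scaling
 * contract is event threads first, request handler threads second. */
static int
reconfigure_event_threads (glusterfs_ctx_t *ctx, rpcsvc_t *svc, int new_count)
{
        struct event_pool *pool = ctx->event_pool;
        int                ret  = -1;

        /* step 1: scale the threads owned by the event sub-system */
        ret = event_reconfigure_threads (pool, new_count);
        if (ret)
                return ret;

        /* step 2: only afterwards scale rpcsvc_request_handler() threads,
         * using the count the event sub-system actually settled on */
        return rpcsvc_ownthread_reconf (svc, pool->eventthreadcount);
}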