-rw-r--r--  cli/src/cli-rl.c                       |   5
-rw-r--r--  libglusterfs/src/event-epoll.c         | 114
-rw-r--r--  libglusterfs/src/event-poll.c          |  10
-rw-r--r--  libglusterfs/src/event.c               |  10
-rw-r--r--  libglusterfs/src/gf-event.h            |  19
-rw-r--r--  rpc/rpc-lib/src/autoscale-threads.c    |   1
-rw-r--r--  rpc/rpc-lib/src/libgfrpc.sym           |   1
-rw-r--r--  rpc/rpc-lib/src/rpc-clnt.c             |   6
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.c        |   4
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.h        |   3
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.c               | 412
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.h               |  34
-rw-r--r--  rpc/rpc-transport/socket/src/socket.c  |  28
-rw-r--r--  xlators/protocol/server/src/server.c   |   9
14 files changed, 479 insertions(+), 177 deletions(-)
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
index 7831d0bcb40..38aa6f4b7ae 100644
--- a/cli/src/cli-rl.c
+++ b/cli/src/cli-rl.c
@@ -104,7 +104,7 @@ cli_rl_process_line(char *line)
 
 int
 cli_rl_stdin(int fd, int idx, int gen, void *data, int poll_out, int poll_in,
-             int poll_err)
+             int poll_err, char event_thread_died)
 {
     struct cli_state *state = NULL;
@@ -376,7 +376,8 @@ cli_rl_enable(struct cli_state *state)
         goto out;
     }
 
-    ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0);
+    ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0,
+                         0);
     if (ret == -1)
         goto out;
 
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 9826cc9e275..041a7e6c583 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -30,6 +30,7 @@ struct event_slot_epoll {
     int fd;
     int events;
     int gen;
+    int idx;
     gf_atomic_t ref;
     int do_close;
     int in_handler;
@@ -37,6 +38,7 @@ struct event_slot_epoll {
     void *data;
     event_handler_t handler;
     gf_lock_t lock;
+    struct list_head poller_death;
 };
 
 struct event_thread_data {
@@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx)
     for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
         table[i].fd = -1;
         LOCK_INIT(&table[i].lock);
+        INIT_LIST_HEAD(&table[i].poller_death);
     }
 
     event_pool->ereg[table_idx] = table;
@@ -66,7 +69,8 @@
 }
 
 static int
-__event_slot_alloc(struct event_pool *event_pool, int fd)
+__event_slot_alloc(struct event_pool *event_pool, int fd,
+                   char notify_poller_death)
 {
     int i = 0;
     int table_idx = -1;
@@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
             table[i].gen = gen + 1;
 
             LOCK_INIT(&table[i].lock);
+            INIT_LIST_HEAD(&table[i].poller_death);
             table[i].fd = fd;
 
+            if (notify_poller_death) {
+                table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
+                list_add_tail(&table[i].poller_death,
+                              &event_pool->poller_death);
+            }
+
             event_pool->slots_used[table_idx]++;
 
             break;
@@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
 }
 
 static int
-event_slot_alloc(struct event_pool *event_pool, int fd)
+event_slot_alloc(struct event_pool *event_pool, int fd,
+                 char notify_poller_death)
 {
     int idx = -1;
 
     pthread_mutex_lock(&event_pool->mutex);
     {
-        idx = __event_slot_alloc(event_pool, fd);
+        idx = __event_slot_alloc(event_pool, fd, notify_poller_death);
     }
     pthread_mutex_unlock(&event_pool->mutex);
 
@@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx)
     slot->fd = -1;
     slot->handled_error = 0;
     slot->in_handler = 0;
+    list_del_init(&slot->poller_death);
     event_pool->slots_used[table_idx]--;
 
     return;
@@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx)
     return;
 }
 
+static int
+event_slot_ref(struct event_slot_epoll *slot)
+{
+    if (!slot)
+        return -1;
+
+    return GF_ATOMIC_INC(slot->ref);
+}
+
 static struct event_slot_epoll *
 event_slot_get(struct event_pool *event_pool, int idx)
 {
@@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx)
         return NULL;
 
     slot = &table[offset];
-    GF_ATOMIC_INC(slot->ref);
+    event_slot_ref(slot);
 
     return slot;
 }
 
 static void
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
+                   int idx)
+{
+    int ref = -1;
+    int fd = -1;
+    int do_close = 0;
+
+    ref = GF_ATOMIC_DEC(slot->ref);
+    if (ref)
+        /* slot still alive */
+        goto done;
+
+    LOCK(&slot->lock);
+    {
+        fd = slot->fd;
+        do_close = slot->do_close;
+        slot->do_close = 0;
+    }
+    UNLOCK(&slot->lock);
+
+    __event_slot_dealloc(event_pool, idx);
+
+    if (do_close)
+        sys_close(fd);
+done:
+    return;
+}
+
+static void
 event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
                  int idx)
 {
@@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount)
     event_pool->fd = epfd;
 
     event_pool->count = count;
-
+    INIT_LIST_HEAD(&event_pool->poller_death);
     event_pool->eventthreadcount = eventthreadcount;
     event_pool->auto_thread_count = 0;
 
@@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out)
 int
 event_register_epoll(struct event_pool *event_pool, int fd,
                      event_handler_t handler, void *data, int poll_in,
-                     int poll_out)
+                     int poll_out, char notify_poller_death)
 {
     int idx = -1;
     int ret = -1;
@@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd,
     if (destroy == 1)
         goto out;
 
-    idx = event_slot_alloc(event_pool, fd);
+    idx = event_slot_alloc(event_pool, fd, notify_poller_death);
     if (idx == -1) {
         gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
                "could not find slot for fd=%d", fd);
@@ -591,7 +642,7 @@ pre_unlock:
         ret = handler(fd, idx, gen, data,
                       (event->events & (EPOLLIN | EPOLLPRI)),
                       (event->events & (EPOLLOUT)),
-                      (event->events & (EPOLLERR | EPOLLHUP)));
+                      (event->events & (EPOLLERR | EPOLLHUP)), 0);
     }
 out:
     event_slot_unref(event_pool, slot, idx);
@@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data)
     struct event_thread_data *ev_data = data;
     struct event_pool *event_pool;
     int myindex = -1;
-    int timetodie = 0;
+    int timetodie = 0, gen = 0;
+    struct list_head poller_death_notify;
+    struct event_slot_epoll *slot = NULL, *tmp = NULL;
 
     GF_VALIDATE_OR_GOTO("event", ev_data, out);
 
@@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data)
     gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD,
            "Started"
            " thread with index %d",
-           myindex);
+           myindex - 1);
 
     pthread_mutex_lock(&event_pool->mutex);
     {
@@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data)
             pthread_mutex_lock(&event_pool->mutex);
             {
                 if (event_pool->eventthreadcount < myindex) {
+                    while (event_pool->poller_death_sliced) {
+                        pthread_cond_wait(&event_pool->cond,
+                                          &event_pool->mutex);
+                    }
+
+                    INIT_LIST_HEAD(&poller_death_notify);
                     /* if found true in critical section,
                      * die */
                     event_pool->pollers[myindex - 1] = 0;
                     event_pool->activethreadcount--;
                     timetodie = 1;
+                    gen = ++event_pool->poller_gen;
+                    list_for_each_entry(slot, &event_pool->poller_death,
+                                        poller_death)
+                    {
+                        event_slot_ref(slot);
+                    }
+
+                    list_splice_init(&event_pool->poller_death,
+                                     &poller_death_notify);
+                    event_pool->poller_death_sliced = 1;
+
                     pthread_cond_broadcast(&event_pool->cond);
                 }
             }
             pthread_mutex_unlock(&event_pool->mutex);
 
             if (timetodie) {
+                list_for_each_entry(slot, &poller_death_notify, poller_death)
+                {
+                    slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
+                }
+
+                pthread_mutex_lock(&event_pool->mutex);
+                {
+                    list_for_each_entry_safe(slot, tmp, &poller_death_notify,
+                                             poller_death)
+                    {
+                        __event_slot_unref(event_pool, slot, slot->idx);
+                    }
+
+                    list_splice(&poller_death_notify,
+                                &event_pool->poller_death);
+                    event_pool->poller_death_sliced = 0;
+                    pthread_cond_broadcast(&event_pool->cond);
+                }
+                pthread_mutex_unlock(&event_pool->mutex);
+
                 gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD,
-                       "Exited "
-                       "thread with index %d",
-                       myindex);
+                       "Exited thread with index %d", myindex);
+
                 goto out;
             }
         }
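The worker's exit path above pins every slot on poller_death with a reference, splices the list onto a private head, delivers the death notifications without the pool mutex held, then unrefs the slots and splices the survivors back; the poller_death_sliced flag makes concurrent exits take turns. Below is a minimal standalone sketch of that pattern, simplified to a singly linked list — the names and types are illustrative, not from the patch:

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct subscriber {
    int ref; /* starts at 1 on registration; protected by pool->mu here */
    void (*notify)(struct subscriber *s, int gen);
    struct subscriber *next;
};

struct pool {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    int sliced; /* a dying thread currently owns the subscriber list */
    int gen;    /* generation number: one per thread-death event */
    struct subscriber *head;
};

/* Called by an event thread that has decided to exit. */
static void notify_thread_death(struct pool *p)
{
    struct subscriber *list, *s, *next;
    int gen;

    pthread_mutex_lock(&p->mu);
    while (p->sliced) /* wait until the previous notifier restores the list */
        pthread_cond_wait(&p->cv, &p->mu);
    gen = ++p->gen;
    for (s = p->head; s; s = s->next)
        s->ref++;   /* pin: a concurrent deregister must not free it */
    list = p->head; /* "splice" the whole list out */
    p->head = NULL;
    p->sliced = 1;
    pthread_mutex_unlock(&p->mu);

    for (s = list; s; s = s->next) /* callbacks run without the lock held */
        s->notify(s, gen);

    pthread_mutex_lock(&p->mu);
    for (s = list; s; s = next) { /* unpin and splice the survivors back */
        next = s->next;
        if (--s->ref == 0) {
            free(s); /* last ref was dropped while we held the list */
        } else {
            s->next = p->head;
            p->head = s;
        }
    }
    p->sliced = 0;
    pthread_cond_broadcast(&p->cv); /* wake waiting notifiers */
    pthread_mutex_unlock(&p->mu);
}

static void print_death(struct subscriber *s, int gen)
{
    printf("subscriber %p saw death generation %d\n", (void *)s, gen);
}

int main(void)
{
    struct pool p = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0,
                     NULL};
    struct subscriber *a = calloc(1, sizeof(*a));
    struct subscriber *b = calloc(1, sizeof(*b));
    a->ref = b->ref = 1;
    a->notify = b->notify = print_death;
    a->next = b;
    p.head = a;
    notify_thread_death(&p); /* one line per subscriber, generation 1 */
    return 0;
}
```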
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 727d2a000a2..5bac4291c47 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -33,11 +33,11 @@ struct event_slot_poll {
 static int
 event_register_poll(struct event_pool *event_pool, int fd,
                     event_handler_t handler, void *data, int poll_in,
-                    int poll_out);
+                    int poll_out, char notify_poller_death);
 
 static int
 __flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
-           int poll_err)
+           int poll_err, char event_thread_died)
 {
     char buf[64];
     int ret = -1;
@@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount)
     }
 
     ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
-                              NULL, 1, 0);
+                              NULL, 1, 0, 0);
     if (ret == -1) {
         gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
                "could not register pipe fd with poll event loop");
@@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount)
 static int
 event_register_poll(struct event_pool *event_pool, int fd,
                     event_handler_t handler, void *data, int poll_in,
-                    int poll_out)
+                    int poll_out, char notify_poller_death)
 {
     int idx = -1;
@@ -378,7 +378,7 @@ unlock:
         ret = handler(ufds[i].fd, idx, 0, data,
                       (ufds[i].revents & (POLLIN | POLLPRI)),
                       (ufds[i].revents & (POLLOUT)),
-                      (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)));
+                      (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
 
     return ret;
 }
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index 49f70c83366..ddba9810b0b 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount)
 
 int
 event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
-               void *data, int poll_in, int poll_out)
+               void *data, int poll_in, int poll_out, char notify_poller_death)
 {
     int ret = -1;
 
     GF_VALIDATE_OR_GOTO("event", event_pool, out);
 
-    ret = event_pool->ops->event_register(event_pool, fd, handler, data,
-                                          poll_in, poll_out);
+    ret = event_pool->ops->event_register(
+        event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);
 
 out:
     return ret;
 }
@@ -161,7 +161,7 @@ out:
 
 int
 poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,
-                       int poll_in, int poll_err)
+                       int poll_in, int poll_err, char event_thread_exit)
 {
     struct event_destroy_data *destroy = NULL;
     int readfd = -1, ret = -1;
@@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool)
 
     /* From the main thread register an event on the pipe fd[0],
      */
-    idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1,
+    idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0,
                          0);
     if (idx < 0)
         goto out;
diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h
index 5c3724cc953..5d92a2dd285 100644
--- a/libglusterfs/src/gf-event.h
+++ b/libglusterfs/src/gf-event.h
@@ -12,6 +12,7 @@
 #define _GF_EVENT_H_
 
 #include <pthread.h>
+#include "list.h"
 
 struct event_pool;
 struct event_ops;
@@ -23,7 +24,8 @@ struct event_data {
 } __attribute__((__packed__, __may_alias__));
 
 typedef int (*event_handler_t)(int fd, int idx, int gen, void *data,
-                               int poll_in, int poll_out, int poll_err);
+                               int poll_in, int poll_out, int poll_err,
+                               char event_thread_exit);
 
 #define EVENT_EPOLL_TABLES 1024
 #define EVENT_EPOLL_SLOTS 1024
@@ -40,6 +42,13 @@ struct event_pool {
     struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
     int slots_used[EVENT_EPOLL_TABLES];
 
+    struct list_head poller_death;
+    int poller_death_sliced; /* track whether the list of fds interested in
+                              * poller_death is sliced. If yes, a new thread
+                              * death notification has to wait till the list
+                              * is added back
+                              */
+    int poller_gen;
     int used;
     int changed;
 
@@ -52,8 +61,8 @@ struct event_pool {
     /* NOTE: Currently used only when event processing is done using
      * epoll. */
     int eventthreadcount; /* number of event threads to execute. */
-    pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
-                                           * and live status */
+    pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live
                                             status */
     int destroy;
     int activethreadcount;
@@ -81,7 +90,7 @@ struct event_ops {
     int (*event_register)(struct event_pool *event_pool, int fd,
                           event_handler_t handler, void *data, int poll_in,
-                          int poll_out);
+                          int poll_out, char notify_poller_death);
 
     int (*event_select_on)(struct event_pool *event_pool, int fd, int idx,
                            int poll_in, int poll_out);
@@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in,
                 int poll_out);
 int
 event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
-               void *data, int poll_in, int poll_out);
+               void *data, int poll_in, int poll_out, char notify_poller_death);
 int
 event_unregister(struct event_pool *event_pool, int fd, int idx);
 int
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c
index 337f002df10..d629a1cd430 100644
--- a/rpc/rpc-lib/src/autoscale-threads.c
+++ b/rpc/rpc-lib/src/autoscale-threads.c
@@ -19,5 +19,4 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr)
 
     pool->auto_thread_count += incr;
     (void)event_reconfigure_threads(pool, thread_count + incr);
-    rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount);
 }
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index a7cb5f6b5cb..4f42485044f 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -51,7 +51,6 @@ rpcsvc_transport_connect
 rpcsvc_transport_getpeeraddr
 rpcsvc_unregister_notify
 rpcsvc_volume_allowed
-rpcsvc_ownthread_reconf
 rpc_transport_count
 rpc_transport_connect
 rpc_transport_disconnect
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 2505998b3d4..b26d645bb12 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -969,6 +969,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
              */
             ret = 0;
             break;
+
+        case RPC_TRANSPORT_EVENT_THREAD_DIED:
+            /* only meaningful on a server, no need of handling this event on a
+             * client */
+            ret = 0;
+            break;
     }
 
 out:
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index d70334476c7..54636dcbf00 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -266,6 +266,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
         goto fail;
     }
 
+    if (dict_get(options, "notify-poller-death")) {
+        trans->notify_poller_death = 1;
+    }
+
     gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);
 
     handle = dlopen(name, RTLD_NOW);
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c238501b5c7..fd737d0c764 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -97,6 +97,7 @@ typedef enum {
     RPC_TRANSPORT_MSG_RECEIVED,    /* Complete rpc msg has been read */
     RPC_TRANSPORT_CONNECT,         /* client is connected to server */
     RPC_TRANSPORT_MSG_SENT,
+    RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
 } rpc_transport_event_t;
 
 struct rpc_transport_msg {
@@ -213,6 +214,8 @@ struct rpc_transport {
      * layer or in client management notification handler functions
      */
     gf_boolean_t connect_failed;
+    char notify_poller_death;
+    char poller_death_accept;
 };
 
 struct rpc_transport_ops {
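Every event_handler_t in the tree now takes the trailing event_thread_exit/event_thread_died flag, and listener transports surface it upward as RPC_TRANSPORT_EVENT_THREAD_DIED. A toy, compilable illustration of how a callback is expected to branch on the flag — the handler body and its output are invented for the example, only the signature comes from gf-event.h:

```c
#include <stdio.h>

/* The widened signature from gf-event.h: the last parameter distinguishes a
 * poller-death notification from an ordinary fd event. */
typedef int (*event_handler_t)(int fd, int idx, int gen, void *data,
                               int poll_in, int poll_out, int poll_err,
                               char event_thread_exit);

static int toy_handler(int fd, int idx, int gen, void *data, int poll_in,
                       int poll_out, int poll_err, char event_thread_exit)
{
    (void)idx;
    (void)data;
    if (event_thread_exit) {
        /* fd/poll_* carry nothing here; gen identifies the death event */
        printf("poller died, generation %d\n", gen);
        return 0;
    }
    if (poll_err)
        return -1; /* socket error / hangup */
    if (poll_in)
        printf("fd %d readable\n", fd);
    if (poll_out)
        printf("fd %d writable\n", fd);
    return 0;
}

int main(void)
{
    event_handler_t h = toy_handler;
    h(7, 0, 0, NULL, 1, 0, 0, 0); /* ordinary POLLIN-style dispatch */
    h(7, 0, 3, NULL, 0, 0, 0, 1); /* thread-death notification */
    return 0;
}
```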
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c6545193a11..d678bca43a8 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -36,6 +36,7 @@
 #include <fnmatch.h>
 #include <stdarg.h>
 #include <stdio.h>
+#include <math.h>
 
 #ifdef IPV6_DEFAULT
 #include <netconfig.h>
@@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
 int
 rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
               void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
 
 static int
 rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
 
+void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+                           rpcsvc_request_queue_t *queue, char status[])
+{
+    int queue_index = 0, status_index = 0, set_bit = 0;
+
+    if (queue != &prog->request_queue[0]) {
+        queue_index = (queue - &prog->request_queue[0]);
+    }
+
+    status_index = queue_index / 8;
+    set_bit = queue_index % 8;
+
+    status[status_index] ^= (1 << set_bit);
+
+    return;
+}
+
+static int
+get_rightmost_set_bit(int n)
+{
+    return log2(n & -n);
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+    int queue_index = 0, max_index = 0, i = 0;
+    unsigned int right_most_unset_bit = 0;
+
+    right_most_unset_bit = 8;
+
+    max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
+    for (i = 0; i < max_index; i++) {
+        if (prog->request_queue_status[i] == 0) {
+            right_most_unset_bit = 0;
+            break;
+        } else {
+            right_most_unset_bit = get_rightmost_set_bit(
+                ~prog->request_queue_status[i]);
+            if (right_most_unset_bit < 8) {
+                break;
+            }
+        }
+    }
+
+    if (right_most_unset_bit > 7) {
+        queue_index = -1;
+    } else {
+        queue_index = i * 8;
+        queue_index += right_most_unset_bit;
+
+        if (queue_index > EVENT_MAX_THREADS) {
+            queue_index = -1;
+        }
+    }
+
+    if (queue_index != -1) {
+        prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
+    }
+
+    return queue_index;
+}
+
 rpcsvc_notify_wrapper_t *
 rpcsvc_notify_wrapper_alloc(void)
 {
@@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
     return 0;
 }
 
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+    rpcsvc_request_queue_t *queue = NULL;
+    int num = 0;
+    void *value = NULL;
+    rpcsvc_request_t *req = NULL;
+    char empty = 0;
+
+    value = pthread_getspecific(prog->req_queue_key);
+    if (value == NULL) {
+        return;
+    }
+
+    num = ((unsigned long)value) - 1;
+
+    queue = &prog->request_queue[num];
+
+    if (queue->gen == gen) {
+        /* duplicate event */
+        gf_log(GF_RPCSVC, GF_LOG_INFO,
+               "not queuing duplicate event thread death. "
+               "queue %d program %s",
+               num, prog->progname);
+        return;
+    }
+
+    rpcsvc_alloc_request(svc, req);
+    req->prognum = RPCSVC_INFRA_PROGRAM;
+    req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+    gf_log(GF_RPCSVC, GF_LOG_INFO,
+           "queuing event thread death request to queue %d of program %s", num,
+           prog->progname);
+
+    pthread_mutex_lock(&queue->queue_lock);
+    {
+        empty = list_empty(&queue->request_queue);
+
+        list_add_tail(&req->request_list, &queue->request_queue);
+        queue->gen = gen;
+
+        if (empty && queue->waiting)
+            pthread_cond_signal(&queue->queue_cond);
+    }
+    pthread_mutex_unlock(&queue->queue_lock);
+
+    return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+    rpcsvc_program_t *prog = NULL;
+
+    pthread_rwlock_rdlock(&svc->rpclock);
+    {
+        list_for_each_entry(prog, &svc->programs, program)
+        {
+            if (prog->ownthread)
+                rpcsvc_queue_event_thread_death(svc, prog, gen);
+        }
+    }
+    pthread_rwlock_unlock(&svc->rpclock);
+
+    return 0;
+}
+
 int
 rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
                        rpc_transport_pollin_t *msg)
@@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
     int ret = -1;
     uint16_t port = 0;
     gf_boolean_t is_unix = _gf_false, empty = _gf_false;
-    gf_boolean_t unprivileged = _gf_false;
+    gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0;
     drc_cached_op_t *reply = NULL;
     rpcsvc_drc_globals_t *drc = NULL;
+    rpcsvc_request_queue_t *queue = NULL;
+    long num = 0;
+    void *value = NULL;
 
     if (!trans || !svc)
         return -1;
@@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
             ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
                                rpcsvc_check_and_reply_error, NULL, req);
         } else if (req->ownthread) {
-            pthread_mutex_lock(&req->prog->queue_lock);
+            value = pthread_getspecific(req->prog->req_queue_key);
+            if (value == NULL) {
+                pthread_mutex_lock(&req->prog->thr_lock);
+                {
+                    num = rpcsvc_get_free_queue_index(req->prog);
+                    if (num != -1) {
+                        num++;
+                        value = (void *)num;
+                        ret = pthread_setspecific(req->prog->req_queue_key,
+                                                  value);
+                        if (ret < 0) {
+                            gf_log(GF_RPCSVC, GF_LOG_WARNING,
+                                   "setting request queue in TLS failed");
+                            rpcsvc_toggle_queue_status(
+                                req->prog, &req->prog->request_queue[num - 1],
+                                req->prog->request_queue_status);
+                            num = -1;
+                        } else {
+                            spawn_request_handler = 1;
+                        }
+                    }
+                }
+                pthread_mutex_unlock(&req->prog->thr_lock);
+            }
+
+            if (num == -1)
+                goto noqueue;
+
+            num = ((unsigned long)value) - 1;
+
+            queue = &req->prog->request_queue[num];
+
+            if (spawn_request_handler) {
+                ret = gf_thread_create(&queue->thread, NULL,
+                                       rpcsvc_request_handler, queue,
+                                       "rpcrqhnd");
+                if (!ret) {
+                    gf_log(GF_RPCSVC, GF_LOG_INFO,
+                           "spawned a request handler thread for queue %d",
+                           (int)num);
+
+                    req->prog->threadcount++;
+                } else {
+                    gf_log(
+                        GF_RPCSVC, GF_LOG_INFO,
+                        "spawning a request handler thread for queue %d failed",
+                        (int)num);
+                    ret = pthread_setspecific(req->prog->req_queue_key, 0);
+                    if (ret < 0) {
+                        gf_log(GF_RPCSVC, GF_LOG_WARNING,
+                               "resetting request queue in TLS failed");
+                    }
+
+                    rpcsvc_toggle_queue_status(
+                        req->prog, &req->prog->request_queue[num - 1],
+                        req->prog->request_queue_status);
+
+                    goto noqueue;
+                }
+            }
+
+            pthread_mutex_lock(&queue->queue_lock);
             {
-                empty = list_empty(&req->prog->request_queue);
+                empty = list_empty(&queue->request_queue);
 
-                list_add_tail(&req->request_list, &req->prog->request_queue);
+                list_add_tail(&req->request_list, &queue->request_queue);
 
-                if (empty)
-                    pthread_cond_signal(&req->prog->queue_cond);
+                if (empty && queue->waiting)
+                    pthread_cond_signal(&queue->queue_cond);
             }
-            pthread_mutex_unlock(&req->prog->queue_lock);
+            pthread_mutex_unlock(&queue->queue_lock);
 
             ret = 0;
         } else {
+        noqueue:
             ret = actor_fn(req);
         }
     }
@@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
                    "got MAP_XID event, which should have not come");
             ret = 0;
             break;
+
+        case RPC_TRANSPORT_EVENT_THREAD_DIED:
+            rpcsvc_handle_event_thread_death(svc, trans,
+                                             (int)(unsigned long)data);
+            ret = 0;
+            break;
     }
 
 out:
@@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
             goto out;
         }
 
+        dict_del(options, "notify-poller-death");
         GF_FREE(transport_name);
         transport_name = NULL;
         count++;
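The queue chosen for an event thread is cached in thread-local storage with a +1 bias, so that a stored index of 0 is distinguishable from pthread_getspecific()'s NULL return meaning "never assigned". A minimal sketch of that convention (names are illustrative):

```c
#include <pthread.h>
#include <stdio.h>

static pthread_key_t queue_key;

static int my_queue_index(void)
{
    void *v = pthread_getspecific(queue_key);
    if (v == NULL)
        return -1;                      /* never assigned on this thread */
    return (int)((unsigned long)v - 1); /* undo the +1 bias */
}

static int set_my_queue_index(int idx)
{
    /* store idx+1: index 0 would otherwise be indistinguishable from NULL */
    return pthread_setspecific(queue_key, (void *)(unsigned long)(idx + 1));
}

int main(void)
{
    pthread_key_create(&queue_key, NULL);
    printf("%d\n", my_queue_index()); /* -1: unset */
    set_my_queue_index(0);
    printf("%d\n", my_queue_index()); /* 0 */
    return 0;
}
```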
@@ -1961,55 +2167,86 @@ out:
 void *
 rpcsvc_request_handler(void *arg)
 {
-    rpcsvc_program_t *program = arg;
-    rpcsvc_request_t *req = NULL;
+    rpcsvc_request_queue_t *queue = NULL;
+    rpcsvc_program_t *program = NULL;
+    rpcsvc_request_t *req = NULL, *tmp_req = NULL;
     rpcsvc_actor_t *actor = NULL;
     gf_boolean_t done = _gf_false;
     int ret = 0;
+    struct list_head tmp_list = {
+        0,
+    };
+
+    queue = arg;
+    program = queue->program;
+
+    INIT_LIST_HEAD(&tmp_list);
 
     if (!program)
         return NULL;
 
     while (1) {
-        pthread_mutex_lock(&program->queue_lock);
+        pthread_mutex_lock(&queue->queue_lock);
         {
-            if (!program->alive && list_empty(&program->request_queue)) {
+            if (!program->alive && list_empty(&queue->request_queue)) {
                 done = 1;
                 goto unlock;
             }
 
-            while (list_empty(&program->request_queue) &&
-                   (program->threadcount <= program->eventthreadcount)) {
-                pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+            while (list_empty(&queue->request_queue)) {
+                queue->waiting = _gf_true;
+                pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
             }
 
-            if (program->threadcount > program->eventthreadcount) {
-                done = 1;
-                program->threadcount--;
+            queue->waiting = _gf_false;
 
-                gf_log(GF_RPCSVC, GF_LOG_INFO,
-                       "program '%s' thread terminated; "
-                       "total count:%d",
-                       program->progname, program->threadcount);
-            } else if (!list_empty(&program->request_queue)) {
-                req = list_entry(program->request_queue.next, typeof(*req),
-                                 request_list);
-
-                list_del_init(&req->request_list);
+            if (!list_empty(&queue->request_queue)) {
+                INIT_LIST_HEAD(&tmp_list);
+                list_splice_init(&queue->request_queue, &tmp_list);
             }
         }
     unlock:
-        pthread_mutex_unlock(&program->queue_lock);
-
-        if (req) {
-            THIS = req->svc->xl;
-            actor = rpcsvc_program_actor(req);
-            ret = actor->actor(req);
+        pthread_mutex_unlock(&queue->queue_lock);
 
-            if (ret != 0) {
-                rpcsvc_check_and_reply_error(ret, NULL, req);
+        list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+        {
+            list_del_init(&req->request_list);
+
+            if (req) {
+                if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+                    switch (req->procnum) {
+                        case RPCSVC_PROC_EVENT_THREAD_DEATH:
+                            gf_log(GF_RPCSVC, GF_LOG_INFO,
+                                   "event thread died, exiting request handler "
+                                   "thread for queue %d of program %s",
+                                   (int)(queue - &program->request_queue[0]),
+                                   program->progname);
+                            done = 1;
+                            pthread_mutex_lock(&program->thr_lock);
+                            {
+                                rpcsvc_toggle_queue_status(
+                                    program, queue,
+                                    program->request_queue_status);
+                                program->threadcount--;
+                            }
+                            pthread_mutex_unlock(&program->thr_lock);
+                            rpcsvc_request_destroy(req);
+                            break;
+
+                        default:
+                            break;
+                    }
+                } else {
+                    THIS = req->svc->xl;
+                    actor = rpcsvc_program_actor(req);
+                    ret = actor->actor(req);
+
+                    if (ret != 0) {
+                        rpcsvc_check_and_reply_error(ret, NULL, req);
+                    }
+                    req = NULL;
+                }
             }
-            req = NULL;
         }
 
         if (done)
@@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg)
 }
 
 int
-rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
-{
-    int ret = 0, delta = 0, creates = 0;
-
-    if (!program || !svc)
-        goto out;
-
-    pthread_mutex_lock(&program->queue_lock);
-    {
-        delta = program->eventthreadcount - program->threadcount;
-
-        if (delta >= 0) {
-            while (delta--) {
-                ret = gf_thread_create(&program->thread, NULL,
-                                       rpcsvc_request_handler, program,
-                                       "rpcrqhnd");
-                if (!ret) {
-                    program->threadcount++;
-                    creates++;
-                }
-            }
-
-            if (creates) {
-                gf_log(GF_RPCSVC, GF_LOG_INFO,
-                       "spawned %d threads for program '%s'; "
-                       "total count:%d",
-                       creates, program->progname, program->threadcount);
-            }
-        } else {
-            gf_log(GF_RPCSVC, GF_LOG_INFO,
-                   "terminating %d threads for program '%s'", -delta,
-                   program->progname);
-
-            /* this signal is to just wake up the threads so they
-             * test for the change in eventthreadcount and kill
-             * themselves until the program thread count becomes
-             * equal to the event thread count
-             */
-            pthread_cond_broadcast(&program->queue_cond);
-        }
-    }
-    pthread_mutex_unlock(&program->queue_lock);
-
-out:
-    return creates;
-}
-
-int
 rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
                         gf_boolean_t add_to_head)
 {
-    int ret = -1;
-    int creates = -1;
+    int ret = -1, i = 0;
     rpcsvc_program_t *newprog = NULL;
     char already_registered = 0;
@@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
     memcpy(newprog, program, sizeof(*program));
 
     INIT_LIST_HEAD(&newprog->program);
-    INIT_LIST_HEAD(&newprog->request_queue);
-    pthread_mutex_init(&newprog->queue_lock, NULL);
-    pthread_cond_init(&newprog->queue_cond, NULL);
+
+    for (i = 0; i < EVENT_MAX_THREADS; i++) {
+        INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+        pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
+        pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+        newprog->request_queue[i].program = newprog;
+    }
+
+    pthread_mutex_init(&newprog->thr_lock, NULL);
+    pthread_cond_init(&newprog->thr_cond, NULL);
 
     newprog->alive = _gf_true;
 
@@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
         newprog->ownthread = _gf_false;
 
     if (newprog->ownthread) {
-        newprog->eventthreadcount = 1;
-        creates = rpcsvc_spawn_threads(svc, newprog);
+        struct event_pool *ep = svc->ctx->event_pool;
+        newprog->eventthreadcount = ep->eventthreadcount;
 
-        if (creates < 1) {
-            goto out;
-        }
+        pthread_key_create(&newprog->req_queue_key, NULL);
+        newprog->thr_queue = 1;
     }
 
     pthread_rwlock_wrlock(&svc->rpclock);
@@ -3003,38 +3197,6 @@ out:
     return ret;
 }
 
-/* During reconfigure, Make sure to call this function after event-threads are
- * reconfigured as programs' threadcount will be made equal to event threads.
- */
-
-int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
-{
-    int ret = -1;
-    rpcsvc_program_t *program = NULL;
-
-    if (!svc) {
-        ret = 0;
-        goto out;
-    }
-
-    pthread_rwlock_wrlock(&svc->rpclock);
-    {
-        list_for_each_entry(program, &svc->programs, program)
-        {
-            if (program->ownthread) {
-                program->eventthreadcount = new_eventthreadcount;
-                rpcsvc_spawn_threads(svc, program);
-            }
-        }
-    }
-    pthread_rwlock_unlock(&svc->rpclock);
-
-    ret = 0;
-out:
-    return ret;
-}
-
 rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
     [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
     [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
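The reworked rpcsvc_request_handler() drains its queue by splicing the entire backlog onto a local list inside one critical section and processing it with the lock dropped, while producers signal the condition variable only when the queue was empty and the worker has advertised (via waiting) that it is blocked. Roughly the same shape, reduced to a plain singly linked queue with invented names and no shutdown path, for brevity:

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct work {
    struct work *next;
    void (*fn)(struct work *);
};

struct queue {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    int waiting; /* worker is blocked in cond_wait */
    struct work *head, *tail;
};

/* producer: signal only when the queue was empty AND the worker is waiting */
static void queue_push(struct queue *q, struct work *w)
{
    pthread_mutex_lock(&q->mu);
    int was_empty = (q->head == NULL);
    w->next = NULL;
    if (q->tail)
        q->tail->next = w;
    else
        q->head = w;
    q->tail = w;
    if (was_empty && q->waiting)
        pthread_cond_signal(&q->cv);
    pthread_mutex_unlock(&q->mu);
}

/* worker: grab the whole backlog in one critical section, run it unlocked */
static void *queue_worker(void *arg)
{
    struct queue *q = arg;
    for (;;) {
        pthread_mutex_lock(&q->mu);
        while (q->head == NULL) {
            q->waiting = 1;
            pthread_cond_wait(&q->cv, &q->mu);
        }
        q->waiting = 0;
        struct work *batch = q->head; /* splice: take everything at once */
        q->head = q->tail = NULL;
        pthread_mutex_unlock(&q->mu);

        while (batch) { /* process outside the lock */
            struct work *next = batch->next;
            batch->fn(batch); /* may free batch */
            batch = next;
        }
    }
    return NULL;
}

static void say(struct work *w)
{
    printf("did %p\n", (void *)w);
    free(w);
}

int main(void)
{
    struct queue q = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0,
                      NULL, NULL};
    pthread_t t;
    pthread_create(&t, NULL, queue_worker, &q);
    for (int i = 0; i < 3; i++) {
        struct work *w = malloc(sizeof(*w));
        w->fn = say;
        queue_push(&q, w);
    }
    sleep(1); /* crude: give the worker time to drain before exiting */
    return 0;
}
```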
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index ebb836fba3f..8388dd404c5 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -33,6 +33,16 @@
 #define MAX_IOVEC 16
 #endif
 
+/* TODO: we should store prognums at a centralized location to avoid conflict
+         or use a robust random number generator to avoid conflicts
+*/
+
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
+
+typedef enum {
+    RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
+} rpcsvc_infra_procnum_t;
+
 #define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT                                   \
     64 /* Default for protocol/server */
 #define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
@@ -362,6 +372,16 @@ typedef struct rpcsvc_actor_desc {
     drc_op_type_t op_type;
 } rpcsvc_actor_t;
 
+typedef struct rpcsvc_request_queue {
+    int gen;
+    struct list_head request_queue;
+    pthread_mutex_t queue_lock;
+    pthread_cond_t queue_cond;
+    pthread_t thread;
+    struct rpcsvc_program *program;
+    gf_boolean_t waiting;
+} rpcsvc_request_queue_t;
+
 /* Describes a program and its version along with the function pointers
  * required to handle the procedures/actors of each program/version.
  * Never changed ever by any thread so no need for a lock.
@@ -421,11 +441,14 @@ struct rpcsvc_program {
     gf_boolean_t synctask;
     /* list member to link to list of registered services with rpcsvc */
     struct list_head program;
-    struct list_head request_queue;
-    pthread_mutex_t queue_lock;
-    pthread_cond_t queue_cond;
-    pthread_t thread;
+    rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
+    char request_queue_status[EVENT_MAX_THREADS / 8 + 1];
+    pthread_mutex_t thr_lock;
+    pthread_cond_t thr_cond;
     int threadcount;
+    int thr_queue;
+    pthread_key_t req_queue_key;
+
     /* eventthreadcount is just a readonly copy of the actual value
      * owned by the event sub-system
      * It is used to control the scaling of rpcsvc_request_handler threads
@@ -652,9 +675,6 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
 rpcsvc_vector_sizer
 rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum,
                                 uint32_t progver, int procnum);
-extern int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount);
-
 void
 rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr);
 #endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc227137d57..776e647d4f6 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2859,7 +2859,7 @@ socket_complete_connection(rpc_transport_t *this)
 /* reads rpc_requests during pollin */
 static int
 socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
-                     int poll_out, int poll_err)
+                     int poll_out, int poll_err, char event_thread_died)
 {
     rpc_transport_t *this = NULL;
     socket_private_t *priv = NULL;
@@ -2869,6 +2869,11 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
 
     this = data;
 
+    if (event_thread_died) {
+        /* to avoid duplicate notifications, notify only for listener sockets */
+        return 0;
+    }
+
     GF_VALIDATE_OR_GOTO("socket", this, out);
     GF_VALIDATE_OR_GOTO("socket", this->private, out);
     GF_VALIDATE_OR_GOTO("socket", this->xl, out);
@@ -2967,7 +2972,7 @@ out:
 
 static int
 socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
-                            int poll_out, int poll_err)
+                            int poll_out, int poll_err, char event_thread_died)
 {
     rpc_transport_t *this = NULL;
     socket_private_t *priv = NULL;
@@ -2991,6 +2996,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
     priv = this->private;
     ctx = this->ctx;
 
+    if (event_thread_died) {
+        rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
+                             (void *)(unsigned long)gen);
+        return 0;
+    }
+
     /* NOTE:
      * We have done away with the critical section in this function. since
      * there's little that it helps with. There's no other code that
@@ -3099,6 +3110,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
         new_trans->mydata = this->mydata;
         new_trans->notify = this->notify;
         new_trans->listener = this;
+        new_trans->notify_poller_death = this->poller_death_accept;
         new_priv = new_trans->private;
 
         if (new_sockaddr.ss_family == AF_UNIX) {
@@ -3149,9 +3161,9 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
             ret = rpc_transport_notify(this, RPC_TRANSPORT_ACCEPT, new_trans);
             if (ret != -1) {
-                new_priv->idx = event_register(ctx->event_pool, new_sock,
-                                               socket_event_handler, new_trans,
-                                               1, 0);
+                new_priv->idx = event_register(
+                    ctx->event_pool, new_sock, socket_event_handler, new_trans,
+                    1, 0, new_trans->notify_poller_death);
                 if (new_priv->idx == -1) {
                     ret = -1;
                     gf_log(this->name, GF_LOG_ERROR,
@@ -3530,7 +3542,8 @@ socket_connect(rpc_transport_t *this, int port)
         this->listener = this;
 
         priv->idx = event_register(ctx->event_pool, priv->sock,
-                                   socket_event_handler, this, 1, 1);
+                                   socket_event_handler, this, 1, 1,
+                                   this->notify_poller_death);
         if (priv->idx == -1) {
             gf_log("", GF_LOG_WARNING,
                    "failed to register the event; "
@@ -3709,7 +3722,8 @@ socket_listen(rpc_transport_t *this)
         rpc_transport_ref(this);
 
         priv->idx = event_register(ctx->event_pool, priv->sock,
-                                   socket_server_event_handler, this, 1, 0);
+                                   socket_server_event_handler, this, 1, 0,
+                                   this->notify_poller_death);
 
         if (priv->idx == -1) {
             gf_log(this->name, GF_LOG_WARNING,
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 8d8e8fc5718..77e5d74e7c5 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -929,12 +929,6 @@ do_rpc:
     if (ret)
         goto out;
 
-    /* rpcsvc thread reconfigure should be after events thread
-     * reconfigure
-     */
-    new_nthread = ((struct event_pool *)(this->ctx->event_pool))
-                      ->eventthreadcount;
-    ret = rpcsvc_ownthread_reconf(rpc_conf, new_nthread);
 out:
     THIS = oldTHIS;
     gf_msg_debug("", 0, "returning %d", ret);
@@ -1133,6 +1127,9 @@ server_init(xlator_t *this)
         ret = -1;
         goto out;
     }
+
+    ret = dict_set_int32(this->options, "notify-poller-death", 1);
+
     ret = rpcsvc_create_listeners(conf->rpc, this->options, this->name);
     if (ret < 1) {
         gf_msg(this->name, GF_LOG_WARNING, 0,
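A poller death is stamped with a generation number (event_pool->poller_gen) that rides through the handler's gen argument into rpcsvc_queue_event_thread_death(), where queue->gen filters out the same death reported by several listener transports. The dedup logic in isolation (names are mine; the enqueue step is elided):

```c
#include <stdio.h>

/* Each poller death increments a global generation; a queue records the last
 * generation it has seen so the same death is queued at most once even if
 * several transports report it. */
struct queue {
    int gen; /* last death generation handled by this queue */
};

static int queue_death_event(struct queue *q, int gen)
{
    if (q->gen == gen)
        return 0; /* duplicate: another transport already reported this one */
    q->gen = gen;
    /* ... enqueue the internal "event thread died" request here ... */
    return 1;
}

int main(void)
{
    struct queue q = {0};
    printf("%d\n", queue_death_event(&q, 1)); /* 1: queued */
    printf("%d\n", queue_death_event(&q, 1)); /* 0: duplicate ignored */
    printf("%d\n", queue_death_event(&q, 2)); /* 1: a later death */
    return 0;
}
```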
