diff options
| -rw-r--r-- | libglusterfs/src/event-epoll.c | 747 | ||||
| -rw-r--r-- | libglusterfs/src/event-poll.c | 43 | ||||
| -rw-r--r-- | libglusterfs/src/event.c | 18 | ||||
| -rw-r--r-- | libglusterfs/src/event.h | 20 | ||||
| -rw-r--r-- | libglusterfs/src/mem-types.h | 1 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 33 | 
6 files changed, 557 insertions, 305 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 06b3236246a..9082954e4e4 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -31,28 +31,203 @@  #include <sys/epoll.h> +struct event_slot_epoll { +	int fd; +	int events; +	int gen; +	int ref; +	int do_close; +	int in_handler; +	void *data; +	event_handler_t handler; +	gf_lock_t lock; +}; + + +static struct event_slot_epoll * +__event_newtable (struct event_pool *event_pool, int table_idx) +{ +	struct event_slot_epoll *table = NULL; +	int                      i = -1; + +	table = GF_CALLOC (sizeof (*table), EVENT_EPOLL_SLOTS, +			   gf_common_mt_ereg); +	if (!table) +		return NULL; + +	for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { +		table[i].fd = -1; +		LOCK_INIT (&table[i].lock); +	} + +	event_pool->ereg[table_idx] = table; +	event_pool->slots_used[table_idx] = 0; + +	return table; +} + +  static int -__event_getindex (struct event_pool *event_pool, int fd, int idx) +__event_slot_alloc (struct event_pool *event_pool, int fd)  { -        int  ret = -1;          int  i = 0; +	int  table_idx = -1; +	int  gen = -1; +	struct event_slot_epoll *table = NULL; + +	for (i = 0; i < EVENT_EPOLL_TABLES; i++) { +		switch (event_pool->slots_used[i]) { +		case EVENT_EPOLL_SLOTS: +			continue; +		case 0: +			if (!event_pool->ereg[i]) { +				table = __event_newtable (event_pool, i); +				if (!table) +					return -1; +			} +			break; +		default: +			table = event_pool->ereg[i]; +			break; +		} + +		if (table) +			/* break out of the loop */ +			break; +	} + +	if (!table) +		return -1; + +	table_idx = i; + +	for (i = 0; i < EVENT_EPOLL_SLOTS; i++) { +		if (table[i].fd == -1) { +			/* wipe everything except bump the generation */ +			gen = table[i].gen; +			memset (&table[i], 0, sizeof (table[i])); +			table[i].gen = gen + 1; + +			LOCK_INIT (&table[i].lock); + +			table[i].fd = fd; +			event_pool->slots_used[table_idx]++; + +			break; +		} +	} + +	return table_idx * EVENT_EPOLL_SLOTS + i; +} -        GF_VALIDATE_OR_GOTO ("event", event_pool, out); -        if (idx > -1 && idx < event_pool->used) { -                if (event_pool->reg[idx].fd == fd) -                        ret = idx; -        } +static int +event_slot_alloc (struct event_pool *event_pool, int fd) +{ +	int  idx = -1; -        for (i=0; ret == -1 && i<event_pool->used; i++) { -                if (event_pool->reg[i].fd == fd) { -                        ret = i; -                        break; -                } -        } +	pthread_mutex_lock (&event_pool->mutex); +	{ +		idx = __event_slot_alloc (event_pool, fd); +	} +	pthread_mutex_unlock (&event_pool->mutex); -out: -        return ret; +	return idx; +} + + + +static void +__event_slot_dealloc (struct event_pool *event_pool, int idx) +{ +	int                      table_idx = 0; +	int                      offset = 0; +	struct event_slot_epoll *table = NULL; +	struct event_slot_epoll *slot = NULL; + +	table_idx = idx / EVENT_EPOLL_SLOTS; +	offset = idx % EVENT_EPOLL_SLOTS; + +	table = event_pool->ereg[table_idx]; +	if (!table) +		return; + +	slot = &table[offset]; +	slot->gen++; + +	slot->fd = -1; +	event_pool->slots_used[table_idx]--; + +	return; +} + + +static void +event_slot_dealloc (struct event_pool *event_pool, int idx) +{ +	pthread_mutex_lock (&event_pool->mutex); +	{ +		__event_slot_dealloc (event_pool, idx); +	} +	pthread_mutex_unlock (&event_pool->mutex); + +	return; +} + + +static struct event_slot_epoll * +event_slot_get (struct event_pool *event_pool, int idx) +{ +	struct event_slot_epoll *slot = NULL; +	struct event_slot_epoll *table = NULL; +	int                      table_idx = 0; +	int                      offset = 0; + +	table_idx = idx / EVENT_EPOLL_SLOTS; +	offset = idx % EVENT_EPOLL_SLOTS; + +	table = event_pool->ereg[table_idx]; +	if (!table) +		return NULL; + +	slot = &table[offset]; + +	LOCK (&slot->lock); +	{ +		slot->ref++; +	} +	UNLOCK (&slot->lock); + +	return slot; +} + + +static void +event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot, +		  int idx) +{ +	int ref = -1; +	int fd = -1; +	int do_close = 0; + +	LOCK (&slot->lock); +	{ +		ref = --slot->ref; +		fd = slot->fd; +		do_close = slot->do_close; +	} +	UNLOCK (&slot->lock); + +	if (ref) +		/* slot still alive */ +		goto done; + +	event_slot_dealloc (event_pool, idx); + +	if (do_close) +		close (fd); +done: +	return;  } @@ -68,17 +243,6 @@ event_pool_new_epoll (int count)          if (!event_pool)                  goto out; -        event_pool->count = count; -        event_pool->reg = GF_CALLOC (event_pool->count, -                                     sizeof (*event_pool->reg), -                                     gf_common_mt_reg); - -        if (!event_pool->reg) { -                GF_FREE (event_pool); -                event_pool = NULL; -                goto out; -        } -          epfd = epoll_create (count);          if (epfd == -1) { @@ -95,13 +259,49 @@ event_pool_new_epoll (int count)          event_pool->count = count;          pthread_mutex_init (&event_pool->mutex, NULL); -        pthread_cond_init (&event_pool->cond, NULL);  out:          return event_pool;  } +static void +__slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out) +{ +	switch (poll_in) { +	case 1: +		slot->events |= EPOLLIN; +		break; +	case 0: +		slot->events &= ~EPOLLIN; +		break; +	case -1: +		/* do nothing */ +		break; +	default: +		gf_log ("epoll", GF_LOG_ERROR, +			"invalid poll_in value %d", poll_in); +		break; +	} + +	switch (poll_out) { +	case 1: +		slot->events |= EPOLLOUT; +		break; +	case 0: +		slot->events &= ~EPOLLOUT; +		break; +	case -1: +		/* do nothing */ +		break; +	default: +		gf_log ("epoll", GF_LOG_ERROR, +			"invalid poll_out value %d", poll_out); +		break; +	} +} + +  int  event_register_epoll (struct event_pool *event_pool, int fd,                        event_handler_t handler, @@ -111,87 +311,98 @@ event_register_epoll (struct event_pool *event_pool, int fd,          int                 ret = -1;          struct epoll_event  epoll_event = {0, };          struct event_data  *ev_data = (void *)&epoll_event.data; +	struct event_slot_epoll *slot = NULL;          GF_VALIDATE_OR_GOTO ("event", event_pool, out); -        pthread_mutex_lock (&event_pool->mutex); -        { -                if (event_pool->count == event_pool->used) { -                        event_pool->count *= 2; +	idx = event_slot_alloc (event_pool, fd); +	if (idx == -1) { +		gf_log ("epoll", GF_LOG_ERROR, +			"could not find slot for fd=%d", fd); +		return -1; +	} + +	slot = event_slot_get (event_pool, idx); + +	assert (slot->fd == fd); + +	LOCK (&slot->lock); +	{ +		/* make epoll edge triggered and 'singleshot', which +		   means we need to re-add the fd with +		   epoll_ctl(EPOLL_CTL_MOD) after delivery of every +		   single event. This assures us that while a poller +		   thread has picked up and is processing an event, +		   another poller will not try to pick this at the same +		   time as well. +		*/ + +		slot->events = EPOLLPRI | EPOLLET | EPOLLONESHOT; +		slot->handler = handler; +		slot->data = data; + +		__slot_update_events (slot, poll_in, poll_out); + +		epoll_event.events = slot->events; +		ev_data->idx = idx; +		ev_data->gen = slot->gen; + +		ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, +				 &epoll_event); +		/* check ret after UNLOCK() to avoid deadlock in +		   event_slot_unref() +		*/ +	} +	UNLOCK (&slot->lock); + +	if (ret == -1) { +		gf_log ("epoll", GF_LOG_ERROR, +			"failed to add fd(=%d) to epoll fd(=%d) (%s)", +			fd, event_pool->fd, strerror (errno)); + +		event_slot_unref (event_pool, slot, idx); +		idx = -1; +	} + +	/* keep slot->ref (do not event_slot_unref) if successful */ +out: +        return idx; +} -                        event_pool->reg = GF_REALLOC (event_pool->reg, -                                                      event_pool->count * -                                                      sizeof (*event_pool->reg)); -                        if (!event_pool->reg) { -                                gf_log ("epoll", GF_LOG_ERROR, -                                        "event registry re-allocation failed"); -                                goto unlock; -                        } -                } - -                idx = event_pool->used; -                event_pool->used++; - -                event_pool->reg[idx].fd = fd; -                event_pool->reg[idx].events = EPOLLPRI; -                event_pool->reg[idx].handler = handler; -                event_pool->reg[idx].data = data; - -                switch (poll_in) { -                case 1: -                        event_pool->reg[idx].events |= EPOLLIN; -                        break; -                case 0: -                        event_pool->reg[idx].events &= ~EPOLLIN; -                        break; -                case -1: -                        /* do nothing */ -                        break; -                default: -                        gf_log ("epoll", GF_LOG_ERROR, -                                "invalid poll_in value %d", poll_in); -                        break; -                } +static int +event_unregister_epoll_common (struct event_pool *event_pool, int fd, +			       int idx, int do_close) +{ +        int  ret = -1; +	struct event_slot_epoll *slot = NULL; -                switch (poll_out) { -                case 1: -                        event_pool->reg[idx].events |= EPOLLOUT; -                        break; -                case 0: -                        event_pool->reg[idx].events &= ~EPOLLOUT; -                        break; -                case -1: -                        /* do nothing */ -                        break; -                default: -                        gf_log ("epoll", GF_LOG_ERROR, -                                "invalid poll_out value %d", poll_out); -                        break; -                } +        GF_VALIDATE_OR_GOTO ("event", event_pool, out); -                event_pool->changed = 1; +	slot = event_slot_get (event_pool, idx); -                epoll_event.events = event_pool->reg[idx].events; -                ev_data->fd = fd; -                ev_data->idx = idx; +	assert (slot->fd == fd); -                ret = epoll_ctl (event_pool->fd, EPOLL_CTL_ADD, fd, -                                 &epoll_event); +	LOCK (&slot->lock); +	{ +                ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL);                  if (ret == -1) {                          gf_log ("epoll", GF_LOG_ERROR, -                                "failed to add fd(=%d) to epoll fd(=%d) (%s)", +                                "fail to del fd(=%d) from epoll fd(=%d) (%s)",                                  fd, event_pool->fd, strerror (errno));                          goto unlock;                  } -                pthread_cond_broadcast (&event_pool->cond); +		slot->do_close = do_close; +		slot->gen++; /* detect unregister in dispatch_handler() */          }  unlock: -        pthread_mutex_unlock (&event_pool->mutex); +	UNLOCK (&slot->lock); +	event_slot_unref (event_pool, slot, idx); /* one for event_register() */ +	event_slot_unref (event_pool, slot, idx); /* one for event_slot_get() */  out:          return ret;  } @@ -200,233 +411,186 @@ out:  static int  event_unregister_epoll (struct event_pool *event_pool, int fd, int idx_hint)  { -        int  idx = -1; -        int  ret = -1; - -        struct epoll_event epoll_event = {0, }; -        struct event_data *ev_data = (void *)&epoll_event.data; -        int                lastidx = -1; - -        GF_VALIDATE_OR_GOTO ("event", event_pool, out); +	int ret = -1; -        pthread_mutex_lock (&event_pool->mutex); -        { -                idx = __event_getindex (event_pool, fd, idx_hint); +	ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 0); -                if (idx == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "index not found for fd=%d (idx_hint=%d)", -                                fd, idx_hint); -                        errno = ENOENT; -                        goto unlock; -                } - -                ret = epoll_ctl (event_pool->fd, EPOLL_CTL_DEL, fd, NULL); - -                /* if ret is -1, this array member should never be accessed */ -                /* if it is 0, the array member might be used by idx_cache -                 * in which case the member should not be accessed till -                 * it is reallocated -                 */ - -                event_pool->reg[idx].fd = -1; - -                if (ret == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "fail to del fd(=%d) from epoll fd(=%d) (%s)", -                                fd, event_pool->fd, strerror (errno)); -                        goto unlock; -                } - -                lastidx = event_pool->used - 1; -                if (lastidx == idx) { -                        event_pool->used--; -                        goto unlock; -                } +	return ret; +} -                epoll_event.events = event_pool->reg[lastidx].events; -                ev_data->fd = event_pool->reg[lastidx].fd; -                ev_data->idx = idx; -                ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, ev_data->fd, -                                 &epoll_event); -                if (ret == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "fail to modify fd(=%d) index %d to %d (%s)", -                                ev_data->fd, event_pool->used, idx, -                                strerror (errno)); -                        goto unlock; -                } +static int +event_unregister_close_epoll (struct event_pool *event_pool, int fd, +			      int idx_hint) +{ +	int ret = -1; -                /* just replace the unregistered idx by last one */ -                event_pool->reg[idx] = event_pool->reg[lastidx]; -                event_pool->used--; -        } -unlock: -        pthread_mutex_unlock (&event_pool->mutex); +	ret = event_unregister_epoll_common (event_pool, fd, idx_hint, 1); -out: -        return ret; +	return ret;  }  static int -event_select_on_epoll (struct event_pool *event_pool, int fd, int idx_hint, +event_select_on_epoll (struct event_pool *event_pool, int fd, int idx,                         int poll_in, int poll_out)  { -        int idx = -1;          int ret = -1; - +	struct event_slot_epoll *slot = NULL;          struct epoll_event epoll_event = {0, };          struct event_data *ev_data = (void *)&epoll_event.data;          GF_VALIDATE_OR_GOTO ("event", event_pool, out); -        pthread_mutex_lock (&event_pool->mutex); -        { -                idx = __event_getindex (event_pool, fd, idx_hint); - -                if (idx == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "index not found for fd=%d (idx_hint=%d)", -                                fd, idx_hint); -                        errno = ENOENT; -                        goto unlock; -                } - -                switch (poll_in) { -                case 1: -                        event_pool->reg[idx].events |= EPOLLIN; -                        break; -                case 0: -                        event_pool->reg[idx].events &= ~EPOLLIN; -                        break; -                case -1: -                        /* do nothing */ -                        break; -                default: -                        gf_log ("epoll", GF_LOG_ERROR, -                                "invalid poll_in value %d", poll_in); -                        break; -                } - -                switch (poll_out) { -                case 1: -                        event_pool->reg[idx].events |= EPOLLOUT; -                        break; -                case 0: -                        event_pool->reg[idx].events &= ~EPOLLOUT; -                        break; -                case -1: -                        /* do nothing */ -                        break; -                default: -                        gf_log ("epoll", GF_LOG_ERROR, -                                "invalid poll_out value %d", poll_out); -                        break; -                } - -                epoll_event.events = event_pool->reg[idx].events; -                ev_data->fd = fd; -                ev_data->idx = idx; - -                ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, -                                 &epoll_event); -                if (ret == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "failed to modify fd(=%d) events to %d", -                                fd, epoll_event.events); -                } -        } +	slot = event_slot_get (event_pool, idx); + +	assert (slot->fd == fd); + +	LOCK (&slot->lock); +	{ +		__slot_update_events (slot, poll_in, poll_out); + +		epoll_event.events = slot->events; +		ev_data->idx = idx; +		ev_data->gen = slot->gen; + +		if (slot->in_handler) +			/* in_handler indicates at least one thread +			   executing event_dispatch_epoll_handler() +			   which will perform epoll_ctl(EPOLL_CTL_MOD) +			   anyways (because of EPOLLET) + +			   This not only saves a system call, but also +			   avoids possibility of another epoll thread +			   parallely picking up the next event while the +			   ongoing handler is still in progress (and +			   resulting in unnecessary contention on +			   rpc_transport_t->mutex). +			*/ +			goto unlock; + +		ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, fd, +				 &epoll_event); +		if (ret == -1) { +			gf_log ("epoll", GF_LOG_ERROR, +				"failed to modify fd(=%d) events to %d", +				fd, epoll_event.events); +		} +	}  unlock: -        pthread_mutex_unlock (&event_pool->mutex); +	UNLOCK (&slot->lock); + +	event_slot_unref (event_pool, slot, idx);  out: -        return ret; +        return idx;  }  static int  event_dispatch_epoll_handler (struct event_pool *event_pool, -                              struct epoll_event *events, int i) +                              struct epoll_event *event)  { -        struct event_data  *event_data = NULL; +        struct event_data  *ev_data = NULL; +	struct event_slot_epoll *slot = NULL;          event_handler_t     handler = NULL;          void               *data = NULL;          int                 idx = -1; +	int                 gen = -1;          int                 ret = -1; +	int                 fd = -1; - -        event_data = (void *)&events[i].data; +	ev_data = (void *)&event->data;          handler = NULL;          data = NULL; -        pthread_mutex_lock (&event_pool->mutex); -        { -                idx = __event_getindex (event_pool, event_data->fd, -                                        event_data->idx); - -                if (idx == -1) { -                        gf_log ("epoll", GF_LOG_ERROR, -                                "index not found for fd(=%d) (idx_hint=%d)", -                                event_data->fd, event_data->idx); -                        goto unlock; -                } - -                handler = event_pool->reg[idx].handler; -                data = event_pool->reg[idx].data; -        } -unlock: -        pthread_mutex_unlock (&event_pool->mutex); +	idx = ev_data->idx; +	gen = ev_data->gen; + +	slot = event_slot_get (event_pool, idx); + +	LOCK (&slot->lock); +	{ +		fd = slot->fd; +		if (fd == -1) { +			gf_log ("epoll", GF_LOG_ERROR, +				"stale fd found on idx=%d, gen=%d, events=%d, " +				"slot->gen=%d", +				idx, gen, event->events, slot->gen); +			/* fd got unregistered in another thread */ +			goto pre_unlock; +		} + +		if (gen != slot->gen) { +			gf_log ("epoll", GF_LOG_ERROR, +				"generation mismatch on idx=%d, gen=%d, " +				"slot->gen=%d, slot->fd=%d", +				idx, gen, slot->gen, slot->fd); +			/* slot was re-used and therefore is another fd! */ +			goto pre_unlock; +		} + +		handler = slot->handler; +		data = slot->data; + +		slot->in_handler++; +	} +pre_unlock: +	UNLOCK (&slot->lock); + +        if (!handler) +		goto out; + +	ret = handler (fd, idx, data, +		       (event->events & (EPOLLIN|EPOLLPRI)), +		       (event->events & (EPOLLOUT)), +		       (event->events & (EPOLLERR|EPOLLHUP))); + +	LOCK (&slot->lock); +	{ +		slot->in_handler--; + +		if (gen != slot->gen) { +			/* event_unregister() happened while we were +			   in handler() +			*/ +			gf_log ("epoll", GF_LOG_DEBUG, +				"generation bumped on idx=%d from " +				"gen=%d to slot->gen=%d, fd=%d, " +				"slot->fd=%d", +				idx, gen, slot->gen, fd, slot->fd); +			goto post_unlock; +		} + +		/* This call also picks up the changes made by another +		   thread calling event_select_on_epoll() while this +		   thread was busy in handler() +		*/ +		event->events = slot->events; +		ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, +				 fd, event); +	} +post_unlock: +	UNLOCK (&slot->lock); +out: +	event_slot_unref (event_pool, slot, idx); -        if (handler) -                ret = handler (event_data->fd, event_data->idx, data, -                               (events[i].events & (EPOLLIN|EPOLLPRI)), -                               (events[i].events & (EPOLLOUT)), -                               (events[i].events & (EPOLLERR|EPOLLHUP)));          return ret;  } -static int -event_dispatch_epoll (struct event_pool *event_pool) +static void * +event_dispatch_epoll_worker (void *data)  { -        struct epoll_event *events = NULL; -        int                 size = 0; -        int                 i = 0; +        struct epoll_event  event;          int                 ret = -1; +	struct event_pool  *event_pool = data;          GF_VALIDATE_OR_GOTO ("event", event_pool, out); -        while (1) { -                pthread_mutex_lock (&event_pool->mutex); -                { -                        while (event_pool->used == 0) -                                pthread_cond_wait (&event_pool->cond, -                                                   &event_pool->mutex); - -                        if (event_pool->used > event_pool->evcache_size) { -                                GF_FREE (event_pool->evcache); - -                                event_pool->evcache = events = NULL; - -                                event_pool->evcache_size = -                                        event_pool->used + 256; - -                                events = GF_CALLOC (event_pool->evcache_size, -                                                    sizeof (struct epoll_event), -                                                    gf_common_mt_epoll_event); -                                if (!events) -                                        break; - -                                event_pool->evcache = events; -                        } -                } -                pthread_mutex_unlock (&event_pool->mutex); - -                ret = epoll_wait (event_pool->fd, event_pool->evcache, -                                  event_pool->evcache_size, -1); +	for (;;) { +                ret = epoll_wait (event_pool->fd, &event, 1, -1);                  if (ret == 0)                          /* timeout */ @@ -436,28 +600,43 @@ event_dispatch_epoll (struct event_pool *event_pool)                          /* sys call */                          continue; -                size = ret; +		ret = event_dispatch_epoll_handler (event_pool, &event); +        } +out: +        return NULL; +} -                for (i = 0; i < size; i++) { -                        if (!events || !events[i].events) -                                continue; -                        ret = event_dispatch_epoll_handler (event_pool, -                                                            events, i); -                } -        } +#define GLUSTERFS_EPOLL_MAXTHREADS 2 -out: -        return ret; + +static int +event_dispatch_epoll (struct event_pool *event_pool) +{ +	int               i = 0; +	pthread_t         pollers[GLUSTERFS_EPOLL_MAXTHREADS]; +	int               ret = -1; + +	for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) { +		ret = pthread_create (&pollers[i], NULL, +				      event_dispatch_epoll_worker, +				      event_pool); +	} + +	for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) +		pthread_join (pollers[i], NULL); + +	return ret;  }  struct event_ops event_ops_epoll = { -        .new              = event_pool_new_epoll, -        .event_register   = event_register_epoll, -        .event_select_on  = event_select_on_epoll, -        .event_unregister = event_unregister_epoll, -        .event_dispatch   = event_dispatch_epoll +        .new                    = event_pool_new_epoll, +        .event_register         = event_register_epoll, +        .event_select_on        = event_select_on_epoll, +        .event_unregister       = event_unregister_epoll, +        .event_unregister_close = event_unregister_close_epoll, +        .event_dispatch         = event_dispatch_epoll  };  #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 7f7f560d086..a7e2e663103 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -26,6 +26,16 @@  #include "config.h"  #endif + + +struct event_slot_poll { +	int fd; +	int events; +	void *data; +	event_handler_t handler; +}; + +  static int  event_register_poll (struct event_pool *event_pool, int fd,                       event_handler_t handler, @@ -63,12 +73,16 @@ __event_getindex (struct event_pool *event_pool, int fd, int idx)          GF_VALIDATE_OR_GOTO ("event", event_pool, out); +        /* lookup in used space based on index provided */          if (idx > -1 && idx < event_pool->used) { -                if (event_pool->reg[idx].fd == fd) +                if (event_pool->reg[idx].fd == fd) {                          ret = idx; +                        goto out; +                }          } -        for (i=0; ret == -1 && i<event_pool->used; i++) { +        /* search in used space, if lookup fails */ +        for (i = 0; i < event_pool->used; i++) {                  if (event_pool->reg[i].fd == fd) {                          ret = i;                          break; @@ -264,6 +278,20 @@ out:  static int +event_unregister_close_poll (struct event_pool *event_pool, int fd, +			     int idx_hint) +{ +	int ret = -1; + +	ret = event_unregister_poll (event_pool, fd, idx_hint); + +	close (fd); + +        return ret; +} + + +static int  event_select_on_poll (struct event_pool *event_pool, int fd, int idx_hint,                        int poll_in, int poll_out)  { @@ -443,9 +471,10 @@ out:  struct event_ops event_ops_poll = { -        .new              = event_pool_new_poll, -        .event_register   = event_register_poll, -        .event_select_on  = event_select_on_poll, -        .event_unregister = event_unregister_poll, -        .event_dispatch   = event_dispatch_poll +        .new                    = event_pool_new_poll, +        .event_register         = event_register_poll, +        .event_select_on        = event_select_on_poll, +        .event_unregister       = event_unregister_poll, +        .event_unregister_close = event_unregister_close_poll, +        .event_dispatch         = event_dispatch_poll  }; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 0197e7948b5..6c253df3c1a 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -32,10 +32,10 @@ struct event_pool *  event_pool_new (int count)  {          struct event_pool *event_pool = NULL; -        extern struct event_ops event_ops_poll; +	extern struct event_ops event_ops_poll;  #ifdef HAVE_SYS_EPOLL_H -        extern struct event_ops event_ops_epoll; +	extern struct event_ops event_ops_epoll;          event_pool = event_ops_epoll.new (count); @@ -89,6 +89,20 @@ out:  int +event_unregister_close (struct event_pool *event_pool, int fd, int idx) +{ +        int ret = -1; + +        GF_VALIDATE_OR_GOTO ("event", event_pool, out); + +        ret = event_pool->ops->event_unregister_close (event_pool, fd, idx); + +out: +        return ret; +} + + +int  event_select_on (struct event_pool *event_pool, int fd, int idx_hint,                   int poll_in, int poll_out)  { diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 7ed182492e2..3b3ab0e4b2f 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -20,15 +20,20 @@  struct event_pool;  struct event_ops; +struct event_slot_poll; +struct event_slot_epoll;  struct event_data { -	int fd;  	int idx; +	int gen;  } __attribute__ ((__packed__, __may_alias__));  typedef int (*event_handler_t) (int fd, int idx, void *data,  				int poll_in, int poll_out, int poll_err); +#define EVENT_EPOLL_TABLES 1024 +#define EVENT_EPOLL_SLOTS 1024 +  struct event_pool {  	struct event_ops *ops; @@ -36,12 +41,9 @@ struct event_pool {  	int breaker[2];  	int count; -	struct { -		int fd; -		int events; -		void *data; -		event_handler_t handler; -	} *reg; +	struct event_slot_poll  *reg; +	struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES]; +	int slots_used[EVENT_EPOLL_TABLES];  	int used;  	int changed; @@ -65,6 +67,9 @@ struct event_ops {          int (*event_unregister) (struct event_pool *event_pool, int fd, int idx); +        int (*event_unregister_close) (struct event_pool *event_pool, int fd, +				       int idx); +          int (*event_dispatch) (struct event_pool *event_pool);  }; @@ -75,6 +80,7 @@ int event_register (struct event_pool *event_pool, int fd,  		    event_handler_t handler,  		    void *data, int poll_in, int poll_out);  int event_unregister (struct event_pool *event_pool, int fd, int idx); +int event_unregister_close (struct event_pool *event_pool, int fd, int idx);  int event_dispatch (struct event_pool *event_pool);  #endif /* _EVENT_H_ */ diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 1cce6db7501..388b8dedfd9 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -125,6 +125,7 @@ enum gf_common_mem_types_ {  	gf_common_mt_strfd_t              = 109,  	gf_common_mt_strfd_data_t         = 110,          gf_common_mt_regex_t              = 111, +        gf_common_mt_ereg                 = 112,          gf_common_mt_end  };  #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 107590b0273..0a3a5812a91 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -291,6 +291,22 @@ ssl_do (rpc_transport_t *this, void *buf, size_t len, SSL_trinary_func *func)  		case SSL_ERROR_NONE:  			return r;  		case SSL_ERROR_WANT_READ: +                        /* If we are attempting to connect/accept then we +                         * should wait here on the poll, for the SSL +                         * (re)negotiation to complete, else we would error out +                         * on the accept/connect. +                         * If we are here when attempting to read/write +                         * then we return r (or -1) as the socket is always +                         * primed for the read event, and it would eventually +                         * call one of the SSL routines */ +                        /* NOTE: Only way to determine this is a accept/connect +                         * is to examine buf or func, which is not very +                         * clean */ +                        if ((func == (SSL_trinary_func *)SSL_read) +                            || (func == (SSL_trinary_func *) SSL_write)) { +                                return r; +                        } +  			pfd.fd = priv->sock;  			pfd.events = POLLIN;  			if (poll(&pfd,1,-1) < 0) { @@ -944,9 +960,8 @@ __socket_reset (rpc_transport_t *this)          memset (&priv->incoming, 0, sizeof (priv->incoming)); -        event_unregister (this->ctx->event_pool, priv->sock, priv->idx); +        event_unregister_close (this->ctx->event_pool, priv->sock, priv->idx); -        close (priv->sock);          priv->sock = -1;          priv->idx = -1;          priv->connected = -1; @@ -2236,9 +2251,16 @@ socket_event_poll_in (rpc_transport_t *this)          rpc_transport_pollin_t *pollin = NULL;          socket_private_t       *priv = this->private; -        ret = socket_proto_state_machine (this, &pollin); +	do { +		/* consume all we can, this is our only chance +		   (Edge Triggered polling in epoll) +		*/ +		pollin = NULL; +		ret = socket_proto_state_machine (this, &pollin); + +		if (!pollin) +			break; -        if (pollin != NULL) {                  priv->ot_state = OT_CALLBACK;                  ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,                                              pollin); @@ -2246,7 +2268,8 @@ socket_event_poll_in (rpc_transport_t *this)                          priv->ot_state = OT_RUNNING;                  }                  rpc_transport_pollin_destroy (pollin); -        } + +        } while (pollin);          return ret;  }  | 
