diff options
Diffstat (limited to 'xlators/features/changelog/lib')
5 files changed, 280 insertions, 140 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c index ae404bc7ad6..3741bdf6edc 100644 --- a/xlators/features/changelog/lib/examples/c/get-changes-multi.c +++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c @@ -44,7 +44,7 @@ void brick_callback (void *xl, char *brick, void fill_brick_spec (struct gf_brick_spec *brick, char *path) { brick->brick_path = strdup (path); - brick->filter = CHANGELOG_OP_TYPE_RELEASE; + brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE; brick->init = brick_init; brick->fini = brick_fini; @@ -75,7 +75,7 @@ main (int argc, char **argv) goto error_return; ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2, - 1, "/tmp/multi-changes.log", 9, + 0, "/tmp/multi-changes.log", 9, NULL); if (ret) goto error_return; diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h index 4247cb46718..adde1e57bbf 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h +++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h @@ -47,6 +47,7 @@ typedef struct read_line { } read_line_t; struct gf_changelog; +struct gf_event; /** * Event list for ordered event notification @@ -64,7 +65,7 @@ struct gf_event_list { struct gf_changelog *entry; /* backpointer to it's brick encapsulator (entry) */ - struct list_head events; /* list of events (ordered) */ + struct list_head events; /* list of events */ }; /** @@ -84,7 +85,7 @@ struct gf_event { /** * assign the base address of the IO vector to the correct memory - * area and set it's addressable length. +o * area and set it's addressable length. */ #define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \ do { \ @@ -95,11 +96,20 @@ struct gf_event { pos += len; \ } while (0) +typedef enum gf_changelog_conn_state { + GF_CHANGELOG_CONN_STATE_PENDING = 0, + GF_CHANGELOG_CONN_STATE_ACCEPTED, + GF_CHANGELOG_CONN_STATE_DISCONNECTED, +} gf_changelog_conn_state_t; + /** * An instance of this structure is allocated for each brick for which * notifications are streamed. */ typedef struct gf_changelog { + gf_lock_t statelock; + gf_changelog_conn_state_t connstate; + xlator_t *this; struct list_head list; /* list of instances */ @@ -125,6 +135,9 @@ typedef struct gf_changelog { gf_boolean_t ordered; + void (*queueevent) (struct gf_event_list *, struct gf_event *); + void (*pickevent) (struct gf_event_list *, struct gf_event **); + struct gf_event_list event; } gf_changelog_t; @@ -140,13 +153,17 @@ gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event) /** private structure */ typedef struct gf_private { - gf_lock_t lock; /* protects ->connections */ + pthread_mutex_t lock; /* protects ->connections, cleanups */ + pthread_cond_t cond; void *api; /* pointer for API access */ pthread_t poller; /* event poller thread */ + pthread_t connectionjanitor; /* connection cleaner */ struct list_head connections; /* list of connections */ + struct list_head cleanups; /* list of connection to be + cleaned up */ } gf_private_t; #define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api) @@ -218,4 +235,25 @@ gf_thread_cleanup (xlator_t *this, pthread_t thread); void * gf_changelog_callback_invoker (void *arg); +int +gf_cleanup_event (xlator_t *, struct gf_event_list *); + +/* (un)ordered event queueing */ +void +queue_ordered_event (struct gf_event_list *, struct gf_event *); + +void +queue_unordered_event (struct gf_event_list *, struct gf_event *); + +/* (un)ordered event picking */ +void +pick_event_ordered (struct gf_event_list *, struct gf_event **); + +void +pick_event_unordered (struct gf_event_list *, struct gf_event **); + +/* connection janitor thread */ +void * +gf_changelog_connection_janitor (void *); + #endif diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c index d7e60fb9634..457d829e69b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c +++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c @@ -21,19 +21,135 @@ struct rpcsvc_program *gf_changelog_reborp_programs[]; +void * +gf_changelog_connection_janitor (void *arg) +{ + int32_t ret = 0; + xlator_t *this = NULL; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; + struct gf_event *event = NULL; + struct gf_event_list *ev = NULL; + unsigned long drained = 0; + + this = arg; + THIS = this; + + priv = this->private; + + while (1) { + pthread_mutex_lock (&priv->lock); + { + while (list_empty (&priv->cleanups)) + pthread_cond_wait (&priv->cond, &priv->lock); + + entry = list_first_entry (&priv->cleanups, + gf_changelog_t, list); + list_del_init (&entry->list); + } + pthread_mutex_unlock (&priv->lock); + + drained = 0; + ev = &entry->event; + + gf_log (this->name, GF_LOG_INFO, + "Cleaning brick entry for brick %s", entry->brick); + + /* 0x0: disbale rpc-clnt */ + rpc_clnt_disable (RPC_PROBER (entry)); + + /* 0x1: cleanup callback invoker thread */ + ret = gf_cleanup_event (this, ev); + if (ret) + continue; + + /* 0x2: drain pending events */ + while (!list_empty (&ev->events)) { + event = list_first_entry (&ev->events, + struct gf_event, list); + gf_log (this->name, GF_LOG_INFO, + "Draining event [Seq: %lu, Payload: %d]", + event->seq, event->count); + + GF_FREE (event); + drained++; + } + + gf_log (this->name, GF_LOG_INFO, + "Drained %lu events", drained); + + /* 0x3: freeup brick entry */ + gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry); + LOCK_DESTROY (&entry->statelock); + GF_FREE (entry); + } + + return NULL; +} + +static inline void +__gf_changelog_set_conn_state (gf_changelog_t *entry, + gf_changelog_conn_state_t state) +{ + entry->connstate = state; +} + /** - * On a reverse connection, unlink the socket file. + * state check login to gaurd access object after free */ +static inline void +gf_changelog_check_event (gf_private_t *priv, + gf_changelog_t *entry, rpcsvc_event_t event) +{ + gf_boolean_t needfree = _gf_false; + gf_changelog_conn_state_t laststate; + /** + * need to handle couple of connection states to gaurd correct + * freeing of object. + */ + LOCK (&entry->statelock); + { + laststate = entry->connstate; + if (event == RPCSVC_EVENT_ACCEPT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED) + needfree = _gf_true; + } + + if (event == RPCSVC_EVENT_DISCONNECT) { + __gf_changelog_set_conn_state + (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED); + + if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED) + needfree = _gf_true; + } + } + UNLOCK (&entry->statelock); + + /** + * TODO: + * Handle the race between ACCEPT and DISCONNECT in the + * reconnect code. So purging of entry is deliberately + * avoided here. It will be handled in the reconnect code. + */ +} + int gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, rpcsvc_event_t event, void *data) { - int ret = 0; + int ret = 0; xlator_t *this = NULL; gf_private_t *priv = NULL; gf_changelog_t *entry = NULL; char sock[UNIX_PATH_MAX] = {0,}; + if (!(event == RPCSVC_EVENT_ACCEPT || + event == RPCSVC_EVENT_DISCONNECT)) + return 0; + entry = mydata; this = entry->this; priv = this->private; @@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata, case RPCSVC_EVENT_ACCEPT: ret = unlink (RPC_SOCK(entry)); if (ret != 0) - gf_log (this->name, GF_LOG_WARNING, "failed to unlink" - " reverse socket file %s", RPC_SOCK (entry)); + gf_log (this->name, GF_LOG_WARNING, "failed to unlink " + "reverse socket %s", RPC_SOCK (entry)); if (entry->connected) GF_CHANGELOG_INVOKE_CBK (this, entry->connected, entry->brick, entry->ptr); break; case RPCSVC_EVENT_DISCONNECT: - LOCK (&priv->lock); - { - list_del (&entry->list); - } - UNLOCK (&priv->lock); - if (entry->disconnected) GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected, entry->brick, entry->ptr); - - GF_FREE (entry); - break; + /* passthrough */ default: break; } + /* gf_changelog_check_event (priv, entry, event); */ + return 0; } @@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event) return 0; } -inline void -__process_event_list (struct gf_event_list *ev, struct gf_event **event) +void +pick_event_ordered (struct gf_event_list *ev, struct gf_event **event) { - while (list_empty (&ev->events) - || !__can_process_event (ev, event)) - pthread_cond_wait (&ev->cond, &ev->lock); + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events) + || !__can_process_event (ev, event)) + pthread_cond_wait (&ev->cond, &ev->lock); + } + pthread_mutex_unlock (&ev->lock); +} + +void +pick_event_unordered (struct gf_event_list *ev, struct gf_event **event) +{ + pthread_mutex_lock (&ev->lock); + { + while (list_empty (&ev->events)) + pthread_cond_wait (&ev->cond, &ev->lock); + *event = list_first_entry (&ev->events, struct gf_event, list); + list_del (&(*event)->list); + } + pthread_mutex_unlock (&ev->lock); } void * @@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg) ev = arg; entry = ev->entry; - this = entry->this; + THIS = this = entry->this; while (1) { - pthread_mutex_lock (&ev->lock); - { - __process_event_list (ev, &event); - } - pthread_mutex_unlock (&ev->lock); + entry->pickevent (ev, &event); vec = (struct iovec *) &event->iov; gf_changelog_invoke_callback (entry, &vec, event->count); @@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2) return -1; } +void +queue_ordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the ordered event list and wake up listner(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_order (&event->list, &ev->events, orderfn); + if (!ev->next_seq) + ev->next_seq = event->seq; + if (ev->next_seq == event->seq) + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + +void +queue_unordered_event (struct gf_event_list *ev, struct gf_event *event) +{ + /* add event to the tail of the queue and wake up listener(s) */ + pthread_mutex_lock (&ev->lock); + { + list_add_tail (&event->list, &ev->events); + pthread_cond_signal (&ev->cond); + } + pthread_mutex_unlock (&ev->lock); +} + int -gf_changelog_ordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) +gf_changelog_event_handler (rpcsvc_request_t *req, + xlator_t *this, gf_changelog_t *entry) { int i = 0; size_t payloadlen = 0; @@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, rpc_req.seq, entry->brick, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt, payloadlen); - /* add it to the ordered event list and wake up listner(s) */ - pthread_mutex_lock (&ev->lock); - { - list_add_order (&event->list, &ev->events, orderfn); - if (!ev->next_seq) - ev->next_seq = event->seq; - if (ev->next_seq == event->seq) - pthread_cond_signal (&ev->cond); - } - pthread_mutex_unlock (&ev->lock); + /* dispatch event */ + entry->queueevent (ev, event); /* ack sequence number */ rpc_rsp.op_ret = 0; @@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req, } int -gf_changelog_unordered_event_handler (rpcsvc_request_t *req, - xlator_t *this, gf_changelog_t *entry) -{ - int i = 0; - int ret = 0; - ssize_t len = 0; - int payloadcnt = 0; - struct iovec vector[MAX_IOVEC] = {{0,}, }; - changelog_event_req rpc_req = {0,}; - changelog_event_rsp rpc_rsp = {0,}; - - len = xdr_to_generic (req->msg[0], - &rpc_req, (xdrproc_t)xdr_changelog_event_req); - if (len < 0) { - gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed"); - req->rpc_err = GARBAGE_ARGS; - goto handle_xdr_error; - } - - /* prepare payload */ - if (len < req->msg[0].iov_len) { - payloadcnt = 1; - vector[0].iov_base = (req->msg[0].iov_base + len); - vector[0].iov_len = (req->msg[0].iov_len - len); - } - - for (i = 1; i < req->count; i++) { - vector[payloadcnt++] = req->msg[i]; - } - - gf_log (this->name, GF_LOG_DEBUG, - "seq: %lu (time: %lu.%lu), (vec: %d)", - rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt); - - /* invoke callback */ - struct iovec *vec = (struct iovec *) &vector; - gf_changelog_invoke_callback (entry, &vec, payloadcnt); - - /* ack sequence number */ - rpc_rsp.op_ret = 0; - rpc_rsp.seq = rpc_req.seq; - - goto submit_rpc; - - handle_xdr_error: - rpc_rsp.op_ret = -1; - rpc_rsp.seq = 0; /* invalid */ - submit_rpc: - return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, - (xdrproc_t)xdr_changelog_event_rsp); -} - -int gf_changelog_reborp_handle_event (rpcsvc_request_t *req) { - int ret = 0; - xlator_t *this = NULL; - rpcsvc_t *svc = NULL; - gf_changelog_t *entry = NULL; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + gf_changelog_t *entry = NULL; svc = rpcsvc_request_service (req); entry = svc->mydata; this = THIS = entry->this; - ret = GF_NEED_ORDERED_EVENTS (entry) - ? gf_changelog_ordered_event_handler (req, this, entry) - : gf_changelog_unordered_event_handler (req, this, entry); - - return ret; + return gf_changelog_event_handler (req, this, entry); } rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = { diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c index c2a4c044d23..270632bc71b 100644 --- a/xlators/features/changelog/lib/src/gf-changelog-rpc.c +++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c @@ -19,17 +19,11 @@ int gf_changelog_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, void *data) { - xlator_t *this = NULL; - - this = mydata; - switch (event) { case RPC_CLNT_CONNECT: rpc_clnt_set_connected (&rpc->conn); break; case RPC_CLNT_DISCONNECT: - rpc_clnt_unset_connected (&rpc->conn); - break; case RPC_CLNT_MSG: case RPC_CLNT_DESTROY: break; diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index e1cfdb038fa..103a7b01eb0 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -1,5 +1,5 @@ /* - Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser @@ -54,14 +54,20 @@ gf_private_t *gf_changelog_alloc_priv () if (!priv) goto error_return; INIT_LIST_HEAD (&priv->connections); + INIT_LIST_HEAD (&priv->cleanups); - ret = LOCK_INIT (&priv->lock); + ret = pthread_mutex_init (&priv->lock, NULL); if (ret != 0) goto free_priv; - priv->api = NULL; + ret = pthread_cond_init (&priv->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + priv->api = NULL; return priv; + cleanup_mutex: + (void) pthread_mutex_destroy (&priv->lock); free_priv: GF_FREE (priv); error_return: @@ -248,21 +254,23 @@ gf_changelog_setup_rpc (xlator_t *this, return -1; } -static void -gf_cleanup_event (gf_changelog_t *entry) +int +gf_cleanup_event (xlator_t *this, struct gf_event_list *ev) { - xlator_t *this = NULL; - struct gf_event_list *ev = NULL; - - this = entry->this; - ev = &entry->event; - - (void) gf_thread_cleanup (this, ev->invoker); + int ret = 0; - (void) pthread_mutex_destroy (&ev->lock); - (void) pthread_cond_destroy (&ev->cond); + ret = gf_thread_cleanup (this, ev->invoker); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "cannot cleanup callback invoker thread " + " [reason: %s]. Not freeing resources", + strerror (-ret)); + return -1; + } ev->entry = NULL; + + return 0; } static int @@ -284,11 +292,17 @@ gf_init_event (gf_changelog_t *entry) ev->next_seq = 0; /* bootstrap sequencing */ - if (entry->ordered) { - ret = pthread_create (&ev->invoker, NULL, - gf_changelog_callback_invoker, ev); - if (ret != 0) - goto cleanup_cond; + ret = gf_thread_create (&ev->invoker, NULL, + gf_changelog_callback_invoker, ev); + if (ret != 0) + goto cleanup_cond; + + if (GF_NEED_ORDERED_EVENTS (entry)) { + entry->pickevent = pick_event_ordered; + entry->queueevent = queue_ordered_event; + } else { + entry->pickevent = pick_event_unordered; + entry->queueevent = queue_unordered_event; } return 0; @@ -303,7 +317,7 @@ gf_init_event (gf_changelog_t *entry) /** * TODO: - * - cleanup invoker thread (if ordered mode) + * - cleanup invoker thread * - cleanup event list * - destroy rpc{-clnt, svc} */ @@ -339,6 +353,9 @@ gf_setup_brick_connection (xlator_t *this, goto error_return; INIT_LIST_HEAD (&entry->list); + LOCK_INIT (&entry->statelock); + entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + entry->notify = brick->filter; (void) strncpy (entry->brick, brick->brick_path, PATH_MAX); @@ -346,11 +363,9 @@ gf_setup_brick_connection (xlator_t *this, entry->invokerxl = xl; entry->ordered = ordered; - if (ordered) { - ret = gf_init_event (entry); - if (ret) - goto free_entry; - } + ret = gf_init_event (entry); + if (ret) + goto free_entry; entry->fini = brick->fini; entry->callback = brick->callback; @@ -362,11 +377,11 @@ gf_setup_brick_connection (xlator_t *this, goto cleanup_event; priv->api = entry->ptr; /* pointer to API, if required */ - LOCK (&priv->lock); + pthread_mutex_lock (&priv->lock); { list_add_tail (&entry->list, &priv->connections); } - UNLOCK (&priv->lock); + pthread_mutex_unlock (&priv->lock); ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); if (ret) @@ -374,9 +389,9 @@ gf_setup_brick_connection (xlator_t *this, return 0; cleanup_event: - if (ordered) - gf_cleanup_event (entry); + (void) gf_cleanup_event (this, &entry->event); free_entry: + gf_log (this->name, GF_LOG_DEBUG, "freeing entry %p", entry); list_del (&entry->list); /* FIXME: kludge for now */ GF_FREE (entry); error_return: @@ -436,8 +451,8 @@ gf_changelog_set_master (xlator_t *master, void *xl) if (!xl) { /* poller thread */ - ret = pthread_create (&priv->poller, - NULL, changelog_rpc_poller, THIS); + ret = gf_thread_create (&priv->poller, + NULL, changelog_rpc_poller, THIS); if (ret != 0) { GF_FREE (priv); gf_log (master->name, GF_LOG_ERROR, @@ -458,6 +473,7 @@ int gf_changelog_init (void *xl) { int ret = 0; + gf_private_t *priv = NULL; if (master) return 0; @@ -474,6 +490,14 @@ gf_changelog_init (void *xl) if (ret) goto dealloc_name; + priv = master->private; + ret = gf_thread_create (&priv->connectionjanitor, NULL, + gf_changelog_connection_janitor, master); + if (ret != 0) { + /* TODO: cleanup priv, mutex (poller thread for !xl) */ + goto dealloc_name; + } + return 0; dealloc_name: |