summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib')
-rw-r--r--xlators/features/changelog/lib/examples/c/get-changes-multi.c4
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h44
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-reborp.c280
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-rpc.c6
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c86
5 files changed, 280 insertions, 140 deletions
diff --git a/xlators/features/changelog/lib/examples/c/get-changes-multi.c b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
index ae404bc7ad6..3741bdf6edc 100644
--- a/xlators/features/changelog/lib/examples/c/get-changes-multi.c
+++ b/xlators/features/changelog/lib/examples/c/get-changes-multi.c
@@ -44,7 +44,7 @@ void brick_callback (void *xl, char *brick,
void fill_brick_spec (struct gf_brick_spec *brick, char *path)
{
brick->brick_path = strdup (path);
- brick->filter = CHANGELOG_OP_TYPE_RELEASE;
+ brick->filter = CHANGELOG_OP_TYPE_BR_RELEASE;
brick->init = brick_init;
brick->fini = brick_fini;
@@ -75,7 +75,7 @@ main (int argc, char **argv)
goto error_return;
ret = gf_changelog_register_generic ((struct gf_brick_spec *)bricks, 2,
- 1, "/tmp/multi-changes.log", 9,
+ 0, "/tmp/multi-changes.log", 9,
NULL);
if (ret)
goto error_return;
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 4247cb46718..adde1e57bbf 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -47,6 +47,7 @@ typedef struct read_line {
} read_line_t;
struct gf_changelog;
+struct gf_event;
/**
* Event list for ordered event notification
@@ -64,7 +65,7 @@ struct gf_event_list {
struct gf_changelog *entry; /* backpointer to it's brick
encapsulator (entry) */
- struct list_head events; /* list of events (ordered) */
+ struct list_head events; /* list of events */
};
/**
@@ -84,7 +85,7 @@ struct gf_event {
/**
* assign the base address of the IO vector to the correct memory
- * area and set it's addressable length.
+o * area and set it's addressable length.
*/
#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
do { \
@@ -95,11 +96,20 @@ struct gf_event {
pos += len; \
} while (0)
+typedef enum gf_changelog_conn_state {
+ GF_CHANGELOG_CONN_STATE_PENDING = 0,
+ GF_CHANGELOG_CONN_STATE_ACCEPTED,
+ GF_CHANGELOG_CONN_STATE_DISCONNECTED,
+} gf_changelog_conn_state_t;
+
/**
* An instance of this structure is allocated for each brick for which
* notifications are streamed.
*/
typedef struct gf_changelog {
+ gf_lock_t statelock;
+ gf_changelog_conn_state_t connstate;
+
xlator_t *this;
struct list_head list; /* list of instances */
@@ -125,6 +135,9 @@ typedef struct gf_changelog {
gf_boolean_t ordered;
+ void (*queueevent) (struct gf_event_list *, struct gf_event *);
+ void (*pickevent) (struct gf_event_list *, struct gf_event **);
+
struct gf_event_list event;
} gf_changelog_t;
@@ -140,13 +153,17 @@ gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
/** private structure */
typedef struct gf_private {
- gf_lock_t lock; /* protects ->connections */
+ pthread_mutex_t lock; /* protects ->connections, cleanups */
+ pthread_cond_t cond;
void *api; /* pointer for API access */
pthread_t poller; /* event poller thread */
+ pthread_t connectionjanitor; /* connection cleaner */
struct list_head connections; /* list of connections */
+ struct list_head cleanups; /* list of connection to be
+ cleaned up */
} gf_private_t;
#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
@@ -218,4 +235,25 @@ gf_thread_cleanup (xlator_t *this, pthread_t thread);
void *
gf_changelog_callback_invoker (void *arg);
+int
+gf_cleanup_event (xlator_t *, struct gf_event_list *);
+
+/* (un)ordered event queueing */
+void
+queue_ordered_event (struct gf_event_list *, struct gf_event *);
+
+void
+queue_unordered_event (struct gf_event_list *, struct gf_event *);
+
+/* (un)ordered event picking */
+void
+pick_event_ordered (struct gf_event_list *, struct gf_event **);
+
+void
+pick_event_unordered (struct gf_event_list *, struct gf_event **);
+
+/* connection janitor thread */
+void *
+gf_changelog_connection_janitor (void *);
+
#endif
diff --git a/xlators/features/changelog/lib/src/gf-changelog-reborp.c b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
index d7e60fb9634..457d829e69b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-reborp.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-reborp.c
@@ -21,19 +21,135 @@
struct rpcsvc_program *gf_changelog_reborp_programs[];
+void *
+gf_changelog_connection_janitor (void *arg)
+{
+ int32_t ret = 0;
+ xlator_t *this = NULL;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
+ struct gf_event *event = NULL;
+ struct gf_event_list *ev = NULL;
+ unsigned long drained = 0;
+
+ this = arg;
+ THIS = this;
+
+ priv = this->private;
+
+ while (1) {
+ pthread_mutex_lock (&priv->lock);
+ {
+ while (list_empty (&priv->cleanups))
+ pthread_cond_wait (&priv->cond, &priv->lock);
+
+ entry = list_first_entry (&priv->cleanups,
+ gf_changelog_t, list);
+ list_del_init (&entry->list);
+ }
+ pthread_mutex_unlock (&priv->lock);
+
+ drained = 0;
+ ev = &entry->event;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Cleaning brick entry for brick %s", entry->brick);
+
+ /* 0x0: disbale rpc-clnt */
+ rpc_clnt_disable (RPC_PROBER (entry));
+
+ /* 0x1: cleanup callback invoker thread */
+ ret = gf_cleanup_event (this, ev);
+ if (ret)
+ continue;
+
+ /* 0x2: drain pending events */
+ while (!list_empty (&ev->events)) {
+ event = list_first_entry (&ev->events,
+ struct gf_event, list);
+ gf_log (this->name, GF_LOG_INFO,
+ "Draining event [Seq: %lu, Payload: %d]",
+ event->seq, event->count);
+
+ GF_FREE (event);
+ drained++;
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Drained %lu events", drained);
+
+ /* 0x3: freeup brick entry */
+ gf_log (this->name, GF_LOG_INFO, "freeing entry %p", entry);
+ LOCK_DESTROY (&entry->statelock);
+ GF_FREE (entry);
+ }
+
+ return NULL;
+}
+
+static inline void
+__gf_changelog_set_conn_state (gf_changelog_t *entry,
+ gf_changelog_conn_state_t state)
+{
+ entry->connstate = state;
+}
+
/**
- * On a reverse connection, unlink the socket file.
+ * state check login to gaurd access object after free
*/
+static inline void
+gf_changelog_check_event (gf_private_t *priv,
+ gf_changelog_t *entry, rpcsvc_event_t event)
+{
+ gf_boolean_t needfree = _gf_false;
+ gf_changelog_conn_state_t laststate;
+ /**
+ * need to handle couple of connection states to gaurd correct
+ * freeing of object.
+ */
+ LOCK (&entry->statelock);
+ {
+ laststate = entry->connstate;
+ if (event == RPCSVC_EVENT_ACCEPT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_ACCEPTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_DISCONNECTED)
+ needfree = _gf_true;
+ }
+
+ if (event == RPCSVC_EVENT_DISCONNECT) {
+ __gf_changelog_set_conn_state
+ (entry, GF_CHANGELOG_CONN_STATE_DISCONNECTED);
+
+ if (laststate == GF_CHANGELOG_CONN_STATE_ACCEPTED)
+ needfree = _gf_true;
+ }
+ }
+ UNLOCK (&entry->statelock);
+
+ /**
+ * TODO:
+ * Handle the race between ACCEPT and DISCONNECT in the
+ * reconnect code. So purging of entry is deliberately
+ * avoided here. It will be handled in the reconnect code.
+ */
+}
+
int
gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
rpcsvc_event_t event, void *data)
{
- int ret = 0;
+ int ret = 0;
xlator_t *this = NULL;
gf_private_t *priv = NULL;
gf_changelog_t *entry = NULL;
char sock[UNIX_PATH_MAX] = {0,};
+ if (!(event == RPCSVC_EVENT_ACCEPT ||
+ event == RPCSVC_EVENT_DISCONNECT))
+ return 0;
+
entry = mydata;
this = entry->this;
priv = this->private;
@@ -42,29 +158,23 @@ gf_changelog_reborp_rpcsvc_notify (rpcsvc_t *rpc, void *mydata,
case RPCSVC_EVENT_ACCEPT:
ret = unlink (RPC_SOCK(entry));
if (ret != 0)
- gf_log (this->name, GF_LOG_WARNING, "failed to unlink"
- " reverse socket file %s", RPC_SOCK (entry));
+ gf_log (this->name, GF_LOG_WARNING, "failed to unlink "
+ "reverse socket %s", RPC_SOCK (entry));
if (entry->connected)
GF_CHANGELOG_INVOKE_CBK (this, entry->connected,
entry->brick, entry->ptr);
break;
case RPCSVC_EVENT_DISCONNECT:
- LOCK (&priv->lock);
- {
- list_del (&entry->list);
- }
- UNLOCK (&priv->lock);
-
if (entry->disconnected)
GF_CHANGELOG_INVOKE_CBK (this, entry->disconnected,
entry->brick, entry->ptr);
-
- GF_FREE (entry);
- break;
+ /* passthrough */
default:
break;
}
+ /* gf_changelog_check_event (priv, entry, event); */
+
return 0;
}
@@ -140,12 +250,29 @@ __can_process_event (struct gf_event_list *ev, struct gf_event **event)
return 0;
}
-inline void
-__process_event_list (struct gf_event_list *ev, struct gf_event **event)
+void
+pick_event_ordered (struct gf_event_list *ev, struct gf_event **event)
{
- while (list_empty (&ev->events)
- || !__can_process_event (ev, event))
- pthread_cond_wait (&ev->cond, &ev->lock);
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events)
+ || !__can_process_event (ev, event))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+pick_event_unordered (struct gf_event_list *ev, struct gf_event **event)
+{
+ pthread_mutex_lock (&ev->lock);
+ {
+ while (list_empty (&ev->events))
+ pthread_cond_wait (&ev->cond, &ev->lock);
+ *event = list_first_entry (&ev->events, struct gf_event, list);
+ list_del (&(*event)->list);
+ }
+ pthread_mutex_unlock (&ev->lock);
}
void *
@@ -160,14 +287,10 @@ gf_changelog_callback_invoker (void *arg)
ev = arg;
entry = ev->entry;
- this = entry->this;
+ THIS = this = entry->this;
while (1) {
- pthread_mutex_lock (&ev->lock);
- {
- __process_event_list (ev, &event);
- }
- pthread_mutex_unlock (&ev->lock);
+ entry->pickevent (ev, &event);
vec = (struct iovec *) &event->iov;
gf_changelog_invoke_callback (entry, &vec, event->count);
@@ -192,9 +315,36 @@ orderfn (struct list_head *pos1, struct list_head *pos2)
return -1;
}
+void
+queue_ordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the ordered event list and wake up listner(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_order (&event->list, &ev->events, orderfn);
+ if (!ev->next_seq)
+ ev->next_seq = event->seq;
+ if (ev->next_seq == event->seq)
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
+void
+queue_unordered_event (struct gf_event_list *ev, struct gf_event *event)
+{
+ /* add event to the tail of the queue and wake up listener(s) */
+ pthread_mutex_lock (&ev->lock);
+ {
+ list_add_tail (&event->list, &ev->events);
+ pthread_cond_signal (&ev->cond);
+ }
+ pthread_mutex_unlock (&ev->lock);
+}
+
int
-gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
+gf_changelog_event_handler (rpcsvc_request_t *req,
+ xlator_t *this, gf_changelog_t *entry)
{
int i = 0;
size_t payloadlen = 0;
@@ -255,16 +405,8 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
rpc_req.seq, entry->brick, rpc_req.tv_sec,
rpc_req.tv_usec, payloadcnt, payloadlen);
- /* add it to the ordered event list and wake up listner(s) */
- pthread_mutex_lock (&ev->lock);
- {
- list_add_order (&event->list, &ev->events, orderfn);
- if (!ev->next_seq)
- ev->next_seq = event->seq;
- if (ev->next_seq == event->seq)
- pthread_cond_signal (&ev->cond);
- }
- pthread_mutex_unlock (&ev->lock);
+ /* dispatch event */
+ entry->queueevent (ev, event);
/* ack sequence number */
rpc_rsp.op_ret = 0;
@@ -281,76 +423,18 @@ gf_changelog_ordered_event_handler (rpcsvc_request_t *req,
}
int
-gf_changelog_unordered_event_handler (rpcsvc_request_t *req,
- xlator_t *this, gf_changelog_t *entry)
-{
- int i = 0;
- int ret = 0;
- ssize_t len = 0;
- int payloadcnt = 0;
- struct iovec vector[MAX_IOVEC] = {{0,}, };
- changelog_event_req rpc_req = {0,};
- changelog_event_rsp rpc_rsp = {0,};
-
- len = xdr_to_generic (req->msg[0],
- &rpc_req, (xdrproc_t)xdr_changelog_event_req);
- if (len < 0) {
- gf_log (this->name, GF_LOG_ERROR, "xdr decoding failed");
- req->rpc_err = GARBAGE_ARGS;
- goto handle_xdr_error;
- }
-
- /* prepare payload */
- if (len < req->msg[0].iov_len) {
- payloadcnt = 1;
- vector[0].iov_base = (req->msg[0].iov_base + len);
- vector[0].iov_len = (req->msg[0].iov_len - len);
- }
-
- for (i = 1; i < req->count; i++) {
- vector[payloadcnt++] = req->msg[i];
- }
-
- gf_log (this->name, GF_LOG_DEBUG,
- "seq: %lu (time: %lu.%lu), (vec: %d)",
- rpc_req.seq, rpc_req.tv_sec, rpc_req.tv_usec, payloadcnt);
-
- /* invoke callback */
- struct iovec *vec = (struct iovec *) &vector;
- gf_changelog_invoke_callback (entry, &vec, payloadcnt);
-
- /* ack sequence number */
- rpc_rsp.op_ret = 0;
- rpc_rsp.seq = rpc_req.seq;
-
- goto submit_rpc;
-
- handle_xdr_error:
- rpc_rsp.op_ret = -1;
- rpc_rsp.seq = 0; /* invalid */
- submit_rpc:
- return changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL,
- (xdrproc_t)xdr_changelog_event_rsp);
-}
-
-int
gf_changelog_reborp_handle_event (rpcsvc_request_t *req)
{
- int ret = 0;
- xlator_t *this = NULL;
- rpcsvc_t *svc = NULL;
- gf_changelog_t *entry = NULL;
+ xlator_t *this = NULL;
+ rpcsvc_t *svc = NULL;
+ gf_changelog_t *entry = NULL;
svc = rpcsvc_request_service (req);
entry = svc->mydata;
this = THIS = entry->this;
- ret = GF_NEED_ORDERED_EVENTS (entry)
- ? gf_changelog_ordered_event_handler (req, this, entry)
- : gf_changelog_unordered_event_handler (req, this, entry);
-
- return ret;
+ return gf_changelog_event_handler (req, this, entry);
}
rpcsvc_actor_t gf_changelog_reborp_actors[CHANGELOG_REV_PROC_MAX] = {
diff --git a/xlators/features/changelog/lib/src/gf-changelog-rpc.c b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
index c2a4c044d23..270632bc71b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-rpc.c
+++ b/xlators/features/changelog/lib/src/gf-changelog-rpc.c
@@ -19,17 +19,11 @@ int
gf_changelog_rpc_notify (struct rpc_clnt *rpc,
void *mydata, rpc_clnt_event_t event, void *data)
{
- xlator_t *this = NULL;
-
- this = mydata;
-
switch (event) {
case RPC_CLNT_CONNECT:
rpc_clnt_set_connected (&rpc->conn);
break;
case RPC_CLNT_DISCONNECT:
- rpc_clnt_unset_connected (&rpc->conn);
- break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
break;
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index e1cfdb038fa..103a7b01eb0 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
@@ -54,14 +54,20 @@ gf_private_t *gf_changelog_alloc_priv ()
if (!priv)
goto error_return;
INIT_LIST_HEAD (&priv->connections);
+ INIT_LIST_HEAD (&priv->cleanups);
- ret = LOCK_INIT (&priv->lock);
+ ret = pthread_mutex_init (&priv->lock, NULL);
if (ret != 0)
goto free_priv;
- priv->api = NULL;
+ ret = pthread_cond_init (&priv->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ priv->api = NULL;
return priv;
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&priv->lock);
free_priv:
GF_FREE (priv);
error_return:
@@ -248,21 +254,23 @@ gf_changelog_setup_rpc (xlator_t *this,
return -1;
}
-static void
-gf_cleanup_event (gf_changelog_t *entry)
+int
+gf_cleanup_event (xlator_t *this, struct gf_event_list *ev)
{
- xlator_t *this = NULL;
- struct gf_event_list *ev = NULL;
-
- this = entry->this;
- ev = &entry->event;
-
- (void) gf_thread_cleanup (this, ev->invoker);
+ int ret = 0;
- (void) pthread_mutex_destroy (&ev->lock);
- (void) pthread_cond_destroy (&ev->cond);
+ ret = gf_thread_cleanup (this, ev->invoker);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot cleanup callback invoker thread "
+ " [reason: %s]. Not freeing resources",
+ strerror (-ret));
+ return -1;
+ }
ev->entry = NULL;
+
+ return 0;
}
static int
@@ -284,11 +292,17 @@ gf_init_event (gf_changelog_t *entry)
ev->next_seq = 0; /* bootstrap sequencing */
- if (entry->ordered) {
- ret = pthread_create (&ev->invoker, NULL,
- gf_changelog_callback_invoker, ev);
- if (ret != 0)
- goto cleanup_cond;
+ ret = gf_thread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
+
+ if (GF_NEED_ORDERED_EVENTS (entry)) {
+ entry->pickevent = pick_event_ordered;
+ entry->queueevent = queue_ordered_event;
+ } else {
+ entry->pickevent = pick_event_unordered;
+ entry->queueevent = queue_unordered_event;
}
return 0;
@@ -303,7 +317,7 @@ gf_init_event (gf_changelog_t *entry)
/**
* TODO:
- * - cleanup invoker thread (if ordered mode)
+ * - cleanup invoker thread
* - cleanup event list
* - destroy rpc{-clnt, svc}
*/
@@ -339,6 +353,9 @@ gf_setup_brick_connection (xlator_t *this,
goto error_return;
INIT_LIST_HEAD (&entry->list);
+ LOCK_INIT (&entry->statelock);
+ entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING;
+
entry->notify = brick->filter;
(void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
@@ -346,11 +363,9 @@ gf_setup_brick_connection (xlator_t *this,
entry->invokerxl = xl;
entry->ordered = ordered;
- if (ordered) {
- ret = gf_init_event (entry);
- if (ret)
- goto free_entry;
- }
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
entry->fini = brick->fini;
entry->callback = brick->callback;
@@ -362,11 +377,11 @@ gf_setup_brick_connection (xlator_t *this,
goto cleanup_event;
priv->api = entry->ptr; /* pointer to API, if required */
- LOCK (&priv->lock);
+ pthread_mutex_lock (&priv->lock);
{
list_add_tail (&entry->list, &priv->connections);
}
- UNLOCK (&priv->lock);
+ pthread_mutex_unlock (&priv->lock);
ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
if (ret)
@@ -374,9 +389,9 @@ gf_setup_brick_connection (xlator_t *this,
return 0;
cleanup_event:
- if (ordered)
- gf_cleanup_event (entry);
+ (void) gf_cleanup_event (this, &entry->event);
free_entry:
+ gf_log (this->name, GF_LOG_DEBUG, "freeing entry %p", entry);
list_del (&entry->list); /* FIXME: kludge for now */
GF_FREE (entry);
error_return:
@@ -436,8 +451,8 @@ gf_changelog_set_master (xlator_t *master, void *xl)
if (!xl) {
/* poller thread */
- ret = pthread_create (&priv->poller,
- NULL, changelog_rpc_poller, THIS);
+ ret = gf_thread_create (&priv->poller,
+ NULL, changelog_rpc_poller, THIS);
if (ret != 0) {
GF_FREE (priv);
gf_log (master->name, GF_LOG_ERROR,
@@ -458,6 +473,7 @@ int
gf_changelog_init (void *xl)
{
int ret = 0;
+ gf_private_t *priv = NULL;
if (master)
return 0;
@@ -474,6 +490,14 @@ gf_changelog_init (void *xl)
if (ret)
goto dealloc_name;
+ priv = master->private;
+ ret = gf_thread_create (&priv->connectionjanitor, NULL,
+ gf_changelog_connection_janitor, master);
+ if (ret != 0) {
+ /* TODO: cleanup priv, mutex (poller thread for !xl) */
+ goto dealloc_name;
+ }
+
return 0;
dealloc_name: