diff options
-rw-r--r-- | libglusterfs/src/xlator.c | 4 | ||||
-rw-r--r-- | xlators/protocol/server/src/server-helpers.c | 112 | ||||
-rw-r--r-- | xlators/protocol/server/src/server-helpers.h | 6 | ||||
-rw-r--r-- | xlators/protocol/server/src/server-protocol.c | 39 | ||||
-rw-r--r-- | xlators/protocol/server/src/server-protocol.h | 7 | ||||
-rw-r--r-- | xlators/storage/posix/src/posix.c | 141 | ||||
-rw-r--r-- | xlators/storage/posix/src/posix.h | 11 |
7 files changed, 305 insertions, 15 deletions
diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c index 44494a944a6..650a844c912 100644 --- a/libglusterfs/src/xlator.c +++ b/libglusterfs/src/xlator.c @@ -870,10 +870,6 @@ xlator_tree_init (xlator_t *xl) */ ret = xlator_init_rec (top); - if (ret == 0 && top->notify) { - top->notify (top, GF_EVENT_PARENT_UP, NULL); - } - return ret; } diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c index 88dada1516e..0e22f0ba5fa 100644 --- a/xlators/protocol/server/src/server-helpers.c +++ b/xlators/protocol/server/src/server-helpers.c @@ -645,13 +645,14 @@ out: int -server_connection_cleanup (xlator_t *this, server_connection_t *conn) +server_connection_cleanup (xlator_t *this, server_connection_t *conn, transport_t *trans) { char do_cleanup = 0; struct _lock_table *ltable = NULL; fdentry_t *fdentries = NULL; uint32_t fd_count = 0; int ret = 0; + int i = 0; if (conn == NULL) { goto out; @@ -659,6 +660,12 @@ server_connection_cleanup (xlator_t *this, server_connection_t *conn) pthread_mutex_lock (&conn->lock); { + for (i = 0; i < TRANSPORTS_PER_SERVER_CONN; i++) { + if (conn->transports[i] == trans) { + conn->transports[i] = NULL; + transport_unref (trans); + } + } conn->active_transports--; if (conn->active_transports == 0) { if (conn->ltable) { @@ -850,11 +857,12 @@ out: server_connection_t * -server_connection_get (xlator_t *this, const char *id) +server_connection_get (xlator_t *this, const char *id, transport_t *trans) { server_connection_t *conn = NULL; server_connection_t *trav = NULL; server_conf_t *conf = NULL; + int i = 0; conf = this->private; @@ -878,10 +886,30 @@ server_connection_get (xlator_t *this, const char *id) list_add (&conn->list, &conf->conns); } + if (conn->active_transports == TRANSPORTS_PER_SERVER_CONN) { + gf_log (this->name, GF_LOG_DEBUG, + "Maximum number of connections allowed is %d", + TRANSPORTS_PER_SERVER_CONN); + goto unlock; + } + + for (i = 0; i < TRANSPORTS_PER_SERVER_CONN; i++) { + if (!conn->transports[i]) + break; + } + + if (i == TRANSPORTS_PER_SERVER_CONN) { + gf_log (this->name, GF_LOG_DEBUG, + "Could not find a vacant slot"); + goto unlock; + } + + conn->transports[i] = transport_ref (trans); conn->ref++; conn->active_transports++; } +unlock: pthread_mutex_unlock (&conf->mutex); return conn; @@ -918,3 +946,83 @@ server_connection_put (xlator_t *this, server_connection_t *conn) out: return; } + +void +server_child_down (xlator_t *this, xlator_t *bound_xl) +{ + server_conf_t *conf = NULL; + server_connection_t *trav = NULL; + transport_t *trans = NULL; + int subvol_idx = 0; + int i = 0; + xlator_list_t *xltrav = NULL; + + conf = this->private; + + if (conf == NULL) + return; + + xltrav = this->children; + + while (xltrav) { + if (xltrav->xlator == bound_xl) + break; + xltrav = xltrav->next; + subvol_idx++; + } + gf_log (this->name, GF_LOG_DEBUG, + "subvolume %s(%d) went down", bound_xl->name, subvol_idx); + + conf->subvol_list[subvol_idx] = 0; + + pthread_mutex_lock (&conf->mutex); + { + if (!list_empty(&conf->conns)) { + list_for_each_entry (trav, &conf->conns, list) { + if (bound_xl == trav->bound_xl) { + gf_log (this->name, GF_LOG_DEBUG, + "disonnecting conn=%p", trav); + for (i = 0; i < TRANSPORTS_PER_SERVER_CONN; i++) + { + trans = trav->transports[i]; + if (trans == NULL) + continue; + gf_log (this->name, GF_LOG_DEBUG, + "disconnecting %p(%d)", + trans, i); + transport_disconnect (trans); + } + } + } + } + } + pthread_mutex_unlock (&conf->mutex); +} + +void +server_child_up (xlator_t *this, xlator_t *bound_xl) +{ + server_conf_t *conf = NULL; + int subvol_idx = 0; + xlator_list_t *xltrav = NULL; + + conf = this->private; + + if (conf == NULL) + return; + + xltrav = this->children; + + while (xltrav) { + if (bound_xl == xltrav->xlator) { + break; + } + subvol_idx++; + xltrav = xltrav->next; + } + + gf_log (this->name, GF_LOG_DEBUG, + "subvolume %s(%d) came up", bound_xl->name, subvol_idx); + + conf->subvol_list[subvol_idx] = 1; +} diff --git a/xlators/protocol/server/src/server-helpers.h b/xlators/protocol/server/src/server-helpers.h index 867035d3334..5c584b00aa7 100644 --- a/xlators/protocol/server/src/server-helpers.h +++ b/xlators/protocol/server/src/server-helpers.h @@ -69,4 +69,10 @@ gf_direntry_to_bin (dir_entry_t *head, char *bufferp); void server_print_request (call_frame_t *frame); +void +server_child_up (xlator_t *this, xlator_t *bound_xl); + +void +server_child_down (xlator_t *this, xlator_t *bound_xl); + #endif /* __SERVER_HELPERS_H__ */ diff --git a/xlators/protocol/server/src/server-protocol.c b/xlators/protocol/server/src/server-protocol.c index 0528699acbe..a7f6294f387 100644 --- a/xlators/protocol/server/src/server-protocol.c +++ b/xlators/protocol/server/src/server-protocol.c @@ -5654,6 +5654,8 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl, char *volfile_key = NULL; uint32_t checksum = 0; int32_t lru_limit = 1024; + xlator_list_t *xltrav = NULL; + int subvol_idx = 0; params = dict_new (); reply = dict_new (); @@ -5695,7 +5697,7 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl, } - conn = server_connection_get (frame->this, process_uuid); + conn = server_connection_get (frame->this, process_uuid, trans); if (trans->xl_private != conn) trans->xl_private = conn; @@ -5862,6 +5864,22 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl, ret = dict_set_uint64 (reply, "transport-ptr", ((uint64_t) (long) trans)); + xltrav = frame->this->children; + while (xltrav) { + if (xltrav->xlator == xl) + break; + xltrav = xltrav->next; + subvol_idx++; + } + + if (conf->subvol_list[subvol_idx] == 0) { + gf_log (xl->name, GF_LOG_DEBUG, + "subvolume %d down (filesystem not accesible), failed to setvolume", subvol_idx); + op_ret = -1; + op_errno = ENOTCONN; + goto fail; + } + fail: dict_len = dict_serialized_length (reply); if (dict_len < 0) { @@ -6550,6 +6568,8 @@ init (xlator_t *this) server_conf_t *conf = NULL; data_t *data = NULL; data_t *trace = NULL; + int i = 0; + xlator_list_t *xltrav = NULL; if (this->children == NULL) { gf_log (this->name, GF_LOG_ERROR, @@ -6635,6 +6655,15 @@ init (xlator_t *this) } } + xltrav = this->children; + + while (xltrav) { + i++; + xltrav = xltrav->next; + } + + conf->subvol_list = calloc (i, sizeof (char)); + #ifndef GF_DARWIN_HOST_OS { struct rlimit lim; @@ -6736,6 +6765,12 @@ notify (xlator_t *this, int32_t event, void *data, ...) } switch (event) { + case GF_EVENT_CHILD_DOWN: + server_child_down (this, data); + break; + case GF_EVENT_CHILD_UP: + server_child_up (this, data); + break; case GF_EVENT_POLLIN: ret = protocol_server_pollin (this, trans); break; @@ -6756,7 +6791,7 @@ notify (xlator_t *this, int32_t event, void *data, ...) * FIXME: shouldn't we check for return value? * what should be done if cleanup fails? */ - server_connection_cleanup (this, trans->xl_private); + server_connection_cleanup (this, trans->xl_private, trans); } } break; diff --git a/xlators/protocol/server/src/server-protocol.h b/xlators/protocol/server/src/server-protocol.h index 78bc138279a..73e7f78911b 100644 --- a/xlators/protocol/server/src/server-protocol.h +++ b/xlators/protocol/server/src/server-protocol.h @@ -37,6 +37,7 @@ #define DEFAULT_BLOCK_SIZE 4194304 /* 4MB */ #define DEFAULT_VOLUME_FILE_PATH CONFDIR "/glusterfs.vol" +#define TRANSPORTS_PER_SERVER_CONN 2 typedef struct _server_state server_state_t; @@ -64,6 +65,7 @@ struct _server_connection { char *id; int ref; int active_transports; + transport_t *transports[TRANSPORTS_PER_SERVER_CONN]; pthread_mutex_t lock; char disconnected; fdtable_t *fdtable; @@ -75,7 +77,7 @@ typedef struct _server_connection server_connection_t; server_connection_t * -server_connection_get (xlator_t *this, const char *id); +server_connection_get (xlator_t *this, const char *id, transport_t *trans); void server_connection_put (xlator_t *this, server_connection_t *conn); @@ -84,7 +86,7 @@ int server_connection_destroy (xlator_t *this, server_connection_t *conn); int -server_connection_cleanup (xlator_t *this, server_connection_t *conn); +server_connection_cleanup (xlator_t *this, server_connection_t *conn, transport_t *trans); int server_nop_cbk (call_frame_t *frame, void *cookie, @@ -107,6 +109,7 @@ typedef struct { pthread_mutex_t mutex; struct list_head conns; gf_boolean_t verify_volfile_checksum; + char *subvol_list; gf_boolean_t trace; } server_conf_t; diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c index 1ff9a06f9fe..c9342ac4d69 100644 --- a/xlators/storage/posix/src/posix.c +++ b/xlators/storage/posix/src/posix.c @@ -4703,6 +4703,119 @@ posix_inode (xlator_t *this) return 0; } +void +posix_fsping_timer_expired (void *data) +{ + xlator_t *this = NULL; + struct posix_private *priv = NULL; + + this = data; + priv = this->private; + + pthread_mutex_lock (&priv->mutex); + { + if (priv->fsping_timer) { + gf_timer_call_cancel (this->ctx, + priv->fsping_timer); + priv->fsping_timer = NULL; + } + + if (priv->fs_state) { + priv->fs_state = 0; + default_notify (this, GF_EVENT_CHILD_DOWN, NULL); + } + } + pthread_mutex_unlock (&priv->mutex); +} + +void +posix_fsping (void *arg); + +void * +posix_fsping_statvfs (void *arg) +{ + int ret = -1; + xlator_t *this = NULL; + char *root_path = NULL; + struct statvfs buf = {0, }; + struct posix_private *priv = NULL; + struct timeval delta = {0, }; + + this = arg; + priv = this->private; + root_path = POSIX_BASE_PATH (this); + + ret = statvfs (root_path, &buf); + + pthread_mutex_lock (&priv->mutex); + { + if (priv->fsping_timer) { + gf_timer_call_cancel (this->ctx, + priv->fsping_timer); + priv->fsping_timer = NULL; + } + if (ret == 0) { + if (priv->fs_state == 0) { + priv->fs_state = 1; + default_notify (this, GF_EVENT_CHILD_UP, + NULL); + } + } else { + if (priv->fs_state) { + priv->fs_state = 0; + default_notify (this, GF_EVENT_CHILD_DOWN, + NULL); + } + } + } + pthread_mutex_unlock (&priv->mutex); + + delta.tv_sec = POSIX_FSPING_SLEEP_TIME; + priv->fsping_timer = + gf_timer_call_after (this->ctx, + delta, + posix_fsping, + (void *) this); + if (priv->fsping_timer == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "unable to register timer"); + } + return NULL; +} + +void +posix_fsping (void *arg) +{ + xlator_t *this = NULL; + struct posix_private *priv = NULL; + struct timeval delta = {0, }; + + this = arg; + priv = this->private; + + delta.tv_sec = priv->fsping_timeout; + delta.tv_usec = 0; + + if (priv->fsping_timer) { + gf_timer_call_cancel (this->ctx, + priv->fsping_timer); + } + priv->fsping_timer = + gf_timer_call_after (this->ctx, + delta, + posix_fsping_timer_expired, + (void *) this); + + if (priv->fsping_timer == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "unable to register timer"); + /*FIXME: handle error*/ + } + pthread_create (&priv->fsping, + NULL, + posix_fsping_statvfs, + this); +} int32_t posix_rchecksum (call_frame_t *frame, xlator_t *this, @@ -4780,12 +4893,15 @@ notify (xlator_t *this, void *data, ...) { + struct posix_private *priv = NULL; + + priv = this->private; + switch (event) { case GF_EVENT_PARENT_UP: { - /* Tell the parent that posix xlator is up */ - default_notify (this, GF_EVENT_CHILD_UP, data); + posix_fsping ((void *)this); } break; default: @@ -4809,9 +4925,9 @@ init (xlator_t *this) data_t * dir_data = NULL; data_t * tmp_data = NULL; uint64_t time64 = 0; - - int dict_ret = 0; - int32_t janitor_sleep; + int dict_ret = -1; + int fsping_timeout = -1; + int32_t janitor_sleep; dir_data = dict_get (this->options, "directory"); @@ -4908,6 +5024,7 @@ init (xlator_t *this) strcat (_private->trash_path, "/" GF_REPLICATE_TRASH_DIR); LOCK_INIT (&_private->lock); + pthread_mutex_init (&_private->mutex, NULL); ret = gethostname (_private->hostname, 256); if (ret < 0) { @@ -4923,6 +5040,17 @@ init (xlator_t *this) _private->max_write = 1; } + _private->fsping_timeout = POSIX_FSPING_TIMEOUT; + dict_ret = dict_get_int32 (this->options, + "fsping-timeout", + &fsping_timeout); + + if (dict_ret == 0) { + _private->fsping_timeout = fsping_timeout; + } + gf_log (this->name, GF_LOG_DEBUG, + "fsping-timeout set to %d", _private->fsping_timeout); + _private->export_statfs = 1; tmp_data = dict_get (this->options, "export-statfs-size"); if (tmp_data) { @@ -5056,6 +5184,7 @@ fini (xlator_t *this) { struct posix_private *priv = this->private; sys_lremovexattr (priv->base_path, "trusted.glusterfs.test"); + pthread_mutex_destroy (&priv->mutex); FREE (priv); return; } @@ -5132,6 +5261,8 @@ struct volume_options options[] = { .type = GF_OPTION_TYPE_BOOL }, { .key = {"span-devices"}, .type = GF_OPTION_TYPE_INT }, + { .key = {"fsping-timeout"}, + .type = GF_OPTION_TYPE_INT }, { .key = {"background-unlink"}, .type = GF_OPTION_TYPE_BOOL }, { .key = {"janitor-sleep-duration"}, diff --git a/xlators/storage/posix/src/posix.h b/xlators/storage/posix/src/posix.h index f92e256fbc0..92fe8e2515c 100644 --- a/xlators/storage/posix/src/posix.h +++ b/xlators/storage/posix/src/posix.h @@ -29,6 +29,7 @@ #include <unistd.h> #include <sys/types.h> #include <dirent.h> +#include <pthread.h> #include <time.h> #ifdef linux @@ -50,6 +51,7 @@ #include "xlator.h" #include "inode.h" #include "compat.h" +#include "timer.h" /** * posix_fd - internal structure common to file and directory fd's @@ -70,6 +72,7 @@ struct posix_private { int32_t base_path_length; gf_lock_t lock; + pthread_mutex_t mutex; char hostname[256]; /* Statistics, provides activity of the server */ @@ -117,6 +120,11 @@ struct posix_private { int num_devices_to_span; dev_t *st_device; + pthread_t fsping; + gf_timer_t *fsping_timer; + int fsping_timeout; + int fs_state; + /* a global generation number sequence is used to assign generation numbers in sequence. */ @@ -129,6 +137,9 @@ struct posix_private { char * trash_path; }; +#define POSIX_FSPING_SLEEP_TIME 10 +#define POSIX_FSPING_TIMEOUT 10 + #define POSIX_BASE_PATH(this) (((struct posix_private *)this->private)->base_path) #define POSIX_BASE_PATH_LEN(this) (((struct posix_private *)this->private)->base_path_length) |