summaryrefslogtreecommitdiffstats
path: root/xlators/protocol/server/src/server-helpers.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/protocol/server/src/server-helpers.c')
-rw-r--r--xlators/protocol/server/src/server-helpers.c1841
1 files changed, 1155 insertions, 686 deletions
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
index 9a6932f67..f0b040c74 100644
--- a/xlators/protocol/server/src/server-helpers.c
+++ b/xlators/protocol/server/src/server-helpers.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2006-2009 Z RESEARCH, Inc. <http://www.zresearch.com>
+ Copyright (c) 2010-2013 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
#ifndef _CONFIG_H
@@ -22,546 +13,771 @@
#include "config.h"
#endif
-#include "server-protocol.h"
+#include "server.h"
#include "server-helpers.h"
+#include <fnmatch.h>
-/* server_loc_fill - derive a loc_t for a given inode number
- *
- * NOTE: make sure that @loc is empty, because any pointers it holds with reference will
- * be leaked after returning from here.
- */
int
-server_loc_fill (loc_t *loc, server_state_t *state,
- ino_t ino, ino_t par,
- const char *name, const char *path)
-{
- inode_t *inode = NULL;
- inode_t *parent = NULL;
- int32_t ret = -1;
- char *dentry_path = NULL;
-
-
- GF_VALIDATE_OR_GOTO ("server", loc, out);
- GF_VALIDATE_OR_GOTO ("server", state, out);
- GF_VALIDATE_OR_GOTO ("server", path, out);
-
- /* anything beyond this point is success */
- ret = 0;
- loc->ino = ino;
- inode = loc->inode;
- if (inode == NULL) {
- if (ino)
- inode = inode_search (state->itable, ino, NULL);
-
- if ((inode == NULL) &&
- (par && name))
- inode = inode_search (state->itable, par, name);
-
- loc->inode = inode;
- if (inode)
- loc->ino = inode->ino;
- }
-
- parent = loc->parent;
- if (parent == NULL) {
- if (inode)
- parent = inode_parent (inode, par, name);
- else
- parent = inode_search (state->itable, par, NULL);
- loc->parent = parent;
- }
-
- if (name && parent) {
- ret = inode_path (parent, name, &dentry_path);
- if (ret < 0) {
- gf_log (state->bound_xl->name, GF_LOG_DEBUG,
- "failed to build path for %"PRId64"/%s: %s",
- parent->ino, name, strerror (-ret));
- }
- } else if (inode) {
- ret = inode_path (inode, NULL, &dentry_path);
- if (ret < 0) {
- gf_log (state->bound_xl->name, GF_LOG_DEBUG,
- "failed to build path for %"PRId64": %s",
- inode->ino, strerror (-ret));
-
- inode_unref (loc->inode);
- loc->inode = NULL;
- }
- }
-
- if (dentry_path) {
- if (strcmp (dentry_path, path)) {
- gf_log (state->bound_xl->name, GF_LOG_DEBUG,
- "paths differ for inode(%"PRId64"): "
- "client path = %s. dentry path = %s",
- ino, path, dentry_path);
- }
-
- loc->path = dentry_path;
- loc->name = strrchr (loc->path, '/');
- if (loc->name)
- loc->name++;
- } else {
- loc->path = strdup (path);
- loc->name = strrchr (loc->path, '/');
- if (loc->name)
- loc->name++;
- }
+server_decode_groups (call_frame_t *frame, rpcsvc_request_t *req)
+{
+ int i = 0;
+
+ GF_VALIDATE_OR_GOTO ("server", frame, out);
+ GF_VALIDATE_OR_GOTO ("server", req, out);
+
+ if (call_stack_alloc_groups (frame->root, req->auxgidcount) != 0)
+ return -1;
+
+ frame->root->ngrps = req->auxgidcount;
+ if (frame->root->ngrps == 0)
+ return 0;
+ if (frame->root->ngrps > GF_MAX_AUX_GROUPS)
+ return -1;
+
+ for (; i < frame->root->ngrps; ++i)
+ frame->root->groups[i] = req->auxgids[i];
out:
- return ret;
+ return 0;
}
-/*
- * stat_to_str - convert struct stat to a ASCII string
- * @stbuf: struct stat pointer
- *
- * not for external reference
- */
-char *
-stat_to_str (struct stat *stbuf)
-{
- char *tmp_buf = NULL;
-
- uint64_t dev = stbuf->st_dev;
- uint64_t ino = stbuf->st_ino;
- uint32_t mode = stbuf->st_mode;
- uint32_t nlink = stbuf->st_nlink;
- uint32_t uid = stbuf->st_uid;
- uint32_t gid = stbuf->st_gid;
- uint64_t rdev = stbuf->st_rdev;
- uint64_t size = stbuf->st_size;
- uint32_t blksize = stbuf->st_blksize;
- uint64_t blocks = stbuf->st_blocks;
- uint32_t atime = stbuf->st_atime;
- uint32_t mtime = stbuf->st_mtime;
- uint32_t ctime = stbuf->st_ctime;
-
- uint32_t atime_nsec = ST_ATIM_NSEC(stbuf);
- uint32_t mtime_nsec = ST_MTIM_NSEC(stbuf);
- uint32_t ctime_nsec = ST_CTIM_NSEC(stbuf);
-
-
- asprintf (&tmp_buf,
- GF_STAT_PRINT_FMT_STR,
- dev,
- ino,
- mode,
- nlink,
- uid,
- gid,
- rdev,
- size,
- blksize,
- blocks,
- atime,
- atime_nsec,
- mtime,
- mtime_nsec,
- ctime,
- ctime_nsec);
-
- return tmp_buf;
+
+void
+server_loc_wipe (loc_t *loc)
+{
+ if (loc->parent) {
+ inode_unref (loc->parent);
+ loc->parent = NULL;
+ }
+
+ if (loc->inode) {
+ inode_unref (loc->inode);
+ loc->inode = NULL;
+ }
+
+ GF_FREE ((void *)loc->path);
}
void
-server_loc_wipe (loc_t *loc)
+server_resolve_wipe (server_resolve_t *resolve)
{
- if (loc->parent)
- inode_unref (loc->parent);
- if (loc->inode)
- inode_unref (loc->inode);
- if (loc->path)
- free ((char *)loc->path);
+ GF_FREE ((void *)resolve->path);
+
+ GF_FREE ((void *)resolve->bname);
+
+ loc_wipe (&resolve->resolve_loc);
}
+
void
free_state (server_state_t *state)
{
- transport_t *trans = NULL;
+ if (state->xprt) {
+ rpc_transport_unref (state->xprt);
+ state->xprt = NULL;
+ }
+ if (state->fd) {
+ fd_unref (state->fd);
+ state->fd = NULL;
+ }
+
+ if (state->params) {
+ dict_unref (state->params);
+ state->params = NULL;
+ }
+
+ if (state->iobref) {
+ iobref_unref (state->iobref);
+ state->iobref = NULL;
+ }
+
+ if (state->iobuf) {
+ iobuf_unref (state->iobuf);
+ state->iobuf = NULL;
+ }
+
+ if (state->dict) {
+ dict_unref (state->dict);
+ state->dict = NULL;
+ }
+
+ if (state->xdata) {
+ dict_unref (state->xdata);
+ state->xdata = NULL;
+ }
+
+ GF_FREE ((void *)state->volume);
+
+ GF_FREE ((void *)state->name);
+
+ server_loc_wipe (&state->loc);
+ server_loc_wipe (&state->loc2);
+
+ server_resolve_wipe (&state->resolve);
+ server_resolve_wipe (&state->resolve2);
+
+ GF_FREE (state);
+}
+
+
+static int
+server_connection_cleanup_flush_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ int32_t ret = -1;
+ fd_t *fd = NULL;
+ client_t *client = NULL;
+
+ GF_VALIDATE_OR_GOTO ("server", this, out);
+ GF_VALIDATE_OR_GOTO ("server", cookie, out);
+ GF_VALIDATE_OR_GOTO ("server", frame, out);
+
+ fd = frame->local;
+ client = frame->root->client;
+
+ fd_unref (fd);
+ frame->local = NULL;
+
+ gf_client_unref (client);
+ STACK_DESTROY (frame->root);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+static int
+do_fd_cleanup (xlator_t *this, client_t* client, fdentry_t *fdentries, int fd_count)
+{
+ fd_t *fd = NULL;
+ int i = 0, ret = -1;
+ call_frame_t *tmp_frame = NULL;
+ xlator_t *bound_xl = NULL;
+ char *path = NULL;
+
+ GF_VALIDATE_OR_GOTO ("server", this, out);
+ GF_VALIDATE_OR_GOTO ("server", fdentries, out);
+
+ bound_xl = client->bound_xl;
+ for (i = 0;i < fd_count; i++) {
+ fd = fdentries[i].fd;
+
+ if (fd != NULL) {
+ tmp_frame = create_frame (this, this->ctx->pool);
+ if (tmp_frame == NULL) {
+ goto out;
+ }
+
+ GF_ASSERT (fd->inode);
+
+ ret = inode_path (fd->inode, NULL, &path);
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_INFO,
+ "fd cleanup on %s", path);
+ GF_FREE (path);
+ } else {
+
+ gf_log (this->name, GF_LOG_INFO,
+ "fd cleanup on inode with gfid %s",
+ uuid_utoa (fd->inode->gfid));
+ }
+
+ tmp_frame->local = fd;
+
+ tmp_frame->root->pid = 0;
+ gf_client_ref (client);
+ memset (&tmp_frame->root->lk_owner, 0,
+ sizeof (gf_lkowner_t));
+
+ STACK_WIND (tmp_frame,
+ server_connection_cleanup_flush_cbk,
+ bound_xl, bound_xl->fops->flush, fd, NULL);
+ }
+ }
+
+ GF_FREE (fdentries);
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+int
+server_connection_cleanup (xlator_t *this, client_t *client,
+ int32_t flags)
+{
+ server_ctx_t *serv_ctx = NULL;
+ fdentry_t *fdentries = NULL;
+ uint32_t fd_count = 0;
+ int cd_ret = 0;
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO (this->name, this, out);
+ GF_VALIDATE_OR_GOTO (this->name, client, out);
+ GF_VALIDATE_OR_GOTO (this->name, flags, out);
+
+ serv_ctx = server_ctx_get (client, client->this);
+
+ if (serv_ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "server_ctx_get() failed");
+ goto out;
+ }
+
+ LOCK (&serv_ctx->fdtable_lock);
+ {
+ if (serv_ctx->fdtable && (flags & POSIX_LOCKS))
+ fdentries = gf_fd_fdtable_get_all_fds (serv_ctx->fdtable,
+ &fd_count);
+ }
+ UNLOCK (&serv_ctx->fdtable_lock);
+
+ if (client->bound_xl == NULL)
+ goto out;
+
+ if (flags & INTERNAL_LOCKS) {
+ cd_ret = gf_client_disconnect (client);
+ }
+
+ if (fdentries != NULL)
+ ret = do_fd_cleanup (this, client, fdentries, fd_count);
+ else
+ gf_log (this->name, GF_LOG_INFO, "no fdentries to clean");
+
+ if (cd_ret || ret)
+ ret = -1;
+
+out:
+ return ret;
+}
+
+
+static call_frame_t *
+server_alloc_frame (rpcsvc_request_t *req)
+{
+ call_frame_t *frame = NULL;
+ server_state_t *state = NULL;
+ client_t *client = NULL;
+
+ GF_VALIDATE_OR_GOTO ("server", req, out);
+ GF_VALIDATE_OR_GOTO ("server", req->trans, out);
+ GF_VALIDATE_OR_GOTO ("server", req->svc, out);
+ GF_VALIDATE_OR_GOTO ("server", req->svc->ctx, out);
+
+ client = req->trans->xl_private;
+ GF_VALIDATE_OR_GOTO ("server", client, out);
- trans = state->trans;
+ frame = create_frame (client->this, req->svc->ctx->pool);
+ if (!frame)
+ goto out;
- if (state->fd)
- fd_unref (state->fd);
+ state = GF_CALLOC (1, sizeof (*state), gf_server_mt_state_t);
+ if (!state)
+ goto out;
- transport_unref (trans);
-
- if (state->xattr_req)
- dict_unref (state->xattr_req);
+ if (client->bound_xl)
+ state->itable = client->bound_xl->itable;
- FREE (state);
+ state->xprt = rpc_transport_ref (req->trans);
+ state->resolve.fd_no = -1;
+ state->resolve2.fd_no = -1;
+
+ frame->root->client = client;
+ frame->root->state = state; /* which socket */
+ frame->root->unique = 0; /* which call */
+
+ frame->this = client->this;
+out:
+ return frame;
}
call_frame_t *
-server_copy_frame (call_frame_t *frame)
+get_frame_from_request (rpcsvc_request_t *req)
{
- call_frame_t *new_frame = NULL;
- server_state_t *state = NULL, *new_state = NULL;
+ call_frame_t *frame = NULL;
+ client_t *client = NULL;
+
+ GF_VALIDATE_OR_GOTO ("server", req, out);
+
+ client = req->trans->xl_private;
- state = frame->root->state;
+ frame = server_alloc_frame (req);
+ if (!frame)
+ goto out;
- new_frame = copy_frame (frame);
+ frame->root->op = req->procnum;
- new_state = CALLOC (1, sizeof (server_state_t));
+ frame->root->unique = req->xid;
- new_frame->root->op = frame->root->op;
- new_frame->root->type = frame->root->type;
- new_frame->root->trans = state->trans;
- new_frame->root->state = new_state;
+ frame->root->uid = req->uid;
+ frame->root->gid = req->gid;
+ frame->root->pid = req->pid;
+ gf_client_ref (client);
+ frame->root->client = client;
+ frame->root->lk_owner = req->lk_owner;
- new_state->bound_xl = state->bound_xl;
- new_state->trans = transport_ref (state->trans);
- new_state->itable = state->itable;
+ server_decode_groups (frame, req);
- return new_frame;
+ frame->local = req;
+out:
+ return frame;
}
-int32_t
-gf_add_locker (struct _lock_table *table,
- const char *volume,
- loc_t *loc,
- fd_t *fd,
- pid_t pid)
-{
- int32_t ret = -1;
- struct _locker *new = NULL;
- uint8_t dir = 0;
-
- new = CALLOC (1, sizeof (struct _locker));
- if (new == NULL) {
- gf_log ("server", GF_LOG_ERROR,
- "failed to allocate memory for \'struct _locker\'");
- goto out;
- }
- INIT_LIST_HEAD (&new->lockers);
-
- new->volume = strdup (volume);
-
- if (fd == NULL) {
- loc_copy (&new->loc, loc);
- dir = S_ISDIR (new->loc.inode->st_mode);
- } else {
- new->fd = fd_ref (fd);
- dir = S_ISDIR (fd->inode->st_mode);
- }
-
- new->pid = pid;
-
- LOCK (&table->lock);
- {
- if (dir)
- list_add_tail (&new->lockers, &table->dir_lockers);
- else
- list_add_tail (&new->lockers, &table->file_lockers);
- }
- UNLOCK (&table->lock);
+
+int
+server_build_config (xlator_t *this, server_conf_t *conf)
+{
+ data_t *data = NULL;
+ int ret = -1;
+ struct stat buf = {0,};
+
+ GF_VALIDATE_OR_GOTO ("server", this, out);
+ GF_VALIDATE_OR_GOTO ("server", conf, out);
+
+ ret = dict_get_int32 (this->options, "inode-lru-limit",
+ &conf->inode_lru_limit);
+ if (ret < 0) {
+ conf->inode_lru_limit = 16384;
+ }
+
+ conf->verify_volfile = 1;
+ data = dict_get (this->options, "verify-volfile-checksum");
+ if (data) {
+ ret = gf_string2boolean(data->data, &conf->verify_volfile);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "wrong value for 'verify-volfile-checksum', "
+ "Neglecting option");
+ }
+ }
+
+ data = dict_get (this->options, "trace");
+ if (data) {
+ ret = gf_string2boolean (data->data, &conf->trace);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'trace' takes on only boolean values. "
+ "Neglecting option");
+ }
+ }
+
+ /* TODO: build_rpc_config (); */
+ ret = dict_get_int32 (this->options, "limits.transaction-size",
+ &conf->rpc_conf.max_block_size);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "defaulting limits.transaction-size to %d",
+ DEFAULT_BLOCK_SIZE);
+ conf->rpc_conf.max_block_size = DEFAULT_BLOCK_SIZE;
+ }
+
+ data = dict_get (this->options, "config-directory");
+ if (data) {
+ /* Check whether the specified directory exists,
+ or directory specified is non standard */
+ ret = stat (data->data, &buf);
+ if ((ret != 0) || !S_ISDIR (buf.st_mode)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Directory '%s' doesn't exist, exiting.",
+ data->data);
+ ret = -1;
+ goto out;
+ }
+ /* Make sure that conf-dir doesn't contain ".." in path */
+ if ((gf_strstr (data->data, "/", "..")) == -1) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: invalid conf_dir", data->data);
+ goto out;
+ }
+
+ conf->conf_dir = gf_strdup (data->data);
+ }
+ ret = 0;
out:
- return ret;
+ return ret;
}
-int32_t
-gf_del_locker (struct _lock_table *table,
- const char *volume,
- loc_t *loc,
- fd_t *fd,
- pid_t pid)
-{
- struct _locker *locker = NULL, *tmp = NULL;
- int32_t ret = 0;
- uint8_t dir = 0;
- struct list_head *head = NULL;
- struct list_head del;
-
- INIT_LIST_HEAD (&del);
-
- if (fd) {
- dir = S_ISDIR (fd->inode->st_mode);
- } else {
- dir = S_ISDIR (loc->inode->st_mode);
- }
-
- LOCK (&table->lock);
- {
- if (dir) {
- head = &table->dir_lockers;
- } else {
- head = &table->file_lockers;
- }
-
- list_for_each_entry_safe (locker, tmp, head, lockers) {
- if (locker->fd &&
- fd &&
- (locker->fd == fd) && (locker->pid == pid)
- && !strcmp (locker->volume, volume)) {
- list_move_tail (&locker->lockers, &del);
- } else if (locker->loc.inode &&
- loc &&
- (locker->loc.inode == loc->inode) &&
- (locker->pid == pid)
- && !strcmp (locker->volume, volume)) {
- list_move_tail (&locker->lockers, &del);
- }
- }
- }
- UNLOCK (&table->lock);
-
- tmp = NULL;
- locker = NULL;
-
- list_for_each_entry_safe (locker, tmp, &del, lockers) {
- list_del_init (&locker->lockers);
- if (locker->fd)
- fd_unref (locker->fd);
- else
- loc_wipe (&locker->loc);
-
- free (locker->volume);
- free (locker);
- }
-
- return ret;
+
+void
+print_caller (char *str, int size, call_frame_t *frame)
+{
+ server_state_t *state = NULL;
+
+ GF_VALIDATE_OR_GOTO ("server", str, out);
+ GF_VALIDATE_OR_GOTO ("server", frame, out);
+
+ state = CALL_STATE (frame);
+
+ snprintf (str, size,
+ " Callid=%"PRId64", Client=%s",
+ frame->root->unique,
+ state->xprt->peerinfo.identifier);
+
+out:
+ return;
}
-int32_t
-gf_direntry_to_bin (dir_entry_t *head,
- char **bufferp)
-{
- dir_entry_t *trav = NULL;
- uint32_t len = 0;
- uint32_t this_len = 0;
- char *buffer = NULL;
- size_t buflen = -1;
- char *ptr = NULL;
- char *tmp_buf = NULL;
-
- trav = head->next;
- while (trav) {
- len += strlen (trav->name);
- len += 1;
- len += strlen (trav->link);
- len += 1; /* for '\n' */
- len += 256; // max possible for statbuf;
- trav = trav->next;
- }
-
- buffer = CALLOC (1, len);
- if (buffer == NULL) {
- gf_log ("server", GF_LOG_ERROR,
- "failed to allocate memory for buffer");
- goto out;
- }
-
- ptr = buffer;
- trav = head->next;
- while (trav) {
- tmp_buf = stat_to_str (&trav->buf);
- /* tmp_buf will have \n before \0 */
-
- this_len = sprintf (ptr, "%s/%s%s\n",
- trav->name, tmp_buf,
- trav->link);
-
- FREE (tmp_buf);
- trav = trav->next;
- ptr += this_len;
- }
- if (bufferp)
- *bufferp = buffer;
- buflen = strlen (buffer);
-
+
+void
+server_print_resolve (char *str, int size, server_resolve_t *resolve)
+{
+ int filled = 0;
+
+ GF_VALIDATE_OR_GOTO ("server", str, out);
+
+ if (!resolve) {
+ snprintf (str, size, "<nul>");
+ return;
+ }
+
+ filled += snprintf (str + filled, size - filled,
+ " Resolve={");
+ if (resolve->fd_no != -1)
+ filled += snprintf (str + filled, size - filled,
+ "fd=%"PRId64",", (uint64_t) resolve->fd_no);
+ if (resolve->bname)
+ filled += snprintf (str + filled, size - filled,
+ "bname=%s,", resolve->bname);
+ if (resolve->path)
+ filled += snprintf (str + filled, size - filled,
+ "path=%s", resolve->path);
+
+ snprintf (str + filled, size - filled, "}");
out:
- return buflen;
+ return;
}
-static struct _lock_table *
-gf_lock_table_new (void)
+void
+server_print_loc (char *str, int size, loc_t *loc)
{
- struct _lock_table *new = NULL;
+ int filled = 0;
+
+ GF_VALIDATE_OR_GOTO ("server", str, out);
+
+ if (!loc) {
+ snprintf (str, size, "<nul>");
+ return;
+ }
- new = CALLOC (1, sizeof (struct _lock_table));
- if (new == NULL) {
- gf_log ("server-protocol", GF_LOG_CRITICAL,
- "failed to allocate memory for new lock table");
- goto out;
- }
- INIT_LIST_HEAD (&new->dir_lockers);
- INIT_LIST_HEAD (&new->file_lockers);
- LOCK_INIT (&new->lock);
+ filled += snprintf (str + filled, size - filled,
+ " Loc={");
+
+ if (loc->path)
+ filled += snprintf (str + filled, size - filled,
+ "path=%s,", loc->path);
+ if (loc->inode)
+ filled += snprintf (str + filled, size - filled,
+ "inode=%p,", loc->inode);
+ if (loc->parent)
+ filled += snprintf (str + filled, size - filled,
+ "parent=%p", loc->parent);
+
+ snprintf (str + filled, size - filled, "}");
out:
- return new;
+ return;
}
-static int32_t
-server_connection_cleanup_flush_cbk (call_frame_t *frame,
- void *cookie,
- xlator_t *this,
- int32_t op_ret,
- int32_t op_errno)
+void
+server_print_params (char *str, int size, server_state_t *state)
{
- fd_t *fd = NULL;
- fd = frame->local;
+ int filled = 0;
+
+ GF_VALIDATE_OR_GOTO ("server", str, out);
+
+ filled += snprintf (str + filled, size - filled,
+ " Params={");
+
+ if (state->fd)
+ filled += snprintf (str + filled, size - filled,
+ "fd=%p,", state->fd);
+ if (state->valid)
+ filled += snprintf (str + filled, size - filled,
+ "valid=%d,", state->valid);
+ if (state->flags)
+ filled += snprintf (str + filled, size - filled,
+ "flags=%d,", state->flags);
+ if (state->wbflags)
+ filled += snprintf (str + filled, size - filled,
+ "wbflags=%d,", state->wbflags);
+ if (state->size)
+ filled += snprintf (str + filled, size - filled,
+ "size=%zu,", state->size);
+ if (state->offset)
+ filled += snprintf (str + filled, size - filled,
+ "offset=%"PRId64",", state->offset);
+ if (state->cmd)
+ filled += snprintf (str + filled, size - filled,
+ "cmd=%d,", state->cmd);
+ if (state->type)
+ filled += snprintf (str + filled, size - filled,
+ "type=%d,", state->type);
+ if (state->name)
+ filled += snprintf (str + filled, size - filled,
+ "name=%s,", state->name);
+ if (state->mask)
+ filled += snprintf (str + filled, size - filled,
+ "mask=%d,", state->mask);
+ if (state->volume)
+ filled += snprintf (str + filled, size - filled,
+ "volume=%s,", state->volume);
+
+/* FIXME
+ snprintf (str + filled, size - filled,
+ "bound_xl=%s}", state->client->bound_xl->name);
+*/
+out:
+ return;
+}
- fd_unref (fd);
- frame->local = NULL;
- STACK_DESTROY (frame->root);
- return 0;
+int
+server_resolve_is_empty (server_resolve_t *resolve)
+{
+ if (resolve->fd_no != -1)
+ return 0;
+
+ if (resolve->path != 0)
+ return 0;
+
+ if (resolve->bname != 0)
+ return 0;
+
+ return 1;
+}
+
+
+void
+server_print_reply (call_frame_t *frame, int op_ret, int op_errno)
+{
+ server_conf_t *conf = NULL;
+ server_state_t *state = NULL;
+ xlator_t *this = NULL;
+ char caller[512];
+ char fdstr[32];
+ char *op = "UNKNOWN";
+
+ GF_VALIDATE_OR_GOTO ("server", frame, out);
+
+ this = frame->this;
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO ("server", conf, out);
+ GF_VALIDATE_OR_GOTO ("server", conf->trace, out);
+
+ state = CALL_STATE (frame);
+
+ print_caller (caller, 256, frame);
+
+ switch (frame->root->type) {
+ case GF_OP_TYPE_FOP:
+ op = (char *)gf_fop_list[frame->root->op];
+ break;
+ default:
+ op = "";
+ }
+
+ fdstr[0] = '\0';
+ if (state->fd)
+ snprintf (fdstr, 32, " fd=%p", state->fd);
+
+ gf_log (this->name, GF_LOG_INFO,
+ "%s%s => (%d, %d)%s",
+ op, caller, op_ret, op_errno, fdstr);
+out:
+ return;
+}
+
+
+void
+server_print_request (call_frame_t *frame)
+{
+ server_conf_t *conf = NULL;
+ xlator_t *this = NULL;
+ server_state_t *state = NULL;
+ char *op = "UNKNOWN";
+ char resolve_vars[256];
+ char resolve2_vars[256];
+ char loc_vars[256];
+ char loc2_vars[256];
+ char other_vars[512];
+ char caller[512];
+
+ GF_VALIDATE_OR_GOTO ("server", frame, out);
+
+ this = frame->this;
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO ("server", conf, out);
+
+ if (!conf->trace)
+ goto out;
+
+ state = CALL_STATE (frame);
+
+ memset (resolve_vars, '\0', 256);
+ memset (resolve2_vars, '\0', 256);
+ memset (loc_vars, '\0', 256);
+ memset (loc2_vars, '\0', 256);
+ memset (other_vars, '\0', 256);
+
+ print_caller (caller, 256, frame);
+
+ if (!server_resolve_is_empty (&state->resolve)) {
+ server_print_resolve (resolve_vars, 256, &state->resolve);
+ server_print_loc (loc_vars, 256, &state->loc);
+ }
+
+ if (!server_resolve_is_empty (&state->resolve2)) {
+ server_print_resolve (resolve2_vars, 256, &state->resolve2);
+ server_print_loc (loc2_vars, 256, &state->loc2);
+ }
+
+ server_print_params (other_vars, 512, state);
+
+ switch (frame->root->type) {
+ case GF_OP_TYPE_FOP:
+ op = (char *)gf_fop_list[frame->root->op];
+ break;
+ default:
+ op = "";
+ break;
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "%s%s%s%s%s%s%s",
+ op, caller,
+ resolve_vars, loc_vars, resolve2_vars, loc2_vars, other_vars);
+out:
+ return;
}
int
-server_connection_cleanup (xlator_t *this, server_connection_t *conn)
-{
- call_frame_t *frame = NULL, *tmp_frame = NULL;
- xlator_t *bound_xl = NULL;
- server_state_t *state = NULL;
- struct list_head file_lockers;
- struct list_head dir_lockers;
- struct _lock_table *ltable = NULL;
- struct _locker *locker = NULL, *tmp = NULL;
- struct flock flock = {0,};
- fd_t *fd = NULL;
- uint32_t fd_count = 0;
- fd_t **fds = NULL;
- int32_t i = 0;
-
- bound_xl = (xlator_t *) (conn->bound_xl);
-
- if (bound_xl) {
- /* trans will have ref_count = 1 after this call, but its
- ok since this function is called in
- GF_EVENT_TRANSPORT_CLEANUP */
- frame = create_frame (this, this->ctx->pool);
-
- pthread_mutex_lock (&(conn->lock));
- {
- if (conn->ltable) {
- ltable = conn->ltable;
- conn->ltable = gf_lock_table_new ();
- }
- }
- pthread_mutex_unlock (&conn->lock);
-
- INIT_LIST_HEAD (&file_lockers);
- INIT_LIST_HEAD (&dir_lockers);
-
- LOCK (&ltable->lock);
- {
- list_splice_init (&ltable->file_lockers,
- &file_lockers);
-
- list_splice_init (&ltable->dir_lockers, &dir_lockers);
- }
- UNLOCK (&ltable->lock);
- free (ltable);
-
- flock.l_type = F_UNLCK;
- flock.l_start = 0;
- flock.l_len = 0;
- list_for_each_entry_safe (locker,
- tmp, &file_lockers, lockers) {
- tmp_frame = copy_frame (frame);
- /*
- pid = 0 is a special case that tells posix-locks
- to release all locks from this transport
- */
- tmp_frame->root->pid = 0;
- tmp_frame->root->trans = conn;
-
- if (locker->fd) {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->finodelk,
- locker->volume,
- locker->fd, F_SETLK, &flock);
- fd_unref (locker->fd);
- } else {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->inodelk,
- locker->volume,
- &(locker->loc), F_SETLK, &flock);
- loc_wipe (&locker->loc);
- }
-
- free (locker->volume);
-
- list_del_init (&locker->lockers);
- free (locker);
- }
-
- tmp = NULL;
- locker = NULL;
- list_for_each_entry_safe (locker, tmp, &dir_lockers, lockers) {
- tmp_frame = copy_frame (frame);
-
- tmp_frame->root->pid = 0;
- tmp_frame->root->trans = conn;
-
- if (locker->fd) {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->fentrylk,
- locker->volume,
- locker->fd, NULL,
- ENTRYLK_UNLOCK, ENTRYLK_WRLCK);
- fd_unref (locker->fd);
- } else {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->entrylk,
- locker->volume,
- &(locker->loc), NULL,
- ENTRYLK_UNLOCK, ENTRYLK_WRLCK);
- loc_wipe (&locker->loc);
- }
-
- free (locker->volume);
-
- list_del_init (&locker->lockers);
- free (locker);
- }
-
- pthread_mutex_lock (&conn->lock);
- {
- if (conn->fdtable) {
- fds = gf_fd_fdtable_get_all_fds (conn->fdtable,
- &fd_count);
+serialize_rsp_direntp (gf_dirent_t *entries, gfs3_readdirp_rsp *rsp)
+{
+ gf_dirent_t *entry = NULL;
+ gfs3_dirplist *trav = NULL;
+ gfs3_dirplist *prev = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("server", entries, out);
+ GF_VALIDATE_OR_GOTO ("server", rsp, out);
+
+ list_for_each_entry (entry, &entries->list, list) {
+ trav = GF_CALLOC (1, sizeof (*trav), gf_server_mt_dirent_rsp_t);
+ if (!trav)
+ goto out;
+
+ trav->d_ino = entry->d_ino;
+ trav->d_off = entry->d_off;
+ trav->d_len = entry->d_len;
+ trav->d_type = entry->d_type;
+ trav->name = entry->d_name;
+
+ gf_stat_from_iatt (&trav->stat, &entry->d_stat);
+
+ /* if 'dict' is present, pack it */
+ if (entry->dict) {
+ trav->dict.dict_len = dict_serialized_length (entry->dict);
+ if (trav->dict.dict_len < 0) {
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "failed to get serialized length "
+ "of reply dict");
+ errno = EINVAL;
+ trav->dict.dict_len = 0;
+ goto out;
}
- }
- pthread_mutex_unlock (&conn->lock);
-
- if (fds != NULL) {
- for (i = 0;i < fd_count; i++) {
- fd = fds[i];
-
- if (fd != NULL) {
- tmp_frame = copy_frame (frame);
- tmp_frame->local = fd;
-
- tmp_frame->root->pid = 0;
- tmp_frame->root->trans = conn;
- STACK_WIND (tmp_frame,
- server_connection_cleanup_flush_cbk,
- bound_xl,
- bound_xl->fops->flush,
- fd);
- }
+
+ trav->dict.dict_val = GF_CALLOC (1, trav->dict.dict_len,
+ gf_server_mt_rsp_buf_t);
+ if (!trav->dict.dict_val) {
+ errno = ENOMEM;
+ trav->dict.dict_len = 0;
+ goto out;
+ }
+
+ ret = dict_serialize (entry->dict, trav->dict.dict_val);
+ if (ret < 0) {
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "failed to serialize reply dict");
+ errno = -ret;
+ trav->dict.dict_len = 0;
+ goto out;
}
- FREE (fds);
}
- state = CALL_STATE (frame);
- if (state)
- free (state);
- STACK_DESTROY (frame->root);
+ if (prev)
+ prev->nextentry = trav;
+ else
+ rsp->reply = trav;
+
+ prev = trav;
+ trav = NULL;
+ }
+
+ ret = 0;
+out:
+ GF_FREE (trav);
+
+ return ret;
+}
+
+
+int
+serialize_rsp_dirent (gf_dirent_t *entries, gfs3_readdir_rsp *rsp)
+{
+ gf_dirent_t *entry = NULL;
+ gfs3_dirlist *trav = NULL;
+ gfs3_dirlist *prev = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("server", entries, out);
+ GF_VALIDATE_OR_GOTO ("server", rsp, out);
+
+ list_for_each_entry (entry, &entries->list, list) {
+ trav = GF_CALLOC (1, sizeof (*trav), gf_server_mt_dirent_rsp_t);
+ if (!trav)
+ goto out;
+ trav->d_ino = entry->d_ino;
+ trav->d_off = entry->d_off;
+ trav->d_len = entry->d_len;
+ trav->d_type = entry->d_type;
+ trav->name = entry->d_name;
+ if (prev)
+ prev->nextentry = trav;
+ else
+ rsp->reply = trav;
+
+ prev = trav;
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+int
+readdir_rsp_cleanup (gfs3_readdir_rsp *rsp)
+{
+ gfs3_dirlist *prev = NULL;
+ gfs3_dirlist *trav = NULL;
+
+ trav = rsp->reply;
+ prev = trav;
+ while (trav) {
+ trav = trav->nextentry;
+ GF_FREE (prev);
+ prev = trav;
}
return 0;
@@ -569,224 +785,477 @@ server_connection_cleanup (xlator_t *this, server_connection_t *conn)
int
-server_connection_destroy (xlator_t *this, server_connection_t *conn)
-{
- call_frame_t *frame = NULL, *tmp_frame = NULL;
- xlator_t *bound_xl = NULL;
- int32_t ret = -1;
- server_state_t *state = NULL;
- struct list_head file_lockers;
- struct list_head dir_lockers;
- struct _lock_table *ltable = NULL;
- struct _locker *locker = NULL, *tmp = NULL;
- struct flock flock = {0,};
- fd_t *fd = NULL;
- int32_t i = 0;
- fd_t **fds = NULL;
- uint32_t fd_count = 0;
-
-
- bound_xl = (xlator_t *) (conn->bound_xl);
-
- if (bound_xl) {
- /* trans will have ref_count = 1 after this call, but its
- ok since this function is called in
- GF_EVENT_TRANSPORT_CLEANUP */
- frame = create_frame (this, this->ctx->pool);
-
- pthread_mutex_lock (&(conn->lock));
- {
- if (conn->ltable) {
- ltable = conn->ltable;
- conn->ltable = NULL;
- }
- }
- pthread_mutex_unlock (&conn->lock);
-
- INIT_LIST_HEAD (&file_lockers);
- INIT_LIST_HEAD (&dir_lockers);
-
- LOCK (&ltable->lock);
- {
- list_splice_init (&ltable->file_lockers,
- &file_lockers);
-
- list_splice_init (&ltable->dir_lockers, &dir_lockers);
- }
- UNLOCK (&ltable->lock);
- free (ltable);
-
- flock.l_type = F_UNLCK;
- flock.l_start = 0;
- flock.l_len = 0;
- list_for_each_entry_safe (locker,
- tmp, &file_lockers, lockers) {
- tmp_frame = copy_frame (frame);
- /*
- pid = 0 is a special case that tells posix-locks
- to release all locks from this transport
- */
- tmp_frame->root->pid = 0;
- tmp_frame->root->trans = conn;
-
- if (locker->fd) {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->finodelk,
- locker->volume,
- locker->fd, F_SETLK, &flock);
- fd_unref (locker->fd);
- } else {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->inodelk,
- locker->volume,
- &(locker->loc), F_SETLK, &flock);
- loc_wipe (&locker->loc);
- }
-
- free (locker->volume);
-
- list_del_init (&locker->lockers);
- free (locker);
- }
-
- tmp = NULL;
- locker = NULL;
- list_for_each_entry_safe (locker, tmp, &dir_lockers, lockers) {
- tmp_frame = copy_frame (frame);
-
- tmp_frame->root->pid = 0;
- tmp_frame->root->trans = conn;
-
- if (locker->fd) {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->fentrylk,
- locker->volume,
- locker->fd, NULL,
- ENTRYLK_UNLOCK, ENTRYLK_WRLCK);
- fd_unref (locker->fd);
- } else {
- STACK_WIND (tmp_frame, server_nop_cbk,
- bound_xl,
- bound_xl->fops->entrylk,
- locker->volume,
- &(locker->loc), NULL,
- ENTRYLK_UNLOCK, ENTRYLK_WRLCK);
- loc_wipe (&locker->loc);
- }
-
- free (locker->volume);
-
- list_del_init (&locker->lockers);
- free (locker);
- }
-
- state = CALL_STATE (frame);
- if (state)
- free (state);
- STACK_DESTROY (frame->root);
-
- pthread_mutex_lock (&(conn->lock));
- {
- if (conn->fdtable) {
- fds = gf_fd_fdtable_get_all_fds (conn->fdtable,
- &fd_count);
- gf_fd_fdtable_destroy (conn->fdtable);
- conn->fdtable = NULL;
- }
- }
- pthread_mutex_unlock (&conn->lock);
-
- if (fds != NULL) {
- for (i = 0; i < fd_count; i++) {
- fd = fds[i];
- if (fd != NULL) {
- tmp_frame = copy_frame (frame);
- tmp_frame->local = fd;
-
- STACK_WIND (tmp_frame,
- server_connection_cleanup_flush_cbk,
- bound_xl,
- bound_xl->fops->flush,
- fd);
- }
+readdirp_rsp_cleanup (gfs3_readdirp_rsp *rsp)
+{
+ gfs3_dirplist *prev = NULL;
+ gfs3_dirplist *trav = NULL;
+
+ trav = rsp->reply;
+ prev = trav;
+ while (trav) {
+ trav = trav->nextentry;
+ GF_FREE (prev->dict.dict_val);
+ GF_FREE (prev);
+ prev = trav;
+ }
+
+ return 0;
+}
+
+
+int
+gf_server_check_getxattr_cmd (call_frame_t *frame, const char *key)
+{
+
+ server_conf_t *conf = NULL;
+ rpc_transport_t *xprt = NULL;
+
+ conf = frame->this->private;
+ if (!conf)
+ return 0;
+
+ if (fnmatch ("*list*mount*point*", key, 0) == 0) {
+ /* list all the client protocol connecting to this process */
+ pthread_mutex_lock (&conf->mutex);
+ {
+ list_for_each_entry (xprt, &conf->xprt_list, list) {
+ gf_log ("mount-point-list", GF_LOG_INFO,
+ "%s", xprt->peerinfo.identifier);
}
- FREE (fds);
}
- }
+ pthread_mutex_unlock (&conf->mutex);
+ }
+
+ /* Add more options/keys here */
+
+ return 0;
+}
+
+
+int
+gf_server_check_setxattr_cmd (call_frame_t *frame, dict_t *dict)
+{
+
+ server_conf_t *conf = NULL;
+ rpc_transport_t *xprt = NULL;
+ uint64_t total_read = 0;
+ uint64_t total_write = 0;
+
+ conf = frame->this->private;
+ if (!conf || !dict)
+ return 0;
+
+ if (dict_foreach_fnmatch (dict, "*io*stat*dump",
+ dict_null_foreach_fn, NULL ) > 0) {
+ list_for_each_entry (xprt, &conf->xprt_list, list) {
+ total_read += xprt->total_bytes_read;
+ total_write += xprt->total_bytes_write;
+ }
+ gf_log ("stats", GF_LOG_INFO,
+ "total-read %"PRIu64", total-write %"PRIu64,
+ total_read, total_write);
+ }
+
+ return 0;
+}
+
+
+gf_boolean_t
+server_cancel_grace_timer (xlator_t *this, client_t *client)
+{
+ server_ctx_t *serv_ctx = NULL;
+ gf_timer_t *timer = NULL;
+ gf_boolean_t cancelled = _gf_false;
+
+ if (!this || !client) {
+ gf_log (THIS->name, GF_LOG_ERROR,
+ "Invalid arguments to cancel connection timer");
+ return cancelled;
+ }
+
+ serv_ctx = server_ctx_get (client, client->this);
+
+ if (serv_ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "server_ctx_get() failed");
+ goto out;
+ }
+
+ LOCK (&serv_ctx->fdtable_lock);
+ {
+ if (serv_ctx->grace_timer) {
+ timer = serv_ctx->grace_timer;
+ serv_ctx->grace_timer = NULL;
+ }
+ }
+ UNLOCK (&serv_ctx->fdtable_lock);
+
+ if (timer) {
+ gf_timer_call_cancel (this->ctx, timer);
+ cancelled = _gf_true;
+ }
+out:
+ return cancelled;
+}
+
+server_ctx_t*
+server_ctx_get (client_t *client, xlator_t *xlator)
+{
+ void *tmp = NULL;
+ server_ctx_t *ctx = NULL;
+
+ client_ctx_get (client, xlator, &tmp);
+
+ ctx = tmp;
+
+ if (ctx != NULL)
+ goto out;
+
+ ctx = GF_CALLOC (1, sizeof (server_ctx_t), gf_server_mt_server_conf_t);
+
+ if (ctx == NULL)
+ goto out;
+
+ /* ctx->lk_version = 0; redundant */
+ ctx->fdtable = gf_fd_fdtable_alloc ();
+
+ if (ctx->fdtable == NULL) {
+ GF_FREE (ctx);
+ ctx = NULL;
+ goto out;
+ }
+
+ LOCK_INIT (&ctx->fdtable_lock);
+
+ if (client_ctx_set (client, xlator, ctx) != 0) {
+ LOCK_DESTROY (&ctx->fdtable_lock);
+ GF_FREE (ctx);
+ ctx = NULL;
+ }
+
+out:
+ return ctx;
+}
+
+int32_t
+gf_barrier_transmit (server_conf_t *conf, gf_barrier_payload_t *payload)
+{
+ gf_barrier_t *barrier = NULL;
+ int32_t ret = -1;
+ client_t *client = NULL;
+ gf_boolean_t lk_heal = _gf_false;
+ call_frame_t *frame = NULL;
+ server_state_t *state = NULL;
+
+ GF_VALIDATE_OR_GOTO ("barrier", conf, out);
+ GF_VALIDATE_OR_GOTO ("barrier", conf->barrier, out);
+ GF_VALIDATE_OR_GOTO ("barrier", payload, out);
+
+ barrier = conf->barrier;
+
+ frame = payload->frame;
+ if (frame) {
+ state = CALL_STATE (frame);
+ frame->local = NULL;
+ client = frame->root->client;
+ }
+ /* currently lk fops are not barrier'ed. This is reflecting code in
+ * server_submit_reply */
+ if (client)
+ lk_heal = ((server_conf_t *) client->this->private)->lk_heal;
+
+ ret = rpcsvc_submit_generic (payload->req, &payload->rsp, 1,
+ payload->payload, payload->payload_count,
+ payload->iobref);
+ iobuf_unref (payload->iob);
+ if (ret == -1) {
+ gf_log_callingfn ("", GF_LOG_ERROR, "Reply submission failed");
+ if (frame && client && !lk_heal) {
+ server_connection_cleanup (frame->this, client,
+ INTERNAL_LOCKS | POSIX_LOCKS);
+ } else {
+ /* TODO: Failure of open(dir), create, inodelk, entrylk
+ or lk fops send failure must be handled specially. */
+ }
+ goto ret;
+ }
+
+ ret = 0;
+ret:
+ if (state) {
+ free_state (state);
+ }
+
+ if (frame) {
+ gf_client_unref (client);
+ STACK_DESTROY (frame->root);
+ }
+
+ if (payload->free_iobref) {
+ iobref_unref (payload->iobref);
+ }
+out:
+ return ret;
+}
+
+gf_barrier_payload_t *
+gf_barrier_dequeue (gf_barrier_t *barrier)
+{
+ gf_barrier_payload_t *payload = NULL;
- gf_log (this->name, GF_LOG_INFO, "destroyed connection of %s",
- conn->id);
+ if (!barrier || list_empty (&barrier->queue))
+ return NULL;
- FREE (conn->id);
- FREE (conn);
+ payload = list_entry (barrier->queue.next,
+ gf_barrier_payload_t, list);
+ if (payload) {
+ list_del_init (&payload->list);
+ barrier->cur_size--;
+ }
- return ret;
+ return payload;
}
-server_connection_t *
-server_connection_get (xlator_t *this, const char *id)
+void*
+gf_barrier_dequeue_start (void *data)
{
- server_connection_t *conn = NULL;
- server_connection_t *trav = NULL;
- server_conf_t *conf = NULL;
+ server_conf_t *conf = NULL;
+ gf_barrier_t *barrier = NULL;
+ gf_barrier_payload_t *payload = NULL;
+
+ conf = (server_conf_t *)data;
+ if (!conf || !conf->barrier)
+ return NULL;
+ barrier = conf->barrier;
+
+ LOCK (&barrier->lock);
+ {
+ while (barrier->cur_size) {
+ payload = gf_barrier_dequeue (barrier);
+ if (payload) {
+ if (gf_barrier_transmit (conf, payload)) {
+ gf_log ("server", GF_LOG_WARNING,
+ "Failed to transmit");
+ }
+ GF_FREE (payload);
+ }
+ }
+ }
+ UNLOCK (&barrier->lock);
+ return NULL;
+}
- conf = this->private;
+void
+gf_barrier_timeout (void *data)
+{
+ server_conf_t *conf = NULL;
+ gf_barrier_t *barrier = NULL;
+ gf_boolean_t need_dequeue = _gf_false;
+
+ conf = (server_conf_t *)data;
+ if (!conf || !conf->barrier)
+ goto out;
+ barrier = conf->barrier;
+
+ gf_log ("", GF_LOG_INFO, "barrier timed-out");
+ LOCK (&barrier->lock);
+ {
+ need_dequeue = barrier->on;
+ barrier->on = _gf_false;
+ }
+ UNLOCK (&barrier->lock);
+
+ if (need_dequeue == _gf_true)
+ gf_barrier_dequeue_start (data);
+out:
+ return;
+}
+
+
+int32_t
+gf_barrier_start (xlator_t *this)
+{
+ server_conf_t *conf = NULL;
+ gf_barrier_t *barrier = NULL;
+ int32_t ret = -1;
+ struct timespec time = {0,};
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO ("server", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out);
+
+ barrier = conf->barrier;
+
+ gf_log (this->name, GF_LOG_INFO, "barrier start called");
+ LOCK (&barrier->lock);
+ {
+ /* if barrier is on, reset timer */
+ if (barrier->on == _gf_true) {
+ ret = gf_timer_call_cancel (this->ctx, barrier->timer);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "unset timer, failing barrier start");
+ goto unlock;
+ }
+ }
- pthread_mutex_lock (&conf->mutex);
- {
- list_for_each_entry (trav, &conf->conns, list) {
- if (!strcmp (id, trav->id)) {
- conn = trav;
- break;
- }
- }
+ barrier->on = _gf_true;
+ time.tv_sec = barrier->time_out;
+ time.tv_nsec = 0;
+
+ barrier->timer = gf_timer_call_after (this->ctx, time,
+ gf_barrier_timeout,
+ (void *)conf);
+ if (!barrier->timer) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to set "
+ "timer, failing barrier start");
+ barrier->on = _gf_false;
+ }
+ }
+unlock:
+ UNLOCK (&barrier->lock);
- if (!conn) {
- conn = (void *) CALLOC (1, sizeof (*conn));
+ ret = 0;
+out:
+ return ret;
+}
- conn->id = strdup (id);
- conn->fdtable = gf_fd_fdtable_alloc ();
- conn->ltable = gf_lock_table_new ();
+int32_t
+gf_barrier_stop (xlator_t *this)
+{
+ server_conf_t *conf = NULL;
+ gf_barrier_t *barrier = NULL;
+ int32_t ret = -1;
+ gf_boolean_t need_dequeue = _gf_false;
- pthread_mutex_init (&conn->lock, NULL);
+ conf = this->private;
- list_add (&conn->list, &conf->conns);
- }
+ GF_VALIDATE_OR_GOTO ("server", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out);
- conn->ref++;
- }
- pthread_mutex_unlock (&conf->mutex);
+ barrier = conf->barrier;
- return conn;
+ gf_log (this->name, GF_LOG_INFO, "barrier stop called");
+ LOCK (&barrier->lock);
+ {
+ need_dequeue = barrier->on;
+ barrier->on = _gf_false;
+ }
+ UNLOCK (&barrier->lock);
+
+ if (need_dequeue == _gf_true) {
+ gf_timer_call_cancel (this->ctx, barrier->timer);
+ ret = gf_thread_create (&conf->barrier_th, NULL,
+ gf_barrier_dequeue_start,
+ conf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Failed to start un-barriering");
+ goto out;
+ }
+ }
+ ret = 0;
+out:
+ return ret;
}
+int32_t
+gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, char *str)
+{
+ int32_t ret = -1;
+ char *dup_str = NULL;
+ char *str_tok = NULL;
+ char *save_ptr = NULL;
+ uint64_t fops = 0;
+
+ /* by defaul fsync & flush needs to be barriered */
+
+ fops |= 1 << GFS3_OP_FSYNC;
+ fops |= 1 << GFS3_OP_FLUSH;
+
+ if (!str)
+ goto done;
+
+ dup_str = gf_strdup (str);
+ if (!dup_str)
+ goto done;
+
+ str_tok = strtok_r (dup_str, ",", &save_ptr);
+ if (!str_tok)
+ goto done;
+
+ fops = 0;
+ while (str_tok) {
+ if (!strcmp(str_tok, "writev")) {
+ fops |= ((uint64_t)1 << GFS3_OP_WRITE);
+ } else if (!strcmp(str_tok, "fsync")) {
+ fops |= ((uint64_t)1 << GFS3_OP_FSYNC);
+ } else if (!strcmp(str_tok, "read")) {
+ fops |= ((uint64_t)1 << GFS3_OP_READ);
+ } else if (!strcmp(str_tok, "rename")) {
+ fops |= ((uint64_t)1 << GFS3_OP_RENAME);
+ } else if (!strcmp(str_tok, "flush")) {
+ fops |= ((uint64_t)1 << GFS3_OP_FLUSH);
+ } else if (!strcmp(str_tok, "ftruncate")) {
+ fops |= ((uint64_t)1 << GFS3_OP_FTRUNCATE);
+ } else if (!strcmp(str_tok, "fallocate")) {
+ fops |= ((uint64_t)1 << GFS3_OP_FALLOCATE);
+ } else if (!strcmp(str_tok, "rmdir")) {
+ fops |= ((uint64_t)1 << GFS3_OP_RMDIR);
+ } else {
+ gf_log ("barrier", GF_LOG_ERROR,
+ "Invalid barrier fop %s", str_tok);
+ }
+
+ str_tok = strtok_r (NULL, ",", &save_ptr);
+ }
+done:
+ LOCK (&barrier->lock);
+ {
+ barrier->fops = fops;
+ }
+ UNLOCK (&barrier->lock);
+ ret = 0;
+
+ GF_FREE (dup_str);
+ return ret;
+}
void
-server_connection_put (xlator_t *this, server_connection_t *conn)
+gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *payload)
+{
+ list_add_tail (&payload->list, &barrier->queue);
+ barrier->cur_size++;
+}
+
+gf_barrier_payload_t *
+gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp,
+ call_frame_t *frame, struct iovec *payload_orig,
+ int payloadcount, struct iobref *iobref,
+ struct iobuf *iob, gf_boolean_t free_iobref)
{
- server_conf_t *conf = NULL;
- server_connection_t *todel = NULL;
+ gf_barrier_payload_t *payload = NULL;
- conf = this->private;
+ if (!rsp)
+ return NULL;
- pthread_mutex_lock (&conf->mutex);
- {
- conn->ref--;
+ payload = GF_CALLOC (1, sizeof (*payload),1);
+ if (!payload)
+ return NULL;
- if (!conn->ref) {
- list_del_init (&conn->list);
- todel = conn;
- }
- }
- pthread_mutex_unlock (&conf->mutex);
+ INIT_LIST_HEAD (&payload->list);
- if (todel) {
- server_connection_destroy (this, todel);
- }
+ payload->req = req;
+ memcpy (&payload->rsp, rsp, sizeof (struct iovec));
+ payload->frame = frame;
+ payload->payload = payload_orig;
+ payload->payload_count = payloadcount;
+ payload->iobref = iobref;
+ payload->iob = iob;
+ payload->free_iobref = free_iobref;
- return;
+ return payload;
}