/* Copyright (c) 2010-2013 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include "server.h" #include "server-helpers.h" #include int server_decode_groups (call_frame_t *frame, rpcsvc_request_t *req) { int i = 0; GF_VALIDATE_OR_GOTO ("server", frame, out); GF_VALIDATE_OR_GOTO ("server", req, out); if (call_stack_alloc_groups (frame->root, req->auxgidcount) != 0) return -1; frame->root->ngrps = req->auxgidcount; if (frame->root->ngrps == 0) return 0; /* ngrps cannot be bigger than USHRT_MAX(65535) */ if (frame->root->ngrps > GF_MAX_AUX_GROUPS) return -1; for (; i < frame->root->ngrps; ++i) frame->root->groups[i] = req->auxgids[i]; out: return 0; } void server_loc_wipe (loc_t *loc) { if (loc->parent) { inode_unref (loc->parent); loc->parent = NULL; } if (loc->inode) { inode_unref (loc->inode); loc->inode = NULL; } GF_FREE ((void *)loc->path); } void server_resolve_wipe (server_resolve_t *resolve) { GF_FREE ((void *)resolve->path); GF_FREE ((void *)resolve->bname); loc_wipe (&resolve->resolve_loc); } void free_state (server_state_t *state) { if (state->xprt) { rpc_transport_unref (state->xprt); state->xprt = NULL; } if (state->fd) { fd_unref (state->fd); state->fd = NULL; } if (state->params) { dict_unref (state->params); state->params = NULL; } if (state->iobref) { iobref_unref (state->iobref); state->iobref = NULL; } if (state->iobuf) { iobuf_unref (state->iobuf); state->iobuf = NULL; } if (state->dict) { dict_unref (state->dict); state->dict = NULL; } if (state->xdata) { dict_unref (state->xdata); state->xdata = NULL; } GF_FREE ((void *)state->volume); GF_FREE ((void *)state->name); server_loc_wipe (&state->loc); server_loc_wipe (&state->loc2); server_resolve_wipe (&state->resolve); server_resolve_wipe (&state->resolve2); GF_FREE (state); } static int server_connection_cleanup_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, dict_t *xdata) { int32_t ret = -1; fd_t *fd = NULL; client_t *client = NULL; GF_VALIDATE_OR_GOTO ("server", this, out); GF_VALIDATE_OR_GOTO ("server", cookie, out); GF_VALIDATE_OR_GOTO ("server", frame, out); fd = frame->local; client = frame->root->client; fd_unref (fd); frame->local = NULL; gf_client_unref (client); STACK_DESTROY (frame->root); ret = 0; out: return ret; } static int do_fd_cleanup (xlator_t *this, client_t* client, fdentry_t *fdentries, int fd_count) { fd_t *fd = NULL; int i = 0, ret = -1; call_frame_t *tmp_frame = NULL; xlator_t *bound_xl = NULL; char *path = NULL; GF_VALIDATE_OR_GOTO ("server", this, out); GF_VALIDATE_OR_GOTO ("server", fdentries, out); bound_xl = client->bound_xl; for (i = 0;i < fd_count; i++) { fd = fdentries[i].fd; if (fd != NULL) { tmp_frame = create_frame (this, this->ctx->pool); if (tmp_frame == NULL) { goto out; } GF_ASSERT (fd->inode); ret = inode_path (fd->inode, NULL, &path); if (ret > 0) { gf_log (this->name, GF_LOG_INFO, "fd cleanup on %s", path); GF_FREE (path); } else { gf_log (this->name, GF_LOG_INFO, "fd cleanup on inode with gfid %s", uuid_utoa (fd->inode->gfid)); } tmp_frame->local = fd; tmp_frame->root->pid = 0; gf_client_ref (client); memset (&tmp_frame->root->lk_owner, 0, sizeof (gf_lkowner_t)); STACK_WIND (tmp_frame, server_connection_cleanup_flush_cbk, bound_xl, bound_xl->fops->flush, fd, NULL); } } GF_FREE (fdentries); ret = 0; out: return ret; } int server_connection_cleanup (xlator_t *this, client_t *client, int32_t flags) { server_ctx_t *serv_ctx = NULL; fdentry_t *fdentries = NULL; uint32_t fd_count = 0; int cd_ret = 0; int ret = 0; GF_VALIDATE_OR_GOTO (this->name, this, out); GF_VALIDATE_OR_GOTO (this->name, client, out); GF_VALIDATE_OR_GOTO (this->name, flags, out); serv_ctx = server_ctx_get (client, client->this); if (serv_ctx == NULL) { gf_log (this->name, GF_LOG_INFO, "server_ctx_get() failed"); goto out; } LOCK (&serv_ctx->fdtable_lock); { if (serv_ctx->fdtable && (flags & POSIX_LOCKS)) fdentries = gf_fd_fdtable_get_all_fds (serv_ctx->fdtable, &fd_count); } UNLOCK (&serv_ctx->fdtable_lock); if (client->bound_xl == NULL) goto out; if (flags & INTERNAL_LOCKS) { cd_ret = gf_client_disconnect (client); } if (fdentries != NULL) ret = do_fd_cleanup (this, client, fdentries, fd_count); else gf_log (this->name, GF_LOG_INFO, "no fdentries to clean"); if (cd_ret || ret) ret = -1; out: return ret; } static call_frame_t * server_alloc_frame (rpcsvc_request_t *req) { call_frame_t *frame = NULL; server_state_t *state = NULL; client_t *client = NULL; GF_VALIDATE_OR_GOTO ("server", req, out); GF_VALIDATE_OR_GOTO ("server", req->trans, out); GF_VALIDATE_OR_GOTO ("server", req->svc, out); GF_VALIDATE_OR_GOTO ("server", req->svc->ctx, out); client = req->trans->xl_private; GF_VALIDATE_OR_GOTO ("server", client, out); frame = create_frame (client->this, req->svc->ctx->pool); if (!frame) goto out; state = GF_CALLOC (1, sizeof (*state), gf_server_mt_state_t); if (!state) goto out; if (client->bound_xl) state->itable = client->bound_xl->itable; state->xprt = rpc_transport_ref (req->trans); state->resolve.fd_no = -1; state->resolve2.fd_no = -1; frame->root->client = client; frame->root->state = state; /* which socket */ frame->root->unique = 0; /* which call */ frame->this = client->this; out: return frame; } call_frame_t * get_frame_from_request (rpcsvc_request_t *req) { call_frame_t *frame = NULL; client_t *client = NULL; client_t *tmp_client = NULL; xlator_t *this = NULL; server_conf_t *priv = NULL; clienttable_t *clienttable = NULL; unsigned int i = 0; GF_VALIDATE_OR_GOTO ("server", req, out); client = req->trans->xl_private; frame = server_alloc_frame (req); if (!frame) goto out; frame->root->op = req->procnum; frame->root->unique = req->xid; client = req->trans->xl_private; this = req->trans->xl; priv = this->private; clienttable = this->ctx->clienttable; for (i = 0; i < clienttable->max_clients; i++) { tmp_client = clienttable->cliententries[i].client; if (client == tmp_client) { /* for non trusted clients username and password would not have been set. So for non trusted clients (i.e clients not from the same machine as the brick, and clients from outside the storage pool) do the root-squashing. TODO: If any client within the storage pool (i.e mounting within a machine from the pool but using other machine's ip/hostname from the same pool) is present treat it as a trusted client */ if (!client->auth.username && req->pid != NFS_PID) RPC_AUTH_ROOT_SQUASH (req); /* Problem: If we just check whether the client is trusted client and do not do root squashing for them, then for smb clients and UFO clients root squashing will never happen as they use the fuse mounts done within the trusted pool (i.e they are trusted clients). Solution: To fix it, do root squashing for trusted clients also. If one wants to have a client within the storage pool for which root-squashing does not happen, then the client has to be mounted with --no-root-squash option. But for defrag client and gsyncd client do not do root-squashing. */ if (client->auth.username && req->pid != GF_CLIENT_PID_NO_ROOT_SQUASH && req->pid != GF_CLIENT_PID_GSYNCD && req->pid != GF_CLIENT_PID_DEFRAG) RPC_AUTH_ROOT_SQUASH (req); /* For nfs clients the server processes will be running within the trusted storage pool machines. So if we do not do root-squashing for nfs servers, thinking that its a trusted client, then root-squashing wont work for nfs clients. */ if (req->pid == NFS_PID) RPC_AUTH_ROOT_SQUASH (req); } } frame->root->uid = req->uid; frame->root->gid = req->gid; frame->root->pid = req->pid; gf_client_ref (client); frame->root->client = client; frame->root->lk_owner = req->lk_owner; server_decode_groups (frame, req); frame->local = req; out: return frame; } int server_build_config (xlator_t *this, server_conf_t *conf) { data_t *data = NULL; int ret = -1; struct stat buf = {0,}; GF_VALIDATE_OR_GOTO ("server", this, out); GF_VALIDATE_OR_GOTO ("server", conf, out); ret = dict_get_int32 (this->options, "inode-lru-limit", &conf->inode_lru_limit); if (ret < 0) { conf->inode_lru_limit = 16384; } conf->verify_volfile = 1; data = dict_get (this->options, "verify-volfile-checksum"); if (data) { ret = gf_string2boolean(data->data, &conf->verify_volfile); if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "wrong value for 'verify-volfile-checksum', " "Neglecting option"); } } data = dict_get (this->options, "trace"); if (data) { ret = gf_string2boolean (data->data, &conf->trace); if (ret != 0) { gf_log (this->name, GF_LOG_WARNING, "'trace' takes on only boolean values. " "Neglecting option"); } } /* TODO: build_rpc_config (); */ ret = dict_get_int32 (this->options, "limits.transaction-size", &conf->rpc_conf.max_block_size); if (ret < 0) { gf_log (this->name, GF_LOG_TRACE, "defaulting limits.transaction-size to %d", DEFAULT_BLOCK_SIZE); conf->rpc_conf.max_block_size = DEFAULT_BLOCK_SIZE; } data = dict_get (this->options, "config-directory"); if (data) { /* Check whether the specified directory exists, or directory specified is non standard */ ret = stat (data->data, &buf); if ((ret != 0) || !S_ISDIR (buf.st_mode)) { gf_log (this->name, GF_LOG_ERROR, "Directory '%s' doesn't exist, exiting.", data->data); ret = -1; goto out; } /* Make sure that conf-dir doesn't contain ".." in path */ if ((gf_strstr (data->data, "/", "..")) == -1) { ret = -1; gf_log (this->name, GF_LOG_ERROR, "%s: invalid conf_dir", data->data); goto out; } conf->conf_dir = gf_strdup (data->data); } ret = 0; out: return ret; } void print_caller (char *str, int size, call_frame_t *frame) { server_state_t *state = NULL; GF_VALIDATE_OR_GOTO ("server", str, out); GF_VALIDATE_OR_GOTO ("server", frame, out); state = CALL_STATE (frame); snprintf (str, size, " Callid=%"PRId64", Client=%s", frame->root->unique, state->xprt->peerinfo.identifier); out: return; } void server_print_resolve (char *str, int size, server_resolve_t *resolve) { int filled = 0; GF_VALIDATE_OR_GOTO ("server", str, out); if (!resolve) { snprintf (str, size, ""); return; } filled += snprintf (str + filled, size - filled, " Resolve={"); if (resolve->fd_no != -1) filled += snprintf (str + filled, size - filled, "fd=%"PRId64",", (uint64_t) resolve->fd_no); if (resolve->bname) filled += snprintf (str + filled, size - filled, "bname=%s,", resolve->bname); if (resolve->path) filled += snprintf (str + filled, size - filled, "path=%s", resolve->path); snprintf (str + filled, size - filled, "}"); out: return; } void server_print_loc (char *str, int size, loc_t *loc) { int filled = 0; GF_VALIDATE_OR_GOTO ("server", str, out); if (!loc) { snprintf (str, size, ""); return; } filled += snprintf (str + filled, size - filled, " Loc={"); if (loc->path) filled += snprintf (str + filled, size - filled, "path=%s,", loc->path); if (loc->inode) filled += snprintf (str + filled, size - filled, "inode=%p,", loc->inode); if (loc->parent) filled += snprintf (str + filled, size - filled, "parent=%p", loc->parent); snprintf (str + filled, size - filled, "}"); out: return; } void server_print_params (char *str, int size, server_state_t *state) { int filled = 0; GF_VALIDATE_OR_GOTO ("server", str, out); filled += snprintf (str + filled, size - filled, " Params={"); if (state->fd) filled += snprintf (str + filled, size - filled, "fd=%p,", state->fd); if (state->valid) filled += snprintf (str + filled, size - filled, "valid=%d,", state->valid); if (state->flags) filled += snprintf (str + filled, size - filled, "flags=%d,", state->flags); if (state->wbflags) filled += snprintf (str + filled, size - filled, "wbflags=%d,", state->wbflags); if (state->size) filled += snprintf (str + filled, size - filled, "size=%zu,", state->size); if (state->offset) filled += snprintf (str + filled, size - filled, "offset=%"PRId64",", state->offset); if (state->cmd) filled += snprintf (str + filled, size - filled, "cmd=%d,", state->cmd); if (state->type) filled += snprintf (str + filled, size - filled, "type=%d,", state->type); if (state->name) filled += snprintf (str + filled, size - filled, "name=%s,", state->name); if (state->mask) filled += snprintf (str + filled, size - filled, "mask=%d,", state->mask); if (state->volume) filled += snprintf (str + filled, size - filled, "volume=%s,", state->volume); /* FIXME snprintf (str + filled, size - filled, "bound_xl=%s}", state->client->bound_xl->name); */ out: return; } int server_resolve_is_empty (server_resolve_t *resolve) { if (resolve->fd_no != -1) return 0; if (resolve->path != 0) return 0; if (resolve->bname != 0) return 0; return 1; } void server_print_reply (call_frame_t *frame, int op_ret, int op_errno) { server_conf_t *conf = NULL; server_state_t *state = NULL; xlator_t *this = NULL; char caller[512]; char fdstr[32]; char *op = "UNKNOWN"; GF_VALIDATE_OR_GOTO ("server", frame, out); this = frame->this; conf = this->private; GF_VALIDATE_OR_GOTO ("server", conf, out); GF_VALIDATE_OR_GOTO ("server", conf->trace, out); state = CALL_STATE (frame); print_caller (caller, 256, frame); switch (frame->root->type) { case GF_OP_TYPE_FOP: op = (char *)gf_fop_list[frame->root->op]; break; default: op = ""; } fdstr[0] = '\0'; if (state->fd) snprintf (fdstr, 32, " fd=%p", state->fd); gf_log (this->name, GF_LOG_INFO, "%s%s => (%d, %d)%s", op, caller, op_ret, op_errno, fdstr); out: return; } void server_print_request (call_frame_t *frame) { server_conf_t *conf = NULL; xlator_t *this = NULL; server_state_t *state = NULL; char *op = "UNKNOWN"; char resolve_vars[256]; char resolve2_vars[256]; char loc_vars[256]; char loc2_vars[256]; char other_vars[512]; char caller[512]; GF_VALIDATE_OR_GOTO ("server", frame, out); this = frame->this; conf = this->private; GF_VALIDATE_OR_GOTO ("server", conf, out); if (!conf->trace) goto out; state = CALL_STATE (frame); memset (resolve_vars, '\0', 256); memset (resolve2_vars, '\0', 256); memset (loc_vars, '\0', 256); memset (loc2_vars, '\0', 256); memset (other_vars, '\0', 256); print_caller (caller, 256, frame); if (!server_resolve_is_empty (&state->resolve)) { server_print_resolve (resolve_vars, 256, &state->resolve); server_print_loc (loc_vars, 256, &state->loc); } if (!server_resolve_is_empty (&state->resolve2)) { server_print_resolve (resolve2_vars, 256, &state->resolve2); server_print_loc (loc2_vars, 256, &state->loc2); } server_print_params (other_vars, 512, state); switch (frame->root->type) { case GF_OP_TYPE_FOP: op = (char *)gf_fop_list[frame->root->op]; break; default: op = ""; break; } gf_log (this->name, GF_LOG_INFO, "%s%s%s%s%s%s%s", op, caller, resolve_vars, loc_vars, resolve2_vars, loc2_vars, other_vars); out: return; } int serialize_rsp_direntp (gf_dirent_t *entries, gfs3_readdirp_rsp *rsp) { gf_dirent_t *entry = NULL; gfs3_dirplist *trav = NULL; gfs3_dirplist *prev = NULL; int ret = -1; GF_VALIDATE_OR_GOTO ("server", entries, out); GF_VALIDATE_OR_GOTO ("server", rsp, out); list_for_each_entry (entry, &entries->list, list) { trav = GF_CALLOC (1, sizeof (*trav), gf_server_mt_dirent_rsp_t); if (!trav) goto out; trav->d_ino = entry->d_ino; trav->d_off = entry->d_off; trav->d_len = entry->d_len; trav->d_type = entry->d_type; trav->name = entry->d_name; gf_stat_from_iatt (&trav->stat, &entry->d_stat); /* if 'dict' is present, pack it */ if (entry->dict) { trav->dict.dict_len = dict_serialized_length (entry->dict); if (trav->dict.dict_len > UINT_MAX) { gf_log (THIS->name, GF_LOG_ERROR, "failed to get serialized length " "of reply dict"); errno = EINVAL; trav->dict.dict_len = 0; goto out; } trav->dict.dict_val = GF_CALLOC (1, trav->dict.dict_len, gf_server_mt_rsp_buf_t); if (!trav->dict.dict_val) { errno = ENOMEM; trav->dict.dict_len = 0; goto out; } ret = dict_serialize (entry->dict, trav->dict.dict_val); if (ret < 0) { gf_log (THIS->name, GF_LOG_ERROR, "failed to serialize reply dict"); errno = -ret; trav->dict.dict_len = 0; goto out; } } if (prev) prev->nextentry = trav; else rsp->reply = trav; prev = trav; trav = NULL; } ret = 0; out: GF_FREE (trav); return ret; } int serialize_rsp_dirent (gf_dirent_t *entries, gfs3_readdir_rsp *rsp) { gf_dirent_t *entry = NULL; gfs3_dirlist *trav = NULL; gfs3_dirlist *prev = NULL; int ret = -1; GF_VALIDATE_OR_GOTO ("server", entries, out); GF_VALIDATE_OR_GOTO ("server", rsp, out); list_for_each_entry (entry, &entries->list, list) { trav = GF_CALLOC (1, sizeof (*trav), gf_server_mt_dirent_rsp_t); if (!trav) goto out; trav->d_ino = entry->d_ino; trav->d_off = entry->d_off; trav->d_len = entry->d_len; trav->d_type = entry->d_type; trav->name = entry->d_name; if (prev) prev->nextentry = trav; else rsp->reply = trav; prev = trav; } ret = 0; out: return ret; } int readdir_rsp_cleanup (gfs3_readdir_rsp *rsp) { gfs3_dirlist *prev = NULL; gfs3_dirlist *trav = NULL; trav = rsp->reply; prev = trav; while (trav) { trav = trav->nextentry; GF_FREE (prev); prev = trav; } return 0; } int readdirp_rsp_cleanup (gfs3_readdirp_rsp *rsp) { gfs3_dirplist *prev = NULL; gfs3_dirplist *trav = NULL; trav = rsp->reply; prev = trav; while (trav) { trav = trav->nextentry; GF_FREE (prev->dict.dict_val); GF_FREE (prev); prev = trav; } return 0; } int gf_server_check_getxattr_cmd (call_frame_t *frame, const char *key) { server_conf_t *conf = NULL; rpc_transport_t *xprt = NULL; conf = frame->this->private; if (!conf) return 0; if (fnmatch ("*list*mount*point*", key, 0) == 0) { /* list all the client protocol connecting to this process */ pthread_mutex_lock (&conf->mutex); { list_for_each_entry (xprt, &conf->xprt_list, list) { gf_log ("mount-point-list", GF_LOG_INFO, "%s", xprt->peerinfo.identifier); } } pthread_mutex_unlock (&conf->mutex); } /* Add more options/keys here */ return 0; } int gf_server_check_setxattr_cmd (call_frame_t *frame, dict_t *dict) { server_conf_t *conf = NULL; rpc_transport_t *xprt = NULL; uint64_t total_read = 0; uint64_t total_write = 0; conf = frame->this->private; if (!conf || !dict) return 0; if (dict_foreach_fnmatch (dict, "*io*stat*dump", dict_null_foreach_fn, NULL ) > 0) { list_for_each_entry (xprt, &conf->xprt_list, list) { total_read += xprt->total_bytes_read; total_write += xprt->total_bytes_write; } gf_log ("stats", GF_LOG_INFO, "total-read %"PRIu64", total-write %"PRIu64, total_read, total_write); } return 0; } gf_boolean_t server_cancel_grace_timer (xlator_t *this, client_t *client) { server_ctx_t *serv_ctx = NULL; gf_timer_t *timer = NULL; gf_boolean_t cancelled = _gf_false; if (!this || !client) { gf_log (THIS->name, GF_LOG_ERROR, "Invalid arguments to cancel connection timer"); return cancelled; } serv_ctx = server_ctx_get (client, client->this); if (serv_ctx == NULL) { gf_log (this->name, GF_LOG_INFO, "server_ctx_get() failed"); goto out; } LOCK (&serv_ctx->fdtable_lock); { if (serv_ctx->grace_timer) { timer = serv_ctx->grace_timer; serv_ctx->grace_timer = NULL; } } UNLOCK (&serv_ctx->fdtable_lock); if (timer) { gf_timer_call_cancel (this->ctx, timer); cancelled = _gf_true; } out: return cancelled; } server_ctx_t* server_ctx_get (client_t *client, xlator_t *xlator) { void *tmp = NULL; server_ctx_t *ctx = NULL; client_ctx_get (client, xlator, &tmp); ctx = tmp; if (ctx != NULL) goto out; ctx = GF_CALLOC (1, sizeof (server_ctx_t), gf_server_mt_server_conf_t); if (ctx == NULL) goto out; /* ctx->lk_version = 0; redundant */ ctx->fdtable = gf_fd_fdtable_alloc (); if (ctx->fdtable == NULL) { GF_FREE (ctx); ctx = NULL; goto out; } LOCK_INIT (&ctx->fdtable_lock); if (client_ctx_set (client, xlator, ctx) != 0) { LOCK_DESTROY (&ctx->fdtable_lock); GF_FREE (ctx); ctx = NULL; } out: return ctx; } int auth_set_username_passwd (dict_t *input_params, dict_t *config_params, client_t *client) { int ret = 0; data_t *allow_user = NULL; data_t *passwd_data = NULL; char *username = NULL; char *password = NULL; char *brick_name = NULL; char *searchstr = NULL; char *username_str = NULL; char *tmp = NULL; char *username_cpy = NULL; ret = dict_get_str (input_params, "username", &username); if (ret) { gf_log ("auth/login", GF_LOG_DEBUG, "username not found, returning DONT-CARE"); /* For non trusted clients username and password will not be there. So dont reject the client. */ ret = 0; goto out; } ret = dict_get_str (input_params, "password", &password); if (ret) { gf_log ("auth/login", GF_LOG_WARNING, "password not found, returning DONT-CARE"); goto out; } ret = dict_get_str (input_params, "remote-subvolume", &brick_name); if (ret) { gf_log ("auth/login", GF_LOG_ERROR, "remote-subvolume not specified"); ret = -1; goto out; } ret = gf_asprintf (&searchstr, "auth.login.%s.allow", brick_name); if (-1 == ret) { ret = 0; goto out; } allow_user = dict_get (config_params, searchstr); GF_FREE (searchstr); if (allow_user) { username_cpy = gf_strdup (allow_user->data); if (!username_cpy) goto out; username_str = strtok_r (username_cpy, " ,", &tmp); while (username_str) { if (!fnmatch (username_str, username, 0)) { ret = gf_asprintf (&searchstr, "auth.login.%s.password", username); if (-1 == ret) goto out; passwd_data = dict_get (config_params, searchstr); GF_FREE (searchstr); if (!passwd_data) { gf_log ("auth/login", GF_LOG_ERROR, "wrong username/password " "combination"); ret = -1; goto out; } ret = !((strcmp (data_to_str (passwd_data), password))?0: -1); if (!ret) { client->auth.username = gf_strdup (username); client->auth.passwd = gf_strdup (password); } if (ret == -1) gf_log ("auth/login", GF_LOG_ERROR, "wrong password for user %s", username); break; } username_str = strtok_r (NULL, " ,", &tmp); } } out: GF_FREE (username_cpy); return ret; } int32_t gf_barrier_transmit (server_conf_t *conf, gf_barrier_payload_t *payload) { gf_barrier_t *barrier = NULL; int32_t ret = -1; client_t *client = NULL; gf_boolean_t lk_heal = _gf_false; call_frame_t *frame = NULL; server_state_t *state = NULL; GF_VALIDATE_OR_GOTO ("barrier", conf, out); GF_VALIDATE_OR_GOTO ("barrier", conf->barrier, out); GF_VALIDATE_OR_GOTO ("barrier", payload, out); barrier = conf->barrier; frame = payload->frame; if (frame) { state = CALL_STATE (frame); frame->local = NULL; client = frame->root->client; } /* currently lk fops are not barrier'ed. This is reflecting code in * server_submit_reply */ if (client) lk_heal = ((server_conf_t *) client->this->private)->lk_heal; ret = rpcsvc_submit_generic (payload->req, &payload->rsp, 1, payload->payload, payload->payload_count, payload->iobref); iobuf_unref (payload->iob); if (ret == -1) { gf_log_callingfn ("", GF_LOG_ERROR, "Reply submission failed"); if (frame && client && !lk_heal) { server_connection_cleanup (frame->this, client, INTERNAL_LOCKS | POSIX_LOCKS); } else { /* TODO: Failure of open(dir), create, inodelk, entrylk or lk fops send failure must be handled specially. */ } goto ret; } ret = 0; ret: if (state) { free_state (state); } if (frame) { gf_client_unref (client); STACK_DESTROY (frame->root); } if (payload->free_iobref) { iobref_unref (payload->iobref); } out: return ret; } gf_barrier_payload_t * gf_barrier_dequeue (gf_barrier_t *barrier) { gf_barrier_payload_t *payload = NULL; if (!barrier || list_empty (&barrier->queue)) return NULL; payload = list_entry (barrier->queue.next, gf_barrier_payload_t, list); if (payload) { list_del_init (&payload->list); barrier->cur_size--; } return payload; } void* gf_barrier_dequeue_start (void *data) { server_conf_t *conf = NULL; gf_barrier_t *barrier = NULL; gf_barrier_payload_t *payload = NULL; conf = (server_conf_t *)data; if (!conf || !conf->barrier) return NULL; barrier = conf->barrier; LOCK (&barrier->lock); { while (barrier->cur_size) { payload = gf_barrier_dequeue (barrier); if (payload) { if (gf_barrier_transmit (conf, payload)) { gf_log ("server", GF_LOG_WARNING, "Failed to transmit"); } GF_FREE (payload); } } } UNLOCK (&barrier->lock); return NULL; } void gf_barrier_timeout (void *data) { server_conf_t *conf = NULL; gf_barrier_t *barrier = NULL; gf_boolean_t need_dequeue = _gf_false; conf = (server_conf_t *)data; if (!conf || !conf->barrier) goto out; barrier = conf->barrier; gf_log ("", GF_LOG_INFO, "barrier timed-out"); LOCK (&barrier->lock); { need_dequeue = barrier->on; barrier->on = _gf_false; } UNLOCK (&barrier->lock); if (need_dequeue == _gf_true) gf_barrier_dequeue_start (data); out: return; } int32_t gf_barrier_start (xlator_t *this) { server_conf_t *conf = NULL; gf_barrier_t *barrier = NULL; int32_t ret = -1; struct timespec time = {0,}; conf = this->private; GF_VALIDATE_OR_GOTO ("server", this, out); GF_VALIDATE_OR_GOTO (this->name, conf, out); GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); barrier = conf->barrier; gf_log (this->name, GF_LOG_INFO, "barrier start called"); LOCK (&barrier->lock); { /* if barrier is on, reset timer */ if (barrier->on == _gf_true) { ret = gf_timer_call_cancel (this->ctx, barrier->timer); if (ret) { gf_log (this->name, GF_LOG_ERROR, "Failed to " "unset timer, failing barrier start"); goto unlock; } } barrier->on = _gf_true; time.tv_sec = barrier->time_out; time.tv_nsec = 0; barrier->timer = gf_timer_call_after (this->ctx, time, gf_barrier_timeout, (void *)conf); if (!barrier->timer) { gf_log (this->name, GF_LOG_ERROR, "Failed to set " "timer, failing barrier start"); barrier->on = _gf_false; } } unlock: UNLOCK (&barrier->lock); ret = 0; out: return ret; } int32_t gf_barrier_stop (xlator_t *this) { server_conf_t *conf = NULL; gf_barrier_t *barrier = NULL; int32_t ret = -1; gf_boolean_t need_dequeue = _gf_false; conf = this->private; GF_VALIDATE_OR_GOTO ("server", this, out); GF_VALIDATE_OR_GOTO (this->name, conf, out); GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); barrier = conf->barrier; gf_log (this->name, GF_LOG_INFO, "barrier stop called"); LOCK (&barrier->lock); { need_dequeue = barrier->on; barrier->on = _gf_false; } UNLOCK (&barrier->lock); if (need_dequeue == _gf_true) { gf_timer_call_cancel (this->ctx, barrier->timer); ret = gf_thread_create (&conf->barrier_th, NULL, gf_barrier_dequeue_start, conf); if (ret) { gf_log (this->name, GF_LOG_CRITICAL, "Failed to start un-barriering"); goto out; } } ret = 0; out: return ret; } int32_t gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, char *str) { int32_t ret = -1; char *dup_str = NULL; char *str_tok = NULL; char *save_ptr = NULL; uint64_t fops = 0; /* by defaul fsync & flush needs to be barriered */ fops |= 1 << GFS3_OP_FSYNC; fops |= 1 << GFS3_OP_FLUSH; if (!str) goto done; dup_str = gf_strdup (str); if (!dup_str) goto done; str_tok = strtok_r (dup_str, ",", &save_ptr); if (!str_tok) goto done; fops = 0; while (str_tok) { if (!strcmp(str_tok, "writev")) { fops |= ((uint64_t)1 << GFS3_OP_WRITE); } else if (!strcmp(str_tok, "fsync")) { fops |= ((uint64_t)1 << GFS3_OP_FSYNC); } else if (!strcmp(str_tok, "read")) { fops |= ((uint64_t)1 << GFS3_OP_READ); } else if (!strcmp(str_tok, "rename")) { fops |= ((uint64_t)1 << GFS3_OP_RENAME); } else if (!strcmp(str_tok, "flush")) { fops |= ((uint64_t)1 << GFS3_OP_FLUSH); } else if (!strcmp(str_tok, "ftruncate")) { fops |= ((uint64_t)1 << GFS3_OP_FTRUNCATE); } else if (!strcmp(str_tok, "fallocate")) { fops |= ((uint64_t)1 << GFS3_OP_FALLOCATE); } else if (!strcmp(str_tok, "rmdir")) { fops |= ((uint64_t)1 << GFS3_OP_RMDIR); } else { gf_log ("barrier", GF_LOG_ERROR, "Invalid barrier fop %s", str_tok); } str_tok = strtok_r (NULL, ",", &save_ptr); } done: LOCK (&barrier->lock); { barrier->fops = fops; } UNLOCK (&barrier->lock); ret = 0; GF_FREE (dup_str); return ret; } void gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *payload) { list_add_tail (&payload->list, &barrier->queue); barrier->cur_size++; } gf_barrier_payload_t * gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp, call_frame_t *frame, struct iovec *payload_orig, int payloadcount, struct iobref *iobref, struct iobuf *iob, gf_boolean_t free_iobref) { gf_barrier_payload_t *payload = NULL; if (!rsp) return NULL; payload = GF_CALLOC (1, sizeof (*payload),1); if (!payload) return NULL; INIT_LIST_HEAD (&payload->list); payload->req = req; memcpy (&payload->rsp, rsp, sizeof (struct iovec)); payload->frame = frame; payload->payload = payload_orig; payload->payload_count = payloadcount; payload->iobref = iobref; payload->iob = iob; payload->free_iobref = free_iobref; return payload; }