summaryrefslogtreecommitdiffstats
path: root/xlators/features/quota/src/quotad-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/quota/src/quotad-aggregator.c')
-rw-r--r--xlators/features/quota/src/quotad-aggregator.c494
1 files changed, 494 insertions, 0 deletions
diff --git a/xlators/features/quota/src/quotad-aggregator.c b/xlators/features/quota/src/quotad-aggregator.c
new file mode 100644
index 00000000000..75d47867b5b
--- /dev/null
+++ b/xlators/features/quota/src/quotad-aggregator.c
@@ -0,0 +1,494 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "cli1-xdr.h"
+#include "quota.h"
+#include "quotad-helpers.h"
+#include "quotad-aggregator.h"
+
+static char *qd_ext_xattrs[] = {
+ QUOTA_SIZE_KEY,
+ QUOTA_LIMIT_KEY,
+ QUOTA_LIMIT_OBJECTS_KEY,
+ NULL,
+};
+
+static struct rpcsvc_program quotad_aggregator_prog;
+
+struct iobuf *
+quotad_serialize_reply(rpcsvc_request_t *req, void *arg, struct iovec *outmsg,
+ xdrproc_t xdrproc)
+{
+ struct iobuf *iob = NULL;
+ ssize_t retlen = 0;
+ ssize_t xdr_size = 0;
+
+ GF_VALIDATE_OR_GOTO("server", req, ret);
+
+ /* First, get the io buffer into which the reply in arg will
+ * be serialized.
+ */
+ if (arg && xdrproc) {
+ xdr_size = xdr_sizeof(xdrproc, arg);
+ iob = iobuf_get2(req->svc->ctx->iobuf_pool, xdr_size);
+ if (!iob) {
+ gf_log_callingfn(THIS->name, GF_LOG_ERROR, "Failed to get iobuf");
+ goto ret;
+ };
+
+ iobuf_to_iovec(iob, outmsg);
+ /* Use the given serializer to translate the given C structure
+ * in arg to XDR format which will be written into the buffer
+ * in outmsg.
+ */
+ /* retlen is used to received the error since size_t is unsigned and we
+ * need -1 for error notification during encoding.
+ */
+
+ retlen = xdr_serialize_generic(*outmsg, arg, xdrproc);
+ if (retlen == -1) {
+ /* Failed to Encode 'GlusterFS' msg in RPC is not exactly
+ failure of RPC return values.. Client should get
+ notified about this, so there are no missing frames */
+ gf_log_callingfn("", GF_LOG_ERROR, "Failed to encode message");
+ req->rpc_err = GARBAGE_ARGS;
+ retlen = 0;
+ }
+ }
+ outmsg->iov_len = retlen;
+ret:
+ return iob;
+}
+
+int
+quotad_aggregator_submit_reply(call_frame_t *frame, rpcsvc_request_t *req,
+ void *arg, struct iovec *payload,
+ int payloadcount, struct iobref *iobref,
+ xdrproc_t xdrproc)
+{
+ struct iobuf *iob = NULL;
+ int ret = -1;
+ struct iovec rsp = {
+ 0,
+ };
+ quotad_aggregator_state_t *state = NULL;
+ char new_iobref = 0;
+
+ GF_VALIDATE_OR_GOTO("server", req, ret);
+
+ if (frame) {
+ state = frame->root->state;
+ frame->local = NULL;
+ }
+
+ if (!iobref) {
+ iobref = iobref_new();
+ if (!iobref) {
+ goto ret;
+ }
+
+ new_iobref = 1;
+ }
+
+ iob = quotad_serialize_reply(req, arg, &rsp, xdrproc);
+ if (!iob) {
+ gf_msg("", GF_LOG_ERROR, 0, Q_MSG_DICT_SERIALIZE_FAIL,
+ "Failed to serialize reply");
+ goto ret;
+ }
+
+ iobref_add(iobref, iob);
+
+ ret = rpcsvc_submit_generic(req, &rsp, 1, payload, payloadcount, iobref);
+
+ iobuf_unref(iob);
+
+ ret = 0;
+ret:
+ if (state) {
+ quotad_aggregator_free_state(state);
+ }
+
+ if (frame)
+ STACK_DESTROY(frame->root);
+
+ if (new_iobref) {
+ iobref_unref(iobref);
+ }
+
+ return ret;
+}
+
+int
+quotad_aggregator_getlimit_cbk(xlator_t *this, call_frame_t *frame,
+ void *lookup_rsp)
+{
+ gfs3_lookup_rsp *rsp = lookup_rsp;
+ gf_cli_rsp cli_rsp = {
+ 0,
+ };
+ dict_t *xdata = NULL;
+ quotad_aggregator_state_t *state = NULL;
+ int ret = -1;
+ int type = 0;
+
+ if (!rsp || (rsp->op_ret == -1))
+ goto reply;
+
+ GF_PROTOCOL_DICT_UNSERIALIZE(frame->this, xdata, (rsp->xdata.xdata_val),
+ (rsp->xdata.xdata_len), rsp->op_ret,
+ rsp->op_errno, out);
+
+ if (xdata) {
+ state = frame->root->state;
+ ret = dict_get_int32n(state->req_xdata, "type", SLEN("type"), &type);
+ if (ret < 0)
+ goto out;
+
+ ret = dict_set_int32_sizen(xdata, "type", type);
+ if (ret < 0)
+ goto out;
+ }
+
+ ret = 0;
+out:
+ rsp->op_ret = ret;
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_DICT_UNSERIALIZE_FAIL,
+ "failed to unserialize "
+ "nameless lookup rsp");
+ goto reply;
+ }
+ cli_rsp.op_ret = rsp->op_ret;
+ cli_rsp.op_errno = rsp->op_errno;
+ cli_rsp.op_errstr = "";
+ if (xdata) {
+ GF_PROTOCOL_DICT_SERIALIZE(frame->this, xdata, (&cli_rsp.dict.dict_val),
+ (cli_rsp.dict.dict_len), cli_rsp.op_errno,
+ reply);
+ }
+
+reply:
+ quotad_aggregator_submit_reply(frame, (frame) ? frame->local : NULL,
+ (void *)&cli_rsp, NULL, 0, NULL,
+ (xdrproc_t)xdr_gf_cli_rsp);
+
+ dict_unref(xdata);
+ GF_FREE(cli_rsp.dict.dict_val);
+ return 0;
+}
+
+int
+quotad_aggregator_getlimit(rpcsvc_request_t *req)
+{
+ call_frame_t *frame = NULL;
+ gf_cli_req cli_req = {
+ {0},
+ };
+ gf_cli_rsp cli_rsp = {0};
+ quotad_aggregator_state_t *state = NULL;
+ xlator_t *this = NULL;
+ dict_t *dict = NULL;
+ int ret = -1, op_errno = 0;
+ char *gfid_str = NULL;
+ uuid_t gfid = {0};
+ char *volume_uuid = NULL;
+
+ GF_VALIDATE_OR_GOTO("quotad-aggregator", req, err);
+
+ this = THIS;
+
+ cli_req.dict.dict_val = alloca(req->msg[0].iov_len);
+
+ ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
+ if (ret < 0) {
+ // failed to decode msg;
+ gf_msg("this->name", GF_LOG_ERROR, 0, Q_MSG_XDR_DECODE_ERROR,
+ "xdr decoding error");
+ req->rpc_err = GARBAGE_ARGS;
+ goto err;
+ }
+
+ if (cli_req.dict.dict_len) {
+ dict = dict_new();
+ ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
+ &dict);
+ if (ret < 0) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_DICT_UNSERIALIZE_FAIL,
+ "Failed to unserialize req-buffer to "
+ "dictionary");
+ goto err;
+ }
+ }
+
+ ret = dict_get_strn(dict, "gfid", SLEN("gfid"), &gfid_str);
+ if (ret) {
+ goto err;
+ }
+
+ ret = dict_get_strn(dict, "volume-uuid", SLEN("volume-uuid"), &volume_uuid);
+ if (ret) {
+ goto err;
+ }
+
+ gf_uuid_parse((const char *)gfid_str, gfid);
+
+ frame = quotad_aggregator_get_frame_from_req(req);
+ if (frame == NULL) {
+ cli_rsp.op_errno = ENOMEM;
+ goto errx;
+ }
+ state = frame->root->state;
+ state->req_xdata = dict;
+ state->xdata = dict_new();
+ dict = NULL;
+
+ ret = dict_set_int32_sizen(state->xdata, QUOTA_LIMIT_KEY, 42);
+ if (ret)
+ goto err;
+
+ ret = dict_set_int32_sizen(state->xdata, QUOTA_LIMIT_OBJECTS_KEY, 42);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
+ "Failed to set QUOTA_LIMIT_OBJECTS_KEY");
+ goto err;
+ }
+
+ ret = dict_set_int32_sizen(state->xdata, QUOTA_SIZE_KEY, 42);
+ if (ret)
+ goto err;
+
+ ret = dict_set_int32_sizen(state->xdata, GET_ANCESTRY_PATH_KEY, 42);
+ if (ret)
+ goto err;
+
+ ret = qd_nameless_lookup(this, frame, (char *)gfid, state->xdata,
+ volume_uuid, quotad_aggregator_getlimit_cbk);
+ if (ret) {
+ cli_rsp.op_errno = ret;
+ goto errx;
+ }
+
+ return ret;
+
+err:
+ cli_rsp.op_errno = op_errno;
+errx:
+ cli_rsp.op_ret = -1;
+ cli_rsp.op_errstr = "";
+
+ quotad_aggregator_getlimit_cbk(this, frame, &cli_rsp);
+ if (dict)
+ dict_unref(dict);
+ return ret;
+}
+
+int
+quotad_aggregator_lookup_cbk(xlator_t *this, call_frame_t *frame, void *rsp)
+{
+ quotad_aggregator_submit_reply(frame, frame ? frame->local : NULL, rsp,
+ NULL, 0, NULL,
+ (xdrproc_t)xdr_gfs3_lookup_rsp);
+
+ return 0;
+}
+
+int
+quotad_aggregator_lookup(rpcsvc_request_t *req)
+{
+ call_frame_t *frame = NULL;
+ gfs3_lookup_req args = {
+ {
+ 0,
+ },
+ };
+ int i = 0, ret = -1, op_errno = 0;
+ gfs3_lookup_rsp rsp = {
+ 0,
+ };
+ quotad_aggregator_state_t *state = NULL;
+ xlator_t *this = NULL;
+ dict_t *dict = NULL;
+ char *volume_uuid = NULL;
+
+ GF_VALIDATE_OR_GOTO("quotad-aggregator", req, err);
+
+ this = THIS;
+
+ args.bname = alloca(req->msg[0].iov_len);
+ args.xdata.xdata_val = alloca(req->msg[0].iov_len);
+
+ ret = xdr_to_generic(req->msg[0], &args, (xdrproc_t)xdr_gfs3_lookup_req);
+ if (ret < 0) {
+ rsp.op_errno = EINVAL;
+ goto err;
+ }
+
+ frame = quotad_aggregator_get_frame_from_req(req);
+ if (frame == NULL) {
+ rsp.op_errno = ENOMEM;
+ goto err;
+ }
+
+ state = frame->root->state;
+
+ GF_PROTOCOL_DICT_UNSERIALIZE(this, dict, (args.xdata.xdata_val),
+ (args.xdata.xdata_len), ret, op_errno, err);
+
+ ret = dict_get_str(dict, "volume-uuid", &volume_uuid);
+ if (ret) {
+ goto err;
+ }
+
+ state->xdata = dict_new();
+
+ for (i = 0; qd_ext_xattrs[i]; i++) {
+ if (dict_get(dict, qd_ext_xattrs[i])) {
+ ret = dict_set_uint32(state->xdata, qd_ext_xattrs[i], 1);
+ if (ret < 0)
+ goto err;
+ }
+ }
+
+ ret = qd_nameless_lookup(this, frame, args.gfid, state->xdata, volume_uuid,
+ quotad_aggregator_lookup_cbk);
+ if (ret) {
+ rsp.op_errno = ret;
+ goto err;
+ }
+
+ if (dict)
+ dict_unref(dict);
+
+ return ret;
+
+err:
+ rsp.op_ret = -1;
+ rsp.op_errno = op_errno;
+
+ quotad_aggregator_lookup_cbk(this, frame, &rsp);
+ if (dict)
+ dict_unref(dict);
+
+ return ret;
+}
+
+int
+quotad_aggregator_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
+ void *data)
+{
+ if (!xl || !data) {
+ gf_log_callingfn("server", GF_LOG_WARNING,
+ "Calling rpc_notify without initializing");
+ goto out;
+ }
+
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ break;
+
+ case RPCSVC_EVENT_DISCONNECT:
+ break;
+
+ default:
+ break;
+ }
+
+out:
+ return 0;
+}
+
+int
+quotad_aggregator_init(xlator_t *this)
+{
+ quota_priv_t *priv = NULL;
+ int ret = -1;
+
+ priv = this->private;
+
+ if (priv->rpcsvc) {
+ /* Listener already created */
+ return 0;
+ }
+
+ ret = dict_set_nstrn(this->options, "transport.address-family",
+ SLEN("transport.address-family"), "unix",
+ SLEN("unix"));
+ if (ret)
+ goto out;
+
+ ret = dict_set_nstrn(this->options, "transport-type",
+ SLEN("transport-type"), "socket", SLEN("socket"));
+ if (ret)
+ goto out;
+
+ ret = dict_set_nstrn(this->options, "transport.socket.listen-path",
+ SLEN("transport.socket.listen-path"),
+ "/var/run/gluster/quotad.socket",
+ SLEN("/var/run/gluster/quotad.socket"));
+ if (ret)
+ goto out;
+
+ /* RPC related */
+ priv->rpcsvc = rpcsvc_init(this, this->ctx, this->options, 0);
+ if (priv->rpcsvc == NULL) {
+ gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPCSVC_INIT_FAILED,
+ "creation of rpcsvc failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = rpcsvc_create_listeners(priv->rpcsvc, this->options, this->name);
+ if (ret < 1) {
+ gf_msg(this->name, GF_LOG_WARNING, 0,
+ Q_MSG_RPCSVC_LISTENER_CREATION_FAILED,
+ "creation of listener failed");
+ ret = -1;
+ goto out;
+ }
+
+ priv->quotad_aggregator = &quotad_aggregator_prog;
+ quotad_aggregator_prog.options = this->options;
+
+ ret = rpcsvc_program_register(priv->rpcsvc, &quotad_aggregator_prog,
+ _gf_false);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPCSVC_REGISTER_FAILED,
+ "registration of program (name:%s, prognum:%d, "
+ "progver:%d) failed",
+ quotad_aggregator_prog.progname, quotad_aggregator_prog.prognum,
+ quotad_aggregator_prog.progver);
+ goto out;
+ }
+
+ ret = 0;
+out:
+ if (ret && priv->rpcsvc) {
+ GF_FREE(priv->rpcsvc);
+ priv->rpcsvc = NULL;
+ }
+
+ return ret;
+}
+
+static rpcsvc_actor_t quotad_aggregator_actors[GF_AGGREGATOR_MAXVALUE] = {
+ [GF_AGGREGATOR_NULL] = {"NULL", NULL, NULL, GF_AGGREGATOR_NULL, DRC_NA, 0},
+ [GF_AGGREGATOR_LOOKUP] = {"LOOKUP", quotad_aggregator_lookup, NULL,
+ GF_AGGREGATOR_NULL, DRC_NA, 0},
+ [GF_AGGREGATOR_GETLIMIT] = {"GETLIMIT", quotad_aggregator_getlimit, NULL,
+ GF_AGGREGATOR_GETLIMIT, DRC_NA, 0},
+};
+
+static struct rpcsvc_program quotad_aggregator_prog = {
+ .progname = "GlusterFS 3.3",
+ .prognum = GLUSTER_AGGREGATOR_PROGRAM,
+ .progver = GLUSTER_AGGREGATOR_VERSION,
+ .numactors = GF_AGGREGATOR_MAXVALUE,
+ .actors = quotad_aggregator_actors};