diff options
Diffstat (limited to 'xlators/protocol/server/src/server.c')
-rw-r--r-- | xlators/protocol/server/src/server.c | 125 |
1 files changed, 120 insertions, 5 deletions
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 378aa393a50..7855170586a 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -130,8 +130,6 @@ ret: return iob; } - - int server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, struct iovec *payload, int payloadcount, @@ -144,6 +142,10 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, char new_iobref = 0; client_t *client = NULL; gf_boolean_t lk_heal = _gf_false; + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + gf_barrier_payload_t *stub = NULL; + gf_boolean_t barriered = _gf_false; GF_VALIDATE_OR_GOTO ("server", req, ret); @@ -151,6 +153,7 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, state = CALL_STATE (frame); frame->local = NULL; client = frame->root->client; + conf = (server_conf_t *) client->this->private; } if (client) @@ -173,6 +176,32 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, iobref_add (iobref, iob); + if (conf) + barrier = conf->barrier; + if (barrier) { + /* todo: write's with fd flags set to O_SYNC and O_DIRECT */ + LOCK (&barrier->lock); + { + if (is_fop_barriered (barrier->fops, req->procnum) && + (barrier_add_to_queue (barrier))) { + stub = gf_barrier_payload (req, &rsp, frame, + payload, + payloadcount, iobref, + iob, new_iobref); + if (stub) { + gf_barrier_enqueue (barrier, stub); + barriered = _gf_true; + } else { + gf_log ("", GF_LOG_ERROR, "Failed to " + " barrier fop %"PRIu64, + ((uint64_t)1 << req->procnum)); + } + } + } + UNLOCK (&barrier->lock); + if (barriered == _gf_true) + goto out; + } /* Then, submit the message for transmission. */ ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount, iobref); @@ -214,7 +243,7 @@ ret: if (new_iobref) { iobref_unref (iobref); } - +out: return ret; } @@ -798,6 +827,8 @@ init (xlator_t *this) server_conf_t *conf = NULL; rpcsvc_listener_t *listener = NULL; char *statedump_path = NULL; + gf_barrier_t *barrier = NULL; + char *str = NULL; GF_VALIDATE_OR_GOTO ("init", this, out); if (this->children == NULL) { @@ -946,6 +977,37 @@ init (xlator_t *this) } } #endif + /* barrier related */ + barrier = GF_CALLOC (1, sizeof (*barrier),1); + if (!barrier) { + gf_log (this->name, GF_LOG_WARNING, + "WARNING: Failed to allocate barrier"); + ret = -1; + goto out; + } + + LOCK_INIT (&barrier->lock); + INIT_LIST_HEAD (&barrier->queue); + barrier->on = _gf_false; + + GF_OPTION_INIT ("barrier-queue-length", barrier->max_size, + int64, out); + GF_OPTION_INIT ("barrier-timeout", barrier->time_out, + uint64, out); + + ret = dict_get_str (this->options, "barrier-fops", &str); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "setting barrier fops to default value"); + } + ret = gf_barrier_fops_configure (this, barrier, str); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "invalid barrier fops specified"); + goto out; + } + + conf->barrier = barrier; this->private = conf; ret = 0; @@ -999,12 +1061,48 @@ int notify (xlator_t *this, int32_t event, void *data, ...) { int ret = 0; + int32_t val = 0; + dict_t *dict = NULL; + dict_t *output = NULL; + va_list ap; + + dict = data; + va_start (ap, data); + output = va_arg (ap, dict_t*); + va_end (ap); + switch (event) { + case GF_EVENT_VOLUME_BARRIER_OP: + ret = dict_get_int32 (dict, "barrier", &val); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Wrong BARRIER event"); + goto out; + } + /* !val un-barrier, if val, barrier */ + if (val) { + ret = gf_barrier_start (this); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Barrier start failed"); + } else { + ret = gf_barrier_stop (this); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Barrier stop failed"); + } + ret = dict_set_int32 (output, "barrier-status", ret); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Failed to set barrier-status in dict"); + break; + + /* todo: call default_notify to make other xlators handle it.*/ default: default_notify (this, event, data); break; } - +out: return ret; } @@ -1128,6 +1226,23 @@ struct volume_options options[] = { "overrides the auth.allow option. By default, all" " connections are allowed." }, - + {.key = {"barrier-timeout"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "60", + .min = 0, + .max = 360, + .description = "Barrier timeout in seconds", + }, + {.key = {"barrier-queue-length"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "4096", + .min = 0, + .max = 16384, + .description = "Barrier queue length", + }, + {.key = {"barrier-fops"}, + .type = GF_OPTION_TYPE_STR, + .description = "Allow a comma seperated fop lists", + }, { .key = {NULL} }, }; |