diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 3376 |
1 files changed, 1661 insertions, 1715 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 9ee9161c904..c5236251549 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -8,7 +8,6 @@ cases as published by the Free Software Foundation. */ - #define RPC_CLNT_DEFAULT_REQUEST_COUNT 512 #include "rpc-clnt.h" @@ -22,479 +21,465 @@ #include "rpc-common-xdr.h" void -rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); +rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool); struct saved_frame * -__saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, - struct timeval *current) +__saved_frames_get_timedout(struct saved_frames *frames, uint32_t timeout, + struct timeval *current) { - struct saved_frame *bailout_frame = NULL, *tmp = NULL; - - if (!list_empty(&frames->sf.list)) { - tmp = list_entry (frames->sf.list.next, typeof (*tmp), list); - if ((tmp->saved_at.tv_sec + timeout) <= current->tv_sec) { - bailout_frame = tmp; - list_del_init (&bailout_frame->list); - frames->count--; - } - } - - return bailout_frame; + struct saved_frame *bailout_frame = NULL, *tmp = NULL; + + if (!list_empty(&frames->sf.list)) { + tmp = list_entry(frames->sf.list.next, typeof(*tmp), list); + if ((tmp->saved_at.tv_sec + timeout) <= current->tv_sec) { + bailout_frame = tmp; + list_del_init(&bailout_frame->list); + frames->count--; + } + } + + return bailout_frame; } static int -_is_lock_fop (struct saved_frame *sframe) +_is_lock_fop(struct saved_frame *sframe) { - int fop = 0; + int fop = 0; - if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM && - SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION) - fop = SFRAME_GET_PROCNUM (sframe); + if (SFRAME_GET_PROGNUM(sframe) == GLUSTER_FOP_PROGRAM && + SFRAME_GET_PROGVER(sframe) == GLUSTER_FOP_VERSION) + fop = SFRAME_GET_PROCNUM(sframe); - return ((fop == GFS3_OP_LK) || - (fop == GFS3_OP_INODELK) || - (fop == GFS3_OP_FINODELK) || - (fop == GFS3_OP_ENTRYLK) || - (fop == GFS3_OP_FENTRYLK)); + return ((fop == GFS3_OP_LK) || (fop == GFS3_OP_INODELK) || + (fop == GFS3_OP_FINODELK) || (fop == GFS3_OP_ENTRYLK) || + (fop == GFS3_OP_FENTRYLK)); } struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, - struct rpc_req *rpcreq) +__saved_frames_put(struct saved_frames *frames, void *frame, + struct rpc_req *rpcreq) { - struct saved_frame *saved_frame = NULL; + struct saved_frame *saved_frame = NULL; - saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); - if (!saved_frame) { - goto out; - } - /* THIS should be saved and set back */ + saved_frame = mem_get(rpcreq->conn->rpc_clnt->saved_frames_pool); + if (!saved_frame) { + goto out; + } + /* THIS should be saved and set back */ - memset (saved_frame, 0, sizeof (*saved_frame)); - INIT_LIST_HEAD (&saved_frame->list); + memset(saved_frame, 0, sizeof(*saved_frame)); + INIT_LIST_HEAD(&saved_frame->list); - saved_frame->capital_this = THIS; - saved_frame->frame = frame; - saved_frame->rpcreq = rpcreq; - gettimeofday (&saved_frame->saved_at, NULL); + saved_frame->capital_this = THIS; + saved_frame->frame = frame; + saved_frame->rpcreq = rpcreq; + gettimeofday(&saved_frame->saved_at, NULL); - if (_is_lock_fop (saved_frame)) - list_add_tail (&saved_frame->list, &frames->lk_sf.list); - else - list_add_tail (&saved_frame->list, &frames->sf.list); + if (_is_lock_fop(saved_frame)) + list_add_tail(&saved_frame->list, &frames->lk_sf.list); + else + list_add_tail(&saved_frame->list, &frames->sf.list); - frames->count++; + frames->count++; out: - return saved_frame; + return saved_frame; } - - static void -call_bail (void *data) +call_bail(void *data) { - rpc_transport_t *trans = NULL; - struct rpc_clnt *clnt = NULL; - rpc_clnt_connection_t *conn = NULL; - struct timeval current; - struct list_head list; - struct saved_frame *saved_frame = NULL; - struct saved_frame *trav = NULL; - struct saved_frame *tmp = NULL; - char frame_sent[256] = {0,}; - struct timespec timeout = {0,}; - char peerid[UNIX_PATH_MAX] = {0}; - gf_boolean_t need_unref = _gf_false; - int len; - - GF_VALIDATE_OR_GOTO ("client", data, out); - - clnt = data; - - conn = &clnt->conn; - pthread_mutex_lock (&conn->lock); - { - trans = conn->trans; - if (trans) { - strncpy (peerid, conn->trans->peerinfo.identifier, - sizeof (peerid)-1); - + rpc_transport_t *trans = NULL; + struct rpc_clnt *clnt = NULL; + rpc_clnt_connection_t *conn = NULL; + struct timeval current; + struct list_head list; + struct saved_frame *saved_frame = NULL; + struct saved_frame *trav = NULL; + struct saved_frame *tmp = NULL; + char frame_sent[256] = { + 0, + }; + struct timespec timeout = { + 0, + }; + char peerid[UNIX_PATH_MAX] = {0}; + gf_boolean_t need_unref = _gf_false; + int len; + + GF_VALIDATE_OR_GOTO("client", data, out); + + clnt = data; + + conn = &clnt->conn; + pthread_mutex_lock(&conn->lock); + { + trans = conn->trans; + if (trans) { + strncpy(peerid, conn->trans->peerinfo.identifier, + sizeof(peerid) - 1); + } + } + pthread_mutex_unlock(&conn->lock); + /*rpc_clnt_connection_cleanup will be unwinding all saved frames, + * bailed or otherwise*/ + if (!trans) + goto out; + + gettimeofday(¤t, NULL); + INIT_LIST_HEAD(&list); + + pthread_mutex_lock(&conn->lock); + { + /* Chaining to get call-always functionality from + call-once timer */ + if (conn->timer) { + timeout.tv_sec = 10; + timeout.tv_nsec = 0; + + /* Ref rpc as it's added to timer event queue */ + rpc_clnt_ref(clnt); + gf_timer_call_cancel(clnt->ctx, conn->timer); + conn->timer = gf_timer_call_after(clnt->ctx, timeout, call_bail, + (void *)clnt); + + if (conn->timer == NULL) { + gf_log(conn->name, GF_LOG_WARNING, + "Cannot create bailout timer for %s", peerid); + need_unref = _gf_true; } } - pthread_mutex_unlock (&conn->lock); - /*rpc_clnt_connection_cleanup will be unwinding all saved frames, - * bailed or otherwise*/ - if (!trans) - goto out; - - gettimeofday (¤t, NULL); - INIT_LIST_HEAD (&list); - pthread_mutex_lock (&conn->lock); - { - /* Chaining to get call-always functionality from - call-once timer */ - if (conn->timer) { - timeout.tv_sec = 10; - timeout.tv_nsec = 0; - - /* Ref rpc as it's added to timer event queue */ - rpc_clnt_ref (clnt); - gf_timer_call_cancel (clnt->ctx, conn->timer); - conn->timer = gf_timer_call_after (clnt->ctx, - timeout, - call_bail, - (void *) clnt); - - if (conn->timer == NULL) { - gf_log (conn->name, GF_LOG_WARNING, - "Cannot create bailout timer for %s", - peerid); - need_unref = _gf_true; - } - } - - do { - saved_frame = - __saved_frames_get_timedout (conn->saved_frames, - conn->frame_timeout, - ¤t); - if (saved_frame) - list_add (&saved_frame->list, &list); - - } while (saved_frame); - } - pthread_mutex_unlock (&conn->lock); - - list_for_each_entry_safe (trav, tmp, &list, list) { - gf_time_fmt (frame_sent, sizeof frame_sent, - trav->saved_at.tv_sec, gf_timefmt_FT); - len = strlen (frame_sent); - snprintf (frame_sent + len, sizeof (frame_sent) - len, - ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); - - gf_log (conn->name, GF_LOG_ERROR, - "bailing out frame type(%s), op(%s(%d)), xid = 0x%x, " - "unique = %"PRIu64", sent = %s, timeout = %d for %s", - trav->rpcreq->prog->progname, - (trav->rpcreq->prog->procnames) ? - trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : - "--", - trav->rpcreq->procnum, trav->rpcreq->xid, - ((call_frame_t *)(trav->frame))->root->unique, - frame_sent, conn->frame_timeout, peerid); - - clnt = rpc_clnt_ref (clnt); - trav->rpcreq->rpc_status = -1; - trav->rpcreq->cbkfn (trav->rpcreq, NULL, 0, trav->frame); - - rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); - clnt = rpc_clnt_unref (clnt); - list_del_init (&trav->list); - mem_put (trav); - } + do { + saved_frame = __saved_frames_get_timedout( + conn->saved_frames, conn->frame_timeout, ¤t); + if (saved_frame) + list_add(&saved_frame->list, &list); + + } while (saved_frame); + } + pthread_mutex_unlock(&conn->lock); + + list_for_each_entry_safe(trav, tmp, &list, list) + { + gf_time_fmt(frame_sent, sizeof frame_sent, trav->saved_at.tv_sec, + gf_timefmt_FT); + len = strlen(frame_sent); + snprintf(frame_sent + len, sizeof(frame_sent) - len, + ".%" GF_PRI_SUSECONDS, trav->saved_at.tv_usec); + + gf_log(conn->name, GF_LOG_ERROR, + "bailing out frame type(%s), op(%s(%d)), xid = 0x%x, " + "unique = %" PRIu64 ", sent = %s, timeout = %d for %s", + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) + ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--", + trav->rpcreq->procnum, trav->rpcreq->xid, + ((call_frame_t *)(trav->frame))->root->unique, frame_sent, + conn->frame_timeout, peerid); + + clnt = rpc_clnt_ref(clnt); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame); + + rpc_clnt_reply_deinit(trav->rpcreq, clnt->reqpool); + clnt = rpc_clnt_unref(clnt); + list_del_init(&trav->list); + mem_put(trav); + } out: - rpc_clnt_unref (clnt); - if (need_unref) - rpc_clnt_unref (clnt); - return; + rpc_clnt_unref(clnt); + if (need_unref) + rpc_clnt_unref(clnt); + return; } - /* to be called with conn->lock held */ struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, - struct rpc_req *rpcreq) +__save_frame(struct rpc_clnt *rpc_clnt, call_frame_t *frame, + struct rpc_req *rpcreq) { - rpc_clnt_connection_t *conn = NULL; - struct timespec timeout = {0, }; - struct saved_frame *saved_frame = NULL; + rpc_clnt_connection_t *conn = NULL; + struct timespec timeout = { + 0, + }; + struct saved_frame *saved_frame = NULL; - conn = &rpc_clnt->conn; + conn = &rpc_clnt->conn; - saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); + saved_frame = __saved_frames_put(conn->saved_frames, frame, rpcreq); - if (saved_frame == NULL) { - goto out; - } + if (saved_frame == NULL) { + goto out; + } - /* TODO: make timeout configurable */ - if (conn->timer == NULL) { - timeout.tv_sec = 10; - timeout.tv_nsec = 0; - rpc_clnt_ref (rpc_clnt); - conn->timer = gf_timer_call_after (rpc_clnt->ctx, - timeout, - call_bail, - (void *) rpc_clnt); - } + /* TODO: make timeout configurable */ + if (conn->timer == NULL) { + timeout.tv_sec = 10; + timeout.tv_nsec = 0; + rpc_clnt_ref(rpc_clnt); + conn->timer = gf_timer_call_after(rpc_clnt->ctx, timeout, call_bail, + (void *)rpc_clnt); + } out: - return saved_frame; + return saved_frame; } - struct saved_frames * -saved_frames_new (void) +saved_frames_new(void) { - struct saved_frames *saved_frames = NULL; + struct saved_frames *saved_frames = NULL; - saved_frames = GF_CALLOC (1, sizeof (*saved_frames), - gf_common_mt_rpcclnt_savedframe_t); - if (!saved_frames) { - return NULL; - } + saved_frames = GF_CALLOC(1, sizeof(*saved_frames), + gf_common_mt_rpcclnt_savedframe_t); + if (!saved_frames) { + return NULL; + } - INIT_LIST_HEAD (&saved_frames->sf.list); - INIT_LIST_HEAD (&saved_frames->lk_sf.list); + INIT_LIST_HEAD(&saved_frames->sf.list); + INIT_LIST_HEAD(&saved_frames->lk_sf.list); - return saved_frames; + return saved_frames; } - int -__saved_frame_copy (struct saved_frames *frames, int64_t callid, - struct saved_frame *saved_frame) +__saved_frame_copy(struct saved_frames *frames, int64_t callid, + struct saved_frame *saved_frame) { - struct saved_frame *tmp = NULL; - int ret = -1; + struct saved_frame *tmp = NULL; + int ret = -1; - if (!saved_frame) { - ret = 0; - goto out; + if (!saved_frame) { + ret = 0; + goto out; + } + + list_for_each_entry(tmp, &frames->sf.list, list) + { + if (tmp->rpcreq->xid == callid) { + *saved_frame = *tmp; + ret = 0; + goto out; } + } - list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->rpcreq->xid == callid) { - *saved_frame = *tmp; - ret = 0; - goto out; - } - } - - list_for_each_entry (tmp, &frames->lk_sf.list, list) { - if (tmp->rpcreq->xid == callid) { - *saved_frame = *tmp; - ret = 0; - goto out; - } - } + list_for_each_entry(tmp, &frames->lk_sf.list, list) + { + if (tmp->rpcreq->xid == callid) { + *saved_frame = *tmp; + ret = 0; + goto out; + } + } out: - return ret; + return ret; } - struct saved_frame * -__saved_frame_get (struct saved_frames *frames, int64_t callid) +__saved_frame_get(struct saved_frames *frames, int64_t callid) { - struct saved_frame *saved_frame = NULL; - struct saved_frame *tmp = NULL; - - list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->rpcreq->xid == callid) { - list_del_init (&tmp->list); - frames->count--; - saved_frame = tmp; - goto out; - } - } - - list_for_each_entry (tmp, &frames->lk_sf.list, list) { - if (tmp->rpcreq->xid == callid) { - list_del_init (&tmp->list); - frames->count--; - saved_frame = tmp; - goto out; - } - } + struct saved_frame *saved_frame = NULL; + struct saved_frame *tmp = NULL; -out: - if (saved_frame) { - THIS = saved_frame->capital_this; + list_for_each_entry(tmp, &frames->sf.list, list) + { + if (tmp->rpcreq->xid == callid) { + list_del_init(&tmp->list); + frames->count--; + saved_frame = tmp; + goto out; } + } - return saved_frame; -} + list_for_each_entry(tmp, &frames->lk_sf.list, list) + { + if (tmp->rpcreq->xid == callid) { + list_del_init(&tmp->list); + frames->count--; + saved_frame = tmp; + goto out; + } + } + +out: + if (saved_frame) { + THIS = saved_frame->capital_this; + } + return saved_frame; +} void -saved_frames_unwind (struct saved_frames *saved_frames) +saved_frames_unwind(struct saved_frames *saved_frames) { - struct saved_frame *trav = NULL; - struct saved_frame *tmp = NULL; - char timestr[1024] = {0,}; - int len; - - list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list); - - list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { - gf_time_fmt (timestr, sizeof timestr, - trav->saved_at.tv_sec, gf_timefmt_FT); - len = strlen (timestr); - snprintf (timestr + len, sizeof(timestr) - len, - ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); - - if (!trav->rpcreq || !trav->rpcreq->prog) - continue; - - gf_log_callingfn (trav->rpcreq->conn->name, - GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d)) " - "called at %s (xid=0x%x)", - trav->rpcreq->prog->progname, - ((trav->rpcreq->prog->procnames) ? - trav->rpcreq->prog->procnames[trav->rpcreq->procnum] - : "--"), - trav->rpcreq->procnum, timestr, - trav->rpcreq->xid); - saved_frames->count--; - - trav->rpcreq->rpc_status = -1; - trav->rpcreq->cbkfn (trav->rpcreq, NULL, 0, trav->frame); - - rpc_clnt_reply_deinit (trav->rpcreq, - trav->rpcreq->conn->rpc_clnt->reqpool); - - list_del_init (&trav->list); - mem_put (trav); - } + struct saved_frame *trav = NULL; + struct saved_frame *tmp = NULL; + char timestr[1024] = { + 0, + }; + int len; + + list_splice_init(&saved_frames->lk_sf.list, &saved_frames->sf.list); + + list_for_each_entry_safe(trav, tmp, &saved_frames->sf.list, list) + { + gf_time_fmt(timestr, sizeof timestr, trav->saved_at.tv_sec, + gf_timefmt_FT); + len = strlen(timestr); + snprintf(timestr + len, sizeof(timestr) - len, ".%" GF_PRI_SUSECONDS, + trav->saved_at.tv_usec); + + if (!trav->rpcreq || !trav->rpcreq->prog) + continue; + + gf_log_callingfn( + trav->rpcreq->conn->name, GF_LOG_ERROR, + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s (xid=0x%x)", + trav->rpcreq->prog->progname, + ((trav->rpcreq->prog->procnames) + ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--"), + trav->rpcreq->procnum, timestr, trav->rpcreq->xid); + saved_frames->count--; + + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame); + + rpc_clnt_reply_deinit(trav->rpcreq, + trav->rpcreq->conn->rpc_clnt->reqpool); + + list_del_init(&trav->list); + mem_put(trav); + } } - void -saved_frames_destroy (struct saved_frames *frames) +saved_frames_destroy(struct saved_frames *frames) { - if (!frames) - return; + if (!frames) + return; - saved_frames_unwind (frames); + saved_frames_unwind(frames); - GF_FREE (frames); + GF_FREE(frames); } - void -rpc_clnt_reconnect (void *conn_ptr) +rpc_clnt_reconnect(void *conn_ptr) { - rpc_transport_t *trans = NULL; - rpc_clnt_connection_t *conn = NULL; - struct timespec ts = {0, 0}; - struct rpc_clnt *clnt = NULL; - gf_boolean_t need_unref = _gf_false; + rpc_transport_t *trans = NULL; + rpc_clnt_connection_t *conn = NULL; + struct timespec ts = {0, 0}; + struct rpc_clnt *clnt = NULL; + gf_boolean_t need_unref = _gf_false; - conn = conn_ptr; - clnt = conn->rpc_clnt; + conn = conn_ptr; + clnt = conn->rpc_clnt; - pthread_mutex_lock (&conn->lock); - { - trans = conn->trans; - if (!trans) { - pthread_mutex_unlock (&conn->lock); - return; - } - if (conn->reconnect) - gf_timer_call_cancel (clnt->ctx, - conn->reconnect); - conn->reconnect = 0; - - if ((conn->connected == 0) && !clnt->disabled) { - ts.tv_sec = 3; - ts.tv_nsec = 0; - - gf_log (conn->name, GF_LOG_TRACE, - "attempting reconnect"); - (void) rpc_transport_connect (trans, - conn->config.remote_port); - rpc_clnt_ref (clnt); - conn->reconnect = - gf_timer_call_after (clnt->ctx, ts, - rpc_clnt_reconnect, - conn); - if (!conn->reconnect) { - need_unref = _gf_true; - gf_log (conn->name, GF_LOG_ERROR, - "Error adding to timer event queue"); - } - } else { - gf_log (conn->name, GF_LOG_TRACE, - "breaking reconnect chain"); - } + pthread_mutex_lock(&conn->lock); + { + trans = conn->trans; + if (!trans) { + pthread_mutex_unlock(&conn->lock); + return; + } + if (conn->reconnect) + gf_timer_call_cancel(clnt->ctx, conn->reconnect); + conn->reconnect = 0; + + if ((conn->connected == 0) && !clnt->disabled) { + ts.tv_sec = 3; + ts.tv_nsec = 0; + + gf_log(conn->name, GF_LOG_TRACE, "attempting reconnect"); + (void)rpc_transport_connect(trans, conn->config.remote_port); + rpc_clnt_ref(clnt); + conn->reconnect = gf_timer_call_after(clnt->ctx, ts, + rpc_clnt_reconnect, conn); + if (!conn->reconnect) { + need_unref = _gf_true; + gf_log(conn->name, GF_LOG_ERROR, + "Error adding to timer event queue"); + } + } else { + gf_log(conn->name, GF_LOG_TRACE, "breaking reconnect chain"); } - pthread_mutex_unlock (&conn->lock); + } + pthread_mutex_unlock(&conn->lock); - rpc_clnt_unref (clnt); - if (need_unref) - rpc_clnt_unref (clnt); - return; + rpc_clnt_unref(clnt); + if (need_unref) + rpc_clnt_unref(clnt); + return; } - int -rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) +rpc_clnt_fill_request_info(struct rpc_clnt *clnt, rpc_request_info_t *info) { - struct saved_frame saved_frame; - int ret = -1; - - pthread_mutex_lock (&clnt->conn.lock); - { - ret = __saved_frame_copy (clnt->conn.saved_frames, info->xid, - &saved_frame); - } - pthread_mutex_unlock (&clnt->conn.lock); - - if (ret == -1) { - gf_log (clnt->conn.name, GF_LOG_CRITICAL, - "cannot lookup the saved " - "frame corresponding to xid (%d)", info->xid); - goto out; - } - - info->prognum = saved_frame.rpcreq->prog->prognum; - info->procnum = saved_frame.rpcreq->procnum; - info->progver = saved_frame.rpcreq->prog->progver; - info->rpc_req = saved_frame.rpcreq; - info->rsp = saved_frame.rsp; - - ret = 0; + struct saved_frame saved_frame; + int ret = -1; + + pthread_mutex_lock(&clnt->conn.lock); + { + ret = __saved_frame_copy(clnt->conn.saved_frames, info->xid, + &saved_frame); + } + pthread_mutex_unlock(&clnt->conn.lock); + + if (ret == -1) { + gf_log(clnt->conn.name, GF_LOG_CRITICAL, + "cannot lookup the saved " + "frame corresponding to xid (%d)", + info->xid); + goto out; + } + + info->prognum = saved_frame.rpcreq->prog->prognum; + info->procnum = saved_frame.rpcreq->procnum; + info->progver = saved_frame.rpcreq->prog->progver; + info->rpc_req = saved_frame.rpcreq; + info->rsp = saved_frame.rsp; + + ret = 0; out: - return ret; + return ret; } int -rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) +rpc_clnt_reconnect_cleanup(rpc_clnt_connection_t *conn) { - struct rpc_clnt *clnt = NULL; - int ret = 0; - gf_boolean_t reconnect_unref = _gf_false; - - if (!conn) { - goto out; - } - - clnt = conn->rpc_clnt; - - pthread_mutex_lock (&conn->lock); - { - - if (conn->reconnect) { - ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); - if (!ret) { - reconnect_unref = _gf_true; - conn->cleanup_gen++; - } - conn->reconnect = NULL; - } - + struct rpc_clnt *clnt = NULL; + int ret = 0; + gf_boolean_t reconnect_unref = _gf_false; + + if (!conn) { + goto out; + } + + clnt = conn->rpc_clnt; + + pthread_mutex_lock(&conn->lock); + { + if (conn->reconnect) { + ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect); + if (!ret) { + reconnect_unref = _gf_true; + conn->cleanup_gen++; + } + conn->reconnect = NULL; } - pthread_mutex_unlock (&conn->lock); + } + pthread_mutex_unlock(&conn->lock); - if (reconnect_unref) - rpc_clnt_unref (clnt); + if (reconnect_unref) + rpc_clnt_unref(clnt); out: - return 0; + return 0; } /* @@ -503,54 +488,53 @@ out: * */ int -rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) +rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn) { - struct saved_frames *saved_frames = NULL; - struct rpc_clnt *clnt = NULL; - int unref = 0; - int ret = 0; - gf_boolean_t timer_unref = _gf_false; - - if (!conn) { - goto out; - } + struct saved_frames *saved_frames = NULL; + struct rpc_clnt *clnt = NULL; + int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; - clnt = conn->rpc_clnt; + if (!conn) { + goto out; + } - pthread_mutex_lock (&conn->lock); - { + clnt = conn->rpc_clnt; - saved_frames = conn->saved_frames; - conn->saved_frames = saved_frames_new (); + pthread_mutex_lock(&conn->lock); + { + saved_frames = conn->saved_frames; + conn->saved_frames = saved_frames_new(); - /* bailout logic cleanup */ - if (conn->timer) { - ret = gf_timer_call_cancel (clnt->ctx, conn->timer); - if (!ret) - timer_unref = _gf_true; - conn->timer = NULL; - } + /* bailout logic cleanup */ + if (conn->timer) { + ret = gf_timer_call_cancel(clnt->ctx, conn->timer); + if (!ret) + timer_unref = _gf_true; + conn->timer = NULL; + } - conn->connected = 0; - conn->disconnected = 1; + conn->connected = 0; + conn->disconnected = 1; - unref = rpc_clnt_remove_ping_timer_locked (clnt); - /*reset rpc msgs stats*/ - conn->pingcnt = 0; - conn->msgcnt = 0; - conn->cleanup_gen++; - } - pthread_mutex_unlock (&conn->lock); + unref = rpc_clnt_remove_ping_timer_locked(clnt); + /*reset rpc msgs stats*/ + conn->pingcnt = 0; + conn->msgcnt = 0; + conn->cleanup_gen++; + } + pthread_mutex_unlock(&conn->lock); - saved_frames_destroy (saved_frames); - if (unref) - rpc_clnt_unref (clnt); + saved_frames_destroy(saved_frames); + if (unref) + rpc_clnt_unref(clnt); - if (timer_unref) - rpc_clnt_unref (clnt); + if (timer_unref) + rpc_clnt_unref(clnt); out: - return 0; + return 0; } /* @@ -562,1554 +546,1516 @@ out: */ static struct saved_frame * -lookup_frame (rpc_clnt_connection_t *conn, int64_t callid) +lookup_frame(rpc_clnt_connection_t *conn, int64_t callid) { - struct saved_frame *frame = NULL; + struct saved_frame *frame = NULL; - pthread_mutex_lock (&conn->lock); - { - frame = __saved_frame_get (conn->saved_frames, callid); - } - pthread_mutex_unlock (&conn->lock); + pthread_mutex_lock(&conn->lock); + { + frame = __saved_frame_get(conn->saved_frames, callid); + } + pthread_mutex_unlock(&conn->lock); - return frame; + return frame; } - int -rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, - rpc_clnt_connection_t *conn, - struct rpc_msg *replymsg, struct iovec progmsg, - struct rpc_req *req, - struct saved_frame *saved_frame) +rpc_clnt_reply_fill(rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn, + struct rpc_msg *replymsg, struct iovec progmsg, + struct rpc_req *req, struct saved_frame *saved_frame) { - int ret = -1; - - if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { - goto out; - } - - req->rpc_status = 0; - if ((rpc_reply_status (replymsg) == MSG_DENIED) - || (rpc_accepted_reply_status (replymsg) != SUCCESS)) { - req->rpc_status = -1; - } - - req->rsp[0] = progmsg; - req->rsp_iobref = iobref_ref (msg->iobref); - - if (msg->vectored) { - req->rsp[1] = msg->vector[1]; - req->rspcnt = 2; - } else { - req->rspcnt = 1; - } - - /* By this time, the data bytes for the auth scheme would have already - * been copied into the required sections of the req structure, - * we just need to fill in the meta-data about it now. + int ret = -1; + + if ((!conn) || (!replymsg) || (!req) || (!saved_frame) || (!msg)) { + goto out; + } + + req->rpc_status = 0; + if ((rpc_reply_status(replymsg) == MSG_DENIED) || + (rpc_accepted_reply_status(replymsg) != SUCCESS)) { + req->rpc_status = -1; + } + + req->rsp[0] = progmsg; + req->rsp_iobref = iobref_ref(msg->iobref); + + if (msg->vectored) { + req->rsp[1] = msg->vector[1]; + req->rspcnt = 2; + } else { + req->rspcnt = 1; + } + + /* By this time, the data bytes for the auth scheme would have already + * been copied into the required sections of the req structure, + * we just need to fill in the meta-data about it now. + */ + if (req->rpc_status == 0) { + /* + * req->verf.flavour = rpc_reply_verf_flavour (replymsg); + * req->verf.datalen = rpc_reply_verf_len (replymsg); */ - if (req->rpc_status == 0) { - /* - * req->verf.flavour = rpc_reply_verf_flavour (replymsg); - * req->verf.datalen = rpc_reply_verf_len (replymsg); - */ - } + } - ret = 0; + ret = 0; out: - return ret; + return ret; } - void -rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) +rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool) { - if (!req) { - goto out; - } + if (!req) { + goto out; + } - if (req->rsp_iobref) { - iobref_unref (req->rsp_iobref); - } + if (req->rsp_iobref) { + iobref_unref(req->rsp_iobref); + } - mem_put (req); + mem_put(req); out: - return; + return; } - /* TODO: use mem-pool for allocating requests */ int -rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, - struct rpc_req *req, struct saved_frame *saved_frame) +rpc_clnt_reply_init(rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, + struct rpc_req *req, struct saved_frame *saved_frame) { - char *msgbuf = NULL; - struct rpc_msg rpcmsg; - struct iovec progmsg; /* RPC Program payload */ - size_t msglen = 0; - int ret = -1; - - msgbuf = msg->vector[0].iov_base; - msglen = msg->vector[0].iov_len; - - ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, - req->verf.authdata); - if (ret != 0) { - gf_log (conn->name, GF_LOG_WARNING, - "RPC reply decoding failed"); - goto out; - } - - ret = rpc_clnt_reply_fill (msg, conn, &rpcmsg, progmsg, req, - saved_frame); - if (ret != 0) { - goto out; - } - - gf_log (conn->name, GF_LOG_TRACE, - "received rpc message (RPC XID: 0x%x" - " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", - saved_frame->rpcreq->xid, - saved_frame->rpcreq->prog->progname, - saved_frame->rpcreq->prog->progver, - saved_frame->rpcreq->procnum, conn->name); + char *msgbuf = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int ret = -1; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + ret = xdr_to_rpc_reply(msgbuf, msglen, &rpcmsg, &progmsg, + req->verf.authdata); + if (ret != 0) { + gf_log(conn->name, GF_LOG_WARNING, "RPC reply decoding failed"); + goto out; + } + + ret = rpc_clnt_reply_fill(msg, conn, &rpcmsg, progmsg, req, saved_frame); + if (ret != 0) { + goto out; + } + + gf_log(conn->name, GF_LOG_TRACE, + "received rpc message (RPC XID: 0x%x" + " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", + saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname, + saved_frame->rpcreq->prog->progver, saved_frame->rpcreq->procnum, + conn->name); out: - if (ret != 0) { - req->rpc_status = -1; - } + if (ret != 0) { + req->rpc_status = -1; + } - return ret; + return ret; } int -rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +rpc_clnt_handle_cbk(struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) { - char *msgbuf = NULL; - rpcclnt_cb_program_t *program = NULL; - struct rpc_msg rpcmsg; - struct iovec progmsg; /* RPC Program payload */ - size_t msglen = 0; - int found = 0; - int ret = -1; - int procnum = 0; - - msgbuf = msg->vector[0].iov_base; - msglen = msg->vector[0].iov_len; - - clnt = rpc_clnt_ref (clnt); - ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); - if (ret == -1) { - gf_log (clnt->conn.name, GF_LOG_WARNING, - "RPC call decoding failed"); - goto out; - } - - gf_log (clnt->conn.name, GF_LOG_TRACE, - "receivd rpc message (XID: 0x%" GF_PRI_RPC_XID ", " - "Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", " - "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") " - "from rpc-transport (%s)", rpc_call_xid (&rpcmsg), - rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), - rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), - clnt->conn.name); - - procnum = rpc_call_progproc (&rpcmsg); - - pthread_mutex_lock (&clnt->lock); + char *msgbuf = NULL; + rpcclnt_cb_program_t *program = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int found = 0; + int ret = -1; + int procnum = 0; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + clnt = rpc_clnt_ref(clnt); + ret = xdr_to_rpc_call(msgbuf, msglen, &rpcmsg, &progmsg, NULL, NULL); + if (ret == -1) { + gf_log(clnt->conn.name, GF_LOG_WARNING, "RPC call decoding failed"); + goto out; + } + + gf_log(clnt->conn.name, GF_LOG_TRACE, + "receivd rpc message (XID: 0x%" GF_PRI_RPC_XID + ", " + "Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID + ", " + "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC + ") " + "from rpc-transport (%s)", + rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg), + rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg), + rpc_call_progproc(&rpcmsg), clnt->conn.name); + + procnum = rpc_call_progproc(&rpcmsg); + + pthread_mutex_lock(&clnt->lock); + { + list_for_each_entry(program, &clnt->programs, program) { - list_for_each_entry (program, &clnt->programs, program) { - if ((program->prognum == rpc_call_program (&rpcmsg)) - && (program->progver - == rpc_call_progver (&rpcmsg))) { - found = 1; - break; - } - } + if ((program->prognum == rpc_call_program(&rpcmsg)) && + (program->progver == rpc_call_progver(&rpcmsg))) { + found = 1; + break; + } } - pthread_mutex_unlock (&clnt->lock); + } + pthread_mutex_unlock(&clnt->lock); - if (found && (procnum < program->numactors) && - (program->actors[procnum].actor)) { - program->actors[procnum].actor (clnt, program->mydata, - &progmsg); - } + if (found && (procnum < program->numactors) && + (program->actors[procnum].actor)) { + program->actors[procnum].actor(clnt, program->mydata, &progmsg); + } out: - rpc_clnt_unref (clnt); - return ret; + rpc_clnt_unref(clnt); + return ret; } int -rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) +rpc_clnt_handle_reply(struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) { - rpc_clnt_connection_t *conn = NULL; - struct saved_frame *saved_frame = NULL; - int ret = -1; - struct rpc_req *req = NULL; - uint32_t xid = 0; - - clnt = rpc_clnt_ref (clnt); - conn = &clnt->conn; - - xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base)); - saved_frame = lookup_frame (conn, xid); - if (saved_frame == NULL) { - gf_log (conn->name, GF_LOG_ERROR, - "cannot lookup the saved frame for reply with xid (%u)", - xid); - goto out; - } - - req = saved_frame->rpcreq; - if (req == NULL) { - gf_log (conn->name, GF_LOG_ERROR, - "no request with frame for xid (%u)", xid); - goto out; - } - - ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); - if (ret != 0) { - req->rpc_status = -1; - gf_log (conn->name, GF_LOG_WARNING, - "initialising rpc reply failed"); - } - - req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); - - if (req) { - rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); - } + rpc_clnt_connection_t *conn = NULL; + struct saved_frame *saved_frame = NULL; + int ret = -1; + struct rpc_req *req = NULL; + uint32_t xid = 0; + + clnt = rpc_clnt_ref(clnt); + conn = &clnt->conn; + + xid = ntoh32(*((uint32_t *)pollin->vector[0].iov_base)); + saved_frame = lookup_frame(conn, xid); + if (saved_frame == NULL) { + gf_log(conn->name, GF_LOG_ERROR, + "cannot lookup the saved frame for reply with xid (%u)", xid); + goto out; + } + + req = saved_frame->rpcreq; + if (req == NULL) { + gf_log(conn->name, GF_LOG_ERROR, "no request with frame for xid (%u)", + xid); + goto out; + } + + ret = rpc_clnt_reply_init(conn, pollin, req, saved_frame); + if (ret != 0) { + req->rpc_status = -1; + gf_log(conn->name, GF_LOG_WARNING, "initialising rpc reply failed"); + } + + req->cbkfn(req, req->rsp, req->rspcnt, saved_frame->frame); + + if (req) { + rpc_clnt_reply_deinit(req, conn->rpc_clnt->reqpool); + } out: - if (saved_frame) { - mem_put (saved_frame); - } + if (saved_frame) { + mem_put(saved_frame); + } - rpc_clnt_unref (clnt); - return ret; + rpc_clnt_unref(clnt); + return ret; } gf_boolean_t -is_rpc_clnt_disconnected (rpc_clnt_connection_t *conn) +is_rpc_clnt_disconnected(rpc_clnt_connection_t *conn) { - gf_boolean_t disconnected = _gf_true; + gf_boolean_t disconnected = _gf_true; - if (!conn) - return disconnected; + if (!conn) + return disconnected; - pthread_mutex_lock (&conn->lock); - { - if (conn->disconnected == _gf_false) - disconnected = _gf_false; - } - pthread_mutex_unlock (&conn->lock); + pthread_mutex_lock(&conn->lock); + { + if (conn->disconnected == _gf_false) + disconnected = _gf_false; + } + pthread_mutex_unlock(&conn->lock); - return disconnected; + return disconnected; } static void -rpc_clnt_destroy (struct rpc_clnt *rpc); +rpc_clnt_destroy(struct rpc_clnt *rpc); -#define RPC_THIS_SAVE(xl) do { \ - old_THIS = THIS ; \ - if (!old_THIS) \ - gf_log_callingfn ("rpc", GF_LOG_CRITICAL, \ - "THIS is not initialised."); \ - THIS = xl; \ -} while (0) +#define RPC_THIS_SAVE(xl) \ + do { \ + old_THIS = THIS; \ + if (!old_THIS) \ + gf_log_callingfn("rpc", GF_LOG_CRITICAL, \ + "THIS is not initialised."); \ + THIS = xl; \ + } while (0) -#define RPC_THIS_RESTORE (THIS = old_THIS) +#define RPC_THIS_RESTORE (THIS = old_THIS) static int -rpc_clnt_handle_disconnect (struct rpc_clnt *clnt, rpc_clnt_connection_t *conn) +rpc_clnt_handle_disconnect(struct rpc_clnt *clnt, rpc_clnt_connection_t *conn) { - struct timespec ts = {0, }; - gf_boolean_t unref_clnt = _gf_false; - uint64_t pre_notify_gen = 0, post_notify_gen = 0; - - pthread_mutex_lock (&conn->lock); - { - pre_notify_gen = conn->cleanup_gen; - } - pthread_mutex_unlock (&conn->lock); - - if (clnt->notifyfn) - clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); - - pthread_mutex_lock (&conn->lock); - { - post_notify_gen = conn->cleanup_gen; - } - pthread_mutex_unlock (&conn->lock); - - if (pre_notify_gen == post_notify_gen) { - /* program didn't invoke cleanup, so rpc has to do it */ - rpc_clnt_connection_cleanup (conn); - } - - pthread_mutex_lock (&conn->lock); - { - if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) { - ts.tv_sec = 10; - ts.tv_nsec = 0; - - rpc_clnt_ref (clnt); - conn->reconnect = gf_timer_call_after (clnt->ctx, ts, - rpc_clnt_reconnect, conn); - if (conn->reconnect == NULL) { - gf_log (conn->name, GF_LOG_WARNING, - "Cannot create rpc_clnt_reconnect timer"); - unref_clnt = _gf_true; - } - } + struct timespec ts = { + 0, + }; + gf_boolean_t unref_clnt = _gf_false; + uint64_t pre_notify_gen = 0, post_notify_gen = 0; + + pthread_mutex_lock(&conn->lock); + { + pre_notify_gen = conn->cleanup_gen; + } + pthread_mutex_unlock(&conn->lock); + + if (clnt->notifyfn) + clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); + + pthread_mutex_lock(&conn->lock); + { + post_notify_gen = conn->cleanup_gen; + } + pthread_mutex_unlock(&conn->lock); + + if (pre_notify_gen == post_notify_gen) { + /* program didn't invoke cleanup, so rpc has to do it */ + rpc_clnt_connection_cleanup(conn); + } + + pthread_mutex_lock(&conn->lock); + { + if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) { + ts.tv_sec = 10; + ts.tv_nsec = 0; + + rpc_clnt_ref(clnt); + conn->reconnect = gf_timer_call_after(clnt->ctx, ts, + rpc_clnt_reconnect, conn); + if (conn->reconnect == NULL) { + gf_log(conn->name, GF_LOG_WARNING, + "Cannot create rpc_clnt_reconnect timer"); + unref_clnt = _gf_true; + } } - pthread_mutex_unlock (&conn->lock); - + } + pthread_mutex_unlock(&conn->lock); - if (unref_clnt) - rpc_clnt_unref (clnt); + if (unref_clnt) + rpc_clnt_unref(clnt); - return 0; + return 0; } int -rpc_clnt_notify (rpc_transport_t *trans, void *mydata, - rpc_transport_event_t event, void *data, ...) +rpc_clnt_notify(rpc_transport_t *trans, void *mydata, + rpc_transport_event_t event, void *data, ...) { - rpc_clnt_connection_t *conn = NULL; - struct rpc_clnt *clnt = NULL; - int ret = -1; - rpc_request_info_t *req_info = NULL; - rpc_transport_pollin_t *pollin = NULL; - void *clnt_mydata = NULL; - DECLARE_OLD_THIS; - - conn = mydata; - if (conn == NULL) { - goto out; - } - clnt = conn->rpc_clnt; - if (!clnt) - goto out; - - RPC_THIS_SAVE (clnt->owner); - - switch (event) { - case RPC_TRANSPORT_DISCONNECT: - { - rpc_clnt_handle_disconnect (clnt, conn); - /* The auth_value was being reset to AUTH_GLUSTERFS_v2. - * if (clnt->auth_value) - * clnt->auth_value = AUTH_GLUSTERFS_v2; - * It should not be reset here. The disconnect during - * portmap request can race with handshake. If handshake - * happens first and disconnect later, auth_value would set - * to default value and it never sets back to actual auth_value - * supported by server. But it's important to set to lower - * version supported in the case where the server downgrades. - * So moving this code to RPC_TRANSPORT_CONNECT. Note that - * CONNECT cannot race with handshake as by nature it is - * serialized with handhake. An handshake can happen only - * on a connected transport and hence its strictly serialized. - */ - break; + rpc_clnt_connection_t *conn = NULL; + struct rpc_clnt *clnt = NULL; + int ret = -1; + rpc_request_info_t *req_info = NULL; + rpc_transport_pollin_t *pollin = NULL; + void *clnt_mydata = NULL; + DECLARE_OLD_THIS; + + conn = mydata; + if (conn == NULL) { + goto out; + } + clnt = conn->rpc_clnt; + if (!clnt) + goto out; + + RPC_THIS_SAVE(clnt->owner); + + switch (event) { + case RPC_TRANSPORT_DISCONNECT: { + rpc_clnt_handle_disconnect(clnt, conn); + /* The auth_value was being reset to AUTH_GLUSTERFS_v2. + * if (clnt->auth_value) + * clnt->auth_value = AUTH_GLUSTERFS_v2; + * It should not be reset here. The disconnect during + * portmap request can race with handshake. If handshake + * happens first and disconnect later, auth_value would set + * to default value and it never sets back to actual auth_value + * supported by server. But it's important to set to lower + * version supported in the case where the server downgrades. + * So moving this code to RPC_TRANSPORT_CONNECT. Note that + * CONNECT cannot race with handshake as by nature it is + * serialized with handhake. An handshake can happen only + * on a connected transport and hence its strictly serialized. + */ + break; } case RPC_TRANSPORT_CLEANUP: - if (clnt->notifyfn) { - clnt_mydata = clnt->mydata; - clnt->mydata = NULL; - ret = clnt->notifyfn (clnt, clnt_mydata, - RPC_CLNT_DESTROY, NULL); - if (ret < 0) { - gf_log (trans->name, GF_LOG_WARNING, - "client notify handler returned error " - "while handling RPC_CLNT_DESTROY"); - } + if (clnt->notifyfn) { + clnt_mydata = clnt->mydata; + clnt->mydata = NULL; + ret = clnt->notifyfn(clnt, clnt_mydata, RPC_CLNT_DESTROY, NULL); + if (ret < 0) { + gf_log(trans->name, GF_LOG_WARNING, + "client notify handler returned error " + "while handling RPC_CLNT_DESTROY"); } - rpc_clnt_destroy (clnt); - ret = 0; - break; - - case RPC_TRANSPORT_MAP_XID_REQUEST: - { - req_info = data; - ret = rpc_clnt_fill_request_info (clnt, req_info); - break; - } - - case RPC_TRANSPORT_MSG_RECEIVED: - { - clock_gettime (CLOCK_REALTIME, &conn->last_received); - - pollin = data; - if (pollin->is_reply) - ret = rpc_clnt_handle_reply (clnt, pollin); - else - ret = rpc_clnt_handle_cbk (clnt, pollin); - /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, - * data); - */ - break; - } + } + rpc_clnt_destroy(clnt); + ret = 0; + break; + + case RPC_TRANSPORT_MAP_XID_REQUEST: { + req_info = data; + ret = rpc_clnt_fill_request_info(clnt, req_info); + break; + } + + case RPC_TRANSPORT_MSG_RECEIVED: { + clock_gettime(CLOCK_REALTIME, &conn->last_received); + + pollin = data; + if (pollin->is_reply) + ret = rpc_clnt_handle_reply(clnt, pollin); + else + ret = rpc_clnt_handle_cbk(clnt, pollin); + /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, + * data); + */ + break; + } + + case RPC_TRANSPORT_MSG_SENT: { + clock_gettime(CLOCK_REALTIME, &conn->last_sent); + + ret = 0; + break; + } + + case RPC_TRANSPORT_CONNECT: { + pthread_mutex_lock(&conn->lock); + { + /* Every time there is a disconnection, processes + * should try to connect to 'glusterd' (ie, default + * port) or whichever port given as 'option remote-port' + * in volume file. */ + /* Below code makes sure the (re-)configured port lasts + * for just one successful attempt */ + conn->config.remote_port = 0; + conn->connected = 1; + conn->disconnected = 0; + } + pthread_mutex_unlock(&conn->lock); - case RPC_TRANSPORT_MSG_SENT: - { - clock_gettime (CLOCK_REALTIME, &conn->last_sent); + /* auth value should be set to lower version available + * and will be set to appropriate version supported by + * server after the handshake. + */ + if (clnt->auth_value) + clnt->auth_value = AUTH_GLUSTERFS_v2; + if (clnt->notifyfn) + ret = clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_CONNECT, + NULL); - ret = 0; - break; - } - - case RPC_TRANSPORT_CONNECT: - { - pthread_mutex_lock (&conn->lock); - { - /* Every time there is a disconnection, processes - * should try to connect to 'glusterd' (ie, default - * port) or whichever port given as 'option remote-port' - * in volume file. */ - /* Below code makes sure the (re-)configured port lasts - * for just one successful attempt */ - conn->config.remote_port = 0; - conn->connected = 1; - conn->disconnected = 0; - } - pthread_mutex_unlock (&conn->lock); - - /* auth value should be set to lower version available - * and will be set to appropriate version supported by - * server after the handshake. - */ - if (clnt->auth_value) - clnt->auth_value = AUTH_GLUSTERFS_v2; - if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, - RPC_CLNT_CONNECT, NULL); - - break; + break; } case RPC_TRANSPORT_ACCEPT: - /* only meaningful on a server, no need of handling this event - * in a client. - */ - ret = 0; - break; - } + /* only meaningful on a server, no need of handling this event + * in a client. + */ + ret = 0; + break; + } out: - RPC_THIS_RESTORE; - return ret; + RPC_THIS_RESTORE; + return ret; } static int -rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, - dict_t *options, char *name) +rpc_clnt_connection_init(struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, + dict_t *options, char *name) { - int ret = -1; - rpc_clnt_connection_t *conn = NULL; - rpc_transport_t *trans = NULL; - - conn = &clnt->conn; - pthread_mutex_init (&clnt->conn.lock, NULL); - - conn->name = gf_strdup (name); - if (!conn->name) { - ret = -1; - goto out; - } - - ret = dict_get_int32 (options, "frame-timeout", - &conn->frame_timeout); - if (ret >= 0) { - gf_log (name, GF_LOG_INFO, - "setting frame-timeout to %d", conn->frame_timeout); - } else { - gf_log (name, GF_LOG_DEBUG, - "defaulting frame-timeout to 30mins"); - conn->frame_timeout = 1800; - } - conn->rpc_clnt = clnt; + int ret = -1; + rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; - ret = dict_get_int32 (options, "ping-timeout", - &conn->ping_timeout); - if (ret >= 0) { - gf_log (name, GF_LOG_DEBUG, - "setting ping-timeout to %d", conn->ping_timeout); - } else { - /*TODO: Once the epoll thread model is fixed, - change the default ping-timeout to 30sec */ - gf_log (name, GF_LOG_DEBUG, - "disable ping-timeout"); - conn->ping_timeout = 0; - } - - trans = rpc_transport_load (ctx, options, name); - if (!trans) { - gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport" - " failed"); - ret = -1; - goto out; - } - rpc_transport_ref (trans); - - pthread_mutex_lock (&conn->lock); - { - conn->trans = trans; - trans = NULL; - } - pthread_mutex_unlock (&conn->lock); - - ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, - conn); - if (ret == -1) { - gf_log (name, GF_LOG_WARNING, "registering notify failed"); - goto out; - } + conn = &clnt->conn; + pthread_mutex_init(&clnt->conn.lock, NULL); - conn->saved_frames = saved_frames_new (); - if (!conn->saved_frames) { - gf_log (name, GF_LOG_WARNING, "creation of saved_frames " - "failed"); - ret = -1; - goto out; - } + conn->name = gf_strdup(name); + if (!conn->name) { + ret = -1; + goto out; + } + + ret = dict_get_int32(options, "frame-timeout", &conn->frame_timeout); + if (ret >= 0) { + gf_log(name, GF_LOG_INFO, "setting frame-timeout to %d", + conn->frame_timeout); + } else { + gf_log(name, GF_LOG_DEBUG, "defaulting frame-timeout to 30mins"); + conn->frame_timeout = 1800; + } + conn->rpc_clnt = clnt; + + ret = dict_get_int32(options, "ping-timeout", &conn->ping_timeout); + if (ret >= 0) { + gf_log(name, GF_LOG_DEBUG, "setting ping-timeout to %d", + conn->ping_timeout); + } else { + /*TODO: Once the epoll thread model is fixed, + change the default ping-timeout to 30sec */ + gf_log(name, GF_LOG_DEBUG, "disable ping-timeout"); + conn->ping_timeout = 0; + } + + trans = rpc_transport_load(ctx, options, name); + if (!trans) { + gf_log(name, GF_LOG_WARNING, + "loading of new rpc-transport" + " failed"); + ret = -1; + goto out; + } + rpc_transport_ref(trans); + + pthread_mutex_lock(&conn->lock); + { + conn->trans = trans; + trans = NULL; + } + pthread_mutex_unlock(&conn->lock); + + ret = rpc_transport_register_notify(conn->trans, rpc_clnt_notify, conn); + if (ret == -1) { + gf_log(name, GF_LOG_WARNING, "registering notify failed"); + goto out; + } + + conn->saved_frames = saved_frames_new(); + if (!conn->saved_frames) { + gf_log(name, GF_LOG_WARNING, + "creation of saved_frames " + "failed"); + ret = -1; + goto out; + } - ret = 0; + ret = 0; out: - if (ret) { - pthread_mutex_lock (&conn->lock); - { - trans = conn->trans; - conn->trans = NULL; - } - pthread_mutex_unlock (&conn->lock); - if (trans) - rpc_transport_unref (trans); - //conn cleanup needs to be done since we might have failed to - // register notification. - rpc_clnt_connection_cleanup (conn); - } - return ret; + if (ret) { + pthread_mutex_lock(&conn->lock); + { + trans = conn->trans; + conn->trans = NULL; + } + pthread_mutex_unlock(&conn->lock); + if (trans) + rpc_transport_unref(trans); + // conn cleanup needs to be done since we might have failed to + // register notification. + rpc_clnt_connection_cleanup(conn); + } + return ret; } struct rpc_clnt * -rpc_clnt_new (dict_t *options, xlator_t *owner, char *name, - uint32_t reqpool_size) +rpc_clnt_new(dict_t *options, xlator_t *owner, char *name, + uint32_t reqpool_size) { - int ret = -1; - struct rpc_clnt *rpc = NULL; - glusterfs_ctx_t *ctx = owner->ctx; - - - rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t); - if (!rpc) { - goto out; - } - - pthread_mutex_init (&rpc->lock, NULL); - rpc->ctx = ctx; - rpc->owner = owner; - GF_ATOMIC_INIT (rpc->xid, 1); - - if (!reqpool_size) - reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT; - - rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size); - if (rpc->reqpool == NULL) { - pthread_mutex_destroy (&rpc->lock); - GF_FREE (rpc); - rpc = NULL; - goto out; - } - - rpc->saved_frames_pool = mem_pool_new (struct saved_frame, - reqpool_size); - if (rpc->saved_frames_pool == NULL) { - pthread_mutex_destroy (&rpc->lock); - mem_pool_destroy (rpc->reqpool); - GF_FREE (rpc); - rpc = NULL; - goto out; - } - - ret = rpc_clnt_connection_init (rpc, ctx, options, name); - if (ret == -1) { - pthread_mutex_destroy (&rpc->lock); - mem_pool_destroy (rpc->reqpool); - mem_pool_destroy (rpc->saved_frames_pool); - GF_FREE (rpc); - rpc = NULL; - if (options) - dict_unref (options); - goto out; - } - - /* This is handled to make sure we have modularity in getting the - auth data changed */ - gf_boolean_t auth_null = dict_get_str_boolean(options, "auth-null", 0); - - rpc->auth_value = (auth_null) ? 0 : AUTH_GLUSTERFS_v2; - - rpc = rpc_clnt_ref (rpc); - INIT_LIST_HEAD (&rpc->programs); + int ret = -1; + struct rpc_clnt *rpc = NULL; + glusterfs_ctx_t *ctx = owner->ctx; + + rpc = GF_CALLOC(1, sizeof(*rpc), gf_common_mt_rpcclnt_t); + if (!rpc) { + goto out; + } + + pthread_mutex_init(&rpc->lock, NULL); + rpc->ctx = ctx; + rpc->owner = owner; + GF_ATOMIC_INIT(rpc->xid, 1); + + if (!reqpool_size) + reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT; + + rpc->reqpool = mem_pool_new(struct rpc_req, reqpool_size); + if (rpc->reqpool == NULL) { + pthread_mutex_destroy(&rpc->lock); + GF_FREE(rpc); + rpc = NULL; + goto out; + } + + rpc->saved_frames_pool = mem_pool_new(struct saved_frame, reqpool_size); + if (rpc->saved_frames_pool == NULL) { + pthread_mutex_destroy(&rpc->lock); + mem_pool_destroy(rpc->reqpool); + GF_FREE(rpc); + rpc = NULL; + goto out; + } + + ret = rpc_clnt_connection_init(rpc, ctx, options, name); + if (ret == -1) { + pthread_mutex_destroy(&rpc->lock); + mem_pool_destroy(rpc->reqpool); + mem_pool_destroy(rpc->saved_frames_pool); + GF_FREE(rpc); + rpc = NULL; + if (options) + dict_unref(options); + goto out; + } + + /* This is handled to make sure we have modularity in getting the + auth data changed */ + gf_boolean_t auth_null = dict_get_str_boolean(options, "auth-null", 0); + + rpc->auth_value = (auth_null) ? 0 : AUTH_GLUSTERFS_v2; + + rpc = rpc_clnt_ref(rpc); + INIT_LIST_HEAD(&rpc->programs); out: - return rpc; + return rpc; } - int -rpc_clnt_start (struct rpc_clnt *rpc) +rpc_clnt_start(struct rpc_clnt *rpc) { - struct rpc_clnt_connection *conn = NULL; + struct rpc_clnt_connection *conn = NULL; - if (!rpc) - return -1; + if (!rpc) + return -1; - conn = &rpc->conn; + conn = &rpc->conn; - pthread_mutex_lock (&conn->lock); - { - rpc->disabled = 0; - } - pthread_mutex_unlock (&conn->lock); - /* Corresponding unref will be either on successful timer cancel or last - * rpc_clnt_reconnect fire event. - */ - rpc_clnt_ref (rpc); - rpc_clnt_reconnect (conn); + pthread_mutex_lock(&conn->lock); + { + rpc->disabled = 0; + } + pthread_mutex_unlock(&conn->lock); + /* Corresponding unref will be either on successful timer cancel or last + * rpc_clnt_reconnect fire event. + */ + rpc_clnt_ref(rpc); + rpc_clnt_reconnect(conn); - return 0; + return 0; } - int -rpc_clnt_cleanup_and_start (struct rpc_clnt *rpc) +rpc_clnt_cleanup_and_start(struct rpc_clnt *rpc) { - struct rpc_clnt_connection *conn = NULL; + struct rpc_clnt_connection *conn = NULL; - if (!rpc) - return -1; + if (!rpc) + return -1; - conn = &rpc->conn; + conn = &rpc->conn; - rpc_clnt_connection_cleanup (conn); + rpc_clnt_connection_cleanup(conn); - pthread_mutex_lock (&conn->lock); - { - rpc->disabled = 0; - } - pthread_mutex_unlock (&conn->lock); - /* Corresponding unref will be either on successful timer cancel or last - * rpc_clnt_reconnect fire event. - */ - rpc_clnt_ref (rpc); - rpc_clnt_reconnect (conn); + pthread_mutex_lock(&conn->lock); + { + rpc->disabled = 0; + } + pthread_mutex_unlock(&conn->lock); + /* Corresponding unref will be either on successful timer cancel or last + * rpc_clnt_reconnect fire event. + */ + rpc_clnt_ref(rpc); + rpc_clnt_reconnect(conn); - return 0; + return 0; } - int -rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, - void *mydata) +rpc_clnt_register_notify(struct rpc_clnt *rpc, rpc_clnt_notify_t fn, + void *mydata) { - rpc->mydata = mydata; - rpc->notifyfn = fn; + rpc->mydata = mydata; + rpc->notifyfn = fn; - return 0; + return 0; } /* used for GF_LOG_OCCASIONALLY() */ static int gf_auth_max_groups_log = 0; static inline int -setup_glusterfs_auth_param_v3 (call_frame_t *frame, - auth_glusterfs_params_v3 *au, - int lk_owner_len, char *owner_data) +setup_glusterfs_auth_param_v3(call_frame_t *frame, auth_glusterfs_params_v3 *au, + int lk_owner_len, char *owner_data) { - int ret = -1; - unsigned int max_groups = 0; - int max_lkowner_len = 0; - - au->pid = frame->root->pid; - au->uid = frame->root->uid; - au->gid = frame->root->gid; - - au->flags = frame->root->flags; - au->ctime_sec = frame->root->ctime.tv_sec; - au->ctime_nsec = frame->root->ctime.tv_nsec; - - au->lk_owner.lk_owner_val = owner_data; - au->lk_owner.lk_owner_len = lk_owner_len; - au->groups.groups_val = frame->root->groups; - au->groups.groups_len = frame->root->ngrps; - - /* The number of groups and the size of lk_owner depend on oneother. - * We can truncate the groups, but should not touch the lk_owner. */ - max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS (lk_owner_len, AUTH_GLUSTERFS_v3); - if (au->groups.groups_len > max_groups) { - GF_LOG_OCCASIONALLY (gf_auth_max_groups_log, "rpc-auth", - GF_LOG_WARNING, "truncating grouplist " - "from %d to %d", au->groups.groups_len, - max_groups); - - au->groups.groups_len = max_groups; - } - - max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER (au->groups.groups_len, - AUTH_GLUSTERFS_v3); - if (lk_owner_len > max_lkowner_len) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "lkowner field is too " - "big (%d), it does not fit in the rpc-header", - au->lk_owner.lk_owner_len); - errno = E2BIG; - goto out; - } - - ret = 0; + int ret = -1; + unsigned int max_groups = 0; + int max_lkowner_len = 0; + + au->pid = frame->root->pid; + au->uid = frame->root->uid; + au->gid = frame->root->gid; + + au->flags = frame->root->flags; + au->ctime_sec = frame->root->ctime.tv_sec; + au->ctime_nsec = frame->root->ctime.tv_nsec; + + au->lk_owner.lk_owner_val = owner_data; + au->lk_owner.lk_owner_len = lk_owner_len; + au->groups.groups_val = frame->root->groups; + au->groups.groups_len = frame->root->ngrps; + + /* The number of groups and the size of lk_owner depend on oneother. + * We can truncate the groups, but should not touch the lk_owner. */ + max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v3); + if (au->groups.groups_len > max_groups) { + GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING, + "truncating grouplist " + "from %d to %d", + au->groups.groups_len, max_groups); + + au->groups.groups_len = max_groups; + } + + max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len, + AUTH_GLUSTERFS_v3); + if (lk_owner_len > max_lkowner_len) { + gf_log("rpc-clnt", GF_LOG_ERROR, + "lkowner field is too " + "big (%d), it does not fit in the rpc-header", + au->lk_owner.lk_owner_len); + errno = E2BIG; + goto out; + } + + ret = 0; out: - return ret; + return ret; } static inline int -setup_glusterfs_auth_param_v2 (call_frame_t *frame, - auth_glusterfs_parms_v2 *au, - int lk_owner_len, char *owner_data) +setup_glusterfs_auth_param_v2(call_frame_t *frame, auth_glusterfs_parms_v2 *au, + int lk_owner_len, char *owner_data) { - unsigned int max_groups = 0; - int max_lkowner_len = 0; - int ret = -1; - - au->pid = frame->root->pid; - au->uid = frame->root->uid; - au->gid = frame->root->gid; - - au->lk_owner.lk_owner_val = owner_data; - au->lk_owner.lk_owner_len = lk_owner_len; - au->groups.groups_val = frame->root->groups; - au->groups.groups_len = frame->root->ngrps; - - /* The number of groups and the size of lk_owner depend on oneother. - * We can truncate the groups, but should not touch the lk_owner. */ - max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS (lk_owner_len, AUTH_GLUSTERFS_v2); - if (au->groups.groups_len > max_groups) { - GF_LOG_OCCASIONALLY (gf_auth_max_groups_log, "rpc-auth", - GF_LOG_WARNING, "truncating grouplist " - "from %d to %d", au->groups.groups_len, - max_groups); - - au->groups.groups_len = max_groups; - } - - max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER (au->groups.groups_len, - AUTH_GLUSTERFS_v2); - if (lk_owner_len > max_lkowner_len) { - gf_log ("rpc-auth", GF_LOG_ERROR, "lkowner field is too " - "big (%d), it does not fit in the rpc-header", - au->lk_owner.lk_owner_len); - errno = E2BIG; - goto out; - } - - ret = 0; + unsigned int max_groups = 0; + int max_lkowner_len = 0; + int ret = -1; + + au->pid = frame->root->pid; + au->uid = frame->root->uid; + au->gid = frame->root->gid; + + au->lk_owner.lk_owner_val = owner_data; + au->lk_owner.lk_owner_len = lk_owner_len; + au->groups.groups_val = frame->root->groups; + au->groups.groups_len = frame->root->ngrps; + + /* The number of groups and the size of lk_owner depend on oneother. + * We can truncate the groups, but should not touch the lk_owner. */ + max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v2); + if (au->groups.groups_len > max_groups) { + GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING, + "truncating grouplist " + "from %d to %d", + au->groups.groups_len, max_groups); + + au->groups.groups_len = max_groups; + } + + max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len, + AUTH_GLUSTERFS_v2); + if (lk_owner_len > max_lkowner_len) { + gf_log("rpc-auth", GF_LOG_ERROR, + "lkowner field is too " + "big (%d), it does not fit in the rpc-header", + au->lk_owner.lk_owner_len); + errno = E2BIG; + goto out; + } + + ret = 0; out: - return ret; + return ret; } - static ssize_t -xdr_serialize_glusterfs_auth (struct rpc_clnt *clnt, call_frame_t *frame, - char *dest) +xdr_serialize_glusterfs_auth(struct rpc_clnt *clnt, call_frame_t *frame, + char *dest) { - ssize_t ret = -1; - XDR xdr; - char owner[4] = {0,}; - int32_t pid = 0; - char *lk_owner_data = NULL; - int lk_owner_len = 0; - - if ((!dest)) - return -1; - - xdrmem_create (&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE); - - if (frame->root->lk_owner.len) { - lk_owner_data = frame->root->lk_owner.data; - lk_owner_len = frame->root->lk_owner.len; - } else { - pid = frame->root->pid; - owner[0] = (char)(pid & 0xff); - owner[1] = (char)((pid >> 8) & 0xff); - owner[2] = (char)((pid >> 16) & 0xff); - owner[3] = (char)((pid >> 24) & 0xff); - - lk_owner_data = owner; - lk_owner_len = 4; - } - - if (clnt->auth_value == AUTH_GLUSTERFS_v2) { - auth_glusterfs_parms_v2 au_v2 = {0,}; - - ret = setup_glusterfs_auth_param_v2 (frame, &au_v2, - lk_owner_len, - lk_owner_data); - if (ret) - goto out; - if (!xdr_auth_glusterfs_parms_v2 (&xdr, &au_v2)) { - gf_log (THIS->name, GF_LOG_WARNING, - "failed to encode auth glusterfs elements"); - ret = -1; - goto out; - } - } else if (clnt->auth_value == AUTH_GLUSTERFS_v3) { - auth_glusterfs_params_v3 au_v3 = {0,}; - - ret = setup_glusterfs_auth_param_v3 (frame, &au_v3, - lk_owner_len, - lk_owner_data); - if (ret) - goto out; - - if (!xdr_auth_glusterfs_params_v3 (&xdr, &au_v3)) { - gf_log (THIS->name, GF_LOG_WARNING, - "failed to encode auth glusterfs elements"); - ret = -1; - goto out; - } - } else { - gf_log (THIS->name, GF_LOG_WARNING, - "failed to encode auth glusterfs elements"); - ret = -1; - goto out; - } + ssize_t ret = -1; + XDR xdr; + char owner[4] = { + 0, + }; + int32_t pid = 0; + char *lk_owner_data = NULL; + int lk_owner_len = 0; + + if ((!dest)) + return -1; + + xdrmem_create(&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE); + + if (frame->root->lk_owner.len) { + lk_owner_data = frame->root->lk_owner.data; + lk_owner_len = frame->root->lk_owner.len; + } else { + pid = frame->root->pid; + owner[0] = (char)(pid & 0xff); + owner[1] = (char)((pid >> 8) & 0xff); + owner[2] = (char)((pid >> 16) & 0xff); + owner[3] = (char)((pid >> 24) & 0xff); + + lk_owner_data = owner; + lk_owner_len = 4; + } + + if (clnt->auth_value == AUTH_GLUSTERFS_v2) { + auth_glusterfs_parms_v2 au_v2 = { + 0, + }; + + ret = setup_glusterfs_auth_param_v2(frame, &au_v2, lk_owner_len, + lk_owner_data); + if (ret) + goto out; + if (!xdr_auth_glusterfs_parms_v2(&xdr, &au_v2)) { + gf_log(THIS->name, GF_LOG_WARNING, + "failed to encode auth glusterfs elements"); + ret = -1; + goto out; + } + } else if (clnt->auth_value == AUTH_GLUSTERFS_v3) { + auth_glusterfs_params_v3 au_v3 = { + 0, + }; + + ret = setup_glusterfs_auth_param_v3(frame, &au_v3, lk_owner_len, + lk_owner_data); + if (ret) + goto out; + + if (!xdr_auth_glusterfs_params_v3(&xdr, &au_v3)) { + gf_log(THIS->name, GF_LOG_WARNING, + "failed to encode auth glusterfs elements"); + ret = -1; + goto out; + } + } else { + gf_log(THIS->name, GF_LOG_WARNING, + "failed to encode auth glusterfs elements"); + ret = -1; + goto out; + } - ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)); + ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)); out: - return ret; + return ret; } - int -rpc_clnt_fill_request (struct rpc_clnt *clnt, int prognum, int progver, - int procnum, uint64_t xid, call_frame_t *fr, - struct rpc_msg *request, char *auth_data) +rpc_clnt_fill_request(struct rpc_clnt *clnt, int prognum, int progver, + int procnum, uint64_t xid, call_frame_t *fr, + struct rpc_msg *request, char *auth_data) { - int ret = -1; + int ret = -1; - if (!request) { - goto out; - } + if (!request) { + goto out; + } - memset (request, 0, sizeof (*request)); + memset(request, 0, sizeof(*request)); - request->rm_xid = xid; - request->rm_direction = CALL; + request->rm_xid = xid; + request->rm_direction = CALL; - request->rm_call.cb_rpcvers = 2; - request->rm_call.cb_prog = prognum; - request->rm_call.cb_vers = progver; - request->rm_call.cb_proc = procnum; + request->rm_call.cb_rpcvers = 2; + request->rm_call.cb_prog = prognum; + request->rm_call.cb_vers = progver; + request->rm_call.cb_proc = procnum; - if (!clnt->auth_value) { - request->rm_call.cb_cred.oa_flavor = AUTH_NULL; - request->rm_call.cb_cred.oa_base = NULL; - request->rm_call.cb_cred.oa_length = 0; - } else { - ret = xdr_serialize_glusterfs_auth (clnt, fr, auth_data); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_WARNING, - "cannot encode auth credentials"); - goto out; - } - - request->rm_call.cb_cred.oa_flavor = clnt->auth_value; - request->rm_call.cb_cred.oa_base = auth_data; - request->rm_call.cb_cred.oa_length = ret; + if (!clnt->auth_value) { + request->rm_call.cb_cred.oa_flavor = AUTH_NULL; + request->rm_call.cb_cred.oa_base = NULL; + request->rm_call.cb_cred.oa_length = 0; + } else { + ret = xdr_serialize_glusterfs_auth(clnt, fr, auth_data); + if (ret == -1) { + gf_log("rpc-clnt", GF_LOG_WARNING, + "cannot encode auth credentials"); + goto out; } - request->rm_call.cb_verf.oa_flavor = AUTH_NONE; - request->rm_call.cb_verf.oa_base = NULL; - request->rm_call.cb_verf.oa_length = 0; - ret = 0; + request->rm_call.cb_cred.oa_flavor = clnt->auth_value; + request->rm_call.cb_cred.oa_base = auth_data; + request->rm_call.cb_cred.oa_length = ret; + } + request->rm_call.cb_verf.oa_flavor = AUTH_NONE; + request->rm_call.cb_verf.oa_base = NULL; + request->rm_call.cb_verf.oa_length = 0; + + ret = 0; out: - return ret; + return ret; } - struct iovec -rpc_clnt_record_build_header (char *recordstart, size_t rlen, - struct rpc_msg *request, size_t payload) +rpc_clnt_record_build_header(char *recordstart, size_t rlen, + struct rpc_msg *request, size_t payload) { - struct iovec requesthdr = {0, }; - struct iovec txrecord = {0, 0}; - int ret = -1; - size_t fraglen = 0; - - ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, - "Failed to create RPC request"); - goto out; - } - - fraglen = payload + requesthdr.iov_len; - gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " - "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); - - - txrecord.iov_base = recordstart; - - /* Remember, this is only the vec for the RPC header and does not - * include the payload above. We needed the payload only to calculate - * the size of the full fragment. This size is sent in the fragment - * header. - */ - txrecord.iov_len = requesthdr.iov_len; + struct iovec requesthdr = { + 0, + }; + struct iovec txrecord = {0, 0}; + int ret = -1; + size_t fraglen = 0; + + ret = rpc_request_to_xdr(request, recordstart, rlen, &requesthdr); + if (ret == -1) { + gf_log("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request"); + goto out; + } + + fraglen = payload + requesthdr.iov_len; + gf_log("rpc-clnt", GF_LOG_TRACE, + "Request fraglen %zu, payload: %zu, " + "rpc hdr: %zu", + fraglen, payload, requesthdr.iov_len); + + txrecord.iov_base = recordstart; + + /* Remember, this is only the vec for the RPC header and does not + * include the payload above. We needed the payload only to calculate + * the size of the full fragment. This size is sent in the fragment + * header. + */ + txrecord.iov_len = requesthdr.iov_len; out: - return txrecord; + return txrecord; } - struct iobuf * -rpc_clnt_record_build_record (struct rpc_clnt *clnt, call_frame_t *fr, - int prognum, int progver, - int procnum, size_t hdrsize, uint64_t xid, - struct iovec *recbuf) +rpc_clnt_record_build_record(struct rpc_clnt *clnt, call_frame_t *fr, + int prognum, int progver, int procnum, + size_t hdrsize, uint64_t xid, struct iovec *recbuf) { - struct rpc_msg request = {0, }; - struct iobuf *request_iob = NULL; - char *record = NULL; - struct iovec recordhdr = {0, }; - size_t pagesize = 0; - int ret = -1; - size_t xdr_size = 0; - char auth_data[GF_MAX_AUTH_BYTES] = {0, }; - - if ((!clnt) || (!recbuf)) { - goto out; - } - - /* Fill the rpc structure and XDR it into the buffer got above. */ - ret = rpc_clnt_fill_request (clnt, prognum, progver, procnum, - xid, fr, &request, auth_data); - - if (ret == -1) { - gf_log (clnt->conn.name, GF_LOG_WARNING, - "cannot build a rpc-request xid (%"PRIu64")", xid); - goto out; - } - - xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); - - /* First, try to get a pointer into the buffer which the RPC - * layer can use. - */ - request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize)); - if (!request_iob) { - goto out; - } - - pagesize = iobuf_pagesize (request_iob); - - record = iobuf_ptr (request_iob); /* Now we have it. */ - - recordhdr = rpc_clnt_record_build_header (record, pagesize, &request, - hdrsize); - - if (!recordhdr.iov_base) { - gf_log (clnt->conn.name, GF_LOG_ERROR, - "Failed to build record header"); - iobuf_unref (request_iob); - request_iob = NULL; - recbuf->iov_base = NULL; - goto out; - } - - recbuf->iov_base = recordhdr.iov_base; - recbuf->iov_len = recordhdr.iov_len; + struct rpc_msg request = { + 0, + }; + struct iobuf *request_iob = NULL; + char *record = NULL; + struct iovec recordhdr = { + 0, + }; + size_t pagesize = 0; + int ret = -1; + size_t xdr_size = 0; + char auth_data[GF_MAX_AUTH_BYTES] = { + 0, + }; + + if ((!clnt) || (!recbuf)) { + goto out; + } + + /* Fill the rpc structure and XDR it into the buffer got above. */ + ret = rpc_clnt_fill_request(clnt, prognum, progver, procnum, xid, fr, + &request, auth_data); + + if (ret == -1) { + gf_log(clnt->conn.name, GF_LOG_WARNING, + "cannot build a rpc-request xid (%" PRIu64 ")", xid); + goto out; + } + + xdr_size = xdr_sizeof((xdrproc_t)xdr_callmsg, &request); + + /* First, try to get a pointer into the buffer which the RPC + * layer can use. + */ + request_iob = iobuf_get2(clnt->ctx->iobuf_pool, (xdr_size + hdrsize)); + if (!request_iob) { + goto out; + } + + pagesize = iobuf_pagesize(request_iob); + + record = iobuf_ptr(request_iob); /* Now we have it. */ + + recordhdr = rpc_clnt_record_build_header(record, pagesize, &request, + hdrsize); + + if (!recordhdr.iov_base) { + gf_log(clnt->conn.name, GF_LOG_ERROR, "Failed to build record header"); + iobuf_unref(request_iob); + request_iob = NULL; + recbuf->iov_base = NULL; + goto out; + } + + recbuf->iov_base = recordhdr.iov_base; + recbuf->iov_len = recordhdr.iov_len; out: - return request_iob; + return request_iob; } - static inline struct iobuf * -rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, - rpc_clnt_prog_t *prog, int procnum, size_t hdrlen, - struct iovec *rpchdr, uint64_t callid) +rpc_clnt_record(struct rpc_clnt *clnt, call_frame_t *call_frame, + rpc_clnt_prog_t *prog, int procnum, size_t hdrlen, + struct iovec *rpchdr, uint64_t callid) { + if (!prog || !rpchdr || !call_frame) { + return NULL; + } - if (!prog || !rpchdr || !call_frame) { - return NULL; - } - - return rpc_clnt_record_build_record (clnt, call_frame, - prog->prognum, - prog->progver, - procnum, hdrlen, - callid, rpchdr); + return rpc_clnt_record_build_record(clnt, call_frame, prog->prognum, + prog->progver, procnum, hdrlen, callid, + rpchdr); } int -rpcclnt_cbk_program_register (struct rpc_clnt *clnt, - rpcclnt_cb_program_t *program, void *mydata) +rpcclnt_cbk_program_register(struct rpc_clnt *clnt, + rpcclnt_cb_program_t *program, void *mydata) { - int ret = -1; - char already_registered = 0; - rpcclnt_cb_program_t *tmp = NULL; + int ret = -1; + char already_registered = 0; + rpcclnt_cb_program_t *tmp = NULL; - if (!clnt) - goto out; + if (!clnt) + goto out; - if (program->actors == NULL) - goto out; + if (program->actors == NULL) + goto out; - pthread_mutex_lock (&clnt->lock); + pthread_mutex_lock(&clnt->lock); + { + list_for_each_entry(tmp, &clnt->programs, program) { - list_for_each_entry (tmp, &clnt->programs, program) { - if ((program->prognum == tmp->prognum) - && (program->progver == tmp->progver)) { - already_registered = 1; - break; - } - } + if ((program->prognum == tmp->prognum) && + (program->progver == tmp->progver)) { + already_registered = 1; + break; + } } - pthread_mutex_unlock (&clnt->lock); + } + pthread_mutex_unlock(&clnt->lock); - if (already_registered) { - gf_log_callingfn (clnt->conn.name, GF_LOG_DEBUG, - "already registered"); - ret = 0; - goto out; - } + if (already_registered) { + gf_log_callingfn(clnt->conn.name, GF_LOG_DEBUG, "already registered"); + ret = 0; + goto out; + } - tmp = GF_MALLOC (sizeof (*tmp), - gf_common_mt_rpcclnt_cb_program_t); - if (tmp == NULL) { - goto out; - } + tmp = GF_MALLOC(sizeof(*tmp), gf_common_mt_rpcclnt_cb_program_t); + if (tmp == NULL) { + goto out; + } - memcpy (tmp, program, sizeof (*tmp)); - INIT_LIST_HEAD (&tmp->program); + memcpy(tmp, program, sizeof(*tmp)); + INIT_LIST_HEAD(&tmp->program); - tmp->mydata = mydata; + tmp->mydata = mydata; - pthread_mutex_lock (&clnt->lock); - { - list_add_tail (&tmp->program, &clnt->programs); - } - pthread_mutex_unlock (&clnt->lock); + pthread_mutex_lock(&clnt->lock); + { + list_add_tail(&tmp->program, &clnt->programs); + } + pthread_mutex_unlock(&clnt->lock); - ret = 0; - gf_log (clnt->conn.name, GF_LOG_DEBUG, - "New program registered: %s, Num: %d, Ver: %d", - program->progname, program->prognum, - program->progver); + ret = 0; + gf_log(clnt->conn.name, GF_LOG_DEBUG, + "New program registered: %s, Num: %d, Ver: %d", program->progname, + program->prognum, program->progver); out: - if (ret == -1 && clnt) { - gf_log (clnt->conn.name, GF_LOG_ERROR, - "Program registration failed:" - " %s, Num: %d, Ver: %d", - program->progname, - program->prognum, program->progver); - } - - return ret; + if (ret == -1 && clnt) { + gf_log(clnt->conn.name, GF_LOG_ERROR, + "Program registration failed:" + " %s, Num: %d, Ver: %d", + program->progname, program->prognum, program->progver); + } + + return ret; } - int -rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, - int procnum, fop_cbk_fn_t cbkfn, - struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame, struct iovec *rsphdr, - int rsphdr_count, struct iovec *rsp_payload, - int rsp_payload_count, struct iobref *rsp_iobref) +rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, + fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, + struct iovec *progpayload, int progpayloadcount, + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref) { - rpc_clnt_connection_t *conn = NULL; - struct iobuf *request_iob = NULL; - struct iovec rpchdr = {0,}; - struct rpc_req *rpcreq = NULL; - rpc_transport_req_t req; - int ret = -1; - int proglen = 0; - char new_iobref = 0; - uint64_t callid = 0; - gf_boolean_t need_unref = _gf_false; - call_frame_t *cframe = frame; - - if (!rpc || !prog || !frame) { - goto out; - } - - conn = &rpc->conn; - - rpcreq = mem_get (rpc->reqpool); - if (rpcreq == NULL) { - goto out; - } - - memset (rpcreq, 0, sizeof (*rpcreq)); - memset (&req, 0, sizeof (req)); - + rpc_clnt_connection_t *conn = NULL; + struct iobuf *request_iob = NULL; + struct iovec rpchdr = { + 0, + }; + struct rpc_req *rpcreq = NULL; + rpc_transport_req_t req; + int ret = -1; + int proglen = 0; + char new_iobref = 0; + uint64_t callid = 0; + gf_boolean_t need_unref = _gf_false; + call_frame_t *cframe = frame; + + if (!rpc || !prog || !frame) { + goto out; + } + + conn = &rpc->conn; + + rpcreq = mem_get(rpc->reqpool); + if (rpcreq == NULL) { + goto out; + } + + memset(rpcreq, 0, sizeof(*rpcreq)); + memset(&req, 0, sizeof(req)); + + if (!iobref) { + iobref = iobref_new(); if (!iobref) { - iobref = iobref_new (); - if (!iobref) { - goto out; - } - - new_iobref = 1; - } - - callid = GF_ATOMIC_INC (rpc->xid); - - rpcreq->prog = prog; - rpcreq->procnum = procnum; - rpcreq->conn = conn; - rpcreq->xid = callid; - rpcreq->cbkfn = cbkfn; - - ret = -1; - - if (proghdr) { - proglen += iov_length (proghdr, proghdrcount); + goto out; + } + + new_iobref = 1; + } + + callid = GF_ATOMIC_INC(rpc->xid); + + rpcreq->prog = prog; + rpcreq->procnum = procnum; + rpcreq->conn = conn; + rpcreq->xid = callid; + rpcreq->cbkfn = cbkfn; + + ret = -1; + + if (proghdr) { + proglen += iov_length(proghdr, proghdrcount); + } + + request_iob = rpc_clnt_record(rpc, frame, prog, procnum, proglen, &rpchdr, + callid); + if (!request_iob) { + gf_log(conn->name, GF_LOG_WARNING, "cannot build rpc-record"); + goto out; + } + + iobref_add(iobref, request_iob); + + req.msg.rpchdr = &rpchdr; + req.msg.rpchdrcount = 1; + req.msg.proghdr = proghdr; + req.msg.proghdrcount = proghdrcount; + req.msg.progpayload = progpayload; + req.msg.progpayloadcount = progpayloadcount; + req.msg.iobref = iobref; + + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + req.rpc_req = rpcreq; + + pthread_mutex_lock(&conn->lock); + { + if (conn->connected == 0 && !rpc->disabled) { + ret = rpc_transport_connect(conn->trans, conn->config.remote_port); + if (ret < 0) { + gf_log(conn->name, GF_LOG_WARNING, + "error returned while attempting to " + "connect to host:%s, port:%d", + conn->config.remote_host, conn->config.remote_port); + } } - request_iob = rpc_clnt_record (rpc, frame, prog, - procnum, proglen, - &rpchdr, callid); - if (!request_iob) { - gf_log (conn->name, GF_LOG_WARNING, - "cannot build rpc-record"); - goto out; + ret = rpc_transport_submit_request(conn->trans, &req); + if (ret == -1) { + gf_log(conn->name, GF_LOG_WARNING, + "failed to submit rpc-request " + "(unique: %" PRIu64 + ", XID: 0x%x Program: %s, " + "ProgVers: %d, Proc: %d) to rpc-transport (%s)", + cframe->root->unique, rpcreq->xid, rpcreq->prog->progname, + rpcreq->prog->progver, rpcreq->procnum, conn->name); } - iobref_add (iobref, request_iob); - - req.msg.rpchdr = &rpchdr; - req.msg.rpchdrcount = 1; - req.msg.proghdr = proghdr; - req.msg.proghdrcount = proghdrcount; - req.msg.progpayload = progpayload; - req.msg.progpayloadcount = progpayloadcount; - req.msg.iobref = iobref; + if ((ret >= 0) && frame) { + /* Save the frame in queue */ + __save_frame(rpc, frame, rpcreq); - req.rsp.rsphdr = rsphdr; - req.rsp.rsphdr_count = rsphdr_count; - req.rsp.rsp_payload = rsp_payload; - req.rsp.rsp_payload_count = rsp_payload_count; - req.rsp.rsp_iobref = rsp_iobref; - req.rpc_req = rpcreq; - - pthread_mutex_lock (&conn->lock); - { - if (conn->connected == 0 && !rpc->disabled) { - ret = rpc_transport_connect (conn->trans, - conn->config.remote_port); - if (ret < 0) { - gf_log (conn->name, GF_LOG_WARNING, - "error returned while attempting to " - "connect to host:%s, port:%d", - conn->config.remote_host, - conn->config.remote_port); - } - } + /* A ref on rpc-clnt object is taken while registering + * call_bail to timer in __save_frame. If it fails to + * register, it needs an unref and should happen outside + * conn->lock which otherwise leads to deadlocks */ + if (conn->timer == NULL) + need_unref = _gf_true; - ret = rpc_transport_submit_request (conn->trans, &req); - if (ret == -1) { - gf_log (conn->name, GF_LOG_WARNING, - "failed to submit rpc-request " - "(unique: %"PRIu64", XID: 0x%x Program: %s, " - "ProgVers: %d, Proc: %d) to rpc-transport (%s)", - cframe->root->unique, rpcreq->xid, - rpcreq->prog->progname, rpcreq->prog->progver, - rpcreq->procnum, conn->name); - } + conn->msgcnt++; - if ((ret >= 0) && frame) { - /* Save the frame in queue */ - __save_frame (rpc, frame, rpcreq); - - /* A ref on rpc-clnt object is taken while registering - * call_bail to timer in __save_frame. If it fails to - * register, it needs an unref and should happen outside - * conn->lock which otherwise leads to deadlocks */ - if (conn->timer == NULL) - need_unref = _gf_true; - - conn->msgcnt++; - - gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " - "(unique: %"PRIu64", XID: 0x%x, Program: %s, " - "ProgVers: %d, Proc: %d) to rpc-transport (%s)", - cframe->root->unique, rpcreq->xid, - rpcreq->prog->progname, rpcreq->prog->progver, - rpcreq->procnum, conn->name); - } + gf_log("rpc-clnt", GF_LOG_TRACE, + "submitted request " + "(unique: %" PRIu64 + ", XID: 0x%x, Program: %s, " + "ProgVers: %d, Proc: %d) to rpc-transport (%s)", + cframe->root->unique, rpcreq->xid, rpcreq->prog->progname, + rpcreq->prog->progver, rpcreq->procnum, conn->name); } - pthread_mutex_unlock (&conn->lock); + } + pthread_mutex_unlock(&conn->lock); - if (need_unref) - rpc_clnt_unref (rpc); + if (need_unref) + rpc_clnt_unref(rpc); - if (ret == -1) { - goto out; - } + if (ret == -1) { + goto out; + } - rpc_clnt_check_and_start_ping (rpc); - ret = 0; + rpc_clnt_check_and_start_ping(rpc); + ret = 0; out: - if (request_iob) { - iobuf_unref (request_iob); - } - - if (new_iobref && iobref) { - iobref_unref (iobref); - } - - if (frame && (ret == -1)) { - if (rpcreq) { - rpcreq->rpc_status = -1; - cbkfn (rpcreq, NULL, 0, frame); - mem_put (rpcreq); - } - } - return ret; + if (request_iob) { + iobuf_unref(request_iob); + } + + if (new_iobref && iobref) { + iobref_unref(iobref); + } + + if (frame && (ret == -1)) { + if (rpcreq) { + rpcreq->rpc_status = -1; + cbkfn(rpcreq, NULL, 0, frame); + mem_put(rpcreq); + } + } + return ret; } - struct rpc_clnt * -rpc_clnt_ref (struct rpc_clnt *rpc) +rpc_clnt_ref(struct rpc_clnt *rpc) { - if (!rpc) - return NULL; + if (!rpc) + return NULL; - GF_ATOMIC_INC (rpc->refcount); - return rpc; + GF_ATOMIC_INC(rpc->refcount); + return rpc; } - static void -rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) +rpc_clnt_trigger_destroy(struct rpc_clnt *rpc) { - rpc_clnt_connection_t *conn = NULL; - rpc_transport_t *trans = NULL; - - if (!rpc) - return; + rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; - /* reading conn->trans outside conn->lock is OK, since this is the last - * ref*/ - conn = &rpc->conn; - trans = conn->trans; - rpc_clnt_disconnect (rpc); + if (!rpc) + return; - /* This is to account for rpc_clnt_disable that might have been called - * before rpc_clnt_unref */ - if (trans) { - /* set conn->trans to NULL before rpc_transport_unref - * as rpc_transport_unref can potentially free conn - */ - conn->trans = NULL; - rpc_transport_unref (trans); - } + /* reading conn->trans outside conn->lock is OK, since this is the last + * ref*/ + conn = &rpc->conn; + trans = conn->trans; + rpc_clnt_disconnect(rpc); + + /* This is to account for rpc_clnt_disable that might have been called + * before rpc_clnt_unref */ + if (trans) { + /* set conn->trans to NULL before rpc_transport_unref + * as rpc_transport_unref can potentially free conn + */ + conn->trans = NULL; + rpc_transport_unref(trans); + } } static void -rpc_clnt_destroy (struct rpc_clnt *rpc) +rpc_clnt_destroy(struct rpc_clnt *rpc) { - rpcclnt_cb_program_t *program = NULL; - rpcclnt_cb_program_t *tmp = NULL; - struct saved_frames *saved_frames = NULL; - rpc_clnt_connection_t *conn = NULL; - - if (!rpc) - return; - - conn = &rpc->conn; - GF_FREE (rpc->conn.name); - /* Access saved_frames in critical-section to avoid - crash in rpc_clnt_connection_cleanup at the time - of destroying saved frames - */ - pthread_mutex_lock (&conn->lock); - { - saved_frames = conn->saved_frames; - conn->saved_frames = NULL; - } - pthread_mutex_unlock (&conn->lock); + rpcclnt_cb_program_t *program = NULL; + rpcclnt_cb_program_t *tmp = NULL; + struct saved_frames *saved_frames = NULL; + rpc_clnt_connection_t *conn = NULL; - saved_frames_destroy (saved_frames); - pthread_mutex_destroy (&rpc->lock); - pthread_mutex_destroy (&rpc->conn.lock); - - /* mem-pool should be destroyed, otherwise, - it will cause huge memory leaks */ - mem_pool_destroy (rpc->reqpool); - mem_pool_destroy (rpc->saved_frames_pool); - - list_for_each_entry_safe (program, tmp, &rpc->programs, program) { - GF_FREE (program); - } - - GF_FREE (rpc); + if (!rpc) return; + + conn = &rpc->conn; + GF_FREE(rpc->conn.name); + /* Access saved_frames in critical-section to avoid + crash in rpc_clnt_connection_cleanup at the time + of destroying saved frames + */ + pthread_mutex_lock(&conn->lock); + { + saved_frames = conn->saved_frames; + conn->saved_frames = NULL; + } + pthread_mutex_unlock(&conn->lock); + + saved_frames_destroy(saved_frames); + pthread_mutex_destroy(&rpc->lock); + pthread_mutex_destroy(&rpc->conn.lock); + + /* mem-pool should be destroyed, otherwise, + it will cause huge memory leaks */ + mem_pool_destroy(rpc->reqpool); + mem_pool_destroy(rpc->saved_frames_pool); + + list_for_each_entry_safe(program, tmp, &rpc->programs, program) + { + GF_FREE(program); + } + + GF_FREE(rpc); + return; } struct rpc_clnt * -rpc_clnt_unref (struct rpc_clnt *rpc) +rpc_clnt_unref(struct rpc_clnt *rpc) { - int count = 0; + int count = 0; - if (!rpc) - return NULL; + if (!rpc) + return NULL; - count = GF_ATOMIC_DEC (rpc->refcount); + count = GF_ATOMIC_DEC(rpc->refcount); - if (!count) { - rpc_clnt_trigger_destroy (rpc); - return NULL; - } - return rpc; + if (!count) { + rpc_clnt_trigger_destroy(rpc); + return NULL; + } + return rpc; } - char -rpc_clnt_is_disabled (struct rpc_clnt *rpc) +rpc_clnt_is_disabled(struct rpc_clnt *rpc) { + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; - rpc_clnt_connection_t *conn = NULL; - char disabled = 0; + if (!rpc) { + goto out; + } - if (!rpc) { - goto out; - } + conn = &rpc->conn; - conn = &rpc->conn; - - pthread_mutex_lock (&conn->lock); - { - disabled = rpc->disabled; - } - pthread_mutex_unlock (&conn->lock); + pthread_mutex_lock(&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock(&conn->lock); out: - return disabled; + return disabled; } void -rpc_clnt_disable (struct rpc_clnt *rpc) +rpc_clnt_disable(struct rpc_clnt *rpc) { - rpc_clnt_connection_t *conn = NULL; - rpc_transport_t *trans = NULL; - int unref = 0; - int ret = 0; - gf_boolean_t timer_unref = _gf_false; - gf_boolean_t reconnect_unref = _gf_false; - - if (!rpc) { - goto out; - } - - conn = &rpc->conn; - - pthread_mutex_lock (&conn->lock); - { - rpc->disabled = 1; - - if (conn->timer) { - ret = gf_timer_call_cancel (rpc->ctx, conn->timer); - /* If the event is not fired and it actually cancelled - * the timer, do the unref else registered call back - * function will take care of it. - */ - if (!ret) - timer_unref = _gf_true; - conn->timer = NULL; - } - - if (conn->reconnect) { - ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); - if (!ret) - reconnect_unref = _gf_true; - conn->reconnect = NULL; - } - conn->connected = 0; - - unref = rpc_clnt_remove_ping_timer_locked (rpc); - trans = conn->trans; - - } - pthread_mutex_unlock (&conn->lock); - - if (trans) { - rpc_transport_disconnect (trans, _gf_true); - /* The auth_value was being reset to AUTH_GLUSTERFS_v2. - * if (clnt->auth_value) - * clnt->auth_value = AUTH_GLUSTERFS_v2; - * It should not be reset here. The disconnect during - * portmap request can race with handshake. If handshake - * happens first and disconnect later, auth_value would set - * to default value and it never sets back to actual auth_value - * supported by server. But it's important to set to lower - * version supported in the case where the server downgrades. - * So moving this code to RPC_TRANSPORT_CONNECT. Note that - * CONNECT cannot race with handshake as by nature it is - * serialized with handhake. An handshake can happen only - * on a connected transport and hence its strictly serialized. - */ - } + rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; + int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock(&conn->lock); + { + rpc->disabled = 1; + + if (conn->timer) { + ret = gf_timer_call_cancel(rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + * function will take care of it. + */ + if (!ret) + timer_unref = _gf_true; + conn->timer = NULL; + } + + if (conn->reconnect) { + ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; + conn->reconnect = NULL; + } + conn->connected = 0; + + unref = rpc_clnt_remove_ping_timer_locked(rpc); + trans = conn->trans; + } + pthread_mutex_unlock(&conn->lock); + + if (trans) { + rpc_transport_disconnect(trans, _gf_true); + /* The auth_value was being reset to AUTH_GLUSTERFS_v2. + * if (clnt->auth_value) + * clnt->auth_value = AUTH_GLUSTERFS_v2; + * It should not be reset here. The disconnect during + * portmap request can race with handshake. If handshake + * happens first and disconnect later, auth_value would set + * to default value and it never sets back to actual auth_value + * supported by server. But it's important to set to lower + * version supported in the case where the server downgrades. + * So moving this code to RPC_TRANSPORT_CONNECT. Note that + * CONNECT cannot race with handshake as by nature it is + * serialized with handhake. An handshake can happen only + * on a connected transport and hence its strictly serialized. + */ + } - if (unref) - rpc_clnt_unref (rpc); + if (unref) + rpc_clnt_unref(rpc); - if (timer_unref) - rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref(rpc); - if (reconnect_unref) - rpc_clnt_unref (rpc); + if (reconnect_unref) + rpc_clnt_unref(rpc); out: - return; + return; } void -rpc_clnt_disconnect (struct rpc_clnt *rpc) +rpc_clnt_disconnect(struct rpc_clnt *rpc) { - rpc_clnt_connection_t *conn = NULL; - rpc_transport_t *trans = NULL; - int unref = 0; - int ret = 0; - gf_boolean_t timer_unref = _gf_false; - gf_boolean_t reconnect_unref = _gf_false; - - if (!rpc) - goto out; - - conn = &rpc->conn; - - pthread_mutex_lock (&conn->lock); - { - rpc->disabled = 1; - if (conn->timer) { - ret = gf_timer_call_cancel (rpc->ctx, conn->timer); - /* If the event is not fired and it actually cancelled - * the timer, do the unref else registered call back - * function will take care of unref. - */ - if (!ret) - timer_unref = _gf_true; - conn->timer = NULL; - } - - if (conn->reconnect) { - ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); - if (!ret) - reconnect_unref = _gf_true; - conn->reconnect = NULL; - } - conn->connected = 0; - - unref = rpc_clnt_remove_ping_timer_locked (rpc); - trans = conn->trans; - } - pthread_mutex_unlock (&conn->lock); - - if (trans) { - rpc_transport_disconnect (trans, _gf_true); - /* The auth_value was being reset to AUTH_GLUSTERFS_v2. - * if (clnt->auth_value) - * clnt->auth_value = AUTH_GLUSTERFS_v2; - * It should not be reset here. The disconnect during - * portmap request can race with handshake. If handshake - * happens first and disconnect later, auth_value would set - * to default value and it never sets back to actual auth_value - * supported by server. But it's important to set to lower - * version supported in the case where the server downgrades. - * So moving this code to RPC_TRANSPORT_CONNECT. Note that - * CONNECT cannot race with handshake as by nature it is - * serialized with handhake. An handshake can happen only - * on a connected transport and hence its strictly serialized. - */ - } - if (unref) - rpc_clnt_unref (rpc); + rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; + int unref = 0; + int ret = 0; + gf_boolean_t timer_unref = _gf_false; + gf_boolean_t reconnect_unref = _gf_false; + + if (!rpc) + goto out; + + conn = &rpc->conn; + + pthread_mutex_lock(&conn->lock); + { + rpc->disabled = 1; + if (conn->timer) { + ret = gf_timer_call_cancel(rpc->ctx, conn->timer); + /* If the event is not fired and it actually cancelled + * the timer, do the unref else registered call back + * function will take care of unref. + */ + if (!ret) + timer_unref = _gf_true; + conn->timer = NULL; + } + + if (conn->reconnect) { + ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect); + if (!ret) + reconnect_unref = _gf_true; + conn->reconnect = NULL; + } + conn->connected = 0; + + unref = rpc_clnt_remove_ping_timer_locked(rpc); + trans = conn->trans; + } + pthread_mutex_unlock(&conn->lock); + + if (trans) { + rpc_transport_disconnect(trans, _gf_true); + /* The auth_value was being reset to AUTH_GLUSTERFS_v2. + * if (clnt->auth_value) + * clnt->auth_value = AUTH_GLUSTERFS_v2; + * It should not be reset here. The disconnect during + * portmap request can race with handshake. If handshake + * happens first and disconnect later, auth_value would set + * to default value and it never sets back to actual auth_value + * supported by server. But it's important to set to lower + * version supported in the case where the server downgrades. + * So moving this code to RPC_TRANSPORT_CONNECT. Note that + * CONNECT cannot race with handshake as by nature it is + * serialized with handhake. An handshake can happen only + * on a connected transport and hence its strictly serialized. + */ + } + if (unref) + rpc_clnt_unref(rpc); - if (timer_unref) - rpc_clnt_unref (rpc); + if (timer_unref) + rpc_clnt_unref(rpc); - if (reconnect_unref) - rpc_clnt_unref (rpc); + if (reconnect_unref) + rpc_clnt_unref(rpc); out: - return; + return; } - void -rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) +rpc_clnt_reconfig(struct rpc_clnt *rpc, struct rpc_clnt_config *config) { - if (config->ping_timeout) { - if (config->ping_timeout != rpc->conn.ping_timeout) - gf_log (rpc->conn.name, GF_LOG_INFO, - "changing ping timeout to %d (from %d)", - config->ping_timeout, - rpc->conn.ping_timeout); - - pthread_mutex_lock (&rpc->conn.lock); - { - rpc->conn.ping_timeout = config->ping_timeout; - } - pthread_mutex_unlock (&rpc->conn.lock); - - } - - if (config->rpc_timeout) { - if (config->rpc_timeout != rpc->conn.config.rpc_timeout) - gf_log (rpc->conn.name, GF_LOG_INFO, - "changing timeout to %d (from %d)", - config->rpc_timeout, - rpc->conn.config.rpc_timeout); - rpc->conn.config.rpc_timeout = config->rpc_timeout; - } - - if (config->remote_port) { - if (config->remote_port != rpc->conn.config.remote_port) - gf_log (rpc->conn.name, GF_LOG_INFO, - "changing port to %d (from %d)", - config->remote_port, - rpc->conn.config.remote_port); + if (config->ping_timeout) { + if (config->ping_timeout != rpc->conn.ping_timeout) + gf_log(rpc->conn.name, GF_LOG_INFO, + "changing ping timeout to %d (from %d)", + config->ping_timeout, rpc->conn.ping_timeout); - rpc->conn.config.remote_port = config->remote_port; + pthread_mutex_lock(&rpc->conn.lock); + { + rpc->conn.ping_timeout = config->ping_timeout; + } + pthread_mutex_unlock(&rpc->conn.lock); + } + + if (config->rpc_timeout) { + if (config->rpc_timeout != rpc->conn.config.rpc_timeout) + gf_log(rpc->conn.name, GF_LOG_INFO, + "changing timeout to %d (from %d)", config->rpc_timeout, + rpc->conn.config.rpc_timeout); + rpc->conn.config.rpc_timeout = config->rpc_timeout; + } + + if (config->remote_port) { + if (config->remote_port != rpc->conn.config.remote_port) + gf_log(rpc->conn.name, GF_LOG_INFO, "changing port to %d (from %d)", + config->remote_port, rpc->conn.config.remote_port); + + rpc->conn.config.remote_port = config->remote_port; + } + + if (config->remote_host) { + if (rpc->conn.config.remote_host) { + if (strcmp(rpc->conn.config.remote_host, config->remote_host)) + gf_log(rpc->conn.name, GF_LOG_INFO, + "changing hostname to %s (from %s)", config->remote_host, + rpc->conn.config.remote_host); + GF_FREE(rpc->conn.config.remote_host); + } else { + gf_log(rpc->conn.name, GF_LOG_INFO, "setting hostname to %s", + config->remote_host); } - if (config->remote_host) { - if (rpc->conn.config.remote_host) { - if (strcmp (rpc->conn.config.remote_host, - config->remote_host)) - gf_log (rpc->conn.name, GF_LOG_INFO, - "changing hostname to %s (from %s)", - config->remote_host, - rpc->conn.config.remote_host); - GF_FREE (rpc->conn.config.remote_host); - } else { - gf_log (rpc->conn.name, GF_LOG_INFO, - "setting hostname to %s", - config->remote_host); - } - - rpc->conn.config.remote_host = gf_strdup (config->remote_host); - } + rpc->conn.config.remote_host = gf_strdup(config->remote_host); + } } |