diff options
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 41 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 35 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 46 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 3 |
4 files changed, 52 insertions, 73 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 2bbbb43c4af..c75d4124707 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1032,42 +1032,16 @@ out: } -void -rpc_clnt_set_lastfrag (uint32_t *fragsize) { - (*fragsize) |= 0x80000000U; -} - - -void -rpc_clnt_set_frag_header_size (uint32_t size, char *haddr) -{ - size = htonl (size); - memcpy (haddr, &size, sizeof (size)); -} - - -void -rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr) -{ - rpc_clnt_set_lastfrag (&size); - rpc_clnt_set_frag_header_size (size, haddr); -} - - struct iovec rpc_clnt_record_build_header (char *recordstart, size_t rlen, struct rpc_msg *request, size_t payload) { struct iovec requesthdr = {0, }; struct iovec txrecord = {0, 0}; - size_t fraglen = 0; int ret = -1; + size_t fraglen = 0; - /* After leaving aside the 4 bytes for the fragment header, lets - * encode the RPC reply structure into the buffer given to us. - */ - ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE), - rlen, &requesthdr); + ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); if (ret == -1) { gf_log ("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request"); @@ -1078,16 +1052,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen, gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); - /* Since we're not spreading RPC records over mutiple fragments - * we just set this fragment as the first and last fragment for this - * record. - */ - rpc_clnt_set_last_frag_header_size (fraglen, recordstart); - /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE - * we need to transmit the record with the fragment header, which starts - * at recordstart. - */ txrecord.iov_base = recordstart; /* Remember, this is only the vec for the RPC header and does not @@ -1095,7 +1060,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen, * the size of the full fragment. This size is sent in the fragment * header. */ - txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len; + txrecord.iov_len = requesthdr.iov_len; out: return txrecord; diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 687deefba26..aed8e4f13dd 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1236,26 +1236,6 @@ out: } -void -rpcsvc_set_lastfrag (uint32_t *fragsize) { - (*fragsize) |= 0x80000000U; -} - -void -rpcsvc_set_frag_header_size (uint32_t size, char *haddr) -{ - size = htonl (size); - memcpy (haddr, &size, sizeof (size)); -} - -void -rpcsvc_set_last_frag_header_size (uint32_t size, char *haddr) -{ - rpcsvc_set_lastfrag (&size); - rpcsvc_set_frag_header_size (size, haddr); -} - - /* Given the RPC reply structure and the payload handed by the RPC program, * encode the RPC record header into the buffer pointed by recordstart. */ @@ -1271,8 +1251,7 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen, /* After leaving aside the 4 bytes for the fragment header, lets * encode the RPC reply structure into the buffer given to us. */ - ret = rpc_reply_to_xdr (&reply,(recordstart + RPCSVC_FRAGHDR_SIZE), - rlen, &replyhdr); + ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to create RPC reply"); goto err; @@ -1282,16 +1261,6 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen, gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, " "rpc hdr: %zu", fraglen, payload, replyhdr.iov_len); - /* Since we're not spreading RPC records over mutiple fragments - * we just set this fragment as the first and last fragment for this - * record. - */ - rpcsvc_set_last_frag_header_size (fraglen, recordstart); - - /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE - * we need to transmit the record with the fragment header, which starts - * at recordstart. - */ txrecord.iov_base = recordstart; /* Remember, this is only the vec for the RPC header and does not @@ -1299,7 +1268,7 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen, * the size of the full fragment. This size is sent in the fragment * header. */ - txrecord.iov_len = RPCSVC_FRAGHDR_SIZE + replyhdr.iov_len; + txrecord.iov_len = replyhdr.iov_len; err: return txrecord; } diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 31b5a82eae7..ef77c71ae00 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -45,6 +45,7 @@ #define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) #define SA(ptr) ((struct sockaddr *)ptr) + #define __socket_proto_reset_pending(priv) do { \ memset (&priv->incoming.frag.vector, 0, \ sizeof (priv->incoming.frag.vector)); \ @@ -390,23 +391,64 @@ __socket_reset (rpc_transport_t *this) } +void +socket_set_lastfrag (uint32_t *fragsize) { + (*fragsize) |= 0x80000000U; +} + + +void +socket_set_frag_header_size (uint32_t size, char *haddr) +{ + size = htonl (size); + memcpy (haddr, &size, sizeof (size)); +} + + +void +socket_set_last_frag_header_size (uint32_t size, char *haddr) +{ + socket_set_lastfrag (&size); + socket_set_frag_header_size (size, haddr); +} + struct ioq * __socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) { struct ioq *entry = NULL; int count = 0; + uint32_t size = 0; /* TODO: use mem-pool */ entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq); if (!entry) return NULL; + count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; - assert (count <= MAX_IOVEC); + assert (count <= (MAX_IOVEC - 1)); + + size = iov_length (msg->rpchdr, msg->rpchdrcount) + + iov_length (msg->proghdr, msg->proghdrcount) + + iov_length (msg->progpayload, msg->progpayloadcount); + + if (size > RPC_MAX_FRAGMENT_SIZE) { + gf_log (this->name, GF_LOG_ERROR, + "msg size (%u) bigger than the maximum allowed size on " + "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE); + GF_FREE (entry); + return NULL; + } + + socket_set_last_frag_header_size (size, (char *)&entry->fraghdr); + + entry->vector[0].iov_base = (char *)&entry->fraghdr; + entry->vector[0].iov_len = sizeof (entry->fraghdr); + entry->count = 1; if (msg->rpchdr != NULL) { - memcpy (&entry->vector[0], msg->rpchdr, + memcpy (&entry->vector[1], msg->rpchdr, sizeof (struct iovec) * msg->rpchdrcount); entry->count += msg->rpchdrcount; } diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 5078b161e29..4016153e747 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -38,6 +38,8 @@ #define GF_DEFAULT_SOCKET_LISTEN_PORT 6969 +#define RPC_MAX_FRAGMENT_SIZE 0x7fffffff + /* This is the size set through setsockopt for * both the TCP receive window size and the * send buffer size. @@ -97,6 +99,7 @@ struct ioq { }; }; + uint32_t fraghdr; struct iovec vector[MAX_IOVEC]; int count; struct iovec *pending_vector; |