summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2010-08-16 03:19:32 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-08-16 02:16:29 -0700
commitfac7244477c6f2537f9abf3e83abf5e77177e548 (patch)
tree54f523e9e4571a15a0a7392028760005375ddc28
parent5065b0824ed01cca59501d74c227b136a5b0e65e (diff)
rpc: move handling of fragment headers to socket.
- fragment headers are only specific to tcp and hence should be handled in transport-socket instead of by rpc layer. Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c41
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c35
-rw-r--r--rpc/rpc-transport/socket/src/socket.c46
-rw-r--r--rpc/rpc-transport/socket/src/socket.h3
4 files changed, 52 insertions, 73 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 2bbbb43c4af..c75d4124707 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1032,42 +1032,16 @@ out:
}
-void
-rpc_clnt_set_lastfrag (uint32_t *fragsize) {
- (*fragsize) |= 0x80000000U;
-}
-
-
-void
-rpc_clnt_set_frag_header_size (uint32_t size, char *haddr)
-{
- size = htonl (size);
- memcpy (haddr, &size, sizeof (size));
-}
-
-
-void
-rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr)
-{
- rpc_clnt_set_lastfrag (&size);
- rpc_clnt_set_frag_header_size (size, haddr);
-}
-
-
struct iovec
rpc_clnt_record_build_header (char *recordstart, size_t rlen,
struct rpc_msg *request, size_t payload)
{
struct iovec requesthdr = {0, };
struct iovec txrecord = {0, 0};
- size_t fraglen = 0;
int ret = -1;
+ size_t fraglen = 0;
- /* After leaving aside the 4 bytes for the fragment header, lets
- * encode the RPC reply structure into the buffer given to us.
- */
- ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE),
- rlen, &requesthdr);
+ ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
if (ret == -1) {
gf_log ("rpc-clnt", GF_LOG_DEBUG,
"Failed to create RPC request");
@@ -1078,16 +1052,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen,
gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
"rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
- /* Since we're not spreading RPC records over mutiple fragments
- * we just set this fragment as the first and last fragment for this
- * record.
- */
- rpc_clnt_set_last_frag_header_size (fraglen, recordstart);
- /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE
- * we need to transmit the record with the fragment header, which starts
- * at recordstart.
- */
txrecord.iov_base = recordstart;
/* Remember, this is only the vec for the RPC header and does not
@@ -1095,7 +1060,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen,
* the size of the full fragment. This size is sent in the fragment
* header.
*/
- txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len;
+ txrecord.iov_len = requesthdr.iov_len;
out:
return txrecord;
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 687deefba26..aed8e4f13dd 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1236,26 +1236,6 @@ out:
}
-void
-rpcsvc_set_lastfrag (uint32_t *fragsize) {
- (*fragsize) |= 0x80000000U;
-}
-
-void
-rpcsvc_set_frag_header_size (uint32_t size, char *haddr)
-{
- size = htonl (size);
- memcpy (haddr, &size, sizeof (size));
-}
-
-void
-rpcsvc_set_last_frag_header_size (uint32_t size, char *haddr)
-{
- rpcsvc_set_lastfrag (&size);
- rpcsvc_set_frag_header_size (size, haddr);
-}
-
-
/* Given the RPC reply structure and the payload handed by the RPC program,
* encode the RPC record header into the buffer pointed by recordstart.
*/
@@ -1271,8 +1251,7 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
/* After leaving aside the 4 bytes for the fragment header, lets
* encode the RPC reply structure into the buffer given to us.
*/
- ret = rpc_reply_to_xdr (&reply,(recordstart + RPCSVC_FRAGHDR_SIZE),
- rlen, &replyhdr);
+ ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to create RPC reply");
goto err;
@@ -1282,16 +1261,6 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, "
"rpc hdr: %zu", fraglen, payload, replyhdr.iov_len);
- /* Since we're not spreading RPC records over mutiple fragments
- * we just set this fragment as the first and last fragment for this
- * record.
- */
- rpcsvc_set_last_frag_header_size (fraglen, recordstart);
-
- /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE
- * we need to transmit the record with the fragment header, which starts
- * at recordstart.
- */
txrecord.iov_base = recordstart;
/* Remember, this is only the vec for the RPC header and does not
@@ -1299,7 +1268,7 @@ rpcsvc_record_build_header (char *recordstart, size_t rlen,
* the size of the full fragment. This size is sent in the fragment
* header.
*/
- txrecord.iov_len = RPCSVC_FRAGHDR_SIZE + replyhdr.iov_len;
+ txrecord.iov_len = replyhdr.iov_len;
err:
return txrecord;
}
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 31b5a82eae7..ef77c71ae00 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -45,6 +45,7 @@
#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR)
#define SA(ptr) ((struct sockaddr *)ptr)
+
#define __socket_proto_reset_pending(priv) do { \
memset (&priv->incoming.frag.vector, 0, \
sizeof (priv->incoming.frag.vector)); \
@@ -390,23 +391,64 @@ __socket_reset (rpc_transport_t *this)
}
+void
+socket_set_lastfrag (uint32_t *fragsize) {
+ (*fragsize) |= 0x80000000U;
+}
+
+
+void
+socket_set_frag_header_size (uint32_t size, char *haddr)
+{
+ size = htonl (size);
+ memcpy (haddr, &size, sizeof (size));
+}
+
+
+void
+socket_set_last_frag_header_size (uint32_t size, char *haddr)
+{
+ socket_set_lastfrag (&size);
+ socket_set_frag_header_size (size, haddr);
+}
+
struct ioq *
__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg)
{
struct ioq *entry = NULL;
int count = 0;
+ uint32_t size = 0;
/* TODO: use mem-pool */
entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq);
if (!entry)
return NULL;
+
count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount;
- assert (count <= MAX_IOVEC);
+ assert (count <= (MAX_IOVEC - 1));
+
+ size = iov_length (msg->rpchdr, msg->rpchdrcount)
+ + iov_length (msg->proghdr, msg->proghdrcount)
+ + iov_length (msg->progpayload, msg->progpayloadcount);
+
+ if (size > RPC_MAX_FRAGMENT_SIZE) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "msg size (%u) bigger than the maximum allowed size on "
+ "sockets (%u)", size, RPC_MAX_FRAGMENT_SIZE);
+ GF_FREE (entry);
+ return NULL;
+ }
+
+ socket_set_last_frag_header_size (size, (char *)&entry->fraghdr);
+
+ entry->vector[0].iov_base = (char *)&entry->fraghdr;
+ entry->vector[0].iov_len = sizeof (entry->fraghdr);
+ entry->count = 1;
if (msg->rpchdr != NULL) {
- memcpy (&entry->vector[0], msg->rpchdr,
+ memcpy (&entry->vector[1], msg->rpchdr,
sizeof (struct iovec) * msg->rpchdrcount);
entry->count += msg->rpchdrcount;
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 5078b161e29..4016153e747 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -38,6 +38,8 @@
#define GF_DEFAULT_SOCKET_LISTEN_PORT 6969
+#define RPC_MAX_FRAGMENT_SIZE 0x7fffffff
+
/* This is the size set through setsockopt for
* both the TCP receive window size and the
* send buffer size.
@@ -97,6 +99,7 @@ struct ioq {
};
};
+ uint32_t fraghdr;
struct iovec vector[MAX_IOVEC];
int count;
struct iovec *pending_vector;