summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnand Avati <avati@redhat.com>2012-09-05 17:20:05 -0700
committerAnand Avati <avati@redhat.com>2012-12-04 14:46:04 -0800
commit98879ebdddd4ca77440defad6a73acf4fa1e75ab (patch)
tree041cfce881b40d22758a423542e380578dd8112f
parent741766c708f2a246854584c064d63d3fba67be90 (diff)
rpc-transport/socket: implement read-ahead of RPC headers
This reduces the number of read() system calls on the socket to complete the full RPC fragment reading. Change-Id: I421a53af195ead4aad70e09e0172a61ad7912d83 BUG: 821087 Signed-off-by: Anand Avati <avati@redhat.com> Reviewed-on: http://review.gluster.org/3855 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Reviewed-by: Amar Tumballi <amarts@redhat.com>
-rw-r--r--libglusterfs/src/common-utils.h22
-rw-r--r--rpc/rpc-transport/socket/src/socket.c115
-rw-r--r--rpc/rpc-transport/socket/src/socket.h7
3 files changed, 137 insertions, 7 deletions
diff --git a/libglusterfs/src/common-utils.h b/libglusterfs/src/common-utils.h
index 6d46a997105..75692309e14 100644
--- a/libglusterfs/src/common-utils.h
+++ b/libglusterfs/src/common-utils.h
@@ -349,6 +349,28 @@ iov_unload (char *buf, const struct iovec *vector, int count)
static inline size_t
+iov_load (const struct iovec *vector, int count, char *buf, int size)
+{
+ size_t left = size;
+ size_t cp = 0;
+ int ret = 0;
+ int i = 0;
+
+ while (left && i < count) {
+ cp = min (vector[i].iov_len, left);
+ if (vector[i].iov_base != buf + (size - left))
+ memcpy (vector[i].iov_base, buf + (size - left), cp);
+ ret += cp;
+ left -= cp;
+ if (left)
+ i++;
+ }
+
+ return ret;
+}
+
+
+static inline size_t
iov_copy (const struct iovec *dst, int dcnt,
const struct iovec *src, int scnt)
{
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 97612610975..ae574f5f599 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -308,6 +308,112 @@ done:
return ret;
}
+
+
+ssize_t
+__socket_ssl_readv (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+
+ if (priv->use_ssl) {
+ ret = ssl_read_one (this, opvector->iov_base, opvector->iov_len);
+ } else {
+ ret = readv (sock, opvector, opcount);
+ }
+
+ return ret;
+}
+
+
+ssize_t
+__socket_ssl_read (rpc_transport_t *this, void *buf, size_t count)
+{
+ struct iovec iov = {0, };
+ int ret = -1;
+
+ iov.iov_base = buf;
+ iov.iov_len = count;
+
+ ret = __socket_ssl_readv (this, &iov, 1);
+
+ return ret;
+}
+
+
+int
+__socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount)
+{
+ socket_private_t *priv = NULL;
+ int sock = -1;
+ struct gf_sock_incoming *in = NULL;
+ int req_len = -1;
+ int ret = -1;
+
+ priv = this->private;
+ sock = priv->sock;
+ in = &priv->incoming;
+ req_len = iov_length (opvector, opcount);
+
+ if (in->record_state == SP_STATE_READING_FRAGHDR) {
+ in->ra_read = 0;
+ in->ra_served = 0;
+ in->ra_max = 0;
+ in->ra_buf = NULL;
+ goto uncached;
+ }
+
+ if (!in->ra_max) {
+ /* first call after passing SP_STATE_READING_FRAGHDR */
+ in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
+ /* Note that the in->iobuf is the primary iobuf into which
+ headers are read into. By using this itself as our
+ read-ahead cache, we can avoid memory copies in iov_load
+ */
+ in->ra_buf = iobuf_ptr (in->iobuf);
+ }
+
+ /* fill read-ahead */
+ if (in->ra_read < in->ra_max) {
+ ret = __socket_ssl_read (this, &in->ra_buf[in->ra_read],
+ (in->ra_max - in->ra_read));
+ if (ret > 0)
+ in->ra_read += ret;
+
+ /* we proceed to test if there is still cached data to
+ be served even if readahead could not progress */
+ }
+
+ /* serve cached */
+ if (in->ra_served < in->ra_read) {
+ ret = iov_load (opvector, opcount, &in->ra_buf[in->ra_served],
+ min (req_len, (in->ra_read - in->ra_served)));
+
+ in->ra_served += ret;
+ /* Do not read uncached and cached in the same call */
+ goto out;
+ }
+
+ if (in->ra_read < in->ra_max)
+ /* If there was no cached data to be served, (and we are
+ guaranteed to have already performed an attempt to progress
+ readahead above), and we have not yet read out the full
+ readahead capacity, then bail out for now without doing
+ the uncached read below (as that will overtake future cached
+ read)
+ */
+ goto out;
+uncached:
+ ret = __socket_ssl_readv (this, opvector, opcount);
+out:
+ return ret;
+}
+
+
/*
* return value:
* 0 = success (completed)
@@ -363,13 +469,8 @@ __socket_rwv (rpc_transport_t *this, struct iovec *vector, int count,
}
this->total_bytes_write += ret;
} else {
- if (priv->use_ssl) {
- ret = ssl_read_one(this,
- opvector->iov_base, opvector->iov_len);
- }
- else {
- ret = readv (sock, opvector, opcount);
- }
+ ret = __socket_cached_read (this, opvector, opcount);
+
if (ret == 0) {
gf_log(this->name,GF_LOG_DEBUG,"EOF on socket");
errno = ENODATA;
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 2c4b44cf466..78faad9038d 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -158,6 +158,8 @@ struct gf_sock_incoming_frag {
sp_rpcfrag_state_t state;
};
+#define GF_SOCKET_RA_MAX 1024
+
struct gf_sock_incoming {
sp_rpcrecord_state_t record_state;
struct gf_sock_incoming_frag frag;
@@ -175,6 +177,11 @@ struct gf_sock_incoming {
char complete_record;
msg_type_t msg_type;
size_t total_bytes_read;
+
+ size_t ra_read;
+ size_t ra_max;
+ size_t ra_served;
+ char *ra_buf;
};
typedef struct {