diff options
author | Amar Tumballi <amar@gluster.com> | 2010-05-04 00:36:24 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-05-03 23:40:05 -0700 |
commit | f75b76350747f5f58a4bbe704915c74979cc5ac3 (patch) | |
tree | af828761f51a8aa2612aa5eb9c9f0709869675c8 /transport/socket/src/socket.c | |
parent | 7504b0e623914d097933f0a613ba50e376131828 (diff) |
structuring of protocol - 2
* 'transports/' and 'auth/' moved to xlators/protocol/
* transport.{c,h}, authenticate.{c,h}, protocol.h moved to
xlators/protocol/lib/src/
Signed-off-by: Amar Tumballi <amar@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'transport/socket/src/socket.c')
-rw-r--r-- | transport/socket/src/socket.c | 1552 |
1 files changed, 0 insertions, 1552 deletions
diff --git a/transport/socket/src/socket.c b/transport/socket/src/socket.c deleted file mode 100644 index 7f7f8093a7d..00000000000 --- a/transport/socket/src/socket.c +++ /dev/null @@ -1,1552 +0,0 @@ -/* - Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "socket.h" -#include "name.h" -#include "dict.h" -#include "transport.h" -#include "logging.h" -#include "xlator.h" -#include "byte-order.h" -#include "common-utils.h" -#include "compat-errno.h" - -#include <fcntl.h> -#include <errno.h> -#include <netinet/tcp.h> - - -#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) -#define SA(ptr) ((struct sockaddr *)ptr) - -int socket_init (transport_t *this); - -/* - * return value: - * 0 = success (completed) - * -1 = error - * > 0 = incomplete - */ - -int -__socket_rwv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, - int write) -{ - socket_private_t *priv = NULL; - int sock = -1; - int ret = -1; - struct iovec *opvector = NULL; - int opcount = 0; - int moved = 0; - - priv = this->private; - sock = priv->sock; - - opvector = vector; - opcount = count; - - while (opcount) { - if (write) { - ret = writev (sock, opvector, opcount); - - if (ret == 0 || (ret == -1 && errno == EAGAIN)) { - /* done for now */ - break; - } - } else { - ret = readv (sock, opvector, opcount); - - if (ret == -1 && errno == EAGAIN) { - /* done for now */ - break; - } - } - - if (ret == 0) { - /* Mostly due to 'umount' in client */ - gf_log (this->xl->name, GF_LOG_TRACE, - "EOF from peer %s", this->peerinfo.identifier); - opcount = -1; - errno = ENOTCONN; - break; - } - - if (ret == -1) { - if (errno == EINTR) - continue; - - gf_log (this->xl->name, GF_LOG_TRACE, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); - opcount = -1; - break; - } - - moved = 0; - - while (moved < ret) { - if ((ret - moved) >= opvector[0].iov_len) { - moved += opvector[0].iov_len; - opvector++; - opcount--; - } else { - opvector[0].iov_len -= (ret - moved); - opvector[0].iov_base += (ret - moved); - moved += (ret - moved); - } - while (opcount && !opvector[0].iov_len) { - opvector++; - opcount--; - } - } - } - - if (pending_vector) - *pending_vector = opvector; - - if (pending_count) - *pending_count = opcount; - - return opcount; -} - - -int -__socket_readv (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, 0); - - return ret; -} - - -int -__socket_writev (transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, 1); - - return ret; -} - - -int -__socket_disconnect (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - if (priv->sock != -1) { - ret = shutdown (priv->sock, SHUT_RDWR); - priv->connected = -1; - gf_log (this->xl->name, GF_LOG_TRACE, - "shutdown() returned %d. set connection state to -1", - ret); - } - - return ret; -} - - -int -__socket_server_bind (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - int opt = 1; - - priv = this->private; - - ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, - &opt, sizeof (opt)); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() for SO_REUSEADDR failed (%s)", - strerror (errno)); - } - - ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "binding to %s failed: %s", - this->myinfo.identifier, strerror (errno)); - if (errno == EADDRINUSE) { - gf_log (this->xl->name, GF_LOG_ERROR, - "Port is already in use"); - } - } - - return ret; -} - - -int -__socket_nonblock (int fd) -{ - int flags = 0; - int ret = -1; - - flags = fcntl (fd, F_GETFL); - - if (flags != -1) - ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - - return ret; -} - - -int -__socket_nodelay (int fd) -{ - int on = 1; - int ret = -1; - - ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, - &on, sizeof (on)); - if (!ret) - gf_log ("", GF_LOG_TRACE, - "NODELAY enabled for socket %d", fd); - - return ret; -} - -int -__socket_connect_finish (int fd) -{ - int ret = -1; - int optval = 0; - socklen_t optlen = sizeof (int); - - ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); - - if (ret == 0 && optval) { - errno = optval; - ret = -1; - } - - return ret; -} - - -void -__socket_reset (transport_t *this) -{ - socket_private_t *priv = NULL; - - priv = this->private; - - /* TODO: use mem-pool on incoming data */ - - if (priv->incoming.hdr_p) - GF_FREE (priv->incoming.hdr_p); - - if (priv->incoming.iobuf) - iobuf_unref (priv->incoming.iobuf); - - memset (&priv->incoming, 0, sizeof (priv->incoming)); - - event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx); - close (priv->sock); - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; -} - - -struct ioq * -__socket_ioq_new (transport_t *this, char *buf, int len, - struct iovec *vector, int count, struct iobref *iobref) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - - priv = this->private; - - /* TODO: use mem-pool */ - entry = GF_CALLOC (1, sizeof (*entry), - gf_common_mt_ioq); - if (!entry) - return NULL; - - assert (count <= (MAX_IOVEC-2)); - - entry->header.colonO[0] = ':'; - entry->header.colonO[1] = 'O'; - entry->header.colonO[2] = '\0'; - entry->header.version = 42; - entry->header.size1 = hton32 (len); - entry->header.size2 = hton32 (iov_length (vector, count)); - - entry->vector[0].iov_base = &entry->header; - entry->vector[0].iov_len = sizeof (entry->header); - entry->count++; - - entry->vector[1].iov_base = buf; - entry->vector[1].iov_len = len; - entry->count++; - - if (vector && count) { - memcpy (&entry->vector[2], vector, sizeof (*vector) * count); - entry->count += count; - } - - entry->pending_vector = entry->vector; - entry->pending_count = entry->count; - - if (iobref) - entry->iobref = iobref_ref (iobref); - - entry->buf = buf; - - INIT_LIST_HEAD (&entry->list); - - return entry; -} - - -void -__socket_ioq_entry_free (struct ioq *entry) -{ - list_del_init (&entry->list); - if (entry->iobref) - iobref_unref (entry->iobref); - - /* TODO: use mem-pool */ - GF_FREE (entry->buf); - - /* TODO: use mem-pool */ - GF_FREE (entry); -} - - -void -__socket_ioq_flush (transport_t *this) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - entry = priv->ioq_next; - __socket_ioq_entry_free (entry); - } - - return; -} - - -int -__socket_ioq_churn_entry (transport_t *this, struct ioq *entry) -{ - int ret = -1; - - ret = __socket_writev (this, entry->pending_vector, - entry->pending_count, - &entry->pending_vector, - &entry->pending_count); - - if (ret == 0) { - /* current entry was completely written */ - assert (entry->pending_count == 0); - __socket_ioq_entry_free (entry); - } - - return ret; -} - - -int -__socket_ioq_churn (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - /* pick next entry */ - entry = priv->ioq_next; - - ret = __socket_ioq_churn_entry (this, entry); - - if (ret != 0) - break; - } - - if (list_empty (&priv->ioq)) { - /* all pending writes done, not interested in POLLOUT */ - priv->idx = event_select_on (this->xl->ctx->event_pool, - priv->sock, priv->idx, -1, 0); - } - - return ret; -} - - -int -socket_event_poll_err (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - __socket_ioq_flush (this); - __socket_reset (this); - } - pthread_mutex_unlock (&priv->lock); - - xlator_notify (this->xl, GF_EVENT_POLLERR, this); - - return ret; -} - - -int -socket_event_poll_out (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected == 1) { - ret = __socket_ioq_churn (this); - - if (ret == -1) { - __socket_disconnect (this); - } - } - } - pthread_mutex_unlock (&priv->lock); - - xlator_notify (this->xl, GF_EVENT_POLLOUT, this); - - return ret; -} - - -int -__socket_proto_validate_header (transport_t *this, - struct socket_header *header, - size_t *size1_p, size_t *size2_p) -{ - size_t size1 = 0; - size_t size2 = 0; - - if (strcmp (header->colonO, ":O")) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header signature does not match :O (%x.%x.%x)", - header->colonO[0], header->colonO[1], - header->colonO[2]); - return -1; - } - - if (header->version != 42) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header version does not match 42 != %d", - header->version); - return -1; - } - - size1 = ntoh32 (header->size1); - size2 = ntoh32 (header->size2); - - if (size1 <= 0 || size1 > 1048576) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header has incorrect size1=%"GF_PRI_SIZET, - size1); - return -1; - } - - if (size2 > (131072)) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket header has incorrect size2=%"GF_PRI_SIZET, - size2); - return -1; - } - - if (size1_p) - *size1_p = size1; - - if (size2_p) - *size2_p = size2; - - return 0; -} - - - -/* socket protocol state machine */ - -int -__socket_proto_state_machine (transport_t *this) -{ - int ret = -1; - socket_private_t *priv = NULL; - size_t size1 = 0; - size_t size2 = 0; - int previous_state = -1; - struct socket_header *hdr = NULL; - struct iobuf *iobuf = NULL; - - - priv = this->private; - - while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) { - /* debug check against infinite loops */ - if (previous_state == priv->incoming.state) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "state did not change! (%d) breaking", - previous_state); - ret = -1; - goto unlock; - } - previous_state = priv->incoming.state; - - switch (priv->incoming.state) { - - case SOCKET_PROTO_STATE_NADA: - priv->incoming.pending_vector = - priv->incoming.vector; - - priv->incoming.pending_vector->iov_base = - &priv->incoming.header; - - priv->incoming.pending_vector->iov_len = - sizeof (struct socket_header); - - priv->incoming.state = - SOCKET_PROTO_STATE_HEADER_COMING; - break; - - case SOCKET_PROTO_STATE_HEADER_COMING: - - ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - NULL); - if (ret == 0) { - priv->incoming.state = - SOCKET_PROTO_STATE_HEADER_CAME; - break; - } - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "read (%s) in state %d (%s)", - strerror (errno), - SOCKET_PROTO_STATE_HEADER_COMING, - this->peerinfo.identifier); - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial header read on NB socket."); - goto unlock; - } - break; - - case SOCKET_PROTO_STATE_HEADER_CAME: - hdr = &priv->incoming.header; - ret = __socket_proto_validate_header (this, hdr, - &size1, &size2); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket header validate failed (%s). " - "possible mismatch of transport-type " - "between server and client volumes, " - "or version mismatch", - this->peerinfo.identifier); - goto unlock; - } - - priv->incoming.hdrlen = size1; - priv->incoming.buflen = size2; - - /* TODO: use mem-pool */ - priv->incoming.hdr_p = GF_MALLOC (size1, - gf_common_mt_char); - if (size2) { - /* TODO: sanity check size2 < page size - */ - iobuf = iobuf_get (this->xl->ctx->iobuf_pool); - if (!iobuf) { - gf_log (this->xl->name, GF_LOG_ERROR, - "unable to allocate IO buffer " - "for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto unlock; - } - priv->incoming.iobuf = iobuf; - priv->incoming.buf_p = iobuf->ptr; - } - - priv->incoming.vector[0].iov_base = - priv->incoming.hdr_p; - - priv->incoming.vector[0].iov_len = size1; - - priv->incoming.vector[1].iov_base = - priv->incoming.buf_p; - - priv->incoming.vector[1].iov_len = size2; - priv->incoming.count = size2 ? 2 : 1; - - priv->incoming.pending_vector = - priv->incoming.vector; - - priv->incoming.pending_count = - priv->incoming.count; - - priv->incoming.state = - SOCKET_PROTO_STATE_DATA_COMING; - break; - - case SOCKET_PROTO_STATE_DATA_COMING: - - ret = __socket_readv (this, - priv->incoming.pending_vector, - priv->incoming.pending_count, - &priv->incoming.pending_vector, - &priv->incoming.pending_count); - if (ret == 0) { - priv->incoming.state = - SOCKET_PROTO_STATE_DATA_CAME; - break; - } - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "read (%s) in state %d (%s)", - strerror (errno), - SOCKET_PROTO_STATE_DATA_COMING, - this->peerinfo.identifier); - goto unlock; - } - - if (ret > 0) { - gf_log (this->xl->name, GF_LOG_TRACE, - "partial data read on NB socket"); - goto unlock; - } - break; - - case SOCKET_PROTO_STATE_DATA_CAME: - memset (&priv->incoming.vector, 0, - sizeof (priv->incoming.vector)); - priv->incoming.pending_vector = NULL; - priv->incoming.pending_count = 0; - priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE; - break; - - case SOCKET_PROTO_STATE_COMPLETE: - /* not reached */ - break; - - default: - gf_log (this->xl->name, GF_LOG_DEBUG, - "undefined state reached: %d", - priv->incoming.state); - goto unlock; - } - } -unlock: - - return ret; -} - - -int -socket_proto_state_machine (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_proto_state_machine (this); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_event_poll_in (transport_t *this) -{ - int ret = -1; - - ret = socket_proto_state_machine (this); - - /* call POLLIN on xlator even if complete block is not received, - just to keep the last_received timestamp ticking */ - - if (ret == 0) - ret = xlator_notify (this->xl, GF_EVENT_POLLIN, this); - - return ret; -} - - -int -socket_connect_finish (transport_t *this) -{ - int ret = -1; - socket_private_t *priv = NULL; - int event = -1; - char notify_xlator = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected) - goto unlock; - - ret = __socket_connect_finish (priv->sock); - - if (ret == -1 && errno == EINPROGRESS) - ret = 1; - - if (ret == -1 && errno != EINPROGRESS) { - if (!priv->connect_finish_log) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connection to %s failed (%s)", - this->peerinfo.identifier, - strerror (errno)); - priv->connect_finish_log = 1; - } - __socket_disconnect (this); - notify_xlator = 1; - event = GF_EVENT_POLLERR; - goto unlock; - } - - if (ret == 0) { - notify_xlator = 1; - - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - - ret = getsockname (priv->sock, - SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "getsockname on (%d) failed (%s)", - priv->sock, strerror (errno)); - __socket_disconnect (this); - event = GF_EVENT_POLLERR; - goto unlock; - } - - priv->connected = 1; - priv->connect_finish_log = 0; - event = GF_EVENT_CHILD_UP; - get_transport_identifiers (this); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - if (notify_xlator) - xlator_notify (this->xl, event, this); - - return 0; -} - - -int -socket_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - - this = data; - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - } - pthread_mutex_unlock (&priv->lock); - - if (!priv->connected) { - ret = socket_connect_finish (this); - } - - if (!ret && poll_out) { - ret = socket_event_poll_out (this); - } - - if (!ret && poll_in) { - ret = socket_event_poll_in (this); - } - - if (ret < 0 || poll_err) { - socket_event_poll_err (this); - transport_unref (this); - } - - return 0; -} - - -int -socket_server_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - int new_sock = -1; - transport_t *new_trans = NULL; - struct sockaddr_storage new_sockaddr = {0, }; - socklen_t addrlen = sizeof (new_sockaddr); - socket_private_t *new_priv = NULL; - glusterfs_ctx_t *ctx = NULL; - - this = data; - priv = this->private; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - - if (poll_in) { - new_sock = accept (priv->sock, SA (&new_sockaddr), - &addrlen); - - if (new_sock == -1) - goto unlock; - - if (!priv->bio) { - ret = __socket_nonblock (new_sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "NBIO on %d failed (%s)", - new_sock, strerror (errno)); - close (new_sock); - goto unlock; - } - } - - if (priv->nodelay) { - ret = __socket_nodelay (new_sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for " - "NODELAY (%s)", - strerror (errno)); - } - } - - new_trans = GF_CALLOC (1, sizeof (*new_trans), - gf_common_mt_transport_t); - new_trans->xl = this->xl; - new_trans->fini = this->fini; - - memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, - addrlen); - new_trans->peerinfo.sockaddr_len = addrlen; - - new_trans->myinfo.sockaddr_len = - sizeof (new_trans->myinfo.sockaddr); - - ret = getsockname (new_sock, - SA (&new_trans->myinfo.sockaddr), - &new_trans->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "getsockname on %d failed (%s)", - new_sock, strerror (errno)); - close (new_sock); - goto unlock; - } - - get_transport_identifiers (new_trans); - socket_init (new_trans); - new_trans->ops = this->ops; - new_trans->init = this->init; - new_trans->fini = this->fini; - - new_priv = new_trans->private; - - pthread_mutex_lock (&new_priv->lock); - { - new_priv->sock = new_sock; - new_priv->connected = 1; - - transport_ref (new_trans); - new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, 1, 0); - - if (new_priv->idx == -1) - ret = -1; - } - pthread_mutex_unlock (&new_priv->lock); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_disconnect (transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_disconnect (this); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_connect (transport_t *this) -{ - int ret = -1; - int sock = -1; - socket_private_t *priv = NULL; - struct sockaddr_storage sockaddr = {0, }; - socklen_t sockaddr_len = 0; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - ctx = this->xl->ctx; - - if (!priv) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "connect() called on uninitialized transport"); - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "connect () called on transport already connected"); - ret = 0; - goto err; - } - - ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - /* logged inside client_get_remote_sockaddr */ - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->xl->name, GF_LOG_TRACE, - "connect() -- already connected"); - goto unlock; - } - - memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); - this->peerinfo.sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - if (priv->sock == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - - if (priv->nodelay && priv->lowlat) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - SA (&this->myinfo.sockaddr)->sa_family = - SA (&this->peerinfo.sockaddr)->sa_family; - - ret = client_bind (this, SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len, priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_WARNING, - "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), - this->peerinfo.sockaddr_len); - - if (ret == -1 && errno != EINPROGRESS) { - gf_log (this->xl->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - priv->connected = 0; - - transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, this, 1, 1); - if (priv->idx == -1) - ret = -1; - } -unlock: - pthread_mutex_unlock (&priv->lock); - -err: - return ret; -} - - -int -socket_listen (transport_t *this) -{ - socket_private_t * priv = NULL; - int ret = -1; - int sock = -1; - struct sockaddr_storage sockaddr; - socklen_t sockaddr_len; - peer_info_t *myinfo = NULL; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - myinfo = &this->myinfo; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "alreading listening"); - return ret; - } - - ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - return ret; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "already listening"); - goto unlock; - } - - memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); - myinfo->sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - - if (priv->sock == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (priv->nodelay) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - ret = __socket_server_bind (this); - - if (ret == -1) { - /* logged inside __socket_server_bind() */ - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = listen (priv->sock, 10); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "could not set socket %d to listen mode (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_server_event_handler, - this, 1, 0); - - if (priv->idx == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "could not register socket %d with events", - priv->sock); - ret = -1; - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, - struct iobuf **iobuf_p) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "socket not connected to receive"); - goto unlock; - } - - if (!hdr_p || !hdrlen_p || !iobuf_p) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "bad parameters %p %p %p", - hdr_p, hdrlen_p, iobuf_p); - goto unlock; - } - - if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) { - *hdr_p = priv->incoming.hdr_p; - *hdrlen_p = priv->incoming.hdrlen; - *iobuf_p = priv->incoming.iobuf; - - memset (&priv->incoming, 0, sizeof (priv->incoming)); - priv->incoming.state = SOCKET_PROTO_STATE_NADA; - - ret = 0; - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -/* TODO: implement per transfer limit */ -int -socket_submit (transport_t *this, char *buf, int len, - struct iovec *vector, int count, - struct iobref *iobref) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->xl->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - - priv->submit_log = 0; - entry = __socket_ioq_new (this, buf, len, vector, count, iobref); - if (!entry) - goto unlock; - - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -struct transport_ops tops = { - .listen = socket_listen, - .connect = socket_connect, - .disconnect = socket_disconnect, - .submit = socket_submit, - .receive = socket_receive -}; - - -int -socket_init (transport_t *this) -{ - socket_private_t *priv = NULL; - gf_boolean_t tmp_bool = 0; - uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - char *optstr = NULL; - - if (this->private) { - gf_log (this->xl->name, GF_LOG_DEBUG, - "double init attempted"); - return -1; - } - - priv = GF_CALLOC (1, sizeof (*priv), - gf_common_mt_socket_private_t); - if (!priv) { - gf_log (this->xl->name, GF_LOG_ERROR, - "calloc (1, %"GF_PRI_SIZET") returned NULL", - sizeof (*priv)); - return -1; - } - - pthread_mutex_init (&priv->lock, NULL); - - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; - - INIT_LIST_HEAD (&priv->ioq); - - if (dict_get (this->xl->options, "non-blocking-io")) { - optstr = data_to_str (dict_get (this->xl->options, - "non-blocking-io")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean options," - " not taking any action"); - tmp_bool = 1; - } - priv->bio = 0; - if (!tmp_bool) { - priv->bio = 1; - gf_log (this->xl->name, GF_LOG_WARNING, - "disabling non-blocking IO"); - } - } - - optstr = NULL; - - // By default, we enable NODELAY - priv->nodelay = 1; - if (dict_get (this->xl->options, "transport.socket.nodelay")) { - optstr = data_to_str (dict_get (this->xl->options, - "transport.socket.nodelay")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->xl->name, GF_LOG_ERROR, - "'transport.socket.nodelay' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - if (!tmp_bool) { - priv->nodelay = 0; - gf_log (this->xl->name, GF_LOG_DEBUG, - "disabling nodelay"); - } - } - - - optstr = NULL; - if (dict_get_str (this->xl->options, "transport.window-size", - &optstr) == 0) { - if (gf_string2bytesize (optstr, &windowsize) != 0) { - gf_log (this->xl->name, GF_LOG_ERROR, - "invalid number format: %s", optstr); - return -1; - } - } - - optstr = NULL; - - if (dict_get_str (this->xl->options, "transport.socket.lowlat", - &optstr) == 0) { - priv->lowlat = 1; - } - - priv->windowsize = (int)windowsize; - this->private = priv; - - return 0; -} - - -void -fini (transport_t *this) -{ - socket_private_t *priv = this->private; - - gf_log (this->xl->name, GF_LOG_TRACE, - "transport %p destroyed", this); - - pthread_mutex_destroy (&priv->lock); - GF_FREE (priv); -} - -int32_t -mem_acct_init (xlator_t *this) -{ - int ret = -1; - - if (!this) - return ret; - - ret = xlator_mem_acct_init (this, gf_common_mt_end + 1); - - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, "Memory accounting init" - "failed"); - return ret; - } - - return ret; -} - -int32_t -init (transport_t *this) -{ - int ret = -1; - - ret = socket_init (this); - - if (ret == -1) { - gf_log (this->xl->name, GF_LOG_DEBUG, "socket_init() failed"); - } - - return ret; -} - -struct volume_options options[] = { - { .key = {"remote-port", - "transport.remote-port", - "transport.socket.remote-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.listen-port", "listen-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.bind-address", "bind-address" }, - .type = GF_OPTION_TYPE_INTERNET_ADDRESS - }, - { .key = {"transport.socket.connect-path", "connect-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.bind-path", "bind-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.listen-path", "listen-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = { "transport.address-family", - "address-family" }, - .value = {"inet", "inet6", "inet/inet6", "inet6/inet", - "unix", "inet-sdp" }, - .type = GF_OPTION_TYPE_STR - }, - - { .key = {"non-blocking-io"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.window-size"}, - .type = GF_OPTION_TYPE_SIZET, - .min = GF_MIN_SOCKET_WINDOW_SIZE, - .max = GF_MAX_SOCKET_WINDOW_SIZE, - }, - { .key = {"transport.socket.nodelay"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.socket.lowlat"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {NULL} } -}; - |