diff options
author | Vikas Gorur <vikas@zresearch.com> | 2009-02-18 17:36:07 +0530 |
---|---|---|
committer | Vikas Gorur <vikas@zresearch.com> | 2009-02-18 17:36:07 +0530 |
commit | 77adf4cd648dce41f89469dd185deec6b6b53a0b (patch) | |
tree | 02e155a5753b398ee572b45793f889b538efab6b /transport | |
parent | f3b2e6580e5663292ee113c741343c8a43ee133f (diff) |
Added all files
Diffstat (limited to 'transport')
-rw-r--r-- | transport/Makefile.am | 3 | ||||
-rw-r--r-- | transport/ib-verbs/Makefile.am | 1 | ||||
-rw-r--r-- | transport/ib-verbs/src/Makefile.am | 15 | ||||
-rw-r--r-- | transport/ib-verbs/src/ib-verbs.c | 2392 | ||||
-rw-r--r-- | transport/ib-verbs/src/ib-verbs.h | 215 | ||||
-rw-r--r-- | transport/ib-verbs/src/name.c | 682 | ||||
-rw-r--r-- | transport/ib-verbs/src/name.h | 47 | ||||
-rw-r--r-- | transport/socket/Makefile.am | 1 | ||||
-rw-r--r-- | transport/socket/src/Makefile.am | 14 | ||||
-rw-r--r-- | transport/socket/src/name.c | 677 | ||||
-rw-r--r-- | transport/socket/src/name.h | 44 | ||||
-rw-r--r-- | transport/socket/src/socket.c | 1370 | ||||
-rw-r--r-- | transport/socket/src/socket.h | 106 |
13 files changed, 5567 insertions, 0 deletions
diff --git a/transport/Makefile.am b/transport/Makefile.am new file mode 100644 index 00000000000..e2f97437c12 --- /dev/null +++ b/transport/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = socket $(IBVERBS_SUBDIR) + +CLEANFILES = diff --git a/transport/ib-verbs/Makefile.am b/transport/ib-verbs/Makefile.am new file mode 100644 index 00000000000..f963effea22 --- /dev/null +++ b/transport/ib-verbs/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = src
\ No newline at end of file diff --git a/transport/ib-verbs/src/Makefile.am b/transport/ib-verbs/src/Makefile.am new file mode 100644 index 00000000000..e6240090e92 --- /dev/null +++ b/transport/ib-verbs/src/Makefile.am @@ -0,0 +1,15 @@ +noinst_HEADERS = ib-verbs.h name.h + +transport_LTLIBRARIES = ib-verbs.la +transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport/ + +ib_verbs_la_LDFLAGS = -module -avoidversion + +ib_verbs_la_SOURCES = ib-verbs.c name.c +ib_verbs_la_LIBADD = -libverbs $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ + -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/transport/ib-verbs \ + -shared -nostartfiles + +CLEANFILES = *~ diff --git a/transport/ib-verbs/src/ib-verbs.c b/transport/ib-verbs/src/ib-verbs.c new file mode 100644 index 00000000000..b9329588eb3 --- /dev/null +++ b/transport/ib-verbs/src/ib-verbs.c @@ -0,0 +1,2392 @@ +/* + Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "dict.h" +#include "glusterfs.h" +#include "transport.h" +#include "protocol.h" +#include "logging.h" +#include "xlator.h" +#include "name.h" +#include "ib-verbs.h" +#include <signal.h> + +int32_t +gf_resolve_ip6 (const char *hostname, + uint16_t port, + int family, + void **dnscache, + struct addrinfo **addr_info); + +static uint16_t +ib_verbs_get_local_lid (struct ibv_context *context, + int32_t port) +{ + struct ibv_port_attr attr; + + if (ibv_query_port (context, port, &attr)) + return 0; + + return attr.lid; +} + + +static void +ib_verbs_put_post (ib_verbs_queue_t *queue, + ib_verbs_post_t *post) +{ + pthread_mutex_lock (&queue->lock); + if (post->prev) { + queue->active_count--; + post->prev->next = post->next; + } + if (post->next) + post->next->prev = post->prev; + post->prev = &queue->passive_posts; + post->next = post->prev->next; + post->prev->next = post; + post->next->prev = post; + queue->passive_count++; + pthread_mutex_unlock (&queue->lock); +} + + +static ib_verbs_post_t * +ib_verbs_new_post (ib_verbs_device_t *device, int32_t len) +{ + ib_verbs_post_t *post; + + post = (ib_verbs_post_t *) CALLOC (1, sizeof (*post)); + if (!post) + return NULL; + + post->buf_size = len; + + post->buf = valloc (len); + if (!post->buf) { + free (post); + return NULL; + } + + post->mr = ibv_reg_mr (device->pd, + post->buf, + post->buf_size, + IBV_ACCESS_LOCAL_WRITE); + if (!post->mr) { + free (post->buf); + free (post); + return NULL; + } + + return post; +} + + +static ib_verbs_post_t * +ib_verbs_get_post (ib_verbs_queue_t *queue) +{ + ib_verbs_post_t *post; + + pthread_mutex_lock (&queue->lock); + { + post = queue->passive_posts.next; + if (post == &queue->passive_posts) + post = NULL; + + if (post) { + if (post->prev) + post->prev->next = post->next; + if (post->next) + post->next->prev = post->prev; + post->prev = &queue->active_posts; + post->next = post->prev->next; + post->prev->next = post; + post->next->prev = post; + post->reused++; + queue->active_count++; + } + } + pthread_mutex_unlock (&queue->lock); + + return post; +} + +void +ib_verbs_destroy_post (ib_verbs_post_t *post) +{ + ibv_dereg_mr (post->mr); + free (post->buf); + free (post); +} + + +static int32_t +__ib_verbs_quota_get (ib_verbs_peer_t *peer) +{ + int32_t ret = -1; + ib_verbs_private_t *priv = peer->trans->private; + + if (priv->connected && peer->quota > 0) { + ret = peer->quota--; + } + + return ret; +} + +/* + static int32_t + ib_verbs_quota_get (ib_verbs_peer_t *peer) + { + int32_t ret = -1; + ib_verbs_private_t *priv = peer->trans->private; + + pthread_mutex_lock (&priv->write_mutex); + { + ret = __ib_verbs_quota_get (peer); + } + pthread_mutex_unlock (&priv->write_mutex); + + return ret; + } +*/ + +static void +__ib_verbs_ioq_entry_free (ib_verbs_ioq_t *entry) +{ + list_del_init (&entry->list); + if (entry->refs) + dict_unref (entry->refs); + + /* TODO: use mem-pool */ + free (entry->buf); + + /* TODO: use mem-pool */ + free (entry); +} + + +static void +__ib_verbs_ioq_flush (ib_verbs_peer_t *peer) +{ + ib_verbs_ioq_t *entry = NULL, *dummy = NULL; + + list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { + __ib_verbs_ioq_entry_free (entry); + } +} + + +static int32_t +__ib_verbs_disconnect (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + int32_t ret = 0; + + if (priv->connected || priv->tcp_connected) { + fcntl (priv->sock, F_SETFL, O_NONBLOCK); + if (shutdown (priv->sock, SHUT_RDWR) != 0) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "shutdown () - error: %s", + strerror (errno)); + ret = -errno; + priv->tcp_connected = 0; + } + } + + return ret; +} + + +static int32_t +ib_verbs_post_send (struct ibv_qp *qp, + ib_verbs_post_t *post, + int32_t len) +{ + struct ibv_sge list = { + .addr = (unsigned long) post->buf, + .length = len, + .lkey = post->mr->lkey + }; + + struct ibv_send_wr wr = { + .wr_id = (unsigned long) post, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }, *bad_wr; + + if (!qp) + return -1; + + return ibv_post_send (qp, &wr, &bad_wr); +} + + +static int32_t +__ib_verbs_ioq_churn_entry (ib_verbs_peer_t *peer, ib_verbs_ioq_t *entry) +{ + int32_t ret = 0, quota = 0; + ib_verbs_private_t *priv = peer->trans->private; + ib_verbs_device_t *device = priv->device; + ib_verbs_options_t *options = &priv->options; + ib_verbs_post_t *post = NULL; + int32_t len = 0; + + quota = __ib_verbs_quota_get (peer); + if (quota > 0) { + post = ib_verbs_get_post (&device->sendq); + if (!post) + post = ib_verbs_new_post (device, + (options->send_size + 2048)); + + len = iov_length ((const struct iovec *)&entry->vector, + entry->count); + if (len >= (options->send_size + 2048)) { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "increase value of option 'transport.ib-verbs." + "work-request-send-size' (given=> %d) to send " + "bigger (%d) messages", + (options->send_size + 2048), len); + return -1; + } + + iov_unload (post->buf, + (const struct iovec *)&entry->vector, + entry->count); + + ret = ib_verbs_post_send (peer->qp, post, len); + if (!ret) { + __ib_verbs_ioq_entry_free (entry); + ret = len; + } else { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_post_send failed with ret = %d", ret); + ib_verbs_put_post (&device->sendq, post); + __ib_verbs_disconnect (peer->trans); + ret = -1; + } + } + + return ret; +} + + +static int32_t +__ib_verbs_ioq_churn (ib_verbs_peer_t *peer) +{ + ib_verbs_ioq_t *entry = NULL; + int32_t ret = 0; + + while (!list_empty (&peer->ioq)) + { + /* pick next entry */ + entry = peer->ioq_next; + + ret = __ib_verbs_ioq_churn_entry (peer, entry); + + if (ret <= 0) + break; + } + + /* + list_for_each_entry_safe (entry, dummy, &peer->ioq, list) { + ret = __ib_verbs_ioq_churn_entry (peer, entry); + if (ret <= 0) { + break; + } + } + */ + + return ret; +} + +static int32_t +__ib_verbs_quota_put (ib_verbs_peer_t *peer) +{ + int32_t ret; + + peer->quota++; + ret = peer->quota; + + if (!list_empty (&peer->ioq)) { + ret = __ib_verbs_ioq_churn (peer); + } + + return ret; +} + + +static int32_t +ib_verbs_quota_put (ib_verbs_peer_t *peer) +{ + int32_t ret; + ib_verbs_private_t *priv = peer->trans->private; + + pthread_mutex_lock (&priv->write_mutex); + { + ret = __ib_verbs_quota_put (peer); + } + pthread_mutex_unlock (&priv->write_mutex); + + return ret; +} + + +static int32_t +ib_verbs_post_recv (struct ibv_srq *srq, + ib_verbs_post_t *post) +{ + struct ibv_sge list = { + .addr = (unsigned long) post->buf, + .length = post->buf_size, + .lkey = post->mr->lkey + }; + + struct ibv_recv_wr wr = { + .wr_id = (unsigned long) post, + .sg_list = &list, + .num_sge = 1, + }, *bad_wr; + + return ibv_post_srq_recv (srq, &wr, &bad_wr); +} + + +static int32_t +ib_verbs_writev (transport_t *this, + ib_verbs_ioq_t *entry) +{ + int32_t ret = 0, need_append = 1; + ib_verbs_private_t *priv = this->private; + ib_verbs_peer_t *peer = NULL; + + pthread_mutex_lock (&priv->write_mutex); + { + if (!priv->connected) { + gf_log (this->xl->name, GF_LOG_ERROR, + "ib-verbs is not connected to post a " + "send request"); + ret = -1; + goto unlock; + } + + peer = &priv->peer; + if (list_empty (&peer->ioq)) { + ret = __ib_verbs_ioq_churn_entry (peer, entry); + if (ret > 0) { + need_append = 0; + } + } + + if (need_append) { + list_add_tail (&entry->list, &peer->ioq); + } + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); + return ret; +} + + +static ib_verbs_ioq_t * +ib_verbs_ioq_new (char *buf, int len, struct iovec *vector, + int count, dict_t *refs) +{ + ib_verbs_ioq_t *entry = NULL; + + /* TODO: use mem-pool */ + entry = CALLOC (1, sizeof (*entry)); + + assert (count <= (MAX_IOVEC-2)); + + entry->header.colonO[0] = ':'; + entry->header.colonO[1] = 'O'; + entry->header.colonO[2] = '\0'; + entry->header.version = 42; + entry->header.size1 = hton32 (len); + entry->header.size2 = hton32 (iov_length (vector, count)); + + entry->vector[0].iov_base = &entry->header; + entry->vector[0].iov_len = sizeof (entry->header); + entry->count++; + + entry->vector[1].iov_base = buf; + entry->vector[1].iov_len = len; + entry->count++; + + if (vector && count) + { + memcpy (&entry->vector[2], vector, sizeof (*vector) * count); + entry->count += count; + } + + if (refs) + entry->refs = dict_ref (refs); + + entry->buf = buf; + + INIT_LIST_HEAD (&entry->list); + + return entry; +} + + +static int32_t +ib_verbs_submit (transport_t *this, char *buf, int32_t len, + struct iovec *vector, int count, dict_t *refs) +{ + int32_t ret = 0; + ib_verbs_ioq_t *entry = NULL; + + entry = ib_verbs_ioq_new (buf, len, vector, count, refs); + ret = ib_verbs_writev (this, entry); + + if (ret > 0) { + ret = 0; + } + + return ret; +} + +static int +ib_verbs_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, + char **buf_p, size_t *buflen_p) +{ + ib_verbs_private_t *priv = this->private; + /* TODO: return error if !priv->connected, check with locks */ + /* TODO: boundry checks for data_ptr/offset */ + char *copy_from = NULL; + ib_verbs_header_t *header = NULL; + uint32_t size1, size2, data_len = 0; + char *hdr = NULL, *buf = NULL; + int32_t ret = 0; + + pthread_mutex_lock (&priv->recv_mutex); + { +/* + while (!priv->data_ptr) + pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex); +*/ + + copy_from = priv->data_ptr + priv->data_offset; + + priv->data_ptr = NULL; + data_len = priv->data_len; + /* pthread_cond_broadcast (&priv->recv_cond); */ + } + pthread_mutex_unlock (&priv->recv_mutex); + + header = (ib_verbs_header_t *)copy_from; + if (strcmp (header->colonO, ":O")) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: corrupt header received", this->xl->name); + ret = -1; + goto err; + } + + size1 = ntoh32 (header->size1); + size2 = ntoh32 (header->size2); + + if (data_len != (size1 + size2 + sizeof (*header))) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: sizeof data read from transport is not equal " + "to the size specified in the header", + this->xl->name); + ret = -1; + goto err; + } + + copy_from += sizeof (*header); + + if (size1) { + hdr = CALLOC (1, size1); + memcpy (hdr, copy_from, size1); + copy_from += size1; + *hdr_p = hdr; + } + *hdrlen_p = size1; + + if (size2) { + buf = CALLOC (1, size2); + memcpy (buf, copy_from, size2); + *buf_p = buf; + } + *buflen_p = size2; + +err: + return ret; +} + + +static void +ib_verbs_destroy_cq (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_device_t *device = priv->device; + + if (device->recv_cq) + ibv_destroy_cq (device->recv_cq); + device->recv_cq = NULL; + + if (device->send_cq) + ibv_destroy_cq (device->send_cq); + device->send_cq = NULL; + + return; +} + + +static int32_t +ib_verbs_create_cq (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + ib_verbs_device_t *device = priv->device; + int32_t ret = 0; + + device->recv_cq = ibv_create_cq (priv->device->context, + options->recv_count * 2, + device, + device->recv_chan, + 0); + if (!device->recv_cq) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: creation of CQ failed", + this->xl->name); + ret = -1; + } else if (ibv_req_notify_cq (device->recv_cq, 0)) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: ibv_req_notify_cq on CQ failed", + this->xl->name); + ret = -1; + } + + do { + /* TODO: make send_cq size dynamically adaptive */ + device->send_cq = ibv_create_cq (priv->device->context, + options->send_count * 1024, + device, + device->send_chan, + 0); + if (!device->send_cq) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: creation of send_cq failed", + this->xl->name); + ret = -1; + break; + } + + if (ibv_req_notify_cq (device->send_cq, 0)) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: ibv_req_notify_cq on send_cq failed", + this->xl->name); + ret = -1; + break; + } + } while (0); + + if (ret != 0) + ib_verbs_destroy_cq (this); + + return ret; +} + + +static void +ib_verbs_register_peer (ib_verbs_device_t *device, + int32_t qp_num, + ib_verbs_peer_t *peer) +{ + struct _qpent *ent; + ib_verbs_qpreg_t *qpreg = &device->qpreg; + int32_t hash = qp_num % 42; + + pthread_mutex_lock (&qpreg->lock); + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) + ent = ent->next; + if (ent->qp_num == qp_num) { + pthread_mutex_unlock (&qpreg->lock); + return; + } + ent = (struct _qpent *) CALLOC (1, sizeof (*ent)); + ERR_ABORT (ent); + /* TODO: ref reg->peer */ + ent->peer = peer; + ent->next = &qpreg->ents[hash]; + ent->prev = ent->next->prev; + ent->next->prev = ent; + ent->prev->next = ent; + ent->qp_num = qp_num; + qpreg->count++; + pthread_mutex_unlock (&qpreg->lock); +} + + +static void +ib_verbs_unregister_peer (ib_verbs_device_t *device, + int32_t qp_num) +{ + struct _qpent *ent; + ib_verbs_qpreg_t *qpreg = &device->qpreg; + int32_t hash = qp_num % 42; + + pthread_mutex_lock (&qpreg->lock); + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) + ent = ent->next; + if (ent->qp_num != qp_num) { + pthread_mutex_unlock (&qpreg->lock); + return; + } + ent->prev->next = ent->next; + ent->next->prev = ent->prev; + /* TODO: unref reg->peer */ + free (ent); + qpreg->count--; + pthread_mutex_unlock (&qpreg->lock); +} + + +static ib_verbs_peer_t * +ib_verbs_lookup_peer (ib_verbs_device_t *device, + int32_t qp_num) +{ + struct _qpent *ent; + ib_verbs_qpreg_t *qpreg = &device->qpreg; + ib_verbs_peer_t *peer; + int32_t hash = qp_num % 42; + + pthread_mutex_lock (&qpreg->lock); + ent = qpreg->ents[hash].next; + while ((ent != &qpreg->ents[hash]) && (ent->qp_num != qp_num)) + ent = ent->next; + peer = ent->peer; + pthread_mutex_unlock (&qpreg->lock); + return peer; +} + + +static void +__ib_verbs_destroy_qp (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + + if (priv->peer.qp) { + ib_verbs_unregister_peer (priv->device, priv->peer.qp->qp_num); + ibv_destroy_qp (priv->peer.qp); + } + priv->peer.qp = NULL; + + return; +} + + +static int32_t +ib_verbs_create_qp (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + ib_verbs_device_t *device = priv->device; + int32_t ret = 0; + ib_verbs_peer_t *peer; + + peer = &priv->peer; + struct ibv_qp_init_attr init_attr = { + .send_cq = device->send_cq, + .recv_cq = device->recv_cq, + .srq = device->srq, + .cap = { + .max_send_wr = peer->send_count, + .max_recv_wr = peer->recv_count, + .max_send_sge = 1, + .max_recv_sge = 1 + }, + .qp_type = IBV_QPT_RC + }; + + struct ibv_qp_attr attr = { + .qp_state = IBV_QPS_INIT, + .pkey_index = 0, + .port_num = options->port, + .qp_access_flags = 0 + }; + + peer->qp = ibv_create_qp (device->pd, &init_attr); + if (!peer->qp) { + gf_log ("transport/ib-verbs", + GF_LOG_CRITICAL, + "%s: could not create QP", + this->xl->name); + ret = -1; + } else if (ibv_modify_qp (peer->qp, &attr, + IBV_QP_STATE | + IBV_QP_PKEY_INDEX | + IBV_QP_PORT | + IBV_QP_ACCESS_FLAGS)) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: failed to modify QP to INIT state", + this->xl->name); + ret = -1; + } + + peer->local_lid = ib_verbs_get_local_lid (device->context, + options->port); + peer->local_qpn = peer->qp->qp_num; + peer->local_psn = lrand48 () & 0xffffff; + + ib_verbs_register_peer (device, peer->qp->qp_num, peer); + + if (ret == -1) + __ib_verbs_destroy_qp (this); + + return ret; +} + + +static void +ib_verbs_destroy_posts (transport_t *this) +{ + +} + + +static int32_t +__ib_verbs_create_posts (transport_t *this, + int32_t count, + int32_t size, + ib_verbs_queue_t *q) +{ + int32_t i; + int32_t ret = 0; + ib_verbs_private_t *priv = this->private; + ib_verbs_device_t *device = priv->device; + + for (i=0 ; i<count ; i++) { + ib_verbs_post_t *post; + + post = ib_verbs_new_post (device, size + 2048); + if (!post) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: post creation failed", + this->xl->name); + ret = -1; + break; + } + + ib_verbs_put_post (q, post); + } + return ret; +} + + +static int32_t +ib_verbs_create_posts (transport_t *this) +{ + int32_t i, ret; + ib_verbs_post_t *post = NULL; + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + ib_verbs_device_t *device = priv->device; + + ret = __ib_verbs_create_posts (this, options->send_count, + options->send_size, + &device->sendq); + if (!ret) + ret = __ib_verbs_create_posts (this, options->recv_count, + options->recv_size, + &device->recvq); + + if (!ret) { + for (i=0 ; i<options->recv_count ; i++) { + post = ib_verbs_get_post (&device->recvq); + if (ib_verbs_post_recv (device->srq, post) != 0) { + ret = -1; + break; + } + } + } + + if (ret) + ib_verbs_destroy_posts (this); + + return ret; +} + + +static int32_t +ib_verbs_connect_qp (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + struct ibv_qp_attr attr = { + .qp_state = IBV_QPS_RTR, + .path_mtu = options->mtu, + .dest_qp_num = priv->peer.remote_qpn, + .rq_psn = priv->peer.remote_psn, + .max_dest_rd_atomic = 1, + .min_rnr_timer = 12, + .ah_attr = { + .is_global = 0, + .dlid = priv->peer.remote_lid, + .sl = 0, + .src_path_bits = 0, + .port_num = options->port + } + }; + if (ibv_modify_qp (priv->peer.qp, &attr, + IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | + IBV_QP_MIN_RNR_TIMER)) { + gf_log ("transport/ib-verbs", + GF_LOG_CRITICAL, + "Failed to modify QP to RTR\n"); + return -1; + } + + /* TODO: make timeout and retry_cnt configurable from options */ + attr.qp_state = IBV_QPS_RTS; + attr.timeout = 14; + attr.retry_cnt = 7; + attr.rnr_retry = 7; + attr.sq_psn = priv->peer.local_psn; + attr.max_rd_atomic = 1; + if (ibv_modify_qp (priv->peer.qp, &attr, + IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC)) { + gf_log ("transport/ib-verbs", + GF_LOG_CRITICAL, + "Failed to modify QP to RTS\n"); + return -1; + } + + return 0; +} + +static int32_t +__ib_verbs_teardown (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + + __ib_verbs_destroy_qp (this); + + if (!list_empty (&priv->peer.ioq)) { + __ib_verbs_ioq_flush (&priv->peer); + } + + /* TODO: decrement cq size */ + return 0; +} + +/* + * return value: + * 0 = success (completed) + * -1 = error + * > 0 = incomplete + */ + +static int +__tcp_rwv (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count, + int write) +{ + ib_verbs_private_t *priv = NULL; + int sock = -1; + int ret = -1; + struct iovec *opvector = vector; + int opcount = count; + int moved = 0; + + priv = this->private; + sock = priv->sock; + + while (opcount) + { + if (write) + { + ret = writev (sock, opvector, opcount); + + if (ret == 0 || (ret == -1 && errno == EAGAIN)) + { + /* done for now */ + break; + } + } + else + { + ret = readv (sock, opvector, opcount); + + if (ret == -1 && errno == EAGAIN) + { + /* done for now */ + break; + } + } + + if (ret == 0) + { + gf_log (this->xl->name, GF_LOG_ERROR, "EOF from peer"); + opcount = -1; + errno = ENOTCONN; + break; + } + + if (ret == -1) + { + if (errno == EINTR) + continue; + + gf_log (this->xl->name, GF_LOG_ERROR, + "%s failed (%s)", write ? "writev" : "readv", + strerror (errno)); + if (write && !priv->connected && + (errno == ECONNREFUSED)) + gf_log (this->xl->name, GF_LOG_ERROR, + "possible mismatch of 'transport-type'" + " in protocol server and client. " + "check volume file"); + opcount = -1; + break; + } + + moved = 0; + + while (moved < ret) + { + if ((ret - moved) >= opvector[0].iov_len) + { + moved += opvector[0].iov_len; + opvector++; + opcount--; + } + else + { + opvector[0].iov_len -= (ret - moved); + opvector[0].iov_base += (ret - moved); + moved += (ret - moved); + } + while (opcount && !opvector[0].iov_len) + { + opvector++; + opcount--; + } + } + } + + if (pending_vector) + *pending_vector = opvector; + + if (pending_count) + *pending_count = opcount; + + return opcount; +} + + +static int +__tcp_readv (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count) +{ + int ret = -1; + + ret = __tcp_rwv (this, vector, count, + pending_vector, pending_count, 0); + + return ret; +} + + +static int +__tcp_writev (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count) +{ + int ret = -1; + ib_verbs_private_t *priv = this->private; + + ret = __tcp_rwv (this, vector, count, pending_vector, + pending_count, 1); + + if (ret > 0) { + /* TODO: Avoid multiple calls when socket is already + registered for POLLOUT */ + priv->idx = event_select_on (this->xl->ctx->event_pool, + priv->sock, priv->idx, -1, 1); + } else if (ret == 0) { + priv->idx = event_select_on (this->xl->ctx->event_pool, + priv->sock, + priv->idx, -1, 0); + } + + return ret; +} + + +static void * +ib_verbs_recv_completion_proc (void *data) +{ + struct ibv_comp_channel *chan = data; + ib_verbs_private_t *priv = NULL; + ib_verbs_device_t *device; + ib_verbs_post_t *post; + ib_verbs_peer_t *peer; + struct ibv_cq *event_cq; + struct ibv_wc wc; + void *event_ctx; + int32_t ret = 0; + + + while (1) { + ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_get_cq_event failed, terminating recv " + "thread %d (%d)", ret, errno); + continue; + } + + device = event_ctx; + + ret = ibv_req_notify_cq (event_cq, 0); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_req_notify_cq on %s failed, terminating " + "recv thread: %d (%d)", + device->device_name, ret, errno); + continue; + } + + device = (ib_verbs_device_t *) event_ctx; + + while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { + post = (ib_verbs_post_t *) (long) wc.wr_id; + peer = ib_verbs_lookup_peer (device, wc.qp_num); + + if (wc.status != IBV_WC_SUCCESS) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "recv work request on `%s' returned " + "error (%d)", + device->device_name, + wc.status); + if (peer) + transport_disconnect (peer->trans); + + if (post) { + ib_verbs_post_recv (device->srq, post); + } + continue; + } + + if (peer) { + priv = peer->trans->private; + + pthread_mutex_lock (&priv->recv_mutex); + { +/* while (priv->data_ptr) + pthread_cond_wait (&priv->recv_cond, &priv->recv_mutex); +*/ + + priv->data_ptr = post->buf; + priv->data_offset = 0; + priv->data_len = wc.byte_len; + + /*pthread_cond_broadcast (&priv->recv_cond);*/ + } + pthread_mutex_unlock (&priv->recv_mutex); + + if ((ret = peer->trans->xl->notify (peer->trans->xl, GF_EVENT_POLLIN, + peer->trans, NULL)) == -1) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "pollin notification to %s " + "failed, disconnecting " + "transport", + peer->trans->xl->name); + transport_disconnect (peer->trans); + } + } else { + gf_log ("transport/ib-verbs", + GF_LOG_DEBUG, + "could not lookup peer for qp_num: %d", + wc.qp_num); + } + ib_verbs_post_recv (device->srq, post); + } + + if (ret < 0) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "ibv_poll_cq on `%s' returned error " + "(ret = %d, errno = %d)", + device->device_name, ret, errno); + continue; + } + ibv_ack_cq_events (event_cq, 1); + } + return NULL; +} + + +static void * +ib_verbs_send_completion_proc (void *data) +{ + struct ibv_comp_channel *chan = data; + ib_verbs_post_t *post; + ib_verbs_peer_t *peer; + struct ibv_cq *event_cq; + void *event_ctx; + ib_verbs_device_t *device; + struct ibv_wc wc; + int32_t ret; + + while (1) { + ret = ibv_get_cq_event (chan, &event_cq, &event_ctx); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_get_cq_event on failed, terminating " + "send thread: %d (%d)", ret, errno); + continue; + } + + device = event_ctx; + + ret = ibv_req_notify_cq (event_cq, 0); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_req_notify_cq on %s failed, terminating " + "send thread: %d (%d)", + device->device_name, ret, errno); + continue; + } + + while ((ret = ibv_poll_cq (event_cq, 1, &wc)) > 0) { + post = (ib_verbs_post_t *) (long) wc.wr_id; + peer = ib_verbs_lookup_peer (device, wc.qp_num); + + if (wc.status != IBV_WC_SUCCESS) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "send work request on `%s' returned " + "error wc.status = %d, wc.vendor_err " + "= %d, post->buf = %p, wc.byte_len = " + "%d, post->reused = %d", + device->device_name, wc.status, + wc.vendor_err, + post->buf, wc.byte_len, post->reused); + if (peer) + transport_disconnect (peer->trans); + } + + if (post) { + ib_verbs_put_post (&device->sendq, post); + } + + if (peer) { + int quota_ret = ib_verbs_quota_put (peer); + if (quota_ret < 0) { + gf_log ("ib-verbs", GF_LOG_WARNING, + "failed to send message"); + + } + } else { + gf_log ("transport/ib-verbs", GF_LOG_DEBUG, + "could not lookup peer for qp_num: %d", + wc.qp_num); + } + } + + if (ret < 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "ibv_poll_cq on `%s' returned error (ret = %d," + " errno = %d)", + device->device_name, ret, errno); + continue; + } + ibv_ack_cq_events (event_cq, 1); + } + + return NULL; +} + +static void +ib_verbs_options_init (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + int32_t mtu; + data_t *temp; + + /* TODO: validate arguments from options below */ + + options->send_size = 1048576; + options->recv_size = 1048576; + options->send_count = 16; + options->recv_count = 16; + + temp = dict_get (this->xl->options, + "transport.ib-verbs.work-request-send-count"); + if (temp) + options->send_count = data_to_int32 (temp); + + temp = dict_get (this->xl->options, + "transport.ib-verbs.work-request-recv-count"); + if (temp) + options->recv_count = data_to_int32 (temp); + + temp = dict_get (this->xl->options, + "transport.ib-verbs.work-request-send-size"); + if (temp) + options->send_size = data_to_int32 (temp); + + temp = dict_get (this->xl->options, + "transport.ib-verbs.work-request-recv-size"); + if (temp) + options->recv_size = data_to_int32 (temp); + + options->port = 1; + temp = dict_get (this->xl->options, + "transport.ib-verbs.port"); + if (temp) + options->port = data_to_uint64 (temp); + + options->mtu = mtu = IBV_MTU_2048; + temp = dict_get (this->xl->options, + "transport.ib-verbs.mtu"); + if (temp) + mtu = data_to_int32 (temp); + switch (mtu) { + case 256: options->mtu = IBV_MTU_256; + break; + case 512: options->mtu = IBV_MTU_512; + break; + case 1024: options->mtu = IBV_MTU_1024; + break; + case 2048: options->mtu = IBV_MTU_2048; + break; + case 4096: options->mtu = IBV_MTU_4096; + break; + default: + if (temp) + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: unrecognized MTU value '%s', defaulting " + "to '2048'", this->xl->name, + data_to_str (temp)); + else + gf_log ("transport/ib-verbs", GF_LOG_DEBUG, + "%s: defaulting MTU to '2048'", + this->xl->name); + options->mtu = IBV_MTU_2048; + break; + } + + temp = dict_get (this->xl->options, + "transport.ib-verbs.device-name"); + if (temp) + options->device_name = strdup (temp->data); + + return; +} + +static void +ib_verbs_queue_init (ib_verbs_queue_t *queue) +{ + pthread_mutex_init (&queue->lock, NULL); + + queue->active_posts.next = &queue->active_posts; + queue->active_posts.prev = &queue->active_posts; + queue->passive_posts.next = &queue->passive_posts; + queue->passive_posts.prev = &queue->passive_posts; +} + + +static ib_verbs_device_t * +ib_verbs_get_device (transport_t *this, + struct ibv_device *ib_dev, + int32_t port) +{ + glusterfs_ctx_t *ctx = this->xl->ctx; + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + char *device_name = priv->options.device_name; + int32_t ret = 0, i = 0; + + ib_verbs_device_t *trav; + + trav = ctx->ib; + while (trav) { + if ((!strcmp (trav->device_name, device_name)) && + (trav->port == port)) + break; + trav = trav->next; + } + + if (!trav) { + struct ibv_context *ibctx = ibv_open_device (ib_dev); + + if (!ibctx) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "cannot open device `%s'", + device_name); + return NULL; + } + + trav = CALLOC (1, sizeof (*trav)); + ERR_ABORT (trav); + priv->device = trav; + + trav->context = ibctx; + trav->device_name = strdup (device_name); + trav->port = port; + + trav->next = ctx->ib; + ctx->ib = trav; + + trav->send_chan = ibv_create_comp_channel (trav->context); + if (!trav->send_chan) { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "%s: could not create send completion channel", + device_name); + /* TODO: cleanup current mess */ + return NULL; + } + + trav->recv_chan = ibv_create_comp_channel (trav->context); + if (!trav->recv_chan) { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "could not create recv completion channel"); + /* TODO: cleanup current mess */ + return NULL; + } + + if (ib_verbs_create_cq (this) < 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: could not create CQ", + this->xl->name); + return NULL; + } + + /* protection domain */ + trav->pd = ibv_alloc_pd (trav->context); + + if (!trav->pd) { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "%s: could not allocate protection domain", + this->xl->name); + return NULL; + } + + struct ibv_srq_init_attr attr = { + .attr = { + .max_wr = options->recv_count, + .max_sge = 1 + } + }; + trav->srq = ibv_create_srq (trav->pd, &attr); + + if (!trav->srq) { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "%s: could not create SRQ", + this->xl->name); + return NULL; + } + + /* queue init */ + ib_verbs_queue_init (&trav->sendq); + ib_verbs_queue_init (&trav->recvq); + + if (ib_verbs_create_posts (this) < 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: could not allocate posts", + this->xl->name); + return NULL; + } + + /* completion threads */ + ret = pthread_create (&trav->send_thread, + NULL, + ib_verbs_send_completion_proc, + trav->send_chan); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "could not create send completion thread"); + return NULL; + } + ret = pthread_create (&trav->recv_thread, + NULL, + ib_verbs_recv_completion_proc, + trav->recv_chan); + if (ret) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "could not create recv completion thread"); + return NULL; + } + + /* qpreg */ + pthread_mutex_init (&trav->qpreg.lock, NULL); + for (i=0; i<42; i++) { + trav->qpreg.ents[i].next = &trav->qpreg.ents[i]; + trav->qpreg.ents[i].prev = &trav->qpreg.ents[i]; + } + } + return trav; +} + +static int32_t +ib_verbs_init (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = &priv->options; + struct ibv_device **dev_list; + struct ibv_device *ib_dev = NULL; + int32_t i; + + ib_verbs_options_init (this); + + { + dev_list = ibv_get_device_list (NULL); + + if (!dev_list) { + gf_log ("transport/ib-verbs", + GF_LOG_CRITICAL, + "No IB devices found"); + return -1; + } + + if (!options->device_name) { + if (*dev_list) { + options->device_name = + strdup (ibv_get_device_name (*dev_list)); + } else { + gf_log ("transport/ib-verbs", GF_LOG_CRITICAL, + "IB device list is empty. Check for " + "'ib_uverbs' module"); + return -1; + } + } + + for (i = 0; dev_list[i]; i++) { + if (!strcmp (ibv_get_device_name (dev_list[i]), + options->device_name)) { + ib_dev = dev_list[i]; + break; + } + } + + if (!ib_dev) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "could not open device `%s' (does not exist)", + options->device_name); + ibv_free_device_list (dev_list); + return -1; + } + + priv->device = ib_verbs_get_device (this, ib_dev, + options->port); + + if (!priv->device) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "could not create ib_verbs device for %s", + options->device_name); + ibv_free_device_list (dev_list); + return -1; + } + ibv_free_device_list (dev_list); + } + + priv->peer.trans = this; + INIT_LIST_HEAD (&priv->peer.ioq); + + pthread_mutex_init (&priv->read_mutex, NULL); + pthread_mutex_init (&priv->write_mutex, NULL); + pthread_mutex_init (&priv->recv_mutex, NULL); + /* pthread_cond_init (&priv->recv_cond, NULL); */ + + return 0; +} + + +static int32_t +ib_verbs_disconnect (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + int32_t ret = 0; + + pthread_mutex_lock (&priv->write_mutex); + { + ret = __ib_verbs_disconnect (this); + } + pthread_mutex_unlock (&priv->write_mutex); + + return ret; +} + + +static int32_t +__tcp_connect_finish (int fd) +{ + int ret = -1; + int optval = 0; + socklen_t optlen = sizeof (int); + + ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, + (void *)&optval, &optlen); + + if (ret == 0 && optval) + { + errno = optval; + ret = -1; + } + + return ret; +} + +static inline void +ib_verbs_fill_handshake_data (char *buf, struct ib_verbs_nbio *nbio, + ib_verbs_private_t *priv) +{ + sprintf (buf, + "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" + "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", + priv->peer.recv_size, + priv->peer.send_size, + priv->peer.local_lid, + priv->peer.local_qpn, + priv->peer.local_psn); + + nbio->vector.iov_base = buf; + nbio->vector.iov_len = strlen (buf) + 1; + nbio->count = 1; + return; +} + +static inline void +ib_verbs_fill_handshake_ack (char *buf, struct ib_verbs_nbio *nbio) +{ + sprintf (buf, "DONE\n"); + nbio->vector.iov_base = buf; + nbio->vector.iov_len = strlen (buf) + 1; + nbio->count = 1; + return; +} + +static int +ib_verbs_handshake_pollin (transport_t *this) +{ + int ret = 0; + ib_verbs_private_t *priv = this->private; + char *buf = priv->handshake.incoming.buf; + int32_t recv_buf_size, send_buf_size; + socklen_t sock_len; + + if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) { + return -1; + } + + pthread_mutex_lock (&priv->write_mutex); + { + while (priv->handshake.incoming.state != IB_VERBS_HANDSHAKE_COMPLETE) + { + switch (priv->handshake.incoming.state) + { + case IB_VERBS_HANDSHAKE_START: + buf = priv->handshake.incoming.buf = CALLOC (1, 256); + ib_verbs_fill_handshake_data (buf, &priv->handshake.incoming, priv); + buf[0] = 0; + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_DATA; + break; + + case IB_VERBS_HANDSHAKE_RECEIVING_DATA: + ret = __tcp_readv (this, + &priv->handshake.incoming.vector, + priv->handshake.incoming.count, + &priv->handshake.incoming.pending_vector, + &priv->handshake.incoming.pending_count); + if (ret == -1) { + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial header read on NB socket. continue later"); + goto unlock; + } + + if (!ret) { + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_DATA; + } + break; + + case IB_VERBS_HANDSHAKE_RECEIVED_DATA: + ret = sscanf (buf, + "QP1:RECV_BLKSIZE=%08x:SEND_BLKSIZE=%08x\n" + "QP1:LID=%04x:QPN=%06x:PSN=%06x\n", + &recv_buf_size, + &send_buf_size, + &priv->peer.remote_lid, + &priv->peer.remote_qpn, + &priv->peer.remote_psn); + + if ((ret != 5) && (strncmp (buf, "QP1:", 4))) { + gf_log ("transport/ib-verbs", + GF_LOG_CRITICAL, + "%s: remote-host(%s)'s " + "transport type is different", + this->xl->name, + this->peerinfo.identifier); + ret = -1; + goto unlock; + } + + if (recv_buf_size < priv->peer.recv_size) + priv->peer.recv_size = recv_buf_size; + if (send_buf_size < priv->peer.send_size) + priv->peer.send_size = send_buf_size; + + gf_log ("transport/ib-verbs", GF_LOG_DEBUG, + "%s: transacted recv_size=%d " + "send_size=%d", + this->xl->name, priv->peer.recv_size, + priv->peer.send_size); + + priv->peer.quota = priv->peer.send_count; + + if (ib_verbs_connect_qp (this)) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: failed to connect with " + "remote QP", this->xl->name); + ret = -1; + goto unlock; + } + ib_verbs_fill_handshake_ack (buf, &priv->handshake.incoming); + buf[0] = 0; + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVING_ACK; + break; + + case IB_VERBS_HANDSHAKE_RECEIVING_ACK: + ret = __tcp_readv (this, + &priv->handshake.incoming.vector, + priv->handshake.incoming.count, + &priv->handshake.incoming.pending_vector, + &priv->handshake.incoming.pending_count); + if (ret == -1) { + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial header read on NB " + "socket. continue later"); + goto unlock; + } + + if (!ret) { + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_RECEIVED_ACK; + } + break; + + case IB_VERBS_HANDSHAKE_RECEIVED_ACK: + if (strncmp (buf, "DONE", 4)) { + gf_log ("transport/ib-verbs", + GF_LOG_ERROR, + "%s: handshake-3 did not " + "return 'DONE' (%s)", + this->xl->name, buf); + ret = -1; + goto unlock; + } + ret = 0; + priv->connected = 1; + sock_len = sizeof (struct sockaddr_storage); + getpeername (priv->sock, + (struct sockaddr *) &this->peerinfo.sockaddr, + &sock_len); + + FREE (priv->handshake.incoming.buf); + priv->handshake.incoming.buf = NULL; + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_COMPLETE; + } + } + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); + + if (ret == -1) { + transport_disconnect (this); + } else { + ret = 0; + } + + if (!ret && priv->connected) { + ret = this->xl->notify (this->xl, GF_EVENT_CHILD_UP, this); + } + + return ret; +} + +static int +ib_verbs_handshake_pollout (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + char *buf = priv->handshake.outgoing.buf; + int32_t ret = 0; + + if (priv->handshake.outgoing.state == IB_VERBS_HANDSHAKE_COMPLETE) { + return 0; + } + + pthread_mutex_unlock (&priv->write_mutex); + { + while (priv->handshake.outgoing.state != IB_VERBS_HANDSHAKE_COMPLETE) + { + switch (priv->handshake.outgoing.state) + { + case IB_VERBS_HANDSHAKE_START: + buf = priv->handshake.outgoing.buf = CALLOC (1, 256); + ib_verbs_fill_handshake_data (buf, &priv->handshake.outgoing, priv); + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_DATA; + break; + + case IB_VERBS_HANDSHAKE_SENDING_DATA: + ret = __tcp_writev (this, + &priv->handshake.outgoing.vector, + priv->handshake.outgoing.count, + &priv->handshake.outgoing.pending_vector, + &priv->handshake.outgoing.pending_count); + if (ret == -1) { + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial header read on NB socket. continue later"); + goto unlock; + } + + if (!ret) { + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENT_DATA; + } + break; + + case IB_VERBS_HANDSHAKE_SENT_DATA: + ib_verbs_fill_handshake_ack (buf, &priv->handshake.outgoing); + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_SENDING_ACK; + break; + + case IB_VERBS_HANDSHAKE_SENDING_ACK: + ret = __tcp_writev (this, + &priv->handshake.outgoing.vector, + priv->handshake.outgoing.count, + &priv->handshake.outgoing.pending_vector, + &priv->handshake.outgoing.pending_count); + + if (ret == -1) { + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial header read on NB " + "socket. continue later"); + goto unlock; + } + + if (!ret) { + FREE (priv->handshake.outgoing.buf); + priv->handshake.outgoing.buf = NULL; + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_COMPLETE; + } + break; + } + } + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); + + if (ret == -1) { + transport_disconnect (this); + } else { + ret = 0; + } + + return ret; +} + +static int +ib_verbs_handshake_pollerr (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + int32_t ret = 0; + char need_unref = 0; + + gf_log ("transport/ib-verbs", GF_LOG_DEBUG, + "%s: peer disconnected, cleaning up", + this->xl->name); + + pthread_mutex_lock (&priv->write_mutex); + { + __ib_verbs_teardown (this); + + if (priv->sock != -1) { + event_unregister (this->xl->ctx->event_pool, + priv->sock, priv->idx); + need_unref = 1; + + if (close (priv->sock) != 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "close () - error: %s", + strerror (errno)); + ret = -errno; + } + priv->tcp_connected = priv->connected = 0; + priv->sock = -1; + } + + if (priv->handshake.incoming.buf) { + FREE (priv->handshake.incoming.buf); + priv->handshake.incoming.buf = NULL; + } + + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; + + if (priv->handshake.outgoing.buf) { + FREE (priv->handshake.outgoing.buf); + priv->handshake.outgoing.buf = NULL; + } + + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; + } + pthread_mutex_unlock (&priv->write_mutex); + + this->xl->notify (this->xl, GF_EVENT_POLLERR, this, NULL); + + if (need_unref) + transport_unref (this); + + return 0; +} + + +static int +tcp_connect_finish (transport_t *this) +{ + ib_verbs_private_t *priv = this->private; + int error = 0, ret = 0; + + pthread_mutex_lock (&priv->write_mutex); + { + ret = __tcp_connect_finish (priv->sock); + + if (!ret) { + this->myinfo.sockaddr_len = + sizeof (this->myinfo.sockaddr); + ret = getsockname (priv->sock, + (struct sockaddr *)&this->myinfo.sockaddr, + &this->myinfo.sockaddr_len); + if (ret == -1) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "getsockname on new client-socket %d " + "failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + error = 1; + goto unlock; + } + + get_transport_identifiers (this); + priv->tcp_connected = 1; + } + + if (ret == -1 && errno != EINPROGRESS) { + gf_log (this->xl->name, GF_LOG_ERROR, + "tcp connect to %s failed (%s)", + this->peerinfo.identifier, strerror (errno)); + error = 1; + } + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); + + if (error) { + transport_disconnect (this); + } + + return ret; +} + +static int +ib_verbs_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + transport_t *this = data; + ib_verbs_private_t *priv = this->private; + ib_verbs_options_t *options = NULL; + int ret = 0; + + if (!priv->tcp_connected) { + ret = tcp_connect_finish (this); + if (priv->tcp_connected) { + options = &priv->options; + + priv->peer.send_count = options->send_count; + priv->peer.recv_count = options->recv_count; + priv->peer.send_size = options->send_size; + priv->peer.recv_size = options->recv_size; + + if ((ret = ib_verbs_create_qp (this)) < 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: could not create QP", + this->xl->name); + transport_disconnect (this); + } + } + } + + if (!ret && poll_out && priv->tcp_connected) { + ret = ib_verbs_handshake_pollout (this); + } + + if (!ret && poll_in && priv->tcp_connected) { + if (priv->handshake.incoming.state == IB_VERBS_HANDSHAKE_COMPLETE) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: pollin received on tcp socket (peer: %s) " + "after handshake is complete", + this->xl->name, this->peerinfo.identifier); + ib_verbs_handshake_pollerr (this); + return 0; + } + ret = ib_verbs_handshake_pollin (this); + } + + if (poll_err) { + ret = ib_verbs_handshake_pollerr (this); + } + + return 0; +} + +static int +__tcp_nonblock (int fd) +{ + int flags = 0; + int ret = -1; + + flags = fcntl (fd, F_GETFL); + + if (flags != -1) + ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + + return ret; +} + +static int32_t +ib_verbs_connect (struct transport *this) +{ + dict_t *options = this->xl->options; + + ib_verbs_private_t *priv = this->private; + + int32_t ret = 0; + gf_boolean_t non_blocking = 1; + struct sockaddr_storage sockaddr; + socklen_t sockaddr_len = 0; + + if (priv->connected) { + return 0; + } + + if (dict_get (options, "non-blocking-io")) { + char *nb_connect = data_to_str (dict_get (this->xl->options, + "non-blocking-io")); + + if (gf_string2boolean (nb_connect, &non_blocking) == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "'non-blocking-io' takes only boolean " + "options, not taking any action"); + non_blocking = 1; + } + } + + ret = ibverbs_client_get_remote_sockaddr (this, (struct sockaddr *)&sockaddr, + &sockaddr_len); + if (ret != 0) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot get remote address to connect"); + return ret; + } + + pthread_mutex_lock (&priv->write_mutex); + { + if (priv->sock != -1) { + ret = 0; + goto unlock; + } + + priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family, + SOCK_STREAM, 0); + + if (priv->sock == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket () - error: %s", strerror (errno)); + ret = -errno; + goto unlock; + } + + gf_log (this->xl->name, GF_LOG_DEBUG, + "socket fd = %d", priv->sock); + + memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); + this->peerinfo.sockaddr_len = sockaddr_len; + + ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = + ((struct sockaddr *)&this->peerinfo.sockaddr)->sa_family; + + if (non_blocking) + { + ret = __tcp_nonblock (priv->sock); + + if (ret == -1) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "could not set socket %d to non " + "blocking mode (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + + ret = client_bind (this, + (struct sockaddr *)&this->myinfo.sockaddr, + &this->myinfo.sockaddr_len, priv->sock); + if (ret == -1) + { + gf_log (this->xl->name, GF_LOG_WARNING, + "client bind failed: %s", strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + ret = connect (priv->sock, + (struct sockaddr *)&this->peerinfo.sockaddr, + this->peerinfo.sockaddr_len); + if (ret == -1 && errno != EINPROGRESS) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "connection attempt failed (%s)", + strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + priv->tcp_connected = priv->connected = 0; + + transport_ref (this); + + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; + + priv->idx = event_register (this->xl->ctx->event_pool, + priv->sock, ib_verbs_event_handler, + this, 1, 1); + } +unlock: + pthread_mutex_unlock (&priv->write_mutex); + + return ret; +} + +static int +ib_verbs_server_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + int32_t main_sock = -1; + transport_t *this, *trans = data; + ib_verbs_private_t *priv = NULL; + ib_verbs_private_t *trans_priv = (ib_verbs_private_t *) trans->private; + ib_verbs_options_t *options = NULL; + + if (!poll_in) + return 0; + + this = CALLOC (1, sizeof (transport_t)); + ERR_ABORT (this); + priv = CALLOC (1, sizeof (ib_verbs_private_t)); + ERR_ABORT (priv); + this->private = priv; + /* Copy all the ib_verbs related values in priv, from trans_priv + as other than QP, all the values remain same */ + priv->device = trans_priv->device; + priv->options = trans_priv->options; + options = &priv->options; + + this->ops = trans->ops; + this->xl = trans->xl; + + memcpy (&this->myinfo.sockaddr, &trans->myinfo.sockaddr, + trans->myinfo.sockaddr_len); + this->myinfo.sockaddr_len = trans->myinfo.sockaddr_len; + + main_sock = (trans_priv)->sock; + this->peerinfo.sockaddr_len = sizeof (this->peerinfo.sockaddr); + priv->sock = accept (main_sock, + (struct sockaddr *)&this->peerinfo.sockaddr, + &this->peerinfo.sockaddr_len); + if (priv->sock == -1) { + gf_log ("ib-verbs/server", GF_LOG_ERROR, + "accept() failed: %s", + strerror (errno)); + free (this->private); + free (this); + return -1; + } + + priv->peer.trans = this; + transport_ref (this); + + get_transport_identifiers (this); + + priv->tcp_connected = 1; + priv->handshake.incoming.state = IB_VERBS_HANDSHAKE_START; + priv->handshake.outgoing.state = IB_VERBS_HANDSHAKE_START; + + priv->peer.send_count = options->send_count; + priv->peer.recv_count = options->recv_count; + priv->peer.send_size = options->send_size; + priv->peer.recv_size = options->recv_size; + INIT_LIST_HEAD (&priv->peer.ioq); + + if (ib_verbs_create_qp (this) < 0) { + gf_log ("transport/ib-verbs", GF_LOG_ERROR, + "%s: could not create QP", + this->xl->name); + transport_disconnect (this); + return -1; + } + + priv->idx = event_register (this->xl->ctx->event_pool, priv->sock, + ib_verbs_event_handler, this, 1, 1); + + pthread_mutex_init (&priv->read_mutex, NULL); + pthread_mutex_init (&priv->write_mutex, NULL); + pthread_mutex_init (&priv->recv_mutex, NULL); + /* pthread_cond_init (&priv->recv_cond, NULL); */ + + return 0; +} + +static int32_t +ib_verbs_listen (transport_t *this) +{ + struct sockaddr_storage sockaddr; + socklen_t sockaddr_len; + ib_verbs_private_t *priv = this->private; + int opt = 1, ret = 0; + char service[NI_MAXSERV], host[NI_MAXHOST]; + + memset (&sockaddr, 0, sizeof (sockaddr)); + ret = ibverbs_server_get_local_sockaddr (this, + (struct sockaddr *)&sockaddr, + &sockaddr_len); + if (ret != 0) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot find network address of server to bind to"); + goto err; + } + + priv->sock = socket (((struct sockaddr *)&sockaddr)->sa_family, + SOCK_STREAM, 0); + if (priv->sock == -1) { + gf_log ("ib-verbs/server", GF_LOG_CRITICAL, + "init: failed to create socket, error: %s", + strerror (errno)); + free (this->private); + ret = -1; + goto err; + } + + memcpy (&this->myinfo.sockaddr, &sockaddr, sockaddr_len); + this->myinfo.sockaddr_len = sockaddr_len; + + ret = getnameinfo ((struct sockaddr *)&this->myinfo.sockaddr, + this->myinfo.sockaddr_len, + host, sizeof (host), + service, sizeof (service), + NI_NUMERICHOST); + if (ret != 0) { + gf_log (this->xl->name, GF_LOG_ERROR, + "getnameinfo failed (%s)", gai_strerror (ret)); + goto err; + } + sprintf (this->myinfo.identifier, "%s:%s", host, service); + + setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); + if (bind (priv->sock, + (struct sockaddr *)&sockaddr, + sockaddr_len) != 0) { + ret = -1; + gf_log ("ib-verbs/server", GF_LOG_CRITICAL, + "init: failed to bind to socket for %s (%s)", + this->myinfo.identifier, strerror (errno)); + goto err; + } + + if (listen (priv->sock, 10) != 0) { + gf_log ("ib-verbs/server", GF_LOG_CRITICAL, + "init: listen () failed on socket for %s (%s)", + this->myinfo.identifier, strerror (errno)); + ret = -1; + goto err; + } + + /* Register the main socket */ + priv->idx = event_register (this->xl->ctx->event_pool, priv->sock, + ib_verbs_server_event_handler, + transport_ref (this), 1, 0); + +err: + return ret; +} + +struct transport_ops tops = { + .receive = ib_verbs_receive, + .submit = ib_verbs_submit, + .connect = ib_verbs_connect, + .disconnect = ib_verbs_disconnect, + .listen = ib_verbs_listen, +}; + +int32_t +init (transport_t *this) +{ + ib_verbs_private_t *priv = CALLOC (1, sizeof (*priv)); + this->private = priv; + priv->sock = -1; + + if (ib_verbs_init (this)) { + gf_log (this->xl->name, GF_LOG_ERROR, + "Failed to initialize IB Device"); + return -1; + } + + return 0; +} + +void +fini (struct transport *this) +{ + /* TODO: verify this function does graceful finish */ + ib_verbs_private_t *priv = this->private; + this->private = NULL; + + pthread_mutex_destroy (&priv->recv_mutex); + pthread_mutex_destroy (&priv->write_mutex); + pthread_mutex_destroy (&priv->read_mutex); + /* pthread_cond_destroy (&priv->recv_cond); */ + + gf_log (this->xl->name, GF_LOG_CRITICAL, + "called fini on transport: %p", + this); + free (priv); + return; +} + +/* TODO: expand each option */ +struct volume_options options[] = { + { .key = {"transport.ib-verbs.port", + "ib-verbs-port"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 4, + .description = "check the option by 'ibv_devinfo'" + }, + { .key = {"transport.ib-verbs.mtu", + "ib-verbs-mtu"}, + .type = GF_OPTION_TYPE_INT, + }, + { .key = {"transport.ib-verbs.device-name", + "ib-verbs-device-name"}, + .type = GF_OPTION_TYPE_ANY, + .description = "check by 'ibv_devinfo'" + }, + { .key = {"transport.ib-verbs.work-request-send-size", + "ib-verbs-work-request-send-size"}, + .type = GF_OPTION_TYPE_INT, + }, + { .key = {"transport.ib-verbs.work-request-recv-size", + "ib-verbs-work-request-recv-size"}, + .type = GF_OPTION_TYPE_INT, + }, + { .key = {"transport.ib-verbs.work-request-send-count", + "ib-verbs-work-request-send-count"}, + .type = GF_OPTION_TYPE_INT, + }, + { .key = {"transport.ib-verbs.work-request-recv-count", + "ib-verbs-work-request-recv-count"}, + .type = GF_OPTION_TYPE_INT, + }, + { .key = {"remote-port", + "transport.remote-port", + "transport.ib-verbs.remote-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.ib-verbs.listen-port", "listen-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.ib-verbs.connect-path", "connect-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.ib-verbs.bind-path", "bind-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.ib-verbs.listen-path", "listen-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.address-family", + "address-family"}, + .value = {"inet", "inet6", "inet/inet6", "inet6/inet", + "unix", "inet-sdp" }, + .type = GF_OPTION_TYPE_STR + }, + { .key = {NULL} } +}; diff --git a/transport/ib-verbs/src/ib-verbs.h b/transport/ib-verbs/src/ib-verbs.h new file mode 100644 index 00000000000..56b717865bf --- /dev/null +++ b/transport/ib-verbs/src/ib-verbs.h @@ -0,0 +1,215 @@ +/* + Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _XPORT_IB_VERBS_H +#define _XPORT_IB_VERBS_H + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#ifndef MAX_IOVEC +#define MAX_IOVEC 16 +#endif /* MAX_IOVEC */ + +#include "xlator.h" +#include "event.h" + +#include <stdio.h> +#include <list.h> +#include <arpa/inet.h> +#include <infiniband/verbs.h> + +#define GF_DEFAULT_IBVERBS_LISTEN_PORT 6997 + +/* options per transport end point */ +struct _ib_verbs_options { + int32_t port; + char *device_name; + enum ibv_mtu mtu; + int32_t send_count, send_size, recv_count, recv_size; +}; +typedef struct _ib_verbs_options ib_verbs_options_t; + + +struct _ib_verbs_header { + char colonO[3]; + uint32_t size1; + uint32_t size2; + char version; +} __attribute__((packed)); +typedef struct _ib_verbs_header ib_verbs_header_t; + +struct _ib_verbs_ioq { + union { + struct list_head list; + struct { + struct _ib_verbs_ioq *next; + struct _ib_verbs_ioq *prev; + }; + }; + ib_verbs_header_t header; + struct iovec vector[MAX_IOVEC]; + int count; + char *buf; + dict_t *refs; +}; +typedef struct _ib_verbs_ioq ib_verbs_ioq_t; + +/* represents one communication peer, two per transport_t */ +struct _ib_verbs_peer { + transport_t *trans; + struct ibv_qp *qp; + + int32_t recv_count; + int32_t send_count; + int32_t recv_size; + int32_t send_size; + + int32_t quota; + union { + struct list_head ioq; + struct { + ib_verbs_ioq_t *ioq_next; + ib_verbs_ioq_t *ioq_prev; + }; + }; + + /* QP attributes, needed to connect with remote QP */ + int32_t local_lid; + int32_t local_psn; + int32_t local_qpn; + int32_t remote_lid; + int32_t remote_psn; + int32_t remote_qpn; +}; +typedef struct _ib_verbs_peer ib_verbs_peer_t; + + +struct _ib_verbs_post { + struct _ib_verbs_post *next, *prev; + struct ibv_mr *mr; + char *buf; + int32_t buf_size; + char aux; + int32_t reused; + pthread_barrier_t wait; +}; +typedef struct _ib_verbs_post ib_verbs_post_t; + + +struct _ib_verbs_queue { + ib_verbs_post_t active_posts, passive_posts; + int32_t active_count, passive_count; + pthread_mutex_t lock; +}; +typedef struct _ib_verbs_queue ib_verbs_queue_t; + + +struct _ib_verbs_qpreg { + pthread_mutex_t lock; + int32_t count; + struct _qpent { + struct _qpent *next, *prev; + int32_t qp_num; + ib_verbs_peer_t *peer; + } ents[42]; +}; +typedef struct _ib_verbs_qpreg ib_verbs_qpreg_t; + +/* context per device, stored in global glusterfs_ctx_t->ib */ +struct _ib_verbs_device { + struct _ib_verbs_device *next; + const char *device_name; + struct ibv_context *context; + int32_t port; + struct ibv_pd *pd; + struct ibv_srq *srq; + ib_verbs_qpreg_t qpreg; + struct ibv_comp_channel *send_chan, *recv_chan; + struct ibv_cq *send_cq, *recv_cq; + ib_verbs_queue_t sendq, recvq; + pthread_t send_thread, recv_thread; +}; +typedef struct _ib_verbs_device ib_verbs_device_t; + +typedef enum { + IB_VERBS_HANDSHAKE_START = 0, + IB_VERBS_HANDSHAKE_SENDING_DATA, + IB_VERBS_HANDSHAKE_RECEIVING_DATA, + IB_VERBS_HANDSHAKE_SENT_DATA, + IB_VERBS_HANDSHAKE_RECEIVED_DATA, + IB_VERBS_HANDSHAKE_SENDING_ACK, + IB_VERBS_HANDSHAKE_RECEIVING_ACK, + IB_VERBS_HANDSHAKE_RECEIVED_ACK, + IB_VERBS_HANDSHAKE_COMPLETE, +} ib_verbs_handshake_state_t; + +struct ib_verbs_nbio { + int state; + char *buf; + int count; + struct iovec vector; + struct iovec *pending_vector; + int pending_count; +}; + + +struct _ib_verbs_private { + int32_t sock; + int32_t idx; + unsigned char connected; + unsigned char tcp_connected; + unsigned char ib_connected; + in_addr_t addr; + unsigned short port; + + /* IB Verbs Driver specific variables, pointers */ + ib_verbs_peer_t peer; + ib_verbs_device_t *device; + ib_verbs_options_t options; + + /* Used by trans->op->receive */ + char *data_ptr; + int32_t data_offset; + int32_t data_len; + + /* Mutex */ + pthread_mutex_t read_mutex; + pthread_mutex_t write_mutex; + pthread_barrier_t handshake_barrier; + char handshake_ret; + + pthread_mutex_t recv_mutex; + + /* used during ib_verbs_handshake */ + struct { + struct ib_verbs_nbio incoming; + struct ib_verbs_nbio outgoing; + int state; + ib_verbs_header_t header; + char *buf; + size_t size; + } handshake; +}; +typedef struct _ib_verbs_private ib_verbs_private_t; + +#endif /* _XPORT_IB_VERBS_H */ diff --git a/transport/ib-verbs/src/name.c b/transport/ib-verbs/src/name.c new file mode 100644 index 00000000000..697344987b4 --- /dev/null +++ b/transport/ib-verbs/src/name.c @@ -0,0 +1,682 @@ +/* + Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#include <sys/types.h> +#include <sys/socket.h> +#include <errno.h> +#include <netdb.h> +#include <string.h> + +#ifdef CLIENT_PORT_CEILING +#undef CLIENT_PORT_CEILING +#endif + +#define CLIENT_PORT_CEILING 1024 + +#ifndef AF_INET_SDP +#define AF_INET_SDP 27 +#endif + +#include "transport.h" +#include "ib-verbs.h" + +int32_t +gf_resolve_ip6 (const char *hostname, + uint16_t port, + int family, + void **dnscache, + struct addrinfo **addr_info); + +static int32_t +af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, + socklen_t sockaddr_len, int ceiling) +{ + int32_t ret = -1; + /* struct sockaddr_in sin = {0, }; */ + uint16_t port = ceiling - 1; + + while (port) + { + switch (sockaddr->sa_family) + { + case AF_INET6: + ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); + break; + + case AF_INET_SDP: + case AF_INET: + ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); + break; + } + + ret = bind (fd, sockaddr, sockaddr_len); + + if (ret == 0) + break; + + if (ret == -1 && errno == EACCES) + break; + + port--; + } + + return ret; +} + +static int32_t +af_unix_client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t sockaddr_len, + int sock) +{ + data_t *path_data = NULL; + struct sockaddr_un *addr = NULL; + int32_t ret = -1; + + path_data = dict_get (this->xl->options, + "transport.ib-verbs.bind-path"); + if (path_data) { + char *path = data_to_str (path_data); + if (!path || strlen (path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "transport.ib-verbs.bind-path not specfied " + "for unix socket, letting connect to assign " + "default value"); + goto err; + } + + addr = (struct sockaddr_un *) sockaddr; + strcpy (addr->sun_path, path); + ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot bind to unix-domain socket %d (%s)", + sock, strerror (errno)); + goto err; + } + } + +err: + return ret; +} + +static int32_t +client_fill_address_family (transport_t *this, struct sockaddr *sockaddr) +{ + data_t *address_family_data = NULL; + + address_family_data = dict_get (this->xl->options, + "transport.address-family"); + if (!address_family_data) { + data_t *remote_host_data = NULL, *connect_path_data = NULL; + remote_host_data = dict_get (this->xl->options, "remote-host"); + connect_path_data = dict_get (this->xl->options, + "transport.ib-verbs.connect-path"); + + if (!(remote_host_data || connect_path_data) || + (remote_host_data && connect_path_data)) { + gf_log (this->xl->name, GF_LOG_ERROR, + "address-family not specified and not able to " + "determine the same from other options " + "(remote-host:%s and connect-path:%s)", + data_to_str (remote_host_data), + data_to_str (connect_path_data)); + return -1; + } + + if (remote_host_data) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be inet/inet6"); + sockaddr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be unix"); + sockaddr->sa_family = AF_UNIX; + } + + } else { + char *address_family = data_to_str (address_family_data); + if (!strcasecmp (address_family, "unix")) { + sockaddr->sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet")) { + sockaddr->sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + sockaddr->sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + sockaddr->sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + sockaddr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address-family (%s) specified", + address_family); + return -1; + } + } + + return 0; +} + +static int32_t +af_inet_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + dict_t *options = this->xl->options; + data_t *remote_host_data = NULL; + data_t *remote_port_data = NULL; + char *remote_host = NULL; + uint16_t remote_port = 0; + struct addrinfo *addr_info = NULL; + int32_t ret = 0; + + remote_host_data = dict_get (options, "remote-host"); + if (remote_host_data == NULL) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-host missing in volume %s", + this->xl->name); + ret = -1; + goto err; + } + + remote_host = data_to_str (remote_host_data); + if (remote_host == NULL) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-host has data NULL in volume %s", + this->xl->name); + ret = -1; + goto err; + } + + remote_port_data = dict_get (options, "remote-port"); + if (remote_port_data == NULL) + { + gf_log (this->xl->name, GF_LOG_DEBUG, + "option remote-port missing in volume %s. " + "Defaulting to %d", + this->xl->name, GF_DEFAULT_IBVERBS_LISTEN_PORT); + + remote_port = GF_DEFAULT_IBVERBS_LISTEN_PORT; + } + else + { + remote_port = data_to_uint16 (remote_port_data); + } + + if (remote_port == (uint16_t)-1) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-port has invalid port in volume %s", + this->xl->name); + ret = -1; + goto err; + } + + /* TODO: gf_resolve is a blocking call. kick in some + non blocking dns techniques */ + ret = gf_resolve_ip6 (remote_host, remote_port, + sockaddr->sa_family, + &this->dnscache, &addr_info); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "DNS resolution failed on host %s", remote_host); + goto err; + } + + memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); + *sockaddr_len = addr_info->ai_addrlen; + +err: + return ret; +} + +static int32_t +af_unix_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + struct sockaddr_un *sockaddr_un = NULL; + char *connect_path = NULL; + data_t *connect_path_data = NULL; + int32_t ret = 0; + + connect_path_data = dict_get (this->xl->options, + "transport.ib-verbs.connect-path"); + if (!connect_path_data) { + gf_log (this->xl->name, GF_LOG_ERROR, + "option transport.ib-verbs.connect-path not " + "specified for address-family unix"); + ret = -1; + goto err; + } + + connect_path = data_to_str (connect_path_data); + if (!connect_path) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connect-path is null-string"); + ret = -1; + goto err; + } + + if (strlen (connect_path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connect-path value length %"GF_PRI_SIZET" > " + "%d octets", strlen (connect_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + gf_log (this->xl->name, + GF_LOG_DEBUG, + "using connect-path %s", connect_path); + sockaddr_un = (struct sockaddr_un *)sockaddr; + strcpy (sockaddr_un->sun_path, connect_path); + *sockaddr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_unix_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + data_t *listen_path_data = NULL; + char *listen_path = NULL; + int32_t ret = 0; + struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; + + + listen_path_data = dict_get (this->xl->options, + "transport.ib-verbs.listen-path"); + if (!listen_path_data) { + gf_log (this->xl->name, GF_LOG_ERROR, + "missing option listen-path"); + ret = -1; + goto err; + } + + listen_path = data_to_str (listen_path_data); + +#ifndef UNIX_PATH_MAX +#define UNIX_PATH_MAX 108 +#endif + + if (strlen (listen_path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_ERROR, + "option listen-path has value length %"GF_PRI_SIZET" > %d", + strlen (listen_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + sunaddr->sun_family = AF_UNIX; + strcpy (sunaddr->sun_path, listen_path); + *addr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_inet_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + struct addrinfo hints, *res = 0; + data_t *listen_port_data = NULL, *listen_host_data = NULL; + uint16_t listen_port = -1; + char service[NI_MAXSERV], *listen_host = NULL; + dict_t *options = NULL; + int32_t ret = 0; + + options = this->xl->options; + + listen_port_data = dict_get (options, "transport.ib-verbs.listen-port"); + listen_host_data = dict_get (options, "transport.ib-verbs.bind-address"); + + if (listen_port_data) + { + listen_port = data_to_uint16 (listen_port_data); + } + + if (listen_port == (uint16_t) -1) + listen_port = GF_DEFAULT_IBVERBS_LISTEN_PORT; + + + if (listen_host_data) + { + listen_host = data_to_str (listen_host_data); + } + + memset (service, 0, sizeof (service)); + sprintf (service, "%d", listen_port); + + memset (&hints, 0, sizeof (hints)); + hints.ai_family = addr->sa_family; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; + + ret = getaddrinfo(listen_host, service, &hints, &res); + if (ret != 0) { + gf_log (this->xl->name, + GF_LOG_ERROR, + "getaddrinfo failed for host %s, service %s (%s)", + listen_host, service, gai_strerror (ret)); + ret = -1; + goto err; + } + + memcpy (addr, res->ai_addr, res->ai_addrlen); + *addr_len = res->ai_addrlen; + + freeaddrinfo (res); + +err: + return ret; +} + +int32_t +client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock) +{ + int ret = 0; + + *sockaddr_len = sizeof (struct sockaddr_in6); + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + case AF_INET: + *sockaddr_len = sizeof (struct sockaddr_in); + + case AF_INET6: + ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, + *sockaddr_len, + CLIENT_PORT_CEILING); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot bind inet socket (%d) to port " + "less than %d (%s)", + sock, CLIENT_PORT_CEILING, strerror (errno)); + ret = 0; + } + break; + + case AF_UNIX: + *sockaddr_len = sizeof (struct sockaddr_un); + ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, + *sockaddr_len, sock); + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family %d", sockaddr->sa_family); + ret = -1; + break; + } + + return ret; +} + +int32_t +ibverbs_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + int32_t ret = 0; + char is_inet_sdp = 0; + + ret = client_fill_address_family (this, sockaddr); + if (ret) { + ret = -1; + goto err; + } + + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + sockaddr->sa_family = AF_INET; + is_inet_sdp = 1; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_client_get_remote_sockaddr (this, + sockaddr, + sockaddr_len); + + if (is_inet_sdp) { + sockaddr->sa_family = AF_INET_SDP; + } + + break; + + case AF_UNIX: + ret = af_unix_client_get_remote_sockaddr (this, + sockaddr, + sockaddr_len); + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address-family %d", sockaddr->sa_family); + ret = -1; + } + +err: + return ret; +} + +int32_t +ibverbs_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + data_t *address_family_data = NULL; + int32_t ret = 0; + char is_inet_sdp = 0; + + address_family_data = dict_get (this->xl->options, + "transport.address-family"); + if (address_family_data) { + char *address_family = NULL; + address_family = data_to_str (address_family_data); + + if (!strcasecmp (address_family, "inet")) { + addr->sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + addr->sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + addr->sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "unix")) { + addr->sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + addr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family (%s) specified", + address_family); + ret = -1; + goto err; + } + } else { + gf_log (this->xl->name, GF_LOG_DEBUG, + "option address-family not specified, defaulting " + "to inet/inet6"); + addr->sa_family = AF_UNSPEC; + } + + switch (addr->sa_family) + { + case AF_INET_SDP: + is_inet_sdp = 1; + addr->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); + if (is_inet_sdp && !ret) { + addr->sa_family = AF_INET_SDP; + } + break; + + case AF_UNIX: + ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); + break; + } + +err: + return ret; +} + +int32_t +fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr, + int32_t addr_len, char *identifier) +{ + int32_t ret = 0, tmpaddr_len = 0; + char service[NI_MAXSERV], host[NI_MAXHOST]; + struct sockaddr_storage tmpaddr; + + memset (&tmpaddr, 0, sizeof (tmpaddr)); + tmpaddr = *addr; + tmpaddr_len = addr_len; + + if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { + int32_t one_to_four, four_to_eight, twelve_to_sixteen; + int16_t eight_to_ten, ten_to_twelve; + + one_to_four = four_to_eight = twelve_to_sixteen = 0; + eight_to_ten = ten_to_twelve = 0; + + one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0]; + four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1]; + eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4]; + ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5]; + twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3]; + + /* ipv4 mapped ipv6 address has + bits 0-80: 0 + bits 80-96: 0xffff + bits 96-128: ipv4 address + */ + + if (one_to_four == 0 && + four_to_eight == 0 && + eight_to_ten == 0 && + ten_to_twelve == -1) { + struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; + memset (&tmpaddr, 0, sizeof (tmpaddr)); + + in_ptr->sin_family = AF_INET; + in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; + in_ptr->sin_addr.s_addr = twelve_to_sixteen; + tmpaddr_len = sizeof (*in_ptr); + } + } + + ret = getnameinfo ((struct sockaddr *) &tmpaddr, + tmpaddr_len, + host, sizeof (host), + service, sizeof (service), + NI_NUMERICHOST | NI_NUMERICSERV); + if (ret != 0) { + gf_log (this->xl->name, + GF_LOG_ERROR, + "getnameinfo failed (%s)", gai_strerror (ret)); + } + + sprintf (identifier, "%s:%s", host, service); + + return ret; +} + +int32_t +get_transport_identifiers (transport_t *this) +{ + int32_t ret = 0; + char is_inet_sdp = 0; + + switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) + { + case AF_INET_SDP: + is_inet_sdp = 1; + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + { + ret = fill_inet6_inet_identifiers (this, + &this->myinfo.sockaddr, + this->myinfo.sockaddr_len, + this->myinfo.identifier); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "can't fill inet/inet6 identifier for server"); + goto err; + } + + ret = fill_inet6_inet_identifiers (this, + &this->peerinfo.sockaddr, + this->peerinfo.sockaddr_len, + this->peerinfo.identifier); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "can't fill inet/inet6 identifier for client"); + goto err; + } + + if (is_inet_sdp) { + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; + } + } + break; + + case AF_UNIX: + { + struct sockaddr_un *sunaddr = NULL; + + sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; + strcpy (this->myinfo.identifier, sunaddr->sun_path); + + sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; + strcpy (this->peerinfo.identifier, sunaddr->sun_path); + } + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family (%d)", + ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); + ret = -1; + break; + } + +err: + return ret; +} diff --git a/transport/ib-verbs/src/name.h b/transport/ib-verbs/src/name.h new file mode 100644 index 00000000000..1b0f378b94a --- /dev/null +++ b/transport/ib-verbs/src/name.h @@ -0,0 +1,47 @@ +/* + Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _IB_VERBS_NAME_H +#define _IB_VERBS_NAME_H + +#include <sys/socket.h> +#include <sys/un.h> + +#include "compat.h" + +int32_t +client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock); + +int32_t +ibverbs_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len); + +int32_t +ibverbs_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len); + +int32_t +get_transport_identifiers (transport_t *this); + +#endif /* _IB_VERBS_NAME_H */ diff --git a/transport/socket/Makefile.am b/transport/socket/Makefile.am new file mode 100644 index 00000000000..f963effea22 --- /dev/null +++ b/transport/socket/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = src
\ No newline at end of file diff --git a/transport/socket/src/Makefile.am b/transport/socket/src/Makefile.am new file mode 100644 index 00000000000..e112921232c --- /dev/null +++ b/transport/socket/src/Makefile.am @@ -0,0 +1,14 @@ +noinst_HEADERS = socket.h name.h + +transport_LTLIBRARIES = socket.la +transportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/transport/ + +socket_la_LDFLAGS = -module -avoidversion + +socket_la_SOURCES = socket.c name.c +socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ + -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES = *~ diff --git a/transport/socket/src/name.c b/transport/socket/src/name.c new file mode 100644 index 00000000000..a599b00cced --- /dev/null +++ b/transport/socket/src/name.c @@ -0,0 +1,677 @@ +/* + Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#include <sys/types.h> +#include <sys/socket.h> +#include <errno.h> +#include <netdb.h> +#include <string.h> + +#ifdef CLIENT_PORT_CEILING +#undef CLIENT_PORT_CEILING +#endif + +#define CLIENT_PORT_CEILING 1024 + +#ifndef AF_INET_SDP +#define AF_INET_SDP 27 +#endif + +#include "transport.h" +#include "socket.h" + +int32_t +gf_resolve_ip6 (const char *hostname, + uint16_t port, + int family, + void **dnscache, + struct addrinfo **addr_info); + +static int32_t +af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, + socklen_t sockaddr_len, int ceiling) +{ + int32_t ret = -1; + /* struct sockaddr_in sin = {0, }; */ + uint16_t port = ceiling - 1; + + while (port) + { + switch (sockaddr->sa_family) + { + case AF_INET6: + ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); + break; + + case AF_INET_SDP: + case AF_INET: + ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); + break; + } + + ret = bind (fd, sockaddr, sockaddr_len); + + if (ret == 0) + break; + + if (ret == -1 && errno == EACCES) + break; + + port--; + } + + return ret; +} + +static int32_t +af_unix_client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t sockaddr_len, + int sock) +{ + data_t *path_data = NULL; + struct sockaddr_un *addr = NULL; + int32_t ret = -1; + + path_data = dict_get (this->xl->options, "transport.socket.bind-path"); + if (path_data) { + char *path = data_to_str (path_data); + if (!path || strlen (path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "bind-path not specfied for unix socket, " + "letting connect to assign default value"); + goto err; + } + + addr = (struct sockaddr_un *) sockaddr; + strcpy (addr->sun_path, path); + ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot bind to unix-domain socket %d (%s)", + sock, strerror (errno)); + goto err; + } + } + +err: + return ret; +} + +static int32_t +client_fill_address_family (transport_t *this, struct sockaddr *sockaddr) +{ + data_t *address_family_data = NULL; + + address_family_data = dict_get (this->xl->options, + "transport.address-family"); + if (!address_family_data) { + data_t *remote_host_data = NULL, *connect_path_data = NULL; + remote_host_data = dict_get (this->xl->options, "remote-host"); + connect_path_data = dict_get (this->xl->options, + "transport.socket.connect-path"); + + if (!(remote_host_data || connect_path_data) || + (remote_host_data && connect_path_data)) { + gf_log (this->xl->name, GF_LOG_ERROR, + "transport.address-family not specified and " + "not able to determine the " + "same from other options (remote-host:%s and " + "transport.unix.connect-path:%s)", + data_to_str (remote_host_data), + data_to_str (connect_path_data)); + return -1; + } + + if (remote_host_data) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be inet/inet6"); + sockaddr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_DEBUG, + "address-family not specified, guessing it " + "to be unix"); + sockaddr->sa_family = AF_UNIX; + } + + } else { + char *address_family = data_to_str (address_family_data); + if (!strcasecmp (address_family, "unix")) { + sockaddr->sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet")) { + sockaddr->sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + sockaddr->sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + sockaddr->sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + sockaddr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address-family (%s) specified", + address_family); + return -1; + } + } + + return 0; +} + +static int32_t +af_inet_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + dict_t *options = this->xl->options; + data_t *remote_host_data = NULL; + data_t *remote_port_data = NULL; + char *remote_host = NULL; + uint16_t remote_port = 0; + struct addrinfo *addr_info = NULL; + int32_t ret = 0; + + remote_host_data = dict_get (options, "remote-host"); + if (remote_host_data == NULL) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-host missing in volume %s", this->xl->name); + ret = -1; + goto err; + } + + remote_host = data_to_str (remote_host_data); + if (remote_host == NULL) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-host has data NULL in volume %s", this->xl->name); + ret = -1; + goto err; + } + + remote_port_data = dict_get (options, "remote-port"); + if (remote_port_data == NULL) + { + gf_log (this->xl->name, GF_LOG_DEBUG, + "option remote-port missing in volume %s. Defaulting to %d", + this->xl->name, GF_DEFAULT_SOCKET_LISTEN_PORT); + + remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT; + } + else + { + remote_port = data_to_uint16 (remote_port_data); + } + + if (remote_port == (uint16_t)-1) + { + gf_log (this->xl->name, GF_LOG_ERROR, + "option remote-port has invalid port in volume %s", + this->xl->name); + ret = -1; + goto err; + } + + /* TODO: gf_resolve is a blocking call. kick in some + non blocking dns techniques */ + ret = gf_resolve_ip6 (remote_host, remote_port, + sockaddr->sa_family, &this->dnscache, &addr_info); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "DNS resolution failed on host %s", remote_host); + goto err; + } + + memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); + *sockaddr_len = addr_info->ai_addrlen; + +err: + return ret; +} + +static int32_t +af_unix_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + struct sockaddr_un *sockaddr_un = NULL; + char *connect_path = NULL; + data_t *connect_path_data = NULL; + int32_t ret = 0; + + connect_path_data = dict_get (this->xl->options, + "transport.socket.connect-path"); + if (!connect_path_data) { + gf_log (this->xl->name, GF_LOG_ERROR, + "option transport.unix.connect-path not specified for " + "address-family unix"); + ret = -1; + goto err; + } + + connect_path = data_to_str (connect_path_data); + if (!connect_path) { + gf_log (this->xl->name, GF_LOG_ERROR, + "transport.unix.connect-path is null-string"); + ret = -1; + goto err; + } + + if (strlen (connect_path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connect-path value length %"GF_PRI_SIZET" > %d octets", + strlen (connect_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + gf_log (this->xl->name, GF_LOG_DEBUG, + "using connect-path %s", connect_path); + sockaddr_un = (struct sockaddr_un *)sockaddr; + strcpy (sockaddr_un->sun_path, connect_path); + *sockaddr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_unix_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + data_t *listen_path_data = NULL; + char *listen_path = NULL; + int32_t ret = 0; + struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; + + + listen_path_data = dict_get (this->xl->options, + "transport.socket.listen-path"); + if (!listen_path_data) { + gf_log (this->xl->name, GF_LOG_ERROR, + "missing option transport.socket.listen-path"); + ret = -1; + goto err; + } + + listen_path = data_to_str (listen_path_data); + +#ifndef UNIX_PATH_MAX +#define UNIX_PATH_MAX 108 +#endif + + if (strlen (listen_path) > UNIX_PATH_MAX) { + gf_log (this->xl->name, GF_LOG_ERROR, + "option transport.unix.listen-path has value length " + "%"GF_PRI_SIZET" > %d", + strlen (listen_path), UNIX_PATH_MAX); + ret = -1; + goto err; + } + + sunaddr->sun_family = AF_UNIX; + strcpy (sunaddr->sun_path, listen_path); + *addr_len = sizeof (struct sockaddr_un); + +err: + return ret; +} + +static int32_t +af_inet_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + struct addrinfo hints, *res = 0; + data_t *listen_port_data = NULL, *listen_host_data = NULL; + uint16_t listen_port = -1; + char service[NI_MAXSERV], *listen_host = NULL; + dict_t *options = NULL; + int32_t ret = 0; + + options = this->xl->options; + + listen_port_data = dict_get (options, "transport.socket.listen-port"); + listen_host_data = dict_get (options, "transport.socket.bind-address"); + + if (listen_port_data) + { + listen_port = data_to_uint16 (listen_port_data); + } + + if (listen_port == (uint16_t) -1) + listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT; + + + if (listen_host_data) + { + listen_host = data_to_str (listen_host_data); + } + + memset (service, 0, sizeof (service)); + sprintf (service, "%d", listen_port); + + memset (&hints, 0, sizeof (hints)); + hints.ai_family = addr->sa_family; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; + + ret = getaddrinfo(listen_host, service, &hints, &res); + if (ret != 0) { + gf_log (this->xl->name, GF_LOG_ERROR, + "getaddrinfo failed for host %s, service %s (%s)", + listen_host, service, gai_strerror (ret)); + ret = -1; + goto err; + } + + memcpy (addr, res->ai_addr, res->ai_addrlen); + *addr_len = res->ai_addrlen; + + freeaddrinfo (res); + +err: + return ret; +} + +int32_t +client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock) +{ + int ret = 0; + + *sockaddr_len = sizeof (struct sockaddr_in6); + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + case AF_INET: + *sockaddr_len = sizeof (struct sockaddr_in); + + case AF_INET6: + ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, + *sockaddr_len, CLIENT_PORT_CEILING); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot bind inet socket (%d) to port less than %d (%s)", + sock, CLIENT_PORT_CEILING, strerror (errno)); + ret = 0; + } + break; + + case AF_UNIX: + *sockaddr_len = sizeof (struct sockaddr_un); + ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, + *sockaddr_len, sock); + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family %d", sockaddr->sa_family); + ret = -1; + break; + } + + return ret; +} + +int32_t +socket_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len) +{ + int32_t ret = 0; + char is_inet_sdp = 0; + + ret = client_fill_address_family (this, sockaddr); + if (ret) { + ret = -1; + goto err; + } + + switch (sockaddr->sa_family) + { + case AF_INET_SDP: + sockaddr->sa_family = AF_INET; + is_inet_sdp = 1; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_client_get_remote_sockaddr (this, sockaddr, sockaddr_len); + + if (is_inet_sdp) { + sockaddr->sa_family = AF_INET_SDP; + } + + break; + + case AF_UNIX: + ret = af_unix_client_get_remote_sockaddr (this, sockaddr, sockaddr_len); + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address-family %d", sockaddr->sa_family); + ret = -1; + } + +err: + return ret; +} + +int32_t +socket_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len) +{ + data_t *address_family_data = NULL; + int32_t ret = 0; + char is_inet_sdp = 0; + + address_family_data = dict_get (this->xl->options, + "transport.address-family"); + if (address_family_data) { + char *address_family = NULL; + address_family = data_to_str (address_family_data); + + if (!strcasecmp (address_family, "inet")) { + addr->sa_family = AF_INET; + } else if (!strcasecmp (address_family, "inet6")) { + addr->sa_family = AF_INET6; + } else if (!strcasecmp (address_family, "inet-sdp")) { + addr->sa_family = AF_INET_SDP; + } else if (!strcasecmp (address_family, "unix")) { + addr->sa_family = AF_UNIX; + } else if (!strcasecmp (address_family, "inet/inet6") + || !strcasecmp (address_family, "inet6/inet")) { + addr->sa_family = AF_UNSPEC; + } else { + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family (%s) specified", address_family); + ret = -1; + goto err; + } + } else { + gf_log (this->xl->name, GF_LOG_DEBUG, + "option address-family not specified, defaulting to inet/inet6"); + addr->sa_family = AF_UNSPEC; + } + + switch (addr->sa_family) + { + case AF_INET_SDP: + is_inet_sdp = 1; + addr->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + case AF_UNSPEC: + ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); + if (is_inet_sdp && !ret) { + addr->sa_family = AF_INET_SDP; + } + break; + + case AF_UNIX: + ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); + break; + } + +err: + return ret; +} + +int32_t +fill_inet6_inet_identifiers (transport_t *this, struct sockaddr_storage *addr, + int32_t addr_len, char *identifier) +{ + int32_t ret = 0, tmpaddr_len = 0; + char service[NI_MAXSERV], host[NI_MAXHOST]; + struct sockaddr_storage tmpaddr; + + memset (&tmpaddr, 0, sizeof (tmpaddr)); + tmpaddr = *addr; + tmpaddr_len = addr_len; + + if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { + int32_t one_to_four, four_to_eight, twelve_to_sixteen; + int16_t eight_to_ten, ten_to_twelve; + + one_to_four = four_to_eight = twelve_to_sixteen = 0; + eight_to_ten = ten_to_twelve = 0; + + one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0]; + four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1]; +#ifdef GF_SOLARIS_HOST_OS + eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4]; +#else + eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4]; +#endif + +#ifdef GF_SOLARIS_HOST_OS + ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5]; +#else + ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5]; +#endif + + twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3]; + + /* ipv4 mapped ipv6 address has + bits 0-80: 0 + bits 80-96: 0xffff + bits 96-128: ipv4 address + */ + + if (one_to_four == 0 && + four_to_eight == 0 && + eight_to_ten == 0 && + ten_to_twelve == -1) { + struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; + memset (&tmpaddr, 0, sizeof (tmpaddr)); + + in_ptr->sin_family = AF_INET; + in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; + in_ptr->sin_addr.s_addr = twelve_to_sixteen; + tmpaddr_len = sizeof (*in_ptr); + } + } + + ret = getnameinfo ((struct sockaddr *) &tmpaddr, + tmpaddr_len, + host, sizeof (host), + service, sizeof (service), + NI_NUMERICHOST | NI_NUMERICSERV); + if (ret != 0) { + gf_log (this->xl->name, GF_LOG_ERROR, + "getnameinfo failed (%s)", gai_strerror (ret)); + } + + sprintf (identifier, "%s:%s", host, service); + + return ret; +} + +int32_t +get_transport_identifiers (transport_t *this) +{ + int32_t ret = 0; + char is_inet_sdp = 0; + + switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) + { + case AF_INET_SDP: + is_inet_sdp = 1; + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; + + case AF_INET: + case AF_INET6: + { + ret = fill_inet6_inet_identifiers (this, + &this->myinfo.sockaddr, + this->myinfo.sockaddr_len, + this->myinfo.identifier); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot fill inet/inet6 identifier for server"); + goto err; + } + + ret = fill_inet6_inet_identifiers (this, + &this->peerinfo.sockaddr, + this->peerinfo.sockaddr_len, + this->peerinfo.identifier); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "cannot fill inet/inet6 identifier for client"); + goto err; + } + + if (is_inet_sdp) { + ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; + } + } + break; + + case AF_UNIX: + { + struct sockaddr_un *sunaddr = NULL; + + sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; + strcpy (this->myinfo.identifier, sunaddr->sun_path); + + sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; + strcpy (this->peerinfo.identifier, sunaddr->sun_path); + } + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "unknown address family (%d)", + ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); + ret = -1; + break; + } + +err: + return ret; +} diff --git a/transport/socket/src/name.h b/transport/socket/src/name.h new file mode 100644 index 00000000000..552037bcc89 --- /dev/null +++ b/transport/socket/src/name.h @@ -0,0 +1,44 @@ +/* + Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SOCKET_NAME_H +#define _SOCKET_NAME_H + +#include "compat.h" + +int32_t +client_bind (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len, + int sock); + +int32_t +socket_client_get_remote_sockaddr (transport_t *this, + struct sockaddr *sockaddr, + socklen_t *sockaddr_len); + +int32_t +socket_server_get_local_sockaddr (transport_t *this, + struct sockaddr *addr, + socklen_t *addr_len); + +int32_t +get_transport_identifiers (transport_t *this); + +#endif /* _SOCKET_NAME_H */ diff --git a/transport/socket/src/socket.c b/transport/socket/src/socket.c new file mode 100644 index 00000000000..066da782250 --- /dev/null +++ b/transport/socket/src/socket.c @@ -0,0 +1,1370 @@ +/* + Copyright (c) 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "socket.h" +#include "name.h" +#include "dict.h" +#include "transport.h" +#include "logging.h" +#include "xlator.h" +#include "byte-order.h" +#include "common-utils.h" +#include "compat-errno.h" + +#include <fcntl.h> +#include <errno.h> + + +#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) +#define SA(ptr) ((struct sockaddr *)ptr) + +int socket_init (transport_t *this); + +/* + * return value: + * 0 = success (completed) + * -1 = error + * > 0 = incomplete + */ + +int +__socket_rwv (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count, + int write) +{ + socket_private_t *priv = NULL; + int sock = -1; + int ret = -1; + struct iovec *opvector = NULL; + int opcount = 0; + int moved = 0; + + priv = this->private; + sock = priv->sock; + + opvector = vector; + opcount = count; + + while (opcount) { + if (write) { + ret = writev (sock, opvector, opcount); + + if (ret == 0 || (ret == -1 && errno == EAGAIN)) { + /* done for now */ + break; + } + } else { + ret = readv (sock, opvector, opcount); + + if (ret == -1 && errno == EAGAIN) { + /* done for now */ + break; + } + } + + if (ret == 0) { + /* Mostly due to 'umount' in client */ + gf_log (this->xl->name, GF_LOG_DEBUG, + "EOF from peer %s", this->peerinfo.identifier); + opcount = -1; + errno = ENOTCONN; + break; + } + + if (ret == -1) { + if (errno == EINTR) + continue; + + gf_log (this->xl->name, GF_LOG_ERROR, + "%s failed (%s)", write ? "writev" : "readv", + strerror (errno)); + opcount = -1; + break; + } + + moved = 0; + + while (moved < ret) { + if ((ret - moved) >= opvector[0].iov_len) { + moved += opvector[0].iov_len; + opvector++; + opcount--; + } else { + opvector[0].iov_len -= (ret - moved); + opvector[0].iov_base += (ret - moved); + moved += (ret - moved); + } + while (opcount && !opvector[0].iov_len) { + opvector++; + opcount--; + } + } + } + + if (pending_vector) + *pending_vector = opvector; + + if (pending_count) + *pending_count = opcount; + + return opcount; +} + + +int +__socket_readv (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count) +{ + int ret = -1; + + ret = __socket_rwv (this, vector, count, + pending_vector, pending_count, 0); + + return ret; +} + + +int +__socket_writev (transport_t *this, struct iovec *vector, int count, + struct iovec **pending_vector, int *pending_count) +{ + int ret = -1; + + ret = __socket_rwv (this, vector, count, + pending_vector, pending_count, 1); + + return ret; +} + + +int +__socket_disconnect (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + if (priv->sock != -1) { + ret = shutdown (priv->sock, SHUT_RDWR); + priv->connected = -1; + gf_log (this->xl->name, GF_LOG_DEBUG, + "shutdown() returned %d. set connection state to -1", + ret); + } + + return ret; +} + + +int +__socket_server_bind (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + int opt = 1; + + priv = this->private; + + ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, + &opt, sizeof (opt)); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "setsockopt() for SO_REUSEADDR failed (%s)", + strerror (errno)); + } + + ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, + this->myinfo.sockaddr_len); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "binding to %s failed: %s", + this->myinfo.identifier, strerror (errno)); + if (errno == EADDRINUSE) { + gf_log (this->xl->name, GF_LOG_ERROR, + "Port is already in use"); + } + } + + return ret; +} + + +int +__socket_nonblock (int fd) +{ + int flags = 0; + int ret = -1; + + flags = fcntl (fd, F_GETFL); + + if (flags != -1) + ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + + return ret; +} + + +int +__socket_connect_finish (int fd) +{ + int ret = -1; + int optval = 0; + socklen_t optlen = sizeof (int); + + ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); + + if (ret == 0 && optval) { + errno = optval; + ret = -1; + } + + return ret; +} + + +void +__socket_reset (transport_t *this) +{ + socket_private_t *priv = NULL; + + priv = this->private; + + /* TODO: use mem-pool on incoming data */ + + if (priv->incoming.hdr_p) + free (priv->incoming.hdr_p); + + if (priv->incoming.buf_p) + free (priv->incoming.buf_p); + + memset (&priv->incoming, 0, sizeof (priv->incoming)); + + event_unregister (this->xl->ctx->event_pool, priv->sock, priv->idx); + close (priv->sock); + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; +} + + +struct ioq * +__socket_ioq_new (transport_t *this, char *buf, int len, + struct iovec *vector, int count, dict_t *refs) +{ + socket_private_t *priv = NULL; + struct ioq *entry = NULL; + + priv = this->private; + + /* TODO: use mem-pool */ + entry = CALLOC (1, sizeof (*entry)); + + assert (count <= (MAX_IOVEC-2)); + + entry->header.colonO[0] = ':'; + entry->header.colonO[1] = 'O'; + entry->header.colonO[2] = '\0'; + entry->header.version = 42; + entry->header.size1 = hton32 (len); + entry->header.size2 = hton32 (iov_length (vector, count)); + + entry->vector[0].iov_base = &entry->header; + entry->vector[0].iov_len = sizeof (entry->header); + entry->count++; + + entry->vector[1].iov_base = buf; + entry->vector[1].iov_len = len; + entry->count++; + + if (vector && count) { + memcpy (&entry->vector[2], vector, sizeof (*vector) * count); + entry->count += count; + } + + entry->pending_vector = entry->vector; + entry->pending_count = entry->count; + + if (refs) + entry->refs = dict_ref (refs); + + entry->buf = buf; + + INIT_LIST_HEAD (&entry->list); + + return entry; +} + + +void +__socket_ioq_entry_free (struct ioq *entry) +{ + list_del_init (&entry->list); + if (entry->refs) + dict_unref (entry->refs); + + /* TODO: use mem-pool */ + free (entry->buf); + + /* TODO: use mem-pool */ + free (entry); +} + + +void +__socket_ioq_flush (transport_t *this) +{ + socket_private_t *priv = NULL; + struct ioq *entry = NULL; + + priv = this->private; + + while (!list_empty (&priv->ioq)) { + entry = priv->ioq_next; + __socket_ioq_entry_free (entry); + } + + return; +} + + +int +__socket_ioq_churn_entry (transport_t *this, struct ioq *entry) +{ + int ret = -1; + + ret = __socket_writev (this, entry->pending_vector, + entry->pending_count, + &entry->pending_vector, + &entry->pending_count); + + if (ret == 0) { + /* current entry was completely written */ + assert (entry->pending_count == 0); + __socket_ioq_entry_free (entry); + } + + return ret; +} + + +int +__socket_ioq_churn (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + struct ioq *entry = NULL; + + priv = this->private; + + while (!list_empty (&priv->ioq)) { + /* pick next entry */ + entry = priv->ioq_next; + + ret = __socket_ioq_churn_entry (this, entry); + + if (ret != 0) + break; + } + + if (list_empty (&priv->ioq)) { + /* all pending writes done, not interested in POLLOUT */ + priv->idx = event_select_on (this->xl->ctx->event_pool, + priv->sock, priv->idx, -1, 0); + } + + return ret; +} + + +int +socket_event_poll_err (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + __socket_ioq_flush (this); + __socket_reset (this); + } + pthread_mutex_unlock (&priv->lock); + + this->xl->notify (this->xl, GF_EVENT_POLLERR, this); + + return ret; +} + + +int +socket_event_poll_out (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected == 1) { + ret = __socket_ioq_churn (this); + + if (ret == -1) { + __socket_disconnect (this); + } + } + } + pthread_mutex_unlock (&priv->lock); + + this->xl->notify (this->xl, GF_EVENT_POLLOUT, this); + + return ret; +} + + +int +__socket_proto_validate_header (transport_t *this, + struct socket_header *header, + size_t *size1_p, size_t *size2_p) +{ + size_t size1 = 0; + size_t size2 = 0; + + if (strcmp (header->colonO, ":O")) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket header signature does not match :O (%x.%x.%x)", + header->colonO[0], header->colonO[1], + header->colonO[2]); + return -1; + } + + if (header->version != 42) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket header version does not match 42 != %d", + header->version); + return -1; + } + + size1 = ntoh32 (header->size1); + size2 = ntoh32 (header->size2); + + if (size1 <= 0 || size1 > 1048576) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket header has incorrect size1=%"GF_PRI_SIZET, + size1); + return -1; + } + + if (size2 > (1048576 * 4)) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket header has incorrect size2=%"GF_PRI_SIZET, + size2); + return -1; + } + + if (size1_p) + *size1_p = size1; + + if (size2_p) + *size2_p = size2; + + return 0; +} + + + +/* socket protocol state machine */ + +int +__socket_proto_state_machine (transport_t *this) +{ + int ret = -1; + socket_private_t *priv = NULL; + size_t size1 = 0; + size_t size2 = 0; + int previous_state = -1; + struct socket_header *hdr = NULL; + + + priv = this->private; + + while (priv->incoming.state != SOCKET_PROTO_STATE_COMPLETE) { + /* debug check against infinite loops */ + if (previous_state == priv->incoming.state) { + gf_log (this->xl->name, GF_LOG_ERROR, + "state did not change! (%d) breaking", + previous_state); + ret = -1; + goto unlock; + } + previous_state = priv->incoming.state; + + switch (priv->incoming.state) { + + case SOCKET_PROTO_STATE_NADA: + priv->incoming.pending_vector = + priv->incoming.vector; + + priv->incoming.pending_vector->iov_base = + &priv->incoming.header; + + priv->incoming.pending_vector->iov_len = + sizeof (struct socket_header); + + priv->incoming.state = + SOCKET_PROTO_STATE_HEADER_COMING; + break; + + case SOCKET_PROTO_STATE_HEADER_COMING: + + ret = __socket_readv (this, + priv->incoming.pending_vector, 1, + &priv->incoming.pending_vector, + NULL); + if (ret == 0) { + priv->incoming.state = + SOCKET_PROTO_STATE_HEADER_CAME; + break; + } + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERRNO (errno), + "read (%s) in state %d (%s)", + strerror (errno), + SOCKET_PROTO_STATE_HEADER_COMING, + this->peerinfo.identifier); + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial header read on NB socket."); + goto unlock; + } + break; + + case SOCKET_PROTO_STATE_HEADER_CAME: + hdr = &priv->incoming.header; + ret = __socket_proto_validate_header (this, hdr, + &size1, &size2); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket header validate failed (%s). " + "possible mismatch of transport-type " + "between server and client volumes, " + "or version mismatch", + this->peerinfo.identifier); + goto unlock; + } + + priv->incoming.hdrlen = size1; + priv->incoming.buflen = size2; + + /* TODO: use mem-pool */ + priv->incoming.hdr_p = MALLOC (size1); + if (size2) + priv->incoming.buf_p = MALLOC (size2); + + priv->incoming.vector[0].iov_base = + priv->incoming.hdr_p; + + priv->incoming.vector[0].iov_len = size1; + + priv->incoming.vector[1].iov_base = + priv->incoming.buf_p; + + priv->incoming.vector[1].iov_len = size2; + priv->incoming.count = size2 ? 2 : 1; + + priv->incoming.pending_vector = + priv->incoming.vector; + + priv->incoming.pending_count = + priv->incoming.count; + + priv->incoming.state = + SOCKET_PROTO_STATE_DATA_COMING; + break; + + case SOCKET_PROTO_STATE_DATA_COMING: + + ret = __socket_readv (this, + priv->incoming.pending_vector, + priv->incoming.pending_count, + &priv->incoming.pending_vector, + &priv->incoming.pending_count); + if (ret == 0) { + priv->incoming.state = + SOCKET_PROTO_STATE_DATA_CAME; + break; + } + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "read (%s) in state %d (%s)", + strerror (errno), + SOCKET_PROTO_STATE_DATA_COMING, + this->peerinfo.identifier); + goto unlock; + } + + if (ret > 0) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "partial data read on NB socket"); + goto unlock; + } + break; + + case SOCKET_PROTO_STATE_DATA_CAME: + memset (&priv->incoming.vector, 0, + sizeof (priv->incoming.vector)); + priv->incoming.pending_vector = NULL; + priv->incoming.pending_count = 0; + priv->incoming.state = SOCKET_PROTO_STATE_COMPLETE; + break; + + case SOCKET_PROTO_STATE_COMPLETE: + /* not reached */ + break; + + default: + gf_log (this->xl->name, GF_LOG_ERROR, + "undefined state reached: %d", + priv->incoming.state); + goto unlock; + } + } +unlock: + + return ret; +} + + +int +socket_proto_state_machine (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = 0; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + ret = __socket_proto_state_machine (this); + } + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_event_poll_in (transport_t *this) +{ + int ret = -1; + + ret = socket_proto_state_machine (this); + + /* call POLLIN on xlator even if complete block is not received, + just to keep the last_received timestamp ticking */ + + if (ret == 0) + ret = this->xl->notify (this->xl, GF_EVENT_POLLIN, this); + + return ret; +} + + +int +socket_connect_finish (transport_t *this) +{ + int ret = -1; + socket_private_t *priv = NULL; + int event = -1; + char notify_xlator = 0; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected) + goto unlock; + + ret = __socket_connect_finish (priv->sock); + + if (ret == -1 && errno == EINPROGRESS) + ret = 1; + + if (ret == -1 && errno != EINPROGRESS) { + if (!priv->connect_finish_log) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connection failed (%s)", + strerror (errno)); + priv->connect_finish_log = 1; + } + __socket_disconnect (this); + notify_xlator = 1; + event = GF_EVENT_POLLERR; + goto unlock; + } + + if (ret == 0) { + notify_xlator = 1; + + this->myinfo.sockaddr_len = + sizeof (this->myinfo.sockaddr); + + ret = getsockname (priv->sock, + SA (&this->myinfo.sockaddr), + &this->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "getsockname on (%d) failed (%s)", + priv->sock, strerror (errno)); + __socket_disconnect (this); + event = GF_EVENT_POLLERR; + goto unlock; + } + + priv->connected = 1; + priv->connect_finish_log = 0; + event = GF_EVENT_CHILD_UP; + get_transport_identifiers (this); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + if (notify_xlator) + this->xl->notify (this->xl, event, this); + + return 0; +} + + +int +socket_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = 0; + + this = data; + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + priv->idx = idx; + } + pthread_mutex_unlock (&priv->lock); + + if (!priv->connected) { + ret = socket_connect_finish (this); + } + + if (!ret && poll_out) { + ret = socket_event_poll_out (this); + } + + if (!ret && poll_in) { + ret = socket_event_poll_in (this); + } + + if (ret < 0 || poll_err) { + socket_event_poll_err (this); + transport_unref (this); + } + + return 0; +} + + +int +socket_server_event_handler (int fd, int idx, void *data, + int poll_in, int poll_out, int poll_err) +{ + transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = 0; + int new_sock = -1; + transport_t *new_trans = NULL; + struct sockaddr_storage new_sockaddr = {0, }; + socklen_t addrlen = sizeof (new_sockaddr); + socket_private_t *new_priv = NULL; + glusterfs_ctx_t *ctx = NULL; + + this = data; + priv = this->private; + ctx = this->xl->ctx; + + pthread_mutex_lock (&priv->lock); + { + priv->idx = idx; + + if (poll_in) { + new_sock = accept (priv->sock, SA (&new_sockaddr), + &addrlen); + + if (new_sock == -1) + goto unlock; + + if (!priv->bio) { + ret = __socket_nonblock (new_sock); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + new_sock, strerror (errno)); + close (new_sock); + goto unlock; + } + } + + new_trans = CALLOC (1, sizeof (*new_trans)); + new_trans->xl = this->xl; + new_trans->fini = this->fini; + + memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, + addrlen); + new_trans->peerinfo.sockaddr_len = addrlen; + + new_trans->myinfo.sockaddr_len = + sizeof (new_trans->myinfo.sockaddr); + + ret = getsockname (new_sock, + SA (&new_trans->myinfo.sockaddr), + &new_trans->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "getsockname on %d failed (%s)", + new_sock, strerror (errno)); + close (new_sock); + goto unlock; + } + + get_transport_identifiers (new_trans); + socket_init (new_trans); + new_trans->ops = this->ops; + new_trans->init = this->init; + new_trans->fini = this->fini; + + new_priv = new_trans->private; + + pthread_mutex_lock (&new_priv->lock); + { + new_priv->sock = new_sock; + new_priv->connected = 1; + + transport_ref (new_trans); + new_priv->idx = + event_register (ctx->event_pool, + new_sock, + socket_event_handler, + new_trans, 1, 0); + + if (new_priv->idx == -1) + ret = -1; + } + pthread_mutex_unlock (&new_priv->lock); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_disconnect (transport_t *this) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + ret = __socket_disconnect (this); + } + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_connect (transport_t *this) +{ + int ret = -1; + int sock = -1; + socket_private_t *priv = NULL; + struct sockaddr_storage sockaddr = {0, }; + socklen_t sockaddr_len = 0; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + ctx = this->xl->ctx; + + if (!priv) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connect() called on uninitialized transport"); + goto err; + } + + pthread_mutex_lock (&priv->lock); + { + sock = priv->sock; + } + pthread_mutex_unlock (&priv->lock); + + if (sock != -1) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "connect () called on transport already connected"); + goto err; + } + + ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), + &sockaddr_len); + if (ret == -1) { + /* logged inside client_get_remote_sockaddr */ + goto err; + } + + pthread_mutex_lock (&priv->lock); + { + if (priv->sock != -1) { + gf_log (this->xl->name, GF_LOG_DEBUG, + "connect() -- already connected"); + goto unlock; + } + + memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); + this->peerinfo.sockaddr_len = sockaddr_len; + + priv->sock = socket (SA (&sockaddr)->sa_family, + SOCK_STREAM, 0); + + if (priv->sock == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket creation failed (%s)", + strerror (errno)); + goto unlock; + } + + if (!priv->bio) { + ret = __socket_nonblock (priv->sock); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + + SA (&this->myinfo.sockaddr)->sa_family = + SA (&this->peerinfo.sockaddr)->sa_family; + + ret = client_bind (this, SA (&this->myinfo.sockaddr), + &this->myinfo.sockaddr_len, priv->sock); + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_WARNING, + "client bind failed: %s", strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), + this->peerinfo.sockaddr_len); + + if (ret == -1 && errno != EINPROGRESS) { + gf_log (this->xl->name, GF_LOG_ERROR, + "connection attempt failed (%s)", + strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + priv->connected = 0; + + transport_ref (this); + + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_event_handler, this, 1, 1); + if (priv->idx == -1) + ret = -1; + } +unlock: + pthread_mutex_unlock (&priv->lock); + +err: + return ret; +} + + +int +socket_listen (transport_t *this) +{ + socket_private_t * priv = NULL; + int ret = -1; + int sock = -1; + struct sockaddr_storage sockaddr; + socklen_t sockaddr_len; + peer_info_t *myinfo = NULL; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + myinfo = &this->myinfo; + ctx = this->xl->ctx; + + pthread_mutex_lock (&priv->lock); + { + sock = priv->sock; + } + pthread_mutex_unlock (&priv->lock); + + if (sock != -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "alreading listening"); + return ret; + } + + ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), + &sockaddr_len); + + if (ret == -1) { + return ret; + } + + pthread_mutex_lock (&priv->lock); + { + if (priv->sock != -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "already listening"); + goto unlock; + } + + memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); + myinfo->sockaddr_len = sockaddr_len; + + priv->sock = socket (SA (&sockaddr)->sa_family, + SOCK_STREAM, 0); + + if (priv->sock == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket creation failed (%s)", + strerror (errno)); + goto unlock; + } + + if (!priv->bio) { + ret = __socket_nonblock (priv->sock); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "NBIO on %d failed (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } + + ret = __socket_server_bind (this); + + if (ret == -1) { + /* logged inside __socket_server_bind() */ + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + ret = listen (priv->sock, 10); + + if (ret == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "could not set socket %d to listen mode (%s)", + priv->sock, strerror (errno)); + close (priv->sock); + priv->sock = -1; + goto unlock; + } + + transport_ref (this); + + priv->idx = event_register (ctx->event_pool, priv->sock, + socket_server_event_handler, + this, 1, 0); + + if (priv->idx == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "could not register socket %d with events", + priv->sock); + ret = -1; + close (priv->sock); + priv->sock = -1; + goto unlock; + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +int +socket_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, + char **buf_p, size_t *buflen_p) +{ + socket_private_t *priv = NULL; + int ret = -1; + + priv = this->private; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected != 1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "socket not connected to receive"); + goto unlock; + } + + if (!hdr_p || !hdrlen_p || !buf_p || !buflen_p) { + gf_log (this->xl->name, GF_LOG_ERROR, + "bad parameters %p %p %p %p", + hdr_p, hdrlen_p, buf_p, buflen_p); + goto unlock; + } + + if (priv->incoming.state == SOCKET_PROTO_STATE_COMPLETE) { + *hdr_p = priv->incoming.hdr_p; + *hdrlen_p = priv->incoming.hdrlen; + *buf_p = priv->incoming.buf_p; + *buflen_p = priv->incoming.buflen; + + memset (&priv->incoming, 0, sizeof (priv->incoming)); + priv->incoming.state = SOCKET_PROTO_STATE_NADA; + + ret = 0; + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +/* TODO: implement per transfer limit */ +int +socket_submit (transport_t *this, char *buf, int len, + struct iovec *vector, int count, + dict_t *refs) +{ + socket_private_t *priv = NULL; + int ret = -1; + char need_poll_out = 0; + char need_append = 1; + struct ioq *entry = NULL; + glusterfs_ctx_t *ctx = NULL; + + priv = this->private; + ctx = this->xl->ctx; + + pthread_mutex_lock (&priv->lock); + { + if (priv->connected != 1) { + if (!priv->submit_log && !priv->connect_finish_log) { + gf_log (this->xl->name, GF_LOG_ERROR, + "not connected (priv->connected = %d)", + priv->connected); + priv->submit_log = 1; + } + goto unlock; + } + + priv->submit_log = 0; + entry = __socket_ioq_new (this, buf, len, vector, count, refs); + + if (list_empty (&priv->ioq)) { + ret = __socket_ioq_churn_entry (this, entry); + + if (ret == 0) + need_append = 0; + + if (ret > 0) + need_poll_out = 1; + } + + if (need_append) { + list_add_tail (&entry->list, &priv->ioq); + ret = 0; + } + + if (need_poll_out) { + /* first entry to wait. continue writing on POLLOUT */ + priv->idx = event_select_on (ctx->event_pool, + priv->sock, + priv->idx, -1, 1); + } + } +unlock: + pthread_mutex_unlock (&priv->lock); + + return ret; +} + + +struct transport_ops tops = { + .listen = socket_listen, + .connect = socket_connect, + .disconnect = socket_disconnect, + .submit = socket_submit, + .receive = socket_receive +}; + + +int +socket_init (transport_t *this) +{ + socket_private_t *priv = NULL; + + if (this->private) { + gf_log (this->xl->name, GF_LOG_ERROR, + "double init attempted"); + return -1; + } + + priv = CALLOC (1, sizeof (*priv)); + if (!priv) { + gf_log (this->xl->name, GF_LOG_ERROR, + "calloc (1, %"GF_PRI_SIZET") returned NULL", + sizeof (*priv)); + return -1; + } + + pthread_mutex_init (&priv->lock, NULL); + + priv->sock = -1; + priv->idx = -1; + priv->connected = -1; + + INIT_LIST_HEAD (&priv->ioq); + + if (dict_get (this->xl->options, "non-blocking-io")) { + gf_boolean_t tmp_bool = 0; + char *nb_connect = data_to_str (dict_get (this->xl->options, + "non-blocking-io")); + + if (gf_string2boolean (nb_connect, &tmp_bool) == -1) { + gf_log (this->xl->name, GF_LOG_ERROR, + "'non-blocking-io' takes only boolean options," + " not taking any action"); + tmp_bool = 1; + } + priv->bio = 0; + if (!tmp_bool) { + priv->bio = 1; + gf_log (this->xl->name, GF_LOG_WARNING, + "disabling non-blocking IO"); + } + } + + this->private = priv; + + return 0; +} + + +void +fini (transport_t *this) +{ + socket_private_t *priv = this->private; + gf_log (this->xl->name, GF_LOG_DEBUG, + "transport %p destroyed", this); + + pthread_mutex_destroy (&priv->lock); + FREE (priv); +} + + +int32_t +init (transport_t *this) +{ + int ret = -1; + + ret = socket_init (this); + + if (ret == -1) + { + gf_log (this->xl->name, GF_LOG_ERROR, "socket_init() failed"); + } + + return ret; +} + +struct volume_options options[] = { + { .key = {"remote-port", + "transport.remote-port", + "transport.socket.remote-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.socket.listen-port", "listen-port"}, + .type = GF_OPTION_TYPE_INT + }, + { .key = {"transport.socket.bind-address", "bind-address" }, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.socket.connect-path", "connect-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.socket.bind-path", "bind-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = {"transport.socket.listen-path", "listen-path"}, + .type = GF_OPTION_TYPE_ANY + }, + { .key = { "transport.address-family", + "address-family" }, + .value = {"inet", "inet6", "inet/inet6", "inet6/inet", + "unix", "inet-sdp" }, + .type = GF_OPTION_TYPE_STR + }, + + { .key = {NULL} } +}; + diff --git a/transport/socket/src/socket.h b/transport/socket/src/socket.h new file mode 100644 index 00000000000..070e69d088b --- /dev/null +++ b/transport/socket/src/socket.h @@ -0,0 +1,106 @@ +/* + Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SOCKET_H +#define _SOCKET_H + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "event.h" +#include "transport.h" +#include "logging.h" +#include "dict.h" +#include "mem-pool.h" + +#ifndef MAX_IOVEC +#define MAX_IOVEC 16 +#endif /* MAX_IOVEC */ + +#define GF_DEFAULT_SOCKET_LISTEN_PORT 6996 + +typedef enum { + SOCKET_PROTO_STATE_NADA = 0, + SOCKET_PROTO_STATE_HEADER_COMING, + SOCKET_PROTO_STATE_HEADER_CAME, + SOCKET_PROTO_STATE_DATA_COMING, + SOCKET_PROTO_STATE_DATA_CAME, + SOCKET_PROTO_STATE_COMPLETE, +} socket_proto_state_t; + +struct socket_header { + char colonO[3]; + uint32_t size1; + uint32_t size2; + char version; +} __attribute__((packed)); + + +struct ioq { + union { + struct list_head list; + struct { + struct ioq *next; + struct ioq *prev; + }; + }; + struct socket_header header; + struct iovec vector[MAX_IOVEC]; + int count; + struct iovec *pending_vector; + int pending_count; + char *buf; + dict_t *refs; +}; + + +typedef struct { + int32_t sock; + int32_t idx; + unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected + char bio; + char connect_finish_log; + char submit_log; + union { + struct list_head ioq; + struct { + struct ioq *ioq_next; + struct ioq *ioq_prev; + }; + }; + struct { + int state; + struct socket_header header; + char *hdr_p; + size_t hdrlen; + char *buf_p; + size_t buflen; + struct iovec vector[2]; + int count; + struct iovec *pending_vector; + int pending_count; + } incoming; + pthread_mutex_t lock; +} socket_private_t; + + +#endif |