summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/unify/src
diff options
context:
space:
mode:
authorVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
committerVikas Gorur <vikas@zresearch.com>2009-02-18 17:36:07 +0530
commit77adf4cd648dce41f89469dd185deec6b6b53a0b (patch)
tree02e155a5753b398ee572b45793f889b538efab6b /xlators/cluster/unify/src
parentf3b2e6580e5663292ee113c741343c8a43ee133f (diff)
Added all files
Diffstat (limited to 'xlators/cluster/unify/src')
-rw-r--r--xlators/cluster/unify/src/Makefile.am16
-rw-r--r--xlators/cluster/unify/src/unify-self-heal.c1225
-rw-r--r--xlators/cluster/unify/src/unify.c4451
-rw-r--r--xlators/cluster/unify/src/unify.h132
4 files changed, 5824 insertions, 0 deletions
diff --git a/xlators/cluster/unify/src/Makefile.am b/xlators/cluster/unify/src/Makefile.am
new file mode 100644
index 00000000000..b9e6f63e9d7
--- /dev/null
+++ b/xlators/cluster/unify/src/Makefile.am
@@ -0,0 +1,16 @@
+
+xlator_LTLIBRARIES = unify.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
+
+unify_la_LDFLAGS = -module -avoidversion
+
+unify_la_SOURCES = unify.c unify-self-heal.c
+unify_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = unify.h
+
+AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS) \
+ -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS)
+
+CLEANFILES =
+
diff --git a/xlators/cluster/unify/src/unify-self-heal.c b/xlators/cluster/unify/src/unify-self-heal.c
new file mode 100644
index 00000000000..4885dd91a35
--- /dev/null
+++ b/xlators/cluster/unify/src/unify-self-heal.c
@@ -0,0 +1,1225 @@
+/*
+ Copyright (c) 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+/**
+ * unify-self-heal.c :
+ * This file implements few functions which enables 'unify' translator
+ * to be consistent in its behaviour when
+ * > a node fails,
+ * > a node gets added,
+ * > a failed node comes back
+ * > a new namespace server is added (ie, an fresh namespace server).
+ *
+ * This functionality of 'unify' will enable glusterfs to support storage
+ * system failure, and maintain consistancy. This works both ways, ie, when
+ * an entry (either file or directory) is found on namespace server, and not
+ * on storage nodes, its created in storage nodes and vica-versa.
+ *
+ * The two fops, where it can be implemented are 'getdents ()' and 'lookup ()'
+ *
+ */
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "unify.h"
+#include "dict.h"
+#include "xlator.h"
+#include "hashfn.h"
+#include "logging.h"
+#include "stack.h"
+#include "common-utils.h"
+
+int32_t
+unify_sh_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count);
+
+int32_t
+unify_sh_ns_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count);
+
+int32_t
+unify_bgsh_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count);
+
+int32_t
+unify_bgsh_ns_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count);
+
+/**
+ * unify_local_wipe - free all the extra allocation of local->* here.
+ */
+static void
+unify_local_wipe (unify_local_t *local)
+{
+ /* Free the strdup'd variables in the local structure */
+ if (local->name) {
+ FREE (local->name);
+ }
+
+ if (local->sh_struct) {
+ if (local->sh_struct->offset_list)
+ FREE (local->sh_struct->offset_list);
+
+ if (local->sh_struct->entry_list)
+ FREE (local->sh_struct->entry_list);
+
+ if (local->sh_struct->count_list)
+ FREE (local->sh_struct->count_list);
+
+ FREE (local->sh_struct);
+ }
+
+ loc_wipe (&local->loc1);
+ loc_wipe (&local->loc2);
+}
+
+int32_t
+unify_sh_setdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ inode_t *inode = NULL;
+ dict_t *tmp_dict = NULL;
+ dir_entry_t *prev, *entry, *trav;
+
+ LOCK (&frame->lock);
+ {
+ /* if local->call_count == 0, that means, setdents on
+ * storagenodes is still pending.
+ */
+ if (local->call_count)
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+ if (callcnt == 0) {
+ if (local->sh_struct->entry_list[0]) {
+ prev = entry = local->sh_struct->entry_list[0];
+ if (!entry)
+ return 0;
+ trav = entry->next;
+ while (trav) {
+ prev->next = trav->next;
+ FREE (trav->name);
+ if (S_ISLNK (trav->buf.st_mode))
+ FREE (trav->link);
+ FREE (trav);
+ trav = prev->next;
+ }
+ FREE (entry);
+ }
+
+ if (!local->flags) {
+ if (local->sh_struct->count_list[0] >=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ /* count == size, that means, there are more entries
+ to read from */
+ //local->call_count = 0;
+ local->sh_struct->offset_list[0] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+ STACK_WIND (frame,
+ unify_sh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[0],
+ GF_GET_DIR_ONLY);
+ }
+ } else {
+ inode = local->loc1.inode;
+ fd_unref (local->fd);
+ tmp_dict = local->dict;
+
+ unify_local_wipe (local);
+
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ inode, &local->stbuf, local->dict);
+ if (tmp_dict)
+ dict_unref (local->dict);
+ }
+ }
+
+ return 0;
+}
+
+
+int32_t
+unify_sh_ns_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count)
+{
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = 0;
+ unsigned long final = 0;
+ dir_entry_t *tmp = CALLOC (1, sizeof (dir_entry_t));
+
+ local->sh_struct->entry_list[0] = tmp;
+ local->sh_struct->count_list[0] = count;
+ if (entry) {
+ tmp->next = entry->next;
+ entry->next = NULL;
+ }
+
+ if ((count < UNIFY_SELF_HEAL_GETDENTS_COUNT) || !entry) {
+ final = 1;
+ }
+
+ LOCK (&frame->lock);
+ {
+ /* local->call_count will be '0' till now. make it 1 so, it
+ can be UNWIND'ed for the last call. */
+ local->call_count = priv->child_count;
+ if (final)
+ local->flags = 1;
+ }
+ UNLOCK (&frame->lock);
+
+ for (index = 0; index < priv->child_count; index++)
+ {
+ STACK_WIND_COOKIE (frame,
+ unify_sh_setdents_cbk,
+ (void *)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->setdents,
+ local->fd, GF_SET_DIR_ONLY,
+ local->sh_struct->entry_list[0], count);
+ }
+
+ return 0;
+}
+
+int32_t
+unify_sh_ns_setdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = (long)cookie;
+ dir_entry_t *prev, *entry, *trav;
+
+ LOCK (&frame->lock);
+ {
+ if (local->sh_struct->entry_list[index]) {
+ prev = entry = local->sh_struct->entry_list[index];
+ trav = entry->next;
+ while (trav) {
+ prev->next = trav->next;
+ FREE (trav->name);
+ if (S_ISLNK (trav->buf.st_mode))
+ FREE (trav->link);
+ FREE (trav);
+ trav = prev->next;
+ }
+ FREE (entry);
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (local->sh_struct->count_list[index] <
+ UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+ } else {
+ /* count == size, that means, there are more entries
+ to read from */
+ local->sh_struct->offset_list[index] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+ STACK_WIND_COOKIE (frame,
+ unify_sh_getdents_cbk,
+ cookie,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[index],
+ GF_GET_ALL);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "readdir on (%s) with offset %"PRId64"",
+ priv->xl_array[index]->name,
+ local->sh_struct->offset_list[index]);
+ }
+
+ if (!callcnt) {
+ /* All storage nodes have done unified setdents on NS node.
+ * Now, do getdents from NS and do setdents on storage nodes.
+ */
+
+ /* sh_struct->offset_list is no longer required for
+ storage nodes now */
+ local->sh_struct->offset_list[0] = 0; /* reset */
+
+ STACK_WIND (frame,
+ unify_sh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_DIR_ONLY);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_sh_getdents_cbk -
+ */
+int32_t
+unify_sh_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = (long)cookie;
+ dir_entry_t *tmp = NULL;
+
+ if (op_ret >= 0 && count > 0) {
+ /* There is some dentry found, just send the dentry to NS */
+ tmp = CALLOC (1, sizeof (dir_entry_t));
+ local->sh_struct->entry_list[index] = tmp;
+ local->sh_struct->count_list[index] = count;
+ if (entry) {
+ tmp->next = entry->next;
+ entry->next = NULL;
+ }
+ STACK_WIND_COOKIE (frame,
+ unify_sh_ns_setdents_cbk,
+ cookie,
+ NS(this),
+ NS(this)->fops->setdents,
+ local->fd,
+ GF_SET_IF_NOT_PRESENT,
+ local->sh_struct->entry_list[index],
+ count);
+ return 0;
+ }
+
+ if (count < UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+ } else {
+ /* count == size, that means, there are more entries
+ to read from */
+ local->sh_struct->offset_list[index] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+ STACK_WIND_COOKIE (frame,
+ unify_sh_getdents_cbk,
+ cookie,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[index],
+ GF_GET_ALL);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "readdir on (%s) with offset %"PRId64"",
+ priv->xl_array[index]->name,
+ local->sh_struct->offset_list[index]);
+ }
+
+ if (!callcnt) {
+ /* All storage nodes have done unified setdents on NS node.
+ * Now, do getdents from NS and do setdents on storage nodes.
+ */
+
+ /* sh_struct->offset_list is no longer required for
+ storage nodes now */
+ local->sh_struct->offset_list[0] = 0; /* reset */
+
+ STACK_WIND (frame,
+ unify_sh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_DIR_ONLY);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_sh_opendir_cbk -
+ *
+ * @cookie:
+ */
+int32_t
+unify_sh_opendir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t index = 0;
+ inode_t *inode = NULL;
+ dict_t *tmp_dict = NULL;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ } else {
+ gf_log (this->name, GF_LOG_WARNING, "failed");
+ local->failed = 1;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local->call_count = priv->child_count + 1;
+
+ if (!local->failed) {
+ /* send getdents() namespace after finishing
+ storage nodes */
+ local->call_count--;
+
+ fd_bind (fd);
+
+ if (local->call_count) {
+ /* Used as the offset index. This list keeps
+ * track of offset sent to each node during
+ * STACK_WIND.
+ */
+ local->sh_struct->offset_list =
+ calloc (priv->child_count,
+ sizeof (off_t));
+ ERR_ABORT (local->sh_struct->offset_list);
+
+ local->sh_struct->entry_list =
+ calloc (priv->child_count,
+ sizeof (dir_entry_t *));
+ ERR_ABORT (local->sh_struct->entry_list);
+
+ local->sh_struct->count_list =
+ calloc (priv->child_count,
+ sizeof (int));
+ ERR_ABORT (local->sh_struct->count_list);
+
+ /* Send getdents on all the fds */
+ for (index = 0;
+ index < priv->child_count; index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_sh_getdents_cbk,
+ (void *)(long)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_ALL);
+ }
+
+ /* did stack wind, so no need to unwind here */
+ return 0;
+ } /* (local->call_count) */
+ } /* (!local->failed) */
+
+ /* Opendir failed on one node. */
+ inode = local->loc1.inode;
+ fd_unref (local->fd);
+ tmp_dict = local->dict;
+
+ unify_local_wipe (local);
+ /* Only 'self-heal' failed, lookup() was successful. */
+ local->op_ret = 0;
+
+ /* This is lookup_cbk ()'s UNWIND. */
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, inode,
+ &local->stbuf, local->dict);
+ if (tmp_dict)
+ dict_unref (tmp_dict);
+ }
+
+ return 0;
+}
+
+/**
+ * gf_sh_checksum_cbk -
+ *
+ * @frame: frame used in lookup. get a copy of it, and use that copy.
+ * @this: pointer to unify xlator.
+ * @inode: pointer to inode, for which the consistency check is required.
+ *
+ */
+int32_t
+unify_sh_checksum_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ uint8_t *file_checksum,
+ uint8_t *dir_checksum)
+{
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t index = 0;
+ int32_t callcnt = 0;
+ inode_t *inode = NULL;
+ dict_t *tmp_dict = NULL;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret >= 0) {
+ if (NS(this) == (xlator_t *)cookie) {
+ memcpy (local->sh_struct->ns_file_checksum,
+ file_checksum, ZR_FILENAME_MAX);
+ memcpy (local->sh_struct->ns_dir_checksum,
+ dir_checksum, ZR_FILENAME_MAX);
+ } else {
+ if (local->entry_count == 0) {
+ /* Initialize the dir_checksum to be
+ * used for comparision with other
+ * storage nodes. Should be done for
+ * the first successful call *only*.
+ */
+ /* Using 'entry_count' as a flag */
+ local->entry_count = 1;
+ memcpy (local->sh_struct->dir_checksum,
+ dir_checksum, ZR_FILENAME_MAX);
+ }
+
+ /* Reply from the storage nodes */
+ for (index = 0;
+ index < ZR_FILENAME_MAX; index++) {
+ /* Files should be present in
+ only one node */
+ local->sh_struct->file_checksum[index] ^= file_checksum[index];
+
+ /* directory structure should be
+ same accross */
+ if (local->sh_struct->dir_checksum[index] != dir_checksum[index])
+ local->failed = 1;
+ }
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ for (index = 0; index < ZR_FILENAME_MAX ; index++) {
+ if (local->sh_struct->file_checksum[index] !=
+ local->sh_struct->ns_file_checksum[index]) {
+ local->failed = 1;
+ break;
+ }
+ if (local->sh_struct->dir_checksum[index] !=
+ local->sh_struct->ns_dir_checksum[index]) {
+ local->failed = 1;
+ break;
+ }
+ }
+
+ if (local->failed) {
+ /* Log it, it should be a rare event */
+ gf_log (this->name, GF_LOG_WARNING,
+ "Self-heal triggered on directory %s",
+ local->loc1.path);
+
+ /* Any self heal will be done at directory level */
+ local->call_count = 0;
+ local->op_ret = -1;
+ local->failed = 0;
+
+ local->fd = fd_create (local->loc1.inode,
+ frame->root->pid);
+
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0;
+ index < (priv->child_count + 1); index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_sh_opendir_cbk,
+ priv->xl_array[index]->name,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->opendir,
+ &local->loc1,
+ local->fd);
+ }
+ /* opendir can be done on the directory */
+ return 0;
+ }
+
+ /* no mismatch */
+ inode = local->loc1.inode;
+ tmp_dict = local->dict;
+
+ unify_local_wipe (local);
+
+ /* This is lookup_cbk ()'s UNWIND. */
+ STACK_UNWIND (frame,
+ local->op_ret,
+ local->op_errno,
+ inode,
+ &local->stbuf,
+ local->dict);
+ if (tmp_dict)
+ dict_unref (tmp_dict);
+ }
+
+ return 0;
+}
+
+/* Foreground self-heal part over */
+
+/* Background self-heal part */
+
+int32_t
+unify_bgsh_setdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ dir_entry_t *prev, *entry, *trav;
+
+ LOCK (&frame->lock);
+ {
+ /* if local->call_count == 0, that means, setdents
+ on storagenodes is still pending. */
+ if (local->call_count)
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+
+ if (callcnt == 0) {
+ if (local->sh_struct->entry_list[0]) {
+ prev = entry = local->sh_struct->entry_list[0];
+ trav = entry->next;
+ while (trav) {
+ prev->next = trav->next;
+ FREE (trav->name);
+ if (S_ISLNK (trav->buf.st_mode))
+ FREE (trav->link);
+ FREE (trav);
+ trav = prev->next;
+ }
+ FREE (entry);
+ }
+
+ if (!local->flags) {
+ if (local->sh_struct->count_list[0] >=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ /* count == size, that means, there are more
+ entries to read from */
+ //local->call_count = 0;
+ local->sh_struct->offset_list[0] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+ STACK_WIND (frame,
+ unify_bgsh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[0],
+ GF_GET_DIR_ONLY);
+ }
+ } else {
+ fd_unref (local->fd);
+ unify_local_wipe (local);
+ STACK_DESTROY (frame->root);
+ }
+ }
+
+ return 0;
+}
+
+
+int32_t
+unify_bgsh_ns_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count)
+{
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = 0;
+ unsigned long final = 0;
+ dir_entry_t *tmp = CALLOC (1, sizeof (dir_entry_t));
+
+ local->sh_struct->entry_list[0] = tmp;
+ local->sh_struct->count_list[0] = count;
+ if (entry) {
+ tmp->next = entry->next;
+ entry->next = NULL;
+ }
+
+ if ((count < UNIFY_SELF_HEAL_GETDENTS_COUNT) || !entry) {
+ final = 1;
+ }
+
+ LOCK (&frame->lock);
+ {
+ /* local->call_count will be '0' till now. make it 1 so,
+ it can be UNWIND'ed for the last call. */
+ local->call_count = priv->child_count;
+ if (final)
+ local->flags = 1;
+ }
+ UNLOCK (&frame->lock);
+
+ for (index = 0; index < priv->child_count; index++)
+ {
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_setdents_cbk,
+ (void *)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->setdents,
+ local->fd, GF_SET_DIR_ONLY,
+ local->sh_struct->entry_list[0], count);
+ }
+
+ return 0;
+}
+
+int32_t
+unify_bgsh_ns_setdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = (long)cookie;
+ dir_entry_t *prev, *entry, *trav;
+
+ if (local->sh_struct->entry_list[index]) {
+ prev = entry = local->sh_struct->entry_list[index];
+ if (!entry)
+ return 0;
+ trav = entry->next;
+ while (trav) {
+ prev->next = trav->next;
+ FREE (trav->name);
+ if (S_ISLNK (trav->buf.st_mode))
+ FREE (trav->link);
+ FREE (trav);
+ trav = prev->next;
+ }
+ FREE (entry);
+ }
+
+ if (local->sh_struct->count_list[index] <
+ UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+ } else {
+ /* count == size, that means, there are more entries
+ to read from */
+ local->sh_struct->offset_list[index] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_getdents_cbk,
+ cookie,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[index],
+ GF_GET_ALL);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "readdir on (%s) with offset %"PRId64"",
+ priv->xl_array[index]->name,
+ local->sh_struct->offset_list[index]);
+ }
+
+ if (!callcnt) {
+ /* All storage nodes have done unified setdents on NS node.
+ * Now, do getdents from NS and do setdents on storage nodes.
+ */
+
+ /* sh_struct->offset_list is no longer required for
+ storage nodes now */
+ local->sh_struct->offset_list[0] = 0; /* reset */
+
+ STACK_WIND (frame,
+ unify_bgsh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_DIR_ONLY);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_bgsh_getdents_cbk -
+ */
+int32_t
+unify_bgsh_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count)
+{
+ int32_t callcnt = -1;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ long index = (long)cookie;
+ dir_entry_t *tmp = NULL;
+
+ if (op_ret >= 0 && count > 0) {
+ /* There is some dentry found, just send the dentry to NS */
+ tmp = CALLOC (1, sizeof (dir_entry_t));
+ local->sh_struct->entry_list[index] = tmp;
+ local->sh_struct->count_list[index] = count;
+ if (entry) {
+ tmp->next = entry->next;
+ entry->next = NULL;
+ }
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_ns_setdents_cbk,
+ cookie,
+ NS(this),
+ NS(this)->fops->setdents,
+ local->fd,
+ GF_SET_IF_NOT_PRESENT,
+ local->sh_struct->entry_list[index],
+ count);
+ return 0;
+ }
+
+ if (count < UNIFY_SELF_HEAL_GETDENTS_COUNT) {
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+ } else {
+ /* count == size, that means, there are more entries to read from */
+ local->sh_struct->offset_list[index] +=
+ UNIFY_SELF_HEAL_GETDENTS_COUNT;
+
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_getdents_cbk,
+ cookie,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ local->sh_struct->offset_list[index],
+ GF_GET_ALL);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "readdir on (%s) with offset %"PRId64"",
+ priv->xl_array[index]->name,
+ local->sh_struct->offset_list[index]);
+ }
+
+ if (!callcnt) {
+ /* All storage nodes have done unified setdents on NS node.
+ * Now, do getdents from NS and do setdents on storage nodes.
+ */
+
+ /* sh_struct->offset_list is no longer required for
+ storage nodes now */
+ local->sh_struct->offset_list[0] = 0; /* reset */
+
+ STACK_WIND (frame,
+ unify_bgsh_ns_getdents_cbk,
+ NS(this),
+ NS(this)->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_DIR_ONLY);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_bgsh_opendir_cbk -
+ *
+ * @cookie:
+ */
+int32_t
+unify_bgsh_opendir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int32_t callcnt = 0;
+ int16_t index = 0;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ } else {
+ local->failed = 1;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local->call_count = priv->child_count + 1;
+
+ if (!local->failed) {
+ /* send getdents() namespace after finishing
+ storage nodes */
+ local->call_count--;
+ callcnt = local->call_count;
+
+ fd_bind (fd);
+
+ if (local->call_count) {
+ /* Used as the offset index. This list keeps
+ track of offset sent to each node during
+ STACK_WIND. */
+ local->sh_struct->offset_list =
+ calloc (priv->child_count,
+ sizeof (off_t));
+ ERR_ABORT (local->sh_struct->offset_list);
+
+ local->sh_struct->entry_list =
+ calloc (priv->child_count,
+ sizeof (dir_entry_t *));
+ ERR_ABORT (local->sh_struct->entry_list);
+
+ local->sh_struct->count_list =
+ calloc (priv->child_count,
+ sizeof (int));
+ ERR_ABORT (local->sh_struct->count_list);
+
+ /* Send getdents on all the fds */
+ for (index = 0;
+ index < priv->child_count; index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_getdents_cbk,
+ (void *)(long)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getdents,
+ local->fd,
+ UNIFY_SELF_HEAL_GETDENTS_COUNT,
+ 0, /* In this call, do send '0' as offset */
+ GF_GET_ALL);
+ }
+ /* did a stack wind, so no need to unwind here */
+ return 0;
+ } /* (local->call_count) */
+ } /* (!local->failed) */
+
+ /* Opendir failed on one node. */
+ fd_unref (local->fd);
+
+ unify_local_wipe (local);
+ STACK_DESTROY (frame->root);
+ }
+
+ return 0;
+}
+
+/**
+ * gf_bgsh_checksum_cbk -
+ *
+ * @frame: frame used in lookup. get a copy of it, and use that copy.
+ * @this: pointer to unify xlator.
+ * @inode: pointer to inode, for which the consistency check is required.
+ *
+ */
+int32_t
+unify_bgsh_checksum_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ uint8_t *file_checksum,
+ uint8_t *dir_checksum)
+{
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t index = 0;
+ int32_t callcnt = 0;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret >= 0) {
+ if (NS(this) == (xlator_t *)cookie) {
+ memcpy (local->sh_struct->ns_file_checksum,
+ file_checksum, ZR_FILENAME_MAX);
+ memcpy (local->sh_struct->ns_dir_checksum,
+ dir_checksum, ZR_FILENAME_MAX);
+ } else {
+ if (local->entry_count == 0) {
+ /* Initialize the dir_checksum to be
+ * used for comparision with other
+ * storage nodes. Should be done for
+ * the first successful call *only*.
+ */
+ /* Using 'entry_count' as a flag */
+ local->entry_count = 1;
+ memcpy (local->sh_struct->dir_checksum,
+ dir_checksum, ZR_FILENAME_MAX);
+ }
+
+ /* Reply from the storage nodes */
+ for (index = 0;
+ index < ZR_FILENAME_MAX; index++) {
+ /* Files should be present in only
+ one node */
+ local->sh_struct->file_checksum[index] ^= file_checksum[index];
+
+ /* directory structure should be same
+ accross */
+ if (local->sh_struct->dir_checksum[index] != dir_checksum[index])
+ local->failed = 1;
+ }
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ for (index = 0; index < ZR_FILENAME_MAX ; index++) {
+ if (local->sh_struct->file_checksum[index] !=
+ local->sh_struct->ns_file_checksum[index]) {
+ local->failed = 1;
+ break;
+ }
+ if (local->sh_struct->dir_checksum[index] !=
+ local->sh_struct->ns_dir_checksum[index]) {
+ local->failed = 1;
+ break;
+ }
+ }
+
+ if (local->failed) {
+ /* Log it, it should be a rare event */
+ gf_log (this->name, GF_LOG_WARNING,
+ "Self-heal triggered on directory %s",
+ local->loc1.path);
+
+ /* Any self heal will be done at the directory level */
+ local->op_ret = -1;
+ local->failed = 0;
+
+ local->fd = fd_create (local->loc1.inode,
+ frame->root->pid);
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0;
+ index < (priv->child_count + 1); index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_bgsh_opendir_cbk,
+ priv->xl_array[index]->name,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->opendir,
+ &local->loc1,
+ local->fd);
+ }
+
+ /* opendir can be done on the directory */
+ return 0;
+ }
+
+ /* no mismatch */
+ unify_local_wipe (local);
+ STACK_DESTROY (frame->root);
+ }
+
+ return 0;
+}
+
+/* Background self-heal part over */
+
+
+
+
+/**
+ * zr_unify_self_heal -
+ *
+ * @frame: frame used in lookup. get a copy of it, and use that copy.
+ * @this: pointer to unify xlator.
+ * @inode: pointer to inode, for which the consistency check is required.
+ *
+ */
+int32_t
+zr_unify_self_heal (call_frame_t *frame,
+ xlator_t *this,
+ unify_local_t *local)
+{
+ unify_private_t *priv = this->private;
+ call_frame_t *bg_frame = NULL;
+ unify_local_t *bg_local = NULL;
+ inode_t *tmp_inode = NULL;
+ dict_t *tmp_dict = NULL;
+ int16_t index = 0;
+
+ if (local->inode_generation < priv->inode_generation) {
+ /* Any self heal will be done at the directory level */
+ /* Update the inode's generation to the current generation
+ value. */
+ local->inode_generation = priv->inode_generation;
+ inode_ctx_put (local->loc1.inode, this,
+ (uint64_t)(long)local->inode_generation);
+
+ if (priv->self_heal == ZR_UNIFY_FG_SELF_HEAL) {
+ local->op_ret = 0;
+ local->failed = 0;
+ local->call_count = priv->child_count + 1;
+ local->sh_struct =
+ calloc (1, sizeof (struct unify_self_heal_struct));
+
+ /* +1 is for NS */
+ for (index = 0;
+ index < (priv->child_count + 1); index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_sh_checksum_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->checksum,
+ &local->loc1,
+ 0);
+ }
+
+ /* Self-heal in foreground, hence no need
+ to UNWIND here */
+ return 0;
+ }
+
+ /* Self Heal done in background */
+ bg_frame = copy_frame (frame);
+ INIT_LOCAL (bg_frame, bg_local);
+ loc_copy (&bg_local->loc1, &local->loc1);
+ bg_local->op_ret = 0;
+ bg_local->failed = 0;
+ bg_local->call_count = priv->child_count + 1;
+ bg_local->sh_struct =
+ calloc (1, sizeof (struct unify_self_heal_struct));
+
+ /* +1 is for NS */
+ for (index = 0; index < (priv->child_count + 1); index++) {
+ STACK_WIND_COOKIE (bg_frame,
+ unify_bgsh_checksum_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->checksum,
+ &bg_local->loc1,
+ 0);
+ }
+ }
+
+ /* generation number matches, self heal already done or
+ * self heal done in background: just do STACK_UNWIND
+ */
+ tmp_inode = local->loc1.inode;
+ tmp_dict = local->dict;
+
+ unify_local_wipe (local);
+
+ /* This is lookup_cbk ()'s UNWIND. */
+ STACK_UNWIND (frame,
+ local->op_ret,
+ local->op_errno,
+ tmp_inode,
+ &local->stbuf,
+ local->dict);
+
+ if (tmp_dict)
+ dict_unref (tmp_dict);
+
+ return 0;
+}
+
diff --git a/xlators/cluster/unify/src/unify.c b/xlators/cluster/unify/src/unify.c
new file mode 100644
index 00000000000..e2a5e14b191
--- /dev/null
+++ b/xlators/cluster/unify/src/unify.c
@@ -0,0 +1,4451 @@
+/*
+ Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+/**
+ * xlators/cluster/unify:
+ * - This xlator is one of the main translator in GlusterFS, which
+ * actually does the clustering work of the file system. One need to
+ * understand that, unify assumes file to be existing in only one of
+ * the child node, and directories to be present on all the nodes.
+ *
+ * NOTE:
+ * Now, unify has support for global namespace, which is used to keep a
+ * global view of fs's namespace tree. The stat for directories are taken
+ * just from the namespace, where as for files, just 'st_ino' is taken from
+ * Namespace node, and other stat info is taken from the actual storage node.
+ * Also Namespace node helps to keep consistant inode for files across
+ * glusterfs (re-)mounts.
+ */
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "glusterfs.h"
+#include "unify.h"
+#include "dict.h"
+#include "xlator.h"
+#include "hashfn.h"
+#include "logging.h"
+#include "stack.h"
+#include "defaults.h"
+#include "common-utils.h"
+#include <signal.h>
+#include <libgen.h>
+#include "compat-errno.h"
+#include "compat.h"
+
+#define UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR(_loc) do { \
+ if (!(_loc && _loc->inode)) { \
+ STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL); \
+ return 0; \
+ } \
+} while(0)
+
+
+#define UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(_fd) do { \
+ if (!(_fd && !fd_ctx_get (_fd, this, NULL))) { \
+ STACK_UNWIND (frame, -1, EBADFD, NULL, NULL); \
+ return 0; \
+ } \
+} while(0)
+
+#define UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(_fd) do { \
+ if (!_fd) { \
+ STACK_UNWIND (frame, -1, EBADFD, NULL, NULL); \
+ return 0; \
+ } \
+} while(0)
+
+/**
+ * unify_local_wipe - free all the extra allocation of local->* here.
+ */
+static void
+unify_local_wipe (unify_local_t *local)
+{
+ /* Free the strdup'd variables in the local structure */
+ if (local->name) {
+ FREE (local->name);
+ }
+ loc_wipe (&local->loc1);
+ loc_wipe (&local->loc2);
+}
+
+
+
+/*
+ * unify_normalize_stats -
+ */
+void
+unify_normalize_stats (struct statvfs *buf,
+ unsigned long bsize,
+ unsigned long frsize)
+{
+ double factor;
+
+ if (buf->f_bsize != bsize) {
+ factor = ((double) buf->f_bsize) / bsize;
+ buf->f_bsize = bsize;
+ buf->f_bfree = (fsblkcnt_t) (factor * buf->f_bfree);
+ buf->f_bavail = (fsblkcnt_t) (factor * buf->f_bavail);
+ }
+
+ if (buf->f_frsize != frsize) {
+ factor = ((double) buf->f_frsize) / frsize;
+ buf->f_frsize = frsize;
+ buf->f_blocks = (fsblkcnt_t) (factor * buf->f_blocks);
+ }
+}
+
+
+xlator_t *
+unify_loc_subvol (loc_t *loc, xlator_t *this)
+{
+ unify_private_t *priv = NULL;
+ xlator_t *subvol = NULL;
+ int16_t *list = NULL;
+ long index = 0;
+ xlator_t *subvol_i = NULL;
+ int ret = 0;
+ uint64_t tmp_list = 0;
+
+ priv = this->private;
+ subvol = NS (this);
+
+ if (!S_ISDIR (loc->inode->st_mode)) {
+ ret = inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+ if (!list)
+ goto out;
+
+ for (index = 0; list[index] != -1; index++) {
+ subvol_i = priv->xl_array[list[index]];
+ if (subvol_i != NS (this)) {
+ subvol = subvol_i;
+ break;
+ }
+ }
+ }
+out:
+ return subvol;
+}
+
+
+
+/**
+ * unify_statfs_cbk -
+ */
+int32_t
+unify_statfs_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct statvfs *stbuf)
+{
+ int32_t callcnt = 0;
+ struct statvfs *dict_buf = NULL;
+ unsigned long bsize;
+ unsigned long frsize;
+ unify_local_t *local = (unify_local_t *)frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ if (op_ret >= 0) {
+ /* when a call is successfull, add it to local->dict */
+ dict_buf = &local->statvfs_buf;
+
+ if (dict_buf->f_bsize != 0) {
+ bsize = max (dict_buf->f_bsize,
+ stbuf->f_bsize);
+
+ frsize = max (dict_buf->f_frsize,
+ stbuf->f_frsize);
+ unify_normalize_stats(dict_buf, bsize, frsize);
+ unify_normalize_stats(stbuf, bsize, frsize);
+ } else {
+ dict_buf->f_bsize = stbuf->f_bsize;
+ dict_buf->f_frsize = stbuf->f_frsize;
+ }
+
+ dict_buf->f_blocks += stbuf->f_blocks;
+ dict_buf->f_bfree += stbuf->f_bfree;
+ dict_buf->f_bavail += stbuf->f_bavail;
+ dict_buf->f_files += stbuf->f_files;
+ dict_buf->f_ffree += stbuf->f_ffree;
+ dict_buf->f_favail += stbuf->f_favail;
+ dict_buf->f_fsid = stbuf->f_fsid;
+ dict_buf->f_flag = stbuf->f_flag;
+ dict_buf->f_namemax = stbuf->f_namemax;
+ local->op_ret = op_ret;
+ } else {
+ /* fop on storage node has failed due to some error */
+ if (op_errno != ENOTCONN) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): %s",
+ prev_frame->this->name,
+ strerror (op_errno));
+ }
+ local->op_errno = op_errno;
+ }
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ &local->statvfs_buf);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_statfs -
+ */
+int32_t
+unify_statfs (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ unify_local_t *local = NULL;
+ xlator_list_t *trav = this->children;
+
+ INIT_LOCAL (frame, local);
+ local->call_count = ((unify_private_t *)this->private)->child_count;
+
+ while(trav) {
+ STACK_WIND (frame,
+ unify_statfs_cbk,
+ trav->xlator,
+ trav->xlator->fops->statfs,
+ loc);
+ trav = trav->next;
+ }
+
+ return 0;
+}
+
+/**
+ * unify_buf_cbk -
+ */
+int32_t
+unify_buf_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s(): child(%s): path(%s): %s",
+ gf_fop_list[frame->root->op],
+ prev_frame->this->name,
+ (local->loc1.path)?local->loc1.path:"",
+ strerror (op_errno));
+
+ local->op_errno = op_errno;
+ if ((op_errno == ENOENT) && priv->optimist)
+ local->op_ret = 0;
+ }
+
+ if (op_ret >= 0) {
+ local->op_ret = 0;
+
+ if (NS (this) == prev_frame->this) {
+ local->st_ino = buf->st_ino;
+ /* If the entry is directory, get the stat
+ from NS node */
+ if (S_ISDIR (buf->st_mode) ||
+ !local->stbuf.st_blksize) {
+ local->stbuf = *buf;
+ }
+ }
+
+ if ((!S_ISDIR (buf->st_mode)) &&
+ (NS (this) != prev_frame->this)) {
+ /* If file, take the stat info from Storage
+ node. */
+ local->stbuf = *buf;
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ /* If the inode number is not filled, operation should
+ fail */
+ if (!local->st_ino)
+ local->op_ret = -1;
+
+ local->stbuf.st_ino = local->st_ino;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ &local->stbuf);
+ }
+
+ return 0;
+}
+
+#define check_if_dht_linkfile(s) ((s->st_mode & ~S_IFMT) == S_ISVTX)
+
+/**
+ * unify_lookup_cbk -
+ */
+int32_t
+unify_lookup_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf,
+ dict_t *dict)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ inode_t *tmp_inode = NULL;
+ dict_t *local_dict = NULL;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret == -1) {
+ if ((op_errno != ENOTCONN) && (op_errno != ENOENT)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ priv->xl_array[(long)cookie]->name,
+ local->loc1.path, strerror (op_errno));
+ local->op_errno = op_errno;
+ local->failed = 1;
+
+ } else if (local->revalidate &&
+ !(priv->optimist && (op_errno == ENOENT))) {
+
+ gf_log (this->name,
+ (op_errno == ENOTCONN) ?
+ GF_LOG_DEBUG:GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ priv->xl_array[(long)cookie]->name,
+ local->loc1.path, strerror (op_errno));
+ local->op_errno = op_errno;
+ local->failed = 1;
+ }
+ }
+
+ if (op_ret == 0) {
+ local->op_ret = 0;
+
+ if (check_if_dht_linkfile(buf)) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "file %s may be DHT link file on %s, "
+ "make sure the backend is not shared "
+ "between unify and DHT",
+ local->loc1.path,
+ priv->xl_array[(long)cookie]->name);
+ }
+
+ if (local->stbuf.st_mode && local->stbuf.st_blksize) {
+ /* make sure we already have a stbuf
+ stored in local->stbuf */
+ if (S_ISDIR (local->stbuf.st_mode) &&
+ !S_ISDIR (buf->st_mode)) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "[CRITICAL] '%s' is directory "
+ "on namespace, non-directory "
+ "on node '%s', returning EIO",
+ local->loc1.path,
+ priv->xl_array[(long)cookie]->name);
+ local->return_eio = 1;
+ }
+ if (!S_ISDIR (local->stbuf.st_mode) &&
+ S_ISDIR (buf->st_mode)) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "[CRITICAL] '%s' is directory "
+ "on node '%s', non-directory "
+ "on namespace, returning EIO",
+ local->loc1.path,
+ priv->xl_array[(long)cookie]->name);
+ local->return_eio = 1;
+ }
+ }
+
+ if (!local->revalidate && !S_ISDIR (buf->st_mode)) {
+ /* This is the first time lookup on file*/
+ if (!local->list) {
+ /* list is not allocated, allocate
+ the max possible range */
+ local->list = CALLOC (1, 2 * (priv->child_count + 2));
+ if (!local->list) {
+ gf_log (this->name,
+ GF_LOG_CRITICAL,
+ "Not enough memory");
+ STACK_UNWIND (frame, -1,
+ ENOMEM, inode,
+ NULL, NULL);
+ return 0;
+ }
+ }
+ /* update the index of the list */
+ local->list [local->index++] =
+ (int16_t)(long)cookie;
+ }
+
+ if ((!local->dict) && dict &&
+ (priv->xl_array[(long)cookie] != NS(this))) {
+ local->dict = dict_ref (dict);
+ }
+
+ /* index of NS node is == total child count */
+ if (priv->child_count == (int16_t)(long)cookie) {
+ /* Take the inode number from namespace */
+ local->st_ino = buf->st_ino;
+ if (S_ISDIR (buf->st_mode) ||
+ !(local->stbuf.st_blksize)) {
+ local->stbuf = *buf;
+ }
+ } else if (!S_ISDIR (buf->st_mode)) {
+ /* If file, then get the stat from
+ storage node */
+ local->stbuf = *buf;
+ }
+
+ if (local->st_nlink < buf->st_nlink) {
+ local->st_nlink = buf->st_nlink;
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local_dict = local->dict;
+ if (local->return_eio) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "[CRITICAL] Unable to fix the path (%s) with "
+ "self-heal, try manual verification. "
+ "returning EIO.", local->loc1.path);
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, -1, EIO, inode, NULL, NULL);
+ if (local_dict) {
+ dict_unref (local_dict);
+ }
+ return 0;
+ }
+
+ if (!local->stbuf.st_blksize) {
+ /* Inode not present */
+ local->op_ret = -1;
+ } else {
+ if (!local->revalidate &&
+ !S_ISDIR (local->stbuf.st_mode)) {
+ /* If its a file, big array is useless,
+ allocate the smaller one */
+ int16_t *list = NULL;
+ list = CALLOC (1, 2 * (local->index + 1));
+ ERR_ABORT (list);
+ memcpy (list, local->list, 2 * local->index);
+ /* Make the end of the list as -1 */
+ FREE (local->list);
+ local->list = list;
+ local->list [local->index] = -1;
+ /* Update the inode's ctx with proper array */
+ /* TODO: log on failure */
+ inode_ctx_put (local->loc1.inode, this,
+ (uint64_t)(long)local->list);
+ }
+
+ if (S_ISDIR(local->loc1.inode->st_mode)) {
+ /* lookup is done for directory */
+ if (local->failed && priv->self_heal) {
+ /* Triggering self-heal */
+ /* means, self-heal required for this
+ inode */
+ local->inode_generation = 0;
+ priv->inode_generation++;
+ }
+ } else {
+ local->stbuf.st_ino = local->st_ino;
+ }
+
+ local->stbuf.st_nlink = local->st_nlink;
+ }
+ if (local->op_ret == -1) {
+ if (!local->revalidate && local->list)
+ FREE (local->list);
+ }
+
+ if ((local->op_ret >= 0) && local->failed &&
+ local->revalidate) {
+ /* Done revalidate, but it failed */
+ if (op_errno != ENOTCONN) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Revalidate failed for path(%s): %s",
+ local->loc1.path, strerror (op_errno));
+ }
+ local->op_ret = -1;
+ }
+
+ if ((priv->self_heal && !priv->optimist) &&
+ (!local->revalidate && (local->op_ret == 0) &&
+ S_ISDIR(local->stbuf.st_mode))) {
+ /* Let the self heal be done here */
+ zr_unify_self_heal (frame, this, local);
+ local_dict = NULL;
+ } else {
+ /* either no self heal, or op_ret == -1 (failure) */
+ tmp_inode = local->loc1.inode;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ tmp_inode, &local->stbuf, local->dict);
+ }
+ if (local_dict) {
+ dict_unref (local_dict);
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_lookup -
+ */
+int32_t
+unify_lookup (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ dict_t *xattr_req)
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int16_t *list = NULL;
+ long index = 0;
+
+ if (!(loc && loc->inode)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: Argument not right", loc?loc->path:"(null)");
+ STACK_UNWIND (frame, -1, EINVAL, NULL, NULL, NULL);
+ return 0;
+ }
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL, NULL);
+ return 0;
+ }
+
+ if (!inode_ctx_get (loc->inode, this, NULL) &&
+ loc->inode->st_mode &&
+ !S_ISDIR (loc->inode->st_mode)) {
+ uint64_t tmp_list = 0;
+ /* check if revalidate or fresh lookup */
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+ }
+
+ if (local->list) {
+ list = local->list;
+ for (index = 0; list[index] != -1; index++);
+ if (index != 2) {
+ if (index < 2) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "returning ESTALE for %s: file "
+ "count is %ld", loc->path, index);
+ /* Print where all the file is present */
+ for (index = 0;
+ local->list[index] != -1; index++) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: found on %s", loc->path,
+ priv->xl_array[list[index]]->name);
+ }
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, -1, ESTALE,
+ NULL, NULL, NULL);
+ return 0;
+ } else {
+ /* There are more than 2 presences */
+ /* Just log and continue */
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: file count is %ld",
+ loc->path, index);
+ /* Print where all the file is present */
+ for (index = 0;
+ local->list[index] != -1; index++) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: found on %s", loc->path,
+ priv->xl_array[list[index]]->name);
+ }
+ }
+ }
+
+ /* is revalidate */
+ local->revalidate = 1;
+
+ for (index = 0; list[index] != -1; index++)
+ local->call_count++;
+
+ for (index = 0; list[index] != -1; index++) {
+ char need_break = (list[index+1] == -1);
+ STACK_WIND_COOKIE (frame,
+ unify_lookup_cbk,
+ (void *)(long)list[index], //cookie
+ priv->xl_array [list[index]],
+ priv->xl_array [list[index]]->fops->lookup,
+ loc,
+ xattr_req);
+ if (need_break)
+ break;
+ }
+ } else {
+ if (loc->inode->st_mode) {
+ if (inode_ctx_get (loc->inode, this, NULL)) {
+ inode_ctx_get (loc->inode, this,
+ &local->inode_generation);
+ }
+ }
+ /* This is first call, there is no list */
+ /* call count should be all child + 1 namespace */
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0; index <= priv->child_count; index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_lookup_cbk,
+ (void *)index, //cookie
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->lookup,
+ loc,
+ xattr_req);
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_stat - if directory, get the stat directly from NameSpace child.
+ * if file, check for a hint and send it only there (also to NS).
+ * if its a fresh stat, then do it on all the nodes.
+ *
+ * NOTE: for all the call, sending cookie as xlator pointer, which will be
+ * used in cbk.
+ */
+int32_t
+unify_stat (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int16_t index = 0;
+ int16_t *list = NULL;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+ local->st_ino = loc->inode->ino;
+ if (S_ISDIR (loc->inode->st_mode)) {
+ /* Directory */
+ local->call_count = 1;
+ STACK_WIND (frame, unify_buf_cbk, NS(this),
+ NS(this)->fops->stat, loc);
+ } else {
+ /* File */
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++)
+ local->call_count++;
+
+ for (index = 0; list[index] != -1; index++) {
+ char need_break = (list[index+1] == -1);
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->stat,
+ loc);
+ if (need_break)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_access_cbk -
+ */
+int32_t
+unify_access_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+
+/**
+ * unify_access - Send request to only namespace, which has all the
+ * attributes set for the file.
+ */
+int32_t
+unify_access (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t mask)
+{
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ STACK_WIND (frame,
+ unify_access_cbk,
+ NS(this),
+ NS(this)->fops->access,
+ loc,
+ mask);
+
+ return 0;
+}
+
+int32_t
+unify_mkdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ inode_t *tmp_inode = NULL;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if ((op_ret == -1) && !(priv->optimist &&
+ (op_errno == ENOENT ||
+ op_errno == EEXIST))) {
+ /* TODO: Decrement the inode_generation of
+ * this->inode's parent inode, hence the missing
+ * directory is created properly by self-heal.
+ * Currently, there is no way to get the parent
+ * inode directly.
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ priv->xl_array[(long)cookie]->name,
+ local->loc1.path, strerror (op_errno));
+ if (op_errno != EEXIST)
+ local->failed = 1;
+ local->op_errno = op_errno;
+ }
+
+ if (op_ret >= 0)
+ local->op_ret = 0;
+
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ if (!local->failed) {
+ inode_ctx_put (local->loc1.inode, this,
+ priv->inode_generation);
+ }
+
+ tmp_inode = local->loc1.inode;
+ unify_local_wipe (local);
+
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ tmp_inode, &local->stbuf);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_ns_mkdir_cbk -
+ */
+int32_t
+unify_ns_mkdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ long index = 0;
+
+ if (op_ret == -1) {
+ /* No need to send mkdir request to other servers,
+ * as namespace action failed
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s): %s",
+ local->name, strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, NULL);
+ return 0;
+ }
+
+ /* Create one inode for this entry */
+ local->op_ret = 0;
+ local->stbuf = *buf;
+
+ local->call_count = priv->child_count;
+
+ /* Send mkdir request to all the nodes now */
+ for (index = 0; index < priv->child_count; index++) {
+ STACK_WIND_COOKIE (frame,
+ unify_mkdir_cbk,
+ (void *)index, //cookie
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->mkdir,
+ &local->loc1,
+ local->mode);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_mkdir -
+ */
+int32_t
+unify_mkdir (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ mode_t mode)
+{
+ unify_local_t *local = NULL;
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->mode = mode;
+
+ loc_copy (&local->loc1, loc);
+
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_ns_mkdir_cbk,
+ NS(this),
+ NS(this)->fops->mkdir,
+ loc,
+ mode);
+ return 0;
+}
+
+/**
+ * unify_rmdir_cbk -
+ */
+int32_t
+unify_rmdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret == 0 || (priv->optimist && (op_errno == ENOENT)))
+ local->op_ret = 0;
+ if (op_ret == -1)
+ local->op_errno = op_errno;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_ns_rmdir_cbk -
+ */
+int32_t
+unify_ns_rmdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int16_t index = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ /* No need to send rmdir request to other servers,
+ * as namespace action failed
+ */
+ gf_log (this->name,
+ ((op_errno != ENOTEMPTY) ?
+ GF_LOG_ERROR : GF_LOG_DEBUG),
+ "namespace: path(%s): %s",
+ local->loc1.path, strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+ }
+
+ local->call_count = priv->child_count;
+
+ for (index = 0; index < priv->child_count; index++) {
+ STACK_WIND (frame,
+ unify_rmdir_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->rmdir,
+ &local->loc1);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_rmdir -
+ */
+int32_t
+unify_rmdir (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ unify_local_t *local = NULL;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+
+ loc_copy (&local->loc1, loc);
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_ns_rmdir_cbk,
+ NS(this),
+ NS(this)->fops->rmdir,
+ loc);
+
+ return 0;
+}
+
+/**
+ * unify_open_cbk -
+ */
+int32_t
+unify_open_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+
+ LOCK (&frame->lock);
+ {
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ if (NS(this) != (xlator_t *)cookie) {
+ /* Store child node's ptr, used in
+ all the f*** / FileIO calls */
+ fd_ctx_set (fd, this, (uint64_t)(long)cookie);
+ }
+ }
+ if (op_ret == -1) {
+ local->op_errno = op_errno;
+ local->failed = 1;
+ }
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ if ((local->failed == 1) && (local->op_ret >= 0)) {
+ local->call_count = 1;
+ /* return -1 to user */
+ local->op_ret = -1;
+ //local->op_errno = EIO;
+
+ if (!fd_ctx_get (local->fd, this, NULL)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Open success on child node, "
+ "failed on namespace");
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Open success on namespace, "
+ "failed on child node");
+ }
+ }
+
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret,
+ local->op_errno, local->fd);
+ }
+
+ return 0;
+}
+
+#ifdef GF_DARWIN_HOST_OS
+/**
+ * unify_create_lookup_cbk -
+ */
+int32_t
+unify_open_lookup_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf,
+ dict_t *dict)
+{
+ int32_t callcnt = 0;
+ int16_t index = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if ((op_ret == -1) && (op_errno != ENOENT)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ priv->xl_array[(long)cookie]->name,
+ local->loc1.path, strerror (op_errno));
+ local->op_errno = op_errno;
+ }
+
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ local->index++;
+ if (NS(this) == priv->xl_array[(long)cookie]) {
+ local->list[0] = (int16_t)(long)cookie;
+ } else {
+ local->list[1] = (int16_t)(long)cookie;
+ }
+ if (S_ISDIR (buf->st_mode))
+ local->failed = 1;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ int16_t file_list[3] = {0,};
+ local->op_ret = -1;
+
+ file_list[0] = local->list[0];
+ file_list[1] = local->list[1];
+ file_list[2] = -1;
+
+ if (local->index != 2) {
+ /* Lookup failed, can't do open */
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: present on %d nodes",
+ local->name, local->index);
+
+ if (local->index < 2) {
+ unify_local_wipe (local);
+ gf_log (this->name, GF_LOG_ERROR,
+ "returning as file found on less "
+ "than 2 nodes");
+ STACK_UNWIND (frame, local->op_ret,
+ local->op_errno, local->fd);
+ return 0;
+ }
+ }
+
+ if (local->failed) {
+ /* Open on directory, return EISDIR */
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, -1, EISDIR, local->fd);
+ return 0;
+ }
+
+ /* Everything is perfect :) */
+ local->call_count = 2;
+
+ for (index = 0; file_list[index] != -1; index++) {
+ char need_break = (file_list[index+1] == -1);
+ STACK_WIND_COOKIE (frame,
+ unify_open_cbk,
+ priv->xl_array[file_list[index]],
+ priv->xl_array[file_list[index]],
+ priv->xl_array[file_list[index]]->fops->open,
+ &local->loc1,
+ local->flags,
+ local->fd);
+ if (need_break)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+int32_t
+unify_open_readlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ const char *path)
+{
+ int16_t index = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ STACK_UNWIND (frame, -1, ENOENT);
+ return 0;
+ }
+
+ if (path[0] == '/') {
+ local->name = strdup (path);
+ ERR_ABORT (local->name);
+ } else {
+ char *tmp_str = strdup (local->loc1.path);
+ char *tmp_base = dirname (tmp_str);
+ local->name = CALLOC (1, ZR_PATH_MAX);
+ strcpy (local->name, tmp_base);
+ strncat (local->name, "/", 1);
+ strcat (local->name, path);
+ FREE (tmp_str);
+ }
+
+ local->list = CALLOC (1, sizeof (int16_t) * 3);
+ ERR_ABORT (local->list);
+ local->call_count = priv->child_count + 1;
+ local->op_ret = -1;
+ for (index = 0; index <= priv->child_count; index++) {
+ /* Send the lookup to all the nodes including namespace */
+ STACK_WIND_COOKIE (frame,
+ unify_open_lookup_cbk,
+ (void *)(long)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->lookup,
+ &local->loc1,
+ NULL);
+ }
+
+ return 0;
+}
+#endif /* GF_DARWIN_HOST_OS */
+
+/**
+ * unify_open -
+ */
+int32_t
+unify_open (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ fd_t *fd)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = NULL;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ int16_t file_list[3] = {0,};
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Init */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ local->fd = fd;
+ local->flags = flags;
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ local->list = list;
+ file_list[0] = priv->child_count; /* Thats namespace */
+ file_list[2] = -1;
+ for (index = 0; list[index] != -1; index++) {
+ local->call_count++;
+ if (list[index] != priv->child_count)
+ file_list[1] = list[index];
+ }
+
+ if (local->call_count != 2) {
+ /* If the lookup was done for file */
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: entry_count is %d",
+ loc->path, local->call_count);
+ for (index = 0; local->list[index] != -1; index++)
+ gf_log (this->name, GF_LOG_ERROR, "%s: found on %s",
+ loc->path, priv->xl_array[list[index]]->name);
+
+ if (local->call_count < 2) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "returning EIO as file found on onlyone node");
+ STACK_UNWIND (frame, -1, EIO, fd);
+ return 0;
+ }
+ }
+
+#ifdef GF_DARWIN_HOST_OS
+ /* Handle symlink here */
+ if (S_ISLNK (loc->inode->st_mode)) {
+ /* Callcount doesn't matter here */
+ STACK_WIND (frame,
+ unify_open_readlink_cbk,
+ NS(this),
+ NS(this)->fops->readlink,
+ loc, ZR_PATH_MAX);
+ return 0;
+ }
+#endif /* GF_DARWIN_HOST_OS */
+
+ local->call_count = 2;
+ for (index = 0; file_list[index] != -1; index++) {
+ char need_break = (file_list[index+1] == -1);
+ STACK_WIND_COOKIE (frame,
+ unify_open_cbk,
+ priv->xl_array[file_list[index]], //cookie
+ priv->xl_array[file_list[index]],
+ priv->xl_array[file_list[index]]->fops->open,
+ loc,
+ flags,
+ fd);
+ if (need_break)
+ break;
+ }
+
+ return 0;
+}
+
+
+int32_t
+unify_create_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ unify_local_t *local = frame->local;
+ inode_t *inode = local->loc1.inode;
+
+ unify_local_wipe (local);
+
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd,
+ inode, &local->stbuf);
+
+ return 0;
+}
+
+/**
+ * unify_create_open_cbk -
+ */
+int32_t
+unify_create_open_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ int ret = 0;
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+ inode_t *inode = NULL;
+ xlator_t *child = NULL;
+ uint64_t tmp_value = 0;
+
+ LOCK (&frame->lock);
+ {
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ if (NS(this) != (xlator_t *)cookie) {
+ /* Store child node's ptr, used in all
+ the f*** / FileIO calls */
+ /* TODO: log on failure */
+ ret = fd_ctx_get (fd, this, &tmp_value);
+ cookie = (void *)(long)tmp_value;
+ } else {
+ /* NOTE: open successful on namespace.
+ * fd's ctx can be used to identify open
+ * failure on storage subvolume. cool
+ * ide ;) */
+ local->failed = 0;
+ }
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ ((xlator_t *)cookie)->name,
+ local->loc1.path, strerror (op_errno));
+ local->op_errno = op_errno;
+ local->failed = 1;
+ }
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ if (local->failed == 1 && (local->op_ret >= 0)) {
+ local->call_count = 1;
+ /* return -1 to user */
+ local->op_ret = -1;
+ local->op_errno = EIO;
+ local->fd = fd;
+ local->call_count = 1;
+
+ if (!fd_ctx_get (local->fd, this, &tmp_value)) {
+ child = (xlator_t *)(long)tmp_value;
+
+ gf_log (this->name, GF_LOG_ERROR,
+ "Create success on child node, "
+ "failed on namespace");
+
+ STACK_WIND (frame,
+ unify_create_unlink_cbk,
+ child,
+ child->fops->unlink,
+ &local->loc1);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Create success on namespace, "
+ "failed on child node");
+
+ STACK_WIND (frame,
+ unify_create_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+ }
+ return 0;
+ }
+ inode = local->loc1.inode;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, fd,
+ inode, &local->stbuf);
+ }
+ return 0;
+}
+
+/**
+ * unify_create_lookup_cbk -
+ */
+int32_t
+unify_create_lookup_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf,
+ dict_t *dict)
+{
+ int32_t callcnt = 0;
+ int16_t index = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ priv->xl_array[(long)cookie]->name,
+ local->loc1.path, strerror (op_errno));
+ local->op_errno = op_errno;
+ local->failed = 1;
+ }
+
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ local->list[local->index++] = (int16_t)(long)cookie;
+ if (NS(this) == priv->xl_array[(long)cookie]) {
+ local->st_ino = buf->st_ino;
+ } else {
+ local->stbuf = *buf;
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ int16_t *list = local->list;
+ int16_t file_list[3] = {0,};
+ local->op_ret = -1;
+
+ local->list [local->index] = -1;
+ file_list[0] = list[0];
+ file_list[1] = list[1];
+ file_list[2] = -1;
+
+ local->stbuf.st_ino = local->st_ino;
+ /* TODO: log on failure */
+ inode_ctx_put (local->loc1.inode, this,
+ (uint64_t)(long)local->list);
+
+ if (local->index != 2) {
+ /* Lookup failed, can't do open */
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: present on %d nodes",
+ local->loc1.path, local->index);
+ file_list[0] = priv->child_count;
+ for (index = 0; list[index] != -1; index++) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: found on %s", local->loc1.path,
+ priv->xl_array[list[index]]->name);
+ if (list[index] != priv->child_count)
+ file_list[1] = list[index];
+ }
+
+ if (local->index < 2) {
+ unify_local_wipe (local);
+ gf_log (this->name, GF_LOG_ERROR,
+ "returning EIO as file found on "
+ "only one node");
+ STACK_UNWIND (frame, -1, EIO,
+ local->fd, inode, NULL);
+ return 0;
+ }
+ }
+ /* Everything is perfect :) */
+ local->call_count = 2;
+
+ for (index = 0; file_list[index] != -1; index++) {
+ char need_break = (file_list[index+1] == -1);
+ STACK_WIND_COOKIE (frame,
+ unify_create_open_cbk,
+ priv->xl_array[file_list[index]],
+ priv->xl_array[file_list[index]],
+ priv->xl_array[file_list[index]]->fops->open,
+ &local->loc1,
+ local->flags,
+ local->fd);
+ if (need_break)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_create_cbk -
+ */
+int32_t
+unify_create_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd,
+ inode_t *inode,
+ struct stat *buf)
+{
+ int ret = 0;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+ inode_t *tmp_inode = NULL;
+
+ if (op_ret == -1) {
+ /* send unlink () on Namespace */
+ local->op_errno = op_errno;
+ local->op_ret = -1;
+ local->call_count = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "create failed on %s (file %s, error %s), "
+ "sending unlink to namespace",
+ prev_frame->this->name,
+ local->loc1.path, strerror (op_errno));
+
+ STACK_WIND (frame,
+ unify_create_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+
+ return 0;
+ }
+
+ if (op_ret >= 0) {
+ local->op_ret = op_ret;
+ local->stbuf = *buf;
+ /* Just inode number should be from NS node */
+ local->stbuf.st_ino = local->st_ino;
+
+ /* TODO: log on failure */
+ ret = fd_ctx_set (fd, this, (uint64_t)(long)prev_frame->this);
+ }
+
+ tmp_inode = local->loc1.inode;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, local->fd,
+ tmp_inode, &local->stbuf);
+
+ return 0;
+}
+
+/**
+ * unify_ns_create_cbk -
+ *
+ */
+int32_t
+unify_ns_create_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd,
+ inode_t *inode,
+ struct stat *buf)
+{
+ struct sched_ops *sched_ops = NULL;
+ xlator_t *sched_xl = NULL;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t *list = NULL;
+ int16_t index = 0;
+
+ if (op_ret == -1) {
+ /* No need to send create request to other servers, as
+ namespace action failed. Handle exclusive create here. */
+ if ((op_errno != EEXIST) ||
+ ((op_errno == EEXIST) &&
+ ((local->flags & O_EXCL) == O_EXCL))) {
+ /* If its just a create call without O_EXCL,
+ don't do this */
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s): %s",
+ local->loc1.path, strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf);
+ return 0;
+ }
+ }
+
+ if (op_ret >= 0) {
+ /* Get the inode number from the NS node */
+ local->st_ino = buf->st_ino;
+
+ local->op_ret = -1;
+
+ /* Start the mapping list */
+ list = CALLOC (1, sizeof (int16_t) * 3);
+ ERR_ABORT (list);
+ inode_ctx_put (inode, this, (uint64_t)(long)list);
+ list[0] = priv->child_count;
+ list[2] = -1;
+
+ /* This means, file doesn't exist anywhere in the Filesystem */
+ sched_ops = priv->sched_ops;
+
+ /* Send create request to the scheduled node now */
+ sched_xl = sched_ops->schedule (this, local->loc1.path);
+ if (sched_xl == NULL)
+ {
+ /* send unlink () on Namespace */
+ local->op_errno = ENOTCONN;
+ local->op_ret = -1;
+ local->call_count = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "no node online to schedule create:(file %s) "
+ "sending unlink to namespace",
+ (local->loc1.path)?local->loc1.path:"");
+
+ STACK_WIND (frame,
+ unify_create_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+
+ return 0;
+ }
+
+ for (index = 0; index < priv->child_count; index++)
+ if (sched_xl == priv->xl_array[index])
+ break;
+ list[1] = index;
+
+ STACK_WIND (frame, unify_create_cbk,
+ sched_xl, sched_xl->fops->create,
+ &local->loc1, local->flags, local->mode, fd);
+ } else {
+ /* File already exists, and there is no O_EXCL flag */
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "File(%s) already exists on namespace, sending "
+ "open instead", local->loc1.path);
+
+ local->list = CALLOC (1, sizeof (int16_t) * 3);
+ ERR_ABORT (local->list);
+ local->call_count = priv->child_count + 1;
+ local->op_ret = -1;
+ for (index = 0; index <= priv->child_count; index++) {
+ /* Send lookup() to all nodes including namespace */
+ STACK_WIND_COOKIE (frame,
+ unify_create_lookup_cbk,
+ (void *)(long)index,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->lookup,
+ &local->loc1,
+ NULL);
+ }
+ }
+ return 0;
+}
+
+/**
+ * unify_create - create a file in global namespace first, so other
+ * clients can see them. Create the file in storage nodes in background.
+ */
+int32_t
+unify_create (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ mode_t mode,
+ fd_t *fd)
+{
+ unify_local_t *local = NULL;
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->mode = mode;
+ local->flags = flags;
+ local->fd = fd;
+
+ loc_copy (&local->loc1, loc);
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, fd, loc->inode, NULL);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_ns_create_cbk,
+ NS(this),
+ NS(this)->fops->create,
+ loc,
+ flags | O_EXCL,
+ mode,
+ fd);
+
+ return 0;
+}
+
+
+/**
+ * unify_opendir_cbk -
+ */
+int32_t
+unify_opendir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ fd_t *fd)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, fd);
+
+ return 0;
+}
+
+/**
+ * unify_opendir -
+ */
+int32_t
+unify_opendir (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ fd_t *fd)
+{
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ STACK_WIND (frame, unify_opendir_cbk,
+ NS(this), NS(this)->fops->opendir, loc, fd);
+
+ return 0;
+}
+
+
+/**
+ * unify_chmod -
+ */
+int32_t
+unify_chmod (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ mode_t mode)
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+
+ loc_copy (&local->loc1, loc);
+ local->st_ino = loc->inode->ino;
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0; index < (priv->child_count + 1); index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->chmod,
+ loc, mode);
+ }
+ } else {
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; local->list[index] != -1; index++) {
+ local->call_count++;
+ callcnt++;
+ }
+
+ for (index = 0; local->list[index] != -1; index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[local->list[index]],
+ priv->xl_array[local->list[index]]->fops->chmod,
+ loc,
+ mode);
+ if (!--callcnt)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_chown -
+ */
+int32_t
+unify_chown (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ uid_t uid,
+ gid_t gid)
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ local->st_ino = loc->inode->ino;
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0; index < (priv->child_count + 1); index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->chown,
+ loc, uid, gid);
+ }
+ } else {
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; local->list[index] != -1; index++) {
+ local->call_count++;
+ callcnt++;
+ }
+
+ for (index = 0; local->list[index] != -1; index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[local->list[index]],
+ priv->xl_array[local->list[index]]->fops->chown,
+ loc, uid, gid);
+ if (!--callcnt)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_truncate_cbk -
+ */
+int32_t
+unify_truncate_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ prev_frame->this->name,
+ (local->loc1.path)?local->loc1.path:"",
+ strerror (op_errno));
+ local->op_errno = op_errno;
+ if (!((op_errno == ENOENT) && priv->optimist))
+ local->op_ret = -1;
+ }
+
+ if (op_ret >= 0) {
+ if (NS (this) == prev_frame->this) {
+ local->st_ino = buf->st_ino;
+ /* If the entry is directory, get the
+ stat from NS node */
+ if (S_ISDIR (buf->st_mode) ||
+ !local->stbuf.st_blksize) {
+ local->stbuf = *buf;
+ }
+ }
+
+ if ((!S_ISDIR (buf->st_mode)) &&
+ (NS (this) != prev_frame->this)) {
+ /* If file, take the stat info from
+ Storage node. */
+ local->stbuf = *buf;
+ }
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ if (local->st_ino)
+ local->stbuf.st_ino = local->st_ino;
+ else
+ local->op_ret = -1;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ &local->stbuf);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_truncate -
+ */
+int32_t
+unify_truncate (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ off_t offset)
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ local->st_ino = loc->inode->ino;
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = 1;
+
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ NS(this),
+ NS(this)->fops->stat,
+ loc);
+ } else {
+ local->op_ret = 0;
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; local->list[index] != -1; index++) {
+ local->call_count++;
+ callcnt++;
+ }
+
+ /* Don't send truncate to NS node */
+ STACK_WIND (frame, unify_truncate_cbk, NS(this),
+ NS(this)->fops->stat, loc);
+ callcnt--;
+
+ for (index = 0; local->list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[local->list[index]]) {
+ STACK_WIND (frame,
+ unify_truncate_cbk,
+ priv->xl_array[local->list[index]],
+ priv->xl_array[local->list[index]]->fops->truncate,
+ loc,
+ offset);
+ if (!--callcnt)
+ break;
+ }
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_utimens -
+ */
+int32_t
+unify_utimens (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ struct timespec tv[2])
+{
+ unify_local_t *local = NULL;
+ unify_private_t *priv = this->private;
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ local->st_ino = loc->inode->ino;
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = priv->child_count + 1;
+
+ for (index = 0; index < (priv->child_count + 1); index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->utimens,
+ loc, tv);
+ }
+ } else {
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; local->list[index] != -1; index++) {
+ local->call_count++;
+ callcnt++;
+ }
+
+ for (index = 0; local->list[index] != -1; index++) {
+ STACK_WIND (frame,
+ unify_buf_cbk,
+ priv->xl_array[local->list[index]],
+ priv->xl_array[local->list[index]]->fops->utimens,
+ loc,
+ tv);
+ if (!--callcnt)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * unify_readlink_cbk -
+ */
+int32_t
+unify_readlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ const char *path)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, path);
+ return 0;
+}
+
+/**
+ * unify_readlink - Read the link only from the storage node.
+ */
+int32_t
+unify_readlink (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ size_t size)
+{
+ unify_private_t *priv = this->private;
+ int32_t entry_count = 0;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++)
+ entry_count++;
+
+ if (entry_count >= 2) {
+ for (index = 0; list[index] != -1; index++) {
+ if (priv->xl_array[list[index]] != NS(this)) {
+ STACK_WIND (frame,
+ unify_readlink_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->readlink,
+ loc,
+ size);
+ break;
+ }
+ }
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "returning ENOENT, no softlink files found "
+ "on storage node");
+ STACK_UNWIND (frame, -1, ENOENT, NULL);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_unlink_cbk -
+ */
+int32_t
+unify_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = 0;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret == 0 || ((op_errno == ENOENT) && priv->optimist))
+ local->op_ret = 0;
+ if (op_ret == -1)
+ local->op_errno = op_errno;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_unlink -
+ */
+int32_t
+unify_unlink (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = NULL;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++)
+ local->call_count++;
+
+ if (local->call_count) {
+ for (index = 0; list[index] != -1; index++) {
+ char need_break = (list[index+1] == -1);
+ STACK_WIND (frame,
+ unify_unlink_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->unlink,
+ loc);
+ if (need_break)
+ break;
+ }
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: returning ENOENT", loc->path);
+ STACK_UNWIND (frame, -1, ENOENT);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_readv_cbk -
+ */
+int32_t
+unify_readv_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iovec *vector,
+ int32_t count,
+ struct stat *stbuf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf);
+ return 0;
+}
+
+/**
+ * unify_readv -
+ */
+int32_t
+unify_readv (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame,
+ unify_readv_cbk,
+ child,
+ child->fops->readv,
+ fd,
+ size,
+ offset);
+
+
+ return 0;
+}
+
+/**
+ * unify_writev_cbk -
+ */
+int32_t
+unify_writev_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *stbuf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, stbuf);
+ return 0;
+}
+
+/**
+ * unify_writev -
+ */
+int32_t
+unify_writev (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t off)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame,
+ unify_writev_cbk,
+ child,
+ child->fops->writev,
+ fd,
+ vector,
+ count,
+ off);
+
+ return 0;
+}
+
+/**
+ * unify_ftruncate -
+ */
+int32_t
+unify_ftruncate (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ off_t offset)
+{
+ xlator_t *child = NULL;
+ unify_local_t *local = NULL;
+ uint64_t tmp_child = 0;
+
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR(fd);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->op_ret = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ local->call_count = 2;
+
+ STACK_WIND (frame, unify_truncate_cbk,
+ child, child->fops->ftruncate,
+ fd, offset);
+
+ STACK_WIND (frame, unify_truncate_cbk,
+ NS(this), NS(this)->fops->fstat,
+ fd);
+
+ return 0;
+}
+
+
+/**
+ * unify_fchmod -
+ */
+int32_t
+unify_fchmod (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ mode_t mode)
+{
+ unify_local_t *local = NULL;
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->st_ino = fd->inode->ino;
+
+ if (!fd_ctx_get (fd, this, &tmp_child)) {
+ /* If its set, then its file */
+ child = (xlator_t *)(long)tmp_child;
+
+ local->call_count = 2;
+
+ STACK_WIND (frame, unify_buf_cbk, child,
+ child->fops->fchmod, fd, mode);
+
+ STACK_WIND (frame, unify_buf_cbk, NS(this),
+ NS(this)->fops->fchmod, fd, mode);
+
+ } else {
+ /* this is an directory */
+ local->call_count = 1;
+
+ STACK_WIND (frame, unify_buf_cbk,
+ NS(this), NS(this)->fops->fchmod, fd, mode);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_fchown -
+ */
+int32_t
+unify_fchown (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ uid_t uid,
+ gid_t gid)
+{
+ unify_local_t *local = NULL;
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->st_ino = fd->inode->ino;
+
+ if (!fd_ctx_get (fd, this, &tmp_child)) {
+ /* If its set, then its file */
+ child = (xlator_t *)(long)tmp_child;
+
+ local->call_count = 2;
+
+ STACK_WIND (frame, unify_buf_cbk, child,
+ child->fops->fchown, fd, uid, gid);
+
+ STACK_WIND (frame, unify_buf_cbk, NS(this),
+ NS(this)->fops->fchown, fd, uid, gid);
+ } else {
+ local->call_count = 1;
+
+ STACK_WIND (frame, unify_buf_cbk,
+ NS(this), NS(this)->fops->fchown,
+ fd, uid, gid);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_flush_cbk -
+ */
+int32_t
+unify_flush_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+/**
+ * unify_flush -
+ */
+int32_t
+unify_flush (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_flush_cbk, child,
+ child->fops->flush, fd);
+
+ return 0;
+}
+
+
+/**
+ * unify_fsync_cbk -
+ */
+int32_t
+unify_fsync_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+/**
+ * unify_fsync -
+ */
+int32_t
+unify_fsync (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t flags)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_fsync_cbk, child,
+ child->fops->fsync, fd, flags);
+
+ return 0;
+}
+
+/**
+ * unify_fstat - Send fstat FOP to Namespace only if its directory, and to
+ * both namespace and the storage node if its a file.
+ */
+int32_t
+unify_fstat (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ unify_local_t *local = NULL;
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR(fd);
+
+ INIT_LOCAL (frame, local);
+ local->st_ino = fd->inode->ino;
+
+ if (!fd_ctx_get (fd, this, &tmp_child)) {
+ /* If its set, then its file */
+ child = (xlator_t *)(long)tmp_child;
+ local->call_count = 2;
+
+ STACK_WIND (frame, unify_buf_cbk, child,
+ child->fops->fstat, fd);
+
+ STACK_WIND (frame, unify_buf_cbk, NS(this),
+ NS(this)->fops->fstat, fd);
+
+ } else {
+ /* this is an directory */
+ local->call_count = 1;
+ STACK_WIND (frame, unify_buf_cbk, NS(this),
+ NS(this)->fops->fstat, fd);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_getdents_cbk -
+ */
+int32_t
+unify_getdents_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dir_entry_t *entry,
+ int32_t count)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, entry, count);
+ return 0;
+}
+
+/**
+ * unify_getdents - send the FOP request to all the nodes.
+ */
+int32_t
+unify_getdents (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset,
+ int32_t flag)
+{
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd);
+
+ STACK_WIND (frame, unify_getdents_cbk, NS(this),
+ NS(this)->fops->getdents, fd, size, offset, flag);
+
+ return 0;
+}
+
+
+/**
+ * unify_readdir_cbk -
+ */
+int32_t
+unify_readdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ gf_dirent_t *buf)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ return 0;
+}
+
+/**
+ * unify_readdir - send the FOP request to all the nodes.
+ */
+int32_t
+unify_readdir (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd);
+
+ STACK_WIND (frame, unify_readdir_cbk, NS(this),
+ NS(this)->fops->readdir, fd, size, offset);
+
+ return 0;
+}
+
+
+/**
+ * unify_fsyncdir_cbk -
+ */
+int32_t
+unify_fsyncdir_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+
+ return 0;
+}
+
+/**
+ * unify_fsyncdir -
+ */
+int32_t
+unify_fsyncdir (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t flags)
+{
+ UNIFY_CHECK_FD_AND_UNWIND_ON_ERR (fd);
+
+ STACK_WIND (frame, unify_fsyncdir_cbk,
+ NS(this), NS(this)->fops->fsyncdir, fd, flags);
+
+ return 0;
+}
+
+/**
+ * unify_lk_cbk - UNWIND frame with the proper return arguments.
+ */
+int32_t
+unify_lk_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct flock *lock)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, lock);
+ return 0;
+}
+
+/**
+ * unify_lk - Send it to all the storage nodes, (should be 1) which has file.
+ */
+int32_t
+unify_lk (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t cmd,
+ struct flock *lock)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_lk_cbk, child,
+ child->fops->lk, fd, cmd, lock);
+
+ return 0;
+}
+
+
+int32_t
+unify_setxattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno);
+
+static int32_t
+unify_setxattr_file_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ unify_private_t *private = this->private;
+ unify_local_t *local = frame->local;
+ xlator_t *sched_xl = NULL;
+ struct sched_ops *sched_ops = NULL;
+
+ if (op_ret == -1) {
+ if (!ENOTSUP)
+ gf_log (this->name, GF_LOG_ERROR,
+ "setxattr with XATTR_CREATE on ns: "
+ "path(%s) key(%s): %s",
+ local->loc1.path, local->name,
+ strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+ }
+
+ LOCK (&frame->lock);
+ {
+ local->failed = 0;
+ local->op_ret = 0;
+ local->op_errno = 0;
+ local->call_count = 1;
+ }
+ UNLOCK (&frame->lock);
+
+ /* schedule XATTR_CREATE on one of the child node */
+ sched_ops = private->sched_ops;
+
+ /* Send create request to the scheduled node now */
+ sched_xl = sched_ops->schedule (this, local->name);
+ if (!sched_xl) {
+ STACK_UNWIND (frame, -1, ENOTCONN);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_setxattr_cbk,
+ sched_xl,
+ sched_xl->fops->setxattr,
+ &local->loc1,
+ local->dict,
+ local->flags);
+ return 0;
+}
+
+/**
+ * unify_setxattr_cbk - When all the child nodes return, UNWIND frame.
+ */
+int32_t
+unify_setxattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+ dict_t *dict = NULL;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret == -1) {
+ gf_log (this->name, (((op_errno == ENOENT) ||
+ (op_errno == ENOTSUP))?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "child(%s): path(%s): %s",
+ prev_frame->this->name,
+ (local->loc1.path)?local->loc1.path:"",
+ strerror (op_errno));
+ if (local->failed == -1) {
+ local->failed = 1;
+ }
+ local->op_errno = op_errno;
+ } else {
+ local->failed = 0;
+ local->op_ret = op_ret;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ if (local->failed && local->name &&
+ ZR_FILE_CONTENT_REQUEST(local->name)) {
+ dict = get_new_dict ();
+ dict_set (dict, local->dict->members_list->key,
+ data_from_dynptr(NULL, 0));
+ dict_ref (dict);
+
+ local->call_count = 1;
+
+ STACK_WIND (frame,
+ unify_setxattr_file_cbk,
+ NS(this),
+ NS(this)->fops->setxattr,
+ &local->loc1,
+ dict,
+ XATTR_CREATE);
+
+ dict_unref (dict);
+ return 0;
+ }
+
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_sexattr - This function should be sent to all the storage nodes,
+ * which contains the file, (excluding namespace).
+ */
+int32_t
+unify_setxattr (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ dict_t *dict,
+ int32_t flags)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = NULL;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ int32_t call_count = 0;
+ uint64_t tmp_list = 0;
+ data_pair_t *trav = dict->members_list;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->failed = -1;
+ loc_copy (&local->loc1, loc);
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+
+ if (trav && trav->key && ZR_FILE_CONTENT_REQUEST(trav->key)) {
+ /* direct the storage xlators to change file
+ content only if file exists */
+ local->flags = flags;
+ local->dict = dict;
+ local->name = strdup (trav->key);
+ flags |= XATTR_REPLACE;
+ }
+
+ local->call_count = priv->child_count;
+ for (index = 0; index < priv->child_count; index++) {
+ STACK_WIND (frame,
+ unify_setxattr_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->setxattr,
+ loc, dict, flags);
+ }
+ return 0;
+ }
+
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[list[index]]) {
+ local->call_count++;
+ call_count++;
+ }
+ }
+
+ if (local->call_count) {
+ for (index = 0; list[index] != -1; index++) {
+ if (priv->xl_array[list[index]] != NS(this)) {
+ STACK_WIND (frame,
+ unify_setxattr_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->setxattr,
+ loc,
+ dict,
+ flags);
+ if (!--call_count)
+ break;
+ }
+ }
+ return 0;
+ }
+
+ /* No entry in storage nodes */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "returning ENOENT, file not found on storage node.");
+ STACK_UNWIND (frame, -1, ENOENT);
+
+ return 0;
+}
+
+
+/**
+ * unify_getxattr_cbk - This function is called from only one child, so, no
+ * need of any lock or anything else, just send it to above layer
+ */
+int32_t
+unify_getxattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ dict_t *value)
+{
+ int32_t callcnt = 0;
+ dict_t *local_value = NULL;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+
+ if (op_ret == -1) {
+ local->op_errno = op_errno;
+ gf_log (this->name,
+ (((op_errno == ENOENT) ||
+ (op_errno == ENODATA) ||
+ (op_errno == ENOTSUP)) ?
+ GF_LOG_DEBUG : GF_LOG_ERROR),
+ "child(%s): path(%s): %s",
+ prev_frame->this->name,
+ (local->loc1.path)?local->loc1.path:"",
+ strerror (op_errno));
+ } else {
+ if (!local->dict)
+ local->dict = dict_ref (value);
+ local->op_ret = op_ret;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local_value = local->dict;
+ local->dict = NULL;
+
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ local_value);
+
+ if (local_value)
+ dict_unref (local_value);
+ }
+
+ return 0;
+}
+
+
+/**
+ * unify_getxattr - This FOP is sent to only the storage node.
+ */
+int32_t
+unify_getxattr (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ const char *name)
+{
+ unify_private_t *priv = this->private;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ int16_t count = 0;
+ unify_local_t *local = NULL;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+ INIT_LOCAL (frame, local);
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = priv->child_count;
+ for (index = 0; index < priv->child_count; index++)
+ STACK_WIND (frame,
+ unify_getxattr_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->getxattr,
+ loc,
+ name);
+ return 0;
+ }
+
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[list[index]]) {
+ local->call_count++;
+ count++;
+ }
+ }
+
+ if (count) {
+ for (index = 0; list[index] != -1; index++) {
+ if (priv->xl_array[list[index]] != NS(this)) {
+ STACK_WIND (frame,
+ unify_getxattr_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->getxattr,
+ loc,
+ name);
+ if (!--count)
+ break;
+ }
+ }
+ } else {
+ dict_t *tmp_dict = get_new_dict ();
+ gf_log (this->name, GF_LOG_DEBUG,
+ "%s: returning ENODATA, no file found on storage node",
+ loc->path);
+ STACK_UNWIND (frame, -1, ENODATA, tmp_dict);
+ dict_destroy (tmp_dict);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_removexattr_cbk - Wait till all the child node returns the call
+ * and then UNWIND to above layer.
+ */
+int32_t
+unify_removexattr_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret == -1) {
+ local->op_errno = op_errno;
+ if (op_errno != ENOTSUP)
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ prev_frame->this->name,
+ local->loc1.path, strerror (op_errno));
+ } else {
+ local->op_ret = op_ret;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ STACK_UNWIND (frame, local->op_ret, local->op_errno);
+ }
+
+ return 0;
+}
+
+/**
+ * unify_removexattr - Send it to all the child nodes which has the files.
+ */
+int32_t
+unify_removexattr (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ const char *name)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = NULL;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ int32_t call_count = 0;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (loc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+
+ if (S_ISDIR (loc->inode->st_mode)) {
+ local->call_count = priv->child_count;
+ for (index = 0; index < priv->child_count; index++)
+ STACK_WIND (frame,
+ unify_removexattr_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->removexattr,
+ loc,
+ name);
+
+ return 0;
+ }
+
+ inode_ctx_get (loc->inode, this, &tmp_list);
+ list = (int16_t *)(long)tmp_list;
+
+ for (index = 0; list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[list[index]]) {
+ local->call_count++;
+ call_count++;
+ }
+ }
+
+ if (local->call_count) {
+ for (index = 0; list[index] != -1; index++) {
+ if (priv->xl_array[list[index]] != NS(this)) {
+ STACK_WIND (frame,
+ unify_removexattr_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->removexattr,
+ loc,
+ name);
+ if (!--call_count)
+ break;
+ }
+ }
+ return 0;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "%s: returning ENOENT, not found on storage node.", loc->path);
+ STACK_UNWIND (frame, -1, ENOENT);
+
+ return 0;
+}
+
+
+int32_t
+unify_mknod_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1)
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: %s", local->loc1.path, strerror (op_errno));
+
+ unify_local_wipe (local);
+ /* No log required here as this -1 is for mknod call */
+ STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL);
+ return 0;
+}
+
+/**
+ * unify_mknod_cbk -
+ */
+int32_t
+unify_mknod_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "mknod failed on storage node, sending unlink to "
+ "namespace");
+ local->op_errno = op_errno;
+ STACK_WIND (frame,
+ unify_mknod_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+ return 0;
+ }
+
+ local->stbuf = *buf;
+ local->stbuf.st_ino = local->st_ino;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf);
+ return 0;
+}
+
+/**
+ * unify_ns_mknod_cbk -
+ */
+int32_t
+unify_ns_mknod_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ struct sched_ops *sched_ops = NULL;
+ xlator_t *sched_xl = NULL;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t *list = NULL;
+ int16_t index = 0;
+ call_frame_t *prev_frame = cookie;
+
+ if (op_ret == -1) {
+ /* No need to send mknod request to other servers,
+ * as namespace action failed
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s): %s",
+ prev_frame->this->name, local->loc1.path,
+ strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, buf);
+ return 0;
+ }
+
+ /* Create one inode for this entry */
+ local->op_ret = 0;
+ local->stbuf = *buf;
+ local->st_ino = buf->st_ino;
+
+ list = CALLOC (1, sizeof (int16_t) * 3);
+ ERR_ABORT (list);
+ list[0] = priv->child_count;
+ list[2] = -1;
+ inode_ctx_put (inode, this, (uint64_t)(long)list);
+
+ sched_ops = priv->sched_ops;
+
+ /* Send mknod request to scheduled node now */
+ sched_xl = sched_ops->schedule (this, local->loc1.path);
+ if (!sched_xl) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "mknod failed on storage node, no node online "
+ "at the moment, sending unlink to NS");
+ local->op_errno = ENOTCONN;
+ STACK_WIND (frame,
+ unify_mknod_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+
+ return 0;
+ }
+
+ for (index = 0; index < priv->child_count; index++)
+ if (sched_xl == priv->xl_array[index])
+ break;
+ list[1] = index;
+
+ STACK_WIND (frame, unify_mknod_cbk,
+ sched_xl, sched_xl->fops->mknod,
+ &local->loc1, local->mode, local->dev);
+
+ return 0;
+}
+
+/**
+ * unify_mknod - Create a device on namespace first, and later create on
+ * the storage node.
+ */
+int32_t
+unify_mknod (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ mode_t mode,
+ dev_t rdev)
+{
+ unify_local_t *local = NULL;
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ local->mode = mode;
+ local->dev = rdev;
+ loc_copy (&local->loc1, loc);
+ if (local->loc1.path == NULL) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_ns_mknod_cbk,
+ NS(this),
+ NS(this)->fops->mknod,
+ loc,
+ mode,
+ rdev);
+
+ return 0;
+}
+
+int32_t
+unify_symlink_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ unify_local_t *local = frame->local;
+ if (op_ret == -1)
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: %s", local->loc1.path, strerror (op_errno));
+
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, -1, local->op_errno, NULL, NULL);
+ return 0;
+}
+
+/**
+ * unify_symlink_cbk -
+ */
+int32_t
+unify_symlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ /* Symlink on storage node failed, hence send unlink
+ to the NS node */
+ local->op_errno = op_errno;
+ gf_log (this->name, GF_LOG_ERROR,
+ "symlink on storage node failed, sending unlink "
+ "to namespace");
+
+ STACK_WIND (frame,
+ unify_symlink_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+
+ return 0;
+ }
+
+ local->stbuf = *buf;
+ local->stbuf.st_ino = local->st_ino;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf);
+
+ return 0;
+}
+
+/**
+ * unify_ns_symlink_cbk -
+ */
+int32_t
+unify_ns_symlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+
+ struct sched_ops *sched_ops = NULL;
+ xlator_t *sched_xl = NULL;
+ int16_t *list = NULL;
+ unify_local_t *local = frame->local;
+ unify_private_t *priv = this->private;
+ int16_t index = 0;
+
+ if (op_ret == -1) {
+ /* No need to send symlink request to other servers,
+ * as namespace action failed
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s): %s",
+ local->loc1.path, strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, NULL, buf);
+ return 0;
+ }
+
+ /* Create one inode for this entry */
+ local->op_ret = 0;
+ local->st_ino = buf->st_ino;
+
+ /* Start the mapping list */
+
+ list = CALLOC (1, sizeof (int16_t) * 3);
+ ERR_ABORT (list);
+ list[0] = priv->child_count; //namespace's index
+ list[2] = -1;
+ inode_ctx_put (inode, this, (uint64_t)(long)list);
+
+ sched_ops = priv->sched_ops;
+
+ /* Send symlink request to all the nodes now */
+ sched_xl = sched_ops->schedule (this, local->loc1.path);
+ if (!sched_xl) {
+ /* Symlink on storage node failed, hence send unlink
+ to the NS node */
+ local->op_errno = ENOTCONN;
+ gf_log (this->name, GF_LOG_ERROR,
+ "symlink on storage node failed, no node online, "
+ "sending unlink to namespace");
+
+ STACK_WIND (frame,
+ unify_symlink_unlink_cbk,
+ NS(this),
+ NS(this)->fops->unlink,
+ &local->loc1);
+
+ return 0;
+ }
+
+ for (index = 0; index < priv->child_count; index++)
+ if (sched_xl == priv->xl_array[index])
+ break;
+ list[1] = index;
+
+ STACK_WIND (frame,
+ unify_symlink_cbk,
+ sched_xl,
+ sched_xl->fops->symlink,
+ local->name,
+ &local->loc1);
+
+ return 0;
+}
+
+/**
+ * unify_symlink -
+ */
+int32_t
+unify_symlink (call_frame_t *frame,
+ xlator_t *this,
+ const char *linkpath,
+ loc_t *loc)
+{
+ unify_local_t *local = NULL;
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, loc);
+ local->name = strdup (linkpath);
+
+ if ((local->name == NULL) ||
+ (local->loc1.path == NULL)) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, loc->inode, NULL);
+ return 0;
+ }
+
+ STACK_WIND (frame,
+ unify_ns_symlink_cbk,
+ NS(this),
+ NS(this)->fops->symlink,
+ linkpath,
+ loc);
+
+ return 0;
+}
+
+
+int32_t
+unify_rename_unlink_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno)
+{
+ int32_t callcnt = 0;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s -> %s): %s",
+ prev_frame->this->name,
+ local->loc1.path, local->loc2.path,
+ strerror (op_errno));
+
+ }
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local->stbuf.st_ino = local->st_ino;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno,
+ &local->stbuf);
+ }
+ return 0;
+}
+
+int32_t
+unify_ns_rename_undo_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s -> %s): %s",
+ local->loc1.path, local->loc2.path,
+ strerror (op_errno));
+ }
+
+ local->stbuf.st_ino = local->st_ino;
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf);
+ return 0;
+}
+
+int32_t
+unify_rename_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ int16_t *list = NULL;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ call_frame_t *prev_frame = cookie;
+
+ LOCK (&frame->lock);
+ {
+ callcnt = --local->call_count;
+ if (op_ret >= 0) {
+ if (!S_ISDIR (buf->st_mode))
+ local->stbuf = *buf;
+ local->op_ret = op_ret;
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "child(%s): path(%s -> %s): %s",
+ prev_frame->this->name,
+ local->loc1.path, local->loc2.path,
+ strerror (op_errno));
+ local->op_errno = op_errno;
+ }
+ }
+ UNLOCK (&frame->lock);
+
+ if (!callcnt) {
+ local->stbuf.st_ino = local->st_ino;
+ if (S_ISDIR (local->loc1.inode->st_mode)) {
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, &local->stbuf);
+ return 0;
+ }
+
+ if (local->op_ret == -1) {
+ /* TODO: check this logic */
+
+ /* Rename failed in storage node, successful on NS,
+ * hence, rename back the entries in NS */
+ /* NOTE: this will be done only if the destination
+ * doesn't exists, if the destination exists, the
+ * job of correcting NS is left to self-heal
+ */
+ if (!local->index) {
+ loc_t tmp_oldloc = {
+ /* its actual 'newloc->path' */
+ .path = local->loc2.path,
+ .inode = local->loc1.inode,
+ .parent = local->loc2.parent
+ };
+
+ loc_t tmp_newloc = {
+ /* Actual 'oldloc->path' */
+ .path = local->loc1.path,
+ .parent = local->loc1.parent
+ };
+
+ gf_log (this->name, GF_LOG_ERROR,
+ "rename succussful on namespace, on "
+ "stroage node failed, reverting back");
+
+ STACK_WIND (frame,
+ unify_ns_rename_undo_cbk,
+ NS(this),
+ NS(this)->fops->rename,
+ &tmp_oldloc,
+ &tmp_newloc);
+ return 0;
+ }
+ } else {
+ /* Rename successful on storage nodes */
+
+ int32_t idx = 0;
+ int16_t *tmp_list = NULL;
+ uint64_t tmp_list_int64 = 0;
+ if (local->loc2.inode) {
+ inode_ctx_get (local->loc2.inode,
+ this, &tmp_list_int64);
+ list = (int16_t *)(long)tmp_list_int64;
+
+ }
+
+ if (list) {
+ for (index = 0; list[index] != -1; index++);
+ tmp_list = CALLOC (1, index * 2);
+ memcpy (tmp_list, list, index * 2);
+
+ for (index = 0; list[index] != -1; index++) {
+ /* TODO: Check this logic. */
+ /* If the destination file exists in
+ * the same storage node where we sent
+ * 'rename' call, no need to send
+ * unlink
+ */
+ for (idx = 0;
+ local->list[idx] != -1; idx++) {
+ if (tmp_list[index] == local->list[idx]) {
+ tmp_list[index] = priv->child_count;
+ continue;
+ }
+ }
+
+ if (NS(this) != priv->xl_array[tmp_list[index]]) {
+ local->call_count++;
+ callcnt++;
+ }
+ }
+
+ if (local->call_count) {
+ if (callcnt > 1)
+ gf_log (this->name,
+ GF_LOG_ERROR,
+ "%s->%s: more (%d) "
+ "subvolumes have the "
+ "newloc entry",
+ local->loc1.path,
+ local->loc2.path,
+ callcnt);
+
+ for (index=0;
+ tmp_list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[tmp_list[index]]) {
+ STACK_WIND (frame,
+ unify_rename_unlink_cbk,
+ priv->xl_array[tmp_list[index]],
+ priv->xl_array[tmp_list[index]]->fops->unlink,
+ &local->loc2);
+ if (!--callcnt)
+ break;
+ }
+ }
+
+ FREE (tmp_list);
+ return 0;
+ }
+ if (tmp_list)
+ FREE (tmp_list);
+ }
+ }
+
+ /* Need not send 'unlink' to storage node */
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, local->op_ret,
+ local->op_errno, &local->stbuf);
+ }
+
+ return 0;
+}
+
+int32_t
+unify_ns_rename_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ int32_t index = 0;
+ int32_t callcnt = 0;
+ int16_t *list = NULL;
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+
+ if (op_ret == -1) {
+ /* Free local->new_inode */
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s -> %s): %s",
+ local->loc1.path, local->loc2.path,
+ strerror (op_errno));
+
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+ return 0;
+ }
+
+ local->stbuf = *buf;
+ local->st_ino = buf->st_ino;
+
+ /* Everything is fine. */
+ if (S_ISDIR (buf->st_mode)) {
+ local->call_count = priv->child_count;
+ for (index=0; index < priv->child_count; index++) {
+ STACK_WIND (frame,
+ unify_rename_cbk,
+ priv->xl_array[index],
+ priv->xl_array[index]->fops->rename,
+ &local->loc1,
+ &local->loc2);
+ }
+
+ return 0;
+ }
+
+ local->call_count = 0;
+ /* send rename */
+ list = local->list;
+ for (index=0; list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[list[index]]) {
+ local->call_count++;
+ callcnt++;
+ }
+ }
+
+ if (local->call_count) {
+ for (index=0; list[index] != -1; index++) {
+ if (NS(this) != priv->xl_array[list[index]]) {
+ STACK_WIND (frame,
+ unify_rename_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->rename,
+ &local->loc1,
+ &local->loc2);
+ if (!--callcnt)
+ break;
+ }
+ }
+ } else {
+ /* file doesn't seem to be present in storage nodes */
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "CRITICAL: source file not in storage node, "
+ "rename successful on namespace :O");
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, -1, EIO, NULL);
+ }
+ return 0;
+}
+
+
+/**
+ * unify_rename - One of the tricky function. The deadliest of all :O
+ */
+int32_t
+unify_rename (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *oldloc,
+ loc_t *newloc)
+{
+ unify_local_t *local = NULL;
+ uint64_t tmp_list = 0;
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+ loc_copy (&local->loc1, oldloc);
+ loc_copy (&local->loc2, newloc);
+
+ if ((local->loc1.path == NULL) ||
+ (local->loc2.path == NULL)) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Not enough memory :O");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ inode_ctx_get (oldloc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ STACK_WIND (frame,
+ unify_ns_rename_cbk,
+ NS(this),
+ NS(this)->fops->rename,
+ oldloc,
+ newloc);
+ return 0;
+}
+
+/**
+ * unify_link_cbk -
+ */
+int32_t
+unify_link_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ unify_local_t *local = frame->local;
+
+ if (op_ret >= 0)
+ local->stbuf = *buf;
+ local->stbuf.st_ino = local->st_ino;
+
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, &local->stbuf);
+
+ return 0;
+}
+
+/**
+ * unify_ns_link_cbk -
+ */
+int32_t
+unify_ns_link_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ inode_t *inode,
+ struct stat *buf)
+{
+ unify_private_t *priv = this->private;
+ unify_local_t *local = frame->local;
+ int16_t *list = local->list;
+ int16_t index = 0;
+
+ if (op_ret == -1) {
+ /* No need to send link request to other servers,
+ * as namespace action failed
+ */
+ gf_log (this->name, GF_LOG_ERROR,
+ "namespace: path(%s -> %s): %s",
+ local->loc1.path, local->loc2.path,
+ strerror (op_errno));
+ unify_local_wipe (local);
+ STACK_UNWIND (frame, op_ret, op_errno, inode, buf);
+ return 0;
+ }
+
+ /* Update inode for this entry */
+ local->op_ret = 0;
+ local->st_ino = buf->st_ino;
+
+ /* Send link request to the node now */
+ for (index = 0; list[index] != -1; index++) {
+ char need_break = (list[index+1] == -1);
+ if (priv->xl_array[list[index]] != NS (this)) {
+ STACK_WIND (frame,
+ unify_link_cbk,
+ priv->xl_array[list[index]],
+ priv->xl_array[list[index]]->fops->link,
+ &local->loc1,
+ &local->loc2);
+ }
+ if (need_break)
+ break;
+ }
+
+ return 0;
+}
+
+/**
+ * unify_link -
+ */
+int32_t
+unify_link (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *oldloc,
+ loc_t *newloc)
+{
+ unify_local_t *local = NULL;
+ uint64_t tmp_list = 0;
+
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (oldloc);
+ UNIFY_CHECK_INODE_CTX_AND_UNWIND_ON_ERR (newloc);
+
+ /* Initialization */
+ INIT_LOCAL (frame, local);
+
+ loc_copy (&local->loc1, oldloc);
+ loc_copy (&local->loc2, newloc);
+
+ inode_ctx_get (oldloc->inode, this, &tmp_list);
+ local->list = (int16_t *)(long)tmp_list;
+
+ STACK_WIND (frame,
+ unify_ns_link_cbk,
+ NS(this),
+ NS(this)->fops->link,
+ oldloc,
+ newloc);
+
+ return 0;
+}
+
+
+/**
+ * unify_checksum_cbk -
+ */
+int32_t
+unify_checksum_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ uint8_t *fchecksum,
+ uint8_t *dchecksum)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, fchecksum, dchecksum);
+
+ return 0;
+}
+
+/**
+ * unify_checksum -
+ */
+int32_t
+unify_checksum (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flag)
+{
+ STACK_WIND (frame,
+ unify_checksum_cbk,
+ NS(this),
+ NS(this)->fops->checksum,
+ loc,
+ flag);
+
+ return 0;
+}
+
+
+/**
+ * unify_finodelk_cbk -
+ */
+int
+unify_finodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+/**
+ * unify_finodelk
+ */
+int
+unify_finodelk (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, int cmd, struct flock *flock)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_finodelk_cbk,
+ child, child->fops->finodelk,
+ fd, cmd, flock);
+
+ return 0;
+}
+
+
+
+/**
+ * unify_fentrylk_cbk -
+ */
+int
+unify_fentrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+/**
+ * unify_fentrylk
+ */
+int
+unify_fentrylk (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, const char *basename,
+ entrylk_cmd cmd, entrylk_type type)
+
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_fentrylk_cbk,
+ child, child->fops->fentrylk,
+ fd, basename, cmd, type);
+
+ return 0;
+}
+
+
+
+/**
+ * unify_fxattrop_cbk -
+ */
+int
+unify_fxattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xattr)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, xattr);
+ return 0;
+}
+
+/**
+ * unify_fxattrop
+ */
+int
+unify_fxattrop (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, gf_xattrop_flags_t optype, dict_t *xattr)
+{
+ UNIFY_CHECK_FD_CTX_AND_UNWIND_ON_ERR (fd);
+ xlator_t *child = NULL;
+ uint64_t tmp_child = 0;
+
+ fd_ctx_get (fd, this, &tmp_child);
+ child = (xlator_t *)(long)tmp_child;
+
+ STACK_WIND (frame, unify_fxattrop_cbk,
+ child, child->fops->fxattrop,
+ fd, optype, xattr);
+
+ return 0;
+}
+
+
+/**
+ * unify_inodelk_cbk -
+ */
+int
+unify_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+
+/**
+ * unify_inodelk
+ */
+int
+unify_inodelk (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int cmd, struct flock *flock)
+{
+ xlator_t *child = NULL;
+
+ child = unify_loc_subvol (loc, this);
+
+ STACK_WIND (frame, unify_inodelk_cbk,
+ child, child->fops->inodelk,
+ loc, cmd, flock);
+
+ return 0;
+}
+
+
+
+/**
+ * unify_entrylk_cbk -
+ */
+int
+unify_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ STACK_UNWIND (frame, op_ret, op_errno);
+ return 0;
+}
+
+/**
+ * unify_entrylk
+ */
+int
+unify_entrylk (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, const char *basename,
+ entrylk_cmd cmd, entrylk_type type)
+
+{
+ xlator_t *child = NULL;
+
+ child = unify_loc_subvol (loc, this);
+
+ STACK_WIND (frame, unify_entrylk_cbk,
+ child, child->fops->entrylk,
+ loc, basename, cmd, type);
+
+ return 0;
+}
+
+
+
+/**
+ * unify_xattrop_cbk -
+ */
+int
+unify_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xattr)
+{
+ STACK_UNWIND (frame, op_ret, op_errno, xattr);
+ return 0;
+}
+
+/**
+ * unify_xattrop
+ */
+int
+unify_xattrop (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, gf_xattrop_flags_t optype, dict_t *xattr)
+{
+ xlator_t *child = NULL;
+
+ child = unify_loc_subvol (loc, this);
+
+ STACK_WIND (frame, unify_xattrop_cbk,
+ child, child->fops->xattrop,
+ loc, optype, xattr);
+
+ return 0;
+}
+
+
+/**
+ * notify
+ */
+int32_t
+notify (xlator_t *this,
+ int32_t event,
+ void *data,
+ ...)
+{
+ unify_private_t *priv = this->private;
+ struct sched_ops *sched = NULL;
+
+ if (!priv) {
+ return 0;
+ }
+
+ sched = priv->sched_ops;
+ if (!sched) {
+ gf_log (this->name, GF_LOG_CRITICAL, "No scheduler :O");
+ raise (SIGTERM);
+ return 0;
+ }
+ if (priv->namespace == data) {
+ if (event == GF_EVENT_CHILD_UP) {
+ sched->notify (this, event, data);
+ }
+ return 0;
+ }
+
+ switch (event)
+ {
+ case GF_EVENT_CHILD_UP:
+ {
+ /* Call scheduler's update () to enable it for scheduling */
+ sched->notify (this, event, data);
+
+ LOCK (&priv->lock);
+ {
+ /* Increment the inode's generation, which is
+ used for self_heal */
+ ++priv->inode_generation;
+ ++priv->num_child_up;
+ }
+ UNLOCK (&priv->lock);
+
+ if (!priv->is_up) {
+ default_notify (this, event, data);
+ priv->is_up = 1;
+ }
+ }
+ break;
+ case GF_EVENT_CHILD_DOWN:
+ {
+ /* Call scheduler's update () to disable the child node
+ * for scheduling
+ */
+ sched->notify (this, event, data);
+ LOCK (&priv->lock);
+ {
+ --priv->num_child_up;
+ }
+ UNLOCK (&priv->lock);
+
+ if (priv->num_child_up == 0) {
+ /* Send CHILD_DOWN to upper layer */
+ default_notify (this, event, data);
+ priv->is_up = 0;
+ }
+ }
+ break;
+
+ default:
+ {
+ default_notify (this, event, data);
+ }
+ break;
+ }
+
+ return 0;
+}
+
+/**
+ * init - This function is called first in the xlator, while initializing.
+ * All the config file options are checked and appropriate flags are set.
+ *
+ * @this -
+ */
+int32_t
+init (xlator_t *this)
+{
+ int32_t ret = 0;
+ int32_t count = 0;
+ data_t *scheduler = NULL;
+ data_t *data = NULL;
+ xlator_t *ns_xl = NULL;
+ xlator_list_t *trav = NULL;
+ xlator_list_t *xlparent = NULL;
+ xlator_list_t *parent = NULL;
+ unify_private_t *_private = NULL;
+
+ /* Check for number of child nodes, if there is no child nodes, exit */
+ if (!this->children) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "No child nodes specified. check \"subvolumes \" "
+ "option in volfile");
+ return -1;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile ");
+ }
+
+ /* Check for 'scheduler' in volume */
+ scheduler = dict_get (this->options, "scheduler");
+ if (!scheduler) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "\"option scheduler <x>\" is missing in volfile");
+ return -1;
+ }
+
+ /* Setting "option namespace <node>" */
+ data = dict_get (this->options, "namespace");
+ if(!data) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "namespace option not specified, Exiting");
+ return -1;
+ }
+ /* Search namespace in the child node, if found, exit */
+ trav = this->children;
+ while (trav) {
+ if (strcmp (trav->xlator->name, data->data) == 0)
+ break;
+ trav = trav->next;
+ }
+ if (trav) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "namespace node used as a subvolume, Exiting");
+ return -1;
+ }
+
+ /* Search for the namespace node, if found, continue */
+ ns_xl = this->next;
+ while (ns_xl) {
+ if (strcmp (ns_xl->name, data->data) == 0)
+ break;
+ ns_xl = ns_xl->next;
+ }
+ if (!ns_xl) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "namespace node not found in volfile, Exiting");
+ return -1;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "namespace node specified as %s", data->data);
+
+ _private = CALLOC (1, sizeof (*_private));
+ ERR_ABORT (_private);
+ _private->sched_ops = get_scheduler (this, scheduler->data);
+ if (!_private->sched_ops) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Error while loading scheduler. Exiting");
+ FREE (_private);
+ return -1;
+ }
+
+ if (ns_xl->parents) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Namespace node should not be a child of any other node. Exiting");
+ FREE (_private);
+ return -1;
+ }
+
+ _private->namespace = ns_xl;
+
+ /* update _private structure */
+ {
+ count = 0;
+ trav = this->children;
+ /* Get the number of child count */
+ while (trav) {
+ count++;
+ trav = trav->next;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Child node count is %d", count);
+
+ _private->child_count = count;
+ if (count == 1) {
+ /* TODO: Should I error out here? */
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "WARNING: You have defined only one "
+ "\"subvolumes\" for unify volume. It may not "
+ "be the desired config, review your volume "
+ "volfile. If this is how you are testing it,"
+ " you may hit some performance penalty");
+ }
+
+ _private->xl_array = CALLOC (1,
+ sizeof (xlator_t) * (count + 1));
+ ERR_ABORT (_private->xl_array);
+
+ count = 0;
+ trav = this->children;
+ while (trav) {
+ _private->xl_array[count++] = trav->xlator;
+ trav = trav->next;
+ }
+ _private->xl_array[count] = _private->namespace;
+
+ /* self-heal part, start with generation '1' */
+ _private->inode_generation = 1;
+ /* Because, Foreground part is tested well */
+ _private->self_heal = ZR_UNIFY_FG_SELF_HEAL;
+ data = dict_get (this->options, "self-heal");
+ if (data) {
+ if (strcasecmp (data->data, "off") == 0)
+ _private->self_heal = ZR_UNIFY_SELF_HEAL_OFF;
+
+ if (strcasecmp (data->data, "foreground") == 0)
+ _private->self_heal = ZR_UNIFY_FG_SELF_HEAL;
+
+ if (strcasecmp (data->data, "background") == 0)
+ _private->self_heal = ZR_UNIFY_BG_SELF_HEAL;
+ }
+
+ /* optimist - ask bulde for more about it */
+ data = dict_get (this->options, "optimist");
+ if (data) {
+ if (gf_string2boolean (data->data,
+ &_private->optimist) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "optimist excepts only boolean "
+ "options");
+ }
+ }
+
+ LOCK_INIT (&_private->lock);
+ }
+
+ /* Now that everything is fine. */
+ this->private = (void *)_private;
+ {
+ /* Initialize scheduler, if everything else is successful */
+ ret = _private->sched_ops->init (this);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "Initializing scheduler failed, Exiting");
+ FREE (_private);
+ return -1;
+ }
+
+ ret = 0;
+
+ /* This section is required because some fops may look
+ * for 'xl->parent' variable
+ */
+ xlparent = CALLOC (1, sizeof (*xlparent));
+ xlparent->xlator = this;
+ if (!ns_xl->parents) {
+ ns_xl->parents = xlparent;
+ } else {
+ parent = ns_xl->parents;
+ while (parent->next)
+ parent = parent->next;
+ parent->next = xlparent;
+ }
+ /* Initialize the namespace volume */
+ if (!ns_xl->ready) {
+ ret = xlator_tree_init (ns_xl);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "initializing namespace node failed, "
+ "Exiting");
+ FREE (_private);
+ return -1;
+ }
+ }
+ }
+
+ /* Tell namespace node that init is done */
+ ns_xl->notify (ns_xl, GF_EVENT_PARENT_UP, this);
+
+ return 0;
+}
+
+/**
+ * fini - Free all the allocated memory
+ */
+void
+fini (xlator_t *this)
+{
+ unify_private_t *priv = this->private;
+ priv->sched_ops->fini (this);
+ this->private = NULL;
+ LOCK_DESTROY (&priv->lock);
+ FREE (priv->xl_array);
+ FREE (priv);
+ return;
+}
+
+
+struct xlator_fops fops = {
+ .stat = unify_stat,
+ .chmod = unify_chmod,
+ .readlink = unify_readlink,
+ .mknod = unify_mknod,
+ .mkdir = unify_mkdir,
+ .unlink = unify_unlink,
+ .rmdir = unify_rmdir,
+ .symlink = unify_symlink,
+ .rename = unify_rename,
+ .link = unify_link,
+ .chown = unify_chown,
+ .truncate = unify_truncate,
+ .create = unify_create,
+ .open = unify_open,
+ .readv = unify_readv,
+ .writev = unify_writev,
+ .statfs = unify_statfs,
+ .flush = unify_flush,
+ .fsync = unify_fsync,
+ .setxattr = unify_setxattr,
+ .getxattr = unify_getxattr,
+ .removexattr = unify_removexattr,
+ .opendir = unify_opendir,
+ .readdir = unify_readdir,
+ .fsyncdir = unify_fsyncdir,
+ .access = unify_access,
+ .ftruncate = unify_ftruncate,
+ .fstat = unify_fstat,
+ .lk = unify_lk,
+ .fchown = unify_fchown,
+ .fchmod = unify_fchmod,
+ .utimens = unify_utimens,
+ .lookup = unify_lookup,
+ .getdents = unify_getdents,
+ .checksum = unify_checksum,
+ .inodelk = unify_inodelk,
+ .finodelk = unify_finodelk,
+ .entrylk = unify_entrylk,
+ .fentrylk = unify_fentrylk,
+ .xattrop = unify_xattrop,
+ .fxattrop = unify_fxattrop
+};
+
+struct xlator_mops mops = {
+};
+
+struct xlator_cbks cbks = {
+};
+
+struct volume_options options[] = {
+ { .key = { "namespace" },
+ .type = GF_OPTION_TYPE_XLATOR
+ },
+ { .key = { "scheduler" },
+ .value = { "alu", "rr", "random", "nufa", "switch" },
+ .type = GF_OPTION_TYPE_STR
+ },
+ { .key = {"self-heal"},
+ .value = { "foreground", "background", "off" },
+ .type = GF_OPTION_TYPE_STR
+ },
+ /* TODO: remove it some time later */
+ { .key = {"optimist"},
+ .type = GF_OPTION_TYPE_BOOL
+ },
+
+ { .key = {NULL} },
+};
diff --git a/xlators/cluster/unify/src/unify.h b/xlators/cluster/unify/src/unify.h
new file mode 100644
index 00000000000..bc18dc53f52
--- /dev/null
+++ b/xlators/cluster/unify/src/unify.h
@@ -0,0 +1,132 @@
+/*
+ Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef _UNIFY_H
+#define _UNIFY_H
+
+#include "scheduler.h"
+#include "list.h"
+
+#define MAX_DIR_ENTRY_STRING (32 * 1024)
+
+#define ZR_UNIFY_SELF_HEAL_OFF 0
+#define ZR_UNIFY_FG_SELF_HEAL 1
+#define ZR_UNIFY_BG_SELF_HEAL 2
+
+/* Sometimes one should use completely random numbers.. its good :p */
+#define UNIFY_SELF_HEAL_GETDENTS_COUNT 1024
+
+#define NS(xl) (((unify_private_t *)xl->private)->namespace)
+
+/* This is used to allocate memory for local structure */
+#define INIT_LOCAL(fr, loc) \
+do { \
+ loc = CALLOC (1, sizeof (unify_local_t)); \
+ ERR_ABORT (loc); \
+ if (!loc) { \
+ STACK_UNWIND (fr, -1, ENOMEM); \
+ return 0; \
+ } \
+ fr->local = loc; \
+ loc->op_ret = -1; \
+ loc->op_errno = ENOENT; \
+} while (0)
+
+
+
+struct unify_private {
+ /* Update this structure depending on requirement */
+ void *scheduler; /* THIS SHOULD BE THE FIRST VARIABLE,
+ if xlator is using scheduler */
+ struct sched_ops *sched_ops; /* Scheduler options */
+ xlator_t *namespace; /* ptr to namespace xlator */
+ xlator_t **xl_array;
+ gf_boolean_t optimist;
+ int16_t child_count;
+ int16_t num_child_up;
+ uint8_t self_heal;
+ uint8_t is_up;
+ uint64_t inode_generation;
+ gf_lock_t lock;
+};
+typedef struct unify_private unify_private_t;
+
+struct unify_self_heal_struct {
+ uint8_t dir_checksum[ZR_FILENAME_MAX];
+ uint8_t ns_dir_checksum[ZR_FILENAME_MAX];
+ uint8_t file_checksum[ZR_FILENAME_MAX];
+ uint8_t ns_file_checksum[ZR_FILENAME_MAX];
+ off_t *offset_list;
+ int *count_list;
+ dir_entry_t **entry_list;
+};
+
+
+struct _unify_local_t {
+ int32_t call_count;
+ int32_t op_ret;
+ int32_t op_errno;
+ mode_t mode;
+ off_t offset;
+ dev_t dev;
+ uid_t uid;
+ gid_t gid;
+ int32_t flags;
+ int32_t entry_count;
+ int32_t count; // dir_entry_t count;
+ fd_t *fd;
+ struct stat stbuf;
+ struct statvfs statvfs_buf;
+ struct timespec tv[2];
+ char *name;
+ int32_t revalidate;
+
+ ino_t st_ino;
+ nlink_t st_nlink;
+
+ dict_t *dict;
+
+ int16_t *list;
+ int16_t *new_list; /* Used only in case of rename */
+ int16_t index;
+
+ int32_t failed;
+ int32_t return_eio; /* Used in case of different st-mode
+ present for a given path */
+
+ uint64_t inode_generation; /* used to store the per directory
+ * inode_generation. Got from inode's ctx
+ * of directory inodes
+ */
+
+ struct unify_self_heal_struct *sh_struct;
+ loc_t loc1, loc2;
+};
+typedef struct _unify_local_t unify_local_t;
+
+int32_t zr_unify_self_heal (call_frame_t *frame,
+ xlator_t *this,
+ unify_local_t *local);
+
+#endif /* _UNIFY_H */