27 files changed, 1789 insertions, 243 deletions
diff --git a/configure.ac b/configure.ac index cfccf3f9e64..a1a52eebd8e 100644 --- a/configure.ac +++ b/configure.ac @@ -29,6 +29,7 @@ AC_CONFIG_HEADERS([config.h])  AC_CONFIG_FILES([Makefile                  libglusterfs/Makefile                  libglusterfs/src/Makefile +                geo-replication/src/peer_gsec_create                  glusterfsd/Makefile                  glusterfsd/src/Makefile                  rpc/Makefile @@ -149,6 +150,7 @@ AC_CONFIG_FILES([Makefile                  extras/ocf/glusterd                  extras/ocf/volume                  extras/LinuxRPM/Makefile +                extras/geo-rep/Makefile                  contrib/fuse-util/Makefile                  contrib/uuid/uuid_types.h                  glusterfs-api.pc diff --git a/extras/Makefile.am b/extras/Makefile.am index 63ad7101655..a49e8628c38 100644 --- a/extras/Makefile.am +++ b/extras/Makefile.am @@ -2,7 +2,7 @@  EditorModedir = $(docdir)  EditorMode_DATA = glusterfs-mode.el glusterfs.vim -SUBDIRS = init.d systemd benchmarking hook-scripts $(OCF_SUBDIR) LinuxRPM +SUBDIRS = init.d systemd benchmarking hook-scripts $(OCF_SUBDIR) LinuxRPM geo-rep  confdir = $(sysconfdir)/glusterfs  conf_DATA = glusterfs-logrotate diff --git a/extras/geo-rep/Makefile.am b/extras/geo-rep/Makefile.am new file mode 100644 index 00000000000..fc5f56d54fa --- /dev/null +++ b/extras/geo-rep/Makefile.am @@ -0,0 +1,2 @@ +EXTRA_DIST = gsync-sync-gfid.c gsync-upgrade.sh generate-gfid-file.sh \ +	get-gfid.sh slave-upgrade.sh diff --git a/extras/geo-rep/generate-gfid-file.sh b/extras/geo-rep/generate-gfid-file.sh new file mode 100644 index 00000000000..c6739fbf140 --- /dev/null +++ b/extras/geo-rep/generate-gfid-file.sh @@ -0,0 +1,53 @@ +#!/bin/bash +#Usage: generate-gfid-file.sh <master-volfile-server:master-volume> <path-to-get-gfid.sh> <output-file> + +function get_gfids() +{ +    GET_GFID_CMD=$1 +    OUTPUT_FILE=$2 +    find . 
-exec $GET_GFID_CMD {} \; > $OUTPUT_FILE +} + +function mount_client() +{ +    local T; # temporary mount +    local i; # inode number + +    VOLFILE_SERVER=$1; +    VOLUME=$2; +    GFID_CMD=$3; +    OUTPUT=$4; + +    T=$(mktemp -d); + +    glusterfs -s $VOLFILE_SERVER --volfile-id $VOLUME $T; + +    i=$(stat -c '%i' $T); + +    [ "x$i" = "x1" ] || fatal "could not mount volume $MASTER on $T"; + +    cd $T; + +    get_gfids $GFID_CMD $OUTPUT + +    cd -; + +    umount $T || fatal "could not umount $MASTER from $T"; + +    rmdir $T || warn "rmdir of $T failed"; +} + + +function main() +{ +    SLAVE=$1 +    GET_GFID_CMD=$2 +    OUTPUT=$3 + +    VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'` +    VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'` + +    mount_client $VOLFILE_SERVER $VOLUME_NAME $GET_GFID_CMD $OUTPUT +} + +main "$@"; diff --git a/extras/geo-rep/get-gfid.sh b/extras/geo-rep/get-gfid.sh new file mode 100755 index 00000000000..a4d609b0bc5 --- /dev/null +++ b/extras/geo-rep/get-gfid.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +ATTR_STR=`getfattr -h $1 -n glusterfs.gfid.string` +GLFS_PATH=`echo $ATTR_STR | sed -e 's/# file: \(.*\) glusterfs.gfid.string*/\1/g'` +GFID=`echo $ATTR_STR | sed -e 's/.*glusterfs.gfid.string="\(.*\)"/\1/g'` + +echo "$GFID $GLFS_PATH" diff --git a/extras/geo-rep/gsync-sync-gfid.c b/extras/geo-rep/gsync-sync-gfid.c new file mode 100644 index 00000000000..601f4720e25 --- /dev/null +++ b/extras/geo-rep/gsync-sync-gfid.c @@ -0,0 +1,106 @@ + +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <limits.h> +#include <sys/types.h> +#include <attr/xattr.h> +#include <libgen.h> +#include <ctype.h> +#include <stdlib.h> + +#ifndef UUID_CANONICAL_FORM_LEN +#define UUID_CANONICAL_FORM_LEN 36 +#endif + +#ifndef GF_FUSE_AUX_GFID_HEAL +#define GF_FUSE_AUX_GFID_HEAL    "glusterfs.gfid.heal" +#endif + +#define GLFS_LINE_MAX           (PATH_MAX + (2 * UUID_CANONICAL_FORM_LEN)) + +int +main (int argc, char *argv[]) +{ +        char *file                = NULL; +        char *tmp = NULL; +        char *tmp1 = NULL; +        char *parent_dir = NULL; +        char *gfid                = NULL; +        char *bname = NULL; +        int   ret                 = -1; +        int len = 0; +        FILE *fp                  = NULL; +        char  line[GLFS_LINE_MAX] = {0,}; +        char *path = NULL; +        void  *blob               = NULL; +        void  *tmp_blob               = NULL; + +        if (argc != 2) { +                /* each line in the file has the following format +                 * uuid-in-canonical-form path-relative-to-gluster-mount. +                 * Both uuid and relative path are from master mount. 
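+                 * e.g.:
+                 *   22114455-57c5-46e9-a783-c40f83a72b09 /dir
+                 *   25772386-3eb8-4550-a802-c3fdc938ca80 /dir/file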
+                 */ +                fprintf (stderr, "usage: %s <file-of-paths-to-be-synced>\n", +                         argv[0]); +                goto out; +        } + +        file = argv[1]; + +        fp = fopen (file, "r"); +        if (fp == NULL) { +                fprintf (stderr, "cannot open %s for reading (%s)\n", +                         file, strerror (errno)); +                goto out; +        } + +        while (fgets (line, GLFS_LINE_MAX, fp) != NULL) { +                tmp = line; +                path = gfid = line; + +                path += UUID_CANONICAL_FORM_LEN + 1; + +                while(isspace (*path)) +                        path++; + +                if ((strlen (line) < GLFS_LINE_MAX) && +                    (line[strlen (line) - 1] == '\n')) +                        line[strlen (line) - 1] = '\0'; + +                line[UUID_CANONICAL_FORM_LEN] = '\0'; + +                tmp = strdup (path); +                tmp1 = strdup (path); +                parent_dir = dirname (tmp); +                bname = basename (tmp1); + +                /* gfid + '\0' + bname + '\0' */ +                len = UUID_CANONICAL_FORM_LEN + 1 + strlen (bname) + 1; + +                blob = calloc (1, len); + +                memcpy (blob, gfid, UUID_CANONICAL_FORM_LEN); + +                tmp_blob = blob + UUID_CANONICAL_FORM_LEN + 1; + +                memcpy (tmp_blob, bname, strlen (bname)); + +                ret = setxattr (parent_dir, GF_FUSE_AUX_GFID_HEAL, blob, len, +                                0); +                if (ret < 0) { +                        fprintf (stderr, "setxattr on %s/%s failed (%s)\n", +                                 parent_dir, bname, strerror (errno)); +                } +                memset (line, 0, GLFS_LINE_MAX); + +                free (blob); +                free (tmp); free (tmp1); +                blob = NULL; +        } + +        ret = 0; +out: +        return ret; +} + diff --git a/extras/geo-rep/gsync-upgrade.sh b/extras/geo-rep/gsync-upgrade.sh new file mode 100644 index 00000000000..b179487365a --- /dev/null +++ b/extras/geo-rep/gsync-upgrade.sh @@ -0,0 +1,123 @@ +#!/bin/bash +#usage: gsync-upgrade.sh <slave-volfile-server:slave-volume> <gfid-file> +#                        <path-to-gsync-sync-gfid> <ssh-identity-file> +#<slave-volfile-server>: a machine on which gluster cli can fetch slave volume info. +#                        slave-volfile-server defaults to localhost. +# +#<gfid-file>: a file containing paths and their associated gfids +#            on master. The paths are relative to master mount point +#            (not absolute). An example extract of <gfid-file> can be, +# +#            <extract> +#            22114455-57c5-46e9-a783-c40f83a72b09 /dir +#            25772386-3eb8-4550-a802-c3fdc938ca80 /dir/file +#            </extract> +# +#<ssh-identity-file>: file from which the identity (private key) for public key authentication is read. 
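A note on the format above: gsync-sync-gfid.c consumes exactly this gfid-file. For each "<gfid> <path>" line it sets the glusterfs.gfid.heal (GF_FUSE_AUX_GFID_HEAL) virtual xattr on the entry's parent directory, passing a blob laid out as gfid, NUL, basename, NUL. A minimal Python 3 sketch of that layout, assuming it runs from the root of an --aux-gfid-mount client (helper name and paths are illustrative):

    import os

    AUX_GFID_HEAL = "glusterfs.gfid.heal"    # GF_FUSE_AUX_GFID_HEAL

    def heal_gfid(gfid, relpath):
        assert len(gfid) == 36               # UUID_CANONICAL_FORM_LEN
        bname = os.path.basename(relpath)
        # blob = gfid + '\0' + basename + '\0', as built in gsync-sync-gfid.c
        blob = gfid.encode("ascii") + b"\0" + bname.encode() + b"\0"
        parent = os.path.dirname(relpath) or "."
        os.setxattr(parent, AUX_GFID_HEAL, blob)

    # heal_gfid("25772386-3eb8-4550-a802-c3fdc938ca80", "dir/file")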
+ +SLAVE_MOUNT='/tmp/glfs_slave' + +function SSH() +{ +    HOST=$1 +    SSHKEY=$2 + +    shift 2 + +    ssh -qi $SSHKEY \ +        -oPasswordAuthentication=no \ +        -oStrictHostKeyChecking=no \ +        "$HOST" "$@"; +} + +function get_bricks() +{ +    SSHKEY=$3 + +    SSH $1 $SSHKEY "gluster volume info $2" | grep -E 'Brick[0-9]+' | sed -e 's/[^:]*:\(.*\)/\1/g' +} + +function cleanup_brick() +{ +    HOST=$1 +    BRICK=$2 +    SSHKEY=$3 + +    # TODO: write a C program to receive a list of files and does cleanup on +    # them instead of spawning a new setfattr process for each file if +    # performance is bad. +    SSH -i $SSHKEY $HOST  "rm -rf $BRICK/.glusterfs/* && find $BRICK -exec setfattr -x trusted.gfid {} \;" +} + +function cleanup_slave() +{ +    SSHKEY=$2 + +    VOLFILE_SERVER=`echo $1 | sed -e 's/\(.*\):.*/\1/'` +    VOLUME_NAME=`echo $1 | sed -e 's/.*:\(.*\)/\1/'` + +    BRICKS=`get_bricks $VOLFILE_SERVER $VOLUME_NAME $SSHKEY` + +    for i in $BRICKS; do +	HOST=`echo $i | sed -e 's/\(.*\):.*/\1/'` +	BRICK=`echo $i | sed -e 's/.*:\(.*\)/\1/'` +	cleanup_brick $HOST $BRICK $SSHKEY +    done + +    SSH -i $SSHKEY $VOLFILE_SERVER "gluster --mode=script volume stop $VOLUME_NAME; gluster volume start $VOLUME_NAME"; + +} + +function mount_client() +{ +    local T; # temporary mount +    local i; # inode number +    GFID_FILE=$3 +    SYNC_CMD=$4 + +    T=$(mktemp -d); + +    glusterfs --aux-gfid-mount -s $1 --volfile-id $2 $T; + +    i=$(stat -c '%i' $T); + +    [ "x$i" = "x1" ] || fatal "could not mount volume $MASTER on $T"; + +    cd $T; + +    $SYNC_CMD $GFID_FILE + +    cd -; + +    umount -l $T || fatal "could not umount $MASTER from $T"; + +    rmdir $T || warn "rmdir of $T failed"; +} + +function sync_gfids() +{ +    SLAVE=$1 +    GFID_FILE=$2 + +    SLAVE_VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'` +    SLAVE_VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'` + +    if [ "x$SLAVE_VOLFILE_SERVER" = "x" ]; then +        SLAVE_VOLFILE_SERVER="localhost" +    fi + +    mount_client $SLAVE_VOLFILE_SERVER $SLAVE_VOLUME_NAME $GFID_FILE $3 +} + +function upgrade() +{ +    SLAVE=$1 +    GFID_FILE=$2 +    SYNC_CMD=$3 +    SSHKEY=$4 + +    cleanup_slave $SLAVE $SSHKEY +    sync_gfids $SLAVE $GFID_FILE $SYNC_CMD +} + +upgrade "$@" diff --git a/extras/geo-rep/slave-upgrade.sh b/extras/geo-rep/slave-upgrade.sh new file mode 100644 index 00000000000..6198f408af7 --- /dev/null +++ b/extras/geo-rep/slave-upgrade.sh @@ -0,0 +1,102 @@ +#!/bin/bash +#usage: slave-upgrade.sh <volfile-server:volname> <gfid-file> +#                        <path-to-gsync-sync-gfid> +#<slave-volfile-server>: a machine on which gluster cli can fetch slave volume info. +#                        slave-volfile-server defaults to localhost. +# +#<gfid-file>: a file containing paths and their associated gfids +#            on master. The paths are relative to master mount point +#            (not absolute). An example extract of <gfid-file> can be, +# +#            <extract> +#            22114455-57c5-46e9-a783-c40f83a72b09 /dir +#            25772386-3eb8-4550-a802-c3fdc938ca80 /dir/file +#            </extract> + +function get_bricks() +{ +    gluster volume info $1 | grep -E 'Brick[0-9]+' | sed -e 's/[^:]*:\(.*\)/\1/g' +} + +function cleanup_brick() +{ +    HOST=$1 +    BRICK=$2 + +    # TODO: write a C program to receive a list of files and does cleanup on +    # them instead of spawning a new setfattr process for each file if +    # performance is bad. 
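If spawning one setfattr per file (via the ssh line below) ever does become a bottleneck, a batch cleaner along the lines of this TODO could look like the following hypothetical Python 3 stand-in, which is not part of this change — it reads paths on stdin and strips the xattr in-process:

    import errno, os, sys

    def strip_gfid_xattrs(paths):
        # drop trusted.gfid from each path; tolerate entries that
        # vanished or never carried the xattr
        for p in paths:
            try:
                os.removexattr(p, "trusted.gfid", follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOENT, errno.ENODATA):
                    raise

    if __name__ == "__main__":
        strip_gfid_xattrs(line.rstrip("\n") for line in sys.stdin)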
+    ssh $HOST "rm -rf $BRICK/.glusterfs/* && find $BRICK -exec setfattr -x trusted.gfid {} \; 2>/dev/null" +} + +function cleanup_slave() +{ +    VOLUME_NAME=`echo $1 | sed -e 's/.*:\(.*\)/\1/'` + +    BRICKS=`get_bricks $VOLUME_NAME` + +    for i in $BRICKS; do +	HOST=`echo $i | sed -e 's/\(.*\):.*/\1/'` +	BRICK=`echo $i | sed -e 's/.*:\(.*\)/\1/'` +	cleanup_brick $HOST $BRICK +    done + +    # Now restart the volume +    gluster --mode=script volume stop $VOLUME_NAME; +    gluster volume start $VOLUME_NAME; +} + +function mount_client() +{ +    local T; # temporary mount +    local i; # inode number + +    VOLUME_NAME=$2; +    GFID_FILE=$3 +    SYNC_CMD=$4 + +    T=$(mktemp -d); + +    glusterfs --aux-gfid-mount -s $1 --volfile-id $VOLUME_NAME $T; + +    i=$(stat -c '%i' $T); + +    cd $T; + +    $SYNC_CMD $GFID_FILE + +    cd -; + +    umount $T || fatal "could not umount $MASTER from $T"; + +    rmdir $T || warn "rmdir of $T failed"; +} + +function sync_gfids() +{ +    SLAVE=$1 +    GFID_FILE=$2 +    SYNC_CMD=$3 + +    SLAVE_VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'` +    SLAVE_VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'` + +    if [ "x$SLAVE_VOLFILE_SERVER" = "x" ]; then +        SLAVE_VOLFILE_SERVER="localhost" +    fi + +    mount_client $SLAVE_VOLFILE_SERVER $SLAVE_VOLUME_NAME $GFID_FILE $SYNC_CMD +} + +function upgrade() +{ +    SLAVE=$1 +    GFID_FILE=$2 +    SYNC_CMD=$3 + +    cleanup_slave $SLAVE + +    sync_gfids $SLAVE $GFID_FILE $SYNC_CMD +} + +upgrade "$@" diff --git a/extras/hook-scripts/Makefile.am b/extras/hook-scripts/Makefile.am index 924c07c9e8b..edfa5a6ac87 100644 --- a/extras/hook-scripts/Makefile.am +++ b/extras/hook-scripts/Makefile.am @@ -1 +1 @@ -EXTRA_DIST = S29CTDBsetup.sh S30samba-start.sh S30samba-stop.sh S30samba-set.sh +EXTRA_DIST = S29CTDBsetup.sh S30samba-start.sh S30samba-stop.sh S30samba-set.sh Sglusterd-geo-rep-create-post.sh diff --git a/geo-replication/src/Makefile.am b/geo-replication/src/Makefile.am index 9e410cda633..6feeda8e68c 100644 --- a/geo-replication/src/Makefile.am +++ b/geo-replication/src/Makefile.am @@ -1,5 +1,12 @@ +  gsyncddir = $(libexecdir)/glusterfs +gsyncd_SCRIPTS = gverify.sh peer_add_secret_pub peer_gsec_create + +# peer_gsec_create is not added to EXTRA_DIST as it's derived +# from a .in file +EXTRA_DIST = gverify.sh peer_add_secret_pub +  gsyncd_PROGRAMS = gsyncd  gsyncd_SOURCES = gsyncd.c procdiggy.c diff --git a/geo-replication/src/gsyncd.c b/geo-replication/src/gsyncd.c index 9c4a5bdffb3..68446d9ad34 100644 --- a/geo-replication/src/gsyncd.c +++ b/geo-replication/src/gsyncd.c @@ -285,6 +285,46 @@ invoke_rsync (int argc, char **argv)          return 1;  } +static int +invoke_gluster (int argc, char **argv) +{ +        int i = 0; +        int j = 0; +        int optsover = 0; +        char *ov = NULL; + +        for (i = 1; i < argc; i++) { +                ov = strtail (argv[i], "--"); +                if (ov && !optsover) { +                        if (*ov == '\0') +                                optsover = 1; +                        continue; +                } +                switch (++j) { +                case 1: +                        if (strcmp (argv[i], "volume") != 0) +                                goto error; +                        break; +                case 2: +                        if (strcmp (argv[i], "info") != 0) +                                goto error; +                        break; +                case 3: +                        break; +                default: +   
                     goto error; +                } +        } + +        argv[0] = "gluster"; +        execvp (SBIN_DIR"/gluster", argv); +        fprintf (stderr, "exec of gluster failed\n"); +        return 127; + + error: +        fprintf (stderr, "disallowed gluster invocation\n"); +        return 1; +}  struct invocable {          char *name; @@ -292,8 +332,9 @@ struct invocable {  };  struct invocable invocables[] = { -        { "rsync",  invoke_rsync  }, -        { "gsyncd", invoke_gsyncd }, +        { "rsync",   invoke_rsync  }, +        { "gsyncd",  invoke_gsyncd }, +        { "gluster", invoke_gluster },          { NULL, NULL}  }; diff --git a/geo-replication/src/gverify.sh b/geo-replication/src/gverify.sh new file mode 100755 index 00000000000..186af53a407 --- /dev/null +++ b/geo-replication/src/gverify.sh @@ -0,0 +1,142 @@ +#!/bin/bash + +# Script to verify the Master and Slave Gluster compatibility. +# To use ./gverify <master volume> <slave host> <slave volume> +# Returns 0 if master and slave compatible. + +BUFFER_SIZE=1000; +slave_log_file=`gluster --print-logdir`/geo-replication-slaves/slave.log + +function SSHM() +{ +    ssh -q \ +	-oPasswordAuthentication=no \ +	-oStrictHostKeyChecking=no \ +	-oControlMaster=yes \ +	"$@"; +} + +function cmd_master() +{ +    VOL=$1; +    local cmd_line; +    cmd_line=$(cat <<EOF +function do_verify() { +v=\$1; +d=\$(mktemp -d 2>/dev/null); +glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id \$v -l $slave_log_file \$d; +i=\$(stat -c "%i" \$d); +if [[ "\$i" -ne "1" ]]; then +echo 0:0; +exit 1; +fi; +cd \$d; +available_size=\$(df \$d | tail -1 | awk "{print \\\$2}"); +umount -l \$d; +rmdir \$d; +ver=\$(gluster --version | head -1 | cut -f2 -d " "); +echo \$available_size:\$ver; +}; +cd /tmp; +[ x$VOL != x ] && do_verify $VOL; +EOF +); + +echo $cmd_line; +} + +function cmd_slave() +{ +    VOL=$1; +    local cmd_line; +    cmd_line=$(cat <<EOF +function do_verify() { +v=\$1; +d=\$(mktemp -d 2>/dev/null); +glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id \$v -l $slave_log_file \$d; +i=\$(stat -c "%i" \$d); +if [[ "\$i" -ne "1" ]]; then +echo 0:0; +exit 1; +fi; +cd \$d; +available_size=\$(df \$d | tail -1 | awk "{print \\\$4}"); +umount -l \$d; +rmdir \$d; +ver=\$(gluster --version | head -1 | cut -f2 -d " "); +echo \$available_size:\$ver; +}; +cd /tmp; +[ x$VOL != x ] && do_verify $VOL; +EOF +); + +echo $cmd_line; +} + +function master_stats() +{ +    MASTERVOL=$1; +    local cmd_line; +    cmd_line=$(cmd_master $MASTERVOL); +    bash -c "$cmd_line"; +} + + +function slave_stats() +{ +    SLAVEHOST=$1; +    SLAVEVOL=$2; +    local cmd_line; +    cmd_line=$(cmd_slave $SLAVEVOL); +    SSHM $SLAVEHOST bash -c "'$cmd_line'"; +} + + +function main() +{ +    ERRORS=0; +    master_data=$(master_stats $1); +    slave_data=$(slave_stats $2 $3); +    master_size=$(echo $master_data | cut -f1 -d':'); +    slave_size=$(echo $slave_data | cut -f1 -d':'); +    master_version=$(echo $master_data | cut -f2 -d':'); +    slave_version=$(echo $slave_data | cut -f2 -d':'); +    log_file=$4 + +    if [[ "x$master_size" = "x" || "x$master_version" = "x" || "$master_size" -eq "0" ]]; then +	echo "Unable to fetch master volume details." > $log_file; +	exit 1; +    fi; + +    if [[ "x$slave_size" = "x" || "x$slave_version" = "x" || "$slave_size" -eq "0" ]]; then +        ping -w 5 $2; +        if [ $? -ne 0 ]; then +            echo "$2 not reachable." 
> $log_file +            exit 1; +        fi; +	echo "Unable to fetch slave volume details." > $log_file; +	exit 1; +    fi; + +    if [ $slave_size -ge $(($master_size - $BUFFER_SIZE )) ]; then +	echo "Total size of master is lesser than available size of slave." > $log_file; +    else +	echo "Total size of master is greater than available size of slave." > $log_file; +	ERRORS=$(($ERRORS + 1)); +	exit $ERRORS; +    fi; + +    if [[ $master_version < $slave_version || $master_version == $slave_version ]]; then +	echo "Gluster version of master and slave matches." > $log_file; +    else +	echo "Gluster version mismatch between master and slave." > $log_file; +	ERRORS=$(($ERRORS + 1)); +	exit $ERRORS; +    fi; + +    exit $ERRORS; +} + + +main "$@"; diff --git a/geo-replication/src/peer_add_secret_pub b/geo-replication/src/peer_add_secret_pub new file mode 100644 index 00000000000..1ce040d4419 --- /dev/null +++ b/geo-replication/src/peer_add_secret_pub @@ -0,0 +1,3 @@ +#!/bin/bash + +cat $1 >> ~/.ssh/authorized_keys diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in new file mode 100755 index 00000000000..ef630bd4417 --- /dev/null +++ b/geo-replication/src/peer_gsec_create.in @@ -0,0 +1,12 @@ +#!/bin/bash + +prefix=@prefix@ +exec_prefix=@exec_prefix@ + +if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub ]; then +    \rm -rf "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem* +    ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem > /dev/null +fi + +output=`echo command=\"@libexecdir@/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub` +echo $output diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index c19f6b45919..83f969639cc 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon  syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \  	resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ -	$(top_builddir)/contrib/ipaddr-py/ipaddr.py +	$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py  CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md index d45006932d1..0eb15fa7170 100644 --- a/geo-replication/syncdaemon/README.md +++ b/geo-replication/syncdaemon/README.md @@ -11,13 +11,13 @@ Requirements are categorized according to this.  * Python >= 2.5, or 2.4 with Ctypes (see below) (both)  * OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)  * rsync (both) -* glusterfs with marker support (master); glusterfs (optional on slave) -* FUSE; for supported versions consult glusterfs +* glusterfs: with marker and changelog support (master & slave); +* FUSE: glusterfs fuse module with auxilary gfid based access support  INSTALLATION  ------------ -As of now, the supported way of operation is running from the source directory. +As of now, the supported way of operation is running from the source directory or using the RPMs given.  If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). @@ -46,34 +46,11 @@ USAGE  -----  gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. -Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors -for it with gysncd: +Assume we have a gluster volume _pop_ at localhost. 
We try to set up the mirroring for volume +_pop_ using gsyncd for gluster volume _moz_ on remote machine/cluster @ example.com. The +respective gsyncd invocations are (demoing some syntax sugaring): -1. _/data/mirror_ -2. local gluster volume _yow_ -3. _/data/far_mirror_ at example.com -4. gluster volume _moz_ at example.com - -The respective gsyncd invocations are (demoing some syntax sugaring): - -1. - -      gsyncd.py gluster://localhost:pop file:///data/mirror - -  or short form - -      gsyncd.py :pop /data/mirror - -2. `gsyncd :pop :yow` -3. - -       gsyncd.py :pop ssh://example.com:/data/far_mirror - -  or short form - -       gsyncd.py :pop example.com:/data/far_mirror - -4. `gsyncd.py :pop example.com::moz` +`gsyncd.py :pop example.com::moz`  gsyncd has to be available on both sides; it's location on the remote side has to be specified  via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index e55bec519e9..a326e824681 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -24,9 +24,9 @@ class MultiDict(object):      def __getitem__(self, key):          val = None          for d in self.dicts: -            if d.get(key): +            if d.get(key) != None:                  val = d[key] -        if not val: +        if val == None:              raise KeyError(key)          return val diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 387900e6ce8..ad498c39cdc 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -2,10 +2,12 @@  import os  import os.path +import glob  import sys  import time  import logging  import signal +import shutil  import optparse  import fcntl  import fnmatch @@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork  from gconf import gconf  from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged +from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file  from configinterface import GConffile  import resource  from monitor import monitor @@ -109,6 +111,17 @@ def startup(**kw):      GLogger._gsyncd_loginit(**kw) + +def _unlink(path): +    try: +        os.unlink(path) +    except (OSError, IOError): +        if sys.exc_info()[1].errno == ENOENT: +            pass +        else: +            raise GsyncdError('Unlink error: %s' % path) + +  def main():      """main routine, signal/exception handling boilerplates"""      gconf.starttime = time.time() @@ -153,21 +166,27 @@ def main_i():      op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs)      op.add_option('--gluster-log-level',   metavar='LVL')      op.add_option('--gluster-params',      metavar='PRMS',  default='') +    op.add_option('--glusterd-uuid',       metavar='UUID',  type=str, default='', help=SUPPRESS_HELP)      op.add_option('--gluster-cli-options', metavar='OPTS',  default='--log-file=-')      op.add_option('--mountbroker',         metavar='LABEL')      op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs)      op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs)      op.add_option('--log-file-mbr',  
      metavar='LOGF',  type=str, action='callback', callback=store_abs)      op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs) +    op.add_option('--state-detail-file',   metavar='STATF', type=str, action='callback', callback=store_abs)      op.add_option('--ignore-deletes',      default=False, action='store_true') +    op.add_option('--isolated-slave',      default=False, action='store_true')      op.add_option('--use-rsync-xattrs',    default=False, action='store_true')      op.add_option('-L', '--log-level',     metavar='LVL')      op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0]))      op.add_option('--volume-id',           metavar='UUID') +    op.add_option('--slave-id',            metavar='ID')      op.add_option('--session-owner',       metavar='ID') +    op.add_option('--local-id',            metavar='ID',    help=SUPPRESS_HELP, default='') +    op.add_option('--local-path',          metavar='PATH',  help=SUPPRESS_HELP, default='')      op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh')      op.add_option('--rsync-command',       metavar='CMD',   default='rsync') -    op.add_option('--rsync-options',       metavar='OPTS',  default='--sparse') +    op.add_option('--rsync-options',       metavar='OPTS',  default='')      op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress')      op.add_option('--timeout',             metavar='SEC',   type=int, default=120)      op.add_option('--connection-timeout',  metavar='SEC',   type=int, default=60, help=SUPPRESS_HELP) @@ -186,15 +205,28 @@ def main_i():      # see crawl() for usage of the above tunables      op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) +    # changelog or xtime? (TODO: Change the default) +    op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') +    # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) +    op.add_option('--change-interval', metavar='SEC', type=int, default=3) +    # working directory for changelog based mechanism +    op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) +      op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local)      # duh. 
need to specify dest or value will be mapped to None :S      op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) +    op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) +    op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local)      op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)      op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True))      op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) +    op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) +    op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) +    op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True))      op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a),                                                                                                      setattr(a[-1].values, 'log_file', '-'),                                                                                                      setattr(a[-1].values, 'log_level', 'DEBUG'))), +    op.add_option('--path', type=str, action='append')      for a in ('check', 'get'):          op.add_option('--config-' + a,      metavar='OPT',  type=str, dest='config', action='callback', @@ -225,6 +257,19 @@ def main_i():      # values container.      defaults = op.get_default_values()      opts, args = op.parse_args(values=optparse.Values()) +    args_orig = args[:] +    r = rconf.get('resource_local') +    if r: +        if len(args) == 0: +            args.append(None) +        args[0] = r +    r = rconf.get('resource_remote') +    if r: +        if len(args) == 0: +            raise GsyncdError('local resource unspecfied') +        elif len(args) == 1: +            args.append(None) +        args[1] = r      confdata = rconf.get('config')      if not (len(args) == 2 or \              (len(args) == 1 and rconf.get('listen')) or \ @@ -234,6 +279,12 @@ def main_i():          sys.stderr.write(op.get_usage() + "\n")          sys.exit(1) +    verify = rconf.get('verify') +    if verify: +        logging.info (verify) +        logging.info ("Able to spawn gsyncd.py") +        return +      restricted = os.getenv('_GSYNCD_RESTRICTED_')      if restricted: @@ -250,6 +301,17 @@ def main_i():                                    (k, v))      confrx = getattr(confdata, 'rx', None) +    def makersc(aa, check=True): +        if not aa: +            return ([], None, None) +        ra = [resource.parse_url(u) for u in aa] +        local = ra[0] +        remote = None +        if len(ra) > 1: +            remote = ra[1] +        if check and not local.can_connect_to(remote): +            raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) +        return (ra, local, remote)      if confrx:          # peers are regexen, don't try to parse them          if confrx == 'glob': @@ -257,27 +319,20 @@ def main_i():          canon_peers = args          namedict = {}      else: -        rscs = [resource.parse_url(u) for u in args]          dc = rconf.get('url_print') +        rscs, local, remote = makersc(args_orig, not dc)          if dc: 
             for r in rscs:                  print(r.get_url(**{'normal': {},                                     'canon': {'canonical': True},                                     'canon_esc': {'canonical': True, 'escaped': True}}[dc]))              return -        local = remote = None -        if rscs: -            local = rscs[0] -            if len(rscs) > 1: -                remote = rscs[1] -            if not local.can_connect_to(remote): -                raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))          pa = ([], [], [])          urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})          for x in rscs:              for i in range(len(pa)):                  pa[i].append(x.get_url(**urlprms[i])) -        peers, canon_peers, canon_esc_peers = pa +        _, canon_peers, canon_esc_peers = pa          # creating the namedict, a dict representing various ways of referring to / repreenting          # peers to be fillable in config templates          mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) @@ -327,6 +382,39 @@ def main_i():      gconf.__dict__.update(opts.__dict__)      gconf.configinterface = gcnf +    delete = rconf.get('delete') +    if delete: +        logging.info ('geo-replication delete') +        # Delete pid file, status file, socket file +        cleanup_paths = [] +        if getattr(gconf, 'pid_file', None): +            cleanup_paths.append(gconf.pid_file) + +        if getattr(gconf, 'state_file', None): +            cleanup_paths.append(gconf.state_file) + +        if getattr(gconf, 'state_detail_file', None): +            cleanup_paths.append(gconf.state_detail_file) + +        if getattr(gconf, 'state_socket_unencoded', None): +            cleanup_paths.append(gconf.state_socket_unencoded) + +        # Cleanup changelog working dirs +        if getattr(gconf, 'working_dir', None): +            try: +                shutil.rmtree(gconf.working_dir) +            except (IOError, OSError): +                if sys.exc_info()[1].errno == ENOENT: +                    pass +                else: +                    raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + +        for path in cleanup_paths: +            # To delete temp files +            for f in glob.glob(path + "*"): +                _unlink(f) +        return +      if restricted and gconf.allow_network:          ssh_conn = os.getenv('SSH_CONNECTION')          if not ssh_conn: @@ -380,9 +468,16 @@ def main_i():                  raise          return +    create = rconf.get('create') +    if create: +        if getattr(gconf, 'state_file', None): +            update_file(gconf.state_file, lambda f: f.write(create + '\n')) +        return +      go_daemon = rconf['go_daemon']      be_monitor = rconf.get('monitor') +    rscs, local, remote = makersc(args)      if not be_monitor and isinstance(remote, resource.SSH) and \         go_daemon == 'should':          go_daemon = 'postconn' @@ -393,16 +488,16 @@ def main_i():          label = 'monitor'      elif remote:          #master -        label = '' +        label = gconf.local_path      else:          label = 'slave'      startup(go_daemon=go_daemon, log_file=log_file, label=label) +    resource.Popen.init_errhandler()      if be_monitor: -        return monitor() +        return monitor(*rscs) -    logging.info("syncing: %s" % " -> ".join(peers)) -    resource.Popen.init_errhandler() +    logging.info("syncing: %s" % " -> 
".join(r.url for r in rscs))      if remote:          go_daemon = remote.connect_remote(go_daemon=go_daemon)          if go_daemon: diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index f0a9d22920a..b5b6956aea6 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -43,6 +43,16 @@ class Xattr(object):          return cls._query_xattr( path, siz, 'lgetxattr', attr)      @classmethod +    def lgetxattr_buf(cls, path, attr): +        """lgetxattr variant with size discovery""" +        size = cls.lgetxattr(path, attr) +        if size == -1: +            cls.raise_oserr() +        if size == 0: +            return '' +        return cls.lgetxattr(path, attr, size) + +    @classmethod      def llistxattr(cls, path, siz=0):          ret = cls._query_xattr(path, siz, 'llistxattr')          if isinstance(ret, str): @@ -56,6 +66,11 @@ class Xattr(object):              cls.raise_oserr()      @classmethod +    def lsetxattr_l(cls, path, attr, val): +        """ lazy lsetxattr(): caller handles errno """ +        cls.libc.lsetxattr(path, attr, val, len(val), 0) + +    @classmethod      def lremovexattr(cls, path, attr):          ret = cls.libc.lremovexattr(path, attr)          if ret == -1: diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py new file mode 100644 index 00000000000..68ec3baf144 --- /dev/null +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -0,0 +1,64 @@ +import os +from ctypes import * +from ctypes.util import find_library + +class Changes(object): +    libgfc = CDLL(find_library("gfchangelog"), use_errno=True) + +    @classmethod +    def geterrno(cls): +        return get_errno() + +    @classmethod +    def raise_oserr(cls): +        errn = cls.geterrno() +        raise OSError(errn, os.strerror(errn)) + +    @classmethod +    def _get_api(cls, call): +        return getattr(cls.libgfc, call) + +    @classmethod +    def cl_register(cls, brick, path, log_file, log_level, retries = 0): +        ret = cls._get_api('gf_changelog_register')(brick, path, +                                                    log_file, log_level, retries) +        if ret == -1: +            cls.raise_oserr() + +    @classmethod +    def cl_scan(cls): +        ret = cls._get_api('gf_changelog_scan')() +        if ret == -1: +            cls.raise_oserr() + +    @classmethod +    def cl_startfresh(cls): +        ret = cls._get_api('gf_changelog_start_fresh')() +        if ret == -1: +            cls.raise_oserr() + +    @classmethod +    def cl_getchanges(cls): +        """ remove hardcoding for path name length """ +        def clsort(f): +            return f.split('.')[-1] +        changes = [] +        buf = create_string_buffer('\0', 4096) +        call = cls._get_api('gf_changelog_next_change') + +        while True: +            ret = call(buf, 4096) +            if ret in (0, -1): +                break; +            changes.append(buf.raw[:ret-1]) +        if ret == -1: +            cls.raise_oserr() +        # cleanup tracker +        cls.cl_startfresh() +        return sorted(changes, key=clsort) + +    @classmethod +    def cl_done(cls, clfile): +        ret = cls._get_api('gf_changelog_done')(clfile) +        if ret == -1: +            cls.raise_oserr() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index f903f30595d..58df14954bb 100644 --- a/geo-replication/syncdaemon/master.py +++ 
b/geo-replication/syncdaemon/master.py @@ -4,22 +4,20 @@ import time  import stat  import random  import signal +import json  import logging  import socket +import string  import errno -import re -from errno import ENOENT, ENODATA, EPIPE +from shutil import copyfileobj +from errno import ENOENT, ENODATA, EPIPE, EEXIST  from threading import currentThread, Condition, Lock  from datetime import datetime -try: -    from hashlib import md5 as md5 -except ImportError: -    # py 2.4 -    from md5 import new as md5  from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ -                       escape, unescape, select +from tempfile import mkdtemp, NamedTemporaryFile +from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ +                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb  URXTIME = (-1, 0) @@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self):  # The API! -def gmaster_builder(): +def gmaster_builder(excrawl=None):      """produce the GMaster class variant corresponding         to sync mode"""      this = sys.modules[__name__]      modemixin = gconf.special_sync_mode      if not modemixin:          modemixin = 'normal' -    logging.info('setting up master for %s sync mode' % modemixin) +    changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector +    logging.info('setting up %s change detection mode' % changemixin)      modemixin = getattr(this, modemixin.capitalize() + 'Mixin') +    crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')      sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin      purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin -    class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin): +    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):          pass      return _GMaster @@ -100,12 +100,9 @@ class NormalMixin(object):      def make_xtime_opts(self, is_master, opts):          if not 'create' in opts: -            opts['create'] = is_master and not self.inter_master +            opts['create'] = is_master          if not 'default_xtime' in opts: -            if is_master and self.inter_master: -                opts['default_xtime'] = ENODATA -            else: -                opts['default_xtime'] = URXTIME +            opts['default_xtime'] = URXTIME      def xtime_low(self, server, path, **opts):          xt = server.xtime(path, self.uuid) @@ -114,7 +111,7 @@ class NormalMixin(object):          if xt == ENODATA or xt < self.volmark:              if opts['create']:                  xt = _xtime_now() -                server.set_xtime(path, self.uuid, xt) +                server.aggregated.set_xtime(path, self.uuid, xt)              else:                  xt = opts['default_xtime']          return xt @@ -151,6 +148,13 @@ class NormalMixin(object):      def set_slave_xtime(self, path, mark):          self.slave.server.set_xtime(path, self.uuid, mark) +class PartialMixin(NormalMixin): +    """a variant tuned towards operation with a master +       that has partial info of the slave (brick typically)""" + +    def xtime_reversion_hook(self, path, xtl, xtr): +        pass +  class WrapupMixin(NormalMixin):      """a variant that differs from normal in terms         of ignoring non-indexed files""" @@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin):              opts['default_xtime'] = URXTIME      @staticmethod -    def keepalive_payload_hook(timo, 
gap): +    def keepalive_payload_hook(self, timo, gap):          return (None, gap)      def volinfo_hook(self): @@ -236,19 +240,19 @@ class BlindMixin(object):                  # from interrupted gsyncd transfer                  logging.warn('have to fix up missing xtime on ' + path)                  xt0 = _xtime_now() -                server.set_xtime(path, self.uuid, xt0) +                server.aggregated.set_xtime(path, self.uuid, xt0)              else:                  xt0 = opts['default_xtime']              xt = (xt0, xt[1])          return xt      @staticmethod -    def keepalive_payload_hook(timo, gap): +    def keepalive_payload_hook(self, timo, gap):          return (None, gap)      def volinfo_hook(self):          res = _volinfo_hook_relax_foreign(self) -        volinfo_r_new = self.slave.server.native_volume_info() +        volinfo_r_new = self.slave.server.aggregated.native_volume_info()          if volinfo_r_new['retval']:              raise GsyncdError("slave is corrupt")          if getattr(self, 'volinfo_r', None): @@ -321,9 +325,7 @@ class PurgeNoopMixin(object):      def purge_missing(self, path, names):          pass - - -class GMasterBase(object): +class GMasterCommon(object):      """abstract class impementling master role"""      KFGN = 0 @@ -334,8 +336,8 @@ class GMasterBase(object):          err out on multiple foreign masters          """ -        fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ -                          self.master.server.native_volume_info() +        fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ +                          self.master.server.aggregated.native_volume_info()          fgn_vi = None          if fgn_vis:              if len(fgn_vis) > 1: @@ -376,6 +378,33 @@ class GMasterBase(object):          self.make_xtime_opts(rsc == self.master, opts)          return self.xtime_low(rsc.server, path, **opts) +    def get_initial_crawl_data(self): +        default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} +        if getattr(gconf, 'state_detail_file', None): +            try: +                return json.load(open(gconf.state_detail_file)) +            except (IOError, OSError): +                ex = sys.exc_info()[1] +                if ex.errno == ENOENT: +                    # Create file with initial data +                    with open(gconf.state_detail_file, 'wb') as f: +                        json.dump(default_data, f) +                    return default_data +                else: +                    raise + +        return default_data + +    def update_crawl_data(self): +        if getattr(gconf, 'state_detail_file', None): +            try: +                same_dir = os.path.dirname(gconf.state_detail_file) +                with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: +                    json.dump(self.total_crawl_stats, tmp) +                    os.rename(tmp.name, gconf.state_detail_file) +            except (IOError, OSError): +                raise +      def __init__(self, master, slave):          self.master = master          self.slave = slave @@ -392,15 +421,12 @@ class GMasterBase(object):          self.crawls = 0          self.turns = 0          self.total_turns = int(gconf.turns) -        self.lastreport = {'crawls': 0, 'turns': 0} +        self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} +        self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, +                            'crawl_time': 0, 'files_synced': 
0, 'bytes_synced' :0} +        self.total_crawl_stats = self.get_initial_crawl_data()          self.start = None          self.change_seen = None -        self.syncTime=0 -        self.lastSyncTime=0 -        self.crawlStartTime=0 -        self.crawlTime=0 -        self.filesSynced=0 -        self.bytesSynced=0          # the authoritative (foreign, native) volinfo pair          # which lets us deduce what to do when we refetch          # the volinfos from system @@ -409,8 +435,94 @@ class GMasterBase(object):          # the actual volinfo we make use of          self.volinfo = None          self.terminate = False +        self.sleep_interval = 1          self.checkpoint_thread = None +    def init_keep_alive(cls): +        """start the keep-alive thread """ +        timo = int(gconf.timeout or 0) +        if timo > 0: +            def keep_alive(): +                while True: +                    vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5) +                    cls.slave.server.keep_alive(vi) +                    time.sleep(gap) +            t = Thread(target=keep_alive) +            t.start() + +    def volinfo_query(self): +        """volume info state machine""" +        volinfo_sys, state_change = self.volinfo_hook() +        if self.inter_master: +            self.volinfo = volinfo_sys[self.KFGN] +        else: +            self.volinfo = volinfo_sys[self.KNAT] +        if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): +            logging.info('new master is %s', self.uuid) +            if self.volinfo: +                logging.info("%s master with volume id %s ..." % \ +                                 (self.inter_master and "intermediate" or "primary", +                                  self.uuid)) +        if state_change == self.KFGN: +            gconf.configinterface.set('volume_id', self.uuid) +        if self.volinfo: +            if self.volinfo['retval']: +                raise GsyncdError ("master is corrupt") +            self.start_checkpoint_thread() +        else: +            if should_display_info or self.crawls == 0: +                if self.inter_master: +                    logging.info("waiting for being synced from %s ..." % \ +                                     self.volinfo_state[self.KFGN]['uuid']) +                else: +                    logging.info("waiting for volume info ...") +            return True + +    def should_crawl(cls): +        return (gconf.glusterd_uuid in cls.master.server.node_uuid()) + +    def register(self): +        self.register() + +    def crawlwrap(self, oneshot=False): +        if oneshot: +            # it's important to do this during the oneshot crawl as +            # for a passive gsyncd (ie. in a replicate scenario) +            # the keepalive thread would keep the connection alive. 
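Two notes on the crawl machinery above: should_crawl() checks whether this node's glusterd UUID serves the brick, so in a replicated master only one replica's worker actively crawls while the passive peer merely keeps the slave connection alive; and the cumulative counters persist in gconf.state_detail_file as plain JSON, per get_initial_crawl_data()/update_crawl_data(), e.g. (values invented):

    {"sync_time": 12.5, "files_synced": 42, "bytes_synced": 1048576}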
+            self.init_keep_alive() +        self.lastreport['time'] = time.time() +        self.crawl_stats['crawl_starttime'] = datetime.now() + +        logging.info('crawl interval: %d seconds' % self.sleep_interval) +        t0 = time.time() +        crawl = self.should_crawl() +        while not self.terminate: +            if self.volinfo_query(): +                continue +            t1 = time.time() +            if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds +                crawl = self.should_crawl() +                t0 = t1 +            if not crawl: +                time.sleep(5) +                continue +            if self.start: +                logging.debug("... crawl #%d done, took %.6f seconds" % \ +                                  (self.crawls, time.time() - self.start)) +            self.start = t1 +            should_display_info = self.start - self.lastreport['time'] >= 60 +            if should_display_info: +                logging.info("%d crawls, %d turns", +                             self.crawls - self.lastreport['crawls'], +                             self.turns - self.lastreport['turns']) +                self.lastreport.update(crawls = self.crawls, +                                       turns = self.turns, +                                       time = self.start) +            self.crawl() +            if oneshot: +                return +            time.sleep(self.sleep_interval) +      @classmethod      def _checkpt_param(cls, chkpt, prm, xtimish=True):          """use config backend to lookup a parameter belonging to @@ -443,32 +555,37 @@ class GMasterBase(object):          return ts      def get_extra_info(self): -        str_info="\nFile synced : %d" %(self.filesSynced) -        str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced) -        str_info+="\nSync Time : %f seconds" %(self.syncTime) -        self.crawlTime=datetime.now()-self.crawlStartTime -        years , days =divmod(self.crawlTime.days,365.25) -        years=int(years) -        days=int(days) +        str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) +        str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) + +        self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] + +        str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) +        str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) +        str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) +        str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) +        str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) +        str_info += "\0" +        logging.debug(str_info) +        return str_info + +    def _crawl_time_format(self, crawl_time): +        # Ex: 5 years, 4 days, 20:23:10 +        years, days = divmod(crawl_time.days, 365.25) +        years = int(years) +        days = int(days)          date="" -        m, s = divmod(self.crawlTime.seconds, 60) +        m, s = divmod(crawl_time.seconds, 60)          h, m = divmod(m, 60) -        if years!=0 : -                date+=str(years)+" year " -        if days!=0 : -                date+=str(days)+" day " -        if h!=0 : -                date+=str(h)+" H : " -        if m!=0 or h!=0 : -                date+=str(m)+" M : " - -        date+=str(s)+" S" -        self.crawlTime=date -        str_info+="\nCrawl Time : %s" %(str(self.crawlTime)) -        
str_info+="\n\0" -        return str_info +        if years != 0: +            date += "%s %s " % (years, "year" if years == 1 else "years") +        if days != 0: +            date += "%s %s " % (days, "day" if days == 1 else "days") + +        date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) +        return date      def checkpt_service(self, chan, chkpt, tgt):          """checkpoint service loop @@ -517,7 +634,7 @@ class GMasterBase(object):                  try:                      conn, _ = chan.accept()                      try: -                        conn.send("  | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info())) +                        conn.send("  | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))                      except:                          exc = sys.exc_info()[1]                          if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -536,7 +653,7 @@ class GMasterBase(object):          ):              return          chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -        state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket") +        state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")          try:              os.unlink(state_socket)          except: @@ -559,22 +676,6 @@ class GMasterBase(object):          t.start()          self.checkpoint_thread = t -    def crawl_loop(self): -        """start the keep-alive thread and iterate .crawl""" -        timo = int(gconf.timeout or 0) -        if timo > 0: -            def keep_alive(): -                while True: -                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) -                    self.slave.server.keep_alive(vi) -                    time.sleep(gap) -            t = Thread(target=keep_alive) -            t.start() -        self.lastreport['time'] = time.time() -        self.crawlStartTime=datetime.now() -        while not self.terminate: -            self.crawl() -      def add_job(self, path, label, job, *a, **kw):          """insert @job function to job table at @path with @label"""          if self.jobtab.get(path) == None: @@ -600,7 +701,7 @@ class GMasterBase(object):              ret = j[-1]()              if not ret:                  succeed = False -        if succeed: +        if succeed and not args[0] == None:              self.sendmark(path, *args)          return succeed @@ -653,6 +754,319 @@ class GMasterBase(object):                        tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))          return newstate, param.state_change +class GMasterChangelogMixin(GMasterCommon): +    """ changelog based change detection and syncing """ + +    # index for change type and entry +    IDX_START = 0 +    IDX_END   = 2 + +    POS_GFID   = 0 +    POS_TYPE   = 1 +    POS_ENTRY1 = 2 +    POS_ENTRY2 = 3  # renames + +    _CL_TYPE_DATA_PFX     = "D " +    _CL_TYPE_METADATA_PFX = "M " +    _CL_TYPE_ENTRY_PFX    = "E " + +    TYPE_GFID  = [_CL_TYPE_DATA_PFX] # ignoring metadata ops +    TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] + +    # flat directory heirarchy for gfid based access +    FLAT_DIR_HIERARCHY = '.' 
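The offsets above slice changelog records whose type lives in the first two characters ("E ", "D ", "M "). A short decoding sketch, matching the slicing done in process_change() below (gfid borrowed from the example extract earlier; the entry token is a placeholder):

    # "E <gfid> <op> <entry> [<entry2>]"  entry ops (CREATE/UNLINK/RENAME...)
    # "D <gfid>"                          data op
    # "M <gfid> ..."                      metadata op (skipped, per TYPE_GFID above)
    rec = "E 25772386-3eb8-4550-a802-c3fdc938ca80 CREATE <entry>"
    et, ec = rec[0:2], rec[2:].split(" ")   # IDX_START:IDX_END, then fields
    gfid, op, entry = ec[0], ec[1], ec[2]   # POS_GFID, POS_TYPE, POS_ENTRY1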
+ +    def fallback_xsync(self): +        logging.info('falling back to xsync mode') +        gconf.configinterface.set('change-detector', 'xsync') +        selfkill() + +    def setup_working_dir(self): +        workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) +        logfile = os.path.join(workdir, 'changes.log') +        logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) +        return (workdir, logfile) + +    def lstat(self, e): +        try: +            return os.lstat(e) +        except (IOError, OSError): +            ex = sys.exc_info()[1] +            if ex.errno == ENOENT: +                return ex.errno +            else: +                raise + +    # sync data +    def syncdata(self, datas): +        logging.debug('datas: %s' % (datas)) +        for data in datas: +            logging.debug('candidate for syncing %s' % data) +            pb = self.syncer.add(data) +            timeA = datetime.now() +            def regjob(se, xte, pb): +                rv = pb.wait() +                if rv[0]: +                    logging.debug('synced ' + se) +                    # update stats +                    timeB = datetime.now() +                    self.crawl_stats['last_synctime'] = timeB - timeA +                    self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) +                    self.crawl_stats['files_synced'] += 1 +                    self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced + +                    # cumulative statistics +                    self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced +                    self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) +                    self.total_crawl_stats['files_synced'] += 1 +                    return True +                else: +                    if rv[1] in [23, 24]: +                        # stat to check if the file exist +                        st = self.lstat(se) +                        if isinstance(st, int): +                            # file got unlinked in the interim +                            return True +                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) +            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) +        if self.wait(self.FLAT_DIR_HIERARCHY, None): +            self.update_crawl_data() +            return True + +    def process_change(self, change, done): +        clist   = [] +        entries = [] +        purges = set() +        links = set() +        datas = set() +        pfx = gauxpfx() +        try: +            f = open(change, "r") +            clist = f.readlines() +            f.close() +        except IOError: +            raise + +        def edct(op, **ed): +            dct = {} +            dct['op'] = op +            for k in ed: +                if k == 'stat': +                    st = ed[k] +                    dst = dct['stat'] = {} +                    dst['uid'] = st.st_uid +                    dst['gid'] = st.st_gid +                    dst['mode'] = st.st_mode +                else: +                    dct[k] = ed[k] +            return dct +        for e in clist: +            e = e.strip() +            et = e[self.IDX_START:self.IDX_END] +            ec = e[self.IDX_END:].split(' ') +            if et in self.TYPE_ENTRY: +                ty = ec[self.POS_TYPE] +                en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) 
+                gfid = ec[self.POS_GFID]
+                # definitely need a better way to bucketize entry ops
+                if ty in ['UNLINK', 'RMDIR']:
+                    entries.append(edct(ty, gfid=gfid, entry=en))
+                    purges.update([os.path.join(pfx, gfid)])
+                    continue
+                if ty != 'RENAME':
+                    go = os.path.join(pfx, gfid)
+                    st = self.lstat(go)
+                    if isinstance(st, int):
+                        logging.debug('file %s got purged in the interim' % go)
+                        continue
+                if ty in ['CREATE', 'MKDIR', 'MKNOD']:
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+                elif ty == 'LINK':
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+                    links.update([os.path.join(pfx, gfid)])
+                elif ty == 'SYMLINK':
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
+                elif ty == 'RENAME':
+                    e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
+                    entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
+                else:
+                    # ignore any other entry ops
+                    pass
+            elif et in self.TYPE_GFID:
+                da = os.path.join(pfx, ec[0])
+                st = self.lstat(da)
+                if isinstance(st, int):
+                    logging.debug('file %s got purged in the interim' % da)
+                    continue
+                datas.update([da])
+        logging.debug('entries: %s' % repr(entries))
+        # sync namespace
+        if entries:
+            self.slave.server.entry_ops(entries)
+        # sync data
+        if self.syncdata(datas - (purges - links)):
+            if done:
+                self.master.server.changelog_done(change)
+            return True
+
+    def process(self, changes, done=1):
+        for change in changes:
+            times = 0
+            while True:
+                times += 1
+                logging.debug('processing change %s [%d time(s)]' % (change, times))
+                if self.process_change(change, done):
+                    break
+                # Either entry_ops() or rsync failed to do its job. Mostly
+                # it's entry_ops() (which currently can fail to create an
+                # entry without returning an errno). Since we cannot tell
+                # whether rsync or entry_ops() was the culprit, we retry
+                # the _whole_ changelog again.
+                # TODO: remove entry retries when that gets fixed.
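
Another aside, not part of the patch: the `os.path.join(pfx, gfid)` paths above use GlusterFS's auxiliary GFID access prefix (`.gfid/`, cf. `_CL_AUX_GFID_PFX` in syncdutils.py further below), which lets a file be looked up by bare GFID relative to the mount. A minimal sketch of the purge-detection idiom, with a made-up GFID; on anything but a gluster mount with aux-gfid support this simply reports ENOENT:

```python
import os
import sys
from errno import ENOENT

_CL_AUX_GFID_PFX = '.gfid/'  # mirrors syncdutils._CL_AUX_GFID_PFX

def lstat_or_errno(path):
    # same contract as the lstat() helper above: ENOENT -> errno, not exception
    try:
        return os.lstat(path)
    except OSError:
        ex = sys.exc_info()[1]
        if ex.errno == ENOENT:
            return ex.errno
        raise

go = os.path.join(_CL_AUX_GFID_PFX, '6cf69ebc-bb82-4e6c-a7cd-c9d7d7aa85a5')
if isinstance(lstat_or_errno(go), int):
    print('%s got purged in the interim' % go)
```
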
+                logging.warn('incomplete sync, retrying changelog: %s' % change)
+                time.sleep(0.5)
+            self.turns += 1
+
+    def upd_stime(self, stime):
+        if stime:
+            self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+
+    def crawl(self):
+        changes = []
+        try:
+            self.master.server.changelog_scan()
+            self.crawls += 1
+        except OSError:
+            self.fallback_xsync()
+        changes = self.master.server.changelog_getchanges()
+        if changes:
+            xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
+            if isinstance(xtl, int):
+                raise GsyncdError('master is corrupt')
+            logging.debug('processing changes %s' % repr(changes))
+            self.process(changes)
+            self.upd_stime(xtl)
+
+    def register(self):
+        (workdir, logfile) = self.setup_working_dir()
+        self.sleep_interval = int(gconf.change_interval)
+        # register with the changelog library
+        try:
+            # 9 == log level (DEBUG)
+            # 5 == connection retries
+            self.master.server.changelog_register(gconf.local_path,
+                                                  workdir, logfile, 9, 5)
+        except OSError:
+            self.fallback_xsync()
+            # control should not reach here
+            raise
+
+class GMasterXsyncMixin(GMasterChangelogMixin):
+    """
+    This crawl needs to be xtime based (as of now
+    it's not, because we generate a CHANGELOG
+    file during each crawl, which is then processed
+    by process_change()).
+    For now it's used as a one-shot initial sync
+    mechanism and only syncs directories, regular
+    files and symlinks.
+    """
+
+    def register(self):
+        self.sleep_interval = 60
+        self.tempdir = self.setup_working_dir()[0]
+        self.tempdir = os.path.join(self.tempdir, 'xsync')
+        logging.info('xsync temp directory: %s' % self.tempdir)
+        try:
+            os.makedirs(self.tempdir)
+        except OSError:
+            ex = sys.exc_info()[1]
+            if ex.errno == EEXIST and os.path.isdir(self.tempdir):
+                pass
+            else:
+                raise
+
+    def write_entry_change(self, prefix, data=[]):
+        self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
+
+    def open(self):
+        try:
+            self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.'
+ str(int(time.time())))
+            self.fh = open(self.xsync_change, 'w')
+        except IOError:
+            raise
+
+    def close(self):
+        self.fh.close()
+
+    def fname(self):
+        return self.xsync_change
+
+    def crawl(self, path='.', xtr=None, done=0):
+        """ generate a CHANGELOG file consumable by process_change """
+        if path == '.':
+            self.open()
+            self.crawls += 1
+        if not xtr:
+            # get the root stime and use it for all comparisons
+            xtr = self.xtime('.', self.slave)
+            if isinstance(xtr, int):
+                if xtr != ENOENT:
+                    raise GsyncdError('slave is corrupt')
+                xtr = self.minus_infinity
+        xtl = self.xtime(path)
+        if isinstance(xtl, int):
+            raise GsyncdError('master is corrupt')
+        if xtr == xtl:
+            if path == '.':
+                self.close()
+            return
+        self.xtime_reversion_hook(path, xtl, xtr)
+        logging.debug("entering " + path)
+        dem = self.master.server.entries(path)
+        pargfid = self.master.server.gfid(path)
+        if isinstance(pargfid, int):
+            logging.warn('skipping directory %s' % (path))
+            return
+        for e in dem:
+            bname = e
+            e = os.path.join(path, e)
+            st = self.lstat(e)
+            if isinstance(st, int):
+                logging.warn('%s got purged in the interim..' % e)
+                continue
+            gfid = self.master.server.gfid(e)
+            if isinstance(gfid, int):
+                logging.warn('skipping entry %s..' % (e))
+                continue
+            xte = self.xtime(e)
+            if isinstance(xte, int):
+                raise GsyncdError('master is corrupt')
+            if not self.need_sync(e, xte, xtr):
+                continue
+            mo = st.st_mode
+            if stat.S_ISDIR(mo):
+                self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
+                self.crawl(e, xtr)
+            elif stat.S_ISREG(mo):
+                self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
+                self.write_entry_change("D", [gfid])
+            elif stat.S_ISLNK(mo):
+                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+            else:
+                logging.info('ignoring %s' % e)
+        if path == '.':
+            logging.info('processing xsync changelog %s' % self.fname())
+            self.close()
+            self.process([self.fname()], done)
+            self.upd_stime(xtl)
+
+class GMasterXtimeMixin(GMasterCommon):
+    """ xtime based change detection and syncing """
+
+    def register(self):
+        pass
+
     def crawl(self, path='.', xtl=None):
         """crawling...
@@ -691,46 +1105,6 @@ class GMasterBase(object):
         assert that the file systems (master / slave) underneath do not change and actions
         taken upon some condition will not lose their context by the time they are performed.
         """
-        if path == '.':
-            if self.start:
-                self.crawls += 1
-                logging.debug("...
crawl #%d done, took %.6f seconds" % \ -                              (self.crawls, time.time() - self.start)) -            time.sleep(1) -            self.start = time.time() -            should_display_info = self.start - self.lastreport['time'] >= 60 -            if should_display_info: -                logging.info("completed %d crawls, %d turns", -                             self.crawls - self.lastreport['crawls'], -                             self.turns - self.lastreport['turns']) -                self.lastreport.update(crawls = self.crawls, -                                       turns = self.turns, -                                       time = self.start) -            volinfo_sys, state_change = self.volinfo_hook() -            if self.inter_master: -                self.volinfo = volinfo_sys[self.KFGN] -            else: -                self.volinfo = volinfo_sys[self.KNAT] -            if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): -                logging.info('new master is %s', self.uuid) -                if self.volinfo: -                    logging.info("%s master with volume id %s ..." % \ -                                 (self.inter_master and "intermediate" or "primary", -                                  self.uuid)) -            if state_change == self.KFGN: -               gconf.configinterface.set('volume_id', self.uuid) -            if self.volinfo: -                if self.volinfo['retval']: -                    raise GsyncdError ("master is corrupt") -                self.start_checkpoint_thread() -            else: -                if should_display_info or self.crawls == 0: -                    if self.inter_master: -                        logging.info("waiting for being synced from %s ..." % \ -                                     self.volinfo_state[self.KFGN]['uuid']) -                    else: -                        logging.info("waiting for volume info ...") -                return          logging.debug("entering " + path)          if not xtl:              xtl = self.xtime(path) @@ -806,6 +1180,7 @@ class GMasterBase(object):              st = indulgently(e, lambda e: os.lstat(e))              if st == False:                  continue +              mo = st.st_mode              adct = {'own': (st.st_uid, st.st_gid)}              if stat.S_ISLNK(mo): @@ -815,16 +1190,19 @@ class GMasterBase(object):              elif stat.S_ISREG(mo):                  logging.debug("syncing %s ..." 
% e)                  pb = self.syncer.add(e) -                timeA=datetime.now() +                timeA = datetime.now()                  def regjob(e, xte, pb): -                    if pb.wait(): +                    if pb.wait()[0]:                          logging.debug("synced " + e)                          self.sendmark_regular(e, xte) - -                        timeB=datetime.now() -                        self.lastSyncTime=timeB-timeA -                        self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6) -                        self.filesSynced=self.filesSynced+1 +                        # update stats +                        timeB = datetime.now() +                        self.crawl_stats['last_synctime'] = timeB - timeA +                        self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) +                        self.crawl_stats['files_synced'] += 1 +                        self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) +                        self.total_crawl_stats['files_synced'] += 1 +                        self.update_crawl_data()                          return True                      else:                          logging.warn("failed to sync " + e) @@ -841,6 +1219,7 @@ class GMasterBase(object):          if path == '.':              self.wait(path, xtl) +  class BoxClosedErr(Exception):      pass @@ -920,7 +1299,7 @@ class Syncer(object):          self.slave = slave          self.lock = Lock()          self.pb = PostBox() -        self.bytesSynced=0 +        self.bytes_synced = 0          for i in range(int(gconf.sync_jobs)):              t = Thread(target=self.syncjob)              t.start() @@ -940,13 +1319,10 @@ class Syncer(object):              pb.close()              po = self.slave.rsync(pb)              if po.returncode == 0: -                regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) -                if regEx: -                        self.bytesSynced+=(int(regEx.group(1)))/1024 -                ret = True +                ret = (True, 0)              elif po.returncode in (23, 24):                  # partial transfer (cf. 
rsync(1)), that's normal
-                ret = False
+                ret = (False, po.returncode)
             else:
                 po.errfail()
             pb.wakeup(ret)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index b8956dcc2b9..badd0d9c5f8 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -3,26 +3,94 @@ import sys
 import time
 import signal
 import logging
+import uuid
+import xml.etree.ElementTree as XET
+from subprocess import PIPE
+from resource import Popen, FILE, GLUSTER, SSH
+from threading import Lock
 from gconf import gconf
-from syncdutils import update_file, select, waitpid, set_term_handler
+from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError
+from syncdutils import escape, Thread, finalize, memoize
+
+class Volinfo(object):
+    def __init__(self, vol, host='localhost', prelude=[]):
+        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol],
+                   stdout=PIPE, stderr=PIPE)
+        vix = po.stdout.read()
+        po.wait()
+        po.terminate_geterr()
+        vi = XET.fromstring(vix)
+        if vi.find('opRet').text != '0':
+            if prelude:
+                via = '(via %s) ' % ' '.join(prelude)
+            else:
+                via = ' '
+            raise GsyncdError('getting volume info of %s%s failed with errorcode %s' %
+                              (vol, via, vi.find('opErrno').text))
+        self.tree = vi
+        self.volume = vol
+        self.host = host
+
+    def get(self, elem):
+        return self.tree.findall('.//' + elem)
+
+    @property
+    @memoize
+    def bricks(self):
+        def bparse(b):
+            host, dirp = b.text.split(':', 2)
+            return {'host': host, 'dir': dirp}
+        return [ bparse(b) for b in self.get('brick') ]
+
+    @property
+    @memoize
+    def uuid(self):
+        ids = self.get('id')
+        if len(ids) != 1:
+            raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid" %
+                              (self.volume, self.host))
+        return ids[0].text
+
 class Monitor(object):
     """class which spawns and manages gsyncd workers"""

+    ST_INIT     = 'Initializing...'
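+    # NB: _ST_ORD below ranks these states by severity; set_state()
+    # aggregates per-worker states by reporting the most severe one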
+    ST_STABLE   = 'Stable' +    ST_FAULTY   = 'faulty' +    ST_INCON    = 'inconsistent' +    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] +      def __init__(self): -        self.state = None +        self.lock = Lock() +        self.state = {} -    def set_state(self, state): +    def set_state(self, state, w=None):          """set the state that can be used by external agents             like glusterd for status reporting""" -        if state == self.state: -            return -        self.state = state -        logging.info('new state: %s' % state) -        if getattr(gconf, 'state_file', None): -            update_file(gconf.state_file, lambda f: f.write(state + '\n')) - -    def monitor(self): +        computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] +        if w: +            self.lock.acquire() +            old_state = computestate() +            self.state[w] = state +            state = computestate() +            self.lock.release() +            if state != old_state: +                self.set_state(state) +        else: +            logging.info('new state: %s' % state) +            if getattr(gconf, 'state_file', None): +                update_file(gconf.state_file, lambda f: f.write(state + '\n')) + +    @staticmethod +    def terminate(): +        # relax one SIGTERM by setting a handler that sets back +        # standard handler +        set_term_handler(lambda *a: set_term_handler()) +        # give a chance to graceful exit +        os.kill(-os.getpid(), signal.SIGTERM) + +    def monitor(self, w, argv, cpids):          """the monitor loop          Basic logic is a blantantly simple blunt heuristics: @@ -41,27 +109,8 @@ class Monitor(object):          blown worker blows up on EPIPE if the net goes down,          due to the keep-alive thread)          """ -        def sigcont_handler(*a): -            """ -            Re-init logging and send group kill signal -            """ -            md = gconf.log_metadata -            logging.shutdown() -            lcls = logging.getLoggerClass() -            lcls.setup(label=md.get('saved_label'), **md) -            pid = os.getpid() -            os.kill(-pid, signal.SIGUSR1) -        signal.signal(signal.SIGUSR1, lambda *a: ()) -        signal.signal(signal.SIGCONT, sigcont_handler) - -        argv = sys.argv[:] -        for o in ('-N', '--no-daemon', '--monitor'): -            while o in argv: -                argv.remove(o) -        argv.extend(('-N', '-p', '')) -        argv.insert(0, os.path.basename(sys.executable)) -        self.set_state('starting...') +        self.set_state(self.ST_INIT, w)          ret = 0          def nwait(p, o=0):              p2, r = waitpid(p, o) @@ -83,7 +132,13 @@ class Monitor(object):              cpid = os.fork()              if cpid == 0:                  os.close(pr) -                os.execv(sys.executable, argv + ['--feedback-fd', str(pw)]) +                os.execv(sys.executable, argv + ['--feedback-fd', str(pw), +                                                 '--local-path', w[0], +                                                 '--local-id', '.' 
+ escape(w[0]),
+                                                 '--resource-remote', w[1]])
+            self.lock.acquire()
+            cpids.add(cpid)
+            self.lock.release()
             os.close(pw)
             t0 = time.time()
             so = select((pr,), (), (), conn_timeout)[0]
@@ -103,27 +158,104 @@ class Monitor(object):
             else:
                 logging.debug("worker not confirmed in %d sec, aborting it" % \
                               conn_timeout)
-                # relax one SIGTERM by setting a handler that sets back
-                # standard handler
-                set_term_handler(lambda *a: set_term_handler())
-                # give a chance to graceful exit
-                os.kill(-os.getpid(), signal.SIGTERM)
+                self.terminate()
                 time.sleep(1)
                 os.kill(cpid, signal.SIGKILL)
                 ret = nwait(cpid)
             if ret == None:
-                self.set_state('OK')
+                self.set_state(self.ST_STABLE, w)
                 ret = nwait(cpid)
             if exit_signalled(ret):
                 ret = 0
             else:
                 ret = exit_status(ret)
                 if ret in (0,1):
-                    self.set_state('faulty')
+                    self.set_state(self.ST_FAULTY, w)
             time.sleep(10)
-        self.set_state('inconsistent')
+        self.set_state(self.ST_INCON, w)
         return ret

-def monitor():
+    def multiplex(self, wspx, suuid):
+        def sigcont_handler(*a):
+            """
+            Re-init logging and send group kill signal
+            """
+            md = gconf.log_metadata
+            logging.shutdown()
+            lcls = logging.getLoggerClass()
+            lcls.setup(label=md.get('saved_label'), **md)
+            pid = os.getpid()
+            os.kill(-pid, signal.SIGUSR1)
+        signal.signal(signal.SIGUSR1, lambda *a: ())
+        signal.signal(signal.SIGCONT, sigcont_handler)
+
+        argv = sys.argv[:]
+        for o in ('-N', '--no-daemon', '--monitor'):
+            while o in argv:
+                argv.remove(o)
+        argv.extend(('-N', '-p', '', '--slave-id', suuid))
+        argv.insert(0, os.path.basename(sys.executable))
+
+        cpids = set()
+        ta = []
+        for wx in wspx:
+            def wmon(w):
+                self.monitor(w, argv, cpids)
+                self.terminate()
+                time.sleep(1)
+                self.lock.acquire()
+                for cpid in cpids:
+                    os.kill(cpid, signal.SIGKILL)
+                self.lock.release()
+                finalize(exval=1)
+            t = Thread(target=wmon, args=[wx])
+            t.start()
+            ta.append(t)
+        for t in ta:
+            t.join()
+
+def distribute(*resources):
+    master, slave = resources
+    mvol = Volinfo(master.volume, master.host)
+    logging.debug('master bricks: ' + repr(mvol.bricks))
+    locmbricks = [ b['dir'] for b in mvol.bricks if is_host_local(b['host']) ]
+    prelude = []
+    si = slave
+    if isinstance(slave, SSH):
+        prelude = gconf.ssh_command.split() + [slave.remote_addr]
+        si = slave.inner_rsc
+        logging.debug('slave SSH gateway: ' + slave.remote_addr)
+    if isinstance(si, FILE):
+        sbricks = {'host': 'localhost', 'dir': si.path}
+        suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
+    elif isinstance(si, GLUSTER):
+        svol = Volinfo(si.volume, si.host, prelude)
+        sbricks 
= svol.bricks
+        suuid = svol.uuid
+    else:
+        raise GsyncdError("unknown slave type " + slave.url)
+    logging.info('slave bricks: ' + repr(sbricks))
+    if isinstance(si, FILE):
+        slaves = [ slave.url ]
+    else:
+        slavenodes = set(b['host'] for b in sbricks)
+        if isinstance(slave, SSH) and not gconf.isolated_slave:
+            rap = SSH.parse_ssh_address(slave.remote_addr)
+            slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ]
+        else:
+            slavevols = [ h + ':' + si.volume for h in slavenodes ]
+            if isinstance(slave, SSH):
+                slaves = [ 'ssh://' + slave.remote_addr + ':' + v for v in slavevols ]
+            else:
+                slaves = slavevols
+    locmbricks.sort()
+    slaves.sort()
+    workerspex = []
+    for i in range(len(locmbricks)):
+        workerspex.append((locmbricks[i], slaves[i % len(slaves)]))
+    logging.info('worker specs: ' + repr(workerspex))
+    return workerspex, suuid
+
+def monitor(*resources):
     """oh yeah, actually Monitor is used as singleton, too"""
-    return Monitor().monitor()
+    return Monitor().multiplex(*distribute(*resources))
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 73102fbcb44..52989fe28cc 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -5,13 +5,14 @@ import stat
 import time
 import fcntl
 import errno
+import types
 import struct
 import socket
 import logging
 import tempfile
 import threading
 import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY
 from select import error as SelectError
 from gconf import gconf
@@ -19,7 +20,8 @@ import repce
 from repce import RepceServer, RepceClient
 from  master import gmaster_builder
 import syncdutils
-from syncdutils import GsyncdError, select, privileged, boolify
+from syncdutils import GsyncdError, select, privileged, boolify, funcode
+from syncdutils import umask, entry2pb, gauxpfx, errno_wrap

 UrlRX  = re.compile('\A(\w+)://([^ *?[]*)\Z')
 HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -105,7 +107,18 @@ class _MetaXattr(object):
             setattr(self, m, getattr(LXattr, m))
         return getattr(self, meth)

+class _MetaChangelog(object):
+    def __getattr__(self, meth):
+        from libgfchangelog import Changes as LChanges
+        xmeth = [ m for m in dir(LChanges) if m[0] != '_' ]
+        if not meth in xmeth:
+            return
+        for m in xmeth:
+            setattr(self, m, getattr(LChanges, m))
+        return getattr(self, meth)
+
 Xattr = _MetaXattr()
+Changes = _MetaChangelog()

 class Popen(subprocess.Popen):
@@ -245,10 +258,24 @@ class Server(object):
     and classmethods and is used directly, without instantiation.)
     """

-    GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs"
+    GX_NSPACE_PFX = (privileged() and "trusted" or "system")
+    GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
     NTV_FMTSTR = "!" 
+ "B"*19 + "II"      FRGN_XTRA_FMT = "I"      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT +    GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + +    local_path = '' + +    @classmethod +    def _fmt_mknod(cls, l): +        return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +    @classmethod +    def _fmt_mkdir(cls, l): +        return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +    @classmethod +    def _fmt_symlink(cls, l1, l2): +        return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)      def _pathguard(f):          """decorator method that checks @@ -257,22 +284,21 @@ class Server(object):          point out of the managed tree          """ -        fc = getattr(f, 'func_code', None) -        if not fc: -            # python 3 -            fc = f.__code__ +        fc = funcode(f)          pi = list(fc.co_varnames).index('path')          def ff(*a):              path = a[pi]              ps = path.split('/')              if path[0] == '/' or '..' in ps:                  raise ValueError('unsafe path') +            a = list(a) +            a[pi] = os.path.join(a[0].local_path, path)              return f(*a)          return ff -    @staticmethod +    @classmethod      @_pathguard -    def entries(path): +    def entries(cls, path):          """directory entries in an array"""          # prevent symlinks being followed          if not stat.S_ISDIR(os.lstat(path).st_mode): @@ -371,6 +397,18 @@ class Server(object):                  raise      @classmethod +    def gfid(cls, gfidpath): +        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + +    @classmethod +    def node_uuid(cls, path='.'): +        try: +            uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) +            return uuid_l[:-1].split(' ') +        except OSError: +            raise + +    @classmethod      def xtime_vec(cls, path, *uuids):          """vectored version of @xtime @@ -402,9 +440,96 @@ class Server(object):          for u,t in mark_dct.items():              cls.set_xtime(path, u, t) -    @staticmethod +    @classmethod +    def entry_ops(cls, entries): +        pfx = gauxpfx() +        logging.debug('entries: %s' % repr(entries)) +        # regular file +        def entry_pack_reg(gf, bn, st): +            blen = len(bn) +            mo = st['mode'] +            return struct.pack(cls._fmt_mknod(blen), +                               st['uid'], st['gid'], +                               gf, mo, bn, +                               stat.S_IMODE(mo), 0, umask()) +        # mkdir +        def entry_pack_mkdir(gf, bn, st): +            blen = len(bn) +            mo = st['mode'] +            return struct.pack(cls._fmt_mkdir(blen), +                               st['uid'], st['gid'], +                               gf, mo, bn, +                               stat.S_IMODE(mo), umask()) +        #symlink +        def entry_pack_symlink(gf, bn, lnk, st): +            blen = len(bn) +            llen = len(lnk) +            return struct.pack(cls._fmt_symlink(blen, llen), +                               st['uid'], st['gid'], +                               gf, st['mode'], bn, lnk) +        def entry_purge(entry, gfid): +            # This is an extremely racy code and needs to be fixed ASAP. +            # The GFID check here is to be sure that the pargfid/bname +            # to be purged is the GFID gotten from the changelog. 
+            # (a stat(changelog_gfid) would also be valid here) +            # The race here is between the GFID check and the purge. +            disk_gfid = cls.gfid(entry) +            if isinstance(disk_gfid, int): +                return +            if not gfid == disk_gfid: +                return +            er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) +            if isinstance(er, int): +                if er == EISDIR: +                    er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY]) +                    if er == ENOTEMPTY: +                        return er +        for e in entries: +            blob = None +            op = e['op'] +            gfid = e['gfid'] +            entry = e['entry'] +            (pg, bname) = entry2pb(entry) +            if op in ['RMDIR', 'UNLINK']: +                while True: +                    er = entry_purge(entry, gfid) +                    if isinstance(er, int): +                        time.sleep(1) +                    else: +                        break +            elif op == 'CREATE': +                blob = entry_pack_reg(gfid, bname, e['stat']) +            elif op == 'MKDIR': +                blob = entry_pack_mkdir(gfid, bname, e['stat']) +            elif op == 'LINK': +                errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) +            elif op == 'SYMLINK': +                blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) +            elif op == 'RENAME': +                en = e['entry1'] +                errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) +            if blob: +                errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) + +    @classmethod +    def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): +        Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + +    @classmethod +    def changelog_scan(cls): +        Changes.cl_scan() + +    @classmethod +    def changelog_getchanges(cls): +        return Changes.cl_getchanges() + +    @classmethod +    def changelog_done(cls, clfile): +        Changes.cl_done(clfile) + +    @classmethod      @_pathguard -    def setattr(path, adct): +    def setattr(cls, path, adct):          """set file attributes          @adct is a dict, where 'own', 'mode' and 'times' @@ -537,10 +662,10 @@ class SlaveRemote(object):              raise GsyncdError("no files to sync")          logging.debug("files: " + ", ".join(files))          argv = gconf.rsync_command.split() + \ -               ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ +               ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \                 gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \                 ['.'] + list(args) -        po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) +        po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)          for f in files:              po.stdin.write(f)              po.stdin.write('\0') @@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def can_connect_to(self, remote):          """determine our position in the connectibility matrix""" -        return True +        return not remote or \ +               (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))      class Mounter(object):          
"""Abstract base class for mounter backends""" @@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          sup(self, *a, **kw)          self.slavedir = "/proc/%d/cwd" % self.server.pid() +    def gmaster_instantiate_tuple(self, slave): +        """return a tuple of the 'one shot' and the 'main crawl' class instance""" +        return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) +      def service_loop(self, *args):          """enter service loop @@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          - else do that's what's inherited          """          if args: -            gmaster_builder()(self, args[0]).crawl_loop() +            slave = args[0] +            if gconf.local_path: +                class brickserver(FILE.FILEServer): +                    local_path = gconf.local_path +                    aggregated = self.server +                    @classmethod +                    def entries(cls, path): +                        e = super(brickserver, cls).entries(path) +                        # on the brick don't mess with /.glusterfs +                        if path == '.': +                            try: +                                e.remove('.glusterfs') +                            except ValueError: +                                pass +                        return e +                if gconf.slave_id: +                    # define {,set_}xtime in slave, thus preempting +                    # the call to remote, so that it takes data from +                    # the local brick +                    slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) +                    slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' 
+ gconf.slave_id, mark), slave.server)
+                (g1, g2) = self.gmaster_instantiate_tuple(slave)
+                g1.master.server = brickserver
+                g2.master.server = brickserver
+            else:
+                (g1, g2) = self.gmaster_instantiate_tuple(slave)
+                g1.master.server.aggregated = g1.master.server
+                g2.master.server.aggregated = g2.master.server
+            # bad bad bad: bad way to do things like this
+            # need to make this elegant
+            # register the crawlers and start crawling
+            g1.register()
+            g2.register()
+            g1.crawlwrap(oneshot=True)
+            g2.crawlwrap()
         else:
             sup(self, *args)
@@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote):
                                           '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
         self.inner_rsc = parse_url(inner_url)

-    def canonical_path(self):
-        m = re.match('([^@]+)@(.+)', self.remote_addr)
+    @staticmethod
+    def parse_ssh_address(addr):
+        m = re.match('([^@]+)@(.+)', addr)
         if m:
             u, h = m.groups()
         else:
-            u, h = syncdutils.getusername(), self.remote_addr
-        remote_addr = '@'.join([u, gethostbyname(h)])
+            u, h = syncdutils.getusername(), addr
+        return {'user': u, 'host': h}
+
+    def canonical_path(self):
+        rap = self.parse_ssh_address(self.remote_addr)
+        remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])])
         return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])

     def can_connect_to(self, remote):
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 0764c07904d..720200018e5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -5,8 +5,9 @@ import time
 import fcntl
 import shutil
 import logging
+import socket
 from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode
 from signal import signal, SIGTERM, SIGKILL
 from time import sleep
 import select as oselect
@@ -25,6 +26,15 @@ try:
 except ImportError:
     import urllib

+try:
+    from hashlib import md5 as md5
+except ImportError:
+    # py 2.4
+    from md5 import new as md5
+
+# auxiliary gfid based access prefix
+_CL_AUX_GFID_PFX = ".gfid/"
+
 def escape(s):
     """the chosen flavor of string escaping, used all over
        to turn whatever data to creatable representation"""
@@ -286,3 +296,93 @@ def waitpid (*a):

 def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
     signal(SIGTERM, hook)
+
+def is_host_local(host):
+    locaddr = False
+    for ai in socket.getaddrinfo(host, None):
+        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125
+        if ai[0] == socket.AF_INET:
+            if ai[-1][0].split(".")[0] == "127":
+                locaddr = True
+                break
+        elif ai[0] == socket.AF_INET6:
+            if ai[-1][0] == "::1":
+                locaddr = True
+                break
+        else:
+            continue
+        try:
+            # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
+            # cf. 
https://bugzilla.redhat.com/show_bug.cgi?id=890587
+            s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
+        except socket.error:
+            ex = sys.exc_info()[1]
+            if ex.errno != EPERM:
+                raise
+            f = None
+            try:
+                f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
+                if int(f.read()) != 0:
+                    raise GsyncdError(
+                            "non-local bind is set and not allowed to create raw sockets, "
+                            "cannot determine if %s is local" % host)
+                s = socket.socket(ai[0], socket.SOCK_DGRAM)
+            finally:
+                if f:
+                    f.close()
+        try:
+            s.bind(ai[-1])
+            locaddr = True
+            break
+        except socket.error:
+            pass
+        s.close()
+    return locaddr
+
+def funcode(f):
+    fc = getattr(f, 'func_code', None)
+    if not fc:
+        # python 3
+        fc = f.__code__
+    return fc
+
+def memoize(f):
+    fc = funcode(f)
+    fn = fc.co_name
+    def ff(self, *a, **kw):
+        rv = getattr(self, '_' + fn, None)
+        if rv is None:
+            rv = f(self, *a, **kw)
+            setattr(self, '_' + fn, rv)
+        return rv
+    return ff
+
+def umask():
+    return os.umask(0)
+
+def entry2pb(e):
+    return e.rsplit('/', 1)
+
+def gauxpfx():
+    return _CL_AUX_GFID_PFX
+
+def md5hex(s):
+    return md5(s).hexdigest()
+
+def selfkill(sig=SIGTERM):
+    os.kill(os.getpid(), sig)
+
+def errno_wrap(call, arg=[], errnos=[]):
+    """ wrapper around calls resilient to errnos.
+    retry in case of ESTALE
+    """
+    while True:
+        try:
+            return call(*arg)
+        except OSError:
+            ex = sys.exc_info()[1]
+            if ex.errno in errnos:
+                return ex.errno
+            if ex.errno != ESTALE:
+                raise
+            time.sleep(0.5)  # retry the call
diff --git a/glusterfs.spec.in b/glusterfs.spec.in
index 2e94c3315dd..e92cd6e0a80 100644
--- a/glusterfs.spec.in
+++ b/glusterfs.spec.in
@@ -520,6 +520,7 @@ fi
 %defattr(-,root,root)
 %{_libexecdir}/glusterfs/gsyncd
 %{_libexecdir}/glusterfs/python/syncdaemon/*
+%{_libexecdir}/glusterfs/gverify.sh
 %ghost %dir %attr(0755,-,-) %{_sharedstatedir}/glusterd/geo-replication
 %ghost %attr(0644,-,-) %{_sharedstatedir}/glusterd/geo-replication/gsyncd.conf
 %endif
@@ -696,6 +697,9 @@ if [ $1 -ge 1 ]; then
 fi

 %changelog
+* Thu Jul 25 2013 Aravinda VK <avishwan@redhat.com>
+- Added gverify.sh to %{_libexecdir}/glusterfs directory. 
+
 * Thu Jul 25 2013 Harshavardhana <fharshav@redhat.com>
 - Allow to build with '--without bd' to disable 'bd' xlator
diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
index 7e2ab833f9e..6440efa153e 100644
--- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
+++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
@@ -33,6 +33,10 @@ static char *gsync_reserved_opts[] = {
         "session-owner",
         "state-socket-unencoded",
         "socketdir",
+        "ignore-deletes",
+        "local-id",
+        "local-path",
+        "slave-id",
         NULL
 };
diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c
index 84c726577e0..53ca33ef92a 100644
--- a/xlators/mgmt/glusterd/src/glusterd.c
+++ b/xlators/mgmt/glusterd/src/glusterd.c
@@ -633,10 +633,20 @@ configure_syncdaemon (glusterd_conf_t *conf)
         runinit_gsyncd_setrx (&runner, conf);
         runner_add_args (&runner,
                          "gluster-log-file",
-                         DEFAULT_LOG_FILE_DIRECTORY"/"GEOREP"/${mastervol}/${eSlave}.gluster.log",
+                         DEFAULT_LOG_FILE_DIRECTORY"/"GEOREP"/${mastervol}/${eSlave}${local_id}.gluster.log",
                          ".", ".", NULL);
         RUN_GSYNCD_CMD;

+        /* ignore-deletes */
+        runinit_gsyncd_setrx (&runner, conf);
+        runner_add_args (&runner, "ignore-deletes", "true", ".", ".", NULL);
+        RUN_GSYNCD_CMD;
+
+        /* special-sync-mode */
+        runinit_gsyncd_setrx (&runner, conf);
+        runner_add_args (&runner, "special-sync-mode", "partial", ".", ".", NULL);
+        RUN_GSYNCD_CMD;
+
         /************
          * slave pre-configuration
          ************/
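
Two closing illustrative sketches, neither part of the patch. First, the data Volinfo (monitor.py above) pulls out of `gluster --xml volume info`; the XML below is a trimmed, assumed sample, but the element names match what the code queries (opRet, opErrno, id, brick):

```python
import xml.etree.ElementTree as XET

SAMPLE = """<cliOutput>
  <opRet>0</opRet>
  <opErrno>0</opErrno>
  <volumes><volume>
    <id>0b9e1c2d-3f4a-4b5c-8d6e-7f8091a2b3c4</id>
    <brick>node1:/bricks/b1</brick>
    <brick>node2:/bricks/b2</brick>
  </volume></volumes>
</cliOutput>"""

vi = XET.fromstring(SAMPLE)
assert vi.find('opRet').text == '0'           # same success check as Volinfo
bricks = [b.text.split(':', 1) for b in vi.findall('.//brick')]
print(bricks)                                 # [['node1', '/bricks/b1'], ...]
print(vi.findall('.//id')[0].text)            # the volume uuid
```
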

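Second, the brick-to-slave pairing computed at the end of distribute() in monitor.py reduces to a sorted round-robin; hosts and paths below are made up:

```python
locmbricks = ['/bricks/b2', '/bricks/b1']  # master bricks local to this node
slaves = ['snode2:svol', 'snode1:svol']    # one worker spec per slave node

locmbricks.sort()
slaves.sort()
workerspex = [(locmbricks[i], slaves[i % len(slaves)])
              for i in range(len(locmbricks))]
print(workerspex)
# [('/bricks/b1', 'snode1:svol'), ('/bricks/b2', 'snode2:svol')]
```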