summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cli/src/cli-cmd-parser.c23
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py56
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py129
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py6
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-geo-rep.c206
-rw-r--r--xlators/mgmt/glusterd/src/glusterd.c7
6 files changed, 397 insertions, 30 deletions
diff --git a/cli/src/cli-cmd-parser.c b/cli/src/cli-cmd-parser.c
index f447ccae3d5..931d89ae1bd 100644
--- a/cli/src/cli-cmd-parser.c
+++ b/cli/src/cli-cmd-parser.c
@@ -1577,6 +1577,29 @@ cli_cmd_gsync_set_parse (const char **words, int wordcount, dict_t **options)
}
append_str[append_len - 2] = '\0';
+ /* "checkpoint now" is special: we resolve that "now" */
+ if (strcmp (words[cmdi + 1], "checkpoint") == 0 &&
+ strcmp (append_str, "now") == 0) {
+ struct timeval tv = {0,};
+ struct tm *tm = NULL;
+
+ ret = gettimeofday (&tv, NULL);
+ if (ret == -1)
+ goto out;
+ tm = localtime (&tv.tv_sec);
+
+ GF_FREE (append_str);
+ append_str = GF_CALLOC (1, 300, cli_mt_append_str);
+ if (!append_str) {
+ ret = -1;
+ goto out;
+ }
+ strcpy (append_str, "as of ");
+ strftime (append_str + strlen ("as of "),
+ 300 - strlen ("as of "),
+ "%Y-%m-%d %H:%M:%S", tm);
+ }
+
ret = dict_set_dynstr (dict, "op_value", append_str);
}
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index 9ac32ce4267..d68cea6725e 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -58,6 +58,25 @@ class GLogger(Logger):
logging.getLogger().handlers = []
logging.basicConfig(**lprm)
+ @classmethod
+ def _gsyncd_loginit(cls, **kw):
+ lkw = {}
+ if gconf.log_level:
+ lkw['level'] = gconf.log_level
+ if kw.get('log_file'):
+ if kw['log_file'] in ('-', '/dev/stderr'):
+ lkw['stream'] = sys.stderr
+ elif kw['log_file'] == '/dev/stdout':
+ lkw['stream'] = sys.stdout
+ else:
+ lkw['filename'] = kw['log_file']
+
+ cls.setup(label=kw.get('label'), **lkw)
+
+ lkw.update({'saved_label': kw.get('label')})
+ gconf.log_metadata = lkw
+ gconf.log_exit = True
+
def startup(**kw):
"""set up logging, pidfile grabbing, daemonization"""
if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
@@ -88,22 +107,7 @@ def startup(**kw):
select((x,), (), ())
os.close(x)
- lkw = {}
- if gconf.log_level:
- lkw['level'] = gconf.log_level
- if kw.get('log_file'):
- if kw['log_file'] in ('-', '/dev/stderr'):
- lkw['stream'] = sys.stderr
- elif kw['log_file'] == '/dev/stdout':
- lkw['stream'] = sys.stdout
- else:
- lkw['filename'] = kw['log_file']
-
- GLogger.setup(label=kw.get('label'), **lkw)
-
- lkw.update({'saved_label': kw.get('label')})
- gconf.log_metadata = lkw
- gconf.log_exit = True
+ GLogger._gsyncd_loginit(**kw)
def main():
"""main routine, signal/exception handling boilerplates"""
@@ -166,6 +170,8 @@ def main_i():
op.add_option('--sync-jobs', metavar='N', type=int, default=3)
op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
op.add_option('--allow-network', metavar='IPS', default='')
+ op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)
+ op.add_option('--checkpoint', metavar='LABEL', default='')
op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
@@ -278,6 +284,7 @@ def main_i():
rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict)
+ checkpoint_change = False
if confdata:
opt_ok = norm(confdata.opt) in tunables + [None]
if confdata.op == 'check':
@@ -293,7 +300,14 @@ def main_i():
gcnf.set(confdata.opt, confdata.val, confdata.rx)
elif confdata.op == 'del':
gcnf.delete(confdata.opt, confdata.rx)
- return
+ # when modifying checkpoint, it's important to make a log
+ # of that, so in that case we go on to set up logging even
+ # if its just config invocation
+ if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \
+ not confdata.rx:
+ checkpoint_change = True
+ if not checkpoint_change:
+ return
gconf.__dict__.update(defaults.__dict__)
gcnf.update_to(gconf.__dict__)
@@ -331,6 +345,14 @@ def main_i():
raise GsyncdError('cannot recognize log level "%s"' % lvl0)
gconf.log_level = lvl2
+ if checkpoint_change:
+ GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
+ if confdata.op == 'set':
+ logging.info('checkpoint %s set' % confdata.val)
+ elif confdata.op == 'del':
+ logging.info('checkpoint info was reset')
+ return
+
go_daemon = rconf['go_daemon']
be_monitor = rconf.get('monitor')
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index 8e196f8c5f4..4826037f134 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -5,12 +5,20 @@ import stat
import random
import signal
import logging
+import socket
import errno
-from errno import ENOENT, ENODATA
+from errno import ENOENT, ENODATA, EPIPE
from threading import currentThread, Condition, Lock
+from datetime import datetime
+try:
+ from hashlib import md5 as md5
+except ImportError:
+ # py 2.4
+ from md5 import new as md5
from gconf import gconf
-from syncdutils import FreeObject, Thread, GsyncdError, boolify
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
+ escape, unescape, select
URXTIME = (-1, 0)
@@ -113,6 +121,122 @@ class GMaster(object):
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
+ self.checkpoint_thread = None
+
+ @staticmethod
+ def _checkpt_param(chkpt, prm, timish=True):
+ """use config backend to lookup a parameter belonging to
+ checkpoint @chkpt"""
+ cprm = getattr(gconf, 'checkpoint_' + prm, None)
+ if not cprm:
+ return
+ chkpt_mapped, val = cprm.split(':', 1)
+ if unescape(chkpt_mapped) != chkpt:
+ return
+ if timish:
+ val = tuple(int(x) for x in val.split("."))
+ return val
+
+ @staticmethod
+ def _set_checkpt_param(chkpt, prm, val, timish=True):
+ """use config backend to store a parameter associated
+ with checkpoint @chkpt"""
+ if timish:
+ val = "%d.%d" % tuple(val)
+ gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+
+ @staticmethod
+ def humantime(*tpair):
+ """format xtime-like (sec, nsec) pair to human readable format"""
+ ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
+ strftime("%Y-%m-%d %H:%M:%S")
+ if len(tpair) > 1:
+ ts += '.' + str(tpair[1])
+ return ts
+
+ def checkpt_service(self, chan, chkpt, tgt):
+ """checkpoint service loop
+
+ monitor and verify checkpoint status for @chkpt, and listen
+ for incoming requests for whom we serve a pretty-formatted
+ status report"""
+ if not chkpt:
+ # dummy loop for the case when there is no checkpt set
+ while True:
+ select([chan], [], [])
+ conn, _ = chan.accept()
+ conn.send('\0')
+ conn.close()
+ completed = self._checkpt_param(chkpt, 'completed')
+ while True:
+ s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+ # either request made and we re-check to not
+ # give back stale data, or we still hunting for completion
+ if tgt < self.volmark:
+ # indexing has been reset since setting the checkpoint
+ status = "is invalid"
+ else:
+ xtr = self.xtime('.', self.slave)
+ if isinstance(xtr, int):
+ raise GsyncdError("slave root directory is unaccessible (%s)",
+ os.strerror(xtr))
+ ncompleted = (xtr >= tgt)
+ if completed and not ncompleted: # stale data
+ logging.warn("completion time %s for checkpoint %s became stale" % \
+ (self.humantime(*completed), chkpt))
+ completed = None
+ gconf.confdata.delete('checkpoint-completed')
+ if ncompleted and not completed: # just reaching completion
+ completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ]
+ self._set_checkpt_param(chkpt, 'completed', completed)
+ logging.info("checkpoint %s completed" % chkpt)
+ status = completed and \
+ "completed at " + self.humantime(completed[0]) or \
+ "not reached yet"
+ if s:
+ conn = None
+ try:
+ conn, _ = chan.accept()
+ try:
+ conn.send(" | checkpoint %s %s\0" % (chkpt, status))
+ except:
+ exc = sys.exc_info()[1]
+ if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
+ exc.errno == EPIPE:
+ logging.debug('checkpoint client disconnected')
+ else:
+ raise
+ finally:
+ if conn:
+ conn.close()
+
+ def start_checkpoint_thread(self):
+ """prepare and start checkpoint service"""
+ if self.checkpoint_thread or not getattr(gconf, 'state_socket_unencoded', None):
+ return
+ chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ state_socket = "/tmp/%s.socket" % md5(gconf.state_socket_unencoded).hexdigest()
+ try:
+ os.unlink(state_socket)
+ except:
+ if sys.exc_info()[0] == OSError:
+ pass
+ chan.bind(state_socket)
+ chan.listen(1)
+ checkpt_tgt = None
+ if gconf.checkpoint:
+ checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
+ if not checkpt_tgt:
+ checkpt_tgt = self.xtime('.')
+ if isinstance(checkpt_tgt, int):
+ raise GsyncdError("master root directory is unaccessible (%s)",
+ os.strerror(checkpt_tgt))
+ self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
+ logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \
+ (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint))
+ t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+ t.start()
+ self.checkpoint_thread = t
def crawl_loop(self):
"""start the keep-alive thread and iterate .crawl"""
@@ -291,6 +415,7 @@ class GMaster(object):
if self.volinfo:
if self.volinfo['retval']:
raise GsyncdError ("master is corrupt")
+ self.start_checkpoint_thread()
else:
if should_display_info or self.crawls == 0:
if self.inter_master:
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index f786bc34326..1d4eb20032c 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -138,6 +138,12 @@ def finalize(*a, **kw):
raise
if gconf.ssh_ctl_dir and not gconf.cpid:
shutil.rmtree(gconf.ssh_ctl_dir)
+ if getattr(gconf, 'state_socket', None):
+ try:
+ os.unlink(gconf.state_socket)
+ except:
+ if sys.exc_info()[0] == OSError:
+ pass
if gconf.log_exit:
logging.info("exiting.")
sys.stdout.flush()
diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
index 26d1bff152d..c66b2db578e 100644
--- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
+++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
@@ -355,9 +355,9 @@ glusterd_get_slave (glusterd_volinfo_t *vol, const char *slaveurl, char **slavek
static int
-glusterd_query_extutil (char *resbuf, runner_t *runner)
+glusterd_query_extutil_generic (char *resbuf, size_t blen, runner_t *runner, void *data,
+ int (*fcbk)(char *resbuf, size_t blen, FILE *fp, void *data))
{
- char *ptr = NULL;
int ret = 0;
runner_redir (runner, STDOUT_FILENO, RUN_PIPE);
@@ -367,11 +367,9 @@ glusterd_query_extutil (char *resbuf, runner_t *runner)
return -1;
}
- ptr = fgets(resbuf, PATH_MAX, runner_chio (runner, STDOUT_FILENO));
- if (ptr)
- resbuf[strlen(resbuf)-1] = '\0'; //strip off \n
+ ret = fcbk (resbuf, blen, runner_chio (runner, STDOUT_FILENO), data);
- ret = runner_end (runner);
+ ret |= runner_end (runner);
if (ret)
gf_log ("", GF_LOG_ERROR, "reading data from child failed");
@@ -379,6 +377,80 @@ glusterd_query_extutil (char *resbuf, runner_t *runner)
}
static int
+_fcbk_singleline(char *resbuf, size_t blen, FILE *fp, void *data)
+{
+ char *ptr = NULL;
+
+ errno = 0;
+ ptr = fgets (resbuf, blen, fp);
+ if (ptr)
+ resbuf[strlen(resbuf)-1] = '\0'; //strip off \n
+
+ return errno ? -1 : 0;
+}
+
+static int
+glusterd_query_extutil (char *resbuf, runner_t *runner)
+{
+ return glusterd_query_extutil_generic (resbuf, PATH_MAX, runner, NULL,
+ _fcbk_singleline);
+}
+
+static int
+_fcbk_conftodict (char *resbuf, size_t blen, FILE *fp, void *data)
+{
+ char *ptr = NULL;
+ dict_t *dict = data;
+ char *v = NULL;
+
+ for (;;) {
+ errno = 0;
+ ptr = fgets (resbuf, blen, fp);
+ if (!ptr)
+ break;
+ v = resbuf + strlen(resbuf) - 1;
+ while (isspace (*v))
+ /* strip trailing space */
+ *v-- = '\0';
+ if (v == resbuf)
+ /* skip empty line */
+ continue;
+ v = strchr (resbuf, ':');
+ if (!v)
+ return -1;
+ *v++ = '\0';
+ while (isspace (*v))
+ v++;
+ v = gf_strdup (v);
+ if (!v)
+ return -1;
+ if (dict_set_dynstr (dict, resbuf, v) != 0) {
+ GF_FREE (v);
+ return -1;
+ }
+ }
+
+ return errno ? -1 : 0;
+}
+
+static int
+glusterd_gsync_get_config (char *master, char *slave, char *gl_workdir, dict_t *dict)
+{
+ /* key + value, where value must be able to accommodate a path */
+ char resbuf[256 + PATH_MAX] = {0,};
+ runner_t runner = {0,};
+
+ runinit (&runner);
+ runner_add_args (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL);
+ runner_argprintf (&runner, "%s/"GSYNC_CONF, gl_workdir);
+ runner_argprintf (&runner, ":%s", master);
+ runner_add_args (&runner, slave, "--config-get-all", NULL);
+
+ return glusterd_query_extutil_generic (resbuf, sizeof (resbuf),
+ &runner, dict, _fcbk_conftodict);
+}
+
+static int
glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master,
char *slave, char *gl_workdir)
{
@@ -1309,29 +1381,112 @@ glusterd_gsync_read_frm_status (char *path, char *buf, size_t blen)
}
static int
+glusterd_gsync_fetch_status_extra (char *path, char *buf, size_t blen)
+{
+ char sockpath[PATH_MAX] = {0,};
+ struct sockaddr_un sa = {0,};
+ size_t l = 0;
+ int s = -1;
+ struct pollfd pfd = {0,};
+ int ret = 0;
+
+ l = strlen (buf);
+ /* seek to end of data in buf */
+ buf += l;
+ blen -= l;
+
+ glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath));
+
+ strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path));
+ if (sa.sun_path[sizeof (sa.sun_path) - 1])
+ return -1;
+ sa.sun_family = AF_UNIX;
+
+ s = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (s == -1)
+ return -1;
+
+ ret = connect (s, (struct sockaddr *)&sa, sizeof (sa));
+ if (ret == -1)
+ goto out;
+ pfd.fd = s;
+ pfd.events = POLLIN;
+ /* we don't want to hang on gsyncd */
+ if (poll (&pfd, 1, 5000) < 1 ||
+ !(pfd.revents & POLLIN)) {
+ ret = -1;
+ goto out;
+ }
+ ret = read(s, buf, blen);
+ /* we expect a terminating 0 byte */
+ if (ret == 0 || (ret > 0 && buf[ret - 1]))
+ ret = -1;
+ if (ret > 0)
+ ret = 0;
+
+ out:
+ close (s);
+ return ret;
+}
+
+static int
+dict_get_param (dict_t *dict, char *key, char **param)
+{
+ char *dk = NULL;
+ char *s = NULL;
+ char x = '\0';
+ int ret = 0;
+
+ if (dict_get_str (dict, key, param) == 0)
+ return 0;
+
+ dk = gf_strdup (key);
+ if (!key)
+ return -1;
+
+ s = strpbrk (dk, "-_");
+ if (!s)
+ return -1;
+ x = (*s == '-') ? '_' : '-';
+ *s++ = x;
+ while ((s = strpbrk (s, "-_")))
+ *s++ = x;
+
+ ret = dict_get_str (dict, dk, param);
+
+ GF_FREE (dk);
+ return ret;
+}
+
+static int
glusterd_read_status_file (char *master, char *slave,
dict_t *dict)
{
glusterd_conf_t *priv = NULL;
int ret = 0;
- char statefile[PATH_MAX] = {0, };
+ char *statefile = NULL;
char buf[1024] = {0, };
char mst[1024] = {0, };
char slv[1024] = {0, };
char sts[1024] = {0, };
char *bufp = NULL;
+ dict_t *confd = NULL;
int gsync_count = 0;
int status = 0;
GF_ASSERT (THIS);
GF_ASSERT (THIS->private);
+ confd = dict_new ();
+ if (!dict)
+ return -1;
+
priv = THIS->private;
- ret = glusterd_gsync_get_param_file (statefile, "state", master,
- slave, priv->workdir);
+ ret = glusterd_gsync_get_config (master, slave, priv->workdir,
+ confd);
if (ret) {
- gf_log ("", GF_LOG_ERROR, "Unable to get the name of status"
- "file for %s(master), %s(slave)", master, slave);
+ gf_log ("", GF_LOG_ERROR, "Unable to get configuration data"
+ "for %s(master), %s(slave)", master, slave);
goto out;
}
@@ -1343,6 +1498,9 @@ glusterd_read_status_file (char *master, char *slave,
} else if (ret == -1)
goto out;
+ ret = dict_get_param (confd, "state_file", &statefile);
+ if (ret)
+ goto out;
ret = glusterd_gsync_read_frm_status (statefile, buf, sizeof (buf));
if (ret) {
gf_log ("", GF_LOG_ERROR, "Unable to read the status"
@@ -1350,6 +1508,30 @@ glusterd_read_status_file (char *master, char *slave,
strncpy (buf, "defunct", sizeof (buf));
goto done;
}
+ if (strcmp (buf, "OK") != 0)
+ goto done;
+
+ ret = dict_get_param (confd, "state_socket_unencoded", &statefile);
+ if (ret)
+ goto out;
+ ret = glusterd_gsync_fetch_status_extra (statefile, buf, sizeof (buf));
+ if (ret) {
+ gf_log ("", GF_LOG_ERROR, "Unable to fetch extra status"
+ "for %s(master), %s(slave)", master, slave);
+ /* there is a slight chance that this occurs due to race
+ * -- in that case, the following options all seem bad:
+ *
+ * - suppress irregurlar behavior by just leaving status
+ * on "OK"
+ * - freak out users with a misleading "defunct"
+ * - overload the meaning of the regular error signal
+ * mechanism of gsyncd, that is, when status is "faulty"
+ *
+ * -- so we just come up with something new...
+ */
+ strncpy (buf, "N/A", sizeof (buf));
+ goto done;
+ }
done:
ret = dict_get_int32 (dict, "gsync-count", &gsync_count);
@@ -1394,6 +1576,8 @@ glusterd_read_status_file (char *master, char *slave,
ret = 0;
out:
+ dict_destroy (confd);
+
gf_log ("", GF_LOG_DEBUG, "Returning %d ", ret);
return ret;
}
diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c
index 0dfffbbed39..a3869e6317f 100644
--- a/xlators/mgmt/glusterd/src/glusterd.c
+++ b/xlators/mgmt/glusterd/src/glusterd.c
@@ -500,6 +500,13 @@ configure_syncdaemon (glusterd_conf_t *conf)
runner_add_args (&runner, ".", ".", NULL);
RUN_GSYNCD_CMD;
+ /* state-socket */
+ runinit_gsyncd_setrx (&runner, conf);
+ runner_add_arg (&runner, "state-socket-unencoded");
+ runner_argprintf (&runner, "%s/${mastervol}/${eSlave}.socket", georepdir);
+ runner_add_args (&runner, ".", ".", NULL);
+ RUN_GSYNCD_CMD;
+
/* log-file */
runinit_gsyncd_setrx (&runner, conf);
runner_add_args (&runner,