diff options
Diffstat (limited to 'xlators')
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 56 | ||||
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 129 | ||||
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/syncdutils.py | 6 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-geo-rep.c | 206 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.c | 7 |
5 files changed, 374 insertions, 30 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index 9ac32ce4267..d68cea6725e 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -58,6 +58,25 @@ class GLogger(Logger): logging.getLogger().handlers = [] logging.basicConfig(**lprm) + @classmethod + def _gsyncd_loginit(cls, **kw): + lkw = {} + if gconf.log_level: + lkw['level'] = gconf.log_level + if kw.get('log_file'): + if kw['log_file'] in ('-', '/dev/stderr'): + lkw['stream'] = sys.stderr + elif kw['log_file'] == '/dev/stdout': + lkw['stream'] = sys.stdout + else: + lkw['filename'] = kw['log_file'] + + cls.setup(label=kw.get('label'), **lkw) + + lkw.update({'saved_label': kw.get('label')}) + gconf.log_metadata = lkw + gconf.log_exit = True + def startup(**kw): """set up logging, pidfile grabbing, daemonization""" if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -88,22 +107,7 @@ def startup(**kw): select((x,), (), ()) os.close(x) - lkw = {} - if gconf.log_level: - lkw['level'] = gconf.log_level - if kw.get('log_file'): - if kw['log_file'] in ('-', '/dev/stderr'): - lkw['stream'] = sys.stderr - elif kw['log_file'] == '/dev/stdout': - lkw['stream'] = sys.stdout - else: - lkw['filename'] = kw['log_file'] - - GLogger.setup(label=kw.get('label'), **lkw) - - lkw.update({'saved_label': kw.get('label')}) - gconf.log_metadata = lkw - gconf.log_exit = True + GLogger._gsyncd_loginit(**kw) def main(): """main routine, signal/exception handling boilerplates""" @@ -166,6 +170,8 @@ def main_i(): op.add_option('--sync-jobs', metavar='N', type=int, default=3) op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S @@ -278,6 +284,7 @@ def main_i(): rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + checkpoint_change = False if confdata: opt_ok = norm(confdata.opt) in tunables + [None] if confdata.op == 'check': @@ -293,7 +300,14 @@ def main_i(): gcnf.set(confdata.opt, confdata.val, confdata.rx) elif confdata.op == 'del': gcnf.delete(confdata.opt, confdata.rx) - return + # when modifying checkpoint, it's important to make a log + # of that, so in that case we go on to set up logging even + # if its just config invocation + if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ + not confdata.rx: + checkpoint_change = True + if not checkpoint_change: + return gconf.__dict__.update(defaults.__dict__) gcnf.update_to(gconf.__dict__) @@ -331,6 +345,14 @@ def main_i(): raise GsyncdError('cannot recognize log level "%s"' % lvl0) gconf.log_level = lvl2 + if checkpoint_change: + GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') + if confdata.op == 'set': + logging.info('checkpoint %s set' % confdata.val) + elif confdata.op == 'del': + logging.info('checkpoint info was reset') + return + go_daemon = rconf['go_daemon'] be_monitor = rconf.get('monitor') diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 8e196f8c5f4..4826037f134 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -5,12 +5,20 @@ import stat import random import signal import logging +import socket import errno -from errno import ENOENT, ENODATA +from errno import ENOENT, ENODATA, EPIPE from threading import currentThread, Condition, Lock +from datetime import datetime +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify +from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ + escape, unescape, select URXTIME = (-1, 0) @@ -113,6 +121,122 @@ class GMaster(object): # the actual volinfo we make use of self.volinfo = None self.terminate = False + self.checkpoint_thread = None + + @staticmethod + def _checkpt_param(chkpt, prm, timish=True): + """use config backend to lookup a parameter belonging to + checkpoint @chkpt""" + cprm = getattr(gconf, 'checkpoint_' + prm, None) + if not cprm: + return + chkpt_mapped, val = cprm.split(':', 1) + if unescape(chkpt_mapped) != chkpt: + return + if timish: + val = tuple(int(x) for x in val.split(".")) + return val + + @staticmethod + def _set_checkpt_param(chkpt, prm, val, timish=True): + """use config backend to store a parameter associated + with checkpoint @chkpt""" + if timish: + val = "%d.%d" % tuple(val) + gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + + @staticmethod + def humantime(*tpair): + """format xtime-like (sec, nsec) pair to human readable format""" + ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ + strftime("%Y-%m-%d %H:%M:%S") + if len(tpair) > 1: + ts += '.' + str(tpair[1]) + return ts + + def checkpt_service(self, chan, chkpt, tgt): + """checkpoint service loop + + monitor and verify checkpoint status for @chkpt, and listen + for incoming requests for whom we serve a pretty-formatted + status report""" + if not chkpt: + # dummy loop for the case when there is no checkpt set + while True: + select([chan], [], []) + conn, _ = chan.accept() + conn.send('\0') + conn.close() + completed = self._checkpt_param(chkpt, 'completed') + while True: + s,_,_ = select([chan], [], [], (not completed) and 5 or None) + # either request made and we re-check to not + # give back stale data, or we still hunting for completion + if tgt < self.volmark: + # indexing has been reset since setting the checkpoint + status = "is invalid" + else: + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + raise GsyncdError("slave root directory is unaccessible (%s)", + os.strerror(xtr)) + ncompleted = (xtr >= tgt) + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s became stale" % \ + (self.humantime(*completed), chkpt)) + completed = None + gconf.confdata.delete('checkpoint-completed') + if ncompleted and not completed: # just reaching completion + completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ] + self._set_checkpt_param(chkpt, 'completed', completed) + logging.info("checkpoint %s completed" % chkpt) + status = completed and \ + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" + if s: + conn = None + try: + conn, _ = chan.accept() + try: + conn.send(" | checkpoint %s %s\0" % (chkpt, status)) + except: + exc = sys.exc_info()[1] + if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ + exc.errno == EPIPE: + logging.debug('checkpoint client disconnected') + else: + raise + finally: + if conn: + conn.close() + + def start_checkpoint_thread(self): + """prepare and start checkpoint service""" + if self.checkpoint_thread or not getattr(gconf, 'state_socket_unencoded', None): + return + chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + state_socket = "/tmp/%s.socket" % md5(gconf.state_socket_unencoded).hexdigest() + try: + os.unlink(state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + chan.bind(state_socket) + chan.listen(1) + checkpt_tgt = None + if gconf.checkpoint: + checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') + if not checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) + logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \ + (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint)) + t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) + t.start() + self.checkpoint_thread = t def crawl_loop(self): """start the keep-alive thread and iterate .crawl""" @@ -291,6 +415,7 @@ class GMaster(object): if self.volinfo: if self.volinfo['retval']: raise GsyncdError ("master is corrupt") + self.start_checkpoint_thread() else: if should_display_info or self.crawls == 0: if self.inter_master: diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py index f786bc34326..1d4eb20032c 100644 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py @@ -138,6 +138,12 @@ def finalize(*a, **kw): raise if gconf.ssh_ctl_dir and not gconf.cpid: shutil.rmtree(gconf.ssh_ctl_dir) + if getattr(gconf, 'state_socket', None): + try: + os.unlink(gconf.state_socket) + except: + if sys.exc_info()[0] == OSError: + pass if gconf.log_exit: logging.info("exiting.") sys.stdout.flush() diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c index 26d1bff152d..c66b2db578e 100644 --- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c +++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c @@ -355,9 +355,9 @@ glusterd_get_slave (glusterd_volinfo_t *vol, const char *slaveurl, char **slavek static int -glusterd_query_extutil (char *resbuf, runner_t *runner) +glusterd_query_extutil_generic (char *resbuf, size_t blen, runner_t *runner, void *data, + int (*fcbk)(char *resbuf, size_t blen, FILE *fp, void *data)) { - char *ptr = NULL; int ret = 0; runner_redir (runner, STDOUT_FILENO, RUN_PIPE); @@ -367,11 +367,9 @@ glusterd_query_extutil (char *resbuf, runner_t *runner) return -1; } - ptr = fgets(resbuf, PATH_MAX, runner_chio (runner, STDOUT_FILENO)); - if (ptr) - resbuf[strlen(resbuf)-1] = '\0'; //strip off \n + ret = fcbk (resbuf, blen, runner_chio (runner, STDOUT_FILENO), data); - ret = runner_end (runner); + ret |= runner_end (runner); if (ret) gf_log ("", GF_LOG_ERROR, "reading data from child failed"); @@ -379,6 +377,80 @@ glusterd_query_extutil (char *resbuf, runner_t *runner) } static int +_fcbk_singleline(char *resbuf, size_t blen, FILE *fp, void *data) +{ + char *ptr = NULL; + + errno = 0; + ptr = fgets (resbuf, blen, fp); + if (ptr) + resbuf[strlen(resbuf)-1] = '\0'; //strip off \n + + return errno ? -1 : 0; +} + +static int +glusterd_query_extutil (char *resbuf, runner_t *runner) +{ + return glusterd_query_extutil_generic (resbuf, PATH_MAX, runner, NULL, + _fcbk_singleline); +} + +static int +_fcbk_conftodict (char *resbuf, size_t blen, FILE *fp, void *data) +{ + char *ptr = NULL; + dict_t *dict = data; + char *v = NULL; + + for (;;) { + errno = 0; + ptr = fgets (resbuf, blen, fp); + if (!ptr) + break; + v = resbuf + strlen(resbuf) - 1; + while (isspace (*v)) + /* strip trailing space */ + *v-- = '\0'; + if (v == resbuf) + /* skip empty line */ + continue; + v = strchr (resbuf, ':'); + if (!v) + return -1; + *v++ = '\0'; + while (isspace (*v)) + v++; + v = gf_strdup (v); + if (!v) + return -1; + if (dict_set_dynstr (dict, resbuf, v) != 0) { + GF_FREE (v); + return -1; + } + } + + return errno ? -1 : 0; +} + +static int +glusterd_gsync_get_config (char *master, char *slave, char *gl_workdir, dict_t *dict) +{ + /* key + value, where value must be able to accommodate a path */ + char resbuf[256 + PATH_MAX] = {0,}; + runner_t runner = {0,}; + + runinit (&runner); + runner_add_args (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL); + runner_argprintf (&runner, "%s/"GSYNC_CONF, gl_workdir); + runner_argprintf (&runner, ":%s", master); + runner_add_args (&runner, slave, "--config-get-all", NULL); + + return glusterd_query_extutil_generic (resbuf, sizeof (resbuf), + &runner, dict, _fcbk_conftodict); +} + +static int glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master, char *slave, char *gl_workdir) { @@ -1309,29 +1381,112 @@ glusterd_gsync_read_frm_status (char *path, char *buf, size_t blen) } static int +glusterd_gsync_fetch_status_extra (char *path, char *buf, size_t blen) +{ + char sockpath[PATH_MAX] = {0,}; + struct sockaddr_un sa = {0,}; + size_t l = 0; + int s = -1; + struct pollfd pfd = {0,}; + int ret = 0; + + l = strlen (buf); + /* seek to end of data in buf */ + buf += l; + blen -= l; + + glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath)); + + strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path)); + if (sa.sun_path[sizeof (sa.sun_path) - 1]) + return -1; + sa.sun_family = AF_UNIX; + + s = socket(AF_UNIX, SOCK_STREAM, 0); + if (s == -1) + return -1; + + ret = connect (s, (struct sockaddr *)&sa, sizeof (sa)); + if (ret == -1) + goto out; + pfd.fd = s; + pfd.events = POLLIN; + /* we don't want to hang on gsyncd */ + if (poll (&pfd, 1, 5000) < 1 || + !(pfd.revents & POLLIN)) { + ret = -1; + goto out; + } + ret = read(s, buf, blen); + /* we expect a terminating 0 byte */ + if (ret == 0 || (ret > 0 && buf[ret - 1])) + ret = -1; + if (ret > 0) + ret = 0; + + out: + close (s); + return ret; +} + +static int +dict_get_param (dict_t *dict, char *key, char **param) +{ + char *dk = NULL; + char *s = NULL; + char x = '\0'; + int ret = 0; + + if (dict_get_str (dict, key, param) == 0) + return 0; + + dk = gf_strdup (key); + if (!key) + return -1; + + s = strpbrk (dk, "-_"); + if (!s) + return -1; + x = (*s == '-') ? '_' : '-'; + *s++ = x; + while ((s = strpbrk (s, "-_"))) + *s++ = x; + + ret = dict_get_str (dict, dk, param); + + GF_FREE (dk); + return ret; +} + +static int glusterd_read_status_file (char *master, char *slave, dict_t *dict) { glusterd_conf_t *priv = NULL; int ret = 0; - char statefile[PATH_MAX] = {0, }; + char *statefile = NULL; char buf[1024] = {0, }; char mst[1024] = {0, }; char slv[1024] = {0, }; char sts[1024] = {0, }; char *bufp = NULL; + dict_t *confd = NULL; int gsync_count = 0; int status = 0; GF_ASSERT (THIS); GF_ASSERT (THIS->private); + confd = dict_new (); + if (!dict) + return -1; + priv = THIS->private; - ret = glusterd_gsync_get_param_file (statefile, "state", master, - slave, priv->workdir); + ret = glusterd_gsync_get_config (master, slave, priv->workdir, + confd); if (ret) { - gf_log ("", GF_LOG_ERROR, "Unable to get the name of status" - "file for %s(master), %s(slave)", master, slave); + gf_log ("", GF_LOG_ERROR, "Unable to get configuration data" + "for %s(master), %s(slave)", master, slave); goto out; } @@ -1343,6 +1498,9 @@ glusterd_read_status_file (char *master, char *slave, } else if (ret == -1) goto out; + ret = dict_get_param (confd, "state_file", &statefile); + if (ret) + goto out; ret = glusterd_gsync_read_frm_status (statefile, buf, sizeof (buf)); if (ret) { gf_log ("", GF_LOG_ERROR, "Unable to read the status" @@ -1350,6 +1508,30 @@ glusterd_read_status_file (char *master, char *slave, strncpy (buf, "defunct", sizeof (buf)); goto done; } + if (strcmp (buf, "OK") != 0) + goto done; + + ret = dict_get_param (confd, "state_socket_unencoded", &statefile); + if (ret) + goto out; + ret = glusterd_gsync_fetch_status_extra (statefile, buf, sizeof (buf)); + if (ret) { + gf_log ("", GF_LOG_ERROR, "Unable to fetch extra status" + "for %s(master), %s(slave)", master, slave); + /* there is a slight chance that this occurs due to race + * -- in that case, the following options all seem bad: + * + * - suppress irregurlar behavior by just leaving status + * on "OK" + * - freak out users with a misleading "defunct" + * - overload the meaning of the regular error signal + * mechanism of gsyncd, that is, when status is "faulty" + * + * -- so we just come up with something new... + */ + strncpy (buf, "N/A", sizeof (buf)); + goto done; + } done: ret = dict_get_int32 (dict, "gsync-count", &gsync_count); @@ -1394,6 +1576,8 @@ glusterd_read_status_file (char *master, char *slave, ret = 0; out: + dict_destroy (confd); + gf_log ("", GF_LOG_DEBUG, "Returning %d ", ret); return ret; } diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index 0dfffbbed39..a3869e6317f 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -500,6 +500,13 @@ configure_syncdaemon (glusterd_conf_t *conf) runner_add_args (&runner, ".", ".", NULL); RUN_GSYNCD_CMD; + /* state-socket */ + runinit_gsyncd_setrx (&runner, conf); + runner_add_arg (&runner, "state-socket-unencoded"); + runner_argprintf (&runner, "%s/${mastervol}/${eSlave}.socket", georepdir); + runner_add_args (&runner, ".", ".", NULL); + RUN_GSYNCD_CMD; + /* log-file */ runinit_gsyncd_setrx (&runner, conf); runner_add_args (&runner, |