summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
Diffstat (limited to 'xlators')
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py1
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py10
-rw-r--r--xlators/features/marker/utils/syncdaemon/monitor.py41
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py5
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py11
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py22
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-geo-rep.c371
7 files changed, 437 insertions, 24 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
index 4e3b959fe37..146c72a1825 100644
--- a/xlators/features/marker/utils/syncdaemon/gconf.py
+++ b/xlators/features/marker/utils/syncdaemon/gconf.py
@@ -10,6 +10,7 @@ class GConf(object):
pid_file_owned = False
log_exit = False
permanent_handles = []
+ log_metadata = {}
@classmethod
def setup_ssh_ctl(cls, ctld):
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index 67a873cb57c..2b8356b1620 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -6,7 +6,6 @@ import sys
import time
import logging
import signal
-import select
import optparse
import fcntl
import fnmatch
@@ -18,7 +17,7 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
-from syncdutils import GsyncdError
+from syncdutils import GsyncdError, select
from configinterface import GConffile
import resource
from monitor import monitor
@@ -59,7 +58,6 @@ class GLogger(Logger):
logging.getLogger().handlers = []
logging.basicConfig(**lprm)
-
def startup(**kw):
"""set up logging, pidfile grabbing, daemonization"""
if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
@@ -87,7 +85,7 @@ def startup(**kw):
# so we can start up with
# no messing from the dirty
# ol' bustard
- select.select((x,), (), ())
+ select((x,), (), ())
os.close(x)
lkw = {}
@@ -100,7 +98,11 @@ def startup(**kw):
lkw['stream'] = sys.stdout
else:
lkw['filename'] = kw['log_file']
+
GLogger.setup(label=kw.get('label'), **lkw)
+
+ lkw.update({'saved_label': kw.get('label')})
+ gconf.log_metadata = lkw
gconf.log_exit = True
def main():
diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py
index b8e9219dc47..9536f3e2683 100644
--- a/xlators/features/marker/utils/syncdaemon/monitor.py
+++ b/xlators/features/marker/utils/syncdaemon/monitor.py
@@ -1,11 +1,10 @@
import os
import sys
import time
+import signal
import logging
-import select
-from signal import SIGKILL
from gconf import gconf
-from syncdutils import update_file
+from syncdutils import update_file, select, waitpid
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
@@ -42,6 +41,19 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
+ def sigcont_handler(*a):
+ """
+ Re-init logging and send group kill signal
+ """
+ md = gconf.log_metadata
+ logging.shutdown()
+ lcls = logging.getLoggerClass()
+ lcls.setup(label=md.get('saved_label'), **md)
+ pid = os.getpid()
+ os.kill(-pid, signal.SIGUSR1)
+ signal.signal(signal.SIGUSR1, lambda *a: ())
+ signal.signal(signal.SIGCONT, sigcont_handler)
+
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -52,11 +64,16 @@ class Monitor(object):
self.set_state('starting...')
ret = 0
def nwait(p, o=0):
- p2, r = os.waitpid(p, o)
+ p2, r = waitpid(p, o)
if not p2:
return
- if os.WIFEXITED(r):
- return os.WEXITSTATUS(r)
+ return r
+ def exit_signalled(s):
+ """ child teminated due to receipt of SIGUSR1 """
+ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+ def exit_status(s):
+ if os.WIFEXITED(s):
+ return os.WEXITSTATUS(s)
return 1
conn_timeout = 60
while ret in (0, 1):
@@ -69,7 +86,7 @@ class Monitor(object):
os.execv(sys.executable, argv + ['--feedback-fd', str(pw)])
os.close(pw)
t0 = time.time()
- so = select.select((pr,), (), (), conn_timeout)[0]
+ so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
if so:
ret = nwait(cpid, os.WNOHANG)
@@ -86,13 +103,17 @@ class Monitor(object):
else:
logging.debug("worker not confirmed in %d sec, aborting it" % \
conn_timeout)
- os.kill(cpid, SIGKILL)
+ os.kill(cpid, signal.SIGKILL)
ret = nwait(cpid)
if ret == None:
self.set_state('OK')
ret = nwait(cpid)
- elif ret in (0, 1):
- self.set_state('faulty')
+ if exit_signalled(ret):
+ ret = 0
+ else:
+ ret = exit_status(ret)
+ if ret in (0,1):
+ self.set_state('faulty')
time.sleep(10)
self.set_state('inconsistent')
return ret
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
index 9473524909a..755fb61df48 100644
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -1,6 +1,5 @@
import os
import sys
-import select
import time
import logging
from threading import Condition
@@ -20,7 +19,7 @@ except ImportError:
# py 3
import pickle
-from syncdutils import Thread
+from syncdutils import Thread, select
pickle_proto = -1
repce_version = 1.0
@@ -154,7 +153,7 @@ class RepceClient(object):
def listen(self):
while True:
- select.select((self.inf,), (), ())
+ select((self.inf,), (), ())
rid, exc, res = recv(self.inf)
rjob = self.jtab.pop(rid)
if rjob.cbk:
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index 3595f428fdd..3454c38234a 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -5,7 +5,6 @@ import stat
import time
import errno
import struct
-import select
import socket
import logging
import tempfile
@@ -18,7 +17,7 @@ import repce
from repce import RepceServer, RepceClient
from master import GMaster
import syncdutils
-from syncdutils import GsyncdError
+from syncdutils import GsyncdError, select
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -113,11 +112,11 @@ class Popen(subprocess.Popen):
@classmethod
def init_errhandler(cls):
- """start the thread which hanldes children's error output"""
+ """start the thread which handles children's error output"""
cls.errstore = {}
def tailer():
while True:
- for po in select.select([po.stderr for po in cls.errstore], [], []):
+ for po in select([po.stderr for po in cls.errstore], [], []):
po.lock.acquire()
try:
la = cls.errstore.get(po)
@@ -419,7 +418,7 @@ class SlaveLocal(object):
logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
break
else:
- select.select((), (), ())
+ select((), (), ())
class SlaveRemote(object):
"""mix-in class to implement an interface to a remote slave"""
@@ -826,7 +825,7 @@ class SSH(AbstractUrl, SlaveRemote):
i, o = ret
inf = os.fdopen(i)
repce.send(o, None, '__repce_version__')
- select.select((inf,), (), ())
+ select((inf,), (), ())
repce.recv(inf)
# hack hack hack: store a global reference to the file
# to save it from getting GC'd which implies closing it
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index e3098d5f4ea..59defa711ed 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -6,9 +6,11 @@ import fcntl
import shutil
import logging
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, EINTR
from signal import SIGTERM, SIGKILL
from time import sleep
+import select as oselect
+from os import waitpid as owaitpid
try:
from cPickle import PickleError
except ImportError:
@@ -247,3 +249,21 @@ def boolify(s):
logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s))
return rv
+
+def eintr_wrap(func, exc, *a):
+ """
+ wrapper around syscalls resilient to interrupt caused
+ by signals
+ """
+ while True:
+ try:
+ return func(*a)
+ except exc, ex:
+ if not ex[0] == EINTR:
+ raise GsyncdError(ex[1])
+
+def select(*a):
+ return eintr_wrap(oselect.select, oselect.error, *a)
+
+def waitpid (*a):
+ return eintr_wrap(owaitpid, OSError, *a)
diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
index eae5dd827b4..f6ea5aeae3d 100644
--- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
+++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c
@@ -135,6 +135,9 @@ glusterd_handle_gsync_set (rpcsvc_request_t *req)
case GF_GSYNC_OPTION_TYPE_STATUS:
strncpy (operation, "status", sizeof (operation));
break;
+ case GF_GSYNC_OPTION_TYPE_ROTATE:
+ strncpy (operation, "rotate", sizeof(operation));
+ break;
}
gf_cmd_log ("volume "GEOREP, " %s command on %s,%s", operation, master,
@@ -390,6 +393,60 @@ glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master,
return glusterd_query_extutil (prmfile, &runner);
}
+int
+glusterd_gsync_get_session_owner (char *master, char *slave, char *session_owner,
+ char *gl_workdir)
+{
+ runner_t runner = {0,};
+
+ runinit(&runner);
+ runner_add_args (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL);
+ runner_argprintf (&runner, "%s/"GSYNC_CONF, gl_workdir);
+ runner_argprintf (&runner, ":%s", master);
+ runner_add_args (&runner, slave, "--config-get", "session-owner",
+ NULL);
+
+ return glusterd_query_extutil (session_owner, &runner);
+}
+
+int
+glusterd_gsync_get_slave_log_file (char *master, char *slave, char *log_file)
+{
+ int ret = -1;
+ runner_t runner = {0,};
+ char uuid_str[64] = {0,};
+ glusterd_conf_t *priv = NULL;
+ char *gl_workdir = NULL;
+
+ GF_ASSERT(THIS);
+ GF_ASSERT(THIS->private);
+
+ priv = THIS->private;
+
+ GF_VALIDATE_OR_GOTO("gsyncd", master, out);
+ GF_VALIDATE_OR_GOTO("gsyncd", slave, out);
+
+ gl_workdir = priv->workdir;
+
+ /* get the session owner for the master-slave session */
+ ret = glusterd_gsync_get_session_owner (master, slave, uuid_str,
+ gl_workdir);
+ if (ret)
+ goto out;
+
+ /* get the log file for the slave */
+ runinit(&runner);
+ runner_add_args (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL);
+ runner_argprintf (&runner, "%s/"GSYNC_CONF, gl_workdir);
+ runner_argprintf (&runner, "--session-owner=%s", uuid_str);
+ runner_add_args (&runner, slave, "--config-get", "log-file", NULL);
+
+ ret = glusterd_query_extutil (log_file, &runner);
+
+ out:
+ return ret;
+}
+
static int
gsyncd_getpidfile (char *master, char *slave, char *pidfile)
{
@@ -419,6 +476,32 @@ gsyncd_getpidfile (char *master, char *slave, char *pidfile)
}
static int
+glusterd_gsyncd_getlogfile (char *master, char *slave, char *log_file)
+{
+ int ret = -1;
+ glusterd_conf_t *priv = NULL;
+
+ GF_ASSERT (THIS);
+ GF_ASSERT (THIS->private);
+
+ priv = THIS->private;
+
+ GF_VALIDATE_OR_GOTO ("gsync", master, out);
+ GF_VALIDATE_OR_GOTO ("gsync", slave, out);
+
+ ret = glusterd_gsync_get_param_file (log_file, "log", master,
+ slave, priv->workdir);
+ if (ret == -1) {
+ ret = -2;
+ gf_log ("", GF_LOG_WARNING, "failed to gsyncd logfile");
+ goto out;
+ }
+
+ out:
+ return ret;
+}
+
+static int
gsync_status_byfd (int fd)
{
GF_ASSERT (fd >= -1);
@@ -965,6 +1048,11 @@ glusterd_op_stage_gsync_set (dict_t *dict, char **op_errstr)
ret = gsync_verify_config_options (dict, op_errstr);
goto out;
+
+ case GF_GSYNC_OPTION_TYPE_ROTATE:
+ /* checks same as status mode */
+ ret = glusterd_verify_gsync_status_opts(dict, op_errstr);
+ goto out;
}
ret = glusterd_op_gsync_args_get (dict, op_errstr, &volname, &slave);
@@ -1476,8 +1564,285 @@ glusterd_get_gsync_status (dict_t *dict, char **op_errstr, dict_t *rsp_dict)
out:
gf_log ("", GF_LOG_DEBUG, "Returning %d", ret);
return ret;
+}
+int
+glusterd_send_sigstop (pid_t pid)
+{
+ int ret = 0;
+ ret = kill (pid, SIGSTOP);
+ if (ret)
+ gf_log ("", GF_LOG_ERROR, GEOREP"failed to send SIGSTOP signal");
+ return ret;
+}
+int
+glusterd_send_sigcont (pid_t pid)
+{
+ int ret = 0;
+ ret = kill (pid, SIGCONT);
+ if (ret)
+ gf_log ("", GF_LOG_ERROR, GEOREP"failed to send SIGCONT signal");
+ return ret;
+}
+
+/*
+ * Log rotations flow is something like this:
+ * - Send SIGSTOP to process group (this will stop monitor/worker process
+ * and also the slave if it's local)
+ * - Rotate log file for monitor/worker
+ * - Rotate log file for slave if it's local
+ * - Send SIGCONT to the process group. Monitor wakes up, kills the worker
+ * (this is done in the SIGCONT handler), which results in the termination
+ * of the slave (local/remote). After returning from signal handler,
+ * monitor detects absence of worker and starts it again, which in-turn
+ * starts the slave.
+ */
+int
+glusterd_send_log_rotate_signal (pid_t pid, char *logfile1, char *logfile2)
+{
+ int ret = 0;
+ struct stat stbuf = {0,};
+ char rlogfile[PATH_MAX] = {0,};
+ time_t rottime = 0;
+
+ ret = glusterd_send_sigstop (-pid);
+ rottime = time (NULL);
+
+ snprintf (rlogfile, sizeof (rlogfile), "%s.%"PRIu64, logfile1,
+ (uint64_t) rottime);
+ ret = rename (logfile1, rlogfile);
+ if (ret)
+ gf_log ("", GF_LOG_ERROR, "rename failed for geo-rep log file");
+
+
+ snprintf (rlogfile, sizeof (rlogfile), "%s.%"PRIu64, logfile2,
+ (uint64_t) rottime);
+ ret = stat (logfile2, &stbuf);
+ if (ret) {
+ if (errno != ENOENT)
+ gf_log("", GF_LOG_ERROR, "stat failed for slave log"
+ " file: %s", logfile2);
+ else {
+ gf_log ("", GF_LOG_DEBUG, "Slave is not local, skipping rotation");
+ ret = 0;
+ }
+ goto out;
+ }
+
+ ret = rename (logfile2, rlogfile);
+ if (ret)
+ gf_log ("", GF_LOG_ERROR, "rename failed for geo-rep slave log file");
+
+ out:
+ ret = glusterd_send_sigcont (-pid);
+
+ return ret;
+}
+
+int
+glusterd_get_pid_from_file (char *master, char *slave, pid_t *pid)
+{
+ int ret = -1;
+ int pfd = 0;
+ char pidfile[PATH_MAX] = {0,};
+ char buff[1024] = {0,};
+
+ pfd = gsyncd_getpidfile (master, slave, pidfile);
+
+ if (pfd == -2) {
+ gf_log ("", GF_LOG_ERROR, GEOREP" log-rotate validation "
+ " failed for %s & %s", master, slave);
+ goto out;
+ }
+ if (gsync_status_byfd (pfd) == -1) {
+ gf_log ("", GF_LOG_ERROR, "gsyncd b/w %s & %s is not"
+ " running", master, slave);
+ goto out;
+ }
+
+ ret = read (pfd, buff, 1024);
+ if (ret < 0) {
+ gf_log ("", GF_LOG_ERROR, GEOREP" cannot read pid from pid-file");
+ goto out;
+ }
+
+ close(pfd);
+
+ *pid = strtol (buff, NULL, 10);
+ ret = 0;
+
+ out:
+ return ret;
+}
+
+int
+glusterd_do_gsync_log_rotate (char *master, char *slave, uuid_t *uuid, char **op_errstr)
+{
+ int ret = 0;
+ glusterd_conf_t *priv = NULL;
+ pid_t pid = 0;
+ char log_file1[PATH_MAX] = {0,};
+ char log_file2[PATH_MAX] = {0,};
+
+ GF_ASSERT (THIS);
+ GF_ASSERT (THIS->private);
+
+ priv = THIS->private;
+
+ ret = glusterd_get_pid_from_file (master, slave, &pid);
+ if (ret)
+ goto out;
+
+ /* log file */
+ ret = glusterd_gsyncd_getlogfile (master, slave, log_file1);
+ if (ret)
+ goto out;
+
+ /* slave log file */
+ ret = glusterd_gsync_get_slave_log_file (master, slave, log_file2);
+ if (ret)
+ goto out;
+
+ ret = glusterd_send_log_rotate_signal (pid, log_file1, log_file2);
+
+ out:
+ if (ret && op_errstr)
+ *op_errstr = gf_strdup("Error rotating log file");
+ return ret;
+}
+
+int
+glusterd_do_gsync_log_rotation_mst_slv (glusterd_volinfo_t *volinfo, char *slave,
+ char **op_errstr)
+{
+ uuid_t uuid = {0, };
+ glusterd_conf_t *priv = NULL;
+ int ret = 0;
+ char errmsg[1024] = {0,};
+
+ GF_ASSERT (volinfo);
+ GF_ASSERT (slave);
+ GF_ASSERT (THIS);
+ GF_ASSERT (THIS->private);
+
+ priv = THIS->private;
+
+ ret = glusterd_gsync_get_uuid (slave, volinfo, uuid);
+ if ((ret == 0) && (uuid_compare (priv->uuid, uuid) != 0))
+ goto out;
+
+ if (ret) {
+ snprintf(errmsg, sizeof(errmsg), "geo-replication session b/w %s %s not active",
+ volinfo->volname, slave);
+ gf_log ("", GF_LOG_WARNING, errmsg);
+ if (op_errstr)
+ *op_errstr = gf_strdup(errmsg);
+ goto out;
+ }
+
+ ret = glusterd_do_gsync_log_rotate (volinfo->volname, slave, &uuid, op_errstr);
+
+ out:
+ gf_log ("", GF_LOG_DEBUG, "Returning with %d", ret);
+ return ret;
+}
+
+static void
+_iterate_log_rotate_mst_slv (dict_t *this, char *key, data_t *value, void *data)
+{
+ glusterd_gsync_status_temp_t *param = NULL;
+ char *slave = NULL;
+
+ param = (glusterd_gsync_status_temp_t *) data;
+
+ GF_ASSERT (param);
+ GF_ASSERT (param->volinfo);
+
+ slave = strchr (value->data, ':');
+ if (slave)
+ slave++;
+ else {
+ gf_log ("", GF_LOG_ERROR, "geo-replication log-rotate: slave (%s) "
+ "not comfirming to format", slave);
+ return;
+ }
+
+ (void) glusterd_do_gsync_log_rotation_mst_slv (param->volinfo, slave, NULL);
+}
+
+int
+glusterd_do_gsync_log_rotation_mst (glusterd_volinfo_t *volinfo)
+{
+ glusterd_gsync_status_temp_t param = {0, };
+
+ GF_ASSERT (volinfo);
+
+ param.volinfo = volinfo;
+ dict_foreach (volinfo->gsync_slaves, _iterate_log_rotate_mst_slv, &param);
+ return 0;
+}
+
+static int
+glusterd_rotate_gsync_all ()
+{
+ int32_t ret = 0;
+ glusterd_conf_t *priv = NULL;
+ glusterd_volinfo_t *volinfo = NULL;
+
+ GF_ASSERT (THIS);
+ priv = THIS->private;
+
+ GF_ASSERT (priv);
+
+ list_for_each_entry (volinfo, &priv->volumes, vol_list) {
+ ret = glusterd_do_gsync_log_rotation_mst (volinfo);
+ if (ret)
+ goto out;
+ }
+
+ out:
+ gf_log ("", GF_LOG_DEBUG, "Returning with %d", ret);
+ return ret;
+}
+
+static int
+glusterd_rotate_gsync_logs (dict_t *dict, char **op_errstr, dict_t *rsp_dict)
+{
+ char *slave = NULL;
+ char *volname = NULL;
+ char errmsg[1024] = {0,};
+ gf_boolean_t exists = _gf_false;
+ glusterd_volinfo_t *volinfo = NULL;
+ int ret = 0;
+
+ ret = dict_get_str (dict, "master", &volname);
+ if (ret < 0) {
+ ret = glusterd_rotate_gsync_all ();
+ goto out;
+ }
+
+ exists = glusterd_check_volume_exists (volname);
+ ret = glusterd_volinfo_find (volname, &volinfo);
+ if ((ret) || (!exists)) {
+ snprintf (errmsg, sizeof(errmsg), "Volume %s does not"
+ " exist", volname);
+ gf_log ("", GF_LOG_WARNING, errmsg);
+ *op_errstr = gf_strdup (errmsg);
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_get_str (dict, "slave", &slave);
+ if (ret < 0) {
+ ret = glusterd_do_gsync_log_rotation_mst (volinfo);
+ goto out;
+ }
+
+ ret = glusterd_do_gsync_log_rotation_mst_slv (volinfo, slave, op_errstr);
+
+ out:
+ return ret;
}
@@ -1520,6 +1885,12 @@ glusterd_op_gsync_set (dict_t *dict, char **op_errstr, dict_t *rsp_dict)
goto out;
}
+ if (type == GF_GSYNC_OPTION_TYPE_ROTATE) {
+ ret = glusterd_rotate_gsync_logs (dict, op_errstr, resp_dict);
+ goto out;
+
+ }
+
ret = dict_get_str (dict, "slave", &slave);
if (ret < 0)
goto out;