summaryrefslogtreecommitdiffstats
path: root/xlators/protocol/server/src/authenticate.h
blob: 6fc539333434f9469ae74c2acca3dace8247141c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/*
  Copyright (c) 2007-2013 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#ifndef _AUTHENTICATE_H
#define _AUTHENTICATE_H

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <stdio.h>
#include <fnmatch.h>
#include "dict.h"
#include "compat.h"
#include "list.h"
#include "xlator.h"

/* Verdict produced by one authentication module for a connecting client:
 * accept, reject, or leave the decision to the other loaded modules. */
typedef enum { AUTH_ACCEPT, AUTH_REJECT, AUTH_DONT_CARE } auth_result_t;

/* Entry point exported by an auth module.  It examines the client's
 * handshake parameters (input_params) against the volume's configured
 * auth options (config_params) and returns its verdict. */
typedef auth_result_t (*auth_fn_t)(dict_t *input_params, dict_t *config_params);

/* State kept for one loaded authentication module. */
typedef struct {
    void *handle;               /* presumably the dlopen(3) handle of the
                                 * module -- confirm in authenticate.c */
    auth_fn_t authenticate;     /* the module's verdict function */
    volume_opt_list_t *vol_opt; /* volume options declared by the module */
} auth_handle_t;

/* Populate auth_modules for xlator xl; returns int32_t status
 * (0/-1 by GlusterFS convention -- confirm in authenticate.c). */
int32_t
gf_auth_init(xlator_t *xl, dict_t *auth_modules);
/* Tear down whatever gf_auth_init() set up in auth_modules. */
void
gf_auth_fini(dict_t *auth_modules);
/* Run the loaded modules over (input_params, config_params, auth_modules)
 * and return the combined accept/reject verdict. */
auth_result_t
gf_authenticate(dict_t *, dict_t *, dict_t *);

#endif /* _AUTHENTICATE_H */
# NOTE(review): this chunk begins mid-definition -- the leading "**kw)" is the
# tail of a method whose def line lies before this chunk; it is not visible
# here and is intentionally not reconstructed.


class SendmarkRsyncMixin(object):
    # No-op sendmark for the rsync sync method.
    def sendmark_regular(self, *a, **kw):
        pass


class PurgeNormalMixin(object):
    # Propagate deletions: ask the slave to purge the given names under path.
    def purge_missing(self, path, names):
        self.slave.server.purge(path, names)


class PurgeNoopMixin(object):
    # Deletion propagation disabled: purge requests are silently dropped.
    def purge_missing(self, path, names):
        pass


class TarSSHEngine(object):

    """Sync engine that uses tar(1) piped over ssh(1)
    for data transfers. Good for lots of small files.
    """

    def a_syncdata(self, files):
        """Queue files for async transfer; completion is checked via regjob."""
        logging.debug(lf("Files", files=files))

        for f in files:
            pb = self.syncer.add(f)

            def regjob(se, xte, pb):
                rv = pb.wait()
                if rv[0]:
                    logging.debug(lf('synced', file=se))
                    return True
                else:
                    # stat check for file presence
                    st = lstat(se)
                    if isinstance(st, int):
                        # file got unlinked in the interim
                        self.unlinked_gfids.add(se)
                        return True

            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)

    def syncdata_wait(self):
        """Block until all queued 'reg' jobs finish."""
        if self.wait(self.FLAT_DIR_HIERARCHY, None):
            return True

    def syncdata(self, files):
        """Synchronous transfer: queue everything, then wait."""
        self.a_syncdata(files)
        self.syncdata_wait()


class RsyncEngine(object):

    """Sync engine that uses rsync(1) for data transfers"""

    def a_syncdata(self, files):
        """Queue files for async rsync transfer."""
        logging.debug(lf("files", files=files))

        for f in files:
            logging.debug(lf('candidate for syncing', file=f))
            pb = self.syncer.add(f)

            def regjob(se, xte, pb):
                rv = pb.wait()
                if rv[0]:
                    logging.debug(lf('synced', file=se))
                    return True
                else:
                    # stat to check if the file exist
                    st = lstat(se)
                    if isinstance(st, int):
                        # file got unlinked in the interim
                        self.unlinked_gfids.add(se)
                        return True

            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)

    def syncdata_wait(self):
        """Block until all queued 'reg' jobs finish."""
        if self.wait(self.FLAT_DIR_HIERARCHY, None):
            return True

    def syncdata(self, files):
        """Synchronous transfer: queue everything, then wait."""
        self.a_syncdata(files)
        self.syncdata_wait()


class GMasterCommon(object):

    """abstract class implementing master role"""

    # Indices into the (foreign, native) volinfo pair returned by
    # get_sys_volinfo() / volinfo_hook().
    KFGN = 0
    KNAT = 1

    def get_sys_volinfo(self):
        """query volume marks on fs root

        err out on multiple foreign masters
        """
        fgn_vis, nat_vi = (
            self.master.server.aggregated.foreign_volume_infos(),
            self.master.server.aggregated.native_volume_info())
        fgn_vi = None
        if fgn_vis:
            if len(fgn_vis) > 1:
                raise GsyncdError("cannot work with multiple foreign masters")
            fgn_vi = fgn_vis[0]
        return fgn_vi, nat_vi

    @property
    def uuid(self):
        # None until self.volinfo has been populated (see crawlwrap()).
        if self.volinfo:
            return self.volinfo['uuid']

    @property
    def volmark(self):
        if self.volinfo:
            return self.volinfo['volume_mark']

    def get_entry_stime(self):
        # The slave returns an int error code on failure; map it to None.
        data = self.slave.server.entry_stime(".", self.uuid)
        if isinstance(data, int):
            data = None
        return data

    def get_data_stime(self):
        data = self.slave.server.stime(".", self.uuid)
        if isinstance(data, int):
            data = None
        return data

    def xtime(self, path, *a, **opts):
        """get amended xtime

        as of amending, we can create missing xtime, or
        determine a valid value if what we get is expired
        (as of the volume mark expiry); way of amending
        depends on @opts and on subject of query (master
        or slave).
        """
        if a:
            rsc = a[0]
        else:
            rsc = self.master
        self.make_xtime_opts(rsc == self.master, opts)
        return self.xtime_low(rsc, path, **opts)

    def __init__(self, master, slave):
        self.master = master
        self.slave = slave
        self.jobtab = {}
        if gconf.get("sync-method") == "tarssh":
            self.syncer = Syncer(slave, self.slave.tarssh, [2])
        else:
            # partial transfer (cf. rsync(1)), that's normal
            self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
        # crawls vs. turns:
        # - self.crawls is simply the number of crawl() invocations on root
        # - one turn is a maximal consecutive sequence of crawls so that each
        #   crawl in it detects a change to be synced
        # - self.turns is the number of turns since start
        # - self.total_turns is a limit so that if self.turns reaches it, then
        #   we exit (for diagnostic purposes)
        # so, eg., if the master fs changes unceasingly, self.turns will remain
        # 0.
        self.crawls = 0
        self.turns = 0
        self.total_turns = rconf.turns
        self.crawl_start = datetime.now()
        self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
        self.start = None
        self.change_seen = None
        # the actual volinfo we make use of
        self.volinfo = None
        self.terminate = False
        self.sleep_interval = 1
        self.unlinked_gfids = set()

    def init_keep_alive(cls):
        """start the keep-alive thread """
        # NOTE(review): despite the 'cls' name this is a plain instance
        # method (no @classmethod); callers invoke it on an instance.
        timo = gconf.get("slave-timeout", 0)
        if timo > 0:
            def keep_alive():
                while True:
                    vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
                    cls.slave.server.keep_alive(vi)
                    time.sleep(gap)
            t = Thread(target=keep_alive)
            t.start()

    def mgmt_lock(self):

        """Take management volume lock """
        if rconf.mgmt_lock_fd:
            try:
                fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                if not rconf.active_earlier:
                    rconf.active_earlier = True
                    logging.info(lf("Got lock Becoming ACTIVE",
                                    brick=rconf.args.local_path))
                return True
            except:
                ex = sys.exc_info()[1]
                if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                    if not rconf.passive_earlier:
                        rconf.passive_earlier = True
                        # FIX: original read rconf.local_path here, which is
                        # inconsistent with every other call site
                        # (rconf.args.local_path) and would raise
                        # AttributeError on this path.
                        logging.info(lf("Didn't get lock Becoming PASSIVE",
                                        brick=rconf.args.local_path))
                    return False
                raise

        fd = None
        bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
            + str(rconf.args.subvol_num) + ".lock"
        mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
        path = os.path.join(mgmt_lock_dir, bname)
        logging.debug(lf("lock file path", path=path))
        try:
            fd = os.open(path, os.O_CREAT | os.O_RDWR)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == ENOENT:
                logging.info("Creating geo-rep directory in meta volume...")
                try:
                    os.makedirs(mgmt_lock_dir)
                except OSError:
                    ex = sys.exc_info()[1]
                    if ex.errno == EEXIST:
                        # another worker created it concurrently
                        pass
                    else:
                        raise
                fd = os.open(path, os.O_CREAT | os.O_RDWR)
            else:
                raise
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            # Save latest FD for future use
            rconf.mgmt_lock_fd = fd
        except:
            ex = sys.exc_info()[1]
            if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                # cannot grab, it's taken
                if not rconf.passive_earlier:
                    rconf.passive_earlier = True
                    logging.info(lf("Didn't get lock Becoming PASSIVE",
                                    brick=rconf.args.local_path))
                rconf.mgmt_lock_fd = fd
                return False
            raise

        if not rconf.active_earlier:
            rconf.active_earlier = True
            logging.info(lf("Got lock Becoming ACTIVE",
                            brick=rconf.args.local_path))
        return True

    def should_crawl(self):
        """Decide whether this worker should be the active crawler."""
        if not gconf.get("use-meta-volume"):
            return rconf.args.local_node_id in self.master.server.node_uuid()

        if not os.path.ismount(gconf.get("meta-volume-mnt")):
            logging.error("Meta-volume is not mounted. Worker Exiting...")
            sys.exit(1)
        return self.mgmt_lock()

    def register(self):
        # NOTE(review): calls itself; safe only if every concrete subclass
        # overrides register() -- confirm this is intentional.
        self.register()

    def crawlwrap(self, oneshot=False, register_time=None):
        if oneshot:
            # it's important to do this during the oneshot crawl as
            # for a passive gsyncd (ie. in a replicate scenario)
            # the keepalive thread would keep the connection alive.
            self.init_keep_alive()

        # If crawlwrap is called when partial history available,
        # then it sets register_time which is the time when geo-rep
        # worker registerd to changelog consumption. Since nsec is
        # not considered in register time, their are chances of skipping
        # changes detection in xsync crawl. This limit will be reset when
        # crawlwrap is called again.
        self.live_changelog_start_time = None
        if register_time:
            self.live_changelog_start_time = (register_time, 0)

        # no need to maintain volinfo state machine.
        # in a cascading setup, each geo-replication session is
        # independent (ie. 'volume-mark' and 'xtime' are not
        # propogated). This is because the slave's xtime is now
        # stored on the master itself. 'volume-mark' just identifies
        # that we are in a cascading setup and need to enable
        # 'geo-replication.ignore-pid-check' option.
        volinfo_sys = self.volinfo_hook()
        self.volinfo = volinfo_sys[self.KNAT]
        inter_master = volinfo_sys[self.KFGN]
        logging.debug("%s master with volume id %s ..." %
                      (inter_master and "intermediate" or "primary",
                       self.uuid))
        rconf.volume_id = self.uuid
        if self.volinfo:
            if self.volinfo['retval']:
                logging.warn(lf("master cluster's info may not be valid",
                                error=self.volinfo['retval']))
        else:
            raise GsyncdError("master volinfo unavailable")
        self.lastreport['time'] = time.time()

        t0 = time.time()
        crawl = self.should_crawl()
        while not self.terminate:
            if self.start:
                logging.debug("... crawl #%d done, took %.6f seconds" %
                              (self.crawls,
                               time.time() - self.start))
            self.start = time.time()
            should_display_info = self.start - self.lastreport['time'] >= 60
            if should_display_info:
                logging.debug("%d crawls, %d turns",
                              self.crawls - self.lastreport['crawls'],
                              self.turns - self.lastreport['turns'])
                self.lastreport.update(crawls=self.crawls,
                                       turns=self.turns,
                                       time=self.start)
            t1 = time.time()
            if int(t1 - t0) >= gconf.get("replica-failover-interval"):
                crawl = self.should_crawl()
                t0 = t1
            self.update_worker_remote_node()
            if not crawl:
                self.status.set_passive()
                # bring up _this_ brick to the cluster stime
                # which is min of cluster (but max of the replicas)
                brick_stime = self.xtime('.', self.slave)
                cluster_stime = self.master.server.aggregated.stime_mnt(
                    '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
                logging.debug(lf("Crawl info",
                                 cluster_stime=cluster_stime,
                                 brick_stime=brick_stime))

                if not isinstance(cluster_stime, int):
                    if brick_stime < cluster_stime:
                        self.slave.server.set_stime(
                            self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
                        self.upd_stime(cluster_stime)
                        # Purge all changelogs available in processing dir
                        # NOTE(review): crawlwrap() continues past the end of
                        # this chunk; the remainder is truncated here.