diff options
Diffstat (limited to 'apachekerbauth/apachekerbauth/var/www/cgi-bin/memcached.py')
-rw-r--r-- | apachekerbauth/apachekerbauth/var/www/cgi-bin/memcached.py | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/apachekerbauth/apachekerbauth/var/www/cgi-bin/memcached.py b/apachekerbauth/apachekerbauth/var/www/cgi-bin/memcached.py new file mode 100644 index 0000000..ecd9332 --- /dev/null +++ b/apachekerbauth/apachekerbauth/var/www/cgi-bin/memcached.py @@ -0,0 +1,318 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Lucid comes with memcached: v1.4.2. Protocol documentation for that +version is at: + +http://github.com/memcached/memcached/blob/1.4.2/doc/protocol.txt +""" + +import cPickle as pickle +import logging +import socket +import time +from bisect import bisect +from hashlib import md5 + +DEFAULT_MEMCACHED_PORT = 11211 + +CONN_TIMEOUT = 0.3 +IO_TIMEOUT = 2.0 +PICKLE_FLAG = 1 +NODE_WEIGHT = 50 +PICKLE_PROTOCOL = 2 +TRY_COUNT = 3 + +# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server +# will be considered failed for ERROR_LIMIT_DURATION seconds. +ERROR_LIMIT_COUNT = 10 +ERROR_LIMIT_TIME = 60 +ERROR_LIMIT_DURATION = 60 + + +def md5hash(key): + return md5(key).hexdigest() + + +class MemcacheConnectionError(Exception): + pass + + +class MemcacheRing(object): + """ + Simple, consistent-hashed memcache client. + """ + + def __init__(self, servers, connect_timeout=CONN_TIMEOUT, + io_timeout=IO_TIMEOUT, tries=TRY_COUNT): + self._ring = {} + self._errors = dict(((serv, []) for serv in servers)) + self._error_limited = dict(((serv, 0) for serv in servers)) + for server in sorted(servers): + for i in xrange(NODE_WEIGHT): + self._ring[md5hash('%s-%s' % (server, i))] = server + self._tries = tries if tries <= len(servers) else len(servers) + self._sorted = sorted(self._ring.keys()) + self._client_cache = dict(((server, []) for server in servers)) + self._connect_timeout = connect_timeout + self._io_timeout = io_timeout + + def _exception_occurred(self, server, e, action='talking'): + if isinstance(e, socket.timeout): + logging.error(_("Timeout %(action)s to memcached: %(server)s"), + {'action': action, 'server': server}) + else: + logging.exception(_("Error %(action)s to memcached: %(server)s"), + {'action': action, 'server': server}) + now = time.time() + self._errors[server].append(time.time()) + if len(self._errors[server]) > ERROR_LIMIT_COUNT: + self._errors[server] = [err for err in self._errors[server] + if err > now - ERROR_LIMIT_TIME] + if len(self._errors[server]) > ERROR_LIMIT_COUNT: + self._error_limited[server] = now + ERROR_LIMIT_DURATION + logging.error(_('Error limiting server %s'), server) + + def _get_conns(self, key): + """ + Retrieves a server conn from the pool, or connects a new one. + Chooses the server based on a consistent hash of "key". + """ + pos = bisect(self._sorted, key) + served = [] + while len(served) < self._tries: + pos = (pos + 1) % len(self._sorted) + server = self._ring[self._sorted[pos]] + if server in served: + continue + served.append(server) + if self._error_limited[server] > time.time(): + continue + try: + fp, sock = self._client_cache[server].pop() + yield server, fp, sock + except IndexError: + try: + if ':' in server: + host, port = server.split(':') + else: + host = server + port = DEFAULT_MEMCACHED_PORT + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(self._connect_timeout) + sock.connect((host, int(port))) + sock.settimeout(self._io_timeout) + yield server, sock.makefile(), sock + except Exception, e: + self._exception_occurred(server, e, 'connecting') + + def _return_conn(self, server, fp, sock): + """ Returns a server connection to the pool """ + self._client_cache[server].append((fp, sock)) + + def set(self, key, value, serialize=True, timeout=0): + """ + Set a key/value pair in memcache + + :param key: key + :param value: value + :param serialize: if True, value is pickled before sending to memcache + :param timeout: ttl in memcache + """ + key = md5hash(key) + if timeout > 0: + timeout += time.time() + flags = 0 + if serialize: + value = pickle.dumps(value, PICKLE_PROTOCOL) + flags |= PICKLE_FLAG + for (server, fp, sock) in self._get_conns(key): + try: + sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \ + (key, flags, timeout, len(value), value)) + self._return_conn(server, fp, sock) + return + except Exception, e: + self._exception_occurred(server, e) + + def get(self, key): + """ + Gets the object specified by key. It will also unpickle the object + before returning if it is pickled in memcache. + + :param key: key + :returns: value of the key in memcache + """ + key = md5hash(key) + value = None + for (server, fp, sock) in self._get_conns(key): + try: + sock.sendall('get %s\r\n' % key) + line = fp.readline().strip().split() + while line[0].upper() != 'END': + if line[0].upper() == 'VALUE' and line[1] == key: + size = int(line[3]) + value = fp.read(size) + if int(line[2]) & PICKLE_FLAG: + value = pickle.loads(value) + fp.readline() + line = fp.readline().strip().split() + self._return_conn(server, fp, sock) + return value + except Exception, e: + self._exception_occurred(server, e) + + def incr(self, key, delta=1, timeout=0): + """ + Increments a key which has a numeric value by delta. + If the key can't be found, it's added as delta or 0 if delta < 0. + If passed a negative number, will use memcached's decr. Returns + the int stored in memcached + Note: The data memcached stores as the result of incr/decr is + an unsigned int. decr's that result in a number below 0 are + stored as 0. + + :param key: key + :param delta: amount to add to the value of key (or set as the value + if the key is not found) will be cast to an int + :param timeout: ttl in memcache + :raises MemcacheConnectionError: + """ + key = md5hash(key) + command = 'incr' + if delta < 0: + command = 'decr' + delta = str(abs(int(delta))) + for (server, fp, sock) in self._get_conns(key): + try: + sock.sendall('%s %s %s\r\n' % (command, key, delta)) + line = fp.readline().strip().split() + if line[0].upper() == 'NOT_FOUND': + add_val = delta + if command == 'decr': + add_val = '0' + sock.sendall('add %s %d %d %s\r\n%s\r\n' % \ + (key, 0, timeout, len(add_val), add_val)) + line = fp.readline().strip().split() + if line[0].upper() == 'NOT_STORED': + sock.sendall('%s %s %s\r\n' % (command, key, delta)) + line = fp.readline().strip().split() + ret = int(line[0].strip()) + else: + ret = int(add_val) + else: + ret = int(line[0].strip()) + self._return_conn(server, fp, sock) + return ret + except Exception, e: + self._exception_occurred(server, e) + raise MemcacheConnectionError("No Memcached connections succeeded.") + + def decr(self, key, delta=1, timeout=0): + """ + Decrements a key which has a numeric value by delta. Calls incr with + -delta. + + :param key: key + :param delta: amount to subtract to the value of key (or set the + value to 0 if the key is not found) will be cast to + an int + :param timeout: ttl in memcache + :raises MemcacheConnectionError: + """ + self.incr(key, delta=-delta, timeout=timeout) + + def delete(self, key): + """ + Deletes a key/value pair from memcache. + + :param key: key to be deleted + """ + key = md5hash(key) + for (server, fp, sock) in self._get_conns(key): + try: + sock.sendall('delete %s noreply\r\n' % key) + self._return_conn(server, fp, sock) + return + except Exception, e: + self._exception_occurred(server, e) + + def set_multi(self, mapping, server_key, serialize=True, timeout=0): + """ + Sets multiple key/value pairs in memcache. + + :param mapping: dictonary of keys and values to be set in memcache + :param servery_key: key to use in determining which server in the ring + is used + :param serialize: if True, value is pickled before sending to memcache + :param timeout: ttl for memcache + """ + server_key = md5hash(server_key) + if timeout > 0: + timeout += time.time() + msg = '' + for key, value in mapping.iteritems(): + key = md5hash(key) + flags = 0 + if serialize: + value = pickle.dumps(value, PICKLE_PROTOCOL) + flags |= PICKLE_FLAG + msg += ('set %s %d %d %s noreply\r\n%s\r\n' % + (key, flags, timeout, len(value), value)) + for (server, fp, sock) in self._get_conns(server_key): + try: + sock.sendall(msg) + self._return_conn(server, fp, sock) + return + except Exception, e: + self._exception_occurred(server, e) + + def get_multi(self, keys, server_key): + """ + Gets multiple values from memcache for the given keys. + + :param keys: keys for values to be retrieved from memcache + :param servery_key: key to use in determining which server in the ring + is used + :returns: list of values + """ + server_key = md5hash(server_key) + keys = [md5hash(key) for key in keys] + for (server, fp, sock) in self._get_conns(server_key): + try: + sock.sendall('get %s\r\n' % ' '.join(keys)) + line = fp.readline().strip().split() + responses = {} + while line[0].upper() != 'END': + if line[0].upper() == 'VALUE': + size = int(line[3]) + value = fp.read(size) + if int(line[2]) & PICKLE_FLAG: + value = pickle.loads(value) + responses[line[1]] = value + fp.readline() + line = fp.readline().strip().split() + values = [] + for key in keys: + if key in responses: + values.append(responses[key]) + else: + values.append(None) + self._return_conn(server, fp, sock) + return values + except Exception, e: + self._exception_occurred(server, e) |