-# util/langhelpers.py
-# Copyright (C) 2005-2011 the SQLAlchemy authors and contributors <see AUTHORS file>
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Routines to help with the creation, loading and introspection of
-modules, classes, hierarchies, attributes, functions, and methods.
-import itertools
-import inspect
-import operator
-import re
-import sys
-import types
-import warnings
-from compat import update_wrapper, set_types, threading
-from sqlalchemy import exc
-def _unique_symbols(used, *bases):
- used = set(used)
- for base in bases:
- pool = itertools.chain((base,),
- itertools.imap(lambda i: base + str(i),
- xrange(1000)))
- for sym in pool:
- if sym not in used:
- used.add(sym)
- yield sym
- break
- else:
- raise NameError("exhausted namespace for symbol base %s" % base)
-def decorator(target):
- """A signature-matching decorator factory."""
- def decorate(fn):
- if not inspect.isfunction(fn):
- raise Exception("not a decoratable function")
- spec = inspect.getargspec(fn)
- names = tuple(spec[0]) + spec[1:3] + (fn.func_name,)
- targ_name, fn_name = _unique_symbols(names, 'target', 'fn')
- metadata = dict(target=targ_name, fn=fn_name)
- metadata.update(format_argspec_plus(spec, grouped=False))
- code = 'lambda %(args)s: %(target)s(%(fn)s, %(apply_kw)s)' % (
- metadata)
- decorated = eval(code, {targ_name:target, fn_name:fn})
- decorated.func_defaults = getattr(fn, 'im_func', fn).func_defaults
- return update_wrapper(decorated, fn)
- return update_wrapper(decorate, target)
-def get_cls_kwargs(cls):
- """Return the full set of inherited kwargs for the given `cls`.
- Probes a class's __init__ method, collecting all named arguments. If the
- __init__ defines a \**kwargs catch-all, then the constructor is presumed to
- pass along unrecognized keywords to it's base classes, and the collection
- process is repeated recursively on each of the bases.
- Uses a subset of inspect.getargspec() to cut down on method overhead.
- No anonymous tuple arguments please !
- """
- for c in cls.__mro__:
- if '__init__' in c.__dict__:
- stack = set([c])
- break
- else:
- return []
- args = set()
- while stack:
- class_ = stack.pop()
- ctr = class_.__dict__.get('__init__', False)
- if (not ctr or
- not isinstance(ctr, types.FunctionType) or
- not isinstance(ctr.func_code, types.CodeType)):
- stack.update(class_.__bases__)
- continue
- # this is shorthand for
- # names, _, has_kw, _ = inspect.getargspec(ctr)
- names, has_kw = inspect_func_args(ctr)
- args.update(names)
- if has_kw:
- stack.update(class_.__bases__)
- args.discard('self')
- return args
- from inspect import CO_VARKEYWORDS
- def inspect_func_args(fn):
- co = fn.func_code
- nargs = co.co_argcount
- names = co.co_varnames
- args = list(names[:nargs])
- has_kw = bool(co.co_flags & CO_VARKEYWORDS)
- return args, has_kw
-except ImportError:
- def inspect_func_args(fn):
- names, _, has_kw, _ = inspect.getargspec(fn)
- return names, bool(has_kw)
-def get_func_kwargs(func):
- """Return the set of legal kwargs for the given `func`.
- Uses getargspec so is safe to call for methods, functions,
- etc.
- """
- return inspect.getargspec(func)[0]
-def format_argspec_plus(fn, grouped=True):
- """Returns a dictionary of formatted, introspected function arguments.
- A enhanced variant of inspect.formatargspec to support code generation.
- fn
- An inspectable callable or tuple of inspect getargspec() results.
- grouped
- Defaults to True; include (parens, around, argument) lists
- Returns:
- args
- Full inspect.formatargspec for fn
- self_arg
- The name of the first positional argument, varargs[0], or None
- if the function defines no positional arguments.
- apply_pos
- args, re-written in calling rather than receiving syntax. Arguments are
- passed positionally.
- apply_kw
- Like apply_pos, except keyword-ish args are passed as keywords.
- Example::
- >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
- {'args': '(self, a, b, c=3, **d)',
- 'self_arg': 'self',
- 'apply_kw': '(self, a, b, c=c, **d)',
- 'apply_pos': '(self, a, b, c, **d)'}
- """
- spec = callable(fn) and inspect.getargspec(fn) or fn
- args = inspect.formatargspec(*spec)
- if spec[0]:
- self_arg = spec[0][0]
- elif spec[1]:
- self_arg = '%s[0]' % spec[1]
- else:
- self_arg = None
- apply_pos = inspect.formatargspec(spec[0], spec[1], spec[2])
- defaulted_vals = spec[3] is not None and spec[0][0-len(spec[3]):] or ()
- apply_kw = inspect.formatargspec(spec[0], spec[1], spec[2], defaulted_vals,
- formatvalue=lambda x: '=' + x)
- if grouped:
- return dict(args=args, self_arg=self_arg,
- apply_pos=apply_pos, apply_kw=apply_kw)
- else:
- return dict(args=args[1:-1], self_arg=self_arg,
- apply_pos=apply_pos[1:-1], apply_kw=apply_kw[1:-1])
-def format_argspec_init(method, grouped=True):
- """format_argspec_plus with considerations for typical __init__ methods
- Wraps format_argspec_plus with error handling strategies for typical
- __init__ cases::
- object.__init__ -> (self)
- other unreflectable (usually C) -> (self, *args, **kwargs)
- """
- try:
- return format_argspec_plus(method, grouped=grouped)
- except TypeError:
- self_arg = 'self'
- if method is object.__init__:
- args = grouped and '(self)' or 'self'
- else:
- args = (grouped and '(self, *args, **kwargs)'
- or 'self, *args, **kwargs')
- return dict(self_arg='self', args=args, apply_pos=args, apply_kw=args)
-def getargspec_init(method):
- """inspect.getargspec with considerations for typical __init__ methods
- Wraps inspect.getargspec with error handling for typical __init__ cases::
- object.__init__ -> (self)
- other unreflectable (usually C) -> (self, *args, **kwargs)
- """
- try:
- return inspect.getargspec(method)
- except TypeError:
- if method is object.__init__:
- return (['self'], None, None, None)
- else:
- return (['self'], 'args', 'kwargs', None)
-def unbound_method_to_callable(func_or_cls):
- """Adjust the incoming callable such that a 'self' argument is not required."""
- if isinstance(func_or_cls, types.MethodType) and not func_or_cls.im_self:
- return func_or_cls.im_func
- else:
- return func_or_cls
-class portable_instancemethod(object):
- """Turn an instancemethod into a (parent, name) pair
- to produce a serializable callable.
- """
- def __init__(self, meth):
- self.target = meth.im_self
- self.name = meth.__name__
- def __call__(self, *arg, **kw):
- return getattr(self.target, self.name)(*arg, **kw)
-def class_hierarchy(cls):
- """Return an unordered sequence of all classes related to cls.
- Traverses diamond hierarchies.
- Fibs slightly: subclasses of builtin types are not returned. Thus
- class_hierarchy(class A(object)) returns (A, object), not A plus every
- class systemwide that derives from object.
- Old-style classes are discarded and hierarchies rooted on them
- will not be descended.
- """
- # Py2K
- if isinstance(cls, types.ClassType):
- return list()
- # end Py2K
- hier = set([cls])
- process = list(cls.__mro__)
- while process:
- c = process.pop()
- # Py2K
- if isinstance(c, types.ClassType):
- continue
- for b in (_ for _ in c.__bases__
- if _ not in hier and not isinstance(_, types.ClassType)):
- # end Py2K
- # Py3K
- #for b in (_ for _ in c.__bases__
- # if _ not in hier):
- process.append(b)
- hier.add(b)
- # Py3K
- #if c.__module__ == 'builtins' or not hasattr(c, '__subclasses__'):
- # continue
- # Py2K
- if c.__module__ == '__builtin__' or not hasattr(c, '__subclasses__'):
- continue
- # end Py2K
- for s in [_ for _ in c.__subclasses__() if _ not in hier]:
- process.append(s)
- hier.add(s)
- return list(hier)
-def iterate_attributes(cls):
- """iterate all the keys and attributes associated
- with a class, without using getattr().
- Does not use getattr() so that class-sensitive
- descriptors (i.e. property.__get__()) are not called.
- """
- keys = dir(cls)
- for key in keys:
- for c in cls.__mro__:
- if key in c.__dict__:
- yield (key, c.__dict__[key])
- break
-def monkeypatch_proxied_specials(into_cls, from_cls, skip=None, only=None,
- name='self.proxy', from_instance=None):
- """Automates delegation of __specials__ for a proxying type."""
- if only:
- dunders = only
- else:
- if skip is None:
- skip = ('__slots__', '__del__', '__getattribute__',
- '__metaclass__', '__getstate__', '__setstate__')
- dunders = [m for m in dir(from_cls)
- if (m.startswith('__') and m.endswith('__') and
- not hasattr(into_cls, m) and m not in skip)]
- for method in dunders:
- try:
- fn = getattr(from_cls, method)
- if not hasattr(fn, '__call__'):
- continue
- fn = getattr(fn, 'im_func', fn)
- except AttributeError:
- continue
- try:
- spec = inspect.getargspec(fn)
- fn_args = inspect.formatargspec(spec[0])
- d_args = inspect.formatargspec(spec[0][1:])
- except TypeError:
- fn_args = '(self, *args, **kw)'
- d_args = '(*args, **kw)'
- py = ("def %(method)s%(fn_args)s: "
- "return %(name)s.%(method)s%(d_args)s" % locals())
- env = from_instance is not None and {name: from_instance} or {}
- exec py in env
- try:
- env[method].func_defaults = fn.func_defaults
- except AttributeError:
- pass
- setattr(into_cls, method, env[method])
-def methods_equivalent(meth1, meth2):
- """Return True if the two methods are the same implementation."""
- # Py3K
- #return getattr(meth1, '__func__', meth1) is getattr(meth2, '__func__', meth2)
- # Py2K
- return getattr(meth1, 'im_func', meth1) is getattr(meth2, 'im_func', meth2)
- # end Py2K
-def as_interface(obj, cls=None, methods=None, required=None):
- """Ensure basic interface compliance for an instance or dict of callables.
- Checks that ``obj`` implements public methods of ``cls`` or has members
- listed in ``methods``. If ``required`` is not supplied, implementing at
- least one interface method is sufficient. Methods present on ``obj`` that
- are not in the interface are ignored.
- If ``obj`` is a dict and ``dict`` does not meet the interface
- requirements, the keys of the dictionary are inspected. Keys present in
- ``obj`` that are not in the interface will raise TypeErrors.
- Raises TypeError if ``obj`` does not meet the interface criteria.
- In all passing cases, an object with callable members is returned. In the
- simple case, ``obj`` is returned as-is; if dict processing kicks in then
- an anonymous class is returned.
- obj
- A type, instance, or dictionary of callables.
- cls
- Optional, a type. All public methods of cls are considered the
- interface. An ``obj`` instance of cls will always pass, ignoring
- ``required``..
- methods
- Optional, a sequence of method names to consider as the interface.
- required
- Optional, a sequence of mandatory implementations. If omitted, an
- ``obj`` that provides at least one interface method is considered
- sufficient. As a convenience, required may be a type, in which case
- all public methods of the type are required.
- """
- if not cls and not methods:
- raise TypeError('a class or collection of method names are required')
- if isinstance(cls, type) and isinstance(obj, cls):
- return obj
- interface = set(methods or [m for m in dir(cls) if not m.startswith('_')])
- implemented = set(dir(obj))
- complies = operator.ge
- if isinstance(required, type):
- required = interface
- elif not required:
- required = set()
- complies = operator.gt
- else:
- required = set(required)
- if complies(implemented.intersection(interface), required):
- return obj
- # No dict duck typing here.
- if not type(obj) is dict:
- qualifier = complies is operator.gt and 'any of' or 'all of'
- raise TypeError("%r does not implement %s: %s" % (
- obj, qualifier, ', '.join(interface)))
- class AnonymousInterface(object):
- """A callable-holding shell."""
- if cls:
- AnonymousInterface.__name__ = 'Anonymous' + cls.__name__
- found = set()
- for method, impl in dictlike_iteritems(obj):
- if method not in interface:
- raise TypeError("%r: unknown in this interface" % method)
- if not callable(impl):
- raise TypeError("%r=%r is not callable" % (method, impl))
- setattr(AnonymousInterface, method, staticmethod(impl))
- found.add(method)
- if complies(found, required):
- return AnonymousInterface
- raise TypeError("dictionary does not contain required keys %s" %
- ', '.join(required - found))
-class memoized_property(object):
- """A read-only @property that is only evaluated once."""
- def __init__(self, fget, doc=None):
- self.fget = fget
- self.__doc__ = doc or fget.__doc__
- self.__name__ = fget.__name__
- def __get__(self, obj, cls):
- if obj is None:
- return self
- obj.__dict__[self.__name__] = result = self.fget(obj)
- return result
-class memoized_instancemethod(object):
- """Decorate a method memoize its return value.
- Best applied to no-arg methods: memoization is not sensitive to
- argument values, and will always return the same value even when
- called with different arguments.
- """
- def __init__(self, fget, doc=None):
- self.fget = fget
- self.__doc__ = doc or fget.__doc__
- self.__name__ = fget.__name__
- def __get__(self, obj, cls):
- if obj is None:
- return self
- def oneshot(*args, **kw):
- result = self.fget(obj, *args, **kw)
- memo = lambda *a, **kw: result
- memo.__name__ = self.__name__
- memo.__doc__ = self.__doc__
- obj.__dict__[self.__name__] = memo
- return result
- oneshot.__name__ = self.__name__
- oneshot.__doc__ = self.__doc__
- return oneshot
-def reset_memoized(instance, name):
- instance.__dict__.pop(name, None)
-class group_expirable_memoized_property(object):
- """A family of @memoized_properties that can be expired in tandem."""
- def __init__(self):
- self.attributes = []
- def expire_instance(self, instance):
- """Expire all memoized properties for *instance*."""
- stash = instance.__dict__
- for attribute in self.attributes:
- stash.pop(attribute, None)
- def __call__(self, fn):
- self.attributes.append(fn.__name__)
- return memoized_property(fn)
-class importlater(object):
- """Deferred import object.
- e.g.::
- somesubmod = importlater("mypackage.somemodule", "somesubmod")
- is equivalent to::
- from mypackage.somemodule import somesubmod
- except evaluted upon attribute access to "somesubmod".
- """
- def __init__(self, path, addtl=None):
- self._il_path = path
- self._il_addtl = addtl
- @memoized_property
- def module(self):
- if self._il_addtl:
- m = __import__(self._il_path, globals(), locals(),
- [self._il_addtl])
- try:
- return getattr(m, self._il_addtl)
- except AttributeError:
- raise ImportError(
- "Module %s has no attribute '%s'" %
- (self._il_path, self._il_addtl)
- )
- else:
- m = __import__(self._il_path)
- for token in self._il_path.split(".")[1:]:
- m = getattr(m, token)
- return m
- def __getattr__(self, key):
- try:
- attr = getattr(self.module, key)
- except AttributeError:
- raise AttributeError(
- "Module %s has no attribute '%s'" %
- (self._il_path, key)
- )
- self.__dict__[key] = attr
- return attr
-# from paste.deploy.converters
-def asbool(obj):
- if isinstance(obj, (str, unicode)):
- obj = obj.strip().lower()
- if obj in ['true', 'yes', 'on', 'y', 't', '1']:
- return True
- elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
- return False
- else:
- raise ValueError("String is not true/false: %r" % obj)
- return bool(obj)
-def bool_or_str(*text):
- """Return a callable that will evaulate a string as
- boolean, or one of a set of "alternate" string values.
- """
- def bool_or_value(obj):
- if obj in text:
- return obj
- else:
- return asbool(obj)
- return bool_or_value
-def asint(value):
- """Coerce to integer."""
- if value is None:
- return value
- return int(value)
-def coerce_kw_type(kw, key, type_, flexi_bool=True):
- """If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
- necessary. If 'flexi_bool' is True, the string '0' is considered false
- when coercing to boolean.
- """
- if key in kw and type(kw[key]) is not type_ and kw[key] is not None:
- if type_ is bool and flexi_bool:
- kw[key] = asbool(kw[key])
- else:
- kw[key] = type_(kw[key])
-def constructor_copy(obj, cls, **kw):
- """Instantiate cls using the __dict__ of obj as constructor arguments.
- Uses inspect to match the named arguments of ``cls``.
- """
- names = get_cls_kwargs(cls)
- kw.update((k, obj.__dict__[k]) for k in names if k in obj.__dict__)
- return cls(**kw)
-def duck_type_collection(specimen, default=None):
- """Given an instance or class, guess if it is or is acting as one of
- the basic collection types: list, set and dict. If the __emulates__
- property is present, return that preferentially.
- """
- if hasattr(specimen, '__emulates__'):
- # canonicalize set vs sets.Set to a standard: the builtin set
- if (specimen.__emulates__ is not None and
- issubclass(specimen.__emulates__, set_types)):
- return set
- else:
- return specimen.__emulates__
- isa = isinstance(specimen, type) and issubclass or isinstance
- if isa(specimen, list):
- return list
- elif isa(specimen, set_types):
- return set
- elif isa(specimen, dict):
- return dict
- if hasattr(specimen, 'append'):
- return list
- elif hasattr(specimen, 'add'):
- return set
- elif hasattr(specimen, 'set'):
- return dict
- else:
- return default
-def assert_arg_type(arg, argtype, name):
- if isinstance(arg, argtype):
- return arg
- else:
- if isinstance(argtype, tuple):
- raise exc.ArgumentError(
- "Argument '%s' is expected to be one of type %s, got '%s'" %
- (name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
- else:
- raise exc.ArgumentError(
- "Argument '%s' is expected to be of type '%s', got '%s'" %
- (name, argtype, type(arg)))
-def dictlike_iteritems(dictlike):
- """Return a (key, value) iterator for almost any dict-like object."""
- # Py3K
- #if hasattr(dictlike, 'items'):
- # return dictlike.items()
- # Py2K
- if hasattr(dictlike, 'iteritems'):
- return dictlike.iteritems()
- elif hasattr(dictlike, 'items'):
- return iter(dictlike.items())
- # end Py2K
- getter = getattr(dictlike, '__getitem__', getattr(dictlike, 'get', None))
- if getter is None:
- raise TypeError(
- "Object '%r' is not dict-like" % dictlike)
- if hasattr(dictlike, 'iterkeys'):
- def iterator():
- for key in dictlike.iterkeys():
- yield key, getter(key)
- return iterator()
- elif hasattr(dictlike, 'keys'):
- return iter((key, getter(key)) for key in dictlike.keys())
- else:
- raise TypeError(
- "Object '%r' is not dict-like" % dictlike)
-class classproperty(property):
- """A decorator that behaves like @property except that operates
- on classes rather than instances.
- The decorator is currently special when using the declarative
- module, but note that the
- :class:`~.sqlalchemy.ext.declarative.declared_attr`
- decorator should be used for this purpose with declarative.
- """
- def __init__(self, fget, *arg, **kw):
- super(classproperty, self).__init__(fget, *arg, **kw)
- self.__doc__ = fget.__doc__
- def __get__(desc, self, cls):
- return desc.fget(cls)
-class _symbol(object):
- def __init__(self, name, doc=None):
- """Construct a new named symbol."""
- assert isinstance(name, str)
- self.name = name
- if doc:
- self.__doc__ = doc
- def __reduce__(self):
- return symbol, (self.name,)
- def __repr__(self):
- return "<symbol '%s>" % self.name
-_symbol.__name__ = 'symbol'
-class symbol(object):
- """A constant symbol.
- >>> symbol('foo') is symbol('foo')
- True
- >>> symbol('foo')
- <symbol 'foo>
- A slight refinement of the MAGICCOOKIE=object() pattern. The primary
- advantage of symbol() is its repr(). They are also singletons.
- Repeated calls of symbol('name') will all return the same instance.
- The optional ``doc`` argument assigns to ``__doc__``. This
- is strictly so that Sphinx autoattr picks up the docstring we want
- (it doesn't appear to pick up the in-module docstring if the datamember
- is in a different module - autoattribute also blows up completely).
- If Sphinx fixes/improves this then we would no longer need
- ``doc`` here.
- """
- symbols = {}
- _lock = threading.Lock()
- def __new__(cls, name, doc=None):
- cls._lock.acquire()
- try:
- sym = cls.symbols.get(name)
- if sym is None:
- cls.symbols[name] = sym = _symbol(name, doc)
- return sym
- finally:
- symbol._lock.release()
-_creation_order = 1
-def set_creation_order(instance):
- """Assign a '_creation_order' sequence to the given instance.
- This allows multiple instances to be sorted in order of creation
- (typically within a single thread; the counter is not particularly
- threadsafe).
- """
- global _creation_order
- instance._creation_order = _creation_order
- _creation_order +=1
-def warn_exception(func, *args, **kwargs):
- """executes the given function, catches all exceptions and converts to a warning."""
- try:
- return func(*args, **kwargs)
- except:
- warn("%s('%s') ignored" % sys.exc_info()[0:2])
-def warn(msg, stacklevel=3):
- """Issue a warning.
- If msg is a string, :class:`.exc.SAWarning` is used as
- the category.
- .. note:: This function is swapped out when the test suite
- runs, with a compatible version that uses
- warnings.warn_explicit, so that the warnings registry can
- be controlled.
- """
- if isinstance(msg, basestring):
- warnings.warn(msg, exc.SAWarning, stacklevel=stacklevel)
- else:
- warnings.warn(msg, stacklevel=stacklevel)
-_SQLA_RE = re.compile(r'sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py')
-_UNITTEST_RE = re.compile(r'unit(?:2|test2?/)')
-def chop_traceback(tb, exclude_prefix=_UNITTEST_RE, exclude_suffix=_SQLA_RE):
- """Chop extraneous lines off beginning and end of a traceback.
- :param tb:
- a list of traceback lines as returned by ``traceback.format_stack()``
- :param exclude_prefix:
- a regular expression object matching lines to skip at beginning of ``tb``
- :param exclude_suffix:
- a regular expression object matching lines to skip at end of ``tb``
- """
- start = 0
- end = len(tb) - 1
- while start <= end and exclude_prefix.search(tb[start]):
- start += 1
- while start <= end and exclude_suffix.search(tb[end]):
- end -= 1
- return tb[start:end+1]
-NoneType = type(None)