aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py')
-rwxr-xr-xlib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py958
1 file changed, 0 insertions, 958 deletions
diff --git a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py b/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py
deleted file mode 100755
index 2edafbf3..00000000
--- a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/pool.py
+++ /dev/null
@@ -1,958 +0,0 @@
-# sqlalchemy/pool.py
-# Copyright (C) 2005-2011 the SQLAlchemy authors and contributors <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-
-"""Connection pooling for DB-API connections.
-
-Provides a number of connection pool implementations for a variety of
-usage scenarios and thread behavior requirements imposed by the
-application, DB-API or database itself.
-
-Also provides a DB-API 2.0 connection proxying mechanism allowing
-regular DB-API connect() methods to be transparently managed by a
-SQLAlchemy connection pool.
-"""
-
-import weakref, time, traceback
-
-from sqlalchemy import exc, log, event, events, interfaces, util
-from sqlalchemy.util import queue as sqla_queue
-from sqlalchemy.util import threading, memoized_property, \
- chop_traceback
-
# Module-level registry mapping DB-API modules to their _DBProxy wrappers;
# populated by manage() and emptied by clear_managers().
proxies = {}
-
def manage(module, **params):
    """Return a proxy for a DB-API module that automatically
    pools connections.

    Given a DB-API 2.0 module and pool management parameters, returns
    a proxy for the module that will automatically pool connections,
    creating new connection pools for each distinct set of connection
    arguments sent to the decorated module's connect() function.

    :param module: a DB-API 2.0 database module

    :param poolclass: the class used by the pool module to provide
      pooling. Defaults to :class:`.QueuePool`.

    :param \*\*params: will be passed through to *poolclass*

    """
    # Reuse an existing proxy for this module when present; otherwise
    # setdefault() ensures that if two threads race to create one, the
    # first proxy registered wins and is returned to both.
    proxy = proxies.get(module)
    if proxy is None:
        proxy = proxies.setdefault(module, _DBProxy(module, **params))
    return proxy
-
def clear_managers():
    """Remove all current DB-API 2.0 managers.

    All pools and connections are disposed.
    """
    # Close every registered proxy first, then drop all registry entries.
    for _module, manager in proxies.iteritems():
        manager.close()
    proxies.clear()
-
-
class Pool(log.Identified):
    """Abstract base class for connection pools."""

    def __init__(self,
                 creator, recycle=-1, echo=None,
                 use_threadlocal=False,
                 logging_name=None,
                 reset_on_return=True,
                 listeners=None,
                 events=None,
                 _dispatch=None):
        """
        Construct a Pool.

        :param creator: a callable function that returns a DB-API
          connection object. The function will be called with
          parameters.

        :param recycle: If set to non -1, number of seconds between
          connection recycling, which means upon checkout, if this
          timeout is surpassed the connection will be closed and
          replaced with a newly opened connection. Defaults to -1.

        :param logging_name: String identifier which will be used within
          the "name" field of logging records generated within the
          "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
          id.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information. Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param use_threadlocal: If set to True, repeated calls to
          :meth:`connect` within the same application thread will be
          guaranteed to return the same connection object, if one has
          already been retrieved from the pool and has not been
          returned yet. Offers a slight performance advantage at the
          cost of individual transactions by default. The
          :meth:`unique_connection` method is provided to bypass the
          threadlocal behavior installed into :meth:`connect`.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool. This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril. Defaults to True.

        :param events: a list of 2-tuples, each of the form
          ``(callable, target)`` which will be passed to event.listen()
          upon construction. Provided here so that event listeners
          can be assigned via ``create_engine`` before dialect-level
          listeners are applied.

        :param listeners: Deprecated. A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool. This has been superseded by
          :func:`~sqlalchemy.event.listen`.

        """
        if logging_name:
            self.logging_name = self._orig_logging_name = logging_name
        else:
            self._orig_logging_name = None

        log.instance_logger(self, echoflag=echo)
        # Per-thread storage used by connect() when use_threadlocal is on;
        # holds a weakref to this thread's current _ConnectionFairy.
        self._threadconns = threading.local()
        self._creator = creator
        self._recycle = recycle
        self._use_threadlocal = use_threadlocal
        self._reset_on_return = reset_on_return
        self.echo = echo
        if _dispatch:
            # Copy event listeners from a parent pool (used by recreate()).
            self.dispatch._update(_dispatch, only_propagate=False)
        if events:
            for fn, target in events:
                event.listen(self, target, fn)
        if listeners:
            util.warn_deprecated(
                "The 'listeners' argument to Pool (and "
                "create_engine()) is deprecated. Use event.listen().")
            for l in listeners:
                self.add_listener(l)

    # Event dispatch point for PoolEvents (connect, checkout, checkin, ...).
    dispatch = event.dispatcher(events.PoolEvents)

    @util.deprecated(2.7, "Pool.add_listener is deprecated. Use event.listen()")
    def add_listener(self, listener):
        """Add a :class:`.PoolListener`-like object to this pool.

        ``listener`` may be an object that implements some or all of
        PoolListener, or a dictionary of callables containing implementations
        of some or all of the named methods in PoolListener.

        """
        interfaces.PoolListener._adapt_listener(self, listener)

    def unique_connection(self):
        """Produce a DBAPI connection that is not referenced by any
        thread-local context.

        This method is different from :meth:`.Pool.connect` only if the
        ``use_threadlocal`` flag has been set to ``True``.

        """

        return _ConnectionFairy(self).checkout()

    def _create_connection(self):
        """Called by subclasses to create a new ConnectionRecord."""

        return _ConnectionRecord(self)

    def recreate(self):
        """Return a new :class:`.Pool`, of the same class as this one
        and configured with identical creation arguments.

        This method is used in conjunction with :meth:`dispose`
        to close out an entire :class:`.Pool` and create a new one in
        its place.

        """

        raise NotImplementedError()

    def dispose(self):
        """Dispose of this pool.

        This method leaves the possibility of checked-out connections
        remaining open. It is advised to not reuse the pool once dispose()
        is called, and to instead use a new pool constructed by the
        recreate() method.

        """

        raise NotImplementedError()

    def connect(self):
        """Return a DBAPI connection from the pool.

        The connection is instrumented such that when its
        ``close()`` method is called, the connection will be returned to
        the pool.

        """
        if not self._use_threadlocal:
            return _ConnectionFairy(self).checkout()

        # Threadlocal mode: reuse this thread's live fairy if its weakref
        # is still alive.
        try:
            rec = self._threadconns.current()
            if rec:
                return rec.checkout()
        except AttributeError:
            # No fairy has been stored for this thread yet.
            pass

        agent = _ConnectionFairy(self)
        # Only a weakref is stored, so the fairy is finalized (and the
        # connection returned) once the caller drops its last reference.
        self._threadconns.current = weakref.ref(agent)
        return agent.checkout()

    def _return_conn(self, record):
        """Given a _ConnectionRecord, return it to the :class:`.Pool`.

        This method is called when an instrumented DBAPI connection
        has its ``close()`` method called.

        """
        if self._use_threadlocal:
            # Drop this thread's cached fairy reference, if any.
            try:
                del self._threadconns.current
            except AttributeError:
                pass
        self._do_return_conn(record)

    def _do_get(self):
        """Implementation for :meth:`get`, supplied by subclasses."""

        raise NotImplementedError()

    def _do_return_conn(self, conn):
        """Implementation for :meth:`return_conn`, supplied by subclasses."""

        raise NotImplementedError()

    def status(self):
        # Subclasses return a human-readable summary string.
        raise NotImplementedError()
-
-
class _ConnectionRecord(object):
    """Internal record wrapping one raw DB-API connection on behalf of a
    :class:`.Pool`.

    Tracks the connection's creation time (``starttime``, for recycle
    checks), a per-connection ``info`` dict, and an optional
    ``finalize_callback`` invoked by ``_finalize_fairy`` at check-in.
    """

    # Optional callable applied to the raw connection at check-in time;
    # removed after a single use by _finalize_fairy.
    finalize_callback = None

    def __init__(self, pool):
        self.__pool = pool
        self.connection = self.__connect()
        self.info = {}

        # first_connect fires only once per pool; connect fires once per
        # new DB-API connection.
        pool.dispatch.first_connect.exec_once(self.connection, self)
        pool.dispatch.connect(self.connection, self)

    def close(self):
        # Best-effort close: errors are logged and swallowed, but
        # process-exit signals are always re-raised.
        if self.connection is not None:
            self.__pool.logger.debug("Closing connection %r", self.connection)
            try:
                self.connection.close()
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                self.__pool.logger.debug("Exception closing connection %r",
                                self.connection)

    def invalidate(self, e=None):
        # Close and discard the current connection; get_connection() will
        # create a replacement on next use.
        if e is not None:
            self.__pool.logger.info(
                "Invalidate connection %r (reason: %s:%s)",
                self.connection, e.__class__.__name__, e)
        else:
            self.__pool.logger.info(
                "Invalidate connection %r", self.connection)
        self.__close()
        self.connection = None

    def get_connection(self):
        if self.connection is None:
            # Previously invalidated: reconnect and reset per-connection
            # state, firing connect events for the new connection.
            self.connection = self.__connect()
            self.info.clear()
            if self.__pool.dispatch.connect:
                self.__pool.dispatch.connect(self.connection, self)
        elif self.__pool._recycle > -1 and \
                time.time() - self.starttime > self.__pool._recycle:
            # Connection older than the recycle interval: replace it.
            self.__pool.logger.info(
                    "Connection %r exceeded timeout; recycling",
                    self.connection)
            self.__close()
            self.connection = self.__connect()
            self.info.clear()
            if self.__pool.dispatch.connect:
                self.__pool.dispatch.connect(self.connection, self)
        return self.connection

    def __close(self):
        # Like close(), but logs (rather than swallowing silently) any
        # non-exit exception raised by the DB-API close().
        try:
            self.__pool.logger.debug("Closing connection %r", self.connection)
            self.connection.close()
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception, e:
            self.__pool.logger.debug(
                "Connection %r threw an error on close: %s",
                self.connection, e)

    def __connect(self):
        # Create a raw DB-API connection via the pool's creator, stamping
        # starttime for recycle bookkeeping.
        try:
            self.starttime = time.time()
            connection = self.__pool._creator()
            self.__pool.logger.debug("Created new connection %r", connection)
            return connection
        except Exception, e:
            self.__pool.logger.debug("Error on connect(): %s", e)
            raise
-
-
def _finalize_fairy(connection, connection_record, pool, ref, echo):
    """Return a connection to its pool (or close it) when a
    _ConnectionFairy is closed or garbage collected.

    Invoked either directly from _ConnectionFairy._close() (``ref`` is
    None) or as the weakref callback of a collected fairy (``ref`` is the
    dead weakref).  ``connection_record`` is None for detached connections,
    which are simply closed.
    """
    _refs.discard(connection_record)

    # If the record has since been handed to a newer fairy, this stale
    # weakref callback must not touch it.
    if ref is not None and \
        connection_record.fairy is not ref:
        return

    if connection is not None:
        try:
            # Roll back any open transaction before the connection is
            # reused by another checkout.
            if pool._reset_on_return:
                connection.rollback()
            # Immediately close detached instances
            if connection_record is None:
                connection.close()
        except Exception, e:
            if connection_record is not None:
                connection_record.invalidate(e=e)
            if isinstance(e, (SystemExit, KeyboardInterrupt)):
                raise

    if connection_record is not None:
        connection_record.fairy = None
        if echo:
            pool.logger.debug("Connection %r being returned to pool",
                                    connection)
        # finalize_callback is single-use; remove the instance attribute so
        # the class-level default (None) shows through again.
        if connection_record.finalize_callback:
            connection_record.finalize_callback(connection)
            del connection_record.finalize_callback
        if pool.dispatch.checkin:
            pool.dispatch.checkin(connection, connection_record)
        pool._return_conn(connection_record)
-
# Strong references to all checked-out _ConnectionRecords, keeping them
# (and their weakref finalizers) alive until checked in or detached.
_refs = set()
-
class _ConnectionFairy(object):
    """Proxies a DB-API connection and provides return-on-dereference
    support."""

    # Slots keep instances small; '__weakref__' is required so the record
    # can hold a weak reference to the fairy for finalization.
    __slots__ = '_pool', '__counter', 'connection', \
                '_connection_record', '__weakref__', \
                '_detached_info', '_echo'

    def __init__(self, pool):
        self._pool = pool
        # Nested checkout()/close() depth; the connection is returned to
        # the pool only when this count falls back to zero.
        self.__counter = 0
        self._echo = _echo = pool._should_log_debug()
        try:
            rec = self._connection_record = pool._do_get()
            conn = self.connection = self._connection_record.get_connection()
            # Weakref callback returns the connection to the pool if this
            # fairy is garbage collected without an explicit close().
            rec.fairy = weakref.ref(
                            self,
                            lambda ref:_finalize_fairy(conn, rec, pool, ref, _echo)
                        )
            _refs.add(rec)
        except:
            # helps with endless __getattr__ loops later on
            self.connection = None
            self._connection_record = None
            raise
        if self._echo:
            self._pool.logger.debug("Connection %r checked out from pool" %
                       self.connection)

    @property
    def _logger(self):
        return self._pool.logger

    @property
    def is_valid(self):
        # False once the connection has been invalidated or closed.
        return self.connection is not None

    @property
    def info(self):
        """An info collection unique to this DB-API connection."""

        try:
            return self._connection_record.info
        except AttributeError:
            if self.connection is None:
                raise exc.InvalidRequestError("This connection is closed")
            # Detached connection: keep a private info dict on the fairy.
            try:
                return self._detached_info
            except AttributeError:
                self._detached_info = value = {}
                return value

    def invalidate(self, e=None):
        """Mark this connection as invalidated.

        The connection will be immediately closed.  The containing
        ConnectionRecord will create a new connection when next used.
        """

        if self.connection is None:
            raise exc.InvalidRequestError("This connection is closed")
        if self._connection_record is not None:
            self._connection_record.invalidate(e=e)
        self.connection = None
        self._close()

    def cursor(self, *args, **kwargs):
        # Delegate cursor creation to the raw DB-API connection.
        return self.connection.cursor(*args, **kwargs)

    def __getattr__(self, key):
        # Everything not defined here proxies to the raw DB-API connection.
        return getattr(self.connection, key)

    def checkout(self):
        if self.connection is None:
            raise exc.InvalidRequestError("This connection is closed")
        self.__counter += 1

        # Checkout events fire only on the first (outermost) checkout.
        if not self._pool.dispatch.checkout or self.__counter != 1:
            return self

        # Pool listeners can trigger a reconnection on checkout
        attempts = 2
        while attempts > 0:
            try:
                self._pool.dispatch.checkout(self.connection,
                                            self._connection_record,
                                            self)
                return self
            except exc.DisconnectionError, e:
                self._pool.logger.info(
                "Disconnection detected on checkout: %s", e)
                self._connection_record.invalidate(e)
                # Get a fresh connection from the record and re-run the
                # checkout events against it.
                self.connection = self._connection_record.get_connection()
                attempts -= 1

        self._pool.logger.info("Reconnection attempts exhausted on checkout")
        self.invalidate()
        raise exc.InvalidRequestError("This connection is closed")

    def detach(self):
        """Separate this connection from its Pool.

        This means that the connection will no longer be returned to the
        pool when closed, and will instead be literally closed.  The
        containing ConnectionRecord is separated from the DB-API connection,
        and will create a new connection when next used.

        Note that any overall connection limiting constraints imposed by a
        Pool implementation may be violated after a detach, as the detached
        connection is removed from the pool's knowledge and control.
        """

        if self._connection_record is not None:
            _refs.remove(self._connection_record)
            self._connection_record.fairy = None
            self._connection_record.connection = None
            # Return the emptied record so the pool can replenish its slot.
            self._pool._do_return_conn(self._connection_record)
            self._detached_info = \
              self._connection_record.info.copy()
            self._connection_record = None

    def close(self):
        # Balanced against checkout(); only the outermost close() actually
        # returns the connection.
        self.__counter -= 1
        if self.__counter == 0:
            self._close()

    def _close(self):
        _finalize_fairy(self.connection, self._connection_record,
                            self._pool, None, self._echo)
        self.connection = None
        self._connection_record = None
-
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    Options are the same as those of :class:`.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    def __init__(self, creator, pool_size=5, **kw):
        # Threadlocal behavior is intrinsic to this pool type.
        kw['use_threadlocal'] = True
        Pool.__init__(self, creator, **kw)
        # self._conn.current holds a weakref to this thread's record;
        # self._all_conns holds strong refs so records stay alive.
        self._conn = threading.local()
        self._all_conns = set()
        self.size = pool_size

    def recreate(self):
        self.logger.info("Pool recreating")
        return SingletonThreadPool(self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            _dispatch=self.dispatch)

    def dispose(self):
        """Dispose of this pool."""

        # NOTE(review): per-thread weakrefs in self._conn are not cleared
        # here; threads may still see a record whose connection is closed.
        for conn in self._all_conns:
            try:
                conn.close()
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self):
        # Evict records until within the size limit; set.pop() removes an
        # arbitrary element, so eviction order is unspecified.
        while len(self._all_conns) > self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self):
        return "SingletonThreadPool id:%d size: %d" % \
                            (id(self), len(self._all_conns))

    def _do_return_conn(self, conn):
        # Connections stay bound to their creating thread; nothing to do.
        pass

    def _do_get(self):
        # Reuse this thread's record if its weakref is still alive.
        try:
            c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        self._all_conns.add(c)
        if len(self._all_conns) > self.size:
            self._cleanup()
        return c
-
class QueuePool(Pool):
    """A :class:`.Pool` that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`.Engine` objects, unless the SQLite dialect is in use.

    """

    def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
                 **kw):
        """
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object. The function will be called with
          parameters.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.

        :param recycle: If set to non -1, number of seconds between
          connection recycling, which means upon checkout, if this
          timeout is surpassed the connection will be closed and
          replaced with a newly opened connection. Defaults to -1.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information. Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param use_threadlocal: If set to True, repeated calls to
          :meth:`connect` within the same application thread will be
          guaranteed to return the same connection object, if one has
          already been retrieved from the pool and has not been
          returned yet. Offers a slight performance advantage at the
          cost of individual transactions by default. The
          :meth:`unique_connection` method is provided to bypass the
          threadlocal behavior installed into :meth:`connect`.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool. This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril. Defaults to True.

        :param listeners: A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = sqla_queue.Queue(pool_size)
        # _overflow starts at -pool_size so that "total connections ever
        # created beyond the queue" equals maxsize + _overflow; see
        # checkedout() below.
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        # Overflow counter only needs locking when overflow is bounded.
        self._overflow_lock = self._max_overflow > -1 and \
                                    threading.Lock() or None

    def recreate(self):
        self.logger.info("Pool recreating")
        return QueuePool(self._creator, pool_size=self._pool.maxsize,
                          max_overflow=self._max_overflow,
                          timeout=self._timeout,
                          recycle=self._recycle, echo=self.echo,
                          logging_name=self._orig_logging_name,
                          use_threadlocal=self._use_threadlocal,
                          _dispatch=self.dispatch)

    def _do_return_conn(self, conn):
        try:
            self._pool.put(conn, False)
        except sqla_queue.Full:
            # Queue is at capacity: this was an overflow connection; close
            # it and decrement the overflow count (under lock if bounded).
            conn.close()
            if self._overflow_lock is None:
                self._overflow -= 1
            else:
                self._overflow_lock.acquire()
                try:
                    self._overflow -= 1
                finally:
                    self._overflow_lock.release()

    def _do_get(self):
        try:
            # Block on the queue only when overflow capacity is exhausted;
            # otherwise poll and fall through to create an overflow conn.
            wait = self._max_overflow > -1 and \
                    self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            if self._max_overflow > -1 and \
                    self._overflow >= self._max_overflow:
                if not wait:
                    # Overflow filled up after our non-blocking get; retry,
                    # this time blocking on the queue.
                    return self._do_get()
                else:
                    raise exc.TimeoutError(
                            "QueuePool limit of size %d overflow %d reached, "
                            "connection timed out, timeout %d" %
                            (self.size(), self.overflow(), self._timeout))

            if self._overflow_lock is not None:
                self._overflow_lock.acquire()

            # Re-check the limit under the lock; another thread may have
            # consumed the remaining overflow capacity.
            if self._max_overflow > -1 and \
                    self._overflow >= self._max_overflow:
                if self._overflow_lock is not None:
                    self._overflow_lock.release()
                return self._do_get()

            try:
                con = self._create_connection()
                self._overflow += 1
            finally:
                if self._overflow_lock is not None:
                    self._overflow_lock.release()
            return con

    def dispose(self):
        # Drain and close everything currently sitting in the queue.
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self):
        return "Pool size: %d  Connections in pool: %d "\
                "Current Overflow: %d Current Checked out "\
                "connections: %d" % (self.size(),
                                    self.checkedin(),
                                    self.overflow(),
                                    self.checkedout())

    def size(self):
        return self._pool.maxsize

    def checkedin(self):
        return self._pool.qsize()

    def overflow(self):
        return self._overflow

    def checkedout(self):
        # maxsize - qsize = checked out of the queue; add overflow conns.
        return self._pool.maxsize - self._pool.qsize() + self._overflow
-
class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    :class:`.NullPool` is used by the SQLite dialect automatically
    when a file-based database is used (as of SQLAlchemy 0.7).
    See :ref:`sqlite_toplevel`.

    """

    def status(self):
        return "NullPool"

    def _do_get(self):
        # Every checkout is a brand new connection record.
        return self._create_connection()

    def _do_return_conn(self, conn):
        # Nothing is retained; close the connection right at check-in.
        conn.close()

    def recreate(self):
        """Return a new NullPool configured like this one."""
        self.logger.info("Pool recreating")

        return NullPool(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            _dispatch=self.dispatch)

    def dispose(self):
        # No connections are held persistently, so nothing to dispose.
        pass
-
-
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are not
    currently supported by this Pool implementation but may be implemented
    in a future release.

    """

    @memoized_property
    def _conn(self):
        # The single raw DB-API connection, created lazily on first access.
        return self._creator()

    @memoized_property
    def connection(self):
        # The single _ConnectionRecord handed out by _do_get().
        return _ConnectionRecord(self)

    def status(self):
        return "StaticPool"

    def dispose(self):
        # Only close if the memoized _conn was ever materialized; the
        # subsequent assignment overwrites the memoized value with None.
        if '_conn' in self.__dict__:
            self._conn.close()
            self._conn = None

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(creator=self._creator,
                              recycle=self._recycle,
                              use_threadlocal=self._use_threadlocal,
                              reset_on_return=self._reset_on_return,
                              echo=self.echo,
                              logging_name=self._orig_logging_name,
                              _dispatch=self.dispatch)

    def _create_connection(self):
        # All callers share the one memoized connection.
        return self._conn

    def _do_return_conn(self, conn):
        # The single connection never leaves the pool; nothing to do.
        pass

    def _do_get(self):
        return self.connection
-
class AssertionPool(Pool):
    """A :class:`.Pool` that allows at most one checked out connection at any given
    time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    :class:`.AssertionPool` also logs a traceback of where
    the original connection was checked out, and reports
    this in the assertion error raised (new in 0.7).

    """
    def __init__(self, *args, **kw):
        # One lazily-created connection plus checkout bookkeeping.
        self._conn = None
        self._checked_out = False
        # Pop our private option before Pool sees the remaining kwargs.
        self._store_traceback = kw.pop('store_traceback', True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self):
        return "AssertionPool"

    def dispose(self):
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self):
        self.logger.info("Pool recreating")
        return AssertionPool(self._creator, echo=self.echo,
                             logging_name=self._orig_logging_name,
                             _dispatch=self.dispatch)

    def _do_return_conn(self, conn):
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert conn is self._conn

    def _do_get(self):
        if self._checked_out:
            # Report where the still-outstanding connection was taken.
            suffix = ''
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            raise AssertionError("connection is already checked out" + suffix)

        if self._conn is None:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
-
class _DBProxy(object):
    """Layers connection pooling behavior on top of a standard DB-API module.

    Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
    specific connect parameters. Other functions and attributes are delegated
    to the underlying DB-API module.
    """

    def __init__(self, module, poolclass=QueuePool, **kw):
        """Initializes a new proxy.

        module
          a DB-API 2.0 module

        poolclass
          a Pool class, defaulting to QueuePool

        Other parameters are sent to the Pool object's constructor.

        """

        self.module = module
        self.kw = kw
        self.poolclass = poolclass
        # Maps serialized connect arguments -> Pool instance.
        self.pools = {}
        self._create_pool_mutex = threading.Lock()

    def close(self):
        # .keys() yields a list copy under Python 2, so deleting entries
        # while iterating is safe here.
        for key in self.pools.keys():
            del self.pools[key]

    def __del__(self):
        self.close()

    def __getattr__(self, key):
        # Delegate everything else to the underlying DB-API module.
        return getattr(self.module, key)

    def get_pool(self, *args, **kw):
        key = self._serialize(*args, **kw)
        try:
            return self.pools[key]
        except KeyError:
            # Not present: re-check under the creation mutex so concurrent
            # callers build at most one pool per key.
            self._create_pool_mutex.acquire()
            try:
                if key not in self.pools:
                    pool = self.poolclass(lambda:
                                self.module.connect(*args, **kw), **self.kw)
                    self.pools[key] = pool
                    return pool
                else:
                    return self.pools[key]
            finally:
                self._create_pool_mutex.release()

    def connect(self, *args, **kw):
        """Activate a connection to the database.

        Connect to the database using this DBProxy's module and the given
        connect arguments.  If the arguments match an existing pool, the
        connection will be returned from the pool's current thread-local
        connection instance, or if there is no thread-local connection
        instance it will be checked out from the set of pooled connections.

        If the pool has no available connections and allows new connections
        to be created, a new database connection will be made.

        """

        return self.get_pool(*args, **kw).connect()

    def dispose(self, *args, **kw):
        """Dispose the pool referenced by the given connect arguments."""

        key = self._serialize(*args, **kw)
        try:
            del self.pools[key]
        except KeyError:
            pass

    def _serialize(self, *args, **kw):
        # Build a hashable key from positional args plus sorted kw pairs so
        # equivalent connect() calls map to the same pool.
        return tuple(
            list(args) +
            [(k, kw[k]) for k in sorted(kw)]
        )