diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/enterprise/adbapi.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/enterprise/adbapi.py | 483 |
1 files changed, 0 insertions, 483 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/enterprise/adbapi.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/enterprise/adbapi.py deleted file mode 100755 index 0531d2da..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/enterprise/adbapi.py +++ /dev/null @@ -1,483 +0,0 @@ -# -*- test-case-name: twisted.test.test_adbapi -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/DatabaseAPI-2.0.html>}. -""" - -import sys - -from twisted.internet import threads -from twisted.python import reflect, log -from twisted.python.deprecate import deprecated -from twisted.python.versions import Version - - - -class ConnectionLost(Exception): - """ - This exception means that a db connection has been lost. Client code may - try again. - """ - - - -class Connection(object): - """ - A wrapper for a DB-API connection instance. - - The wrapper passes almost everything to the wrapped connection and so has - the same API. However, the Connection knows about its pool and also - handle reconnecting should when the real connection dies. - """ - - def __init__(self, pool): - self._pool = pool - self._connection = None - self.reconnect() - - def close(self): - # The way adbapi works right now means that closing a connection is - # a really bad thing as it leaves a dead connection associated with - # a thread in the thread pool. - # Really, I think closing a pooled connection should return it to the - # pool but that's handled by the runWithConnection method already so, - # rather than upsetting anyone by raising an exception, let's ignore - # the request - pass - - def rollback(self): - if not self._pool.reconnect: - self._connection.rollback() - return - - try: - self._connection.rollback() - curs = self._connection.cursor() - curs.execute(self._pool.good_sql) - curs.close() - self._connection.commit() - return - except: - log.err(None, "Rollback failed") - - self._pool.disconnect(self._connection) - - if self._pool.noisy: - log.msg("Connection lost.") - - raise ConnectionLost() - - def reconnect(self): - if self._connection is not None: - self._pool.disconnect(self._connection) - self._connection = self._pool.connect() - - def __getattr__(self, name): - return getattr(self._connection, name) - - -class Transaction: - """A lightweight wrapper for a DB-API 'cursor' object. - - Relays attribute access to the DB cursor. That is, you can call - execute(), fetchall(), etc., and they will be called on the - underlying DB-API cursor object. Attributes will also be - retrieved from there. - """ - _cursor = None - - def __init__(self, pool, connection): - self._pool = pool - self._connection = connection - self.reopen() - - def close(self): - _cursor = self._cursor - self._cursor = None - _cursor.close() - - def reopen(self): - if self._cursor is not None: - self.close() - - try: - self._cursor = self._connection.cursor() - return - except: - if not self._pool.reconnect: - raise - else: - log.err(None, "Cursor creation failed") - - if self._pool.noisy: - log.msg('Connection lost, reconnecting') - - self.reconnect() - self._cursor = self._connection.cursor() - - def reconnect(self): - self._connection.reconnect() - self._cursor = None - - def __getattr__(self, name): - return getattr(self._cursor, name) - - -class ConnectionPool: - """ - Represent a pool of connections to a DB-API 2.0 compliant database. - - @ivar connectionFactory: factory for connections, default to L{Connection}. - @type connectionFactory: any callable. - - @ivar transactionFactory: factory for transactions, default to - L{Transaction}. - @type transactionFactory: any callable - - @ivar shutdownID: C{None} or a handle on the shutdown event trigger - which will be used to stop the connection pool workers when the - reactor stops. - - @ivar _reactor: The reactor which will be used to schedule startup and - shutdown events. - @type _reactor: L{IReactorCore} provider - """ - - CP_ARGS = "min max name noisy openfun reconnect good_sql".split() - - noisy = False # if true, generate informational log messages - min = 3 # minimum number of connections in pool - max = 5 # maximum number of connections in pool - name = None # Name to assign to thread pool for debugging - openfun = None # A function to call on new connections - reconnect = False # reconnect when connections fail - good_sql = 'select 1' # a query which should always succeed - - running = False # true when the pool is operating - connectionFactory = Connection - transactionFactory = Transaction - - # Initialize this to None so it's available in close() even if start() - # never runs. - shutdownID = None - - def __init__(self, dbapiName, *connargs, **connkw): - """Create a new ConnectionPool. - - Any positional or keyword arguments other than those documented here - are passed to the DB-API object when connecting. Use these arguments to - pass database names, usernames, passwords, etc. - - @param dbapiName: an import string to use to obtain a DB-API compatible - module (e.g. 'pyPgSQL.PgSQL') - - @param cp_min: the minimum number of connections in pool (default 3) - - @param cp_max: the maximum number of connections in pool (default 5) - - @param cp_noisy: generate informational log messages during operation - (default False) - - @param cp_openfun: a callback invoked after every connect() on the - underlying DB-API object. The callback is passed a - new DB-API connection object. This callback can - setup per-connection state such as charset, - timezone, etc. - - @param cp_reconnect: detect connections which have failed and reconnect - (default False). Failed connections may result in - ConnectionLost exceptions, which indicate the - query may need to be re-sent. - - @param cp_good_sql: an sql query which should always succeed and change - no state (default 'select 1') - - @param cp_reactor: use this reactor instead of the global reactor - (added in Twisted 10.2). - @type cp_reactor: L{IReactorCore} provider - """ - - self.dbapiName = dbapiName - self.dbapi = reflect.namedModule(dbapiName) - - if getattr(self.dbapi, 'apilevel', None) != '2.0': - log.msg('DB API module not DB API 2.0 compliant.') - - if getattr(self.dbapi, 'threadsafety', 0) < 1: - log.msg('DB API module not sufficiently thread-safe.') - - reactor = connkw.pop('cp_reactor', None) - if reactor is None: - from twisted.internet import reactor - self._reactor = reactor - - self.connargs = connargs - self.connkw = connkw - - for arg in self.CP_ARGS: - cp_arg = 'cp_%s' % arg - if cp_arg in connkw: - setattr(self, arg, connkw[cp_arg]) - del connkw[cp_arg] - - self.min = min(self.min, self.max) - self.max = max(self.min, self.max) - - self.connections = {} # all connections, hashed on thread id - - # these are optional so import them here - from twisted.python import threadpool - import thread - - self.threadID = thread.get_ident - self.threadpool = threadpool.ThreadPool(self.min, self.max) - self.startID = self._reactor.callWhenRunning(self._start) - - - def _start(self): - self.startID = None - return self.start() - - - def start(self): - """ - Start the connection pool. - - If you are using the reactor normally, this function does *not* - need to be called. - """ - if not self.running: - self.threadpool.start() - self.shutdownID = self._reactor.addSystemEventTrigger( - 'during', 'shutdown', self.finalClose) - self.running = True - - - def runWithConnection(self, func, *args, **kw): - """ - Execute a function with a database connection and return the result. - - @param func: A callable object of one argument which will be executed - in a thread with a connection from the pool. It will be passed as - its first argument a L{Connection} instance (whose interface is - mostly identical to that of a connection object for your DB-API - module of choice), and its results will be returned as a Deferred. - If the method raises an exception the transaction will be rolled - back. Otherwise, the transaction will be committed. B{Note} that - this function is B{not} run in the main thread: it must be - threadsafe. - - @param *args: positional arguments to be passed to func - - @param **kw: keyword arguments to be passed to func - - @return: a Deferred which will fire the return value of - C{func(Transaction(...), *args, **kw)}, or a Failure. - """ - from twisted.internet import reactor - return threads.deferToThreadPool(reactor, self.threadpool, - self._runWithConnection, - func, *args, **kw) - - - def _runWithConnection(self, func, *args, **kw): - conn = self.connectionFactory(self) - try: - result = func(conn, *args, **kw) - conn.commit() - return result - except: - excType, excValue, excTraceback = sys.exc_info() - try: - conn.rollback() - except: - log.err(None, "Rollback failed") - raise excType, excValue, excTraceback - - - def runInteraction(self, interaction, *args, **kw): - """ - Interact with the database and return the result. - - The 'interaction' is a callable object which will be executed - in a thread using a pooled connection. It will be passed an - L{Transaction} object as an argument (whose interface is - identical to that of the database cursor for your DB-API - module of choice), and its results will be returned as a - Deferred. If running the method raises an exception, the - transaction will be rolled back. If the method returns a - value, the transaction will be committed. - - NOTE that the function you pass is *not* run in the main - thread: you may have to worry about thread-safety in the - function you pass to this if it tries to use non-local - objects. - - @param interaction: a callable object whose first argument - is an L{adbapi.Transaction}. - - @param *args: additional positional arguments to be passed - to interaction - - @param **kw: keyword arguments to be passed to interaction - - @return: a Deferred which will fire the return value of - 'interaction(Transaction(...), *args, **kw)', or a Failure. - """ - from twisted.internet import reactor - return threads.deferToThreadPool(reactor, self.threadpool, - self._runInteraction, - interaction, *args, **kw) - - - def runQuery(self, *args, **kw): - """Execute an SQL query and return the result. - - A DB-API cursor will will be invoked with cursor.execute(*args, **kw). - The exact nature of the arguments will depend on the specific flavor - of DB-API being used, but the first argument in *args be an SQL - statement. The result of a subsequent cursor.fetchall() will be - fired to the Deferred which is returned. If either the 'execute' or - 'fetchall' methods raise an exception, the transaction will be rolled - back and a Failure returned. - - The *args and **kw arguments will be passed to the DB-API cursor's - 'execute' method. - - @return: a Deferred which will fire the return value of a DB-API - cursor's 'fetchall' method, or a Failure. - """ - return self.runInteraction(self._runQuery, *args, **kw) - - - def runOperation(self, *args, **kw): - """Execute an SQL query and return None. - - A DB-API cursor will will be invoked with cursor.execute(*args, **kw). - The exact nature of the arguments will depend on the specific flavor - of DB-API being used, but the first argument in *args will be an SQL - statement. This method will not attempt to fetch any results from the - query and is thus suitable for INSERT, DELETE, and other SQL statements - which do not return values. If the 'execute' method raises an - exception, the transaction will be rolled back and a Failure returned. - - The args and kw arguments will be passed to the DB-API cursor's - 'execute' method. - - return: a Deferred which will fire None or a Failure. - """ - return self.runInteraction(self._runOperation, *args, **kw) - - - def close(self): - """ - Close all pool connections and shutdown the pool. - """ - if self.shutdownID: - self._reactor.removeSystemEventTrigger(self.shutdownID) - self.shutdownID = None - if self.startID: - self._reactor.removeSystemEventTrigger(self.startID) - self.startID = None - self.finalClose() - - def finalClose(self): - """This should only be called by the shutdown trigger.""" - - self.shutdownID = None - self.threadpool.stop() - self.running = False - for conn in self.connections.values(): - self._close(conn) - self.connections.clear() - - def connect(self): - """Return a database connection when one becomes available. - - This method blocks and should be run in a thread from the internal - threadpool. Don't call this method directly from non-threaded code. - Using this method outside the external threadpool may exceed the - maximum number of connections in the pool. - - @return: a database connection from the pool. - """ - - tid = self.threadID() - conn = self.connections.get(tid) - if conn is None: - if self.noisy: - log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, - self.connargs or '', - self.connkw or '')) - conn = self.dbapi.connect(*self.connargs, **self.connkw) - if self.openfun != None: - self.openfun(conn) - self.connections[tid] = conn - return conn - - def disconnect(self, conn): - """Disconnect a database connection associated with this pool. - - Note: This function should only be used by the same thread which - called connect(). As with connect(), this function is not used - in normal non-threaded twisted code. - """ - tid = self.threadID() - if conn is not self.connections.get(tid): - raise Exception("wrong connection for thread") - if conn is not None: - self._close(conn) - del self.connections[tid] - - - def _close(self, conn): - if self.noisy: - log.msg('adbapi closing: %s' % (self.dbapiName,)) - try: - conn.close() - except: - log.err(None, "Connection close failed") - - - def _runInteraction(self, interaction, *args, **kw): - conn = self.connectionFactory(self) - trans = self.transactionFactory(self, conn) - try: - result = interaction(trans, *args, **kw) - trans.close() - conn.commit() - return result - except: - excType, excValue, excTraceback = sys.exc_info() - try: - conn.rollback() - except: - log.err(None, "Rollback failed") - raise excType, excValue, excTraceback - - - def _runQuery(self, trans, *args, **kw): - trans.execute(*args, **kw) - return trans.fetchall() - - def _runOperation(self, trans, *args, **kw): - trans.execute(*args, **kw) - - def __getstate__(self): - return {'dbapiName': self.dbapiName, - 'min': self.min, - 'max': self.max, - 'noisy': self.noisy, - 'reconnect': self.reconnect, - 'good_sql': self.good_sql, - 'connargs': self.connargs, - 'connkw': self.connkw} - - def __setstate__(self, state): - self.__dict__ = state - self.__init__(self.dbapiName, *self.connargs, **self.connkw) - - -__all__ = ['Transaction', 'ConnectionPool'] |