diff options
Diffstat (limited to 'lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/engine/base.py')
-rwxr-xr-x | lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/engine/base.py | 2995 |
1 files changed, 0 insertions, 2995 deletions
diff --git a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/engine/base.py b/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/engine/base.py deleted file mode 100755 index 31fdd7fb..00000000 --- a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/engine/base.py +++ /dev/null @@ -1,2995 +0,0 @@ -# engine/base.py -# Copyright (C) 2005-2011 the SQLAlchemy authors and contributors <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php - - -"""Basic components for SQL execution and interfacing with DB-API. - -Defines the basic components used to interface DB-API modules with -higher-level statement-construction, connection-management, execution -and result contexts. -""" - -__all__ = [ - 'BufferedColumnResultProxy', 'BufferedColumnRow', - 'BufferedRowResultProxy','Compiled', 'Connectable', 'Connection', - 'Dialect', 'Engine','ExecutionContext', 'NestedTransaction', - 'ResultProxy', 'RootTransaction','RowProxy', 'SchemaIterator', - 'StringIO', 'Transaction', 'TwoPhaseTransaction', - 'connection_memoize'] - -import inspect, StringIO, sys, operator -from itertools import izip -from sqlalchemy import exc, schema, util, types, log, interfaces, \ - event, events -from sqlalchemy.sql import expression -from sqlalchemy import processors -import collections - -class Dialect(object): - """Define the behavior of a specific database and DB-API combination. - - Any aspect of metadata definition, SQL query generation, - execution, result-set handling, or anything else which varies - between databases is defined under the general category of the - Dialect. The Dialect acts as a factory for other - database-specific object implementations including - ExecutionContext, Compiled, DefaultGenerator, and TypeEngine. - - All Dialects implement the following attributes: - - name - identifying name for the dialect from a DBAPI-neutral point of view - (i.e. 'sqlite') - - driver - identifying name for the dialect's DBAPI - - positional - True if the paramstyle for this Dialect is positional. - - paramstyle - the paramstyle to be used (some DB-APIs support multiple - paramstyles). - - convert_unicode - True if Unicode conversion should be applied to all ``str`` - types. - - encoding - type of encoding to use for unicode, usually defaults to - 'utf-8'. - - statement_compiler - a :class:`~Compiled` class used to compile SQL statements - - ddl_compiler - a :class:`~Compiled` class used to compile DDL statements - - server_version_info - a tuple containing a version number for the DB backend in use. - This value is only available for supporting dialects, and is - typically populated during the initial connection to the database. - - default_schema_name - the name of the default schema. This value is only available for - supporting dialects, and is typically populated during the - initial connection to the database. - - execution_ctx_cls - a :class:`.ExecutionContext` class used to handle statement execution - - execute_sequence_format - either the 'tuple' or 'list' type, depending on what cursor.execute() - accepts for the second argument (they vary). - - preparer - a :class:`~sqlalchemy.sql.compiler.IdentifierPreparer` class used to - quote identifiers. - - supports_alter - ``True`` if the database supports ``ALTER TABLE``. - - max_identifier_length - The maximum length of identifier names. - - supports_unicode_statements - Indicate whether the DB-API can receive SQL statements as Python - unicode strings - - supports_unicode_binds - Indicate whether the DB-API can receive string bind parameters - as Python unicode strings - - supports_sane_rowcount - Indicate whether the dialect properly implements rowcount for - ``UPDATE`` and ``DELETE`` statements. - - supports_sane_multi_rowcount - Indicate whether the dialect properly implements rowcount for - ``UPDATE`` and ``DELETE`` statements when executed via - executemany. - - preexecute_autoincrement_sequences - True if 'implicit' primary key functions must be executed separately - in order to get their value. This is currently oriented towards - Postgresql. - - implicit_returning - use RETURNING or equivalent during INSERT execution in order to load - newly generated primary keys and other column defaults in one execution, - which are then available via inserted_primary_key. - If an insert statement has returning() specified explicitly, - the "implicit" functionality is not used and inserted_primary_key - will not be available. - - dbapi_type_map - A mapping of DB-API type objects present in this Dialect's - DB-API implementation mapped to TypeEngine implementations used - by the dialect. - - This is used to apply types to result sets based on the DB-API - types present in cursor.description; it only takes effect for - result sets against textual statements where no explicit - typemap was present. - - colspecs - A dictionary of TypeEngine classes from sqlalchemy.types mapped - to subclasses that are specific to the dialect class. This - dictionary is class-level only and is not accessed from the - dialect instance itself. - - supports_default_values - Indicates if the construct ``INSERT INTO tablename DEFAULT - VALUES`` is supported - - supports_sequences - Indicates if the dialect supports CREATE SEQUENCE or similar. - - sequences_optional - If True, indicates if the "optional" flag on the Sequence() construct - should signal to not generate a CREATE SEQUENCE. Applies only to - dialects that support sequences. Currently used only to allow Postgresql - SERIAL to be used on a column that specifies Sequence() for usage on - other backends. - - supports_native_enum - Indicates if the dialect supports a native ENUM construct. - This will prevent types.Enum from generating a CHECK - constraint when that type is used. - - supports_native_boolean - Indicates if the dialect supports a native boolean construct. - This will prevent types.Boolean from generating a CHECK - constraint when that type is used. - - """ - - def create_connect_args(self, url): - """Build DB-API compatible connection arguments. - - Given a :class:`~sqlalchemy.engine.url.URL` object, returns a tuple - consisting of a `*args`/`**kwargs` suitable to send directly - to the dbapi's connect function. - - """ - - raise NotImplementedError() - - @classmethod - def type_descriptor(cls, typeobj): - """Transform a generic type to a dialect-specific type. - - Dialect classes will usually use the - :func:`~sqlalchemy.types.adapt_type` function in the types module to - make this job easy. - - The returned result is cached *per dialect class* so can - contain no dialect-instance state. - - """ - - raise NotImplementedError() - - def initialize(self, connection): - """Called during strategized creation of the dialect with a - connection. - - Allows dialects to configure options based on server version info or - other properties. - - The connection passed here is a SQLAlchemy Connection object, - with full capabilities. - - The initalize() method of the base dialect should be called via - super(). - - """ - - pass - - def reflecttable(self, connection, table, include_columns=None): - """Load table description from the database. - - Given a :class:`.Connection` and a - :class:`~sqlalchemy.schema.Table` object, reflect its columns and - properties from the database. If include_columns (a list or - set) is specified, limit the autoload to the given column - names. - - The default implementation uses the - :class:`~sqlalchemy.engine.reflection.Inspector` interface to - provide the output, building upon the granular table/column/ - constraint etc. methods of :class:`.Dialect`. - - """ - - raise NotImplementedError() - - def get_columns(self, connection, table_name, schema=None, **kw): - """Return information about columns in `table_name`. - - Given a :class:`.Connection`, a string - `table_name`, and an optional string `schema`, return column - information as a list of dictionaries with these keys: - - name - the column's name - - type - [sqlalchemy.types#TypeEngine] - - nullable - boolean - - default - the column's default value - - autoincrement - boolean - - sequence - a dictionary of the form - {'name' : str, 'start' :int, 'increment': int} - - Additional column attributes may be present. - """ - - raise NotImplementedError() - - def get_primary_keys(self, connection, table_name, schema=None, **kw): - """Return information about primary keys in `table_name`. - - Given a :class:`.Connection`, a string - `table_name`, and an optional string `schema`, return primary - key information as a list of column names. - - """ - raise NotImplementedError() - - def get_pk_constraint(self, table_name, schema=None, **kw): - """Return information about the primary key constraint on - table_name`. - - Given a string `table_name`, and an optional string `schema`, return - primary key information as a dictionary with these keys: - - constrained_columns - a list of column names that make up the primary key - - name - optional name of the primary key constraint. - - """ - raise NotImplementedError() - - def get_foreign_keys(self, connection, table_name, schema=None, **kw): - """Return information about foreign_keys in `table_name`. - - Given a :class:`.Connection`, a string - `table_name`, and an optional string `schema`, return foreign - key information as a list of dicts with these keys: - - name - the constraint's name - - constrained_columns - a list of column names that make up the foreign key - - referred_schema - the name of the referred schema - - referred_table - the name of the referred table - - referred_columns - a list of column names in the referred table that correspond to - constrained_columns - """ - - raise NotImplementedError() - - def get_table_names(self, connection, schema=None, **kw): - """Return a list of table names for `schema`.""" - - raise NotImplementedError - - def get_view_names(self, connection, schema=None, **kw): - """Return a list of all view names available in the database. - - schema: - Optional, retrieve names from a non-default schema. - """ - - raise NotImplementedError() - - def get_view_definition(self, connection, view_name, schema=None, **kw): - """Return view definition. - - Given a :class:`.Connection`, a string - `view_name`, and an optional string `schema`, return the view - definition. - """ - - raise NotImplementedError() - - def get_indexes(self, connection, table_name, schema=None, **kw): - """Return information about indexes in `table_name`. - - Given a :class:`.Connection`, a string - `table_name` and an optional string `schema`, return index - information as a list of dictionaries with these keys: - - name - the index's name - - column_names - list of column names in order - - unique - boolean - """ - - raise NotImplementedError() - - def normalize_name(self, name): - """convert the given name to lowercase if it is detected as - case insensitive. - - this method is only used if the dialect defines - requires_name_normalize=True. - - """ - raise NotImplementedError() - - def denormalize_name(self, name): - """convert the given name to a case insensitive identifier - for the backend if it is an all-lowercase name. - - this method is only used if the dialect defines - requires_name_normalize=True. - - """ - raise NotImplementedError() - - def has_table(self, connection, table_name, schema=None): - """Check the existence of a particular table in the database. - - Given a :class:`.Connection` object and a string - `table_name`, return True if the given table (possibly within - the specified `schema`) exists in the database, False - otherwise. - """ - - raise NotImplementedError() - - def has_sequence(self, connection, sequence_name, schema=None): - """Check the existence of a particular sequence in the database. - - Given a :class:`.Connection` object and a string - `sequence_name`, return True if the given sequence exists in - the database, False otherwise. - """ - - raise NotImplementedError() - - def _get_server_version_info(self, connection): - """Retrieve the server version info from the given connection. - - This is used by the default implementation to populate the - "server_version_info" attribute and is called exactly - once upon first connect. - - """ - - raise NotImplementedError() - - def _get_default_schema_name(self, connection): - """Return the string name of the currently selected schema from - the given connection. - - This is used by the default implementation to populate the - "default_schema_name" attribute and is called exactly - once upon first connect. - - """ - - raise NotImplementedError() - - def do_begin(self, connection): - """Provide an implementation of *connection.begin()*, given a - DB-API connection.""" - - raise NotImplementedError() - - def do_rollback(self, connection): - """Provide an implementation of *connection.rollback()*, given - a DB-API connection.""" - - raise NotImplementedError() - - def create_xid(self): - """Create a two-phase transaction ID. - - This id will be passed to do_begin_twophase(), - do_rollback_twophase(), do_commit_twophase(). Its format is - unspecified. - """ - - raise NotImplementedError() - - def do_commit(self, connection): - """Provide an implementation of *connection.commit()*, given a - DB-API connection.""" - - raise NotImplementedError() - - def do_savepoint(self, connection, name): - """Create a savepoint with the given name on a SQLAlchemy - connection.""" - - raise NotImplementedError() - - def do_rollback_to_savepoint(self, connection, name): - """Rollback a SQL Alchemy connection to the named savepoint.""" - - raise NotImplementedError() - - def do_release_savepoint(self, connection, name): - """Release the named savepoint on a SQL Alchemy connection.""" - - raise NotImplementedError() - - def do_begin_twophase(self, connection, xid): - """Begin a two phase transaction on the given connection.""" - - raise NotImplementedError() - - def do_prepare_twophase(self, connection, xid): - """Prepare a two phase transaction on the given connection.""" - - raise NotImplementedError() - - def do_rollback_twophase(self, connection, xid, is_prepared=True, - recover=False): - """Rollback a two phase transaction on the given connection.""" - - raise NotImplementedError() - - def do_commit_twophase(self, connection, xid, is_prepared=True, - recover=False): - """Commit a two phase transaction on the given connection.""" - - raise NotImplementedError() - - def do_recover_twophase(self, connection): - """Recover list of uncommited prepared two phase transaction - identifiers on the given connection.""" - - raise NotImplementedError() - - def do_executemany(self, cursor, statement, parameters, context=None): - """Provide an implementation of *cursor.executemany(statement, - parameters)*.""" - - raise NotImplementedError() - - def do_execute(self, cursor, statement, parameters, context=None): - """Provide an implementation of *cursor.execute(statement, - parameters)*.""" - - raise NotImplementedError() - - def is_disconnect(self, e, connection, cursor): - """Return True if the given DB-API error indicates an invalid - connection""" - - raise NotImplementedError() - - def connect(self): - """return a callable which sets up a newly created DBAPI connection. - - The callable accepts a single argument "conn" which is the - DBAPI connection itself. It has no return value. - - This is used to set dialect-wide per-connection options such as - isolation modes, unicode modes, etc. - - If a callable is returned, it will be assembled into a pool listener - that receives the direct DBAPI connection, with all wrappers removed. - - If None is returned, no listener will be generated. - - """ - return None - - def reset_isolation_level(self, dbapi_conn): - """Given a DBAPI connection, revert its isolation to the default.""" - - raise NotImplementedError() - - def set_isolation_level(self, dbapi_conn, level): - """Given a DBAPI connection, set its isolation level.""" - - raise NotImplementedError() - - def get_isolation_level(self, dbapi_conn): - """Given a DBAPI connection, return its isolation level.""" - - raise NotImplementedError() - - -class ExecutionContext(object): - """A messenger object for a Dialect that corresponds to a single - execution. - - ExecutionContext should have these data members: - - connection - Connection object which can be freely used by default value - generators to execute SQL. This Connection should reference the - same underlying connection/transactional resources of - root_connection. - - root_connection - Connection object which is the source of this ExecutionContext. This - Connection may have close_with_result=True set, in which case it can - only be used once. - - dialect - dialect which created this ExecutionContext. - - cursor - DB-API cursor procured from the connection, - - compiled - if passed to constructor, sqlalchemy.engine.base.Compiled object - being executed, - - statement - string version of the statement to be executed. Is either - passed to the constructor, or must be created from the - sql.Compiled object by the time pre_exec() has completed. - - parameters - bind parameters passed to the execute() method. For compiled - statements, this is a dictionary or list of dictionaries. For - textual statements, it should be in a format suitable for the - dialect's paramstyle (i.e. dict or list of dicts for non - positional, list or list of lists/tuples for positional). - - isinsert - True if the statement is an INSERT. - - isupdate - True if the statement is an UPDATE. - - should_autocommit - True if the statement is a "committable" statement. - - postfetch_cols - a list of Column objects for which a server-side default or - inline SQL expression value was fired off. Applies to inserts - and updates. - """ - - def create_cursor(self): - """Return a new cursor generated from this ExecutionContext's - connection. - - Some dialects may wish to change the behavior of - connection.cursor(), such as postgresql which may return a PG - "server side" cursor. - """ - - raise NotImplementedError() - - def pre_exec(self): - """Called before an execution of a compiled statement. - - If a compiled statement was passed to this ExecutionContext, - the `statement` and `parameters` datamembers must be - initialized after this statement is complete. - """ - - raise NotImplementedError() - - def post_exec(self): - """Called after the execution of a compiled statement. - - If a compiled statement was passed to this ExecutionContext, - the `last_insert_ids`, `last_inserted_params`, etc. - datamembers should be available after this method completes. - """ - - raise NotImplementedError() - - def result(self): - """Return a result object corresponding to this ExecutionContext. - - Returns a ResultProxy. - """ - - raise NotImplementedError() - - def handle_dbapi_exception(self, e): - """Receive a DBAPI exception which occurred upon execute, result - fetch, etc.""" - - raise NotImplementedError() - - def should_autocommit_text(self, statement): - """Parse the given textual statement and return True if it refers to - a "committable" statement""" - - raise NotImplementedError() - - def lastrow_has_defaults(self): - """Return True if the last INSERT or UPDATE row contained - inlined or database-side defaults. - """ - - raise NotImplementedError() - - def get_rowcount(self): - """Return the number of rows produced (by a SELECT query) - or affected (by an INSERT/UPDATE/DELETE statement). - - Note that this row count may not be properly implemented - in some dialects; this is indicated by the - ``supports_sane_rowcount`` and ``supports_sane_multi_rowcount`` - dialect attributes. - - """ - - raise NotImplementedError() - - -class Compiled(object): - """Represent a compiled SQL or DDL expression. - - The ``__str__`` method of the ``Compiled`` object should produce - the actual text of the statement. ``Compiled`` objects are - specific to their underlying database dialect, and also may - or may not be specific to the columns referenced within a - particular set of bind parameters. In no case should the - ``Compiled`` object be dependent on the actual values of those - bind parameters, even though it may reference those values as - defaults. - """ - - def __init__(self, dialect, statement, bind=None): - """Construct a new ``Compiled`` object. - - :param dialect: ``Dialect`` to compile against. - - :param statement: ``ClauseElement`` to be compiled. - - :param bind: Optional Engine or Connection to compile this - statement against. - """ - - self.dialect = dialect - self.bind = bind - if statement is not None: - self.statement = statement - self.can_execute = statement.supports_execution - self.string = self.process(self.statement) - - @util.deprecated("0.7", ":class:`.Compiled` objects now compile " - "within the constructor.") - def compile(self): - """Produce the internal string representation of this element.""" - pass - - @property - def sql_compiler(self): - """Return a Compiled that is capable of processing SQL expressions. - - If this compiler is one, it would likely just return 'self'. - - """ - - raise NotImplementedError() - - def process(self, obj, **kwargs): - return obj._compiler_dispatch(self, **kwargs) - - def __str__(self): - """Return the string text of the generated SQL or DDL.""" - - return self.string or '' - - def construct_params(self, params=None): - """Return the bind params for this compiled object. - - :param params: a dict of string/object pairs whos values will - override bind values compiled in to the - statement. - """ - - raise NotImplementedError() - - @property - def params(self): - """Return the bind params for this compiled object.""" - return self.construct_params() - - def execute(self, *multiparams, **params): - """Execute this compiled object.""" - - e = self.bind - if e is None: - raise exc.UnboundExecutionError( - "This Compiled object is not bound to any Engine " - "or Connection.") - return e._execute_compiled(self, multiparams, params) - - def scalar(self, *multiparams, **params): - """Execute this compiled object and return the result's - scalar value.""" - - return self.execute(*multiparams, **params).scalar() - - -class TypeCompiler(object): - """Produces DDL specification for TypeEngine objects.""" - - def __init__(self, dialect): - self.dialect = dialect - - def process(self, type_): - return type_._compiler_dispatch(self) - - -class Connectable(object): - """Interface for an object which supports execution of SQL constructs. - - The two implementations of ``Connectable`` are :class:`.Connection` and - :class:`.Engine`. - - Connectable must also implement the 'dialect' member which references a - :class:`.Dialect` instance. - """ - - def contextual_connect(self): - """Return a Connection object which may be part of an ongoing - context.""" - - raise NotImplementedError() - - def create(self, entity, **kwargs): - """Create a table or index given an appropriate schema object.""" - - raise NotImplementedError() - - def drop(self, entity, **kwargs): - """Drop a table or index given an appropriate schema object.""" - - raise NotImplementedError() - - def execute(self, object, *multiparams, **params): - """Executes the given construct and returns a :class:`.ResultProxy`.""" - raise NotImplementedError() - - def scalar(self, object, *multiparams, **params): - """Executes and returns the first column of the first row. - - The underlying cursor is closed after execution. - """ - raise NotImplementedError() - - def _execute_clauseelement(self, elem, multiparams=None, params=None): - raise NotImplementedError() - - -class Connection(Connectable): - """Provides high-level functionality for a wrapped DB-API connection. - - Provides execution support for string-based SQL statements as well as - :class:`.ClauseElement`, :class:`.Compiled` and :class:`.DefaultGenerator` - objects. Provides a :meth:`begin` method to return :class:`.Transaction` - objects. - - The Connection object is **not** thread-safe. While a Connection can be - shared among threads using properly synchronized access, it is still - possible that the underlying DBAPI connection may not support shared - access between threads. Check the DBAPI documentation for details. - - The Connection object represents a single dbapi connection checked out - from the connection pool. In this state, the connection pool has no affect - upon the connection, including its expiration or timeout state. For the - connection pool to properly manage connections, connections should be - returned to the connection pool (i.e. ``connection.close()``) whenever the - connection is not in use. - - .. index:: - single: thread safety; Connection - - """ - - def __init__(self, engine, connection=None, close_with_result=False, - _branch=False, _execution_options=None): - """Construct a new Connection. - - The constructor here is not public and is only called only by an - :class:`.Engine`. See :meth:`.Engine.connect` and - :meth:`.Engine.contextual_connect` methods. - - """ - self.engine = engine - self.dialect = engine.dialect - self.__connection = connection or engine.raw_connection() - self.__transaction = None - self.should_close_with_result = close_with_result - self.__savepoint_seq = 0 - self.__branch = _branch - self.__invalid = False - self._has_events = engine._has_events - self._echo = self.engine._should_log_info() - if _execution_options: - self._execution_options =\ - engine._execution_options.union(_execution_options) - else: - self._execution_options = engine._execution_options - - def _branch(self): - """Return a new Connection which references this Connection's - engine and connection; but does not have close_with_result enabled, - and also whose close() method does nothing. - - This is used to execute "sub" statements within a single execution, - usually an INSERT statement. - """ - - return self.engine._connection_cls( - self.engine, - self.__connection, _branch=True) - - def _clone(self): - """Create a shallow copy of this Connection. - - """ - c = self.__class__.__new__(self.__class__) - c.__dict__ = self.__dict__.copy() - return c - - def execution_options(self, **opt): - """ Set non-SQL options for the connection which take effect - during execution. - - The method returns a copy of this :class:`.Connection` which references - the same underlying DBAPI connection, but also defines the given - execution options which will take effect for a call to - :meth:`execute`. As the new :class:`.Connection` references the same - underlying resource, it is probably best to ensure that the copies - would be discarded immediately, which is implicit if used as in:: - - result = connection.execution_options(stream_results=True).\\ - execute(stmt) - - :meth:`.Connection.execution_options` accepts all options as those - accepted by :meth:`.Executable.execution_options`. Additionally, - it includes options that are applicable only to - :class:`.Connection`. - - :param autocommit: Available on: Connection, statement. - When True, a COMMIT will be invoked after execution - when executed in 'autocommit' mode, i.e. when an explicit - transaction is not begun on the connection. Note that DBAPI - connections by default are always in a transaction - SQLAlchemy uses - rules applied to different kinds of statements to determine if - COMMIT will be invoked in order to provide its "autocommit" feature. - Typically, all INSERT/UPDATE/DELETE statements as well as - CREATE/DROP statements have autocommit behavior enabled; SELECT - constructs do not. Use this option when invoking a SELECT or other - specific SQL construct where COMMIT is desired (typically when - calling stored procedures and such), and an explicit - transaction is not in progress. - - :param compiled_cache: Available on: Connection. - A dictionary where :class:`.Compiled` objects - will be cached when the :class:`.Connection` compiles a clause - expression into a :class:`.Compiled` object. - It is the user's responsibility to - manage the size of this dictionary, which will have keys - corresponding to the dialect, clause element, the column - names within the VALUES or SET clause of an INSERT or UPDATE, - as well as the "batch" mode for an INSERT or UPDATE statement. - The format of this dictionary is not guaranteed to stay the - same in future releases. - - Note that the ORM makes use of its own "compiled" caches for - some operations, including flush operations. The caching - used by the ORM internally supersedes a cache dictionary - specified here. - - :param isolation_level: Available on: Connection. - Set the transaction isolation level for - the lifespan of this connection. Valid values include - those string values accepted by the ``isolation_level`` - parameter passed to :func:`.create_engine`, and are - database specific, including those for :ref:`sqlite_toplevel`, - :ref:`postgresql_toplevel` - see those dialect's documentation - for further info. - - Note that this option necessarily affects the underying - DBAPI connection for the lifespan of the originating - :class:`.Connection`, and is not per-execution. This - setting is not removed until the underying DBAPI connection - is returned to the connection pool, i.e. - the :meth:`.Connection.close` method is called. - - :param stream_results: Available on: Connection, statement. - Indicate to the dialect that results should be - "streamed" and not pre-buffered, if possible. This is a limitation - of many DBAPIs. The flag is currently understood only by the - psycopg2 dialect. - - """ - c = self._clone() - c._execution_options = c._execution_options.union(opt) - if 'isolation_level' in opt: - c._set_isolation_level() - return c - - def _set_isolation_level(self): - self.dialect.set_isolation_level(self.connection, - self._execution_options['isolation_level']) - self.connection._connection_record.finalize_callback = \ - self.dialect.reset_isolation_level - - @property - def closed(self): - """Return True if this connection is closed.""" - - return not self.__invalid and '_Connection__connection' \ - not in self.__dict__ - - @property - def invalidated(self): - """Return True if this connection was invalidated.""" - - return self.__invalid - - @property - def connection(self): - "The underlying DB-API connection managed by this Connection." - - try: - return self.__connection - except AttributeError: - return self._revalidate_connection() - - def _revalidate_connection(self): - if self.__invalid: - if self.__transaction is not None: - raise exc.InvalidRequestError( - "Can't reconnect until invalid " - "transaction is rolled back") - self.__connection = self.engine.raw_connection() - self.__invalid = False - return self.__connection - raise exc.ResourceClosedError("This Connection is closed") - - @property - def _connection_is_valid(self): - # use getattr() for is_valid to support exceptions raised in - # dialect initializer, where the connection is not wrapped in - # _ConnectionFairy - - return getattr(self.__connection, 'is_valid', False) - - @property - def info(self): - """A collection of per-DB-API connection instance properties.""" - - return self.connection.info - - def connect(self): - """Returns self. - - This ``Connectable`` interface method returns self, allowing - Connections to be used interchangably with Engines in most - situations that require a bind. - """ - - return self - - def contextual_connect(self, **kwargs): - """Returns self. - - This ``Connectable`` interface method returns self, allowing - Connections to be used interchangably with Engines in most - situations that require a bind. - """ - - return self - - def invalidate(self, exception=None): - """Invalidate the underlying DBAPI connection associated with - this Connection. - - The underlying DB-API connection is literally closed (if - possible), and is discarded. Its source connection pool will - typically lazily create a new connection to replace it. - - Upon the next usage, this Connection will attempt to reconnect - to the pool with a new connection. - - Transactions in progress remain in an "opened" state (even though the - actual transaction is gone); these must be explicitly rolled back - before a reconnect on this Connection can proceed. This is to prevent - applications from accidentally continuing their transactional - operations in a non-transactional state. - - """ - if self.invalidated: - return - - if self.closed: - raise exc.ResourceClosedError("This Connection is closed") - - if self._connection_is_valid: - self.__connection.invalidate(exception) - del self.__connection - self.__invalid = True - - - def detach(self): - """Detach the underlying DB-API connection from its connection pool. - - This Connection instance will remain useable. When closed, - the DB-API connection will be literally closed and not - returned to its pool. The pool will typically lazily create a - new connection to replace the detached connection. - - This method can be used to insulate the rest of an application - from a modified state on a connection (such as a transaction - isolation level or similar). Also see - :class:`~sqlalchemy.interfaces.PoolListener` for a mechanism to modify - connection state when connections leave and return to their - connection pool. - """ - - self.__connection.detach() - - def begin(self): - """Begin a transaction and return a Transaction handle. - - Repeated calls to ``begin`` on the same Connection will create - a lightweight, emulated nested transaction. Only the - outermost transaction may ``commit``. Calls to ``commit`` on - inner transactions are ignored. Any transaction in the - hierarchy may ``rollback``, however. - """ - - if self.__transaction is None: - self.__transaction = RootTransaction(self) - return self.__transaction - else: - return Transaction(self, self.__transaction) - - def begin_nested(self): - """Begin a nested transaction and return a Transaction handle. - - Nested transactions require SAVEPOINT support in the - underlying database. Any transaction in the hierarchy may - ``commit`` and ``rollback``, however the outermost transaction - still controls the overall ``commit`` or ``rollback`` of the - transaction of a whole. - """ - - if self.__transaction is None: - self.__transaction = RootTransaction(self) - else: - self.__transaction = NestedTransaction(self, self.__transaction) - return self.__transaction - - def begin_twophase(self, xid=None): - """Begin a two-phase or XA transaction and return a Transaction - handle. - - :param xid: the two phase transaction id. If not supplied, a - random id will be generated. - - """ - - if self.__transaction is not None: - raise exc.InvalidRequestError( - "Cannot start a two phase transaction when a transaction " - "is already in progress.") - if xid is None: - xid = self.engine.dialect.create_xid(); - self.__transaction = TwoPhaseTransaction(self, xid) - return self.__transaction - - def recover_twophase(self): - return self.engine.dialect.do_recover_twophase(self) - - def rollback_prepared(self, xid, recover=False): - self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) - - def commit_prepared(self, xid, recover=False): - self.engine.dialect.do_commit_twophase(self, xid, recover=recover) - - def in_transaction(self): - """Return True if a transaction is in progress.""" - - return self.__transaction is not None - - def _begin_impl(self): - if self._echo: - self.engine.logger.info("BEGIN (implicit)") - - if self._has_events: - self.engine.dispatch.begin(self) - - try: - self.engine.dialect.do_begin(self.connection) - except Exception, e: - self._handle_dbapi_exception(e, None, None, None, None) - raise - - def _rollback_impl(self): - if self._has_events: - self.engine.dispatch.rollback(self) - - if not self.closed and not self.invalidated and \ - self._connection_is_valid: - if self._echo: - self.engine.logger.info("ROLLBACK") - try: - self.engine.dialect.do_rollback(self.connection) - self.__transaction = None - except Exception, e: - self._handle_dbapi_exception(e, None, None, None, None) - raise - else: - self.__transaction = None - - def _commit_impl(self): - if self._has_events: - self.engine.dispatch.commit(self) - - if self._echo: - self.engine.logger.info("COMMIT") - try: - self.engine.dialect.do_commit(self.connection) - self.__transaction = None - except Exception, e: - self._handle_dbapi_exception(e, None, None, None, None) - raise - - def _savepoint_impl(self, name=None): - if self._has_events: - self.engine.dispatch.savepoint(self, name) - - if name is None: - self.__savepoint_seq += 1 - name = 'sa_savepoint_%s' % self.__savepoint_seq - if self._connection_is_valid: - self.engine.dialect.do_savepoint(self, name) - return name - - def _rollback_to_savepoint_impl(self, name, context): - if self._has_events: - self.engine.dispatch.rollback_savepoint(self, name, context) - - if self._connection_is_valid: - self.engine.dialect.do_rollback_to_savepoint(self, name) - self.__transaction = context - - def _release_savepoint_impl(self, name, context): - if self._has_events: - self.engine.dispatch.release_savepoint(self, name, context) - - if self._connection_is_valid: - self.engine.dialect.do_release_savepoint(self, name) - self.__transaction = context - - def _begin_twophase_impl(self, xid): - if self._has_events: - self.engine.dispatch.begin_twophase(self, xid) - - if self._connection_is_valid: - self.engine.dialect.do_begin_twophase(self, xid) - - def _prepare_twophase_impl(self, xid): - if self._has_events: - self.engine.dispatch.prepare_twophase(self, xid) - - if self._connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) - self.engine.dialect.do_prepare_twophase(self, xid) - - def _rollback_twophase_impl(self, xid, is_prepared): - if self._has_events: - self.engine.dispatch.rollback_twophase(self, xid, is_prepared) - - if self._connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) - self.engine.dialect.do_rollback_twophase(self, xid, is_prepared) - self.__transaction = None - - def _commit_twophase_impl(self, xid, is_prepared): - if self._has_events: - self.engine.dispatch.commit_twophase(self, xid, is_prepared) - - if self._connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) - self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - self.__transaction = None - - def _autorollback(self): - if not self.in_transaction(): - self._rollback_impl() - - def close(self): - """Close this Connection.""" - - try: - conn = self.__connection - except AttributeError: - return - if not self.__branch: - conn.close() - self.__invalid = False - del self.__connection - self.__transaction = None - - def scalar(self, object, *multiparams, **params): - """Executes and returns the first column of the first row. - - The underlying result/cursor is closed after execution. - """ - - return self.execute(object, *multiparams, **params).scalar() - - def execute(self, object, *multiparams, **params): - """Executes the given construct and returns a :class:`.ResultProxy`. - - The construct can be one of: - - * a textual SQL string - * any :class:`.ClauseElement` construct that is also - a subclass of :class:`.Executable`, such as a - :func:`expression.select` construct - * a :class:`.FunctionElement`, such as that generated - by :attr:`.func`, will be automatically wrapped in - a SELECT statement, which is then executed. - * a :class:`.DDLElement` object - * a :class:`.DefaultGenerator` object - * a :class:`.Compiled` object - - """ - for c in type(object).__mro__: - if c in Connection.executors: - return Connection.executors[c]( - self, - object, - multiparams, - params) - else: - raise exc.InvalidRequestError( - "Unexecutable object type: %s" % - type(object)) - - def __distill_params(self, multiparams, params): - """Given arguments from the calling form *multiparams, **params, - return a list of bind parameter structures, usually a list of - dictionaries. - - In the case of 'raw' execution which accepts positional parameters, - it may be a list of tuples or lists. - - """ - - if not multiparams: - if params: - return [params] - else: - return [] - elif len(multiparams) == 1: - zero = multiparams[0] - if isinstance(zero, (list, tuple)): - if not zero or hasattr(zero[0], '__iter__'): - return zero - else: - return [zero] - elif hasattr(zero, 'keys'): - return [zero] - else: - return [[zero]] - else: - if hasattr(multiparams[0], '__iter__'): - return multiparams - else: - return [multiparams] - - def _execute_function(self, func, multiparams, params): - """Execute a sql.FunctionElement object.""" - - return self._execute_clauseelement(func.select(), - multiparams, params) - - def _execute_default(self, default, multiparams, params): - """Execute a schema.ColumnDefault object.""" - - if self._has_events: - for fn in self.engine.dispatch.before_execute: - default, multiparams, params = \ - fn(self, default, multiparams, params) - - try: - try: - conn = self.__connection - except AttributeError: - conn = self._revalidate_connection() - - dialect = self.dialect - ctx = dialect.execution_ctx_cls._init_default( - dialect, self, conn) - except Exception, e: - self._handle_dbapi_exception(e, None, None, None, None) - raise - - ret = ctx._exec_default(default, None) - if self.should_close_with_result: - self.close() - - if self._has_events: - self.engine.dispatch.after_execute(self, - default, multiparams, params, ret) - - return ret - - def _execute_ddl(self, ddl, multiparams, params): - """Execute a schema.DDL object.""" - - if self._has_events: - for fn in self.engine.dispatch.before_execute: - ddl, multiparams, params = \ - fn(self, ddl, multiparams, params) - - dialect = self.dialect - - compiled = ddl.compile(dialect=dialect) - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_ddl, - compiled, - None, - compiled - ) - if self._has_events: - self.engine.dispatch.after_execute(self, - ddl, multiparams, params, ret) - return ret - - def _execute_clauseelement(self, elem, multiparams, params): - """Execute a sql.ClauseElement object.""" - - if self._has_events: - for fn in self.engine.dispatch.before_execute: - elem, multiparams, params = \ - fn(self, elem, multiparams, params) - - distilled_params = self.__distill_params(multiparams, params) - if distilled_params: - keys = distilled_params[0].keys() - else: - keys = [] - - dialect = self.dialect - if 'compiled_cache' in self._execution_options: - key = dialect, elem, tuple(keys), len(distilled_params) > 1 - if key in self._execution_options['compiled_cache']: - compiled_sql = self._execution_options['compiled_cache'][key] - else: - compiled_sql = elem.compile( - dialect=dialect, column_keys=keys, - inline=len(distilled_params) > 1) - self._execution_options['compiled_cache'][key] = compiled_sql - else: - compiled_sql = elem.compile( - dialect=dialect, column_keys=keys, - inline=len(distilled_params) > 1) - - - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_compiled, - compiled_sql, - distilled_params, - compiled_sql, distilled_params - ) - if self._has_events: - self.engine.dispatch.after_execute(self, - elem, multiparams, params, ret) - return ret - - def _execute_compiled(self, compiled, multiparams, params): - """Execute a sql.Compiled object.""" - - if self._has_events: - for fn in self.engine.dispatch.before_execute: - compiled, multiparams, params = \ - fn(self, compiled, multiparams, params) - - dialect = self.dialect - parameters=self.__distill_params(multiparams, params) - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_compiled, - compiled, - parameters, - compiled, parameters - ) - if self._has_events: - self.engine.dispatch.after_execute(self, - compiled, multiparams, params, ret) - return ret - - def _execute_text(self, statement, multiparams, params): - """Execute a string SQL statement.""" - - if self._has_events: - for fn in self.engine.dispatch.before_execute: - statement, multiparams, params = \ - fn(self, statement, multiparams, params) - - dialect = self.dialect - parameters = self.__distill_params(multiparams, params) - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_statement, - statement, - parameters, - statement, parameters - ) - if self._has_events: - self.engine.dispatch.after_execute(self, - statement, multiparams, params, ret) - return ret - - def _execute_context(self, dialect, constructor, - statement, parameters, - *args): - """Create an :class:`.ExecutionContext` and execute, returning - a :class:`.ResultProxy`.""" - - try: - try: - conn = self.__connection - except AttributeError: - conn = self._revalidate_connection() - - context = constructor(dialect, self, conn, *args) - except Exception, e: - self._handle_dbapi_exception(e, - str(statement), parameters, - None, None) - raise - - if context.compiled: - context.pre_exec() - - cursor, statement, parameters = context.cursor, \ - context.statement, \ - context.parameters - - if not context.executemany: - parameters = parameters[0] - - if self._has_events: - for fn in self.engine.dispatch.before_cursor_execute: - statement, parameters = \ - fn(self, cursor, statement, parameters, - context, context.executemany) - - if self._echo: - self.engine.logger.info(statement) - self.engine.logger.info("%r", parameters) - try: - if context.executemany: - self.dialect.do_executemany( - cursor, - statement, - parameters, - context) - else: - self.dialect.do_execute( - cursor, - statement, - parameters, - context) - except Exception, e: - self._handle_dbapi_exception( - e, - statement, - parameters, - cursor, - context) - raise - - - if self._has_events: - self.engine.dispatch.after_cursor_execute(self, cursor, - statement, - parameters, - context, - context.executemany) - - if context.compiled: - context.post_exec() - - if context.isinsert and not context.executemany: - context.post_insert() - - # create a resultproxy, get rowcount/implicit RETURNING - # rows, close cursor if no further results pending - result = context.get_result_proxy() - - if context.isinsert: - if context._is_implicit_returning: - context._fetch_implicit_returning(result) - result.close(_autoclose_connection=False) - elif not context._is_explicit_returning: - result.close(_autoclose_connection=False) - elif result._metadata is None: - # no results, get rowcount - # (which requires open cursor on some drivers - # such as kintersbasdb, mxodbc), - result.rowcount - result.close(_autoclose_connection=False) - - if self.__transaction is None and context.should_autocommit: - self._commit_impl() - - if result.closed and self.should_close_with_result: - self.close() - - return result - - def _cursor_execute(self, cursor, statement, parameters): - """Execute a statement + params on the given cursor. - - Adds appropriate logging and exception handling. - - This method is used by DefaultDialect for special-case - executions, such as for sequences and column defaults. - The path of statement execution in the majority of cases - terminates at _execute_context(). - - """ - if self._echo: - self.engine.logger.info(statement) - self.engine.logger.info("%r", parameters) - try: - self.dialect.do_execute( - cursor, - statement, - parameters) - except Exception, e: - self._handle_dbapi_exception( - e, - statement, - parameters, - cursor, - None) - raise - - def _safe_close_cursor(self, cursor): - """Close the given cursor, catching exceptions - and turning into log warnings. - - """ - try: - cursor.close() - except Exception, e: - try: - ex_text = str(e) - except TypeError: - ex_text = repr(e) - self.connection._logger.warn("Error closing cursor: %s", ex_text) - - if isinstance(e, (SystemExit, KeyboardInterrupt)): - raise - - def _handle_dbapi_exception(self, - e, - statement, - parameters, - cursor, - context): - if getattr(self, '_reentrant_error', False): - # Py3K - #raise exc.DBAPIError.instance(statement, parameters, e, - # self.dialect.dbapi.Error) from e - # Py2K - raise exc.DBAPIError.instance(statement, - parameters, - e, - self.dialect.dbapi.Error), \ - None, sys.exc_info()[2] - # end Py2K - self._reentrant_error = True - try: - # non-DBAPI error - if we already got a context, - # or theres no string statement, don't wrap it - should_wrap = isinstance(e, self.dialect.dbapi.Error) or \ - (statement is not None and context is None) - - if should_wrap and context: - context.handle_dbapi_exception(e) - - is_disconnect = isinstance(e, self.dialect.dbapi.Error) and \ - self.dialect.is_disconnect(e, self.__connection, cursor) - if is_disconnect: - self.invalidate(e) - self.engine.dispose() - else: - if cursor: - self._safe_close_cursor(cursor) - self._autorollback() - if self.should_close_with_result: - self.close() - - if not should_wrap: - return - - # Py3K - #raise exc.DBAPIError.instance( - # statement, - # parameters, - # e, - # self.dialect.dbapi.Error, - # connection_invalidated=is_disconnect) \ - # from e - # Py2K - raise exc.DBAPIError.instance( - statement, - parameters, - e, - self.dialect.dbapi.Error, - connection_invalidated=is_disconnect), \ - None, sys.exc_info()[2] - # end Py2K - - finally: - del self._reentrant_error - - # poor man's multimethod/generic function thingy - executors = { - expression.FunctionElement: _execute_function, - expression.ClauseElement: _execute_clauseelement, - Compiled: _execute_compiled, - schema.SchemaItem: _execute_default, - schema.DDLElement: _execute_ddl, - basestring: _execute_text - } - - def create(self, entity, **kwargs): - """Create a Table or Index given an appropriate Schema object.""" - - return self.engine.create(entity, connection=self, **kwargs) - - def drop(self, entity, **kwargs): - """Drop a Table or Index given an appropriate Schema object.""" - - return self.engine.drop(entity, connection=self, **kwargs) - - def reflecttable(self, table, include_columns=None): - """Reflect the columns in the given string table name from the - database.""" - - return self.engine.reflecttable(table, self, include_columns) - - def default_schema_name(self): - return self.engine.dialect.get_default_schema_name(self) - - def transaction(self, callable_, *args, **kwargs): - """Execute the given function within a transaction boundary. - - This is a shortcut for explicitly calling `begin()` and `commit()` - and optionally `rollback()` when exceptions are raised. The - given `*args` and `**kwargs` will be passed to the function. - - See also transaction() on engine. - - """ - - trans = self.begin() - try: - ret = self.run_callable(callable_, *args, **kwargs) - trans.commit() - return ret - except: - trans.rollback() - raise - - def run_callable(self, callable_, *args, **kwargs): - return callable_(self, *args, **kwargs) - - -class Transaction(object): - """Represent a Transaction in progress. - - The object provides :meth:`.rollback` and :meth:`.commit` - methods in order to control transaction boundaries. It - also implements a context manager interface so that - the Python ``with`` statement can be used with the - :meth:`.Connection.begin` method. - - The Transaction object is **not** threadsafe. - - .. index:: - single: thread safety; Transaction - """ - - def __init__(self, connection, parent): - """The constructor for :class:`.Transaction` is private - and is called from within the :class:`.Connection.begin` - implementation. - - """ - self.connection = connection - self._parent = parent or self - self.is_active = True - - def close(self): - """Close this :class:`.Transaction`. - - If this transaction is the base transaction in a begin/commit - nesting, the transaction will rollback(). Otherwise, the - method returns. - - This is used to cancel a Transaction without affecting the scope of - an enclosing transaction. - - """ - if not self._parent.is_active: - return - if self._parent is self: - self.rollback() - - def rollback(self): - """Roll back this :class:`.Transaction`. - - """ - if not self._parent.is_active: - return - self._do_rollback() - self.is_active = False - - def _do_rollback(self): - self._parent.rollback() - - def commit(self): - """Commit this :class:`.Transaction`.""" - - if not self._parent.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self._do_commit() - self.is_active = False - - def _do_commit(self): - pass - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - if type is None and self.is_active: - self.commit() - else: - self.rollback() - - -class RootTransaction(Transaction): - def __init__(self, connection): - super(RootTransaction, self).__init__(connection, None) - self.connection._begin_impl() - - def _do_rollback(self): - if self.is_active: - self.connection._rollback_impl() - - def _do_commit(self): - if self.is_active: - self.connection._commit_impl() - - -class NestedTransaction(Transaction): - def __init__(self, connection, parent): - super(NestedTransaction, self).__init__(connection, parent) - self._savepoint = self.connection._savepoint_impl() - - def _do_rollback(self): - if self.is_active: - self.connection._rollback_to_savepoint_impl( - self._savepoint, self._parent) - - def _do_commit(self): - if self.is_active: - self.connection._release_savepoint_impl( - self._savepoint, self._parent) - - -class TwoPhaseTransaction(Transaction): - def __init__(self, connection, xid): - super(TwoPhaseTransaction, self).__init__(connection, None) - self._is_prepared = False - self.xid = xid - self.connection._begin_twophase_impl(self.xid) - - def prepare(self): - if not self._parent.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self.connection._prepare_twophase_impl(self.xid) - self._is_prepared = True - - def _do_rollback(self): - self.connection._rollback_twophase_impl(self.xid, self._is_prepared) - - def _do_commit(self): - self.connection._commit_twophase_impl(self.xid, self._is_prepared) - - -class Engine(Connectable, log.Identified): - """ - Connects a :class:`~sqlalchemy.pool.Pool` and - :class:`~sqlalchemy.engine.base.Dialect` together to provide a source - of database connectivity and behavior. - - An :class:`.Engine` object is instantiated publically using the - :func:`~sqlalchemy.create_engine` function. - - """ - - _execution_options = util.immutabledict() - _has_events = False - _connection_cls = Connection - - def __init__(self, pool, dialect, url, - logging_name=None, echo=None, proxy=None, - execution_options=None - ): - self.pool = pool - self.url = url - self.dialect = dialect - if logging_name: - self.logging_name = logging_name - self.echo = echo - self.engine = self - log.instance_logger(self, echoflag=echo) - if proxy: - interfaces.ConnectionProxy._adapt_listener(self, proxy) - if execution_options: - if 'isolation_level' in execution_options: - raise exc.ArgumentError( - "'isolation_level' execution option may " - "only be specified on Connection.execution_options(). " - "To set engine-wide isolation level, " - "use the isolation_level argument to create_engine()." - ) - self.update_execution_options(**execution_options) - - dispatch = event.dispatcher(events.ConnectionEvents) - - def update_execution_options(self, **opt): - """update the execution_options dictionary of this :class:`.Engine`. - - For details on execution_options, see - :meth:`Connection.execution_options` as well as - :meth:`sqlalchemy.sql.expression.Executable.execution_options`. - - """ - self._execution_options = \ - self._execution_options.union(opt) - - @property - def name(self): - """String name of the :class:`~sqlalchemy.engine.Dialect` in use by - this ``Engine``.""" - - return self.dialect.name - - @property - def driver(self): - """Driver name of the :class:`~sqlalchemy.engine.Dialect` in use by - this ``Engine``.""" - - return self.dialect.driver - - echo = log.echo_property() - - def __repr__(self): - return 'Engine(%s)' % str(self.url) - - def dispose(self): - """Dispose of the connection pool used by this :class:`.Engine`. - - A new connection pool is created immediately after the old one has - been disposed. This new pool, like all SQLAlchemy connection pools, - does not make any actual connections to the database until one is - first requested. - - This method has two general use cases: - - * When a dropped connection is detected, it is assumed that all - connections held by the pool are potentially dropped, and - the entire pool is replaced. - - * An application may want to use :meth:`dispose` within a test - suite that is creating multiple engines. - - It is critical to note that :meth:`dispose` does **not** guarantee - that the application will release all open database connections - only - those connections that are checked into the pool are closed. - Connections which remain checked out or have been detached from - the engine are not affected. - - """ - self.pool.dispose() - self.pool = self.pool.recreate() - - def create(self, entity, connection=None, **kwargs): - """Create a table or index within this engine's database connection - given a schema object.""" - - from sqlalchemy.engine import ddl - - self._run_visitor(ddl.SchemaGenerator, entity, - connection=connection, **kwargs) - - def drop(self, entity, connection=None, **kwargs): - """Drop a table or index within this engine's database connection - given a schema object.""" - - from sqlalchemy.engine import ddl - - self._run_visitor(ddl.SchemaDropper, entity, - connection=connection, **kwargs) - - def _execute_default(self, default): - connection = self.contextual_connect() - try: - return connection._execute_default(default, (), {}) - finally: - connection.close() - - @property - def func(self): - return expression._FunctionGenerator(bind=self) - - def text(self, text, *args, **kwargs): - """Return a :func:`~sqlalchemy.sql.expression.text` construct, - bound to this engine. - - This is equivalent to:: - - text("SELECT * FROM table", bind=engine) - - """ - - return expression.text(text, bind=self, *args, **kwargs) - - def _run_visitor(self, visitorcallable, element, - connection=None, **kwargs): - if connection is None: - conn = self.contextual_connect(close_with_result=False) - else: - conn = connection - try: - visitorcallable(self.dialect, conn, - **kwargs).traverse_single(element) - finally: - if connection is None: - conn.close() - - def transaction(self, callable_, *args, **kwargs): - """Execute the given function within a transaction boundary. - - This is a shortcut for explicitly calling `begin()` and `commit()` - and optionally `rollback()` when exceptions are raised. The - given `*args` and `**kwargs` will be passed to the function. - - The connection used is that of contextual_connect(). - - See also the similar method on Connection itself. - - """ - - conn = self.contextual_connect() - try: - return conn.transaction(callable_, *args, **kwargs) - finally: - conn.close() - - def run_callable(self, callable_, *args, **kwargs): - conn = self.contextual_connect() - try: - return conn.run_callable(callable_, *args, **kwargs) - finally: - conn.close() - - def execute(self, statement, *multiparams, **params): - """Executes the given construct and returns a :class:`.ResultProxy`. - - The arguments are the same as those used by - :meth:`.Connection.execute`. - - Here, a :class:`.Connection` is acquired using the - :meth:`~.Engine.contextual_connect` method, and the statement executed - with that connection. The returned :class:`.ResultProxy` is flagged - such that when the :class:`.ResultProxy` is exhausted and its - underlying cursor is closed, the :class:`.Connection` created here - will also be closed, which allows its associated DBAPI connection - resource to be returned to the connection pool. - - """ - - connection = self.contextual_connect(close_with_result=True) - return connection.execute(statement, *multiparams, **params) - - def scalar(self, statement, *multiparams, **params): - return self.execute(statement, *multiparams, **params).scalar() - - def _execute_clauseelement(self, elem, multiparams=None, params=None): - connection = self.contextual_connect(close_with_result=True) - return connection._execute_clauseelement(elem, multiparams, params) - - def _execute_compiled(self, compiled, multiparams, params): - connection = self.contextual_connect(close_with_result=True) - return connection._execute_compiled(compiled, multiparams, params) - - def connect(self, **kwargs): - """Return a new :class:`.Connection` object. - - The :class:`.Connection`, upon construction, will procure a DBAPI connection - from the :class:`.Pool` referenced by this :class:`.Engine`, - returning it back to the :class:`.Pool` after the :meth:`.Connection.close` - method is called. - - """ - - return self._connection_cls(self, **kwargs) - - def contextual_connect(self, close_with_result=False, **kwargs): - """Return a :class:`.Connection` object which may be part of some ongoing context. - - By default, this method does the same thing as :meth:`.Engine.connect`. - Subclasses of :class:`.Engine` may override this method - to provide contextual behavior. - - :param close_with_result: When True, the first :class:`.ResultProxy` created - by the :class:`.Connection` will call the :meth:`.Connection.close` method - of that connection as soon as any pending result rows are exhausted. - This is used to supply the "connectionless execution" behavior provided - by the :meth:`.Engine.execute` method. - - """ - - return self._connection_cls(self, - self.pool.connect(), - close_with_result=close_with_result, - **kwargs) - - def table_names(self, schema=None, connection=None): - """Return a list of all table names available in the database. - - :param schema: Optional, retrieve names from a non-default schema. - - :param connection: Optional, use a specified connection. Default is - the ``contextual_connect`` for this ``Engine``. - """ - - if connection is None: - conn = self.contextual_connect() - else: - conn = connection - if not schema: - schema = self.dialect.default_schema_name - try: - return self.dialect.get_table_names(conn, schema) - finally: - if connection is None: - conn.close() - - def reflecttable(self, table, connection=None, include_columns=None): - """Given a Table object, reflects its columns and properties from the - database.""" - - if connection is None: - conn = self.contextual_connect() - else: - conn = connection - try: - self.dialect.reflecttable(conn, table, include_columns) - finally: - if connection is None: - conn.close() - - def has_table(self, table_name, schema=None): - return self.run_callable(self.dialect.has_table, table_name, schema) - - def raw_connection(self): - """Return a DB-API connection.""" - - return self.pool.unique_connection() - - -# This reconstructor is necessary so that pickles with the C extension or -# without use the same Binary format. -try: - # We need a different reconstructor on the C extension so that we can - # add extra checks that fields have correctly been initialized by - # __setstate__. - from sqlalchemy.cresultproxy import safe_rowproxy_reconstructor - - # The extra function embedding is needed so that the - # reconstructor function has the same signature whether or not - # the extension is present. - def rowproxy_reconstructor(cls, state): - return safe_rowproxy_reconstructor(cls, state) -except ImportError: - def rowproxy_reconstructor(cls, state): - obj = cls.__new__(cls) - obj.__setstate__(state) - return obj - -try: - from sqlalchemy.cresultproxy import BaseRowProxy -except ImportError: - class BaseRowProxy(object): - __slots__ = ('_parent', '_row', '_processors', '_keymap') - - def __init__(self, parent, row, processors, keymap): - """RowProxy objects are constructed by ResultProxy objects.""" - - self._parent = parent - self._row = row - self._processors = processors - self._keymap = keymap - - def __reduce__(self): - return (rowproxy_reconstructor, - (self.__class__, self.__getstate__())) - - def values(self): - """Return the values represented by this RowProxy as a list.""" - return list(self) - - def __iter__(self): - for processor, value in izip(self._processors, self._row): - if processor is None: - yield value - else: - yield processor(value) - - def __len__(self): - return len(self._row) - - def __getitem__(self, key): - try: - processor, index = self._keymap[key] - except KeyError: - processor, index = self._parent._key_fallback(key) - except TypeError: - if isinstance(key, slice): - l = [] - for processor, value in izip(self._processors[key], - self._row[key]): - if processor is None: - l.append(value) - else: - l.append(processor(value)) - return tuple(l) - else: - raise - if index is None: - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in result set! " - "try 'use_labels' option on select statement." % key) - if processor is not None: - return processor(self._row[index]) - else: - return self._row[index] - - def __getattr__(self, name): - try: - # TODO: no test coverage here - return self[name] - except KeyError, e: - raise AttributeError(e.args[0]) - - -class RowProxy(BaseRowProxy): - """Proxy values from a single cursor row. - - Mostly follows "ordered dictionary" behavior, mapping result - values to the string-based column name, the integer position of - the result in the row, as well as Column instances which can be - mapped to the original Columns that produced this result set (for - results that correspond to constructed SQL expressions). - """ - __slots__ = () - - def __contains__(self, key): - return self._parent._has_key(self._row, key) - - def __getstate__(self): - return { - '_parent': self._parent, - '_row': tuple(self) - } - - def __setstate__(self, state): - self._parent = parent = state['_parent'] - self._row = state['_row'] - self._processors = parent._processors - self._keymap = parent._keymap - - __hash__ = None - - def __eq__(self, other): - return other is self or other == tuple(self) - - def __ne__(self, other): - return not self.__eq__(other) - - def __repr__(self): - return repr(tuple(self)) - - def has_key(self, key): - """Return True if this RowProxy contains the given key.""" - - return self._parent._has_key(self._row, key) - - def items(self): - """Return a list of tuples, each tuple containing a key/value pair.""" - # TODO: no coverage here - return [(key, self[key]) for key in self.iterkeys()] - - def keys(self): - """Return the list of keys as strings represented by this RowProxy.""" - - return self._parent.keys - - def iterkeys(self): - return iter(self._parent.keys) - - def itervalues(self): - return iter(self) - -try: - # Register RowProxy with Sequence, - # so sequence protocol is implemented - from collections import Sequence - Sequence.register(RowProxy) -except ImportError: - pass - - -class ResultMetaData(object): - """Handle cursor.description, applying additional info from an execution - context.""" - - def __init__(self, parent, metadata): - self._processors = processors = [] - - # We do not strictly need to store the processor in the key mapping, - # though it is faster in the Python version (probably because of the - # saved attribute lookup self._processors) - self._keymap = keymap = {} - self.keys = [] - context = parent.context - dialect = context.dialect - typemap = dialect.dbapi_type_map - - for i, rec in enumerate(metadata): - colname = rec[0] - coltype = rec[1] - - if dialect.description_encoding: - colname = dialect._description_decoder(colname) - - if context.result_map: - try: - name, obj, type_ = context.result_map[colname.lower()] - except KeyError: - name, obj, type_ = \ - colname, None, typemap.get(coltype, types.NULLTYPE) - else: - name, obj, type_ = \ - colname, None, typemap.get(coltype, types.NULLTYPE) - - processor = type_._cached_result_processor(dialect, coltype) - - processors.append(processor) - rec = (processor, i) - - # indexes as keys. This is only needed for the Python version of - # RowProxy (the C version uses a faster path for integer indexes). - keymap[i] = rec - - # Column names as keys - if keymap.setdefault(name.lower(), rec) is not rec: - # We do not raise an exception directly because several - # columns colliding by name is not a problem as long as the - # user does not try to access them (ie use an index directly, - # or the more precise ColumnElement) - keymap[name.lower()] = (processor, None) - - if dialect.requires_name_normalize: - colname = dialect.normalize_name(colname) - - self.keys.append(colname) - if obj: - for o in obj: - keymap[o] = rec - - if parent._echo: - context.engine.logger.debug( - "Col %r", tuple(x[0] for x in metadata)) - - def _set_keymap_synonym(self, name, origname): - """Set a synonym for the given name. - - Some dialects (SQLite at the moment) may use this to - adjust the column names that are significant within a - row. - - """ - rec = (processor, i) = self._keymap[origname.lower()] - if self._keymap.setdefault(name, rec) is not rec: - self._keymap[name] = (processor, None) - - def _key_fallback(self, key): - map = self._keymap - result = None - if isinstance(key, basestring): - result = map.get(key.lower()) - # fallback for targeting a ColumnElement to a textual expression - # this is a rare use case which only occurs when matching text() - # constructs to ColumnElements, and after a pickle/unpickle roundtrip - elif isinstance(key, expression.ColumnElement): - if key._label and key._label.lower() in map: - result = map[key._label.lower()] - elif hasattr(key, 'name') and key.name.lower() in map: - result = map[key.name.lower()] - if result is None: - raise exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" % key) - else: - map[key] = result - return result - - def _has_key(self, row, key): - if key in self._keymap: - return True - else: - try: - self._key_fallback(key) - return True - except exc.NoSuchColumnError: - return False - - def __getstate__(self): - return { - '_pickled_keymap': dict( - (key, index) - for key, (processor, index) in self._keymap.iteritems() - if isinstance(key, (basestring, int)) - ), - 'keys': self.keys - } - - def __setstate__(self, state): - # the row has been processed at pickling time so we don't need any - # processor anymore - self._processors = [None for _ in xrange(len(state['keys']))] - self._keymap = keymap = {} - for key, index in state['_pickled_keymap'].iteritems(): - keymap[key] = (None, index) - self.keys = state['keys'] - self._echo = False - - -class ResultProxy(object): - """Wraps a DB-API cursor object to provide easier access to row columns. - - Individual columns may be accessed by their integer position, - case-insensitive column name, or by ``schema.Column`` - object. e.g.:: - - row = fetchone() - - col1 = row[0] # access via integer position - - col2 = row['col2'] # access via name - - col3 = row[mytable.c.mycol] # access via Column object. - - ``ResultProxy`` also handles post-processing of result column - data using ``TypeEngine`` objects, which are referenced from - the originating SQL statement that produced this result set. - - """ - - _process_row = RowProxy - out_parameters = None - _can_close_connection = False - - def __init__(self, context): - self.context = context - self.dialect = context.dialect - self.closed = False - self.cursor = self._saved_cursor = context.cursor - self.connection = context.root_connection - self._echo = self.connection._echo and \ - context.engine._should_log_debug() - self._init_metadata() - - def _init_metadata(self): - metadata = self._cursor_description() - if metadata is None: - self._metadata = None - else: - self._metadata = ResultMetaData(self, metadata) - - def keys(self): - """Return the current set of string keys for rows.""" - if self._metadata: - return self._metadata.keys - else: - return [] - - @util.memoized_property - def rowcount(self): - """Return the 'rowcount' for this result. - - The 'rowcount' reports the number of rows affected - by an UPDATE or DELETE statement. It has *no* other - uses and is not intended to provide the number of rows - present from a SELECT. - - Note that this row count may not be properly implemented in some - dialects; this is indicated by - :meth:`~sqlalchemy.engine.base.ResultProxy.supports_sane_rowcount()` - and - :meth:`~sqlalchemy.engine.base.ResultProxy.supports_sane_multi_rowcount()`. - ``rowcount()`` also may not work at this time for a statement that - uses ``returning()``. - - """ - try: - return self.context.rowcount - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context) - raise - - @property - def lastrowid(self): - """return the 'lastrowid' accessor on the DBAPI cursor. - - This is a DBAPI specific method and is only functional - for those backends which support it, for statements - where it is appropriate. It's behavior is not - consistent across backends. - - Usage of this method is normally unnecessary; the - :attr:`~ResultProxy.inserted_primary_key` attribute provides a - tuple of primary key values for a newly inserted row, - regardless of database backend. - - """ - try: - return self._saved_cursor.lastrowid - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, - self._saved_cursor, self.context) - raise - - @property - def returns_rows(self): - """True if this :class:`.ResultProxy` returns rows. - - I.e. if it is legal to call the methods - :meth:`~.ResultProxy.fetchone`, - :meth:`~.ResultProxy.fetchmany` - :meth:`~.ResultProxy.fetchall`. - - """ - return self._metadata is not None - - @property - def is_insert(self): - """True if this :class:`.ResultProxy` is the result - of a executing an expression language compiled - :func:`.expression.insert` construct. - - When True, this implies that the - :attr:`inserted_primary_key` attribute is accessible, - assuming the statement did not include - a user defined "returning" construct. - - """ - return self.context.isinsert - - def _cursor_description(self): - """May be overridden by subclasses.""" - - return self._saved_cursor.description - - def close(self, _autoclose_connection=True): - """Close this ResultProxy. - - Closes the underlying DBAPI cursor corresponding to the execution. - - Note that any data cached within this ResultProxy is still available. - For some types of results, this may include buffered rows. - - If this ResultProxy was generated from an implicit execution, - the underlying Connection will also be closed (returns the - underlying DBAPI connection to the connection pool.) - - This method is called automatically when: - - * all result rows are exhausted using the fetchXXX() methods. - * cursor.description is None. - - """ - - if not self.closed: - self.closed = True - self.connection._safe_close_cursor(self.cursor) - if _autoclose_connection and \ - self.connection.should_close_with_result: - self.connection.close() - # allow consistent errors - self.cursor = None - - def __iter__(self): - while True: - row = self.fetchone() - if row is None: - raise StopIteration - else: - yield row - - @util.memoized_property - def inserted_primary_key(self): - """Return the primary key for the row just inserted. - - The return value is a list of scalar values - corresponding to the list of primary key columns - in the target table. - - This only applies to single row :func:`.insert` - constructs which did not explicitly specify - :meth:`.Insert.returning`. - - Note that primary key columns which specify a - server_default clause, - or otherwise do not qualify as "autoincrement" - columns (see the notes at :class:`.Column`), and were - generated using the database-side default, will - appear in this list as ``None`` unless the backend - supports "returning" and the insert statement executed - with the "implicit returning" enabled. - - """ - - if not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() expression construct.") - elif self.context._is_explicit_returning: - raise exc.InvalidRequestError( - "Can't call inserted_primary_key when returning() " - "is used.") - - return self.context.inserted_primary_key - - @util.deprecated("0.6", "Use :attr:`.ResultProxy.inserted_primary_key`") - def last_inserted_ids(self): - """Return the primary key for the row just inserted.""" - - return self.inserted_primary_key - - def last_updated_params(self): - """Return the collection of updated parameters from this - execution. - - """ - if self.context.executemany: - return self.context.compiled_parameters - else: - return self.context.compiled_parameters[0] - - def last_inserted_params(self): - """Return the collection of inserted parameters from this - execution. - - """ - if self.context.executemany: - return self.context.compiled_parameters - else: - return self.context.compiled_parameters[0] - - def lastrow_has_defaults(self): - """Return ``lastrow_has_defaults()`` from the underlying - ExecutionContext. - - See ExecutionContext for details. - """ - - return self.context.lastrow_has_defaults() - - def postfetch_cols(self): - """Return ``postfetch_cols()`` from the underlying ExecutionContext. - - See ExecutionContext for details. - """ - - return self.context.postfetch_cols - - def prefetch_cols(self): - return self.context.prefetch_cols - - def supports_sane_rowcount(self): - """Return ``supports_sane_rowcount`` from the dialect.""" - - return self.dialect.supports_sane_rowcount - - def supports_sane_multi_rowcount(self): - """Return ``supports_sane_multi_rowcount`` from the dialect.""" - - return self.dialect.supports_sane_multi_rowcount - - def _fetchone_impl(self): - try: - return self.cursor.fetchone() - except AttributeError: - self._non_result() - - def _fetchmany_impl(self, size=None): - try: - if size is None: - return self.cursor.fetchmany() - else: - return self.cursor.fetchmany(size) - except AttributeError: - self._non_result() - - def _fetchall_impl(self): - try: - return self.cursor.fetchall() - except AttributeError: - self._non_result() - - def _non_result(self): - if self._metadata is None: - raise exc.ResourceClosedError( - "This result object does not return rows. " - "It has been closed automatically.", - ) - else: - raise exc.ResourceClosedError("This result object is closed.") - - def process_rows(self, rows): - process_row = self._process_row - metadata = self._metadata - keymap = metadata._keymap - processors = metadata._processors - if self._echo: - log = self.context.engine.logger.debug - l = [] - for row in rows: - log("Row %r", row) - l.append(process_row(metadata, row, processors, keymap)) - return l - else: - return [process_row(metadata, row, processors, keymap) - for row in rows] - - def fetchall(self): - """Fetch all rows, just like DB-API ``cursor.fetchall()``.""" - - try: - l = self.process_rows(self._fetchall_impl()) - self.close() - return l - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, - self.cursor, self.context) - raise - - def fetchmany(self, size=None): - """Fetch many rows, just like DB-API - ``cursor.fetchmany(size=cursor.arraysize)``. - - If rows are present, the cursor remains open after this is called. - Else the cursor is automatically closed and an empty list is returned. - - """ - - try: - l = self.process_rows(self._fetchmany_impl(size)) - if len(l) == 0: - self.close() - return l - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, - self.cursor, self.context) - raise - - def fetchone(self): - """Fetch one row, just like DB-API ``cursor.fetchone()``. - - If a row is present, the cursor remains open after this is called. - Else the cursor is automatically closed and None is returned. - - """ - try: - row = self._fetchone_impl() - if row is not None: - return self.process_rows([row])[0] - else: - self.close() - return None - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, - self.cursor, self.context) - raise - - def first(self): - """Fetch the first row and then close the result set unconditionally. - - Returns None if no row is present. - - """ - if self._metadata is None: - self._non_result() - - try: - row = self._fetchone_impl() - except Exception, e: - self.connection._handle_dbapi_exception( - e, None, None, - self.cursor, self.context) - raise - - try: - if row is not None: - return self.process_rows([row])[0] - else: - return None - finally: - self.close() - - def scalar(self): - """Fetch the first column of the first row, and close the result set. - - Returns None if no row is present. - - """ - row = self.first() - if row is not None: - return row[0] - else: - return None - -class BufferedRowResultProxy(ResultProxy): - """A ResultProxy with row buffering behavior. - - ``ResultProxy`` that buffers the contents of a selection of rows - before ``fetchone()`` is called. This is to allow the results of - ``cursor.description`` to be available immediately, when - interfacing with a DB-API that requires rows to be consumed before - this information is available (currently psycopg2, when used with - server-side cursors). - - The pre-fetching behavior fetches only one row initially, and then - grows its buffer size by a fixed amount with each successive need - for additional rows up to a size of 100. - """ - - def _init_metadata(self): - self.__buffer_rows() - super(BufferedRowResultProxy, self)._init_metadata() - - # this is a "growth chart" for the buffering of rows. - # each successive __buffer_rows call will use the next - # value in the list for the buffer size until the max - # is reached - size_growth = { - 1 : 5, - 5 : 10, - 10 : 20, - 20 : 50, - 50 : 100, - 100 : 250, - 250 : 500, - 500 : 1000 - } - - def __buffer_rows(self): - size = getattr(self, '_bufsize', 1) - self.__rowbuffer = collections.deque(self.cursor.fetchmany(size)) - self._bufsize = self.size_growth.get(size, size) - - def _fetchone_impl(self): - if self.closed: - return None - if not self.__rowbuffer: - self.__buffer_rows() - if not self.__rowbuffer: - return None - return self.__rowbuffer.popleft() - - def _fetchmany_impl(self, size=None): - if size is None: - return self._fetchall_impl() - result = [] - for x in range(0, size): - row = self._fetchone_impl() - if row is None: - break - result.append(row) - return result - - def _fetchall_impl(self): - self.__rowbuffer.extend(self.cursor.fetchall()) - ret = self.__rowbuffer - self.__rowbuffer = collections.deque() - return ret - -class FullyBufferedResultProxy(ResultProxy): - """A result proxy that buffers rows fully upon creation. - - Used for operations where a result is to be delivered - after the database conversation can not be continued, - such as MSSQL INSERT...OUTPUT after an autocommit. - - """ - def _init_metadata(self): - super(FullyBufferedResultProxy, self)._init_metadata() - self.__rowbuffer = self._buffer_rows() - - def _buffer_rows(self): - return collections.deque(self.cursor.fetchall()) - - def _fetchone_impl(self): - if self.__rowbuffer: - return self.__rowbuffer.popleft() - else: - return None - - def _fetchmany_impl(self, size=None): - if size is None: - return self._fetchall_impl() - result = [] - for x in range(0, size): - row = self._fetchone_impl() - if row is None: - break - result.append(row) - return result - - def _fetchall_impl(self): - ret = self.__rowbuffer - self.__rowbuffer = collections.deque() - return ret - -class BufferedColumnRow(RowProxy): - def __init__(self, parent, row, processors, keymap): - # preprocess row - row = list(row) - # this is a tad faster than using enumerate - index = 0 - for processor in parent._orig_processors: - if processor is not None: - row[index] = processor(row[index]) - index += 1 - row = tuple(row) - super(BufferedColumnRow, self).__init__(parent, row, - processors, keymap) - -class BufferedColumnResultProxy(ResultProxy): - """A ResultProxy with column buffering behavior. - - ``ResultProxy`` that loads all columns into memory each time - fetchone() is called. If fetchmany() or fetchall() are called, - the full grid of results is fetched. This is to operate with - databases where result rows contain "live" results that fall out - of scope unless explicitly fetched. Currently this includes - cx_Oracle LOB objects. - - """ - - _process_row = BufferedColumnRow - - def _init_metadata(self): - super(BufferedColumnResultProxy, self)._init_metadata() - metadata = self._metadata - # orig_processors will be used to preprocess each row when they are - # constructed. - metadata._orig_processors = metadata._processors - # replace the all type processors by None processors. - metadata._processors = [None for _ in xrange(len(metadata.keys))] - keymap = {} - for k, (func, index) in metadata._keymap.iteritems(): - keymap[k] = (None, index) - self._metadata._keymap = keymap - - def fetchall(self): - # can't call cursor.fetchall(), since rows must be - # fully processed before requesting more from the DBAPI. - l = [] - while True: - row = self.fetchone() - if row is None: - break - l.append(row) - return l - - def fetchmany(self, size=None): - # can't call cursor.fetchmany(), since rows must be - # fully processed before requesting more from the DBAPI. - if size is None: - return self.fetchall() - l = [] - for i in xrange(size): - row = self.fetchone() - if row is None: - break - l.append(row) - return l - -def connection_memoize(key): - """Decorator, memoize a function in a connection.info stash. - - Only applicable to functions which take no arguments other than a - connection. The memo will be stored in ``connection.info[key]``. - """ - - @util.decorator - def decorated(fn, self, connection): - connection = connection.connect() - try: - return connection.info[key] - except KeyError: - connection.info[key] = val = fn(self, connection) - return val - - return decorated |