aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/sqlalchemy_migrate-0.7.2-py2.7.egg/migrate/changeset/constraint.py
blob: 96407bd7bc6c0d60d66b58d281b053eeb0d15f76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
   This module defines standalone schema constraint classes.
"""
from sqlalchemy import schema

from migrate.exceptions import *

class ConstraintChangeset(object):
    """Base class for Constraint classes."""

    def _normalize_columns(self, cols, table_name=False):
        """Given: column objects or names; return col names and
        (maybe) a table"""
        colnames = []
        table = None
        for col in cols:
            if isinstance(col, schema.Column):
                if col.table is not None and table is None:
                    table = col.table
                if table_name:
                    col = '.'.join((col.table.name, col.name))
                else:
                    col = col.name
            colnames.append(col)
        return colnames, table

    def __do_imports(self, visitor_name, *a, **kw):
        engine = kw.pop('engine', self.table.bind)
        from migrate.changeset.databases.visitor import (get_engine_visitor,
                                                         run_single_visitor)
        visitorcallable = get_engine_visitor(engine, visitor_name)
        run_single_visitor(engine, visitorcallable, self, *a, **kw)

    def create(self, *a, **kw):
        """Create the constraint in the database.

        :param engine: the database engine to use. If this is \
        :keyword:`None` the instance's engine will be used
        :type engine: :class:`sqlalchemy.engine.base.Engine`
        :param connection: reuse connection istead of creating new one.
        :type connection: :class:`sqlalchemy.engine.base.Connection` instance
        """
        # TODO: set the parent here instead of in __init__
        self.__do_imports('constraintgenerator', *a, **kw)

    def drop(self, *a, **kw):
        """Drop the constraint from the database.

        :param engine: the database engine to use. If this is
          :keyword:`None` the instance's engine will be used
        :param cascade: Issue CASCADE drop if database supports it
        :type engine: :class:`sqlalchemy.engine.base.Engine`
        :type cascade: bool
        :param connection: reuse connection istead of creating new one.
        :type connection: :class:`sqlalchemy.engine.base.Connection` instance
        :returns: Instance with cleared columns
        """
        self.cascade = kw.pop('cascade', False)
        self.__do_imports('constraintdropper', *a, **kw)
        # the spirit of Constraint objects is that they
        # are immutable (just like in a DB.  they're only ADDed
        # or DROPped).
        #self.columns.clear()
        return self


class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint):
    """Construct PrimaryKeyConstraint

    Migrate's additional parameters:

    :param cols: Columns in constraint.
    :param table: If columns are passed as strings, this kw is required
    :type table: Table instance
    :type cols: strings or Column instances
    """

    __migrate_visit_name__ = 'migrate_primary_key_constraint'

    def __init__(self, *cols, **kwargs):
        colnames, table = self._normalize_columns(cols)
        table = kwargs.pop('table', table)
        super(PrimaryKeyConstraint, self).__init__(*colnames, **kwargs)
        if table is not None:
            self._set_parent(table)


    def autoname(self):
        """Mimic the database's automatic constraint names"""
        return "%s_pkey" % self.table.name


class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint):
    """Construct ForeignKeyConstraint

    Migrate's additional parameters:

    :param columns: Columns in constraint
    :param refcolumns: Columns that this FK reffers to in another table.
    :param table: If columns are passed as strings, this kw is required
    :type table: Table instance
    :type columns: list of strings or Column instances
    :type refcolumns: list of strings or Column instances
    """

    __migrate_visit_name__ = 'migrate_foreign_key_constraint'

    def __init__(self, columns, refcolumns, *args, **kwargs):
        colnames, table = self._normalize_columns(columns)
        table = kwargs.pop('table', table)
        refcolnames, reftable = self._normalize_columns(refcolumns,
                                                        table_name=True)
        super(ForeignKeyConstraint, self).__init__(colnames, refcolnames, *args,
                                                   **kwargs)
        if table is not None:
            self._set_parent(table)

    @property
    def referenced(self):
        return [e.column for e in self.elements]

    @property
    def reftable(self):
        return self.referenced[0].table

    def autoname(self):
        """Mimic the database's automatic constraint names"""
        if hasattr(self.columns, 'keys'):
            # SA <= 0.5
            firstcol = self.columns[self.columns.keys()[0]]
            ret = "%(table)s_%(firstcolumn)s_fkey" % dict(
                table=firstcol.table.name,
                firstcolumn=firstcol.name,)
        else:
            # SA >= 0.6
            ret = "%(table)s_%(firstcolumn)s_fkey" % dict(
                table=self.table.name,
                firstcolumn=self.columns[0],)
        return ret


class CheckConstraint(ConstraintChangeset, schema.CheckConstraint):
    """Construct CheckConstraint

    Migrate's additional parameters:

    :param sqltext: Plain SQL text to check condition
    :param columns: If not name is applied, you must supply this kw\
    to autoname constraint
    :param table: If columns are passed as strings, this kw is required
    :type table: Table instance
    :type columns: list of Columns instances
    :type sqltext: string
    """

    __migrate_visit_name__ = 'migrate_check_constraint'

    def __init__(self, sqltext, *args, **kwargs):
        cols = kwargs.pop('columns', [])
        if not cols and not kwargs.get('name', False):
            raise InvalidConstraintError('You must either set "name"'
                'parameter or "columns" to autogenarate it.')
        colnames, table = self._normalize_columns(cols)
        table = kwargs.pop('table', table)
        schema.CheckConstraint.__init__(self, sqltext, *args, **kwargs)
        if table is not None:
            self._set_parent(table)
        self.colnames = colnames

    def autoname(self):
        return "%(table)s_%(cols)s_check" % \
            dict(table=self.table.name, cols="_".join(self.colnames))


class UniqueConstraint(ConstraintChangeset, schema.UniqueConstraint):
    """Construct UniqueConstraint

    Migrate's additional parameters:

    :param cols: Columns in constraint.
    :param table: If columns are passed as strings, this kw is required
    :type table: Table instance
    :type cols: strings or Column instances

    .. versionadded:: 0.6.0
    """

    __migrate_visit_name__ = 'migrate_unique_constraint'

    def __init__(self, *cols, **kwargs):
        self.colnames, table = self._normalize_columns(cols)
        table = kwargs.pop('table', table)
        super(UniqueConstraint, self).__init__(*self.colnames, **kwargs)
        if table is not None:
            self._set_parent(table)

    def autoname(self):
        """Mimic the database's automatic constraint names"""
        return "%s_%s_key" % (self.table.name, self.colnames[0])