aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/sqlalchemy_migrate-0.7.2-py2.7.egg/migrate/versioning/schemadiff.py
blob: 04cf83e657eae3d4dd8f461379ce5c0eb4f7ecd3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
"""
   Schema differencing support.
"""

import logging
import sqlalchemy

from sqlalchemy.types import Float

log = logging.getLogger(__name__)

def getDiffOfModelAgainstDatabase(metadata, engine, excludeTables=None):
    """
    Return differences of model against database.

    :return: object which will evaluate to :keyword:`True` if there \
      are differences else :keyword:`False`.
    """
    db_metadata = sqlalchemy.MetaData(engine, reflect=True)

    # sqlite will include a dynamically generated 'sqlite_sequence' table if
    # there are autoincrement sequences in the database; this should not be
    # compared.
    if engine.dialect.name == 'sqlite':
        if 'sqlite_sequence' in db_metadata.tables:
            db_metadata.remove(db_metadata.tables['sqlite_sequence'])

    return SchemaDiff(metadata, db_metadata,
                      labelA='model',
                      labelB='database',
                      excludeTables=excludeTables)


def getDiffOfModelAgainstModel(metadataA, metadataB, excludeTables=None):
    """
    Return differences of model against another model.

    :return: object which will evaluate to :keyword:`True` if there \
      are differences else :keyword:`False`.
    """
    return SchemaDiff(metadataA, metadataB, excludeTables)


class ColDiff(object):
    """
    Container for differences in one :class:`~sqlalchemy.schema.Column`
    between two :class:`~sqlalchemy.schema.Table` instances, ``A``
    and ``B``.

    .. attribute:: col_A

      The :class:`~sqlalchemy.schema.Column` object for A.

    .. attribute:: col_B

      The :class:`~sqlalchemy.schema.Column` object for B.

    .. attribute:: type_A

      The most generic type of the :class:`~sqlalchemy.schema.Column`
      object in A.

    .. attribute:: type_B

      The most generic type of the :class:`~sqlalchemy.schema.Column`
      object in A.

    """

    diff = False

    def __init__(self,col_A,col_B):
        self.col_A = col_A
        self.col_B = col_B

        self.type_A = col_A.type
        self.type_B = col_B.type

        self.affinity_A = self.type_A._type_affinity
        self.affinity_B = self.type_B._type_affinity

        if self.affinity_A is not self.affinity_B:
            self.diff = True
            return

        if isinstance(self.type_A,Float) or isinstance(self.type_B,Float):
            if not (isinstance(self.type_A,Float) and isinstance(self.type_B,Float)):
                self.diff=True
                return

        for attr in ('precision','scale','length'):
            A = getattr(self.type_A,attr,None)
            B = getattr(self.type_B,attr,None)
            if not (A is None or B is None) and A!=B:
                self.diff=True
                return

    def __nonzero__(self):
        return self.diff

class TableDiff(object):
    """
    Container for differences in one :class:`~sqlalchemy.schema.Table`
    between two :class:`~sqlalchemy.schema.MetaData` instances, ``A``
    and ``B``.

    .. attribute:: columns_missing_from_A

      A sequence of column names that were found in B but weren't in
      A.

    .. attribute:: columns_missing_from_B

      A sequence of column names that were found in A but weren't in
      B.

    .. attribute:: columns_different

      A dictionary containing information about columns that were
      found to be different.
      It maps column names to a :class:`ColDiff` objects describing the
      differences found.
    """
    __slots__ = (
        'columns_missing_from_A',
        'columns_missing_from_B',
        'columns_different',
        )

    def __nonzero__(self):
        return bool(
            self.columns_missing_from_A or
            self.columns_missing_from_B or
            self.columns_different
            )

class SchemaDiff(object):
    """
    Compute the difference between two :class:`~sqlalchemy.schema.MetaData`
    objects.

    The string representation of a :class:`SchemaDiff` will summarise
    the changes found between the two
    :class:`~sqlalchemy.schema.MetaData` objects.

    The length of a :class:`SchemaDiff` will give the number of
    changes found, enabling it to be used much like a boolean in
    expressions.

    :param metadataA:
      First :class:`~sqlalchemy.schema.MetaData` to compare.

    :param metadataB:
      Second :class:`~sqlalchemy.schema.MetaData` to compare.

    :param labelA:
      The label to use in messages about the first
      :class:`~sqlalchemy.schema.MetaData`.

    :param labelB:
      The label to use in messages about the second
      :class:`~sqlalchemy.schema.MetaData`.

    :param excludeTables:
      A sequence of table names to exclude.

    .. attribute:: tables_missing_from_A

      A sequence of table names that were found in B but weren't in
      A.

    .. attribute:: tables_missing_from_B

      A sequence of table names that were found in A but weren't in
      B.

    .. attribute:: tables_different

      A dictionary containing information about tables that were found
      to be different.
      It maps table names to a :class:`TableDiff` objects describing the
      differences found.
    """

    def __init__(self,
                 metadataA, metadataB,
                 labelA='metadataA',
                 labelB='metadataB',
                 excludeTables=None):

        self.metadataA, self.metadataB = metadataA, metadataB
        self.labelA, self.labelB = labelA, labelB
        self.label_width = max(len(labelA),len(labelB))
        excludeTables = set(excludeTables or [])

        A_table_names = set(metadataA.tables.keys())
        B_table_names = set(metadataB.tables.keys())

        self.tables_missing_from_A = sorted(
            B_table_names - A_table_names - excludeTables
            )
        self.tables_missing_from_B = sorted(
            A_table_names - B_table_names - excludeTables
            )

        self.tables_different = {}
        for table_name in A_table_names.intersection(B_table_names):

            td = TableDiff()

            A_table = metadataA.tables[table_name]
            B_table = metadataB.tables[table_name]

            A_column_names = set(A_table.columns.keys())
            B_column_names = set(B_table.columns.keys())

            td.columns_missing_from_A = sorted(
                B_column_names - A_column_names
                )

            td.columns_missing_from_B = sorted(
                A_column_names - B_column_names
                )

            td.columns_different = {}

            for col_name in A_column_names.intersection(B_column_names):

                cd = ColDiff(
                    A_table.columns.get(col_name),
                    B_table.columns.get(col_name)
                    )

                if cd:
                    td.columns_different[col_name]=cd

            # XXX - index and constraint differences should
            #       be checked for here

            if td:
                self.tables_different[table_name]=td

    def __str__(self):
        ''' Summarize differences. '''
        out = []
        column_template ='      %%%is: %%r' % self.label_width

        for names,label in (
            (self.tables_missing_from_A,self.labelA),
            (self.tables_missing_from_B,self.labelB),
            ):
            if names:
                out.append(
                    '  tables missing from %s: %s' % (
                        label,', '.join(sorted(names))
                        )
                    )

        for name,td in sorted(self.tables_different.items()):
            out.append(
               '  table with differences: %s' % name
               )
            for names,label in (
                (td.columns_missing_from_A,self.labelA),
                (td.columns_missing_from_B,self.labelB),
                ):
                if names:
                    out.append(
                        '    %s missing these columns: %s' % (
                            label,', '.join(sorted(names))
                            )
                        )
            for name,cd in td.columns_different.items():
                out.append('    column with differences: %s' % name)
                out.append(column_template % (self.labelA,cd.col_A))
                out.append(column_template % (self.labelB,cd.col_B))

        if out:
            out.insert(0, 'Schema diffs:')
            return '\n'.join(out)
        else:
            return 'No schema diffs'

    def __len__(self):
        """
        Used in bool evaluation, return of 0 means no diffs.
        """
        return (
            len(self.tables_missing_from_A) +
            len(self.tables_missing_from_B) +
            len(self.tables_different)
            )