aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/sqlalchemy_migrate-0.7.2-py2.7.egg/migrate/tests/versioning/test_genmodel.py
blob: eeefe8a38a1f687b8bb8a4a082224e7c464f659f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# -*- coding: utf-8 -*-

import os

import sqlalchemy
from sqlalchemy import *
from nose.tools import eq_

from migrate.versioning import genmodel, schemadiff
from migrate.changeset import schema

from migrate.tests import fixture


class TestSchemaDiff(fixture.DB):
    table_name = 'tmp_schemadiff'
    level = fixture.DB.CONNECT

    def _setup(self, url):
        super(TestSchemaDiff, self)._setup(url)
        self.meta = MetaData(self.engine, reflect=True)
        self.meta.drop_all()  # in case junk tables are lying around in the test database
        self.meta = MetaData(self.engine, reflect=True)  # needed if we just deleted some tables
        self.table = Table(self.table_name, self.meta,
            Column('id',Integer(), primary_key=True),
            Column('name', UnicodeText()),
            Column('data', UnicodeText()),
        )

    def _teardown(self):
        if self.table.exists():
            self.meta = MetaData(self.engine, reflect=True)
            self.meta.drop_all()
        super(TestSchemaDiff, self)._teardown()

    def _applyLatestModel(self):
        diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
        genmodel.ModelGenerator(diff,self.engine).runB2A()

    @fixture.usedb()
    def test_functional(self):

        def assertDiff(isDiff, tablesMissingInDatabase, tablesMissingInModel, tablesWithDiff):
            diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
            eq_(
                (diff.tables_missing_from_B,
                 diff.tables_missing_from_A,
                 diff.tables_different.keys(),
                 bool(diff)),
                (tablesMissingInDatabase,
                 tablesMissingInModel,
                 tablesWithDiff,
                 isDiff)
                )

        # Model is defined but database is empty.
        assertDiff(True, [self.table_name], [], [])

        # Check Python upgrade and downgrade of database from updated model.
        diff = schemadiff.getDiffOfModelAgainstDatabase(self.meta, self.engine, excludeTables=['migrate_version'])
        decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration()

        # Feature test for a recent SQLa feature;
        # expect different output in that case.
        if repr(String()) == 'String()':
            self.assertEqualsIgnoreWhitespace(decls, '''
            from migrate.changeset import schema
            pre_meta = MetaData()
            post_meta = MetaData()
            tmp_schemadiff = Table('tmp_schemadiff', post_meta,
                Column('id', Integer, primary_key=True, nullable=False),
                Column('name', UnicodeText),
                Column('data', UnicodeText),
            )
            ''')
        else:
            self.assertEqualsIgnoreWhitespace(decls, '''
            from migrate.changeset import schema
            pre_meta = MetaData()
            post_meta = MetaData()
            tmp_schemadiff = Table('tmp_schemadiff', post_meta,
                Column('id', Integer, primary_key=True, nullable=False),
                Column('name', UnicodeText(length=None)),
                Column('data', UnicodeText(length=None)),
            )
            ''')

        # Create table in database, now model should match database.
        self._applyLatestModel()
        assertDiff(False, [], [], [])

        # Check Python code gen from database.
        diff = schemadiff.getDiffOfModelAgainstDatabase(MetaData(), self.engine, excludeTables=['migrate_version'])
        src = genmodel.ModelGenerator(diff,self.engine).genBDefinition()

        exec src in locals()

        c1 = Table('tmp_schemadiff', self.meta, autoload=True).c
        c2 = tmp_schemadiff.c
        self.compare_columns_equal(c1, c2, ['type'])
        # TODO: get rid of ignoring type

        if not self.engine.name == 'oracle':
            # Add data, later we'll make sure it's still present.
            result = self.engine.execute(self.table.insert(), id=1, name=u'mydata')
            dataId = result.inserted_primary_key[0]

        # Modify table in model (by removing it and adding it back to model)
        # Drop column data, add columns data2 and data3.
        self.meta.remove(self.table)
        self.table = Table(self.table_name,self.meta,
            Column('id',Integer(),primary_key=True),
            Column('name',UnicodeText(length=None)),
            Column('data2',Integer(),nullable=True),
            Column('data3',Integer(),nullable=True),
        )
        assertDiff(True, [], [], [self.table_name])

        # Apply latest model changes and find no more diffs.
        self._applyLatestModel()
        assertDiff(False, [], [], [])

        # Drop column data3, add data4
        self.meta.remove(self.table)
        self.table = Table(self.table_name,self.meta,
            Column('id',Integer(),primary_key=True),
            Column('name',UnicodeText(length=None)),
            Column('data2',Integer(),nullable=True),
            Column('data4',Float(),nullable=True),
        )
        assertDiff(True, [], [], [self.table_name])

        diff = schemadiff.getDiffOfModelAgainstDatabase(
            self.meta, self.engine, excludeTables=['migrate_version'])
        decls, upgradeCommands, downgradeCommands = genmodel.ModelGenerator(diff,self.engine).genB2AMigration(indent='')

        # decls have changed since genBDefinition
        exec decls in locals()
        # migration commands expect a namespace containing migrate_engine
        migrate_engine = self.engine
        # run the migration up and down
        exec upgradeCommands in locals()
        assertDiff(False, [], [], [])

        exec decls in locals()
        exec downgradeCommands in locals()
        assertDiff(True, [], [], [self.table_name])

        exec decls in locals()
        exec upgradeCommands in locals()
        assertDiff(False, [], [], [])

        if not self.engine.name == 'oracle':
            # Make sure data is still present.
            result = self.engine.execute(self.table.select(self.table.c.id==dataId))
            rows = result.fetchall()
            eq_(len(rows), 1)
            eq_(rows[0].name, 'mydata')

            # Add data, later we'll make sure it's still present.
            result = self.engine.execute(self.table.insert(), id=2, name=u'mydata2', data2=123)
            dataId2 = result.inserted_primary_key[0]

        # Change column type in model.
        self.meta.remove(self.table)
        self.table = Table(self.table_name,self.meta,
            Column('id',Integer(),primary_key=True),
            Column('name',UnicodeText(length=None)),
            Column('data2',String(255),nullable=True),
        )

        # XXX test type diff
        return

        assertDiff(True, [], [], [self.table_name])

        # Apply latest model changes and find no more diffs.
        self._applyLatestModel()
        assertDiff(False, [], [], [])

        if not self.engine.name == 'oracle':
            # Make sure data is still present.
            result = self.engine.execute(self.table.select(self.table.c.id==dataId2))
            rows = result.fetchall()
            self.assertEquals(len(rows), 1)
            self.assertEquals(rows[0].name, 'mydata2')
            self.assertEquals(rows[0].data2, '123')

            # Delete data, since we're about to make a required column.
            # Not even using sqlalchemy.PassiveDefault helps because we're doing explicit column select.
            self.engine.execute(self.table.delete(), id=dataId)

        if not self.engine.name == 'firebird':
            # Change column nullable in model.
            self.meta.remove(self.table)
            self.table = Table(self.table_name,self.meta,
                Column('id',Integer(),primary_key=True),
                Column('name',UnicodeText(length=None)),
                Column('data2',String(255),nullable=False),
            )
            assertDiff(True, [], [], [self.table_name])  # TODO test nullable diff

            # Apply latest model changes and find no more diffs.
            self._applyLatestModel()
            assertDiff(False, [], [], [])

            # Remove table from model.
            self.meta.remove(self.table)
            assertDiff(True, [], [self.table_name], [])