aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadable.py
blob: f23515ada179a158f597e70a32cd95a1cbb357b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

import sys, pickle

try:
    import threading
except ImportError:
    threading = None

from twisted.trial import unittest
from twisted.python import threadable
from twisted.internet import defer, reactor

class TestObject:
    synchronized = ['aMethod']

    x = -1
    y = 1

    def aMethod(self):
        for i in xrange(10):
            self.x, self.y = self.y, self.x
            self.z = self.x + self.y
            assert self.z == 0, "z == %d, not 0 as expected" % (self.z,)

threadable.synchronize(TestObject)

class SynchronizationTestCase(unittest.TestCase):
    def setUp(self):
        """
        Reduce the CPython check interval so that thread switches happen much
        more often, hopefully exercising more possible race conditions.  Also,
        delay actual test startup until the reactor has been started.
        """
        if hasattr(sys, 'getcheckinterval'):
            self.addCleanup(sys.setcheckinterval, sys.getcheckinterval())
            sys.setcheckinterval(7)
        # XXX This is a trial hack.  We need to make sure the reactor
        # actually *starts* for isInIOThread() to have a meaningful result.
        # Returning a Deferred here should force that to happen, if it has
        # not happened already.  In the future, this should not be
        # necessary.
        d = defer.Deferred()
        reactor.callLater(0, d.callback, None)
        return d


    def testIsInIOThread(self):
        foreignResult = []
        t = threading.Thread(target=lambda: foreignResult.append(threadable.isInIOThread()))
        t.start()
        t.join()
        self.failIf(foreignResult[0], "Non-IO thread reported as IO thread")
        self.failUnless(threadable.isInIOThread(), "IO thread reported as not IO thread")


    def testThreadedSynchronization(self):
        o = TestObject()

        errors = []

        def callMethodLots():
            try:
                for i in xrange(1000):
                    o.aMethod()
            except AssertionError, e:
                errors.append(str(e))

        threads = []
        for x in range(5):
            t = threading.Thread(target=callMethodLots)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        if errors:
            raise unittest.FailTest(errors)

    def testUnthreadedSynchronization(self):
        o = TestObject()
        for i in xrange(1000):
            o.aMethod()

class SerializationTestCase(unittest.TestCase):
    def testPickling(self):
        lock = threadable.XLock()
        lockType = type(lock)
        lockPickle = pickle.dumps(lock)
        newLock = pickle.loads(lockPickle)
        self.failUnless(isinstance(newLock, lockType))

    def testUnpickling(self):
        lockPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n.'
        lock = pickle.loads(lockPickle)
        newPickle = pickle.dumps(lock, 2)
        newLock = pickle.loads(newPickle)

if threading is None:
    SynchronizationTestCase.testThreadedSynchronization.skip = "Platform lacks thread support"
    SerializationTestCase.testPickling.skip = "Platform lacks thread support"