aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_win32events.py
blob: b8ba59b6162b00db631e50f96b5c57b435c7a9fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorWin32Events}.
"""

from thread import get_ident

try:
    import win32event
except ImportError:
    win32event = None

from zope.interface.verify import verifyObject

from twisted.python.threadable import isInIOThread
from twisted.internet.interfaces import IReactorWin32Events
from twisted.internet.defer import Deferred
from twisted.internet.test.reactormixins import ReactorBuilder


class Listener(object):
    """
    An event handler meant to be registered with an L{IReactorWin32Events}
    reactor.  Every hook notes the id of the thread that invoked it, and the
    outcome is relayed through the object handed to C{__init__}.

    @ivar success: C{False} until C{occurred} has run, C{True} afterwards.

    @ivar logThreadID: The id of the thread which last called C{logPrefix},
        or C{None} if it has not been called.

    @ivar eventThreadID: The id of the thread which last called C{occurred},
        or C{None} if it has not been called.

    @ivar connLostThreadID: The id of the thread which last called
        C{connectionLost}, or C{None} if it has not been called.

    @ivar _finished: The L{Deferred} given to C{__init__}.  C{occurred}
        calls its C{callback} and C{connectionLost} calls its C{errback}.
    """
    success = False
    logThreadID = None
    eventThreadID = None
    connLostThreadID = None

    def __init__(self, finished):
        """
        @param finished: The object whose C{callback} / C{errback} methods
            are used to report what happened to this listener.
        """
        self._finished = finished


    def logPrefix(self):
        """
        Note the calling thread and return the constant prefix C{'Listener'}.
        """
        callerID = get_ident()
        self.logThreadID = callerID
        return 'Listener'


    def occurred(self):
        """
        Event action: flag success, note the calling thread and fire
        C{_finished} with C{None}.
        """
        callerID = get_ident()
        self.success = True
        self.eventThreadID = callerID
        self._finished.callback(None)


    def brokenOccurred(self):
        """
        Event action which always fails by raising L{RuntimeError}.
        """
        problem = RuntimeError("Some problem")
        raise problem


    def returnValueOccurred(self):
        """
        Event action which returns (rather than raises) an
        L{EnvironmentError} instance.
        """
        problem = EnvironmentError("Entirely different problem")
        return problem


    def connectionLost(self, reason):
        """
        Note the calling thread and pass C{reason} on to the C{errback} of
        C{_finished}.

        @param reason: Whatever the caller supplies as the cause of the
            disconnection; forwarded unchanged.
        """
        callerID = get_ident()
        self.connLostThreadID = callerID
        self._finished.errback(reason)



class Win32EventsTestsBuilder(ReactorBuilder):
    """
    Reactor-independent test definitions exercising L{IReactorWin32Events}.
    """
    requiredInterfaces = [IReactorWin32Events]

    def test_interface(self):
        """
        A newly built reactor satisfies L{verifyObject} for
        L{IReactorWin32Events}.
        """
        verifyObject(IReactorWin32Events, self.buildReactor())


    def test_addEvent(self):
        """
        Once an event registered via C{addEvent} is set, its associated
        action runs, and it runs in the reactor thread.
        """
        def stopReactor(ignored):
            return reactor.stop()

        expectedThreadID = get_ident()
        reactor = self.buildReactor()
        handle = win32event.CreateEvent(None, False, False, None)
        done = Deferred()
        done.addCallback(stopReactor)
        handler = Listener(done)

        reactor.addEvent(handle, handler, 'occurred')
        reactor.callWhenRunning(win32event.SetEvent, handle)
        self.runReactor(reactor)

        self.assertTrue(handler.success)
        self.assertEqual(expectedThreadID, handler.logThreadID)
        self.assertEqual(expectedThreadID, handler.eventThreadID)


    def test_ioThreadDoesNotChange(self):
        """
        Registering an event with L{IReactorWin32Events.addEvent} leaves the
        thread reported as the I/O thread unchanged: the event action still
        sees L{isInIOThread} return C{True}.
        """
        observed = []
        reactor = self.buildReactor()

        def recordAndStop(ignored):
            # Sample the I/O-thread flag from inside the event callback
            # chain, then let the reactor wind down.
            observed.append(isInIOThread())
            reactor.stop()

        handle = win32event.CreateEvent(None, False, False, None)
        done = Deferred()
        handler = Listener(done)
        done.addCallback(recordAndStop)

        reactor.addEvent(handle, handler, 'occurred')
        reactor.callWhenRunning(win32event.SetEvent, handle)
        self.runReactor(reactor)

        self.assertTrue(handler.success)
        self.assertEqual([True], observed)


    def test_disconnectedOnError(self):
        """
        An exception raised by the event action causes the handler's
        C{connectionLost} to be called in the I/O thread, and the exception
        is logged.
        """
        def stopReactor(ignored):
            return reactor.stop()

        expectedThreadID = get_ident()
        reactor = self.buildReactor()
        handle = win32event.CreateEvent(None, False, False, None)
        # The Deferred is expected to fail with the RuntimeError raised by
        # Listener.brokenOccurred.
        done = self.assertFailure(Deferred(), RuntimeError)
        done.addCallback(stopReactor)
        handler = Listener(done)

        reactor.addEvent(handle, handler, 'brokenOccurred')
        reactor.callWhenRunning(win32event.SetEvent, handle)
        self.runReactor(reactor)

        self.assertEqual(expectedThreadID, handler.connLostThreadID)
        loggedErrors = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(1, len(loggedErrors))


    def test_disconnectOnReturnValue(self):
        """
        A value returned by the event action causes the handler's
        C{connectionLost} to be called in the I/O thread.
        """
        def stopReactor(ignored):
            return reactor.stop()

        expectedThreadID = get_ident()
        reactor = self.buildReactor()
        handle = win32event.CreateEvent(None, False, False, None)
        # The Deferred is expected to fail with the EnvironmentError that
        # Listener.returnValueOccurred returns.
        done = self.assertFailure(Deferred(), EnvironmentError)
        done.addCallback(stopReactor)
        handler = Listener(done)

        reactor.addEvent(handle, handler, 'returnValueOccurred')
        reactor.callWhenRunning(win32event.SetEvent, handle)
        self.runReactor(reactor)

        self.assertEqual(expectedThreadID, handler.connLostThreadID)


    def test_notDisconnectedOnShutdown(self):
        """
        A handler registered with L{IReactorWin32Events.addEvent} which is
        still active when the reactor shuts down does not have its
        C{connectionLost} method called.
        """
        reactor = self.buildReactor()
        handle = win32event.CreateEvent(None, False, False, None)
        handler = Listener(Deferred())

        reactor.addEvent(handle, handler, 'occurred')
        # The event is never set; the reactor is stopped as soon as it runs.
        reactor.callWhenRunning(reactor.stop)
        self.runReactor(reactor)

        self.assertIdentical(None, handler.connLostThreadID)

# Copy everything returned by makeTestCaseClasses() into this module's
# namespace.  NOTE(review): presumably these are the per-reactor TestCase
# classes, placed here so the test runner can discover them by name --
# confirm against ReactorBuilder.makeTestCaseClasses.
globals().update(Win32EventsTestsBuilder.makeTestCaseClasses())