blob: b6f9e1db2c56d7a4cc950bc9d81226d8a12aa554 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for implementations of L{IReactorTime}.
"""
# Python 2 idiom: classes defined in this module without explicit bases
# are created as new-style classes.
__metaclass__ = type
from twisted.python.runtime import platform
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.internet.interfaces import IReactorTime
class TimeTestsBuilder(ReactorBuilder):
    """
    Builder for tests which exercise a reactor's L{IReactorTime}
    implementation.
    """
    requiredInterfaces = (IReactorTime,)

    def test_delayedCallStopsReactor(self):
        """
        A delayed call is able to stop the running reactor.
        """
        reactorUnderTest = self.buildReactor()
        # The stop is scheduled with zero delay before the reactor is
        # started; the test finishes once C{run} returns.
        stopReactor = reactorUnderTest.stop
        reactorUnderTest.callLater(0, stopReactor)
        reactorUnderTest.run()
class GlibTimeTestsBuilder(ReactorBuilder):
    """
    Builder for L{IReactorTime} tests which are specific to reactors built
    on top of glib.
    """
    requiredInterfaces = (IReactorTime,)

    # Select the reactors to test by platform: the glib2 and gtk2 reactors
    # everywhere except Windows, where only the portable gtk reactor is used.
    if not platform.isWindows():
        _reactors = [
            "twisted.internet.glib2reactor.Glib2Reactor",
            "twisted.internet.gtk2reactor.Gtk2Reactor",
        ]
    else:
        _reactors = ["twisted.internet.gtk2reactor.PortableGtkReactor"]

    def test_timeout_add(self):
        """
        A delayed call scheduled with C{reactor.callLater} from inside a
        C{gobject.timeout_add} callback is run on time.
        """
        import gobject

        reactor = self.buildReactor()
        fired = []

        def delayedCallRan():
            # Record that the delayed call happened, then end the test run.
            fired.append(True)
            reactor.stop()

        def fromGlibTimeout():
            reactor.callLater(0, delayedCallRan)
            # A false return value tells glib not to repeat this timeout.
            return False

        # Register the glib timeout only once the reactor is running.
        reactor.callWhenRunning(gobject.timeout_add, 10, fromGlibTimeout)
        # 5 is the timeout handed to runReactor (presumably seconds --
        # confirm against ReactorBuilder.runReactor).
        self.runReactor(reactor, 5)
        self.assertEqual(fired, [True])
# Merge whatever each builder's makeTestCaseClasses() returns into this
# module's globals (presumably the generated per-reactor test case classes,
# so the test runner can discover them -- confirm against ReactorBuilder).
globals().update(TimeTestsBuilder.makeTestCaseClasses())
globals().update(GlibTimeTestsBuilder.makeTestCaseClasses())
|