aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_udp.py
blob: 92ebceca86779bf6121cbe747470ca6ce81895f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
# -*- test-case-name: twisted.test.test_udp -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUDP} and L{IReactorMulticast}.
"""

from twisted.trial import unittest

from twisted.internet.defer import Deferred, gatherResults, maybeDeferred
from twisted.internet import protocol, reactor, error, defer, interfaces, udp
from twisted.python import runtime


class Mixin:

    started = 0
    stopped = 0

    startedDeferred = None

    def __init__(self):
        self.packets = []

    def startProtocol(self):
        self.started = 1
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.callback(None)

    def stopProtocol(self):
        self.stopped = 1


class Server(Mixin, protocol.DatagramProtocol):
    packetReceived = None
    refused = 0


    def datagramReceived(self, data, addr):
        self.packets.append((data, addr))
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)



class Client(Mixin, protocol.ConnectedDatagramProtocol):

    packetReceived = None
    refused = 0

    def datagramReceived(self, data):
        self.packets.append(data)
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)

    def connectionFailed(self, failure):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(failure)
        self.failure = failure

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1


class GoodClient(Server):

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1



class BadClientError(Exception):
    """
    Raised by BadClient at the end of every datagramReceived call to try and
    screw stuff up.
    """



class BadClient(protocol.DatagramProtocol):
    """
    A DatagramProtocol which always raises an exception from datagramReceived.
    Used to test error handling behavior in the reactor for that method.
    """
    d = None

    def setDeferred(self, d):
        """
        Set the Deferred which will be called back when datagramReceived is
        called.
        """
        self.d = d


    def datagramReceived(self, bytes, addr):
        if self.d is not None:
            d, self.d = self.d, None
            d.callback(bytes)
        raise BadClientError("Application code is very buggy!")



class UDPTestCase(unittest.TestCase):

    def test_oldAddress(self):
        """
        The C{type} of the host address of a listening L{DatagramProtocol}'s
        transport is C{"UDP"}.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")
        def cbStarted(ignored):
            addr = p.getHost()
            self.assertEqual(addr.type, 'UDP')
            return p.stopListening()
        return d.addCallback(cbStarted)


    def test_startStop(self):
        """
        The L{DatagramProtocol}'s C{startProtocol} and C{stopProtocol}
        methods are called when its transports starts and stops listening,
        respectively.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")
        def cbStarted(ignored):
            self.assertEqual(server.started, 1)
            self.assertEqual(server.stopped, 0)
            return port1.stopListening()
        def cbStopped(ignored):
            self.assertEqual(server.stopped, 1)
        return d.addCallback(cbStarted).addCallback(cbStopped)


    def test_rebind(self):
        """
        Re-listening with the same L{DatagramProtocol} re-invokes the
        C{startProtocol} callback.
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored, port):
            return port.stopListening()

        def cbStopped(ignored):
            d = server.startedDeferred = defer.Deferred()
            p = reactor.listenUDP(0, server, interface="127.0.0.1")
            return d.addCallback(cbStarted, p)

        return d.addCallback(cbStarted, p)


    def test_bindError(self):
        """
        A L{CannotListenError} exception is raised when attempting to bind a
        second protocol instance to an already bound port
        """
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, server, interface='127.0.0.1')

        def cbStarted(ignored):
            self.assertEqual(port.getHost(), server.transport.getHost())
            server2 = Server()
            self.assertRaises(
                error.CannotListenError,
                reactor.listenUDP, port.getHost().port, server2,
                interface='127.0.0.1')
        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return port.stopListening()
        d.addCallback(cbFinished)
        return d


    def test_sendPackets(self):
        """
        Datagrams can be sent with the transport's C{write} method and
        received via the C{datagramReceived} callback method.
        """
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()

        def cbServerStarted(ignored):
            self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1")
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            client.transport.connect("127.0.0.1",
                                     server.transport.getHost().port)
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()

            serverSend = client.packetReceived = defer.Deferred()
            server.transport.write("hello", (cAddr.host, cAddr.port))

            clientWrites = [
                ("a",),
                ("b", None),
                ("c", (sAddr.host, sAddr.port))]

            def cbClientSend(ignored):
                if clientWrites:
                    nextClientWrite = server.packetReceived = defer.Deferred()
                    nextClientWrite.addCallback(cbClientSend)
                    client.transport.write(*clientWrites.pop(0))
                    return nextClientWrite

            # No one will ever call .errback on either of these Deferreds,
            # but there is a non-trivial amount of test code which might
            # cause them to fail somehow.  So fireOnOneErrback=True.
            return defer.DeferredList([
                cbClientSend(None),
                serverSend],
                fireOnOneErrback=True)

        d.addCallback(cbClientStarted)

        def cbSendsFinished(ignored):
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()
            self.assertEqual(
                client.packets,
                [("hello", (sAddr.host, sAddr.port))])
            clientAddr = (cAddr.host, cAddr.port)
            self.assertEqual(
                server.packets,
                [("a", clientAddr),
                 ("b", clientAddr),
                 ("c", clientAddr)])

        d.addCallback(cbSendsFinished)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d


    def test_connectionRefused(self):
        """
        A L{ConnectionRefusedError} exception is raised when a connection
        attempt is actively refused by the other end.

        Note: This test assumes no one is listening on port 80 UDP.
        """
        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")

        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")

        d = defer.DeferredList(
            [clientStarted, serverStarted],
            fireOnOneErrback=True)

        def cbStarted(ignored):
            connectionRefused = client.startedDeferred = defer.Deferred()
            client.transport.connect("127.0.0.1", 80)

            for i in range(10):
                client.transport.write(str(i))
                server.transport.write(str(i), ("127.0.0.1", 80))

            return self.assertFailure(
                connectionRefused,
                error.ConnectionRefusedError)

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port.stopListening),
                defer.maybeDeferred(port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d


    def test_badConnect(self):
        """
        A call to the transport's connect method fails with a L{ValueError}
        when a non-IP address is passed as the host value.

        A call to a transport's connect method fails with a L{RuntimeError}
        when the transport is already connected.
        """
        client = GoodClient()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")
        self.assertRaises(ValueError, client.transport.connect,
                          "localhost", 80)
        client.transport.connect("127.0.0.1", 80)
        self.assertRaises(RuntimeError, client.transport.connect,
                          "127.0.0.1", 80)
        return port.stopListening()



    def test_datagramReceivedError(self):
        """
        When datagramReceived raises an exception it is logged but the port
        is not disconnected.
        """
        finalDeferred = defer.Deferred()

        def cbCompleted(ign):
            """
            Flush the exceptions which the reactor should have logged and make
            sure they're actually there.
            """
            errs = self.flushLoggedErrors(BadClientError)
            self.assertEqual(len(errs), 2, "Incorrectly found %d errors, expected 2" % (len(errs),))
        finalDeferred.addCallback(cbCompleted)

        client = BadClient()
        port = reactor.listenUDP(0, client, interface='127.0.0.1')

        def cbCleanup(result):
            """
            Disconnect the port we started and pass on whatever was given to us
            in case it was a Failure.
            """
            return defer.maybeDeferred(port.stopListening).addBoth(lambda ign: result)
        finalDeferred.addBoth(cbCleanup)

        addr = port.getHost()

        # UDP is not reliable.  Try to send as many as 60 packets before giving
        # up.  Conceivably, all sixty could be lost, but they probably won't be
        # unless all UDP traffic is being dropped, and then the rest of these
        # UDP tests will likely fail as well.  Ideally, this test (and probably
        # others) wouldn't even use actual UDP traffic: instead, they would
        # stub out the socket with a fake one which could be made to behave in
        # whatever way the test desires.  Unfortunately, this is hard because
        # of differences in various reactor implementations.
        attempts = range(60)
        succeededAttempts = []

        def makeAttempt():
            """
            Send one packet to the listening BadClient.  Set up a 0.1 second
            timeout to do re-transmits in case the packet is dropped.  When two
            packets have been received by the BadClient, stop sending and let
            the finalDeferred's callbacks do some assertions.
            """
            if not attempts:
                try:
                    self.fail("Not enough packets received")
                except:
                    finalDeferred.errback()

            self.failIfIdentical(client.transport, None, "UDP Protocol lost its transport")

            packet = str(attempts.pop(0))
            packetDeferred = defer.Deferred()
            client.setDeferred(packetDeferred)
            client.transport.write(packet, (addr.host, addr.port))

            def cbPacketReceived(packet):
                """
                A packet arrived.  Cancel the timeout for it, record it, and
                maybe finish the test.
                """
                timeoutCall.cancel()
                succeededAttempts.append(packet)
                if len(succeededAttempts) == 2:
                    # The second error has not yet been logged, since the
                    # exception which causes it hasn't even been raised yet.
                    # Give the datagramReceived call a chance to finish, then
                    # let the test finish asserting things.
                    reactor.callLater(0, finalDeferred.callback, None)
                else:
                    makeAttempt()

            def ebPacketTimeout(err):
                """
                The packet wasn't received quickly enough.  Try sending another
                one.  It doesn't matter if the packet for which this was the
                timeout eventually arrives: makeAttempt throws away the
                Deferred on which this function is the errback, so when
                datagramReceived callbacks, so it won't be on this Deferred, so
                it won't raise an AlreadyCalledError.
                """
                makeAttempt()

            packetDeferred.addCallbacks(cbPacketReceived, ebPacketTimeout)
            packetDeferred.addErrback(finalDeferred.errback)

            timeoutCall = reactor.callLater(
                0.1, packetDeferred.errback,
                error.TimeoutError(
                    "Timed out in testDatagramReceivedError"))

        makeAttempt()
        return finalDeferred


    def test_portRepr(self):
        """
        The port number being listened on can be found in the string
        returned from calling repr() on L{twisted.internet.udp.Port}.
        """
        client = GoodClient()
        p = reactor.listenUDP(0, client)
        portNo = str(p.getHost().port)
        self.failIf(repr(p).find(portNo) == -1)
        def stoppedListening(ign):
            self.failIf(repr(p).find(portNo) != -1)
        d = defer.maybeDeferred(p.stopListening)
        d.addCallback(stoppedListening)
        return d


    def test_NoWarningOnBroadcast(self):
        """
        C{'<broadcast>'} is an alternative way to say C{'255.255.255.255'}
        ({socket.gethostbyname("<broadcast>")} returns C{'255.255.255.255'}),
        so because it becomes a valid IP address, no deprecation warning about
        passing hostnames to L{twisted.internet.udp.Port.write} needs to be
        emitted by C{write()} in this case.
        """
        class fakeSocket:
            def sendto(self, foo, bar):
                pass

        p = udp.Port(0, Server())
        p.socket = fakeSocket()
        p.write("test", ("<broadcast>", 1234))

        warnings = self.flushWarnings([self.test_NoWarningOnBroadcast])
        self.assertEqual(len(warnings), 0)



class ReactorShutdownInteraction(unittest.TestCase):
    """Test reactor shutdown interaction"""

    def setUp(self):
        """Start a UDP port"""
        self.server = Server()
        self.port = reactor.listenUDP(0, self.server, interface='127.0.0.1')

    def tearDown(self):
        """Stop the UDP port"""
        return self.port.stopListening()

    def testShutdownFromDatagramReceived(self):
        """Test reactor shutdown while in a recvfrom() loop"""

        # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
        # It is important this loop terminate under various conditions.
        # Previously, if datagramReceived synchronously invoked
        # reactor.stop(), under certain reactors, the Port's socket would
        # synchronously disappear, causing an AttributeError inside that
        # loop.  This was mishandled, causing the loop to spin forever.
        # This test is primarily to ensure that the loop never spins
        # forever.

        finished = defer.Deferred()
        pr = self.server.packetReceived = defer.Deferred()

        def pktRece(ignored):
            # Simulate reactor.stop() behavior :(
            self.server.transport.connectionLost()
            # Then delay this Deferred chain until the protocol has been
            # disconnected, as the reactor should do in an error condition
            # such as we are inducing.  This is very much a whitebox test.
            reactor.callLater(0, finished.callback, None)
        pr.addCallback(pktRece)

        def flushErrors(ignored):
            # We are breaking abstraction and calling private APIs, any
            # number of horrible errors might occur.  As long as the reactor
            # doesn't hang, this test is satisfied.  (There may be room for
            # another, stricter test.)
            self.flushLoggedErrors()
        finished.addCallback(flushErrors)
        self.server.transport.write('\0' * 64, ('127.0.0.1',
                                    self.server.transport.getHost().port))
        return finished



class MulticastTestCase(unittest.TestCase):

    def setUp(self):
        self.server = Server()
        self.client = Client()
        # multicast won't work if we listen over loopback, apparently
        self.port1 = reactor.listenMulticast(0, self.server)
        self.port2 = reactor.listenMulticast(0, self.client)
        self.client.transport.connect(
            "127.0.0.1", self.server.transport.getHost().port)


    def tearDown(self):
        return gatherResults([
            maybeDeferred(self.port1.stopListening),
            maybeDeferred(self.port2.stopListening)])


    def testTTL(self):
        for o in self.client, self.server:
            self.assertEqual(o.transport.getTTL(), 1)
            o.transport.setTTL(2)
            self.assertEqual(o.transport.getTTL(), 2)


    def test_loopback(self):
        """
        Test that after loopback mode has been set, multicast packets are
        delivered to their sender.
        """
        self.assertEqual(self.server.transport.getLoopbackMode(), 1)
        addr = self.server.transport.getHost()
        joined = self.server.transport.joinGroup("225.0.0.250")

        def cbJoined(ignored):
            d = self.server.packetReceived = Deferred()
            self.server.transport.write("hello", ("225.0.0.250", addr.port))
            return d
        joined.addCallback(cbJoined)

        def cbPacket(ignored):
            self.assertEqual(len(self.server.packets), 1)
            self.server.transport.setLoopbackMode(0)
            self.assertEqual(self.server.transport.getLoopbackMode(), 0)
            self.server.transport.write("hello", ("225.0.0.250", addr.port))

            # This is fairly lame.
            d = Deferred()
            reactor.callLater(0, d.callback, None)
            return d
        joined.addCallback(cbPacket)

        def cbNoPacket(ignored):
            self.assertEqual(len(self.server.packets), 1)
        joined.addCallback(cbNoPacket)

        return joined


    def test_interface(self):
        """
        Test C{getOutgoingInterface} and C{setOutgoingInterface}.
        """
        self.assertEqual(
            self.client.transport.getOutgoingInterface(), "0.0.0.0")
        self.assertEqual(
            self.server.transport.getOutgoingInterface(), "0.0.0.0")

        d1 = self.client.transport.setOutgoingInterface("127.0.0.1")
        d2 = self.server.transport.setOutgoingInterface("127.0.0.1")
        result = gatherResults([d1, d2])

        def cbInterfaces(ignored):
            self.assertEqual(
                self.client.transport.getOutgoingInterface(), "127.0.0.1")
            self.assertEqual(
                self.server.transport.getOutgoingInterface(), "127.0.0.1")
        result.addCallback(cbInterfaces)
        return result


    def test_joinLeave(self):
        """
        Test that multicast a group can be joined and left.
        """
        d = self.client.transport.joinGroup("225.0.0.250")

        def clientJoined(ignored):
            return self.client.transport.leaveGroup("225.0.0.250")
        d.addCallback(clientJoined)

        def clientLeft(ignored):
            return self.server.transport.joinGroup("225.0.0.250")
        d.addCallback(clientLeft)

        def serverJoined(ignored):
            return self.server.transport.leaveGroup("225.0.0.250")
        d.addCallback(serverJoined)

        return d


    def test_joinFailure(self):
        """
        Test that an attempt to join an address which is not a multicast
        address fails with L{error.MulticastJoinError}.
        """
        # 127.0.0.1 is not a multicast address, so joining it should fail.
        return self.assertFailure(
            self.client.transport.joinGroup("127.0.0.1"),
            error.MulticastJoinError)
    if runtime.platform.isWindows() and not runtime.platform.isVista():
        test_joinFailure.todo = "Windows' multicast is wonky"


    def test_multicast(self):
        """
        Test that a multicast group can be joined and messages sent to and
        received from it.
        """
        c = Server()
        p = reactor.listenMulticast(0, c)
        addr = self.server.transport.getHost()

        joined = self.server.transport.joinGroup("225.0.0.250")

        def cbJoined(ignored):
            d = self.server.packetReceived = Deferred()
            c.transport.write("hello world", ("225.0.0.250", addr.port))
            return d
        joined.addCallback(cbJoined)

        def cbPacket(ignored):
            self.assertEqual(self.server.packets[0][0], "hello world")
        joined.addCallback(cbPacket)

        def cleanup(passthrough):
            result = maybeDeferred(p.stopListening)
            result.addCallback(lambda ign: passthrough)
            return result
        joined.addCallback(cleanup)

        return joined


    def test_multiListen(self):
        """
        Test that multiple sockets can listen on the same multicast port and
        that they both receive multicast messages directed to that address.
        """
        firstClient = Server()
        firstPort = reactor.listenMulticast(
            0, firstClient, listenMultiple=True)

        portno = firstPort.getHost().port

        secondClient = Server()
        secondPort = reactor.listenMulticast(
            portno, secondClient, listenMultiple=True)

        theGroup = "225.0.0.250"
        joined = gatherResults([self.server.transport.joinGroup(theGroup),
                                firstPort.joinGroup(theGroup),
                                secondPort.joinGroup(theGroup)])


        def serverJoined(ignored):
            d1 = firstClient.packetReceived = Deferred()
            d2 = secondClient.packetReceived = Deferred()
            firstClient.transport.write("hello world", (theGroup, portno))
            return gatherResults([d1, d2])
        joined.addCallback(serverJoined)

        def gotPackets(ignored):
            self.assertEqual(firstClient.packets[0][0], "hello world")
            self.assertEqual(secondClient.packets[0][0], "hello world")
        joined.addCallback(gotPackets)

        def cleanup(passthrough):
            result = gatherResults([
                maybeDeferred(firstPort.stopListening),
                maybeDeferred(secondPort.stopListening)])
            result.addCallback(lambda ign: passthrough)
            return result
        joined.addBoth(cleanup)
        return joined
    if runtime.platform.isWindows():
        test_multiListen.skip = ("on non-linux platforms it appears multiple "
                                 "processes can listen, but not multiple sockets "
                                 "in same process?")


if not interfaces.IReactorUDP(reactor, None):
    UDPTestCase.skip = "This reactor does not support UDP"
    ReactorShutdownInteraction.skip = "This reactor does not support UDP"
if not interfaces.IReactorMulticast(reactor, None):
    MulticastTestCase.skip = "This reactor does not support multicast"

def checkForLinux22():
    import os
    if os.path.exists("/proc/version"):
        s = open("/proc/version").read()
        if s.startswith("Linux version"):
            s = s.split()[2]
            if s.split(".")[:2] == ["2", "2"]:
                f = MulticastTestCase.testInterface.im_func
                f.todo = "figure out why this fails in linux 2.2"
checkForLinux22()