aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_unix.py
blob: 863f665851d243b14e2547642bce6b6e3d2e45d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
"""

import stat, os, sys, types
import socket

from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
from twisted.python import lockfile
from twisted.trial import unittest

from twisted.test.test_tcp import MyServerFactory, MyClientFactory


class FailedConnectionClientFactory(protocol.ClientFactory):
    """
    A client factory which reports a failed connection attempt by errbacking
    the object it was constructed with.

    @ivar onFail: The object (a L{defer.Deferred} in the tests of this module)
        whose C{errback} method is called with the failure reason.
    """
    def __init__(self, onFail):
        self.onFail = onFail


    def clientConnectionFailed(self, connector, reason):
        """
        Pass C{reason} on to C{self.onFail.errback}.  C{connector} is not
        used.
        """
        notifyFailure = self.onFail.errback
        notifyFailure(reason)



class UnixSocketTestCase(unittest.TestCase):
    """
    Test unix sockets.
    """
    def test_peerBind(self):
        """
        The address passed to the server factory's C{buildProtocol} method and
        the address returned by the connected protocol's transport's C{getPeer}
        method match the address the client socket is bound to.
        """
        filename = self.mktemp()
        peername = self.mktemp()
        serverFactory = MyServerFactory()
        connMade = serverFactory.protocolConnectionMade = defer.Deferred()
        unixPort = reactor.listenUNIX(filename, serverFactory)
        self.addCleanup(unixPort.stopListening)
        # A plain blocking socket is used for the client side so that it can
        # be explicitly bound to a known path before connecting.
        unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(unixSocket.close)
        unixSocket.bind(peername)
        unixSocket.connect(filename)
        def cbConnMade(proto):
            expected = address.UNIXAddress(peername)
            self.assertEqual(serverFactory.peerAddresses, [expected])
            self.assertEqual(proto.transport.getPeer(), expected)
        connMade.addCallback(cbConnMade)
        return connMade


    def test_dumber(self):
        """
        L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
        started with L{IReactorUNIX.listenUNIX}.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        serverConnMade = defer.Deferred()
        serverFactory.protocolConnectionMade = serverConnMade
        unixPort = reactor.listenUNIX(filename, serverFactory)
        self.addCleanup(unixPort.stopListening)
        clientFactory = MyClientFactory()
        clientConnMade = defer.Deferred()
        clientFactory.protocolConnectionMade = clientConnMade
        reactor.connectUNIX(filename, clientFactory)
        d = defer.gatherResults([serverConnMade, clientConnMade])
        def allConnected(protocols):
            # The results arrive in the order the Deferreds were passed to
            # gatherResults.  Unpack explicitly: tuple parameters in a
            # function signature are not valid syntax on Python 3.
            serverProtocol, clientProtocol = protocols

            # Incidental assertion which may or may not be redundant with some
            # other test.  This probably deserves its own test method.
            self.assertEqual(clientFactory.peerAddresses,
                             [address.UNIXAddress(filename)])

            clientProtocol.transport.loseConnection()
            serverProtocol.transport.loseConnection()
        d.addCallback(allConnected)
        return d


    def test_pidFile(self):
        """
        A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
        called and released when the Deferred returned by the L{IListeningPort}
        provider's C{stopListening} method is called back.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        serverConnMade = defer.Deferred()
        serverFactory.protocolConnectionMade = serverConnMade
        unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
        self.assertTrue(lockfile.isLocked(filename + ".lock"))

        # XXX This part would test something about the checkPID parameter, but
        # it doesn't actually.  It should be rewritten to test the several
        # different possible behaviors.  -exarkun
        clientFactory = MyClientFactory()
        clientConnMade = defer.Deferred()
        clientFactory.protocolConnectionMade = clientConnMade
        reactor.connectUNIX(filename, clientFactory, checkPID=1)

        d = defer.gatherResults([serverConnMade, clientConnMade])
        def _portStuff(protocols):
            # Explicit unpacking instead of a tuple parameter, which is not
            # valid syntax on Python 3.
            serverProtocol, clientProto = protocols

            # Incidental assertion which may or may not be redundant with some
            # other test.  This probably deserves its own test method.
            self.assertEqual(clientFactory.peerAddresses,
                             [address.UNIXAddress(filename)])

            clientProto.transport.loseConnection()
            serverProtocol.transport.loseConnection()
            # Returning the Deferred delays the next callback until the port
            # has actually stopped listening.
            return unixPort.stopListening()
        d.addCallback(_portStuff)

        def _check(ignored):
            self.assertFalse(lockfile.isLocked(filename + ".lock"), 'locked')
        d.addCallback(_check)
        return d


    def test_socketLocking(self):
        """
        L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
        the name of a file on which a server is already listening.
        """
        filename = self.mktemp()
        serverFactory = MyServerFactory()
        unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)

        self.assertRaises(
            error.CannotListenError,
            reactor.listenUNIX, filename, serverFactory, wantPID=True)

        def stoppedListening(ign):
            # Once the first port has stopped, listening on the same file
            # again must succeed.
            secondPort = reactor.listenUNIX(
                filename, serverFactory, wantPID=True)
            return secondPort.stopListening()

        return unixPort.stopListening().addCallback(stoppedListening)


    def _uncleanSocketTest(self, callback):
        """
        Run a child Python process which calls L{IReactorUNIX.listenUNIX} with
        C{wantPID=True} on a fresh temporary path and then ends without ever
        calling C{stopListening}, then invoke C{callback}.

        The path used is stored as C{self.filename} so that C{callback} can
        refer to it.

        @param callback: A one-argument callable added as a callback to the
            Deferred returned by L{utils.getProcessValue} for the child.

        @return: The L{defer.Deferred} from L{utils.getProcessValue}, with
            C{callback} added to it.
        """
        self.filename = self.mktemp()
        source = ("from twisted.internet import protocol, reactor\n"
                  "reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
        # Give the child this process's import path via PYTHONPATH.
        env = {'PYTHONPATH': os.pathsep.join(sys.path)}

        d = utils.getProcessValue(sys.executable, ("-u", "-c", source), env=env)
        d.addCallback(callback)
        return d


    def test_uncleanServerSocketLocking(self):
        """
        If passed C{True} for the C{wantPID} parameter, a server can be started
        listening with L{IReactorUNIX.listenUNIX} when passed the name of a
        file on which a previous server which has not exited cleanly has been
        listening using the C{wantPID} option.
        """
        def ranStupidChild(ign):
            # If this next call succeeds, our lock handling is correct.
            p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
            return p.stopListening()
        return self._uncleanSocketTest(ranStupidChild)


    def test_connectToUncleanServer(self):
        """
        If passed C{True} for the C{checkPID} parameter, a client connection
        attempt made with L{IReactorUNIX.connectUNIX} to a file on which a
        previous server which has not exited cleanly was listening fails with
        L{error.BadFileError}.
        """
        def ranStupidChild(ign):
            d = defer.Deferred()
            f = FailedConnectionClientFactory(d)
            reactor.connectUNIX(self.filename, f, checkPID=True)
            return self.assertFailure(d, error.BadFileError)
        return self._uncleanSocketTest(ranStupidChild)


    def _reprTest(self, serverFactory, factoryName):
        """
        Test the C{__str__} and C{__repr__} implementations of a UNIX port when
        used with the given factory.

        @param serverFactory: The factory passed to L{IReactorUNIX.listenUNIX}.
        @param factoryName: The fully qualified class name expected to appear
            in the port's string representations.

        @return: A L{defer.Deferred} which fires after the port has stopped
            listening and its "not listening" representations were checked.
        """
        filename = self.mktemp()
        unixPort = reactor.listenUNIX(filename, serverFactory)

        connectedString = "<%s on %r>" % (factoryName, filename)
        self.assertEqual(repr(unixPort), connectedString)
        self.assertEqual(str(unixPort), connectedString)

        d = defer.maybeDeferred(unixPort.stopListening)
        def stoppedListening(ign):
            unconnectedString = "<%s (not listening)>" % (factoryName,)
            self.assertEqual(repr(unixPort), unconnectedString)
            self.assertEqual(str(unixPort), unconnectedString)
        d.addCallback(stoppedListening)
        return d


    def test_reprWithClassicFactory(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contain the name of the classic factory
        class being used and the filename on which the port is listening or
        indicate that the port is not listening.
        """
        class ClassicFactory:
            def doStart(self):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(ClassicFactory, types.ClassType)

        return self._reprTest(
            ClassicFactory(), "twisted.test.test_unix.ClassicFactory")


    def test_reprWithNewStyleFactory(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contain the name of the new-style factory
        class being used and the filename on which the port is listening or
        indicate that the port is not listening.
        """
        class NewStyleFactory(object):
            def doStart(self):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(NewStyleFactory, type)

        return self._reprTest(
            NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")



class ClientProto(protocol.ConnectedDatagramProtocol):
    """
    A connected datagram protocol which records when it is started and
    stopped and remembers the most recent datagram it received.

    @ivar started: C{True} once C{startProtocol} has been called.
    @ivar stopped: C{True} once C{stopProtocol} has been called.
    @ivar gotback: The data of the last datagram received, or C{None}.
    @ivar deferredStarted: Fired with C{None} from C{startProtocol}.
    @ivar deferredGotBack: Fired with C{None} from C{datagramReceived}.
    """
    started = False
    stopped = False
    gotback = None

    def __init__(self):
        self.deferredGotBack = defer.Deferred()
        self.deferredStarted = defer.Deferred()

    def startProtocol(self):
        """
        Record the start and fire C{deferredStarted}.
        """
        self.started = True
        fireStarted = self.deferredStarted.callback
        fireStarted(None)

    def stopProtocol(self):
        """
        Record the stop.
        """
        self.stopped = True

    def datagramReceived(self, data):
        """
        Store C{data} and then fire C{deferredGotBack}.
        """
        self.gotback = data
        fireGotBack = self.deferredGotBack.callback
        fireGotBack(None)

class ServerProto(protocol.DatagramProtocol):
    """
    A datagram protocol which records when it is started and stopped,
    remembers the most recent datagram and its sender, and answers every
    datagram with C{"hi back"}.

    @ivar started: C{True} once C{startProtocol} has been called.
    @ivar stopped: C{True} once C{stopProtocol} has been called.
    @ivar gotwhat: The data of the last datagram received, or C{None}.
    @ivar gotfrom: The address the last datagram came from, or C{None}.
    @ivar deferredStarted: Fired with C{None} from C{startProtocol}.
    @ivar deferredGotWhat: Fired with C{None} from C{datagramReceived}.
    """
    started = False
    stopped = False
    gotwhat = None
    gotfrom = None

    def __init__(self):
        self.deferredGotWhat = defer.Deferred()
        self.deferredStarted = defer.Deferred()

    def startProtocol(self):
        """
        Record the start and fire C{deferredStarted}.
        """
        self.started = True
        fireStarted = self.deferredStarted.callback
        fireStarted(None)

    def stopProtocol(self):
        """
        Record the stop.
        """
        self.stopped = True

    def datagramReceived(self, data, addr):
        """
        Remember the sender, reply to it, remember the data and finally fire
        C{deferredGotWhat}.
        """
        self.gotfrom = addr
        reply = "hi back"
        self.transport.write(reply, addr)
        self.gotwhat = data
        fireGotWhat = self.deferredGotWhat.callback
        fireGotWhat(None)



class DatagramUnixSocketTestCase(unittest.TestCase):
    """
    Test datagram UNIX sockets.
    """
    def test_exchange(self):
        """
        Test that a datagram can be sent to and received by a server and vice
        versa.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        self.addCleanup(s.stopListening)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
        self.addCleanup(c.stopListening)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            # Both protocols have started; send a datagram and wait until the
            # server has seen it and the client has seen the reply.
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def _cbTestExchange(ignored):
            self.assertEqual("hi", sp.gotwhat)
            self.assertEqual(clientaddr, sp.gotfrom)
            self.assertEqual("hi back", cp.gotback)

        d.addCallback(write)
        d.addCallback(_cbTestExchange)
        return d


    def test_cannotListen(self):
        """
        L{IReactorUNIXDatagram.listenUNIXDatagram} raises
        L{error.CannotListenError} if the unix socket specified is already in
        use.
        """
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        # Register the teardown before asserting so the port is stopped and
        # the socket file removed even if the assertion below fails.  Trial
        # waits for a Deferred returned by a cleanup, so the test does not
        # finish until the port has stopped.
        self.addCleanup(os.unlink, addr)
        self.addCleanup(s.stopListening)
        self.assertRaises(
            error.CannotListenError, reactor.listenUNIXDatagram, addr, p)

    # test connecting to bound and connected (somewhere else) address

    def _reprTest(self, serverProto, protocolName):
        """
        Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
        port when used with the given protocol.

        @param serverProto: The protocol passed to
            L{IReactorUNIXDatagram.listenUNIXDatagram}.
        @param protocolName: The fully qualified class name expected to
            appear in the port's string representations.

        @return: A L{defer.Deferred} which fires after the port has stopped
            listening and its "not listening" representations were checked.
        """
        filename = self.mktemp()
        unixPort = reactor.listenUNIXDatagram(filename, serverProto)

        connectedString = "<%s on %r>" % (protocolName, filename)
        self.assertEqual(repr(unixPort), connectedString)
        self.assertEqual(str(unixPort), connectedString)

        stopDeferred = defer.maybeDeferred(unixPort.stopListening)
        def stoppedListening(ign):
            unconnectedString = "<%s (not listening)>" % (protocolName,)
            self.assertEqual(repr(unixPort), unconnectedString)
            self.assertEqual(str(unixPort), unconnectedString)
        stopDeferred.addCallback(stoppedListening)
        return stopDeferred


    def test_reprWithClassicProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contain the name of the
        classic protocol class being used and the filename on which the port is
        listening or indicate that the port is not listening.
        """
        class ClassicProtocol:
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(ClassicProtocol, types.ClassType)

        return self._reprTest(
            ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")


    def test_reprWithNewStyleProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contain the name of the
        new-style protocol class being used and the filename on which the port
        is listening or indicate that the port is not listening.
        """
        class NewStyleProtocol(object):
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(NewStyleProtocol, type)

        return self._reprTest(
            NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")



# Mark a whole test case as skipped when adapting the running reactor to the
# interface it exercises yields a false value (None is passed as the
# fallback result of the adaptation).
for _interface, _testCase, _skipReason in [
    (interfaces.IReactorUNIX, UnixSocketTestCase,
     "This reactor does not support UNIX domain sockets"),
    (interfaces.IReactorUNIXDatagram, DatagramUnixSocketTestCase,
     "This reactor does not support UNIX datagram sockets"),
]:
    if not _interface(reactor, None):
        _testCase.skip = _skipReason
# Drop the loop names so the module namespace holds no extra references to
# the test case classes.
del _interface, _testCase, _skipReason