diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/application/test/test_internet.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/application/test/test_internet.py | 252 |
1 files changed, 0 insertions, 252 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/application/test/test_internet.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/application/test/test_internet.py deleted file mode 100755 index 9e058d71..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/application/test/test_internet.py +++ /dev/null @@ -1,252 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for (new code in) L{twisted.application.internet}. -""" - - -from zope.interface import implements -from zope.interface.verify import verifyClass - -from twisted.internet.protocol import Factory -from twisted.trial.unittest import TestCase -from twisted.application.internet import StreamServerEndpointService -from twisted.internet.interfaces import IStreamServerEndpoint, IListeningPort -from twisted.internet.defer import Deferred, CancelledError - -class FakeServer(object): - """ - In-memory implementation of L{IStreamServerEndpoint}. - - @ivar result: The L{Deferred} resulting from the call to C{listen}, after - C{listen} has been called. - - @ivar factory: The factory passed to C{listen}. - - @ivar cancelException: The exception to errback C{self.result} when it is - cancelled. - - @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire - with. - - @ivar listenAttempts: The number of times C{listen} has been invoked. - - @ivar failImmediately: If set, the exception to fail the L{Deferred} - returned from C{listen} before it is returned. - """ - - implements(IStreamServerEndpoint) - - result = None - factory = None - failImmediately = None - cancelException = CancelledError() - listenAttempts = 0 - - def __init__(self): - self.port = FakePort() - - - def listen(self, factory): - """ - Return a Deferred and store it for future use. (Implementation of - L{IStreamServerEndpoint}). - """ - self.listenAttempts += 1 - self.factory = factory - self.result = Deferred( - canceller=lambda d: d.errback(self.cancelException)) - if self.failImmediately is not None: - self.result.errback(self.failImmediately) - return self.result - - - def startedListening(self): - """ - Test code should invoke this method after causing C{listen} to be - invoked in order to fire the L{Deferred} previously returned from - C{listen}. - """ - self.result.callback(self.port) - - - def stoppedListening(self): - """ - Test code should invoke this method after causing C{stopListening} to - be invoked on the port fired from the L{Deferred} returned from - C{listen} in order to cause the L{Deferred} returned from - C{stopListening} to fire. - """ - self.port.deferred.callback(None) - -verifyClass(IStreamServerEndpoint, FakeServer) - - - -class FakePort(object): - """ - Fake L{IListeningPort} implementation. - - @ivar deferred: The L{Deferred} returned by C{stopListening}. - """ - - implements(IListeningPort) - - deferred = None - - def stopListening(self): - self.deferred = Deferred() - return self.deferred - -verifyClass(IStreamServerEndpoint, FakeServer) - - - -class TestEndpointService(TestCase): - """ - Tests for L{twisted.application.internet}. - """ - - def setUp(self): - """ - Construct a stub server, a stub factory, and a - L{StreamServerEndpointService} to test. - """ - self.fakeServer = FakeServer() - self.factory = Factory() - self.svc = StreamServerEndpointService(self.fakeServer, self.factory) - - - def test_privilegedStartService(self): - """ - L{StreamServerEndpointService.privilegedStartService} calls its - endpoint's C{listen} method with its factory. - """ - self.svc.privilegedStartService() - self.assertIdentical(self.factory, self.fakeServer.factory) - - - def test_synchronousRaiseRaisesSynchronously(self, thunk=None): - """ - L{StreamServerEndpointService.startService} should raise synchronously - if the L{Deferred} returned by its wrapped - L{IStreamServerEndpoint.listen} has already fired with an errback and - the L{StreamServerEndpointService}'s C{_raiseSynchronously} flag has - been set. This feature is necessary to preserve compatibility with old - behavior of L{twisted.internet.strports.service}, which is to return a - service which synchronously raises an exception from C{startService} - (so that, among other things, twistd will not start running). However, - since L{IStreamServerEndpoint.listen} may fail asynchronously, it is - a bad idea to rely on this behavior. - """ - self.fakeServer.failImmediately = ZeroDivisionError() - self.svc._raiseSynchronously = True - self.assertRaises(ZeroDivisionError, thunk or self.svc.startService) - - - def test_synchronousRaisePrivileged(self): - """ - L{StreamServerEndpointService.privilegedStartService} should behave the - same as C{startService} with respect to - L{TestEndpointService.test_synchronousRaiseRaisesSynchronously}. - """ - self.test_synchronousRaiseRaisesSynchronously( - self.svc.privilegedStartService) - - - def test_failReportsError(self): - """ - L{StreamServerEndpointService.startService} and - L{StreamServerEndpointService.privilegedStartService} should both log - an exception when the L{Deferred} returned from their wrapped - L{IStreamServerEndpoint.listen} fails. - """ - self.svc.startService() - self.fakeServer.result.errback(ZeroDivisionError()) - logged = self.flushLoggedErrors(ZeroDivisionError) - self.assertEqual(len(logged), 1) - - - def test_synchronousFailReportsError(self): - """ - Without the C{_raiseSynchronously} compatibility flag, failing - immediately has the same behavior as failing later; it logs the error. - """ - self.fakeServer.failImmediately = ZeroDivisionError() - self.svc.startService() - logged = self.flushLoggedErrors(ZeroDivisionError) - self.assertEqual(len(logged), 1) - - - def test_startServiceUnstarted(self): - """ - L{StreamServerEndpointService.startService} sets the C{running} flag, - and calls its endpoint's C{listen} method with its factory, if it - has not yet been started. - """ - self.svc.startService() - self.assertIdentical(self.factory, self.fakeServer.factory) - self.assertEqual(self.svc.running, True) - - - def test_startServiceStarted(self): - """ - L{StreamServerEndpointService.startService} sets the C{running} flag, - but nothing else, if the service has already been started. - """ - self.test_privilegedStartService() - self.svc.startService() - self.assertEqual(self.fakeServer.listenAttempts, 1) - self.assertEqual(self.svc.running, True) - - - def test_stopService(self): - """ - L{StreamServerEndpointService.stopService} calls C{stopListening} on - the L{IListeningPort} returned from its endpoint, returns the - C{Deferred} from stopService, and sets C{running} to C{False}. - """ - self.svc.privilegedStartService() - self.fakeServer.startedListening() - # Ensure running gets set to true - self.svc.startService() - result = self.svc.stopService() - l = [] - result.addCallback(l.append) - self.assertEqual(len(l), 0) - self.fakeServer.stoppedListening() - self.assertEqual(len(l), 1) - self.assertFalse(self.svc.running) - - - def test_stopServiceBeforeStartFinished(self): - """ - L{StreamServerEndpointService.stopService} cancels the L{Deferred} - returned by C{listen} if it has not yet fired. No error will be logged - about the cancellation of the listen attempt. - """ - self.svc.privilegedStartService() - result = self.svc.stopService() - l = [] - result.addBoth(l.append) - self.assertEqual(l, [None]) - self.assertEqual(self.flushLoggedErrors(CancelledError), []) - - - def test_stopServiceCancelStartError(self): - """ - L{StreamServerEndpointService.stopService} cancels the L{Deferred} - returned by C{listen} if it has not fired yet. An error will be logged - if the resulting exception is not L{CancelledError}. - """ - self.fakeServer.cancelException = ZeroDivisionError() - self.svc.privilegedStartService() - result = self.svc.stopService() - l = [] - result.addCallback(l.append) - self.assertEqual(l, [None]) - stoppingErrors = self.flushLoggedErrors(ZeroDivisionError) - self.assertEqual(len(stoppingErrors), 1) - - |