diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/web/test/test_web.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/web/test/test_web.py | 1079 |
1 files changed, 0 insertions, 1079 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/web/test/test_web.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/web/test/test_web.py deleted file mode 100755 index bd9b09c3..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/web/test/test_web.py +++ /dev/null @@ -1,1079 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for various parts of L{twisted.web}. -""" - -from cStringIO import StringIO - -from zope.interface import implements -from zope.interface.verify import verifyObject - -from twisted.trial import unittest -from twisted.internet import reactor -from twisted.internet.address import IPv4Address -from twisted.internet.defer import Deferred -from twisted.web import server, resource, util -from twisted.internet import defer, interfaces, task -from twisted.web import iweb, http, http_headers, error -from twisted.python import log - - -class DummyRequest: - """ - Represents a dummy or fake request. - - @ivar _finishedDeferreds: C{None} or a C{list} of L{Deferreds} which will - be called back with C{None} when C{finish} is called or which will be - errbacked if C{processingFailed} is called. - - @type headers: C{dict} - @ivar headers: A mapping of header name to header value for all request - headers. - - @type outgoingHeaders: C{dict} - @ivar outgoingHeaders: A mapping of header name to header value for all - response headers. - - @type responseCode: C{int} - @ivar responseCode: The response code which was passed to - C{setResponseCode}. - - @type written: C{list} of C{str} - @ivar written: The bytes which have been written to the request. - """ - uri = 'http://dummy/' - method = 'GET' - client = None - - def registerProducer(self, prod,s): - self.go = 1 - while self.go: - prod.resumeProducing() - - def unregisterProducer(self): - self.go = 0 - - - def __init__(self, postpath, session=None): - self.sitepath = [] - self.written = [] - self.finished = 0 - self.postpath = postpath - self.prepath = [] - self.session = None - self.protoSession = session or server.Session(0, self) - self.args = {} - self.outgoingHeaders = {} - self.responseHeaders = http_headers.Headers() - self.responseCode = None - self.headers = {} - self._finishedDeferreds = [] - - - def getHeader(self, name): - """ - Retrieve the value of a request header. - - @type name: C{str} - @param name: The name of the request header for which to retrieve the - value. Header names are compared case-insensitively. - - @rtype: C{str} or L{NoneType} - @return: The value of the specified request header. - """ - return self.headers.get(name.lower(), None) - - - def setHeader(self, name, value): - """TODO: make this assert on write() if the header is content-length - """ - self.outgoingHeaders[name.lower()] = value - - def getSession(self): - if self.session: - return self.session - assert not self.written, "Session cannot be requested after data has been written." - self.session = self.protoSession - return self.session - - - def render(self, resource): - """ - Render the given resource as a response to this request. - - This implementation only handles a few of the most common behaviors of - resources. It can handle a render method that returns a string or - C{NOT_DONE_YET}. It doesn't know anything about the semantics of - request methods (eg HEAD) nor how to set any particular headers. - Basically, it's largely broken, but sufficient for some tests at least. - It should B{not} be expanded to do all the same stuff L{Request} does. - Instead, L{DummyRequest} should be phased out and L{Request} (or some - other real code factored in a different way) used. - """ - result = resource.render(self) - if result is server.NOT_DONE_YET: - return - self.write(result) - self.finish() - - - def write(self, data): - self.written.append(data) - - def notifyFinish(self): - """ - Return a L{Deferred} which is called back with C{None} when the request - is finished. This will probably only work if you haven't called - C{finish} yet. - """ - finished = Deferred() - self._finishedDeferreds.append(finished) - return finished - - - def finish(self): - """ - Record that the request is finished and callback and L{Deferred}s - waiting for notification of this. - """ - self.finished = self.finished + 1 - if self._finishedDeferreds is not None: - observers = self._finishedDeferreds - self._finishedDeferreds = None - for obs in observers: - obs.callback(None) - - - def processingFailed(self, reason): - """ - Errback and L{Deferreds} waiting for finish notification. - """ - if self._finishedDeferreds is not None: - observers = self._finishedDeferreds - self._finishedDeferreds = None - for obs in observers: - obs.errback(reason) - - - def addArg(self, name, value): - self.args[name] = [value] - - - def setResponseCode(self, code, message=None): - """ - Set the HTTP status response code, but takes care that this is called - before any data is written. - """ - assert not self.written, "Response code cannot be set after data has been written: %s." % "@@@@".join(self.written) - self.responseCode = code - self.responseMessage = message - - - def setLastModified(self, when): - assert not self.written, "Last-Modified cannot be set after data has been written: %s." % "@@@@".join(self.written) - - - def setETag(self, tag): - assert not self.written, "ETag cannot be set after data has been written: %s." % "@@@@".join(self.written) - - - def getClientIP(self): - """ - Return the IPv4 address of the client which made this request, if there - is one, otherwise C{None}. - """ - if isinstance(self.client, IPv4Address): - return self.client.host - return None - - -class ResourceTestCase(unittest.TestCase): - def testListEntities(self): - r = resource.Resource() - self.assertEqual([], r.listEntities()) - - -class SimpleResource(resource.Resource): - """ - @ivar _contentType: C{None} or a C{str} giving the value of the - I{Content-Type} header in the response this resource will render. If it - is C{None}, no I{Content-Type} header will be set in the response. - """ - def __init__(self, contentType=None): - resource.Resource.__init__(self) - self._contentType = contentType - - - def render(self, request): - if self._contentType is not None: - request.responseHeaders.setRawHeaders( - "content-type", [self._contentType]) - - if http.CACHED in (request.setLastModified(10), - request.setETag('MatchingTag')): - return '' - else: - return "correct" - - -class DummyChannel: - class TCP: - port = 80 - disconnected = False - - def __init__(self): - self.written = StringIO() - self.producers = [] - - def getPeer(self): - return IPv4Address("TCP", '192.168.1.1', 12344) - - def write(self, bytes): - assert isinstance(bytes, str) - self.written.write(bytes) - - def writeSequence(self, iovec): - map(self.write, iovec) - - def getHost(self): - return IPv4Address("TCP", '10.0.0.1', self.port) - - def registerProducer(self, producer, streaming): - self.producers.append((producer, streaming)) - - def loseConnection(self): - self.disconnected = True - - - class SSL(TCP): - implements(interfaces.ISSLTransport) - - site = server.Site(resource.Resource()) - - def __init__(self): - self.transport = self.TCP() - - - def requestDone(self, request): - pass - - - -class SiteTest(unittest.TestCase): - def test_simplestSite(self): - """ - L{Site.getResourceFor} returns the C{""} child of the root resource it - is constructed with when processing a request for I{/}. - """ - sres1 = SimpleResource() - sres2 = SimpleResource() - sres1.putChild("",sres2) - site = server.Site(sres1) - self.assertIdentical( - site.getResourceFor(DummyRequest([''])), - sres2, "Got the wrong resource.") - - - -class SessionTest(unittest.TestCase): - """ - Tests for L{server.Session}. - """ - def setUp(self): - """ - Create a site with one active session using a deterministic, easily - controlled clock. - """ - self.clock = task.Clock() - self.uid = 'unique' - self.site = server.Site(resource.Resource()) - self.session = server.Session(self.site, self.uid, self.clock) - self.site.sessions[self.uid] = self.session - - - def test_defaultReactor(self): - """ - If not value is passed to L{server.Session.__init__}, the global - reactor is used. - """ - session = server.Session(server.Site(resource.Resource()), '123') - self.assertIdentical(session._reactor, reactor) - - - def test_startCheckingExpiration(self): - """ - L{server.Session.startCheckingExpiration} causes the session to expire - after L{server.Session.sessionTimeout} seconds without activity. - """ - self.session.startCheckingExpiration() - - # Advance to almost the timeout - nothing should happen. - self.clock.advance(self.session.sessionTimeout - 1) - self.assertIn(self.uid, self.site.sessions) - - # Advance to the timeout, the session should expire. - self.clock.advance(1) - self.assertNotIn(self.uid, self.site.sessions) - - # There should be no calls left over, either. - self.assertFalse(self.clock.calls) - - - def test_expire(self): - """ - L{server.Session.expire} expires the session. - """ - self.session.expire() - # It should be gone from the session dictionary. - self.assertNotIn(self.uid, self.site.sessions) - # And there should be no pending delayed calls. - self.assertFalse(self.clock.calls) - - - def test_expireWhileChecking(self): - """ - L{server.Session.expire} expires the session even if the timeout call - isn't due yet. - """ - self.session.startCheckingExpiration() - self.test_expire() - - - def test_notifyOnExpire(self): - """ - A function registered with L{server.Session.notifyOnExpire} is called - when the session expires. - """ - callbackRan = [False] - def expired(): - callbackRan[0] = True - self.session.notifyOnExpire(expired) - self.session.expire() - self.assertTrue(callbackRan[0]) - - - def test_touch(self): - """ - L{server.Session.touch} updates L{server.Session.lastModified} and - delays session timeout. - """ - # Make sure it works before startCheckingExpiration - self.clock.advance(3) - self.session.touch() - self.assertEqual(self.session.lastModified, 3) - - # And after startCheckingExpiration - self.session.startCheckingExpiration() - self.clock.advance(self.session.sessionTimeout - 1) - self.session.touch() - self.clock.advance(self.session.sessionTimeout - 1) - self.assertIn(self.uid, self.site.sessions) - - # It should have advanced it by just sessionTimeout, no more. - self.clock.advance(1) - self.assertNotIn(self.uid, self.site.sessions) - - - def test_startCheckingExpirationParameterDeprecated(self): - """ - L{server.Session.startCheckingExpiration} emits a deprecation warning - if it is invoked with a parameter. - """ - self.session.startCheckingExpiration(123) - warnings = self.flushWarnings([ - self.test_startCheckingExpirationParameterDeprecated]) - self.assertEqual(len(warnings), 1) - self.assertEqual(warnings[0]['category'], DeprecationWarning) - self.assertEqual( - warnings[0]['message'], - "The lifetime parameter to startCheckingExpiration is deprecated " - "since Twisted 9.0. See Session.sessionTimeout instead.") - - - def test_checkExpiredDeprecated(self): - """ - L{server.Session.checkExpired} is deprecated. - """ - self.session.checkExpired() - warnings = self.flushWarnings([self.test_checkExpiredDeprecated]) - self.assertEqual(warnings[0]['category'], DeprecationWarning) - self.assertEqual( - warnings[0]['message'], - "Session.checkExpired is deprecated since Twisted 9.0; sessions " - "check themselves now, you don't need to.") - self.assertEqual(len(warnings), 1) - - -# Conditional requests: -# If-None-Match, If-Modified-Since - -# make conditional request: -# normal response if condition succeeds -# if condition fails: -# response code -# no body - -def httpBody(whole): - return whole.split('\r\n\r\n', 1)[1] - -def httpHeader(whole, key): - key = key.lower() - headers = whole.split('\r\n\r\n', 1)[0] - for header in headers.split('\r\n'): - if header.lower().startswith(key): - return header.split(':', 1)[1].strip() - return None - -def httpCode(whole): - l1 = whole.split('\r\n', 1)[0] - return int(l1.split()[1]) - -class ConditionalTest(unittest.TestCase): - """ - web.server's handling of conditional requests for cache validation. - """ - def setUp(self): - self.resrc = SimpleResource() - self.resrc.putChild('', self.resrc) - self.resrc.putChild('with-content-type', SimpleResource('image/jpeg')) - self.site = server.Site(self.resrc) - self.site.logFile = log.logfile - - # HELLLLLLLLLLP! This harness is Very Ugly. - self.channel = self.site.buildProtocol(None) - self.transport = http.StringTransport() - self.transport.close = lambda *a, **kw: None - self.transport.disconnecting = lambda *a, **kw: 0 - self.transport.getPeer = lambda *a, **kw: "peer" - self.transport.getHost = lambda *a, **kw: "host" - self.channel.makeConnection(self.transport) - - - def tearDown(self): - self.channel.connectionLost(None) - - - def _modifiedTest(self, modifiedSince=None, etag=None): - """ - Given the value C{modifiedSince} for the I{If-Modified-Since} header or - the value C{etag} for the I{If-Not-Match} header, verify that a response - with a 200 code, a default Content-Type, and the resource as the body is - returned. - """ - if modifiedSince is not None: - validator = "If-Modified-Since: " + modifiedSince - else: - validator = "If-Not-Match: " + etag - for line in ["GET / HTTP/1.1", validator, ""]: - self.channel.lineReceived(line) - result = self.transport.getvalue() - self.assertEqual(httpCode(result), http.OK) - self.assertEqual(httpBody(result), "correct") - self.assertEqual(httpHeader(result, "Content-Type"), "text/html") - - - def test_modified(self): - """ - If a request is made with an I{If-Modified-Since} header value with - a timestamp indicating a time before the last modification of the - requested resource, a 200 response is returned along with a response - body containing the resource. - """ - self._modifiedTest(modifiedSince=http.datetimeToString(1)) - - - def test_unmodified(self): - """ - If a request is made with an I{If-Modified-Since} header value with a - timestamp indicating a time after the last modification of the request - resource, a 304 response is returned along with an empty response body - and no Content-Type header if the application does not set one. - """ - for line in ["GET / HTTP/1.1", - "If-Modified-Since: " + http.datetimeToString(100), ""]: - self.channel.lineReceived(line) - result = self.transport.getvalue() - self.assertEqual(httpCode(result), http.NOT_MODIFIED) - self.assertEqual(httpBody(result), "") - # Since there SHOULD NOT (RFC 2616, section 10.3.5) be any - # entity-headers, the Content-Type is not set if the application does - # not explicitly set it. - self.assertEqual(httpHeader(result, "Content-Type"), None) - - - def test_invalidTimestamp(self): - """ - If a request is made with an I{If-Modified-Since} header value which - cannot be parsed, the header is treated as not having been present - and a normal 200 response is returned with a response body - containing the resource. - """ - self._modifiedTest(modifiedSince="like, maybe a week ago, I guess?") - - - def test_invalidTimestampYear(self): - """ - If a request is made with an I{If-Modified-Since} header value which - contains a string in the year position which is not an integer, the - header is treated as not having been present and a normal 200 - response is returned with a response body containing the resource. - """ - self._modifiedTest(modifiedSince="Thu, 01 Jan blah 00:00:10 GMT") - - - def test_invalidTimestampTooLongAgo(self): - """ - If a request is made with an I{If-Modified-Since} header value which - contains a year before the epoch, the header is treated as not - having been present and a normal 200 response is returned with a - response body containing the resource. - """ - self._modifiedTest(modifiedSince="Thu, 01 Jan 1899 00:00:10 GMT") - - - def test_invalidTimestampMonth(self): - """ - If a request is made with an I{If-Modified-Since} header value which - contains a string in the month position which is not a recognized - month abbreviation, the header is treated as not having been present - and a normal 200 response is returned with a response body - containing the resource. - """ - self._modifiedTest(modifiedSince="Thu, 01 Blah 1970 00:00:10 GMT") - - - def test_etagMatchedNot(self): - """ - If a request is made with an I{If-None-Match} ETag which does not match - the current ETag of the requested resource, the header is treated as not - having been present and a normal 200 response is returned with a - response body containing the resource. - """ - self._modifiedTest(etag="unmatchedTag") - - - def test_etagMatched(self): - """ - If a request is made with an I{If-None-Match} ETag which does match the - current ETag of the requested resource, a 304 response is returned along - with an empty response body. - """ - for line in ["GET / HTTP/1.1", "If-None-Match: MatchingTag", ""]: - self.channel.lineReceived(line) - result = self.transport.getvalue() - self.assertEqual(httpHeader(result, "ETag"), "MatchingTag") - self.assertEqual(httpCode(result), http.NOT_MODIFIED) - self.assertEqual(httpBody(result), "") - - - def test_unmodifiedWithContentType(self): - """ - Similar to L{test_etagMatched}, but the response should include a - I{Content-Type} header if the application explicitly sets one. - - This I{Content-Type} header SHOULD NOT be present according to RFC 2616, - section 10.3.5. It will only be present if the application explicitly - sets it. - """ - for line in ["GET /with-content-type HTTP/1.1", - "If-None-Match: MatchingTag", ""]: - self.channel.lineReceived(line) - result = self.transport.getvalue() - self.assertEqual(httpCode(result), http.NOT_MODIFIED) - self.assertEqual(httpBody(result), "") - self.assertEqual(httpHeader(result, "Content-Type"), "image/jpeg") - - - -class RequestTests(unittest.TestCase): - """ - Tests for the HTTP request class, L{server.Request}. - """ - - def test_interface(self): - """ - L{server.Request} instances provide L{iweb.IRequest}. - """ - self.assertTrue( - verifyObject(iweb.IRequest, server.Request(DummyChannel(), True))) - - - def testChildLink(self): - request = server.Request(DummyChannel(), 1) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.childLink('baz'), 'bar/baz') - request = server.Request(DummyChannel(), 1) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0') - self.assertEqual(request.childLink('baz'), 'baz') - - def testPrePathURLSimple(self): - request = server.Request(DummyChannel(), 1) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - request.setHost('example.com', 80) - self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar') - - def testPrePathURLNonDefault(self): - d = DummyChannel() - d.transport.port = 81 - request = server.Request(d, 1) - request.setHost('example.com', 81) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar') - - def testPrePathURLSSLPort(self): - d = DummyChannel() - d.transport.port = 443 - request = server.Request(d, 1) - request.setHost('example.com', 443) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar') - - def testPrePathURLSSLPortAndSSL(self): - d = DummyChannel() - d.transport = DummyChannel.SSL() - d.transport.port = 443 - request = server.Request(d, 1) - request.setHost('example.com', 443) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar') - - def testPrePathURLHTTPPortAndSSL(self): - d = DummyChannel() - d.transport = DummyChannel.SSL() - d.transport.port = 80 - request = server.Request(d, 1) - request.setHost('example.com', 80) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'https://example.com:80/foo/bar') - - def testPrePathURLSSLNonDefault(self): - d = DummyChannel() - d.transport = DummyChannel.SSL() - d.transport.port = 81 - request = server.Request(d, 1) - request.setHost('example.com', 81) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar') - - def testPrePathURLSetSSLHost(self): - d = DummyChannel() - d.transport.port = 81 - request = server.Request(d, 1) - request.setHost('foo.com', 81, 1) - request.gotLength(0) - request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar') - - - def test_prePathURLQuoting(self): - """ - L{Request.prePathURL} quotes special characters in the URL segments to - preserve the original meaning. - """ - d = DummyChannel() - request = server.Request(d, 1) - request.setHost('example.com', 80) - request.gotLength(0) - request.requestReceived('GET', '/foo%2Fbar', 'HTTP/1.0') - self.assertEqual(request.prePathURL(), 'http://example.com/foo%2Fbar') - - - -class RootResource(resource.Resource): - isLeaf=0 - def getChildWithDefault(self, name, request): - request.rememberRootURL() - return resource.Resource.getChildWithDefault(self, name, request) - def render(self, request): - return '' - -class RememberURLTest(unittest.TestCase): - def createServer(self, r): - chan = DummyChannel() - chan.site = server.Site(r) - return chan - - def testSimple(self): - r = resource.Resource() - r.isLeaf=0 - rr = RootResource() - r.putChild('foo', rr) - rr.putChild('', rr) - rr.putChild('bar', resource.Resource()) - chan = self.createServer(r) - for url in ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']: - request = server.Request(chan, 1) - request.setHost('example.com', 81) - request.gotLength(0) - request.requestReceived('GET', url, 'HTTP/1.0') - self.assertEqual(request.getRootURL(), "http://example.com/foo") - - def testRoot(self): - rr = RootResource() - rr.putChild('', rr) - rr.putChild('bar', resource.Resource()) - chan = self.createServer(rr) - for url in ['/', '/bar', '/bar/baz', '/bar/']: - request = server.Request(chan, 1) - request.setHost('example.com', 81) - request.gotLength(0) - request.requestReceived('GET', url, 'HTTP/1.0') - self.assertEqual(request.getRootURL(), "http://example.com/") - - -class NewRenderResource(resource.Resource): - def render_GET(self, request): - return "hi hi" - - def render_HEH(self, request): - return "ho ho" - - - -class HeadlessResource(object): - """ - A resource that implements GET but not HEAD. - """ - implements(resource.IResource) - - allowedMethods = ["GET"] - - def render(self, request): - """ - Leave the request open for future writes. - """ - self.request = request - if request.method not in self.allowedMethods: - raise error.UnsupportedMethod(self.allowedMethods) - self.request.write("some data") - return server.NOT_DONE_YET - - - - -class NewRenderTestCase(unittest.TestCase): - """ - Tests for L{server.Request.render}. - """ - def _getReq(self, resource=None): - """ - Create a request object with a stub channel and install the - passed resource at /newrender. If no resource is passed, - create one. - """ - d = DummyChannel() - if resource is None: - resource = NewRenderResource() - d.site.resource.putChild('newrender', resource) - d.transport.port = 81 - request = server.Request(d, 1) - request.setHost('example.com', 81) - request.gotLength(0) - return request - - def testGoodMethods(self): - req = self._getReq() - req.requestReceived('GET', '/newrender', 'HTTP/1.0') - self.assertEqual(req.transport.getvalue().splitlines()[-1], 'hi hi') - - req = self._getReq() - req.requestReceived('HEH', '/newrender', 'HTTP/1.0') - self.assertEqual(req.transport.getvalue().splitlines()[-1], 'ho ho') - - def testBadMethods(self): - req = self._getReq() - req.requestReceived('CONNECT', '/newrender', 'HTTP/1.0') - self.assertEqual(req.code, 501) - - req = self._getReq() - req.requestReceived('hlalauguG', '/newrender', 'HTTP/1.0') - self.assertEqual(req.code, 501) - - def testImplicitHead(self): - req = self._getReq() - req.requestReceived('HEAD', '/newrender', 'HTTP/1.0') - self.assertEqual(req.code, 200) - self.assertEqual(-1, req.transport.getvalue().find('hi hi')) - - - def test_unsupportedHead(self): - """ - HEAD requests against resource that only claim support for GET - should not include a body in the response. - """ - resource = HeadlessResource() - req = self._getReq(resource) - req.requestReceived("HEAD", "/newrender", "HTTP/1.0") - headers, body = req.transport.getvalue().split('\r\n\r\n') - self.assertEqual(req.code, 200) - self.assertEqual(body, '') - - - -class GettableResource(resource.Resource): - """ - Used by AllowedMethodsTest to simulate an allowed method. - """ - def render_GET(self): - pass - - def render_fred_render_ethel(self): - """ - The unusual method name is designed to test the culling method - in C{twisted.web.resource._computeAllowedMethods}. - """ - pass - - - -class AllowedMethodsTest(unittest.TestCase): - """ - 'C{twisted.web.resource._computeAllowedMethods} is provided by a - default should the subclass not provide the method. - """ - - - def _getReq(self): - """ - Generate a dummy request for use by C{_computeAllowedMethod} tests. - """ - d = DummyChannel() - d.site.resource.putChild('gettableresource', GettableResource()) - d.transport.port = 81 - request = server.Request(d, 1) - request.setHost('example.com', 81) - request.gotLength(0) - return request - - - def test_computeAllowedMethods(self): - """ - C{_computeAllowedMethods} will search through the - 'gettableresource' for all attributes/methods of the form - 'render_{method}' ('render_GET', for example) and return a list of - the methods. 'HEAD' will always be included from the - resource.Resource superclass. - """ - res = GettableResource() - allowedMethods = resource._computeAllowedMethods(res) - self.assertEqual(set(allowedMethods), - set(['GET', 'HEAD', 'fred_render_ethel'])) - - - def test_notAllowed(self): - """ - When an unsupported method is requested, the default - L{_computeAllowedMethods} method will be called to determine the - allowed methods, and the HTTP 405 'Method Not Allowed' status will - be returned with the allowed methods will be returned in the - 'Allow' header. - """ - req = self._getReq() - req.requestReceived('POST', '/gettableresource', 'HTTP/1.0') - self.assertEqual(req.code, 405) - self.assertEqual( - set(req.responseHeaders.getRawHeaders('allow')[0].split(", ")), - set(['GET', 'HEAD','fred_render_ethel']) - ) - - - def test_notAllowedQuoting(self): - """ - When an unsupported method response is generated, an HTML message will - be displayed. That message should include a quoted form of the URI and, - since that value come from a browser and shouldn't necessarily be - trusted. - """ - req = self._getReq() - req.requestReceived('POST', '/gettableresource?' - 'value=<script>bad', 'HTTP/1.0') - self.assertEqual(req.code, 405) - renderedPage = req.transport.getvalue() - self.assertNotIn("<script>bad", renderedPage) - self.assertIn('<script>bad', renderedPage) - - - def test_notImplementedQuoting(self): - """ - When an not-implemented method response is generated, an HTML message - will be displayed. That message should include a quoted form of the - requested method, since that value come from a browser and shouldn't - necessarily be trusted. - """ - req = self._getReq() - req.requestReceived('<style>bad', '/gettableresource', 'HTTP/1.0') - self.assertEqual(req.code, 501) - renderedPage = req.transport.getvalue() - self.assertNotIn("<style>bad", renderedPage) - self.assertIn('<style>bad', renderedPage) - - - -class SDResource(resource.Resource): - def __init__(self,default): - self.default = default - - - def getChildWithDefault(self, name, request): - d = defer.succeed(self.default) - resource = util.DeferredResource(d) - return resource.getChildWithDefault(name, request) - - - -class DeferredResourceTests(unittest.TestCase): - """ - Tests for L{DeferredResource}. - """ - - def testDeferredResource(self): - r = resource.Resource() - r.isLeaf = 1 - s = SDResource(r) - d = DummyRequest(['foo', 'bar', 'baz']) - resource.getChildForRequest(s, d) - self.assertEqual(d.postpath, ['bar', 'baz']) - - - def test_render(self): - """ - L{DeferredResource} uses the request object's C{render} method to - render the resource which is the result of the L{Deferred} being - handled. - """ - rendered = [] - request = DummyRequest([]) - request.render = rendered.append - - result = resource.Resource() - deferredResource = util.DeferredResource(defer.succeed(result)) - deferredResource.render(request) - self.assertEqual(rendered, [result]) - - - -class DummyRequestForLogTest(DummyRequest): - uri = '/dummy' # parent class uri has "http://", which doesn't really happen - code = 123 - - clientproto = 'HTTP/1.0' - sentLength = None - client = IPv4Address('TCP', '1.2.3.4', 12345) - - - -class TestLogEscaping(unittest.TestCase): - def setUp(self): - self.site = http.HTTPFactory() - self.site.logFile = StringIO() - self.request = DummyRequestForLogTest(self.site, False) - - def testSimple(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "-"\n') - - def testMethodQuote(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.request.method = 'G"T' - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "G\\"T /dummy HTTP/1.0" 123 - "-" "-"\n') - - def testRequestQuote(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.request.uri='/dummy"withquote' - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy\\"withquote HTTP/1.0" 123 - "-" "-"\n') - - def testProtoQuote(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.request.clientproto='HT"P/1.0' - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HT\\"P/1.0" 123 - "-" "-"\n') - - def testRefererQuote(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.request.headers['referer'] = 'http://malicious" ".website.invalid' - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "http://malicious\\" \\".website.invalid" "-"\n') - - def testUserAgentQuote(self): - self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( - 25, 'Oct', 2004, 12, 31, 59) - self.request.headers['user-agent'] = 'Malicious Web" Evil' - self.site.log(self.request) - self.site.logFile.seek(0) - self.assertEqual( - self.site.logFile.read(), - '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "Malicious Web\\" Evil"\n') - - - -class ServerAttributesTestCase(unittest.TestCase): - """ - Tests that deprecated twisted.web.server attributes raise the appropriate - deprecation warnings when used. - """ - - def test_deprecatedAttributeDateTimeString(self): - """ - twisted.web.server.date_time_string should not be used; instead use - twisted.web.http.datetimeToString directly - """ - deprecated_func = server.date_time_string - warnings = self.flushWarnings( - offendingFunctions=[self.test_deprecatedAttributeDateTimeString]) - - self.assertEqual(len(warnings), 1) - self.assertEqual(warnings[0]['category'], DeprecationWarning) - self.assertEqual( - warnings[0]['message'], - ("twisted.web.server.date_time_string was deprecated in Twisted " - "12.1.0: Please use twisted.web.http.datetimeToString instead")) - - - def test_deprecatedAttributeStringDateTime(self): - """ - twisted.web.server.string_date_time should not be used; instead use - twisted.web.http.stringToDatetime directly - """ - deprecated_func = server.string_date_time - warnings = self.flushWarnings( - offendingFunctions=[self.test_deprecatedAttributeStringDateTime]) - - self.assertEqual(len(warnings), 1) - self.assertEqual(warnings[0]['category'], DeprecationWarning) - self.assertEqual( - warnings[0]['message'], - ("twisted.web.server.string_date_time was deprecated in Twisted " - "12.1.0: Please use twisted.web.http.stringToDatetime instead")) |