aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py3111
1 files changed, 0 insertions, 3111 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py
deleted file mode 100755
index 23ffcbaa..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ftp.py
+++ /dev/null
@@ -1,3111 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-FTP tests.
-"""
-
-import os
-import errno
-from StringIO import StringIO
-import getpass
-
-from zope.interface import implements
-from zope.interface.verify import verifyClass
-
-from twisted.trial import unittest, util
-from twisted.python.randbytes import insecureRandom
-from twisted.cred.portal import IRealm
-from twisted.protocols import basic
-from twisted.internet import reactor, task, protocol, defer, error
-from twisted.internet.interfaces import IConsumer
-from twisted.cred.error import UnauthorizedLogin
-from twisted.cred import portal, checkers, credentials
-from twisted.python import failure, filepath, runtime
-from twisted.test import proto_helpers
-
-from twisted.protocols import ftp, loopback
-
-
-_changeDirectorySuppression = util.suppress(
- category=DeprecationWarning,
- message=(
- r"FTPClient\.changeDirectory is deprecated in Twisted 8\.2 and "
- r"newer\. Use FTPClient\.cwd instead\."))
-
-if runtime.platform.isWindows():
- nonPOSIXSkip = "Cannot run on Windows"
-else:
- nonPOSIXSkip = None
-
-
-class Dummy(basic.LineReceiver):
- logname = None
- def __init__(self):
- self.lines = []
- self.rawData = []
- def connectionMade(self):
- self.f = self.factory # to save typing in pdb :-)
- def lineReceived(self,line):
- self.lines.append(line)
- def rawDataReceived(self, data):
- self.rawData.append(data)
- def lineLengthExceeded(self, line):
- pass
-
-
-class _BufferingProtocol(protocol.Protocol):
- def connectionMade(self):
- self.buffer = ''
- self.d = defer.Deferred()
- def dataReceived(self, data):
- self.buffer += data
- def connectionLost(self, reason):
- self.d.callback(self)
-
-
-
-class FTPServerTestCase(unittest.TestCase):
- """
- Simple tests for an FTP server with the default settings.
-
- @ivar clientFactory: class used as ftp client.
- """
- clientFactory = ftp.FTPClientBasic
- userAnonymous = "anonymous"
-
- def setUp(self):
- # Create a directory
- self.directory = self.mktemp()
- os.mkdir(self.directory)
- self.dirPath = filepath.FilePath(self.directory)
-
- # Start the server
- p = portal.Portal(ftp.FTPRealm(
- anonymousRoot=self.directory,
- userHome=self.directory,
- ))
- p.registerChecker(checkers.AllowAnonymousAccess(),
- credentials.IAnonymous)
-
- users_checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.username = "test-user"
- self.password = "test-password"
- users_checker.addUser(self.username, self.password)
- p.registerChecker(users_checker, credentials.IUsernamePassword)
-
- self.factory = ftp.FTPFactory(portal=p,
- userAnonymous=self.userAnonymous)
- port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
- self.addCleanup(port.stopListening)
-
- # Hook the server's buildProtocol to make the protocol instance
- # accessible to tests.
- buildProtocol = self.factory.buildProtocol
- d1 = defer.Deferred()
- def _rememberProtocolInstance(addr):
- # Done hooking this.
- del self.factory.buildProtocol
-
- protocol = buildProtocol(addr)
- self.serverProtocol = protocol.wrappedProtocol
- def cleanupServer():
- if self.serverProtocol.transport is not None:
- self.serverProtocol.transport.loseConnection()
- self.addCleanup(cleanupServer)
- d1.callback(None)
- return protocol
- self.factory.buildProtocol = _rememberProtocolInstance
-
- # Connect a client to it
- portNum = port.getHost().port
- clientCreator = protocol.ClientCreator(reactor, self.clientFactory)
- d2 = clientCreator.connectTCP("127.0.0.1", portNum)
- def gotClient(client):
- self.client = client
- self.addCleanup(self.client.transport.loseConnection)
- d2.addCallback(gotClient)
- return defer.gatherResults([d1, d2])
-
- def assertCommandResponse(self, command, expectedResponseLines,
- chainDeferred=None):
- """Asserts that a sending an FTP command receives the expected
- response.
-
- Returns a Deferred. Optionally accepts a deferred to chain its actions
- to.
- """
- if chainDeferred is None:
- chainDeferred = defer.succeed(None)
-
- def queueCommand(ignored):
- d = self.client.queueStringCommand(command)
- def gotResponse(responseLines):
- self.assertEqual(expectedResponseLines, responseLines)
- return d.addCallback(gotResponse)
- return chainDeferred.addCallback(queueCommand)
-
- def assertCommandFailed(self, command, expectedResponse=None,
- chainDeferred=None):
- if chainDeferred is None:
- chainDeferred = defer.succeed(None)
-
- def queueCommand(ignored):
- return self.client.queueStringCommand(command)
- chainDeferred.addCallback(queueCommand)
- self.assertFailure(chainDeferred, ftp.CommandFailed)
- def failed(exception):
- if expectedResponse is not None:
- self.assertEqual(
- expectedResponse, exception.args[0])
- return chainDeferred.addCallback(failed)
-
- def _anonymousLogin(self):
- d = self.assertCommandResponse(
- 'USER anonymous',
- ['331 Guest login ok, type your email address as password.'])
- return self.assertCommandResponse(
- 'PASS test@twistedmatrix.com',
- ['230 Anonymous login ok, access restrictions apply.'],
- chainDeferred=d)
-
- def _userLogin(self):
- """Authenticates the FTP client using the test account."""
- d = self.assertCommandResponse(
- 'USER %s' % (self.username),
- ['331 Password required for %s.' % (self.username)])
- return self.assertCommandResponse(
- 'PASS %s' % (self.password),
- ['230 User logged in, proceed'],
- chainDeferred=d)
-
-
-class FTPAnonymousTestCase(FTPServerTestCase):
- """
- Simple tests for an FTP server with different anonymous username.
- The new anonymous username used in this test case is "guest"
- """
- userAnonymous = "guest"
-
- def test_anonymousLogin(self):
- """
- Tests whether the changing of the anonymous username is working or not.
- The FTP server should not comply about the need of password for the
- username 'guest', letting it login as anonymous asking just an email
- address as password.
- """
- d = self.assertCommandResponse(
- 'USER guest',
- ['331 Guest login ok, type your email address as password.'])
- return self.assertCommandResponse(
- 'PASS test@twistedmatrix.com',
- ['230 Anonymous login ok, access restrictions apply.'],
- chainDeferred=d)
-
-
-
-class BasicFTPServerTestCase(FTPServerTestCase):
- def testNotLoggedInReply(self):
- """When not logged in, all commands other than USER and PASS should
- get NOT_LOGGED_IN errors.
- """
- commandList = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV',
- 'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']
-
- # Issue commands, check responses
- def checkResponse(exception):
- failureResponseLines = exception.args[0]
- self.failUnless(failureResponseLines[-1].startswith("530"),
- "Response didn't start with 530: %r"
- % (failureResponseLines[-1],))
- deferreds = []
- for command in commandList:
- deferred = self.client.queueStringCommand(command)
- self.assertFailure(deferred, ftp.CommandFailed)
- deferred.addCallback(checkResponse)
- deferreds.append(deferred)
- return defer.DeferredList(deferreds, fireOnOneErrback=True)
-
- def testPASSBeforeUSER(self):
- """Issuing PASS before USER should give an error."""
- return self.assertCommandFailed(
- 'PASS foo',
- ["503 Incorrect sequence of commands: "
- "USER required before PASS"])
-
- def testNoParamsForUSER(self):
- """Issuing USER without a username is a syntax error."""
- return self.assertCommandFailed(
- 'USER',
- ['500 Syntax error: USER requires an argument.'])
-
- def testNoParamsForPASS(self):
- """Issuing PASS without a password is a syntax error."""
- d = self.client.queueStringCommand('USER foo')
- return self.assertCommandFailed(
- 'PASS',
- ['500 Syntax error: PASS requires an argument.'],
- chainDeferred=d)
-
- def testAnonymousLogin(self):
- return self._anonymousLogin()
-
- def testQuit(self):
- """Issuing QUIT should return a 221 message."""
- d = self._anonymousLogin()
- return self.assertCommandResponse(
- 'QUIT',
- ['221 Goodbye.'],
- chainDeferred=d)
-
- def testAnonymousLoginDenied(self):
- # Reconfigure the server to disallow anonymous access, and to have an
- # IUsernamePassword checker that always rejects.
- self.factory.allowAnonymous = False
- denyAlwaysChecker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.factory.portal.registerChecker(denyAlwaysChecker,
- credentials.IUsernamePassword)
-
- # Same response code as allowAnonymous=True, but different text.
- d = self.assertCommandResponse(
- 'USER anonymous',
- ['331 Password required for anonymous.'])
-
- # It will be denied. No-one can login.
- d = self.assertCommandFailed(
- 'PASS test@twistedmatrix.com',
- ['530 Sorry, Authentication failed.'],
- chainDeferred=d)
-
- # It's not just saying that. You aren't logged in.
- d = self.assertCommandFailed(
- 'PWD',
- ['530 Please login with USER and PASS.'],
- chainDeferred=d)
- return d
-
-
- def test_anonymousWriteDenied(self):
- """
- When an anonymous user attempts to edit the server-side filesystem, they
- will receive a 550 error with a descriptive message.
- """
- d = self._anonymousLogin()
- return self.assertCommandFailed(
- 'MKD newdir',
- ['550 Anonymous users are forbidden to change the filesystem'],
- chainDeferred=d)
-
-
- def testUnknownCommand(self):
- d = self._anonymousLogin()
- return self.assertCommandFailed(
- 'GIBBERISH',
- ["502 Command 'GIBBERISH' not implemented"],
- chainDeferred=d)
-
- def testRETRBeforePORT(self):
- d = self._anonymousLogin()
- return self.assertCommandFailed(
- 'RETR foo',
- ["503 Incorrect sequence of commands: "
- "PORT or PASV required before RETR"],
- chainDeferred=d)
-
- def testSTORBeforePORT(self):
- d = self._anonymousLogin()
- return self.assertCommandFailed(
- 'STOR foo',
- ["503 Incorrect sequence of commands: "
- "PORT or PASV required before STOR"],
- chainDeferred=d)
-
- def testBadCommandArgs(self):
- d = self._anonymousLogin()
- self.assertCommandFailed(
- 'MODE z',
- ["504 Not implemented for parameter 'z'."],
- chainDeferred=d)
- self.assertCommandFailed(
- 'STRU I',
- ["504 Not implemented for parameter 'I'."],
- chainDeferred=d)
- return d
-
- def testDecodeHostPort(self):
- self.assertEqual(ftp.decodeHostPort('25,234,129,22,100,23'),
- ('25.234.129.22', 25623))
- nums = range(6)
- for i in range(6):
- badValue = list(nums)
- badValue[i] = 256
- s = ','.join(map(str, badValue))
- self.assertRaises(ValueError, ftp.decodeHostPort, s)
-
- def testPASV(self):
- # Login
- wfd = defer.waitForDeferred(self._anonymousLogin())
- yield wfd
- wfd.getResult()
-
- # Issue a PASV command, and extract the host and port from the response
- pasvCmd = defer.waitForDeferred(self.client.queueStringCommand('PASV'))
- yield pasvCmd
- responseLines = pasvCmd.getResult()
- host, port = ftp.decodeHostPort(responseLines[-1][4:])
-
- # Make sure the server is listening on the port it claims to be
- self.assertEqual(port, self.serverProtocol.dtpPort.getHost().port)
-
- # Semi-reasonable way to force cleanup
- self.serverProtocol.transport.loseConnection()
- testPASV = defer.deferredGenerator(testPASV)
-
- def test_SYST(self):
- """SYST command will always return UNIX Type: L8"""
- d = self._anonymousLogin()
- self.assertCommandResponse('SYST', ["215 UNIX Type: L8"],
- chainDeferred=d)
- return d
-
- def test_RNFRandRNTO(self):
- """
- Sending the RNFR command followed by RNTO, with valid filenames, will
- perform a successful rename operation.
- """
- # Create user home folder with a 'foo' file.
- self.dirPath.child(self.username).createDirectory()
- self.dirPath.child(self.username).child('foo').touch()
-
- d = self._userLogin()
- self.assertCommandResponse(
- 'RNFR foo',
- ["350 Requested file action pending further information."],
- chainDeferred=d)
- self.assertCommandResponse(
- 'RNTO bar',
- ["250 Requested File Action Completed OK"],
- chainDeferred=d)
-
- def check_rename(result):
- self.assertTrue(
- self.dirPath.child(self.username).child('bar').exists())
- return result
-
- d.addCallback(check_rename)
- return d
-
- def test_RNFRwithoutRNTO(self):
- """
- Sending the RNFR command followed by any command other than RNTO
- should return an error informing users that RNFR should be followed
- by RNTO.
- """
- d = self._anonymousLogin()
- self.assertCommandResponse(
- 'RNFR foo',
- ["350 Requested file action pending further information."],
- chainDeferred=d)
- self.assertCommandFailed(
- 'OTHER don-tcare',
- ["503 Incorrect sequence of commands: RNTO required after RNFR"],
- chainDeferred=d)
- return d
-
- def test_portRangeForwardError(self):
- """
- Exceptions other than L{error.CannotListenError} which are raised by
- C{listenFactory} should be raised to the caller of L{FTP.getDTPPort}.
- """
- def listenFactory(portNumber, factory):
- raise RuntimeError()
- self.serverProtocol.listenFactory = listenFactory
-
- self.assertRaises(RuntimeError, self.serverProtocol.getDTPPort,
- protocol.Factory())
-
-
- def test_portRange(self):
- """
- L{FTP.passivePortRange} should determine the ports which
- L{FTP.getDTPPort} attempts to bind. If no port from that iterator can
- be bound, L{error.CannotListenError} should be raised, otherwise the
- first successful result from L{FTP.listenFactory} should be returned.
- """
- def listenFactory(portNumber, factory):
- if portNumber in (22032, 22033, 22034):
- raise error.CannotListenError('localhost', portNumber, 'error')
- return portNumber
- self.serverProtocol.listenFactory = listenFactory
-
- port = self.serverProtocol.getDTPPort(protocol.Factory())
- self.assertEqual(port, 0)
-
- self.serverProtocol.passivePortRange = xrange(22032, 65536)
- port = self.serverProtocol.getDTPPort(protocol.Factory())
- self.assertEqual(port, 22035)
-
- self.serverProtocol.passivePortRange = xrange(22032, 22035)
- self.assertRaises(error.CannotListenError,
- self.serverProtocol.getDTPPort,
- protocol.Factory())
-
-
- def test_portRangeInheritedFromFactory(self):
- """
- The L{FTP} instances created by L{ftp.FTPFactory.buildProtocol} have
- their C{passivePortRange} attribute set to the same object the
- factory's C{passivePortRange} attribute is set to.
- """
- portRange = xrange(2017, 2031)
- self.factory.passivePortRange = portRange
- protocol = self.factory.buildProtocol(None)
- self.assertEqual(portRange, protocol.wrappedProtocol.passivePortRange)
-
-
-
-class FTPServerTestCaseAdvancedClient(FTPServerTestCase):
- """
- Test FTP server with the L{ftp.FTPClient} class.
- """
- clientFactory = ftp.FTPClient
-
- def test_anonymousSTOR(self):
- """
- Try to make an STOR as anonymous, and check that we got a permission
- denied error.
- """
- def eb(res):
- res.trap(ftp.CommandFailed)
- self.assertEqual(res.value.args[0][0],
- '550 foo: Permission denied.')
- d1, d2 = self.client.storeFile('foo')
- d2.addErrback(eb)
- return defer.gatherResults([d1, d2])
-
-
- def test_STORwriteError(self):
- """
- Any errors during writing a file inside a STOR should be returned to
- the client.
- """
- # Make a failing file writer.
- class FailingFileWriter(ftp._FileWriter):
- def receive(self):
- return defer.fail(ftp.IsNotADirectoryError("blah"))
-
- def failingSTOR(a, b):
- return defer.succeed(FailingFileWriter(None))
-
- # Monkey patch the shell so it returns a file writer that will
- # fail.
- self.patch(ftp.FTPAnonymousShell, 'openForWriting', failingSTOR)
-
- def eb(res):
- self.flushLoggedErrors()
- res.trap(ftp.CommandFailed)
- self.assertEqual(
- res.value.args[0][0],
- "550 Cannot rmd, blah is not a directory")
- d1, d2 = self.client.storeFile('failing_file')
- d2.addErrback(eb)
- return defer.gatherResults([d1, d2])
-
- def test_RETRreadError(self):
- """
- Any errors during reading a file inside a RETR should be returned to
- the client.
- """
- # Make a failing file reading.
- class FailingFileReader(ftp._FileReader):
- def send(self, consumer):
- return defer.fail(ftp.IsADirectoryError("blah"))
-
- def failingRETR(a, b):
- return defer.succeed(FailingFileReader(None))
-
- # Monkey patch the shell so it returns a file reader that will
- # fail.
- self.patch(ftp.FTPAnonymousShell, 'openForReading', failingRETR)
-
- def check_response(failure):
- self.flushLoggedErrors()
- failure.trap(ftp.CommandFailed)
- self.assertEqual(
- failure.value.args[0][0],
- "125 Data connection already open, starting transfer")
- self.assertEqual(
- failure.value.args[0][1],
- "550 blah: is a directory")
-
- proto = _BufferingProtocol()
- d = self.client.retrieveFile('failing_file', proto)
- d.addErrback(check_response)
- return d
-
-
-class FTPServerPasvDataConnectionTestCase(FTPServerTestCase):
- def _makeDataConnection(self, ignored=None):
- # Establish a passive data connection (i.e. client connecting to
- # server).
- d = self.client.queueStringCommand('PASV')
- def gotPASV(responseLines):
- host, port = ftp.decodeHostPort(responseLines[-1][4:])
- cc = protocol.ClientCreator(reactor, _BufferingProtocol)
- return cc.connectTCP('127.0.0.1', port)
- return d.addCallback(gotPASV)
-
- def _download(self, command, chainDeferred=None):
- if chainDeferred is None:
- chainDeferred = defer.succeed(None)
-
- chainDeferred.addCallback(self._makeDataConnection)
- def queueCommand(downloader):
- # wait for the command to return, and the download connection to be
- # closed.
- d1 = self.client.queueStringCommand(command)
- d2 = downloader.d
- return defer.gatherResults([d1, d2])
- chainDeferred.addCallback(queueCommand)
-
- def downloadDone((ignored, downloader)):
- return downloader.buffer
- return chainDeferred.addCallback(downloadDone)
-
- def testEmptyLIST(self):
- # Login
- d = self._anonymousLogin()
-
- # No files, so the file listing should be empty
- self._download('LIST', chainDeferred=d)
- def checkEmpty(result):
- self.assertEqual('', result)
- return d.addCallback(checkEmpty)
-
- def testTwoDirLIST(self):
- # Make some directories
- os.mkdir(os.path.join(self.directory, 'foo'))
- os.mkdir(os.path.join(self.directory, 'bar'))
-
- # Login
- d = self._anonymousLogin()
-
- # We expect 2 lines because there are two files.
- self._download('LIST', chainDeferred=d)
- def checkDownload(download):
- self.assertEqual(2, len(download[:-2].split('\r\n')))
- d.addCallback(checkDownload)
-
- # Download a names-only listing.
- self._download('NLST ', chainDeferred=d)
- def checkDownload(download):
- filenames = download[:-2].split('\r\n')
- filenames.sort()
- self.assertEqual(['bar', 'foo'], filenames)
- d.addCallback(checkDownload)
-
- # Download a listing of the 'foo' subdirectory. 'foo' has no files, so
- # the file listing should be empty.
- self._download('LIST foo', chainDeferred=d)
- def checkDownload(download):
- self.assertEqual('', download)
- d.addCallback(checkDownload)
-
- # Change the current working directory to 'foo'.
- def chdir(ignored):
- return self.client.queueStringCommand('CWD foo')
- d.addCallback(chdir)
-
- # Download a listing from within 'foo', and again it should be empty,
- # because LIST uses the working directory by default.
- self._download('LIST', chainDeferred=d)
- def checkDownload(download):
- self.assertEqual('', download)
- return d.addCallback(checkDownload)
-
- def testManyLargeDownloads(self):
- # Login
- d = self._anonymousLogin()
-
- # Download a range of different size files
- for size in range(100000, 110000, 500):
- fObj = file(os.path.join(self.directory, '%d.txt' % (size,)), 'wb')
- fObj.write('x' * size)
- fObj.close()
-
- self._download('RETR %d.txt' % (size,), chainDeferred=d)
- def checkDownload(download, size=size):
- self.assertEqual(size, len(download))
- d.addCallback(checkDownload)
- return d
-
-
- def test_downloadFolder(self):
- """
- When RETR is called for a folder, it will fail complaining that
- the path is a folder.
- """
- # Make a directory in the current working directory
- self.dirPath.child('foo').createDirectory()
- # Login
- d = self._anonymousLogin()
- d.addCallback(self._makeDataConnection)
-
- def retrFolder(downloader):
- downloader.transport.loseConnection()
- deferred = self.client.queueStringCommand('RETR foo')
- return deferred
- d.addCallback(retrFolder)
-
- def failOnSuccess(result):
- raise AssertionError('Downloading a folder should not succeed.')
- d.addCallback(failOnSuccess)
-
- def checkError(failure):
- failure.trap(ftp.CommandFailed)
- self.assertEqual(
- ['550 foo: is a directory'], failure.value.message)
- current_errors = self.flushLoggedErrors()
- self.assertEqual(
- 0, len(current_errors),
- 'No errors should be logged while downloading a folder.')
- d.addErrback(checkError)
- return d
-
-
- def test_NLSTEmpty(self):
- """
- NLST with no argument returns the directory listing for the current
- working directory.
- """
- # Login
- d = self._anonymousLogin()
-
- # Touch a file in the current working directory
- self.dirPath.child('test.txt').touch()
- # Make a directory in the current working directory
- self.dirPath.child('foo').createDirectory()
-
- self._download('NLST ', chainDeferred=d)
- def checkDownload(download):
- filenames = download[:-2].split('\r\n')
- filenames.sort()
- self.assertEqual(['foo', 'test.txt'], filenames)
- return d.addCallback(checkDownload)
-
-
- def test_NLSTNonexistent(self):
- """
- NLST on a non-existent file/directory returns nothing.
- """
- # Login
- d = self._anonymousLogin()
-
- self._download('NLST nonexistent.txt', chainDeferred=d)
- def checkDownload(download):
- self.assertEqual('', download)
- return d.addCallback(checkDownload)
-
-
- def test_NLSTOnPathToFile(self):
- """
- NLST on an existent file returns only the path to that file.
- """
- # Login
- d = self._anonymousLogin()
-
- # Touch a file in the current working directory
- self.dirPath.child('test.txt').touch()
-
- self._download('NLST test.txt', chainDeferred=d)
- def checkDownload(download):
- filenames = download[:-2].split('\r\n')
- self.assertEqual(['test.txt'], filenames)
- return d.addCallback(checkDownload)
-
-
-
-class FTPServerPortDataConnectionTestCase(FTPServerPasvDataConnectionTestCase):
- def setUp(self):
- self.dataPorts = []
- return FTPServerPasvDataConnectionTestCase.setUp(self)
-
- def _makeDataConnection(self, ignored=None):
- # Establish an active data connection (i.e. server connecting to
- # client).
- deferred = defer.Deferred()
- class DataFactory(protocol.ServerFactory):
- protocol = _BufferingProtocol
- def buildProtocol(self, addr):
- p = protocol.ServerFactory.buildProtocol(self, addr)
- reactor.callLater(0, deferred.callback, p)
- return p
- dataPort = reactor.listenTCP(0, DataFactory(), interface='127.0.0.1')
- self.dataPorts.append(dataPort)
- cmd = 'PORT ' + ftp.encodeHostPort('127.0.0.1', dataPort.getHost().port)
- self.client.queueStringCommand(cmd)
- return deferred
-
- def tearDown(self):
- l = [defer.maybeDeferred(port.stopListening) for port in self.dataPorts]
- d = defer.maybeDeferred(
- FTPServerPasvDataConnectionTestCase.tearDown, self)
- l.append(d)
- return defer.DeferredList(l, fireOnOneErrback=True)
-
- def testPORTCannotConnect(self):
- # Login
- d = self._anonymousLogin()
-
- # Listen on a port, and immediately stop listening as a way to find a
- # port number that is definitely closed.
- def loggedIn(ignored):
- port = reactor.listenTCP(0, protocol.Factory(),
- interface='127.0.0.1')
- portNum = port.getHost().port
- d = port.stopListening()
- d.addCallback(lambda _: portNum)
- return d
- d.addCallback(loggedIn)
-
- # Tell the server to connect to that port with a PORT command, and
- # verify that it fails with the right error.
- def gotPortNum(portNum):
- return self.assertCommandFailed(
- 'PORT ' + ftp.encodeHostPort('127.0.0.1', portNum),
- ["425 Can't open data connection."])
- return d.addCallback(gotPortNum)
-
-
-
-class DTPFactoryTests(unittest.TestCase):
- """
- Tests for L{ftp.DTPFactory}.
- """
- def setUp(self):
- """
- Create a fake protocol interpreter and a L{ftp.DTPFactory} instance to
- test.
- """
- self.reactor = task.Clock()
-
- class ProtocolInterpreter(object):
- dtpInstance = None
-
- self.protocolInterpreter = ProtocolInterpreter()
- self.factory = ftp.DTPFactory(
- self.protocolInterpreter, None, self.reactor)
-
-
- def test_setTimeout(self):
- """
- L{ftp.DTPFactory.setTimeout} uses the reactor passed to its initializer
- to set up a timed event to time out the DTP setup after the specified
- number of seconds.
- """
- # Make sure the factory's deferred fails with the right exception, and
- # make it so we can tell exactly when it fires.
- finished = []
- d = self.assertFailure(self.factory.deferred, ftp.PortConnectionError)
- d.addCallback(finished.append)
-
- self.factory.setTimeout(6)
-
- # Advance the clock almost to the timeout
- self.reactor.advance(5)
-
- # Nothing should have happened yet.
- self.assertFalse(finished)
-
- # Advance it to the configured timeout.
- self.reactor.advance(1)
-
- # Now the Deferred should have failed with TimeoutError.
- self.assertTrue(finished)
-
- # There should also be no calls left in the reactor.
- self.assertFalse(self.reactor.calls)
-
-
- def test_buildProtocolOnce(self):
- """
- A L{ftp.DTPFactory} instance's C{buildProtocol} method can be used once
- to create a L{ftp.DTP} instance.
- """
- protocol = self.factory.buildProtocol(None)
- self.assertIsInstance(protocol, ftp.DTP)
-
- # A subsequent call returns None.
- self.assertIdentical(self.factory.buildProtocol(None), None)
-
-
- def test_timeoutAfterConnection(self):
- """
- If a timeout has been set up using L{ftp.DTPFactory.setTimeout}, it is
- cancelled by L{ftp.DTPFactory.buildProtocol}.
- """
- self.factory.setTimeout(10)
- protocol = self.factory.buildProtocol(None)
- # Make sure the call is no longer active.
- self.assertFalse(self.reactor.calls)
-
-
- def test_connectionAfterTimeout(self):
- """
- If L{ftp.DTPFactory.buildProtocol} is called after the timeout
- specified by L{ftp.DTPFactory.setTimeout} has elapsed, C{None} is
- returned.
- """
- # Handle the error so it doesn't get logged.
- d = self.assertFailure(self.factory.deferred, ftp.PortConnectionError)
-
- # Set up the timeout and then cause it to elapse so the Deferred does
- # fail.
- self.factory.setTimeout(10)
- self.reactor.advance(10)
-
- # Try to get a protocol - we should not be able to.
- self.assertIdentical(self.factory.buildProtocol(None), None)
-
- # Make sure the Deferred is doing the right thing.
- return d
-
-
- def test_timeoutAfterConnectionFailed(self):
- """
- L{ftp.DTPFactory.deferred} fails with L{PortConnectionError} when
- L{ftp.DTPFactory.clientConnectionFailed} is called. If the timeout
- specified with L{ftp.DTPFactory.setTimeout} expires after that, nothing
- additional happens.
- """
- finished = []
- d = self.assertFailure(self.factory.deferred, ftp.PortConnectionError)
- d.addCallback(finished.append)
-
- self.factory.setTimeout(10)
- self.assertFalse(finished)
- self.factory.clientConnectionFailed(None, None)
- self.assertTrue(finished)
- self.reactor.advance(10)
- return d
-
-
- def test_connectionFailedAfterTimeout(self):
- """
- If L{ftp.DTPFactory.clientConnectionFailed} is called after the timeout
- specified by L{ftp.DTPFactory.setTimeout} has elapsed, nothing beyond
- the normal timeout before happens.
- """
- # Handle the error so it doesn't get logged.
- d = self.assertFailure(self.factory.deferred, ftp.PortConnectionError)
-
- # Set up the timeout and then cause it to elapse so the Deferred does
- # fail.
- self.factory.setTimeout(10)
- self.reactor.advance(10)
-
- # Now fail the connection attempt. This should do nothing. In
- # particular, it should not raise an exception.
- self.factory.clientConnectionFailed(None, defer.TimeoutError("foo"))
-
- # Give the Deferred to trial so it can make sure it did what we
- # expected.
- return d
-
-
-
-# -- Client Tests -----------------------------------------------------------
-
-class PrintLines(protocol.Protocol):
- """Helper class used by FTPFileListingTests."""
-
- def __init__(self, lines):
- self._lines = lines
-
- def connectionMade(self):
- for line in self._lines:
- self.transport.write(line + "\r\n")
- self.transport.loseConnection()
-
-
-class MyFTPFileListProtocol(ftp.FTPFileListProtocol):
- def __init__(self):
- self.other = []
- ftp.FTPFileListProtocol.__init__(self)
-
- def unknownLine(self, line):
- self.other.append(line)
-
-
-class FTPFileListingTests(unittest.TestCase):
- def getFilesForLines(self, lines):
- fileList = MyFTPFileListProtocol()
- d = loopback.loopbackAsync(PrintLines(lines), fileList)
- d.addCallback(lambda _: (fileList.files, fileList.other))
- return d
-
- def testOneLine(self):
- # This example line taken from the docstring for FTPFileListProtocol
- line = '-rw-r--r-- 1 root other 531 Jan 29 03:26 README'
- def check(((file,), other)):
- self.failIf(other, 'unexpect unparsable lines: %s' % repr(other))
- self.failUnless(file['filetype'] == '-', 'misparsed fileitem')
- self.failUnless(file['perms'] == 'rw-r--r--', 'misparsed perms')
- self.failUnless(file['owner'] == 'root', 'misparsed fileitem')
- self.failUnless(file['group'] == 'other', 'misparsed fileitem')
- self.failUnless(file['size'] == 531, 'misparsed fileitem')
- self.failUnless(file['date'] == 'Jan 29 03:26', 'misparsed fileitem')
- self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
- self.failUnless(file['nlinks'] == 1, 'misparsed nlinks')
- self.failIf(file['linktarget'], 'misparsed linktarget')
- return self.getFilesForLines([line]).addCallback(check)
-
- def testVariantLines(self):
- line1 = 'drw-r--r-- 2 root other 531 Jan 9 2003 A'
- line2 = 'lrw-r--r-- 1 root other 1 Jan 29 03:26 B -> A'
- line3 = 'woohoo! '
- def check(((file1, file2), (other,))):
- self.failUnless(other == 'woohoo! \r', 'incorrect other line')
- # file 1
- self.failUnless(file1['filetype'] == 'd', 'misparsed fileitem')
- self.failUnless(file1['perms'] == 'rw-r--r--', 'misparsed perms')
- self.failUnless(file1['owner'] == 'root', 'misparsed owner')
- self.failUnless(file1['group'] == 'other', 'misparsed group')
- self.failUnless(file1['size'] == 531, 'misparsed size')
- self.failUnless(file1['date'] == 'Jan 9 2003', 'misparsed date')
- self.failUnless(file1['filename'] == 'A', 'misparsed filename')
- self.failUnless(file1['nlinks'] == 2, 'misparsed nlinks')
- self.failIf(file1['linktarget'], 'misparsed linktarget')
- # file 2
- self.failUnless(file2['filetype'] == 'l', 'misparsed fileitem')
- self.failUnless(file2['perms'] == 'rw-r--r--', 'misparsed perms')
- self.failUnless(file2['owner'] == 'root', 'misparsed owner')
- self.failUnless(file2['group'] == 'other', 'misparsed group')
- self.failUnless(file2['size'] == 1, 'misparsed size')
- self.failUnless(file2['date'] == 'Jan 29 03:26', 'misparsed date')
- self.failUnless(file2['filename'] == 'B', 'misparsed filename')
- self.failUnless(file2['nlinks'] == 1, 'misparsed nlinks')
- self.failUnless(file2['linktarget'] == 'A', 'misparsed linktarget')
- return self.getFilesForLines([line1, line2, line3]).addCallback(check)
-
- def testUnknownLine(self):
- def check((files, others)):
- self.failIf(files, 'unexpected file entries')
- self.failUnless(others == ['ABC\r', 'not a file\r'],
- 'incorrect unparsable lines: %s' % repr(others))
- return self.getFilesForLines(['ABC', 'not a file']).addCallback(check)
-
- def testYear(self):
- # This example derived from bug description in issue 514.
- fileList = ftp.FTPFileListProtocol()
- exampleLine = (
- '-rw-r--r-- 1 root other 531 Jan 29 2003 README\n')
- class PrintLine(protocol.Protocol):
- def connectionMade(self):
- self.transport.write(exampleLine)
- self.transport.loseConnection()
-
- def check(ignored):
- file = fileList.files[0]
- self.failUnless(file['size'] == 531, 'misparsed fileitem')
- self.failUnless(file['date'] == 'Jan 29 2003', 'misparsed fileitem')
- self.failUnless(file['filename'] == 'README', 'misparsed fileitem')
-
- d = loopback.loopbackAsync(PrintLine(), fileList)
- return d.addCallback(check)
-
-
-class FTPClientTests(unittest.TestCase):
-
- def testFailedRETR(self):
- f = protocol.Factory()
- f.noisy = 0
- port = reactor.listenTCP(0, f, interface="127.0.0.1")
- self.addCleanup(port.stopListening)
- portNum = port.getHost().port
- # This test data derived from a bug report by ranty on #twisted
- responses = ['220 ready, dude (vsFTPd 1.0.0: beat me, break me)',
- # USER anonymous
- '331 Please specify the password.',
- # PASS twisted@twistedmatrix.com
- '230 Login successful. Have fun.',
- # TYPE I
- '200 Binary it is, then.',
- # PASV
- '227 Entering Passive Mode (127,0,0,1,%d,%d)' %
- (portNum >> 8, portNum & 0xff),
- # RETR /file/that/doesnt/exist
- '550 Failed to open file.']
- f.buildProtocol = lambda addr: PrintLines(responses)
-
- client = ftp.FTPClient(passive=1)
- cc = protocol.ClientCreator(reactor, ftp.FTPClient, passive=1)
- d = cc.connectTCP('127.0.0.1', portNum)
- def gotClient(client):
- p = protocol.Protocol()
- return client.retrieveFile('/file/that/doesnt/exist', p)
- d.addCallback(gotClient)
- return self.assertFailure(d, ftp.CommandFailed)
-
- def test_errbacksUponDisconnect(self):
- """
- Test the ftp command errbacks when a connection lost happens during
- the operation.
- """
- ftpClient = ftp.FTPClient()
- tr = proto_helpers.StringTransportWithDisconnection()
- ftpClient.makeConnection(tr)
- tr.protocol = ftpClient
- d = ftpClient.list('some path', Dummy())
- m = []
- def _eb(failure):
- m.append(failure)
- return None
- d.addErrback(_eb)
- from twisted.internet.main import CONNECTION_LOST
- ftpClient.connectionLost(failure.Failure(CONNECTION_LOST))
- self.failUnless(m, m)
- return d
-
-
-
-class FTPClientTestCase(unittest.TestCase):
- """
- Test advanced FTP client commands.
- """
- def setUp(self):
- """
- Create a FTP client and connect it to fake transport.
- """
- self.client = ftp.FTPClient()
- self.transport = proto_helpers.StringTransportWithDisconnection()
- self.client.makeConnection(self.transport)
- self.transport.protocol = self.client
-
-
- def tearDown(self):
- """
- Deliver disconnection notification to the client so that it can
- perform any cleanup which may be required.
- """
- self.client.connectionLost(error.ConnectionLost())
-
-
- def _testLogin(self):
- """
- Test the login part.
- """
- self.assertEqual(self.transport.value(), '')
- self.client.lineReceived(
- '331 Guest login ok, type your email address as password.')
- self.assertEqual(self.transport.value(), 'USER anonymous\r\n')
- self.transport.clear()
- self.client.lineReceived(
- '230 Anonymous login ok, access restrictions apply.')
- self.assertEqual(self.transport.value(), 'TYPE I\r\n')
- self.transport.clear()
- self.client.lineReceived('200 Type set to I.')
-
-
- def test_CDUP(self):
- """
- Test the CDUP command.
-
- L{ftp.FTPClient.cdup} should return a Deferred which fires with a
- sequence of one element which is the string the server sent
- indicating that the command was executed successfully.
-
- (XXX - This is a bad API)
- """
- def cbCdup(res):
- self.assertEqual(res[0], '250 Requested File Action Completed OK')
-
- self._testLogin()
- d = self.client.cdup().addCallback(cbCdup)
- self.assertEqual(self.transport.value(), 'CDUP\r\n')
- self.transport.clear()
- self.client.lineReceived('250 Requested File Action Completed OK')
- return d
-
-
- def test_failedCDUP(self):
- """
- Test L{ftp.FTPClient.cdup}'s handling of a failed CDUP command.
-
- When the CDUP command fails, the returned Deferred should errback
- with L{ftp.CommandFailed}.
- """
- self._testLogin()
- d = self.client.cdup()
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'CDUP\r\n')
- self.transport.clear()
- self.client.lineReceived('550 ..: No such file or directory')
- return d
-
-
- def test_PWD(self):
- """
- Test the PWD command.
-
- L{ftp.FTPClient.pwd} should return a Deferred which fires with a
- sequence of one element which is a string representing the current
- working directory on the server.
-
- (XXX - This is a bad API)
- """
- def cbPwd(res):
- self.assertEqual(ftp.parsePWDResponse(res[0]), "/bar/baz")
-
- self._testLogin()
- d = self.client.pwd().addCallback(cbPwd)
- self.assertEqual(self.transport.value(), 'PWD\r\n')
- self.client.lineReceived('257 "/bar/baz"')
- return d
-
-
- def test_failedPWD(self):
- """
- Test a failure in PWD command.
-
- When the PWD command fails, the returned Deferred should errback
- with L{ftp.CommandFailed}.
- """
- self._testLogin()
- d = self.client.pwd()
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PWD\r\n')
- self.client.lineReceived('550 /bar/baz: No such file or directory')
- return d
-
-
- def test_CWD(self):
- """
- Test the CWD command.
-
- L{ftp.FTPClient.cwd} should return a Deferred which fires with a
- sequence of one element which is the string the server sent
- indicating that the command was executed successfully.
-
- (XXX - This is a bad API)
- """
- def cbCwd(res):
- self.assertEqual(res[0], '250 Requested File Action Completed OK')
-
- self._testLogin()
- d = self.client.cwd("bar/foo").addCallback(cbCwd)
- self.assertEqual(self.transport.value(), 'CWD bar/foo\r\n')
- self.client.lineReceived('250 Requested File Action Completed OK')
- return d
-
-
- def test_failedCWD(self):
- """
- Test a failure in CWD command.
-
- When the PWD command fails, the returned Deferred should errback
- with L{ftp.CommandFailed}.
- """
- self._testLogin()
- d = self.client.cwd("bar/foo")
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'CWD bar/foo\r\n')
- self.client.lineReceived('550 bar/foo: No such file or directory')
- return d
-
-
- def test_passiveRETR(self):
- """
- Test the RETR command in passive mode: get a file and verify its
- content.
-
- L{ftp.FTPClient.retrieveFile} should return a Deferred which fires
- with the protocol instance passed to it after the download has
- completed.
-
- (XXX - This API should be based on producers and consumers)
- """
- def cbRetr(res, proto):
- self.assertEqual(proto.buffer, 'x' * 1000)
-
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- proto.dataReceived("x" * 1000)
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- proto = _BufferingProtocol()
- d = self.client.retrieveFile("spam", proto)
- d.addCallback(cbRetr, proto)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'RETR spam\r\n')
- self.transport.clear()
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_RETR(self):
- """
- Test the RETR command in non-passive mode.
-
- Like L{test_passiveRETR} but in the configuration where the server
- establishes the data connection to the client, rather than the other
- way around.
- """
- self.client.passive = False
-
- def generatePort(portCmd):
- portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
- portCmd.protocol.makeConnection(proto_helpers.StringTransport())
- portCmd.protocol.dataReceived("x" * 1000)
- portCmd.protocol.connectionLost(
- failure.Failure(error.ConnectionDone("")))
-
- def cbRetr(res, proto):
- self.assertEqual(proto.buffer, 'x' * 1000)
-
- self.client.generatePortCommand = generatePort
- self._testLogin()
- proto = _BufferingProtocol()
- d = self.client.retrieveFile("spam", proto)
- d.addCallback(cbRetr, proto)
- self.assertEqual(self.transport.value(), 'PORT %s\r\n' %
- (ftp.encodeHostPort('127.0.0.1', 9876),))
- self.transport.clear()
- self.client.lineReceived('200 PORT OK')
- self.assertEqual(self.transport.value(), 'RETR spam\r\n')
- self.transport.clear()
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_failedRETR(self):
- """
- Try to RETR an unexisting file.
-
- L{ftp.FTPClient.retrieveFile} should return a Deferred which
- errbacks with L{ftp.CommandFailed} if the server indicates the file
- cannot be transferred for some reason.
- """
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- proto = _BufferingProtocol()
- d = self.client.retrieveFile("spam", proto)
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'RETR spam\r\n')
- self.transport.clear()
- self.client.lineReceived('550 spam: No such file or directory')
- return d
-
-
- def test_lostRETR(self):
- """
- Try a RETR, but disconnect during the transfer.
- L{ftp.FTPClient.retrieveFile} should return a Deferred which
- errbacks with L{ftp.ConnectionLost)
- """
- self.client.passive = False
-
- l = []
- def generatePort(portCmd):
- portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
- tr = proto_helpers.StringTransportWithDisconnection()
- portCmd.protocol.makeConnection(tr)
- tr.protocol = portCmd.protocol
- portCmd.protocol.dataReceived("x" * 500)
- l.append(tr)
-
- self.client.generatePortCommand = generatePort
- self._testLogin()
- proto = _BufferingProtocol()
- d = self.client.retrieveFile("spam", proto)
- self.assertEqual(self.transport.value(), 'PORT %s\r\n' %
- (ftp.encodeHostPort('127.0.0.1', 9876),))
- self.transport.clear()
- self.client.lineReceived('200 PORT OK')
- self.assertEqual(self.transport.value(), 'RETR spam\r\n')
-
- self.assert_(l)
- l[0].loseConnection()
- self.transport.loseConnection()
- self.assertFailure(d, ftp.ConnectionLost)
- return d
-
-
- def test_passiveSTOR(self):
- """
- Test the STOR command: send a file and verify its content.
-
- L{ftp.FTPClient.storeFile} should return a two-tuple of Deferreds.
- The first of which should fire with a protocol instance when the
- data connection has been established and is responsible for sending
- the contents of the file. The second of which should fire when the
- upload has completed, the data connection has been closed, and the
- server has acknowledged receipt of the file.
-
- (XXX - storeFile should take a producer as an argument, instead, and
- only return a Deferred which fires when the upload has succeeded or
- failed).
- """
- tr = proto_helpers.StringTransport()
- def cbStore(sender):
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- sender.transport.write("x" * 1000)
- sender.finish()
- sender.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- def cbFinish(ign):
- self.assertEqual(tr.value(), "x" * 1000)
-
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(tr)
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- d1, d2 = self.client.storeFile("spam")
- d1.addCallback(cbStore)
- d2.addCallback(cbFinish)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'STOR spam\r\n')
- self.transport.clear()
- self.client.lineReceived('226 Transfer Complete.')
- return defer.gatherResults([d1, d2])
-
-
- def test_failedSTOR(self):
- """
- Test a failure in the STOR command.
-
- If the server does not acknowledge successful receipt of the
- uploaded file, the second Deferred returned by
- L{ftp.FTPClient.storeFile} should errback with L{ftp.CommandFailed}.
- """
- tr = proto_helpers.StringTransport()
- def cbStore(sender):
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- sender.transport.write("x" * 1000)
- sender.finish()
- sender.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(tr)
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- d1, d2 = self.client.storeFile("spam")
- d1.addCallback(cbStore)
- self.assertFailure(d2, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'STOR spam\r\n')
- self.transport.clear()
- self.client.lineReceived(
- '426 Transfer aborted. Data connection closed.')
- return defer.gatherResults([d1, d2])
-
-
- def test_STOR(self):
- """
- Test the STOR command in non-passive mode.
-
- Like L{test_passiveSTOR} but in the configuration where the server
- establishes the data connection to the client, rather than the other
- way around.
- """
- tr = proto_helpers.StringTransport()
- self.client.passive = False
- def generatePort(portCmd):
- portCmd.text = 'PORT %s' % ftp.encodeHostPort('127.0.0.1', 9876)
- portCmd.protocol.makeConnection(tr)
-
- def cbStore(sender):
- self.assertEqual(self.transport.value(), 'PORT %s\r\n' %
- (ftp.encodeHostPort('127.0.0.1', 9876),))
- self.transport.clear()
- self.client.lineReceived('200 PORT OK')
- self.assertEqual(self.transport.value(), 'STOR spam\r\n')
- self.transport.clear()
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- sender.transport.write("x" * 1000)
- sender.finish()
- sender.connectionLost(failure.Failure(error.ConnectionDone("")))
- self.client.lineReceived('226 Transfer Complete.')
-
- def cbFinish(ign):
- self.assertEqual(tr.value(), "x" * 1000)
-
- self.client.generatePortCommand = generatePort
- self._testLogin()
- d1, d2 = self.client.storeFile("spam")
- d1.addCallback(cbStore)
- d2.addCallback(cbFinish)
- return defer.gatherResults([d1, d2])
-
-
- def test_passiveLIST(self):
- """
- Test the LIST command.
-
- L{ftp.FTPClient.list} should return a Deferred which fires with a
- protocol instance which was passed to list after the command has
- succeeded.
-
- (XXX - This is a very unfortunate API; if my understanding is
- correct, the results are always at least line-oriented, so allowing
- a per-line parser function to be specified would make this simpler,
- but a default implementation should really be provided which knows
- how to deal with all the formats used in real servers, so
- application developers never have to care about this insanity. It
- would also be nice to either get back a Deferred of a list of
- filenames or to be able to consume the files as they are received
- (which the current API does allow, but in a somewhat inconvenient
- fashion) -exarkun)
- """
- def cbList(res, fileList):
- fls = [f["filename"] for f in fileList.files]
- expected = ["foo", "bar", "baz"]
- expected.sort()
- fls.sort()
- self.assertEqual(fls, expected)
-
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- sending = [
- '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
- '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
- '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
- ]
- for i in sending:
- proto.dataReceived(i)
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- fileList = ftp.FTPFileListProtocol()
- d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'LIST foo/bar\r\n')
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_LIST(self):
- """
- Test the LIST command in non-passive mode.
-
- Like L{test_passiveLIST} but in the configuration where the server
- establishes the data connection to the client, rather than the other
- way around.
- """
- self.client.passive = False
- def generatePort(portCmd):
- portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
- portCmd.protocol.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- sending = [
- '-rw-r--r-- 0 spam egg 100 Oct 10 2006 foo\r\n',
- '-rw-r--r-- 3 spam egg 100 Oct 10 2006 bar\r\n',
- '-rw-r--r-- 4 spam egg 100 Oct 10 2006 baz\r\n',
- ]
- for i in sending:
- portCmd.protocol.dataReceived(i)
- portCmd.protocol.connectionLost(
- failure.Failure(error.ConnectionDone("")))
-
- def cbList(res, fileList):
- fls = [f["filename"] for f in fileList.files]
- expected = ["foo", "bar", "baz"]
- expected.sort()
- fls.sort()
- self.assertEqual(fls, expected)
-
- self.client.generatePortCommand = generatePort
- self._testLogin()
- fileList = ftp.FTPFileListProtocol()
- d = self.client.list('foo/bar', fileList).addCallback(cbList, fileList)
- self.assertEqual(self.transport.value(), 'PORT %s\r\n' %
- (ftp.encodeHostPort('127.0.0.1', 9876),))
- self.transport.clear()
- self.client.lineReceived('200 PORT OK')
- self.assertEqual(self.transport.value(), 'LIST foo/bar\r\n')
- self.transport.clear()
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_failedLIST(self):
- """
- Test a failure in LIST command.
-
- L{ftp.FTPClient.list} should return a Deferred which fails with
- L{ftp.CommandFailed} if the server indicates the indicated path is
- invalid for some reason.
- """
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- fileList = ftp.FTPFileListProtocol()
- d = self.client.list('foo/bar', fileList)
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'LIST foo/bar\r\n')
- self.client.lineReceived('550 foo/bar: No such file or directory')
- return d
-
-
- def test_NLST(self):
- """
- Test the NLST command in non-passive mode.
-
- L{ftp.FTPClient.nlst} should return a Deferred which fires with a
- list of filenames when the list command has completed.
- """
- self.client.passive = False
- def generatePort(portCmd):
- portCmd.text = 'PORT %s' % (ftp.encodeHostPort('127.0.0.1', 9876),)
- portCmd.protocol.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- portCmd.protocol.dataReceived('foo\r\n')
- portCmd.protocol.dataReceived('bar\r\n')
- portCmd.protocol.dataReceived('baz\r\n')
- portCmd.protocol.connectionLost(
- failure.Failure(error.ConnectionDone("")))
-
- def cbList(res, proto):
- fls = proto.buffer.splitlines()
- expected = ["foo", "bar", "baz"]
- expected.sort()
- fls.sort()
- self.assertEqual(fls, expected)
-
- self.client.generatePortCommand = generatePort
- self._testLogin()
- lstproto = _BufferingProtocol()
- d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
- self.assertEqual(self.transport.value(), 'PORT %s\r\n' %
- (ftp.encodeHostPort('127.0.0.1', 9876),))
- self.transport.clear()
- self.client.lineReceived('200 PORT OK')
- self.assertEqual(self.transport.value(), 'NLST foo/bar\r\n')
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_passiveNLST(self):
- """
- Test the NLST command.
-
- Like L{test_passiveNLST} but in the configuration where the server
- establishes the data connection to the client, rather than the other
- way around.
- """
- def cbList(res, proto):
- fls = proto.buffer.splitlines()
- expected = ["foo", "bar", "baz"]
- expected.sort()
- fls.sort()
- self.assertEqual(fls, expected)
-
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(proto_helpers.StringTransport())
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- proto.dataReceived('foo\r\n')
- proto.dataReceived('bar\r\n')
- proto.dataReceived('baz\r\n')
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- lstproto = _BufferingProtocol()
- d = self.client.nlst('foo/bar', lstproto).addCallback(cbList, lstproto)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'NLST foo/bar\r\n')
- self.client.lineReceived('226 Transfer Complete.')
- return d
-
-
- def test_failedNLST(self):
- """
- Test a failure in NLST command.
-
- L{ftp.FTPClient.nlst} should return a Deferred which fails with
- L{ftp.CommandFailed} if the server indicates the indicated path is
- invalid for some reason.
- """
- tr = proto_helpers.StringTransport()
- def cbConnect(host, port, factory):
- self.assertEqual(host, '127.0.0.1')
- self.assertEqual(port, 12345)
- proto = factory.buildProtocol((host, port))
- proto.makeConnection(tr)
- self.client.lineReceived(
- '150 File status okay; about to open data connection.')
- proto.connectionLost(failure.Failure(error.ConnectionDone("")))
-
- self.client.connectFactory = cbConnect
- self._testLogin()
- lstproto = _BufferingProtocol()
- d = self.client.nlst('foo/bar', lstproto)
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PASV\r\n')
- self.transport.clear()
- self.client.lineReceived('227 Entering Passive Mode (%s).' %
- (ftp.encodeHostPort('127.0.0.1', 12345),))
- self.assertEqual(self.transport.value(), 'NLST foo/bar\r\n')
- self.client.lineReceived('550 foo/bar: No such file or directory')
- return d
-
-
- def test_changeDirectoryDeprecated(self):
- """
- L{ftp.FTPClient.changeDirectory} is deprecated and the direct caller of
- it is warned of this.
- """
- self._testLogin()
- d = self.assertWarns(
- DeprecationWarning,
- "FTPClient.changeDirectory is deprecated in Twisted 8.2 and "
- "newer. Use FTPClient.cwd instead.",
- __file__,
- lambda: self.client.changeDirectory('.'))
- # This is necessary to make the Deferred fire. The Deferred needs
- # to fire so that tearDown doesn't cause it to errback and fail this
- # or (more likely) a later test.
- self.client.lineReceived('250 success')
- return d
-
-
- def test_changeDirectory(self):
- """
- Test the changeDirectory method.
-
- L{ftp.FTPClient.changeDirectory} should return a Deferred which fires
- with True if succeeded.
- """
- def cbCd(res):
- self.assertEqual(res, True)
-
- self._testLogin()
- d = self.client.changeDirectory("bar/foo").addCallback(cbCd)
- self.assertEqual(self.transport.value(), 'CWD bar/foo\r\n')
- self.client.lineReceived('250 Requested File Action Completed OK')
- return d
- test_changeDirectory.suppress = [_changeDirectorySuppression]
-
-
- def test_failedChangeDirectory(self):
- """
- Test a failure in the changeDirectory method.
-
- The behaviour here is the same as a failed CWD.
- """
- self._testLogin()
- d = self.client.changeDirectory("bar/foo")
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'CWD bar/foo\r\n')
- self.client.lineReceived('550 bar/foo: No such file or directory')
- return d
- test_failedChangeDirectory.suppress = [_changeDirectorySuppression]
-
-
- def test_strangeFailedChangeDirectory(self):
- """
- Test a strange failure in changeDirectory method.
-
- L{ftp.FTPClient.changeDirectory} is stricter than CWD as it checks
- code 250 for success.
- """
- self._testLogin()
- d = self.client.changeDirectory("bar/foo")
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'CWD bar/foo\r\n')
- self.client.lineReceived('252 I do what I want !')
- return d
- test_strangeFailedChangeDirectory.suppress = [_changeDirectorySuppression]
-
-
- def test_renameFromTo(self):
- """
- L{ftp.FTPClient.rename} issues I{RNTO} and I{RNFR} commands and returns
- a L{Deferred} which fires when a file has successfully been renamed.
- """
- self._testLogin()
-
- d = self.client.rename("/spam", "/ham")
- self.assertEqual(self.transport.value(), 'RNFR /spam\r\n')
- self.transport.clear()
-
- fromResponse = (
- '350 Requested file action pending further information.\r\n')
- self.client.lineReceived(fromResponse)
- self.assertEqual(self.transport.value(), 'RNTO /ham\r\n')
- toResponse = (
- '250 Requested File Action Completed OK')
- self.client.lineReceived(toResponse)
-
- d.addCallback(self.assertEqual, ([fromResponse], [toResponse]))
- return d
-
-
- def test_renameFromToEscapesPaths(self):
- """
- L{ftp.FTPClient.rename} issues I{RNTO} and I{RNFR} commands with paths
- escaped according to U{http://cr.yp.to/ftp/filesystem.html}.
- """
- self._testLogin()
-
- fromFile = "/foo/ba\nr/baz"
- toFile = "/qu\nux"
- self.client.rename(fromFile, toFile)
- self.client.lineReceived("350 ")
- self.client.lineReceived("250 ")
- self.assertEqual(
- self.transport.value(),
- "RNFR /foo/ba\x00r/baz\r\n"
- "RNTO /qu\x00ux\r\n")
-
-
- def test_renameFromToFailingOnFirstError(self):
- """
- The L{Deferred} returned by L{ftp.FTPClient.rename} is errbacked with
- L{CommandFailed} if the I{RNFR} command receives an error response code
- (for example, because the file does not exist).
- """
- self._testLogin()
-
- d = self.client.rename("/spam", "/ham")
- self.assertEqual(self.transport.value(), 'RNFR /spam\r\n')
- self.transport.clear()
-
- self.client.lineReceived('550 Requested file unavailable.\r\n')
- # The RNTO should not execute since the RNFR failed.
- self.assertEqual(self.transport.value(), '')
-
- return self.assertFailure(d, ftp.CommandFailed)
-
-
- def test_renameFromToFailingOnRenameTo(self):
- """
- The L{Deferred} returned by L{ftp.FTPClient.rename} is errbacked with
- L{CommandFailed} if the I{RNTO} command receives an error response code
- (for example, because the destination directory does not exist).
- """
- self._testLogin()
-
- d = self.client.rename("/spam", "/ham")
- self.assertEqual(self.transport.value(), 'RNFR /spam\r\n')
- self.transport.clear()
-
- self.client.lineReceived('350 Requested file action pending further information.\r\n')
- self.assertEqual(self.transport.value(), 'RNTO /ham\r\n')
- self.client.lineReceived('550 Requested file unavailable.\r\n')
- return self.assertFailure(d, ftp.CommandFailed)
-
-
- def test_makeDirectory(self):
- """
- L{ftp.FTPClient.makeDirectory} issues a I{MKD} command and returns a
- L{Deferred} which is called back with the server's response if the
- directory is created.
- """
- self._testLogin()
-
- d = self.client.makeDirectory("/spam")
- self.assertEqual(self.transport.value(), 'MKD /spam\r\n')
- self.client.lineReceived('257 "/spam" created.')
- return d.addCallback(self.assertEqual, ['257 "/spam" created.'])
-
-
- def test_makeDirectoryPathEscape(self):
- """
- L{ftp.FTPClient.makeDirectory} escapes the path name it sends according
- to U{http://cr.yp.to/ftp/filesystem.html}.
- """
- self._testLogin()
- d = self.client.makeDirectory("/sp\nam")
- self.assertEqual(self.transport.value(), 'MKD /sp\x00am\r\n')
- # This is necessary to make the Deferred fire. The Deferred needs
- # to fire so that tearDown doesn't cause it to errback and fail this
- # or (more likely) a later test.
- self.client.lineReceived('257 win')
- return d
-
-
- def test_failedMakeDirectory(self):
- """
- L{ftp.FTPClient.makeDirectory} returns a L{Deferred} which is errbacked
- with L{CommandFailed} if the server returns an error response code.
- """
- self._testLogin()
-
- d = self.client.makeDirectory("/spam")
- self.assertEqual(self.transport.value(), 'MKD /spam\r\n')
- self.client.lineReceived('550 PERMISSION DENIED')
- return self.assertFailure(d, ftp.CommandFailed)
-
-
- def test_getDirectory(self):
- """
- Test the getDirectory method.
-
- L{ftp.FTPClient.getDirectory} should return a Deferred which fires with
- the current directory on the server. It wraps PWD command.
- """
- def cbGet(res):
- self.assertEqual(res, "/bar/baz")
-
- self._testLogin()
- d = self.client.getDirectory().addCallback(cbGet)
- self.assertEqual(self.transport.value(), 'PWD\r\n')
- self.client.lineReceived('257 "/bar/baz"')
- return d
-
-
- def test_failedGetDirectory(self):
- """
- Test a failure in getDirectory method.
-
- The behaviour should be the same as PWD.
- """
- self._testLogin()
- d = self.client.getDirectory()
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PWD\r\n')
- self.client.lineReceived('550 /bar/baz: No such file or directory')
- return d
-
-
- def test_anotherFailedGetDirectory(self):
- """
- Test a different failure in getDirectory method.
-
- The response should be quoted to be parsed, so it returns an error
- otherwise.
- """
- self._testLogin()
- d = self.client.getDirectory()
- self.assertFailure(d, ftp.CommandFailed)
- self.assertEqual(self.transport.value(), 'PWD\r\n')
- self.client.lineReceived('257 /bar/baz')
- return d
-
-
- def test_removeFile(self):
- """
- L{ftp.FTPClient.removeFile} sends a I{DELE} command to the server for
- the indicated file and returns a Deferred which fires after the server
- sends a 250 response code.
- """
- self._testLogin()
- d = self.client.removeFile("/tmp/test")
- self.assertEqual(self.transport.value(), 'DELE /tmp/test\r\n')
- response = '250 Requested file action okay, completed.'
- self.client.lineReceived(response)
- return d.addCallback(self.assertEqual, [response])
-
-
- def test_failedRemoveFile(self):
- """
- If the server returns a response code other than 250 in response to a
- I{DELE} sent by L{ftp.FTPClient.removeFile}, the L{Deferred} returned
- by C{removeFile} is errbacked with a L{Failure} wrapping a
- L{CommandFailed}.
- """
- self._testLogin()
- d = self.client.removeFile("/tmp/test")
- self.assertEqual(self.transport.value(), 'DELE /tmp/test\r\n')
- response = '501 Syntax error in parameters or arguments.'
- self.client.lineReceived(response)
- d = self.assertFailure(d, ftp.CommandFailed)
- d.addCallback(lambda exc: self.assertEqual(exc.args, ([response],)))
- return d
-
-
- def test_unparsableRemoveFileResponse(self):
- """
- If the server returns a response line which cannot be parsed, the
- L{Deferred} returned by L{ftp.FTPClient.removeFile} is errbacked with a
- L{BadResponse} containing the response.
- """
- self._testLogin()
- d = self.client.removeFile("/tmp/test")
- response = '765 blah blah blah'
- self.client.lineReceived(response)
- d = self.assertFailure(d, ftp.BadResponse)
- d.addCallback(lambda exc: self.assertEqual(exc.args, ([response],)))
- return d
-
-
- def test_multilineRemoveFileResponse(self):
- """
- If the server returns multiple response lines, the L{Deferred} returned
- by L{ftp.FTPClient.removeFile} is still fired with a true value if the
- ultimate response code is 250.
- """
- self._testLogin()
- d = self.client.removeFile("/tmp/test")
- response = ['250-perhaps a progress report',
- '250 okay']
- map(self.client.lineReceived, response)
- return d.addCallback(self.assertTrue)
-
-
- def test_removeDirectory(self):
- """
- L{ftp.FTPClient.removeDirectory} sends a I{RMD} command to the server
- for the indicated directory and returns a Deferred which fires after
- the server sends a 250 response code.
- """
- self._testLogin()
- d = self.client.removeDirectory('/tmp/test')
- self.assertEqual(self.transport.value(), 'RMD /tmp/test\r\n')
- response = '250 Requested file action okay, completed.'
- self.client.lineReceived(response)
- return d.addCallback(self.assertEqual, [response])
-
-
- def test_failedRemoveDirectory(self):
- """
- If the server returns a response code other than 250 in response to a
- I{RMD} sent by L{ftp.FTPClient.removeDirectory}, the L{Deferred}
- returned by C{removeDirectory} is errbacked with a L{Failure} wrapping
- a L{CommandFailed}.
- """
- self._testLogin()
- d = self.client.removeDirectory("/tmp/test")
- self.assertEqual(self.transport.value(), 'RMD /tmp/test\r\n')
- response = '501 Syntax error in parameters or arguments.'
- self.client.lineReceived(response)
- d = self.assertFailure(d, ftp.CommandFailed)
- d.addCallback(lambda exc: self.assertEqual(exc.args, ([response],)))
- return d
-
-
- def test_unparsableRemoveDirectoryResponse(self):
- """
- If the server returns a response line which cannot be parsed, the
- L{Deferred} returned by L{ftp.FTPClient.removeDirectory} is errbacked
- with a L{BadResponse} containing the response.
- """
- self._testLogin()
- d = self.client.removeDirectory("/tmp/test")
- response = '765 blah blah blah'
- self.client.lineReceived(response)
- d = self.assertFailure(d, ftp.BadResponse)
- d.addCallback(lambda exc: self.assertEqual(exc.args, ([response],)))
- return d
-
-
- def test_multilineRemoveDirectoryResponse(self):
- """
- If the server returns multiple response lines, the L{Deferred} returned
- by L{ftp.FTPClient.removeDirectory} is still fired with a true value
- if the ultimate response code is 250.
- """
- self._testLogin()
- d = self.client.removeDirectory("/tmp/test")
- response = ['250-perhaps a progress report',
- '250 okay']
- map(self.client.lineReceived, response)
- return d.addCallback(self.assertTrue)
-
-
-
-class FTPClientBasicTests(unittest.TestCase):
-
- def testGreeting(self):
- # The first response is captured as a greeting.
- ftpClient = ftp.FTPClientBasic()
- ftpClient.lineReceived('220 Imaginary FTP.')
- self.assertEqual(['220 Imaginary FTP.'], ftpClient.greeting)
-
- def testResponseWithNoMessage(self):
- # Responses with no message are still valid, i.e. three digits followed
- # by a space is complete response.
- ftpClient = ftp.FTPClientBasic()
- ftpClient.lineReceived('220 ')
- self.assertEqual(['220 '], ftpClient.greeting)
-
- def testMultilineResponse(self):
- ftpClient = ftp.FTPClientBasic()
- ftpClient.transport = proto_helpers.StringTransport()
- ftpClient.lineReceived('220 Imaginary FTP.')
-
- # Queue (and send) a dummy command, and set up a callback to capture the
- # result
- deferred = ftpClient.queueStringCommand('BLAH')
- result = []
- deferred.addCallback(result.append)
- deferred.addErrback(self.fail)
-
- # Send the first line of a multiline response.
- ftpClient.lineReceived('210-First line.')
- self.assertEqual([], result)
-
- # Send a second line, again prefixed with "nnn-".
- ftpClient.lineReceived('123-Second line.')
- self.assertEqual([], result)
-
- # Send a plain line of text, no prefix.
- ftpClient.lineReceived('Just some text.')
- self.assertEqual([], result)
-
- # Now send a short (less than 4 chars) line.
- ftpClient.lineReceived('Hi')
- self.assertEqual([], result)
-
- # Now send an empty line.
- ftpClient.lineReceived('')
- self.assertEqual([], result)
-
- # And a line with 3 digits in it, and nothing else.
- ftpClient.lineReceived('321')
- self.assertEqual([], result)
-
- # Now finish it.
- ftpClient.lineReceived('210 Done.')
- self.assertEqual(
- ['210-First line.',
- '123-Second line.',
- 'Just some text.',
- 'Hi',
- '',
- '321',
- '210 Done.'], result[0])
-
-
- def test_noPasswordGiven(self):
- """
- Passing None as the password avoids sending the PASS command.
- """
- # Create a client, and give it a greeting.
- ftpClient = ftp.FTPClientBasic()
- ftpClient.transport = proto_helpers.StringTransport()
- ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
-
- # Queue a login with no password
- ftpClient.queueLogin('bob', None)
- self.assertEqual('USER bob\r\n', ftpClient.transport.value())
-
- # Clear the test buffer, acknowledge the USER command.
- ftpClient.transport.clear()
- ftpClient.lineReceived('200 Hello bob.')
-
- # The client shouldn't have sent anything more (i.e. it shouldn't have
- # sent a PASS command).
- self.assertEqual('', ftpClient.transport.value())
-
-
- def test_noPasswordNeeded(self):
- """
- Receiving a 230 response to USER prevents PASS from being sent.
- """
- # Create a client, and give it a greeting.
- ftpClient = ftp.FTPClientBasic()
- ftpClient.transport = proto_helpers.StringTransport()
- ftpClient.lineReceived('220 Welcome to Imaginary FTP.')
-
- # Queue a login with no password
- ftpClient.queueLogin('bob', 'secret')
- self.assertEqual('USER bob\r\n', ftpClient.transport.value())
-
- # Clear the test buffer, acknowledge the USER command with a 230
- # response code.
- ftpClient.transport.clear()
- ftpClient.lineReceived('230 Hello bob. No password needed.')
-
- # The client shouldn't have sent anything more (i.e. it shouldn't have
- # sent a PASS command).
- self.assertEqual('', ftpClient.transport.value())
-
-
-
-class PathHandling(unittest.TestCase):
- def testNormalizer(self):
- for inp, outp in [('a', ['a']),
- ('/a', ['a']),
- ('/', []),
- ('a/b/c', ['a', 'b', 'c']),
- ('/a/b/c', ['a', 'b', 'c']),
- ('/a/', ['a']),
- ('a/', ['a'])]:
- self.assertEqual(ftp.toSegments([], inp), outp)
-
- for inp, outp in [('b', ['a', 'b']),
- ('b/', ['a', 'b']),
- ('/b', ['b']),
- ('/b/', ['b']),
- ('b/c', ['a', 'b', 'c']),
- ('b/c/', ['a', 'b', 'c']),
- ('/b/c', ['b', 'c']),
- ('/b/c/', ['b', 'c'])]:
- self.assertEqual(ftp.toSegments(['a'], inp), outp)
-
- for inp, outp in [('//', []),
- ('//a', ['a']),
- ('a//', ['a']),
- ('a//b', ['a', 'b'])]:
- self.assertEqual(ftp.toSegments([], inp), outp)
-
- for inp, outp in [('//', []),
- ('//b', ['b']),
- ('b//c', ['a', 'b', 'c'])]:
- self.assertEqual(ftp.toSegments(['a'], inp), outp)
-
- for inp, outp in [('..', []),
- ('../', []),
- ('a/..', ['x']),
- ('/a/..', []),
- ('/a/b/..', ['a']),
- ('/a/b/../', ['a']),
- ('/a/b/../c', ['a', 'c']),
- ('/a/b/../c/', ['a', 'c']),
- ('/a/b/../../c', ['c']),
- ('/a/b/../../c/', ['c']),
- ('/a/b/../../c/..', []),
- ('/a/b/../../c/../', [])]:
- self.assertEqual(ftp.toSegments(['x'], inp), outp)
-
- for inp in ['..', '../', 'a/../..', 'a/../../',
- '/..', '/../', '/a/../..', '/a/../../',
- '/a/b/../../..']:
- self.assertRaises(ftp.InvalidPath, ftp.toSegments, [], inp)
-
- for inp in ['../..', '../../', '../a/../..']:
- self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp)
-
-
-class BaseFTPRealmTests(unittest.TestCase):
- """
- Tests for L{ftp.BaseFTPRealm}, a base class to help define L{IFTPShell}
- realms with different user home directory policies.
- """
- def test_interface(self):
- """
- L{ftp.BaseFTPRealm} implements L{IRealm}.
- """
- self.assertTrue(verifyClass(IRealm, ftp.BaseFTPRealm))
-
-
- def test_getHomeDirectory(self):
- """
- L{ftp.BaseFTPRealm} calls its C{getHomeDirectory} method with the
- avatarId being requested to determine the home directory for that
- avatar.
- """
- result = filepath.FilePath(self.mktemp())
- avatars = []
- class TestRealm(ftp.BaseFTPRealm):
- def getHomeDirectory(self, avatarId):
- avatars.append(avatarId)
- return result
-
- realm = TestRealm(self.mktemp())
- iface, avatar, logout = realm.requestAvatar(
- "alice@example.com", None, ftp.IFTPShell)
- self.assertIsInstance(avatar, ftp.FTPShell)
- self.assertEqual(avatar.filesystemRoot, result)
-
-
- def test_anonymous(self):
- """
- L{ftp.BaseFTPRealm} returns an L{ftp.FTPAnonymousShell} instance for
- anonymous avatar requests.
- """
- anonymous = self.mktemp()
- realm = ftp.BaseFTPRealm(anonymous)
- iface, avatar, logout = realm.requestAvatar(
- checkers.ANONYMOUS, None, ftp.IFTPShell)
- self.assertIsInstance(avatar, ftp.FTPAnonymousShell)
- self.assertEqual(avatar.filesystemRoot, filepath.FilePath(anonymous))
-
-
- def test_notImplemented(self):
- """
- L{ftp.BaseFTPRealm.getHomeDirectory} should be overridden by a subclass
- and raises L{NotImplementedError} if it is not.
- """
- realm = ftp.BaseFTPRealm(self.mktemp())
- self.assertRaises(NotImplementedError, realm.getHomeDirectory, object())
-
-
-
-class FTPRealmTestCase(unittest.TestCase):
- """
- Tests for L{ftp.FTPRealm}.
- """
- def test_getHomeDirectory(self):
- """
- L{ftp.FTPRealm} accepts an extra directory to its initializer and treats
- the avatarId passed to L{ftp.FTPRealm.getHomeDirectory} as a single path
- segment to construct a child of that directory.
- """
- base = '/path/to/home'
- realm = ftp.FTPRealm(self.mktemp(), base)
- home = realm.getHomeDirectory('alice@example.com')
- self.assertEqual(
- filepath.FilePath(base).child('alice@example.com'), home)
-
-
- def test_defaultHomeDirectory(self):
- """
- If no extra directory is passed to L{ftp.FTPRealm}, it uses C{"/home"}
- as the base directory containing all user home directories.
- """
- realm = ftp.FTPRealm(self.mktemp())
- home = realm.getHomeDirectory('alice@example.com')
- self.assertEqual(filepath.FilePath('/home/alice@example.com'), home)
-
-
-
-class SystemFTPRealmTests(unittest.TestCase):
- """
- Tests for L{ftp.SystemFTPRealm}.
- """
- skip = nonPOSIXSkip
-
- def test_getHomeDirectory(self):
- """
- L{ftp.SystemFTPRealm.getHomeDirectory} treats the avatarId passed to it
- as a username in the underlying platform and returns that account's home
- directory.
- """
- # Try to pick a username that will have a home directory.
- user = getpass.getuser()
-
- # Try to find their home directory in a different way than used by the
- # implementation. Maybe this is silly and can only introduce spurious
- # failures due to system-specific configurations.
- import pwd
- expected = pwd.getpwnam(user).pw_dir
-
- realm = ftp.SystemFTPRealm(self.mktemp())
- home = realm.getHomeDirectory(user)
- self.assertEqual(home, filepath.FilePath(expected))
-
-
- def test_noSuchUser(self):
- """
- L{ftp.SystemFTPRealm.getHomeDirectory} raises L{UnauthorizedLogin} when
- passed a username which has no corresponding home directory in the
- system's accounts database.
- """
- user = insecureRandom(4).encode('hex')
- realm = ftp.SystemFTPRealm(self.mktemp())
- self.assertRaises(UnauthorizedLogin, realm.getHomeDirectory, user)
-
-
-
-class ErrnoToFailureTestCase(unittest.TestCase):
- """
- Tests for L{ftp.errnoToFailure} errno checking.
- """
-
- def test_notFound(self):
- """
- C{errno.ENOENT} should be translated to L{ftp.FileNotFoundError}.
- """
- d = ftp.errnoToFailure(errno.ENOENT, "foo")
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_permissionDenied(self):
- """
- C{errno.EPERM} should be translated to L{ftp.PermissionDeniedError}.
- """
- d = ftp.errnoToFailure(errno.EPERM, "foo")
- return self.assertFailure(d, ftp.PermissionDeniedError)
-
-
- def test_accessDenied(self):
- """
- C{errno.EACCES} should be translated to L{ftp.PermissionDeniedError}.
- """
- d = ftp.errnoToFailure(errno.EACCES, "foo")
- return self.assertFailure(d, ftp.PermissionDeniedError)
-
-
- def test_notDirectory(self):
- """
- C{errno.ENOTDIR} should be translated to L{ftp.IsNotADirectoryError}.
- """
- d = ftp.errnoToFailure(errno.ENOTDIR, "foo")
- return self.assertFailure(d, ftp.IsNotADirectoryError)
-
-
- def test_fileExists(self):
- """
- C{errno.EEXIST} should be translated to L{ftp.FileExistsError}.
- """
- d = ftp.errnoToFailure(errno.EEXIST, "foo")
- return self.assertFailure(d, ftp.FileExistsError)
-
-
- def test_isDirectory(self):
- """
- C{errno.EISDIR} should be translated to L{ftp.IsADirectoryError}.
- """
- d = ftp.errnoToFailure(errno.EISDIR, "foo")
- return self.assertFailure(d, ftp.IsADirectoryError)
-
-
- def test_passThrough(self):
- """
- If an unknown errno is passed to L{ftp.errnoToFailure}, it should let
- the originating exception pass through.
- """
- try:
- raise RuntimeError("bar")
- except:
- d = ftp.errnoToFailure(-1, "foo")
- return self.assertFailure(d, RuntimeError)
-
-
-
-class AnonymousFTPShellTestCase(unittest.TestCase):
- """
- Test anynomous shell properties.
- """
-
- def test_anonymousWrite(self):
- """
- Check that L{ftp.FTPAnonymousShell} returns an error when trying to
- open it in write mode.
- """
- shell = ftp.FTPAnonymousShell('')
- d = shell.openForWriting(('foo',))
- self.assertFailure(d, ftp.PermissionDeniedError)
- return d
-
-
-
-class IFTPShellTestsMixin:
- """
- Generic tests for the C{IFTPShell} interface.
- """
-
- def directoryExists(self, path):
- """
- Test if the directory exists at C{path}.
-
- @param path: the relative path to check.
- @type path: C{str}.
-
- @return: C{True} if C{path} exists and is a directory, C{False} if
- it's not the case
- @rtype: C{bool}
- """
- raise NotImplementedError()
-
-
- def createDirectory(self, path):
- """
- Create a directory in C{path}.
-
- @param path: the relative path of the directory to create, with one
- segment.
- @type path: C{str}
- """
- raise NotImplementedError()
-
-
- def fileExists(self, path):
- """
- Test if the file exists at C{path}.
-
- @param path: the relative path to check.
- @type path: C{str}.
-
- @return: C{True} if C{path} exists and is a file, C{False} if it's not
- the case.
- @rtype: C{bool}
- """
- raise NotImplementedError()
-
-
- def createFile(self, path, fileContent=''):
- """
- Create a file named C{path} with some content.
-
- @param path: the relative path of the file to create, without
- directory.
- @type path: C{str}
-
- @param fileContent: the content of the file.
- @type fileContent: C{str}
- """
- raise NotImplementedError()
-
-
- def test_createDirectory(self):
- """
- C{directoryExists} should report correctly about directory existence,
- and C{createDirectory} should create a directory detectable by
- C{directoryExists}.
- """
- self.assertFalse(self.directoryExists('bar'))
- self.createDirectory('bar')
- self.assertTrue(self.directoryExists('bar'))
-
-
- def test_createFile(self):
- """
- C{fileExists} should report correctly about file existence, and
- C{createFile} should create a file detectable by C{fileExists}.
- """
- self.assertFalse(self.fileExists('file.txt'))
- self.createFile('file.txt')
- self.assertTrue(self.fileExists('file.txt'))
-
-
- def test_makeDirectory(self):
- """
- Create a directory and check it ends in the filesystem.
- """
- d = self.shell.makeDirectory(('foo',))
- def cb(result):
- self.assertTrue(self.directoryExists('foo'))
- return d.addCallback(cb)
-
-
- def test_makeDirectoryError(self):
- """
- Creating a directory that already exists should fail with a
- C{ftp.FileExistsError}.
- """
- self.createDirectory('foo')
- d = self.shell.makeDirectory(('foo',))
- return self.assertFailure(d, ftp.FileExistsError)
-
-
- def test_removeDirectory(self):
- """
- Try to remove a directory and check it's removed from the filesystem.
- """
- self.createDirectory('bar')
- d = self.shell.removeDirectory(('bar',))
- def cb(result):
- self.assertFalse(self.directoryExists('bar'))
- return d.addCallback(cb)
-
-
- def test_removeDirectoryOnFile(self):
- """
- removeDirectory should not work in file and fail with a
- C{ftp.IsNotADirectoryError}.
- """
- self.createFile('file.txt')
- d = self.shell.removeDirectory(('file.txt',))
- return self.assertFailure(d, ftp.IsNotADirectoryError)
-
-
- def test_removeNotExistingDirectory(self):
- """
- Removing directory that doesn't exist should fail with a
- C{ftp.FileNotFoundError}.
- """
- d = self.shell.removeDirectory(('bar',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_removeFile(self):
- """
- Try to remove a file and check it's removed from the filesystem.
- """
- self.createFile('file.txt')
- d = self.shell.removeFile(('file.txt',))
- def cb(res):
- self.assertFalse(self.fileExists('file.txt'))
- d.addCallback(cb)
- return d
-
-
- def test_removeFileOnDirectory(self):
- """
- removeFile should not work on directory.
- """
- self.createDirectory('ned')
- d = self.shell.removeFile(('ned',))
- return self.assertFailure(d, ftp.IsADirectoryError)
-
-
- def test_removeNotExistingFile(self):
- """
- Try to remove a non existent file, and check it raises a
- L{ftp.FileNotFoundError}.
- """
- d = self.shell.removeFile(('foo',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_list(self):
- """
- Check the output of the list method.
- """
- self.createDirectory('ned')
- self.createFile('file.txt')
- d = self.shell.list(('.',))
- def cb(l):
- l.sort()
- self.assertEqual(l,
- [('file.txt', []), ('ned', [])])
- return d.addCallback(cb)
-
-
- def test_listWithStat(self):
- """
- Check the output of list with asked stats.
- """
- self.createDirectory('ned')
- self.createFile('file.txt')
- d = self.shell.list(('.',), ('size', 'permissions',))
- def cb(l):
- l.sort()
- self.assertEqual(len(l), 2)
- self.assertEqual(l[0][0], 'file.txt')
- self.assertEqual(l[1][0], 'ned')
- # Size and permissions are reported differently between platforms
- # so just check they are present
- self.assertEqual(len(l[0][1]), 2)
- self.assertEqual(len(l[1][1]), 2)
- return d.addCallback(cb)
-
-
- def test_listWithInvalidStat(self):
- """
- Querying an invalid stat should result to a C{AttributeError}.
- """
- self.createDirectory('ned')
- d = self.shell.list(('.',), ('size', 'whateverstat',))
- return self.assertFailure(d, AttributeError)
-
-
- def test_listFile(self):
- """
- Check the output of the list method on a file.
- """
- self.createFile('file.txt')
- d = self.shell.list(('file.txt',))
- def cb(l):
- l.sort()
- self.assertEqual(l,
- [('file.txt', [])])
- return d.addCallback(cb)
-
-
- def test_listNotExistingDirectory(self):
- """
- list on a directory that doesn't exist should fail with a
- L{ftp.FileNotFoundError}.
- """
- d = self.shell.list(('foo',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_access(self):
- """
- Try to access a resource.
- """
- self.createDirectory('ned')
- d = self.shell.access(('ned',))
- return d
-
-
- def test_accessNotFound(self):
- """
- access should fail on a resource that doesn't exist.
- """
- d = self.shell.access(('foo',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_openForReading(self):
- """
- Check that openForReading returns an object providing C{ftp.IReadFile}.
- """
- self.createFile('file.txt')
- d = self.shell.openForReading(('file.txt',))
- def cb(res):
- self.assertTrue(ftp.IReadFile.providedBy(res))
- d.addCallback(cb)
- return d
-
-
- def test_openForReadingNotFound(self):
- """
- openForReading should fail with a C{ftp.FileNotFoundError} on a file
- that doesn't exist.
- """
- d = self.shell.openForReading(('ned',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_openForReadingOnDirectory(self):
- """
- openForReading should not work on directory.
- """
- self.createDirectory('ned')
- d = self.shell.openForReading(('ned',))
- return self.assertFailure(d, ftp.IsADirectoryError)
-
-
- def test_openForWriting(self):
- """
- Check that openForWriting returns an object providing C{ftp.IWriteFile}.
- """
- d = self.shell.openForWriting(('foo',))
- def cb1(res):
- self.assertTrue(ftp.IWriteFile.providedBy(res))
- return res.receive().addCallback(cb2)
- def cb2(res):
- self.assertTrue(IConsumer.providedBy(res))
- d.addCallback(cb1)
- return d
-
-
- def test_openForWritingExistingDirectory(self):
- """
- openForWriting should not be able to open a directory that already
- exists.
- """
- self.createDirectory('ned')
- d = self.shell.openForWriting(('ned',))
- return self.assertFailure(d, ftp.IsADirectoryError)
-
-
- def test_openForWritingInNotExistingDirectory(self):
- """
- openForWring should fail with a L{ftp.FileNotFoundError} if you specify
- a file in a directory that doesn't exist.
- """
- self.createDirectory('ned')
- d = self.shell.openForWriting(('ned', 'idonotexist', 'foo'))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_statFile(self):
- """
- Check the output of the stat method on a file.
- """
- fileContent = 'wobble\n'
- self.createFile('file.txt', fileContent)
- d = self.shell.stat(('file.txt',), ('size', 'directory'))
- def cb(res):
- self.assertEqual(res[0], len(fileContent))
- self.assertFalse(res[1])
- d.addCallback(cb)
- return d
-
-
- def test_statDirectory(self):
- """
- Check the output of the stat method on a directory.
- """
- self.createDirectory('ned')
- d = self.shell.stat(('ned',), ('size', 'directory'))
- def cb(res):
- self.assertTrue(res[1])
- d.addCallback(cb)
- return d
-
-
- def test_statOwnerGroup(self):
- """
- Check the owner and groups stats.
- """
- self.createDirectory('ned')
- d = self.shell.stat(('ned',), ('owner', 'group'))
- def cb(res):
- self.assertEqual(len(res), 2)
- d.addCallback(cb)
- return d
-
-
- def test_statNotExisting(self):
- """
- stat should fail with L{ftp.FileNotFoundError} on a file that doesn't
- exist.
- """
- d = self.shell.stat(('foo',), ('size', 'directory'))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
- def test_invalidStat(self):
- """
- Querying an invalid stat should result to a C{AttributeError}.
- """
- self.createDirectory('ned')
- d = self.shell.stat(('ned',), ('size', 'whateverstat'))
- return self.assertFailure(d, AttributeError)
-
-
- def test_rename(self):
- """
- Try to rename a directory.
- """
- self.createDirectory('ned')
- d = self.shell.rename(('ned',), ('foo',))
- def cb(res):
- self.assertTrue(self.directoryExists('foo'))
- self.assertFalse(self.directoryExists('ned'))
- return d.addCallback(cb)
-
-
- def test_renameNotExisting(self):
- """
- Renaming a directory that doesn't exist should fail with
- L{ftp.FileNotFoundError}.
- """
- d = self.shell.rename(('foo',), ('bar',))
- return self.assertFailure(d, ftp.FileNotFoundError)
-
-
-
-class FTPShellTestCase(unittest.TestCase, IFTPShellTestsMixin):
- """
- Tests for the C{ftp.FTPShell} object.
- """
-
- def setUp(self):
- """
- Create a root directory and instantiate a shell.
- """
- self.root = filepath.FilePath(self.mktemp())
- self.root.createDirectory()
- self.shell = ftp.FTPShell(self.root)
-
-
- def directoryExists(self, path):
- """
- Test if the directory exists at C{path}.
- """
- return self.root.child(path).isdir()
-
-
- def createDirectory(self, path):
- """
- Create a directory in C{path}.
- """
- return self.root.child(path).createDirectory()
-
-
- def fileExists(self, path):
- """
- Test if the file exists at C{path}.
- """
- return self.root.child(path).isfile()
-
-
- def createFile(self, path, fileContent=''):
- """
- Create a file named C{path} with some content.
- """
- return self.root.child(path).setContent(fileContent)
-
-
-
-class TestConsumer(object):
- """
- A simple consumer for tests. It only works with non-streaming producers.
-
- @ivar producer: an object providing
- L{twisted.internet.interfaces.IPullProducer}.
- """
-
- implements(IConsumer)
- producer = None
-
- def registerProducer(self, producer, streaming):
- """
- Simple register of producer, checks that no register has happened
- before.
- """
- assert self.producer is None
- self.buffer = []
- self.producer = producer
- self.producer.resumeProducing()
-
-
- def unregisterProducer(self):
- """
- Unregister the producer, it should be done after a register.
- """
- assert self.producer is not None
- self.producer = None
-
-
- def write(self, data):
- """
- Save the data received.
- """
- self.buffer.append(data)
- self.producer.resumeProducing()
-
-
-
-class TestProducer(object):
- """
- A dumb producer.
- """
-
- def __init__(self, toProduce, consumer):
- """
- @param toProduce: data to write
- @type toProduce: C{str}
- @param consumer: the consumer of data.
- @type consumer: C{IConsumer}
- """
- self.toProduce = toProduce
- self.consumer = consumer
-
-
- def start(self):
- """
- Send the data to consume.
- """
- self.consumer.write(self.toProduce)
-
-
-
-class IReadWriteTestsMixin:
- """
- Generic tests for the C{IReadFile} and C{IWriteFile} interfaces.
- """
-
- def getFileReader(self, content):
- """
- Return an object providing C{IReadFile}, ready to send data C{content}.
- """
- raise NotImplementedError()
-
-
- def getFileWriter(self):
- """
- Return an object providing C{IWriteFile}, ready to receive data.
- """
- raise NotImplementedError()
-
-
- def getFileContent(self):
- """
- Return the content of the file used.
- """
- raise NotImplementedError()
-
-
- def test_read(self):
- """
- Test L{ftp.IReadFile}: the implementation should have a send method
- returning a C{Deferred} which fires when all the data has been sent
- to the consumer, and the data should be correctly send to the consumer.
- """
- content = 'wobble\n'
- consumer = TestConsumer()
- def cbGet(reader):
- return reader.send(consumer).addCallback(cbSend)
- def cbSend(res):
- self.assertEqual("".join(consumer.buffer), content)
- return self.getFileReader(content).addCallback(cbGet)
-
-
- def test_write(self):
- """
- Test L{ftp.IWriteFile}: the implementation should have a receive
- method returning a C{Deferred} which fires with a consumer ready to
- receive data to be written. It should also have a close() method that
- returns a Deferred.
- """
- content = 'elbbow\n'
- def cbGet(writer):
- return writer.receive().addCallback(cbReceive, writer)
- def cbReceive(consumer, writer):
- producer = TestProducer(content, consumer)
- consumer.registerProducer(None, True)
- producer.start()
- consumer.unregisterProducer()
- return writer.close().addCallback(cbClose)
- def cbClose(ignored):
- self.assertEqual(self.getFileContent(), content)
- return self.getFileWriter().addCallback(cbGet)
-
-
-
-class FTPReadWriteTestCase(unittest.TestCase, IReadWriteTestsMixin):
- """
- Tests for C{ftp._FileReader} and C{ftp._FileWriter}, the objects returned
- by the shell in C{openForReading}/C{openForWriting}.
- """
-
- def setUp(self):
- """
- Create a temporary file used later.
- """
- self.root = filepath.FilePath(self.mktemp())
- self.root.createDirectory()
- self.shell = ftp.FTPShell(self.root)
- self.filename = "file.txt"
-
-
- def getFileReader(self, content):
- """
- Return a C{ftp._FileReader} instance with a file opened for reading.
- """
- self.root.child(self.filename).setContent(content)
- return self.shell.openForReading((self.filename,))
-
-
- def getFileWriter(self):
- """
- Return a C{ftp._FileWriter} instance with a file opened for writing.
- """
- return self.shell.openForWriting((self.filename,))
-
-
- def getFileContent(self):
- """
- Return the content of the temporary file.
- """
- return self.root.child(self.filename).getContent()
-
-
-class CloseTestWriter:
- implements(ftp.IWriteFile)
- closeStarted = False
- def receive(self):
- self.s = StringIO()
- fc = ftp.FileConsumer(self.s)
- return defer.succeed(fc)
- def close(self):
- self.closeStarted = True
- return self.d
-
-class CloseTestShell:
- def openForWriting(self, segs):
- return defer.succeed(self.writer)
-
-class FTPCloseTest(unittest.TestCase):
- """Tests that the server invokes IWriteFile.close"""
-
- def test_write(self):
- """Confirm that FTP uploads (i.e. ftp_STOR) correctly call and wait
- upon the IWriteFile object's close() method"""
- f = ftp.FTP()
- f.workingDirectory = ["root"]
- f.shell = CloseTestShell()
- f.shell.writer = CloseTestWriter()
- f.shell.writer.d = defer.Deferred()
- f.factory = ftp.FTPFactory()
- f.factory.timeOut = None
- f.makeConnection(StringIO())
-
- di = ftp.DTP()
- di.factory = ftp.DTPFactory(f)
- f.dtpInstance = di
- di.makeConnection(None)#
-
- stor_done = []
- d = f.ftp_STOR("path")
- d.addCallback(stor_done.append)
- # the writer is still receiving data
- self.assertFalse(f.shell.writer.closeStarted, "close() called early")
- di.dataReceived("some data here")
- self.assertFalse(f.shell.writer.closeStarted, "close() called early")
- di.connectionLost("reason is ignored")
- # now we should be waiting in close()
- self.assertTrue(f.shell.writer.closeStarted, "close() not called")
- self.assertFalse(stor_done)
- f.shell.writer.d.callback("allow close() to finish")
- self.assertTrue(stor_done)
-
- return d # just in case an errback occurred
-
-
-
-class FTPResponseCodeTests(unittest.TestCase):
- """
- Tests relating directly to response codes.
- """
- def test_unique(self):
- """
- All of the response code globals (for example C{RESTART_MARKER_REPLY} or
- C{USR_NAME_OK_NEED_PASS}) have unique values and are present in the
- C{RESPONSE} dictionary.
- """
- allValues = set(ftp.RESPONSE)
- seenValues = set()
-
- for key, value in vars(ftp).items():
- if isinstance(value, str) and key.isupper():
- self.assertIn(
- value, allValues,
- "Code %r with value %r missing from RESPONSE dict" % (
- key, value))
- self.assertNotIn(
- value, seenValues,
- "Duplicate code %r with value %r" % (key, value))
- seenValues.add(value)
-