diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_cftp.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_cftp.py | 975 |
1 files changed, 0 insertions, 975 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_cftp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_cftp.py deleted file mode 100755 index 03e327a6..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_cftp.py +++ /dev/null @@ -1,975 +0,0 @@ -# -*- test-case-name: twisted.conch.test.test_cftp -*- -# Copyright (c) 2001-2009 Twisted Matrix Laboratories. -# See LICENSE file for details. - -""" -Tests for L{twisted.conch.scripts.cftp}. -""" - -import locale -import time, sys, os, operator, getpass, struct -from StringIO import StringIO - -from twisted.conch.test.test_ssh import Crypto, pyasn1 - -_reason = None -if Crypto and pyasn1: - try: - from twisted.conch import unix - from twisted.conch.scripts import cftp - from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar - except ImportError, e: - # Python 2.3 compatibility fix - sys.modules.pop("twisted.conch.unix", None) - unix = None - _reason = str(e) - del e -else: - unix = None - - -from twisted.python.fakepwd import UserDatabase -from twisted.trial.unittest import TestCase -from twisted.cred import portal -from twisted.internet import reactor, protocol, interfaces, defer, error -from twisted.internet.utils import getProcessOutputAndValue -from twisted.python import log -from twisted.conch import ls -from twisted.test.proto_helpers import StringTransport -from twisted.internet.task import Clock - -from twisted.conch.test import test_ssh, test_conch -from twisted.conch.test.test_filetransfer import SFTPTestBase -from twisted.conch.test.test_filetransfer import FileTransferTestAvatar - - - -class ListingTests(TestCase): - """ - Tests for L{lsLine}, the function which generates an entry for a file or - directory in an SFTP I{ls} command's output. - """ - if getattr(time, 'tzset', None) is None: - skip = "Cannot test timestamp formatting code without time.tzset" - - def setUp(self): - """ - Patch the L{ls} module's time function so the results of L{lsLine} are - deterministic. - """ - self.now = 123456789 - def fakeTime(): - return self.now - self.patch(ls, 'time', fakeTime) - - # Make sure that the timezone ends up the same after these tests as - # it was before. - if 'TZ' in os.environ: - self.addCleanup(operator.setitem, os.environ, 'TZ', os.environ['TZ']) - self.addCleanup(time.tzset) - else: - def cleanup(): - # os.environ.pop is broken! Don't use it! Ever! Or die! - try: - del os.environ['TZ'] - except KeyError: - pass - time.tzset() - self.addCleanup(cleanup) - - - def _lsInTimezone(self, timezone, stat): - """ - Call L{ls.lsLine} after setting the timezone to C{timezone} and return - the result. - """ - # Set the timezone to a well-known value so the timestamps are - # predictable. - os.environ['TZ'] = timezone - time.tzset() - return ls.lsLine('foo', stat) - - - def test_oldFile(self): - """ - A file with an mtime six months (approximately) or more in the past has - a listing including a low-resolution timestamp. - """ - # Go with 7 months. That's more than 6 months. - then = self.now - (60 * 60 * 24 * 31 * 7) - stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0)) - - self.assertEqual( - self._lsInTimezone('America/New_York', stat), - '!--------- 0 0 0 0 Apr 26 1973 foo') - self.assertEqual( - self._lsInTimezone('Pacific/Auckland', stat), - '!--------- 0 0 0 0 Apr 27 1973 foo') - - - def test_oldSingleDigitDayOfMonth(self): - """ - A file with a high-resolution timestamp which falls on a day of the - month which can be represented by one decimal digit is formatted with - one padding 0 to preserve the columns which come after it. - """ - # A point about 7 months in the past, tweaked to fall on the first of a - # month so we test the case we want to test. - then = self.now - (60 * 60 * 24 * 31 * 7) + (60 * 60 * 24 * 5) - stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0)) - - self.assertEqual( - self._lsInTimezone('America/New_York', stat), - '!--------- 0 0 0 0 May 01 1973 foo') - self.assertEqual( - self._lsInTimezone('Pacific/Auckland', stat), - '!--------- 0 0 0 0 May 02 1973 foo') - - - def test_newFile(self): - """ - A file with an mtime fewer than six months (approximately) in the past - has a listing including a high-resolution timestamp excluding the year. - """ - # A point about three months in the past. - then = self.now - (60 * 60 * 24 * 31 * 3) - stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0)) - - self.assertEqual( - self._lsInTimezone('America/New_York', stat), - '!--------- 0 0 0 0 Aug 28 17:33 foo') - self.assertEqual( - self._lsInTimezone('Pacific/Auckland', stat), - '!--------- 0 0 0 0 Aug 29 09:33 foo') - - - def test_localeIndependent(self): - """ - The month name in the date is locale independent. - """ - # A point about three months in the past. - then = self.now - (60 * 60 * 24 * 31 * 3) - stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0)) - - # Fake that we're in a language where August is not Aug (e.g.: Spanish) - currentLocale = locale.getlocale() - locale.setlocale(locale.LC_ALL, "es_AR.UTF8") - self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale) - - self.assertEqual( - self._lsInTimezone('America/New_York', stat), - '!--------- 0 0 0 0 Aug 28 17:33 foo') - self.assertEqual( - self._lsInTimezone('Pacific/Auckland', stat), - '!--------- 0 0 0 0 Aug 29 09:33 foo') - - # if alternate locale is not available, the previous test will be - # skipped, please install this locale for it to run - currentLocale = locale.getlocale() - try: - try: - locale.setlocale(locale.LC_ALL, "es_AR.UTF8") - except locale.Error: - test_localeIndependent.skip = "The es_AR.UTF8 locale is not installed." - finally: - locale.setlocale(locale.LC_ALL, currentLocale) - - - def test_newSingleDigitDayOfMonth(self): - """ - A file with a high-resolution timestamp which falls on a day of the - month which can be represented by one decimal digit is formatted with - one padding 0 to preserve the columns which come after it. - """ - # A point about three months in the past, tweaked to fall on the first - # of a month so we test the case we want to test. - then = self.now - (60 * 60 * 24 * 31 * 3) + (60 * 60 * 24 * 4) - stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0)) - - self.assertEqual( - self._lsInTimezone('America/New_York', stat), - '!--------- 0 0 0 0 Sep 01 17:33 foo') - self.assertEqual( - self._lsInTimezone('Pacific/Auckland', stat), - '!--------- 0 0 0 0 Sep 02 09:33 foo') - - - -class StdioClientTests(TestCase): - """ - Tests for L{cftp.StdioClient}. - """ - def setUp(self): - """ - Create a L{cftp.StdioClient} hooked up to dummy transport and a fake - user database. - """ - class Connection: - pass - - conn = Connection() - conn.transport = StringTransport() - conn.transport.localClosed = False - - self.client = cftp.StdioClient(conn) - self.database = self.client._pwd = UserDatabase() - - # Intentionally bypassing makeConnection - that triggers some code - # which uses features not provided by our dumb Connection fake. - self.client.transport = StringTransport() - - - def test_exec(self): - """ - The I{exec} command runs its arguments locally in a child process - using the user's shell. - """ - self.database.addUser( - getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', - sys.executable) - - d = self.client._dispatchCommand("exec print 1 + 2") - d.addCallback(self.assertEqual, "3\n") - return d - - - def test_execWithoutShell(self): - """ - If the local user has no shell, the I{exec} command runs its arguments - using I{/bin/sh}. - """ - self.database.addUser( - getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '') - - d = self.client._dispatchCommand("exec echo hello") - d.addCallback(self.assertEqual, "hello\n") - return d - - - def test_bang(self): - """ - The I{exec} command is run for lines which start with C{"!"}. - """ - self.database.addUser( - getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', - '/bin/sh') - - d = self.client._dispatchCommand("!echo hello") - d.addCallback(self.assertEqual, "hello\n") - return d - - - def setKnownConsoleSize(self, width, height): - """ - For the duration of this test, patch C{cftp}'s C{fcntl} module to return - a fixed width and height. - - @param width: the width in characters - @type width: C{int} - @param height: the height in characters - @type height: C{int} - """ - import tty # local import to avoid win32 issues - class FakeFcntl(object): - def ioctl(self, fd, opt, mutate): - if opt != tty.TIOCGWINSZ: - self.fail("Only window-size queries supported.") - return struct.pack("4H", height, width, 0, 0) - self.patch(cftp, "fcntl", FakeFcntl()) - - - def test_progressReporting(self): - """ - L{StdioClient._printProgressBar} prints a progress description, - including percent done, amount transferred, transfer rate, and time - remaining, all based the given start time, the given L{FileWrapper}'s - progress information and the reactor's current time. - """ - # Use a short, known console width because this simple test doesn't need - # to test the console padding. - self.setKnownConsoleSize(10, 34) - clock = self.client.reactor = Clock() - wrapped = StringIO("x") - wrapped.name = "sample" - wrapper = cftp.FileWrapper(wrapped) - wrapper.size = 1024 * 10 - startTime = clock.seconds() - clock.advance(2.0) - wrapper.total += 4096 - self.client._printProgressBar(wrapper, startTime) - self.assertEqual(self.client.transport.value(), - "\rsample 40% 4.0kB 2.0kBps 00:03 ") - - - def test_reportNoProgress(self): - """ - L{StdioClient._printProgressBar} prints a progress description that - indicates 0 bytes transferred if no bytes have been transferred and no - time has passed. - """ - self.setKnownConsoleSize(10, 34) - clock = self.client.reactor = Clock() - wrapped = StringIO("x") - wrapped.name = "sample" - wrapper = cftp.FileWrapper(wrapped) - startTime = clock.seconds() - self.client._printProgressBar(wrapper, startTime) - self.assertEqual(self.client.transport.value(), - "\rsample 0% 0.0B 0.0Bps 00:00 ") - - - -class FileTransferTestRealm: - def __init__(self, testDir): - self.testDir = testDir - - def requestAvatar(self, avatarID, mind, *interfaces): - a = FileTransferTestAvatar(self.testDir) - return interfaces[0], a, lambda: None - - -class SFTPTestProcess(protocol.ProcessProtocol): - """ - Protocol for testing cftp. Provides an interface between Python (where all - the tests are) and the cftp client process (which does the work that is - being tested). - """ - - def __init__(self, onOutReceived): - """ - @param onOutReceived: A L{Deferred} to be fired as soon as data is - received from stdout. - """ - self.clearBuffer() - self.onOutReceived = onOutReceived - self.onProcessEnd = None - self._expectingCommand = None - self._processEnded = False - - def clearBuffer(self): - """ - Clear any buffered data received from stdout. Should be private. - """ - self.buffer = '' - self._linesReceived = [] - self._lineBuffer = '' - - def outReceived(self, data): - """ - Called by Twisted when the cftp client prints data to stdout. - """ - log.msg('got %s' % data) - lines = (self._lineBuffer + data).split('\n') - self._lineBuffer = lines.pop(-1) - self._linesReceived.extend(lines) - # XXX - not strictly correct. - # We really want onOutReceived to fire after the first 'cftp>' prompt - # has been received. (See use in TestOurServerCmdLineClient.setUp) - if self.onOutReceived is not None: - d, self.onOutReceived = self.onOutReceived, None - d.callback(data) - self.buffer += data - self._checkForCommand() - - def _checkForCommand(self): - prompt = 'cftp> ' - if self._expectingCommand and self._lineBuffer == prompt: - buf = '\n'.join(self._linesReceived) - if buf.startswith(prompt): - buf = buf[len(prompt):] - self.clearBuffer() - d, self._expectingCommand = self._expectingCommand, None - d.callback(buf) - - def errReceived(self, data): - """ - Called by Twisted when the cftp client prints data to stderr. - """ - log.msg('err: %s' % data) - - def getBuffer(self): - """ - Return the contents of the buffer of data received from stdout. - """ - return self.buffer - - def runCommand(self, command): - """ - Issue the given command via the cftp client. Return a C{Deferred} that - fires when the server returns a result. Note that the C{Deferred} will - callback even if the server returns some kind of error. - - @param command: A string containing an sftp command. - - @return: A C{Deferred} that fires when the sftp server returns a - result. The payload is the server's response string. - """ - self._expectingCommand = defer.Deferred() - self.clearBuffer() - self.transport.write(command + '\n') - return self._expectingCommand - - def runScript(self, commands): - """ - Run each command in sequence and return a Deferred that fires when all - commands are completed. - - @param commands: A list of strings containing sftp commands. - - @return: A C{Deferred} that fires when all commands are completed. The - payload is a list of response strings from the server, in the same - order as the commands. - """ - sem = defer.DeferredSemaphore(1) - dl = [sem.run(self.runCommand, command) for command in commands] - return defer.gatherResults(dl) - - def killProcess(self): - """ - Kill the process if it is still running. - - If the process is still running, sends a KILL signal to the transport - and returns a C{Deferred} which fires when L{processEnded} is called. - - @return: a C{Deferred}. - """ - if self._processEnded: - return defer.succeed(None) - self.onProcessEnd = defer.Deferred() - self.transport.signalProcess('KILL') - return self.onProcessEnd - - def processEnded(self, reason): - """ - Called by Twisted when the cftp client process ends. - """ - self._processEnded = True - if self.onProcessEnd: - d, self.onProcessEnd = self.onProcessEnd, None - d.callback(None) - - -class CFTPClientTestBase(SFTPTestBase): - def setUp(self): - f = open('dsa_test.pub','w') - f.write(test_ssh.publicDSA_openssh) - f.close() - f = open('dsa_test','w') - f.write(test_ssh.privateDSA_openssh) - f.close() - os.chmod('dsa_test', 33152) - f = open('kh_test','w') - f.write('127.0.0.1 ' + test_ssh.publicRSA_openssh) - f.close() - return SFTPTestBase.setUp(self) - - def startServer(self): - realm = FileTransferTestRealm(self.testDir) - p = portal.Portal(realm) - p.registerChecker(test_ssh.ConchTestPublicKeyChecker()) - fac = test_ssh.ConchTestServerFactory() - fac.portal = p - self.server = reactor.listenTCP(0, fac, interface="127.0.0.1") - - def stopServer(self): - if not hasattr(self.server.factory, 'proto'): - return self._cbStopServer(None) - self.server.factory.proto.expectedLoseConnection = 1 - d = defer.maybeDeferred( - self.server.factory.proto.transport.loseConnection) - d.addCallback(self._cbStopServer) - return d - - def _cbStopServer(self, ignored): - return defer.maybeDeferred(self.server.stopListening) - - def tearDown(self): - for f in ['dsa_test.pub', 'dsa_test', 'kh_test']: - try: - os.remove(f) - except: - pass - return SFTPTestBase.tearDown(self) - - - -class TestOurServerCmdLineClient(CFTPClientTestBase): - - def setUp(self): - CFTPClientTestBase.setUp(self) - - self.startServer() - cmds = ('-p %i -l testuser ' - '--known-hosts kh_test ' - '--user-authentications publickey ' - '--host-key-algorithms ssh-rsa ' - '-i dsa_test ' - '-a ' - '-v ' - '127.0.0.1') - port = self.server.getHost().port - cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp') - log.msg('running %s %s' % (sys.executable, cmds)) - d = defer.Deferred() - self.processProtocol = SFTPTestProcess(d) - d.addCallback(lambda _: self.processProtocol.clearBuffer()) - env = os.environ.copy() - env['PYTHONPATH'] = os.pathsep.join(sys.path) - reactor.spawnProcess(self.processProtocol, sys.executable, cmds, - env=env) - return d - - def tearDown(self): - d = self.stopServer() - d.addCallback(lambda _: self.processProtocol.killProcess()) - return d - - def _killProcess(self, ignored): - try: - self.processProtocol.transport.signalProcess('KILL') - except error.ProcessExitedAlready: - pass - - def runCommand(self, command): - """ - Run the given command with the cftp client. Return a C{Deferred} that - fires when the command is complete. Payload is the server's output for - that command. - """ - return self.processProtocol.runCommand(command) - - def runScript(self, *commands): - """ - Run the given commands with the cftp client. Returns a C{Deferred} - that fires when the commands are all complete. The C{Deferred}'s - payload is a list of output for each command. - """ - return self.processProtocol.runScript(commands) - - def testCdPwd(self): - """ - Test that 'pwd' reports the current remote directory, that 'lpwd' - reports the current local directory, and that changing to a - subdirectory then changing to its parent leaves you in the original - remote directory. - """ - # XXX - not actually a unit test, see docstring. - homeDir = os.path.join(os.getcwd(), self.testDir) - d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd') - d.addCallback(lambda xs: xs[:3] + xs[4:]) - d.addCallback(self.assertEqual, - [homeDir, os.getcwd(), '', homeDir]) - return d - - def testChAttrs(self): - """ - Check that 'ls -l' output includes the access permissions and that - this output changes appropriately with 'chmod'. - """ - def _check(results): - self.flushLoggedErrors() - self.assertTrue(results[0].startswith('-rw-r--r--')) - self.assertEqual(results[1], '') - self.assertTrue(results[2].startswith('----------'), results[2]) - self.assertEqual(results[3], '') - - d = self.runScript('ls -l testfile1', 'chmod 0 testfile1', - 'ls -l testfile1', 'chmod 644 testfile1') - return d.addCallback(_check) - # XXX test chgrp/own - - - def testList(self): - """ - Check 'ls' works as expected. Checks for wildcards, hidden files, - listing directories and listing empty directories. - """ - def _check(results): - self.assertEqual(results[0], ['testDirectory', 'testRemoveFile', - 'testRenameFile', 'testfile1']) - self.assertEqual(results[1], ['testDirectory', 'testRemoveFile', - 'testRenameFile', 'testfile1']) - self.assertEqual(results[2], ['testRemoveFile', 'testRenameFile']) - self.assertEqual(results[3], ['.testHiddenFile', 'testRemoveFile', - 'testRenameFile']) - self.assertEqual(results[4], ['']) - d = self.runScript('ls', 'ls ../' + os.path.basename(self.testDir), - 'ls *File', 'ls -a *File', 'ls -l testDirectory') - d.addCallback(lambda xs: [x.split('\n') for x in xs]) - return d.addCallback(_check) - - - def testHelp(self): - """ - Check that running the '?' command returns help. - """ - d = self.runCommand('?') - d.addCallback(self.assertEqual, - cftp.StdioClient(None).cmd_HELP('').strip()) - return d - - def assertFilesEqual(self, name1, name2, msg=None): - """ - Assert that the files at C{name1} and C{name2} contain exactly the - same data. - """ - f1 = file(name1).read() - f2 = file(name2).read() - self.assertEqual(f1, f2, msg) - - - def testGet(self): - """ - Test that 'get' saves the remote file to the correct local location, - that the output of 'get' is correct and that 'rm' actually removes - the file. - """ - # XXX - not actually a unit test - expectedOutput = ("Transferred %s/%s/testfile1 to %s/test file2" - % (os.getcwd(), self.testDir, self.testDir)) - def _checkGet(result): - self.assertTrue(result.endswith(expectedOutput)) - self.assertFilesEqual(self.testDir + '/testfile1', - self.testDir + '/test file2', - "get failed") - return self.runCommand('rm "test file2"') - - d = self.runCommand('get testfile1 "%s/test file2"' % (self.testDir,)) - d.addCallback(_checkGet) - d.addCallback(lambda _: self.failIf( - os.path.exists(self.testDir + '/test file2'))) - return d - - - def testWildcardGet(self): - """ - Test that 'get' works correctly when given wildcard parameters. - """ - def _check(ignored): - self.assertFilesEqual(self.testDir + '/testRemoveFile', - 'testRemoveFile', - 'testRemoveFile get failed') - self.assertFilesEqual(self.testDir + '/testRenameFile', - 'testRenameFile', - 'testRenameFile get failed') - - d = self.runCommand('get testR*') - return d.addCallback(_check) - - - def testPut(self): - """ - Check that 'put' uploads files correctly and that they can be - successfully removed. Also check the output of the put command. - """ - # XXX - not actually a unit test - expectedOutput = ('Transferred %s/testfile1 to %s/%s/test"file2' - % (self.testDir, os.getcwd(), self.testDir)) - def _checkPut(result): - self.assertFilesEqual(self.testDir + '/testfile1', - self.testDir + '/test"file2') - self.failUnless(result.endswith(expectedOutput)) - return self.runCommand('rm "test\\"file2"') - - d = self.runCommand('put %s/testfile1 "test\\"file2"' - % (self.testDir,)) - d.addCallback(_checkPut) - d.addCallback(lambda _: self.failIf( - os.path.exists(self.testDir + '/test"file2'))) - return d - - - def test_putOverLongerFile(self): - """ - Check that 'put' uploads files correctly when overwriting a longer - file. - """ - # XXX - not actually a unit test - f = file(os.path.join(self.testDir, 'shorterFile'), 'w') - f.write("a") - f.close() - f = file(os.path.join(self.testDir, 'longerFile'), 'w') - f.write("bb") - f.close() - def _checkPut(result): - self.assertFilesEqual(self.testDir + '/shorterFile', - self.testDir + '/longerFile') - - d = self.runCommand('put %s/shorterFile longerFile' - % (self.testDir,)) - d.addCallback(_checkPut) - return d - - - def test_putMultipleOverLongerFile(self): - """ - Check that 'put' uploads files correctly when overwriting a longer - file and you use a wildcard to specify the files to upload. - """ - # XXX - not actually a unit test - os.mkdir(os.path.join(self.testDir, 'dir')) - f = file(os.path.join(self.testDir, 'dir', 'file'), 'w') - f.write("a") - f.close() - f = file(os.path.join(self.testDir, 'file'), 'w') - f.write("bb") - f.close() - def _checkPut(result): - self.assertFilesEqual(self.testDir + '/dir/file', - self.testDir + '/file') - - d = self.runCommand('put %s/dir/*' - % (self.testDir,)) - d.addCallback(_checkPut) - return d - - - def testWildcardPut(self): - """ - What happens if you issue a 'put' command and include a wildcard (i.e. - '*') in parameter? Check that all files matching the wildcard are - uploaded to the correct directory. - """ - def check(results): - self.assertEqual(results[0], '') - self.assertEqual(results[2], '') - self.assertFilesEqual(self.testDir + '/testRemoveFile', - self.testDir + '/../testRemoveFile', - 'testRemoveFile get failed') - self.assertFilesEqual(self.testDir + '/testRenameFile', - self.testDir + '/../testRenameFile', - 'testRenameFile get failed') - - d = self.runScript('cd ..', - 'put %s/testR*' % (self.testDir,), - 'cd %s' % os.path.basename(self.testDir)) - d.addCallback(check) - return d - - - def testLink(self): - """ - Test that 'ln' creates a file which appears as a link in the output of - 'ls'. Check that removing the new file succeeds without output. - """ - def _check(results): - self.flushLoggedErrors() - self.assertEqual(results[0], '') - self.assertTrue(results[1].startswith('l'), 'link failed') - return self.runCommand('rm testLink') - - d = self.runScript('ln testLink testfile1', 'ls -l testLink') - d.addCallback(_check) - d.addCallback(self.assertEqual, '') - return d - - - def testRemoteDirectory(self): - """ - Test that we can create and remove directories with the cftp client. - """ - def _check(results): - self.assertEqual(results[0], '') - self.assertTrue(results[1].startswith('d')) - return self.runCommand('rmdir testMakeDirectory') - - d = self.runScript('mkdir testMakeDirectory', - 'ls -l testMakeDirector?') - d.addCallback(_check) - d.addCallback(self.assertEqual, '') - return d - - - def test_existingRemoteDirectory(self): - """ - Test that a C{mkdir} on an existing directory fails with the - appropriate error, and doesn't log an useless error server side. - """ - def _check(results): - self.assertEqual(results[0], '') - self.assertEqual(results[1], - 'remote error 11: mkdir failed') - - d = self.runScript('mkdir testMakeDirectory', - 'mkdir testMakeDirectory') - d.addCallback(_check) - return d - - - def testLocalDirectory(self): - """ - Test that we can create a directory locally and remove it with the - cftp client. This test works because the 'remote' server is running - out of a local directory. - """ - d = self.runCommand('lmkdir %s/testLocalDirectory' % (self.testDir,)) - d.addCallback(self.assertEqual, '') - d.addCallback(lambda _: self.runCommand('rmdir testLocalDirectory')) - d.addCallback(self.assertEqual, '') - return d - - - def testRename(self): - """ - Test that we can rename a file. - """ - def _check(results): - self.assertEqual(results[0], '') - self.assertEqual(results[1], 'testfile2') - return self.runCommand('rename testfile2 testfile1') - - d = self.runScript('rename testfile1 testfile2', 'ls testfile?') - d.addCallback(_check) - d.addCallback(self.assertEqual, '') - return d - - - -class TestOurServerBatchFile(CFTPClientTestBase): - def setUp(self): - CFTPClientTestBase.setUp(self) - self.startServer() - - def tearDown(self): - CFTPClientTestBase.tearDown(self) - return self.stopServer() - - def _getBatchOutput(self, f): - fn = self.mktemp() - open(fn, 'w').write(f) - port = self.server.getHost().port - cmds = ('-p %i -l testuser ' - '--known-hosts kh_test ' - '--user-authentications publickey ' - '--host-key-algorithms ssh-rsa ' - '-i dsa_test ' - '-a ' - '-v -b %s 127.0.0.1') % (port, fn) - cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:] - log.msg('running %s %s' % (sys.executable, cmds)) - env = os.environ.copy() - env['PYTHONPATH'] = os.pathsep.join(sys.path) - - self.server.factory.expectedLoseConnection = 1 - - d = getProcessOutputAndValue(sys.executable, cmds, env=env) - - def _cleanup(res): - os.remove(fn) - return res - - d.addCallback(lambda res: res[0]) - d.addBoth(_cleanup) - - return d - - def testBatchFile(self): - """Test whether batch file function of cftp ('cftp -b batchfile'). - This works by treating the file as a list of commands to be run. - """ - cmds = """pwd -ls -exit -""" - def _cbCheckResult(res): - res = res.split('\n') - log.msg('RES %s' % str(res)) - self.failUnless(res[1].find(self.testDir) != -1, repr(res)) - self.assertEqual(res[3:-2], ['testDirectory', 'testRemoveFile', - 'testRenameFile', 'testfile1']) - - d = self._getBatchOutput(cmds) - d.addCallback(_cbCheckResult) - return d - - def testError(self): - """Test that an error in the batch file stops running the batch. - """ - cmds = """chown 0 missingFile -pwd -exit -""" - def _cbCheckResult(res): - self.failIf(res.find(self.testDir) != -1) - - d = self._getBatchOutput(cmds) - d.addCallback(_cbCheckResult) - return d - - def testIgnoredError(self): - """Test that a minus sign '-' at the front of a line ignores - any errors. - """ - cmds = """-chown 0 missingFile -pwd -exit -""" - def _cbCheckResult(res): - self.failIf(res.find(self.testDir) == -1) - - d = self._getBatchOutput(cmds) - d.addCallback(_cbCheckResult) - return d - - - -class TestOurServerSftpClient(CFTPClientTestBase): - """ - Test the sftp server against sftp command line client. - """ - - def setUp(self): - CFTPClientTestBase.setUp(self) - return self.startServer() - - - def tearDown(self): - return self.stopServer() - - - def test_extendedAttributes(self): - """ - Test the return of extended attributes by the server: the sftp client - should ignore them, but still be able to parse the response correctly. - - This test is mainly here to check that - L{filetransfer.FILEXFER_ATTR_EXTENDED} has the correct value. - """ - fn = self.mktemp() - open(fn, 'w').write("ls .\nexit") - port = self.server.getHost().port - - oldGetAttr = FileTransferForTestAvatar._getAttrs - def _getAttrs(self, s): - attrs = oldGetAttr(self, s) - attrs["ext_foo"] = "bar" - return attrs - - self.patch(FileTransferForTestAvatar, "_getAttrs", _getAttrs) - - self.server.factory.expectedLoseConnection = True - cmds = ('-o', 'IdentityFile=dsa_test', - '-o', 'UserKnownHostsFile=kh_test', - '-o', 'HostKeyAlgorithms=ssh-rsa', - '-o', 'Port=%i' % (port,), '-b', fn, 'testuser@127.0.0.1') - d = getProcessOutputAndValue("sftp", cmds) - def check(result): - self.assertEqual(result[2], 0) - for i in ['testDirectory', 'testRemoveFile', - 'testRenameFile', 'testfile1']: - self.assertIn(i, result[0]) - return d.addCallback(check) - - - -if unix is None or Crypto is None or pyasn1 is None or interfaces.IReactorProcess(reactor, None) is None: - if _reason is None: - _reason = "don't run w/o spawnProcess or PyCrypto or pyasn1" - TestOurServerCmdLineClient.skip = _reason - TestOurServerBatchFile.skip = _reason - TestOurServerSftpClient.skip = _reason - StdioClientTests.skip = _reason -else: - from twisted.python.procutils import which - if not which('sftp'): - TestOurServerSftpClient.skip = "no sftp command-line client available" |