diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/trial/runner.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/trial/runner.py | 861 |
1 files changed, 0 insertions, 861 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/trial/runner.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/trial/runner.py deleted file mode 100755 index 68663b5e..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/trial/runner.py +++ /dev/null @@ -1,861 +0,0 @@ -# -*- test-case-name: twisted.trial.test.test_runner -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -A miscellany of code used to run Trial tests. - -Maintainer: Jonathan Lange -""" - -__all__ = [ - 'suiteVisit', 'TestSuite', - - 'DestructiveTestSuite', 'DryRunVisitor', - 'ErrorHolder', 'LoggedSuite', 'PyUnitTestCase', - 'TestHolder', 'TestLoader', 'TrialRunner', 'TrialSuite', - - 'filenameToModule', 'isPackage', 'isPackageDirectory', 'isTestCase', - 'name', 'samefile', 'NOT_IN_TEST', - ] - -import pdb -import os, types, warnings, sys, inspect, imp -import doctest, time - -from twisted.python import reflect, log, failure, modules, filepath -from twisted.python.compat import set - -from twisted.internet import defer -from twisted.trial import util, unittest -from twisted.trial.itrial import ITestCase -from twisted.trial.reporter import UncleanWarningsReporterWrapper - -# These are imported so that they remain in the public API for t.trial.runner -from twisted.trial.unittest import suiteVisit, TestSuite - -from zope.interface import implements - -pyunit = __import__('unittest') - - - -def isPackage(module): - """Given an object return True if the object looks like a package""" - if not isinstance(module, types.ModuleType): - return False - basename = os.path.splitext(os.path.basename(module.__file__))[0] - return basename == '__init__' - - -def isPackageDirectory(dirname): - """Is the directory at path 'dirname' a Python package directory? - Returns the name of the __init__ file (it may have a weird extension) - if dirname is a package directory. Otherwise, returns False""" - for ext in zip(*imp.get_suffixes())[0]: - initFile = '__init__' + ext - if os.path.exists(os.path.join(dirname, initFile)): - return initFile - return False - - -def samefile(filename1, filename2): - """ - A hacky implementation of C{os.path.samefile}. Used by L{filenameToModule} - when the platform doesn't provide C{os.path.samefile}. Do not use this. - """ - return os.path.abspath(filename1) == os.path.abspath(filename2) - - -def filenameToModule(fn): - """ - Given a filename, do whatever possible to return a module object matching - that file. - - If the file in question is a module in Python path, properly import and - return that module. Otherwise, load the source manually. - - @param fn: A filename. - @return: A module object. - @raise ValueError: If C{fn} does not exist. - """ - if not os.path.exists(fn): - raise ValueError("%r doesn't exist" % (fn,)) - try: - ret = reflect.namedAny(reflect.filenameToModuleName(fn)) - except (ValueError, AttributeError): - # Couldn't find module. The file 'fn' is not in PYTHONPATH - return _importFromFile(fn) - # ensure that the loaded module matches the file - retFile = os.path.splitext(ret.__file__)[0] + '.py' - # not all platforms (e.g. win32) have os.path.samefile - same = getattr(os.path, 'samefile', samefile) - if os.path.isfile(fn) and not same(fn, retFile): - del sys.modules[ret.__name__] - ret = _importFromFile(fn) - return ret - - -def _importFromFile(fn, moduleName=None): - fn = _resolveDirectory(fn) - if not moduleName: - moduleName = os.path.splitext(os.path.split(fn)[-1])[0] - if moduleName in sys.modules: - return sys.modules[moduleName] - fd = open(fn, 'r') - try: - module = imp.load_source(moduleName, fn, fd) - finally: - fd.close() - return module - - -def _resolveDirectory(fn): - if os.path.isdir(fn): - initFile = isPackageDirectory(fn) - if initFile: - fn = os.path.join(fn, initFile) - else: - raise ValueError('%r is not a package directory' % (fn,)) - return fn - - -def _getMethodNameInClass(method): - """ - Find the attribute name on the method's class which refers to the method. - - For some methods, notably decorators which have not had __name__ set correctly: - - getattr(method.im_class, method.__name__) != method - """ - if getattr(method.im_class, method.__name__, object()) != method: - for alias in dir(method.im_class): - if getattr(method.im_class, alias, object()) == method: - return alias - return method.__name__ - - -class DestructiveTestSuite(TestSuite): - """ - A test suite which remove the tests once run, to minimize memory usage. - """ - - def run(self, result): - """ - Almost the same as L{TestSuite.run}, but with C{self._tests} being - empty at the end. - """ - while self._tests: - if result.shouldStop: - break - test = self._tests.pop(0) - test(result) - return result - - - -# When an error occurs outside of any test, the user will see this string -# in place of a test's name. -NOT_IN_TEST = "<not in test>" - - - -class LoggedSuite(TestSuite): - """ - Any errors logged in this suite will be reported to the L{TestResult} - object. - """ - - def run(self, result): - """ - Run the suite, storing all errors in C{result}. If an error is logged - while no tests are running, then it will be added as an error to - C{result}. - - @param result: A L{TestResult} object. - """ - observer = unittest._logObserver - observer._add() - super(LoggedSuite, self).run(result) - observer._remove() - for error in observer.getErrors(): - result.addError(TestHolder(NOT_IN_TEST), error) - observer.flushErrors() - - - -class PyUnitTestCase(object): - """ - DEPRECATED in Twisted 8.0. - - This class decorates the pyunit.TestCase class, mainly to work around the - differences between unittest in Python 2.3, 2.4, and 2.5. These - differences are:: - - - The way doctest unittests describe themselves - - Where the implementation of TestCase.run is (used to be in __call__) - - Where the test method name is kept (mangled-private or non-mangled - private variable) - - It also implements visit, which we like. - """ - - def __init__(self, test): - warnings.warn("Deprecated in Twisted 8.0.", - category=DeprecationWarning) - self._test = test - test.id = self.id - - def id(self): - cls = self._test.__class__ - tmn = getattr(self._test, '_TestCase__testMethodName', None) - if tmn is None: - # python2.5's 'unittest' module is more sensible; but different. - tmn = self._test._testMethodName - return (cls.__module__ + '.' + cls.__name__ + '.' + - tmn) - - def __repr__(self): - return 'PyUnitTestCase<%r>'%(self.id(),) - - def __call__(self, results): - return self._test(results) - - - def visit(self, visitor): - """ - Call the given visitor with the original, standard library, test case - that C{self} wraps. See L{unittest.TestCase.visit}. - - Deprecated in Twisted 8.0. - """ - warnings.warn("Test visitors deprecated in Twisted 8.0", - category=DeprecationWarning) - visitor(self._test) - - - def __getattr__(self, name): - return getattr(self._test, name) - - - -class TrialSuite(TestSuite): - """ - Suite to wrap around every single test in a C{trial} run. Used internally - by Trial to set up things necessary for Trial tests to work, regardless of - what context they are run in. - """ - - def __init__(self, tests=()): - suite = LoggedSuite(tests) - super(TrialSuite, self).__init__([suite]) - - - def _bail(self): - from twisted.internet import reactor - d = defer.Deferred() - reactor.addSystemEventTrigger('after', 'shutdown', - lambda: d.callback(None)) - reactor.fireSystemEvent('shutdown') # radix's suggestion - # As long as TestCase does crap stuff with the reactor we need to - # manually shutdown the reactor here, and that requires util.wait - # :( - # so that the shutdown event completes - unittest.TestCase('mktemp')._wait(d) - - def run(self, result): - try: - TestSuite.run(self, result) - finally: - self._bail() - - -def name(thing): - """ - @param thing: an object from modules (instance of PythonModule, - PythonAttribute), a TestCase subclass, or an instance of a TestCase. - """ - if isTestCase(thing): - # TestCase subclass - theName = reflect.qual(thing) - else: - # thing from trial, or thing from modules. - # this monstrosity exists so that modules' objects do not have to - # implement id(). -jml - try: - theName = thing.id() - except AttributeError: - theName = thing.name - return theName - - -def isTestCase(obj): - """ - @return: C{True} if C{obj} is a class that contains test cases, C{False} - otherwise. Used to find all the tests in a module. - """ - try: - return issubclass(obj, pyunit.TestCase) - except TypeError: - return False - - - -class TestHolder(object): - """ - Placeholder for a L{TestCase} inside a reporter. As far as a L{TestResult} - is concerned, this looks exactly like a unit test. - """ - - implements(ITestCase) - - failureException = None - - def __init__(self, description): - """ - @param description: A string to be displayed L{TestResult}. - """ - self.description = description - - - def __call__(self, result): - return self.run(result) - - - def id(self): - return self.description - - - def countTestCases(self): - return 0 - - - def run(self, result): - """ - This test is just a placeholder. Run the test successfully. - - @param result: The C{TestResult} to store the results in. - @type result: L{twisted.trial.itrial.ITestResult}. - """ - result.startTest(self) - result.addSuccess(self) - result.stopTest(self) - - - def shortDescription(self): - return self.description - - - -class ErrorHolder(TestHolder): - """ - Used to insert arbitrary errors into a test suite run. Provides enough - methods to look like a C{TestCase}, however, when it is run, it simply adds - an error to the C{TestResult}. The most common use-case is for when a - module fails to import. - """ - - def __init__(self, description, error): - """ - @param description: A string used by C{TestResult}s to identify this - error. Generally, this is the name of a module that failed to import. - - @param error: The error to be added to the result. Can be an `exc_info` - tuple or a L{twisted.python.failure.Failure}. - """ - super(ErrorHolder, self).__init__(description) - self.error = util.excInfoOrFailureToExcInfo(error) - - - def __repr__(self): - return "<ErrorHolder description=%r error=%s%s>" % ( - # Format the exception type and arguments explicitly, as exception - # objects do not have nice looking string formats on Python 2.4. - self.description, self.error[0].__name__, self.error[1].args) - - - def run(self, result): - """ - Run the test, reporting the error. - - @param result: The C{TestResult} to store the results in. - @type result: L{twisted.trial.itrial.ITestResult}. - """ - result.startTest(self) - result.addError(self, self.error) - result.stopTest(self) - - - def visit(self, visitor): - """ - See L{unittest.TestCase.visit}. - """ - visitor(self) - - - -class TestLoader(object): - """ - I find tests inside function, modules, files -- whatever -- then return - them wrapped inside a Test (either a L{TestSuite} or a L{TestCase}). - - @ivar methodPrefix: A string prefix. C{TestLoader} will assume that all the - methods in a class that begin with C{methodPrefix} are test cases. - - @ivar modulePrefix: A string prefix. Every module in a package that begins - with C{modulePrefix} is considered a module full of tests. - - @ivar forceGarbageCollection: A flag applied to each C{TestCase} loaded. - See L{unittest.TestCase} for more information. - - @ivar sorter: A key function used to sort C{TestCase}s, test classes, - modules and packages. - - @ivar suiteFactory: A callable which is passed a list of tests (which - themselves may be suites of tests). Must return a test suite. - """ - - methodPrefix = 'test' - modulePrefix = 'test_' - - def __init__(self): - self.suiteFactory = TestSuite - self.sorter = name - self._importErrors = [] - - def sort(self, xs): - """ - Sort the given things using L{sorter}. - - @param xs: A list of test cases, class or modules. - """ - return sorted(xs, key=self.sorter) - - def findTestClasses(self, module): - """Given a module, return all Trial test classes""" - classes = [] - for name, val in inspect.getmembers(module): - if isTestCase(val): - classes.append(val) - return self.sort(classes) - - def findByName(self, name): - """ - Return a Python object given a string describing it. - - @param name: a string which may be either a filename or a - fully-qualified Python name. - - @return: If C{name} is a filename, return the module. If C{name} is a - fully-qualified Python name, return the object it refers to. - """ - if os.path.exists(name): - return filenameToModule(name) - return reflect.namedAny(name) - - def loadModule(self, module): - """ - Return a test suite with all the tests from a module. - - Included are TestCase subclasses and doctests listed in the module's - __doctests__ module. If that's not good for you, put a function named - either C{testSuite} or C{test_suite} in your module that returns a - TestSuite, and I'll use the results of that instead. - - If C{testSuite} and C{test_suite} are both present, then I'll use - C{testSuite}. - """ - ## XXX - should I add an optional parameter to disable the check for - ## a custom suite. - ## OR, should I add another method - if not isinstance(module, types.ModuleType): - raise TypeError("%r is not a module" % (module,)) - if hasattr(module, 'testSuite'): - return module.testSuite() - elif hasattr(module, 'test_suite'): - return module.test_suite() - suite = self.suiteFactory() - for testClass in self.findTestClasses(module): - suite.addTest(self.loadClass(testClass)) - if not hasattr(module, '__doctests__'): - return suite - docSuite = self.suiteFactory() - for doctest in module.__doctests__: - docSuite.addTest(self.loadDoctests(doctest)) - return self.suiteFactory([suite, docSuite]) - loadTestsFromModule = loadModule - - def loadClass(self, klass): - """ - Given a class which contains test cases, return a sorted list of - C{TestCase} instances. - """ - if not (isinstance(klass, type) or isinstance(klass, types.ClassType)): - raise TypeError("%r is not a class" % (klass,)) - if not isTestCase(klass): - raise ValueError("%r is not a test case" % (klass,)) - names = self.getTestCaseNames(klass) - tests = self.sort([self._makeCase(klass, self.methodPrefix+name) - for name in names]) - return self.suiteFactory(tests) - loadTestsFromTestCase = loadClass - - def getTestCaseNames(self, klass): - """ - Given a class that contains C{TestCase}s, return a list of names of - methods that probably contain tests. - """ - return reflect.prefixedMethodNames(klass, self.methodPrefix) - - def loadMethod(self, method): - """ - Given a method of a C{TestCase} that represents a test, return a - C{TestCase} instance for that test. - """ - if not isinstance(method, types.MethodType): - raise TypeError("%r not a method" % (method,)) - return self._makeCase(method.im_class, _getMethodNameInClass(method)) - - def _makeCase(self, klass, methodName): - return klass(methodName) - - def loadPackage(self, package, recurse=False): - """ - Load tests from a module object representing a package, and return a - TestSuite containing those tests. - - Tests are only loaded from modules whose name begins with 'test_' - (or whatever C{modulePrefix} is set to). - - @param package: a types.ModuleType object (or reasonable facsimilie - obtained by importing) which may contain tests. - - @param recurse: A boolean. If True, inspect modules within packages - within the given package (and so on), otherwise, only inspect modules - in the package itself. - - @raise: TypeError if 'package' is not a package. - - @return: a TestSuite created with my suiteFactory, containing all the - tests. - """ - if not isPackage(package): - raise TypeError("%r is not a package" % (package,)) - pkgobj = modules.getModule(package.__name__) - if recurse: - discovery = pkgobj.walkModules() - else: - discovery = pkgobj.iterModules() - discovered = [] - for disco in discovery: - if disco.name.split(".")[-1].startswith(self.modulePrefix): - discovered.append(disco) - suite = self.suiteFactory() - for modinfo in self.sort(discovered): - try: - module = modinfo.load() - except: - thingToAdd = ErrorHolder(modinfo.name, failure.Failure()) - else: - thingToAdd = self.loadModule(module) - suite.addTest(thingToAdd) - return suite - - def loadDoctests(self, module): - """ - Return a suite of tests for all the doctests defined in C{module}. - - @param module: A module object or a module name. - """ - if isinstance(module, str): - try: - module = reflect.namedAny(module) - except: - return ErrorHolder(module, failure.Failure()) - if not inspect.ismodule(module): - warnings.warn("trial only supports doctesting modules") - return - extraArgs = {} - if sys.version_info > (2, 4): - # Work around Python issue2604: DocTestCase.tearDown clobbers globs - def saveGlobals(test): - """ - Save C{test.globs} and replace it with a copy so that if - necessary, the original will be available for the next test - run. - """ - test._savedGlobals = getattr(test, '_savedGlobals', test.globs) - test.globs = test._savedGlobals.copy() - extraArgs['setUp'] = saveGlobals - return doctest.DocTestSuite(module, **extraArgs) - - def loadAnything(self, thing, recurse=False): - """ - Given a Python object, return whatever tests that are in it. Whatever - 'in' might mean. - - @param thing: A Python object. A module, method, class or package. - @param recurse: Whether or not to look in subpackages of packages. - Defaults to False. - - @return: A C{TestCase} or C{TestSuite}. - """ - if isinstance(thing, types.ModuleType): - if isPackage(thing): - return self.loadPackage(thing, recurse) - return self.loadModule(thing) - elif isinstance(thing, types.ClassType): - return self.loadClass(thing) - elif isinstance(thing, type): - return self.loadClass(thing) - elif isinstance(thing, types.MethodType): - return self.loadMethod(thing) - raise TypeError("No loader for %r. Unrecognized type" % (thing,)) - - def loadByName(self, name, recurse=False): - """ - Given a string representing a Python object, return whatever tests - are in that object. - - If C{name} is somehow inaccessible (e.g. the module can't be imported, - there is no Python object with that name etc) then return an - L{ErrorHolder}. - - @param name: The fully-qualified name of a Python object. - """ - try: - thing = self.findByName(name) - except: - return ErrorHolder(name, failure.Failure()) - return self.loadAnything(thing, recurse) - loadTestsFromName = loadByName - - def loadByNames(self, names, recurse=False): - """ - Construct a TestSuite containing all the tests found in 'names', where - names is a list of fully qualified python names and/or filenames. The - suite returned will have no duplicate tests, even if the same object - is named twice. - """ - things = [] - errors = [] - for name in names: - try: - things.append(self.findByName(name)) - except: - errors.append(ErrorHolder(name, failure.Failure())) - suites = [self.loadAnything(thing, recurse) - for thing in self._uniqueTests(things)] - suites.extend(errors) - return self.suiteFactory(suites) - - - def _uniqueTests(self, things): - """ - Gather unique suite objects from loaded things. This will guarantee - uniqueness of inherited methods on TestCases which would otherwise hash - to same value and collapse to one test unexpectedly if using simpler - means: e.g. set(). - """ - entries = [] - for thing in things: - if isinstance(thing, types.MethodType): - entries.append((thing, thing.im_class)) - else: - entries.append((thing,)) - return [entry[0] for entry in set(entries)] - - - -class DryRunVisitor(object): - """ - A visitor that makes a reporter think that every test visited has run - successfully. - """ - - def __init__(self, reporter): - """ - @param reporter: A C{TestResult} object. - """ - self.reporter = reporter - - - def markSuccessful(self, testCase): - """ - Convince the reporter that this test has been run successfully. - """ - self.reporter.startTest(testCase) - self.reporter.addSuccess(testCase) - self.reporter.stopTest(testCase) - - - -class TrialRunner(object): - """ - A specialised runner that the trial front end uses. - """ - - DEBUG = 'debug' - DRY_RUN = 'dry-run' - - def _getDebugger(self): - dbg = pdb.Pdb() - try: - import readline - except ImportError: - print "readline module not available" - sys.exc_clear() - for path in ('.pdbrc', 'pdbrc'): - if os.path.exists(path): - try: - rcFile = file(path, 'r') - except IOError: - sys.exc_clear() - else: - dbg.rcLines.extend(rcFile.readlines()) - return dbg - - - def _setUpTestdir(self): - self._tearDownLogFile() - currentDir = os.getcwd() - base = filepath.FilePath(self.workingDirectory) - testdir, self._testDirLock = util._unusedTestDirectory(base) - os.chdir(testdir.path) - return currentDir - - - def _tearDownTestdir(self, oldDir): - os.chdir(oldDir) - self._testDirLock.unlock() - - - _log = log - def _makeResult(self): - reporter = self.reporterFactory(self.stream, self.tbformat, - self.rterrors, self._log) - if self.uncleanWarnings: - reporter = UncleanWarningsReporterWrapper(reporter) - return reporter - - def __init__(self, reporterFactory, - mode=None, - logfile='test.log', - stream=sys.stdout, - profile=False, - tracebackFormat='default', - realTimeErrors=False, - uncleanWarnings=False, - workingDirectory=None, - forceGarbageCollection=False): - self.reporterFactory = reporterFactory - self.logfile = logfile - self.mode = mode - self.stream = stream - self.tbformat = tracebackFormat - self.rterrors = realTimeErrors - self.uncleanWarnings = uncleanWarnings - self._result = None - self.workingDirectory = workingDirectory or '_trial_temp' - self._logFileObserver = None - self._logFileObject = None - self._forceGarbageCollection = forceGarbageCollection - if profile: - self.run = util.profiled(self.run, 'profile.data') - - def _tearDownLogFile(self): - if self._logFileObserver is not None: - log.removeObserver(self._logFileObserver.emit) - self._logFileObserver = None - if self._logFileObject is not None: - self._logFileObject.close() - self._logFileObject = None - - def _setUpLogFile(self): - self._tearDownLogFile() - if self.logfile == '-': - logFile = sys.stdout - else: - logFile = file(self.logfile, 'a') - self._logFileObject = logFile - self._logFileObserver = log.FileLogObserver(logFile) - log.startLoggingWithObserver(self._logFileObserver.emit, 0) - - - def run(self, test): - """ - Run the test or suite and return a result object. - """ - test = unittest.decorate(test, ITestCase) - if self._forceGarbageCollection: - test = unittest.decorate( - test, unittest._ForceGarbageCollectionDecorator) - return self._runWithoutDecoration(test) - - - def _runWithoutDecoration(self, test): - """ - Private helper that runs the given test but doesn't decorate it. - """ - result = self._makeResult() - # decorate the suite with reactor cleanup and log starting - # This should move out of the runner and be presumed to be - # present - suite = TrialSuite([test]) - startTime = time.time() - if self.mode == self.DRY_RUN: - for single in unittest._iterateTests(suite): - result.startTest(single) - result.addSuccess(single) - result.stopTest(single) - else: - if self.mode == self.DEBUG: - # open question - should this be self.debug() instead. - debugger = self._getDebugger() - run = lambda: debugger.runcall(suite.run, result) - else: - run = lambda: suite.run(result) - - oldDir = self._setUpTestdir() - try: - self._setUpLogFile() - run() - finally: - self._tearDownLogFile() - self._tearDownTestdir(oldDir) - - endTime = time.time() - done = getattr(result, 'done', None) - if done is None: - warnings.warn( - "%s should implement done() but doesn't. Falling back to " - "printErrors() and friends." % reflect.qual(result.__class__), - category=DeprecationWarning, stacklevel=3) - result.printErrors() - result.writeln(result.separator) - result.writeln('Ran %d tests in %.3fs', result.testsRun, - endTime - startTime) - result.write('\n') - result.printSummary() - else: - result.done() - return result - - - def runUntilFailure(self, test): - """ - Repeatedly run C{test} until it fails. - """ - count = 0 - while True: - count += 1 - self.stream.write("Test Pass %d\n" % (count,)) - if count == 1: - result = self.run(test) - else: - result = self._runWithoutDecoration(test) - if result.testsRun == 0: - break - if not result.wasSuccessful(): - break - return result |