aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_tap.py
blob: 44acecd3825ac761baaa8e567040e22f5ee4c4a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.conch.tap}.
"""

# Optional dependencies.  Each guard below binds the name to None when the
# corresponding import fails, so that MakeServiceTest can set a skip message
# instead of this module failing at import time.
try:
    import Crypto.Cipher.DES3
except ImportError:
    # Catch only a failed import, like the other guards below; a bare
    # ``except:`` would also swallow KeyboardInterrupt and SystemExit.
    Crypto = None

try:
    import pyasn1
except ImportError:
    pyasn1 = None

try:
    from twisted.conch import unix
except ImportError:
    unix = None

# tap and OpenSSHFactory are only imported when all three guards succeeded.
if Crypto and pyasn1 and unix:
    from twisted.conch import tap
    from twisted.conch.openssh_compat.factory import OpenSSHFactory

from twisted.python.compat import set
from twisted.application.internet import StreamServerEndpointService
from twisted.cred import error
from twisted.cred.credentials import IPluggableAuthenticationModules
from twisted.cred.credentials import ISSHPrivateKey
from twisted.cred.credentials import IUsernamePassword, UsernamePassword

from twisted.trial.unittest import TestCase



class MakeServiceTest(TestCase):
    """
    Tests for L{tap.makeService}.

    @ivar usernamePassword: The C{(username, password)} pair written to the
        credentials file by L{setUp} and used to build test credentials.

    @ivar filename: Path of the temporary credentials file created by
        L{setUp}.

    @ivar options: A fresh L{tap.Options} instance created by L{setUp}.
    """

    # Each check below overwrites C{skip}, so when several dependencies are
    # missing the message of the last failing check is the one reported.
    if not Crypto:
        skip = "can't run w/o PyCrypto"

    if not pyasn1:
        skip = "Cannot run without PyASN1"

    if not unix:
        skip = "can't run on non-posix computers"

    usernamePassword = ('iamuser', 'thisispassword')

    def setUp(self):
        """
        Create a file holding the single C{username:password} entry taken
        from L{usernamePassword}, and a fresh L{tap.Options} instance.
        """
        self.filename = self.mktemp()
        # The context manager closes the file even if the write fails.
        with open(self.filename, 'wb+') as f:
            f.write(':'.join(self.usernamePassword))
        self.options = tap.Options()


    def test_basic(self):
        """
        L{tap.makeService} returns a L{StreamServerEndpointService} instance
        running on TCP port 22, and the linked protocol factory is an instance
        of L{OpenSSHFactory}.
        """
        config = tap.Options()
        service = tap.makeService(config)
        self.assertIsInstance(service, StreamServerEndpointService)
        self.assertEqual(service.endpoint._port, 22)
        self.assertIsInstance(service.factory, OpenSSHFactory)


    def test_defaultAuths(self):
        """
        Make sure that if the C{--auth} command-line option is not passed,
        the default checkers are (for backwards compatibility): SSH, UNIX, and
        PAM if available
        """
        numCheckers = 2
        try:
            # Imported only to find out whether PAM support is importable.
            from twisted.cred import pamauth
        except ImportError:
            pass
        else:
            # Keep the assertions out of the try body so that only the
            # import itself is guarded by the ImportError handler.
            self.assertIn(IPluggableAuthenticationModules,
                self.options['credInterfaces'],
                "PAM should be one of the modules")
            numCheckers += 1

        self.assertIn(ISSHPrivateKey, self.options['credInterfaces'],
            "SSH should be one of the default checkers")
        self.assertIn(IUsernamePassword, self.options['credInterfaces'],
            "UNIX should be one of the default checkers")
        self.assertEqual(numCheckers, len(self.options['credCheckers']),
            "There should be %d checkers by default" % (numCheckers,))


    def test_authAdded(self):
        """
        The C{--auth} command-line option will add a checker to the list of
        checkers, and it should be the only auth checker
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        self.assertEqual(len(self.options['credCheckers']), 1)


    def test_authFailure(self):
        """
        The checker created by the C{--auth} command-line option returns a
        L{Deferred} that fails with L{UnauthorizedLogin} when
        presented with credentials that are unknown to that checker.
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        checker = self.options['credCheckers'][-1]
        # Known username paired with a password that is not in the file.
        invalid = UsernamePassword(self.usernamePassword[0], 'fake')
        # Wrong password should raise error
        return self.assertFailure(
            checker.requestAvatarId(invalid), error.UnauthorizedLogin)


    def test_authSuccess(self):
        """
        The checker created by the C{--auth} command-line option returns a
        L{Deferred} that returns the avatar id when presented with credentials
        that are known to that checker.
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        checker = self.options['credCheckers'][-1]
        correct = UsernamePassword(*self.usernamePassword)
        d = checker.requestAvatarId(correct)

        def checkSuccess(username):
            # The avatar id must be the username that was presented.
            self.assertEqual(username, correct.username)

        return d.addCallback(checkSuccess)


    def test_checkersPamAuth(self):
        """
        The L{OpenSSHFactory} built by L{tap.makeService} has a portal with
        L{IPluggableAuthenticationModules}, L{ISSHPrivateKey} and
        L{IUsernamePassword} interfaces registered as checkers if C{pamauth} is
        available.
        """
        # Fake the presence of pamauth, even if PyPAM is not installed
        self.patch(tap, "pamauth", object())
        config = tap.Options()
        service = tap.makeService(config)
        portal = service.factory.portal
        self.assertEqual(
            set(portal.checkers.keys()),
            set([IPluggableAuthenticationModules, ISSHPrivateKey,
                 IUsernamePassword]))


    def test_checkersWithoutPamAuth(self):
        """
        The L{OpenSSHFactory} built by L{tap.makeService} has a portal with
        L{ISSHPrivateKey} and L{IUsernamePassword} interfaces registered as
        checkers if C{pamauth} is not available.
        """
        # Fake the absence of pamauth, even if PyPAM is installed
        self.patch(tap, "pamauth", None)
        config = tap.Options()
        service = tap.makeService(config)
        portal = service.factory.portal
        self.assertEqual(
            set(portal.checkers.keys()),
            set([ISSHPrivateKey, IUsernamePassword]))