1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.conch.tap}.
"""
# PyCrypto, PyASN1 and the POSIX-only twisted.conch.unix module are optional
# dependencies.  Each name is bound to None when its import fails so that the
# test case in this module can be skipped instead of failing at import time.
# Only ImportError is caught, so unrelated failures (and KeyboardInterrupt /
# SystemExit) are not silently swallowed.
try:
    import Crypto.Cipher.DES3
except ImportError:
    Crypto = None

try:
    import pyasn1
except ImportError:
    pyasn1 = None

try:
    from twisted.conch import unix
except ImportError:
    unix = None

# The conch modules under test are imported only when every optional
# dependency above is available.
if Crypto and pyasn1 and unix:
    from twisted.conch import tap
    from twisted.conch.openssh_compat.factory import OpenSSHFactory
from twisted.python.compat import set
from twisted.application.internet import StreamServerEndpointService
from twisted.cred import error
from twisted.cred.credentials import IPluggableAuthenticationModules
from twisted.cred.credentials import ISSHPrivateKey
from twisted.cred.credentials import IUsernamePassword, UsernamePassword
from twisted.trial.unittest import TestCase
class MakeServiceTest(TestCase):
    """
    Tests for L{tap.makeService}.

    @ivar filename: Path of the temporary password file written by L{setUp}.
    @ivar options: A fresh L{tap.Options} instance created by L{setUp}.
    """

    # Each missing optional dependency sets the trial C{skip} attribute for
    # the whole test case; when several are missing, the last assignment
    # provides the reported reason.
    if not Crypto:
        skip = "can't run w/o PyCrypto"

    if not pyasn1:
        skip = "Cannot run without PyASN1"

    if not unix:
        skip = "can't run on non-posix computers"

    # The (username, password) pair stored in the password file.
    usernamePassword = ('iamuser', 'thisispassword')

    def setUp(self):
        """
        Create a password file containing a single C{username:password}
        entry, and a fresh L{tap.Options} instance.
        """
        self.filename = self.mktemp()
        f = open(self.filename, 'wb+')
        try:
            f.write(':'.join(self.usernamePassword))
        finally:
            # Close the handle even if the write fails so it is not leaked.
            f.close()
        self.options = tap.Options()


    def test_basic(self):
        """
        L{tap.makeService} returns a L{StreamServerEndpointService} instance
        running on TCP port 22, and the linked protocol factory is an instance
        of L{OpenSSHFactory}.
        """
        config = tap.Options()
        service = tap.makeService(config)
        self.assertIsInstance(service, StreamServerEndpointService)
        self.assertEqual(service.endpoint._port, 22)
        self.assertIsInstance(service.factory, OpenSSHFactory)


    def test_defaultAuths(self):
        """
        Make sure that if the C{--auth} command-line option is not passed,
        the default checkers are (for backwards compatibility): SSH, UNIX, and
        PAM if available.
        """
        numCheckers = 2
        try:
            # Only the import is guarded: an ImportError here means PAM
            # support is unavailable and no PAM checker is expected.
            from twisted.cred import pamauth
        except ImportError:
            pass
        else:
            self.assertIn(IPluggableAuthenticationModules,
                self.options['credInterfaces'],
                "PAM should be one of the modules")
            numCheckers += 1

        self.assertIn(ISSHPrivateKey, self.options['credInterfaces'],
            "SSH should be one of the default checkers")
        self.assertIn(IUsernamePassword, self.options['credInterfaces'],
            "UNIX should be one of the default checkers")
        self.assertEqual(numCheckers, len(self.options['credCheckers']),
            "There should be %d checkers by default" % (numCheckers,))


    def test_authAdded(self):
        """
        The C{--auth} command-line option will add a checker to the list of
        checkers, and it should be the only auth checker.
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        self.assertEqual(len(self.options['credCheckers']), 1)


    def test_authFailure(self):
        """
        The checker created by the C{--auth} command-line option returns a
        L{Deferred} that fails with L{UnauthorizedLogin} when
        presented with credentials that are unknown to that checker.
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        checker = self.options['credCheckers'][-1]
        # Known username paired with a password that is not in the file.
        invalid = UsernamePassword(self.usernamePassword[0], 'fake')
        # Wrong password should raise error
        return self.assertFailure(
            checker.requestAvatarId(invalid), error.UnauthorizedLogin)


    def test_authSuccess(self):
        """
        The checker created by the C{--auth} command-line option returns a
        L{Deferred} that returns the avatar id when presented with credentials
        that are known to that checker.
        """
        self.options.parseOptions(['--auth', 'file:' + self.filename])
        checker = self.options['credCheckers'][-1]
        correct = UsernamePassword(*self.usernamePassword)
        d = checker.requestAvatarId(correct)

        def checkSuccess(username):
            # The avatar id must be the username that was presented.
            self.assertEqual(username, correct.username)

        return d.addCallback(checkSuccess)


    def test_checkersPamAuth(self):
        """
        The L{OpenSSHFactory} built by L{tap.makeService} has a portal with
        L{IPluggableAuthenticationModules}, L{ISSHPrivateKey} and
        L{IUsernamePassword} interfaces registered as checkers if C{pamauth} is
        available.
        """
        # Fake the presence of pamauth, even if PyPAM is not installed
        self.patch(tap, "pamauth", object())
        config = tap.Options()
        service = tap.makeService(config)
        portal = service.factory.portal
        self.assertEqual(
            set(portal.checkers.keys()),
            set([IPluggableAuthenticationModules, ISSHPrivateKey,
                 IUsernamePassword]))


    def test_checkersWithoutPamAuth(self):
        """
        The L{OpenSSHFactory} built by L{tap.makeService} has a portal with
        L{ISSHPrivateKey} and L{IUsernamePassword} interfaces registered as
        checkers if C{pamauth} is not available.
        """
        # Fake the absence of pamauth, even if PyPAM is installed
        self.patch(tap, "pamauth", None)
        config = tap.Options()
        service = tap.makeService(config)
        portal = service.factory.portal
        self.assertEqual(
            set(portal.checkers.keys()),
            set([ISSHPrivateKey, IUsernamePassword]))
|