aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_pcp.py
blob: 71de8bbc38de7af1ba4d7a637376d84a7fd7dd11 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# -*- Python -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


__version__ = '$Revision: 1.5 $'[11:-2]

from StringIO import StringIO
from twisted.trial import unittest
from twisted.protocols import pcp

# Goal:

# Take a Protocol instance.  Own all outgoing data - anything that
# would go to p.transport.write.  Own all incoming data - anything
# that comes to p.dataReceived.

# I need:
# Something with the AbstractFileDescriptor interface.
# That is:
#  - acts as a Transport
#    - has a method write()
#    - which buffers
#  - acts as a Consumer
#    - has a registerProducer, unRegisterProducer
#    - tells the Producer to back off (pauseProducing) when its buffer is full.
#    - tells the Producer to resumeProducing when its buffer is not so full.
#  - acts as a Producer
#    - calls registerProducer
#    - calls write() on consumers
#    - honors requests to pause/resume producing
#    - honors stopProducing, and passes it along to upstream Producers


class DummyTransport:
    """A dumb transport to wrap around."""

    def __init__(self):
        self._writes = []

    def write(self, data):
        self._writes.append(data)

    def getvalue(self):
        return ''.join(self._writes)

class DummyProducer:
    resumed = False
    stopped = False
    paused = False

    def __init__(self, consumer):
        self.consumer = consumer

    def resumeProducing(self):
        self.resumed = True
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def stopProducing(self):
        self.stopped = True


class DummyConsumer(DummyTransport):
    producer = None
    finished = False
    unregistered = True

    def registerProducer(self, producer, streaming):
        self.producer = (producer, streaming)

    def unregisterProducer(self):
        self.unregistered = True

    def finish(self):
        self.finished = True

class TransportInterfaceTest(unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.transport = self.proxyClass(self.underlying)

    def testWrite(self):
        self.transport.write("some bytes")

class ConsumerInterfaceTest:
    """Test ProducerConsumerProxy as a Consumer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Consumer convincingly for the ProducingServer.
    """

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)
        self.producer = DummyProducer(self.consumer)

    def testRegisterPush(self):
        self.consumer.registerProducer(self.producer, True)
        ## Consumer should NOT have called PushProducer.resumeProducing
        self.failIf(self.producer.resumed)

    ## I'm I'm just a proxy, should I only do resumeProducing when
    ## I get poked myself?
    #def testRegisterPull(self):
    #    self.consumer.registerProducer(self.producer, False)
    #    ## Consumer SHOULD have called PushProducer.resumeProducing
    #    self.failUnless(self.producer.resumed)

    def testUnregister(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.unregisterProducer()
        # Now when the consumer would ordinarily want more data, it
        # shouldn't ask producer for it.
        # The most succinct way to trigger "want more data" is to proxy for
        # a PullProducer and have someone ask me for data.
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.failIf(self.producer.resumed)

    def testFinish(self):
        self.consumer.registerProducer(self.producer, False)
        self.consumer.finish()
        # I guess finish should behave like unregister?
        self.producer.resumed = False
        self.consumer.resumeProducing()
        self.failIf(self.producer.resumed)


class ProducerInterfaceTest:
    """Test ProducerConsumerProxy as a Producer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Producer convincingly for the ConsumingTransport.
    """

    def setUp(self):
        self.consumer = DummyConsumer()
        self.producer = self.proxyClass(self.consumer)

    def testRegistersProducer(self):
        self.assertEqual(self.consumer.producer[0], self.producer)

    def testPause(self):
        self.producer.pauseProducing()
        self.producer.write("yakkity yak")
        self.failIf(self.consumer.getvalue(),
                    "Paused producer should not have sent data.")

    def testResume(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.producer.write("yakkity yak")
        self.assertEqual(self.consumer.getvalue(), "yakkity yak")

    def testResumeNoEmptyWrite(self):
        self.producer.pauseProducing()
        self.producer.resumeProducing()
        self.assertEqual(len(self.consumer._writes), 0,
                             "Resume triggered an empty write.")

    def testResumeBuffer(self):
        self.producer.pauseProducing()
        self.producer.write("buffer this")
        self.producer.resumeProducing()
        self.assertEqual(self.consumer.getvalue(), "buffer this")

    def testStop(self):
        self.producer.stopProducing()
        self.producer.write("yakkity yak")
        self.failIf(self.consumer.getvalue(),
                    "Stopped producer should not have sent data.")


class PCP_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy

class PCPII_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.ProducerConsumerProxy

class PCP_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.BasicProducerConsumerProxy

class PCPII_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
    proxyClass = pcp.ProducerConsumerProxy

class ProducerProxyTest(unittest.TestCase):
    """Producer methods on me should be relayed to the Producer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.proxy = self.proxyClass(None)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testStop(self):
        self.proxy.stopProducing()
        self.failUnless(self.parentProducer.stopped)


class ConsumerProxyTest(unittest.TestCase):
    """Consumer methods on me should be relayed to the Consumer I proxy.
    """
    proxyClass = pcp.BasicProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.consumer = self.proxyClass(self.underlying)

    def testWrite(self):
        # NOTE: This test only valid for streaming (Push) systems.
        self.consumer.write("some bytes")
        self.assertEqual(self.underlying.getvalue(), "some bytes")

    def testFinish(self):
        self.consumer.finish()
        self.failUnless(self.underlying.finished)

    def testUnregister(self):
        self.consumer.unregisterProducer()
        self.failUnless(self.underlying.unregistered)


class PullProducerTest:
    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testHoldWrites(self):
        self.proxy.write("hello")
        # Consumer should get no data before it says resumeProducing.
        self.failIf(self.underlying.getvalue(),
                    "Pulling Consumer got data before it pulled.")

    def testPull(self):
        self.proxy.write("hello")
        self.proxy.resumeProducing()
        self.assertEqual(self.underlying.getvalue(), "hello")

    def testMergeWrites(self):
        self.proxy.write("hello ")
        self.proxy.write("sunshine")
        self.proxy.resumeProducing()
        nwrites = len(self.underlying._writes)
        self.assertEqual(nwrites, 1, "Pull resulted in %d writes instead "
                             "of 1." % (nwrites,))
        self.assertEqual(self.underlying.getvalue(), "hello sunshine")


    def testLateWrite(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        self.proxy.write("data")
        # This data should answer that pull request.
        self.assertEqual(self.underlying.getvalue(), "data")

class PCP_PullProducerTest(PullProducerTest, unittest.TestCase):
    class proxyClass(pcp.BasicProducerConsumerProxy):
        iAmStreaming = False

class PCPII_PullProducerTest(PullProducerTest, unittest.TestCase):
    class proxyClass(pcp.ProducerConsumerProxy):
        iAmStreaming = False

# Buffering!

class BufferedConsumerTest(unittest.TestCase):
    """As a consumer, ask the producer to pause after too much data."""

    proxyClass = pcp.ProducerConsumerProxy

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.proxy.bufferSize = 100

        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, True)

    def testRegisterPull(self):
        self.proxy.registerProducer(self.parentProducer, False)
        ## Consumer SHOULD have called PushProducer.resumeProducing
        self.failUnless(self.parentProducer.resumed)

    def testPauseIntercept(self):
        self.proxy.pauseProducing()
        self.failIf(self.parentProducer.paused)

    def testResumeIntercept(self):
        self.proxy.pauseProducing()
        self.proxy.resumeProducing()
        # With a streaming producer, just because the proxy was resumed is
        # not necessarily a reason to resume the parent producer.  The state
        # of the buffer should decide that.
        self.failIf(self.parentProducer.resumed)

    def testTriggerPause(self):
        """Make sure I say \"when.\""""

        # Pause the proxy so data sent to it builds up in its buffer.
        self.proxy.pauseProducing()
        self.failIf(self.parentProducer.paused, "don't pause yet")
        self.proxy.write("x" * 51)
        self.failIf(self.parentProducer.paused, "don't pause yet")
        self.proxy.write("x" * 51)
        self.failUnless(self.parentProducer.paused)

    def testTriggerResume(self):
        """Make sure I resumeProducing when my buffer empties."""
        self.proxy.pauseProducing()
        self.proxy.write("x" * 102)
        self.failUnless(self.parentProducer.paused, "should be paused")
        self.proxy.resumeProducing()
        # Resuming should have emptied my buffer, so I should tell my
        # parent to resume too.
        self.failIf(self.parentProducer.paused,
                    "Producer should have resumed.")
        self.failIf(self.proxy.producerPaused)

class BufferedPullTests(unittest.TestCase):
    class proxyClass(pcp.ProducerConsumerProxy):
        iAmStreaming = False

        def _writeSomeData(self, data):
            pcp.ProducerConsumerProxy._writeSomeData(self, data[:100])
            return min(len(data), 100)

    def setUp(self):
        self.underlying = DummyConsumer()
        self.proxy = self.proxyClass(self.underlying)
        self.proxy.bufferSize = 100

        self.parentProducer = DummyProducer(self.proxy)
        self.proxy.registerProducer(self.parentProducer, False)

    def testResumePull(self):
        # If proxy has no data to send on resumeProducing, it had better pull
        # some from its PullProducer.
        self.parentProducer.resumed = False
        self.proxy.resumeProducing()
        self.failUnless(self.parentProducer.resumed)

    def testLateWriteBuffering(self):
        # consumer sends its initial pull before we have data
        self.proxy.resumeProducing()
        self.proxy.write("datum" * 21)
        # This data should answer that pull request.
        self.assertEqual(self.underlying.getvalue(), "datum" * 20)
        # but there should be some left over
        self.assertEqual(self.proxy._buffer, ["datum"])


# TODO:
#  test that web request finishing bug (when we weren't proxying
#    unregisterProducer but were proxying finish, web file transfers
#    would hang on the last block.)
#  test what happens if writeSomeBytes decided to write zero bytes.