aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py
blob: 8970f9019c67e47685e5f1da713446d9ff81f467 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# -*- test-case-name: twisted.test.test_pcp -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Producer-Consumer Proxy.
"""

from zope.interface import implements

from twisted.internet import interfaces


class BasicProducerConsumerProxy:
    """
    I can act as a man in the middle between any Producer and Consumer.

    Data written to me is handed straight to my consumer, unless I am paused
    (or I am a pull producer with no pull outstanding), in which case I keep
    it in an unbounded buffer until the next L{resumeProducing}.

    @ivar producer: the Producer I subscribe to.
    @type producer: L{IProducer<interfaces.IProducer>}
    @ivar consumer: the Consumer I publish to.
    @type consumer: L{IConsumer<interfaces.IConsumer>}
    @ivar paused: As a Producer, am I paused?
    @type paused: bool
    @ivar producerIsStreaming: the C{streaming} flag given to
        L{registerProducer}; C{None} while no producer is registered.
    @ivar iAmStreaming: the C{streaming} flag I pass to my consumer's
        C{registerProducer}.
    @type iAmStreaming: bool
    @ivar outstandingPull: when I am not streaming, has my consumer asked
        for data which I have not yet delivered?
    @type outstandingPull: bool
    @ivar stopped: declared here but never assigned by this class.
    @type stopped: bool
    """
    implements(interfaces.IProducer, interfaces.IConsumer)

    consumer = None
    producer = None
    producerIsStreaming = None
    iAmStreaming = True
    outstandingPull = False
    paused = False
    stopped = False

    def __init__(self, consumer):
        """
        @param consumer: the Consumer to publish to, or C{None} to start
            without one.  When given, I register myself with it as its
            producer, passing L{iAmStreaming} as the streaming flag.
        """
        self._buffer = []
        if consumer is not None:
            self.consumer = consumer
            consumer.registerProducer(self, self.iAmStreaming)

    # Producer methods:

    def pauseProducing(self):
        """
        Stop passing data to my consumer and relay the pause to my producer.
        """
        self.paused = True
        # Compare against None rather than relying on truthiness, so that a
        # producer which happens to evaluate false is still paused.
        if self.producer is not None:
            self.producer.pauseProducing()

    def resumeProducing(self):
        """
        Flush anything buffered to my consumer and relay the resume to my
        producer.

        If nothing is buffered and I am not streaming, the request is
        remembered in L{outstandingPull} so that the next L{write} goes
        straight through.
        """
        self.paused = False
        if self._buffer:
            # Without a consumer there is nowhere to flush to; keep the data
            # buffered instead of failing on a None consumer.
            if self.consumer is not None:
                # TODO: Check to see if consumer supports writeSeq.
                self.consumer.write(''.join(self._buffer))
                self._buffer[:] = []
        elif not self.iAmStreaming:
            self.outstandingPull = True

        if self.producer is not None:
            self.producer.resumeProducing()

    def stopProducing(self):
        """
        Relay the stop to my producer and forget my consumer.
        """
        if self.producer is not None:
            self.producer.stopProducing()
        if self.consumer is not None:
            # Removing the instance attribute falls back to the class-level
            # default of None.
            del self.consumer

    # Consumer methods:

    def write(self, data):
        """
        Pass C{data} on to my consumer, or buffer it if I may not write now.

        Data is buffered while I am paused, or while I am not streaming and
        no pull is outstanding.  Otherwise it is written to my consumer; if
        I have no consumer at that point the data is dropped.

        @param data: the chunk to forward.
        """
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
            # We could use that fifo queue here.
            self._buffer.append(data)

        elif self.consumer is not None:
            self.consumer.write(data)
            self.outstandingPull = False

    def finish(self):
        """
        Call C{finish} on my consumer, if I have one, then unregister my
        producer.
        """
        if self.consumer is not None:
            self.consumer.finish()
        self.unregisterProducer()

    def registerProducer(self, producer, streaming):
        """
        Remember C{producer} as the Producer I subscribe to.

        @param producer: the Producer that will write to me.
        @param streaming: the producer's streaming flag, stored in
            L{producerIsStreaming}.
        """
        self.producer = producer
        self.producerIsStreaming = streaming

    def unregisterProducer(self):
        """
        Forget my producer and pass the unregistration on to my consumer.
        """
        if self.producer is not None:
            # Both attributes revert to their class-level default of None.
            del self.producer
            del self.producerIsStreaming
        # Compare against None rather than relying on truthiness, so that a
        # consumer which happens to evaluate false is still notified.
        if self.consumer is not None:
            self.consumer.unregisterProducer()

    def __repr__(self):
        return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)


class ProducerConsumerProxy(BasicProducerConsumerProxy):
    """ProducerConsumerProxy with a finite buffer.

    When my buffer fills up, I have my parent Producer pause until my buffer
    has room in it again.

    @ivar bufferSize: once the combined length of the buffered chunks reaches
        this value, a streaming producer is asked to pause.
    @type bufferSize: int
    @ivar producerPaused: have I asked my producer to pause because my
        buffer was full?
    @type producerPaused: bool
    @ivar unregistered: has L{unregisterProducer} been called since the last
        L{registerProducer}?
    @type unregistered: bool
    """
    # Copies much from abstract.FileDescriptor
    bufferSize = 2 ** 16

    producerPaused = False
    unregistered = False

    def pauseProducing(self):
        """
        Stop passing data to my consumer; incoming data is buffered instead.
        """
        # Does *not* call up to BasicProducerConsumerProxy to relay the pause
        # message through to my parent Producer.
        self.paused = True

    def resumeProducing(self):
        """
        Send buffered data to my consumer, then decide whether my producer
        should be resumed.

        My producer is resumed either when I had paused it and my buffer is
        now below L{bufferSize}, or when I am not streaming and had nothing
        to send in answer to this pull.
        """
        self.paused = False
        if self._buffer:
            data = ''.join(self._buffer)
            bytesSent = self._writeSomeData(data)
            if bytesSent < len(data):
                unsent = data[bytesSent:]
                assert not self.iAmStreaming, (
                    "Streaming producer did not write all its data.")
                self._buffer[:] = [unsent]
            else:
                self._buffer[:] = []
        else:
            bytesSent = 0

        # My producer unregistered while I still held data (see
        # unregisterProducer); now that the buffer has drained, pass the
        # unregistration on to my consumer.
        if (self.unregistered and bytesSent and not self._buffer
                and self.consumer is not None):
            self.consumer.unregisterProducer()

        if not self.iAmStreaming:
            self.outstandingPull = not bytesSent

        if self.producer is not None:
            bytesBuffered = sum(len(s) for s in self._buffer)
            # TODO: You can see here the potential for high and low
            # watermarks, where bufferSize would be the high mark when we
            # ask the upstream producer to pause, and we wouldn't have
            # it resume again until it hit the low mark.  Or if producer
            # is Pull, maybe we'd like to pull from it as much as necessary
            # to keep our buffer full to the low mark, so we're never caught
            # without something to send.
            if self.producerPaused and (bytesBuffered < self.bufferSize):
                # Now that our buffer has room in it again,
                self.producerPaused = False
                self.producer.resumeProducing()
            elif self.outstandingPull:
                # I did not have any data to write in response to a pull,
                # so I'd better pull some myself.
                self.producer.resumeProducing()

    def write(self, data):
        """
        Pass C{data} on to my consumer, or buffer it if I may not write now,
        and pause a streaming producer once my buffer is full.

        @param data: the chunk to forward.
        """
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
            # We could use that fifo queue here.
            self._buffer.append(data)

        elif self.consumer is not None:
            assert not self._buffer, (
                "Writing fresh data to consumer before my buffer is empty!")
            # I'm going to use _writeSomeData here so that there is only one
            # path to self.consumer.write.  But it doesn't actually make sense,
            # if I am streaming, for some data to not be all data.  But maybe I
            # am not streaming, but I am writing here anyway, because there was
            # an earlier request for data which was not answered.
            bytesSent = self._writeSomeData(data)
            self.outstandingPull = False
            if bytesSent != len(data):
                assert not self.iAmStreaming, (
                    "Streaming producer did not write all its data.")
                self._buffer.append(data[bytesSent:])

        if (self.producer is not None) and self.producerIsStreaming:
            bytesBuffered = sum(len(s) for s in self._buffer)
            if bytesBuffered >= self.bufferSize:
                self.producer.pauseProducing()
                self.producerPaused = True

    def registerProducer(self, producer, streaming):
        """
        Remember C{producer} as the Producer I subscribe to, and immediately
        pull from it if it is not streaming.

        @param producer: the Producer that will write to me.
        @param streaming: the producer's streaming flag.
        """
        self.unregistered = False
        BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget my producer.

        The unregistration is passed on to my consumer right away only if my
        buffer is empty; otherwise L{resumeProducing} does so once the
        buffered data has been sent.
        """
        if self.producer is not None:
            # Both attributes revert to their class-level default of None.
            del self.producer
            del self.producerIsStreaming
        self.unregistered = True
        # Compare against None rather than relying on truthiness, matching
        # the other consumer checks in this class.
        if self.consumer is not None and not self._buffer:
            self.consumer.unregisterProducer()

    def _writeSomeData(self, data):
        """Write as much of this data as possible.

        @returns: The number of bytes written: C{len(data)} when I have a
            consumer, C{0} when I do not.
        """
        if self.consumer is None:
            return 0
        self.consumer.write(data)
        return len(data)