aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py')
-rw-r--r--lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py382
1 files changed, 0 insertions, 382 deletions
diff --git a/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py b/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py
deleted file mode 100644
index 0106a210..00000000
--- a/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/status/persistent_queue.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# This file is part of Buildbot. Buildbot is free software: you can
-# redistribute it and/or modify it under the terms of the GNU General Public
-# License as published by the Free Software Foundation, version 2.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
-# details.
-#
-# You should have received a copy of the GNU General Public License along with
-# this program; if not, write to the Free Software Foundation, Inc., 51
-# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-#
-# Copyright Buildbot Team Members
-
-from __future__ import with_statement
-
-
-from collections import deque
-import os
-import cPickle as pickle
-
-from zope.interface import implements, Interface
-
-
-def ReadFile(path):
- with open(path, 'rb') as f:
- return f.read()
-
-
-def WriteFile(path, buf):
- with open(path, 'wb') as f:
- f.write(buf)
-
-
-class IQueue(Interface):
- """Abstraction of a queue."""
- def pushItem(item):
- """Adds an individual item to the end of the queue.
-
- Returns an item if it was overflowed."""
-
- def insertBackChunk(items):
- """Adds a list of items as the oldest entries.
-
- Normally called in case of failure to process the data, queue the data
- back so it can be retrieved at a later time.
-
- Returns a list of items if it was overflowed."""
-
- def popChunk(nbItems=None):
- """Pop many items at once. Defaults to self.maxItems()."""
-
- def save():
- """Save the queue to storage if implemented."""
-
- def items():
- """Returns items in the queue.
-
- Warning: Can be extremely slow for queue on disk."""
-
- def nbItems():
- """Returns the number of items in the queue."""
-
- def maxItems():
- """Returns the maximum number of items this queue can hold."""
-
-
-class MemoryQueue(object):
- """Simple length bounded queue using deque.
-
- list.pop(0) operation is O(n) so for a 10000 items list, it can start to
- be real slow. On the contrary, deque.popleft() is O(1) most of the time.
- See http://docs.python.org/library/collections.html for more
- information.
- """
- implements(IQueue)
-
- def __init__(self, maxItems=None):
- self._maxItems = maxItems
- if self._maxItems is None:
- self._maxItems = 10000
- self._items = deque()
-
- def pushItem(self, item):
- ret = None
- if len(self._items) == self._maxItems:
- ret = self._items.popleft()
- self._items.append(item)
- return ret
-
- def insertBackChunk(self, chunk):
- ret = None
- excess = len(self._items) + len(chunk) - self._maxItems
- if excess > 0:
- ret = chunk[0:excess]
- chunk = chunk[excess:]
- self._items.extendleft(reversed(chunk))
- return ret
-
- def popChunk(self, nbItems=None):
- if nbItems is None:
- nbItems = self._maxItems
- if nbItems > len(self._items):
- items = list(self._items)
- self._items = deque()
- else:
- items = []
- for i in range(nbItems):
- items.append(self._items.popleft())
- return items
-
- def save(self):
- pass
-
- def items(self):
- return list(self._items)
-
- def nbItems(self):
- return len(self._items)
-
- def maxItems(self):
- return self._maxItems
-
-
-class DiskQueue(object):
- """Keeps a list of abstract items and serializes it to the disk.
-
- Use pickle for serialization."""
- implements(IQueue)
-
- def __init__(self, path, maxItems=None, pickleFn=pickle.dumps,
- unpickleFn=pickle.loads):
- """
- @path: directory to save the items.
- @maxItems: maximum number of items to keep on disk, flush the
- older ones.
- @pickleFn: function used to pack the items to disk.
- @unpickleFn: function used to unpack items from disk.
- """
- self.path = path
- self._maxItems = maxItems
- if self._maxItems is None:
- self._maxItems = 100000
- if not os.path.isdir(self.path):
- os.mkdir(self.path)
- self.pickleFn = pickleFn
- self.unpickleFn = unpickleFn
-
- # Total number of items.
- self._nbItems = 0
- # The actual items id start at one.
- self.firstItemId = 0
- self.lastItemId = 0
- self._loadFromDisk()
-
- def pushItem(self, item):
- ret = None
- if self._nbItems == self._maxItems:
- id = self._findNext(self.firstItemId)
- path = os.path.join(self.path, str(id))
- ret = self.unpickleFn(ReadFile(path))
- os.remove(path)
- self.firstItemId = id + 1
- else:
- self._nbItems += 1
- self.lastItemId += 1
- path = os.path.join(self.path, str(self.lastItemId))
- if os.path.exists(path):
- raise IOError('%s already exists.' % path)
- WriteFile(path, self.pickleFn(item))
- return ret
-
- def insertBackChunk(self, chunk):
- ret = None
- excess = self._nbItems + len(chunk) - self._maxItems
- if excess > 0:
- ret = chunk[0:excess]
- chunk = chunk[excess:]
- for i in reversed(chunk):
- self.firstItemId -= 1
- path = os.path.join(self.path, str(self.firstItemId))
- if os.path.exists(path):
- raise IOError('%s already exists.' % path)
- WriteFile(path, self.pickleFn(i))
- self._nbItems += 1
- return ret
-
- def popChunk(self, nbItems=None):
- if nbItems is None:
- nbItems = self._maxItems
- ret = []
- for i in range(nbItems):
- if self._nbItems == 0:
- break
- id = self._findNext(self.firstItemId)
- path = os.path.join(self.path, str(id))
- ret.append(self.unpickleFn(ReadFile(path)))
- os.remove(path)
- self._nbItems -= 1
- self.firstItemId = id + 1
- return ret
-
- def save(self):
- pass
-
- def items(self):
- """Warning, very slow."""
- ret = []
- for id in range(self.firstItemId, self.lastItemId + 1):
- path = os.path.join(self.path, str(id))
- if os.path.exists(path):
- ret.append(self.unpickleFn(ReadFile(path)))
- return ret
-
- def nbItems(self):
- return self._nbItems
-
- def maxItems(self):
- return self._maxItems
-
- #### Protected functions
-
- def _findNext(self, id):
- while True:
- path = os.path.join(self.path, str(id))
- if os.path.isfile(path):
- return id
- id += 1
- return None
-
- def _loadFromDisk(self):
- """Loads the queue from disk upto self.maxMemoryItems items into
- self.items.
- """
- def SafeInt(item):
- try:
- return int(item)
- except ValueError:
- return None
-
- files = filter(None, [SafeInt(x) for x in os.listdir(self.path)])
- files.sort()
- self._nbItems = len(files)
- if self._nbItems:
- self.firstItemId = files[0]
- self.lastItemId = files[-1]
-
-
-class PersistentQueue(object):
- """Keeps a list of abstract items and serializes it to the disk.
-
- It has 2 layers of queue, normally an in-memory queue and an on-disk queue.
- When the number of items in the primary queue gets too large, the new items
- are automatically saved to the secondary queue. The older items are kept in
- the primary queue.
- """
- implements(IQueue)
-
- def __init__(self, primaryQueue=None, secondaryQueue=None, path=None):
- """
- @primaryQueue: memory queue to use before buffering to disk.
- @secondaryQueue: disk queue to use as permanent buffer.
- @path: path is a shortcut when using default DiskQueue settings.
- """
- self.primaryQueue = primaryQueue
- if self.primaryQueue is None:
- self.primaryQueue = MemoryQueue()
- self.secondaryQueue = secondaryQueue
- if self.secondaryQueue is None:
- self.secondaryQueue = DiskQueue(path)
- # Preload data from the secondary queue only if we know we won't start
- # using the secondary queue right away.
- if self.secondaryQueue.nbItems() < self.primaryQueue.maxItems():
- self.primaryQueue.insertBackChunk(
- self.secondaryQueue.popChunk(self.primaryQueue.maxItems()))
-
- def pushItem(self, item):
- # If there is already items in secondaryQueue, we'd need to pop them
- # all to start inserting them into primaryQueue so don't bother and
- # just push it in secondaryQueue.
- if (self.secondaryQueue.nbItems() or
- self.primaryQueue.nbItems() == self.primaryQueue.maxItems()):
- item = self.secondaryQueue.pushItem(item)
- if item is None:
- return item
- # If item is not None, secondaryQueue overflowed. We need to push it
- # back to primaryQueue so the oldest item is dumped.
- # Or everything fit in the primaryQueue.
- return self.primaryQueue.pushItem(item)
-
- def insertBackChunk(self, chunk):
- ret = None
- # Overall excess
- excess = self.nbItems() + len(chunk) - self.maxItems()
- if excess > 0:
- ret = chunk[0:excess]
- chunk = chunk[excess:]
- # Memory excess
- excess = (self.primaryQueue.nbItems() + len(chunk) -
- self.primaryQueue.maxItems())
- if excess > 0:
- chunk2 = []
- for i in range(excess):
- chunk2.append(self.primaryQueue.items().pop())
- chunk2.reverse()
- x = self.primaryQueue.insertBackChunk(chunk)
- assert x is None, "primaryQueue.insertBackChunk did not return None"
- if excess > 0:
- x = self.secondaryQueue.insertBackChunk(chunk2)
- assert x is None, ("secondaryQueue.insertBackChunk did not return "
- " None")
- return ret
-
- def popChunk(self, nbItems=None):
- if nbItems is None:
- nbItems = self.primaryQueue.maxItems()
- ret = self.primaryQueue.popChunk(nbItems)
- nbItems -= len(ret)
- if nbItems and self.secondaryQueue.nbItems():
- ret.extend(self.secondaryQueue.popChunk(nbItems))
- return ret
-
- def save(self):
- self.secondaryQueue.insertBackChunk(self.primaryQueue.popChunk())
-
- def items(self):
- return self.primaryQueue.items() + self.secondaryQueue.items()
-
- def nbItems(self):
- return self.primaryQueue.nbItems() + self.secondaryQueue.nbItems()
-
- def maxItems(self):
- return self.primaryQueue.maxItems() + self.secondaryQueue.maxItems()
-
-
-class IndexedQueue(object):
- """Adds functionality to a IQueue object to track its usage.
-
- Adds a new member function getIndex() and modify popChunk() and
- insertBackChunk() to keep a virtual pointer to the queue's first entry
- index."""
- implements(IQueue)
-
- def __init__(self, queue):
- # Copy all the member functions from the other object that this class
- # doesn't already define.
- assert IQueue.providedBy(queue)
- def Filter(m):
- return (m[0] != '_' and callable(getattr(queue, m))
- and not hasattr(self, m))
- for member in filter(Filter, dir(queue)):
- setattr(self, member, getattr(queue, member))
- self.queue = queue
- self._index = 0
-
- def getIndex(self):
- return self._index
-
- def popChunk(self, *args, **kwargs):
- items = self.queue.popChunk(*args, **kwargs)
- if items:
- self._index += len(items)
- return items
-
- def insertBackChunk(self, items):
- self._index -= len(items)
- ret = self.queue.insertBackChunk(items)
- if ret:
- self._index += len(ret)
- return ret
-
-
-def ToIndexedQueue(queue):
- """If the IQueue wasn't already a IndexedQueue, makes it an IndexedQueue."""
- if not IQueue.providedBy(queue):
- raise TypeError("queue doesn't implement IQueue", queue)
- if isinstance(queue, IndexedQueue):
- return queue
- return IndexedQueue(queue)
-
-# vim: set ts=4 sts=4 sw=4 et: