Diffstat (limited to 'bitbake/lib/bb/cooker.py'):
 -rw-r--r--  bitbake/lib/bb/cooker.py | 161
 1 file changed, 27 insertions(+), 134 deletions(-)
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 9d051fa30f..9f7121fefc 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -34,7 +34,7 @@ from cStringIO import StringIO
from contextlib import closing
from functools import wraps
from collections import defaultdict
-import bb, bb.exceptions, bb.command
+import bb, bb.exceptions, bb.command, bb.compat
from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
import Queue
import prserv.serv
@@ -1556,87 +1556,19 @@ class ParsingFailure(Exception):
self.recipe = recipe
Exception.__init__(self, realexception, recipe)
-class Feeder(multiprocessing.Process):
- def __init__(self, jobs, to_parsers, quit):
- self.quit = quit
- self.jobs = jobs
- self.to_parsers = to_parsers
- multiprocessing.Process.__init__(self)
-
- def run(self):
- while True:
- try:
- quit = self.quit.get_nowait()
- except Queue.Empty:
- pass
- else:
- if quit == 'cancel':
- self.to_parsers.cancel_join_thread()
- break
-
- try:
- job = self.jobs.pop()
- except IndexError:
- break
-
- try:
- self.to_parsers.put(job, timeout=0.5)
- except Queue.Full:
- self.jobs.insert(0, job)
- continue
-
-class Parser(multiprocessing.Process):
- def __init__(self, jobs, results, quit, init):
- self.jobs = jobs
- self.results = results
- self.quit = quit
- self.init = init
- multiprocessing.Process.__init__(self)
-
- def run(self):
- if self.init:
- self.init()
-
- pending = []
- while True:
- try:
- self.quit.get_nowait()
- except Queue.Empty:
- pass
- else:
- self.results.cancel_join_thread()
- break
-
- if pending:
- result = pending.pop()
- else:
- try:
- job = self.jobs.get(timeout=0.25)
- except Queue.Empty:
- continue
-
- if job is None:
- break
- result = self.parse(*job)
-
- try:
- self.results.put(result, timeout=0.25)
- except Queue.Full:
- pending.append(result)
-
- def parse(self, filename, appends, caches_array):
- try:
- return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
- except Exception as exc:
- tb = sys.exc_info()[2]
- exc.recipe = filename
- exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
- return True, exc
-        # Need to turn BaseExceptions into Exceptions here so that we shut down
-        # gracefully and a worker doesn't just exit on its own in response to,
-        # for example, a SystemExit event.
- except BaseException as exc:
- return True, ParsingFailure(exc, filename)
+def parse_file((filename, appends, caches_array)):
+ try:
+ return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
+ except Exception as exc:
+ tb = sys.exc_info()[2]
+ exc.recipe = filename
+ exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
+ return True, exc
+    # Need to turn BaseExceptions into Exceptions here so that we shut down
+    # gracefully and a worker doesn't just exit on its own in response to,
+    # for example, a SystemExit event.
+ except BaseException as exc:
+ return True, ParsingFailure(exc, filename)
class CookerParser(object):
def __init__(self, cooker, filelist, masked):
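
For readers skimming the patch, here is a minimal, standalone sketch of the pattern the new module-level parse_file() depends on: pool workers can only run picklable callables, so the worker is a plain function and its per-process state (parse_file.cfg in the patch) is attached as a function attribute by the pool initializer instead of being shipped with every job. The sketch assumes a stock multiprocessing.Pool rather than bb.compat.Pool, and the names do_parse, init_worker and the cfg dictionary are illustrative only; note also that the patch's (filename, appends, caches_array) tuple-unpacking signature is Python 2 syntax.

import multiprocessing

def init_worker(cfg):
    # Runs once in each worker process: stash the configuration on the
    # function object, the same trick the patch uses to expose self.cfgdata
    # to parse_file() without pickling it into every job.
    do_parse.cfg = cfg

def do_parse(job):
    # Each job arrives as a single picklable tuple, mirroring the
    # (filename, appends, caches_array) tuple handed to parse_file().
    filename, appends = job
    return True, (filename, len(appends), do_parse.cfg["parallelism"])

if __name__ == "__main__":
    cfg = {"parallelism": 2}
    pool = multiprocessing.Pool(2, init_worker, (cfg,))
    jobs = [("a.bb", ["a.bbappend"]), ("b.bb", [])]
    for ok, info in pool.imap_unordered(do_parse, jobs):
        pass  # results arrive in completion order, not submission order
    pool.close()
    pool.join()
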
@@ -1670,32 +1602,25 @@ class CookerParser(object):
self.fromcache.append((filename, appends))
self.toparse = self.total - len(self.fromcache)
self.progress_chunk = max(self.toparse / 100, 1)
+ self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
self.start()
self.haveshutdown = False
def start(self):
self.results = self.load_cached()
- self.processes = []
if self.toparse:
+ def process_init():
+ parse_file.cfg = self.cfgdata
+ multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
+ multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
+
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
- def init():
- Parser.cfg = self.cfgdata
- multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
- multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
-
- self.feeder_quit = multiprocessing.Queue(maxsize=1)
- self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
- self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
- self.result_queue = multiprocessing.Queue()
- self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
- self.feeder.start()
- for i in range(0, self.num_processes):
- parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
- parser.start()
- self.processes.append(parser)
-
- self.results = itertools.chain(self.results, self.parse_generator())
+
+ self.pool = bb.compat.Pool(self.num_processes, process_init)
+ parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
+ self.pool.close()
+ self.results = itertools.chain(self.results, parsed)
def shutdown(self, clean=True, force=False):
if not self.toparse:
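
A hedged sketch of the lifecycle the rewritten start() sets up, assuming bb.compat.Pool behaves like the standard multiprocessing.Pool: submit the whole work list through imap_unordered() with a chunk size (BB_PARSE_CHUNK above), close() the pool so no further jobs can be added, and chain the lazily retrieved results after the cached ones. The work() function and start_parsing() wrapper below are placeholders, not part of the patch.

import itertools
import multiprocessing

def work(item):
    return True, item * item   # stand-in for parse_file()

def start_parsing(jobs, cached, num_processes, chunk):
    pool = multiprocessing.Pool(num_processes)
    # imap_unordered() hands jobs to the workers in chunks and returns an
    # iterator that yields results as they complete; the caller (parse_next()
    # in the patch) pulls from that iterator.
    parsed = pool.imap_unordered(work, jobs, chunk)
    pool.close()   # no further jobs will be submitted; workers exit when done
    return pool, itertools.chain(cached, parsed)

if __name__ == "__main__":
    pool, results = start_parsing(range(8), [(False, "cached")], 4, 2)
    for ok, value in results:
        pass
    pool.join()
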
@@ -1711,25 +1636,9 @@ class CookerParser(object):
self.total)
bb.event.fire(event, self.cfgdata)
- self.feeder_quit.put(None)
- for process in self.processes:
- self.jobs.put(None)
else:
- self.feeder_quit.put('cancel')
-
- self.parser_quit.cancel_join_thread()
- for process in self.processes:
- self.parser_quit.put(None)
-
- self.jobs.cancel_join_thread()
-
- for process in self.processes:
- if force:
- process.join(.1)
- process.terminate()
- else:
- process.join()
- self.feeder.join()
+ self.pool.terminate()
+ self.pool.join()
sync = threading.Thread(target=self.bb_cache.sync)
sync.start()
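
The shutdown side of that lifecycle, sketched under the assumption that close() was already called when the work was submitted (as in start() above): a clean shutdown only needs join(), while an aborted parse calls terminate() first to discard outstanding work, replacing the old per-queue cancel_join_thread() bookkeeping and the explicit Feeder/Parser joins. shutdown_pool() and noop() are illustrative names.

import multiprocessing

def noop(x):
    return x

def shutdown_pool(pool, clean=True):
    # close() was already issued when the jobs were submitted, so a clean
    # shutdown only has to wait for the workers to drain their chunks.
    if not clean:
        pool.terminate()   # abandon any outstanding work immediately
    pool.join()

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    results = pool.imap_unordered(noop, range(4))
    pool.close()
    list(results)          # consume everything before the clean shutdown
    shutdown_pool(pool, clean=True)
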
@@ -1742,22 +1651,6 @@ class CookerParser(object):
cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
yield not cached, infos
- def parse_generator(self):
- while True:
- if self.parsed >= self.toparse:
- break
-
- try:
- result = self.result_queue.get(timeout=0.25)
- except Queue.Empty:
- pass
- else:
- value = result[1]
- if isinstance(value, BaseException):
- raise value
- else:
- yield result
-
def parse_next(self):
result = []
parsed = None
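
The deleted parse_generator() polled result_queue by hand; imap_unordered() already yields results in completion order, so the consumer (parse_next() in this file) can simply iterate the chained results and re-raise any exception object that parse_file() returned as a value. A rough sketch of that consumption pattern, with placeholder names:

import multiprocessing

def safe_work(item):
    # Mirrors parse_file(): trap errors and return them as values so a bad
    # job cannot kill the worker process.
    try:
        if item == 3:
            raise ValueError("bad recipe %d" % item)
        return True, item
    except Exception as exc:
        return True, exc

def consume(results):
    for parsed, value in results:
        if isinstance(value, BaseException):
            raise value        # surface the failure in the parent process
        yield parsed, value

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    results = pool.imap_unordered(safe_work, range(6))
    pool.close()
    try:
        for item in consume(results):
            pass
    except ValueError:
        pool.terminate()
    pool.join()
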