|
- # -*- coding: utf-8 -*-
-
- import threading
- import sys
-
- from loguru import logger
-
- try:
- import Queue as queue
- except ImportError:
- import queue
-
- import traceback
-
-
class TaskQueue(object):
    """Run one producer and several consumers on threads around a shared queue.

    The producer and every consumer are callables that receive this
    ``TaskQueue`` instance as their only argument. The producer passes work
    along with :meth:`put` and consumers fetch it with :meth:`get`.

    Once the producer returns or raises, one ``None`` end marker per consumer
    is enqueued. Consumers are expected to stop when :meth:`get` returns
    ``None``; this class does not enforce that.

    The first exception raised by the producer or by any consumer is recorded
    and re-raised from :meth:`run` after every thread has finished.
    """

    def __init__(self, producer, consumers):
        """Store the callables; no thread is started until :meth:`run`.

        :param producer: callable invoked as ``producer(task_queue)``.
        :param consumers: sized iterable of callables, each invoked as
            ``consumer(task_queue)`` on its own thread.
        """
        self.__producer = producer
        self.__consumers = consumers

        self.__threads = []

        # must be an infinite queue, otherwise producer may be blocked after
        # all consumers being dead.
        self.__queue = queue.Queue()

        # Guards the recorded exception state below.
        self.__lock = threading.Lock()
        self.__exc_info = None
        self.__exc_stack = ''

    def run(self):
        """Start all threads, wait for them to finish, then surface a failure.

        :raises: the first exception recorded from the producer or a consumer
            (its formatted traceback is logged before it is re-raised).
        """
        self.__add_and_run(threading.Thread(target=self.__producer_func))

        for consumer in self.__consumers:
            self.__add_and_run(
                threading.Thread(target=self.__consumer_func, args=(consumer,)))

        # give KeyboardInterrupt chances to happen by joining with timeouts.
        while self.__any_active():
            for thread in self.__threads:
                thread.join(1)

        # Snapshot under the same lock that writers use.
        with self.__lock:
            exc_info = self.__exc_info
            exc_stack = self.__exc_stack

        if exc_info:
            logger.error(
                'An exception was thrown by producer or consumer, backtrace: {0}'.format(exc_stack))
            raise exc_info[1]

    def put(self, data):
        """Enqueue one work item.

        :param data: any object except ``None``, which is reserved as the end
            marker.
        :raises AssertionError: if ``data`` is ``None``.
        """
        # Raised explicitly instead of using ``assert``: assert statements are
        # removed under ``python -O``, which would let a stray None reach the
        # queue where it is indistinguishable from the end marker.
        if data is None:
            raise AssertionError(
                'data must not be None; None is reserved as the end marker')
        self.__queue.put(data)

    def get(self):
        """Block until an item is available and return it.

        Returns ``None`` when the item fetched is an end marker.
        """
        return self.__queue.get()

    def ok(self):
        """Return ``True`` while no producer/consumer exception was recorded."""
        with self.__lock:
            return self.__exc_info is None

    def __add_and_run(self, thread):
        """Start ``thread`` as a daemon and remember it for joining."""
        # Daemon threads do not keep the interpreter alive on their own.
        thread.daemon = True
        thread.start()
        self.__threads.append(thread)

    def __any_active(self):
        """Return ``True`` if at least one started thread is still alive."""
        return any(t.is_alive() for t in self.__threads)

    def __producer_func(self):
        """Thread body for the producer; always enqueues the end markers."""
        try:
            self.__producer(self)
        except BaseException:
            # Deliberately broad: whatever the producer raised is recorded
            # here and re-raised later by run().
            self.__on_exception(sys.exc_info())
        finally:
            # Runs on success and on failure alike, so consumers waiting in
            # get() always receive their end marker.
            self.__put_end()

    def __consumer_func(self, consumer):
        """Thread body for one consumer; records whatever it raises."""
        try:
            consumer(self)
        except BaseException:
            # Deliberately broad, for the same reason as in the producer.
            self.__on_exception(sys.exc_info())

    def __put_end(self):
        """Enqueue one ``None`` end marker per consumer."""
        for _ in self.__consumers:
            self.__queue.put(None)

    def __on_exception(self, exc_info):
        """Record ``exc_info`` unless an earlier exception is already stored.

        :param exc_info: ``(type, value, traceback)`` tuple as returned by
            ``sys.exc_info()``.
        """
        with self.__lock:
            if self.__exc_info is None:
                self.__exc_info = exc_info
                # Format the tuple that was passed in rather than relying on
                # the exception that happens to be active in the caller.
                self.__exc_stack = ''.join(
                    traceback.format_exception(*exc_info))
-
-
|