You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

91 lines
2.2KB

  1. # -*- coding: utf-8 -*-
  2. import threading
  3. import sys
  4. from loguru import logger
  5. try:
  6. import Queue as queue
  7. except ImportError:
  8. import queue
  9. import traceback
  10. class TaskQueue(object):
  11. def __init__(self, producer, consumers):
  12. self.__producer = producer
  13. self.__consumers = consumers
  14. self.__threads = []
  15. # must be an infinite queue, otherwise producer may be blocked after all consumers being dead.
  16. self.__queue = queue.Queue()
  17. self.__lock = threading.Lock()
  18. self.__exc_info = None
  19. self.__exc_stack = ''
  20. def run(self):
  21. self.__add_and_run(threading.Thread(target=self.__producer_func))
  22. for c in self.__consumers:
  23. self.__add_and_run(threading.Thread(target=self.__consumer_func, args=(c,)))
  24. # give KeyboardInterrupt chances to happen by joining with timeouts.
  25. while self.__any_active():
  26. for t in self.__threads:
  27. t.join(1)
  28. if self.__exc_info:
  29. logger.error('An exception was thrown by producer or consumer, backtrace: {0}'.format(self.__exc_stack))
  30. raise self.__exc_info[1]
  31. def put(self, data):
  32. assert data is not None
  33. self.__queue.put(data)
  34. def get(self):
  35. return self.__queue.get()
  36. def ok(self):
  37. with self.__lock:
  38. return self.__exc_info is None
  39. def __add_and_run(self, thread):
  40. thread.daemon = True
  41. thread.start()
  42. self.__threads.append(thread)
  43. def __any_active(self):
  44. return any(t.is_alive() for t in self.__threads)
  45. def __producer_func(self):
  46. try:
  47. self.__producer(self)
  48. except:
  49. self.__on_exception(sys.exc_info())
  50. self.__put_end()
  51. else:
  52. self.__put_end()
  53. def __consumer_func(self, consumer):
  54. try:
  55. consumer(self)
  56. except:
  57. self.__on_exception(sys.exc_info())
  58. def __put_end(self):
  59. for i in range(len(self.__consumers)):
  60. self.__queue.put(None)
  61. def __on_exception(self, exc_info):
  62. with self.__lock:
  63. if self.__exc_info is None:
  64. self.__exc_info = exc_info
  65. self.__exc_stack = traceback.format_exc()