You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

216 lines
7.3KB

  1. # -*- coding: utf-8 -*-
  2. import atexit
  3. from concurrent.futures import _base
  4. import itertools
  5. import queue
  6. import threading
  7. import weakref
  8. import os
  9. from util.QueUtil import clear_queue
  10. _threads_queues = weakref.WeakKeyDictionary()
  11. _shutdown = False
  12. def _python_exit():
  13. global _shutdown
  14. _shutdown = True
  15. items = list(_threads_queues.items())
  16. for t, q in items:
  17. q.put(None)
  18. for t, q in items:
  19. t.join()
  20. atexit.register(_python_exit)
  21. class _WorkItem(object):
  22. def __init__(self, future, fn, args, kwargs):
  23. self.future = future
  24. self.fn = fn
  25. self.args = args
  26. self.kwargs = kwargs
  27. def run(self):
  28. if not self.future.set_running_or_notify_cancel():
  29. return
  30. try:
  31. result = self.fn(*self.args, **self.kwargs)
  32. except BaseException as exc:
  33. self.future.set_exception(exc)
  34. # Break a reference cycle with the exception 'exc'
  35. self = None
  36. else:
  37. self.future.set_result(result)
  38. def _worker(executor_reference, work_queue, initializer, initargs):
  39. if initializer is not None:
  40. try:
  41. initializer(*initargs)
  42. except BaseException:
  43. _base.LOGGER.critical('Exception in initializer:', exc_info=True)
  44. executor = executor_reference()
  45. if executor is not None:
  46. executor._initializer_failed()
  47. return
  48. try:
  49. while True:
  50. work_item = work_queue.get(block=True)
  51. if work_item is not None:
  52. work_item.run()
  53. # Delete references to object. See issue16284
  54. del work_item
  55. # attempt to increment idle count
  56. executor = executor_reference()
  57. if executor is not None:
  58. executor._idle_semaphore.release()
  59. del executor
  60. continue
  61. executor = executor_reference()
  62. # Exit if:
  63. # - The interpreter is shutting down OR
  64. # - The executor that owns the worker has been collected OR
  65. # - The executor that owns the worker has been shutdown.
  66. if _shutdown or executor is None or executor._shutdown:
  67. # Flag the executor as shutting down as early as possible if it
  68. # is not gc-ed yet.
  69. if executor is not None:
  70. executor._shutdown = True
  71. # Notice other workers
  72. work_queue.put(None)
  73. return
  74. del executor
  75. except BaseException:
  76. _base.LOGGER.critical('Exception in worker', exc_info=True)
  77. class BrokenThreadPool(_base.BrokenExecutor):
  78. """
  79. 当 ThreadPoolExecutor 中的工作线程初始化失败时引发。
  80. """
  81. class ThreadPoolExecutorNew(_base.Executor):
  82. # 用于在未提供thread_name_prefix时分配唯一的线程名称
  83. _counter = itertools.count().__next__
  84. def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
  85. if max_workers is None:
  86. max_workers = min(32, (os.cpu_count() or 1) + 4)
  87. if max_workers <= 0:
  88. raise ValueError("max_workers必须大于0!")
  89. if initializer is not None and not callable(initializer):
  90. raise TypeError("initializer初始值设定项必须是可调用的!")
  91. self._max_workers = max_workers
  92. self._work_queue = queue.SimpleQueue()
  93. self._idle_semaphore = threading.Semaphore(0)
  94. self._threads = set()
  95. self._broken = False
  96. self._shutdown = False
  97. self._shutdown_lock = threading.Lock()
  98. self._thread_name_prefix = (("%s-%d" % (thread_name_prefix, self._counter()))
  99. or ("ThreadPoolExecutor-%d" % self._counter()))
  100. self._initializer = initializer
  101. self._initargs = initargs
  102. def submit(*args, **kwargs):
  103. if len(args) >= 2:
  104. self, fn, *args = args
  105. elif not args:
  106. raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
  107. "needs an argument")
  108. elif 'fn' in kwargs:
  109. fn = kwargs.pop('fn')
  110. self, *args = args
  111. import warnings
  112. warnings.warn("Passing 'fn' as keyword argument is deprecated",
  113. DeprecationWarning, stacklevel=2)
  114. else:
  115. raise TypeError('submit expected at least 1 positional argument, '
  116. 'got %d' % (len(args) - 1))
  117. with self._shutdown_lock:
  118. if self._broken:
  119. raise BrokenThreadPool(self._broken)
  120. if self._shutdown:
  121. raise RuntimeError('cannot schedule new futures after shutdown')
  122. if _shutdown:
  123. raise RuntimeError('cannot schedule new futures after '
  124. 'interpreter shutdown')
  125. f = _base.Future()
  126. w = _WorkItem(f, fn, args, kwargs)
  127. self._work_queue.put(w)
  128. self._adjust_thread_count()
  129. return f
  130. submit.__text_signature__ = _base.Executor.submit.__text_signature__
  131. submit.__doc__ = _base.Executor.submit.__doc__
  132. def _adjust_thread_count(self):
  133. # if idle threads are available, don't spin new threads
  134. if self._idle_semaphore.acquire(timeout=0):
  135. return
  136. # When the executor gets lost, the weakref callback will wake up
  137. # the worker threads.
  138. def weakref_cb(_, q=self._work_queue):
  139. q.put(None)
  140. num_threads = len(self._threads)
  141. if num_threads < self._max_workers:
  142. thread_name = '%s_%d' % (self._thread_name_prefix or self,
  143. num_threads)
  144. t = threading.Thread(name=thread_name, target=_worker,
  145. args=(weakref.ref(self, weakref_cb),
  146. self._work_queue,
  147. self._initializer,
  148. self._initargs))
  149. t.daemon = True
  150. t.start()
  151. self._threads.add(t)
  152. _threads_queues[t] = self._work_queue
  153. def _initializer_failed(self):
  154. with self._shutdown_lock:
  155. self._broken = ('A thread initializer failed, the thread pool '
  156. 'is not usable anymore')
  157. # Drain work queue and mark pending futures failed
  158. while True:
  159. try:
  160. work_item = self._work_queue.get_nowait()
  161. except queue.Empty:
  162. break
  163. if work_item is not None:
  164. work_item.future.set_exception(BrokenThreadPool(self._broken))
  165. def clear_work_queue(self):
  166. with self._shutdown_lock:
  167. self._shutdown = True
  168. clear_queue(self._work_queue, is_ex=False)
  169. self._work_queue.put(None)
  170. def shutdown(self, wait=True):
  171. with self._shutdown_lock:
  172. self._shutdown = True
  173. self._work_queue.put(None)
  174. if wait:
  175. for t in self._threads:
  176. t.join()
  177. shutdown.__doc__ = _base.Executor.shutdown.__doc__
  178. # if __name__ == "__main__":
  179. # semaphore = threading.Semaphore(0)
  180. # semaphore.release()
  181. # print(semaphore.acquire(timeout=0))