Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

58 rindas
2.5KB

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from time import sleep, time
  4. from traceback import format_exc
  5. from loguru import logger
  6. from common.Constant import init_progess
  7. from enums.AnalysisStatusEnum import AnalysisStatus
  8. from entity.FeedBack import message_feedback
  9. from enums.ExceptionEnum import ExceptionType
  10. from exception.CustomerException import ServiceException
  11. from util.QueUtil import get_no_block_queue, put_queue, clear_queue
  12. class Heartbeat(Thread):
  13. __slots__ = ('__fb_queue', '__hb_queue', '__request_id', '__analyse_type', "_context")
  14. def __init__(self, *args):
  15. super().__init__()
  16. self.__fb_queue, self.__hb_queue, self.__request_id, self.__analyse_type, self._context = args
  17. def run(self):
  18. request_id, hb_queue, progress = self.__request_id, self.__hb_queue, init_progess
  19. analyse_type, fb_queue = self.__analyse_type, self.__fb_queue
  20. service_timeout = int(self._context["service"]["timeout"]) + 120
  21. try:
  22. logger.info("开始启动心跳线程!requestId:{}", request_id)
  23. start_time = time()
  24. hb_init_num = 0
  25. while True:
  26. if time() - start_time > service_timeout:
  27. logger.error("心跳运行超时, requestId: {}", request_id)
  28. raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
  29. ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
  30. sleep(3)
  31. hb_msg = get_no_block_queue(hb_queue)
  32. if hb_msg is not None:
  33. command = hb_msg.get("command")
  34. hb_value = hb_msg.get("hb_value")
  35. if 'stop' == command:
  36. logger.info("开始终止心跳线程, requestId:{}", request_id)
  37. break
  38. if hb_value is not None:
  39. progress = hb_value
  40. if hb_init_num % 30 == 0:
  41. hb_init_num = 0
  42. put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type,
  43. progress=progress), timeout=3, is_ex=True)
  44. hb_init_num += 3
  45. del hb_msg
  46. except Exception:
  47. logger.error("心跳线程异常:{}, requestId:{}", format_exc(), request_id)
  48. finally:
  49. clear_queue(hb_queue)
  50. logger.info("心跳线程停止完成!requestId:{}", request_id)