|
- # -*- coding: utf-8 -*-
- from threading import Thread
- from time import sleep, time
- from traceback import format_exc
-
- from loguru import logger
-
- from common.Constant import init_progess
- from enums.AnalysisStatusEnum import AnalysisStatus
- from entity.FeedBack import message_feedback
- from enums.ExceptionEnum import ExceptionType
- from exception.CustomerException import ServiceException
- from util.QueUtil import get_no_block_queue, put_queue, clear_queue
-
-
- class Heartbeat(Thread):
- __slots__ = ('__fb_queue', '__hb_queue', '__request_id', '__analyse_type', "_context")
-
- def __init__(self, *args):
- super().__init__()
- self.__fb_queue, self.__hb_queue, self.__request_id, self.__analyse_type, self._context = args
-
- def run(self):
- request_id, hb_queue, progress = self.__request_id, self.__hb_queue, init_progess
- analyse_type, fb_queue = self.__analyse_type, self.__fb_queue
- service_timeout = int(self._context["service"]["timeout"]) + 120
- try:
- logger.info("开始启动心跳线程!requestId:{}", request_id)
- start_time = time()
- hb_init_num = 0
- while True:
- if time() - start_time > service_timeout:
- logger.error("心跳运行超时, requestId: {}", request_id)
- raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
- ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
- sleep(3)
- hb_msg = get_no_block_queue(hb_queue)
- if hb_msg is not None:
- command = hb_msg.get("command")
- hb_value = hb_msg.get("hb_value")
- if 'stop' == command:
- logger.info("开始终止心跳线程, requestId:{}", request_id)
- break
- if hb_value is not None:
- progress = hb_value
- if hb_init_num % 30 == 0:
- hb_init_num = 0
- put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type,
- progress=progress), timeout=3, is_ex=True)
-
- hb_init_num += 3
- del hb_msg
- except Exception:
- logger.error("心跳线程异常:{}, requestId:{}", format_exc(), request_id)
- finally:
- clear_queue(hb_queue)
- logger.info("心跳线程停止完成!requestId:{}", request_id)
|