58 lines
2.5 KiB
Python
58 lines
2.5 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
from threading import Thread
|
|||
|
|
from time import sleep, time
|
|||
|
|
from traceback import format_exc
|
|||
|
|
|
|||
|
|
from loguru import logger
|
|||
|
|
|
|||
|
|
from common.Constant import init_progess
|
|||
|
|
from enums.AnalysisStatusEnum import AnalysisStatus
|
|||
|
|
from entity.FeedBack import message_feedback
|
|||
|
|
from enums.ExceptionEnum import ExceptionType
|
|||
|
|
from exception.CustomerException import ServiceException
|
|||
|
|
from util.QueUtil import get_no_block_queue, put_queue, clear_queue
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Heartbeat(Thread):
    """Worker thread that periodically pushes a RUNNING progress message.

    The thread polls the heartbeat queue every 3 seconds. Messages read from
    that queue are dict-like objects; the keys consulted are ``"command"``
    (the value ``'stop'`` ends the thread) and ``"hb_value"`` (when not
    ``None`` it becomes the progress reported from then on). Roughly every
    30 seconds a feedback message carrying the latest progress is put on the
    feedback queue.

    The thread gives up once it has been running longer than
    ``context["service"]["timeout"]`` seconds plus a 120 second margin.
    """

    __slots__ = ('__fb_queue', '__hb_queue', '__request_id', '__analyse_type', "_context")

    def __init__(self, *args):
        """Store the collaborators of the heartbeat thread.

        Args:
            *args: exactly five positional values, in this order:
                feedback queue, heartbeat queue, request id, analyse type,
                and the context mapping (must provide
                ``context["service"]["timeout"]``).

        Raises:
            ValueError: if the number of positional arguments is not five
                (raised by the tuple unpacking).
        """
        super().__init__()
        self.__fb_queue, self.__hb_queue, self.__request_id, self.__analyse_type, self._context = args

    def run(self):
        """Poll the heartbeat queue and report progress until stopped.

        Any exception (including the timeout ``ServiceException`` raised
        here) is logged and ends the thread; it is not propagated. The
        heartbeat queue is always cleared on exit.
        """
        # Local import: elapsed time must be measured with a monotonic clock
        # so that system clock adjustments cannot trigger (or suppress) the
        # timeout. Imported here to keep the file's import block untouched.
        from time import monotonic

        request_id, hb_queue, progress = self.__request_id, self.__hb_queue, init_progess
        analyse_type, fb_queue = self.__analyse_type, self.__fb_queue
        try:
            # Parsed inside the try block so that a missing/malformed timeout
            # setting is logged with the request id and the queue cleanup in
            # ``finally`` still runs.
            service_timeout = int(self._context["service"]["timeout"]) + 120
            logger.info("开始启动心跳线程!requestId:{}", request_id)
            start_time = monotonic()
            # Seconds counted since the last report; grows by 3 per poll.
            hb_init_num = 0
            while True:
                if monotonic() - start_time > service_timeout:
                    logger.error("心跳运行超时, requestId: {}", request_id)
                    raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
                                           ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
                sleep(3)
                # Presumably returns None when no message is waiting
                # (helper is defined in util.QueUtil) - verify there.
                hb_msg = get_no_block_queue(hb_queue)
                if hb_msg is not None:
                    command = hb_msg.get("command")
                    hb_value = hb_msg.get("hb_value")
                    if 'stop' == command:
                        logger.info("开始终止心跳线程, requestId:{}", request_id)
                        break
                    if hb_value is not None:
                        progress = hb_value
                # Report on the first poll and then every 10th poll (~30 s).
                if hb_init_num % 30 == 0:
                    hb_init_num = 0
                    put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.RUNNING.value, analyse_type,
                                                         progress=progress), timeout=3, is_ex=True)
                hb_init_num += 3
                # Drop the reference before the next 3 second sleep.
                del hb_msg
        except Exception:
            logger.error("心跳线程异常:{}, requestId:{}", format_exc(), request_id)
        finally:
            clear_queue(hb_queue)
            logger.info("心跳线程停止完成!requestId:{}", request_id)
|