# -*- coding: utf-8 -*- import time from threading import Thread from traceback import format_exc from loguru import logger from util.KafkaUtils import CustomerKafkaProducer ''' 问题反馈线程 ''' class FeedbackThread(Thread): __slots__ = ('__fbQueue', '__kafka_config') def __init__(self, fbQueue, kafka_config): super().__init__() self.__fbQueue = fbQueue self.__kafka_config = kafka_config ''' 阻塞获取反馈消息 ''' def getFeedback(self): return self.__fbQueue.get() def run(self): logger.info("启动问题反馈线程") kafkaProducer = CustomerKafkaProducer(self.__kafka_config) dsp_alg_results_topic = self.__kafka_config["topic"]["dsp-alg-results-topic"] dsp_recording_result_topic = self.__kafka_config["topic"]["dsp-recording-result-topic"] dsp_push_stream_result_topic = self.__kafka_config["topic"]["dsp-push-stream-result-topic"] while True: logger.info("问题反馈发送消息循环") feedback = None recording = None pull_stream = None try: fb = self.getFeedback() if fb is not None and len(fb) > 0: feedback = fb.get("feedback") recording = fb.get("recording") pull_stream = fb.get("pull_stream") if feedback is not None and len(feedback) > 0: kafkaProducer.sender(dsp_alg_results_topic, feedback["request_id"], feedback, 1) feedback = None if recording is not None and len(recording) > 0: kafkaProducer.sender(dsp_recording_result_topic, recording["request_id"], recording, 1) recording = None if pull_stream is not None and len(pull_stream) > 0: kafkaProducer.sender(dsp_push_stream_result_topic, pull_stream["request_id"], pull_stream, 1) pull_stream = None else: time.sleep(1) except Exception: if feedback and feedback.get("request_id"): logger.error("问题反馈异常:{}, requestId:{}", format_exc(), feedback.get("request_id")) elif recording and recording.get("request_id"): logger.error("问题反馈异常:{}, requestId:{}", format_exc(), recording.get("request_id")) elif pull_stream and pull_stream.get("request_id"): logger.error("问题反馈异常:{}, requestId:{}", format_exc(), pull_stream.get("request_id")) else: logger.error("问题反馈异常:{}", format_exc()) logger.info("问题反馈线程执行完成")