tuoheng_algN/concurrency/FeedbackThread.py

68 lines
2.8 KiB
Python

# -*- coding: utf-8 -*-
import time
from threading import Thread
from traceback import format_exc
from loguru import logger
from util.KafkaUtils import CustomerKafkaProducer
'''
Issue-feedback thread
'''
class FeedbackThread(Thread):
    """Thread that takes feedback items from a queue and publishes them to Kafka.

    Each queue item is read with ``.get(key)`` for the keys ``"feedback"``,
    ``"recording"`` and ``"pull_stream"``. Every non-empty payload found is
    sent to its own topic (taken from ``kafka_config["topic"]``), keyed by the
    payload's ``"request_id"``.
    """

    __slots__ = ('__fbQueue', '__kafka_config')

    def __init__(self, fbQueue, kafka_config):
        """Store the source queue and the Kafka configuration.

        Args:
            fbQueue: Object exposing ``get()``; items are consumed in ``run``.
            kafka_config: Configuration passed to ``CustomerKafkaProducer`` and
                indexed as ``kafka_config["topic"][<topic-name>]`` in ``run``.
        """
        super().__init__()
        self.__fbQueue = fbQueue
        self.__kafka_config = kafka_config

    def getFeedback(self):
        """Return the next item from the feedback queue.

        Calls ``get()`` with no arguments; for a standard blocking queue this
        waits until an item is available.
        """
        return self.__fbQueue.get()

    @staticmethod
    def _send_payload(producer, topic, payload):
        """Publish one payload to ``topic``; log instead of raising on failure.

        ``None`` and empty payloads are skipped. Failures are handled here so
        that one bad payload cannot prevent the other payloads of the same
        queue item from being sent, and so the logged request id always
        belongs to the payload that actually failed.
        """
        try:
            if payload is None or len(payload) == 0:
                return
            producer.sender(topic, payload["request_id"], payload, 1)
        except Exception:
            # Only dicts are asked for their request id: calling .get on an
            # arbitrary object could raise inside the handler and end the thread.
            request_id = payload.get("request_id") if isinstance(payload, dict) else None
            if request_id:
                logger.error("问题反馈异常:{}, requestId:{}", format_exc(), request_id)
            else:
                logger.error("问题反馈异常:{}", format_exc())

    def run(self):
        """Consume queue items forever and forward their payloads to Kafka.

        Raises:
            KeyError: At start-up, if ``kafka_config`` lacks ``"topic"`` or one
                of the three topic names read below.
        """
        logger.info("启动问题反馈线程")
        kafkaProducer = CustomerKafkaProducer(self.__kafka_config)
        topics = self.__kafka_config["topic"]
        # (queue-item key, destination topic), listed in publishing order.
        routes = (
            ("feedback", topics["dsp-alg-results-topic"]),
            ("recording", topics["dsp-recording-result-topic"]),
            ("pull_stream", topics["dsp-push-stream-result-topic"]),
        )
        while True:
            logger.info("问题反馈发送消息循环")
            try:
                fb = self.getFeedback()
                if fb is None or len(fb) == 0:
                    # Nothing usable in this item; pause before polling again.
                    time.sleep(1)
                    continue
                # Read every payload up front so a malformed item is rejected
                # as a whole before anything is sent.
                payloads = [(topic, fb.get(key)) for key, topic in routes]
            except Exception:
                logger.error("问题反馈异常:{}", format_exc())
                continue
            for topic, payload in payloads:
                self._send_payload(kafkaProducer, topic, payload)
        # NOTE(review): unreachable while the loop above has no exit condition;
        # kept so the completion log is already in place if one is added.
        logger.info("问题反馈线程执行完成")