|
- # -*- coding: utf-8 -*-
- import time
- from threading import Thread
- from traceback import format_exc
-
- from loguru import logger
-
- from util.KafkaUtils import CustomerKafkaProducer
-
- '''
- 问题反馈线程
- '''
-
-
class FeedbackThread(Thread):
    """Background thread that forwards queued feedback messages to Kafka.

    Messages are taken from ``fbQueue`` one at a time. When a message carries
    a non-empty ``"pull_stream"`` entry, that entry is published to the topic
    configured at ``context["kafka"]["topic"]["dsp-push-stream-result-topic"]``.
    """

    __slots__ = [
        '__fbQueue',
        '__context'
    ]

    def __init__(self, fbQueue, context):
        """Store the message source and the service configuration.

        Args:
            fbQueue: Object whose ``get()`` returns the next feedback message
                (presumably a ``queue.Queue`` -- confirm against the caller).
            context: Nested configuration mapping; ``run`` reads the Kafka
                topic name from it and passes it to ``CustomerKafkaProducer``.
        """
        super().__init__()
        self.__fbQueue = fbQueue
        self.__context = context

    def getFeedback(self):
        """Return the next feedback message from the queue.

        This is a blocking read as long as the queue's ``get()`` blocks
        (the default for ``queue.Queue``).
        """
        return self.__fbQueue.get()

    def run(self):
        """Consume feedback messages forever and publish them to Kafka.

        Any exception raised while handling a single message is logged and
        swallowed so that one bad message cannot stop the thread.
        """
        logger.info("启动问题反馈线程")
        kafkaProducer = CustomerKafkaProducer(self.__context)
        dsp_push_stream_result_topic = self.__context["kafka"]["topic"]["dsp-push-stream-result-topic"]
        while True:
            logger.info("问题反馈发送消息循环")
            # Request id of the message currently being handled. It is reset
            # on every iteration so that the error log below never reports an
            # id belonging to a previous message.
            request_id = None
            try:
                fb = self.getFeedback()
                if fb is not None and len(fb) > 0:
                    pull_stream = fb.get("pull_stream")
                    if pull_stream is not None and len(pull_stream) > 0:
                        # Remember the id before sending so that a failure in
                        # sender() can be traced back to this request.
                        request_id = pull_stream["requestId"]
                        kafkaProducer.sender(dsp_push_stream_result_topic, request_id, pull_stream, 1)
                else:
                    # Empty message: back off briefly before polling again.
                    time.sleep(1)
            except Exception:
                logger.error("问题反馈异常:{}, requestId:{}", format_exc(), request_id)
        # NOTE: not reached while the loop above has no exit condition; kept
        # so the completion log is already in place if one is added.
        logger.info("问题反馈线程执行完成")
|