You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

51 lines
1.5KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from threading import Thread
  4. from traceback import format_exc
  5. from loguru import logger
  6. from util.KafkaUtils import CustomerKafkaProducer
  7. '''
  8. 问题反馈线程
  9. '''
  10. class FeedbackThread(Thread):
  11. __slots__ = [
  12. '__fbQueue',
  13. '__context'
  14. ]
  15. def __init__(self, fbQueue, context):
  16. super().__init__()
  17. self.__fbQueue = fbQueue
  18. self.__context = context
  19. '''
  20. 阻塞获取反馈消息
  21. '''
  22. def getFeedback(self):
  23. return self.__fbQueue.get()
  24. def run(self):
  25. logger.info("启动问题反馈线程")
  26. kafkaProducer = CustomerKafkaProducer(self.__context)
  27. dsp_push_stream_result_topic = self.__context["kafka"]["topic"]["dsp-push-stream-result-topic"]
  28. while True:
  29. logger.info("问题反馈发送消息循环")
  30. feedback = {}
  31. try:
  32. fb = self.getFeedback()
  33. if fb is not None and len(fb) > 0:
  34. pull_stream = fb.get("pull_stream")
  35. if pull_stream is not None and len(pull_stream) > 0:
  36. kafkaProducer.sender(dsp_push_stream_result_topic, pull_stream["requestId"], pull_stream, 1)
  37. else:
  38. time.sleep(1)
  39. except Exception:
  40. logger.error("问题反馈异常:{}, requestId:{}", format_exc(), feedback.get("request_id"))
  41. logger.info("问题反馈线程执行完成")