You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

68 lines
2.8KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from threading import Thread
  4. from traceback import format_exc
  5. from loguru import logger
  6. from util.KafkaUtils import CustomerKafkaProducer
  7. '''
  8. 问题反馈线程
  9. '''
  10. class FeedbackThread(Thread):
  11. __slots__ = ('__fbQueue', '__kafka_config')
  12. def __init__(self, fbQueue, kafka_config):
  13. super().__init__()
  14. self.__fbQueue = fbQueue
  15. self.__kafka_config = kafka_config
  16. '''
  17. 阻塞获取反馈消息
  18. '''
  19. def getFeedback(self):
  20. return self.__fbQueue.get()
  21. def run(self):
  22. logger.info("启动问题反馈线程")
  23. kafkaProducer = CustomerKafkaProducer(self.__kafka_config)
  24. dsp_alg_results_topic = self.__kafka_config["topic"]["dsp-alg-results-topic"]
  25. dsp_recording_result_topic = self.__kafka_config["topic"]["dsp-recording-result-topic"]
  26. dsp_push_stream_result_topic = self.__kafka_config["topic"]["dsp-push-stream-result-topic"]
  27. while True:
  28. logger.info("问题反馈发送消息循环")
  29. feedback = None
  30. recording = None
  31. pull_stream = None
  32. try:
  33. fb = self.getFeedback()
  34. if fb is not None and len(fb) > 0:
  35. feedback = fb.get("feedback")
  36. recording = fb.get("recording")
  37. pull_stream = fb.get("pull_stream")
  38. if feedback is not None and len(feedback) > 0:
  39. kafkaProducer.sender(dsp_alg_results_topic, feedback["request_id"], feedback, 1)
  40. feedback = None
  41. if recording is not None and len(recording) > 0:
  42. kafkaProducer.sender(dsp_recording_result_topic, recording["request_id"], recording, 1)
  43. recording = None
  44. if pull_stream is not None and len(pull_stream) > 0:
  45. kafkaProducer.sender(dsp_push_stream_result_topic, pull_stream["request_id"], pull_stream, 1)
  46. pull_stream = None
  47. else:
  48. time.sleep(1)
  49. except Exception:
  50. if feedback and feedback.get("request_id"):
  51. logger.error("问题反馈异常:{}, requestId:{}", format_exc(), feedback.get("request_id"))
  52. elif recording and recording.get("request_id"):
  53. logger.error("问题反馈异常:{}, requestId:{}", format_exc(), recording.get("request_id"))
  54. elif pull_stream and pull_stream.get("request_id"):
  55. logger.error("问题反馈异常:{}, requestId:{}", format_exc(), pull_stream.get("request_id"))
  56. else:
  57. logger.error("问题反馈异常:{}", format_exc())
  58. logger.info("问题反馈线程执行完成")