You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

97 line
4.1KB

  1. # -*- coding: utf-8 -*-
  2. import json
  3. from multiprocessing import Process, Queue
  4. from util import LogUtils
  5. from util import KafkaUtils
  6. from loguru import logger
  7. from concurrency.FileUpdateThread import ImageFileUpdate
  8. from concurrency.HeartbeatThread import Heartbeat
  9. import time
  10. class CommonProcess(Process):
  11. def __init__(self, fbQueue, hbQueue, content, msg, imageQueue, mode_service):
  12. super().__init__()
  13. self.fbQueue = fbQueue
  14. self.hbQueue = hbQueue
  15. self.content = content
  16. self.msg = msg
  17. self.mode_service = mode_service
  18. self.imageQueue = imageQueue
  19. def getFeedback(self):
  20. eBody = None
  21. try:
  22. eBody = self.fbQueue.get(block=False)
  23. except Exception as e:
  24. pass
  25. return eBody
  26. # 推送执行结果
  27. def sendImageResult(self, result):
  28. while self.imageQueue.full():
  29. logger.info("图片上传队列已满, 2秒后重试! requestId:{}", self.msg.get("request_id"))
  30. time.sleep(2)
  31. self.imageQueue.put(result)
  32. def run(self):
  33. # 初始化日志配置
  34. LogUtils.init_log(self.content)
  35. logger.info("心跳、图片上传,反馈进程开始执行, requestId:{}", self.msg.get("request_id"))
  36. # 启动心跳线程
  37. hb = Heartbeat(self.fbQueue, self.hbQueue, self.msg.get("request_id"), self.mode_service)
  38. hb.setDaemon(True)
  39. hb.start()
  40. # 图片上传线程
  41. imageFileUpdate = ImageFileUpdate(self.fbQueue, self.content, self.msg, self.imageQueue, self.mode_service)
  42. imageFileUpdate.setDaemon(True)
  43. imageFileUpdate.start()
  44. kafkaProducer = KafkaUtils.CustomerKafkaProducer(self.content, self.msg.get("request_id"))
  45. # 心跳线程检测
  46. heartbeat_num = 0
  47. # 图片上传线程检测
  48. imageFileUpdate_num = 0
  49. while True:
  50. try:
  51. if heartbeat_num == 0 and not hb.is_alive():
  52. logger.error("未检测到心跳线程活动,心跳线程可能出现异常, reuqestId:{}", self.msg.get("request_id"))
  53. break
  54. if imageFileUpdate_num == 0 and not imageFileUpdate.is_alive():
  55. logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, reuqestId:{}", self.msg.get("request_id"))
  56. break
  57. fb = self.getFeedback()
  58. if fb is not None and len(fb) > 0:
  59. feedback = fb.get("feedback")
  60. command = fb.get("command")
  61. if feedback is not None and len(feedback) > 0:
  62. kafkaProducer.get_producer()
  63. kafkaProducer.sender(self.content["kafka"]["topic"]["dsp-alg-results-topic"],
  64. feedback["request_id"], feedback, 1)
  65. if command is not None and len(command) > 0:
  66. # 接收心跳线程和图片上传停止指令
  67. if 'stop_heartbeat_imageFileUpdate' == command:
  68. heartbeat_num += 1
  69. imageFileUpdate_num += 1
  70. hb.run_status = False
  71. self.sendImageResult({"command": "stop"})
  72. hb.join()
  73. imageFileUpdate.join()
  74. # 接收进程停止指令
  75. if 'stop' == command:
  76. heartbeat_num += 1
  77. imageFileUpdate_num += 1
  78. hb.run_status = False
  79. self.sendImageResult({"command": "stop"})
  80. hb.join()
  81. imageFileUpdate.join()
  82. break
  83. else:
  84. time.sleep(1)
  85. except Exception as e:
  86. logger.exception("结果反馈异常:{}, requestId:{}", e, self.msg.get("request_id"))
  87. logger.info("心跳、图片上传,反馈进程执行完成, requestId:{}", self.msg.get("request_id"))