|
- # -*- coding: utf-8 -*-
- import time
- from loguru import logger
- from queue import Queue
- from threading import Thread
- from util import KafkaUtils
-
- '''
- 实时、离线消息拉取线程
- '''
-
-
class MessagePollingThread(Thread):
    """Background thread that pulls Kafka record batches into a local queue.

    The thread polls a ``KafkaUtils.CustomerKafkaConsumer`` and stores each
    non-empty poll result in ``msgQueue``. Callers retrieve batches with the
    non-blocking :meth:`poll`. A new batch is fetched only once the queue has
    been drained, so at most one unconsumed batch is queued by this thread.

    Args:
        name: Thread name; also used in the start-up log line.
        cfg: Mapping that must provide ``"content"`` (passed to
            ``KafkaUtils.CustomerKafkaConsumer``) and ``"topics"`` (passed to
            ``subscribe``).
    """

    # Pause while a previously fetched batch is still waiting in the queue.
    _BACKLOG_SLEEP_SECONDS = 2
    # Pause after a poll that returned nothing. poll() is called without a
    # timeout, so if the consumer returns immediately when no records are
    # available the loop would otherwise spin and burn CPU.
    _IDLE_SLEEP_SECONDS = 1
    # Pause after an unexpected error so that failures are not retried hot.
    _ERROR_SLEEP_SECONDS = 1

    def __init__(self, name, cfg):
        super().__init__()
        self.name = name
        self.cfg = cfg
        self.msgQueue = Queue()

    def getMsgQueue(self):
        """Return the next queued batch without blocking.

        Returns:
            The oldest queued poll result, or ``None`` when the queue is empty.
        """
        # Function-scope import: the file's top-level import only brings in
        # ``Queue``, and this keeps the block self-contained.
        from queue import Empty

        try:
            return self.msgQueue.get(block=False)
        except Empty:
            # Only "queue is empty" is an expected condition here; any other
            # error is a real bug and must not be silently swallowed.
            return None

    def run(self):
        """Poll Kafka forever, queueing each batch and committing its offsets.

        Any exception raised inside the loop is logged and followed by a short
        pause; the loop then continues. The method never returns normally.
        """
        logger.info("{} 线程任务开始执行", self.name)
        # Consumer for the instruction/command messages.
        consumer = KafkaUtils.CustomerKafkaConsumer(self.cfg["content"])
        consumer.subscribe(topics=self.cfg["topics"])
        while True:
            try:
                # Back-pressure: do not fetch more until the previous batch
                # has been taken off the queue.
                if not self.msgQueue.empty():
                    time.sleep(self._BACKLOG_SLEEP_SECONDS)
                    continue
                # Re-create and re-subscribe the underlying consumer when the
                # wrapper has dropped it.
                if consumer.customerConsumer is None:
                    consumer.get_consumer()
                    consumer.subscribe(topics=self.cfg["topics"])
                # NOTE: known concern from the original author - pulling whole
                # batches may become heavy later on; pulling one record per
                # poll has been suggested.
                batch = consumer.customerConsumer.poll()
                if not batch:
                    time.sleep(self._IDLE_SLEEP_SECONDS)
                    continue
                self.msgQueue.put(batch)
                # The poll result is a mapping whose values are iterables of
                # records; commit the offset of every record just queued.
                for records in batch.values():
                    for record in records:
                        consumer.commit_offset(record)
            except Exception as e:
                logger.error("消息监听异常:")
                logger.exception(e)
                time.sleep(self._ERROR_SLEEP_SECONDS)

    def poll(self):
        """Return the next queued batch, or ``None`` when nothing is queued."""
        # getMsgQueue() is already non-blocking and returns None on an empty
        # queue, so no separate size check is needed.
        return self.getMsgQueue()
-
-
class OnlineMessagePollingThread(MessagePollingThread):
    """Message polling thread for real-time (online) stream analysis.

    Adds nothing of its own; all behaviour is inherited from
    ``MessagePollingThread``.
    """
-
-
class OfflineMessagePollingThread(MessagePollingThread):
    """Message polling thread for offline analysis tasks.

    Adds nothing of its own; all behaviour is inherited from
    ``MessagePollingThread``.
    """
-
- # if __name__ == '__main__':
- # t1 = OnlineMessagePollingThread('t1', {'bootstrap_servers': [
- # 'localhost:9092'], 'group_id': 'algSch', 'topics': ('alg_online_tasks',)})
- # t1.start()
- #
- # t2 = OfflineMessagePollingThread('t2', {'bootstrap_servers': [
- # 'localhost:9092'], 'group_id': 'algSch', 'topics': ('alg_offline_tasks',)})
- # t2.start()
- #
- # while True:
- # print(t1.poll())
- # print(t2.poll())
- # time.sleep(1)
|