diff --git a/concurrency/PushVideoStreamProcess.py b/concurrency/PushVideoStreamProcess.py index 6788d61..2004da7 100644 --- a/concurrency/PushVideoStreamProcess.py +++ b/concurrency/PushVideoStreamProcess.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- +#ne -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor from multiprocessing import Process @@ -384,7 +384,6 @@ class OffPushStreamProcess(PushStreamProcess): if len(thread_p) > 0: for r in thread_p: r.result() - print('----line384:',self._algSwitch,self._algStatus) if self._algSwitch and (not self._algStatus): frame_merge = video_conjuncing(frame, frame.copy()) else: diff --git a/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc b/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc index c67b250..27cbd8c 100644 Binary files a/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc and b/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc differ diff --git a/concurrency/uploadGPU.py b/concurrency/uploadGPU.py new file mode 100644 index 0000000..a85c0a7 --- /dev/null +++ b/concurrency/uploadGPU.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from threading import Thread +from time import sleep, time +from traceback import format_exc + +#from common.Constant import init_progess +import json,os,psutil,GPUtil,platform,socket +from kafka import KafkaProducer, KafkaConsumer +#from util.KafkaUtils import CustomerKafkaProducer +from common.YmlConstant import service_yml_path, kafka_yml_path +class uploadGPUinfos(Thread): + __slots__ = ('__kafka_config', "_context") + + def __init__(self, *args): + super().__init__() + self.__context,self.__kafka_config = args + self.__uploadInterval = self.__context['GPUpollInterval'] + #kafkaProducer = CustomerKafkaProducer(self.__kafka_config) + self.__producer = KafkaProducer( + bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun + value_serializer=lambda v: v.encode('utf-8')) + + self.__topic = self.__kafka_config["topicGPU"] + def run(self): + while True: + + try: + #获取当前的gpu状态信息 + msg_dict = get_system_info() + #发送GPU状态到指定的topic + msg = json.dumps(msg_dict) + + # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json + + #future = kafkaProducer.sender(topic_on,msg) + future = self.__producer .send(self.__topic,msg) + try: + future.get(timeout=10) + except kafka_errors: + traceback.format_exc() + + sleep(self.__uploadInterval) + + except Exception as e: + print(e) + continue + #logger.error("上传GPU服务器线程状态异常:{}, requestId:{}", format_exc(), request_id) + + + + +def get_system_info(): + # 初始化一个字典来存储系统信息 + system_info = {} + + # 获取CPU信息 + system_info['CPU'] = { + 'Physical Cores': psutil.cpu_count(logical=False), # 物理核心数 + 'Logical Cores': psutil.cpu_count(logical=True), # 逻辑核心数 + 'Current Frequency': psutil.cpu_freq().current, # 当前频率 + 'Usage Per Core': psutil.cpu_percent(interval=1, percpu=True), # 每个核心的使用率 + 'Total Usage': psutil.cpu_percent(interval=1) # 总体CPU使用率 + } + + # 获取内存信息 + memory = psutil.virtual_memory() + system_info['Memory'] = { + 'Total': memory.total / (1024 ** 3), # 总内存,单位为GB + 'Available': memory.available / (1024 ** 3), # 可用内存 + 'Used': memory.used / (1024 ** 3), # 已用内存 + 'Usage Percentage': memory.percent # 内存使用率 + } + + # 获取GPU信息 + gpus = GPUtil.getGPUs() + system_info['GPU'] = [] + for gpu in gpus: + gpu_info = { + 'ID': gpu.id, + 'Name': gpu.name, + 'Load': gpu.load * 100, # GPU负载,百分比 + 'Memory Total': gpu.memoryTotal, # 总显存,单位为MB + 'Memory Used': gpu.memoryUsed, # 已用显存 + 'Memory Free': gpu.memoryFree, # 可用显存 + 'Temperature': gpu.temperature # GPU温度 + } + system_info['GPU'].append(gpu_info) + + # 获取系统信息 + system_info['System'] = { + 'Platform': platform.system(), # 操作系统类型 + 'Platform Version': platform.version(), # 操作系统版本 + 'Platform Release': platform.release(), # 操作系统发行版本 + 'Platform Node': platform.node(), # 网络名称 + 'Machine': platform.machine(), # 硬件架构 + 'Processor': platform.processor() # CPU架构 + } + + # 获取本机局域网IP地址(非回环地址) + try: + # 获取所有网络接口信息 + net_if_addrs = psutil.net_if_addrs() + for interface, addrs in net_if_addrs.items(): + for addr in addrs: + # 筛选IPv4地址且非回环地址 + if addr.family == socket.AF_INET and not addr.address.startswith("127."): + system_info['System']['Local IP Address'] = addr.address + break + if 'Local IP Address' in system_info['System']: + break + else: + system_info['System']['Local IP Address'] = "No local IP found" + except Exception as e: + system_info['System']['Local IP Address'] = "Unable to retrieve local IP address" + + return system_info + + + + +if __name__=="__main__": + + + context = { + 'GPUpollInterval':1, + 'topic':'server-status', + } + kafka_config = { + 'bootstrap_servers':['192.168.10.66:9092'] + } + + base_dir, env = '/home/thsw2/WJ/test/tuoheng_algN','test' + + + + + + upload_thread = uploadGPUinfos(context,kafka_config) + upload_thread.setDaemon(False) + + upload_thread.start() + + + # 主线程等待守护线程运行 + try: + while True: + sleep(1) + except KeyboardInterrupt: + print("主线程退出") + + + + \ No newline at end of file diff --git a/config/kafka/dsp_test_kafka.yml b/config/kafka/dsp_test_kafka.yml index 7d34c3d..3ba39d8 100644 --- a/config/kafka/dsp_test_kafka.yml +++ b/config/kafka/dsp_test_kafka.yml @@ -1,5 +1,6 @@ bootstrap_servers: ["106.14.96.218:19092"] -topic: +topicGPU: "server-status" +topicPort: dsp-alg-online-tasks-topic: "dsp-alg-online-tasks" dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks" dsp-alg-image-tasks-topic: "dsp-alg-image-tasks" @@ -8,6 +9,15 @@ topic: dsp-recording-result-topic: "dsp-recording-result" dsp-push-stream-task-topic: "dsp-push-stream-task" dsp-push-stream-result-topic: "dsp-push-stream-result" +topic: + dsp-alg-online-tasks-topic: "dsp-alg-online-tasks-192.168.11.7" + dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks-192.168.11.7" + dsp-alg-image-tasks-topic: "dsp-alg-image-tasks-192.168.11.7" + dsp-alg-results-topic: "dsp-alg-task-results" + dsp-recording-task-topic: "dsp-recording-task-192.168.11.7" + dsp-recording-result-topic: "dsp-recording-result" + dsp-push-stream-task-topic: "dsp-push-stream-task-192.168.11.7" + dsp-push-stream-result-topic: "dsp-push-stream-result" producer: acks: -1 retries: 3 diff --git a/config/service/dsp_test_service.yml b/config/service/dsp_test_service.yml index 00646a5..e00daac 100644 --- a/config/service/dsp_test_service.yml +++ b/config/service/dsp_test_service.yml @@ -3,6 +3,10 @@ video: file_path: "../dsp/video/" # 是否添加水印 video_add_water: false +#0-slave,1--master +role : 1 +#gpu信息上报间隔 +GPUpollInterval: 2 service: filter: # 图片得分多少分以上返回图片 @@ -27,7 +31,7 @@ service: image: limit: 20 #storage source,0--aliyun,1--minio - storage_source: 1 + storage_source: 0 #是否启用mqtt,0--不用,1--启用 mqtt_flag: 0 #是否启用alg控制功能 diff --git a/enums/ModelTypeEnum.py b/enums/ModelTypeEnum.py index 85192f5..fca7d90 100644 --- a/enums/ModelTypeEnum.py +++ b/enums/ModelTypeEnum.py @@ -126,7 +126,8 @@ class ModelType(Enum): 'seg_nclass': 3, 'segRegionCnt': 2, 'segPar': { - 'modelSize': (640, 360), + #'modelSize': (640, 360), + 'modelSize': (1920, 1080), 'mean': (0.485, 0.456, 0.406), 'std': (0.229, 0.224, 0.225), 'predResize': True, @@ -135,8 +136,8 @@ class ModelType(Enum): 'mixFunction': { 'function': tracfficAccidentMixFunction, 'pars': { - 'modelSize': (640, 360), - #'modelSize': (1920,1080), + #'modelSize': (640, 360), + 'modelSize': (1920,1080), 'RoadArea': 16000, 'roadVehicleAngle': 15, 'speedRoadVehicleAngleMax': 75, diff --git a/enums/__pycache__/ModelTypeEnum.cpython-38.pyc b/enums/__pycache__/ModelTypeEnum.cpython-38.pyc index fc9c834..ebfb24c 100644 Binary files a/enums/__pycache__/ModelTypeEnum.cpython-38.pyc and b/enums/__pycache__/ModelTypeEnum.cpython-38.pyc differ diff --git a/service/Dispatcher.py b/service/Dispatcher.py index 1eb2ad4..0641c05 100644 --- a/service/Dispatcher.py +++ b/service/Dispatcher.py @@ -2,13 +2,14 @@ import time,os from os.path import join from traceback import format_exc - +import json from cerberus import Validator from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \ IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA from common.YmlConstant import service_yml_path, kafka_yml_path from concurrency.FeedbackThread import FeedbackThread +from concurrency.uploadGPU import uploadGPUinfos from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \ OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2 from concurrency.Pull2PushStreamProcess import PushStreamProcess @@ -26,11 +27,11 @@ from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecogniti OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess from util.CpuUtils import print_cpu_ex_status from util.FileUtils import create_dir_not_exist -from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available +from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available,select_best_server from util.KafkaUtils import CustomerKafkaConsumer from util.QueUtil import put_queue from util.RWUtils import getConfigs - +from kafka import KafkaProducer, KafkaConsumer ''' 分发服务 ''' @@ -38,7 +39,7 @@ from util.RWUtils import getConfigs class DispatcherService: __slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics','__taskType', '__task_type', - '__kafka_config', '__recordingProcesses', '__pull2PushProcesses') + '__kafka_config', '__recordingProcesses', '__pull2PushProcesses','__topicsPort','__gpuTopic','__role','__uploadGPUThread','__gpuDics','__producer') def __init__(self, base_dir, env): # 检测cuda是否活动 @@ -51,7 +52,7 @@ class DispatcherService: self.__context["base_dir"], self.__context["env"] = base_dir, env # 问题反馈线程 - self.__feedbackThread, self.__fbQueue = None, Queue() + self.__feedbackThread,self.__uploadGPUThread, self.__fbQueue = None,None, Queue() # 实时、离线、图片任务进程字典 self.__listeningProcesses = {} # 录屏任务进程字典 @@ -59,13 +60,34 @@ class DispatcherService: # 转推流任务进程字典 self.__pull2PushProcesses = {} self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env)) - self.__topics = ( - self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic - self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic - self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic - self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic - self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic - ) + + self.__producer = KafkaProducer( + bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun + value_serializer=lambda v: v.encode('utf-8')) + + self.__gpuDics = { }#用于存储gpu信息的字典 + self.__role = self.__context["role"] + self.__topics = [ + self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic + self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic + self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic + self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic + self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic + ] + + self.__topicsPort = [ + self.__kafka_config["topicPort"]["dsp-alg-online-tasks-topic"], # 实时监听topic + self.__kafka_config["topicPort"]["dsp-alg-offline-tasks-topic"], # 离线监听topic + self.__kafka_config["topicPort"]["dsp-alg-image-tasks-topic"], # 图片监听topic + self.__kafka_config["topicPort"]["dsp-recording-task-topic"], # 录屏监听topic + self.__kafka_config["topicPort"]["dsp-push-stream-task-topic"] # 推流监听topic + ] + self.__gpuTopic = [self.__kafka_config["topicGPU"]] + + if self.__role==1: + self.__topics = self.__topics + self.__topicsPort + self.__gpuTopic + + # 对应topic的各个lambda表达式 self.__task_type = { self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y), @@ -78,13 +100,14 @@ class DispatcherService: lambda x, y, z: self.recording_method(x, y, z)), self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y), lambda x, y, z: self.push_stream_method(x, y, z)) + } self.__taskType={ - "dsp-alg-online-tasks":0, - "dsp-alg-offline-tasks":1, - "dsp-alg-image-tasks":2, - "dsp-recording-task":3, - "dsp-push-stream-task":4 + self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"]:0, # 实时监听topic + self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"]:1, # 离线监听topic + self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"]:2, # 图片监听topic + self.__kafka_config["topic"]["dsp-recording-task-topic"]:3, # 录屏监听topic + self.__kafka_config["topic"]["dsp-push-stream-task-topic"]:4 # 推流监听topic } gpu_name_array = get_first_gpu_name() gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array] @@ -102,6 +125,8 @@ class DispatcherService: def start_service(self): # 初始化kafka监听者 customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics) + ####增加一个线程,用于试试监控和发送gpu状态#### + #### logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 服务器IP:{}".format(self.__kafka_config['bootstrap_servers'] )) while True: try: @@ -109,21 +134,52 @@ class DispatcherService: self.check_process_task() # 启动反馈线程 self.start_feedback_thread() + self.start_uploadGPU_thread() msg = customerKafkaConsumer.poll() if msg is not None and len(msg) > 0: for k, v in msg.items(): for m in v: message = m.value - requestId = message.get("request_id") - if requestId is None: - logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) + #如果收到的信息是gpu状态的话,收到信息后,更新自己的gpu服务器状态,下面不再执行 + if m.topic in self.__gpuTopic: + customerKafkaConsumer.commit_offset(m,'x'*16,False) + #更新机器资源现状 + ip = message['System']['Local IP Address'] + self.__gpuDics[ip]=message continue - customerKafkaConsumer.commit_offset(m, requestId) - logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", - m.topic, m.offset, m.partition, message, requestId) - message['taskType']=self.__taskType[m.topic] - topic_method = self.__task_type[m.topic] - topic_method[2](topic_method[1], message, topic_method[0]) + #如果收到的信息是门户消息,收到信息后,要根据Gpu状态,转发到对应的机器。 + elif m.topic in self.__topicsPort: + customerKafkaConsumer.commit_offset(m, 'y'*16) + #状态分析 + #recondGpu={'hostname':'thsw2','IP':'192.168.10.66','gpuId':0} + recondGpu= select_best_server(self.__gpuDics) + if recondGpu is None: + print( 'recondGpu:',recondGpu, ' self.__gpuDics: ',self.__gpuDics,' topic:',m.topic, ' message:',message ) + continue + #转发消息 + message['transmit_topic'] = m.topic + '-' + recondGpu['IP'] + transmitMsg={'transmit':message} + msg_json = json.dumps( message ) + future = self.__producer.send( message['transmit_topic'] ,msg_json) + try: + future.get(timeout=2) + logger.info( "转发消息成功,消息topic:{},消息内容:{}",message['transmit_topic'],message ) + except kafka_errors as e: + print('------transmitted error:',e) + logger.info("转发消息失败") + traceback.format_exc() + else: + requestId = message.get("request_id") + if requestId is None: + logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) + continue + customerKafkaConsumer.commit_offset(m, requestId) + logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", + m.topic, m.offset, m.partition, message, requestId) + + message['taskType']=self.__taskType[m.topic] + topic_method = self.__task_type[m.topic] + topic_method[2](topic_method[1], message, topic_method[0]) else: print_gpu_ex_status() print_cpu_ex_status(self.__context["base_dir"]) @@ -303,6 +359,19 @@ class DispatcherService: self.__feedbackThread.start() time.sleep(1) + def start_uploadGPU_thread(self): + if self.__uploadGPUThread is None: + self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) + self.__uploadGPUThread.setDaemon(True) + self.__uploadGPUThread.start() + time.sleep(1) + if self.__uploadGPUThread and not self.__uploadGPUThread.is_alive(): + logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!") + self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) + self.__uploadGPUThread.setDaemon(True) + self.__uploadGPUThread.start() + time.sleep(1) + ''' 在线分析逻辑 ''' diff --git a/service/__pycache__/Dispatcher.cpython-38.pyc b/service/__pycache__/Dispatcher.cpython-38.pyc index 114829c..1919f51 100644 Binary files a/service/__pycache__/Dispatcher.cpython-38.pyc and b/service/__pycache__/Dispatcher.cpython-38.pyc differ diff --git a/util/AliyunSdk.py b/util/AliyunSdk.py index 6dc33fa..3671b4e 100644 --- a/util/AliyunSdk.py +++ b/util/AliyunSdk.py @@ -46,7 +46,8 @@ class AliyunOssSdk: try: self.get_oss_bucket() self.bucket.put_object(updatePath, fileByte) - logger.info("上传文件到oss成功! requestId:{}", request_id) + logger.info("上传文件到oss成功! requestId:{},{}".format( request_id, updatePath)) + return updatePath break except Exception as e: retry_count += 1 @@ -146,4 +147,4 @@ class ThAliyunVodSdk: # if __name__ == "__main__": # aa = ThAliyunVodSdk('/home/th/tuo_heng/dev/tuoheng_alg', 'dev', "1") -# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102')) \ No newline at end of file +# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102')) diff --git a/util/GPUtils.py b/util/GPUtils.py index 018128b..ec528ce 100644 --- a/util/GPUtils.py +++ b/util/GPUtils.py @@ -59,7 +59,27 @@ def print_gpu_ex_status(requestId=None): except Exception: logger.error("打印gpu状态异常: {}", format_exc()) return result +def select_best_server(servers): + best_server_info = None + lowest_memory_usage = float('inf') # 初始化为无穷大 + for ip, server_info in servers.items(): + gpu_list = server_info['GPU'] + for gpu in gpu_list: + if gpu['ID'] == 0: + memory_used = gpu['Memory Used'] + memory_total = gpu['Memory Total'] + memory_usage = (memory_used / memory_total) * 100 # 计算显存使用率 + + if memory_usage < lowest_memory_usage: + lowest_memory_usage = memory_usage + best_server_info = { + 'hostname': server_info['System']['Platform Node'], + 'IP': server_info['System']['Local IP Address'], + 'gpuId': gpu['ID'] + } + + return best_server_info def print_gpu_status(requestId=None): try: diff --git a/util/KafkaUtils.py b/util/KafkaUtils.py index e890796..b8e83c7 100644 --- a/util/KafkaUtils.py +++ b/util/KafkaUtils.py @@ -136,7 +136,7 @@ class CustomerKafkaConsumer: logger.error("消费者拉取消息异常: {}", format_exc()) return msg - def commit_offset(self, message, request_id): + def commit_offset(self, message, request_id,log=True): retry_num = 0 topic = message.topic offset = message.offset + 1 @@ -144,20 +144,20 @@ class CustomerKafkaConsumer: while True: try: self.subscribe() - logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, + if log: logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, request_id) tp = TopicPartition(topic=topic, partition=partition) self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))}) - logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, + if log: logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, request_id) break except Exception: self.customerConsumer = None - logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id) + if log: logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id) time.sleep(1) retry_num += 1 if retry_num > 3: - logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id) + if log : logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id) break # if __name__=="__main__": diff --git a/util/__pycache__/AliyunSdk.cpython-38.pyc b/util/__pycache__/AliyunSdk.cpython-38.pyc index 75c4833..ff407b8 100644 Binary files a/util/__pycache__/AliyunSdk.cpython-38.pyc and b/util/__pycache__/AliyunSdk.cpython-38.pyc differ diff --git a/util/__pycache__/GPUtils.cpython-38.pyc b/util/__pycache__/GPUtils.cpython-38.pyc index 41de0c1..46b5a42 100644 Binary files a/util/__pycache__/GPUtils.cpython-38.pyc and b/util/__pycache__/GPUtils.cpython-38.pyc differ diff --git a/util/__pycache__/KafkaUtils.cpython-38.pyc b/util/__pycache__/KafkaUtils.cpython-38.pyc index f3ce915..96daf51 100644 Binary files a/util/__pycache__/KafkaUtils.cpython-38.pyc and b/util/__pycache__/KafkaUtils.cpython-38.pyc differ