分布式GPU

This commit is contained in:
th 2025-03-29 17:25:34 +08:00
parent c0079fc9f5
commit a424592a7e
15 changed files with 297 additions and 40 deletions

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
#ne -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
@ -384,7 +384,6 @@ class OffPushStreamProcess(PushStreamProcess):
if len(thread_p) > 0:
for r in thread_p:
r.result()
print('----line384:',self._algSwitch,self._algStatus)
if self._algSwitch and (not self._algStatus):
frame_merge = video_conjuncing(frame, frame.copy())
else:

153
concurrency/uploadGPU.py Normal file
View File

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
from threading import Thread
from time import sleep, time
from traceback import format_exc
#from common.Constant import init_progess
import json,os,psutil,GPUtil,platform,socket
from kafka import KafkaProducer, KafkaConsumer
#from util.KafkaUtils import CustomerKafkaProducer
from common.YmlConstant import service_yml_path, kafka_yml_path
class uploadGPUinfos(Thread):
    """Background thread that periodically publishes this host's CPU/GPU/system
    status (built by get_system_info) as JSON to a Kafka topic.

    Constructor args (positional): (context, kafka_config)
        context      -- dict; must contain 'GPUpollInterval' (seconds between uploads)
        kafka_config -- dict; must contain 'bootstrap_servers' and 'topicGPU'
    """
    # NOTE(review): Thread does not declare __slots__, so instances still get a
    # __dict__ and this declaration is not enforced; kept for documentation.
    __slots__ = ('__kafka_config', "_context")

    def __init__(self, *args):
        super().__init__()
        self.__context, self.__kafka_config = args
        # Seconds to wait between two status uploads.
        self.__uploadInterval = self.__context['GPUpollInterval']
        self.__producer = KafkaProducer(
            bootstrap_servers=self.__kafka_config['bootstrap_servers'],  # tencent yun
            value_serializer=lambda v: v.encode('utf-8'))
        self.__topic = self.__kafka_config["topicGPU"]

    def run(self):
        while True:
            try:
                # 获取当前的gpu状态信息 -> serialize and publish.
                msg_dict = get_system_info()
                msg = json.dumps(msg_dict)
                future = self.__producer.send(self.__topic, msg)
                try:
                    future.get(timeout=10)
                except Exception:
                    # Fixed: original wrote `except kafka_errors:` and called
                    # `traceback.format_exc()` -- both names are undefined here
                    # (only `format_exc` is imported), so any send failure raised
                    # a NameError. Log the real traceback instead.
                    print(format_exc())
            except Exception as e:
                print(e)
            finally:
                # Fixed: original `continue`d past the sleep on error, turning a
                # persistent failure into a busy loop. Always pace the loop.
                sleep(self.__uploadInterval)
def get_system_info():
    """Collect a snapshot of this host's CPU, memory, GPU and platform status.

    Returns a dict with keys 'CPU', 'Memory', 'GPU' (list, one entry per GPU)
    and 'System' (platform info plus 'Local IP Address'). Blocks ~1 second
    while sampling CPU usage.
    """
    system_info = {}
    # CPU: sample per-core usage once (blocks for the 1 s interval) and derive
    # the total from the same sample. Fixed: the original called
    # psutil.cpu_percent(interval=1, ...) twice, blocking 2 s per invocation.
    per_core_usage = psutil.cpu_percent(interval=1, percpu=True)
    freq = psutil.cpu_freq()  # may be None on some platforms/VMs
    system_info['CPU'] = {
        'Physical Cores': psutil.cpu_count(logical=False),  # 物理核心数
        'Logical Cores': psutil.cpu_count(logical=True),    # 逻辑核心数
        'Current Frequency': freq.current if freq else None,  # 当前频率
        'Usage Per Core': per_core_usage,                   # 每个核心的使用率
        # 总体CPU使用率: system-wide percent is the average across cores.
        'Total Usage': round(sum(per_core_usage) / len(per_core_usage), 1) if per_core_usage else 0.0
    }
    # Memory, reported in GB.
    memory = psutil.virtual_memory()
    system_info['Memory'] = {
        'Total': memory.total / (1024 ** 3),          # 总内存单位为GB
        'Available': memory.available / (1024 ** 3),  # 可用内存
        'Used': memory.used / (1024 ** 3),            # 已用内存
        'Usage Percentage': memory.percent            # 内存使用率
    }
    # GPUs: one dict per visible device.
    system_info['GPU'] = []
    for gpu in GPUtil.getGPUs():
        system_info['GPU'].append({
            'ID': gpu.id,
            'Name': gpu.name,
            'Load': gpu.load * 100,           # GPU负载百分比
            'Memory Total': gpu.memoryTotal,  # 总显存单位为MB
            'Memory Used': gpu.memoryUsed,    # 已用显存
            'Memory Free': gpu.memoryFree,    # 可用显存
            'Temperature': gpu.temperature    # GPU温度
        })
    # Platform / OS info.
    system_info['System'] = {
        'Platform': platform.system(),           # 操作系统类型
        'Platform Version': platform.version(),  # 操作系统版本
        'Platform Release': platform.release(),  # 操作系统发行版本
        'Platform Node': platform.node(),        # 网络名称
        'Machine': platform.machine(),           # 硬件架构
        'Processor': platform.processor()        # CPU架构
    }
    # First non-loopback IPv4 address found on any interface.
    try:
        for interface, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family == socket.AF_INET and not addr.address.startswith("127."):
                    system_info['System']['Local IP Address'] = addr.address
                    break
            if 'Local IP Address' in system_info['System']:
                break
        else:
            system_info['System']['Local IP Address'] = "No local IP found"
    except Exception:
        system_info['System']['Local IP Address'] = "Unable to retrieve local IP address"
    return system_info
if __name__=="__main__":
context = {
'GPUpollInterval':1,
'topic':'server-status',
}
kafka_config = {
'bootstrap_servers':['192.168.10.66:9092']
}
base_dir, env = '/home/thsw2/WJ/test/tuoheng_algN','test'
upload_thread = uploadGPUinfos(context,kafka_config)
upload_thread.setDaemon(False)
upload_thread.start()
# 主线程等待守护线程运行
try:
while True:
sleep(1)
except KeyboardInterrupt:
print("主线程退出")

View File

@ -1,5 +1,6 @@
bootstrap_servers: ["106.14.96.218:19092"]
topic:
topicGPU: "server-status"
topicPort:
dsp-alg-online-tasks-topic: "dsp-alg-online-tasks"
dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks"
dsp-alg-image-tasks-topic: "dsp-alg-image-tasks"
@ -8,6 +9,15 @@ topic:
dsp-recording-result-topic: "dsp-recording-result"
dsp-push-stream-task-topic: "dsp-push-stream-task"
dsp-push-stream-result-topic: "dsp-push-stream-result"
topic:
dsp-alg-online-tasks-topic: "dsp-alg-online-tasks-192.168.11.7"
dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks-192.168.11.7"
dsp-alg-image-tasks-topic: "dsp-alg-image-tasks-192.168.11.7"
dsp-alg-results-topic: "dsp-alg-task-results"
dsp-recording-task-topic: "dsp-recording-task-192.168.11.7"
dsp-recording-result-topic: "dsp-recording-result"
dsp-push-stream-task-topic: "dsp-push-stream-task-192.168.11.7"
dsp-push-stream-result-topic: "dsp-push-stream-result"
producer:
acks: -1
retries: 3

View File

@ -3,6 +3,10 @@ video:
file_path: "../dsp/video/"
# 是否添加水印
video_add_water: false
#0-slave,1--master
role : 1
#gpu信息上报间隔
GPUpollInterval: 2
service:
filter:
# 图片得分多少分以上返回图片
@ -27,7 +31,7 @@ service:
image:
limit: 20
#storage source,0--aliyun,1--minio
storage_source: 1
storage_source: 0
#是否启用mqtt0--不用1--启用
mqtt_flag: 0
#是否启用alg控制功能

View File

@ -126,7 +126,8 @@ class ModelType(Enum):
'seg_nclass': 3,
'segRegionCnt': 2,
'segPar': {
'modelSize': (640, 360),
#'modelSize': (640, 360),
'modelSize': (1920, 1080),
'mean': (0.485, 0.456, 0.406),
'std': (0.229, 0.224, 0.225),
'predResize': True,
@ -135,8 +136,8 @@ class ModelType(Enum):
'mixFunction': {
'function': tracfficAccidentMixFunction,
'pars': {
'modelSize': (640, 360),
#'modelSize': (1920,1080),
#'modelSize': (640, 360),
'modelSize': (1920,1080),
'RoadArea': 16000,
'roadVehicleAngle': 15,
'speedRoadVehicleAngleMax': 75,

View File

@ -2,13 +2,14 @@
import time,os
from os.path import join
from traceback import format_exc
import json
from cerberus import Validator
from common.Constant import ONLINE_START_SCHEMA, ONLINE_STOP_SCHEMA, OFFLINE_START_SCHEMA, OFFLINE_STOP_SCHEMA, \
IMAGE_SCHEMA, RECORDING_START_SCHEMA, RECORDING_STOP_SCHEMA, PULL2PUSH_START_SCHEMA, PULL2PUSH_STOP_SCHEMA
from common.YmlConstant import service_yml_path, kafka_yml_path
from concurrency.FeedbackThread import FeedbackThread
from concurrency.uploadGPU import uploadGPUinfos
from concurrency.IntelligentRecognitionProcess2 import OnlineIntelligentRecognitionProcess2, \
OfflineIntelligentRecognitionProcess2, PhotosIntelligentRecognitionProcess2
from concurrency.Pull2PushStreamProcess import PushStreamProcess
@ -26,11 +27,11 @@ from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecogniti
OfflineIntelligentRecognitionProcess, PhotosIntelligentRecognitionProcess, ScreenRecordingProcess
from util.CpuUtils import print_cpu_ex_status
from util.FileUtils import create_dir_not_exist
from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available
from util.GPUtils import get_first_gpu_name, print_gpu_ex_status, check_cude_is_available,select_best_server
from util.KafkaUtils import CustomerKafkaConsumer
from util.QueUtil import put_queue
from util.RWUtils import getConfigs
from kafka import KafkaProducer, KafkaConsumer
'''
分发服务
'''
@ -38,7 +39,7 @@ from util.RWUtils import getConfigs
class DispatcherService:
__slots__ = ('__context', '__feedbackThread', '__listeningProcesses', '__fbQueue', '__topics','__taskType', '__task_type',
'__kafka_config', '__recordingProcesses', '__pull2PushProcesses')
'__kafka_config', '__recordingProcesses', '__pull2PushProcesses','__topicsPort','__gpuTopic','__role','__uploadGPUThread','__gpuDics','__producer')
def __init__(self, base_dir, env):
# 检测cuda是否活动
@ -51,7 +52,7 @@ class DispatcherService:
self.__context["base_dir"], self.__context["env"] = base_dir, env
# 问题反馈线程
self.__feedbackThread, self.__fbQueue = None, Queue()
self.__feedbackThread,self.__uploadGPUThread, self.__fbQueue = None,None, Queue()
# 实时、离线、图片任务进程字典
self.__listeningProcesses = {}
# 录屏任务进程字典
@ -59,13 +60,34 @@ class DispatcherService:
# 转推流任务进程字典
self.__pull2PushProcesses = {}
self.__kafka_config = getConfigs(join(base_dir, kafka_yml_path % env))
self.__topics = (
self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic
self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic
self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic
self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic
)
self.__producer = KafkaProducer(
bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun
value_serializer=lambda v: v.encode('utf-8'))
self.__gpuDics = { }#用于存储gpu信息的字典
self.__role = self.__context["role"]
self.__topics = [
self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"], # 实时监听topic
self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"], # 图片监听topic
self.__kafka_config["topic"]["dsp-recording-task-topic"], # 录屏监听topic
self.__kafka_config["topic"]["dsp-push-stream-task-topic"] # 推流监听topic
]
self.__topicsPort = [
self.__kafka_config["topicPort"]["dsp-alg-online-tasks-topic"], # 实时监听topic
self.__kafka_config["topicPort"]["dsp-alg-offline-tasks-topic"], # 离线监听topic
self.__kafka_config["topicPort"]["dsp-alg-image-tasks-topic"], # 图片监听topic
self.__kafka_config["topicPort"]["dsp-recording-task-topic"], # 录屏监听topic
self.__kafka_config["topicPort"]["dsp-push-stream-task-topic"] # 推流监听topic
]
self.__gpuTopic = [self.__kafka_config["topicGPU"]]
if self.__role==1:
self.__topics = self.__topics + self.__topicsPort + self.__gpuTopic
# 对应topic的各个lambda表达式
self.__task_type = {
self.__topics[0]: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y),
@ -78,13 +100,14 @@ class DispatcherService:
lambda x, y, z: self.recording_method(x, y, z)),
self.__topics[4]: (AnalysisType.PULLTOPUSH.value, lambda x, y: self.pullStream(x, y),
lambda x, y, z: self.push_stream_method(x, y, z))
}
self.__taskType={
"dsp-alg-online-tasks":0,
"dsp-alg-offline-tasks":1,
"dsp-alg-image-tasks":2,
"dsp-recording-task":3,
"dsp-push-stream-task":4
self.__kafka_config["topic"]["dsp-alg-online-tasks-topic"]:0, # 实时监听topic
self.__kafka_config["topic"]["dsp-alg-offline-tasks-topic"]:1, # 离线监听topic
self.__kafka_config["topic"]["dsp-alg-image-tasks-topic"]:2, # 图片监听topic
self.__kafka_config["topic"]["dsp-recording-task-topic"]:3, # 录屏监听topic
self.__kafka_config["topic"]["dsp-push-stream-task-topic"]:4 # 推流监听topic
}
gpu_name_array = get_first_gpu_name()
gpu_array = [g for g in ('3090', '2080', '4090', 'A10') if g in gpu_name_array]
@ -102,6 +125,8 @@ class DispatcherService:
def start_service(self):
# 初始化kafka监听者
customerKafkaConsumer = CustomerKafkaConsumer(self.__kafka_config, topics=self.__topics)
####增加一个线程用于试试监控和发送gpu状态####
####
logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 服务器IP:{}".format(self.__kafka_config['bootstrap_servers'] ))
while True:
try:
@ -109,21 +134,52 @@ class DispatcherService:
self.check_process_task()
# 启动反馈线程
self.start_feedback_thread()
self.start_uploadGPU_thread()
msg = customerKafkaConsumer.poll()
if msg is not None and len(msg) > 0:
for k, v in msg.items():
for m in v:
message = m.value
requestId = message.get("request_id")
if requestId is None:
logger.error("请求参数格式错误, 请检查请求体格式是否正确message:%s"%(message))
#如果收到的信息是gpu状态的话收到信息后更新自己的gpu服务器状态下面不再执行
if m.topic in self.__gpuTopic:
customerKafkaConsumer.commit_offset(m,'x'*16,False)
#更新机器资源现状
ip = message['System']['Local IP Address']
self.__gpuDics[ip]=message
continue
customerKafkaConsumer.commit_offset(m, requestId)
logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
m.topic, m.offset, m.partition, message, requestId)
message['taskType']=self.__taskType[m.topic]
topic_method = self.__task_type[m.topic]
topic_method[2](topic_method[1], message, topic_method[0])
#如果收到的信息是门户消息收到信息后要根据Gpu状态转发到对应的机器。
elif m.topic in self.__topicsPort:
customerKafkaConsumer.commit_offset(m, 'y'*16)
#状态分析
#recondGpu={'hostname':'thsw2','IP':'192.168.10.66','gpuId':0}
recondGpu= select_best_server(self.__gpuDics)
if recondGpu is None:
print( 'recondGpu',recondGpu, ' self.__gpuDics: ',self.__gpuDics,' topic:',m.topic, ' message:',message )
continue
#转发消息
message['transmit_topic'] = m.topic + '-' + recondGpu['IP']
transmitMsg={'transmit':message}
msg_json = json.dumps( message )
future = self.__producer.send( message['transmit_topic'] ,msg_json)
try:
future.get(timeout=2)
logger.info( "转发消息成功消息topic:{},消息内容:{}",message['transmit_topic'],message )
except kafka_errors as e:
print('------transmitted error:',e)
logger.info("转发消息失败")
traceback.format_exc()
else:
requestId = message.get("request_id")
if requestId is None:
logger.error("请求参数格式错误, 请检查请求体格式是否正确message:%s"%(message))
continue
customerKafkaConsumer.commit_offset(m, requestId)
logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
m.topic, m.offset, m.partition, message, requestId)
message['taskType']=self.__taskType[m.topic]
topic_method = self.__task_type[m.topic]
topic_method[2](topic_method[1], message, topic_method[0])
else:
print_gpu_ex_status()
print_cpu_ex_status(self.__context["base_dir"])
@ -303,6 +359,19 @@ class DispatcherService:
self.__feedbackThread.start()
time.sleep(1)
def start_uploadGPU_thread(self):
    """Start the GPU-status upload thread, or restart it if it has died.

    Called from the dispatch loop each iteration; a no-op while the thread
    is alive.
    """
    if self.__uploadGPUThread is not None and self.__uploadGPUThread.is_alive():
        return
    if self.__uploadGPUThread is not None:
        # Fixed: original message said "反馈线程" (feedback thread) -- copy-paste
        # from start_feedback_thread; this is the GPU upload thread.
        logger.error("GPU状态上传线程异常停止, 开始重新启动GPU状态上传线程!!!!!")
    self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config)
    # setDaemon() is deprecated since Python 3.10; assign the property instead.
    self.__uploadGPUThread.daemon = True
    self.__uploadGPUThread.start()
    time.sleep(1)
'''
在线分析逻辑
'''

View File

@ -46,7 +46,8 @@ class AliyunOssSdk:
try:
self.get_oss_bucket()
self.bucket.put_object(updatePath, fileByte)
logger.info("上传文件到oss成功! requestId:{}", request_id)
logger.info("上传文件到oss成功! requestId:{},{}".format( request_id, updatePath))
return updatePath
break
except Exception as e:
retry_count += 1
@ -146,4 +147,4 @@ class ThAliyunVodSdk:
# if __name__ == "__main__":
# aa = ThAliyunVodSdk('/home/th/tuo_heng/dev/tuoheng_alg', 'dev', "1")
# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102'))
# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102'))

View File

@ -59,7 +59,27 @@ def print_gpu_ex_status(requestId=None):
except Exception:
logger.error("打印gpu状态异常: {}", format_exc())
return result
def select_best_server(servers):
    """Pick the server whose primary GPU (ID 0) has the lowest memory usage.

    servers -- dict mapping IP -> status dict as produced by get_system_info
               (keys 'GPU' and 'System' are read).
    Returns {'hostname', 'IP', 'gpuId'} for the least-loaded server, or None
    when no candidate GPU is found.
    """
    best_server_info = None
    lowest_memory_usage = float('inf')  # 初始化为无穷大
    for ip, server_info in servers.items():
        for gpu in server_info.get('GPU', []):
            if gpu['ID'] != 0:
                continue
            memory_total = gpu['Memory Total']
            # Fixed: guard against a GPU reporting zero total memory, which
            # made the original raise ZeroDivisionError.
            if not memory_total:
                continue
            memory_usage = (gpu['Memory Used'] / memory_total) * 100  # 显存使用率
            if memory_usage < lowest_memory_usage:
                lowest_memory_usage = memory_usage
                best_server_info = {
                    'hostname': server_info['System']['Platform Node'],
                    'IP': server_info['System']['Local IP Address'],
                    'gpuId': gpu['ID']
                }
    return best_server_info
def print_gpu_status(requestId=None):
try:

View File

@ -136,7 +136,7 @@ class CustomerKafkaConsumer:
logger.error("消费者拉取消息异常: {}", format_exc())
return msg
def commit_offset(self, message, request_id):
def commit_offset(self, message, request_id,log=True):
retry_num = 0
topic = message.topic
offset = message.offset + 1
@ -144,20 +144,20 @@ class CustomerKafkaConsumer:
while True:
try:
self.subscribe()
logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
if log: logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
request_id)
tp = TopicPartition(topic=topic, partition=partition)
self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
if log: logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
request_id)
break
except Exception:
self.customerConsumer = None
logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
if log: logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
time.sleep(1)
retry_num += 1
if retry_num > 3:
logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
if log : logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
break
# if __name__=="__main__":