From a424592a7e55fd72d4a5c5704dc6fc190ebe76bd Mon Sep 17 00:00:00 2001 From: th Date: Sat, 29 Mar 2025 17:25:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E5=B8=83=E5=BC=8FGPU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- concurrency/PushVideoStreamProcess.py | 3 +- .../PushVideoStreamProcess.cpython-38.pyc | Bin 12382 -> 12343 bytes concurrency/uploadGPU.py | 153 ++++++++++++++++++ config/kafka/dsp_test_kafka.yml | 12 +- config/service/dsp_test_service.yml | 6 +- enums/ModelTypeEnum.py | 7 +- .../__pycache__/ModelTypeEnum.cpython-38.pyc | Bin 16924 -> 16931 bytes service/Dispatcher.py | 121 +++++++++++--- service/__pycache__/Dispatcher.cpython-38.pyc | Bin 16407 -> 18245 bytes util/AliyunSdk.py | 5 +- util/GPUtils.py | 20 +++ util/KafkaUtils.py | 10 +- util/__pycache__/AliyunSdk.cpython-38.pyc | Bin 5675 -> 5692 bytes util/__pycache__/GPUtils.cpython-38.pyc | Bin 2871 -> 3390 bytes util/__pycache__/KafkaUtils.cpython-38.pyc | Bin 5987 -> 6022 bytes 15 files changed, 297 insertions(+), 40 deletions(-) create mode 100644 concurrency/uploadGPU.py diff --git a/concurrency/PushVideoStreamProcess.py b/concurrency/PushVideoStreamProcess.py index 6788d61..2004da7 100644 --- a/concurrency/PushVideoStreamProcess.py +++ b/concurrency/PushVideoStreamProcess.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- +#ne -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor from multiprocessing import Process @@ -384,7 +384,6 @@ class OffPushStreamProcess(PushStreamProcess): if len(thread_p) > 0: for r in thread_p: r.result() - print('----line384:',self._algSwitch,self._algStatus) if self._algSwitch and (not self._algStatus): frame_merge = video_conjuncing(frame, frame.copy()) else: diff --git a/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc b/concurrency/__pycache__/PushVideoStreamProcess.cpython-38.pyc index c67b250ac53168c0d505e63c11a50fe0100750dc..27cbd8c70d292e5b7649e3c1eea4d4a49bb580d3 100644 GIT binary patch delta 1072 zcmY*Y%}X0m5PxrDzQm+4i6%kfS7Krk?U%-{#>T{WCpdJYt&#!F5kEk4TqF z4=pTRdX){m^-$12d-LGILk~T8utkdW)=U40&f6@K?!xTM{C+cUX5Q@O#@7ws6Q8e> z;M2GDXH!1(eT-AdIE-I7-{_v*4nyxBqCbtJ{trWEYCpI%%Ht4cME2?)Lsc&HaH%M+Hc2LU$@D{CaigPD-7GBqYt@2GJy5*y^^sp+|G@9EQ3Sf`{kO zQ)B^_e&^s}k(!a$5*w{bP9laeO&p{kcm)s#%(pWT&&bXOwZ>*=VBNg5Nq8A@6|$D% zi?GN>?V-Aq`ZDX{B`BLYbZ13xl<2PQWhmjqT_QxMYma#pqIQrIi`-M)Xd@eMD-U@n z%Wm|3#<)E-ZDBEiY1F*Sk~p7U$e3f$i)D!^xs+co31pzRfHDBRF@j(oX|tF#XB;J; z-5(R-R-$pCRV7PLTzRjzyqg_-0=uhR6({L#MaPCJFl!yL5_Yp?S?l(kbpd(mwOF?hmk__*hQDInSNMdDPqyKv z6`5}#x@Y^0O|_vX;dU6C2nvd^pGpt)Q^J=h*&Rei51WRiTCr6gY74?LzNaF&lH93# zl~qc_+hVF^&2B`E)D~+ou>?Xh0_ik;Z_K0<^>ejLBRnD`M52WGWrA7G!EO2)1ji%k z-W_528^Zrh_imr7*H}msTjAY402jEyTXoj`9rjL7&-ECYmC$+f9}nl{|#a z0X_;@XaYA}0bAK2rYzl<@wX>fTIRiwD0FGm%4%12Nbpo)oW=4=n1}=g1Kn=<-;UWuAla=V9TOOOCRv}-nz zL|2FE#pFrUshZf(JWN=roa!fIseh_P z$0f&c#ol6rY|D7lzgRlqKC_Ko$CLAg37!QW6}wTDtn*Wu8_0c^Wi^brg7`=szRy}O z@pKK()Zy~*0NHEAb zI89HP;B*w-wJW9OvecjIuAOt6t10j-~cDM@;1DqW)b3E2-T23tFnYPHc^`c zxkKWUOX@oQ1=O@8tCHNSe2p#B<~?@`pT}F<4@bt+lg3qC%tFPCQ|dgmKT)@~d8M5A QLFp^w+2BcKF?FH*0}aeP-T(jq diff --git a/concurrency/uploadGPU.py b/concurrency/uploadGPU.py new file mode 100644 index 0000000..a85c0a7 --- /dev/null +++ b/concurrency/uploadGPU.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from threading import Thread +from time import sleep, time +from traceback import format_exc + +#from common.Constant import init_progess +import json,os,psutil,GPUtil,platform,socket +from kafka import KafkaProducer, KafkaConsumer +#from util.KafkaUtils import CustomerKafkaProducer +from common.YmlConstant import service_yml_path, kafka_yml_path +class uploadGPUinfos(Thread): + __slots__ = ('__kafka_config', "_context") + + def __init__(self, *args): + super().__init__() + self.__context,self.__kafka_config = args + self.__uploadInterval = self.__context['GPUpollInterval'] + #kafkaProducer = CustomerKafkaProducer(self.__kafka_config) + self.__producer = KafkaProducer( + bootstrap_servers=self.__kafka_config['bootstrap_servers'],#tencent yun + value_serializer=lambda v: v.encode('utf-8')) + + self.__topic = self.__kafka_config["topicGPU"] + def run(self): + while True: + + try: + #获取当前的gpu状态信息 + msg_dict = get_system_info() + #发送GPU状态到指定的topic + msg = json.dumps(msg_dict) + + # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json + + #future = kafkaProducer.sender(topic_on,msg) + future = self.__producer .send(self.__topic,msg) + try: + future.get(timeout=10) + except kafka_errors: + traceback.format_exc() + + sleep(self.__uploadInterval) + + except Exception as e: + print(e) + continue + #logger.error("上传GPU服务器线程状态异常:{}, requestId:{}", format_exc(), request_id) + + + + +def get_system_info(): + # 初始化一个字典来存储系统信息 + system_info = {} + + # 获取CPU信息 + system_info['CPU'] = { + 'Physical Cores': psutil.cpu_count(logical=False), # 物理核心数 + 'Logical Cores': psutil.cpu_count(logical=True), # 逻辑核心数 + 'Current Frequency': psutil.cpu_freq().current, # 当前频率 + 'Usage Per Core': psutil.cpu_percent(interval=1, percpu=True), # 每个核心的使用率 + 'Total Usage': psutil.cpu_percent(interval=1) # 总体CPU使用率 + } + + # 获取内存信息 + memory = psutil.virtual_memory() + system_info['Memory'] = { + 'Total': memory.total / (1024 ** 3), # 总内存,单位为GB + 'Available': memory.available / (1024 ** 3), # 可用内存 + 'Used': memory.used / (1024 ** 3), # 已用内存 + 'Usage Percentage': memory.percent # 内存使用率 + } + + # 获取GPU信息 + gpus = GPUtil.getGPUs() + system_info['GPU'] = [] + for gpu in gpus: + gpu_info = { + 'ID': gpu.id, + 'Name': gpu.name, + 'Load': gpu.load * 100, # GPU负载,百分比 + 'Memory Total': gpu.memoryTotal, # 总显存,单位为MB + 'Memory Used': gpu.memoryUsed, # 已用显存 + 'Memory Free': gpu.memoryFree, # 可用显存 + 'Temperature': gpu.temperature # GPU温度 + } + system_info['GPU'].append(gpu_info) + + # 获取系统信息 + system_info['System'] = { + 'Platform': platform.system(), # 操作系统类型 + 'Platform Version': platform.version(), # 操作系统版本 + 'Platform Release': platform.release(), # 操作系统发行版本 + 'Platform Node': platform.node(), # 网络名称 + 'Machine': platform.machine(), # 硬件架构 + 'Processor': platform.processor() # CPU架构 + } + + # 获取本机局域网IP地址(非回环地址) + try: + # 获取所有网络接口信息 + net_if_addrs = psutil.net_if_addrs() + for interface, addrs in net_if_addrs.items(): + for addr in addrs: + # 筛选IPv4地址且非回环地址 + if addr.family == socket.AF_INET and not addr.address.startswith("127."): + system_info['System']['Local IP Address'] = addr.address + break + if 'Local IP Address' in system_info['System']: + break + else: + system_info['System']['Local IP Address'] = "No local IP found" + except Exception as e: + system_info['System']['Local IP Address'] = "Unable to retrieve local IP address" + + return system_info + + + + +if __name__=="__main__": + + + context = { + 'GPUpollInterval':1, + 'topic':'server-status', + } + kafka_config = { + 'bootstrap_servers':['192.168.10.66:9092'] + } + + base_dir, env = '/home/thsw2/WJ/test/tuoheng_algN','test' + + + + + + upload_thread = uploadGPUinfos(context,kafka_config) + upload_thread.setDaemon(False) + + upload_thread.start() + + + # 主线程等待守护线程运行 + try: + while True: + sleep(1) + except KeyboardInterrupt: + print("主线程退出") + + + + \ No newline at end of file diff --git a/config/kafka/dsp_test_kafka.yml b/config/kafka/dsp_test_kafka.yml index 7d34c3d..3ba39d8 100644 --- a/config/kafka/dsp_test_kafka.yml +++ b/config/kafka/dsp_test_kafka.yml @@ -1,5 +1,6 @@ bootstrap_servers: ["106.14.96.218:19092"] -topic: +topicGPU: "server-status" +topicPort: dsp-alg-online-tasks-topic: "dsp-alg-online-tasks" dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks" dsp-alg-image-tasks-topic: "dsp-alg-image-tasks" @@ -8,6 +9,15 @@ topic: dsp-recording-result-topic: "dsp-recording-result" dsp-push-stream-task-topic: "dsp-push-stream-task" dsp-push-stream-result-topic: "dsp-push-stream-result" +topic: + dsp-alg-online-tasks-topic: "dsp-alg-online-tasks-192.168.11.7" + dsp-alg-offline-tasks-topic: "dsp-alg-offline-tasks-192.168.11.7" + dsp-alg-image-tasks-topic: "dsp-alg-image-tasks-192.168.11.7" + dsp-alg-results-topic: "dsp-alg-task-results" + dsp-recording-task-topic: "dsp-recording-task-192.168.11.7" + dsp-recording-result-topic: "dsp-recording-result" + dsp-push-stream-task-topic: "dsp-push-stream-task-192.168.11.7" + dsp-push-stream-result-topic: "dsp-push-stream-result" producer: acks: -1 retries: 3 diff --git a/config/service/dsp_test_service.yml b/config/service/dsp_test_service.yml index 00646a5..e00daac 100644 --- a/config/service/dsp_test_service.yml +++ b/config/service/dsp_test_service.yml @@ -3,6 +3,10 @@ video: file_path: "../dsp/video/" # 是否添加水印 video_add_water: false +#0-slave,1--master +role : 1 +#gpu信息上报间隔 +GPUpollInterval: 2 service: filter: # 图片得分多少分以上返回图片 @@ -27,7 +31,7 @@ service: image: limit: 20 #storage source,0--aliyun,1--minio - storage_source: 1 + storage_source: 0 #是否启用mqtt,0--不用,1--启用 mqtt_flag: 0 #是否启用alg控制功能 diff --git a/enums/ModelTypeEnum.py b/enums/ModelTypeEnum.py index 85192f5..fca7d90 100644 --- a/enums/ModelTypeEnum.py +++ b/enums/ModelTypeEnum.py @@ -126,7 +126,8 @@ class ModelType(Enum): 'seg_nclass': 3, 'segRegionCnt': 2, 'segPar': { - 'modelSize': (640, 360), + #'modelSize': (640, 360), + 'modelSize': (1920, 1080), 'mean': (0.485, 0.456, 0.406), 'std': (0.229, 0.224, 0.225), 'predResize': True, @@ -135,8 +136,8 @@ class ModelType(Enum): 'mixFunction': { 'function': tracfficAccidentMixFunction, 'pars': { - 'modelSize': (640, 360), - #'modelSize': (1920,1080), + #'modelSize': (640, 360), + 'modelSize': (1920,1080), 'RoadArea': 16000, 'roadVehicleAngle': 15, 'speedRoadVehicleAngleMax': 75, diff --git a/enums/__pycache__/ModelTypeEnum.cpython-38.pyc b/enums/__pycache__/ModelTypeEnum.cpython-38.pyc index fc9c834b8b9e39ef41ccb0e1256e420f47b462a8..ebfb24c0d79725f73df2f7dd23cb9d2eb724b901 100644 GIT binary patch delta 353 zcmX|6-z!6L9KGkW+g)8Q-8(-~erq5(%T4FPjEU=o4!53F`U0*+m%PtT#p26qCLIEbytXpRmoj)6@vREXVR1f{|pdO!m| Ug`89!Cm1EFB;#LsSy5%eFM~#2g#Z8m delta 303 zcmXYr&nv@m7{~X1_Kj?PX1`)-X|2t7G1*M?Ew;!GQHT0@Grr3p?s_pE>-X#6^Y zgrsD=$7bxY^5v119p4LZAPfFx!kuITdEiYxgNMMkZ0mlYMM|NR8A*18FVQLYVV!82 zc*#2|dfymfmow2xY{+eNg23_{%Ml(WH@H=Alj_h6@WE5Y5q{*$$P&UbH*5el=8+;C k9Eq>O#b@zFSlO0%g- 0: for k, v in msg.items(): for m in v: message = m.value - requestId = message.get("request_id") - if requestId is None: - logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) + #如果收到的信息是gpu状态的话,收到信息后,更新自己的gpu服务器状态,下面不再执行 + if m.topic in self.__gpuTopic: + customerKafkaConsumer.commit_offset(m,'x'*16,False) + #更新机器资源现状 + ip = message['System']['Local IP Address'] + self.__gpuDics[ip]=message continue - customerKafkaConsumer.commit_offset(m, requestId) - logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", - m.topic, m.offset, m.partition, message, requestId) - message['taskType']=self.__taskType[m.topic] - topic_method = self.__task_type[m.topic] - topic_method[2](topic_method[1], message, topic_method[0]) + #如果收到的信息是门户消息,收到信息后,要根据Gpu状态,转发到对应的机器。 + elif m.topic in self.__topicsPort: + customerKafkaConsumer.commit_offset(m, 'y'*16) + #状态分析 + #recondGpu={'hostname':'thsw2','IP':'192.168.10.66','gpuId':0} + recondGpu= select_best_server(self.__gpuDics) + if recondGpu is None: + print( 'recondGpu:',recondGpu, ' self.__gpuDics: ',self.__gpuDics,' topic:',m.topic, ' message:',message ) + continue + #转发消息 + message['transmit_topic'] = m.topic + '-' + recondGpu['IP'] + transmitMsg={'transmit':message} + msg_json = json.dumps( message ) + future = self.__producer.send( message['transmit_topic'] ,msg_json) + try: + future.get(timeout=2) + logger.info( "转发消息成功,消息topic:{},消息内容:{}",message['transmit_topic'],message ) + except kafka_errors as e: + print('------transmitted error:',e) + logger.info("转发消息失败") + traceback.format_exc() + else: + requestId = message.get("request_id") + if requestId is None: + logger.error("请求参数格式错误, 请检查请求体格式是否正确!message:%s"%(message)) + continue + customerKafkaConsumer.commit_offset(m, requestId) + logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}", + m.topic, m.offset, m.partition, message, requestId) + + message['taskType']=self.__taskType[m.topic] + topic_method = self.__task_type[m.topic] + topic_method[2](topic_method[1], message, topic_method[0]) else: print_gpu_ex_status() print_cpu_ex_status(self.__context["base_dir"]) @@ -303,6 +359,19 @@ class DispatcherService: self.__feedbackThread.start() time.sleep(1) + def start_uploadGPU_thread(self): + if self.__uploadGPUThread is None: + self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) + self.__uploadGPUThread.setDaemon(True) + self.__uploadGPUThread.start() + time.sleep(1) + if self.__uploadGPUThread and not self.__uploadGPUThread.is_alive(): + logger.error("反馈线程异常停止, 开始重新启动反馈线程!!!!!") + self.__uploadGPUThread = uploadGPUinfos(self.__context, self.__kafka_config) + self.__uploadGPUThread.setDaemon(True) + self.__uploadGPUThread.start() + time.sleep(1) + ''' 在线分析逻辑 ''' diff --git a/service/__pycache__/Dispatcher.cpython-38.pyc b/service/__pycache__/Dispatcher.cpython-38.pyc index 114829cd571f6773f9f30c7a3ef2ef21c1224f39..1919f517b27fcb1124cfaa2a0ba36f63f58e2285 100644 GIT binary patch delta 7716 zcmb6;ZFC#QakmHHZ~y@i`~tx*@JplwNu(splq{N(C{dKe=$n*d`)V74a7PN{9RTee zWQhii#Izf=jw30XCTY^xWNNEX8ploOuf~p@_C4p-P3!c-b^Go$KiX7w^rQ9b>wE2M zUE7`610f2P5(j)YH#<8sJ3G5GvwQr?XUH30Afda0LB9aM%dfmQ_1x5@P%HWIrFH!g zqDzXT3aVT88nPl20dS9gX{NJi)GD>SDd}}ay;5&9C=I;r)gwlu(#UaHj~Y!%6UTje zv(cio7_CaH(WbOE^V6zRKuPdN>}`dyUn~YL18W z;u<5a#CfAyUu&#W)*0)S^}HR{`;302pW`+924kbL(b%MH;_X`fHe<80nd5c(?Z$vI z!0~#0i?LPN%JByM4r80L%@|Y$jXRY)jUi=-(-%o|aCm$6&f zZHy=*#vWylF{+GmdXv7_*r)8{c(cCW7*obL-lBiXIG`Nhc&mQUIHVjh4l9R=uuo9i z)Yzkf8k-lDBPy8?;_XXskuFk0>`*?ZXOrr_qj#kDIg>m}vuZx2(Nc3Hn=$i-MpKt+n#e!1 z7d-pP7WR?ni=FE2j|xhgIsg`IwQKgI&0Nw-ozmz8C{L%fc$i(0zEM;HiG-ESrBmk7 zEVUGWA~BWAA4h^BB@#5NYf4QbQ8xVeDXJw^#mCw9fD}sPN*1MDxcy2$54=P0AkBP7 z6a-6{7l>dHODqt}RS+Y>yg1L!c?XKBU`g}9C<^CE!V8ZKk1vfaKZjKu21>HQk}QG# zkX3!2OovoYlb}kf7yK?iAp$+jC2FA0t@@f!Q!VH7178)kK?d;v;Kk)ea79OG1zz1G zOnE>>9Ty9)P=PqAKn;j5Ye0Ni1LDgX>N#f(=tJWhpjE5Z7H7nZgr0;34p85S>KYL8 z0qfMdCc%nKH%>=CM<$6{53`gx)_~Zg3+2>W&eK%liIjO7SMWqRPjiWy3h5k}|W?azv zzSEj>XJo5W?VTr&5UXpUTU}ic3+_c15ejbfo?Bfr1nyo~wFnzj6(C^Z?1ZoT-kOuy ztYum>nJY!4nIfg#jW5J;cTd1$V&*Lf3xY}(G3Ld?v!1*)*>?v7v7}{ESycm!oVg;< zK|x?A{KI57Tl8;fkBf9Gh$`VTNjf;}AEe0xdOpUdm|gatX(xaGiF<-kKF;o4l@Je7d` z3DUo6=*hB@S3_GRzP^L(zeDYj@>1LWAw6wcsjQLPahmm4pCI?J3)Pq99=aZ7H?r;F zby6JBZk7qJA>C|KO}(#N7}$mIpL*#g=(k(3$G=<@>=w=lRfU&}D7KufeXOQcv}4!~ zT(3YS>E$9sn_0Hz!N^+F>(aH1?HfIt(sEWho1tCoe`{Kbd(or;1X}a}vN{L6{EBNc80bS*R{}RF5k0qz8o(Qe;tX66U-G37EnK?~ri%?CG;5p_&;9 zD$+MgV!lNY>UJ&iLC%#6GMFSozi8E|erQ!;j~YN(HCT}5;fU#IQbUab^g|wpx1@%L z1hu9hrOBMns$Xa*7NkWo=Lg})LL+uJ2?hV6I9D~>Rj8Vds#8VZ4VG!-QxizwT?(1PB)TNr@6J{Nd6aF1}WkP+`e z1B=o^RBZ$kf`wpdMn4A8RyA6XA_9s76L(x8$UT5+o4AKO({1pMf$xHg5-#3!JJMkS zlG;3k?GC3s7b=9XHUosh3j86}-6YIIvlYL+wls0h zv@~Njd@!3z>K$W8J9eom)l5?fpR4%gal>MA%c%Qu`S)M_+H7q{2OLu;`^%@l!H!vP z2M^xCSzm_%YLdk%ZSa3cpCXPORYFiZGo}H7#(5NZxlIp(YN$qDD{W}myp`V!?O*)x zJ6A7UdiTZi?|$~Vch6tE`rA*v|LW(RrZdDtbL*Yv)!%&V>UaNm@S(ZcXdnMA>$NmB z)}c|FrGxp}3YAx%{@yR1e}=tW|D(a3I5Er_`Fa5V^71S1{owWYzV-OkS0B0h^2;#M zhvwp|X+JXY8pHE;7dCbFQp46_#I%ysx_&55So~CMyK`Axr_;a)r3;*F(kT?$fdCV* z5`^rE-hlFHY8K@sg{SvEE|a6_jAgs0&1}Z@sCgr2Qmj|D+tf0u?V8doB`}kmJUNq0 z@Hv{c4 zSJS<0CepQwZo__J*j~P{Q<^QNa|$n&d`Bo%%WgkpPARfsP9?bM(j+V>)ojAP9%(5$ zwFZadhZ6&^Ie1KTlOTp&5z#Hmq7124CQ(vL{6vC`idof9++v6XTzUb-KcUR!EydYnj407^~vZ1c&HFQcdgd(Fz5SKRbr>}f~v z1q8UYu88zFylhvV>Xkb0hK{${c#9%A^9Gd@Mfy7^`c(wKi=bk0P{xMGlmJLj%t9Z> zYi{i(ZqapLMNwl1g0g}10yc41aX*dH7DTb$#wDj5e-R1jB+3wc34r1|1BcY?7~GL7 z4V-}Px0c>)oi00STdZrKys)2aO&f8b7`qsYY~}HJqm9>1q?eGf!bL}c_VcBm#Rds6 zS+wJ;>rwg&;j2?x4*HFo$f0XLVE@>0jO=9nokPV3kcT@E_nz&2FiA6b{8W(MiX`dq zk?}*j4mvrc4JqNVg9nf98b2_0cwgeskv*da=_=&r8x^x1L@_2Nsv|CDOqg(%H74ZoBN@Coz(W^bS@19y0Nze-dJ?jcr)nv+60NEg)D# zP@w|%L=RzSSC19H31~dvj9cCn+`rd%1brHH@GT3=S6cTv#4e+BDWK5#*O!)l8=AIf zGEL#0aO3cH9p5cS@G8fK*Z9N^!Vau?YV&tc0WJakK7tCNTE2$P^{?daezMdS_h3A9 zuibJpH+27MHw>{i*Y4+D;AuJ@`oyll$e_<4=taPT0js*>6z}z2#GL}$GDmUGbH^wW zzhSDSNpe;EegHca0%;7dpQTA|AxqD$yN6Wqt%~)7?quPJ_9WGv!M<9*|olB?xAl&GhRjS zL%|vZ3IZI4{yBnQe}D%;Y#Ih=XQv0F{_BS)R@m?#MXrv=K1LCoNs{J_?a|Y4SmMRn z*%Tsu0eTg0QlFZDOcE#bMYQEP0Fe24*!&q1JP7c7!8eiZf>V`azq2&ZVFwN$NsNz9 z9JyNKs@h5=ZV2c;Elx?AJ zVUzo^YzzG<(zpdbM2uS?Bla2sw1EB0rZ~qU_&q`%#5M z>0hAyAq0Pk0HmDbP&~N8Z9b%aQd9o~nQv*E2gmw48@T<$`)^!Y$C4Y3a2Bw7g(d#7 zB|Hb;+G*^p+kDZkl0MZ-vZNX z?47M25B@c>uXIiTmGEUcf!Hr`PW84sVz+e8I0(PNv~ADre;cs4>{Pl6TN`kM3U~Y+ z(kk}i-vjM!cHiK)bnf71kp;uDbd*b^RTDnKV1O&KbGQS7=cYqex`)1l$|}bD2cZ27 zt9&o}@|{r$(?UGPp1*TrF@xL&g8c|45FACoqnpPx2ACbdb9Fl9d<3Pu=;x7*hae9; zzQcJw!Q4n6M6e412A&g4xWOncI>q&(;L@T%(H9gATwydfA1e#p3}6jB<{bcJ;&#c$ z2u)fBL1> zs_`qab*_J}rm4H}ei=cAMg&pz;r1A5Vy!z`i_OSer)Bbn*@W-Pq(xU1)Chf@#T-N6a22jnJ}p-VW)ouxgBLYu&+gf4}t8Z9np9vG;K-GPUUHy zS2?@dfRdUn9q;f|-Hj4)_VD(2(K$2s0KqvhV8koIyp`7bM{@bQ;BE5Tu@d66y=h%@ zi1_VO32-8Oe&>8^QOi06>k;%JM>Xf*|EY7B`jP6$9m{K{kjO8XCEEDi4%Ia;NO6{wgcDO#QlXA-T+*Y1LArSaQ1IIJN;2k3y`q%f9!N4 Aod5s; delta 5975 zcma)AYj7LY72dmgt!!DAWy!YWx8pn%$Bsi1$4Q*0tt18~0hT64-phW zacPq7l`m%63%u#pf_kl7tB2$eZ+o<`UMJUaTGHzE2Dw3x$Pqm%M>)@{HR?@r6Q?V* zX1zsj(OczKy-jZ8yh^QI?~pqm0Vb*$K)7iRBNmC zF1d@-0d0-mEqCiZau09UXlwOextG&HZJmCdd>yB2we|W2c>||I+D5%k?$bBPoAiFU zU*9Zm=KQd>Mc*oK<#e4kpl_46ak^gHuJ0IbcY+#!EkqGE)v}%S@MmAw4kEwJ_r6*HKHCDk|Jdg#|6KB zc#voOi?B{zn~#1!+y^yS4mVMd%G&|h_q9IsMb5kYZB zK)y(ON@%Hs_Lk5U5n;jwBltnfuT(|^vrv6JaJ=R$nIMW!sk&d{lpm=H2N$Xaq3Ywo zqD){xrlv$D$YpAaGPMgbp%R%emkAYR>aNHXl=>2(1}+pX3Pr96DN$P{g|^ozl}fb| zRKiMw()cuT>dl53;Q`^S0E=i+n(vpCmLXw=Ogog;X{Q;xsRBQvrS4zUd%h^97M=F!3R!d*#NkCws}s#?16Z9vM(!^ zu^Qh6_PTF`TwG z4)8I1!`HKxZU7RJ&6Am!&{AoY_F?}u^oF?}d(Pjx3RUP1gk4JrUvCTV^sir29N(4F z6O&7ieuJ$5qj!J(=p$Sh;A01S)7QQ3${M^>O=hT)N>47iiv0`oY$_TSSWp_*Rr6Rz zS#nVaY~6RN*RLz;R%COUHfB;ap)Wb(!G#$MMXg{qXB-H_6nQ(oin?P(?R4eESPicVbP>++a%6q7XzEzJaM+N z_XDkU3pr#}3}`9COlI`#c9}V9?j$!crRE1x3++Nvdsr~oRk;eu4gjlzT?~fV(V(AQ z2)-^_QPv&|NoFRSN*Xl6Na*gm7>YSGHEnt0_axPl#qSqx zFY5`%d{rVTe!q~I9v-Onf+C)QgYXzT341NPs&M)rd12O#I}(09{Fls1bHcQk7ZqX3 zOFuKcc_^2PbIM6S%ZsxWbHb2t^wA^Yv}4+tcLIH8+Ld=5ubg)0-LpQ$HAkjBc@OAH z^ik6>TcrTMPM9M^IB_gbX8n0r-ZLi>0s5Ru!2_z*@C4wg$-AM;jiwwAB0nPJ-HJ4T z?OMA%E#)NzDls6`V33y-XGEBR5_96%i6iM2Gc*HBF~gt%^$mJpTC;w(L8(9k4htt% zI0Z8@RRLA2pMYcwx1&n&nb8>mt-1|FZiR=}rz+Oda7Tk&b5|x~LMA1$;|3f`Dm5qp z7qXWkmn-8yP;@9=$I{W}`bN428@D0I2<-@Wvgf0l3eIds)94+*uv{rq)eU+((vC^h zv|K8s87fyn7U~dcC|!nX()HNywi}i=o1kWjC(}_J^a#Rggp~+m0G88C>8j;2G*!*g zab)8e8S_%eCiX^So8KZQEuvcdD#uvoy}QEe!|{c;?s z3_>@;A%rkO1Ayh;vuk)HK4g2znan8a;*w~TrCM6I<0i4HjwRBHrd}yL2J(D~bV562 zq>P(SWmSt9_K~r$=jl!mIy3)n%Y($71Cl10zqgs4YVTt|YO5Z+7g(0tFcW6Zxa6cM zlxRV?2LYpYNu*eKEk}-OUV8`@q4`{kukZ^l1}v0>dLCH|`p}0avj9;81*oE<< z5Dw=x)U*;msiw_K4m-d&pCx9?)8DEw;KaIW?Dqr~jm>UMf&zU4;Yoz20AkLPu^dTV zq0eAfNhH{(v8eb*f!(=#1zWfJcDS}Zw7R#`ER%w!RRuTu|GOR+Ci;78`^s0n?CV$U zWJd2kzGz;aWBzZva!hx69$`5G_dzXE+y|b;{NQy25-v+Izqub|H@~kb#pSq=KfrdZ zs}sKz<`1vCmGtwRBgh{Fh*eT2{8-Y-gqBhgrn-1kSZ;nw8ng|CqwJ&WTCV4jS*UR_ z7e7v*c0;1+33>uYa#PjFX(lgkrB1~@ubq&(FNF%&}uz09lD{=8uUCz4tTA6EOjG29ZFYqz=M#CJ2(;M!; zmP<>)6@Z_ZwnIev9Q4W_xG0aowv7=w3vK#60I0bd@MjTy2CM z!0Hl9@N#Zj!Ykm?PGg0EzGX{Vu@8nxpogOD?ag8K$-omyzQry9AFhJD#PY0Z$NBK& zpeqn~n)4uWrmX_0rVhxry7dc!?TMDF(dG;Rcr_`&tDI*EOdscMp9Vw94mu z2Y64i@!dPzSP^2)?1kOEg}YENfv^(+Z)tQG;W30~5#|6aAKqtE$>LuEFCt$>;Jc6S zIiB3SluTg9Mg-h$_7=jlq8Mt5QKINbdRY*91&U^sn97|rfV{N;E8#Kv03_mcNL!^g zc5bkFTa6TeZ>3Z%c_j}bmyHeE74;&B!~Zu%5BwK+jEdOu!uwZJy(^KNq7L@q;01Pm zXiX*FvM4`{YuUR)t4N4_Iux!9BexEro>j+NNCWGMH<1XtAs#t`l}D~s(>dMf*$lw`E4#BU%-{xdL=j};ps?A)5fzdea8yZ`_I diff --git a/util/AliyunSdk.py b/util/AliyunSdk.py index 6dc33fa..3671b4e 100644 --- a/util/AliyunSdk.py +++ b/util/AliyunSdk.py @@ -46,7 +46,8 @@ class AliyunOssSdk: try: self.get_oss_bucket() self.bucket.put_object(updatePath, fileByte) - logger.info("上传文件到oss成功! requestId:{}", request_id) + logger.info("上传文件到oss成功! requestId:{},{}".format( request_id, updatePath)) + return updatePath break except Exception as e: retry_count += 1 @@ -146,4 +147,4 @@ class ThAliyunVodSdk: # if __name__ == "__main__": # aa = ThAliyunVodSdk('/home/th/tuo_heng/dev/tuoheng_alg', 'dev', "1") -# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102')) \ No newline at end of file +# print(aa.get_play_info('6928821035b171ee9f3b6632b68f0102')) diff --git a/util/GPUtils.py b/util/GPUtils.py index 018128b..ec528ce 100644 --- a/util/GPUtils.py +++ b/util/GPUtils.py @@ -59,7 +59,27 @@ def print_gpu_ex_status(requestId=None): except Exception: logger.error("打印gpu状态异常: {}", format_exc()) return result +def select_best_server(servers): + best_server_info = None + lowest_memory_usage = float('inf') # 初始化为无穷大 + for ip, server_info in servers.items(): + gpu_list = server_info['GPU'] + for gpu in gpu_list: + if gpu['ID'] == 0: + memory_used = gpu['Memory Used'] + memory_total = gpu['Memory Total'] + memory_usage = (memory_used / memory_total) * 100 # 计算显存使用率 + + if memory_usage < lowest_memory_usage: + lowest_memory_usage = memory_usage + best_server_info = { + 'hostname': server_info['System']['Platform Node'], + 'IP': server_info['System']['Local IP Address'], + 'gpuId': gpu['ID'] + } + + return best_server_info def print_gpu_status(requestId=None): try: diff --git a/util/KafkaUtils.py b/util/KafkaUtils.py index e890796..b8e83c7 100644 --- a/util/KafkaUtils.py +++ b/util/KafkaUtils.py @@ -136,7 +136,7 @@ class CustomerKafkaConsumer: logger.error("消费者拉取消息异常: {}", format_exc()) return msg - def commit_offset(self, message, request_id): + def commit_offset(self, message, request_id,log=True): retry_num = 0 topic = message.topic offset = message.offset + 1 @@ -144,20 +144,20 @@ class CustomerKafkaConsumer: while True: try: self.subscribe() - logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, + if log: logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, request_id) tp = TopicPartition(topic=topic, partition=partition) self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))}) - logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, + if log: logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition, request_id) break except Exception: self.customerConsumer = None - logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id) + if log: logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id) time.sleep(1) retry_num += 1 if retry_num > 3: - logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id) + if log : logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id) break # if __name__=="__main__": diff --git a/util/__pycache__/AliyunSdk.cpython-38.pyc b/util/__pycache__/AliyunSdk.cpython-38.pyc index 75c483322ce668245147a13b322bc3bb2674ae08..ff407b85a7086e133fda720e0e15964b01e30a25 100644 GIT binary patch delta 485 zcmXw!ze^lJ6vyW~H@mlc-X6MV6vGh{5H1_f^T)Z05=Bz1Xe5mUv9JhS5{OZ8TpL}x zv?0Z_u&~&}5D*0uPDm3(2q{8Hk=7>I=3hvYc_*2H4{v7P=e;*4l`j?VspsW{S??yV zUOxQf9V?vEXLaXsH4$m1As`a*N~KnUL^dQuj8xWDYA2Zl4cUNs$C6kSO2paKoIZuv zvmth(#Hw54V$X`5)TNSj&pk9|uAYtCD6&=%(0eODg??B&$WvsOZTrQBUVBEX_M4tQ zo(DGaP*76BK>@xwi>96Z0Ap=ti*)HIJfd&T8A952mk>37y4RWZQKRY!M#u>0!dq^? z&*1}utISrcbw8C02dL6zVG{GrzlBg@jD|ao+FEkHCDZSY{u?j&FB|W+U%@oZ`U@DK zeZPuz`s)7*7h08HKVR1l=emq3#xx@`2$b&Pc#bj8n4#~*yLd}~ieWIw7ekC;#t5Ut wX#InI%`Nm3y1vz2(lrxv^`Sg*J!tM6hZT2=MYdc*>foW19MSlmjD0& delta 469 zcmXw!J4*vW5XW~edzZbKizZ4UDjG%1(^pJTL=lDfwiLCniwM5J%@rb-#=_hS@X)Rlphp9F6eXBFp`+wQW! zu-<8wlx61Jwg>0+b?CthJ%+FP0o37=5kbc|3U$#4V1O4ma7_aNK9xl^<^`RQsHQj#1YqzO@cw$>UV6k`hSuDmT}97lX5=*1V`D9ly8^%)H^ zIORXBtI6J)j9>o5pBMd?m6r%4VH{Tji;%>#Kp*VlYv3ibQ0u(2u_p!9nuIaJI3cSL z<1$EjmM}+{!1u;56!E7q6YHake!>7@kkCe`{eyf-Psk-KxSLy3DpKP-4JtIQ39fdx Z=#VKaHl-n2eQFv2NU7own!yzG`~rgTV&wn; diff --git a/util/__pycache__/GPUtils.cpython-38.pyc b/util/__pycache__/GPUtils.cpython-38.pyc index 41de0c12a329b303283c0a345e53b59e3ae9649c..46b5a4252431e28e5868593041d41014c844b9ec 100644 GIT binary patch delta 1123 zcmY*YOK;Oa5Z?7Gj-7QT*QHhmw<9ZAZP`upj9Or30a`XWZe+z)MeKJS~HTHjY znx(y59lzZ>{pivR$&q6(v70AVk|zbQ3ZzI%FOgd$HBu&15Gj!gnTAN2WbdNJ?B2Hz zmPqxy=XXMn2p-~jZ1X+5g0uWPzL>XR5qbm}KCd`9$8RVdz>mtMf>1l#>_5hDWI0S+$N7X#i!c-boef(AH31i!!oB}VV@K3YN#w(ctfb)dx>QL1R5 z$2!FB4UE_zY84S}Q=zB8)?qFMqXDBCr($y!kyIHC(s4RgHW6@!X%N+j2_L1aXlz+v zjfCuX>Ht4E^`snQXm(~*sNJm%q25|+3T1hT%Di{&6~7nK9jD2BvYQz@>tW;tzX;3= z1%j@|dzvk+l?fx=p zC_>)|LN5|}7j#&|a#PHwPkqYV{6n8bt&!J))IzrugwJHGmn7BdGp_?;-EG&NWSOAc z3c4&B%Kh)V_95LTZf5L6vW7!1QJUO7SzD3$f!~f=6KPz9ICB6H*08DMu?cr(go=qZ z#ljXg{#ph-1DsC_`gtWqPlDNB;yFX`C39<72e*-IxXgP7ACq!7I zWSdbp5aTM6-XBiDt{m7msqfP{sdrq00?<%taz-`SQbte9u#Jid0%W8=4Y1A^)7S7c zf02GQl?WUiAX@!z6W9 qfKF#6I1*%YlBE-!iN254gIQr->4l^p_*Z2itj6p1jBc3~Yw91-8V}t7 delta 626 zcmY*W%Wl&^6!q9{-T0N4le(di+Nwgb`ao$}B0y?Y1OkKv*?5CY?lc6Px{PfBD;9LY zDpGXW@CEE)(H%SZ2>$@^1F+?egZi+n<1_ajpZl0^ozu44cN|lpXZP=9sMMzJGMh{{ zo^Q0lh3W|lYp@74s1vJ$4-IG%Yk&u>6D4ed^H>R%rawNLp)ro$+)pA1@ikjz75u~= zFbjXOomz#=#W|Q5s4KcnLF};(4%K_i#xLq_J=BKB=_TPGb&EA|T^l$K$pb@J(yEFd zwe72Q8BvRh;IDF?W_O`5hR6BS5nQ(TrCnn__6!d<^@%n6Uz&LMU0>_Ai5ByMr+J)- zMH#=?r#^R`b-(tE9o6^^l+5?hT_|whJbJf8$G{bG3Q?6XM^Naaqr;?7M<5#IF_!x< z$9$ZKxBYAsht`ZgNwO$DOQifvbRakv9qD&Pf=Uo*LUtMIcOLaqxuTqUB>P#X^5%p` za2xlmTg=1v)`wPUa6U-MOkA;-xBH|_bpnOWGqFbBfIa;GdPOA@(SSdT1~1FXA-=HV rgK}l%oK2DoUqV-ctXf)J*x9TUC5nX`;{@`U-;^_?KCU{xW}B^l#1(~i diff --git a/util/__pycache__/KafkaUtils.cpython-38.pyc b/util/__pycache__/KafkaUtils.cpython-38.pyc index f3ce915072ef9e04910362c62450a524850ad122..96daf51ca363d7adef89bdc88d62ff3e6fe3ba60 100644 GIT binary patch delta 949 zcmZ9L&rcIk5XaxVZg;mow%FPh!2~t(t0o{uegz|7Kmv(~{4i*vLF{V~XstZfM9nrm zV7%ew{Q;JXo(*1%F<$)xOwiSXi6&ko-qh#;XC9G6H$_ z;q&9gmxZmTP9g~Y=HXz4L-S_UY5{0eukDK2iVB(891qQd*+_nqdKTW^bz9Yw$S`!M zuaW2VgSh4g@^^AFNh*yLc_-AXXg4&g@6j_2eUvy&%IK;)?=I$wdG|(1$c|lMTWzf> zI0K1J4D&1tf@QgYY(O~brBfG=iLi*=hpZ){(=1ctr_^LRrG7c@EP65j(43CH#k#C9 ziW_viIatH_zDa5@MVKbcsHylVxS$@!j}4Jlh6wt2CVAt8a|qwKUMdyjd5R|pO@#kW zIH7*Ww+`xM-PpKQDiq{NQtEv>)pnv0W>sQ;Z)GNDVE*qz%Va}ntDg}9%7zC^tQ|&+a6o8mTb>4I3r1g9w0PHgEk5+$)t*D-43};~^IlVhr{pwM&3hinz z)nCz*Hh`BP`w99!_5JDZVo`1k;hVWqvAF0;n=<;AXSwB{mr5BJU_uy7&cOzQMS<@aJPHT)d>dk@FjxXrw$`Ze zbVmkeGPa~oDTfJX2_u9wL7!)qysHWa{fHbz{|eeF{e{?wheLLq9oG3wjqFnI(?`Kq LJLxtU#ft7f!mh@~ delta 921 zcmZ9K$xjqP6o>0oud_5G<1B~?E^K1L5(iMDNCXTf3W6x&a3DA}QDDY_qDj>BWDXc_ z7?b)3W=@_w=y>s9ymIj1!NfQh4_-CKi(cH`>%q8mr@s8&((m=Fs(Zcfdz{OT6J+?L zzTUt7=&=(uAYI*WlsL3k%T^~qtJ=0pW+!IIL)D4EBABhD+tkzG$GYrP4?|jPlh3+03PSeQ6Q`0%m2-xp zN6^f&GzgaF0@9iYtLM(1NI(Sdva}^aGpx(0#noCOuD&^ME&8q6z6Ia@+%RCBCA-D9 zt0NK4cTG|GX~IRqjJni#0xqZrjfY1mR|W|Fa3-nqgmZ|Reyvc*$qBM232lV`=9^UC z8{h2p!+v9ZwUEn68lhe{r9MU5U{=L;A1h@vbpL*A6x4YX*ZOOOfTHHXGArsx5Io6> z1}a`O@636utW_8=S1%hv^K{SfOpjL#^q6RwLf_&=%VW#dvh7)Jz%vBf*GVrwM+P;g4#< zUeV>YpBwOVc|`+u73<<^p2H;=o>XvB0uCqJ8B_DI>H38lQcq%Kvm5;zDHVw~snz&_ zl7H1sQSK1I-_k!vW0B9_VKY<6=a*bb8(81y6e*q1PZ&US*)mFckkElh2V~Zjw-;`1 zO2fhJIaOzj7BkT