DSPRiverInspection/code_bak/Send_debug.py

120 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import base64
import io,os
import requests
import time,json
import string,random
import glob,string,sys
import oss2,copy
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
# Template of a task-result message; every value below is a placeholder.
msg_dict_off = dict(
    msg_id="bblvgyntTsZCamqjuLArkiSYIbKXEeWx",  # message ID
    biz_id="hehuzhang",                         # business identifier
    mod_id="ai",                                # model identifier
    status="running",                           # task status
    type="1",                                   # data type: 1 = image, 2 = video
    error="9999",                               # error information
    results=[                                   # problem results
        dict(
            original_url="",                    # URL of the original image
            sign_url="",                        # URL of the AI-annotated image
            category_id="",                     # category identifier
            description="",                     # problem description
            time="",                            # timestamp
        ),
    ],
)
def _send_and_report(producer, biz_id, topic='alg-task-results',
                     key='count_num', timeout=30):
    """Send ``biz_id`` as one record and wait for the broker's acknowledgement.

    The outcome is printed. A failed or timed-out delivery is reported and
    swallowed so that the remaining debug records are still attempted; an
    exception raised by ``producer.send`` itself is not caught.

    Args:
        producer: Object whose ``send(topic, key=..., value=...)`` returns a
            future with a ``get(timeout=...)`` method (a ``KafkaProducer``).
        biz_id: Record value; also labels the printed result line.
        topic: Topic the record is sent to.
        key: Record key. All records share one key; the original comment
            notes that records with the same key go to the same partition.
        timeout: Seconds to wait for the acknowledgement.
    """
    future = producer.send(topic, key=key, value=biz_id)
    try:
        future.get(timeout=timeout)
    except Exception as e:
        print('#'*10,biz_id,' error:',str(e))
    else:
        print('-'*10,biz_id,' send')


def test5(par):
    """Send debug records to Kafka in five rounds of two batches each.

    Every round sends ten records (j = 0..9), pauses for 30 x 10 seconds
    while printing the elapsed time, then sends ten more (j = 10..19). Each
    record's value is an id string built from the round and record indices.
    The producer is closed when the run finishes or is interrupted by an
    exception.

    Args:
        par: Configuration mapping loaded by the caller. It is not read by
            this debug routine: the broker address and topic are hard-coded.
    """
    # An earlier, disabled variant took the broker from
    # par['kafkaPar']['boostServer'] and sent a full JSON message (a copy of
    # msg_dict_off with biz_id replaced) to par['kafkaPar']['topic'].
    producer = KafkaProducer(
        bootstrap_servers=['101.132.127.1:19092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    try:
        for i in range(5):
            for j in range(10):
                biz_id = 'i-%d-j-%d' % (i, j)
                # Second field is the template's default biz_id.
                print(biz_id, msg_dict_off['biz_id'])
                _send_and_report(producer, biz_id)

            # NOTE(review): the pause and the second batch are taken to be
            # part of every round because the second batch's ids use the
            # round index i -- confirm against the original file layout.
            for ii in range(30):
                time.sleep(10)
                # Report the time actually elapsed, i.e. after this sleep.
                print('sleep %d s' % ((ii + 1) * 10))

            for j in range(10, 20):
                # NOTE(review): this id format has no hyphen after "j",
                # unlike the first batch; kept as-is in case consumers rely
                # on it -- confirm whether it is a typo.
                biz_id = 'i-%d-j%d' % (i, j)
                _send_and_report(producer, biz_id)
    finally:
        # Release the producer's network resources even if a send raised.
        producer.close()
if __name__ == '__main__':
    # Script entry point: load the debug configuration and run the sender.
    # The path is relative, so it is resolved against the working directory.
    masterFile = "conf/send_oss_debug.json"
    # Explicit check rather than ``assert``: assertions are stripped when
    # Python runs with -O, which would silently skip the validation.
    if not os.path.exists(masterFile):
        raise FileNotFoundError('config file not found: %s' % masterFile)
    with open(masterFile, 'r') as fp:
        par = json.load(fp)
    test5(par)