120 lines
3.6 KiB
Python
120 lines
3.6 KiB
Python
|
||
import numpy as np
|
||
|
||
import base64
|
||
import io,os
|
||
import requests
|
||
import time,json
|
||
import string,random
|
||
import glob,string,sys
|
||
|
||
import oss2,copy
|
||
from kafka import KafkaProducer, KafkaConsumer
|
||
|
||
|
||
|
||
from kafka.errors import kafka_errors
|
||
|
||
# Template for an offline task-result message; callers copy it and fill in fields.
msg_dict_off = {
    "msg_id": "bblvgyntTsZCamqjuLArkiSYIbKXEeWx",  # message ID
    "biz_id": "hehuzhang",  # business identifier
    "mod_id": "ai",  # model identifier
    "status": "running",  # task status
    "type": "1",  # data type: 1 = image, 2 = video
    "error": "9999",  # error information
    "results": [  # problem results
        {
            "original_url": "",  # original image URL
            "sign_url": "",  # AI-annotated image URL
            "category_id": "",  # category identifier
            "description": "",  # problem description
            "time": "",  # timestamp
        },
    ],
}
|
||
|
||
def _send_and_report(producer, biz_id, topic='alg-task-results', key='count_num'):
    """Send ``biz_id`` as one Kafka message and wait for the broker's answer.

    The outcome is printed. A failed delivery is reported rather than raised,
    so one undeliverable message does not abort the remaining sends.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)`` that
            returns a future with ``get(timeout=...)``.
        biz_id: Message value to publish.
        topic: Destination topic.
        key: Message key; messages with the same key go to the same partition.
    """
    future = producer.send(topic, key=key, value=biz_id)
    try:
        # Block until the send is acknowledged or 30 seconds pass.
        future.get(timeout=30)
    except Exception as e:
        print('#'*10,biz_id,' error:',str(e))
    else:
        print('-'*10,biz_id,' send')


def test5(par):
    """Publish test messages to Kafka in two bursts per round, for five rounds.

    Each round sends ten messages (j = 0..9), idles for 300 seconds while
    printing progress every 10 seconds, then sends ten more (j = 10..19).
    Every send is awaited and its outcome printed.

    Args:
        par: Configuration dict supplied by the caller. Kept for interface
            compatibility; the broker address and topic are hard-coded here
            and nothing is read from ``par``.
    """
    producer = KafkaProducer(
        bootstrap_servers=['101.132.127.1:19092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())

    try:
        # NOTE(review): loop nesting reconstructed from a listing without
        # indentation; the second burst formats `i`, so it sits inside the
        # round loop. Confirm against the original file.
        for i in range(5):
            for j in range(10):
                biz_id = 'i-%d-j-%d' % (i, j)
                # Show the new id next to the template's default biz_id.
                print(biz_id, msg_dict_off['biz_id'])
                _send_and_report(producer, biz_id)

            for ii in range(30):
                time.sleep(10)
                # Report the time already slept, including this interval.
                print('sleep %d s' % ((ii + 1) * 10))

            for j in range(10, 20):
                # The id format of this burst ('j%d', no dash) is kept as-is.
                biz_id = 'i-%d-j%d' % (i, j)
                _send_and_report(producer, biz_id)
    finally:
        # Release the producer's connections even if a burst fails midway.
        producer.close()
|
||
|
||
|
||
|
||
if __name__=='__main__':
    # Load the run configuration and pass it to the Kafka send test.
    masterFile="conf/send_oss_debug.json"
    # No `assert` on the path: asserts are stripped under `python -O`, and
    # open() already raises FileNotFoundError naming the missing file.
    with open(masterFile,'r') as fp:
        par=json.load(fp)

    test5(par)
|