"""Demo script that publishes test task messages to Kafka.

Sends JSON-encoded task descriptions to an "online" (live stream) topic and
an "offline" (video file) topic, waiting for each send to complete.
"""
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError, kafka_errors
import traceback
import json
import time
import random
import string


def _random_msg_id():
    """Return 'bb' followed by 30 distinct random ASCII letters."""
    return 'bb' + ''.join(random.sample(string.ascii_letters, 30))


def _send_and_wait(producer, topic, payload, label, index, timeout=10):
    """Send one JSON-encoded message and wait for the send to complete.

    Args:
        producer: the KafkaProducer used for sending.
        topic: name of the destination topic.
        payload: dict that is serialized with ``json.dumps`` before sending.
        label: prefix for the progress line ('online' or 'offline').
        index: loop counter printed in the progress line.
        timeout: seconds to wait on the send future.

    Returns:
        True if the future resolved without a Kafka error, False otherwise.
    """
    future = producer.send(topic, json.dumps(payload))
    print('{} send {}'.format(label, str(index)))
    try:
        # Block on the future to check whether the send succeeded.
        future.get(timeout=timeout)
    except KafkaError:
        # `kafka_errors` is a dict of error classes, not an exception type;
        # using it in `except` raises TypeError as soon as a send fails.
        # Catch the KafkaError base class and actually print the traceback
        # (format_exc() only returns a string, which used to be discarded).
        traceback.print_exc()
        return False
    return True


def producer_demo(cnt_online=0, cnt_offline=1, tencent=False):
    """Publish demo task messages to the online and offline Kafka topics.

    Args:
        cnt_online: number of messages to send to the online topic.
        cnt_offline: number of messages to send to the offline topic.
        tencent: selects which pair of pull/push stream URLs is put into
            the online messages.

    A failed send is reported on stderr and does not stop the remaining
    sends. The producer is closed before returning.
    """
    # Alternative brokers / topics used previously:
    #   topic_on='thsw'; topic_off='thsw2'
    #   server=['212.129.223.66:19092']
    #   server=['101.132.127.1:19092']
    server = ['192.168.11.242:9092']
    topic_on = 'alg-online-tasks'
    topic_off = 'alg-offline-tasks'

    if not tencent:
        # Alternative channels used previously:
        #   pull_channel = "rtmp://live.play.t-aaron.com/live/THSA"
        #   push_channel = 'rtmp://live.push.t-aaron.com/live/THSB'
        #   pull_channel = 'rtmp://live.play.t-aaron.com/live/THSAa_hd'
        pull_channel = 'http://live.play.t-aaron.com/live/THSAa_hd.m3u8'
        push_channel = "rtmp://live.push.t-aaron.com/live/THSBa"
    else:
        pull_channel = "rtmp://demoplay.yunhengzhizao.cn/live/THSA_HD5M"
        push_channel = "rtmp://127.0.0.1:1935/live/test"
        #   pull_channel = 'rtmp://live.play.t-aaron.com/live/THSAa'
        #   push_channel = 'rtmp://127.0.0.1:1975/live/test'

    # Messages are JSON text produced by the caller; the serializer only
    # encodes that text to UTF-8 bytes.
    producer = KafkaProducer(
        bootstrap_servers=server,
        value_serializer=lambda v: v.encode('utf-8'))

    try:
        for i in range(0, cnt_online):
            time.sleep(0.0001)
            msg_dict = {
                "biz_id": "hehuzhang",
                "mod_id": "ai",
                "msg_id": _random_msg_id(),
                "pull_channel": pull_channel,
                "push_channel": push_channel,
                'channel_code': 'LC001',
                "results_base_dir": "XJRW202111291703" + str(random.randint(10, 99)),
            }
            # No key is passed; the original noted that messages sharing a
            # key (e.g. key='count_num') go to the same partition.
            _send_and_wait(producer, topic_on, msg_dict, 'online', i)

        for i in range(0, cnt_offline):
            msg_dict = {
                "biz_id": "hehuzhang",
                "mod_id": "ai",
                'channel_code': 'LC001',
                "msg_id": _random_msg_id(),
                "offering_id": "http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
                # Alternative sample video used previously:
                #   "http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/61921b16-1807435dfa3-0004-f90c-f2c-7ec68.mp4"
                "offering_type": "mp4",
                "results_base_dir": "XJRW202203171535" + str(random.randint(10, 99)),
            }
            _send_and_wait(producer, topic_off, msg_dict, 'offline', i)
    finally:
        # Release the producer's network resources even if a send raised.
        producer.close()


if __name__ == '__main__':
    producer_demo()