You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
3.6KB

  1. from kafka import KafkaProducer, KafkaConsumer
  2. from kafka.errors import kafka_errors
  3. import traceback
  4. import json
  5. import time
  6. import random,string
  7. def producer_demo():
  8. cnt_online=1;cnt_offline=0
  9. Tecent=False;
  10. #topic_on='thsw';topic_off='thsw2';
  11. #server=['212.129.223.66:19092'];
  12. server=["192.168.11.242:9092"]
  13. #server=['101.132.127.1:19092']
  14. topic_on='dsp-alg-online-tasks';topic_off='dsp-alg-offline-tasks'
  15. # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
  16. producer = KafkaProducer(
  17. bootstrap_servers=server,#tencent yun
  18. value_serializer=lambda v: v.encode('utf-8'))
  19. # 发送三条消息
  20. if not Tecent:
  21. #pull_channel = "rtmp://live.play.t-aaron.com/live/THSA"
  22. #push_channel = 'rtmp://live.push.t-aaron.com/live/THSB'
  23. #pull_channel = 'rtmp://live.play.t-aaron.com/live/THSAa_hd'
  24. pull_channel = 'http://live.play.t-aaron.com/live/THSAa_hd.m3u8'
  25. push_channel = "rtmp://live.push.t-aaron.com/live/THSBa"
  26. else:
  27. pull_channel = "rtmp://demoplay.yunhengzhizao.cn/live/THSA_HD5M"
  28. push_channel = "rtmp://127.0.0.1:1935/live/test"
  29. #pull_channel = 'rtmp://live.play.t-aaron.com/live/THSAa'
  30. #push_channel = 'rtmp://127.0.0.1:1975/live/test'
  31. for i in range(0, cnt_online):
  32. time.sleep(0.0001)
  33. #''.join(random.sample(string.ascii_letters ,16) )
  34. msg_dict = {
  35. "request_id":'nn'+''.join(random.sample(string.ascii_letters ,30) ) ,
  36. "models":[
  37. { "id":"0","config":{}},
  38. { "id":"1","config":{}},
  39. { "id":"2","config":{}},
  40. { "id":"3","config":{}},
  41. ],
  42. "pull_url":pull_channel,
  43. "push_url":push_channel,
  44. "results_base_dir": "XJRW202111291703"+str(random.randint(10,99)),
  45. }
  46. msg = json.dumps(msg_dict)
  47. #msg = msg_dict
  48. future = producer.send(
  49. topic_on,
  50. #key='count_num', # 同一个key值,会被送至同一个分区
  51. msg
  52. )
  53. print('online send {}'.format(str(i)))
  54. try:
  55. future.get(timeout=10) # 监控是否发送成功
  56. except kafka_errors: # 发送失败抛出kafka_errors
  57. traceback.format_exc()
  58. for i in range(0, cnt_offline):
  59. msg_dict = {
  60. "request_id":'bb'+''.join(random.sample(string.ascii_letters ,30) ) ,
  61. "models":[
  62. { "id":"0","config":{}},
  63. { "id":"1","config":{}},
  64. { "id":"2","config":{}},
  65. { "id":"3","config":{}},
  66. ],
  67. "original_url":"http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
  68. "original_type":"mp4",
  69. "results_base_dir": "XJRW202203171535"+str(random.randint(10,99)),
  70. }
  71. msg = json.dumps(msg_dict)
  72. #msg = msg_dict
  73. future = producer.send(topic_off,msg)
  74. print('offline send {}'.format(str(i)))
  75. try:
  76. future.get(timeout=10)
  77. except kafka_errors:
  78. traceback.format_exc()
  79. if __name__=='__main__':
  80. producer_demo()