DSPRiverInspection/code_bak/Send_debugF.py

117 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
import time
def producer_demo():
# 假设生产的消息为键值对不是一定要键值对且序列化方式为json
producer = KafkaProducer(
bootstrap_servers=['101.132.127.1:19092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode(),
)
# 发送三条消息
for i in range(0, 3):
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 监控是否发送成功
except Exception as e: # 发送失败抛出kafka_errors
print(e)
producer = KafkaProducer(
bootstrap_servers=['101.132.127.1:19092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode(),
)
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
try:
future.get() # 监控是否发送成功
print("re send {}".format(str(i)))
except Exception as e:
print('resend error:',e)
for ii in range(30):
time.sleep(10)
print('sleep %d s'%(ii*10))
for i in range(0, 3):
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get() # 监控是否发送成功
except Exception as e: # 发送失败抛出kafka_errors
print(e)
producer = KafkaProducer(
bootstrap_servers=['101.132.127.1:19092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode(),
)
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
try:
future.get() # 监控是否发送成功
print("re send {}".format(str(i)))
except Exception as e:
print('resend error:',e)
def prod():
producer = KafkaProducer(
bootstrap_servers=['101.132.127.1:19092'],
key_serializer=lambda k: json.dumps(k).encode(),
metadata_max_age_ms=120000,
value_serializer=lambda v: json.dumps(v).encode()
)
# 发送三条消息
for i in range(0, 3):
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get() # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
for ii in range(30):
time.sleep(10)
print('sleep %d s'%(ii*10))
for i in range(0, 3):
future = producer.send(
'alg-task-results',
key='count_num', # 同一个key值会被送至同一个分区
value=str(i)) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get() # 监控是否发送成功
except Exception as e:
print('resend error:',e)
if __name__=='__main__':
print('########demo1 2nd############')
prod()
print('########demo2 1st############')
producer_demo()