選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

77 行
2.8KB

  1. from kafka import KafkaProducer, KafkaConsumer
  2. from kafka.errors import kafka_errors
  3. import traceback
  4. import json
  5. import time
  6. def producer_demo():
  7. # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
  8. producer = KafkaProducer(
  9. bootstrap_servers=['101.132.127.1:19092'],
  10. key_serializer=lambda k: json.dumps(k).encode(),
  11. value_serializer=lambda v: json.dumps(v).encode())
  12. # 发送三条消息
  13. for i in range(0, 3):
  14. future = producer.send(
  15. 'alg-task-results',
  16. key='count_num', # 同一个key值,会被送至同一个分区
  17. value=str(i)) # 向分区1发送消息
  18. print("send {}".format(str(i)))
  19. try:
  20. future.get(timeout=10) # 监控是否发送成功
  21. except kafka_errors: # 发送失败抛出kafka_errors
  22. traceback.format_exc()
  23. for ii in range(30):
  24. time.sleep(10)
  25. print('sleep %d s'%(ii*10))
  26. for i in range(10, 20):
  27. future = producer.send(
  28. 'alg-task-results',
  29. key='count_num', # 同一个key值,会被送至同一个分区
  30. value=str(i)) # 向分区1发送消息
  31. print("send {}".format(str(i)))
  32. try:
  33. future.get(timeout=10) # 监控是否发送成功
  34. except : # 发送失败抛出kafka_errors
  35. traceback.format_exc()
  36. def prod():
  37. producer = KafkaProducer(
  38. bootstrap_servers=['101.132.127.1:19092'],
  39. key_serializer=lambda k: json.dumps(k).encode(),
  40. value_serializer=lambda v: json.dumps(v).encode())
  41. # 发送三条消息
  42. for i in range(0, 3):
  43. future = producer.send(
  44. 'alg-task-results',
  45. key='count_num', # 同一个key值,会被送至同一个分区
  46. value=str(i)) # 向分区1发送消息
  47. print("send {}".format(str(i)))
  48. try:
  49. future.get(timeout=10) # 监控是否发送成功
  50. except kafka_errors: # 发送失败抛出kafka_errors
  51. traceback.format_exc()
  52. for ii in range(30):
  53. time.sleep(10)
  54. print('sleep %d s'%(ii*10))
  55. for i in range(0, 3):
  56. future = producer.send(
  57. 'alg-task-results',
  58. key='count_num', # 同一个key值,会被送至同一个分区
  59. value=str(i)) # 向分区1发送消息
  60. print("send {}".format(str(i)))
  61. try:
  62. future.get(timeout=10) # 监控是否发送成功
  63. except kafka_errors: # 发送失败抛出kafka_errors
  64. traceback.format_exc()
  65. if __name__=='__main__':
  66. print('########demo2 1st############')
  67. prod()
  68. print('########demo1 2nd############')
  69. prod()