用kafka接收消息
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

68 lines
2.4KB

  1. import traceback
  2. from kafka import KafkaProducer, KafkaConsumer,TopicPartition
  3. from kafka.errors import kafka_errors
  4. import json
  5. def get_left_cnt(consumer,topic):
  6. partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
  7. # total
  8. toff = consumer.end_offsets(partitions)
  9. toff = [(key.partition, toff[key]) for key in toff.keys()]
  10. toff.sort()
  11. # current
  12. coff = [(x.partition, consumer.committed(x)) for x in partitions]
  13. coff.sort()
  14. # cal sum and left
  15. toff_sum = sum([x[1] for x in toff])
  16. cur_sum = sum([x[1] for x in coff if x[1] is not None])
  17. left_sum = toff_sum - cur_sum
  18. return left_sum
  19. def getAllRecords(consumer,topics):
  20. leftCnt = 0
  21. for topic in topics[0:2]:
  22. leftCnt+=get_left_cnt(consumer,topic)
  23. out = []
  24. if leftCnt == 0:
  25. return []
  26. for ii,msg in enumerate(consumer):
  27. consumer.commit()
  28. out.append(msg)
  29. if ii== (leftCnt-1):
  30. break###断流或者到终点
  31. return out
  32. def detector(par):
  33. consumer = KafkaConsumer(
  34. bootstrap_servers=par['server'],
  35. group_id=par['group_id'],
  36. #auto_offset_reset='earliest',
  37. auto_offset_reset='latest',
  38. #isolation_level = 'read_committed',
  39. #enable_auto_commit=True
  40. )
  41. consumer.subscribe( par['topic'][0:2])
  42. print( ' Start kafka ')
  43. msgs = getAllRecords(consumer,par['topic'])
  44. print( 'getover cnt',len(msgs))
  45. for ii,msg in enumerate(msgs):
  46. print(msg)
  47. try:
  48. print('##'*10,ii)
  49. taskInfos = eval(msg.value.decode('utf-8'))
  50. print(taskInfos )
  51. except:
  52. print('**'*10,'wrong',ii)
  53. print(msg.value.decode('utf-8'))
  54. if __name__ == '__main__':
  55. par={};
  56. ###topic0--在线,topic1--离线
  57. #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
  58. par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks', 'alg-task-results','alg-offline-tasks');par['group_id']='testww';
  59. #par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks','alg-task-results','alg-offline-tasks');par['group_id']='testW11';
  60. par['kafka']='mintors/kafka'
  61. detector(par)