You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

69 lines
2.4KB

  1. import traceback
  2. from kafka import KafkaProducer, KafkaConsumer,TopicPartition
  3. from kafka.errors import kafka_errors
  4. import json
  5. def get_left_cnt(consumer,topic):
  6. partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
  7. # total
  8. toff = consumer.end_offsets(partitions)
  9. toff = [(key.partition, toff[key]) for key in toff.keys()]
  10. toff.sort()
  11. # current
  12. coff = [(x.partition, consumer.committed(x)) for x in partitions]
  13. coff.sort()
  14. # cal sum and left
  15. toff_sum = sum([x[1] for x in toff])
  16. cur_sum = sum([x[1] for x in coff if x[1] is not None])
  17. left_sum = toff_sum - cur_sum
  18. return left_sum
  19. def getAllRecords(consumer,topics):
  20. leftCnt = 0
  21. for topic in topics[0:2]:
  22. leftCnt+=get_left_cnt(consumer,topic)
  23. out = []
  24. if leftCnt == 0:
  25. return []
  26. for ii,msg in enumerate(consumer):
  27. consumer.commit()
  28. out.append(msg)
  29. if ii== (leftCnt-1):
  30. break###断流或者到终点
  31. return out
  32. def detector(par):
  33. consumer = KafkaConsumer(
  34. bootstrap_servers=par['server'],
  35. group_id=par['group_id'],
  36. auto_offset_reset='earliest',
  37. #auto_offset_reset='latest',
  38. #isolation_level = 'read_committed',
  39. #enable_auto_commit=True
  40. )
  41. consumer.subscribe( par['topic'][0:2])
  42. print( ' Start kafka ')
  43. #msgs = getAllRecords(consumer,par['topic'])
  44. #print( 'getover cnt',len(msgs))
  45. #for ii,msg in enumerate(msgs):
  46. for ii,msg in enumerate(consumer):
  47. print(msg)
  48. try:
  49. print('##'*10,ii)
  50. taskInfos = eval(msg.value.decode('utf-8'))
  51. print(taskInfos )
  52. except:
  53. print('**'*10,'wrong',ii)
  54. print(msg.value.decode('utf-8'))
  55. if __name__ == '__main__':
  56. par={};
  57. ###topic0--在线,topic1--离线
  58. #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
  59. par['server']='192.168.11.242:9092';par['topic']=('alg-online-tasks', 'alg-task-results','alg-offline-tasks');par['group_id']='testww2';
  60. #par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks','alg-task-results','alg-offline-tasks');par['group_id']='testW11';
  61. par['kafka']='mintors/kafka'
  62. detector(par)