|
- import traceback
- from kafka import KafkaProducer, KafkaConsumer,TopicPartition
- from kafka.errors import kafka_errors
- import json
def get_left_cnt(consumer, topic):
    """Return how many messages of ``topic`` lie past the committed offsets.

    The result is the sum of the end offsets of every partition of ``topic``
    minus the sum of the offsets committed by the consumer's group.

    Args:
        consumer: Object exposing ``partitions_for_topic``, ``end_offsets``
            and ``committed`` (a ``KafkaConsumer`` in this file).
        topic: Name of the topic to inspect.

    Returns:
        The number of messages not yet committed, or 0 when the topic has
        no known partitions.
    """
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        # kafka-python returns None when it has no metadata for the topic;
        # iterating over that would raise TypeError, so report "nothing left".
        return 0

    partitions = [TopicPartition(topic, p) for p in partition_ids]

    # Total: the end offset of every partition.
    end_total = sum(consumer.end_offsets(partitions).values())

    # Current: the committed offset of every partition. A partition without
    # a committed offset (None) contributes 0, i.e. it is counted from
    # offset 0.
    # NOTE(review): if old records were removed by retention this may
    # overestimate the backlog -- confirm whether that matters to callers.
    committed_total = 0
    for partition in partitions:
        committed = consumer.committed(partition)
        if committed is not None:
            committed_total += committed

    return end_total - committed_total
def getAllRecords(consumer, topics):
    """Drain the messages that were pending when the call started.

    The backlog size is measured with ``get_left_cnt`` over the first two
    entries of ``topics``; the consumer is then iterated until that many
    messages have been received, committing after each one.

    Args:
        consumer: Iterable consumer that also provides ``commit()``.
        topics: Sequence of topic names; only ``topics[0:2]`` are counted.

    Returns:
        A list with the consumed messages, or an empty list when there is
        no backlog.
    """
    leftCnt = sum(get_left_cnt(consumer, topic) for topic in topics[0:2])

    # A non-positive backlog means there is nothing to wait for. Checking
    # only ``== 0`` would let a negative value through, and the loop below
    # would then never reach its stop condition.
    if leftCnt <= 0:
        return []

    out = []
    for msg in consumer:
        consumer.commit()
        out.append(msg)
        if len(out) >= leftCnt:
            # Stream cut off or end reached: the measured backlog is consumed.
            break
    return out
def detector(par):
    """Consume messages from Kafka and print each decoded payload.

    Args:
        par: Mapping with the keys ``'server'`` (bootstrap servers),
            ``'group_id'`` (consumer group) and ``'topic'`` (sequence of
            topic names; only the first two are subscribed to).

    The function loops over the consumer until iteration ends or an
    exception propagates; the consumer is closed in either case.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        # Start from the earliest offset when the group has no committed one.
        auto_offset_reset='earliest',
    )
    try:
        consumer.subscribe(par['topic'][0:2])
        print(' Start kafka ')
        for ii, msg in enumerate(consumer):
            print(msg)
            print('##' * 10, ii)

            # Decode once, tolerantly, so that the error branch below can
            # print the payload without raising again on a missing value
            # or on bytes that are not valid UTF-8.
            raw = msg.value
            text = raw.decode('utf-8', errors='replace') if raw is not None else ''

            try:
                # SECURITY NOTE: eval() executes arbitrary expressions taken
                # from the message payload. If producers are not fully
                # trusted, switch to json.loads or ast.literal_eval.
                taskInfos = eval(text)
            except Exception:
                # Exception (not a bare except) keeps Ctrl-C and SystemExit
                # able to stop the loop.
                print('**' * 10, 'wrong', ii)
                print(text)
            else:
                print(taskInfos)
    finally:
        # Release the network connections held by the consumer.
        consumer.close()
if __name__ == '__main__':
    # topic[0] -- online, topic[1] -- offline
    # NOTE(review): in the tuple below index 1 is 'alg-task-results' and the
    # offline topic sits at index 2 -- confirm which ordering is intended.
    #
    # Other broker configurations, currently disabled:
    #   server '212.129.223.66:9092', topic ('thsw', 'thsw2', 'testReturn'), group_id 'test'
    #   server '101.132.127.1:19092', topic ('alg-online-tasks', 'alg-task-results', 'alg-offline-tasks'), group_id 'testW11'
    par = {
        'server': '192.168.11.242:9092',
        'topic': ('alg-online-tasks', 'alg-task-results', 'alg-offline-tasks'),
        'group_id': 'testww2',
        'kafka': 'mintors/kafka',
    }
    detector(par)
|