import json
import traceback

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import kafka_errors


def get_left_cnt(consumer, topic):
    """Return the number of records between the committed and end offsets of *topic*.

    The result is the sum of the end offsets of every partition of *topic*
    minus the sum of the offsets committed by the consumer's group.
    Partitions with no committed offset (``committed()`` returns ``None``)
    contribute nothing to the committed sum, so their whole end offset is
    counted as backlog.

    Args:
        consumer: A ``KafkaConsumer`` used to query partition metadata,
            end offsets and committed offsets.
        topic: Name of the topic to inspect.

    Returns:
        The backlog as an int; 0 when the topic has no known partitions.
    """
    # partitions_for_topic() may return None when the topic is unknown to the
    # cluster; treat that as "no partitions" instead of failing on iteration.
    partition_ids = consumer.partitions_for_topic(topic) or ()
    partitions = [TopicPartition(topic, p) for p in partition_ids]
    if not partitions:
        return 0

    # Total: last offset of every partition.
    end_offsets = consumer.end_offsets(partitions)
    toff_sum = sum(end_offsets.values())

    # Current: offsets already committed by this consumer group.
    committed = (consumer.committed(tp) for tp in partitions)
    cur_sum = sum(offset for offset in committed if offset is not None)

    return toff_sum - cur_sum


def getAllRecords(consumer, topics):
    """Read and return the records currently pending on the first two topics.

    The backlog is computed with ``get_left_cnt`` for ``topics[0:2]``; the
    consumer is then iterated, committing after each record, until that many
    records have been collected.

    Args:
        consumer: A ``KafkaConsumer`` already subscribed to the topics.
        topics: Sequence of topic names; only the first two are inspected.

    Returns:
        A list of the consumed records; empty when nothing is pending.
    """
    leftCnt = sum(get_left_cnt(consumer, topic) for topic in topics[0:2])

    # A non-positive backlog would never satisfy the stop condition below and
    # the loop would block forever, so bail out early.
    if leftCnt <= 0:
        return []

    out = []
    for ii, msg in enumerate(consumer):
        consumer.commit()
        out.append(msg)
        if ii == (leftCnt - 1):
            break  # stream dried up or the end has been reached
    return out


def detector(par):
    """Subscribe to the first two configured topics and print every record.

    Args:
        par: Configuration dict. Reads ``par['server']`` (bootstrap servers),
            ``par['group_id']`` (consumer group) and ``par['topic']``
            (sequence of topic names; only the first two are subscribed).
    """
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        auto_offset_reset='earliest',
        #auto_offset_reset='latest',
        #isolation_level = 'read_committed',
        #enable_auto_commit=True
    )
    try:
        consumer.subscribe(par['topic'][0:2])
        print( ' Start kafka ')
        #msgs = getAllRecords(consumer,par['topic'])
        #print( 'getover cnt',len(msgs))
        #for ii,msg in enumerate(msgs):
        for ii, msg in enumerate(consumer):
            print(msg)
            payload = msg.value
            try:
                print('##' * 10, ii)
                # SECURITY: eval() executes arbitrary Python contained in the
                # message. Only safe while every producer on these topics is
                # trusted; prefer json.loads / ast.literal_eval once the
                # payload format is confirmed.
                taskInfos = eval(payload.decode('utf-8'))
                print(taskInfos)
            except Exception:
                # Exception (not a bare except) so Ctrl-C / SystemExit still
                # stop the loop.
                print('**' * 10, 'wrong', ii)
                # The payload may be None or not valid UTF-8; printing it must
                # not raise again and kill the consumer loop.
                if payload is None:
                    print(payload)
                else:
                    print(payload.decode('utf-8', errors='replace'))
    finally:
        # Release the broker connections even when the loop is interrupted.
        consumer.close()


if __name__ == '__main__':
    par = {}
    # topic0 -- online, topic1 -- offline
    # NOTE(review): in the active config below index 1 is 'alg-task-results'
    # and the offline topic is index 2 — confirm which order is intended.
    #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
    par['server'] = '192.168.11.242:9092'
    par['topic'] = ('alg-online-tasks', 'alg-task-results', 'alg-offline-tasks')
    par['group_id'] = 'testww2'
    #par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks','alg-task-results','alg-offline-tasks');par['group_id']='testW11';
    par['kafka'] = 'mintors/kafka'
    detector(par)