DSPRiverInspection/code_bak/consumer2.py

68 lines
2.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import traceback
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
import json
def get_left_cnt(consumer, topic):
    """Return the number of offsets on ``topic`` not yet committed by the group.

    The value is the sum of the end offsets of every partition of ``topic``
    minus the sum of the offsets the consumer's group has committed for
    those partitions. A partition for which ``consumer.committed`` returns
    ``None`` adds nothing to the committed sum, so its whole end offset is
    counted as outstanding.

    Args:
        consumer: Object exposing ``partitions_for_topic``, ``end_offsets``
            and ``committed`` (a ``KafkaConsumer`` elsewhere in this file).
        topic: Name of the topic to inspect.

    Returns:
        The outstanding offset count, or 0 when the consumer reports no
        partitions for ``topic``.
    """
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        # partitions_for_topic may hand back None (e.g. unknown topic);
        # iterating that would raise TypeError, so treat it as "nothing left".
        return 0

    partitions = [TopicPartition(topic, pid) for pid in partition_ids]

    # Total: latest offset of each partition, summed.
    end_total = sum(consumer.end_offsets(partitions).values())

    # Current: committed offset of each partition; None means no commit yet.
    committed = (consumer.committed(tp) for tp in partitions)
    committed_total = sum(offset for offset in committed if offset is not None)

    return end_total - committed_total
def getAllRecords(consumer, topics):
    """Collect the outstanding messages for the first two entries of ``topics``.

    The expected message count is the sum of ``get_left_cnt`` over
    ``topics[0:2]``. When that count is zero an empty list is returned
    without reading from the consumer. Otherwise the consumer is iterated,
    committing after every received message, until the number of collected
    messages equals the expected count or the iterator ends.

    Args:
        consumer: Iterable consumer that also provides ``commit()``.
        topics: Sequence of topic names; only the first two are counted.

    Returns:
        List of the messages received, in arrival order.
    """
    pending = sum(get_left_cnt(consumer, name) for name in topics[0:2])
    if pending == 0:
        return []

    # NOTE(review): if fewer than `pending` messages ever arrive, this loop
    # only ends when the consumer's iterator stops on its own - confirm the
    # consumer is configured with a timeout if that matters.
    records = []
    for message in consumer:
        consumer.commit()
        records.append(message)
        if len(records) == pending:
            # Stream interrupted or end reached.
            break
    return records
def detector(par):
    """Fetch the outstanding Kafka messages and print each decoded payload.

    A ``KafkaConsumer`` is created from ``par``, subscribed to the first two
    topics, drained through ``getAllRecords`` and closed again. Every
    fetched message is printed, then its value is decoded as UTF-8 and
    evaluated; payloads that fail to decode or evaluate are reported and
    printed raw instead of aborting the run.

    Args:
        par: Mapping with the keys ``'server'`` (bootstrap servers),
            ``'group_id'`` (consumer group) and ``'topic'`` (sequence of
            topic names, of which the first two are subscribed).
    """
    # Settings previously tried here and left disabled by the author:
    # auto_offset_reset='earliest', isolation_level='read_committed',
    # enable_auto_commit=True.
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        auto_offset_reset='latest',
    )
    try:
        consumer.subscribe(par['topic'][0:2])
        print( ' Start kafka ')
        msgs = getAllRecords(consumer, par['topic'])
        print( 'getover cnt',len(msgs))
    finally:
        # Release the connection even when subscribing or fetching fails.
        consumer.close()

    for ii, msg in enumerate(msgs):
        print(msg)
        try:
            print('##'*10, ii)
            # SECURITY: eval() executes whatever expression the broker
            # delivers. Kept because the payload format is not visible here;
            # switch to json.loads / ast.literal_eval once the producers'
            # format is confirmed.
            taskInfos = eval(msg.value.decode('utf-8'))
            print(taskInfos)
        except Exception:
            print('**'*10, 'wrong', ii)
            payload = msg.value
            if isinstance(payload, bytes):
                # Decoding may be what failed above, so never raise here.
                print(payload.decode('utf-8', errors='replace'))
            else:
                print(payload)
if __name__ == '__main__':
    # Original note: topic0 -- online, topic1 -- offline.
    # Alternative setups kept for reference:
    #   server '212.129.223.66:9092', topic ('thsw', 'thsw2', 'testReturn'),
    #   group_id 'test'
    #   server '101.132.127.1:19092', same topics as below, group_id 'testW11'
    par = {
        'server': '101.132.127.1:19092',
        'topic': ('alg-online-tasks', 'alg-task-results', 'alg-offline-tasks'),
        'group_id': 'testww',
        'kafka': 'mintors/kafka',
    }
    detector(par)