DSPRiverInspection/code_bak/consumer2.py

68 lines
2.4 KiB
Python
Raw Permalink Normal View History

2022-07-19 15:38:00 +08:00
import traceback
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
import json
def get_left_cnt(consumer, topic):
    """Return how many messages of *topic* lie beyond the committed offsets.

    The result is the sum of the end offsets of every partition of the topic
    minus the sum of the offsets the consumer's group has committed for them.
    Partitions with no committed offset contribute nothing to the committed
    sum, i.e. they are counted as if consumption started at offset 0.

    Args:
        consumer: Object exposing ``partitions_for_topic``, ``end_offsets``
            and ``committed`` (a ``KafkaConsumer`` in this file).
        topic: Name of the topic to inspect.

    Returns:
        The number of outstanding messages, or 0 when the topic has no known
        partitions.
    """
    partition_ids = consumer.partitions_for_topic(topic)
    # partitions_for_topic() yields None when no metadata is known for the
    # topic; iterating over that would raise TypeError.
    if not partition_ids:
        return 0

    partitions = [TopicPartition(topic, pid) for pid in partition_ids]

    # Total: end offset of every partition.
    end_total = sum(consumer.end_offsets(partitions).values())

    # Current: committed offset per partition, skipping uncommitted ones.
    committed_total = 0
    for partition in partitions:
        offset = consumer.committed(partition)
        if offset is not None:
            committed_total += offset

    return end_total - committed_total
def getAllRecords(consumer, topics):
    """Collect the messages still pending on the first two of *topics*.

    The backlog size is the sum of ``get_left_cnt`` over ``topics[0:2]``.
    When it is zero an empty list is returned without reading anything.
    Otherwise messages are pulled from *consumer*, committing after each one,
    until that many have been gathered or the consumer stops yielding.

    Args:
        consumer: Iterable consumer that also provides ``commit()``.
        topics: Sequence of topic names; only the first two are counted.

    Returns:
        List of the consumed message objects, in arrival order.
    """
    pending = sum(get_left_cnt(consumer, name) for name in topics[0:2])
    if pending == 0:
        return []

    records = []
    last_index = pending - 1
    for index, record in enumerate(consumer):
        consumer.commit()
        records.append(record)
        if index == last_index:
            # Stream interrupted or end of the backlog reached.
            break
    return records
def detector(par):
    """Read the pending messages of the configured topics and print them.

    A ``KafkaConsumer`` is created from *par*, subscribed to the first two
    entries of ``par['topic']``, drained with ``getAllRecords`` and every
    message is printed together with its evaluated payload. The consumer is
    closed before returning, even when an error occurs.

    Args:
        par: Mapping providing ``'server'`` (bootstrap servers),
            ``'group_id'`` (consumer group id) and ``'topic'`` (sequence of
            topic names).
    """
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        # 'earliest' is the alternative when no committed offset exists.
        auto_offset_reset='latest',
    )
    try:
        consumer.subscribe(par['topic'][0:2])
        print( ' Start kafka ')
        msgs = getAllRecords(consumer, par['topic'])
        print( 'getover cnt',len(msgs))
        for ii, msg in enumerate(msgs):
            print(msg)
            try:
                print('##'*10,ii)
                # SECURITY: eval() executes whatever expression the message
                # carries. Anyone able to publish to these topics can run
                # code here; switch to json.loads / ast.literal_eval once the
                # payload format is confirmed.
                taskInfos = eval(msg.value.decode('utf-8'))
                print(taskInfos )
            except Exception:
                # Exception (not a bare except) so Ctrl-C still stops the run.
                print('**'*10,'wrong',ii)
                raw = msg.value
                # Decode leniently: the payload may be the very thing that
                # failed to decode, or may not be bytes at all.
                if isinstance(raw, bytes):
                    print(raw.decode('utf-8', errors='replace'))
                else:
                    print(raw)
    finally:
        # Release sockets and leave the consumer group promptly.
        consumer.close()
if __name__ == '__main__':
    # Original note: topic0 -- online, topic1 -- offline.
    # Alternative configurations kept for reference:
    #   server '212.129.223.66:9092', topic ('thsw','thsw2','testReturn'), group_id 'test'
    #   server '101.132.127.1:19092', topic ('alg-online-tasks','alg-task-results','alg-offline-tasks'), group_id 'testW11'
    par = {
        'server': '101.132.127.1:19092',
        'topic': ('alg-online-tasks', 'alg-task-results', 'alg-offline-tasks'),
        'group_id': 'testww',
        'kafka': 'mintors/kafka',
    }
    detector(par)