68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
import traceback
|
||
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
|
||
from kafka.errors import kafka_errors
|
||
import json
|
||
def get_left_cnt(consumer,topic):
|
||
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
|
||
|
||
# total
|
||
toff = consumer.end_offsets(partitions)
|
||
toff = [(key.partition, toff[key]) for key in toff.keys()]
|
||
toff.sort()
|
||
|
||
# current
|
||
coff = [(x.partition, consumer.committed(x)) for x in partitions]
|
||
coff.sort()
|
||
|
||
# cal sum and left
|
||
toff_sum = sum([x[1] for x in toff])
|
||
cur_sum = sum([x[1] for x in coff if x[1] is not None])
|
||
left_sum = toff_sum - cur_sum
|
||
|
||
return left_sum
|
||
def getAllRecords(consumer,topics):
|
||
leftCnt = 0
|
||
for topic in topics[0:2]:
|
||
leftCnt+=get_left_cnt(consumer,topic)
|
||
out = []
|
||
if leftCnt == 0:
|
||
return []
|
||
for ii,msg in enumerate(consumer):
|
||
consumer.commit()
|
||
out.append(msg)
|
||
if ii== (leftCnt-1):
|
||
break###断流或者到终点
|
||
return out
|
||
def detector(par):
|
||
consumer = KafkaConsumer(
|
||
bootstrap_servers=par['server'],
|
||
group_id=par['group_id'],
|
||
#auto_offset_reset='earliest',
|
||
auto_offset_reset='latest',
|
||
#isolation_level = 'read_committed',
|
||
#enable_auto_commit=True
|
||
)
|
||
consumer.subscribe( par['topic'][0:2])
|
||
print( ' Start kafka ')
|
||
msgs = getAllRecords(consumer,par['topic'])
|
||
print( 'getover cnt',len(msgs))
|
||
for ii,msg in enumerate(msgs):
|
||
print(msg)
|
||
try:
|
||
print('##'*10,ii)
|
||
taskInfos = eval(msg.value.decode('utf-8'))
|
||
print(taskInfos )
|
||
except:
|
||
print('**'*10,'wrong',ii)
|
||
print(msg.value.decode('utf-8'))
|
||
if __name__ == '__main__':
|
||
par={};
|
||
###topic0--在线,topic1--离线
|
||
|
||
#par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
|
||
|
||
par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks', 'alg-task-results','alg-offline-tasks');par['group_id']='testww';
|
||
#par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks','alg-task-results','alg-offline-tasks');par['group_id']='testW11';
|
||
par['kafka']='mintors/kafka'
|
||
detector(par)
|