68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
|
|
import traceback
|
|||
|
|
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
|
|||
|
|
from kafka.errors import kafka_errors
|
|||
|
|
import json
|
|||
|
|
def get_left_cnt(consumer,topic):
|
|||
|
|
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
|
|||
|
|
|
|||
|
|
# total
|
|||
|
|
toff = consumer.end_offsets(partitions)
|
|||
|
|
toff = [(key.partition, toff[key]) for key in toff.keys()]
|
|||
|
|
toff.sort()
|
|||
|
|
|
|||
|
|
# current
|
|||
|
|
coff = [(x.partition, consumer.committed(x)) for x in partitions]
|
|||
|
|
coff.sort()
|
|||
|
|
|
|||
|
|
# cal sum and left
|
|||
|
|
toff_sum = sum([x[1] for x in toff])
|
|||
|
|
cur_sum = sum([x[1] for x in coff if x[1] is not None])
|
|||
|
|
left_sum = toff_sum - cur_sum
|
|||
|
|
|
|||
|
|
return left_sum
|
|||
|
|
def getAllRecords(consumer,topics):
|
|||
|
|
leftCnt = 0
|
|||
|
|
for topic in topics[0:2]:
|
|||
|
|
leftCnt+=get_left_cnt(consumer,topic)
|
|||
|
|
out = []
|
|||
|
|
if leftCnt == 0:
|
|||
|
|
return []
|
|||
|
|
for ii,msg in enumerate(consumer):
|
|||
|
|
consumer.commit()
|
|||
|
|
out.append(msg)
|
|||
|
|
if ii== (leftCnt-1):
|
|||
|
|
break###断流或者到终点
|
|||
|
|
return out
|
|||
|
|
def detector(par):
|
|||
|
|
consumer = KafkaConsumer(
|
|||
|
|
bootstrap_servers=par['server'],
|
|||
|
|
group_id=par['group_id'],
|
|||
|
|
#auto_offset_reset='earliest',
|
|||
|
|
auto_offset_reset='latest',
|
|||
|
|
#isolation_level = 'read_committed',
|
|||
|
|
#enable_auto_commit=True
|
|||
|
|
)
|
|||
|
|
consumer.subscribe( par['topic'][0:2])
|
|||
|
|
print( ' Start kafka ')
|
|||
|
|
msgs = getAllRecords(consumer,par['topic'])
|
|||
|
|
print( 'getover cnt',len(msgs))
|
|||
|
|
for ii,msg in enumerate(msgs):
|
|||
|
|
print(msg)
|
|||
|
|
try:
|
|||
|
|
print('##'*10,ii)
|
|||
|
|
taskInfos = eval(msg.value.decode('utf-8'))
|
|||
|
|
print(taskInfos )
|
|||
|
|
except:
|
|||
|
|
print('**'*10,'wrong',ii)
|
|||
|
|
print(msg.value.decode('utf-8'))
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
par={};
|
|||
|
|
###topic0--在线,topic1--离线
|
|||
|
|
|
|||
|
|
#par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
|
|||
|
|
|
|||
|
|
par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks', 'alg-task-results','alg-offline-tasks');par['group_id']='testww';
|
|||
|
|
#par['server']='101.132.127.1:19092';par['topic']=('alg-online-tasks','alg-task-results','alg-offline-tasks');par['group_id']='testW11';
|
|||
|
|
par['kafka']='mintors/kafka'
|
|||
|
|
detector(par)
|