"""Demo: create a Kafka consumer and run ``onlineModelProcess`` in a child process.

The consumer is only constructed here; no messages are polled.  The child
process is started with a fixed parameter dict and reports back to the parent
through a ``multiprocessing`` pipe.
"""
import json
import multiprocessing
import random
import string
import time
import traceback
from multiprocessing import Process, Queue

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors

import utilsK
from utilsK.modelEval import onlineModelProcess


def _run_model_process(dataPar):
    """Start ``onlineModelProcess`` in a child process and wait for its first reply.

    Args:
        dataPar: parameter dict handed to ``onlineModelProcess``.  It is
            mutated: the child end of a new pipe is stored under ``'callback'``.

    Returns:
        dict with keys ``'bboxes'`` (fixed placeholder ``9999``), ``'gpu'``
        (``str()`` of the first object received from the child), ``'pid'``
        and ``'pidName'`` (pid and name of the child process).

    Raises:
        RuntimeError: if the child closes its pipe end (e.g. exits) without
            sending anything.
    """
    # Random 16-character suffix, used only to give the process a unique name.
    process_uid = ''.join(random.sample(string.ascii_letters + string.digits, 16))
    parent_conn, child_conn = multiprocessing.Pipe()
    dataPar['callback'] = child_conn
    gpuProcess = Process(
        target=onlineModelProcess,
        name='process:%s' % (process_uid),
        args=(dataPar,),
    )
    try:
        gpuProcess.start()
        # Drop the parent's copy of the child end.  While the parent holds it
        # open, recv() can never see end-of-file, so a worker that dies before
        # sending would block this call forever.
        child_conn.close()
        try:
            child_return = parent_conn.recv()
        except EOFError as err:
            raise RuntimeError(
                'worker process exited without sending a result'
            ) from err
    finally:
        # Connection.close() is safe to call on an already-closed end.
        child_conn.close()
        parent_conn.close()

    return {
        'bboxes': 9999,
        'gpu': str(child_return),
        'pid': gpuProcess.pid,
        'pidName': gpuProcess.name,
    }


def consumer_demo(par):
    """Create a Kafka consumer, run one model process, and print its reply.

    Args:
        par: dict with keys ``'topic'``, ``'server'`` (bootstrap server
            address) and ``'group_id'`` used to build the ``KafkaConsumer``.

    Raises:
        RuntimeError: if the model process ends without reporting back.
    """
    consumer = KafkaConsumer(
        par['topic'],
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        auto_offset_reset='latest',
        enable_auto_commit=False,
    )
    try:
        # NOTE: no messages are read from the consumer yet; the model process
        # is driven by the fixed parameters below instead of message payloads.
        dataPar = {
            'imgData': '',
            'imgName': 'testW',
            'streamName': 'THSA_HD5M',
            'inSource': 'http://images.5gai.taauav.com/video/8bc32984dd893930dabb2856eb92b4d1.mp4',
            'outSource': None,
        }
        returnData = _run_model_process(dataPar)
        print('#####consumer main:', returnData)
    finally:
        # Release the broker connection even if the model process failed.
        consumer.close()


if __name__ == '__main__':
    par = {
        'server': '212.129.223.66:9092',
        'topic': 'thsw',
        'group_id': 'test',
    }
    consumer_demo(par)