package com.github.bluesbruce.spring.service; import com.alibaba.fastjson.JSONObject; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @Component @Slf4j public class MqttLiveHandle { @Autowired private RtmpLiveService rtmpLiveService; public void handleLive(final String topic, MqttMessage message) throws InterruptedException { String objmsg = new String(message.getPayload()); final JSONObject jsonObject = JSONObject.parseObject(objmsg); if (topic.contains("rtmp/live")) { if (!ObjectUtils.isEmpty(jsonObject.get("command"))) { String cmdoperat = jsonObject.get("command").toString(); if (cmdoperat.equals("start")) { Thread.sleep(15000); Thread thread = new Thread(new Runnable() { String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); public void run() { try { rtmpLiveService.pushServer(code); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); } else if (cmdoperat.equals("stop")) { rtmpLiveService.stopRtmp(); } } }else if(topic.contains("/data/DroneBMS")){ log.info("获取object:",jsonObject); if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) { log.info("获取param:",jsonObject.get("parm")); JSONObject cmdoperat = jsonObject.getJSONObject("parm"); String boolbms = cmdoperat.getString("Power"); log.info("boolbms:",jsonObject.get("Power")); if (boolbms.equals("on")) { Thread.sleep(15000); Thread thread = new Thread(new Runnable() { String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); public void run() { try { rtmpLiveService.pushServer(code); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); } else if (boolbms.equals("off")) { log.info("关闭流"); rtmpLiveService.stopRtmp(); } } } } }