From 284307221664d17fcdb0ecb2d2c6a8890aa0f7f4 Mon Sep 17 00:00:00 2001 From: wangwei <305939031@qq.com> Date: Fri, 13 Oct 2023 13:40:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E6=B5=81=E4=B8=8D=E7=A8=B3=E5=AE=9A?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spring/service/MqttLiveHandle.java | 92 ++++++++++--------- .../spring/service/RtmpLiveService.java | 26 +++--- .../spring/web/IndexController.java | 14 +-- 3 files changed, 66 insertions(+), 66 deletions(-) diff --git a/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java index 92c3427..73125f7 100644 --- a/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java +++ b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java @@ -15,53 +15,59 @@ public class MqttLiveHandle { private RtmpLiveService rtmpLiveService; public void handleLive(final String topic, MqttMessage message) throws InterruptedException { - String objmsg = new String(message.getPayload()); - final JSONObject jsonObject = JSONObject.parseObject(objmsg); - if (topic.contains("rtmp/live")) { - if (!ObjectUtils.isEmpty(jsonObject.get("command"))) { - String cmdoperat = jsonObject.get("command").toString(); - if (cmdoperat.equals("start")) { + try { + String objmsg = new String(message.getPayload()); + final JSONObject jsonObject = JSONObject.parseObject(objmsg); + if (topic.contains("rtmp/live")) { + if (!ObjectUtils.isEmpty(jsonObject.get("command"))) { + String cmdoperat = jsonObject.get("command").toString(); + if (cmdoperat.equals("start")) { - Thread thread = new Thread(new Runnable() { - String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); - public void run() { - try { - rtmpLiveService.pushServer(code); - } catch (InterruptedException e) { - e.printStackTrace(); + Thread thread = new Thread(new Runnable() { + String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); + + public void run() { + try { + rtmpLiveService.pushServer(code); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - } - }); - thread.start(); - } else if (cmdoperat.equals("stop")) { - rtmpLiveService.stopRtmp(); - } - } - }else if(topic.contains("/data/DroneBMS")){ - log.info("获取object:",jsonObject); - if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) { - log.info("获取param:",jsonObject.get("parm")); - JSONObject cmdoperat = jsonObject.getJSONObject("parm"); - String boolbms = cmdoperat.getString("Power"); - log.info("boolbms:",jsonObject.get("Power")); - if (boolbms.equals("on")) { - - Thread thread = new Thread(new Runnable() { - String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); - public void run() { - try { - rtmpLiveService.pushServer(code); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }); - thread.start(); - } else if (boolbms.equals("off")) { - log.info("关闭流"); - rtmpLiveService.stopRtmp(); + }); + thread.start(); + } else if (cmdoperat.equals("stop")) { + rtmpLiveService.stopRtmp(); + } + } + } else if (topic.contains("/data/DroneBMS")) { + log.info("获取object:", jsonObject); + if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) { + log.info("获取param:", jsonObject.get("parm")); + JSONObject cmdoperat = jsonObject.getJSONObject("parm"); + String boolbms = cmdoperat.getString("Power"); + log.info("boolbms:", jsonObject.get("Power")); + if (boolbms.equals("on")) { + + Thread thread = new Thread(new Runnable() { + String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); + + public void run() { + try { + rtmpLiveService.pushServer(code); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + thread.start(); + } else if (boolbms.equals("off")) { + log.info("关闭流"); + rtmpLiveService.stopRtmp(); + } } } + }catch (Exception e){ + log.error("",e); } } diff --git a/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java index 3dc9c55..626d2c8 100644 --- a/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java +++ b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java @@ -99,8 +99,8 @@ public class RtmpLiveService { } else if (cmdParam.getType() == 4) { runRtmp1to2(cmdParam.getPlayUrl()); } - status=0; - DefaultOutHandlerMethod.outIdMap=new HashMap(); + /*status=0; + DefaultOutHandlerMethod.outIdMap=new HashMap();*/ } private void runRtmp2to2(String playUrl) throws InterruptedException { @@ -140,11 +140,11 @@ public class RtmpLiveService { jsonObject.put("msg", "推流服务启动成功"); mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString()); } - Thread.sleep(cmdParam.getTime()); + /*Thread.sleep(cmdParam.getTime()); log.info(manager.queryAll().toString()); // 停止全部任务 manager.stopAll(); - manager.destory(); + manager.destory();*/ } catch (Exception e) { e.printStackTrace(); } @@ -178,11 +178,11 @@ public class RtmpLiveService { jsonObject.put("msg", "推流服务启动成功"); mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString()); } - Thread.sleep(cmdParam.getTime()); + /*Thread.sleep(cmdParam.getTime()); log.info(manager.queryAll().toString()); // 停止全部任务 manager.stopAll(); - manager.destory(); + manager.destory();*/ } catch (Exception e) { e.printStackTrace(); } @@ -266,18 +266,18 @@ public class RtmpLiveService { jsonObject.put("msg", "推流服务启动成功"); mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString()); } - Thread.sleep(cmdParam.getTime()); + /*Thread.sleep(cmdParam.getTime()); log.info(manager.queryAll().toString()); // 停止全部任务 manager.stopAll(); - manager.destory(); + manager.destory();*/ } catch (Exception e) { log.error("", e); } //ds.close(); } - private void checkMsg1() throws InterruptedException { + /*private void checkMsg1() throws InterruptedException { long oldtime = System.currentTimeMillis(); String oldMsg = ""; while (true) { @@ -310,7 +310,7 @@ public class RtmpLiveService { } Thread.sleep(200); } - } + }*/ private void checkMsg() throws InterruptedException { long oldtime = System.currentTimeMillis(); String oldMsg = ""; @@ -358,7 +358,7 @@ public class RtmpLiveService { CommandTasker s = manager.query(id); if (!ObjectUtils.isEmpty(s)) { log.info("0000{},{}", s.getId(), s.getCommand()); - log.info("任务消息未输出", s); + log.info("任务消息未输出,重新推流", s); manager.stop(s.getId()); Thread.sleep(2000); String result = manager.start(s.getId(), s.getCommand().split("bin/")[1]); @@ -470,11 +470,11 @@ public class RtmpLiveService { jsonObject.put("msg", "推流服务启动成功"); mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString()); } - Thread.sleep(cmdParam.getTime()); + /*Thread.sleep(cmdParam.getTime()); log.info(manager.queryAll().toString()); // 停止全部任务 manager.stopAll(); - manager.destory(); + manager.destory();*/ } catch (Exception e) { log.error("", e); } diff --git a/src/main/java/com/github/bluesbruce/spring/web/IndexController.java b/src/main/java/com/github/bluesbruce/spring/web/IndexController.java index dd794ad..2a74f62 100644 --- a/src/main/java/com/github/bluesbruce/spring/web/IndexController.java +++ b/src/main/java/com/github/bluesbruce/spring/web/IndexController.java @@ -71,11 +71,8 @@ public class IndexController { @ResponseBody public String start() { try { - if(rtmpLiveService.manager.queryAll().size()>0){ - rtmpLiveService.status=0; - DefaultOutHandlerMethod.outIdMap= new HashMap(); - rtmpLiveService.manager.stopAll(); - }; + log.info("http发送重启推流"); + rtmpLiveService.stopRtmp(); //启动线程 Thread thread = new Thread(new Runnable() { public void run() { @@ -106,11 +103,8 @@ public class IndexController { @ResponseBody public String stop() { try { - if(rtmpLiveService.manager.queryAll().size()>0){ - rtmpLiveService.status=0; - DefaultOutHandlerMethod.outIdMap= new HashMap(); - rtmpLiveService.manager.stopAll(); - }; + log.info("http发送停止推流"); + rtmpLiveService.stopRtmp(); return objectMapper.writeValueAsString("success"); } catch (Exception e) { e.printStackTrace();