From e1bc3eb1eadd29736f2a7e6ac14504edd2969088 Mon Sep 17 00:00:00 2001 From: wangwei <305939031@qq.com> Date: Wed, 11 Oct 2023 13:11:52 +0800 Subject: [PATCH] =?UTF-8?q?ffmpeg=E5=B7=A5=E5=85=B7=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- loadCmd.properties | 4 +- .../ffch/handler/DefaultOutHandlerMethod.java | 8 +- .../bluesbruce/ffch/handler/OutHandler.java | 4 +- .../spring/service/FFrtmpServer.java | 65 ++++- .../spring/service/MqttLiveHandle.java | 4 +- .../spring/service/RtmpLiveService.java | 152 ++++++++++-- .../bluesbruce/spring/web/SubController.java | 34 --- src/main/webapp/{sub.jsp => push.jsp} | 226 +++++------------- 8 files changed, 264 insertions(+), 233 deletions(-) delete mode 100644 src/main/java/com/github/bluesbruce/spring/web/SubController.java rename src/main/webapp/{sub.jsp => push.jsp} (62%) diff --git a/loadCmd.properties b/loadCmd.properties index 7e668b0..ef52bbf 100644 --- a/loadCmd.properties +++ b/loadCmd.properties @@ -4,11 +4,11 @@ pushUrl=rtmp://192.168.10.101:19350/rlive/stream_11?sign=rHtBg3sz #playUrl=http://192.168.10.101:18000/flv/live/34020000001110000001_34020000001320000099_0200000099.flv playUrl=http://192.168.10.101:18000/flv/live/stream_176.flv #固定通道 1 获取通道 2 多线程双路 3 单线程多路 4 -type=4 +type=3 mqttUrl=tcp://106.15.120.154 #主题 12345替换对应机场编号 004替换控制板id -mqttTopic=/v1/12345/rtmp/live,v1/004/confirm/DroneBMS +mqttTopic=/v1/12345936/rtmp/live,v1/004/confirm/DroneBMS #推流时长(毫秒)1000表示1秒 time=600000 diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java b/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java index 450602f..6282ae8 100644 --- a/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java +++ b/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java @@ -2,6 +2,9 @@ package com.github.bluesbruce.ffch.handler; import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Map; + /** * 榛樿浠诲姟娑堟伅杈撳嚭澶勭悊 * @author eguid @@ -15,7 +18,7 @@ public class DefaultOutHandlerMethod implements OutHandlerMethod{ * 浠诲姟鏄惁寮傚父涓柇锛屽鏋 */ public boolean isBroken=false; - + public static Map outIdMap = new HashMap(); @Override public void parse(String id,String msg) { long begin = System.currentTimeMillis(); @@ -40,9 +43,8 @@ public class DefaultOutHandlerMethod implements OutHandlerMethod{ }else { isBroken=false; log.info(id + "娑堟伅锛" + msg); - } - + outIdMap.put(id,msg); } @Override diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java index e3bdfb1..f768c18 100644 --- a/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java +++ b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java @@ -27,6 +27,7 @@ public class OutHandler extends Thread { /**娑堟伅澶勭悊鏂规硶*/ private OutHandlerMethod ohm; + public static String outMsg; /** * 鍒涘缓杈撳嚭绾跨▼锛堥粯璁ょ珛鍗冲紑鍚嚎绋嬶級 @@ -93,9 +94,10 @@ public class OutHandler extends Thread { try { if (CommandManager.config.isDebug()) { log.info(id + "寮濮嬫帹娴侊紒"); - } + } while (desstatus && (msg = br.readLine()) != null) { ohm.parse(id,msg); + outMsg=msg; if(ohm.isbroken()) { log.info("妫娴嬪埌涓柇锛屾彁浜ら噸鍚换鍔$粰淇濇椿澶勭悊鍣"); //濡傛灉鍙戠敓寮傚父涓柇锛岀珛鍗宠繘琛屼繚娲 diff --git a/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java b/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java index 822584f..463b4c7 100644 --- a/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java +++ b/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java @@ -4,13 +4,19 @@ package com.github.bluesbruce.spring.service; import com.github.bluesbruce.ffch.CommandManager; import com.github.bluesbruce.ffch.CommandManagerImpl; import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.handler.OutHandler; import com.github.bluesbruce.spring.config.CmdParam; import lombok.extern.slf4j.Slf4j; +import org.h2.util.StringUtils; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import java.util.Collection; +import java.util.Iterator; + import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; @Slf4j @@ -19,17 +25,18 @@ import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; public class FFrtmpServer implements ApplicationRunner { public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class); + CommandManager manager = new CommandManagerImpl(); + int status=0; @Override public void run(ApplicationArguments args) { try { - /*CommandManager manager = new CommandManagerImpl(); // -rtsp_transport tcp //娴嬭瘯澶氫釜浠讳綍鍚屾椂鎵ц鍜屽仠姝㈡儏鍐 //false琛ㄧず浣跨敤閰嶇疆鏂囦欢涓殑ffmpeg璺緞锛宼rue琛ㄧず鏈潯鍛戒护宸茬粡鍖呭惈ffmpeg鎵鍦ㄧ殑瀹屾暣璺緞 //manager.start("tomcat", "ffmpeg -i http://192.168.10.101:18000/flv/live/34020000001110000002_34020000001320000071_0200000071.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_9?sign=f8a15b6n",false); //manager.start("tomcat", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n",false); //manager.start("tomcat1", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_11?sign=rHtBg3sz",false); - if (cmdParam.getType()==1) { + /*if (cmdParam.getType()==1) { manager.start("test0", CommandBuidlerFactory.createBuidler() .add("ffmpeg") .add("-rtsp_transport","tcp").add("-i", cmdParam.getPlayUrl()) @@ -66,7 +73,7 @@ public class FFrtmpServer implements ApplicationRunner { //鍗曡繘绋 澶氳矾 manager.start("test4", CommandBuidlerFactory.createBuidler() .add("ffmpeg") - *//*.add("-rtsp_transport", "tcp")*//*.add("-i", cmdParam.getPlayUrl()) + .add("-rtsp_transport", "tcp").add("-i", cmdParam.getPlayUrl()) .add("-vcodec", "copy") .add("-acodec", "copy") .add("-b:v", "2M") @@ -76,15 +83,63 @@ public class FFrtmpServer implements ApplicationRunner { .add("-map", "0") .add("-f").add(cmdParam.getPushUrlMap())); } - Thread.sleep(cmdParam.getTime()); + status =1; + //妫娴嬬嚎绋 + Thread thread = new Thread(new Runnable() { + public void run() { + try { + checkMsg(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + thread.start(); + Thread.sleep(20000); // 鍋滄鍏ㄩ儴浠诲姟 - manager.stopAll();*/ + manager.stopAll(); + status=0;*/ }catch (Exception e){ log.error("",e); } //ds.close(); } + private void checkMsg() throws InterruptedException { + long oldtime = System.currentTimeMillis(); + String oldMsg = ""; + while (true) { + String msg = OutHandler.outMsg==null?"":OutHandler.outMsg; + if (!msg.equals(oldMsg)) { + oldMsg=msg; + } + if (System.currentTimeMillis()-oldtime>15000&&msg.equals(oldMsg)&&status==1){ + log.info("娑堟伅鏈緭鍑簕}",msg); + if(manager.queryAll().size()>0){ + Collection list = manager.queryAll(); + Iterator commandTaskerIterator = list.iterator(); + CommandTasker s = commandTaskerIterator.next(); + log.info("0000{},{}",s.getId(),s.getCommand()); + log.info("浠诲姟娑堟伅鏈緭鍑",s); + manager.stopAll(); + Thread.sleep(2000); + if (status==0){ + return; + } + String result= manager.start(s.getId(),s.getCommand().split("bin/")[1]); + if (!StringUtils.isNullOrEmpty(result)){ + oldtime= System.currentTimeMillis(); + }else{ + log.info("鎺ㄦ祦澶辫触锛岄噸鏂版帹娴"); + } + }; + } + if (status==0){ + return; + } + Thread.sleep(2000); + } + } } diff --git a/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java index 109ff75..92c3427 100644 --- a/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java +++ b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java @@ -21,7 +21,7 @@ public class MqttLiveHandle { if (!ObjectUtils.isEmpty(jsonObject.get("command"))) { String cmdoperat = jsonObject.get("command").toString(); if (cmdoperat.equals("start")) { - Thread.sleep(15000); + Thread thread = new Thread(new Runnable() { String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); public void run() { @@ -45,7 +45,7 @@ public class MqttLiveHandle { String boolbms = cmdoperat.getString("Power"); log.info("boolbms锛",jsonObject.get("Power")); if (boolbms.equals("on")) { - Thread.sleep(15000); + Thread thread = new Thread(new Runnable() { String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString(); public void run() { diff --git a/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java index 5b06e8a..3a7a412 100644 --- a/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java +++ b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java @@ -5,6 +5,8 @@ import com.github.bluesbruce.ffch.CommandManagerImpl; import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory; import com.github.bluesbruce.ffch.data.CommandTasker; import com.alibaba.fastjson.JSONObject; +import com.github.bluesbruce.ffch.handler.DefaultOutHandlerMethod; +import com.github.bluesbruce.ffch.handler.OutHandler; import com.github.bluesbruce.spring.config.CmdParam; import com.github.bluesbruce.spring.mqttService.HttpURLConnectionUtil; import com.github.bluesbruce.spring.mqttService.send.MqttProviderConfig; @@ -14,8 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; -import java.util.Collection; -import java.util.List; +import java.util.*; import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; @@ -30,7 +31,9 @@ public class RtmpLiveService { @Autowired private MqttProviderConfig mqttProviderConfig; - public static final CommandManager manager = new CommandManagerImpl(); + public static int status =0; + + public static CommandManager manager = new CommandManagerImpl(); /** * h鑾峰彇閫氶亾骞舵帹閫 @@ -51,6 +54,18 @@ public class RtmpLiveService { log.info("浠诲姟杩愯涓"); return; } + //妫娴嬬嚎绋 + Thread thread = new Thread(new Runnable() { + public void run() { + try { + checkMsg(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + thread.start(); + status=1; //Thread.sleep(5000); String pushUrl = ""; if (cmdParam.getType() == 2) { @@ -80,42 +95,42 @@ public class RtmpLiveService { } else if (cmdParam.getType() == 4) { runRtmp1to2(cmdParam.getPlayUrl()); } - - + status=0; + DefaultOutHandlerMethod.outIdMap=new HashMap(); } private void runRtmp2to2(String playUrl) throws InterruptedException { try { String reTopic = cmdParam.getMqttTopic().replace("live", "result"); - String taskId = manager.start("test1", CommandBuidlerFactory.createBuidler() + String taskId = manager.start("push3-1", CommandBuidlerFactory.createBuidler() .add("ffmpeg") .add("-rtsp_transport", "tcp").add("-i", playUrl) .add("-vcodec", "copy") /*.add("-acodec", "copy")*/ .add("-f", "flv") - .add("-b:v", "2M") - .add("-maxrate", "2M") + .add("-b:v", "1M") + .add("-maxrate", "1M") .add("-bufsize", "1M") - .add("-y").add(cmdParam.getPushUrl())); - Thread.sleep(3000); - String taskId2 = manager.start("test2", CommandBuidlerFactory.createBuidler() + .add("-an").add(cmdParam.getPushUrl())); + Thread.sleep(5000); + String taskId2 = manager.start("push3-2", CommandBuidlerFactory.createBuidler() .add("ffmpeg") .add("-rtsp_transport", "tcp").add("-i", playUrl) .add("-vcodec", "copy") /* .add("-acodec", "copy")*/ .add("-f", "flv") - .add("-b:v", "2M") - .add("-maxrate", "2M") + .add("-b:v", "1M") + .add("-maxrate", "1M") .add("-bufsize", "1M") - .add("-y").add(cmdParam.getPushUrl2())); + .add("-an").add(cmdParam.getPushUrl2())); JSONObject jsonObject = new JSONObject(); if (StringUtils.isNullOrEmpty(taskId) && StringUtils.isNullOrEmpty(taskId2)) { jsonObject.put("code", -1); jsonObject.put("msg", "鎺ㄦ祦鏈嶅姟澶辫触"); //鎺ㄦ祦澶辫触 mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString()); // 鍋滄鍏ㄩ儴浠诲姟 - manager.stopAll(); - return; + /*manager.stopAll(); + return;*/ } else { jsonObject.put("code", 0); jsonObject.put("msg", "鎺ㄦ祦鏈嶅姟鍚姩鎴愬姛"); @@ -135,7 +150,7 @@ public class RtmpLiveService { try { //鍗曡繘绋 澶氳矾 String reTopic = cmdParam.getMqttTopic().replace("live", "result"); - String taskId = manager.start("test4", CommandBuidlerFactory.createBuidler() + String taskId = manager.start("push4", CommandBuidlerFactory.createBuidler() .add("ffmpeg") .add("-rtsp_transport", "tcp").add("-i", playUrl) .add("-vcodec", "copy") @@ -218,7 +233,7 @@ public class RtmpLiveService { try { log.info("鑾峰彇鎾祦鍦板潃锛歿}"); //CommandManager manager = new CommandManagerImpl(); - String taskId = manager.start(code, CommandBuidlerFactory.createBuidler() + String taskId = manager.start("push1", CommandBuidlerFactory.createBuidler() .add("ffmpeg").add("-rtsp_transport", "tcp") .add("-i", playUrl) .add("-vcodec", "copy") @@ -258,8 +273,109 @@ public class RtmpLiveService { //ds.close(); } + private void checkMsg1() throws InterruptedException { + long oldtime = System.currentTimeMillis(); + String oldMsg = ""; + while (true) { + String msg = OutHandler.outMsg==null?"":OutHandler.outMsg; + if (!msg.equals(oldMsg)) { + oldMsg=msg; + oldtime=System.currentTimeMillis(); + } + log.info("娑堟伅杈撳嚭{},鐘舵亄}",msg,status); + if (System.currentTimeMillis()-oldtime>15000&&msg.equals(oldMsg)&&status==1){ + //log.info("娑堟伅鏈緭鍑簕}",msg); + if(manager.queryAll().size()>0){ + Collection list = manager.queryAll(); + Iterator commandTaskerIterator = list.iterator(); + CommandTasker s = commandTaskerIterator.next(); + log.info("0000{},{}",s.getId(),s.getCommand()); + log.info("浠诲姟娑堟伅鏈緭鍑",s); + manager.stop(s.getId()); + Thread.sleep(2000); + String result= manager.start(s.getId(),s.getCommand().split("bin/")[1]); + if (!StringUtils.isNullOrEmpty(result)){ + oldtime= System.currentTimeMillis(); + }else{ + log.info("鎺ㄦ祦澶辫触锛岄噸鏂版帹娴"); + } + }; + } + if (status==0){ + break; + } + Thread.sleep(200); + } + } + private void checkMsg() throws InterruptedException { + long oldtime = System.currentTimeMillis(); + String oldMsg = ""; + long oldtime2 = System.currentTimeMillis(); + String oldMsg2 = ""; + while (true) { + Map map = DefaultOutHandlerMethod.outIdMap; + if (cmdParam.getType()==3){ + Map map1 = checkMsgDetil(oldtime,oldMsg,map.get("push3-1")==null?"":map.get("push3-1").toString(),"push3-1"); + oldMsg=map1.get("oldMsg")==null?"":map1.get("oldMsg").toString(); + oldtime=Long.parseLong(map1.get("oldtime")==null?"0":map1.get("oldtime").toString()); + Map map2 = checkMsgDetil(oldtime2,oldMsg2,map.get("push3-2")==null?"":map.get("push3-2").toString(),"push3-2"); + oldMsg2=map2.get("oldMsg")==null?"":map2.get("oldMsg").toString(); + oldtime2=Long.parseLong(map2.get("oldtime")==null?"0":map2.get("oldtime").toString()); + }else { + String msg = ""; + String keys =""; + for (Object key : map.keySet()) { + msg=map.get(key).toString(); + if (!(key.toString().equals("push3-1")||key.toString().equals("push3-1"))) { + keys = key.toString(); + } + } + Map map1 = checkMsgDetil(oldtime,oldMsg,msg,keys); + oldMsg=map1.get("oldMsg")==null?"":map1.get("oldMsg").toString(); + oldtime=Long.parseLong(map1.get("oldtime")==null?"0":map1.get("oldtime").toString()); + } + if (status==0){ + break; + } + Thread.sleep(200); + } + } + + public Map checkMsgDetil(long oldtime,String oldMsg,String msg,String id) throws InterruptedException { + if (!msg.equals(oldMsg)) { + oldMsg = msg; + oldtime = System.currentTimeMillis(); + } + log.info("娑堟伅杈撳嚭{},鐘舵亄}", msg, status); + if (System.currentTimeMillis() - oldtime > 15000 && msg.equals(oldMsg) && status == 1) { + //log.info("娑堟伅鏈緭鍑簕}",msg); + /*Collection list = manager.queryAll(); + Iterator commandTaskerIterator = list.iterator();*/ + CommandTasker s = manager.query(id); + if (!ObjectUtils.isEmpty(s)) { + log.info("0000{},{}", s.getId(), s.getCommand()); + log.info("浠诲姟娑堟伅鏈緭鍑", s); + manager.stop(s.getId()); + Thread.sleep(2000); + String result = manager.start(s.getId(), s.getCommand().split("bin/")[1]); + if (!StringUtils.isNullOrEmpty(result)) { + oldtime = System.currentTimeMillis(); + } else { + log.info("鎺ㄦ祦澶辫触锛岄噸鏂版帹娴"); + } + } + + } + Map map = new HashMap(); + map.put("oldtime",oldtime); + map.put("oldMsg",oldMsg); + return map; + } + public void stopRtmp() { try { + status=0; + DefaultOutHandlerMethod.outIdMap=new HashMap(); log.info("鍋滄鎺ㄦ祦"); //CommandManager manager = new CommandManagerImpl(); // 鍋滄鍏ㄩ儴浠诲姟 diff --git a/src/main/java/com/github/bluesbruce/spring/web/SubController.java b/src/main/java/com/github/bluesbruce/spring/web/SubController.java deleted file mode 100644 index d3fc580..0000000 --- a/src/main/java/com/github/bluesbruce/spring/web/SubController.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.github.bluesbruce.spring.web; - - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; - -import javax.annotation.Resource; - -@Controller -public class SubController { - @Resource - private ObjectMapper objectMapper; - - @Value("${spring.mqtt.client.id}") - private String clientId; - - @Value("${spring.mqtt.default.topic}") - private String defaultTopic; - - @RequestMapping("/init") - @ResponseBody - public String subject(String topic, int qos) { - try { - return "鍙戦佹垚鍔"; - } catch (Exception e) { - e.printStackTrace(); - return "鍙戦佸け璐"; - } - } - -} diff --git a/src/main/webapp/sub.jsp b/src/main/webapp/push.jsp similarity index 62% rename from src/main/webapp/sub.jsp rename to src/main/webapp/push.jsp index 583aa53..23540a3 100644 --- a/src/main/webapp/sub.jsp +++ b/src/main/webapp/push.jsp @@ -9,7 +9,7 @@ - mqtt宸ュ叿 + 鎺ㄦ祦宸ュ叿 @@ -74,24 +74,25 @@
-
+ <%--
-
-
+
--%> + <%--
- + --%> - + + <%--涓昏璺宠浆鎸夐挳 娆¤璺宠浆鎸夐挳--%>
-
+ <%--
@@ -100,10 +101,18 @@
- - <%--涓昏璺宠浆鎸夐挳 - 娆¤璺宠浆鎸夐挳--%> -
+ <%––%> + <%–涓昏璺宠浆鎸夐挳 + 娆¤璺宠浆鎸夐挳–%> +
--%> +
+
+ +
+
+ +
+
@@ -111,16 +120,16 @@
-
+
-
+
<%--
璁㈤槄
--%> - 鍏ㄩ儴璁㈤槄 + <%--鍏ㄩ儴璁㈤槄--%>
- + @@ -199,27 +208,16 @@
id鎺掑簭璁㈤槄娑堟伅澶囨敞鎿嶄綔id浠诲姟鍚嶇О鎸囦护澶囨敞鎿嶄綔