Browse Source

推流不稳定问题修复

master
wangwei 1 year ago
parent
commit
2843072216
3 changed files with 63 additions and 63 deletions
  1. +46
    -40
      src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java
  2. +13
    -13
      src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java
  3. +4
    -10
      src/main/java/com/github/bluesbruce/spring/web/IndexController.java

+ 46
- 40
src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java View File

@@ -15,53 +15,59 @@ public class MqttLiveHandle {
private RtmpLiveService rtmpLiveService;

public void handleLive(final String topic, MqttMessage message) throws InterruptedException {
String objmsg = new String(message.getPayload());
final JSONObject jsonObject = JSONObject.parseObject(objmsg);
if (topic.contains("rtmp/live")) {
if (!ObjectUtils.isEmpty(jsonObject.get("command"))) {
String cmdoperat = jsonObject.get("command").toString();
if (cmdoperat.equals("start")) {
try {
String objmsg = new String(message.getPayload());
final JSONObject jsonObject = JSONObject.parseObject(objmsg);
if (topic.contains("rtmp/live")) {
if (!ObjectUtils.isEmpty(jsonObject.get("command"))) {
String cmdoperat = jsonObject.get("command").toString();
if (cmdoperat.equals("start")) {

Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();
public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();

public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread.start();
} else if (cmdoperat.equals("stop")) {
rtmpLiveService.stopRtmp();
});
thread.start();
} else if (cmdoperat.equals("stop")) {
rtmpLiveService.stopRtmp();
}
}
}
}else if(topic.contains("/data/DroneBMS")){
log.info("获取object:",jsonObject);
if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) {
log.info("获取param:",jsonObject.get("parm"));
JSONObject cmdoperat = jsonObject.getJSONObject("parm");
String boolbms = cmdoperat.getString("Power");
log.info("boolbms:",jsonObject.get("Power"));
if (boolbms.equals("on")) {
} else if (topic.contains("/data/DroneBMS")) {
log.info("获取object:", jsonObject);
if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) {
log.info("获取param:", jsonObject.get("parm"));
JSONObject cmdoperat = jsonObject.getJSONObject("parm");
String boolbms = cmdoperat.getString("Power");
log.info("boolbms:", jsonObject.get("Power"));
if (boolbms.equals("on")) {

Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();

Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();
public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread.start();
} else if (boolbms.equals("off")) {
log.info("关闭流");
rtmpLiveService.stopRtmp();
});
thread.start();
} else if (boolbms.equals("off")) {
log.info("关闭流");
rtmpLiveService.stopRtmp();
}
}
}
}catch (Exception e){
log.error("",e);
}
}


+ 13
- 13
src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java View File

@@ -99,8 +99,8 @@ public class RtmpLiveService {
} else if (cmdParam.getType() == 4) {
runRtmp1to2(cmdParam.getPlayUrl());
}
status=0;
DefaultOutHandlerMethod.outIdMap=new HashMap();
/*status=0;
DefaultOutHandlerMethod.outIdMap=new HashMap();*/
}

private void runRtmp2to2(String playUrl) throws InterruptedException {
@@ -140,11 +140,11 @@ public class RtmpLiveService {
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
/*Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
manager.destory();*/
} catch (Exception e) {
e.printStackTrace();
}
@@ -178,11 +178,11 @@ public class RtmpLiveService {
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
/*Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
manager.destory();*/
} catch (Exception e) {
e.printStackTrace();
}
@@ -266,18 +266,18 @@ public class RtmpLiveService {
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
/*Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
manager.destory();*/
} catch (Exception e) {
log.error("", e);
}
//ds.close();
}

private void checkMsg1() throws InterruptedException {
/*private void checkMsg1() throws InterruptedException {
long oldtime = System.currentTimeMillis();
String oldMsg = "";
while (true) {
@@ -310,7 +310,7 @@ public class RtmpLiveService {
}
Thread.sleep(200);
}
}
}*/
private void checkMsg() throws InterruptedException {
long oldtime = System.currentTimeMillis();
String oldMsg = "";
@@ -358,7 +358,7 @@ public class RtmpLiveService {
CommandTasker s = manager.query(id);
if (!ObjectUtils.isEmpty(s)) {
log.info("0000{},{}", s.getId(), s.getCommand());
log.info("任务消息未输出", s);
log.info("任务消息未输出,重新推流", s);
manager.stop(s.getId());
Thread.sleep(2000);
String result = manager.start(s.getId(), s.getCommand().split("bin/")[1]);
@@ -470,11 +470,11 @@ public class RtmpLiveService {
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
/*Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
manager.destory();*/
} catch (Exception e) {
log.error("", e);
}

+ 4
- 10
src/main/java/com/github/bluesbruce/spring/web/IndexController.java View File

@@ -71,11 +71,8 @@ public class IndexController {
@ResponseBody
public String start() {
try {
if(rtmpLiveService.manager.queryAll().size()>0){
rtmpLiveService.status=0;
DefaultOutHandlerMethod.outIdMap= new HashMap();
rtmpLiveService.manager.stopAll();
};
log.info("http发送重启推流");
rtmpLiveService.stopRtmp();
//启动线程
Thread thread = new Thread(new Runnable() {
public void run() {
@@ -106,11 +103,8 @@ public class IndexController {
@ResponseBody
public String stop() {
try {
if(rtmpLiveService.manager.queryAll().size()>0){
rtmpLiveService.status=0;
DefaultOutHandlerMethod.outIdMap= new HashMap();
rtmpLiveService.manager.stopAll();
};
log.info("http发送停止推流");
rtmpLiveService.stopRtmp();
return objectMapper.writeValueAsString("success");
} catch (Exception e) {
e.printStackTrace();

Loading…
Cancel
Save