Browse Source

ffmpeg工具修改

master
wangwei 11 months ago
parent
commit
053ca2c023
9 changed files with 223 additions and 68 deletions
  1. +12
    -7
      loadCmd.properties
  2. +1
    -1
      loadFFmpeg.properties
  3. +5
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java
  4. +4
    -1
      src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java
  5. +1
    -1
      src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java
  6. +6
    -1
      src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java
  7. +1
    -1
      src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java
  8. +46
    -12
      src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java
  9. +147
    -44
      src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java

+ 12
- 7
loadCmd.properties View File

#推流地址(eg:阿里云地址) #推流地址(eg:阿里云地址)
pushUrl=rtmp://192.168.10.101:19350/rlive/stream_12?sign=LeL5Wchx
#多路推流
pushUrlMap=tee "[f=flv]rtmp://192.168.10.101:19350/rlive/stream_12?sign=LeL5Wchx | [f=flv]rtmp://192.168.10.101:19350/rlive/stream_17?sign=64FIaU8X"
pushUrl2=rtmp://192.168.10.101:19350/rlive/stream_17?sign=64FIaU8X
pushUrl=rtmp://192.168.10.101:19350/rlive/stream_11?sign=rHtBg3sz
#拉流地址(eg:吊舱拉流的地址) #拉流地址(eg:吊舱拉流的地址)
playUrl=http://192.168.10.101:18000/flv/live/34020000001110000001_34020000001320000099_0200000099.flv
#playUrl=http://192.168.10.101:18000/flv/live/34020000001110000001_34020000001320000099_0200000099.flv
playUrl=http://192.168.10.101:18000/flv/live/stream_176.flv
#固定通道 1 获取通道 2 多线程双路 3 单线程多路 4 #固定通道 1 获取通道 2 多线程双路 3 单线程多路 4
type=4 type=4


mqttUrl=tcp://106.15.120.154 mqttUrl=tcp://106.15.120.154
mqttTopic=/v1/123987/rtmp/live
time=60000
#主题 12345替换对应机场编号 004替换控制板id
mqttTopic=/v1/12345/rtmp/live,v1/004/confirm/DroneBMS

#推流时长(毫秒)1000表示1秒
time=600000

#多路推流(选配)
pushUrlMap=tee "[f=flv]rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n | [f=flv]rtmp://221.226.114.142:19350/rlive/stream_11?sign=rHtBg3sz"
pushUrl2=rtmp://192.168.10.101:19350/rlive/stream_17?sign=64FIaU8X

+ 1
- 1
loadFFmpeg.properties View File

timeout=300 timeout=300


#开启保活线程 #开启保活线程
keepalive=false
keepalive=true
#是否输出debug消息 #是否输出debug消息
debug=true debug=true

+ 5
- 0
src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java View File

@Override @Override
public void parse(String id,String msg) { public void parse(String id,String msg) {
long begin = System.currentTimeMillis();
//过滤消息 //过滤消息
if (msg.indexOf("fail") != -1) { if (msg.indexOf("fail") != -1) {
log.info(id + "任务可能发生故障:" + msg); log.info(id + "任务可能发生故障:" + msg);
log.info(id + "任务可能发生丢包10054:" + msg); log.info(id + "任务可能发生丢包10054:" + msg);
log.info("失败,设置中断状态"); log.info("失败,设置中断状态");
isBroken=false; isBroken=false;
}else if(msg.indexOf("Invalid")!= -1) {
log.info(id + "任务可能发生非法响应:" + msg);
log.info("失败,设置中断状态");
isBroken=true;
}else { }else {
isBroken=false; isBroken=false;
log.info(id + "消息:" + msg); log.info(id + "消息:" + msg);

+ 4
- 1
src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java View File

import com.github.bluesbruce.ffch.data.CommandTasker; import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.data.TaskDao; import com.github.bluesbruce.ffch.data.TaskDao;
import com.github.bluesbruce.ffch.util.ExecUtil; import com.github.bluesbruce.ffch.util.ExecUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;


import java.io.IOException; import java.io.IOException;
return false; return false;
} }
@SneakyThrows
@Override @Override
public void run() { public void run() {
for(;stop_index==0;) { for(;stop_index==0;) {
} }
String id=null; String id=null;
CommandTasker task=null; CommandTasker task=null;
Thread.sleep(2000);
//log.info("当前任务{}",queue);
try { try {
while(queue.peek() != null) { while(queue.peek() != null) {
log.info("准备重启任务:"+queue); log.info("准备重启任务:"+queue);

+ 1
- 1
src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java View File

log.info(String.format("接收消息Qos : %d", message.getQos())); log.info(String.format("接收消息Qos : %d", message.getQos()));
log.info(String.format("接收消息内容 : %s", new String(message.getPayload()))); log.info(String.format("接收消息内容 : %s", new String(message.getPayload())));
log.info(String.format("接收消息retained : %b", message.isRetained())); log.info(String.format("接收消息retained : %b", message.isRetained()));
if (topic.contains("/rtmp/live")){
if (topic.contains("/rtmp/live")||topic.contains("/data/DroneBMS")){
if (ObjectUtils.isEmpty(mqttLiveHandle)){ if (ObjectUtils.isEmpty(mqttLiveHandle)){
mqttLiveHandle = SpringUtil.getBean(MqttLiveHandle.class); mqttLiveHandle = SpringUtil.getBean(MqttLiveHandle.class);
} }

+ 6
- 1
src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java View File

//设置回调 //设置回调
client.setCallback(new MqttConsumerCallBack()); client.setCallback(new MqttConsumerCallBack());
client.connect(options); client.connect(options);
client.subscribe(cmdParam.getMqttTopic(),0);

String[] topics= cmdParam.getMqttTopic().split(",");
for (String topic :topics) {
client.subscribe(topic,0);
}

} catch(MqttException e){ } catch(MqttException e){
e.printStackTrace(); e.printStackTrace();
} }

+ 1
- 1
src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java View File

@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
try { try {
/*CommandManager manager = new CommandManagerImpl();
/* CommandManager manager = new CommandManagerImpl();
// -rtsp_transport tcp // -rtsp_transport tcp
//测试多个任何同时执行和停止情况 //测试多个任何同时执行和停止情况
//false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径 //false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径

+ 46
- 12
src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java View File

package com.github.bluesbruce.spring.service; package com.github.bluesbruce.spring.service;


import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;


@Component @Component
@Slf4j
public class MqttLiveHandle { public class MqttLiveHandle {
@Autowired @Autowired
private RtmpLiveService rtmpLiveService; private RtmpLiveService rtmpLiveService;
public void handleLive(final String topic, MqttMessage message){ public void handleLive(final String topic, MqttMessage message){
String objmsg = new String(message.getPayload()); String objmsg = new String(message.getPayload());
final JSONObject jsonObject = JSONObject.parseObject(objmsg); final JSONObject jsonObject = JSONObject.parseObject(objmsg);
if (!ObjectUtils.isEmpty(jsonObject.get("command"))){
String cmdoperat = jsonObject.get("command").toString();
if (cmdoperat.equals("start")){
Thread thread = new Thread(new Runnable() {
String code= jsonObject.get("code")==null?topic.split("/")[2]:jsonObject.get("code").toString();
public void run() {
rtmpLiveService.pushServer(code);
}
});
thread.start();
}else if (cmdoperat.equals("stop")){
rtmpLiveService.stopRtmp();
if (topic.contains("rtmp/live")) {
if (!ObjectUtils.isEmpty(jsonObject.get("command"))) {
String cmdoperat = jsonObject.get("command").toString();
if (cmdoperat.equals("start")) {
Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();

public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
} else if (cmdoperat.equals("stop")) {
rtmpLiveService.stopRtmp();
}
}
}else if(topic.contains("/data/DroneBMS")){
log.info("获取object:",jsonObject);
if (!ObjectUtils.isEmpty(jsonObject.get("parm"))) {
log.info("获取param:",jsonObject.get("parm"));
JSONObject cmdoperat = jsonObject.getJSONObject("parm");
String boolbms = cmdoperat.getString("Power");
log.info("boolbms:",jsonObject.get("Power"));
if (boolbms.equals("on")) {
Thread thread = new Thread(new Runnable() {
String code = jsonObject.get("code") == null ? topic.split("/")[2] : jsonObject.get("code").toString();
public void run() {
try {
rtmpLiveService.pushServer(code);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
} else if (boolbms.equals("off")) {
log.info("关闭流");
rtmpLiveService.stopRtmp();
}
} }
} }
} }

+ 147
- 44
src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java View File

/** /**
* 加载配置文件的推拉流地址 * 加载配置文件的推拉流地址
*/ */
public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class);
public static final CmdParam cmdParam = load("/loadCmd.properties", CmdParam.class);


@Autowired @Autowired
private MqttProviderConfig mqttProviderConfig; private MqttProviderConfig mqttProviderConfig;


public static final CommandManager manager = new CommandManagerImpl(); public static final CommandManager manager = new CommandManagerImpl();

/** /**
* h获取通道并推送 * h获取通道并推送
*/ */
public void pushServer(String code){
public void pushServer(String code) throws InterruptedException {
//CommandManager manager = new CommandManagerImpl(); //CommandManager manager = new CommandManagerImpl();
String reTopic = cmdParam.getMqttTopic().replace("live","result");
String reTopic = cmdParam.getMqttTopic().replace("live", "result");
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
Collection<CommandTasker> infoList = manager.queryAll(); Collection<CommandTasker> infoList = manager.queryAll();
log.info(infoList.toString()); log.info(infoList.toString());
if (infoList.size()>0){
jsonObject.put("code", -1);
if (infoList.size() > 0) {
//manager.stopAll();
//manager.destory();
//Thread.sleep(3000);
/*jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务已启动,请勿重复启动"); //推流失败 jsonObject.put("msg", "推流服务已启动,请勿重复启动"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());*/
log.info("任务运行中");
return; return;
} }
String pushUrl="";
if (cmdParam.getType()==2){
String pushUrl = "";
if (cmdParam.getType() == 2) {
log.info("获取流媒体通道"); log.info("获取流媒体通道");
//TODO 获取通道服务推拉流地址 //TODO 获取通道服务推拉流地址
JSONObject object = getChenl(); JSONObject object = getChenl();
if (!ObjectUtils.isEmpty(object)){
if (!ObjectUtils.isEmpty(object)) {
code = object.get("code").toString(); code = object.get("code").toString();
object.put("code", 0); object.put("code", 0);
object.put("msg", "获取通道成功"); object.put("msg", "获取通道成功");
mqttProviderConfig.publish(2,false,reTopic,object.toJSONString());
}else{
mqttProviderConfig.publish(2, false, reTopic, object.toJSONString());
} else {
log.info("获取通道失败"); log.info("获取通道失败");
object.put("code", -1); object.put("code", -1);
object.put("msg", "获取通道失败"); object.put("msg", "获取通道失败");
mqttProviderConfig.publish(2,false,reTopic,object.toJSONString());
mqttProviderConfig.publish(2, false, reTopic, object.toJSONString());
return; return;
} }
runRtmp(pushUrl, cmdParam.getPlayUrl(), code);
//TODO end //TODO end
}else{
} else if (cmdParam.getType() == 1) {
log.info("使用固定流媒体通道"); log.info("使用固定流媒体通道");
pushUrl= cmdParam.getPushUrl();
pushUrl = cmdParam.getPushUrl();
runRtmp(pushUrl, cmdParam.getPlayUrl(), code);
} else if (cmdParam.getType() == 3) {
runRtmp2to2(cmdParam.getPlayUrl());
} else if (cmdParam.getType() == 4) {
runRtmp1to2(cmdParam.getPlayUrl());
}


}

private void runRtmp2to2(String playUrl) throws InterruptedException {
try {
String reTopic = cmdParam.getMqttTopic().replace("live", "result");
String taskId = manager.start("test1", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i", playUrl)
.add("-rtsp_transport", "tcp")
.add("-vcodec", "copy")
.add("-acodec", "copy")
.add("-f", "flv")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
.add("-y").add(cmdParam.getPushUrl()));
Thread.sleep(3000);
String taskId2 = manager.start("test2", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i", playUrl)
.add("-rtsp_transport", "tcp")
.add("-vcodec", "copy")
.add("-acodec", "copy")
.add("-f", "flv")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
.add("-y").add(cmdParam.getPushUrl2()));
JSONObject jsonObject = new JSONObject();
if (StringUtils.isNullOrEmpty(taskId) && StringUtils.isNullOrEmpty(taskId2)) {
jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务失败"); //推流失败
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
// 停止全部任务
manager.stopAll();
return;
} else {
jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
} catch (Exception e) {
e.printStackTrace();
} }
runRtmp(pushUrl,cmdParam.getPlayUrl(),code);
}


private void runRtmp1to2(String playUrl) {
try {
//单进程 多路
String reTopic = cmdParam.getMqttTopic().replace("live", "result");
String taskId = manager.start("test4", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i", playUrl)
.add("-rtsp_transport", "tcp")
.add("-vcodec", "copy")
.add("-acodec", "copy")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
//多路推流测试
.add("-map", "0")
.add("-f").add(cmdParam.getPushUrlMap()));
JSONObject jsonObject = new JSONObject();
if (StringUtils.isNullOrEmpty(taskId)) {
jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务失败"); //推流失败
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
// 停止全部任务
manager.stopAll();
return;
} else {
jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
}
Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
} catch (Exception e) {
e.printStackTrace();
}
} }


/** /**
* 获取阿里云通道 * 获取阿里云通道
*
* @return * @return
*/ */
private JSONObject getChenl() { private JSONObject getChenl() {
String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
if (!StringUtils.isNullOrEmpty(result)) { if (!StringUtils.isNullOrEmpty(result)) {
try { try {
JSONObject object = JSONObject.parseObject(result); JSONObject object = JSONObject.parseObject(result);
} }


} }
}catch (Exception e){
log.error("",e);
} catch (Exception e) {
log.error("", e);
} }
}else{
} else {
return null; return null;
} }
return null; return null;


/** /**
* 根据推拉流地址推动 * 根据推拉流地址推动
*
* @param pushUrl * @param pushUrl
* @param playUrl * @param playUrl
*/ */
public void runRtmp(String pushUrl,String playUrl,String code) {
public void runRtmp(String pushUrl, String playUrl, String code) {
try { try {
log.info("获取播流地址:{}"); log.info("获取播流地址:{}");
//CommandManager manager = new CommandManagerImpl(); //CommandManager manager = new CommandManagerImpl();
String taskId = manager.start(code, CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i",playUrl)
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
String taskId = manager.start(code, CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i", playUrl)
.add("-rtsp_transport", "tcp")
.add("-vcodec", "copy")
.add("-acodec", "copy")
.add("-f", "flv")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
.add("-v", "trace ")
.add("-y").add(pushUrl)); .add("-y").add(pushUrl));
cmdParam.getMqttTopic(); cmdParam.getMqttTopic();
String reTopic = cmdParam.getMqttTopic().replace("live","result");
String reTopic = cmdParam.getMqttTopic().replace("live", "result");


log.info(manager.queryAll().toString()); log.info(manager.queryAll().toString());
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
if (StringUtils.isNullOrEmpty(taskId)){
if (StringUtils.isNullOrEmpty(taskId)) {
jsonObject.put("code", -1); jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务失败"); //推流失败 jsonObject.put("msg", "推流服务失败"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
// 停止全部任务 // 停止全部任务
manager.stopAll(); manager.stopAll();
return; return;
}else {
} else {
jsonObject.put("code", 0); jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务启动成功"); jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
} }
Thread.sleep(36000);
Thread.sleep(cmdParam.getTime());
log.info(manager.queryAll().toString()); log.info(manager.queryAll().toString());
// 停止全部任务 // 停止全部任务
manager.stopAll(); manager.stopAll();
manager.destory(); manager.destory();
}catch (Exception e){
log.error("",e);
} catch (Exception e) {
log.error("", e);
} }
//ds.close(); //ds.close();
} }


public void stopRtmp() {
public void stopRtmp() {
try { try {
log.info("停止推流"); log.info("停止推流");
//CommandManager manager = new CommandManagerImpl(); //CommandManager manager = new CommandManagerImpl();
int index = manager.stopAll(); int index = manager.stopAll();
manager.destory(); manager.destory();
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
String reTopic = cmdParam.getMqttTopic().replace("live","result");
if (index==0){
String reTopic = cmdParam.getMqttTopic().replace("live", "result");
if (index == 0) {
jsonObject.put("code", -1); jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务关闭失败"); //推流失败 jsonObject.put("msg", "推流服务关闭失败"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
// 停止全部任务 // 停止全部任务
manager.stopAll(); manager.stopAll();
return; return;
}else if (index>0){
} else if (index > 0) {
jsonObject.put("code", 0); jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务关闭成功"); jsonObject.put("msg", "推流服务关闭成功");
returnChenl(); returnChenl();
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
} }
}catch (Exception e){
log.error("",e);
} catch (Exception e) {
log.error("", e);
} }
//ds.close(); //ds.close();
} }


/** /**
* 释放阿里云通道 * 释放阿里云通道
*
* @return * @return
*/ */
private JSONObject returnChenl() { private JSONObject returnChenl() {
return null; return null;
} }
return null; return null;
}catch (Exception e){
log.error("",e);
} catch (Exception e) {
log.error("", e);
} }
return null; return null;
} }

Loading…
Cancel
Save