ffch/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java

505 lines
21 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.github.bluesbruce.spring.service;
import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.CommandManagerImpl;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.alibaba.fastjson.JSONObject;
import com.github.bluesbruce.ffch.handler.DefaultOutHandlerMethod;
import com.github.bluesbruce.ffch.handler.OutHandler;
import com.github.bluesbruce.spring.config.CmdParam;
import com.github.bluesbruce.spring.mqttService.HttpURLConnectionUtil;
import com.github.bluesbruce.spring.mqttService.send.MqttProviderConfig;
import lombok.extern.slf4j.Slf4j;
import org.h2.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.*;
import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;
@Service
@Slf4j
public class RtmpLiveService {
/**
 * Push/pull stream addresses and related settings, loaded once from
 * {@code /loadCmd.properties} via {@code PropertiesUtil.load}.
 */
public static final CmdParam cmdParam = load("/loadCmd.properties", CmdParam.class);
/** MQTT publisher used by this service to send result messages. */
@Autowired
private MqttProviderConfig mqttProviderConfig;
/**
 * Streaming state flag: {@code pushServer} sets it to 1 and {@code stopRtmp} sets it to 0.
 * It is polled by the monitor thread started in {@code pushServer}, so it is declared
 * {@code volatile} to guarantee that a write from one thread becomes visible to the others.
 */
public static volatile int status = 0;
/** Command manager shared by every method of this service to start, query and stop tasks. */
public static CommandManager manager = new CommandManagerImpl();
/**
 * Starts a push task according to the configured push type, unless a task is already
 * registered in the command manager (in which case the call only logs and returns).
 *
 * <p>Dispatch on {@code cmdParam.getType()}:
 * <ul>
 *   <li>0 - run the configured raw command ({@code runRtmpByCmd})</li>
 *   <li>1 - push to the configured fixed push URL ({@code runRtmp})</li>
 *   <li>2 - acquire a channel from the channel service first, then {@code runRtmp}</li>
 *   <li>3 - {@code runRtmp2to2}</li>
 *   <li>4 - {@code runRtmp1to2}</li>
 * </ul>
 * Any other value starts only the monitor thread.
 *
 * @param code channel code forwarded to {@code runRtmp}; for type 2 it is replaced by the
 *             code of the acquired channel
 * @throws InterruptedException declared for the delegates that may wait
 */
public void pushServer(String code) throws InterruptedException {
    String reTopic = cmdParam.getMqttTopic().replace("live", "result");
    Collection<CommandTasker> infoList = manager.queryAll();
    log.info(infoList.toString());
    if (infoList.size() > 0) {
        log.info("任务运行中");
        return;
    }
    // Mark the service as running BEFORE the monitor thread starts: the monitor exits as
    // soon as it observes status == 0, so setting the flag afterwards is a race.
    status = 1;
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                checkMsg();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("", e);
            } catch (Exception e) {
                log.error("", e);
            }
        }
    });
    thread.start();
    String pushUrl = "";
    if (cmdParam.getType() == 0) {
        log.info("使用指令推流");
        runRtmpByCmd(cmdParam.getCmd());
    } else if (cmdParam.getType() == 2) {
        log.info("获取流媒体通道");
        JSONObject object = getChenl();
        if (ObjectUtils.isEmpty(object)) {
            log.info("获取通道失败");
            // getChenl() returns null on failure, so the reply needs its own JSON object.
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("code", -1);
            jsonObject.put("msg", "获取通道失败");
            mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
            // Nothing was started: clear the flag so the monitor thread terminates.
            status = 0;
            return;
        }
        code = object.get("code").toString();
        object.put("code", 0);
        object.put("msg", "获取通道成功");
        mqttProviderConfig.publish(2, false, reTopic, object.toJSONString());
        // NOTE(review): pushUrl is still empty here; the original TODO says the push/pull
        // addresses should come from the channel service - confirm which field carries them.
        runRtmp(pushUrl, cmdParam.getPlayUrl(), code);
    } else if (cmdParam.getType() == 1) {
        log.info("使用固定流媒体通道");
        pushUrl = cmdParam.getPushUrl();
        runRtmp(pushUrl, cmdParam.getPlayUrl(), code);
    } else if (cmdParam.getType() == 3) {
        runRtmp2to2(cmdParam.getPlayUrl());
    } else if (cmdParam.getType() == 4) {
        runRtmp1to2(cmdParam.getPlayUrl());
    }
}
/**
 * Starts two ffmpeg tasks ("push3-1" and "push3-2") that both read {@code playUrl} over
 * RTSP/TCP and push FLV without audio, one to {@code cmdParam.getPushUrl()} and one to
 * {@code cmdParam.getPushUrl2()}, with a 5 second pause between the two starts.
 *
 * <p>The outcome is published to the result topic. Failure (code -1) is reported only when
 * BOTH start calls return a null/empty task id; otherwise success (code 0) is reported.
 *
 * @param playUrl source stream address passed to ffmpeg as {@code -i}
 * @throws InterruptedException kept for signature compatibility; an interruption during the
 *                              pause is logged and the interrupt flag is restored
 */
private void runRtmp2to2(String playUrl) throws InterruptedException {
    try {
        String reTopic = cmdParam.getMqttTopic().replace("live", "result");
        String taskId = manager.start("push3-1", CommandBuidlerFactory.createBuidler()
                .add("ffmpeg")
                .add("-rtsp_transport", "tcp").add("-i", playUrl)
                .add("-vcodec", "copy")
                .add("-f", "flv")
                .add("-b:v", "1M")
                .add("-maxrate", "1M")
                .add("-bufsize", "1M")
                .add("-an").add(cmdParam.getPushUrl()));
        Thread.sleep(5000);
        String taskId2 = manager.start("push3-2", CommandBuidlerFactory.createBuidler()
                .add("ffmpeg")
                .add("-rtsp_transport", "tcp").add("-i", playUrl)
                .add("-vcodec", "copy")
                .add("-f", "flv")
                .add("-b:v", "1M")
                .add("-maxrate", "1M")
                .add("-bufsize", "1M")
                .add("-an").add(cmdParam.getPushUrl2()));
        JSONObject jsonObject = new JSONObject();
        if (StringUtils.isNullOrEmpty(taskId) && StringUtils.isNullOrEmpty(taskId2)) {
            jsonObject.put("code", -1);
            jsonObject.put("msg", "推流服务失败");
        } else {
            jsonObject.put("code", 0);
            jsonObject.put("msg", "推流服务启动成功");
        }
        mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
    } catch (InterruptedException e) {
        // Do not lose the interruption: restore the flag for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("", e);
    } catch (Exception e) {
        log.error("", e);
    }
}
/**
 * Starts a single ffmpeg task ("push4") that reads {@code playUrl} over RTSP/TCP, copies
 * video and audio, and passes {@code cmdParam.getPushUrlMap()} as the value following
 * {@code -f} (single process, multiple outputs).
 *
 * <p>The outcome is published to the result topic: code -1 when the start call returns a
 * null/empty task id (all tasks are then stopped), code 0 otherwise.
 *
 * @param playUrl source stream address passed to ffmpeg as {@code -i}
 */
private void runRtmp1to2(String playUrl) {
    try {
        String reTopic = cmdParam.getMqttTopic().replace("live", "result");
        String taskId = manager.start("push4", CommandBuidlerFactory.createBuidler()
                .add("ffmpeg")
                .add("-rtsp_transport", "tcp").add("-i", playUrl)
                .add("-vcodec", "copy")
                .add("-acodec", "copy")
                .add("-b:v", "2M")
                .add("-maxrate", "2M")
                .add("-bufsize", "1M")
                .add("-map", "0")
                .add("-f").add(cmdParam.getPushUrlMap()));
        JSONObject jsonObject = new JSONObject();
        if (StringUtils.isNullOrEmpty(taskId)) {
            jsonObject.put("code", -1);
            jsonObject.put("msg", "推流服务失败");
            mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
            // Stop every task after a failed start.
            manager.stopAll();
            return;
        }
        jsonObject.put("code", 0);
        jsonObject.put("msg", "推流服务启动成功");
        mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
    } catch (Exception e) {
        // Log through the class logger, like the other run* methods, instead of stderr.
        log.error("", e);
    }
}
/**
 * Fetches the live channel list from the channel service and tries to occupy a channel.
 *
 * <p>Each entry of the list's {@code data} array is submitted in turn to the
 * {@code useLiveChannel} endpoint; the first entry whose response carries code "0" is
 * returned. Any exception while parsing or occupying is logged and ends the search.
 *
 * @return the occupied channel entry, or {@code null} when the list request returned
 *         nothing, the list is empty, no channel could be occupied, or an error occurred
 */
private JSONObject getChenl() {
    String listBody = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
    if (StringUtils.isNullOrEmpty(listBody)) {
        return null;
    }
    try {
        JSONObject listJson = JSONObject.parseObject(listBody);
        List<JSONObject> channels = (List<JSONObject>) listJson.get("data");
        if (channels.isEmpty()) {
            return null;
        }
        log.info("获取到通道列表{}", channels);
        for (JSONObject channel : channels) {
            // Ask the service to mark this channel as in use.
            JSONObject request = new JSONObject();
            request.put("code", channel.get("code").toString());
            String occupyBody = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", request.toJSONString());
            if (StringUtils.isNullOrEmpty(occupyBody)) {
                continue;
            }
            JSONObject occupyJson = JSONObject.parseObject(occupyBody);
            if (occupyJson.get("code").toString().equals("0")) {
                log.info("占用通道{}", channel);
                return channel;
            }
        }
    } catch (Exception e) {
        log.error("", e);
    }
    return null;
}
/**
 * Starts the ffmpeg task "push1" that reads {@code playUrl} over RTSP/TCP and pushes it as
 * FLV (video copied, audio dropped) to {@code pushUrl}, then publishes the outcome to the
 * result topic: code -1 when the start call returns a null/empty task id (all tasks are
 * then stopped), code 0 otherwise. Exceptions are logged and not propagated.
 *
 * @param pushUrl destination address passed to ffmpeg as the output
 * @param playUrl source stream address passed to ffmpeg as {@code -i}
 * @param code    channel code; not used by this method
 */
public void runRtmp(String pushUrl, String playUrl, String code) {
    try {
        // The original message had a "{}" placeholder but no argument to fill it.
        log.info("获取播流地址:{}", playUrl);
        String taskId = manager.start("push1", CommandBuidlerFactory.createBuidler()
                .add("ffmpeg").add("-rtsp_transport", "tcp")
                .add("-i", playUrl)
                .add("-vcodec", "copy")
                .add("-f", "flv")
                .add("-an").add(pushUrl));
        String reTopic = cmdParam.getMqttTopic().replace("live", "result");
        log.info(manager.queryAll().toString());
        JSONObject jsonObject = new JSONObject();
        if (StringUtils.isNullOrEmpty(taskId)) {
            jsonObject.put("code", -1);
            jsonObject.put("msg", "推流服务失败");
            mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
            // Stop every task after a failed start.
            manager.stopAll();
            return;
        }
        jsonObject.put("code", 0);
        jsonObject.put("msg", "推流服务启动成功");
        mqttProviderConfig.publish(2, false, reTopic, jsonObject.toJSONString());
    } catch (Exception e) {
        log.error("", e);
    }
}
/*private void checkMsg1() throws InterruptedException {
long oldtime = System.currentTimeMillis();
String oldMsg = "";
while (true) {
String msg = OutHandler.outMsg==null?"":OutHandler.outMsg;
if (!msg.equals(oldMsg)) {
oldMsg=msg;
oldtime=System.currentTimeMillis();
}
log.info("消息输出{},状态{}",msg,status);
if (System.currentTimeMillis()-oldtime>15000&&msg.equals(oldMsg)&&status==1){
//log.info("消息未输出{}",msg);
if(manager.queryAll().size()>0){
Collection<CommandTasker> list = manager.queryAll();
Iterator<CommandTasker> commandTaskerIterator = list.iterator();
CommandTasker s = commandTaskerIterator.next();
log.info("0000{},{}",s.getId(),s.getCommand());
log.info("任务消息未输出",s);
manager.stop(s.getId());
Thread.sleep(2000);
String result= manager.start(s.getId(),s.getCommand().split("bin/")[1]);
if (!StringUtils.isNullOrEmpty(result)){
oldtime= System.currentTimeMillis();
}else{
log.info("推流失败,重新推流");
}
};
}
if (status==0){
break;
}
Thread.sleep(200);
}
}*/
/**
 * Monitor loop run on the background thread started by {@code pushServer}.
 *
 * <p>Roughly every 400 ms it reads the latest output text per task id from
 * {@code DefaultOutHandlerMethod.outIdMap} and hands it to {@code checkMsgDetil}, which
 * restarts a task whose output has not changed for a while. For push type 3 the two tasks
 * "push3-1" and "push3-2" are tracked independently; for every other type the single
 * non-"push3-*" entry of the map is tracked.
 *
 * <p>When {@code status} becomes 0 the loop stops all remaining tasks, waits 5 seconds,
 * stops anything that reappeared in the meantime, and exits.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void checkMsg() throws InterruptedException {
    long oldtime = System.currentTimeMillis();
    String oldMsg = "";
    long oldtime2 = System.currentTimeMillis();
    String oldMsg2 = "";
    while (true) {
        Map<?, ?> map = DefaultOutHandlerMethod.outIdMap;
        if (cmdParam.getType() == 3) {
            Map<?, ?> first = checkMsgDetil(oldtime, oldMsg, monitorText(map.get("push3-1")), "push3-1");
            oldMsg = monitorText(first.get("oldMsg"));
            oldtime = monitorTime(first.get("oldtime"));
            Map<?, ?> second = checkMsgDetil(oldtime2, oldMsg2, monitorText(map.get("push3-2")), "push3-2");
            oldMsg2 = monitorText(second.get("oldMsg"));
            oldtime2 = monitorTime(second.get("oldtime"));
        } else {
            String msg = "";
            String keys = "";
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                if (entry.getKey() == null) {
                    continue;
                }
                String key = entry.getKey().toString();
                // Skip BOTH type-3 task ids (the original tested "push3-1" twice) and take
                // the message from the same entry as the id, so the two always match.
                if (!key.equals("push3-1") && !key.equals("push3-2")) {
                    keys = key;
                    msg = monitorText(entry.getValue());
                }
            }
            Map<?, ?> state = checkMsgDetil(oldtime, oldMsg, msg, keys);
            oldMsg = monitorText(state.get("oldMsg"));
            oldtime = monitorTime(state.get("oldtime"));
        }
        if (status == 0) {
            if (manager.queryAll().size() > 0) {
                log.info("停止推流20");
                manager.stopAll();
                manager.destory();
                Thread.sleep(5000);
                // Delayed re-check: stop anything that showed up again during the wait.
                if (manager.queryAll().size() > 0) {
                    log.info("延迟检测推流1");
                    manager.stopAll();
                    manager.destory();
                }
                log.info("无任务停止监听1");
            } else {
                log.info("无任务停止监听2");
                Thread.sleep(5000);
                if (manager.queryAll().size() > 0) {
                    log.info("延迟检测推流关闭");
                    manager.stopAll();
                    manager.destory();
                }
            }
            break;
        }
        Thread.sleep(400);
    }
}

/** Returns {@code value.toString()}, or the empty string when {@code value} is null. */
private static String monitorText(Object value) {
    return value == null ? "" : value.toString();
}

/** Reads a timestamp stored in a state map; a missing value is treated as 0. */
private static long monitorTime(Object value) {
    if (value == null) {
        return 0L;
    }
    if (value instanceof Number) {
        return ((Number) value).longValue();
    }
    return Long.parseLong(value.toString());
}
/**
 * Performs one stall check for a single task and restarts the task when its output has
 * not changed for more than 15 seconds while {@code status} is 1.
 *
 * <p>If {@code msg} differs from {@code oldMsg}, the tracked message and timestamp are
 * refreshed. Otherwise, once the timestamp is older than 15 seconds, the task registered
 * under {@code id} is stopped, the method waits 2 seconds, and the task is started again
 * with the same id. The timestamp is reset only when the restart returns a non-empty id.
 *
 * @param oldtime time in milliseconds at which the tracked message last changed
 * @param oldMsg  last message seen for this task
 * @param msg     current message for this task; {@code null} is treated as empty
 * @param id      task id to look up in the command manager
 * @return a map with the updated state under the keys {@code "oldtime"} (a {@code long})
 *         and {@code "oldMsg"} (a {@code String})
 * @throws InterruptedException if interrupted during the pause between stop and restart
 */
public Map checkMsgDetil(long oldtime, String oldMsg, String msg, String id) throws InterruptedException {
    if (msg == null) {
        msg = "";
    }
    if (!msg.equals(oldMsg)) {
        oldMsg = msg;
        oldtime = System.currentTimeMillis();
    }
    log.info("消息输出{},状态{}", msg, status);
    // After the refresh above msg always equals oldMsg, so only age and status matter.
    if (System.currentTimeMillis() - oldtime > 15000 && status == 1) {
        CommandTasker s = manager.query(id);
        if (!ObjectUtils.isEmpty(s)) {
            log.info("0000{},{}", s.getId(), s.getCommand());
            log.info("任务消息未输出,重新推流");
            manager.stop(s.getId());
            Thread.sleep(2000);
            // The restart uses the text after the first "bin/" of the stored command
            // (presumably the manager prepends the ffmpeg directory itself - confirm).
            // indexOf avoids the ArrayIndexOutOfBounds of split(...)[1] when the marker is
            // missing, and keeps the rest intact when "bin/" appears again later on.
            String command = s.getCommand();
            int marker = command.indexOf("bin/");
            String restartCommand = marker >= 0 ? command.substring(marker + "bin/".length()) : command;
            String result = manager.start(s.getId(), restartCommand);
            if (!StringUtils.isNullOrEmpty(result)) {
                oldtime = System.currentTimeMillis();
            } else {
                log.info("推流失败,重新推流");
            }
        }
    }
    Map<String, Object> state = new HashMap<>();
    state.put("oldtime", oldtime);
    state.put("oldMsg", oldMsg);
    return state;
}
/**
 * Stops streaming: clears the running flag and the per-task output map, stops and destroys
 * all tasks in the command manager, and publishes the outcome to the result topic.
 *
 * <p>The value returned by {@code manager.stopAll()} decides the reply: a positive value
 * publishes code 0 after calling {@code returnChenl()}; zero publishes code -1 and issues
 * {@code stopAll()} once more; a negative value publishes nothing. Exceptions are logged
 * and not propagated.
 */
public void stopRtmp() {
    try {
        status = 0;
        DefaultOutHandlerMethod.outIdMap = new HashMap();
        log.info("停止推流");
        // Stop every task, then tear the manager down.
        int stoppedCount = manager.stopAll();
        manager.destory();
        JSONObject reply = new JSONObject();
        String resultTopic = cmdParam.getMqttTopic().replace("live", "result");
        if (stoppedCount > 0) {
            reply.put("code", 0);
            reply.put("msg", "推流服务关闭成功");
            returnChenl();
            mqttProviderConfig.publish(2, false, resultTopic, reply.toJSONString());
        } else if (stoppedCount == 0) {
            reply.put("code", -1);
            reply.put("msg", "推流服务关闭失败");
            mqttProviderConfig.publish(2, false, resultTopic, reply.toJSONString());
            // Second attempt to stop every task.
            manager.stopAll();
        }
    } catch (Exception e) {
        log.error("", e);
    }
}
/**
 * Called by {@code stopRtmp} after a successful stop (the original comment describes it as
 * releasing the channel).
 *
 * <p>NOTE(review): the body fetches the channel list and submits each entry to the
 * {@code useLiveChannel} endpoint, exactly like {@code getChenl} - confirm whether a
 * dedicated release endpoint was intended here.
 *
 * @return the first channel entry whose {@code useLiveChannel} response carries code "0",
 *         or {@code null} when there is no such entry or an error occurred (errors are logged)
 */
private JSONObject returnChenl() {
    try {
        String listBody = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
        if (StringUtils.isNullOrEmpty(listBody)) {
            return null;
        }
        JSONObject listJson = JSONObject.parseObject(listBody);
        List<JSONObject> channels = (List<JSONObject>) listJson.get("data");
        if (channels.isEmpty()) {
            return null;
        }
        log.info("获取到通道列表{}", channels);
        for (JSONObject channel : channels) {
            JSONObject request = new JSONObject();
            request.put("code", channel.get("code").toString());
            String responseBody = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", request.toJSONString());
            if (StringUtils.isNullOrEmpty(responseBody)) {
                continue;
            }
            JSONObject responseJson = JSONObject.parseObject(responseBody);
            if (responseJson.get("code").toString().equals("0")) {
                log.info("占用通道{}", channel);
                return channel;
            }
        }
        return null;
    } catch (Exception e) {
        log.error("", e);
    }
    return null;
}
/**
 * Starts the task "push0" directly from a ready-made command line and publishes the
 * outcome to the result topic: code -1 when the start call returns a null/empty task id
 * (all tasks are then stopped), code 0 otherwise. Exceptions are logged and not propagated.
 *
 * @param cmd command line handed to the command manager as-is
 */
public void runRtmpByCmd(String cmd) {
    try {
        String startedId = manager.start("push0", cmd);
        // Return value discarded, as in the original; kept so the call sequence is unchanged.
        cmdParam.getMqttTopic();
        String resultTopic = cmdParam.getMqttTopic().replace("live", "result");
        log.info(manager.queryAll().toString());
        boolean startFailed = StringUtils.isNullOrEmpty(startedId);
        JSONObject reply = new JSONObject();
        reply.put("code", startFailed ? -1 : 0);
        reply.put("msg", startFailed ? "推流服务失败" : "推流服务启动成功");
        mqttProviderConfig.publish(2, false, resultTopic, reply.toJSONString());
        if (startFailed) {
            // Stop every task after a failed start.
            manager.stopAll();
        }
    } catch (Exception e) {
        log.error("", e);
    }
}
}