Compare commits

...

15 Commits

Author SHA1 Message Date
孙小云 f0b4b85dec 添加定时删除功能 2025-07-17 14:56:06 +08:00
孙小云 01727f8e3f 添加定时删除功能 2025-07-11 10:43:11 +08:00
孙小云 4cb2c1c468 xx 2025-05-24 17:18:10 +08:00
孙小云 e692d319c3 xx 2025-04-26 16:50:35 +08:00
孙小云 03a23330bb 修改录制时长 2025-04-24 16:42:53 +08:00
孙小云 613d6d3dc5 大数据 2025-04-23 11:56:18 +08:00
孙小云 4ca3c73323 xx 2025-04-19 11:35:37 +08:00
孙小云 d65aaaf4b6 修改配置文件 2025-04-14 09:53:59 +08:00
孙小云 889a978bbe 删除 2025-04-14 09:39:36 +08:00
孙小云 95263673f9 生产环境无法注入的问题 2025-04-02 17:12:56 +08:00
孙小云 5a569084b8 添加如果流不存在则关闭时候报错 2025-04-02 16:46:00 +08:00
孙小云 4779c1b1bc 添加如果流不存在则关闭时候报错 2025-04-02 16:44:39 +08:00
孙小云 94c8935b1a 添加图片获取功能 2025-03-22 16:34:11 +08:00
孙小云 84b3154cdd 视频录制功能 2025-03-22 16:31:22 +08:00
孙小云 727d347c71 修改分页开始地址 2025-03-17 13:32:07 +08:00
14 changed files with 425 additions and 171 deletions

16
pom.xml
View File

@ -35,11 +35,6 @@
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>io.minio</groupId> <groupId>io.minio</groupId>
@ -51,7 +46,18 @@
<artifactId>fastjson2</artifactId> <artifactId>fastjson2</artifactId>
<version>2.0.53</version> <version>2.0.53</version>
</dependency> </dependency>
<!-- SkyWalking 工具包Logback 支持) -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>8.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>8.6.0</version>
</dependency>

View File

@ -16,6 +16,7 @@ import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.io.File; import java.io.File;
@ -34,53 +35,87 @@ public class StreamRecordController {
@Autowired @Autowired
IRecordService iRecordService; IRecordService iRecordService;
@GetMapping("pic")
public Response<String> startPic(@RequestParam String streamUrl) {
logger.info("启动视频拍照 :"+streamUrl);
streamUrl = streamSwitch(streamUrl);
logger.info("启动视频拍照_ :"+streamUrl);
if(Objects.isNull(streamUrl)) {
return Response.fail(-1);
}
String outfile = taskService.startPic(streamUrl);
if(Objects.nonNull(outfile)) {
int lastSlashIndex = outfile.lastIndexOf("/");
String fileName = outfile.substring(lastSlashIndex + 1);
return Response.success(fileName);
}else {
return Response.fail(-1);
}
}
@GetMapping("start") @GetMapping("start")
public Response<StreamTask> startRecording(@RequestParam String streamUrl) { public Response<StreamTask> startRecording(@RequestParam String streamUrl) {
System.out.println("启动录制 :"+streamUrl); logger.info("启动录制 :"+streamUrl);
streamUrl = streamSwitch(streamUrl);
logger.info("启动录制_ :"+streamUrl);
if(Objects.isNull(streamUrl)) { if(Objects.isNull(streamUrl)) {
return Response.fail(-1); return Response.fail(-1);
} }
Response<StreamTask> response = Response.success(taskService.startTask(streamUrl)); Response<StreamTask> response = Response.success(taskService.startTask(streamUrl));
System.out.println("启动录制返回 :"+ JSON.toJSONString(response)); logger.info("启动录制返回 :"+ JSON.toJSONString(response));
return response; return response;
} }
@GetMapping("stop") @GetMapping("stop")
public Response<StreamTask> stopRecording(@RequestParam String streamUrl){ public Response<StreamTask> stopRecording(@RequestParam String streamUrl){
System.out.println("关闭录制 :"+streamUrl); logger.info("关闭录制 :"+streamUrl);
streamUrl = streamSwitch(streamUrl);
logger.info("关闭录制_ :"+streamUrl);
if(Objects.isNull(streamUrl)) { if(Objects.isNull(streamUrl)) {
return Response.fail(-1); return Response.fail(-1);
} }
Response<StreamTask> response = Response.success(taskService.stopTask(streamUrl)); try {
if(Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){ Response<StreamTask> response = Response.success(taskService.stopTask(streamUrl));
response.setCode(500); if(Objects.isNull(response.getData()) || Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){
response.setCode(500);
}
logger.info("关闭录制返回 :"+ JSON.toJSONString(response));
return response;
}catch (Exception e){
return Response.fail(-1);
} }
System.out.println("关闭录制返回 :"+ JSON.toJSONString(response));
return response;
} }
@GetMapping("info") @GetMapping("info")
public Response<StreamTask> getLastTask(@RequestParam String streamUrl){ public Response<StreamTask> getLastTask(@RequestParam String streamUrl){
logger.info("查看录制 :"+streamUrl);
System.out.println("查看录制 :"+streamUrl); streamUrl = streamSwitch(streamUrl);
logger.info("查看录制_ :"+streamUrl);
if(Objects.isNull(streamUrl)) { if(Objects.isNull(streamUrl)) {
return Response.fail(-1); return Response.fail(-1);
} }
Response<StreamTask> response = Response.success(taskService.getLastTask(streamUrl)); Response<StreamTask> response = Response.success(taskService.getLastTask(streamUrl));
System.out.println("查看录制返回 :"+ JSON.toJSONString(response)); logger.info("查看录制返回 :"+ JSON.toJSONString(response));
return response; return response;
} }
/**
* 废弃
* @param request
* @return
*/
@PostMapping("search") @PostMapping("search")
public Response<PageInfo<Mp4Info>> streamView(@RequestBody PageStreamRequest request){ public Response<PageInfo<Mp4Info>> streamView(@RequestBody PageStreamRequest request){
System.out.println("查看录制 search :"+JSON.toJSONString(request)); logger.info("查看录制 search :"+JSON.toJSONString(request));
if(Objects.isNull(request.getPageIndex()) || Objects.isNull(request.getPageSize()) if(Objects.isNull(request.getPageIndex()) || Objects.isNull(request.getPageSize())
|| request.getPageIndex() <0 || request.getPageSize() <=0 ){ || request.getPageIndex() <0 || request.getPageSize() <=0 ){
System.out.println("查看录制 search 入参错误!"); logger.error("查看录制 search 入参错误!");
return Response.fail(-100); return Response.fail(-100);
} }
@ -104,18 +139,10 @@ public class StreamRecordController {
*/ */
mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime()));
PageInfo<Mp4Info> pageInfo = new PageInfo<>(mp4s,request.getPageIndex(),request.getPageSize()); PageInfo<Mp4Info> pageInfo = new PageInfo<>(mp4s,request.getPageIndex(),request.getPageSize());
System.out.println("查看录制 search 返回:"+JSON.toJSONString(request)); logger.info("查看录制 search 返回:"+JSON.toJSONString(request));
return Response.success(pageInfo); return Response.success(pageInfo);
} }
@GetMapping("history")
public Response<List<Mp4Info>> streamView(@RequestParam String streamUrl){
System.out.println("查看录像历史返回 :"+ streamUrl);
List<Mp4Info> dayMp4 = searchAll(streamUrl);
dayMp4.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime()));
System.out.println("查看录像历史返回 :"+ JSON.toJSONString(dayMp4));
return Response.success(dayMp4) ;
}
public List<Mp4Info> searchAll(String streamUrl){ public List<Mp4Info> searchAll(String streamUrl){
@ -142,22 +169,62 @@ public class StreamRecordController {
} }
@Value("${srs.domain}")
private String srsdomain;
@Value("${srs.name}")
private String srsname;
/** public String streamSwitch(String source){
* 方便测试时候使用
* @return if(Objects.nonNull(srsdomain) && !srsdomain.isEmpty()){
*/ if(Objects.nonNull(source) && !source.isEmpty()){
@GetMapping("exit") return dockerFix(source);
public String exit(){
Thread thread = new Thread(() -> {
try {
taskService.stopAllTask();
Thread.sleep(10000L);
}catch (Exception e){
} }
System.exit(0); }
});
return "OK";
if(source.contains("stream.t-aaron.com")){
return source;
} else if (source.contains("rtmp://live.push.t-aaron.com")){
source = source.replace("rtmp://live.push.t-aaron.com","http://live.play.t-aaron.com") ;
if(source.endsWith("_")){
source = source.substring(0,source.length()-1) + ".flv" + "_";
}else {
source = source + ".flv";
}
return source;
}else {
if(source.contains("https://live.play.t-aaron.com")){
source = source.replace("https://live.play.t-aaron.com","http://live.play.t-aaron.com");
}
return source;
}
}
public String dockerFix(String url) {
if (url == null || url.isEmpty()) {
return url;
}
url = url.replaceFirst("^https?://", "rtmp://");
// 删除末尾的 .flv
url = url.replaceFirst("\\.flv$", "");
if(url.contains("srs-jndsj")){
return url;
}
// 处理 RTMP URL
if (url.startsWith("rtmp://")) {
// 先删除端口号
String withoutPort = url.replaceFirst("(rtmp://[^:/]+):\\d+", "$1");
// 替换域名为 aaa
String withNewDomain = withoutPort.replaceFirst("rtmp://[^/]+", "rtmp://"+srsname);
// 删除 .flv 后缀
return withNewDomain.replaceFirst("\\.flv$", "");
}
return url;
} }
} }

View File

@ -10,10 +10,10 @@ public class PageInfo <T> implements Serializable {
public PageInfo(List<T> list,int page,int pageSize) { public PageInfo(List<T> list,int page,int pageSize) {
this.total = list.size(); this.total = list.size();
this.pageSize = pageSize; this.pageSize = pageSize;
this.pageIndex = page; this.pageIndex = page ;
// 计算分页数据 // 计算分页数据
int fromIndex = page * pageSize; // 起始索引 int fromIndex = (page - 1) * pageSize; // 起始索引
int toIndex = Math.min(fromIndex + pageSize, total); // 结束索引 int toIndex = Math.min(fromIndex + pageSize, total); // 结束索引
// 检查页码是否超出范围 // 检查页码是否超出范围

View File

@ -1,5 +1,5 @@
package com.tuoheng.steam.dos; package com.tuoheng.steam.dos;
public enum ProcessType { public enum ProcessType {
RECORD,MERGE RECORD,MERGE,PIC
} }

View File

@ -27,13 +27,21 @@ public class Scheduler {
@Value("${livedates}") @Value("${livedates}")
private Integer livedates; private Integer livedates;
@Value("${cangneiwai}")
private Boolean cangneiwai;
/** /**
* 初次执行延迟6秒执行 * 初次执行延迟6秒执行
* 每隔 60 分钟执行一次 60*60*1000 * 每隔 60 分钟执行一次 60*60*1000
*/ */
@Scheduled(fixedRate = 3600000, initialDelay = 6000) @Scheduled(fixedRate = 3600000, initialDelay = 6000)
public void mergeTask() { public void mergeTask() {
System.out.println("开始FLV到MP4的转换 - " + System.currentTimeMillis() / 1000);
if(!cangneiwai){
logger.info("舱内外无需录制");
return;
}
logger.info("开始FLV到MP4的转换 - " + System.currentTimeMillis() / 1000);
List<DayRecord> dayRecords = iRecordService.findDaysPath(); List<DayRecord> dayRecords = iRecordService.findDaysPath();
for (int index = 0; index < dayRecords.size(); index++) { for (int index = 0; index < dayRecords.size(); index++) {

View File

@ -5,14 +5,15 @@ import com.tuoheng.steam.service.dos.*;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
/**
* 视频回放服务
*/
public interface IRecordService { public interface IRecordService {
public List<DayRecord> findDaysPath(); public List<DayRecord> findDaysPath();
public List<StreamRecord> findInDayRecord(DayRecord dayRecord); public List<StreamRecord> findInDayRecord(DayRecord dayRecord);
public void mergeMp4(FlvRecord flvRecord); public void mergeMp4(FlvRecord flvRecord);
} }

View File

@ -2,11 +2,16 @@ package com.tuoheng.steam.service;
import com.tuoheng.steam.dos.StreamTask; import com.tuoheng.steam.dos.StreamTask;
/**
* 视频录制服务
*/
public interface ITaskService { public interface ITaskService {
public StreamTask startTask(String streamUrl); public StreamTask startTask(String streamUrl);
public StreamTask stopTask(String streamUrl); public String startPic(String streamUrl);
public StreamTask stopTask(String streamUrl) throws Exception;
public StreamTask getLastTask(String streamUrl); public StreamTask getLastTask(String streamUrl);

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.tuoheng.steam.dos.StreamProcess; import com.tuoheng.steam.dos.StreamProcess;
import com.tuoheng.steam.dos.StreamTask; import com.tuoheng.steam.dos.StreamTask;
import com.tuoheng.steam.service.innerService.ProcessService; import com.tuoheng.steam.service.innerService.ProcessService;
import com.tuoheng.steam.util.FileUtil;
import com.tuoheng.steam.util.ProcessManager; import com.tuoheng.steam.util.ProcessManager;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -30,8 +31,8 @@ public class TaskService implements ITaskService{
@Value("${srs.targetPath}") @Value("${srs.targetPath}")
private String targetPath; private String targetPath;
@Value("${ffmpeg}") // @Value("${ffmpeg}")
private String ffmpeg; private String ffmpeg ="ffmpeg";
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
@ -42,18 +43,20 @@ public class TaskService implements ITaskService{
public void cleaTask() { public void cleaTask() {
for (ConcurrentHashMap.Entry<String, StreamTask> entry : runningTasks.entrySet()) { for (ConcurrentHashMap.Entry<String, StreamTask> entry : runningTasks.entrySet()) {
StreamTask streamTask = entry.getValue(); StreamTask streamTask = entry.getValue();
if(streamTask.getStartTime().getTime() < new Date().getTime() - 30 * 60 * 1000 ) { if(streamTask.getStartTime().getTime() < new Date().getTime() - 90 * 60 * 1000 ) {
logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask)); logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask));
StreamTask s = stopTask(entry.getKey()); try {
scheduler.schedule(() -> { StreamTask s = stopTask(entry.getKey());
File file = new File(targetPath +"/"+ s.getOutFileName()); scheduler.schedule(() -> {
try { File file = new File(targetPath +"/"+ s.getOutFileName());
file.delete(); try {
logger.info("废弃文件删除成功"); logger.info("废弃文件删除成功 {}",file.delete());
}catch (Exception e) { }catch (Exception e) {
logger.error(e.getMessage()); logger.error(e.getMessage());
} }
}, 60, TimeUnit.SECONDS); }, 60, TimeUnit.SECONDS);
}catch (Exception ignore) {}
} }
} }
} }
@ -93,7 +96,9 @@ public class TaskService implements ITaskService{
Date twoHoursAgo = calendar.getTime(); Date twoHoursAgo = calendar.getTime();
if (value.getStartTime().before(twoHoursAgo)) { if (value.getStartTime().before(twoHoursAgo)) {
logger.error("taskId {} 执行超时,手动关闭", key); logger.error("taskId {} 执行超时,手动关闭", key);
stopTask(key); try {
stopTask(key);
}catch (Exception ignore) {}
} }
}); });
@ -104,9 +109,8 @@ public class TaskService implements ITaskService{
for (StreamProcess process : streamProcesses) { for (StreamProcess process : streamProcesses) {
process.destroy(); process.destroy();
File file = new File(process.getFileName()); File file = new File(process.getFileName());
if(file.exists()){ FileUtil.deleteFile(file);
file.delete();
}
} }
} }
@ -118,6 +122,34 @@ public class TaskService implements ITaskService{
}, 60, 60, TimeUnit.SECONDS); }, 60, 60, TimeUnit.SECONDS);
} }
public String startPic(String streamUrl) {
try {
StreamProcess streamProcess = processService.takePic(streamUrl);
String outFileName = streamProcess.getFileName();
/*
注册进程
*/
ProcessManager.registerProcess(streamProcess.getInnerProcessId());
logger.info("streamUrl {} startPicProcess {} ", streamUrl,streamProcess.getInnerProcessId());
CompletableFuture<Process> future = streamProcess.onExit();
future.thenRun(() -> {
logger.info("streamUrl {} startPicProcess {} Over ", streamUrl,streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
}).exceptionally(ex -> {
logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl,streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
return null;
});
return outFileName;
}catch (Exception e) {
logger.info("streamUrl {} ", streamUrl,e);
}
return null;
}
public StreamTask startTask(String streamUrl) { public StreamTask startTask(String streamUrl) {
/* /*
@ -142,7 +174,9 @@ public class TaskService implements ITaskService{
} }
public StreamTask stopTask(String streamUrl) { public StreamTask stopTask(String streamUrl) throws Exception {
boolean recordSuccess = true;
StreamTask currentStreamTask = runningTasks.remove(streamUrl); StreamTask currentStreamTask = runningTasks.remove(streamUrl);
if (currentStreamTask!= null) { if (currentStreamTask!= null) {
@ -150,11 +184,20 @@ public class TaskService implements ITaskService{
if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) { if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) {
for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); if(!new File(streamProcess.getFileName()).exists()){
recordSuccess = false;
logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
}else {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
}
streamProcess.destroy(); streamProcess.destroy();
} }
} }
if(!recordSuccess){
throw new Exception("");
}
try { try {
String outFileName = UUID.randomUUID().toString() + ".mp4"; String outFileName = UUID.randomUUID().toString() + ".mp4";
StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses());
@ -177,9 +220,7 @@ public class TaskService implements ITaskService{
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName()); File file = new File(streamProcess.getFileName());
if(file.exists()){ FileUtil.deleteFile(file);
file.delete();
}
} }
}).exceptionally(ex -> { }).exceptionally(ex -> {
logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex); logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex);
@ -187,9 +228,7 @@ public class TaskService implements ITaskService{
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName()); File file = new File(streamProcess.getFileName());
if(file.exists()){ FileUtil.deleteFile(file);
file.delete();
}
} }
return null; return null;
}); });
@ -197,35 +236,26 @@ public class TaskService implements ITaskService{
} }
// if( historyTasks.containsKey(streamUrl)){
// historyTasks.get(streamUrl).add(currentStreamTask);
// }else {
// historyTasks.put(streamUrl, new LinkedList<>());
// historyTasks.get(streamUrl).offerLast(currentStreamTask);
// }
if(Objects.nonNull(currentStreamTask.getMergeProcess())){ if(Objects.nonNull(currentStreamTask.getMergeProcess())){
if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) &&
!currentStreamTask.getMergeProcess().getFileName().isEmpty()){ !currentStreamTask.getMergeProcess().getFileName().isEmpty()){
try { try {
//
// scheduler.schedule(task, 3, TimeUnit.SECONDS);
executor.schedule(new Runnable() { executor.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
System.out.println("FileName :"+currentStreamTask.getMergeProcess().getFileName()); logger.info("FileName :"+currentStreamTask.getMergeProcess().getFileName());
Process process = Runtime.getRuntime().exec(ffmpeg + " -i " + Process process = Runtime.getRuntime().exec(ffmpeg + " -i " +
currentStreamTask.getMergeProcess().getFileName()); currentStreamTask.getMergeProcess().getFileName());
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream())); BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
System.out.println("Duration line: " + line); logger.info("Duration line: " + line);
if (line.contains("Duration")) { if (line.contains("Duration")) {
String duration = line.split("Duration: ")[1].split(",")[0]; String duration = line.split("Duration: ")[1].split(",")[0];
System.out.println("Duration: " + duration.substring(0, 8)); logger.info("Duration: " + duration.substring(0, 8));
currentStreamTask.setDuration(duration.substring(0, 8)); currentStreamTask.setDuration(duration.substring(0, 8));
System.out.println("-----------------放入缓存-----------------"); logger.info("-----------------放入缓存-----------------");
/** /**
* 放入缓存 * 放入缓存
*/ */
@ -240,7 +270,7 @@ public class TaskService implements ITaskService{
} }
reader.close(); reader.close();
}catch (Exception e){ }catch (Exception e){
System.out.println(e.getMessage()); logger.info(e.getMessage());
} }
} }
},40, TimeUnit.SECONDS); },40, TimeUnit.SECONDS);
@ -259,11 +289,12 @@ public class TaskService implements ITaskService{
} }
}else { }else {
if(historyTasks.containsKey(streamUrl)){ throw new Exception("");
return historyTasks.get(streamUrl).peekLast(); // if(historyTasks.containsKey(streamUrl)){
}else { // return historyTasks.get(streamUrl).peekLast();
return null; // }else {
} // return null;
// }
} }
} }
@ -303,8 +334,14 @@ public class TaskService implements ITaskService{
future.thenRun(() -> { future.thenRun(() -> {
logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
startTask(streamUrl, streamTask);
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
try {
Thread.sleep(2000L);
} catch (Exception ignored){}
if(!new File(streamProcess.getFileName()).exists()){
streamTask.getStreamProcesses().remove(streamProcess);
}
startTask(streamUrl, streamTask);
}).exceptionally(ex -> { }).exceptionally(ex -> {
logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex); logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex);
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
@ -322,7 +359,9 @@ public class TaskService implements ITaskService{
public void stopAllTask(){ public void stopAllTask(){
runningTasks.forEach((key, value) -> { runningTasks.forEach((key, value) -> {
stopTask(key); try {
stopTask(key);
}catch (Exception ignore){}
}); });
} }
} }

View File

@ -3,6 +3,7 @@ package com.tuoheng.steam.service.innerService;
import com.tuoheng.steam.dos.StreamProcess; import com.tuoheng.steam.dos.StreamProcess;
import com.tuoheng.steam.dos.ProcessType; import com.tuoheng.steam.dos.ProcessType;
import com.tuoheng.steam.service.dos.FlvRecord; import com.tuoheng.steam.service.dos.FlvRecord;
import com.tuoheng.steam.util.FileUtil;
import com.tuoheng.steam.util.ProcessManager; import com.tuoheng.steam.util.ProcessManager;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -14,14 +15,8 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.*;
import java.util.Date; import java.util.concurrent.*;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
@ -30,8 +25,8 @@ public class ProcessService {
@Value("${srs.splitPath}") @Value("${srs.splitPath}")
private String splitPath; private String splitPath;
@Value("${ffmpeg}") // @Value("${ffmpeg}")
private String ffmpeg; private String ffmpeg = "ffmpeg";
@Value("${srs.targetPath}") @Value("${srs.targetPath}")
private String targetPath; private String targetPath;
@ -62,6 +57,7 @@ public class ProcessService {
loggingService.execute(new Runnable() { loggingService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
logger.info("mergeMp4 process Start {}",command);
try (BufferedReader reader = new BufferedReader( try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) { new InputStreamReader(process.getInputStream()))) {
String line; String line;
@ -75,34 +71,105 @@ public class ProcessService {
File delete = new File(recordPath+ File.separator + File delete = new File(recordPath+ File.separator +
flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator +
flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv()); flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv());
if(delete.exists()){ FileUtil.deleteFile(delete);
delete.delete();
}
} }
}); });
CompletableFuture<Process> future = process.onExit(); CompletableFuture<Process> future = process.onExit();
// 阻塞等待进程结束 // 阻塞等待进程结束
Process completedProcess = future.get(); Process completedProcess = null;
try {
// 检查进程是否成功结束 completedProcess = future.get(4, TimeUnit.SECONDS);
if (completedProcess.exitValue() == 0) { logger.info("mergeMp4正常完成-------- Over");
logger.info("进程成功结束!"); }catch (Exception e) {
/** String fileName = recordPath+ File.separator +
* 删除数据 flvRecord.getStream().getDayRecord().getDay() + File.separator +
*/ flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4";
} else { File file = new File(fileName);
logger.info("进程失败,退出码 {} " ,completedProcess.exitValue()); if(file.exists()){
logger.info("mergeMp4超时4S完成-------- Over",e);
process.destroy();
} else {
try {
completedProcess = future.get(4, TimeUnit.SECONDS);
}catch (Exception e1) {
if(file.exists()){
logger.info("mergeMp4超时8S完成-------- Over",e);
process.destroy();
}else {
logger.info("mergeMp4超时8S未完成-------- Over",e);
process.destroy();
}
}
}
} }
if(Objects.nonNull(completedProcess)) {
// 检查进程是否成功结束
if (completedProcess.exitValue() == 0) {
logger.info("进程成功结束!");
} else {
logger.info("进程失败,退出码 {} " ,completedProcess.exitValue());
}
}
}
public StreamProcess takePic(String streamUrl) throws IOException {
String outFileName = targetPath + "/" +UUID.randomUUID().toString() +".jpg";
String command = String.format(
ffmpeg+ " -i %s -vf fps=1 -frames:v 1 -q:v 2 %s",
streamUrl, outFileName);
List<String> listCommand = new ArrayList<>();
listCommand.add(ffmpeg); // ffmpeg 路径
listCommand.add("-i");
listCommand.add(streamUrl); // 流地址
listCommand.add("-vf");
listCommand.add("fps=1");
listCommand.add("-frames:v");
listCommand.add("1");
listCommand.add("-q:v");
listCommand.add("2");
listCommand.add(outFileName); // 输出文件名
logger.info("takePic {}", listCommand);
ProcessBuilder pb = new ProcessBuilder(listCommand);
pb.redirectErrorStream(true);
Process process = pb.start();
loggingService.execute(new Runnable() {
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
logger.info("recordStream-------- {}",line);
}
} catch (IOException e) {
logger.info("takePic-------- Over");
}
}
});
return new StreamProcess(process,outFileName, ProcessType.PIC);
} }
public StreamProcess recordStream(String streamUrl) throws IOException { public StreamProcess recordStream(String streamUrl) throws IOException {
String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts"; String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts";
/**
* 如果streamUrl末尾有个_,在实际录制的时候需要将_去除
*/
String command = String.format( String command = String.format(
ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s", ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s",
streamUrl, recordFileName); Optional.of(streamUrl)
.filter(s -> s.endsWith("_"))
.map(s -> s.substring(0, s.length() - 1))
.orElse(streamUrl), recordFileName);
logger.info("recordStream {}", command); logger.info("recordStream {}", command);
ProcessBuilder pb = new ProcessBuilder(command.split(" ")); ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true); pb.redirectErrorStream(true);

View File

@ -1,7 +1,10 @@
package com.tuoheng.steam.util; package com.tuoheng.steam.util;
import com.tuoheng.steam.controller.StreamRecordController;
import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.DayRecord;
import com.tuoheng.steam.service.dos.StreamType; import com.tuoheng.steam.service.dos.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -10,10 +13,23 @@ import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream; import java.util.stream.Stream;
public class FileUtil { public class FileUtil {
private static final Logger logger = LoggerFactory.getLogger(FileUtil.class);
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
public static void deleteFile(File file) {
if(file.exists()) {
file.delete();
}
}
public static void deleteFolder(String stringPath) { public static void deleteFolder(String stringPath) {
try { try {
@ -30,10 +46,10 @@ public class FileUtil {
}); });
} }
} else { } else {
System.out.println("文件夹不存在: " + path); logger.info("文件夹不存在: " + path);
} }
}catch (Exception e){ }catch (Exception e) {
e.printStackTrace(); logger.error("deleteFolder",e);
} }
} }
@ -53,11 +69,7 @@ public class FileUtil {
for (File file : files) { for (File file : files) {
list.add(file.getName()); list.add(file.getName());
} }
} else {
} }
} else {
} }
return list; return list;
} }

View File

@ -1,5 +1,8 @@
package com.tuoheng.steam.util; package com.tuoheng.steam.util;
import com.tuoheng.steam.controller.StreamRecordController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.*; import java.io.*;
@ -8,6 +11,8 @@ import java.util.*;
@Service @Service
public class ProcessManager { public class ProcessManager {
private static final Logger logger = LoggerFactory.getLogger(ProcessManager.class);
private static final String DELIMITER = "="; private static final String DELIMITER = "=";
static Map<Long, String> runningProcessIds = new HashMap<>(); static Map<Long, String> runningProcessIds = new HashMap<>();
@ -96,9 +101,9 @@ public class ProcessManager {
int exitCode = process.waitFor(); int exitCode = process.waitFor();
if (exitCode == 0) { if (exitCode == 0) {
System.out.println("Process with PID " + pid + " terminated successfully."); logger.info("Process with PID " + pid + " terminated successfully.");
} else { } else {
System.err.println("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode); logger.error("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode);
} }
} }

View File

@ -1,14 +1,52 @@
spring.application.name=demo #公司环境
server.port = 8989 #spring.application.name=stream_server
srs.splitPath=/data/java/srs/stream_server/temp #server.port = 9011
srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html #srs.splitPath=/data/java/srs/stream_server/temp
ffmpeg=/data/ffmpeg/bin/ffmpeg #srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html/recording
recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record #ffmpeg=ffmpeg
livedates=8 #recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
#livedates=8
#cangneiwai=false
#srs.domain = ""
#srs.name = ""
#
#大数据局
#spring.application.name=stream_server
#server.port = 8989
#srs.splitPath=/data/java/srs/stream_server/temp
#srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html/recording
#ffmpeg=ffmpeg
#recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
#livedates=8
#cangneiwai=true
#srs.domain = ""
#srs.name = ""
#server.port = 8080 #本地测试
#spring.application.name=stream_server
#server.port = 9011
#srs.splitPath=/Users/sunpeng/workspace/stream/temp #srs.splitPath=/Users/sunpeng/workspace/stream/temp
#srs.targetPath=/Users/sunpeng/workspace/stream/html #srs.targetPath=/Users/sunpeng/workspace/stream/html
#ffmpeg=ffmpeg #ffmpeg=ffmpeg
#recordPath=/Users/sunpeng/workspace/stream/record #recordPath=/Users/sunpeng/workspace/stream/record
#livedates=7 #livedates=7
#cangneiwai=false
#srs.domain=
#srs.name=
##容器化部署
#通过注入
srs.domain = STREAM.t-aaron.com
#通过注入
srs.name = STREAM
spring.application.name=stream_server
server.port = 8080
#零时文件
srs.splitPath=/data/temp
#拍照 + 录像
srs.targetPath=/data/recording
ffmpeg=ffmpeg
#
recordPath=/data/record
livedates=8
cangneiwai=true

View File

@ -0,0 +1,40 @@
<configuration>
<!-- 控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 文件输出(按日滚动) -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>./logs/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>./logs/app.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!--skywalking日志上报-->
<appender name="grpc-log" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>[th-airport] %d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
</layout>
</encoder>
</appender>
<!-- 日志级别设置 -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
<appender-ref ref="grpc-log"/>
</root>
<!-- <root level="TRACE">-->
<!-- <appender-ref ref="CONSOLE"/>-->
<!-- <appender-ref ref="FILE"/>-->
<!-- <appender-ref ref="grpc-log"/>-->
<!-- </root>-->
</configuration>

View File

@ -1,34 +0,0 @@
package com.tuoheng.steam;
import com.tuoheng.steam.service.TaskService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class StreamApplicationTests {
@Autowired
TaskService taskService;
@Test
void contextLoads() {
taskService.startTask("rtmp://stream.t-aaron.com/live/123");
boolean stop = false;
while (!stop) {
try {
Thread.sleep(60000L);
}catch (Exception e){
}
}
taskService.stopAllTask();
}
}