Compare commits

..

4 Commits

Author SHA1 Message Date
孙小云 889a978bbe 删除 2025-04-14 09:39:36 +08:00
孙小云 95263673f9 生产环境无法注入的问题 2025-04-02 17:12:56 +08:00
孙小云 5a569084b8 添加如果流不存在则关闭时候报错 2025-04-02 16:46:00 +08:00
孙小云 4779c1b1bc 添加如果流不存在则关闭时候报错 2025-04-02 16:44:39 +08:00
6 changed files with 77 additions and 57 deletions

View File

@ -72,12 +72,16 @@ public class StreamRecordController {
if(Objects.isNull(streamUrl)) {
return Response.fail(-1);
}
Response<StreamTask> response = Response.success(taskService.stopTask(streamUrl));
if(Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){
response.setCode(500);
try {
Response<StreamTask> response = Response.success(taskService.stopTask(streamUrl));
if(Objects.isNull(response.getData()) || Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){
response.setCode(500);
}
System.out.println("关闭录制返回 :"+ JSON.toJSONString(response));
return response;
}catch (Exception e){
return Response.fail(-1);
}
System.out.println("关闭录制返回 :"+ JSON.toJSONString(response));
return response;
}
@GetMapping("info")

View File

@ -5,14 +5,15 @@ import com.tuoheng.steam.service.dos.*;
import java.util.Date;
import java.util.List;
/**
* 视频回放服务
*/
public interface IRecordService {
public List<DayRecord> findDaysPath();
public List<StreamRecord> findInDayRecord(DayRecord dayRecord);
public void mergeMp4(FlvRecord flvRecord);
}

View File

@ -2,13 +2,16 @@ package com.tuoheng.steam.service;
import com.tuoheng.steam.dos.StreamTask;
/**
* 视频录制服务
*/
public interface ITaskService {
public StreamTask startTask(String streamUrl);
public String startPic(String streamUrl);
public StreamTask stopTask(String streamUrl);
public StreamTask stopTask(String streamUrl) throws Exception;
public StreamTask getLastTask(String streamUrl);

View File

@ -30,8 +30,8 @@ public class TaskService implements ITaskService{
@Value("${srs.targetPath}")
private String targetPath;
@Value("${ffmpeg}")
private String ffmpeg;
// @Value("${ffmpeg}")
private String ffmpeg ="ffmpeg";
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
@ -44,16 +44,18 @@ public class TaskService implements ITaskService{
StreamTask streamTask = entry.getValue();
if(streamTask.getStartTime().getTime() < new Date().getTime() - 30 * 60 * 1000 ) {
logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask));
StreamTask s = stopTask(entry.getKey());
scheduler.schedule(() -> {
File file = new File(targetPath +"/"+ s.getOutFileName());
try {
file.delete();
logger.info("废弃文件删除成功");
}catch (Exception e) {
logger.error(e.getMessage());
}
}, 60, TimeUnit.SECONDS);
try {
StreamTask s = stopTask(entry.getKey());
scheduler.schedule(() -> {
File file = new File(targetPath +"/"+ s.getOutFileName());
try {
logger.info("废弃文件删除成功 {}",file.delete());
}catch (Exception e) {
logger.error(e.getMessage());
}
}, 60, TimeUnit.SECONDS);
}catch (Exception ignore) {}
}
}
}
@ -93,7 +95,9 @@ public class TaskService implements ITaskService{
Date twoHoursAgo = calendar.getTime();
if (value.getStartTime().before(twoHoursAgo)) {
logger.error("taskId {} 执行超时,手动关闭", key);
stopTask(key);
try {
stopTask(key);
}catch (Exception ignore) {}
}
});
@ -170,7 +174,9 @@ public class TaskService implements ITaskService{
}
public StreamTask stopTask(String streamUrl) {
public StreamTask stopTask(String streamUrl) throws Exception {
boolean recordSuccess = true;
StreamTask currentStreamTask = runningTasks.remove(streamUrl);
if (currentStreamTask!= null) {
@ -178,11 +184,20 @@ public class TaskService implements ITaskService{
if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) {
for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
if(!new File(streamProcess.getFileName()).exists()){
recordSuccess = false;
logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
}else {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
}
streamProcess.destroy();
}
}
if(!recordSuccess){
throw new Exception("");
}
try {
String outFileName = UUID.randomUUID().toString() + ".mp4";
StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses());
@ -225,19 +240,10 @@ public class TaskService implements ITaskService{
}
// if( historyTasks.containsKey(streamUrl)){
// historyTasks.get(streamUrl).add(currentStreamTask);
// }else {
// historyTasks.put(streamUrl, new LinkedList<>());
// historyTasks.get(streamUrl).offerLast(currentStreamTask);
// }
if(Objects.nonNull(currentStreamTask.getMergeProcess())){
if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) &&
!currentStreamTask.getMergeProcess().getFileName().isEmpty()){
try {
//
// scheduler.schedule(task, 3, TimeUnit.SECONDS);
executor.schedule(new Runnable() {
@Override
public void run() {
@ -287,11 +293,12 @@ public class TaskService implements ITaskService{
}
}else {
if(historyTasks.containsKey(streamUrl)){
return historyTasks.get(streamUrl).peekLast();
}else {
return null;
}
throw new Exception("");
// if(historyTasks.containsKey(streamUrl)){
// return historyTasks.get(streamUrl).peekLast();
// }else {
// return null;
// }
}
}
@ -350,7 +357,9 @@ public class TaskService implements ITaskService{
public void stopAllTask(){
runningTasks.forEach((key, value) -> {
stopTask(key);
try {
stopTask(key);
}catch (Exception ignore){}
});
}
}

View File

@ -14,10 +14,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -30,8 +27,8 @@ public class ProcessService {
@Value("${srs.splitPath}")
private String splitPath;
@Value("${ffmpeg}")
private String ffmpeg;
// @Value("${ffmpeg}")
private String ffmpeg = "ffmpeg";
@Value("${srs.targetPath}")
private String targetPath;
@ -143,9 +140,15 @@ public class ProcessService {
public StreamProcess recordStream(String streamUrl) throws IOException {
String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts";
/**
* 如果streamUrl末尾有个_,在实际录制的时候需要将_去除
*/
String command = String.format(
ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s",
streamUrl, recordFileName);
Optional.of(streamUrl)
.filter(s -> s.endsWith("_"))
.map(s -> s.substring(0, s.length() - 1))
.orElse(streamUrl), recordFileName);
logger.info("recordStream {}", command);
ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true);

View File

@ -1,14 +1,14 @@
spring.application.name=demo
#server.port = 8989
#srs.splitPath=/data/java/srs/stream_server/temp
#srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html
#ffmpeg=/data/ffmpeg/bin/ffmpeg
#recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
#livedates=8
server.port = 8080
srs.splitPath=/Users/sunpeng/workspace/stream/temp
srs.targetPath=/Users/sunpeng/workspace/stream/html
server.port = 8989
srs.splitPath=/data/java/srs/stream_server/temp
srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html
ffmpeg=ffmpeg
recordPath=/Users/sunpeng/workspace/stream/record
livedates=7
recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
livedates=8
#server.port = 8080
#srs.splitPath=/Users/sunpeng/workspace/stream/temp
#srs.targetPath=/Users/sunpeng/workspace/stream/html
#ffmpeg=ffmpeg
#recordPath=/Users/sunpeng/workspace/stream/record
#livedates=7