stream_srs/src/main/java/com/tuoheng/steam/service/TaskService.java

275 lines
10 KiB
Java

package com.tuoheng.steam.service;
import com.alibaba.fastjson2.JSON;
import com.tuoheng.steam.dos.StreamProcess;
import com.tuoheng.steam.dos.StreamTask;
import com.tuoheng.steam.service.innerService.ProcessService;
import com.tuoheng.steam.util.ProcessManager;
import com.tuoheng.steam.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
@Service
@EnableScheduling
public class TaskService implements ITaskService{
@Value("${srs.targetPath}")
private String targetPath;
/**
* 清除没有被关闭的进程
*/
@Scheduled(fixedRate = 60000)
public void cleaTask() {
for (ConcurrentHashMap.Entry<String, StreamTask> entry : runningTasks.entrySet()) {
StreamTask streamTask = entry.getValue();
if(streamTask.getStartTime().getTime() < new Date().getTime() - 30 * 60 * 1000 ) {
logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask));
StreamTask s = stopTask(entry.getKey());
scheduler.schedule(() -> {
File file = new File(targetPath +"/"+ s.getOutFileName());
try {
file.delete();
logger.info("废弃文件删除成功");
}catch (Exception e) {
logger.error(e.getMessage());
}
}, 60, TimeUnit.SECONDS);
}
}
}
private static final Logger logger = LoggerFactory.getLogger(TaskService.class);
@Autowired
ProcessService processService;
/**
* 执行中的任务
*/
ConcurrentHashMap<String, StreamTask> runningTasks = new ConcurrentHashMap<>();
/**
* 历史任务
*/
ConcurrentHashMap<String, Deque<StreamTask>> historyTasks = new ConcurrentHashMap<>();
/**
* 任务池
*/
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
@PostConstruct
public void init() {
/**
*
*/
scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
runningTasks.forEach((key, value) -> {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR, -2);
Date twoHoursAgo = calendar.getTime();
if (value.getStartTime().before(twoHoursAgo)) {
logger.error("taskId {} 执行超时,手动关闭", key);
stopTask(key);
}
});
for (Map.Entry<String, Deque<StreamTask>> entry : historyTasks.entrySet()) {
for (StreamTask task : entry.getValue()) {
List<StreamProcess> streamProcesses = task.getStreamProcesses();
for (StreamProcess process : streamProcesses) {
process.destroy();
File file = new File(process.getFileName());
if(file.exists()){
file.delete();
}
}
}
while (entry.getValue().size()>10){
entry.getValue().pollFirst();
}
}
}
}, 60, 60, TimeUnit.SECONDS);
}
public StreamTask startTask(String streamUrl) {
/*
当前有任务,则返回当前的任务
*/
if (runningTasks.containsKey(streamUrl)) {
return runningTasks.get(streamUrl);
}
/*
当前无任务,则开启新任务
*/
StreamTask taskInstance = new StreamTask(streamUrl);
if (runningTasks.putIfAbsent(streamUrl, taskInstance) == null) {
logger.info("streamUrl {} startTask {} ", streamUrl, taskInstance.getTaskId());
startTask(streamUrl, taskInstance);
// runningTasks.put(streamUrl, taskInstance);
return taskInstance;
} else {
return runningTasks.get(streamUrl);
}
}
public StreamTask stopTask(String streamUrl) {
StreamTask currentStreamTask = runningTasks.remove(streamUrl);
if (currentStreamTask!= null) {
currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date()));
if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) {
for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId());
streamProcess.destroy();
}
}
try {
String outFileName = UUID.randomUUID().toString() + ".mp4";
StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses());
if(Objects.nonNull(mergeProcess)){
/**
* 注册进程
*/
ProcessManager.registerProcess(mergeProcess.getInnerProcessId());
currentStreamTask.setMergeProcess(mergeProcess);
logger.info("streamUrl {} taskId {} startMergeProcess {} ", streamUrl, currentStreamTask.getTaskId(),
mergeProcess.getInnerProcessId());
CompletableFuture<Process> future = mergeProcess.onExit();
future.thenRun(() -> {
logger.info("streamUrl {} taskId {} MergeProcess {} Over", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId());
mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName());
if(file.exists()){
file.delete();
}
}
}).exceptionally(ex -> {
logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex);
mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName());
if(file.exists()){
file.delete();
}
}
return null;
});
currentStreamTask.setOutFileName(outFileName);
}
if( historyTasks.containsKey(streamUrl)){
historyTasks.get(streamUrl).add(currentStreamTask);
}else {
historyTasks.put(streamUrl, new LinkedList<>());
historyTasks.get(streamUrl).offerLast(currentStreamTask);
}
return currentStreamTask;
} catch (Exception e) {
logger.info("taskId {} Stop exceptionally", streamUrl,e);
return null;
}
}else {
if(historyTasks.containsKey(streamUrl)){
return historyTasks.get(streamUrl).peekLast();
}else {
return null;
}
}
}
public StreamTask getLastTask(String streamUrl) {
StreamTask current = runningTasks.get(streamUrl);
if(Objects.nonNull(current)){
return current;
}else {
if(historyTasks.containsKey(streamUrl)){
return historyTasks.get(streamUrl).peekLast();
}
return null;
}
}
private void startTask(String streamUrl, StreamTask streamTask) {
try {
/*
任务不存在了,直接退出
*/
if (streamTask.getStopCommandTime() != null) {
logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId());
return;
}
StreamProcess streamProcess = processService.recordStream(streamUrl);
/**
* 注册进程
*/
ProcessManager.registerProcess(streamProcess.getInnerProcessId());
streamTask.getStreamProcesses().add(streamProcess);
logger.info("streamUrl {} taskId {} startProcess {} ", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId());
CompletableFuture<Process> future = streamProcess.onExit();
future.thenRun(() -> {
logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
startTask(streamUrl, streamTask);
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
}).exceptionally(ex -> {
logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex);
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
startTask(streamUrl, streamTask);
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
return null;
});
} catch (IOException e) {
logger.info("streamUrl {} taskId {} startProcess Exception", streamUrl,streamTask.getTaskId(),e);
}
}
public void stopAllTask(){
runningTasks.forEach((key, value) -> {
stopTask(key);
});
}
}