package com.tuoheng.steam.service; import com.alibaba.fastjson2.JSON; import com.tuoheng.steam.dos.StreamProcess; import com.tuoheng.steam.dos.StreamTask; import com.tuoheng.steam.service.innerService.ProcessService; import com.tuoheng.steam.util.ProcessManager; import com.tuoheng.steam.util.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.util.*; import java.util.concurrent.*; @Service @EnableScheduling public class TaskService implements ITaskService{ @Value("${srs.targetPath}") private String targetPath; // @Value("${ffmpeg}") private String ffmpeg ="ffmpeg"; ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); /** * 清除没有被关闭的进程 */ @Scheduled(fixedRate = 60000) public void cleaTask() { for (ConcurrentHashMap.Entry entry : runningTasks.entrySet()) { StreamTask streamTask = entry.getValue(); if(streamTask.getStartTime().getTime() < new Date().getTime() - 30 * 60 * 1000 ) { logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask)); try { StreamTask s = stopTask(entry.getKey()); scheduler.schedule(() -> { File file = new File(targetPath +"/"+ s.getOutFileName()); try { logger.info("废弃文件删除成功 {}",file.delete()); }catch (Exception e) { logger.error(e.getMessage()); } }, 60, TimeUnit.SECONDS); }catch (Exception ignore) {} } } } private static final Logger logger = LoggerFactory.getLogger(TaskService.class); @Autowired ProcessService processService; /** * 执行中的任务 */ ConcurrentHashMap runningTasks = new ConcurrentHashMap<>(); /** * 历史任务 */ ConcurrentHashMap> historyTasks = new ConcurrentHashMap<>(); /** * 任务池 */ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); @PostConstruct public void init() { /** * */ scheduler.scheduleWithFixedDelay(new Runnable() { @Override public void run() { runningTasks.forEach((key, value) -> { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.HOUR, -2); Date twoHoursAgo = calendar.getTime(); if (value.getStartTime().before(twoHoursAgo)) { logger.error("taskId {} 执行超时,手动关闭", key); try { stopTask(key); }catch (Exception ignore) {} } }); for (Map.Entry> entry : historyTasks.entrySet()) { for (StreamTask task : entry.getValue()) { List streamProcesses = task.getStreamProcesses(); for (StreamProcess process : streamProcesses) { process.destroy(); File file = new File(process.getFileName()); if(file.exists()){ file.delete(); } } } while (entry.getValue().size()>10){ entry.getValue().pollFirst(); } } } }, 60, 60, TimeUnit.SECONDS); } public String startPic(String streamUrl) { try { StreamProcess streamProcess = processService.takePic(streamUrl); String outFileName = streamProcess.getFileName(); /* 注册进程 */ ProcessManager.registerProcess(streamProcess.getInnerProcessId()); logger.info("streamUrl {} startPicProcess {} ", streamUrl,streamProcess.getInnerProcessId()); CompletableFuture future = streamProcess.onExit(); future.thenRun(() -> { logger.info("streamUrl {} startPicProcess {} Over ", streamUrl,streamProcess.getInnerProcessId()); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); }).exceptionally(ex -> { logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl,streamProcess.getInnerProcessId()); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); return null; }); return outFileName; }catch (Exception e) { logger.info("streamUrl {} ", streamUrl,e); } return null; } public StreamTask startTask(String streamUrl) { /* 当前有任务,则返回当前的任务 */ if (runningTasks.containsKey(streamUrl)) { return runningTasks.get(streamUrl); } /* 当前无任务,则开启新任务 */ StreamTask taskInstance = new StreamTask(streamUrl); if (runningTasks.putIfAbsent(streamUrl, taskInstance) == null) { logger.info("streamUrl {} startTask {} ", streamUrl, taskInstance.getTaskId()); startTask(streamUrl, taskInstance); // runningTasks.put(streamUrl, taskInstance); return taskInstance; } else { return runningTasks.get(streamUrl); } } public StreamTask stopTask(String streamUrl) throws Exception { boolean recordSuccess = true; StreamTask currentStreamTask = runningTasks.remove(streamUrl); if (currentStreamTask!= null) { currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date())); if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) { for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { if(!new File(streamProcess.getFileName()).exists()){ recordSuccess = false; logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); }else { logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); } streamProcess.destroy(); } } if(!recordSuccess){ throw new Exception(""); } try { String outFileName = UUID.randomUUID().toString() + ".mp4"; StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); if(Objects.nonNull(mergeProcess)){ /** * 注册进程 */ ProcessManager.registerProcess(mergeProcess.getInnerProcessId()); currentStreamTask.setMergeProcess(mergeProcess); logger.info("streamUrl {} taskId {} startMergeProcess {} ", streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()); CompletableFuture future = mergeProcess.onExit(); future.thenRun(() -> { logger.info("streamUrl {} taskId {} MergeProcess {} Over", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()); mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ File file = new File(streamProcess.getFileName()); if(file.exists()){ file.delete(); } } }).exceptionally(ex -> { logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex); mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ File file = new File(streamProcess.getFileName()); if(file.exists()){ file.delete(); } } return null; }); currentStreamTask.setOutFileName(outFileName); } if(Objects.nonNull(currentStreamTask.getMergeProcess())){ if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && !currentStreamTask.getMergeProcess().getFileName().isEmpty()){ try { executor.schedule(new Runnable() { @Override public void run() { try { logger.info("FileName :"+currentStreamTask.getMergeProcess().getFileName()); Process process = Runtime.getRuntime().exec(ffmpeg + " -i " + currentStreamTask.getMergeProcess().getFileName()); BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream())); String line; while ((line = reader.readLine()) != null) { logger.info("Duration line: " + line); if (line.contains("Duration")) { String duration = line.split("Duration: ")[1].split(",")[0]; logger.info("Duration: " + duration.substring(0, 8)); currentStreamTask.setDuration(duration.substring(0, 8)); logger.info("-----------------放入缓存-----------------"); /** * 放入缓存 */ if( historyTasks.containsKey(streamUrl)){ historyTasks.get(streamUrl).add(currentStreamTask); }else { historyTasks.put(streamUrl, new LinkedList<>()); historyTasks.get(streamUrl).offerLast(currentStreamTask); } break; } } reader.close(); }catch (Exception e){ logger.info(e.getMessage()); } } },40, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } } } return currentStreamTask; } catch (Exception e) { logger.info("taskId {} Stop exceptionally", streamUrl,e); return null; } }else { throw new Exception(""); // if(historyTasks.containsKey(streamUrl)){ // return historyTasks.get(streamUrl).peekLast(); // }else { // return null; // } } } public StreamTask getLastTask(String streamUrl) { StreamTask current = runningTasks.get(streamUrl); if(Objects.nonNull(current)){ return current; }else { if(historyTasks.containsKey(streamUrl)){ return historyTasks.get(streamUrl).peekLast(); } return null; } } private void startTask(String streamUrl, StreamTask streamTask) { try { /* 任务不存在了,直接退出 */ if (streamTask.getStopCommandTime() != null) { logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId()); return; } StreamProcess streamProcess = processService.recordStream(streamUrl); /** * 注册进程 */ ProcessManager.registerProcess(streamProcess.getInnerProcessId()); streamTask.getStreamProcesses().add(streamProcess); logger.info("streamUrl {} taskId {} startProcess {} ", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); CompletableFuture future = streamProcess.onExit(); future.thenRun(() -> { logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); startTask(streamUrl, streamTask); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); }).exceptionally(ex -> { logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); startTask(streamUrl, streamTask); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); return null; }); } catch (IOException e) { logger.info("streamUrl {} taskId {} startProcess Exception", streamUrl,streamTask.getTaskId(),e); } } public void stopAllTask(){ runningTasks.forEach((key, value) -> { try { stopTask(key); }catch (Exception ignore){} }); } }