diff --git a/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java b/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java index 68e8cf0..1bc73d3 100644 --- a/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java +++ b/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java @@ -1,5 +1,9 @@ -package com.tuoheng.steam.controller; +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// +package com.tuoheng.steam.controller; import com.alibaba.fastjson2.JSON; import com.tuoheng.steam.controller.dto.PageInfo; @@ -12,219 +16,193 @@ import com.tuoheng.steam.service.ITaskService; import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.Mp4Record; import com.tuoheng.steam.service.dos.StreamRecord; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; - -@RestController() -@RequestMapping("/record") +@RestController +@RequestMapping({"/record"}) public class StreamRecordController { - private static final Logger logger = LoggerFactory.getLogger(StreamRecordController.class); - @Autowired ITaskService taskService; - @Autowired IRecordService iRecordService; + public StreamRecordController() { + } - @GetMapping("pic") + @GetMapping({"pic"}) public Response startPic(@RequestParam String streamUrl) { - logger.info("启动视频拍照 :"+streamUrl); - streamUrl = streamSwitch(streamUrl); - logger.info("启动视频拍照_ :"+streamUrl); - if(Objects.isNull(streamUrl)) { + logger.info("启动视频拍照 :" + streamUrl); + streamUrl = this.streamSwitch(streamUrl); + logger.info("启动视频拍照_ :" + streamUrl); + if (Objects.isNull(streamUrl)) { return Response.fail(-1); - } - String outfile = taskService.startPic(streamUrl); - if(Objects.nonNull(outfile)) { - int lastSlashIndex = outfile.lastIndexOf("/"); - String fileName = outfile.substring(lastSlashIndex + 1); - return Response.success(fileName); - }else { - return Response.fail(-1); - } - } - - - - @GetMapping("start") - public Response startRecording(@RequestParam String streamUrl) { - logger.info("启动录制 :"+streamUrl); - streamUrl = streamSwitch(streamUrl); - logger.info("启动录制_ :"+streamUrl); - if(Objects.isNull(streamUrl)) { - return Response.fail(-1); - } - - Response response = Response.success(taskService.startTask(streamUrl)); - logger.info("启动录制返回 :"+ JSON.toJSONString(response)); - return response; - } - - @GetMapping("stop") - public Response stopRecording(@RequestParam String streamUrl){ - - logger.info("关闭录制 :"+streamUrl); - streamUrl = streamSwitch(streamUrl); - logger.info("关闭录制_ :"+streamUrl); - if(Objects.isNull(streamUrl)) { - return Response.fail(-1); - } - try { - Response response = Response.success(taskService.stopTask(streamUrl)); - if(Objects.isNull(response.getData()) || Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){ - response.setCode(500); + } else { + String outfile = this.taskService.startPic(streamUrl); + if (Objects.nonNull(outfile)) { + int lastSlashIndex = outfile.lastIndexOf("/"); + String fileName = outfile.substring(lastSlashIndex + 1); + return Response.success(fileName); + } else { + return Response.fail(-1); } - logger.info("关闭录制返回 :"+ JSON.toJSONString(response)); - return response; - }catch (Exception e){ - return Response.fail(-1); } } - @GetMapping("info") - public Response getLastTask(@RequestParam String streamUrl){ - logger.info("查看录制 :"+streamUrl); - streamUrl = streamSwitch(streamUrl); - logger.info("查看录制_ :"+streamUrl); - if(Objects.isNull(streamUrl)) { + @GetMapping({"start"}) + public Response startRecording(@RequestParam String streamUrl,@RequestParam String transaction) { + logger.info("启动录制 :" + streamUrl); + streamUrl = this.streamSwitch(streamUrl); + logger.info("启动录制_ :" + streamUrl); + if (Objects.isNull(streamUrl)) { return Response.fail(-1); + } else { + Response response = Response.success(this.taskService.startTask(streamUrl,transaction)); + logger.info("启动录制返回 :" + JSON.toJSONString(response)); + return response; } - Response response = Response.success(taskService.getLastTask(streamUrl)); - logger.info("查看录制返回 :"+ JSON.toJSONString(response)); - return response; } + @GetMapping({"stop"}) + public Response stopRecording(@RequestParam String streamUrl) { + logger.info("关闭录制 :" + streamUrl); + streamUrl = this.streamSwitch(streamUrl); + logger.info("关闭录制_ :" + streamUrl); + if (Objects.isNull(streamUrl)) { + return Response.fail(-1); + } else { + try { + Response response = Response.success(this.taskService.stopTask(streamUrl)); + if (Objects.isNull(response.getData()) || Objects.isNull(((StreamTask)response.getData()).getOutFileName()) || ((StreamTask)response.getData()).getOutFileName().isEmpty()) { + response.setCode(500); + } - /** - * 废弃 - * @param request - * @return - */ - @PostMapping("search") - public Response> streamView(@RequestBody PageStreamRequest request){ - logger.info("查看录制 search :"+JSON.toJSONString(request)); - if(Objects.isNull(request.getPageIndex()) || Objects.isNull(request.getPageSize()) - || request.getPageIndex() <0 || request.getPageSize() <=0 ){ + logger.info("关闭录制返回 :" + JSON.toJSONString(response)); + return response; + } catch (Exception var3) { + return Response.fail(-1); + } + } + } + + @GetMapping({"info"}) + public Response getLastTask(@RequestParam String streamUrl) { + logger.info("查看录制 :" + streamUrl); + streamUrl = this.streamSwitch(streamUrl); + logger.info("查看录制_ :" + streamUrl); + if (Objects.isNull(streamUrl)) { + return Response.fail(-1); + } else { + Response response = Response.success(this.taskService.getLastTask(streamUrl)); + logger.info("查看录制返回 :" + JSON.toJSONString(response)); + return response; + } + } + + @PostMapping({"search"}) + public Response> streamView(@RequestBody PageStreamRequest request) { + logger.info("查看录制 search :" + JSON.toJSONString(request)); + if (!Objects.isNull(request.getPageIndex()) && !Objects.isNull(request.getPageSize()) && request.getPageIndex() >= 0 && request.getPageSize() > 0) { + List mp4s = this.searchAll(request.getStreamId()); + if (!Strings.isBlank(request.getStartTime())) { + long filerTime = Long.parseLong(request.getStartTime()) - 900000L; + mp4s = (List)mp4s.stream().filter((info) -> info.getStartTime().compareTo(Long.toString(filerTime)) >= 0).collect(Collectors.toList()); + } + + if (!Strings.isBlank(request.getEndTime())) { + long filerTime = Long.parseLong(request.getEndTime()); + mp4s = (List)mp4s.stream().filter((info) -> info.getStartTime().compareTo(Long.toString(filerTime)) <= 0).collect(Collectors.toList()); + } + + mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); + PageInfo pageInfo = new PageInfo(mp4s, request.getPageIndex(), request.getPageSize()); + logger.info("查看录制 search 返回:" + JSON.toJSONString(request)); + return Response.success(pageInfo); + } else { logger.error("查看录制 search 入参错误!"); return Response.fail(-100); } - - List mp4s = searchAll(request.getStreamId()); - - if(!Strings.isBlank(request.getStartTime())){ - long filerTime = Long.parseLong(request.getStartTime()) - 15 * 60 * 1000; - mp4s = mp4s.stream() - .filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) >= 0) - .collect(Collectors.toList()); // 将结果收集到列表中 - } - - if(!Strings.isBlank(request.getEndTime())){ - long filerTime = Long.parseLong(request.getEndTime()); - mp4s = mp4s.stream() - .filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) <= 0) - .collect(Collectors.toList()); // 将结果收集到列表中 - } - /** - * 倒序排列 - */ - mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); - PageInfo pageInfo = new PageInfo<>(mp4s,request.getPageIndex(),request.getPageSize()); - logger.info("查看录制 search 返回:"+JSON.toJSONString(request)); - return Response.success(pageInfo); } + @GetMapping({"history"}) + public Response> streamView(@RequestParam String streamUrl) { + logger.info("查看录像历史返回 :" + streamUrl); + List dayMp4 = this.searchAll(streamUrl); + dayMp4.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); + logger.info("查看录像历史返回_ :" + JSON.toJSONString(dayMp4)); + return Response.success(dayMp4); + } - public List searchAll(String streamUrl){ + public List searchAll(String streamUrl) { + List dayMp4 = new ArrayList(); - List dayMp4 = new ArrayList<>(); - List dayRecords = iRecordService.findDaysPath(); - for(DayRecord dayRecord : dayRecords){ - List streamRecords = iRecordService.findInDayRecord(dayRecord); - for(StreamRecord stream : streamRecords){ - if(stream.getStreamId().equals(streamUrl)){ - List mp4Records = stream.queryMp4Records(); - if(Objects.nonNull(mp4Records) && !mp4Records.isEmpty()){ - for(Mp4Record mp4Record : mp4Records){ + for(DayRecord dayRecord : this.iRecordService.findDaysPath()) { + for(StreamRecord stream : this.iRecordService.findInDayRecord(dayRecord)) { + if (stream.getStreamId().equals(streamUrl)) { + List mp4Records = stream.queryMp4Records(); + if (Objects.nonNull(mp4Records) && !mp4Records.isEmpty()) { + for(Mp4Record mp4Record : mp4Records) { Mp4Info mp4Info = new Mp4Info(); mp4Info.setStartTime(mp4Record.getStartTime()); - mp4Info.setUrl(dayRecord.getDay()+ - File.separator + stream.getStreamId() + File.separator + mp4Record.getMp4()); + String var10001 = dayRecord.getDay(); + mp4Info.setUrl(var10001 + File.separator + stream.getStreamId() + File.separator + mp4Record.getMp4()); dayMp4.add(mp4Info); } } } } } - return dayMp4; + + return dayMp4; } + @GetMapping({"exit"}) + public String exit() { + new Thread(() -> { + try { + this.taskService.stopAllTask(); + Thread.sleep(10000L); + } catch (Exception var2) { + } - @Value("${srs.domain}") - private String srsdomain; + System.exit(0); + }); + return "OK"; + } - @Value("${srs.name}") - private String srsname; - - public String streamSwitch(String source){ - -// if(Objects.nonNull(srsdomain) && !srsdomain.isEmpty()){ -// if(Objects.nonNull(source) && !source.isEmpty()){ -// return dockerFix(source); -// } -// } - - - if(source.contains("stream.t-aaron.com")){ + public String streamSwitch(String source) { + if (source.contains("stream.t-aaron.com")) { return source; - } else if (source.contains("rtmp://live.push.t-aaron.com")){ - source = source.replace("rtmp://live.push.t-aaron.com","http://live.play.t-aaron.com") ; - if(source.endsWith("_")){ - source = source.substring(0,source.length()-1) + ".flv" + "_"; - }else { + } else if (source.contains("rtmp://live.push.t-aaron.com")) { + source = source.replace("rtmp://live.push.t-aaron.com", "http://live.play.t-aaron.com"); + if (source.endsWith("_")) { + String var10000 = source.substring(0, source.length() - 1); + source = var10000 + ".flv_"; + } else { source = source + ".flv"; } + return source; - }else { - if(source.contains("https://live.play.t-aaron.com")){ - source = source.replace("https://live.play.t-aaron.com","http://live.play.t-aaron.com"); + } else { + if (source.contains("https://live.play.t-aaron.com")) { + source = source.replace("https://live.play.t-aaron.com", "http://live.play.t-aaron.com"); } + return source; } - - } - - public String dockerFix(String url) { - if (url == null || url.isEmpty()) { - return url; - } - url = url.replaceFirst("^https?://", "rtmp://"); - // 删除末尾的 .flv - url = url.replaceFirst("\\.flv$", ""); - if(url.contains("srs-jndsj")){ - return url; - } - // 处理 RTMP URL - if (url.startsWith("rtmp://")) { - // 先删除端口号 - String withoutPort = url.replaceFirst("(rtmp://[^:/]+):\\d+", "$1"); - // 替换域名为 aaa - String withNewDomain = withoutPort.replaceFirst("rtmp://[^/]+", "rtmp://"+srsname); - // 删除 .flv 后缀 - return withNewDomain.replaceFirst("\\.flv$", ""); - } - - - - return url; } } diff --git a/src/main/java/com/tuoheng/steam/dos/StreamTask.java b/src/main/java/com/tuoheng/steam/dos/StreamTask.java index f964ff9..ad99606 100644 --- a/src/main/java/com/tuoheng/steam/dos/StreamTask.java +++ b/src/main/java/com/tuoheng/steam/dos/StreamTask.java @@ -1,54 +1,58 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.dos; import com.tuoheng.steam.util.TimeUtils; - import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; public class StreamTask implements Serializable { - - Long taskId; + Long taskId = generatePid(); Date startTime; String strStartTime; + String duration; + String stopCommandTime; + List streamProcesses = new ArrayList(); + StreamProcess mergeProcess; + private static Long pid = 0L; + String streamUrl; + String outFileName; + String transaction; + + public String getTransaction() { + return transaction; + } + + public void setTransaction(String transaction) { + this.transaction = transaction; + } public String getDuration() { - return duration; + return this.duration; } public void setDuration(String duration) { this.duration = duration; } - String duration; - /** - * 接收到结束命令的时间 - */ - String stopCommandTime; - /** - * 执行生成TS文件的命令 - */ - List streamProcesses = new ArrayList<>(); - /** - * 合并进程的命令 - */ - StreamProcess mergeProcess; - - private static Long pid = 0L; - - private synchronized static Long generatePid(){ - if(pid==Long.MAX_VALUE){ + private static synchronized Long generatePid() { + if (pid == Long.MAX_VALUE) { pid = 0L; - }else { - pid++; + } else { + Long var0 = pid; + pid = pid + 1L; } + return pid; } - public String getStrStartTime() { - return strStartTime; + return this.strStartTime; } public void setStrStartTime(String strStartTime) { @@ -59,19 +63,16 @@ public class StreamTask implements Serializable { this.strStartTime = startTime; } - public String getStopCommandTime() { - return stopCommandTime; + return this.stopCommandTime; } public void setStopCommandTime(String stopCommandTime) { this.stopCommandTime = stopCommandTime; } - - public List getStreamProcesses() { - return streamProcesses; + return this.streamProcesses; } public void setStreamProcesses(List streamProcesses) { @@ -79,7 +80,7 @@ public class StreamTask implements Serializable { } public StreamProcess getMergeProcess() { - return mergeProcess; + return this.mergeProcess; } public void setMergeProcess(StreamProcess mergeProcess) { @@ -87,19 +88,15 @@ public class StreamTask implements Serializable { } public String getStreamUrl() { - return streamUrl; + return this.streamUrl; } public void setStreamUrl(String streamUrl) { this.streamUrl = streamUrl; } - - String streamUrl; - - public Long getTaskId() { - return taskId; + return this.taskId; } public void setTaskId(Long taskId) { @@ -107,7 +104,7 @@ public class StreamTask implements Serializable { } public Date getStartTime() { - return startTime; + return this.startTime; } public void setStartTime(Date startTime) { @@ -115,22 +112,17 @@ public class StreamTask implements Serializable { } public StreamTask(String streamUrl) { - this.taskId = generatePid(); this.streamUrl = streamUrl; this.startTime = new Date(); this.strStartTime = TimeUtils.formatDateToString(this.startTime); this.stopCommandTime = null; } - - public String getOutFileName() { - return outFileName; + return this.outFileName; } public void setOutFileName(String outFileName) { this.outFileName = outFileName; } - - String outFileName; } diff --git a/src/main/java/com/tuoheng/steam/schedule/Scheduler.java b/src/main/java/com/tuoheng/steam/schedule/Scheduler.java index 42e67dc..8437f43 100644 --- a/src/main/java/com/tuoheng/steam/schedule/Scheduler.java +++ b/src/main/java/com/tuoheng/steam/schedule/Scheduler.java @@ -1,78 +1,62 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.schedule; import com.tuoheng.steam.service.IRecordService; -import com.tuoheng.steam.service.TaskService; import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.FlvRecord; -import com.tuoheng.steam.service.dos.Mp4Record; import com.tuoheng.steam.service.dos.StreamRecord; import com.tuoheng.steam.util.TimeUtils; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.*; - @Component public class Scheduler { - private static final Logger logger = LoggerFactory.getLogger(Scheduler.class); - @Autowired IRecordService iRecordService; - @Value("${livedates}") private Integer livedates; - @Value("${cangneiwai}") private Boolean cangneiwai; - /** - * 初次执行延迟6秒执行 - * 每隔 60 分钟执行一次 60*60*1000 - */ - @Scheduled(fixedRate = 3600000, initialDelay = 6000) + + public Scheduler() { + } + + @Scheduled( + fixedRate = 3600000L, + initialDelay = 6000L + ) public void mergeTask() { - - if(!cangneiwai){ + if (!this.cangneiwai) { logger.info("舱内外无需录制"); - return; - } + } else { + Logger var10000 = logger; + long var10001 = System.currentTimeMillis(); + var10000.info("开始FLV到MP4的转换 - " + var10001 / 1000L); + List dayRecords = this.iRecordService.findDaysPath(); - logger.info("开始FLV到MP4的转换 - " + System.currentTimeMillis() / 1000); - List dayRecords = iRecordService.findDaysPath(); - for (int index = 0; index < dayRecords.size(); index++) { - - DayRecord dayRecord = dayRecords.get(index); - if(TimeUtils.isBefore(dayRecord.getDay(),livedates)){ - dayRecord.clear(); - } else { - List streamRecords = dayRecord.getStreamRecords(); - for(StreamRecord streamRecord : streamRecords){ - List flvRecords = streamRecord.queryFlvRecords(); - for(FlvRecord flvRecord : flvRecords){ - iRecordService.mergeMp4(flvRecord); + for(int index = 0; index < dayRecords.size(); ++index) { + DayRecord dayRecord = (DayRecord)dayRecords.get(index); + if (TimeUtils.isBefore(dayRecord.getDay(), this.livedates)) { + dayRecord.clear(); + } else { + for(StreamRecord streamRecord : dayRecord.getStreamRecords()) { + for(FlvRecord flvRecord : streamRecord.queryFlvRecords()) { + this.iRecordService.mergeMp4(flvRecord); + } } } } + } } - - - - -// public static boolean isWithin15Minutes(String timestamp1, String timestamp2) { -// // 将字符串转换为 long 类型 -// long time1 = Long.parseLong(timestamp1); -// long time2 = Long.parseLong(timestamp2); -// // 计算时间差的绝对值 -// long diff = Math.abs(time2 - time1); -// // 15 分钟 = 15 * 60 * 1000 毫秒 -// long fifteenMinutesInMillis = 15 * 60 * 1000; -// // 判断是否小于或等于 15 分钟 -// return diff <= fifteenMinutesInMillis; -// } - } diff --git a/src/main/java/com/tuoheng/steam/service/ITaskService.java b/src/main/java/com/tuoheng/steam/service/ITaskService.java index e799a37..b91b4f2 100644 --- a/src/main/java/com/tuoheng/steam/service/ITaskService.java +++ b/src/main/java/com/tuoheng/steam/service/ITaskService.java @@ -7,7 +7,7 @@ import com.tuoheng.steam.dos.StreamTask; */ public interface ITaskService { - public StreamTask startTask(String streamUrl); + public StreamTask startTask(String streamUrl,String transaction); public String startPic(String streamUrl); diff --git a/src/main/java/com/tuoheng/steam/service/RecordService.java b/src/main/java/com/tuoheng/steam/service/RecordService.java index de15658..d99eef7 100644 --- a/src/main/java/com/tuoheng/steam/service/RecordService.java +++ b/src/main/java/com/tuoheng/steam/service/RecordService.java @@ -1,67 +1,66 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.service; -import com.tuoheng.steam.service.dos.*; +import com.tuoheng.steam.service.dos.DayRecord; +import com.tuoheng.steam.service.dos.FlvRecord; +import com.tuoheng.steam.service.dos.StreamRecord; import com.tuoheng.steam.service.innerService.ProcessService; +import com.tuoheng.steam.util.FileUtil; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; - -import static com.tuoheng.steam.util.FileUtil.readFiles; - @Component public class RecordService implements IRecordService { - @Value("${recordPath}") private String recordPath; - @Autowired ProcessService processService; - private static final Logger logger = LoggerFactory.getLogger(IRecordService.class); + public RecordService() { + } + public static boolean isValidDate(String dateStr) { - // 正则表达式:4位年份 + 2位月份 + 2位日期 String regex = "^\\d{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$"; return dateStr.matches(regex); } - @Override public List findDaysPath() { - List days = new ArrayList<>(); - List fileNames = readFiles(recordPath); - for (String fileName : fileNames) { - if(isValidDate(fileName)){ + List days = new ArrayList(); + + for(String fileName : FileUtil.readFiles(this.recordPath)) { + if (isValidDate(fileName)) { DayRecord day = new DayRecord(); - day.setRoot(recordPath); + day.setRoot(this.recordPath); day.setDay(fileName); days.add(day); } } + days.sort(Comparator.comparing(DayRecord::getDay)); return days; } - - @Override public List findInDayRecord(DayRecord dayRecord) { - return dayRecord.getStreamRecords(); + return dayRecord.getStreamRecords(); } - @Override public void mergeMp4(FlvRecord flvRecord) { try { - processService.mergeMp4(flvRecord); - }catch (Exception e){ + this.processService.mergeMp4(flvRecord); + } catch (Exception e) { logger.error(e.getMessage()); } + } - - - } diff --git a/src/main/java/com/tuoheng/steam/service/TaskService.java b/src/main/java/com/tuoheng/steam/service/TaskService.java index 5259667..910bf0b 100644 --- a/src/main/java/com/tuoheng/steam/service/TaskService.java +++ b/src/main/java/com/tuoheng/steam/service/TaskService.java @@ -1,3 +1,8 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.service; import com.alibaba.fastjson2.JSON; @@ -7,6 +12,23 @@ import com.tuoheng.steam.service.innerService.ProcessService; import com.tuoheng.steam.util.FileUtil; import com.tuoheng.steam.util.ProcessManager; import com.tuoheng.steam.util.TimeUtils; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Calendar; +import java.util.Date; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -16,371 +38,291 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; -import javax.annotation.PostConstruct; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.*; -import java.util.concurrent.*; - @Service @EnableScheduling -public class TaskService implements ITaskService{ - +public class TaskService implements ITaskService { @Value("${srs.targetPath}") private String targetPath; - -// @Value("${ffmpeg}") - private String ffmpeg ="ffmpeg"; - + private String ffmpeg = "ffmpeg"; ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - - /** - * 清除没有被关闭的进程 - */ - @Scheduled(fixedRate = 60000) - public void cleaTask() { - for (ConcurrentHashMap.Entry entry : runningTasks.entrySet()) { - StreamTask streamTask = entry.getValue(); - if(streamTask.getStartTime().getTime() < new Date().getTime() - 90 * 60 * 1000 ) { - logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask)); - try { - StreamTask s = stopTask(entry.getKey()); - scheduler.schedule(() -> { - File file = new File(targetPath +"/"+ s.getOutFileName()); - try { - logger.info("废弃文件删除成功 {}",file.delete()); - }catch (Exception e) { - logger.error(e.getMessage()); - } - }, 60, TimeUnit.SECONDS); - }catch (Exception ignore) {} - - } - } - } - private static final Logger logger = LoggerFactory.getLogger(TaskService.class); - @Autowired ProcessService processService; - - /** - * 执行中的任务 - */ - ConcurrentHashMap runningTasks = new ConcurrentHashMap<>(); - - /** - * 历史任务 - */ - ConcurrentHashMap> historyTasks = new ConcurrentHashMap<>(); - - /** - * 任务池 - */ + ConcurrentHashMap runningTasks = new ConcurrentHashMap(); + ConcurrentHashMap> historyTasks = new ConcurrentHashMap(); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); + public TaskService() { + } + + @Scheduled( + fixedRate = 60000L + ) + public void cleaTask() { + for(Map.Entry entry : this.runningTasks.entrySet()) { + StreamTask streamTask = (StreamTask)entry.getValue(); + if (streamTask.getStartTime().getTime() < (new Date()).getTime() - 5400000L) { + logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask)); + + try { + StreamTask s = this.stopTask((String)entry.getKey()); + this.scheduler.schedule(() -> { + String var10002 = this.targetPath; + File file = new File(var10002 + "/" + s.getOutFileName()); + + try { + logger.info("废弃文件删除成功 {}", file.delete()); + } catch (Exception e) { + logger.error(e.getMessage()); + } + + }, 60L, TimeUnit.SECONDS); + } catch (Exception var5) { + } + } + } + + } + @PostConstruct public void init() { - - /** - * - */ - scheduler.scheduleWithFixedDelay(new Runnable() { - @Override + this.scheduler.scheduleWithFixedDelay(new Runnable() { public void run() { - runningTasks.forEach((key, value) -> { + TaskService.this.runningTasks.forEach((key, value) -> { Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.HOUR, -2); + calendar.add(10, -2); Date twoHoursAgo = calendar.getTime(); if (value.getStartTime().before(twoHoursAgo)) { - logger.error("taskId {} 执行超时,手动关闭", key); + TaskService.logger.error("taskId {} 执行超时,手动关闭", key); + try { - stopTask(key); - }catch (Exception ignore) {} - } - }); - - for (Map.Entry> entry : historyTasks.entrySet()) { - - for (StreamTask task : entry.getValue()) { - List streamProcesses = task.getStreamProcesses(); - for (StreamProcess process : streamProcesses) { - process.destroy(); - File file = new File(process.getFileName()); - FileUtil.deleteFile(file); - + TaskService.this.stopTask(key); + } catch (Exception var6) { } } - while (entry.getValue().size()>10){ - entry.getValue().pollFirst(); + }); + + for(Map.Entry> entry : TaskService.this.historyTasks.entrySet()) { + for(StreamTask task : entry.getValue()) { + for(StreamProcess process : task.getStreamProcesses()) { + process.destroy(); + File file = new File(process.getFileName()); + FileUtil.deleteFile(file); + } + } + + while(((Deque)entry.getValue()).size() > 10) { + ((Deque)entry.getValue()).pollFirst(); } } + } - }, 60, 60, TimeUnit.SECONDS); + }, 60L, 60L, TimeUnit.SECONDS); } public String startPic(String streamUrl) { - try { - StreamProcess streamProcess = processService.takePic(streamUrl); + StreamProcess streamProcess = this.processService.takePic(streamUrl); String outFileName = streamProcess.getFileName(); - /* - 注册进程 - */ ProcessManager.registerProcess(streamProcess.getInnerProcessId()); - logger.info("streamUrl {} startPicProcess {} ", streamUrl,streamProcess.getInnerProcessId()); + logger.info("streamUrl {} startPicProcess {} ", streamUrl, streamProcess.getInnerProcessId()); CompletableFuture future = streamProcess.onExit(); future.thenRun(() -> { - logger.info("streamUrl {} startPicProcess {} Over ", streamUrl,streamProcess.getInnerProcessId()); + logger.info("streamUrl {} startPicProcess {} Over ", streamUrl, streamProcess.getInnerProcessId()); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); - }).exceptionally(ex -> { - logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl,streamProcess.getInnerProcessId()); + }).exceptionally((ex) -> { + logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl, streamProcess.getInnerProcessId()); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); return null; }); return outFileName; - }catch (Exception e) { - logger.info("streamUrl {} ", streamUrl,e); + } catch (Exception e) { + logger.info("streamUrl {} ", streamUrl, e); + return null; } - return null; } - public StreamTask startTask(String streamUrl) { - - /* - 当前有任务,则返回当前的任务 - */ - if (runningTasks.containsKey(streamUrl)) { - return runningTasks.get(streamUrl); - } - - /* - 当前无任务,则开启新任务 - */ - StreamTask taskInstance = new StreamTask(streamUrl); - if (runningTasks.putIfAbsent(streamUrl, taskInstance) == null) { - logger.info("streamUrl {} startTask {} ", streamUrl, taskInstance.getTaskId()); - startTask(streamUrl, taskInstance); - // runningTasks.put(streamUrl, taskInstance); - return taskInstance; + public StreamTask startTask(String streamUrl,String transaction) { + if (this.runningTasks.containsKey(streamUrl)) { + return (StreamTask)this.runningTasks.get(streamUrl); } else { - return runningTasks.get(streamUrl); + StreamTask taskInstance = new StreamTask(streamUrl); + taskInstance.setTransaction(transaction); + if (this.runningTasks.putIfAbsent(streamUrl, taskInstance) == null) { + logger.info("streamUrl {} startTask {} transaction {}", streamUrl, taskInstance.getTaskId(), taskInstance.getTransaction()); + this.startTask(streamUrl, taskInstance); + return taskInstance; + } else { + return (StreamTask)this.runningTasks.get(streamUrl); + } } - } - public StreamTask stopTask(String streamUrl) throws Exception { - + public StreamTask stopTask(final String streamUrl) throws Exception { boolean recordSuccess = true; - StreamTask currentStreamTask = runningTasks.remove(streamUrl); - - if (currentStreamTask!= null) { + final StreamTask currentStreamTask = (StreamTask)this.runningTasks.remove(streamUrl); + if (currentStreamTask == null) { + throw new Exception(""); + } else { currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date())); - if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) { - for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { + for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { + if (!(new File(streamProcess.getFileName())).exists()) { + recordSuccess = false; + logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", new Object[]{streamUrl, currentStreamTask.getTaskId(), streamProcess.getInnerProcessId()}); + } else { + logger.info("streamUrl {} taskId {} destroy Process {}", new Object[]{streamUrl, currentStreamTask.getTaskId(), streamProcess.getInnerProcessId()}); + } streamProcess.destroy(); - - try { - if (streamProcess.getProcess() != null) { - boolean finished = streamProcess.getProcess().waitFor(10, TimeUnit.SECONDS); - if (finished) { - logger.info("进程 {} 已完全结束", streamProcess.getInnerProcessId()); - } else { - logger.warn("进程 {} 超时,强制终止", streamProcess.getInnerProcessId()); - streamProcess.getProcess().destroyForcibly(); - } - } - } catch (InterruptedException e) { - logger.warn("等待进程结束被中断: {}", streamProcess.getInnerProcessId()); - Thread.currentThread().interrupt(); - } - - File file = new File(streamProcess.getFileName()); - if(!file.exists() || file.length() == 0){ - recordSuccess = false; - logger.error("流录制失败: streamUrl {} taskId {} Process {} 文件不存在或为空", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); - }else { - logger.info("streamUrl {} taskId {} Process {} 文件有效,大小: {} bytes", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId(), file.length()); - } - } } - if(!recordSuccess){ + if (!recordSuccess) { throw new Exception(""); - } + } else { + try { + String outFileName = UUID.randomUUID().toString() + ".mp4"; + StreamProcess mergeProcess = this.processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); + if (Objects.nonNull(mergeProcess)) { + ProcessManager.registerProcess(mergeProcess.getInnerProcessId()); + currentStreamTask.setMergeProcess(mergeProcess); + logger.info("streamUrl {} taskId {} startMergeProcess {} ", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()}); + CompletableFuture future = mergeProcess.onExit(); + future.thenRun(() -> { + logger.info("streamUrl {} taskId {} MergeProcess {} Over", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()}); + mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); + ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); - try { - String outFileName = UUID.randomUUID().toString() + ".mp4"; - StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); + for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { + File file = new File(streamProcess.getFileName()); + FileUtil.deleteFile(file); + } - if(Objects.nonNull(mergeProcess)){ - /** - * 注册进程 - */ - ProcessManager.registerProcess(mergeProcess.getInnerProcessId()); - currentStreamTask.setMergeProcess(mergeProcess); + }).exceptionally((ex) -> { + logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex}); + mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); + ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); + for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { + File file = new File(streamProcess.getFileName()); + FileUtil.deleteFile(file); + } - logger.info("streamUrl {} taskId {} startMergeProcess {} ", streamUrl, currentStreamTask.getTaskId(), - mergeProcess.getInnerProcessId()); + return null; + }); + currentStreamTask.setOutFileName(outFileName); + } - CompletableFuture future = mergeProcess.onExit(); - future.thenRun(() -> { - logger.info("streamUrl {} taskId {} MergeProcess {} Over", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()); - mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); - for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ - File file = new File(streamProcess.getFileName()); - FileUtil.deleteFile(file); - } - }).exceptionally(ex -> { - logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex); - mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); - for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ - File file = new File(streamProcess.getFileName()); - FileUtil.deleteFile(file); - } - return null; - }); - currentStreamTask.setOutFileName(outFileName); - } - - - if(Objects.nonNull(currentStreamTask.getMergeProcess())){ - if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && - !currentStreamTask.getMergeProcess().getFileName().isEmpty()){ + if (Objects.nonNull(currentStreamTask.getMergeProcess()) && Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && !currentStreamTask.getMergeProcess().getFileName().isEmpty()) { try { - executor.schedule(new Runnable() { - @Override + this.executor.schedule(new Runnable() { public void run() { try { - logger.info("FileName :"+currentStreamTask.getMergeProcess().getFileName()); - Process process = Runtime.getRuntime().exec(ffmpeg + " -i " + - currentStreamTask.getMergeProcess().getFileName()); + TaskService.logger.info("FileName :" + currentStreamTask.getMergeProcess().getFileName()); + Runtime var10000 = Runtime.getRuntime(); + String var10001 = TaskService.this.ffmpeg; + Process process = var10000.exec(var10001 + " -i " + currentStreamTask.getMergeProcess().getFileName()); BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + String line; - while ((line = reader.readLine()) != null) { - logger.info("Duration line: " + line); + while((line = reader.readLine()) != null) { + TaskService.logger.info("Duration line: " + line); if (line.contains("Duration")) { String duration = line.split("Duration: ")[1].split(",")[0]; - logger.info("Duration: " + duration.substring(0, 8)); + TaskService.logger.info("Duration: " + duration.substring(0, 8)); currentStreamTask.setDuration(duration.substring(0, 8)); - logger.info("-----------------放入缓存-----------------"); - /** - * 放入缓存 - */ - if( historyTasks.containsKey(streamUrl)){ - historyTasks.get(streamUrl).add(currentStreamTask); - }else { - historyTasks.put(streamUrl, new LinkedList<>()); - historyTasks.get(streamUrl).offerLast(currentStreamTask); + TaskService.logger.info("-----------------放入缓存-----------------"); + if (TaskService.this.historyTasks.containsKey(streamUrl)) { + ((Deque)TaskService.this.historyTasks.get(streamUrl)).add(currentStreamTask); + } else { + TaskService.this.historyTasks.put(streamUrl, new LinkedList()); + ((Deque)TaskService.this.historyTasks.get(streamUrl)).offerLast(currentStreamTask); } break; } } - reader.close(); - }catch (Exception e){ - logger.info(e.getMessage()); - } - } - },40, TimeUnit.SECONDS); + reader.close(); + } catch (Exception e) { + TaskService.logger.info(e.getMessage()); + } + + } + }, 40L, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } } + return currentStreamTask; + } catch (Exception e) { + logger.info("taskId {} Stop exceptionally", streamUrl, e); + return null; } - - return currentStreamTask; - } catch (Exception e) { - logger.info("taskId {} Stop exceptionally", streamUrl,e); - return null; } - - }else { - throw new Exception(""); -// if(historyTasks.containsKey(streamUrl)){ -// return historyTasks.get(streamUrl).peekLast(); -// }else { -// return null; -// } } } public StreamTask getLastTask(String streamUrl) { - StreamTask current = runningTasks.get(streamUrl); - if(Objects.nonNull(current)){ + StreamTask current = (StreamTask)this.runningTasks.get(streamUrl); + if (Objects.nonNull(current)) { return current; - }else { - if(historyTasks.containsKey(streamUrl)){ - return historyTasks.get(streamUrl).peekLast(); - } - return null; + } else { + return this.historyTasks.containsKey(streamUrl) ? (StreamTask)((Deque)this.historyTasks.get(streamUrl)).peekLast() : null; } } private void startTask(String streamUrl, StreamTask streamTask) { - try { - /* - 任务不存在了,直接退出 - */ if (streamTask.getStopCommandTime() != null) { logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId()); return; } - StreamProcess streamProcess = processService.recordStream(streamUrl); - /** - * 注册进程 - */ + StreamProcess streamProcess = this.processService.recordStream(streamUrl); ProcessManager.registerProcess(streamProcess.getInnerProcessId()); streamTask.getStreamProcesses().add(streamProcess); - - logger.info("streamUrl {} taskId {} startProcess {} ", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); - + logger.info("streamUrl {} taskId {} startProcess {} ", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId()}); CompletableFuture future = streamProcess.onExit(); future.thenRun(() -> { - logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); + logger.info("streamUrl {} taskId {} Process {} Over", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId()}); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); + try { Thread.sleep(2000L); - } catch (Exception ignored){} - if(!new File(streamProcess.getFileName()).exists()){ + } catch (Exception var5) { + } + + if (!(new File(streamProcess.getFileName())).exists()) { streamTask.getStreamProcesses().remove(streamProcess); } - startTask(streamUrl, streamTask); - }).exceptionally(ex -> { - logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex); + + this.startTask(streamUrl, streamTask); + }).exceptionally((ex) -> { + logger.info("streamUrl {} taskId {} Process {} Exceptionally", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId(), ex}); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - startTask(streamUrl, streamTask); + this.startTask(streamUrl, streamTask); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); return null; }); - } catch (IOException e) { - logger.info("streamUrl {} taskId {} startProcess Exception", streamUrl,streamTask.getTaskId(),e); + logger.info("streamUrl {} taskId {} startProcess Exception", new Object[]{streamUrl, streamTask.getTaskId(), e}); } } - - public void stopAllTask(){ - runningTasks.forEach((key, value) -> { + public void stopAllTask() { + this.runningTasks.forEach((key, value) -> { try { - stopTask(key); - }catch (Exception ignore){} + this.stopTask(key); + } catch (Exception var4) { + } + }); } } diff --git a/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java b/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java index 17c9ff1..a529692 100644 --- a/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java +++ b/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java @@ -1,314 +1,208 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.service.innerService; -import com.tuoheng.steam.dos.StreamProcess; import com.tuoheng.steam.dos.ProcessType; +import com.tuoheng.steam.dos.StreamProcess; import com.tuoheng.steam.service.dos.FlvRecord; import com.tuoheng.steam.util.FileUtil; -import com.tuoheng.steam.util.ProcessManager; -import com.tuoheng.steam.util.TimeUtils; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - @Service public class ProcessService { - @Value("${srs.splitPath}") private String splitPath; - -// @Value("${ffmpeg}") private String ffmpeg = "ffmpeg"; - @Value("${srs.targetPath}") private String targetPath; - @Value("${recordPath}") private String recordPath; - private static final Logger logger = LoggerFactory.getLogger(ProcessService.class); - - // 缓存 FFmpeg 功能支持状态 - private Boolean ffmpegSupportsConcat = null; - - /** - * 任务池 - */ ExecutorService loggingService = Executors.newCachedThreadPool(); - public void mergeMp4(FlvRecord flvRecord) throws IOException, ExecutionException, InterruptedException { - - String command = ffmpeg + " -i "+ recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv() + " -c:v copy -c:a aac " + recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4"; + public ProcessService() { + } + public void mergeMp4(final FlvRecord flvRecord) throws IOException, ExecutionException, InterruptedException { + String var10000 = this.ffmpeg; + final String command = var10000 + " -i " + this.recordPath + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv() + " -c:v copy -c:a aac " + this.recordPath + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getStartTime() + ".mp4"; ProcessBuilder pb = new ProcessBuilder(command.split(" ")); pb.redirectErrorStream(true); - Process process = pb.start(); - - // 使用原子布尔值来标记进程是否被终止 - AtomicBoolean processTerminated = new AtomicBoolean(false); - - loggingService.execute(new Runnable() { - @Override + final Process process = pb.start(); + this.loggingService.execute(new Runnable() { public void run() { - logger.info("mergeMp4 process Start {}",command); - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("mergeMp4-------- {}",line); + ProcessService.logger.info("mergeMp4 process Start {}", command); + + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + while((line = reader.readLine()) != null) { + ProcessService.logger.info("mergeMp4-------- {}", line); } } catch (IOException e) { - logger.info("mergeMp4-------- Over",e); - + ProcessService.logger.info("mergeMp4-------- Over", e); } - logger.info("mergeMp4 Over"); + + ProcessService.logger.info("mergeMp4 Over"); + String var10002 = ProcessService.this.recordPath; + File delete = new File(var10002 + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv()); + FileUtil.deleteFile(delete); } }); - CompletableFuture future = process.onExit(); - // 阻塞等待进程结束,增加超时时间到5分钟 Process completedProcess = null; try { - completedProcess = future.get(5, TimeUnit.MINUTES); - logger.info("mergeMp4正常完成-------- Over"); - }catch (TimeoutException e) { - String fileName = recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4"; + completedProcess = (Process)future.get(4L, TimeUnit.SECONDS); + logger.info("mergeMp4正常完成-------- Over"); + } catch (Exception e) { + var10000 = this.recordPath; + String fileName = var10000 + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getStartTime() + ".mp4"; File file = new File(fileName); - - logger.warn("mergeMp4超时5分钟,检查输出文件是否存在: {}", fileName); - - if(file.exists() && file.length() > 0){ - logger.info("mergeMp4超时但文件已生成,标记为完成-------- Over"); + if (file.exists()) { + logger.info("mergeMp4超时4S完成-------- Over", e); process.destroy(); } else { - logger.error("mergeMp4超时且文件未生成,强制终止进程-------- Over"); - process.destroyForcibly(); + try { + completedProcess = (Process)future.get(4L, TimeUnit.SECONDS); + } catch (Exception var11) { + if (file.exists()) { + logger.info("mergeMp4超时8S完成-------- Over", e); + process.destroy(); + } else { + logger.info("mergeMp4超时8S未完成-------- Over", e); + process.destroy(); + } + } } - } catch (Exception e) { - logger.error("mergeMp4执行异常", e); - processTerminated.set(true); - process.destroyForcibly(); } - // 删除源文件的逻辑 - String sourceFlvPath = recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv(); - File sourceFlvFile = new File(sourceFlvPath); - - if(Objects.nonNull(completedProcess)) { - // 检查进程是否成功结束 + if (Objects.nonNull(completedProcess)) { if (completedProcess.exitValue() == 0) { logger.info("进程成功结束!"); - // 正常完成时删除源文件 - if (sourceFlvFile.exists()) { - FileUtil.deleteFile(sourceFlvFile); - logger.info("转换成功,已删除源文件: {}", sourceFlvPath); - } } else { - logger.info("进程失败,退出码 {} " ,completedProcess.exitValue()); - logger.info("转换失败,保留源文件: {}", sourceFlvPath); - } - } else { - // 超时情况下的处理 - String fileName = recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4"; - File file = new File(fileName); - - if(file.exists() && file.length() > 0){ - logger.info("mergeMp4超时但文件已生成,标记为完成-------- Over"); - if (sourceFlvFile.exists()) { - FileUtil.deleteFile(sourceFlvFile); - logger.info("超时但转换成功,已删除源文件: {}", sourceFlvPath); - } - } else { - logger.error("mergeMp4超时且文件未生成,强制终止进程-------- Over"); - logger.info("超时且转换失败,保留源文件: {}", sourceFlvPath); + logger.info("进程失败,退出码 {} ", completedProcess.exitValue()); } } } - public StreamProcess takePic(String streamUrl) throws IOException { - String outFileName = targetPath + "/" +UUID.randomUUID().toString() +".jpg"; - String command = String.format( - ffmpeg+ " -i %s -vf fps=1 -frames:v 1 -q:v 2 %s", - streamUrl, outFileName); - -// -rw_timeout 5 - - List listCommand = new ArrayList<>(); - listCommand.add(ffmpeg); // ffmpeg 路径 -// listCommand.add("-timeout"); -// listCommand.add("3000000"); -// listCommand.add("-rw_timeout"); -// listCommand.add("5000000"); + String var10000 = this.targetPath; + String outFileName = var10000 + "/" + UUID.randomUUID().toString() + ".jpg"; + String command = String.format(this.ffmpeg + " -i %s -vf fps=1 -frames:v 1 -q:v 2 %s", streamUrl, outFileName); + List listCommand = new ArrayList(); + listCommand.add(this.ffmpeg); listCommand.add("-i"); - // listCommand.add(" -timeout 3000000 -rw_timeout 5000000 -i "); - listCommand.add(streamUrl); // 流地址 + listCommand.add(streamUrl); listCommand.add("-vf"); listCommand.add("fps=1"); listCommand.add("-frames:v"); listCommand.add("1"); listCommand.add("-q:v"); listCommand.add("2"); - listCommand.add(outFileName); // 输出文件名 + listCommand.add(outFileName); logger.info("takePic {}", listCommand); - ProcessBuilder pb = new ProcessBuilder(listCommand); pb.redirectErrorStream(true); - Process process = pb.start(); - try { - if (!process.waitFor(30, TimeUnit.SECONDS)) { - process.destroyForcibly(); - // 处理超时逻辑 - logger.info("takePic {} 超时", listCommand); - } - }catch (Exception ignore) { - } - - loggingService.execute(new Runnable() { - @Override + final Process process = pb.start(); + this.loggingService.execute(new Runnable() { public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("recordStream-------- {}",line); + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + while((line = reader.readLine()) != null) { + ProcessService.logger.info("recordStream-------- {}", line); } - } catch (IOException e) { - logger.info("takePic-------- Over"); + } catch (IOException var6) { + ProcessService.logger.info("takePic-------- Over"); } + } }); - return new StreamProcess(process,outFileName, ProcessType.PIC); + return new StreamProcess(process, outFileName, ProcessType.PIC); } public StreamProcess recordStream(String streamUrl) throws IOException { - - String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts"; - /** - * 如果streamUrl末尾有个_,在实际录制的时候需要将_去除 - */ - String command = String.format( - ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s", - Optional.of(streamUrl) - .filter(s -> s.endsWith("_")) - .map(s -> s.substring(0, s.length() - 1)) - .orElse(streamUrl), recordFileName); + String var10000 = this.splitPath; + String recordFileName = var10000 + "/" + UUID.randomUUID().toString() + ".ts"; + String command = String.format(this.ffmpeg + " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s", Optional.of(streamUrl).filter((s) -> s.endsWith("_")).map((s) -> s.substring(0, s.length() - 1)).orElse(streamUrl), recordFileName); logger.info("recordStream {}", command); ProcessBuilder pb = new ProcessBuilder(command.split(" ")); pb.redirectErrorStream(true); - Process process = pb.start(); - - loggingService.execute(new Runnable() { - @Override + final Process process = pb.start(); + this.loggingService.execute(new Runnable() { public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { -// logger.info("recordStream-------- {}",line); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + while(reader.readLine() != null) { } - } catch (IOException e) { - logger.info("recordStream-------- Over"); + } catch (IOException var6) { + ProcessService.logger.info("recordStream-------- Over"); } + } }); - - return new StreamProcess(process,recordFileName, ProcessType.RECORD); - - }; + return new StreamProcess(process, recordFileName, ProcessType.RECORD); + } public StreamProcess mergeStream(String outFileName, List streamProcesses) throws IOException { + List fileList = new ArrayList(); - List fileList = new ArrayList<>(); - for(StreamProcess streamProcess : streamProcesses){ -// File newFile = new File(streamProcess.getFileName()); -// if(newFile.exists()){ + for(StreamProcess streamProcess : streamProcesses) { + File newFile = new File(streamProcess.getFileName()); + if (newFile.exists()) { fileList.add(streamProcess.getFileName()); -// } - } - - - if(fileList.isEmpty()){ - return null; - } - String command = ""; - outFileName = targetPath + "/" +outFileName; - if(fileList.size() == 1){ - command = String.format( - ffmpeg+ " -i %s -c copy %s", - fileList.get(0), outFileName); - }else { - // 使用 concat demuxer 来更好地处理多个 ts 文件 - String concatFile = targetPath + "/concat_" + UUID.randomUUID().toString() + ".txt"; - try (PrintWriter writer = new PrintWriter(new FileWriter(concatFile))) { - for (String fileName : fileList) { - writer.println("file '" + fileName + "'"); - } } - - command = String.format( - ffmpeg+ " -f concat -safe 0 -i %s -c copy %s", - concatFile, outFileName); - - // 合并完成后删除临时 concat 文件 - final String finalConcatFile = concatFile; - loggingService.execute(() -> { - try { - Thread.sleep(5000); // 等待5秒确保合并完成 - new File(finalConcatFile).delete(); - } catch (Exception e) { - logger.warn("删除临时 concat 文件失败: {}", finalConcatFile); + } + + if (fileList.isEmpty()) { + return null; + } else { + String command = ""; + outFileName = this.targetPath + "/" + outFileName; + if (fileList.size() == 1) { + command = String.format(this.ffmpeg + " -i %s -c copy %s", fileList.get(0), outFileName); + } else { + String filePaths = String.join("|", fileList); + command = String.format(this.ffmpeg + " -i \"concat:%s\" -c copy %s", filePaths, outFileName); + } + + logger.info("mergeStream {}", command); + ProcessBuilder pb = new ProcessBuilder(command.split(" ")); + pb.redirectErrorStream(true); + final Process process = pb.start(); + this.loggingService.execute(new Runnable() { + public void run() { + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + while((line = reader.readLine()) != null) { + ProcessService.logger.info("mergeStream-------- {}", line); + } + } catch (IOException var6) { + ProcessService.logger.info("mergeStream-------- Over"); + } + } }); + return new StreamProcess(process, outFileName, ProcessType.MERGE); } - logger.info("mergeStream {}", command); - ProcessBuilder pb = new ProcessBuilder(command.split(" ")); - pb.redirectErrorStream(true); - Process process = pb.start(); - - loggingService.execute(new Runnable() { - @Override - public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("mergeStream-------- {}",line); - } - } catch (IOException e) { - logger.info("mergeStream-------- Over"); - } - } - }); - - - return new StreamProcess(process,outFileName,ProcessType.MERGE); - - }; - - + } } diff --git a/src/main/java/com/tuoheng/steam/util/FileUtil.java b/src/main/java/com/tuoheng/steam/util/FileUtil.java index c74c11c..61435f2 100644 --- a/src/main/java/com/tuoheng/steam/util/FileUtil.java +++ b/src/main/java/com/tuoheng/steam/util/FileUtil.java @@ -1,100 +1,95 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.util; -import com.tuoheng.steam.controller.StreamRecordController; -import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.StreamType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.LinkOption; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FileUtil { - private static final Logger logger = LoggerFactory.getLogger(FileUtil.class); + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); + public FileUtil() { + } public static void deleteFile(File file) { - if(file.exists()) { + if (file.exists()) { file.delete(); } + } public static void deleteFolder(String stringPath) { - try { Path path = Paths.get(stringPath); - if (Files.exists(path)) { + if (Files.exists(path, new LinkOption[0])) { try (Stream pathStream = Files.walk(path)) { - pathStream.sorted((p1, p2) -> -p1.compareTo(p2)) // 从子文件/文件夹开始删除 - .forEach(p -> { - try { - Files.delete(p); // 删除文件或文件夹 - } catch (IOException e) { - System.err.println("无法删除: " + p + ", 原因: " + e.getMessage()); - } - }); + pathStream.sorted((p1, p2) -> -p1.compareTo(p2)).forEach((p) -> { + try { + Files.delete(p); + } catch (IOException e) { + System.err.println("无法删除: " + p + ", 原因: " + e.getMessage()); + } + + }); } } else { logger.info("文件夹不存在: " + path); } - }catch (Exception e) { - logger.error("deleteFolder",e); + } catch (Exception e) { + logger.error("deleteFolder", e); } } public static List readFiles(String stringPath) { - - List list = new ArrayList<>(); - + List list = new ArrayList(); File directory = new File(stringPath); - - // 检查目录是否存在且是一个目录 if (directory.exists() && directory.isDirectory()) { - // 获取目录下的所有子文件和文件夹 File[] files = directory.listFiles(); - if (files != null) { - for (File file : files) { + for(File file : files) { list.add(file.getName()); } } } + return list; } - public static boolean isTempFile(String fileName) { + public static boolean isTempFile(String fileName) { String[] result = fileName.split("\\."); return result.length == 3; } - public static boolean isFlv(String fileName) { + public static boolean isFlv(String fileName) { return fileName.endsWith("flv"); } - public static boolean isMp4(String fileName) { + public static boolean isMp4(String fileName) { return fileName.endsWith("mp4"); } public static StreamType getStreamType(String fileName) { - if(fileName.endsWith("outer")){ + if (fileName.endsWith("outer")) { return StreamType.Outer; - }else if(fileName.endsWith("inner")){ - return StreamType.Inner; - }else{ - return StreamType.LiveVideo; + } else { + return fileName.endsWith("inner") ? StreamType.Inner : StreamType.LiveVideo; } } - } diff --git a/src/main/java/com/tuoheng/steam/util/ProcessManager.java b/src/main/java/com/tuoheng/steam/util/ProcessManager.java index 7585880..3bf0335 100644 --- a/src/main/java/com/tuoheng/steam/util/ProcessManager.java +++ b/src/main/java/com/tuoheng/steam/util/ProcessManager.java @@ -1,110 +1,127 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + package com.tuoheng.steam.util; -import com.tuoheng.steam.controller.StreamRecordController; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; -import java.io.*; -import java.util.*; - @Service public class ProcessManager { - private static final Logger logger = LoggerFactory.getLogger(ProcessManager.class); - private static final String DELIMITER = "="; + static Map runningProcessIds = new HashMap(); + static String pidPath = "pid.txt"; - static Map runningProcessIds = new HashMap<>(); + public ProcessManager() { + } - static String pidPath ="pid.txt"; - - static { - runningProcessIds = loadFromFile(pidPath); - for(Map.Entry entry : runningProcessIds.entrySet()){ - try { - killProcessByPID(entry.getKey()); - }catch (Exception e){ - e.printStackTrace(); - } - } - runningProcessIds.clear(); + public static void registerProcess(Long process) { + runningProcessIds.put(process, TimeUtils.formatDateToString(new Date())); writeToFile(runningProcessIds); } - public static void registerProcess(Long process) { - runningProcessIds.put(process,TimeUtils.formatDateToString(new Date())); - writeToFile(runningProcessIds); - } - - public static void unRegisterProcess(Long process) { + public static void unRegisterProcess(Long process) { runningProcessIds.remove(process); writeToFile(runningProcessIds); } - public static void writeToFile(Map map){ + public static void writeToFile(Map map) { try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath))) { - for (Map.Entry entry : map.entrySet()) { - writer.write(entry.getKey() + DELIMITER + entry.getValue()); + for(Map.Entry entry : map.entrySet()) { + Object var10001 = entry.getKey(); + writer.write(var10001 + "=" + (String)entry.getValue()); writer.newLine(); } - }catch (IOException ignored){} + } catch (IOException var6) { + } + } - public static void appendToFile(Long pid,String strDate) { + public static void appendToFile(Long pid, String strDate) { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath, true))) { + writer.write(pid + "=" + strDate); + writer.newLine(); + } catch (IOException var7) { + } - try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath,true))) { - writer.write(pid+ DELIMITER + strDate); - writer.newLine(); - }catch (IOException ignored) {} } - public static Map loadFromFile(String filePath){ - Map map = new LinkedHashMap<>(); + public static Map loadFromFile(String filePath) { + Map map = new LinkedHashMap(); File file = new File(filePath); + try { - if(!file.exists()){ + if (!file.exists()) { file.createNewFile(); } - }catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); } - try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + + try { String line; - while ((line = reader.readLine()) != null) { - String[] parts = line.split(ProcessManager.DELIMITER, 2); // Split into key and value - if (parts.length == 2) { - Long key = Long.parseLong(parts[0].trim()); - String value = parts[1].trim(); - map.put(key, value); + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + while((line = reader.readLine()) != null) { + String[] parts = line.split("=", 2); + if (parts.length == 2) { + Long key = Long.parseLong(parts[0].trim()); + String value = parts[1].trim(); + map.put(key, value); + } } } + + return map; } catch (IOException e) { throw new RuntimeException(e); } - return map; } public static void killProcessByPID(long pid) throws IOException, InterruptedException { String os = System.getProperty("os.name").toLowerCase(); ProcessBuilder processBuilder; - if (os.contains("win")) { - // Windows: Use taskkill command - processBuilder = new ProcessBuilder("cmd", "/c", "taskkill /PID " + pid + " /F"); + processBuilder = new ProcessBuilder(new String[]{"cmd", "/c", "taskkill /PID " + pid + " /F"}); } else { - // Linux/Unix/macOS: Use kill command - processBuilder = new ProcessBuilder("bash", "-c", "kill -9 " + pid); + processBuilder = new ProcessBuilder(new String[]{"bash", "-c", "kill -9 " + pid}); } Process process = processBuilder.start(); int exitCode = process.waitFor(); - if (exitCode == 0) { logger.info("Process with PID " + pid + " terminated successfully."); } else { logger.error("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode); } + } + static { + runningProcessIds = loadFromFile(pidPath); + + for(Map.Entry entry : runningProcessIds.entrySet()) { + try { + killProcessByPID((Long)entry.getKey()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + runningProcessIds.clear(); + writeToFile(runningProcessIds); + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 9852e9d..b8727be 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,26 +1,22 @@ #公司环境 -spring.application.name=stream_server -server.port = 9011 -srs.splitPath=/data/java/srs/stream_server/temp -srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html/recording -ffmpeg=ffmpeg -recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record -livedates=1 -cangneiwai=true -srs.domain="stream.t-aaron.com" -srs.name="stream.t-aaron.com" -# -#大数据局 #spring.application.name=stream_server -#server.port = 8989 +#server.port = 9011 #srs.splitPath=/data/java/srs/stream_server/temp #srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html/recording #ffmpeg=ffmpeg #recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record #livedates=8 -#cangneiwai=true -#srs.domain = "" -#srs.name = "" +#cangneiwai=false + +#大数据局 +spring.application.name=stream_server +server.port = 8989 +srs.splitPath=/data/java/srs/stream_server/temp +srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html/recording +ffmpeg=ffmpeg +recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record +livedates=8 +cangneiwai=true #本地测试 #spring.application.name=stream_server @@ -30,23 +26,4 @@ srs.name="stream.t-aaron.com" #ffmpeg=ffmpeg #recordPath=/Users/sunpeng/workspace/stream/record #livedates=7 -#cangneiwai=false -#srs.domain= -#srs.name= - -##容器化部署 -#通过注入 -#srs.domain = STREAM.t-aaron.com -##通过注入 -#srs.name = STREAM -#spring.application.name=stream_server -#server.port = 8080 -##零时文件 -#srs.splitPath=/data/temp -##拍照 + 录像 -#srs.targetPath=/data/recording -#ffmpeg=ffmpeg -## -#recordPath=/data/record -#livedates=8 -#cangneiwai=true \ No newline at end of file +#cangneiwai=false \ No newline at end of file