Compare commits

...

10 Commits

Author SHA1 Message Date
孙小云 a24d08b0c2 修改了代码 2025-12-27 13:04:33 +08:00
孙小云 0429afb3ad 修改了代码 2025-12-25 13:29:18 +08:00
孙小云 0ba76855f0 dashuju 2025-12-25 13:08:13 +08:00
孙小云 15ce14f117 添加定时删除功能 2025-12-24 15:16:08 +08:00
孙小云 71919393ef 添加定时删除功能 2025-08-04 15:24:12 +08:00
孙小云 240b87ab35 添加定时删除功能 2025-08-04 15:23:23 +08:00
孙小云 5eb7fcdc90 添加定时删除功能 2025-08-04 14:24:52 +08:00
孙小云 e996f1a34f 添加定时删除功能 2025-08-04 13:48:33 +08:00
孙小云 06f6ca0591 添加定时删除功能 2025-08-04 13:46:05 +08:00
孙小云 f16a17d918 添加定时删除功能 2025-07-19 13:18:35 +08:00
11 changed files with 713 additions and 735 deletions

View File

@ -1,5 +1,9 @@
package com.tuoheng.steam.controller; //
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.controller;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.tuoheng.steam.controller.dto.PageInfo; import com.tuoheng.steam.controller.dto.PageInfo;
@ -12,162 +16,188 @@ import com.tuoheng.steam.service.ITaskService;
import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.DayRecord;
import com.tuoheng.steam.service.dos.Mp4Record; import com.tuoheng.steam.service.dos.Mp4Record;
import com.tuoheng.steam.service.dos.StreamRecord; import com.tuoheng.steam.service.dos.StreamRecord;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.File; @RestController
import java.util.*; @RequestMapping({"/record"})
import java.util.stream.Collectors;
@RestController()
@RequestMapping("/record")
public class StreamRecordController { public class StreamRecordController {
private static final Logger logger = LoggerFactory.getLogger(StreamRecordController.class); private static final Logger logger = LoggerFactory.getLogger(StreamRecordController.class);
@Autowired @Autowired
ITaskService taskService; ITaskService taskService;
@Autowired @Autowired
IRecordService iRecordService; IRecordService iRecordService;
public StreamRecordController() {
}
@GetMapping("pic") @GetMapping({"pic"})
public Response<String> startPic(@RequestParam String streamUrl) { public Response<String> startPic(@RequestParam String streamUrl) {
logger.info("启动视频拍照 :"+streamUrl); logger.info("启动视频拍照 :" + streamUrl);
streamUrl = streamSwitch(streamUrl); streamUrl = this.streamSwitch(streamUrl);
logger.info("启动视频拍照_ :"+streamUrl); logger.info("启动视频拍照_ :" + streamUrl);
if(Objects.isNull(streamUrl)) { if (Objects.isNull(streamUrl)) {
return Response.fail(-1); return Response.fail(-1);
} } else {
String outfile = taskService.startPic(streamUrl); String outfile = this.taskService.startPic(streamUrl);
if(Objects.nonNull(outfile)) { if (Objects.nonNull(outfile)) {
int lastSlashIndex = outfile.lastIndexOf("/"); int lastSlashIndex = outfile.lastIndexOf("/");
String fileName = outfile.substring(lastSlashIndex + 1); String fileName = outfile.substring(lastSlashIndex + 1);
return Response.success(fileName); return Response.success(fileName);
}else { } else {
return Response.fail(-1); return Response.fail(-1);
}
}
@GetMapping("start")
public Response<StreamTask> startRecording(@RequestParam String streamUrl) {
logger.info("启动录制 :"+streamUrl);
streamUrl = streamSwitch(streamUrl);
logger.info("启动录制_ :"+streamUrl);
if(Objects.isNull(streamUrl)) {
return Response.fail(-1);
}
Response<StreamTask> response = Response.success(taskService.startTask(streamUrl));
logger.info("启动录制返回 :"+ JSON.toJSONString(response));
return response;
}
@GetMapping("stop")
public Response<StreamTask> stopRecording(@RequestParam String streamUrl){
logger.info("关闭录制 :"+streamUrl);
streamUrl = streamSwitch(streamUrl);
logger.info("关闭录制_ :"+streamUrl);
if(Objects.isNull(streamUrl)) {
return Response.fail(-1);
}
try {
Response<StreamTask> response = Response.success(taskService.stopTask(streamUrl));
if(Objects.isNull(response.getData()) || Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){
response.setCode(500);
} }
logger.info("关闭录制返回 :"+ JSON.toJSONString(response));
return response;
}catch (Exception e){
return Response.fail(-1);
} }
} }
@GetMapping("info") @GetMapping({"start"})
public Response<StreamTask> getLastTask(@RequestParam String streamUrl){ public Response<StreamTask> startRecording(@RequestParam String streamUrl) {
logger.info("查看录制 :"+streamUrl); logger.info("启动录制 :" + streamUrl);
streamUrl = streamSwitch(streamUrl); streamUrl = this.streamSwitch(streamUrl);
logger.info("查看录制_ :"+streamUrl); logger.info("启动录制_ :" + streamUrl);
if(Objects.isNull(streamUrl)) { if (Objects.isNull(streamUrl)) {
return Response.fail(-1); return Response.fail(-1);
} else {
Response<StreamTask> response = Response.success(this.taskService.startTask(streamUrl));
logger.info("启动录制返回 :" + JSON.toJSONString(response));
return response;
} }
Response<StreamTask> response = Response.success(taskService.getLastTask(streamUrl));
logger.info("查看录制返回 :"+ JSON.toJSONString(response));
return response;
} }
@GetMapping({"stop"})
public Response<StreamTask> stopRecording(@RequestParam String streamUrl) {
logger.info("关闭录制 :" + streamUrl);
streamUrl = this.streamSwitch(streamUrl);
logger.info("关闭录制_ :" + streamUrl);
if (Objects.isNull(streamUrl)) {
return Response.fail(-1);
} else {
try {
Response<StreamTask> response = Response.success(this.taskService.stopTask(streamUrl));
if (Objects.isNull(response.getData()) || Objects.isNull(((StreamTask)response.getData()).getOutFileName()) || ((StreamTask)response.getData()).getOutFileName().isEmpty()) {
response.setCode(500);
}
/** logger.info("关闭录制返回 :" + JSON.toJSONString(response));
* 废弃 return response;
* @param request } catch (Exception var3) {
* @return return Response.fail(-1);
*/ }
@PostMapping("search") }
public Response<PageInfo<Mp4Info>> streamView(@RequestBody PageStreamRequest request){ }
logger.info("查看录制 search :"+JSON.toJSONString(request));
if(Objects.isNull(request.getPageIndex()) || Objects.isNull(request.getPageSize()) @GetMapping({"info"})
|| request.getPageIndex() <0 || request.getPageSize() <=0 ){ public Response<StreamTask> getLastTask(@RequestParam String streamUrl) {
logger.info("查看录制 :" + streamUrl);
streamUrl = this.streamSwitch(streamUrl);
logger.info("查看录制_ :" + streamUrl);
if (Objects.isNull(streamUrl)) {
return Response.fail(-1);
} else {
Response<StreamTask> response = Response.success(this.taskService.getLastTask(streamUrl));
logger.info("查看录制返回 :" + JSON.toJSONString(response));
return response;
}
}
@GetMapping({"infoList"})
public Response<List<StreamTask>> infoList(@RequestParam String streamUrl) {
logger.info("查看录制列表 :" + streamUrl);
streamUrl = this.streamSwitch(streamUrl);
logger.info("查看录制列表_ :" + streamUrl);
if (Objects.isNull(streamUrl)) {
return Response.fail(-1);
} else {
Response<List<StreamTask>> response = Response.success(this.taskService.getTaskList(streamUrl));
logger.info("查看录制列表返回 :" + JSON.toJSONString(response));
return response;
}
}
@PostMapping({"search"})
public Response<PageInfo<Mp4Info>> streamView(@RequestBody PageStreamRequest request) {
logger.info("查看录制 search :" + JSON.toJSONString(request));
if (!Objects.isNull(request.getPageIndex()) && !Objects.isNull(request.getPageSize()) && request.getPageIndex() >= 0 && request.getPageSize() > 0) {
List<Mp4Info> mp4s = this.searchAll(request.getStreamId());
if (!Strings.isBlank(request.getStartTime())) {
long filerTime = Long.parseLong(request.getStartTime()) - 900000L;
mp4s = (List)mp4s.stream().filter((info) -> info.getStartTime().compareTo(Long.toString(filerTime)) >= 0).collect(Collectors.toList());
}
if (!Strings.isBlank(request.getEndTime())) {
long filerTime = Long.parseLong(request.getEndTime());
mp4s = (List)mp4s.stream().filter((info) -> info.getStartTime().compareTo(Long.toString(filerTime)) <= 0).collect(Collectors.toList());
}
mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime()));
PageInfo<Mp4Info> pageInfo = new PageInfo(mp4s, request.getPageIndex(), request.getPageSize());
logger.info("查看录制 search 返回:" + JSON.toJSONString(request));
return Response.success(pageInfo);
} else {
logger.error("查看录制 search 入参错误!"); logger.error("查看录制 search 入参错误!");
return Response.fail(-100); return Response.fail(-100);
} }
List<Mp4Info> mp4s = searchAll(request.getStreamId());
if(!Strings.isBlank(request.getStartTime())){
long filerTime = Long.parseLong(request.getStartTime()) - 15 * 60 * 1000;
mp4s = mp4s.stream()
.filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) >= 0)
.collect(Collectors.toList()); // 将结果收集到列表中
}
if(!Strings.isBlank(request.getEndTime())){
long filerTime = Long.parseLong(request.getEndTime());
mp4s = mp4s.stream()
.filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) <= 0)
.collect(Collectors.toList()); // 将结果收集到列表中
}
/**
* 倒序排列
*/
mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime()));
PageInfo<Mp4Info> pageInfo = new PageInfo<>(mp4s,request.getPageIndex(),request.getPageSize());
logger.info("查看录制 search 返回:"+JSON.toJSONString(request));
return Response.success(pageInfo);
} }
@GetMapping({"history"})
public Response<List<Mp4Info>> streamView(@RequestParam String streamUrl) {
logger.info("查看录像历史返回 :" + streamUrl);
List<Mp4Info> dayMp4 = this.searchAll(streamUrl);
dayMp4.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime()));
logger.info("查看录像历史返回_ :" + JSON.toJSONString(dayMp4));
return Response.success(dayMp4);
}
public List<Mp4Info> searchAll(String streamUrl){ public List<Mp4Info> searchAll(String streamUrl) {
List<Mp4Info> dayMp4 = new ArrayList();
List<Mp4Info> dayMp4 = new ArrayList<>(); for(DayRecord dayRecord : this.iRecordService.findDaysPath()) {
List<DayRecord> dayRecords = iRecordService.findDaysPath(); for(StreamRecord stream : this.iRecordService.findInDayRecord(dayRecord)) {
for(DayRecord dayRecord : dayRecords){ if (stream.getStreamId().equals(streamUrl)) {
List<StreamRecord> streamRecords = iRecordService.findInDayRecord(dayRecord); List<Mp4Record> mp4Records = stream.queryMp4Records();
for(StreamRecord stream : streamRecords){ if (Objects.nonNull(mp4Records) && !mp4Records.isEmpty()) {
if(stream.getStreamId().equals(streamUrl)){ for(Mp4Record mp4Record : mp4Records) {
List<Mp4Record> mp4Records = stream.queryMp4Records();
if(Objects.nonNull(mp4Records) && !mp4Records.isEmpty()){
for(Mp4Record mp4Record : mp4Records){
Mp4Info mp4Info = new Mp4Info(); Mp4Info mp4Info = new Mp4Info();
mp4Info.setStartTime(mp4Record.getStartTime()); mp4Info.setStartTime(mp4Record.getStartTime());
mp4Info.setUrl(dayRecord.getDay()+ String var10001 = dayRecord.getDay();
File.separator + stream.getStreamId() + File.separator + mp4Record.getMp4()); mp4Info.setUrl(var10001 + File.separator + stream.getStreamId() + File.separator + mp4Record.getMp4());
dayMp4.add(mp4Info); dayMp4.add(mp4Info);
} }
} }
} }
} }
} }
return dayMp4;
return dayMp4;
} }
@GetMapping({"exit"})
public String exit() {
new Thread(() -> {
try {
this.taskService.stopAllTask();
Thread.sleep(10000L);
} catch (Exception var2) {
}
System.exit(0);
});
return "OK";
}
@Value("${srs.domain}") @Value("${srs.domain}")
private String srsdomain; private String srsdomain;
@ -175,35 +205,60 @@ public class StreamRecordController {
@Value("${srs.name}") @Value("${srs.name}")
private String srsname; private String srsname;
// public String streamSwitch(String source) {
// if (source.contains("stream.t-aaron.com")) {
// return source;
// } else if (source.contains("rtmp://live.push.t-aaron.com")) {
// source = source.replace("rtmp://live.push.t-aaron.com", "http://live.play.t-aaron.com");
// if (source.endsWith("_")) {
// String var10000 = source.substring(0, source.length() - 1);
// source = var10000 + ".flv_";
// } else {
// source = source + ".flv";
// }
//
// return source;
// } else {
// if (source.contains("https://live.play.t-aaron.com")) {
// source = source.replace("https://live.play.t-aaron.com", "http://live.play.t-aaron.com");
// }
//
// return source;
// }
// }
public String streamSwitch(String source){ public String streamSwitch(String source){
if(Objects.nonNull(srsdomain) && !srsdomain.isEmpty()){ if(Objects.nonNull(srsname) && !srsname.isEmpty()){
if(Objects.nonNull(source) && !source.isEmpty()){ if(Objects.nonNull(source) && !source.isEmpty()){
return dockerFix(source); return dockerFix(source);
} }
} }
if (source.contains("stream.t-aaron.com")) {
if(source.contains("stream.t-aaron.com")){
return source; return source;
} else if (source.contains("rtmp://live.push.t-aaron.com")){ } else if (source.contains("rtmp://live.push.t-aaron.com")) {
source = source.replace("rtmp://live.push.t-aaron.com","http://live.play.t-aaron.com") ; source = source.replace("rtmp://live.push.t-aaron.com", "http://live.play.t-aaron.com");
if(source.endsWith("_")){ if (source.endsWith("_")) {
source = source.substring(0,source.length()-1) + ".flv" + "_"; String var10000 = source.substring(0, source.length() - 1);
}else { source = var10000 + ".flv_";
} else {
source = source + ".flv"; source = source + ".flv";
} }
return source; return source;
}else { } else {
if(source.contains("https://live.play.t-aaron.com")){ if (source.contains("https://live.play.t-aaron.com")) {
source = source.replace("https://live.play.t-aaron.com","http://live.play.t-aaron.com"); source = source.replace("https://live.play.t-aaron.com", "http://live.play.t-aaron.com");
} }
return source; return source;
} }
} }
public String dockerFix(String url) { public String dockerFix(String url) {
logger.info("dockerFix{}",url);
if (url == null || url.isEmpty()) { if (url == null || url.isEmpty()) {
return url; return url;
} }
@ -223,8 +278,7 @@ public class StreamRecordController {
return withNewDomain.replaceFirst("\\.flv$", ""); return withNewDomain.replaceFirst("\\.flv$", "");
} }
return url; return url;
} }
} }

View File

@ -1,5 +1,6 @@
package com.tuoheng.steam.dos; package com.tuoheng.steam.dos;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import java.io.Serializable; import java.io.Serializable;
@ -9,13 +10,16 @@ import java.util.concurrent.CompletableFuture;
public class StreamProcess implements Serializable { public class StreamProcess implements Serializable {
@JsonIgnore
Process process; Process process;
Date createTime; Date createTime;
String stopTime; String stopTime;
ProcessType processType; ProcessType processType;
String fileName; String fileName;
public Process getProcess() {
return process;
}
public String startTime(){ public String startTime(){
return TimeUtils.formatDateToString(createTime); return TimeUtils.formatDateToString(createTime);

View File

@ -1,54 +1,58 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.dos; package com.tuoheng.steam.dos;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
public class StreamTask implements Serializable { public class StreamTask implements Serializable {
Long taskId = generatePid();
Long taskId;
Date startTime; Date startTime;
String strStartTime; String strStartTime;
String duration;
String stopCommandTime;
List<StreamProcess> streamProcesses = new ArrayList();
StreamProcess mergeProcess;
private static Long pid = 0L;
String streamUrl;
String outFileName;
String transaction;
public String getTransaction() {
return transaction;
}
public void setTransaction(String transaction) {
this.transaction = transaction;
}
public String getDuration() { public String getDuration() {
return duration; return this.duration;
} }
public void setDuration(String duration) { public void setDuration(String duration) {
this.duration = duration; this.duration = duration;
} }
String duration; private static synchronized Long generatePid() {
/** if (pid == Long.MAX_VALUE) {
* 接收到结束命令的时间
*/
String stopCommandTime;
/**
* 执行生成TS文件的命令
*/
List<StreamProcess> streamProcesses = new ArrayList<>();
/**
* 合并进程的命令
*/
StreamProcess mergeProcess;
private static Long pid = 0L;
private synchronized static Long generatePid(){
if(pid==Long.MAX_VALUE){
pid = 0L; pid = 0L;
}else { } else {
pid++; Long var0 = pid;
pid = pid + 1L;
} }
return pid; return pid;
} }
public String getStrStartTime() { public String getStrStartTime() {
return strStartTime; return this.strStartTime;
} }
public void setStrStartTime(String strStartTime) { public void setStrStartTime(String strStartTime) {
@ -59,19 +63,16 @@ public class StreamTask implements Serializable {
this.strStartTime = startTime; this.strStartTime = startTime;
} }
public String getStopCommandTime() { public String getStopCommandTime() {
return stopCommandTime; return this.stopCommandTime;
} }
public void setStopCommandTime(String stopCommandTime) { public void setStopCommandTime(String stopCommandTime) {
this.stopCommandTime = stopCommandTime; this.stopCommandTime = stopCommandTime;
} }
public List<StreamProcess> getStreamProcesses() { public List<StreamProcess> getStreamProcesses() {
return streamProcesses; return this.streamProcesses;
} }
public void setStreamProcesses(List<StreamProcess> streamProcesses) { public void setStreamProcesses(List<StreamProcess> streamProcesses) {
@ -79,7 +80,7 @@ public class StreamTask implements Serializable {
} }
public StreamProcess getMergeProcess() { public StreamProcess getMergeProcess() {
return mergeProcess; return this.mergeProcess;
} }
public void setMergeProcess(StreamProcess mergeProcess) { public void setMergeProcess(StreamProcess mergeProcess) {
@ -87,19 +88,15 @@ public class StreamTask implements Serializable {
} }
public String getStreamUrl() { public String getStreamUrl() {
return streamUrl; return this.streamUrl;
} }
public void setStreamUrl(String streamUrl) { public void setStreamUrl(String streamUrl) {
this.streamUrl = streamUrl; this.streamUrl = streamUrl;
} }
String streamUrl;
public Long getTaskId() { public Long getTaskId() {
return taskId; return this.taskId;
} }
public void setTaskId(Long taskId) { public void setTaskId(Long taskId) {
@ -107,7 +104,7 @@ public class StreamTask implements Serializable {
} }
public Date getStartTime() { public Date getStartTime() {
return startTime; return this.startTime;
} }
public void setStartTime(Date startTime) { public void setStartTime(Date startTime) {
@ -115,22 +112,17 @@ public class StreamTask implements Serializable {
} }
public StreamTask(String streamUrl) { public StreamTask(String streamUrl) {
this.taskId = generatePid();
this.streamUrl = streamUrl; this.streamUrl = streamUrl;
this.startTime = new Date(); this.startTime = new Date();
this.strStartTime = TimeUtils.formatDateToString(this.startTime); this.strStartTime = TimeUtils.formatDateToString(this.startTime);
this.stopCommandTime = null; this.stopCommandTime = null;
} }
public String getOutFileName() { public String getOutFileName() {
return outFileName; return this.outFileName;
} }
public void setOutFileName(String outFileName) { public void setOutFileName(String outFileName) {
this.outFileName = outFileName; this.outFileName = outFileName;
} }
String outFileName;
} }

View File

@ -1,78 +1,62 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.schedule; package com.tuoheng.steam.schedule;
import com.tuoheng.steam.service.IRecordService; import com.tuoheng.steam.service.IRecordService;
import com.tuoheng.steam.service.TaskService;
import com.tuoheng.steam.service.dos.DayRecord; import com.tuoheng.steam.service.dos.DayRecord;
import com.tuoheng.steam.service.dos.FlvRecord; import com.tuoheng.steam.service.dos.FlvRecord;
import com.tuoheng.steam.service.dos.Mp4Record;
import com.tuoheng.steam.service.dos.StreamRecord; import com.tuoheng.steam.service.dos.StreamRecord;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*;
@Component @Component
public class Scheduler { public class Scheduler {
private static final Logger logger = LoggerFactory.getLogger(Scheduler.class); private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);
@Autowired @Autowired
IRecordService iRecordService; IRecordService iRecordService;
@Value("${livedates}") @Value("${livedates}")
private Integer livedates; private Integer livedates;
@Value("${cangneiwai}") @Value("${cangneiwai}")
private Boolean cangneiwai; private Boolean cangneiwai;
/**
* 初次执行延迟6秒执行 public Scheduler() {
* 每隔 60 分钟执行一次 60*60*1000 }
*/
@Scheduled(fixedRate = 3600000, initialDelay = 6000) @Scheduled(
fixedRate = 3600000L,
initialDelay = 6000L
)
public void mergeTask() { public void mergeTask() {
if (!this.cangneiwai) {
if(!cangneiwai){
logger.info("舱内外无需录制"); logger.info("舱内外无需录制");
return; } else {
} Logger var10000 = logger;
long var10001 = System.currentTimeMillis();
var10000.info("开始FLV到MP4的转换 - " + var10001 / 1000L);
List<DayRecord> dayRecords = this.iRecordService.findDaysPath();
logger.info("开始FLV到MP4的转换 - " + System.currentTimeMillis() / 1000); for(int index = 0; index < dayRecords.size(); ++index) {
List<DayRecord> dayRecords = iRecordService.findDaysPath(); DayRecord dayRecord = (DayRecord)dayRecords.get(index);
for (int index = 0; index < dayRecords.size(); index++) { if (TimeUtils.isBefore(dayRecord.getDay(), this.livedates)) {
dayRecord.clear();
DayRecord dayRecord = dayRecords.get(index); } else {
if(TimeUtils.isBefore(dayRecord.getDay(),livedates)){ for(StreamRecord streamRecord : dayRecord.getStreamRecords()) {
dayRecord.clear(); for(FlvRecord flvRecord : streamRecord.queryFlvRecords()) {
} else { this.iRecordService.mergeMp4(flvRecord);
List<StreamRecord> streamRecords = dayRecord.getStreamRecords(); }
for(StreamRecord streamRecord : streamRecords){
List<FlvRecord> flvRecords = streamRecord.queryFlvRecords();
for(FlvRecord flvRecord : flvRecords){
iRecordService.mergeMp4(flvRecord);
} }
} }
} }
} }
} }
// public static boolean isWithin15Minutes(String timestamp1, String timestamp2) {
// // 将字符串转换为 long 类型
// long time1 = Long.parseLong(timestamp1);
// long time2 = Long.parseLong(timestamp2);
// // 计算时间差的绝对值
// long diff = Math.abs(time2 - time1);
// // 15 分钟 = 15 * 60 * 1000 毫秒
// long fifteenMinutesInMillis = 15 * 60 * 1000;
// // 判断是否小于或等于 15 分钟
// return diff <= fifteenMinutesInMillis;
// }
} }

View File

@ -2,6 +2,8 @@ package com.tuoheng.steam.service;
import com.tuoheng.steam.dos.StreamTask; import com.tuoheng.steam.dos.StreamTask;
import java.util.List;
/** /**
* 视频录制服务 * 视频录制服务
*/ */
@ -15,5 +17,7 @@ public interface ITaskService {
public StreamTask getLastTask(String streamUrl); public StreamTask getLastTask(String streamUrl);
public List<StreamTask> getTaskList(String streamUrl);
public void stopAllTask(); public void stopAllTask();
} }

View File

@ -1,67 +1,66 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.service; package com.tuoheng.steam.service;
import com.tuoheng.steam.service.dos.*; import com.tuoheng.steam.service.dos.DayRecord;
import com.tuoheng.steam.service.dos.FlvRecord;
import com.tuoheng.steam.service.dos.StreamRecord;
import com.tuoheng.steam.service.innerService.ProcessService; import com.tuoheng.steam.service.innerService.ProcessService;
import com.tuoheng.steam.util.FileUtil;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
import static com.tuoheng.steam.util.FileUtil.readFiles;
@Component @Component
public class RecordService implements IRecordService { public class RecordService implements IRecordService {
@Value("${recordPath}") @Value("${recordPath}")
private String recordPath; private String recordPath;
@Autowired @Autowired
ProcessService processService; ProcessService processService;
private static final Logger logger = LoggerFactory.getLogger(IRecordService.class); private static final Logger logger = LoggerFactory.getLogger(IRecordService.class);
public RecordService() {
}
public static boolean isValidDate(String dateStr) { public static boolean isValidDate(String dateStr) {
// 正则表达式4位年份 + 2位月份 + 2位日期
String regex = "^\\d{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$"; String regex = "^\\d{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$";
return dateStr.matches(regex); return dateStr.matches(regex);
} }
@Override
public List<DayRecord> findDaysPath() { public List<DayRecord> findDaysPath() {
List<DayRecord> days = new ArrayList<>(); List<DayRecord> days = new ArrayList();
List<String> fileNames = readFiles(recordPath);
for (String fileName : fileNames) { for(String fileName : FileUtil.readFiles(this.recordPath)) {
if(isValidDate(fileName)){ if (isValidDate(fileName)) {
DayRecord day = new DayRecord(); DayRecord day = new DayRecord();
day.setRoot(recordPath); day.setRoot(this.recordPath);
day.setDay(fileName); day.setDay(fileName);
days.add(day); days.add(day);
} }
} }
days.sort(Comparator.comparing(DayRecord::getDay)); days.sort(Comparator.comparing(DayRecord::getDay));
return days; return days;
} }
@Override
public List<StreamRecord> findInDayRecord(DayRecord dayRecord) { public List<StreamRecord> findInDayRecord(DayRecord dayRecord) {
return dayRecord.getStreamRecords(); return dayRecord.getStreamRecords();
} }
@Override
public void mergeMp4(FlvRecord flvRecord) { public void mergeMp4(FlvRecord flvRecord) {
try { try {
processService.mergeMp4(flvRecord); this.processService.mergeMp4(flvRecord);
}catch (Exception e){ } catch (Exception e) {
logger.error(e.getMessage()); logger.error(e.getMessage());
} }
} }
} }

View File

@ -1,3 +1,8 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.service; package com.tuoheng.steam.service;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
@ -7,6 +12,17 @@ import com.tuoheng.steam.service.innerService.ProcessService;
import com.tuoheng.steam.util.FileUtil; import com.tuoheng.steam.util.FileUtil;
import com.tuoheng.steam.util.ProcessManager; import com.tuoheng.steam.util.ProcessManager;
import com.tuoheng.steam.util.TimeUtils; import com.tuoheng.steam.util.TimeUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -16,352 +32,306 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.*;
@Service @Service
@EnableScheduling @EnableScheduling
public class TaskService implements ITaskService{ public class TaskService implements ITaskService {
@Value("${srs.targetPath}") @Value("${srs.targetPath}")
private String targetPath; private String targetPath;
private String ffmpeg = "ffmpeg";
// @Value("${ffmpeg}")
private String ffmpeg ="ffmpeg";
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
/**
* 清除没有被关闭的进程
*/
@Scheduled(fixedRate = 60000)
public void cleaTask() {
for (ConcurrentHashMap.Entry<String, StreamTask> entry : runningTasks.entrySet()) {
StreamTask streamTask = entry.getValue();
if(streamTask.getStartTime().getTime() < new Date().getTime() - 90 * 60 * 1000 ) {
logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask));
try {
StreamTask s = stopTask(entry.getKey());
scheduler.schedule(() -> {
File file = new File(targetPath +"/"+ s.getOutFileName());
try {
logger.info("废弃文件删除成功 {}",file.delete());
}catch (Exception e) {
logger.error(e.getMessage());
}
}, 60, TimeUnit.SECONDS);
}catch (Exception ignore) {}
}
}
}
private static final Logger logger = LoggerFactory.getLogger(TaskService.class); private static final Logger logger = LoggerFactory.getLogger(TaskService.class);
@Autowired @Autowired
ProcessService processService; ProcessService processService;
ConcurrentHashMap<String, StreamTask> runningTasks = new ConcurrentHashMap();
/** ConcurrentHashMap<String, Deque<StreamTask>> historyTasks = new ConcurrentHashMap();
* 执行中的任务
*/
ConcurrentHashMap<String, StreamTask> runningTasks = new ConcurrentHashMap<>();
/**
* 历史任务
*/
ConcurrentHashMap<String, Deque<StreamTask>> historyTasks = new ConcurrentHashMap<>();
/**
* 任务池
*/
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
public TaskService() {
}
@Scheduled(
fixedRate = 60000L
)
public void cleaTask() {
for(Map.Entry<String, StreamTask> entry : this.runningTasks.entrySet()) {
StreamTask streamTask = (StreamTask)entry.getValue();
if (streamTask.getStartTime().getTime() < (new Date()).getTime() - 5400000L) {
logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask));
try {
StreamTask s = this.stopTask((String)entry.getKey());
this.scheduler.schedule(() -> {
String var10002 = this.targetPath;
File file = new File(var10002 + "/" + s.getOutFileName());
try {
logger.info("废弃文件删除成功 {}", file.delete());
} catch (Exception e) {
logger.error(e.getMessage());
}
}, 60L, TimeUnit.SECONDS);
} catch (Exception var5) {
}
}
}
}
@PostConstruct @PostConstruct
public void init() { public void init() {
this.scheduler.scheduleWithFixedDelay(new Runnable() {
/**
*
*/
scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() { public void run() {
runningTasks.forEach((key, value) -> { TaskService.this.runningTasks.forEach((key, value) -> {
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR, -2); calendar.add(10, -2);
Date twoHoursAgo = calendar.getTime(); Date twoHoursAgo = calendar.getTime();
if (value.getStartTime().before(twoHoursAgo)) { if (value.getStartTime().before(twoHoursAgo)) {
logger.error("taskId {} 执行超时,手动关闭", key); TaskService.logger.error("taskId {} 执行超时,手动关闭", key);
try { try {
stopTask(key); TaskService.this.stopTask(key);
}catch (Exception ignore) {} } catch (Exception var6) {
}
});
for (Map.Entry<String, Deque<StreamTask>> entry : historyTasks.entrySet()) {
for (StreamTask task : entry.getValue()) {
List<StreamProcess> streamProcesses = task.getStreamProcesses();
for (StreamProcess process : streamProcesses) {
process.destroy();
File file = new File(process.getFileName());
FileUtil.deleteFile(file);
} }
} }
while (entry.getValue().size()>10){ });
entry.getValue().pollFirst();
for(Map.Entry<String, Deque<StreamTask>> entry : TaskService.this.historyTasks.entrySet()) {
for(StreamTask task : entry.getValue()) {
for(StreamProcess process : task.getStreamProcesses()) {
process.destroy();
File file = new File(process.getFileName());
FileUtil.deleteFile(file);
}
}
while(((Deque)entry.getValue()).size() > 10) {
((Deque)entry.getValue()).pollFirst();
} }
} }
} }
}, 60, 60, TimeUnit.SECONDS); }, 60L, 60L, TimeUnit.SECONDS);
} }
public String startPic(String streamUrl) { public String startPic(String streamUrl) {
try { try {
StreamProcess streamProcess = processService.takePic(streamUrl); StreamProcess streamProcess = this.processService.takePic(streamUrl);
String outFileName = streamProcess.getFileName(); String outFileName = streamProcess.getFileName();
/*
注册进程
*/
ProcessManager.registerProcess(streamProcess.getInnerProcessId()); ProcessManager.registerProcess(streamProcess.getInnerProcessId());
logger.info("streamUrl {} startPicProcess {} ", streamUrl,streamProcess.getInnerProcessId()); logger.info("streamUrl {} startPicProcess {} ", streamUrl, streamProcess.getInnerProcessId());
CompletableFuture<Process> future = streamProcess.onExit(); CompletableFuture<Process> future = streamProcess.onExit();
future.thenRun(() -> { future.thenRun(() -> {
logger.info("streamUrl {} startPicProcess {} Over ", streamUrl,streamProcess.getInnerProcessId()); logger.info("streamUrl {} startPicProcess {} Over ", streamUrl, streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
}).exceptionally(ex -> { }).exceptionally((ex) -> {
logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl,streamProcess.getInnerProcessId()); logger.info("streamUrl {} startPicProcess {} Exceptionally ", streamUrl, streamProcess.getInnerProcessId());
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
return null; return null;
}); });
return outFileName; return outFileName;
}catch (Exception e) { } catch (Exception e) {
logger.info("streamUrl {} ", streamUrl,e); logger.info("streamUrl {} ", streamUrl, e);
return null;
} }
return null;
} }
public StreamTask startTask(String streamUrl) { public StreamTask startTask(String streamUrl) {
if (this.runningTasks.containsKey(streamUrl)) {
/* return (StreamTask)this.runningTasks.get(streamUrl);
当前有任务,则返回当前的任务
*/
if (runningTasks.containsKey(streamUrl)) {
return runningTasks.get(streamUrl);
}
/*
当前无任务,则开启新任务
*/
StreamTask taskInstance = new StreamTask(streamUrl);
if (runningTasks.putIfAbsent(streamUrl, taskInstance) == null) {
logger.info("streamUrl {} startTask {} ", streamUrl, taskInstance.getTaskId());
startTask(streamUrl, taskInstance);
// runningTasks.put(streamUrl, taskInstance);
return taskInstance;
} else { } else {
return runningTasks.get(streamUrl); StreamTask taskInstance = new StreamTask(streamUrl);
// taskInstance.setTransaction(transaction);
if (this.runningTasks.putIfAbsent(streamUrl, taskInstance) == null) {
logger.info("streamUrl {} startTask {} transaction {}", streamUrl, taskInstance.getTaskId(), taskInstance.getTransaction());
this.startTask(streamUrl, taskInstance);
return taskInstance;
} else {
return (StreamTask)this.runningTasks.get(streamUrl);
}
} }
} }
public StreamTask stopTask(String streamUrl) throws Exception { public StreamTask stopTask(final String streamUrl) throws Exception {
boolean recordSuccess = true; boolean recordSuccess = true;
StreamTask currentStreamTask = runningTasks.remove(streamUrl); final StreamTask currentStreamTask = (StreamTask)this.runningTasks.remove(streamUrl);
if (currentStreamTask == null) {
if (currentStreamTask!= null) { throw new Exception("");
} else {
currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date())); currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date()));
if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) { if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) {
for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
if(!new File(streamProcess.getFileName()).exists()){ if (!(new File(streamProcess.getFileName())).exists()) {
recordSuccess = false; recordSuccess = false;
logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); logger.error("流录制失败: streamUrl {} taskId {} destroy Process {}", new Object[]{streamUrl, currentStreamTask.getTaskId(), streamProcess.getInnerProcessId()});
}else { } else {
logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); logger.info("streamUrl {} taskId {} destroy Process {}", new Object[]{streamUrl, currentStreamTask.getTaskId(), streamProcess.getInnerProcessId()});
} }
streamProcess.destroy(); streamProcess.destroy();
} }
} }
if(!recordSuccess){ if (!recordSuccess) {
throw new Exception(""); throw new Exception("");
} } else {
try {
String outFileName = UUID.randomUUID().toString() + ".mp4";
StreamProcess mergeProcess = this.processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses());
if (Objects.nonNull(mergeProcess)) {
ProcessManager.registerProcess(mergeProcess.getInnerProcessId());
currentStreamTask.setMergeProcess(mergeProcess);
logger.info("streamUrl {} taskId {} startMergeProcess {} ", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()});
CompletableFuture<Process> future = mergeProcess.onExit();
future.thenRun(() -> {
logger.info("streamUrl {} taskId {} MergeProcess {} Over", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()});
mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
try { for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
String outFileName = UUID.randomUUID().toString() + ".mp4"; File file = new File(streamProcess.getFileName());
StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); FileUtil.deleteFile(file);
}
if(Objects.nonNull(mergeProcess)){ }).exceptionally((ex) -> {
/** logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", new Object[]{streamUrl, currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex});
* 注册进程 mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
*/ ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
ProcessManager.registerProcess(mergeProcess.getInnerProcessId());
currentStreamTask.setMergeProcess(mergeProcess);
for(StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) {
File file = new File(streamProcess.getFileName());
FileUtil.deleteFile(file);
}
logger.info("streamUrl {} taskId {} startMergeProcess {} ", streamUrl, currentStreamTask.getTaskId(), return null;
mergeProcess.getInnerProcessId()); });
currentStreamTask.setOutFileName(outFileName);
}
CompletableFuture<Process> future = mergeProcess.onExit(); if (Objects.nonNull(currentStreamTask.getMergeProcess()) && Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && !currentStreamTask.getMergeProcess().getFileName().isEmpty()) {
future.thenRun(() -> {
logger.info("streamUrl {} taskId {} MergeProcess {} Over", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId());
mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName());
FileUtil.deleteFile(file);
}
}).exceptionally(ex -> {
logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex);
mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId());
for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){
File file = new File(streamProcess.getFileName());
FileUtil.deleteFile(file);
}
return null;
});
currentStreamTask.setOutFileName(outFileName);
}
if(Objects.nonNull(currentStreamTask.getMergeProcess())){
if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) &&
!currentStreamTask.getMergeProcess().getFileName().isEmpty()){
try { try {
executor.schedule(new Runnable() { this.executor.schedule(new Runnable() {
@Override
public void run() { public void run() {
try { try {
logger.info("FileName :"+currentStreamTask.getMergeProcess().getFileName()); TaskService.logger.info("FileName :" + currentStreamTask.getMergeProcess().getFileName());
Process process = Runtime.getRuntime().exec(ffmpeg + " -i " + Runtime var10000 = Runtime.getRuntime();
currentStreamTask.getMergeProcess().getFileName()); String var10001 = TaskService.this.ffmpeg;
Process process = var10000.exec(var10001 + " -i " + currentStreamTask.getMergeProcess().getFileName());
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream())); BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String line; String line;
while ((line = reader.readLine()) != null) { while((line = reader.readLine()) != null) {
logger.info("Duration line: " + line); TaskService.logger.info("Duration line: " + line);
if (line.contains("Duration")) { if (line.contains("Duration")) {
String duration = line.split("Duration: ")[1].split(",")[0]; String duration = line.split("Duration: ")[1].split(",")[0];
logger.info("Duration: " + duration.substring(0, 8)); TaskService.logger.info("Duration: " + duration.substring(0, 8));
currentStreamTask.setDuration(duration.substring(0, 8)); currentStreamTask.setDuration(duration.substring(0, 8));
logger.info("-----------------放入缓存-----------------"); TaskService.logger.info("-----------------放入缓存-----------------");
/** if (TaskService.this.historyTasks.containsKey(streamUrl)) {
* 放入缓存 ((Deque)TaskService.this.historyTasks.get(streamUrl)).add(currentStreamTask);
*/ } else {
if( historyTasks.containsKey(streamUrl)){ TaskService.this.historyTasks.put(streamUrl, new LinkedList());
historyTasks.get(streamUrl).add(currentStreamTask); ((Deque)TaskService.this.historyTasks.get(streamUrl)).offerLast(currentStreamTask);
}else {
historyTasks.put(streamUrl, new LinkedList<>());
historyTasks.get(streamUrl).offerLast(currentStreamTask);
} }
break; break;
} }
} }
reader.close();
}catch (Exception e){
logger.info(e.getMessage());
}
}
},40, TimeUnit.SECONDS);
reader.close();
} catch (Exception e) {
TaskService.logger.info(e.getMessage());
}
}
}, 40L, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
return currentStreamTask;
} catch (Exception e) {
logger.info("taskId {} Stop exceptionally", streamUrl, e);
return null;
} }
return currentStreamTask;
} catch (Exception e) {
logger.info("taskId {} Stop exceptionally", streamUrl,e);
return null;
} }
}else {
throw new Exception("");
// if(historyTasks.containsKey(streamUrl)){
// return historyTasks.get(streamUrl).peekLast();
// }else {
// return null;
// }
} }
} }
public StreamTask getLastTask(String streamUrl) { public List<StreamTask> getTaskList(String streamUrl){
StreamTask current = runningTasks.get(streamUrl); List<StreamTask> ret = new ArrayList<>();
StreamTask current = (StreamTask)this.runningTasks.get(streamUrl);
if(Objects.nonNull(current)){ if(Objects.nonNull(current)){
return current; ret.add(current);
}else { }
if(historyTasks.containsKey(streamUrl)){ if(this.historyTasks.containsKey(streamUrl)) {
return historyTasks.get(streamUrl).peekLast(); List<StreamTask> historys = new ArrayList<>(this.historyTasks.get(streamUrl));
if(!historys.isEmpty()) {
ret.addAll(historys);
} }
return null; }
return ret;
}
public StreamTask getLastTask(String streamUrl) {
StreamTask current = (StreamTask)this.runningTasks.get(streamUrl);
if (Objects.nonNull(current)) {
return current;
} else {
return this.historyTasks.containsKey(streamUrl) ? (StreamTask)((Deque)this.historyTasks.get(streamUrl)).peekLast() : null;
} }
} }
private void startTask(String streamUrl, StreamTask streamTask) { private void startTask(String streamUrl, StreamTask streamTask) {
try { try {
/*
任务不存在了,直接退出
*/
if (streamTask.getStopCommandTime() != null) { if (streamTask.getStopCommandTime() != null) {
logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId()); logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId());
return; return;
} }
StreamProcess streamProcess = processService.recordStream(streamUrl); StreamProcess streamProcess = this.processService.recordStream(streamUrl);
/**
* 注册进程
*/
ProcessManager.registerProcess(streamProcess.getInnerProcessId()); ProcessManager.registerProcess(streamProcess.getInnerProcessId());
streamTask.getStreamProcesses().add(streamProcess); streamTask.getStreamProcesses().add(streamProcess);
logger.info("streamUrl {} taskId {} startProcess {} ", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId()});
logger.info("streamUrl {} taskId {} startProcess {} ", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId());
CompletableFuture<Process> future = streamProcess.onExit(); CompletableFuture<Process> future = streamProcess.onExit();
future.thenRun(() -> { future.thenRun(() -> {
logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); logger.info("streamUrl {} taskId {} Process {} Over", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId()});
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
try { try {
Thread.sleep(2000L); Thread.sleep(2000L);
} catch (Exception ignored){} } catch (Exception var5) {
if(!new File(streamProcess.getFileName()).exists()){ }
if (!(new File(streamProcess.getFileName())).exists()) {
streamTask.getStreamProcesses().remove(streamProcess); streamTask.getStreamProcesses().remove(streamProcess);
} }
startTask(streamUrl, streamTask);
}).exceptionally(ex -> { this.startTask(streamUrl, streamTask);
logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex); }).exceptionally((ex) -> {
logger.info("streamUrl {} taskId {} Process {} Exceptionally", new Object[]{streamUrl, streamTask.getTaskId(), streamProcess.getInnerProcessId(), ex});
streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); streamProcess.setStopTime(TimeUtils.formatDateToString(new Date()));
startTask(streamUrl, streamTask); this.startTask(streamUrl, streamTask);
ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId());
return null; return null;
}); });
} catch (IOException e) { } catch (IOException e) {
logger.info("streamUrl {} taskId {} startProcess Exception", streamUrl,streamTask.getTaskId(),e); logger.info("streamUrl {} taskId {} startProcess Exception", new Object[]{streamUrl, streamTask.getTaskId(), e});
} }
} }
public void stopAllTask() {
public void stopAllTask(){ this.runningTasks.forEach((key, value) -> {
runningTasks.forEach((key, value) -> {
try { try {
stopTask(key); this.stopTask(key);
}catch (Exception ignore){} } catch (Exception var4) {
}
}); });
} }
} }

View File

@ -1,249 +1,212 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.service.innerService; package com.tuoheng.steam.service.innerService;
import com.tuoheng.steam.dos.StreamProcess;
import com.tuoheng.steam.dos.ProcessType; import com.tuoheng.steam.dos.ProcessType;
import com.tuoheng.steam.dos.StreamProcess;
import com.tuoheng.steam.service.dos.FlvRecord; import com.tuoheng.steam.service.dos.FlvRecord;
import com.tuoheng.steam.util.FileUtil; import com.tuoheng.steam.util.FileUtil;
import com.tuoheng.steam.util.ProcessManager; import java.io.BufferedReader;
import com.tuoheng.steam.util.TimeUtils; import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service @Service
public class ProcessService { public class ProcessService {
@Value("${srs.splitPath}") @Value("${srs.splitPath}")
private String splitPath; private String splitPath;
// @Value("${ffmpeg}")
private String ffmpeg = "ffmpeg"; private String ffmpeg = "ffmpeg";
@Value("${srs.targetPath}") @Value("${srs.targetPath}")
private String targetPath; private String targetPath;
@Value("${recordPath}") @Value("${recordPath}")
private String recordPath; private String recordPath;
@Value("${video.bitrate:2000k}")
private String videoBitrate;
@Value("${audio.bitrate:128k}")
private String audioBitrate;
private static final Logger logger = LoggerFactory.getLogger(ProcessService.class); private static final Logger logger = LoggerFactory.getLogger(ProcessService.class);
/**
* 任务池
*/
ExecutorService loggingService = Executors.newCachedThreadPool(); ExecutorService loggingService = Executors.newCachedThreadPool();
public void mergeMp4(FlvRecord flvRecord) throws IOException, ExecutionException, InterruptedException { public ProcessService() {
}
String command = ffmpeg + " -i "+ recordPath+ File.separator +
flvRecord.getStream().getDayRecord().getDay() + File.separator +
flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv() + " -c:v copy -c:a aac " + recordPath+ File.separator +
flvRecord.getStream().getDayRecord().getDay() + File.separator +
flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4";
public void mergeMp4(final FlvRecord flvRecord) throws IOException, ExecutionException, InterruptedException {
String var10000 = this.ffmpeg;
final String command = var10000 + " -i " + this.recordPath + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv() + " -c:v copy -c:a aac " + this.recordPath + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getStartTime() + ".mp4";
ProcessBuilder pb = new ProcessBuilder(command.split(" ")); ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
Process process = pb.start(); final Process process = pb.start();
this.loggingService.execute(new Runnable() {
loggingService.execute(new Runnable() {
@Override
public void run() { public void run() {
logger.info("mergeMp4 process Start {}",command); ProcessService.logger.info("mergeMp4 process Start {}", command);
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) { String line;
String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
while ((line = reader.readLine()) != null) { while((line = reader.readLine()) != null) {
logger.info("mergeMp4-------- {}",line); ProcessService.logger.info("mergeMp4-------- {}", line);
} }
} catch (IOException e) { } catch (IOException e) {
logger.info("mergeMp4-------- Over",e); ProcessService.logger.info("mergeMp4-------- Over", e);
} }
logger.info("mergeMp4 Over");
File delete = new File(recordPath+ File.separator + ProcessService.logger.info("mergeMp4 Over");
flvRecord.getStream().getDayRecord().getDay() + File.separator + String var10002 = ProcessService.this.recordPath;
flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv()); File delete = new File(var10002 + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv());
FileUtil.deleteFile(delete); FileUtil.deleteFile(delete);
} }
}); });
CompletableFuture<Process> future = process.onExit(); CompletableFuture<Process> future = process.onExit();
// 阻塞等待进程结束
Process completedProcess = null; Process completedProcess = null;
try { try {
completedProcess = future.get(4, TimeUnit.SECONDS); completedProcess = (Process)future.get(4L, TimeUnit.SECONDS);
logger.info("mergeMp4正常完成-------- Over"); logger.info("mergeMp4正常完成-------- Over");
}catch (Exception e) { } catch (Exception e) {
String fileName = recordPath+ File.separator + var10000 = this.recordPath;
flvRecord.getStream().getDayRecord().getDay() + File.separator + String fileName = var10000 + File.separator + flvRecord.getStream().getDayRecord().getDay() + File.separator + flvRecord.getStream().getStreamId() + File.separator + flvRecord.getStartTime() + ".mp4";
flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4";
File file = new File(fileName); File file = new File(fileName);
if(file.exists()){ if (file.exists()) {
logger.info("mergeMp4超时4S完成-------- Over",e); logger.info("mergeMp4超时4S完成-------- Over", e);
process.destroy(); process.destroy();
} else { } else {
try { try {
completedProcess = future.get(4, TimeUnit.SECONDS); completedProcess = (Process)future.get(4L, TimeUnit.SECONDS);
}catch (Exception e1) { } catch (Exception var11) {
if(file.exists()){ if (file.exists()) {
logger.info("mergeMp4超时8S完成-------- Over",e); logger.info("mergeMp4超时8S完成-------- Over", e);
process.destroy(); process.destroy();
}else { } else {
logger.info("mergeMp4超时8S未完成-------- Over",e); logger.info("mergeMp4超时8S未完成-------- Over", e);
process.destroy(); process.destroy();
} }
} }
} }
} }
if(Objects.nonNull(completedProcess)) { if (Objects.nonNull(completedProcess)) {
// 检查进程是否成功结束
if (completedProcess.exitValue() == 0) { if (completedProcess.exitValue() == 0) {
logger.info("进程成功结束!"); logger.info("进程成功结束!");
} else { } else {
logger.info("进程失败,退出码 {} " ,completedProcess.exitValue()); logger.info("进程失败,退出码 {} ", completedProcess.exitValue());
} }
} }
} }
public StreamProcess takePic(String streamUrl) throws IOException { public StreamProcess takePic(String streamUrl) throws IOException {
String outFileName = targetPath + "/" +UUID.randomUUID().toString() +".jpg"; String var10000 = this.targetPath;
String command = String.format( String outFileName = var10000 + "/" + UUID.randomUUID().toString() + ".jpg";
ffmpeg+ " -i %s -vf fps=1 -frames:v 1 -q:v 2 %s", String command = String.format(this.ffmpeg + " -i %s -vf fps=1 -frames:v 1 -q:v 2 %s", streamUrl, outFileName);
streamUrl, outFileName); List<String> listCommand = new ArrayList();
listCommand.add(this.ffmpeg);
List<String> listCommand = new ArrayList<>();
listCommand.add(ffmpeg); // ffmpeg 路径
listCommand.add("-i"); listCommand.add("-i");
listCommand.add(streamUrl); // 流地址 listCommand.add(streamUrl);
listCommand.add("-vf"); listCommand.add("-vf");
listCommand.add("fps=1"); listCommand.add("fps=1");
listCommand.add("-frames:v"); listCommand.add("-frames:v");
listCommand.add("1"); listCommand.add("1");
listCommand.add("-q:v"); listCommand.add("-q:v");
listCommand.add("2"); listCommand.add("2");
listCommand.add(outFileName); // 输出文件名 listCommand.add(outFileName);
logger.info("takePic {}", listCommand); logger.info("takePic {}", listCommand);
ProcessBuilder pb = new ProcessBuilder(listCommand); ProcessBuilder pb = new ProcessBuilder(listCommand);
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
Process process = pb.start(); final Process process = pb.start();
this.loggingService.execute(new Runnable() {
loggingService.execute(new Runnable() {
@Override
public void run() { public void run() {
try (BufferedReader reader = new BufferedReader( String line;
new InputStreamReader(process.getInputStream()))) { try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line; while((line = reader.readLine()) != null) {
while ((line = reader.readLine()) != null) { ProcessService.logger.info("recordStream-------- {}", line);
logger.info("recordStream-------- {}",line);
} }
} catch (IOException e) { } catch (IOException var6) {
logger.info("takePic-------- Over"); ProcessService.logger.info("takePic-------- Over");
} }
} }
}); });
return new StreamProcess(process,outFileName, ProcessType.PIC); return new StreamProcess(process, outFileName, ProcessType.PIC);
} }
public StreamProcess recordStream(String streamUrl) throws IOException { public StreamProcess recordStream(String streamUrl) throws IOException {
String var10000 = this.splitPath;
String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts"; String recordFileName = var10000 + "/" + UUID.randomUUID().toString() + ".ts";
/** String command = String.format(this.ffmpeg + " -i %s -c:v libx264 -b:v %s -c:a aac -b:a %s -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s", Optional.of(streamUrl).filter((s) -> s.endsWith("_")).map((s) -> s.substring(0, s.length() - 1)).orElse(streamUrl), this.videoBitrate, this.audioBitrate, recordFileName);
* 如果streamUrl末尾有个_,在实际录制的时候需要将_去除
*/
String command = String.format(
ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s",
Optional.of(streamUrl)
.filter(s -> s.endsWith("_"))
.map(s -> s.substring(0, s.length() - 1))
.orElse(streamUrl), recordFileName);
logger.info("recordStream {}", command); logger.info("recordStream {}", command);
ProcessBuilder pb = new ProcessBuilder(command.split(" ")); ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
Process process = pb.start(); final Process process = pb.start();
this.loggingService.execute(new Runnable() {
loggingService.execute(new Runnable() {
@Override
public void run() { public void run() {
try (BufferedReader reader = new BufferedReader( try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
new InputStreamReader(process.getInputStream()))) { while(reader.readLine() != null) {
String line;
while ((line = reader.readLine()) != null) {
// logger.info("recordStream-------- {}",line);
} }
} catch (IOException e) { } catch (IOException var6) {
logger.info("recordStream-------- Over"); ProcessService.logger.info("recordStream-------- Over");
} }
} }
}); });
return new StreamProcess(process, recordFileName, ProcessType.RECORD);
return new StreamProcess(process,recordFileName, ProcessType.RECORD); }
};
public StreamProcess mergeStream(String outFileName, List<StreamProcess> streamProcesses) throws IOException { public StreamProcess mergeStream(String outFileName, List<StreamProcess> streamProcesses) throws IOException {
List<String> fileList = new ArrayList();
List<String> fileList = new ArrayList<>(); for(StreamProcess streamProcess : streamProcesses) {
for(StreamProcess streamProcess : streamProcesses){
File newFile = new File(streamProcess.getFileName()); File newFile = new File(streamProcess.getFileName());
if(newFile.exists()){ if (newFile.exists()) {
fileList.add(streamProcess.getFileName()); fileList.add(streamProcess.getFileName());
} }
} }
if (fileList.isEmpty()) {
if(fileList.isEmpty()){
return null; return null;
} } else {
String command = ""; String command = "";
outFileName = targetPath + "/" +outFileName; outFileName = this.targetPath + "/" + outFileName;
if(fileList.size() == 1){ if (fileList.size() == 1) {
command = String.format( command = String.format(this.ffmpeg + " -i %s -c copy %s", fileList.get(0), outFileName);
ffmpeg+ " -i %s -c copy %s", } else {
fileList.get(0), outFileName); String filePaths = String.join("|", fileList);
}else { command = String.format(this.ffmpeg + " -i \"concat:%s\" -c copy %s", filePaths, outFileName);
String filePaths = String.join("|", fileList);
command = String.format(
ffmpeg+ " -i \"concat:%s\" -c copy %s",
filePaths, outFileName);
}
logger.info("mergeStream {}", command);
ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true);
Process process = pb.start();
loggingService.execute(new Runnable() {
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
logger.info("mergeStream-------- {}",line);
}
} catch (IOException e) {
logger.info("mergeStream-------- Over");
}
} }
});
return new StreamProcess(process,outFileName,ProcessType.MERGE);
};
logger.info("mergeStream {}", command);
ProcessBuilder pb = new ProcessBuilder(command.split(" "));
pb.redirectErrorStream(true);
final Process process = pb.start();
this.loggingService.execute(new Runnable() {
public void run() {
String line;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
while((line = reader.readLine()) != null) {
ProcessService.logger.info("mergeStream-------- {}", line);
}
} catch (IOException var6) {
ProcessService.logger.info("mergeStream-------- Over");
}
}
});
return new StreamProcess(process, outFileName, ProcessType.MERGE);
}
}
} }

View File

@ -1,100 +1,95 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.util; package com.tuoheng.steam.util;
import com.tuoheng.steam.controller.StreamRecordController;
import com.tuoheng.steam.service.dos.DayRecord;
import com.tuoheng.steam.service.dos.StreamType; import com.tuoheng.steam.service.dos.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileUtil { public class FileUtil {
private static final Logger logger = LoggerFactory.getLogger(FileUtil.class); private static final Logger logger = LoggerFactory.getLogger(FileUtil.class);
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); public FileUtil() {
}
public static void deleteFile(File file) { public static void deleteFile(File file) {
if(file.exists()) { if (file.exists()) {
file.delete(); file.delete();
} }
} }
public static void deleteFolder(String stringPath) { public static void deleteFolder(String stringPath) {
try { try {
Path path = Paths.get(stringPath); Path path = Paths.get(stringPath);
if (Files.exists(path)) { if (Files.exists(path, new LinkOption[0])) {
try (Stream<Path> pathStream = Files.walk(path)) { try (Stream<Path> pathStream = Files.walk(path)) {
pathStream.sorted((p1, p2) -> -p1.compareTo(p2)) // 从子文件/文件夹开始删除 pathStream.sorted((p1, p2) -> -p1.compareTo(p2)).forEach((p) -> {
.forEach(p -> { try {
try { Files.delete(p);
Files.delete(p); // 删除文件或文件夹 } catch (IOException e) {
} catch (IOException e) { System.err.println("无法删除: " + p + ", 原因: " + e.getMessage());
System.err.println("无法删除: " + p + ", 原因: " + e.getMessage()); }
}
}); });
} }
} else { } else {
logger.info("文件夹不存在: " + path); logger.info("文件夹不存在: " + path);
} }
}catch (Exception e) { } catch (Exception e) {
logger.error("deleteFolder",e); logger.error("deleteFolder", e);
} }
} }
public static List<String> readFiles(String stringPath) { public static List<String> readFiles(String stringPath) {
List<String> list = new ArrayList();
List<String> list = new ArrayList<>();
File directory = new File(stringPath); File directory = new File(stringPath);
// 检查目录是否存在且是一个目录
if (directory.exists() && directory.isDirectory()) { if (directory.exists() && directory.isDirectory()) {
// 获取目录下的所有子文件和文件夹
File[] files = directory.listFiles(); File[] files = directory.listFiles();
if (files != null) { if (files != null) {
for (File file : files) { for(File file : files) {
list.add(file.getName()); list.add(file.getName());
} }
} }
} }
return list; return list;
} }
public static boolean isTempFile(String fileName) { public static boolean isTempFile(String fileName) {
String[] result = fileName.split("\\."); String[] result = fileName.split("\\.");
return result.length == 3; return result.length == 3;
} }
public static boolean isFlv(String fileName) { public static boolean isFlv(String fileName) {
return fileName.endsWith("flv"); return fileName.endsWith("flv");
} }
public static boolean isMp4(String fileName) { public static boolean isMp4(String fileName) {
return fileName.endsWith("mp4"); return fileName.endsWith("mp4");
} }
public static StreamType getStreamType(String fileName) { public static StreamType getStreamType(String fileName) {
if(fileName.endsWith("outer")){ if (fileName.endsWith("outer")) {
return StreamType.Outer; return StreamType.Outer;
}else if(fileName.endsWith("inner")){ } else {
return StreamType.Inner; return fileName.endsWith("inner") ? StreamType.Inner : StreamType.LiveVideo;
}else{
return StreamType.LiveVideo;
} }
} }
} }

View File

@ -1,110 +1,127 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.tuoheng.steam.util; package com.tuoheng.steam.util;
import com.tuoheng.steam.controller.StreamRecordController; import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.*;
import java.util.*;
@Service @Service
public class ProcessManager { public class ProcessManager {
private static final Logger logger = LoggerFactory.getLogger(ProcessManager.class); private static final Logger logger = LoggerFactory.getLogger(ProcessManager.class);
private static final String DELIMITER = "="; private static final String DELIMITER = "=";
static Map<Long, String> runningProcessIds = new HashMap();
static String pidPath = "pid.txt";
static Map<Long, String> runningProcessIds = new HashMap<>(); public ProcessManager() {
}
static String pidPath ="pid.txt"; public static void registerProcess(Long process) {
runningProcessIds.put(process, TimeUtils.formatDateToString(new Date()));
static {
runningProcessIds = loadFromFile(pidPath);
for(Map.Entry<Long, String> entry : runningProcessIds.entrySet()){
try {
killProcessByPID(entry.getKey());
}catch (Exception e){
e.printStackTrace();
}
}
runningProcessIds.clear();
writeToFile(runningProcessIds); writeToFile(runningProcessIds);
} }
public static void registerProcess(Long process) { public static void unRegisterProcess(Long process) {
runningProcessIds.put(process,TimeUtils.formatDateToString(new Date()));
writeToFile(runningProcessIds);
}
public static void unRegisterProcess(Long process) {
runningProcessIds.remove(process); runningProcessIds.remove(process);
writeToFile(runningProcessIds); writeToFile(runningProcessIds);
} }
public static void writeToFile(Map<Long, String> map){ public static void writeToFile(Map<Long, String> map) {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath))) { try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath))) {
for (Map.Entry<Long, String> entry : map.entrySet()) { for(Map.Entry<Long, String> entry : map.entrySet()) {
writer.write(entry.getKey() + DELIMITER + entry.getValue()); Object var10001 = entry.getKey();
writer.write(var10001 + "=" + (String)entry.getValue());
writer.newLine(); writer.newLine();
} }
}catch (IOException ignored){} } catch (IOException var6) {
}
} }
public static void appendToFile(Long pid,String strDate) { public static void appendToFile(Long pid, String strDate) {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath, true))) {
writer.write(pid + "=" + strDate);
writer.newLine();
} catch (IOException var7) {
}
try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath,true))) {
writer.write(pid+ DELIMITER + strDate);
writer.newLine();
}catch (IOException ignored) {}
} }
public static Map<Long, String> loadFromFile(String filePath){ public static Map<Long, String> loadFromFile(String filePath) {
Map<Long, String> map = new LinkedHashMap<>(); Map<Long, String> map = new LinkedHashMap();
File file = new File(filePath); File file = new File(filePath);
try { try {
if(!file.exists()){ if (!file.exists()) {
file.createNewFile(); file.createNewFile();
} }
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
try {
String line; String line;
while ((line = reader.readLine()) != null) { try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String[] parts = line.split(ProcessManager.DELIMITER, 2); // Split into key and value while((line = reader.readLine()) != null) {
if (parts.length == 2) { String[] parts = line.split("=", 2);
Long key = Long.parseLong(parts[0].trim()); if (parts.length == 2) {
String value = parts[1].trim(); Long key = Long.parseLong(parts[0].trim());
map.put(key, value); String value = parts[1].trim();
map.put(key, value);
}
} }
} }
return map;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return map;
} }
public static void killProcessByPID(long pid) throws IOException, InterruptedException { public static void killProcessByPID(long pid) throws IOException, InterruptedException {
String os = System.getProperty("os.name").toLowerCase(); String os = System.getProperty("os.name").toLowerCase();
ProcessBuilder processBuilder; ProcessBuilder processBuilder;
if (os.contains("win")) { if (os.contains("win")) {
// Windows: Use taskkill command processBuilder = new ProcessBuilder(new String[]{"cmd", "/c", "taskkill /PID " + pid + " /F"});
processBuilder = new ProcessBuilder("cmd", "/c", "taskkill /PID " + pid + " /F");
} else { } else {
// Linux/Unix/macOS: Use kill command processBuilder = new ProcessBuilder(new String[]{"bash", "-c", "kill -9 " + pid});
processBuilder = new ProcessBuilder("bash", "-c", "kill -9 " + pid);
} }
Process process = processBuilder.start(); Process process = processBuilder.start();
int exitCode = process.waitFor(); int exitCode = process.waitFor();
if (exitCode == 0) { if (exitCode == 0) {
logger.info("Process with PID " + pid + " terminated successfully."); logger.info("Process with PID " + pid + " terminated successfully.");
} else { } else {
logger.error("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode); logger.error("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode);
} }
} }
static {
runningProcessIds = loadFromFile(pidPath);
for(Map.Entry<Long, String> entry : runningProcessIds.entrySet()) {
try {
killProcessByPID((Long)entry.getKey());
} catch (Exception e) {
e.printStackTrace();
}
}
runningProcessIds.clear();
writeToFile(runningProcessIds);
}
} }

View File

@ -7,10 +7,8 @@
#recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record #recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
#livedates=8 #livedates=8
#cangneiwai=false #cangneiwai=false
#srs.domain = ""
#srs.name = "" #大数据局生产
#
#大数据局
#spring.application.name=stream_server #spring.application.name=stream_server
#server.port = 8989 #server.port = 8989
#srs.splitPath=/data/java/srs/stream_server/temp #srs.splitPath=/data/java/srs/stream_server/temp
@ -19,8 +17,10 @@
#recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record #recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record
#livedates=8 #livedates=8
#cangneiwai=true #cangneiwai=true
#srs.domain = ""
#srs.name = "" # 录制码率配置 (单位: kbps, 2000k = 2Mbps)
video.bitrate=4000k
audio.bitrate=128k
#本地测试 #本地测试
#spring.application.name=stream_server #spring.application.name=stream_server
@ -31,13 +31,9 @@
#recordPath=/Users/sunpeng/workspace/stream/record #recordPath=/Users/sunpeng/workspace/stream/record
#livedates=7 #livedates=7
#cangneiwai=false #cangneiwai=false
#srs.domain=
#srs.name=
##容器化部署 #大数据容器化
#通过注入 ##通过注入
srs.domain = STREAM.t-aaron.com
#通过注入
srs.name = STREAM srs.name = STREAM
spring.application.name=stream_server spring.application.name=stream_server
server.port = 8080 server.port = 8080
@ -49,4 +45,4 @@ ffmpeg=ffmpeg
# #
recordPath=/data/record recordPath=/data/record
livedates=8 livedates=8
cangneiwai=true cangneiwai=false