From 5a4d0e2404c1480f8c0b3d2699514fb1dee15886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Thu, 13 Mar 2025 09:49:12 +0800 Subject: [PATCH] xx --- pom.xml | 56 +-- .../controller/StreamRecordController.java | 155 +-------- .../steam/controller/dto/PageInfo.java | 71 ---- .../controller/dto/PageStreamRequest.java | 53 --- .../steam/controller/dto/Response.java | 37 -- .../java/com/tuoheng/steam/dos/Mp4Info.java | 28 -- .../com/tuoheng/steam/dos/ProcessType.java | 5 - .../com/tuoheng/steam/dos/StreamProcess.java | 78 ----- .../com/tuoheng/steam/dos/StreamTask.java | 136 -------- .../com/tuoheng/steam/schedule/Scheduler.java | 221 ------------ .../tuoheng/steam/service/IRecordService.java | 18 - .../tuoheng/steam/service/ITaskService.java | 14 - .../tuoheng/steam/service/RecordService.java | 67 ---- .../tuoheng/steam/service/TaskService.java | 328 ------------------ .../tuoheng/steam/service/dos/DayRecord.java | 50 --- .../tuoheng/steam/service/dos/FlvRecord.java | 44 --- .../tuoheng/steam/service/dos/Mp4Record.java | 45 --- .../steam/service/dos/StreamRecord.java | 95 ----- .../tuoheng/steam/service/dos/StreamType.java | 5 - .../service/innerService/ProcessService.java | 182 ---------- .../java/com/tuoheng/steam/util/FileUtil.java | 88 ----- .../tuoheng/steam/util/ProcessManager.java | 105 ------ .../com/tuoheng/steam/util/TimeUtils.java | 32 -- src/main/resources/application.properties | 23 +- 24 files changed, 25 insertions(+), 1911 deletions(-) delete mode 100644 src/main/java/com/tuoheng/steam/controller/dto/PageInfo.java delete mode 100644 src/main/java/com/tuoheng/steam/controller/dto/PageStreamRequest.java delete mode 100644 src/main/java/com/tuoheng/steam/controller/dto/Response.java delete mode 100644 src/main/java/com/tuoheng/steam/dos/Mp4Info.java delete mode 100644 src/main/java/com/tuoheng/steam/dos/ProcessType.java delete mode 100644 src/main/java/com/tuoheng/steam/dos/StreamProcess.java delete mode 100644 src/main/java/com/tuoheng/steam/dos/StreamTask.java delete mode 100644 src/main/java/com/tuoheng/steam/schedule/Scheduler.java delete mode 100644 src/main/java/com/tuoheng/steam/service/IRecordService.java delete mode 100644 src/main/java/com/tuoheng/steam/service/ITaskService.java delete mode 100644 src/main/java/com/tuoheng/steam/service/RecordService.java delete mode 100644 src/main/java/com/tuoheng/steam/service/TaskService.java delete mode 100644 src/main/java/com/tuoheng/steam/service/dos/DayRecord.java delete mode 100644 src/main/java/com/tuoheng/steam/service/dos/FlvRecord.java delete mode 100644 src/main/java/com/tuoheng/steam/service/dos/Mp4Record.java delete mode 100644 src/main/java/com/tuoheng/steam/service/dos/StreamRecord.java delete mode 100644 src/main/java/com/tuoheng/steam/service/dos/StreamType.java delete mode 100644 src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java delete mode 100644 src/main/java/com/tuoheng/steam/util/FileUtil.java delete mode 100644 src/main/java/com/tuoheng/steam/util/ProcessManager.java delete mode 100644 src/main/java/com/tuoheng/steam/util/TimeUtils.java diff --git a/pom.xml b/pom.xml index 398f135..d0bfa2f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.7.18 + 2.3.5.RELEASE com.example @@ -13,23 +13,16 @@ 0.0.1-SNAPSHOT demo demo - - - - - - - - - - - - - 11 + + + org.springframework.cloud + spring-cloud-starter-consul-discovery + + org.springframework.boot spring-boot-starter-web @@ -41,31 +34,19 @@ test - - io.minio - minio - 3.0.10 - - - com.alibaba.fastjson2 - fastjson2 - 2.0.53 - - - - - - - - - io.minio - minio - 8.3.3 - - - + + + + org.springframework.cloud + spring-cloud-dependencies + Hoxton.SR12 + pom + import + + + @@ -75,4 +56,5 @@ + diff --git a/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java b/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java index 2f12947..47c0d97 100644 --- a/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java +++ b/src/main/java/com/tuoheng/steam/controller/StreamRecordController.java @@ -1,163 +1,14 @@ package com.tuoheng.steam.controller; - - -import com.alibaba.fastjson2.JSON; -import com.tuoheng.steam.controller.dto.PageInfo; -import com.tuoheng.steam.controller.dto.PageStreamRequest; -import com.tuoheng.steam.controller.dto.Response; -import com.tuoheng.steam.dos.Mp4Info; -import com.tuoheng.steam.dos.StreamTask; -import com.tuoheng.steam.service.IRecordService; -import com.tuoheng.steam.service.ITaskService; -import com.tuoheng.steam.service.dos.DayRecord; -import com.tuoheng.steam.service.dos.Mp4Record; -import com.tuoheng.steam.service.dos.StreamRecord; -import org.apache.logging.log4j.util.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; @RestController() -@RequestMapping("/record") public class StreamRecordController { - private static final Logger logger = LoggerFactory.getLogger(StreamRecordController.class); - - @Autowired - ITaskService taskService; - - @Autowired - IRecordService iRecordService; - - @GetMapping("start") - public Response startRecording(@RequestParam String streamUrl) { - System.out.println("启动录制 :"+streamUrl); - if(Objects.isNull(streamUrl)) { - return Response.fail(-1); - } - - Response response = Response.success(taskService.startTask(streamUrl)); - System.out.println("启动录制返回 :"+ JSON.toJSONString(response)); - return response; - } - - @GetMapping("stop") - public Response stopRecording(@RequestParam String streamUrl){ - - System.out.println("关闭录制 :"+streamUrl); - if(Objects.isNull(streamUrl)) { - return Response.fail(-1); - } - Response response = Response.success(taskService.stopTask(streamUrl)); - if(Objects.isNull(response.getData().getOutFileName()) || response.getData().getOutFileName().isEmpty()){ - response.setCode(500); - } - System.out.println("关闭录制返回 :"+ JSON.toJSONString(response)); - return response; - } - - @GetMapping("info") - public Response getLastTask(@RequestParam String streamUrl){ - - System.out.println("查看录制 :"+streamUrl); - if(Objects.isNull(streamUrl)) { - return Response.fail(-1); - } - Response response = Response.success(taskService.getLastTask(streamUrl)); - System.out.println("查看录制返回 :"+ JSON.toJSONString(response)); - return response; + @GetMapping("/") + public String home() { + return "Hello World!"; } - @PostMapping("search") - public Response> streamView(@RequestBody PageStreamRequest request){ - System.out.println("查看录制 search :"+JSON.toJSONString(request)); - - if(Objects.isNull(request.getPageIndex()) || Objects.isNull(request.getPageSize()) - || request.getPageIndex() <0 || request.getPageSize() <=0 ){ - System.out.println("查看录制 search 入参错误!"); - return Response.fail(-100); - } - - List mp4s = searchAll(request.getStreamId()); - - if(!Strings.isBlank(request.getStartTime())){ - long filerTime = Long.parseLong(request.getStartTime()) - 15 * 60 * 1000; - mp4s = mp4s.stream() - .filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) >= 0) - .collect(Collectors.toList()); // 将结果收集到列表中 - } - - if(!Strings.isBlank(request.getEndTime())){ - long filerTime = Long.parseLong(request.getEndTime()); - mp4s = mp4s.stream() - .filter(info -> info.getStartTime().compareTo(Long.toString(filerTime)) <= 0) - .collect(Collectors.toList()); // 将结果收集到列表中 - } - /** - * 倒序排列 - */ - mp4s.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); - PageInfo pageInfo = new PageInfo<>(mp4s,request.getPageIndex(),request.getPageSize()); - System.out.println("查看录制 search 返回:"+JSON.toJSONString(request)); - return Response.success(pageInfo); - } - - @GetMapping("history") - public Response> streamView(@RequestParam String streamUrl){ - System.out.println("查看录像历史返回 :"+ streamUrl); - List dayMp4 = searchAll(streamUrl); - dayMp4.sort((o1, o2) -> o2.getStartTime().compareTo(o1.getStartTime())); - System.out.println("查看录像历史返回 :"+ JSON.toJSONString(dayMp4)); - return Response.success(dayMp4) ; - } - - public List searchAll(String streamUrl){ - - List dayMp4 = new ArrayList<>(); - List dayRecords = iRecordService.findDaysPath(); - for(DayRecord dayRecord : dayRecords){ - List streamRecords = iRecordService.findInDayRecord(dayRecord); - for(StreamRecord stream : streamRecords){ - if(stream.getStreamId().equals(streamUrl)){ - List mp4Records = stream.queryMp4Records(); - if(Objects.nonNull(mp4Records) && !mp4Records.isEmpty()){ - for(Mp4Record mp4Record : mp4Records){ - Mp4Info mp4Info = new Mp4Info(); - mp4Info.setStartTime(mp4Record.getStartTime()); - mp4Info.setUrl(dayRecord.getDay()+ - File.separator + stream.getStreamId() + File.separator + mp4Record.getMp4()); - dayMp4.add(mp4Info); - } - } - } - } - } - return dayMp4; - } - - - - - /** - * 方便测试时候使用 - * @return - */ - @GetMapping("exit") - public String exit(){ - Thread thread = new Thread(() -> { - try { - taskService.stopAllTask(); - Thread.sleep(10000L); - }catch (Exception e){ - } - System.exit(0); - }); - return "OK"; - } } diff --git a/src/main/java/com/tuoheng/steam/controller/dto/PageInfo.java b/src/main/java/com/tuoheng/steam/controller/dto/PageInfo.java deleted file mode 100644 index d132672..0000000 --- a/src/main/java/com/tuoheng/steam/controller/dto/PageInfo.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.tuoheng.steam.controller.dto; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class PageInfo implements Serializable { - - public PageInfo(List list,int page,int pageSize) { - this.total = list.size(); - this.pageSize = pageSize; - this.pageIndex = page; - - // 计算分页数据 - int fromIndex = page * pageSize; // 起始索引 - int toIndex = Math.min(fromIndex + pageSize, total); // 结束索引 - - // 检查页码是否超出范围 - if (fromIndex >= total) { - // 如果页码超出范围,返回空列表 - this.list = Collections.emptyList(); - } else { - // 截取当前页的数据 - this.list = list.subList(fromIndex, toIndex); - } - } - - private List list; - private int total; - - public int getPageIndex() { - return pageIndex; - } - - public void setPageIndex(int pageIndex) { - this.pageIndex = pageIndex; - } - - private int pageSize; - private int pageIndex; - - public List getList() { - return list; - } - - public PageInfo(int pageIndex) { - this.pageIndex = pageIndex; - } - - public void setList(List list) { - this.list = list; - } - - public int getTotal() { - return total; - } - - public void setTotal(int total) { - this.total = total; - } - - public int getPageSize() { - return pageSize; - } - - public void setPageSize(int pageSize) { - this.pageSize = pageSize; - } - -} diff --git a/src/main/java/com/tuoheng/steam/controller/dto/PageStreamRequest.java b/src/main/java/com/tuoheng/steam/controller/dto/PageStreamRequest.java deleted file mode 100644 index 21ef226..0000000 --- a/src/main/java/com/tuoheng/steam/controller/dto/PageStreamRequest.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.tuoheng.steam.controller.dto; - -import java.io.Serializable; - -public class PageStreamRequest implements Serializable { - String streamId; - Integer pageIndex ; - Integer pageSize; - String startTime; - String endTime; - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public Integer getPageIndex() { - return pageIndex; - } - - public void setPageIndex(Integer pageIndex) { - this.pageIndex = pageIndex; - } - - public Integer getPageSize() { - return pageSize; - } - - public void setPageSize(Integer pageSize) { - this.pageSize = pageSize; - } - - public String getStartTime() { - return startTime; - } - - public void setStartTime(String startTime) { - this.startTime = startTime; - } - - public String getEndTime() { - return endTime; - } - - public void setEndTime(String endTime) { - this.endTime = endTime; - } - - -} diff --git a/src/main/java/com/tuoheng/steam/controller/dto/Response.java b/src/main/java/com/tuoheng/steam/controller/dto/Response.java deleted file mode 100644 index d682e94..0000000 --- a/src/main/java/com/tuoheng/steam/controller/dto/Response.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.tuoheng.steam.controller.dto; - -import java.io.Serializable; - -public class Response implements Serializable { - Integer code; - T data; - - public T getData() { - return data; - } - - public Integer getCode() { - return code; - } - - public void setCode(Integer code) { - this.code = code; - } - - public void setData(T data) { - this.data = data; - } - - public static Response success(T v){ - Response r = new Response(); - r.setData(v); - r.code = 200; - return r; - } - - public static Response fail(int code){ - Response r = new Response(); - r.code = code; - return r; - } -} diff --git a/src/main/java/com/tuoheng/steam/dos/Mp4Info.java b/src/main/java/com/tuoheng/steam/dos/Mp4Info.java deleted file mode 100644 index 310455e..0000000 --- a/src/main/java/com/tuoheng/steam/dos/Mp4Info.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.tuoheng.steam.dos; - -import java.io.Serializable; -import java.util.Date; - -public class Mp4Info implements Serializable { - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - - - String url; - String startTime; - - public String getStartTime() { - return startTime; - } - - public void setStartTime(String startTime) { - this.startTime = startTime; - } -} diff --git a/src/main/java/com/tuoheng/steam/dos/ProcessType.java b/src/main/java/com/tuoheng/steam/dos/ProcessType.java deleted file mode 100644 index 7b006a9..0000000 --- a/src/main/java/com/tuoheng/steam/dos/ProcessType.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.tuoheng.steam.dos; - -public enum ProcessType { - RECORD,MERGE -} diff --git a/src/main/java/com/tuoheng/steam/dos/StreamProcess.java b/src/main/java/com/tuoheng/steam/dos/StreamProcess.java deleted file mode 100644 index 6e2efcf..0000000 --- a/src/main/java/com/tuoheng/steam/dos/StreamProcess.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.tuoheng.steam.dos; - -import com.tuoheng.steam.util.TimeUtils; - -import java.io.Serializable; -import java.util.Date; -import java.util.concurrent.CompletableFuture; - - -public class StreamProcess implements Serializable { - - Process process; - Date createTime; - String stopTime; - ProcessType processType; - String fileName; - - - - public String startTime(){ - return TimeUtils.formatDateToString(createTime); - } - - public String getStopTime() { - return stopTime; - } - - public void setStopTime(String stopTime) { - this.stopTime = stopTime; - } - - public CompletableFuture onExit(){ - return process.onExit(); - } - - public void destroy(){ - process.destroy(); - } - - public ProcessType getProcessType() { - return processType; - } - - public void setProcessType(ProcessType processType) { - this.processType = processType; - } - - - - public Long getInnerProcessId() { - return process.pid(); - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public StreamProcess(Process process, String fileName, ProcessType processType) { - this.process = process; - this.fileName = fileName; - this.createTime = new Date(); - this.processType = processType; - } - - - - public String getFileName() { - return fileName; - } - - public void setFileName(String fileName) { - this.fileName = fileName; - } -} diff --git a/src/main/java/com/tuoheng/steam/dos/StreamTask.java b/src/main/java/com/tuoheng/steam/dos/StreamTask.java deleted file mode 100644 index f964ff9..0000000 --- a/src/main/java/com/tuoheng/steam/dos/StreamTask.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.tuoheng.steam.dos; - -import com.tuoheng.steam.util.TimeUtils; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -public class StreamTask implements Serializable { - - Long taskId; - Date startTime; - String strStartTime; - - public String getDuration() { - return duration; - } - - public void setDuration(String duration) { - this.duration = duration; - } - - String duration; - /** - * 接收到结束命令的时间 - */ - String stopCommandTime; - /** - * 执行生成TS文件的命令 - */ - List streamProcesses = new ArrayList<>(); - /** - * 合并进程的命令 - */ - StreamProcess mergeProcess; - - private static Long pid = 0L; - - private synchronized static Long generatePid(){ - if(pid==Long.MAX_VALUE){ - pid = 0L; - }else { - pid++; - } - return pid; - } - - - public String getStrStartTime() { - return strStartTime; - } - - public void setStrStartTime(String strStartTime) { - this.strStartTime = strStartTime; - } - - public void setStartTime(String startTime) { - this.strStartTime = startTime; - } - - - public String getStopCommandTime() { - return stopCommandTime; - } - - public void setStopCommandTime(String stopCommandTime) { - this.stopCommandTime = stopCommandTime; - } - - - - public List getStreamProcesses() { - return streamProcesses; - } - - public void setStreamProcesses(List streamProcesses) { - this.streamProcesses = streamProcesses; - } - - public StreamProcess getMergeProcess() { - return mergeProcess; - } - - public void setMergeProcess(StreamProcess mergeProcess) { - this.mergeProcess = mergeProcess; - } - - public String getStreamUrl() { - return streamUrl; - } - - public void setStreamUrl(String streamUrl) { - this.streamUrl = streamUrl; - } - - - String streamUrl; - - - public Long getTaskId() { - return taskId; - } - - public void setTaskId(Long taskId) { - this.taskId = taskId; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public StreamTask(String streamUrl) { - this.taskId = generatePid(); - this.streamUrl = streamUrl; - this.startTime = new Date(); - this.strStartTime = TimeUtils.formatDateToString(this.startTime); - this.stopCommandTime = null; - } - - - - public String getOutFileName() { - return outFileName; - } - - public void setOutFileName(String outFileName) { - this.outFileName = outFileName; - } - - String outFileName; -} diff --git a/src/main/java/com/tuoheng/steam/schedule/Scheduler.java b/src/main/java/com/tuoheng/steam/schedule/Scheduler.java deleted file mode 100644 index a0351af..0000000 --- a/src/main/java/com/tuoheng/steam/schedule/Scheduler.java +++ /dev/null @@ -1,221 +0,0 @@ -package com.tuoheng.steam.schedule; - -import com.tuoheng.steam.service.IRecordService; -import com.tuoheng.steam.service.TaskService; -import com.tuoheng.steam.service.dos.DayRecord; -import com.tuoheng.steam.service.dos.FlvRecord; -import com.tuoheng.steam.service.dos.Mp4Record; -import com.tuoheng.steam.service.dos.StreamRecord; -import com.tuoheng.steam.util.TimeUtils; -import io.minio.MinioClient; -import io.minio.PutObjectArgs; -import io.minio.credentials.AssumeRoleProvider; -import io.minio.credentials.Credentials; -import io.minio.errors.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - - -import java.io.*; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.util.*; - - -@Component -public class Scheduler { - - @Value("${minio.oss.endpoint}") - private String endpoint; - - @Value("${minio.oss.accessKeyId}") - private String accessKeyId; - - @Value("${minio.oss.accessKeySecret}") - private String accessKeySecret; - - @Value("${minio.oss.bucketName}") - private String bucketName; - - //文件存储目录 - @Value("${minio.oss.filedir}") - private String filedir; - - @Value("${minio.oss.dajiangName}") - private String dajiangName; - - @Value("${minio.oss.dajiangPassword}") - private String dajiangPassword; - - //授权策略,允许访问名为bucket的桶的目录 - public static final String ROLE_ARN = "arn:aws:s3:::"; - - - - private static final Logger logger = LoggerFactory.getLogger(Scheduler.class); - - @Autowired - IRecordService iRecordService; - - @Value("${livedates}") - private Integer livedates; - - /** - * 初次执行延迟6秒执行 - * 每隔 60 分钟执行一次 60*60*1000 - */ - @Scheduled(fixedRate = 3600000, initialDelay = 10) - public void mergeTask() throws Exception { -// File file = new File("/Users/sunpeng/workspace/text.txt"); -// InputStream inputStream = new FileInputStream(file); -// uploadFile2OSS(inputStream,"text.txt",file.length()); - - getCredentials(); - } - - - - - - /** - * 得到 临时凭据 - */ - private Credentials getCredentials() throws Exception { - - String POLICY_GET_AND_PUT = "{\n" + - " \"Version\": \"2012-10-17\",\n" + - " \"Statement\": [\n" + - " {\n" + - " \"Effect\": \"Allow\",\n" + - " \"Action\": [\n" + - " \"s3:GetObject\",\n" + - " \"s3:GetBucketLocation\",\n" + - " \"s3:PutObject\"\n" + - " ],\n" + - " \"Resource\": [\n" + - " \"arn:aws:s3:::"+bucketName+"/*\"\n" + - " ]\n" + - " }\n" + - " ]\n" + "}"; - - int durationSeconds = 360000;//秒 - //创建签名对象 - AssumeRoleProvider provider = new AssumeRoleProvider( - endpoint, - dajiangName, - dajiangPassword, - durationSeconds,//默认3600秒失效,设置小于这个就是3600,大于3600就实际值 - POLICY_GET_AND_PUT, - "us-east-1", - ROLE_ARN+bucketName+"/*", - "anysession", - null, - null); - - Credentials credentials = provider.fetch(); - - /** - * 下面的值按照大疆的要求传给大疆 - */ -// System.out.println("sessionToken=" + credentials.sessionToken()); -// System.out.println("accessKey=" + credentials.accessKey()); -// System.out.println("secretKey=" + credentials.secretKey()); -// System.out.println("isExpired=" + credentials.isExpired()); - - return credentials; - - } - - - - - private void uploadFile2OSS(InputStream inputStream, String fileName, long streamSize) throws Exception{ - // 初始化 MinioClient - MinioClient minioClient = MinioClient.builder() - .endpoint(endpoint) // MinIO 服务器地址 - .credentials(accessKeyId, accessKeySecret) // 访问密钥和秘密密钥 - .build(); - - - String contentType = getContentType(fileName.substring(fileName.lastIndexOf("."))); // 获取文件类型 - - minioClient.putObject( - PutObjectArgs.builder() - .bucket(bucketName) // 存储桶名称 - .object(filedir + "/" + fileName) // 对象名称(路径) - .stream(inputStream, streamSize, -1) // 输入流、文件大小(-1 表示未知大小) - .contentType(contentType) // 文件类型 - .build() - ); - } - - - private static String getContentType(String FilenameExtension) { - if (FilenameExtension.equalsIgnoreCase(".bmp")) { - return "image/bmp"; - } - if (FilenameExtension.equalsIgnoreCase(".gif")) { - return "image/gif"; - } - if (FilenameExtension.equalsIgnoreCase(".jpeg") || - FilenameExtension.equalsIgnoreCase(".jpg") || - FilenameExtension.equalsIgnoreCase(".png")) { - return "image/jpeg"; - } - if (FilenameExtension.equalsIgnoreCase(".html")) { - return "text/html"; - } - if (FilenameExtension.equalsIgnoreCase(".txt")) { - return "text/plain"; - } - if (FilenameExtension.equalsIgnoreCase(".vsd")) { - return "application/vnd.visio"; - } - if (FilenameExtension.equalsIgnoreCase(".pptx") || - FilenameExtension.equalsIgnoreCase(".ppt")) { - return "application/vnd.ms-powerpoint"; - } - if (FilenameExtension.equalsIgnoreCase(".docx") || - FilenameExtension.equalsIgnoreCase(".doc")) { - return "application/msword"; - } - if (FilenameExtension.equalsIgnoreCase(".xml")) { - return "text/xml"; - } - //PDF - if (FilenameExtension.equalsIgnoreCase(".pdf")) { - return "application/pdf"; - } - //excel - if (FilenameExtension.equalsIgnoreCase(".xls") || - FilenameExtension.equalsIgnoreCase(".xlsx")) { - return "application/octet-stream"; - } - //waypoints 拓恒+大疆的航线文件类型 - if (FilenameExtension.equalsIgnoreCase(".waypoints") || - FilenameExtension.equalsIgnoreCase(".kmz")) { - return "application/octet-stream"; - } - - return "image/jpeg"; - } - - - -// public static boolean isWithin15Minutes(String timestamp1, String timestamp2) { -// // 将字符串转换为 long 类型 -// long time1 = Long.parseLong(timestamp1); -// long time2 = Long.parseLong(timestamp2); -// // 计算时间差的绝对值 -// long diff = Math.abs(time2 - time1); -// // 15 分钟 = 15 * 60 * 1000 毫秒 -// long fifteenMinutesInMillis = 15 * 60 * 1000; -// // 判断是否小于或等于 15 分钟 -// return diff <= fifteenMinutesInMillis; -// } - -} diff --git a/src/main/java/com/tuoheng/steam/service/IRecordService.java b/src/main/java/com/tuoheng/steam/service/IRecordService.java deleted file mode 100644 index ded0f48..0000000 --- a/src/main/java/com/tuoheng/steam/service/IRecordService.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.tuoheng.steam.service; - -import com.tuoheng.steam.service.dos.*; - -import java.util.Date; -import java.util.List; - -public interface IRecordService { - - public List findDaysPath(); - - public List findInDayRecord(DayRecord dayRecord); - - - - public void mergeMp4(FlvRecord flvRecord); - -} diff --git a/src/main/java/com/tuoheng/steam/service/ITaskService.java b/src/main/java/com/tuoheng/steam/service/ITaskService.java deleted file mode 100644 index 711aad8..0000000 --- a/src/main/java/com/tuoheng/steam/service/ITaskService.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.tuoheng.steam.service; - -import com.tuoheng.steam.dos.StreamTask; - -public interface ITaskService { - - public StreamTask startTask(String streamUrl); - - public StreamTask stopTask(String streamUrl); - - public StreamTask getLastTask(String streamUrl); - - public void stopAllTask(); -} diff --git a/src/main/java/com/tuoheng/steam/service/RecordService.java b/src/main/java/com/tuoheng/steam/service/RecordService.java deleted file mode 100644 index de15658..0000000 --- a/src/main/java/com/tuoheng/steam/service/RecordService.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.tuoheng.steam.service; - -import com.tuoheng.steam.service.dos.*; -import com.tuoheng.steam.service.innerService.ProcessService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; - -import static com.tuoheng.steam.util.FileUtil.readFiles; - -@Component -public class RecordService implements IRecordService { - - @Value("${recordPath}") - private String recordPath; - - @Autowired - ProcessService processService; - - private static final Logger logger = LoggerFactory.getLogger(IRecordService.class); - - public static boolean isValidDate(String dateStr) { - // 正则表达式:4位年份 + 2位月份 + 2位日期 - String regex = "^\\d{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$"; - return dateStr.matches(regex); - } - - @Override - public List findDaysPath() { - List days = new ArrayList<>(); - List fileNames = readFiles(recordPath); - for (String fileName : fileNames) { - if(isValidDate(fileName)){ - DayRecord day = new DayRecord(); - day.setRoot(recordPath); - day.setDay(fileName); - days.add(day); - } - } - days.sort(Comparator.comparing(DayRecord::getDay)); - return days; - } - - - @Override - public List findInDayRecord(DayRecord dayRecord) { - return dayRecord.getStreamRecords(); - } - - @Override - public void mergeMp4(FlvRecord flvRecord) { - try { - processService.mergeMp4(flvRecord); - }catch (Exception e){ - logger.error(e.getMessage()); - } - } - - - -} diff --git a/src/main/java/com/tuoheng/steam/service/TaskService.java b/src/main/java/com/tuoheng/steam/service/TaskService.java deleted file mode 100644 index 119d6dc..0000000 --- a/src/main/java/com/tuoheng/steam/service/TaskService.java +++ /dev/null @@ -1,328 +0,0 @@ -package com.tuoheng.steam.service; - -import com.alibaba.fastjson2.JSON; -import com.tuoheng.steam.dos.StreamProcess; -import com.tuoheng.steam.dos.StreamTask; -import com.tuoheng.steam.service.innerService.ProcessService; -import com.tuoheng.steam.util.ProcessManager; -import com.tuoheng.steam.util.TimeUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -import javax.annotation.PostConstruct; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.*; -import java.util.concurrent.*; - -@Service -@EnableScheduling -public class TaskService implements ITaskService{ - - @Value("${srs.targetPath}") - private String targetPath; - - @Value("${ffmpeg}") - private String ffmpeg; - - ScheduledExecutorService executor = Executors.newScheduledThreadPool(4); - - /** - * 清除没有被关闭的进程 - */ - @Scheduled(fixedRate = 60000) - public void cleaTask() { - for (ConcurrentHashMap.Entry entry : runningTasks.entrySet()) { - StreamTask streamTask = entry.getValue(); - if(streamTask.getStartTime().getTime() < new Date().getTime() - 30 * 60 * 1000 ) { - logger.info("清理废弃任务 {}", JSON.toJSONString(streamTask)); - StreamTask s = stopTask(entry.getKey()); - scheduler.schedule(() -> { - File file = new File(targetPath +"/"+ s.getOutFileName()); - try { - file.delete(); - logger.info("废弃文件删除成功"); - }catch (Exception e) { - logger.error(e.getMessage()); - } - }, 60, TimeUnit.SECONDS); - } - } - } - - private static final Logger logger = LoggerFactory.getLogger(TaskService.class); - - @Autowired - ProcessService processService; - - /** - * 执行中的任务 - */ - ConcurrentHashMap runningTasks = new ConcurrentHashMap<>(); - - /** - * 历史任务 - */ - ConcurrentHashMap> historyTasks = new ConcurrentHashMap<>(); - - /** - * 任务池 - */ - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); - - @PostConstruct - public void init() { - - /** - * - */ - scheduler.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - runningTasks.forEach((key, value) -> { - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.HOUR, -2); - Date twoHoursAgo = calendar.getTime(); - if (value.getStartTime().before(twoHoursAgo)) { - logger.error("taskId {} 执行超时,手动关闭", key); - stopTask(key); - } - }); - - for (Map.Entry> entry : historyTasks.entrySet()) { - - for (StreamTask task : entry.getValue()) { - List streamProcesses = task.getStreamProcesses(); - for (StreamProcess process : streamProcesses) { - process.destroy(); - File file = new File(process.getFileName()); - if(file.exists()){ - file.delete(); - } - } - } - - while (entry.getValue().size()>10){ - entry.getValue().pollFirst(); - } - } - } - }, 60, 60, TimeUnit.SECONDS); - } - - public StreamTask startTask(String streamUrl) { - - /* - 当前有任务,则返回当前的任务 - */ - if (runningTasks.containsKey(streamUrl)) { - return runningTasks.get(streamUrl); - } - - /* - 当前无任务,则开启新任务 - */ - StreamTask taskInstance = new StreamTask(streamUrl); - if (runningTasks.putIfAbsent(streamUrl, taskInstance) == null) { - logger.info("streamUrl {} startTask {} ", streamUrl, taskInstance.getTaskId()); - startTask(streamUrl, taskInstance); - // runningTasks.put(streamUrl, taskInstance); - return taskInstance; - } else { - return runningTasks.get(streamUrl); - } - - } - - public StreamTask stopTask(String streamUrl) { - StreamTask currentStreamTask = runningTasks.remove(streamUrl); - - if (currentStreamTask!= null) { - currentStreamTask.setStopCommandTime(TimeUtils.formatDateToString(new Date())); - - if (!CollectionUtils.isEmpty(currentStreamTask.getStreamProcesses())) { - for (StreamProcess streamProcess : currentStreamTask.getStreamProcesses()) { - logger.info("streamUrl {} taskId {} destroy Process {}", streamUrl,currentStreamTask.getTaskId() ,streamProcess.getInnerProcessId()); - streamProcess.destroy(); - } - } - - try { - String outFileName = UUID.randomUUID().toString() + ".mp4"; - StreamProcess mergeProcess = processService.mergeStream(outFileName, currentStreamTask.getStreamProcesses()); - - if(Objects.nonNull(mergeProcess)){ - /** - * 注册进程 - */ - ProcessManager.registerProcess(mergeProcess.getInnerProcessId()); - currentStreamTask.setMergeProcess(mergeProcess); - - - logger.info("streamUrl {} taskId {} startMergeProcess {} ", streamUrl, currentStreamTask.getTaskId(), - mergeProcess.getInnerProcessId()); - - CompletableFuture future = mergeProcess.onExit(); - future.thenRun(() -> { - logger.info("streamUrl {} taskId {} MergeProcess {} Over", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId()); - mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); - for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ - File file = new File(streamProcess.getFileName()); - if(file.exists()){ - file.delete(); - } - } - }).exceptionally(ex -> { - logger.error("streamUrl {} taskId {} MergeProcess {} exceptionally", streamUrl,currentStreamTask.getTaskId(), mergeProcess.getInnerProcessId(), ex); - mergeProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - ProcessManager.unRegisterProcess(mergeProcess.getInnerProcessId()); - for(StreamProcess streamProcess :currentStreamTask.getStreamProcesses()){ - File file = new File(streamProcess.getFileName()); - if(file.exists()){ - file.delete(); - } - } - return null; - }); - currentStreamTask.setOutFileName(outFileName); - } - - -// if( historyTasks.containsKey(streamUrl)){ -// historyTasks.get(streamUrl).add(currentStreamTask); -// }else { -// historyTasks.put(streamUrl, new LinkedList<>()); -// historyTasks.get(streamUrl).offerLast(currentStreamTask); -// } - - if(Objects.nonNull(currentStreamTask.getMergeProcess())){ - if(Objects.nonNull(currentStreamTask.getMergeProcess().getFileName()) && - !currentStreamTask.getMergeProcess().getFileName().isEmpty()){ - try { -// -// scheduler.schedule(task, 3, TimeUnit.SECONDS); - executor.schedule(new Runnable() { - @Override - public void run() { - try { - System.out.println("FileName :"+currentStreamTask.getMergeProcess().getFileName()); - Process process = Runtime.getRuntime().exec(ffmpeg + " -i " + - currentStreamTask.getMergeProcess().getFileName()); - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream())); - String line; - while ((line = reader.readLine()) != null) { - System.out.println("Duration line: " + line); - if (line.contains("Duration")) { - String duration = line.split("Duration: ")[1].split(",")[0]; - System.out.println("Duration: " + duration.substring(0, 8)); - currentStreamTask.setDuration(duration.substring(0, 8)); - System.out.println("-----------------放入缓存-----------------"); - /** - * 放入缓存 - */ - if( historyTasks.containsKey(streamUrl)){ - historyTasks.get(streamUrl).add(currentStreamTask); - }else { - historyTasks.put(streamUrl, new LinkedList<>()); - historyTasks.get(streamUrl).offerLast(currentStreamTask); - } - break; - } - } - reader.close(); - }catch (Exception e){ - System.out.println(e.getMessage()); - } - } - },40, TimeUnit.SECONDS); - - } catch (Exception e) { - e.printStackTrace(); - } - } - - } - - return currentStreamTask; - } catch (Exception e) { - logger.info("taskId {} Stop exceptionally", streamUrl,e); - return null; - } - - }else { - if(historyTasks.containsKey(streamUrl)){ - return historyTasks.get(streamUrl).peekLast(); - }else { - return null; - } - } - } - - public StreamTask getLastTask(String streamUrl) { - StreamTask current = runningTasks.get(streamUrl); - if(Objects.nonNull(current)){ - return current; - }else { - if(historyTasks.containsKey(streamUrl)){ - return historyTasks.get(streamUrl).peekLast(); - } - return null; - } - } - - private void startTask(String streamUrl, StreamTask streamTask) { - - try { - /* - 任务不存在了,直接退出 - */ - if (streamTask.getStopCommandTime() != null) { - logger.info("streamUrl {} Task {} Stopped", streamUrl, streamTask.getTaskId()); - return; - } - - StreamProcess streamProcess = processService.recordStream(streamUrl); - /** - * 注册进程 - */ - ProcessManager.registerProcess(streamProcess.getInnerProcessId()); - streamTask.getStreamProcesses().add(streamProcess); - - logger.info("streamUrl {} taskId {} startProcess {} ", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); - - CompletableFuture future = streamProcess.onExit(); - future.thenRun(() -> { - logger.info("streamUrl {} taskId {} Process {} Over", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId()); - streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - startTask(streamUrl, streamTask); - ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); - }).exceptionally(ex -> { - logger.info("streamUrl {} taskId {} Process {} Exceptionally", streamUrl, streamTask.getTaskId(),streamProcess.getInnerProcessId(),ex); - streamProcess.setStopTime(TimeUtils.formatDateToString(new Date())); - startTask(streamUrl, streamTask); - ProcessManager.unRegisterProcess(streamProcess.getInnerProcessId()); - return null; - }); - - } catch (IOException e) { - logger.info("streamUrl {} taskId {} startProcess Exception", streamUrl,streamTask.getTaskId(),e); - } - - } - - - public void stopAllTask(){ - runningTasks.forEach((key, value) -> { - stopTask(key); - }); - } -} diff --git a/src/main/java/com/tuoheng/steam/service/dos/DayRecord.java b/src/main/java/com/tuoheng/steam/service/dos/DayRecord.java deleted file mode 100644 index 0fc1a1c..0000000 --- a/src/main/java/com/tuoheng/steam/service/dos/DayRecord.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.tuoheng.steam.service.dos; - -import com.tuoheng.steam.util.FileUtil; - -import java.io.File; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; - -/** - * 代表日期的文件夹 - */ -public class DayRecord { - public String getRoot() { - return root; - } - - public void setRoot(String root) { - this.root = root; - } - - String root; - - String day; - - public String getDay() { - return day; - } - - public void setDay(String day) { - this.day = day; - } - - public void clear(){ - FileUtil.deleteFolder(root + File.separator + this.day); - } - - public List getStreamRecords() { - List result = new ArrayList<>(); - List fileList = FileUtil.readFiles(root + File.separator + day); - for(String fileName : fileList){ - StreamRecord record = new StreamRecord(); - record.setDayRecord(this); - record.setStreamId(fileName); - record.setStreamType(FileUtil.getStreamType(fileName)); - result.add(record); - } - return result; - } -} diff --git a/src/main/java/com/tuoheng/steam/service/dos/FlvRecord.java b/src/main/java/com/tuoheng/steam/service/dos/FlvRecord.java deleted file mode 100644 index b588700..0000000 --- a/src/main/java/com/tuoheng/steam/service/dos/FlvRecord.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.tuoheng.steam.service.dos; - -/** - * FLV文件 - */ -public class FlvRecord { - public String getFlv() { - return flv; - } - - public void setFlv(String flv) { - this.flv = flv; - } - - public StreamRecord getStream() { - return stream; - } - - public void setStream(StreamRecord stream) { - this.stream = stream; - } - - String flv; - StreamRecord stream; - - public String getStartTime() { - return extractNameWithoutExtension(flv); - } - - - public static String extractNameWithoutExtension(String fileName) { - // 找到最后一个 '.' 的位置 - int lastDotIndex = fileName.lastIndexOf('.'); - - // 如果文件名中没有 '.',则返回整个文件名 - if (lastDotIndex == -1) { - return fileName; - } - - // 截取从开头到最后一个 '.' 之前的部分 - return fileName.substring(0, lastDotIndex); - } - -} diff --git a/src/main/java/com/tuoheng/steam/service/dos/Mp4Record.java b/src/main/java/com/tuoheng/steam/service/dos/Mp4Record.java deleted file mode 100644 index 5a2fa1a..0000000 --- a/src/main/java/com/tuoheng/steam/service/dos/Mp4Record.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.tuoheng.steam.service.dos; - -/** - * MP4文件 - */ -public class Mp4Record { - - String mp4; - StreamRecord stream; - - - - public String getMp4() { - return mp4; - } - - public void setMp4(String mp4) { - this.mp4 = mp4; - } - - public StreamRecord getStream() { - return stream; - } - - public void setStream(StreamRecord stream) { - this.stream = stream; - } - - public String getStartTime() { - return extractNameWithoutExtension(mp4); - } - - public static String extractNameWithoutExtension(String fileName) { - // 找到最后一个 '.' 的位置 - int lastDotIndex = fileName.lastIndexOf('.'); - - // 如果文件名中没有 '.',则返回整个文件名 - if (lastDotIndex == -1) { - return fileName; - } - - // 截取从开头到最后一个 '.' 之前的部分 - return fileName.substring(0, lastDotIndex); - } -} diff --git a/src/main/java/com/tuoheng/steam/service/dos/StreamRecord.java b/src/main/java/com/tuoheng/steam/service/dos/StreamRecord.java deleted file mode 100644 index 0ced8b8..0000000 --- a/src/main/java/com/tuoheng/steam/service/dos/StreamRecord.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.tuoheng.steam.service.dos; - -import com.tuoheng.steam.util.FileUtil; - -import java.io.File; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -/** - * 代表流的文件夹 - */ -public class StreamRecord { - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public DayRecord getDayRecord() { - return dayRecord; - } - - public void setDayRecord(DayRecord dayRecord) { - this.dayRecord = dayRecord; - } - - public StreamType getStreamType() { - return streamType; - } - - public void setStreamType(StreamType streamType) { - this.streamType = streamType; - } - - String streamId; - StreamType streamType; - DayRecord dayRecord; - - public List queryFlvRecords(){ - - List result = new ArrayList<>(); - List fileList = FileUtil.readFiles( - dayRecord.root+ File.separator + dayRecord.day + File.separator + streamId); - - for(String fileName : fileList){ - if(FileUtil.isFlv(fileName)){ - FlvRecord flvRecord = new FlvRecord(); - flvRecord.setFlv(fileName); - - flvRecord.setStream(this); - result.add(flvRecord); - } - } - - result.sort((o1, o2) -> { - // 提取时间戳并转换为 long 类型 - long timestamp1 = Long.parseLong(o1.getFlv().split("\\.")[0]); - long timestamp2 = Long.parseLong(o2.getFlv().split("\\.")[0]); - // 按时间戳升序排序 - return Long.compare(timestamp1, timestamp2); - }); - - return result; - } - - public List queryMp4Records(){ - - List result = new ArrayList<>(); - List fileList = FileUtil.readFiles( - dayRecord.root+ File.separator + dayRecord.day + File.separator + streamId); - for(String fileName : fileList){ - if(FileUtil.isMp4(fileName)){ - Mp4Record record = new Mp4Record(); - record.setStream(this); - record.setMp4(fileName); - result.add(record); - } - } - - result.sort((o1, o2) -> { - // 提取时间戳并转换为 long 类型 - long timestamp1 = Long.parseLong(o1.mp4.split("\\.")[0]); - long timestamp2 = Long.parseLong(o2.mp4.split("\\.")[0]); - // 按时间戳升序排序 - return Long.compare(timestamp1, timestamp2); - }); - - return result; - } -} diff --git a/src/main/java/com/tuoheng/steam/service/dos/StreamType.java b/src/main/java/com/tuoheng/steam/service/dos/StreamType.java deleted file mode 100644 index 6d5b146..0000000 --- a/src/main/java/com/tuoheng/steam/service/dos/StreamType.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.tuoheng.steam.service.dos; - -public enum StreamType { - Inner,Outer,LiveVideo -} diff --git a/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java b/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java deleted file mode 100644 index a3a6346..0000000 --- a/src/main/java/com/tuoheng/steam/service/innerService/ProcessService.java +++ /dev/null @@ -1,182 +0,0 @@ -package com.tuoheng.steam.service.innerService; - -import com.tuoheng.steam.dos.StreamProcess; -import com.tuoheng.steam.dos.ProcessType; -import com.tuoheng.steam.service.dos.FlvRecord; -import com.tuoheng.steam.util.ProcessManager; -import com.tuoheng.steam.util.TimeUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; - -@Service -public class ProcessService { - - @Value("${srs.splitPath}") - private String splitPath; - - @Value("${ffmpeg}") - private String ffmpeg; - - @Value("${srs.targetPath}") - private String targetPath; - - @Value("${recordPath}") - private String recordPath; - - private static final Logger logger = LoggerFactory.getLogger(ProcessService.class); - - /** - * 任务池 - */ - ExecutorService loggingService = Executors.newCachedThreadPool(); - - public void mergeMp4(FlvRecord flvRecord) throws IOException, ExecutionException, InterruptedException { - - String command = ffmpeg + " -i "+ recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv() + " -c:v copy -c:a aac " + recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator +flvRecord.getStartTime() + ".mp4"; - - - ProcessBuilder pb = new ProcessBuilder(command.split(" ")); - pb.redirectErrorStream(true); - Process process = pb.start(); - - loggingService.execute(new Runnable() { - @Override - public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("mergeMp4-------- {}",line); - } - } catch (IOException e) { - logger.info("mergeMp4-------- Over",e); - } - logger.info("mergeMp4 Over"); - File delete = new File(recordPath+ File.separator + - flvRecord.getStream().getDayRecord().getDay() + File.separator + - flvRecord.getStream().getStreamId() + File.separator + flvRecord.getFlv()); - if(delete.exists()){ - delete.delete(); - } - } - }); - - CompletableFuture future = process.onExit(); - // 阻塞等待进程结束 - Process completedProcess = future.get(); - - // 检查进程是否成功结束 - if (completedProcess.exitValue() == 0) { - logger.info("进程成功结束!"); - /** - * 删除数据 - */ - } else { - logger.info("进程失败,退出码 {} " ,completedProcess.exitValue()); - } - - } - - public StreamProcess recordStream(String streamUrl) throws IOException { - - String recordFileName = splitPath+'/'+UUID.randomUUID().toString() +".ts"; - String command = String.format( - ffmpeg+ " -i %s -c copy -f mpegts -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 2 %s", - streamUrl, recordFileName); - logger.info("recordStream {}", command); - ProcessBuilder pb = new ProcessBuilder(command.split(" ")); - pb.redirectErrorStream(true); - Process process = pb.start(); - - loggingService.execute(new Runnable() { - @Override - public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { -// logger.info("recordStream-------- {}",line); - } - } catch (IOException e) { - logger.info("recordStream-------- Over"); - } - } - }); - - return new StreamProcess(process,recordFileName, ProcessType.RECORD); - - }; - - public StreamProcess mergeStream(String outFileName, List streamProcesses) throws IOException { - - List fileList = new ArrayList<>(); - for(StreamProcess streamProcess : streamProcesses){ - File newFile = new File(streamProcess.getFileName()); - if(newFile.exists()){ - fileList.add(streamProcess.getFileName()); - } - } - - - if(fileList.isEmpty()){ - return null; - } - String command = ""; - outFileName = targetPath + "/" +outFileName; - if(fileList.size() == 1){ - command = String.format( - ffmpeg+ " -i %s -c copy %s", - fileList.get(0), outFileName); - }else { - String filePaths = String.join("|", fileList); - command = String.format( - ffmpeg+ " -i \"concat:%s\" -c copy %s", - filePaths, outFileName); - } - logger.info("mergeStream {}", command); - ProcessBuilder pb = new ProcessBuilder(command.split(" ")); - pb.redirectErrorStream(true); - Process process = pb.start(); - - loggingService.execute(new Runnable() { - @Override - public void run() { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = reader.readLine()) != null) { - logger.info("mergeStream-------- {}",line); - } - } catch (IOException e) { - logger.info("mergeStream-------- Over"); - } - } - }); - - - return new StreamProcess(process,outFileName,ProcessType.MERGE); - - }; - - -} diff --git a/src/main/java/com/tuoheng/steam/util/FileUtil.java b/src/main/java/com/tuoheng/steam/util/FileUtil.java deleted file mode 100644 index 12ec691..0000000 --- a/src/main/java/com/tuoheng/steam/util/FileUtil.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.tuoheng.steam.util; - -import com.tuoheng.steam.service.dos.DayRecord; -import com.tuoheng.steam.service.dos.StreamType; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -public class FileUtil { - - public static void deleteFolder(String stringPath) { - - try { - Path path = Paths.get(stringPath); - if (Files.exists(path)) { - try (Stream pathStream = Files.walk(path)) { - pathStream.sorted((p1, p2) -> -p1.compareTo(p2)) // 从子文件/文件夹开始删除 - .forEach(p -> { - try { - Files.delete(p); // 删除文件或文件夹 - } catch (IOException e) { - System.err.println("无法删除: " + p + ", 原因: " + e.getMessage()); - } - }); - } - } else { - System.out.println("文件夹不存在: " + path); - } - }catch (Exception e){ - e.printStackTrace(); - } - - } - - public static List readFiles(String stringPath) { - - List list = new ArrayList<>(); - - File directory = new File(stringPath); - - // 检查目录是否存在且是一个目录 - if (directory.exists() && directory.isDirectory()) { - // 获取目录下的所有子文件和文件夹 - File[] files = directory.listFiles(); - - if (files != null) { - for (File file : files) { - list.add(file.getName()); - } - } else { - - } - } else { - - } - return list; - } - - public static boolean isTempFile(String fileName) { - String[] result = fileName.split("\\."); - return result.length == 3; - } - - public static boolean isFlv(String fileName) { - return fileName.endsWith("flv"); - } - - public static boolean isMp4(String fileName) { - return fileName.endsWith("mp4"); - } - - public static StreamType getStreamType(String fileName) { - if(fileName.endsWith("outer")){ - return StreamType.Outer; - }else if(fileName.endsWith("inner")){ - return StreamType.Inner; - }else{ - return StreamType.LiveVideo; - } - } - -} diff --git a/src/main/java/com/tuoheng/steam/util/ProcessManager.java b/src/main/java/com/tuoheng/steam/util/ProcessManager.java deleted file mode 100644 index e8740c6..0000000 --- a/src/main/java/com/tuoheng/steam/util/ProcessManager.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.tuoheng.steam.util; - -import org.springframework.stereotype.Service; - -import java.io.*; -import java.util.*; - -@Service -public class ProcessManager { - - private static final String DELIMITER = "="; - - static Map runningProcessIds = new HashMap<>(); - - static String pidPath ="pid.txt"; - - static { - runningProcessIds = loadFromFile(pidPath); - for(Map.Entry entry : runningProcessIds.entrySet()){ - try { - killProcessByPID(entry.getKey()); - }catch (Exception e){ - e.printStackTrace(); - } - } - runningProcessIds.clear(); - writeToFile(runningProcessIds); - } - - public static void registerProcess(Long process) { - runningProcessIds.put(process,TimeUtils.formatDateToString(new Date())); - writeToFile(runningProcessIds); - } - - public static void unRegisterProcess(Long process) { - runningProcessIds.remove(process); - writeToFile(runningProcessIds); - } - - public static void writeToFile(Map map){ - try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath))) { - for (Map.Entry entry : map.entrySet()) { - writer.write(entry.getKey() + DELIMITER + entry.getValue()); - writer.newLine(); - } - }catch (IOException ignored){} - } - - public static void appendToFile(Long pid,String strDate) { - - try (BufferedWriter writer = new BufferedWriter(new FileWriter(pidPath,true))) { - writer.write(pid+ DELIMITER + strDate); - writer.newLine(); - }catch (IOException ignored) {} - } - - public static Map loadFromFile(String filePath){ - Map map = new LinkedHashMap<>(); - File file = new File(filePath); - try { - if(!file.exists()){ - file.createNewFile(); - } - }catch (Exception e){ - e.printStackTrace(); - } - try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { - String line; - while ((line = reader.readLine()) != null) { - String[] parts = line.split(ProcessManager.DELIMITER, 2); // Split into key and value - if (parts.length == 2) { - Long key = Long.parseLong(parts[0].trim()); - String value = parts[1].trim(); - map.put(key, value); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return map; - } - - public static void killProcessByPID(long pid) throws IOException, InterruptedException { - String os = System.getProperty("os.name").toLowerCase(); - ProcessBuilder processBuilder; - - if (os.contains("win")) { - // Windows: Use taskkill command - processBuilder = new ProcessBuilder("cmd", "/c", "taskkill /PID " + pid + " /F"); - } else { - // Linux/Unix/macOS: Use kill command - processBuilder = new ProcessBuilder("bash", "-c", "kill -9 " + pid); - } - - Process process = processBuilder.start(); - int exitCode = process.waitFor(); - - if (exitCode == 0) { - System.out.println("Process with PID " + pid + " terminated successfully."); - } else { - System.err.println("Failed to terminate process with PID " + pid + ". Exit code: " + exitCode); - } - } - -} diff --git a/src/main/java/com/tuoheng/steam/util/TimeUtils.java b/src/main/java/com/tuoheng/steam/util/TimeUtils.java deleted file mode 100644 index 8456ec9..0000000 --- a/src/main/java/com/tuoheng/steam/util/TimeUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.tuoheng.steam.util; - -import java.text.SimpleDateFormat; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.Date; - -public class TimeUtils { - - private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - - public static String formatDateToString(Date date) { - SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); - return sdf.format(date); - } - - private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd"); - - public static boolean isBefore(String dateStr,Integer days) { - - // 将字符串解析为 LocalDate 对象 - LocalDate date = LocalDate.parse(dateStr, DAY_FORMAT); - // 获取当前日期 - LocalDate currentDate = LocalDate.now(); - // 判断日期是否在 N 天之前 - if (date.isBefore(currentDate.minusDays(days))) { - return true; - } else { - return false; - } - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index df68d76..f01ecf6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,22 +1,5 @@ spring.application.name=demo -server.port = 8989 -srs.splitPath=/data/java/srs/stream_server/temp -srs.targetPath=/data/java/srs/srs/trunk/objs/nginx/html -ffmpeg=/data/ffmpeg/bin/ffmpeg -recordPath=/data/java/srs/srs/trunk/objs/nginx/html/record -livedates=8 +server.port = 7788 -#server.port = 8080 -#srs.splitPath=/Users/sunpeng/workspace/stream/temp -#srs.targetPath=/Users/sunpeng/workspace/stream/html -#ffmpeg=ffmpeg -#recordPath=/Users/sunpeng/workspace/stream/record -#livedates=7 - -minio.oss.endpoint: https://minio-jndsj.t-aaron.com:2443 -minio.oss.accessKeyId: PJM0c2qlauoXv5TMEHm2 -minio.oss.accessKeySecret: Wr69Dm3ZH39M3GCSeyB3eFLynLPuGCKYfphixZuI -minio.oss.bucketName: th-airport -minio.oss.filedir: prodFile -minio.oss.dajiangName: dajiang -minio.oss.dajiangPassword: dajiang2025 \ No newline at end of file +spring.cloud.consul.host=localhost +spring.cloud.consul.port=8500