From a0799c9cb97d27a4c315551f9221572d30320f55 Mon Sep 17 00:00:00 2001 From: wangwei <305939031@qq.com> Date: Wed, 27 Sep 2023 15:06:34 +0800 Subject: [PATCH] =?UTF-8?q?ffmpeg=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 32 ++ LICENSE | 201 ++++++++ README.md | 16 + loadCmd.properties | 10 + loadFFmpeg.properties | 13 + pom.xml | 312 ++++++++++++ .../bluesbruce/ffch/CommandManager.java | 114 +++++ .../bluesbruce/ffch/CommandManagerImpl.java | 273 ++++++++++ .../ffch/callback/EventCallBack.java | 18 + .../ffch/callback/EventCallBackType.java | 13 + .../callback/worker/EventMsgNetWorker.java | 89 ++++ .../ffch/commandbuidler/CommandAssembly.java | 20 + .../commandbuidler/CommandAssemblyImpl.java | 82 +++ .../ffch/commandbuidler/CommandBuidler.java | 55 ++ .../commandbuidler/CommandBuidlerFactory.java | 17 + .../commandbuidler/DefaultCommandBuidler.java | 69 +++ .../bluesbruce/ffch/config/ProgramConfig.java | 62 +++ .../config/defaultFFmpegConfig.properties | 15 + .../bluesbruce/ffch/data/CommandTasker.java | 59 +++ .../github/bluesbruce/ffch/data/TaskDao.java | 46 ++ .../bluesbruce/ffch/data/TaskDaoImpl.java | 69 +++ .../bluesbruce/ffch/data/TaskerEventMsg.java | 41 ++ .../ffch/handler/DefaultOutHandlerMethod.java | 47 ++ .../ffch/handler/KeepAliveHandler.java | 81 +++ .../bluesbruce/ffch/handler/OutHandler.java | 116 +++++ .../ffch/handler/OutHandlerMethod.java | 21 + .../bluesbruce/ffch/handler/TaskHandler.java | 45 ++ .../ffch/handler/TaskHandlerImpl.java | 69 +++ .../bluesbruce/ffch/loadFFmpeg.properties | 13 + .../com/github/bluesbruce/ffch/test/Test.java | 200 ++++++++ .../bluesbruce/ffch/util/CommonUtil.java | 70 +++ .../github/bluesbruce/ffch/util/ExecUtil.java | 110 ++++ .../bluesbruce/ffch/util/PropertiesUtil.java | 144 ++++++ .../bluesbruce/ffch/util/ReflectUtil.java | 166 ++++++ .../github/bluesbruce/spring/Application.java | 34 ++ .../bluesbruce/spring/config/CmdParam.java | 74 +++ .../spring/config/Readproperties.java | 41 ++ .../mqttService/HttpURLConnectionUtil.java | 295 +++++++++++ .../consumer/MqttConsumerCallBack.java | 73 +++ .../mqttService/send/MqttProviderConfig.java | 99 ++++ .../spring/service/FFrtmpServer.java | 64 +++ .../spring/service/MqttLiveHandle.java | 34 ++ .../spring/service/RtmpLiveService.java | 222 ++++++++ .../bluesbruce/spring/utils/SnowId.java | 137 +++++ .../bluesbruce/spring/utils/SpringUtil.java | 60 +++ .../bluesbruce/spring/web/SubController.java | 34 ++ src/main/resources/application.properties | 71 +++ src/main/resources/config/mybatis-config.xml | 61 +++ src/main/resources/log4j2.xml | 31 ++ src/main/resources/static/index.html | 10 + src/main/webapp/sub.jsp | 476 ++++++++++++++++++ 51 files changed, 4524 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 loadCmd.properties create mode 100644 loadFFmpeg.properties create mode 100644 pom.xml create mode 100644 src/main/java/com/github/bluesbruce/ffch/CommandManager.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties create mode 100644 src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties create mode 100644 src/main/java/com/github/bluesbruce/ffch/test/Test.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java create mode 100644 src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java create mode 100644 src/main/java/com/github/bluesbruce/spring/Application.java create mode 100644 src/main/java/com/github/bluesbruce/spring/config/CmdParam.java create mode 100644 src/main/java/com/github/bluesbruce/spring/config/Readproperties.java create mode 100644 src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java create mode 100644 src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java create mode 100644 src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java create mode 100644 src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java create mode 100644 src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java create mode 100644 src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java create mode 100644 src/main/java/com/github/bluesbruce/spring/utils/SnowId.java create mode 100644 src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java create mode 100644 src/main/java/com/github/bluesbruce/spring/web/SubController.java create mode 100644 src/main/resources/application.properties create mode 100644 src/main/resources/config/mybatis-config.xml create mode 100644 src/main/resources/log4j2.xml create mode 100644 src/main/resources/static/index.html create mode 100644 src/main/webapp/sub.jsp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b4bde82 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# maven ignore +target/ +*.jar +*.war +*.zip +*.tar +*.tar.gz + +# eclipse ignore +.settings/ +.project +.classpath + +# idea ignore +.idea/ +*.ipr +*.iml +*.iws + +# temp ignore +*.log +*.cache +*.diff +*.patch +*.tmp + +# system ignore +.DS_Store +Thumbs.db + +#sonar ignore +.sonar diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..56d2f04 --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +# SpringBoot集成H2 + +1. H2数据库是一个开源的关系型数据库。H2采用java语言编写,不受平台的限制,同时支持网络版和嵌入式版本,有比较好的兼容性,支持相当标准的sql标准 +2. 提供JDBC、ODBC访问接口,提供了非常友好的基于web的数据库管理界面 + +> 官网:http://www.h2database.com/ + + +在``application.properties``配置了H2的管理后台入口,``http://localhost:8081/h2console`` +``JDBC URL``、``User Name``和``Password``参考配置文件输入。 + +执行``test\resource``下的sql脚本。 + +两个测试接口,get请求 +- http://localhost:8081/getList.json?name=王 +- http://localhost:8081/getOne.json?id=6 \ No newline at end of file diff --git a/loadCmd.properties b/loadCmd.properties new file mode 100644 index 0000000..f993549 --- /dev/null +++ b/loadCmd.properties @@ -0,0 +1,10 @@ +#ַegƵַ +pushUrl=rtmp://192.168.10.101:19350/rlive/stream_12?sign=LeL5Wchx +pushUrl2=rtmp://192.168.10.101:19350/rlive/stream_17?sign=64FIaU8X +#ַegĵַ +playUrl=http://192.168.10.101:18000/flv/live/34020000001110000001_34020000001320000099_0200000099.flv +#̶ͨ 1 ȡͨ 2 +type=1 + +mqttUrl=tcp://106.15.120.154 +mqttTopic=/v1/123987/rtmp/live diff --git a/loadFFmpeg.properties b/loadFFmpeg.properties new file mode 100644 index 0000000..e2dbd79 --- /dev/null +++ b/loadFFmpeg.properties @@ -0,0 +1,13 @@ +#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错 +path=Z://ffmpeg/bin/ +#存放任务的默认Map的初始化大小 +size=10 +#事件回调通知接口地址 +callback=http://127.0.0.1/callback +#网络超时设置(毫秒) +timeout=300 + +#开启保活线程 +keepalive=false +#是否输出debug消息 +debug=true \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9bb2ded --- /dev/null +++ b/pom.xml @@ -0,0 +1,312 @@ + + + com.github.bluesbruce + 4.0.0 + ffch-test + 1.0.0 + ${pom.package} + H2 Test + SpringBoot2集成H2 + ${project.url} + + + + + BBF + + architect + developer + + +8 + + + + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + scm:git@gitee.com:bbfbbf/h2-test.git + scm:git@gitee.com:bbfbbf/h2-test.git + ${project.url} + ${project.version} + + + + UTF-8 + UTF-8 + https://gitee.com/bbfbbf/h2-test + + UTF-8 + UTF-8 + zh_CN + 1.8 + + + FFRTMP-test + backup + + 5M + + INFO + + + 4.13 + + + 2.4.3 + 2.1.4 + 1.2.5 + 2.1.212 + + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.boot.version} + pom + import + + + + + + + junit + junit + ${junit.version} + test + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-logging + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-tomcat + + + + + + + mysql + mysql-connector-java + runtime + + + com.alibaba + fastjson + 1.2.58 + + + + + org.springframework.boot + spring-boot-starter-log4j2 + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + ${mybatis-spring.boot.version} + + + + com.alibaba + druid-spring-boot-starter + ${druid.boot.version} + + + + com.h2database + h2 + ${h2.version} + + + + + + + + + org.apache.tomcat.embed + tomcat-embed-jasper + + + + + javax.servlet + jstl + compile + + + + javax.servlet.jsp + javax.servlet.jsp-api + 2.3.1 + + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-stream + + + org.springframework.integration + spring-integration-mqtt + + + + com.baomidou + mybatis-plus-boot-starter + 3.1.0 + + + + org.projectlombok + lombok + true + + + + + org.springframework.boot + spring-boot-devtools + true + true + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.0 + + + true + + -Dfile.encoding=UTF-8 + + + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.version} + + + true + true + + + + + repackage + + + + + + org.apache.maven.plugins + maven-war-plugin + 3.2.2 + + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + 7 + 7 + + + + + + + + scr/main/webapp + + META-INF/resources + + + *.* + + + + src/main/resources + + **/*.properties + config/** + log4j2.xml + + true + + + src/main/resources + + mapper/** + + false + + + + + + jar + + true + + + war + ${project.artifactId}-${project.version} + d:/FFrtmp-test + + + + + org.springframework.boot + spring-boot-starter-undertow + + + + + \ No newline at end of file diff --git a/src/main/java/com/github/bluesbruce/ffch/CommandManager.java b/src/main/java/com/github/bluesbruce/ffch/CommandManager.java new file mode 100644 index 0000000..dfb5055 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/CommandManager.java @@ -0,0 +1,114 @@ +package com.github.bluesbruce.ffch; + +import com.github.bluesbruce.ffch.commandbuidler.CommandAssembly; +import com.github.bluesbruce.ffch.commandbuidler.CommandBuidler; +import com.github.bluesbruce.ffch.config.ProgramConfig; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.data.TaskDao; +import com.github.bluesbruce.ffch.handler.TaskHandler; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Map; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + +/** + * FFmpeg命令操作管理器,可执行FFmpeg命令/停止/查询任务信息 + * + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +@Component +public interface CommandManager { + + public static final ProgramConfig config=load("/loadFFmpeg.properties", ProgramConfig.class); + /** + * 注入自己实现的持久层 + * + * @param taskDao + */ + public void setTaskDao(TaskDao taskDao); + + /** + * 注入ffmpeg命令处理器 + * + * @param taskHandler + */ + public void setTaskHandler(TaskHandler taskHandler); + + /** + * 注入ffmpeg命令组装器 + * + * @param commandAssembly + */ + public void setCommandAssembly(CommandAssembly commandAssembly); + + /** + * 通过命令发布任务(默认命令前不加FFmpeg路径) + * + * @param id - 任务标识 + * @param command - FFmpeg命令 + * @return + */ + public String start(String id, String command); + /** + * 通过命令发布任务 + * @param id - 任务标识 + * @param commond - FFmpeg命令 + * @param hasPath - 命令中是否包含FFmpeg执行文件的绝对路径 + * @return + */ + public String start(String id, String commond, boolean hasPath); + + /** + * 通过流式命令构建器发布任务 + * @param commandBuidler + * @return + */ + public String start(String id, CommandBuidler commandBuidler); + + /** + * 通过组装命令发布任务 + * + * @param assembly + * -组装命令(详细请参照readme文档说明) + * @return + */ + public String start(Map assembly); + + /** + * 停止任务 + * + * @param id + * @return + */ + public boolean stop(String id); + + /** + * 停止全部任务 + * + * @return + */ + public int stopAll(); + + /** + * 通过id查询任务信息 + * + * @param id + */ + public CommandTasker query(String id); + + /** + * 查询全部任务信息 + * + */ + public Collection queryAll(); + + /** + * 销毁一些后台资源和保活线程 + */ + public void destory(); + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java b/src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java new file mode 100644 index 0000000..914cfa0 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java @@ -0,0 +1,273 @@ +package com.github.bluesbruce.ffch; + +import com.github.bluesbruce.ffch.commandbuidler.CommandAssembly; +import com.github.bluesbruce.ffch.commandbuidler.CommandAssemblyImpl; +import com.github.bluesbruce.ffch.commandbuidler.CommandBuidler; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.data.TaskDao; +import com.github.bluesbruce.ffch.data.TaskDaoImpl; +import com.github.bluesbruce.ffch.handler.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + +/** + * FFmpeg命令操作管理器 + * + * @author eguid + * @since jdk1.7 + * @version 2017年10月13日 + */ +@Slf4j +@Service +public class CommandManagerImpl implements CommandManager { + /** + * 任务持久化器 + */ + private TaskDao taskDao = null; + /** + * 任务执行处理器 + */ + private TaskHandler taskHandler = null; + /** + * 命令组装器 + */ + private CommandAssembly commandAssembly = null; + /** + * 任务消息处理器 + */ + private OutHandlerMethod ohm = null; + + /** + * 保活处理器 + */ + private KeepAliveHandler keepAliveHandler=null; + + /** + * 全部默认初始化,自动查找配置文件 + */ + public CommandManagerImpl() { + this(null); + } + + /** + * 指定任务池大小的初始化,其他使用默认 + * @param size + */ + public CommandManagerImpl(Integer size) { + init(size); + } + + /** + * 初始化 + * @param taskDao + * @param taskHandler + * @param commandAssembly + * @param ohm + * @param size + */ + public CommandManagerImpl(TaskDao taskDao, TaskHandler taskHandler, CommandAssembly commandAssembly, OutHandlerMethod ohm,Integer size) { + super(); + this.taskDao = taskDao; + this.taskHandler = taskHandler; + this.commandAssembly = commandAssembly; + this.ohm = ohm; + init(size); + } + + /** + * 初始化,如果几个处理器未注入,则使用默认处理器 + * + * @param size + */ + public void init(Integer size) { + if (config == null) { + log.info("配置文件加载失败!配置文件不存在或配置错误"); + return; + } + boolean iskeepalive=true; + if (size == null) { + size = config.getSize() == null ? 10 : config.getSize(); + iskeepalive=config.isKeepalive(); + } + + if (this.ohm == null) { + this.ohm = new DefaultOutHandlerMethod(); + } + + if (this.taskDao == null) { + this.taskDao = new TaskDaoImpl(size); + //初始化保活线程 + if(iskeepalive) { + keepAliveHandler = new KeepAliveHandler(taskDao); + keepAliveHandler.start(); + } + } + + if (this.taskHandler == null) { + this.taskHandler = new TaskHandlerImpl(this.ohm); + } + + if (this.commandAssembly == null) { + this.commandAssembly = new CommandAssemblyImpl(); + } + + } + + public void setTaskDao(TaskDao taskDao) { + this.taskDao = taskDao; + } + + public void setTaskHandler(TaskHandler taskHandler) { + this.taskHandler = taskHandler; + } + + public void setCommandAssembly(CommandAssembly commandAssembly) { + this.commandAssembly = commandAssembly; + } + + public void setOhm(OutHandlerMethod ohm) { + this.ohm = ohm; + } + + /** + * 是否已经初始化 + * + * @param 如果未初始化时是否初始化 + * @return + */ + public boolean isInit(boolean b) { + boolean ret = this.ohm == null || this.taskDao == null || this.taskHandler == null|| this.commandAssembly == null; + if (ret && b) { + init(null); + } + return ret; + } + + @Override + public String start(String id, String command) { + return start(id, command, false); + } + + @Override + public String start(String id, String command, boolean hasPath) { + if (isInit(true)) { + log.info("执行失败,未进行初始化或初始化失败!"); + return null; + } + if (id != null && command != null) { + CommandTasker tasker = taskHandler.process(id, hasPath ? command : config.getPath() + command); + if (tasker != null) { + int ret = taskDao.add(tasker); + if (ret > 0) { + return tasker.getId(); + } else { + // 持久化信息失败,停止处理 + taskHandler.stop(tasker.getProcess(), tasker.getThread()); + if (config.isDebug()) + log.info("持久化失败,停止任务!"); + } + } + } + return null; + } + + @Override + public String start(Map assembly) { + // ffmpeg环境是否配置正确 + if (checkConfig()) { + log.info("配置未正确加载,无法执行"); + return null; + } + // 参数是否符合要求 + if (assembly == null || assembly.isEmpty() || !assembly.containsKey("appName")) { + log.info("参数不正确,无法执行"); + return null; + } + String appName = (String) assembly.get("appName"); + if (appName != null && "".equals(appName.trim())) { + log.info("appName不能为空"); + return null; + } + assembly.put("ffmpegPath", config.getPath() + "ffmpeg"); + String command = commandAssembly.assembly(assembly); + if (command != null) { + return start(appName, command, true); + } + + return null; + } + + @Override + public String start(String id,CommandBuidler commandBuidler) { + // ffmpeg环境是否配置正确 + if (checkConfig()) { + log.info("配置未正确加载,无法执行"); + return null; + } + String command =commandBuidler.get(); + if (command != null) { + return start(id, command, true); + } + return null; + } + + private boolean checkConfig() { + return config == null; + } + + @Override + public boolean stop(String id) { + if (id != null && taskDao.isHave(id)) { + if (config.isDebug()) + log.info("正在停止任务:" + id); + CommandTasker tasker = taskDao.get(id); + if (taskHandler.stop(tasker.getProcess(), tasker.getThread())) { + taskDao.remove(id); + log.info("停止任务完毕:" + id); + return true; + } + } + log.info("停止任务失败!id=" + id); + return false; + } + + @Override + public int stopAll() { + Collection list = taskDao.getAll(); + Iterator iter = list.iterator(); + CommandTasker tasker = null; + int index = 0; + while (iter.hasNext()) { + tasker = iter.next(); + if (taskHandler.stop(tasker.getProcess(), tasker.getThread())) { + taskDao.remove(tasker.getId()); + index++; + } + } + if (config.isDebug()) + log.info("停止了" + index + "个任务!"); + return index; + } + + @Override + public CommandTasker query(String id) { + return taskDao.get(id); + } + + @Override + public Collection queryAll() { + return taskDao.getAll(); + } + + @Override + public void destory() { + if(keepAliveHandler!=null) { + //安全停止保活线程 + keepAliveHandler.interrupt(); + } + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java b/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java new file mode 100644 index 0000000..ae8b352 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java @@ -0,0 +1,18 @@ +package com.github.bluesbruce.ffch.callback; + +import com.github.bluesbruce.ffch.data.CommandTasker; + +/** + * 事件回调 + * @author eguid + * + */ +public interface EventCallBack { + + /** + * 命令行执行开始事件 + * @param t -事件类型 + * @param tasker -任务信息 + */ + boolean callback(EventCallBackType t, CommandTasker tasker); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java b/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java new file mode 100644 index 0000000..c88a11d --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java @@ -0,0 +1,13 @@ +package com.github.bluesbruce.ffch.callback; + +/** + * 事件回调类型 + * @author eguid + * + */ +public enum EventCallBackType { + exec,//执行命令后通知 + stop,//停止命令后通知 + interrupt,//进程中断后通知 + heartbeat,//主进程存活心跳 +} diff --git a/src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java b/src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java new file mode 100644 index 0000000..e12a66c --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java @@ -0,0 +1,89 @@ +package com.github.bluesbruce.ffch.callback.worker; + +import com.github.bluesbruce.ffch.CommandManager; +import com.github.bluesbruce.ffch.callback.EventCallBack; +import com.github.bluesbruce.ffch.callback.EventCallBackType; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.data.TaskerEventMsg; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.URL; +import java.net.URLConnection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 事件消息独立发送线程 + * + * @author eguid + * + */ +@Slf4j +public class EventMsgNetWorker extends Thread implements EventCallBack{ + + protected static Queue queue = null;// 一个事件消息队列,发送失败的事件消息将会进入队列队尾等待下次再次发送 + + // 一个网络库,用于快速发送http消息 + private int timeout = 300;// 默认300毫秒 + + public EventMsgNetWorker(int timeout) { + super(); + this.timeout = timeout; + queue = new ConcurrentLinkedQueue<>(); + } + + @Override + public void run() { + for (;;) { + try { + while (queue.peek() != null) { + TaskerEventMsg t = queue.poll(); + // 借助网络库发送该消息 + String url = CommandManager.config.getCallback(); + if (reqGET(url)) { + log.info("发送成功"); + } else { + log.info("发送失败"); + // 发送失败的事件消息将会进入队列队尾等待下次再次发送 + queue.offer(t); + } + } + } catch (Exception e) { + + } + } + + } + + /** + * 发送get请求 + */ + private boolean reqGET(String url) { + URL realUrl; +// PrintWriter out = null; + try { + realUrl = new URL(url); + // 打开和URL之间的连接 + URLConnection connection = realUrl.openConnection(); + // 设置通用的请求属性 + connection.setUseCaches(false); + connection.setConnectTimeout(timeout); + connection.setRequestProperty("accept", "*/*"); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); + connection.setDoOutput(false); + connection.setDoInput(false); + connection.connect(); + return true; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean callback(EventCallBackType ecbt, CommandTasker tasker) { + return queue.add(new TaskerEventMsg(ecbt, tasker)); + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java new file mode 100644 index 0000000..b98747e --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java @@ -0,0 +1,20 @@ +package com.github.bluesbruce.ffch.commandbuidler; + +import java.util.Map; + +/** + * 命令组装器接口 + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public interface CommandAssembly { + /** + * 将参数转为ffmpeg命令 + * @param paramMap + * @return + */ + public String assembly(Map paramMap); + + public String assembly(); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java new file mode 100644 index 0000000..205b139 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java @@ -0,0 +1,82 @@ +package com.github.bluesbruce.ffch.commandbuidler; + +import java.util.Map; + +/** + * 默认命令组装器实现 + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public class CommandAssemblyImpl implements CommandAssembly{ + /** + * + * @param map + * -要组装的map + * @param id + * -返回参数:id + * @param id + * -返回参数:组装好的命令 + * @return + */ + public String assembly(Map paramMap) { + try { + if (paramMap.containsKey("ffmpegPath")) { + String ffmpegPath = (String) paramMap.get("ffmpegPath"); + // -i:输入流地址或者文件绝对地址 + StringBuilder comm = new StringBuilder(ffmpegPath + " -i "); + // 是否有必输项:输入地址,输出地址,应用名,twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码) + if (paramMap.containsKey("input") && paramMap.containsKey("output") && paramMap.containsKey("appName") + && paramMap.containsKey("twoPart")) { + String input = (String) paramMap.get("input"); + String output = (String) paramMap.get("output"); + String appName = (String) paramMap.get("appName"); + String twoPart = (String) paramMap.get("twoPart"); + String codec = (String) paramMap.get("codec"); + // 默认h264解码 + codec = (codec == null ? "h264" : (String) paramMap.get("codec")); + // 输入地址 + comm.append(input); + // 当twoPart为0时,只推一个元码流 + if ("0".equals(twoPart)) { + comm.append(" -vcodec " + codec + " -f flv -an " + output + appName); + } else { + // -f :转换格式,默认flv + if (paramMap.containsKey("fmt")) { + String fmt = (String) paramMap.get("fmt"); + comm.append(" -f " + fmt); + } + // -r :帧率,默认25;-g :帧间隔 + if (paramMap.containsKey("fps")) { + String fps = (String) paramMap.get("fps"); + comm.append(" -r " + fps); + comm.append(" -g " + fps); + } + // -s 分辨率 默认是原分辨率 + if (paramMap.containsKey("rs")) { + String rs = (String) paramMap.get("rs"); + comm.append(" -s " + rs); + } + // 输出地址+发布的应用名 + comm.append(" -an " + output + appName); + // 当twoPart为2时推两个流,一个自定义流,一个元码流 + if ("2".equals(twoPart)) { + // 一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD + comm.append(" -vcodec copy -f flv -an ").append(output + appName + "HD"); + } + } + return comm.toString(); + } + } + } catch (Exception e) { + return null; + } + return null; + } + + @Override + public String assembly() { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java new file mode 100644 index 0000000..6933fb5 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java @@ -0,0 +1,55 @@ +package com.github.bluesbruce.ffch.commandbuidler; + +/** + * 流式命令行构建器 + * + * @author eguid + */ +public interface CommandBuidler { + + /** + * 创建命令行 + * + * @param root + * -命令行运行根目录或FFmpeg可执行文件安装目录 + * @return + */ + CommandBuidler create(String root); + + /** + * 创建默认根目录或默认FFmpeg可执行文件安装目录 + * + * @return + */ + CommandBuidler create(); + + /** + * 累加键-值命令 + * + * @param key + * @param val + * @return + */ + CommandBuidler add(String key, String val); + + /** + * 累加命令 + * + * @param val + * @return + */ + CommandBuidler add(String val); + + /** + * 生成完整命令行 + * + * @return + */ + CommandBuidler build(); + + /** + * 获取已经构建好的命令行 + * @return + */ + String get(); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java new file mode 100644 index 0000000..46a734c --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java @@ -0,0 +1,17 @@ +package com.github.bluesbruce.ffch.commandbuidler; + +/** + * 默认流式命令构建器工厂类 + * @author eguid + * + */ +public class CommandBuidlerFactory { + + public static CommandBuidler createBuidler() { + return new DefaultCommandBuidler(); + }; + + public static CommandBuidler createBuidler(String rootpath) { + return new DefaultCommandBuidler(rootpath); + }; +} diff --git a/src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java new file mode 100644 index 0000000..83f2d21 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java @@ -0,0 +1,69 @@ +package com.github.bluesbruce.ffch.commandbuidler; + +import com.github.bluesbruce.ffch.CommandManager; + +/** + * 默认流式命令行构建器(非线程安全) + * @author eguid + */ +public class DefaultCommandBuidler implements CommandBuidler{ + + StringBuilder buidler=null; + String command=null; + + public DefaultCommandBuidler() { + create(); + } + + + public DefaultCommandBuidler(String rootpath) { + create(rootpath); + } + + + @Override + public CommandBuidler create(String rootpath) { + buidler=new StringBuilder(rootpath); + return this; + } + + @Override + public CommandBuidler create() { + return create(CommandManager.config.getPath()); + } + + @Override + public CommandBuidler add(String key, String val) { + return add(key).add(val); + } + + @Override + public CommandBuidler add(String val) { + if(buidler!=null) { + buidler.append(val); + addBlankspace(); + } + return this; + } + + @Override + public CommandBuidler build() { + if(buidler!=null) { + command=buidler.toString(); + } + return this; + } + + private void addBlankspace() { + buidler.append(" "); + } + + @Override + public String get() { + if(command==null) { + build(); + } + return command; + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java b/src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java new file mode 100644 index 0000000..e3674a2 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java @@ -0,0 +1,62 @@ +package com.github.bluesbruce.ffch.config; + +/** + * 程序基础配置 + * + * @author eguid + * + */ +public class ProgramConfig { + + private String path;//默认命令行执行根路径 + private boolean debug;//是否开启debug模式 + private Integer size;//任务池大小 + private String callback;//回调通知地址 + private boolean keepalive;//是否开启保活 + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public boolean isDebug() { + return debug; + } + + public void setDebug(boolean debug) { + this.debug = debug; + } + + public Integer getSize() { + return size; + } + + public void setSize(Integer size) { + this.size = size; + } + + public String getCallback() { + return callback; + } + + public void setCallback(String callback) { + this.callback = callback; + } + + public boolean isKeepalive() { + return keepalive; + } + + public void setKeepalive(boolean keepalive) { + this.keepalive = keepalive; + } + + @Override + public String toString() { + return "ProgramConfig [path=" + path + ", debug=" + debug + ", size=" + size + ", callback=" + callback + + ", keepalive=" + keepalive + "]"; + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties b/src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties new file mode 100644 index 0000000..43ec5f6 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties @@ -0,0 +1,15 @@ +#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错 +path=Z://ffmpeg/bin/ +#是否启用默认的ffmpeg,如果启用默认ffmpeg,上面配置的path就没有用了 +defaultpathEnable=false +#存放任务的默认Map的初始化大小 +size=10 +#事件回调通知接口地址 +callback=http://127.0.0.1/callback +#网络超时设置(毫秒) +timeout=300 +#开启保活线程 +keepalive=true +#是否输出debug消息 +debug=true + diff --git a/src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java b/src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java new file mode 100644 index 0000000..20a1225 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java @@ -0,0 +1,59 @@ +package com.github.bluesbruce.ffch.data; + +import com.github.bluesbruce.ffch.handler.OutHandler; + +/** + * 用于存放任务id,任务主进程,任务输出线程 + * + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public class CommandTasker { + private final String id;// 任务id + private final String command;//命令行 + private Process process;// 命令行运行主进程 + private OutHandler thread;// 命令行消息输出子线程 + + public CommandTasker(String id,String command) { + this.id = id; + this.command=command; + } + + public CommandTasker(String id,String command, Process process, OutHandler thread) { + this.id = id; + this.command=command; + this.process = process; + this.thread = thread; + } + + public String getId() { + return id; + } + + public Process getProcess() { + return process; + } + + public OutHandler getThread() { + return thread; + } + + public String getCommand() { + return command; + } + + public void setProcess(Process process) { + this.process = process; + } + + public void setThread(OutHandler thread) { + this.thread = thread; + } + + @Override + public String toString() { + return "CommandTasker [id=" + id + ", command=" + command + ", process=" + process + ", thread=" + thread + "]"; + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java b/src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java new file mode 100644 index 0000000..82852bc --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java @@ -0,0 +1,46 @@ +package com.github.bluesbruce.ffch.data; + +import java.util.Collection; + +/** + * 任务信息持久层接口 + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public interface TaskDao { + /** + * 通过id查询任务信息 + * @param id - 任务ID + * @return CommandTasker -任务实体 + */ + public CommandTasker get(String id); + /** + * 查询全部任务信息 + * @return Collection + */ + public Collection getAll(); + /** + * 增加任务信息 + * @param CommandTasker -任务信息实体 + * @return 增加数量:<1-增加失败,>=1-增加成功 + */ + public int add(CommandTasker CommandTasker); + /** + * 删除id对应的任务信息 + * @param id + * @return 数量:<1-操作失败,>=1-操作成功 + */ + public int remove(String id); + /** + * 删除全部任务信息 + * @return 数量:<1-操作失败,>=1-操作成功 + */ + public int removeAll(); + /** + * 是否存在某个ID + * @param id - 任务ID + * @return true:存在,false:不存在 + */ + public boolean isHave(String id); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java b/src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java new file mode 100644 index 0000000..62fd6f7 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java @@ -0,0 +1,69 @@ +package com.github.bluesbruce.ffch.data; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 任务信息持久层实现 + * + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public class TaskDaoImpl implements TaskDao { + // 存放任务信息 + private ConcurrentMap map = null; + + public TaskDaoImpl(int size) { + map = new ConcurrentHashMap<>(size); + } + + @Override + public CommandTasker get(String id) { + return map.get(id); + } + + @Override + public Collection getAll() { + return map.values(); + } + + @Override + public int add(CommandTasker CommandTasker) { + String id = CommandTasker.getId(); + if (id != null && !map.containsKey(id)) { + map.put(CommandTasker.getId(), CommandTasker); + if(map.get(id)!=null) + { + return 1; + } + } + return 0; + } + + @Override + public int remove(String id) { + if(map.remove(id) != null){ + return 1; + }; + return 0; + } + + @Override + public int removeAll() { + int size = map.size(); + try { + map.clear(); + } catch (Exception e) { + return 0; + } + return size; + } + + @Override + public boolean isHave(String id) { + return map.containsKey(id); + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java b/src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java new file mode 100644 index 0000000..63dfa8a --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java @@ -0,0 +1,41 @@ +package com.github.bluesbruce.ffch.data; + +import com.github.bluesbruce.ffch.callback.EventCallBackType; + +/** + * 命令行事件消息 + * @author eguid + * + */ +public class TaskerEventMsg { + EventCallBackType ecbt; + CommandTasker tasker; + + public TaskerEventMsg(EventCallBackType ecbt, CommandTasker tasker) { + super(); + this.ecbt = ecbt; + this.tasker = tasker; + } + + public EventCallBackType getEcbt() { + return ecbt; + } + + public void setEcbt(EventCallBackType ecbt) { + this.ecbt = ecbt; + } + + public CommandTasker getTasker() { + return tasker; + } + + public void setTasker(CommandTasker tasker) { + this.tasker = tasker; + } + + @Override + public String toString() { + return "CommandEventMsg [ecbt=" + ecbt + ", tasker=" + tasker + "]"; + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java b/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java new file mode 100644 index 0000000..b55d1ea --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java @@ -0,0 +1,47 @@ +package com.github.bluesbruce.ffch.handler; + +import lombok.extern.slf4j.Slf4j; + +/** + * 默认任务消息输出处理 + * @author eguid + * @since jdk1.7 + * @version 2017年10月13日 + */ +@Slf4j +public class DefaultOutHandlerMethod implements OutHandlerMethod{ + + /** + * 任务是否异常中断,如果 + */ + public boolean isBroken=false; + + @Override + public void parse(String id,String msg) { + //过滤消息 + if (msg.indexOf("fail") != -1) { + log.info(id + "任务可能发生故障:" + msg); + log.info("失败,设置中断状态"); + isBroken=true; + }else if(msg.indexOf("miss")!= -1) { + log.info(id + "任务可能发生丢包:" + msg); + log.info("失败,设置中断状态"); + isBroken=true; + }else if(msg.indexOf("occurred")!= -1) { + log.info(id + "任务可能发生丢包10054:" + msg); + log.info("失败,设置中断状态"); + isBroken=false; + }else { + isBroken=false; + log.info(id + "消息:" + msg); + + } + + } + + @Override + public boolean isbroken() { + return isBroken; + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java b/src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java new file mode 100644 index 0000000..ddc4ec4 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java @@ -0,0 +1,81 @@ +package com.github.bluesbruce.ffch.handler; + +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.data.TaskDao; +import com.github.bluesbruce.ffch.util.ExecUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 任务保活处理器(一个后台保活线程,用于处理异常中断的持久任务) + * @author eguid + * + */ +@Slf4j +public class KeepAliveHandler extends Thread{ + /**待处理队列*/ + private static Queue queue=null; + + public int err_index=0;//错误计数 + + public volatile int stop_index=0;//安全停止线程标记 + + /** 任务持久化器*/ + private TaskDao taskDao = null; + + public KeepAliveHandler(TaskDao taskDao) { + super(); + this.taskDao=taskDao; + queue=new ConcurrentLinkedQueue<>(); + } + + public static void add(String id ) { + if(queue!=null) { + queue.offer(id); + } + } + + public boolean stop(Process process) { + if (process != null) { + process.destroy(); + return true; + } + return false; + } + + @Override + public void run() { + for(;stop_index==0;) { + if(queue==null) { + continue; + } + String id=null; + CommandTasker task=null; + + try { + while(queue.peek() != null) { + log.info("准备重启任务:"+queue); + id=queue.poll(); + task=taskDao.get(id); + //重启任务 + ExecUtil.restart(task); + } + }catch(IOException e) { + log.info(id+" 任务重启失败,详情:"+task); + //重启任务失败 + err_index++; + }catch(Exception e) { + + } + } + } + + @Override + public void interrupt() { + stop_index=1; + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java new file mode 100644 index 0000000..e3bdfb1 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java @@ -0,0 +1,116 @@ +package com.github.bluesbruce.ffch.handler; + +import com.github.bluesbruce.ffch.CommandManager; +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +/** + * 任务消息输出处理器 + * @author eguid + * @since jdk1.7 + * @version 2017年10月13日 + */ +@Slf4j +public class OutHandler extends Thread { + /**控制状态 */ + private volatile boolean desstatus = true; + + /**读取输出流*/ + private BufferedReader br = null; + + /**任务ID*/ + private String id = null; + + /**消息处理方法*/ + private OutHandlerMethod ohm; + + /** + * 创建输出线程(默认立即开启线程) + * @param is + * @param id + * @param ohm + * @return + */ + public static OutHandler create(InputStream is, String id,OutHandlerMethod ohm) { + return create(is, id, ohm,true); + } + + /** + * 创建输出线程 + * @param is + * @param id + * @param ohm + * @param start-是否立即开启线程 + * @return + */ + public static OutHandler create(InputStream is, String id,OutHandlerMethod ohm,boolean start) { + OutHandler out= new OutHandler(is, id, ohm); + if(start) + out.start(); + return out; + } + + public void setOhm(OutHandlerMethod ohm) { + this.ohm = ohm; + } + + public void setDesStatus(boolean desStatus) { + this.desstatus = desStatus; + } + + public void setId(String id) { + this.id = id; + } + + public OutHandlerMethod getOhm() { + return ohm; + } + + public OutHandler(InputStream is, String id,OutHandlerMethod ohm) { + br = new BufferedReader(new InputStreamReader(is)); + this.id = id; + this.ohm=ohm; + } + + /** + * 重写线程销毁方法,安全的关闭线程 + */ + @Override + public void destroy() { + setDesStatus(false); + } + + /** + * 执行输出线程 + */ + @Override + public void run() { + String msg = null; + try { + if (CommandManager.config.isDebug()) { + log.info(id + "开始推流!"); + } + while (desstatus && (msg = br.readLine()) != null) { + ohm.parse(id,msg); + if(ohm.isbroken()) { + log.info("检测到中断,提交重启任务给保活处理器"); + //如果发生异常中断,立即进行保活 + //把中断的任务交给保活处理器进行进一步处理 + KeepAliveHandler.add(id); + } + } + } catch (IOException e) { + log.info("发生内部异常错误,自动关闭[" + this.getId() + "]线程"); + destroy(); + } finally { + if (this.isAlive()) { + destroy(); + } + } + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java new file mode 100644 index 0000000..3ecc79a --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java @@ -0,0 +1,21 @@ +package com.github.bluesbruce.ffch.handler; +/** + * 输出消息处理 + * @author eguid + * @since jdk1.7 + * @version 2017年10月13日 + */ +public interface OutHandlerMethod { + /** + * 解析消息 + * @param id-任务ID + * @param msg -消息 + */ + public void parse(String id, String msg); + + /** + * 任务是否异常中断 + * @return + */ + public boolean isbroken(); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java b/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java new file mode 100644 index 0000000..6c1c55c --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java @@ -0,0 +1,45 @@ +package com.github.bluesbruce.ffch.handler; + +import com.github.bluesbruce.ffch.data.CommandTasker; + +/** + * 任务执行接口 + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +public interface TaskHandler { + /** + * 按照命令执行主进程和输出线程 + * + * @param id + * @param command + * @return + */ + public CommandTasker process(String id, String command); + + /** + * 停止主进程(停止主进程需要保证输出线程已经关闭,否则输出线程会出错) + * + * @param process + * @return + */ + public boolean stop(Process process); + + /** + * 停止输出线程 + * + * @param thread + * @return + */ + public boolean stop(Thread thread); + + /** + * 正确的停止输出线程和主进程 + * + * @param process + * @param thread + * @return + */ + public boolean stop(Process process, Thread thread); +} diff --git a/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java b/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java new file mode 100644 index 0000000..8993359 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java @@ -0,0 +1,69 @@ +package com.github.bluesbruce.ffch.handler; + +import com.github.bluesbruce.ffch.CommandManager; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.util.ExecUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +/** + * 任务处理实现 + * @author eguid + * @since jdk1.7 + * @version 2016年10月29日 + */ +@Slf4j +public class TaskHandlerImpl implements TaskHandler { + + private OutHandlerMethod ohm=null; + + public TaskHandlerImpl(OutHandlerMethod ohm) { + this.ohm = ohm; + } + + public void setOhm(OutHandlerMethod ohm) { + this.ohm = ohm; + } + + @Override + public CommandTasker process(String id, String command) { + CommandTasker tasker = null; + try { + log.info(command); + tasker =ExecUtil.createTasker(id,command,ohm); + + if(CommandManager.config.isDebug()) + log.info(id+" 执行命令行:"+command); + + return tasker; + } catch (IOException e) { + //运行失败,停止任务 + ExecUtil.stop(tasker); + + if(CommandManager.config.isDebug()) + log.info(id+" 执行命令失败!进程和输出线程已停止"); + + // 出现异常说明开启失败,返回null + return null; + } + } + + @Override + public boolean stop(Process process) { + return ExecUtil.stop(process); + } + + @Override + public boolean stop(Thread outHandler) { + return ExecUtil.stop(outHandler); + } + + @Override + public boolean stop(Process process, Thread thread) { + boolean ret=false; + ret=stop(thread); + ret=stop(process); + return ret; + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties b/src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties new file mode 100644 index 0000000..fcd9e1c --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties @@ -0,0 +1,13 @@ +#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错 +path=Z://ffmpeg/bin/ +#存放任务的默认Map的初始化大小 +size=10 +#事件回调通知接口地址 +callback=http://127.0.0.1/callback +#网络超时设置(毫秒) +timeout=300 + +#开启保活线程 +keepalive=true +#是否输出debug消息 +debug=true diff --git a/src/main/java/com/github/bluesbruce/ffch/test/Test.java b/src/main/java/com/github/bluesbruce/ffch/test/Test.java new file mode 100644 index 0000000..b792908 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/test/Test.java @@ -0,0 +1,200 @@ +package com.github.bluesbruce.ffch.test; + +import com.github.bluesbruce.ffch.CommandManager; +import com.github.bluesbruce.ffch.CommandManagerImpl; +import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory; +import com.github.bluesbruce.ffch.data.CommandTasker; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + +/** + * 测试 + * @author eguid + * @since jdk1.7 + * @version 2017年10月13日 + */ +@Slf4j +public class Test { + /** + * 命令组装器测试 + * @throws InterruptedException + */ + public static void test1() throws InterruptedException{ + CommandManager manager = new CommandManagerImpl(); + Map map = new HashMap(); + map.put("appName", "test123"); + //FFCmdParamConfig Paramconfig=load("cc/eguid/cc.eguid.commandManager/loadCmdParam.yml", FFCmdParamConfig.class); + map.put("input", ""); + map.put("output", ""); + map.put("codec", "h264"); + map.put("fmt", "flv"); + map.put("fps", "25"); + map.put("rs", "640x360"); + map.put("twoPart", "2"); + // 执行任务,id就是appName,如果执行失败返回为null + String id = manager.start(map); + log.info(id); + // 通过id查询 + CommandTasker info = manager.query(id); + log.info(info.toString()); + // 查询全部 + Collection infoList = manager.queryAll(); + log.info(infoList.toString()); + Thread.sleep(30000); + // 停止id对应的任务 + manager.stop(id); + } + /** + * 默认方式,rtsp->rtmp转流单个命令测试 + * @throws InterruptedException + */ + public static void test2() throws InterruptedException{ + CommandManager manager = new CommandManagerImpl(); + // -rtsp_transport tcp + //测试多个任何同时执行和停止情况 + //默认方式发布任务 + manager.start("tomcat", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat"); + + Thread.sleep(30000); + // 停止全部任务 + manager.stopAll(); + } + /** + * 完整ffmpeg路径测试 + * @throws InterruptedException + */ + public static void test4() throws InterruptedException{ + CommandManager manager = new CommandManagerImpl(); + // -rtsp_transport tcp + //测试多个任何同时执行和停止情况 + //默认方式发布任务 + manager.start("tomcat", "D:/TestWorkspaces/FFmpegCommandHandler/src/cc/eguid/FFmpegCommandManager/ffmpeg/ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat",true); + + Thread.sleep(30000); + // 停止全部任务 + manager.stopAll(); + } + + /** + * rtsp-rtmp转流多任务测试 + * @throws InterruptedException + */ + public static void test3() throws InterruptedException{ + CommandManager manager = new CommandManagerImpl(); + // -rtsp_transport tcp + //测试多个任何同时执行和停止情况 + //false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径 + //manager.start("tomcat", "ffmpeg -i http://192.168.10.101:18000/flv/live/34020000001110000002_34020000001320000071_0200000071.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_9?sign=f8a15b6n",false); + manager.start("tomcat", "ffmpeg -i rtsp://192.168.144.25:554 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n",false); + //manager.start("tomcat1", "ffmpeg -i rtsp://192.168.144.25:554 -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_11?sign=rHtBg3sz",false); + /*manager.start("tomcat2", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat2",false); + manager.start("tomcat3", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat3",false); + manager.start("tomcat4", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat4",false); + manager.start("tomcat5", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat5",false); + manager.start("tomcat6", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat6",false); + manager.start("tomcat7", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat7",false); + manager.start("tomcat8", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat8",false); + manager.start("tomcat9", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat9",false);*/ + + Thread.sleep(60000); + // 停止全部任务 + manager.stopAll(); + } + + /** + * 测试流式命令行构建器 + * @throws InterruptedException + */ + public static void testStreamCommandAssmbly() throws InterruptedException { + CommandManager manager = new CommandManagerImpl(); + manager.start("test1", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test1")); + Thread.sleep(30000); + // 停止全部任务 + manager.stopAll(); + } + /** + * 测试任务中断自动重启任务 + * @throws InterruptedException + */ + public static void testBroken() throws InterruptedException { + CommandManager manager = new CommandManagerImpl(); + manager.start("test1", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test1")); + Thread.sleep(30000); + // 停止全部任务 + manager.stopAll(); + manager.destory(); + } + /** + * 批量测试任务中断自动重启任务 + * @throws InterruptedException + */ + public static void testBrokenMuti() throws InterruptedException { + CommandManager manager = new CommandManagerImpl(); + manager.start("test1", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://admin:HuaWei123@218.94.150.122:554/LiveMedia/ch1/Media1") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://live.push.t-aaron.com/live/whr")); + /*manager.start("test2", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test2")); + manager.start("test3", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test3")); + manager.start("test4", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test4")); + manager.start("test5", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov") +// .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add("rtmp://106.14.182.20:1935/rtmp/test5"));*/ + Thread.sleep(30000); + // 停止全部任务 + manager.stopAll(); + manager.destory(); + } + + public static void main(String[] args) throws InterruptedException { + // test1(); +// test2(); + test3(); +// test4(); +// testStreamCommandAssmbly(); +// testBroken(); + //testBrokenMuti(); + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java b/src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java new file mode 100644 index 0000000..eaf9878 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java @@ -0,0 +1,70 @@ +package com.github.bluesbruce.ffch.util; + +import java.io.File; +import java.util.UUID; + +/** + * 公共常用方法工具 + * @author eguid + * + */ +public class CommonUtil { + /** + * 当前项目根路径 + */ + public static final String rootPath = getProjectRootPath(); + public static final String TRUE="true"; + public static final String NULL_STRING=""; + public static final String H_LINE="-"; + public static String getUUID(){ + return UUID.randomUUID().toString().trim().replaceAll(H_LINE, NULL_STRING); + } + /** + * 是否为空 + * + * @param str + * @return boolean true:为空,false:不为空 + */ + public static boolean isNull(String str) { + return str == null || NULL_STRING.equals(str.trim()); + } + /** + * 字符串是否是"true" + * @param str + * @return + */ + public static boolean isTrue(String str){ + return TRUE.equals(str)?true:false; + } + + /** + * 获取项目根目录(静态) + * @return + */ + public static String getRootPath() { + return rootPath; + } + /** + * 获取项目根目录(动态) + * @return + */ + public static String getProjectRootPath() { + String path=null; + try{ + path =CommonUtil.class.getResource("/").getPath(); + }catch(Exception e){ + File directory = new File(NULL_STRING); + path= directory.getAbsolutePath()+File.separator; + } + return path; + } + /** + * 获取类路径 + * @param cla + * @return + */ + public static String getClassPath(Class> cla){ + return cla.getResource("").getPath(); + } + +} diff --git a/src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java b/src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java new file mode 100644 index 0000000..b95ebd4 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java @@ -0,0 +1,110 @@ +package com.github.bluesbruce.ffch.util; + +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.github.bluesbruce.ffch.handler.OutHandler; +import com.github.bluesbruce.ffch.handler.OutHandlerMethod; + +import java.io.IOException; + +/** + * 命令行操作工具类 + * @author eguid + * + */ +public class ExecUtil { + + /** + * 执行命令行并获取进程 + * @param cmd + * @return + * @throws IOException + */ + public static Process exec(String cmd) throws IOException { + Runtime runtime = Runtime.getRuntime(); + Process process = runtime.exec(cmd);// 执行命令获取主进程 + return process; + } + + /** + * 销毁进程 + * @param process + * @return + */ + public static boolean stop(Process process) { + if (process != null) { + process.destroy(); + return true; + } + return false; + } + + /** + * 销毁输出线程 + * @param outHandler + * @return + */ + public static boolean stop(Thread outHandler) { + if (outHandler != null && outHandler.isAlive()) { + outHandler.stop(); + outHandler.destroy(); + return true; + } + return false; + } + + /** + * 销毁 + * @param process + * @param outHandler + */ + public static void stop(CommandTasker tasker) { + if(tasker!=null) { + stop(tasker.getThread()); + stop(tasker.getProcess()); + } + } + + /** + * 创建命令行任务 + * @param id + * @param command + * @return + * @throws IOException + */ + public static CommandTasker createTasker(String id, String command, OutHandlerMethod ohm) throws IOException { + // 执行本地命令获取任务主进程 + Process process=exec(command); + // 创建输出线程 + OutHandler outHandler=OutHandler.create(process.getErrorStream(), id,ohm); + + CommandTasker tasker = new CommandTasker(id,command, process, outHandler); + + return tasker; + } + + /** + * 中断故障缘故重启 + * @param tasker + * @return + * @throws IOException + */ + public static CommandTasker restart(CommandTasker tasker) throws IOException { + if(tasker!=null) { + String id=tasker.getId(),command=tasker.getCommand(); + OutHandlerMethod ohm=null; + if(tasker.getThread()!=null) { + ohm=tasker.getThread().getOhm(); + } + + //安全销毁命令行进程和输出子线程 + stop(tasker); + // 执行本地命令获取任务主进程 + Process process=exec(command); + tasker.setProcess(process); + // 创建输出线程 + OutHandler outHandler=OutHandler.create(process.getErrorStream(), id,ohm); + tasker.setThread(outHandler); + } + return tasker; + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java b/src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java new file mode 100644 index 0000000..b0c225a --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java @@ -0,0 +1,144 @@ +package com.github.bluesbruce.ffch.util; + +import lombok.extern.slf4j.Slf4j; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * properties配置文件读取 + * @author eguid + * + */ +@Slf4j +public class PropertiesUtil { + /** + * 加载properties配置文件并读取配置项 + * @param path + * @param cl + * @return + */ + @SuppressWarnings("unchecked") + public static T load(String path, Class cl) { + InputStream is = null; + try { + is = getInputStream(path); + } catch (FileNotFoundException e) { + //尝试从web目录读取 + String newpath=System.getProperty("user.dir")+path; + log.info("尝试从web目录读取配置文件:"+newpath); + try { + is = getInputStream(newpath); + log.info("web目录读取到配置文件:"+newpath); + } catch (FileNotFoundException e1) { + log.info("没找到配置文件,读取默认配置文件"); + //尝试从jar包中读取默认配置文件 + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + try { + is = classloader.getResourceAsStream("cc/eguid/FFmpegCommandManager/config/defaultFFmpegConfig.properties"); + log.info("读取默认配置文件:defaultFFmpegConfig.properties"); + } catch (Exception e2) { + log.info("没找到默认配置文件:defaultFFmpegConfig.properties"); + return null; + } + } + } + if (is != null) { + Properties pro = new Properties(); + try { + log.info("加载配置文件..."); + pro.load(is); + log.info("加载配置文件完毕"); + return (T)load(pro, cl); + } catch (IOException e) { + log.info("加载配置文件失败"); + return null; + } + + } + return null; + } + /** + * 读取配置项并转换为对应对象 + * @param pro + * @param cl + * @return + */ + public static Object load(Properties pro, Class> cl) { + try { + Map map = getMap(pro); + log.info("读取的配置项:" + map); + Object obj = ReflectUtil.mapToObj(map, cl); + log.info("转换后的对象:" + obj); + return obj; + } catch (InstantiationException e) { + log.info("加载配置文件失败"); + return null; + } catch (IllegalAccessException e) { + log.info("加载配置文件失败"); + return null; + } catch (IllegalArgumentException e) { + log.info("加载配置文件失败"); + return null; + } catch (InvocationTargetException e) { + log.info("加载配置文件失败"); + return null; + } + } + /** + * 获取对应文件路径下的文件流 + * @param path + * @return + * @throws FileNotFoundException + */ + public static InputStream getInputStream(String path) throws FileNotFoundException { + return new FileInputStream(path); + } + /** + * 根据路径获取properties的Map格式内容 + * @param path + * @return + */ + public static Map getMap(String path){ + Properties pro=new Properties(); + try { + pro.load(getInputStream(path)); + return getMap(pro); + } catch (IOException e) { + return null; + } + } + /** + * 根据路径获取properties的Map格式内容 + * @param path + * @param isRootPath -是否在项目根目录中 + * @return + */ + public static Map getMap(String path,boolean isRootPath){ + return getMap(isRootPath?CommonUtil.getProjectRootPath()+path:path); + } + /** + * Properties配置项转为Map + * @param pro + * @return + */ + public static Map getMap(Properties pro) { + if (pro == null || pro.isEmpty() || pro.size() < 1) { + return null; + } + Map map = new HashMap(); + for (Entry en : pro.entrySet()) { + String key = (String) en.getKey(); + Object value = en.getValue(); + map.put(key, value); + } + return map; + } +} diff --git a/src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java b/src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java new file mode 100644 index 0000000..78ceb5f --- /dev/null +++ b/src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java @@ -0,0 +1,166 @@ +package com.github.bluesbruce.ffch.util; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; + +/** + * 反射操作工具 + * + * @author eguid + * + */ +public class ReflectUtil { + + public static final String SET = "set"; + public static final String GET = "get"; + + public static Object mapToObj(Map map, Class> oc) + throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + Method[] ms = oc.getDeclaredMethods(); + if (ms == null || ms.length < 1) { + return null; + } + Object obj = getObject(oc); + for (Method m : ms) { + String methodName = m.getName(); + String fieldName = getMethodField(methodName, SET); + Object value = map.get(fieldName); + if (value != null) { + setMethodValue(m, obj, typeConvert(value, m)); + } + } + return obj; + } + + public static Object typeConvert(Object obj, Method m) { + return typeConvert(obj, m.getParameterTypes()[0].getName()); + } + + public static Object typeConvert(Object obj, Field f) { + return typeConvert(obj, f.getType().getName()); + } + /** + * 基础数据转换 + * @param obj + * @param typeName + * @return + */ + public static Object typeConvert(Object obj, String typeName) { + // 基础数据都可以转为String + String str = String.valueOf(obj); + if ("int".equals(typeName) || "java.lang.Integer".equals(typeName)) { + return Integer.valueOf(str.trim()); + } else if ("long".equals(typeName) || "java.lang.Long".equals(typeName)) { + return Long.valueOf(str.trim()); + } else if ("byte".equals(typeName) || "java.lang.Byte".equals(typeName)) { + return Byte.valueOf(str.trim()); + } else if ("short".equals(typeName) || "java.lang.Short".equals(typeName)) { + return Short.valueOf(str.trim()); + } else if ("float".equals(typeName) || "java.lang.Float".equals(typeName)) { + return Float.valueOf(str.trim()); + } else if ("double".equals(typeName) || "java.lang.Double".equals(typeName)) { + return Double.valueOf(str.trim()); + } else if ("boolean".equals(typeName) || "java.lang.Boolean".equals(typeName)) { + return CommonUtil.TRUE.equals(str)?true:false; + } else if ("char".equals(typeName) || "java.lang.Character".equals(typeName)) { + return Character.valueOf(str.trim().charAt(0)); + } else if ("java.lang.String".equals(typeName)) { + return str; + } + return null; + } + + public static Class> getFieldType(Class> cl, String fieldName) throws NoSuchFieldException, SecurityException { + Field f = cl.getDeclaredField(fieldName); + return f.getType(); + } + + public static Field findField(Class> cl, String fieldName) throws NoSuchFieldException, SecurityException { + return cl.getDeclaredField(fieldName); + } + + /** + * 执行方法 + * + * @param m + * - 方法 + * @param obj + * - 对象 + * @param value + * - 参数 + * @throws IllegalAccessException + * @throws IllegalArgumentException + * @throws InvocationTargetException + */ + public static Object setMethodValue(Method m, Object obj, Object... value) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + m.getParameterTypes(); + return m.invoke(obj, value); + } + + public static Object getFieldValue(Class> obj, String FieldName) throws NoSuchFieldException, SecurityException { + return obj.getDeclaredField(FieldName); + } + + /** + * 通过class实例化 + * + * @param oc + * @return + * @throws InstantiationException + * @throws IllegalAccessException + */ + public static Object getObject(Class> oc) throws InstantiationException, IllegalAccessException { + return oc.newInstance(); + } + + /** + * 获取方法字段 + * + * @param methodName + * @param prefix + * @param lowercase + * @return + */ + public static String getMethodField(String methodName, String prefix) { + String m = null; + if (prefix != null) { + if (methodName.indexOf(prefix) >= 0) { + m = methodName.substring(prefix.length()); + return stringFirstLower(m); + } + } + return m; + } + + /** + * 首字母大写 + * + * @param str + * @return + */ + public static String stringFirstUpper(String str) { + char[] ch = str.toCharArray(); + if (ch[0] >= 'a' && ch[0] <= 'z') { + ch[0] = (char) (ch[0] - 32); + } + return new String(ch); + } + + /** + * 首字母小写 + * + * @param str + * @return + */ + public static String stringFirstLower(String str) { + char[] ch = str.toCharArray(); + if (ch[0] >= 'A' && ch[0] <= 'Z') { + ch[0] = (char) (ch[0] + 32); + } + return new String(ch); + } + +} diff --git a/src/main/java/com/github/bluesbruce/spring/Application.java b/src/main/java/com/github/bluesbruce/spring/Application.java new file mode 100644 index 0000000..1fb0a91 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/Application.java @@ -0,0 +1,34 @@ +package com.github.bluesbruce.spring; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.annotation.Profile; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + +/** + * SpringBoot 入口类 + * + * @author BBF + */ + +@EnableCaching +@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, + HibernateJpaAutoConfiguration.class}) +public class Application extends SpringBootServletInitializer { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + @Profile(value = {"war"}) + @Override + protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { + return application.sources(Application.class); + } + +} diff --git a/src/main/java/com/github/bluesbruce/spring/config/CmdParam.java b/src/main/java/com/github/bluesbruce/spring/config/CmdParam.java new file mode 100644 index 0000000..dd43f72 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/config/CmdParam.java @@ -0,0 +1,74 @@ +package com.github.bluesbruce.spring.config; + +public class CmdParam { + private String pushUrl;// + private String pushUrl2;// + private String playUrl;// + private int type;//1 固定通道 2 通道服务 + String mqttUrl; + private String mqttTopic; + + public String getPushUrl2() { + return pushUrl2; + } + + public void setPushUrl2(String pushUrl2) { + this.pushUrl2 = pushUrl2; + } + + public String getMqttUrl() { + return mqttUrl; + } + + public void setMqttUrl(String mqttUrl) { + this.mqttUrl = mqttUrl; + } + + public String getMqttTopic() { + return mqttTopic; + } + + public void setMqttTopic(String mqttTopic) { + this.mqttTopic = mqttTopic; + } + + public String getPushUrl() { + return pushUrl; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public CmdParam(String pushUrl, String playUrl, int type, String mqttUrl, String mqttTopic) { + this.pushUrl = pushUrl; + this.playUrl = playUrl; + this.type = type; + this.mqttUrl = mqttUrl; + this.mqttTopic = mqttTopic; + } + + public CmdParam() { + + } + public void setPushUrl(String pushUrl) { + this.pushUrl = pushUrl; + } + + public String getPlayUrl() { + return playUrl; + } + + @Override + public String toString() { + return "CmdParam [pushUrl=" + pushUrl + ", playUrl=" + playUrl + ",type=" + type + "]"; + } + + public void setPlayUrl(String playUrl) { + this.playUrl = playUrl; + } +} diff --git a/src/main/java/com/github/bluesbruce/spring/config/Readproperties.java b/src/main/java/com/github/bluesbruce/spring/config/Readproperties.java new file mode 100644 index 0000000..37ccb5d --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/config/Readproperties.java @@ -0,0 +1,41 @@ +package com.github.bluesbruce.spring.config; + + +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +/** + * @author 白豆五 + * @version 2023/07/16 + * @since JDK8 + */ + +public class Readproperties { + public void test() { + // 配置对象 + Properties props = new Properties(); + InputStreamReader input = null; + try { + // 输入流 (字节流转字符流) + input = new InputStreamReader( + this.getClass().getClassLoader().getResourceAsStream(System.getProperty("user.dir")+"/loadFFmpeg.properties"),//通过类加载器来获取指定路径下的资源文件,并返回一个InputStream对象 + StandardCharsets.UTF_8); //指定编码格式 + // 加载配置 + props.load(input); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (input!=null) + try { + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + // 获取配置 + System.out.println("id:" + props.getProperty("student.id") + ", name:" + props.getProperty("student.name")); + } +} diff --git a/src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java b/src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java new file mode 100644 index 0000000..5eb427f --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java @@ -0,0 +1,295 @@ +package com.github.bluesbruce.spring.mqttService; +import org.springframework.lang.Nullable; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.io.*; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +/** + * @author ww + * @date 2022/07/8 10:42 + */ +public class HttpURLConnectionUtil { + + /** + * Http get请求 + * @param httpUrl 连接 + * @return 响应数据 + */ + public static String doGet(String httpUrl){ + //链接 + HttpURLConnection connection = null; + InputStream is = null; + BufferedReader br = null; + StringBuffer result = new StringBuffer(); + try { + //创建连接 + URL url = new URL(httpUrl); + connection = (HttpURLConnection) url.openConnection(); + //设置请求方式 + connection.setRequestMethod("GET"); + + connection.setRequestProperty("accept", "*/*"); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)"); + connection.setRequestProperty("Content-Type", "application/json;charset=utf-8"); + connection.setRequestProperty("Authorization", "Basic YWRtaW46cHVibGlj"); + connection.setRequestProperty("Cookie", "jenkins-timestamper-offset=-28800000; order=id%20desc; serverType=nginx; ltd_end=-1; pro_end=-1; memSize=7768; bt_user_info=%7B%22status%22%3Atrue%2C%22msg%22%3A%22%u83B7%u53D6%u6210%u529F%21%22%2C%22data%22%3A%7B%22username%22%3A%22139****4314%22%7D%7D; sites_path=/www/wwwroot; distribution=centos8; p-1=1; lang=en-US; force=0; b6e922e8cf76dea67efb3a9292724aee=acc59469-46e3-4576-b216-81b907d80a7f.rB3Plw7uc301E0qFoEBeoP_fL4w; load_search=undefined; remember-me=YWRtaW46MTY1ODIxMDYwODA2MDoxY2ZhOTBhMzFlN2IzZTg1YzM5MWQ3Y2M4MWEzYzVlZmYzM2UzOGQ4MTVjZDZhNTUwNjQ4YjFhMjVlYTM3NWJj; JSESSIONID.fc8332ab=node01bo1jsydkd8ql1oo3srksa2k6w288.node0; screenResolution=1920x1080; b2651c13496a99bc2899b024b99bca4f=b4e55bb9-4a3e-44b6-ae8b-c054e7115182.SEaDzMgHxI4d2CGoGwNPnj3L2y4; request_token=hH1LwVtazrgz4sMI4CE1SwEVpCYgiQcHNp4EfyvPni9C32PJ; soft_remarks=%7B%22list%22%3A%5B%22%u4F01%u4E1A%u7248%u3001%u4E13%u4E1A%u7248%u63D2%u4EF6%22%2C%2215%u5929%u65E0%u7406%u7531%u9000%u6B3E%22%2C%22%u53EF%u66F4%u6362IP%22%2C%22%u5E74%u4ED8%u8D60%u90012%u5F20SSL%u8BC1%u4E66%22%2C%22%u5E74%u4ED8%u8D60%u90011000%u6761%u77ED%u4FE1%22%2C%22%u4F4E%u81F32.43%u5143/%u5929%22%2C%22%u5546%u7528%u9632%u706B%u5899%u6388%u6743%22%2C%22%u5E74%u4ED8%u53EF%u5165%u4F01%u4E1A%u7248%u670D%u52A1%u7FA4%22%2C%22%u4EA7%u54C1%u6388%u6743%u8BC1%u4E66%22%5D%2C%22pro_list%22%3A%5B%22%u4E13%u4E1A%u7248%u63D2%u4EF6%22%2C%2215%u5929%u65E0%u7406%u7531%u9000%u6B3E%22%2C%22%u53EF%u66F4%u6362IP%22%2C%22%u4F4E%u81F31.18%u5143/%u5929%22%2C%22%u5546%u7528%u9632%u706B%u5899%u6388%u6743%22%2C%22%u4EA7%u54C1%u6388%u6743%u8BC1%u4E66%22%5D%2C%22kfqq%22%3A%223007255432%22%2C%22kf%22%3A%22http%3A//q.url.cn/CDfQPS%3F_type%3Dwpa%26qidian%3Dtrue%22%2C%22qun%22%3A%22%22%2C%22kf_list%22%3A%5B%7B%22qq%22%3A%223007255432%22%2C%22kf%22%3A%22http%3A//q.url.cn/CDfQPS%3F_type%3Dwpa%26qidian%3Dtrue%22%7D%2C%7B%22qq%22%3A%222927440070%22%2C%22kf%22%3A%22http%3A//wpa.qq.com/msgrd%3Fv%3D3%26uin%3D2927440070%26site%3Dqq%26menu%3Dyes%26from%3Dmessage%26isappinstalled%3D0%22%7D%5D%2C%22wx_list%22%3A%5B%7B%22ps%22%3A%22%u552E%u524D%u54A8%u8BE2%22%2C%22kf%22%3A%22https%3A//work.weixin.qq.com/kfid/kfc72fcbde93e26a6f3%22%7D%5D%7D; load_page=null; load_type=null"); + //设置连接超时时间 + connection.setReadTimeout(15000); + //开始连接 + connection.connect(); + //获取响应数据 + if (connection.getResponseCode() == 200) { + //获取返回的数据 + is = connection.getInputStream(); + if (null != is) { + br = new BufferedReader(new InputStreamReader(is, "UTF-8")); + String temp = null; + while (null != (temp = br.readLine())) { + result.append(temp); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (null != br) { + try { + br.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (null != is) { + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + //关闭远程连接 + connection.disconnect(); + } + return result.toString(); + } + + /** + * Http post请求 + * @param httpUrl 连接 + * @param param 参数 + * @return + */ + public static String doPost(String httpUrl, @Nullable String param) { + StringBuffer result = new StringBuffer(); + //连接 + HttpURLConnection connection = null; + OutputStream os = null; + InputStream is = null; + BufferedReader br = null; + trustAllHosts(); + try { + //创建连接对象 + URL url = new URL(httpUrl); + //创建连接 + connection = (HttpURLConnection) url.openConnection(); + //设置请求方法 + connection.setRequestMethod("POST"); + //设置连接超时时间 + connection.setConnectTimeout(15000); + //设置读取超时时间 + connection.setReadTimeout(15000); + //DoOutput设置是否向httpUrlConnection输出,DoInput设置是否从httpUrlConnection读入,此外发送post请求必须设置这两个 + //设置是否可读取 + connection.setDoOutput(true); + connection.setDoInput(true); + //设置通用的请求属性 + connection.setRequestProperty("accept", "*/*"); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)"); + connection.setRequestProperty("Content-Type", "application/json;charset=utf-8"); + + //拼装参数 + if (null != param && (!param.equals(""))) { + //设置参数 + os = connection.getOutputStream(); + //拼装参数 + os.write(param.getBytes("UTF-8")); + } + //设置权限 + //设置请求头等 + //开启连接 + //connection.connect(); + //读取响应 + if (connection.getResponseCode() == 200) { + is = connection.getInputStream(); + if (null != is) { + //br = new BufferedReader(new InputStreamReader(is, "GBK")); + br = new BufferedReader(new InputStreamReader(is, "UTF-8")); + String temp = null; + while (null != (temp = br.readLine())) { + result.append(temp); + result.append("\r\n"); + } + } + } + + } catch (MalformedURLException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + //关闭连接 + if(br!=null){ + try { + br.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if(os!=null){ + try { + os.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if(is!=null){ + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + //关闭连接 + connection.disconnect(); + } + return result.toString(); + } + public static String doPut(String httpUrl, @Nullable String param) { + StringBuffer result = new StringBuffer(); + //连接 + HttpURLConnection connection = null; + OutputStream os = null; + InputStream is = null; + BufferedReader br = null; + try { + //创建连接对象 + URL url = new URL(httpUrl); + //创建连接 + connection = (HttpURLConnection) url.openConnection(); + //设置请求方法 + connection.setRequestMethod("PUT"); + //设置连接超时时间 + connection.setConnectTimeout(15000); + //设置读取超时时间 + connection.setReadTimeout(15000); + //DoOutput设置是否向httpUrlConnection输出,DoInput设置是否从httpUrlConnection读入,此外发送post请求必须设置这两个 + //设置是否可读取 + connection.setDoOutput(true); + connection.setDoInput(true); + //设置通用的请求属性 + connection.setRequestProperty("accept", "*/*"); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)"); + connection.setRequestProperty("Content-Type", "application/json;charset=utf-8"); + + //拼装参数 + if (null != param && (!param.equals(""))) { + //设置参数 + os = connection.getOutputStream(); + //拼装参数 + os.write(param.getBytes("UTF-8")); + } + //设置权限 + //设置请求头等 + //开启连接 + //connection.connect(); + //读取响应 + if (connection.getResponseCode() == 200) { + is = connection.getInputStream(); + if (null != is) { + //br = new BufferedReader(new InputStreamReader(is, "GBK")); + br = new BufferedReader(new InputStreamReader(is, "UTF-8")); + String temp = null; + while (null != (temp = br.readLine())) { + result.append(temp); + result.append("\r\n"); + } + } + } + + } catch (MalformedURLException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + //关闭连接 + if(br!=null){ + try { + br.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if(os!=null){ + try { + os.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if(is!=null){ + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + //关闭连接 + connection.disconnect(); + } + return result.toString(); + } + public static void main(String[] args) { + String message = doPost("https://tcc.taobao.com/cc/json/mobile_tel_segment.htm?tel=13026194071", ""); + System.out.println(message); + } + + /** + * Trust every server - dont check for any certificate + */ + private static void trustAllHosts() { + final String TAG = "trustAllHosts"; + // Create a trust manager that does not validate certificate chains + TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[]{}; + } + + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + //Log.i(TAG, "checkClientTrusted"); + } + + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + //Log.i(TAG, "checkServerTrusted"); + } + }}; + // Install the all-trusting trust manager + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java b/src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java new file mode 100644 index 0000000..99ea6db --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java @@ -0,0 +1,73 @@ +package com.github.bluesbruce.spring.mqttService.consumer; + +import com.github.bluesbruce.spring.service.MqttLiveHandle; +import com.github.bluesbruce.spring.utils.SpringUtil; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +@Slf4j +@Component +public class MqttConsumerCallBack implements MqttCallback { + + @Autowired + private MqttLiveHandle mqttLiveHandle; +/** + * 客户端断开连接的回调 + */ + @Override + public void connectionLost(Throwable throwable) { + System.out.println("与服务器断开连接,可重连"); + /*MqttProviderConfig client = SpringUtil.getBean(MqttProviderConfig.class); + System.out.println(client.isconnect()); + if (!client.isconnect()) { + client.connect(); + System.out.println("重连成功!"); + }*/ + } + + DateFormat bf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + /** + * 消息到达的回调 + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + try { + log.info(String.format("接收消息主题 : %s", topic) + "-" + bf.format(new Date())); + log.info(String.format("接收消息Qos : %d", message.getQos())); + log.info(String.format("接收消息内容 : %s", new String(message.getPayload()))); + log.info(String.format("接收消息retained : %b", message.isRetained())); + if (topic.contains("/rtmp/live")){ + if (ObjectUtils.isEmpty(mqttLiveHandle)){ + mqttLiveHandle = SpringUtil.getBean(MqttLiveHandle.class); + } + mqttLiveHandle.handleLive(topic,message); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 消息发布成功的回调 + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + + + + + +} + diff --git a/src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java b/src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java new file mode 100644 index 0000000..afb31aa --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java @@ -0,0 +1,99 @@ +package com.github.bluesbruce.spring.mqttService.send; + +import com.github.bluesbruce.spring.config.CmdParam; +import com.github.bluesbruce.spring.mqttService.consumer.MqttConsumerCallBack; +import com.github.bluesbruce.spring.utils.SnowId; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + + +@Configuration +@Slf4j +public class MqttProviderConfig { + + public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class); + /** + * 客户端对象 + */ + public MqttClient client; + + /** + * 在bean初始化后连接到服务器 + */ + @PostConstruct + public void init(){ + //connect(); + } + + /** + * 客户端连接服务端 + */ + public void connect(){ + try{ + //创建MQTT客户端对象 + client = new MqttClient(cmdParam.getMqttUrl(), String.valueOf(SnowId.snowId()),new MemoryPersistence()); + //连接设置 + MqttConnectOptions options = new MqttConnectOptions(); + //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 + //设置为true表示每次连接服务器都是以新的身份 + options.setCleanSession(true); + //设置连接用户名 + options.setUserName(""); + //设置连接密码 + options.setPassword("".toCharArray()); + //设置超时时间,单位为秒 + options.setConnectionTimeout(100); + //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 + options.setKeepAliveInterval(20); + //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 + options.setWill("willTopic",(cmdParam.getMqttUrl() + "与服务器断开连接").getBytes(),0,false); + + //防止 ERROR o.e.p.c.mqttv3.internal.ClientState - Timed out as no activity 错误 + options.setConnectionTimeout(0); + + //mqttClient.reconnect(); 这个方法或者回调已经设置了重连 + options.setAutomaticReconnect(true); + //设置回调 + client.setCallback(new MqttConsumerCallBack()); + client.connect(options); + client.subscribe(cmdParam.getMqttTopic(),0); + } catch(MqttException e){ + e.printStackTrace(); + } + } + + public MqttDeliveryToken publish(int qos,boolean retained,String topic,String message){ + log.info("服务推送MQTT消息,topic:{};qos:{};message:{}",topic,qos,message); + if (!client.isConnected()){ + connect(); + } + MqttDeliveryToken token=null; + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(qos); + mqttMessage.setRetained(retained); + mqttMessage.setPayload(message.getBytes()); + //主题的目的地,用于发布/订阅信息 + MqttTopic mqttTopic = client.getTopic(topic); + //提供一种机制来跟踪消息的传递进度 + //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 + + try { + //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 + //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + log.error("机场平台推送MQTT消息,topic:{};qos:{};message:{}异常:{}",topic,qos,message,e); + } + return token; + } + +} + diff --git a/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java b/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java new file mode 100644 index 0000000..c46c744 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java @@ -0,0 +1,64 @@ +package com.github.bluesbruce.spring.service; + + +import com.github.bluesbruce.ffch.CommandManager; +import com.github.bluesbruce.ffch.CommandManagerImpl; +import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory; +import com.github.bluesbruce.spring.config.CmdParam; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + +@Slf4j +@Component +@Order(value=2) +public class FFrtmpServer implements ApplicationRunner { + + public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class); + @Override + public void run(ApplicationArguments args) { + try { + CommandManager manager = new CommandManagerImpl(); + // -rtsp_transport tcp + //测试多个任何同时执行和停止情况 + //false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径 + //manager.start("tomcat", "ffmpeg -i http://192.168.10.101:18000/flv/live/34020000001110000002_34020000001320000071_0200000071.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_9?sign=f8a15b6n",false); + //manager.start("tomcat", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n",false); + //manager.start("tomcat1", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_11?sign=rHtBg3sz",false); + manager.start("test1", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i",cmdParam.getPlayUrl()) + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-b:v", "2M") + .add("-maxrate", "2M") + .add("-bufsize", "1M") + .add("-y").add(cmdParam.getPushUrl())); + Thread.sleep(10000); + manager.start("test2", CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i",cmdParam.getPlayUrl()) + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-b:v", "2M") + .add("-maxrate", "2M") + .add("-bufsize", "1M") + .add("-y").add(cmdParam.getPushUrl2())); + Thread.sleep(300000); + // 停止全部任务 + manager.stopAll(); + }catch (Exception e){ + log.error("",e); + } + //ds.close(); + } + + + +} diff --git a/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java new file mode 100644 index 0000000..3ad3b6c --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java @@ -0,0 +1,34 @@ +package com.github.bluesbruce.spring.service; + +import com.alibaba.fastjson.JSONObject; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +@Component +public class MqttLiveHandle { + @Autowired + private RtmpLiveService rtmpLiveService; + + public void handleLive(final String topic, MqttMessage message){ + String objmsg = new String(message.getPayload()); + final JSONObject jsonObject = JSONObject.parseObject(objmsg); + if (!ObjectUtils.isEmpty(jsonObject.get("command"))){ + String cmdoperat = jsonObject.get("command").toString(); + if (cmdoperat.equals("start")){ + Thread thread = new Thread(new Runnable() { + String code= jsonObject.get("code")==null?topic.split("/")[2]:jsonObject.get("code").toString(); + public void run() { + rtmpLiveService.pushServer(code); + } + }); + thread.start(); + }else if (cmdoperat.equals("stop")){ + rtmpLiveService.stopRtmp(); + } + } + } + + +} diff --git a/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java new file mode 100644 index 0000000..29eae76 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java @@ -0,0 +1,222 @@ +package com.github.bluesbruce.spring.service; + +import com.github.bluesbruce.ffch.CommandManager; +import com.github.bluesbruce.ffch.CommandManagerImpl; +import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory; +import com.github.bluesbruce.ffch.data.CommandTasker; +import com.alibaba.fastjson.JSONObject; +import com.github.bluesbruce.spring.config.CmdParam; +import com.github.bluesbruce.spring.mqttService.HttpURLConnectionUtil; +import com.github.bluesbruce.spring.mqttService.send.MqttProviderConfig; +import lombok.extern.slf4j.Slf4j; +import org.h2.util.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; + +import java.util.Collection; +import java.util.List; + +import static com.github.bluesbruce.ffch.util.PropertiesUtil.load; + +@Service +@Slf4j +public class RtmpLiveService { + /** + * 加载配置文件的推拉流地址 + */ + public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class); + + @Autowired + private MqttProviderConfig mqttProviderConfig; + + public static final CommandManager manager = new CommandManagerImpl(); + /** + * h获取通道并推送 + */ + public void pushServer(String code){ + //CommandManager manager = new CommandManagerImpl(); + String reTopic = cmdParam.getMqttTopic().replace("live","result"); + JSONObject jsonObject = new JSONObject(); + Collection infoList = manager.queryAll(); + log.info(infoList.toString()); + if (infoList.size()>0){ + jsonObject.put("code", -1); + jsonObject.put("msg", "推流服务已启动"); //推流失败 + mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString()); + return; + } + String pushUrl=""; + if (cmdParam.getType()==2){ + log.info("获取流媒体通道"); + //TODO 获取通道服务推拉流地址 + JSONObject object = getChenl(); + if (!ObjectUtils.isEmpty(object)){ + object.put("code", 0); + object.put("msg", "获取通道成功"); + mqttProviderConfig.publish(2,false,reTopic,object.toJSONString()); + }else{ + log.info("获取通道失败"); + object.put("code", -1); + object.put("msg", "获取通道失败"); + mqttProviderConfig.publish(2,false,reTopic,object.toJSONString()); + return; + } + //TODO end + }else{ + log.info("使用固定流媒体通道"); + pushUrl= cmdParam.getPushUrl(); + } + runRtmp(pushUrl,cmdParam.getPlayUrl(),code); + + } + + /** + * 获取阿里云通道 + * @return + */ + private JSONObject getChenl() { + String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList"); + if (!StringUtils.isNullOrEmpty(result)) { + try { + JSONObject object = JSONObject.parseObject(result); + List objectList = (List) object.get("data"); + if (objectList.size() > 0) { + log.info("获取到通道列表{}", objectList); + for (int i = 0; i < objectList.size(); i++) { + JSONObject chenl = objectList.get(i); + //设置通道占用 + JSONObject param = new JSONObject(); + param.put("code", chenl.get("code").toString()); + String resultOn = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", param.toJSONString()); + if (!StringUtils.isNullOrEmpty(resultOn)) { + JSONObject objectResult = JSONObject.parseObject(resultOn); + if (objectResult.get("code").toString().equals("0")) { + log.info("占用通道{}", objectList.get(i)); + return objectList.get(i); + } + } + } + + } + }catch (Exception e){ + log.error("",e); + } + }else{ + return null; + } + return null; + } + + + /** + * 根据推拉流地址推动 + * @param pushUrl + * @param playUrl + */ + public void runRtmp(String pushUrl,String playUrl,String code) { + try { + log.info("获取播流地址:{}"); + //CommandManager manager = new CommandManagerImpl(); + String taskId = manager.start(code, CommandBuidlerFactory.createBuidler() + .add("ffmpeg").add("-i",playUrl) + .add("-rtsp_transport","tcp") + .add("-vcodec","copy") + .add("-acodec","copy") + .add("-f","flv") + .add("-y").add(pushUrl)); + cmdParam.getMqttTopic(); + String reTopic = cmdParam.getMqttTopic().replace("live","result"); + + log.info(manager.queryAll().toString()); + JSONObject jsonObject = new JSONObject(); + if (StringUtils.isNullOrEmpty(taskId)){ + jsonObject.put("code", -1); + jsonObject.put("msg", "推流服务失败"); //推流失败 + mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString()); + // 停止全部任务 + manager.stopAll(); + return; + }else { + jsonObject.put("code", 0); + jsonObject.put("msg", "推流服务启动成功"); + mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString()); + } + Thread.sleep(36000); + log.info(manager.queryAll().toString()); + // 停止全部任务 + manager.stopAll(); + manager.destory(); + }catch (Exception e){ + log.error("",e); + } + //ds.close(); + } + + public void stopRtmp() { + try { + log.info("停止推流"); + //CommandManager manager = new CommandManagerImpl(); + // 停止全部任务 + int index = manager.stopAll(); + manager.destory(); + JSONObject jsonObject = new JSONObject(); + String reTopic = cmdParam.getMqttTopic().replace("live","result"); + if (index==0){ + jsonObject.put("code", -1); + jsonObject.put("msg", "推流服务关闭失败"); //推流失败 + mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString()); + // 停止全部任务 + manager.stopAll(); + return; + }else if (index>0){ + jsonObject.put("code", 0); + jsonObject.put("msg", "推流服务关闭成功"); + returnChenl(); + mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString()); + } + }catch (Exception e){ + log.error("",e); + } + //ds.close(); + } + + + /** + * 释放阿里云通道 + * @return + */ + private JSONObject returnChenl() { + try { + String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList"); + if (!StringUtils.isNullOrEmpty(result)) { + JSONObject object = JSONObject.parseObject(result); + List objectList = (List) object.get("data"); + if (objectList.size() > 0) { + log.info("获取到通道列表{}", objectList); + for (int i = 0; i < objectList.size(); i++) { + JSONObject chenl = objectList.get(i); + //设置通道占用 + JSONObject param = new JSONObject(); + param.put("code", chenl.get("code").toString()); + String resultOn = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", param.toJSONString()); + if (!StringUtils.isNullOrEmpty(resultOn)) { + JSONObject objectResult = JSONObject.parseObject(resultOn); + if (objectResult.get("code").toString().equals("0")) { + log.info("占用通道{}", objectList.get(i)); + return objectList.get(i); + } + } + } + + } + } else { + return null; + } + return null; + }catch (Exception e){ + log.error("",e); + } + return null; + } +} diff --git a/src/main/java/com/github/bluesbruce/spring/utils/SnowId.java b/src/main/java/com/github/bluesbruce/spring/utils/SnowId.java new file mode 100644 index 0000000..427e792 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/utils/SnowId.java @@ -0,0 +1,137 @@ +package com.github.bluesbruce.spring.utils; + + +public class SnowId { + + // ==============================Fields=========================================== + /** 开始时间截 (2015-01-01) */ + private final long twepoch = 1420041600000L; + + /** 机器id所占的位数 */ + private final long workerIdBits = 5L; + + /** 数据标识id所占的位数 */ + private final long datacenterIdBits = 5L; + + /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** 支持的最大数据标识id,结果是31 */ + private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); + + /** 序列在id中占的位数 */ + private final long sequenceBits = 12L; + + /** 机器ID向左移12位 */ + private final long workerIdShift = sequenceBits; + + /** 数据标识id向左移17位(12+5) */ + private final long datacenterIdShift = sequenceBits + workerIdBits; + + /** 时间截向左移22位(5+5+12) */ + private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + + /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** 工作机器ID(0~31) */ + private long workerId; + + /** 数据中心ID(0~31) */ + private long datacenterId; + + /** 毫秒内序列(0~4095) */ + private long sequence = 0L; + + /** 上次生成ID的时间截 */ + private long lastTimestamp = -1L; + + //==============================Constructors===================================== + /** + * 构造函数 + * @param workerId 工作ID (0~31) + * @param datacenterId 数据中心ID (0~31) + */ + public SnowId(long workerId, long datacenterId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format + ("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (datacenterId > maxDatacenterId || datacenterId < 0) { + throw new IllegalArgumentException(String.format + ("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); + } + this.workerId = workerId; + this.datacenterId = datacenterId; + } + + // ==============================Methods========================================== + /** + * 获得下一个ID (该方法是线程安全的) + * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format + ("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) // + | (datacenterId << datacenterIdShift) // + | (workerId << workerIdShift) // + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + //==============================Test============================================= + /** 测试 */ + public static long snowId() { + SnowId idWorker = new SnowId(0, 0); + long id = idWorker.nextId(); + return id; + // log.info("id:"+id); + //id:768842202204864512 + } +} diff --git a/src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java b/src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java new file mode 100644 index 0000000..e2a3a5b --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java @@ -0,0 +1,60 @@ +package com.github.bluesbruce.spring.utils; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * 实现ApplicationContextAware接口,并加入Component注解,让spring扫描到该bean + * 该类用于在普通Java类中注入bean,普通Java类中用@Autowired是无法注入bean的 + */ +@Component +public class SpringUtil implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + if(SpringUtil.applicationContext == null) { + SpringUtil.applicationContext = applicationContext; + } + } + + /** + * 获取applicationContext + */ + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + /** + *通过name获取 Bean. + * @param name + * @return + */ + public static Object getBean(String name){ + return getApplicationContext().getBean(name); + } + + /** + * 通过class获取Bean. + * @param clazz + * @param + * @return + */ + public static T getBean(Class clazz){ + return getApplicationContext().getBean(clazz); + } + + /** + * 通过name,以及Clazz返回指定的Bean + * @param name + * @param clazz + * @param + * @return + */ + public static T getBean(String name,Class clazz){ + return getApplicationContext().getBean(name, clazz); + } +} \ No newline at end of file diff --git a/src/main/java/com/github/bluesbruce/spring/web/SubController.java b/src/main/java/com/github/bluesbruce/spring/web/SubController.java new file mode 100644 index 0000000..d3fc580 --- /dev/null +++ b/src/main/java/com/github/bluesbruce/spring/web/SubController.java @@ -0,0 +1,34 @@ +package com.github.bluesbruce.spring.web; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; + +@Controller +public class SubController { + @Resource + private ObjectMapper objectMapper; + + @Value("${spring.mqtt.client.id}") + private String clientId; + + @Value("${spring.mqtt.default.topic}") + private String defaultTopic; + + @RequestMapping("/init") + @ResponseBody + public String subject(String topic, int qos) { + try { + return "发送成功"; + } catch (Exception e) { + e.printStackTrace(); + return "发送失败"; + } + } + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..069ceea --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,71 @@ +# IDENTITY (ContextIdApplicationContextInitializer) +spring.application.name=${project.name} +# 项目contextPath,一般在正式发布版本中,我们不配置 +#server.servlet.context-path=/ +# 错误页,指定发生错误时,跳转的URL。请查看BasicErrorController源码便知 +server.error.path=/errors +# 空白错误页激活 +server.error.whitelabel.enabled=true +# 是否包含错误堆栈,默认never,可配置always、on_trace_param +# 其中always为总是显示错误堆栈 +# on_trace_param 只有当request的parameter包含trace=true的时候显示 +server.error.include-stacktrace=on_trace_param +# 开启GZIP +server.compression.enabled=true +server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml +# 超过2KB的文件进行压缩 +server.compression.min-response-size=2KB +# 服务端口 +server.port=8032 +# session最大超时时间(秒),默认为1800秒(30分钟) +server.servlet.session.timeout=3600s +# 设置cookie的HttpOnly标识(true - 通过将无法读取到cookie信息) +server.servlet.session.cookie.http-only=false +# session的cookie名,要设置不同,防止浏览器打开多个项目,session名相同 +server.servlet.session.cookie.name=JSESSIONID_${project.artifactId} +# 单个文件上传大小 +spring.servlet.multipart.max-file-size=5MB +# 设置总上传的数据大小 +spring.servlet.multipart.max-request-size=10MB +# LOG +logging.config=classpath:log4j2.xml +# 使用CGLIB实现切面 +spring.aop.proxy-target-class=true +# 资源映射路径为/static/** +spring.mvc.static-path-pattern=/static/** +# SpringBoot视图配置 +spring.mvc.view.prefix=/ +spring.mvc.view.suffix=.jsp +# 资源映射地址 +spring.web.resources.static-locations=classpath:/static/ +############################################ +# MyBatis-Spring-Boot-Starter 相关配置 +############################################ +mybatis.type-aliases-package=com.github.bluesbruce.h2.service.dao +mybatis.mapper-locations=classpath:/mapper/*Mapper.xml +mybatis.config-location=classpath:/config/mybatis-config.xml +############################################ +# 配置H2数据库 +############################################ +spring.h2.console.enabled=true +# h2的web管理界面 +spring.h2.console.path=/h2console +spring.h2.console.settings.trace=true +# 允许外部访问 +spring.h2.console.settings.web-allow-others=true +# 用来检测连接是否有效的sql +spring.datasource.druid.validation-query=SELECT 'H' +spring.mqtt.username=admin +spring.mqtt.password=public +spring.mqtt.client.id=123456789999 +spring.mqtt.default.topic=123456789999 + + +spring.devtools.restart.enabled=true +spring.devtools.restart.additional-exclude=src/main/java +spring.devtools.restart.exclude=webapp/** +spring.freemarker.cache=false + + + + diff --git a/src/main/resources/config/mybatis-config.xml b/src/main/resources/config/mybatis-config.xml new file mode 100644 index 0000000..9540fe1 --- /dev/null +++ b/src/main/resources/config/mybatis-config.xml @@ -0,0 +1,61 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..933bcf0 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,31 @@ + + + + + D:\FFrtmp-test + %d{MM-dd HH:mm:ss.SSS} [%t-%L] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html new file mode 100644 index 0000000..aa8f8ef --- /dev/null +++ b/src/main/resources/static/index.html @@ -0,0 +1,10 @@ + + + + + Title + + +fgsagfs + + \ No newline at end of file diff --git a/src/main/webapp/sub.jsp b/src/main/webapp/sub.jsp new file mode 100644 index 0000000..583aa53 --- /dev/null +++ b/src/main/webapp/sub.jsp @@ -0,0 +1,476 @@ + + +<%@ page contentType="text/html;charset=UTF-8" language="java" %> + + + + + + + + + mqtt工具 + + + + + + + + <%-- + + + + + <%– + + + + 详细介绍 + 可以向相册加入信息,作者名称背景等,然后分享到社交网络上,目前仅仅是静态页面 + + + + 联系作者 + + 通过QQ联系 + 通过码云联系 + 通过Github联系 + + + + + –%> + + + + + + <%– + + + + + + + 我的相册 + –%> + + + + + + + + + + --%> + + + + + + + + 请选择 + <%--测试环境 + 开发环境--%> + + + + + + + 切换 + + kaifa + <%--主要跳转按钮 + 次要跳转按钮--%> + + + + + + + + + 提交 + + 订阅列表 + <%--主要跳转按钮 + 次要跳转按钮--%> + + + + + + + + + + + + + <%--订阅--%> + 全部订阅 + + + + id排序订阅消息备注操作 + + + + + + + + + + + + + + + + + + <%--sdfa啊手动阀 + 啊手动阀 + --%> + + + + + + + + + + + + + + + + + + + + + + + + + + × + 模态框(Modal)标题 + + + + + + 主题Qos节点 + + + + + + + + + + + + + + + + \ No newline at end of file
可以向相册加入信息,作者名称背景等,然后分享到社交网络上,目前仅仅是静态页面
啊手动阀