Explorar el Código

ffmpeg工具

master
wangwei hace 11 meses
commit
a0799c9cb9
Se han modificado 51 ficheros con 4524 adiciones y 0 borrados
  1. +32
    -0
      .gitignore
  2. +201
    -0
      LICENSE
  3. +16
    -0
      README.md
  4. +10
    -0
      loadCmd.properties
  5. +13
    -0
      loadFFmpeg.properties
  6. +312
    -0
      pom.xml
  7. +114
    -0
      src/main/java/com/github/bluesbruce/ffch/CommandManager.java
  8. +273
    -0
      src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java
  9. +18
    -0
      src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java
  10. +13
    -0
      src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java
  11. +89
    -0
      src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java
  12. +20
    -0
      src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java
  13. +82
    -0
      src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java
  14. +55
    -0
      src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java
  15. +17
    -0
      src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java
  16. +69
    -0
      src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java
  17. +62
    -0
      src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java
  18. +15
    -0
      src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties
  19. +59
    -0
      src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java
  20. +46
    -0
      src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java
  21. +69
    -0
      src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java
  22. +41
    -0
      src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java
  23. +47
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java
  24. +81
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java
  25. +116
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java
  26. +21
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java
  27. +45
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java
  28. +69
    -0
      src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java
  29. +13
    -0
      src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties
  30. +200
    -0
      src/main/java/com/github/bluesbruce/ffch/test/Test.java
  31. +70
    -0
      src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java
  32. +110
    -0
      src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java
  33. +144
    -0
      src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java
  34. +166
    -0
      src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java
  35. +34
    -0
      src/main/java/com/github/bluesbruce/spring/Application.java
  36. +74
    -0
      src/main/java/com/github/bluesbruce/spring/config/CmdParam.java
  37. +41
    -0
      src/main/java/com/github/bluesbruce/spring/config/Readproperties.java
  38. +295
    -0
      src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java
  39. +73
    -0
      src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java
  40. +99
    -0
      src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java
  41. +64
    -0
      src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java
  42. +34
    -0
      src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java
  43. +222
    -0
      src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java
  44. +137
    -0
      src/main/java/com/github/bluesbruce/spring/utils/SnowId.java
  45. +60
    -0
      src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java
  46. +34
    -0
      src/main/java/com/github/bluesbruce/spring/web/SubController.java
  47. +71
    -0
      src/main/resources/application.properties
  48. +61
    -0
      src/main/resources/config/mybatis-config.xml
  49. +31
    -0
      src/main/resources/log4j2.xml
  50. +10
    -0
      src/main/resources/static/index.html
  51. +476
    -0
      src/main/webapp/sub.jsp

+ 32
- 0
.gitignore Ver fichero

@@ -0,0 +1,32 @@
# maven ignore
target/
*.jar
*.war
*.zip
*.tar
*.tar.gz

# eclipse ignore
.settings/
.project
.classpath

# idea ignore
.idea/
*.ipr
*.iml
*.iws

# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp

# system ignore
.DS_Store
Thumbs.db

#sonar ignore
.sonar

+ 201
- 0
LICENSE Ver fichero

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/

TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION

1. Definitions.

"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.

"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.

"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.

"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.

"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.

"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.

"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).

"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.

"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."

"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.

2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.

3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.

4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:

(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and

(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and

(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and

(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.

You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.

5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.

6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.

7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.

8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.

9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.

END OF TERMS AND CONDITIONS

APPENDIX: How to apply the Apache License to your work.

To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

+ 16
- 0
README.md Ver fichero

@@ -0,0 +1,16 @@
# SpringBoot集成H2

1. H2数据库是一个开源的关系型数据库。H2采用java语言编写,不受平台的限制,同时支持网络版和嵌入式版本,有比较好的兼容性,支持相当标准的sql标准
2. 提供JDBC、ODBC访问接口,提供了非常友好的基于web的数据库管理界面

> 官网:http://www.h2database.com/


在``application.properties``配置了H2的管理后台入口,``http://localhost:8081/h2console``
``JDBC URL``、``User Name``和``Password``参考配置文件输入。

执行``test\resource``下的sql脚本。

两个测试接口,get请求
- http://localhost:8081/getList.json?name=王
- http://localhost:8081/getOne.json?id=6

+ 10
- 0
loadCmd.properties Ver fichero

@@ -0,0 +1,10 @@
#芢霜華硊ㄗegㄩ陝爵堁華硊ㄘ
pushUrl=rtmp://192.168.10.101:19350/rlive/stream_12?sign=LeL5Wchx
pushUrl2=rtmp://192.168.10.101:19350/rlive/stream_17?sign=64FIaU8X
#嶺霜華硊ㄗegㄩ裂組嶺霜腔華硊ㄘ
playUrl=http://192.168.10.101:18000/flv/live/34020000001110000001_34020000001320000099_0200000099.flv
#嘐隅籵耋 1 鳳龰籵耋 2
type=1

mqttUrl=tcp://106.15.120.154
mqttTopic=/v1/123987/rtmp/live

+ 13
- 0
loadFFmpeg.properties Ver fichero

@@ -0,0 +1,13 @@
#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错
path=Z://ffmpeg/bin/
#存放任务的默认Map的初始化大小
size=10
#事件回调通知接口地址
callback=http://127.0.0.1/callback
#网络超时设置(毫秒)
timeout=300

#开启保活线程
keepalive=false
#是否输出debug消息
debug=true

+ 312
- 0
pom.xml Ver fichero

@@ -0,0 +1,312 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.github.bluesbruce</groupId>
<modelVersion>4.0.0</modelVersion>
<artifactId>ffch-test</artifactId>
<version>1.0.0</version>
<packaging>${pom.package}</packaging>
<name>H2 Test</name>
<description>SpringBoot2集成H2</description>
<url>${project.url}</url>

<!-- 项目的核心开发者信息 -->
<developers>
<developer>
<name>BBF</name>
<roles>
<role>architect</role>
<role>developer</role>
</roles>
<timezone>+8</timezone>
</developer>
</developers>

<!-- 项目许可 -->
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<scm>
<connection>scm:git@gitee.com:bbfbbf/h2-test.git</connection>
<developerConnection>scm:git@gitee.com:bbfbbf/h2-test.git</developerConnection>
<url>${project.url}</url>
<tag>${project.version}</tag>
</scm>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.url>https://gitee.com/bbfbbf/h2-test</project.url>
<!-- base setting -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.locales>zh_CN</project.build.locales>
<project.build.jdk>1.8</project.build.jdk>

<!-- 日志 -->
<pom.log.name>FFRTMP-test</pom.log.name>
<pom.log.dir.backup>backup</pom.log.dir.backup>
<!-- 日志切割的最小单位 -->
<pom.log.filesize>5M</pom.log.filesize>
<!-- 日志输出级别 -->
<pom.log.level>INFO</pom.log.level>

<!-- test dependencies -->
<junit.version>4.13</junit.version>

<!-- project dependencies -->
<spring.boot.version>2.4.3</spring.boot.version>
<mybatis-spring.boot.version>2.1.4</mybatis-spring.boot.version>
<druid.boot.version>1.2.5</druid.boot.version>
<h2.version>2.1.212</h2.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- spring boot依赖 -->
<!-- 打包: mvn package spring-boot:repackage -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- spring boot开始 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<!-- 排除logback -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- MySql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>

<!-- 日志log4j2 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- spring boot结束 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring.boot.version}</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.boot.version}</version>
</dependency>
<!-- spring start -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>





<!--内置tocat对Jsp支持的依赖,用于编译Jsp-->
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
</dependency>

<!--jstl的支持,c标签-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>javax.servlet.jsp-api</artifactId>
<version>2.3.1</version>
</dependency>
<!--mqtt 相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mybatis-plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.0</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!--devtools热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>true</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- 测试插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
<configuration>
<!-- 跳过单元测试 -->
<skipTests>true</skipTests>
<!-- 如果单元测试中有输出中文,eclipse的控制台里中文可能会变成乱码输出 -->
<argLine>-Dfile.encoding=UTF-8</argLine>
</configuration>
</plugin>
<!-- 如果没这个插件,打包命令 mvn package spring-boot:repackage -->
<!-- 现在打包命令 mvn package -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<!--允许linux上注册服务-->
<configuration>
<executable>true</executable>
<fork>true</fork>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<!-- false==无web.xml文件的构建war-->
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
</plugins>
<!--配置资源文件过滤,如果有其他需要打包的资源,需要重载此配置节 -->
<resources>
<resource>
<!--源文件夹-->
<directory>scr/main/webapp</directory>
<!--指定编译到META-INF/resources-->
<targetPath>META-INF/resources</targetPath>
<!--指定源文件夹中哪个资源要进行编译-->
<includes>
<include>*.*</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>config/**</include>
<include>log4j2.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>mapper/**</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>jar</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<pom.package>war</pom.package>
<pom.packageName>${project.artifactId}-${project.version}</pom.packageName>
<pom.log.dir>d:/FFrtmp-test</pom.log.dir>
</properties>
<dependencies>
<!-- undertow容器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

+ 114
- 0
src/main/java/com/github/bluesbruce/ffch/CommandManager.java Ver fichero

@@ -0,0 +1,114 @@
package com.github.bluesbruce.ffch;

import com.github.bluesbruce.ffch.commandbuidler.CommandAssembly;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidler;
import com.github.bluesbruce.ffch.config.ProgramConfig;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.data.TaskDao;
import com.github.bluesbruce.ffch.handler.TaskHandler;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Map;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;

/**
* FFmpeg命令操作管理器,可执行FFmpeg命令/停止/查询任务信息
*
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
@Component
public interface CommandManager {
public static final ProgramConfig config=load("/loadFFmpeg.properties", ProgramConfig.class);
/**
* 注入自己实现的持久层
*
* @param taskDao
*/
public void setTaskDao(TaskDao taskDao);

/**
* 注入ffmpeg命令处理器
*
* @param taskHandler
*/
public void setTaskHandler(TaskHandler taskHandler);

/**
* 注入ffmpeg命令组装器
*
* @param commandAssembly
*/
public void setCommandAssembly(CommandAssembly commandAssembly);

/**
* 通过命令发布任务(默认命令前不加FFmpeg路径)
*
* @param id - 任务标识
* @param command - FFmpeg命令
* @return
*/
public String start(String id, String command);
/**
* 通过命令发布任务
* @param id - 任务标识
* @param commond - FFmpeg命令
* @param hasPath - 命令中是否包含FFmpeg执行文件的绝对路径
* @return
*/
public String start(String id, String commond, boolean hasPath);

/**
* 通过流式命令构建器发布任务
* @param commandBuidler
* @return
*/
public String start(String id, CommandBuidler commandBuidler);

/**
* 通过组装命令发布任务
*
* @param assembly
* -组装命令(详细请参照readme文档说明)
* @return
*/
public String start(Map<String, String> assembly);

/**
* 停止任务
*
* @param id
* @return
*/
public boolean stop(String id);

/**
* 停止全部任务
*
* @return
*/
public int stopAll();

/**
* 通过id查询任务信息
*
* @param id
*/
public CommandTasker query(String id);

/**
* 查询全部任务信息
*
*/
public Collection<CommandTasker> queryAll();
/**
* 销毁一些后台资源和保活线程
*/
public void destory();
}

+ 273
- 0
src/main/java/com/github/bluesbruce/ffch/CommandManagerImpl.java Ver fichero

@@ -0,0 +1,273 @@
package com.github.bluesbruce.ffch;

import com.github.bluesbruce.ffch.commandbuidler.CommandAssembly;
import com.github.bluesbruce.ffch.commandbuidler.CommandAssemblyImpl;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidler;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.data.TaskDao;
import com.github.bluesbruce.ffch.data.TaskDaoImpl;
import com.github.bluesbruce.ffch.handler.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

/**
* FFmpeg命令操作管理器
*
* @author eguid
* @since jdk1.7
* @version 2017年10月13日
*/
@Slf4j
@Service
public class CommandManagerImpl implements CommandManager {
/**
* 任务持久化器
*/
private TaskDao taskDao = null;
/**
* 任务执行处理器
*/
private TaskHandler taskHandler = null;
/**
* 命令组装器
*/
private CommandAssembly commandAssembly = null;
/**
* 任务消息处理器
*/
private OutHandlerMethod ohm = null;
/**
* 保活处理器
*/
private KeepAliveHandler keepAliveHandler=null;

/**
* 全部默认初始化,自动查找配置文件
*/
public CommandManagerImpl() {
this(null);
}

/**
* 指定任务池大小的初始化,其他使用默认
* @param size
*/
public CommandManagerImpl(Integer size) {
init(size);
}
/**
* 初始化
* @param taskDao
* @param taskHandler
* @param commandAssembly
* @param ohm
* @param size
*/
public CommandManagerImpl(TaskDao taskDao, TaskHandler taskHandler, CommandAssembly commandAssembly, OutHandlerMethod ohm,Integer size) {
super();
this.taskDao = taskDao;
this.taskHandler = taskHandler;
this.commandAssembly = commandAssembly;
this.ohm = ohm;
init(size);
}

/**
* 初始化,如果几个处理器未注入,则使用默认处理器
*
* @param size
*/
public void init(Integer size) {
if (config == null) {
log.info("配置文件加载失败!配置文件不存在或配置错误");
return;
}
boolean iskeepalive=true;
if (size == null) {
size = config.getSize() == null ? 10 : config.getSize();
iskeepalive=config.isKeepalive();
}
if (this.ohm == null) {
this.ohm = new DefaultOutHandlerMethod();
}
if (this.taskDao == null) {
this.taskDao = new TaskDaoImpl(size);
//初始化保活线程
if(iskeepalive) {
keepAliveHandler = new KeepAliveHandler(taskDao);
keepAliveHandler.start();
}
}
if (this.taskHandler == null) {
this.taskHandler = new TaskHandlerImpl(this.ohm);
}
if (this.commandAssembly == null) {
this.commandAssembly = new CommandAssemblyImpl();
}
}

public void setTaskDao(TaskDao taskDao) {
this.taskDao = taskDao;
}

public void setTaskHandler(TaskHandler taskHandler) {
this.taskHandler = taskHandler;
}

public void setCommandAssembly(CommandAssembly commandAssembly) {
this.commandAssembly = commandAssembly;
}

public void setOhm(OutHandlerMethod ohm) {
this.ohm = ohm;
}

/**
* 是否已经初始化
*
* @param 如果未初始化时是否初始化
* @return
*/
public boolean isInit(boolean b) {
boolean ret = this.ohm == null || this.taskDao == null || this.taskHandler == null|| this.commandAssembly == null;
if (ret && b) {
init(null);
}
return ret;
}

@Override
public String start(String id, String command) {
return start(id, command, false);
}

@Override
public String start(String id, String command, boolean hasPath) {
if (isInit(true)) {
log.info("执行失败,未进行初始化或初始化失败!");
return null;
}
if (id != null && command != null) {
CommandTasker tasker = taskHandler.process(id, hasPath ? command : config.getPath() + command);
if (tasker != null) {
int ret = taskDao.add(tasker);
if (ret > 0) {
return tasker.getId();
} else {
// 持久化信息失败,停止处理
taskHandler.stop(tasker.getProcess(), tasker.getThread());
if (config.isDebug())
log.info("持久化失败,停止任务!");
}
}
}
return null;
}

@Override
public String start(Map<String, String> assembly) {
// ffmpeg环境是否配置正确
if (checkConfig()) {
log.info("配置未正确加载,无法执行");
return null;
}
// 参数是否符合要求
if (assembly == null || assembly.isEmpty() || !assembly.containsKey("appName")) {
log.info("参数不正确,无法执行");
return null;
}
String appName = (String) assembly.get("appName");
if (appName != null && "".equals(appName.trim())) {
log.info("appName不能为空");
return null;
}
assembly.put("ffmpegPath", config.getPath() + "ffmpeg");
String command = commandAssembly.assembly(assembly);
if (command != null) {
return start(appName, command, true);
}

return null;
}

@Override
public String start(String id,CommandBuidler commandBuidler) {
// ffmpeg环境是否配置正确
if (checkConfig()) {
log.info("配置未正确加载,无法执行");
return null;
}
String command =commandBuidler.get();
if (command != null) {
return start(id, command, true);
}
return null;
}

private boolean checkConfig() {
return config == null;
}
@Override
public boolean stop(String id) {
if (id != null && taskDao.isHave(id)) {
if (config.isDebug())
log.info("正在停止任务:" + id);
CommandTasker tasker = taskDao.get(id);
if (taskHandler.stop(tasker.getProcess(), tasker.getThread())) {
taskDao.remove(id);
log.info("停止任务完毕:" + id);
return true;
}
}
log.info("停止任务失败!id=" + id);
return false;
}

@Override
public int stopAll() {
Collection<CommandTasker> list = taskDao.getAll();
Iterator<CommandTasker> iter = list.iterator();
CommandTasker tasker = null;
int index = 0;
while (iter.hasNext()) {
tasker = iter.next();
if (taskHandler.stop(tasker.getProcess(), tasker.getThread())) {
taskDao.remove(tasker.getId());
index++;
}
}
if (config.isDebug())
log.info("停止了" + index + "个任务!");
return index;
}

@Override
public CommandTasker query(String id) {
return taskDao.get(id);
}

@Override
public Collection<CommandTasker> queryAll() {
return taskDao.getAll();
}

@Override
public void destory() {
if(keepAliveHandler!=null) {
//安全停止保活线程
keepAliveHandler.interrupt();
}
}
}

+ 18
- 0
src/main/java/com/github/bluesbruce/ffch/callback/EventCallBack.java Ver fichero

@@ -0,0 +1,18 @@
package com.github.bluesbruce.ffch.callback;

import com.github.bluesbruce.ffch.data.CommandTasker;

/**
* 事件回调
* @author eguid
*
*/
public interface EventCallBack {
/**
* 命令行执行开始事件
* @param t -事件类型
* @param tasker -任务信息
*/
boolean callback(EventCallBackType t, CommandTasker tasker);
}

+ 13
- 0
src/main/java/com/github/bluesbruce/ffch/callback/EventCallBackType.java Ver fichero

@@ -0,0 +1,13 @@
package com.github.bluesbruce.ffch.callback;

/**
* 事件回调类型
* @author eguid
*
*/
public enum EventCallBackType {
exec,//执行命令后通知
stop,//停止命令后通知
interrupt,//进程中断后通知
heartbeat,//主进程存活心跳
}

+ 89
- 0
src/main/java/com/github/bluesbruce/ffch/callback/worker/EventMsgNetWorker.java Ver fichero

@@ -0,0 +1,89 @@
package com.github.bluesbruce.ffch.callback.worker;

import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.callback.EventCallBack;
import com.github.bluesbruce.ffch.callback.EventCallBackType;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.data.TaskerEventMsg;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* 事件消息独立发送线程
*
* @author eguid
*
*/
@Slf4j
public class EventMsgNetWorker extends Thread implements EventCallBack{

protected static Queue<TaskerEventMsg> queue = null;// 一个事件消息队列,发送失败的事件消息将会进入队列队尾等待下次再次发送

// 一个网络库,用于快速发送http消息
private int timeout = 300;// 默认300毫秒

public EventMsgNetWorker(int timeout) {
super();
this.timeout = timeout;
queue = new ConcurrentLinkedQueue<>();
}

@Override
public void run() {
for (;;) {
try {
while (queue.peek() != null) {
TaskerEventMsg t = queue.poll();
// 借助网络库发送该消息
String url = CommandManager.config.getCallback();
if (reqGET(url)) {
log.info("发送成功");
} else {
log.info("发送失败");
// 发送失败的事件消息将会进入队列队尾等待下次再次发送
queue.offer(t);
}
}
} catch (Exception e) {

}
}

}

/**
* 发送get请求
*/
private boolean reqGET(String url) {
URL realUrl;
// PrintWriter out = null;
try {
realUrl = new URL(url);
// 打开和URL之间的连接
URLConnection connection = realUrl.openConnection();
// 设置通用的请求属性
connection.setUseCaches(false);
connection.setConnectTimeout(timeout);
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
connection.setDoOutput(false);
connection.setDoInput(false);
connection.connect();
return true;
} catch (IOException e) {
return false;
}
}

@Override
public boolean callback(EventCallBackType ecbt, CommandTasker tasker) {
return queue.add(new TaskerEventMsg(ecbt, tasker));
}

}

+ 20
- 0
src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssembly.java Ver fichero

@@ -0,0 +1,20 @@
package com.github.bluesbruce.ffch.commandbuidler;

import java.util.Map;

/**
* 命令组装器接口
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public interface CommandAssembly {
/**
* 将参数转为ffmpeg命令
* @param paramMap
* @return
*/
public String assembly(Map<String, String> paramMap);
public String assembly();
}

+ 82
- 0
src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandAssemblyImpl.java Ver fichero

@@ -0,0 +1,82 @@
package com.github.bluesbruce.ffch.commandbuidler;

import java.util.Map;

/**
* 默认命令组装器实现
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public class CommandAssemblyImpl implements CommandAssembly{
/**
*
* @param map
* -要组装的map
* @param id
* -返回参数:id
* @param id
* -返回参数:组装好的命令
* @return
*/
public String assembly(Map<String, String> paramMap) {
try {
if (paramMap.containsKey("ffmpegPath")) {
String ffmpegPath = (String) paramMap.get("ffmpegPath");
// -i:输入流地址或者文件绝对地址
StringBuilder comm = new StringBuilder(ffmpegPath + " -i ");
// 是否有必输项:输入地址,输出地址,应用名,twoPart:0-推一个元码流;1-推一个自定义推流;2-推两个流(一个是自定义,一个是元码)
if (paramMap.containsKey("input") && paramMap.containsKey("output") && paramMap.containsKey("appName")
&& paramMap.containsKey("twoPart")) {
String input = (String) paramMap.get("input");
String output = (String) paramMap.get("output");
String appName = (String) paramMap.get("appName");
String twoPart = (String) paramMap.get("twoPart");
String codec = (String) paramMap.get("codec");
// 默认h264解码
codec = (codec == null ? "h264" : (String) paramMap.get("codec"));
// 输入地址
comm.append(input);
// 当twoPart为0时,只推一个元码流
if ("0".equals(twoPart)) {
comm.append(" -vcodec " + codec + " -f flv -an " + output + appName);
} else {
// -f :转换格式,默认flv
if (paramMap.containsKey("fmt")) {
String fmt = (String) paramMap.get("fmt");
comm.append(" -f " + fmt);
}
// -r :帧率,默认25;-g :帧间隔
if (paramMap.containsKey("fps")) {
String fps = (String) paramMap.get("fps");
comm.append(" -r " + fps);
comm.append(" -g " + fps);
}
// -s 分辨率 默认是原分辨率
if (paramMap.containsKey("rs")) {
String rs = (String) paramMap.get("rs");
comm.append(" -s " + rs);
}
// 输出地址+发布的应用名
comm.append(" -an " + output + appName);
// 当twoPart为2时推两个流,一个自定义流,一个元码流
if ("2".equals(twoPart)) {
// 一个视频源,可以有多个输出,第二个输出为拷贝源视频输出,不改变视频的各项参数并且命名为应用名+HD
comm.append(" -vcodec copy -f flv -an ").append(output + appName + "HD");
}
}
return comm.toString();
}
}
} catch (Exception e) {
return null;
}
return null;
}

@Override
public String assembly() {
// TODO Auto-generated method stub
return null;
}
}

+ 55
- 0
src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidler.java Ver fichero

@@ -0,0 +1,55 @@
package com.github.bluesbruce.ffch.commandbuidler;

/**
* 流式命令行构建器
*
* @author eguid
*/
public interface CommandBuidler {

/**
* 创建命令行
*
* @param root
* -命令行运行根目录或FFmpeg可执行文件安装目录
* @return
*/
CommandBuidler create(String root);

/**
* 创建默认根目录或默认FFmpeg可执行文件安装目录
*
* @return
*/
CommandBuidler create();

/**
* 累加键-值命令
*
* @param key
* @param val
* @return
*/
CommandBuidler add(String key, String val);

/**
* 累加命令
*
* @param val
* @return
*/
CommandBuidler add(String val);

/**
* 生成完整命令行
*
* @return
*/
CommandBuidler build();
/**
* 获取已经构建好的命令行
* @return
*/
String get();
}

+ 17
- 0
src/main/java/com/github/bluesbruce/ffch/commandbuidler/CommandBuidlerFactory.java Ver fichero

@@ -0,0 +1,17 @@
package com.github.bluesbruce.ffch.commandbuidler;

/**
* 默认流式命令构建器工厂类
* @author eguid
*
*/
public class CommandBuidlerFactory {

public static CommandBuidler createBuidler() {
return new DefaultCommandBuidler();
};
public static CommandBuidler createBuidler(String rootpath) {
return new DefaultCommandBuidler(rootpath);
};
}

+ 69
- 0
src/main/java/com/github/bluesbruce/ffch/commandbuidler/DefaultCommandBuidler.java Ver fichero

@@ -0,0 +1,69 @@
package com.github.bluesbruce.ffch.commandbuidler;

import com.github.bluesbruce.ffch.CommandManager;

/**
* 默认流式命令行构建器(非线程安全)
* @author eguid
*/
public class DefaultCommandBuidler implements CommandBuidler{

StringBuilder buidler=null;
String command=null;
public DefaultCommandBuidler() {
create();
}

public DefaultCommandBuidler(String rootpath) {
create(rootpath);
}


@Override
public CommandBuidler create(String rootpath) {
buidler=new StringBuilder(rootpath);
return this;
}

@Override
public CommandBuidler create() {
return create(CommandManager.config.getPath());
}

@Override
public CommandBuidler add(String key, String val) {
return add(key).add(val);
}

@Override
public CommandBuidler add(String val) {
if(buidler!=null) {
buidler.append(val);
addBlankspace();
}
return this;
}

@Override
public CommandBuidler build() {
if(buidler!=null) {
command=buidler.toString();
}
return this;
}
private void addBlankspace() {
buidler.append(" ");
}

@Override
public String get() {
if(command==null) {
build();
}
return command;
}

}

+ 62
- 0
src/main/java/com/github/bluesbruce/ffch/config/ProgramConfig.java Ver fichero

@@ -0,0 +1,62 @@
package com.github.bluesbruce.ffch.config;

/**
* 程序基础配置
*
* @author eguid
*
*/
public class ProgramConfig {
private String path;//默认命令行执行根路径
private boolean debug;//是否开启debug模式
private Integer size;//任务池大小
private String callback;//回调通知地址
private boolean keepalive;//是否开启保活

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}

public boolean isDebug() {
return debug;
}

public void setDebug(boolean debug) {
this.debug = debug;
}

public Integer getSize() {
return size;
}

public void setSize(Integer size) {
this.size = size;
}

public String getCallback() {
return callback;
}

public void setCallback(String callback) {
this.callback = callback;
}

public boolean isKeepalive() {
return keepalive;
}

public void setKeepalive(boolean keepalive) {
this.keepalive = keepalive;
}

@Override
public String toString() {
return "ProgramConfig [path=" + path + ", debug=" + debug + ", size=" + size + ", callback=" + callback
+ ", keepalive=" + keepalive + "]";
}
}

+ 15
- 0
src/main/java/com/github/bluesbruce/ffch/config/defaultFFmpegConfig.properties Ver fichero

@@ -0,0 +1,15 @@
#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错
path=Z://ffmpeg/bin/
#是否启用默认的ffmpeg,如果启用默认ffmpeg,上面配置的path就没有用了
defaultpathEnable=false
#存放任务的默认Map的初始化大小
size=10
#事件回调通知接口地址
callback=http://127.0.0.1/callback
#网络超时设置(毫秒)
timeout=300
#开启保活线程
keepalive=true
#是否输出debug消息
debug=true


+ 59
- 0
src/main/java/com/github/bluesbruce/ffch/data/CommandTasker.java Ver fichero

@@ -0,0 +1,59 @@
package com.github.bluesbruce.ffch.data;

import com.github.bluesbruce.ffch.handler.OutHandler;

/**
* 用于存放任务id,任务主进程,任务输出线程
*
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public class CommandTasker {
private final String id;// 任务id
private final String command;//命令行
private Process process;// 命令行运行主进程
private OutHandler thread;// 命令行消息输出子线程

public CommandTasker(String id,String command) {
this.id = id;
this.command=command;
}

public CommandTasker(String id,String command, Process process, OutHandler thread) {
this.id = id;
this.command=command;
this.process = process;
this.thread = thread;
}

public String getId() {
return id;
}

public Process getProcess() {
return process;
}

public OutHandler getThread() {
return thread;
}

public String getCommand() {
return command;
}

public void setProcess(Process process) {
this.process = process;
}

public void setThread(OutHandler thread) {
this.thread = thread;
}

@Override
public String toString() {
return "CommandTasker [id=" + id + ", command=" + command + ", process=" + process + ", thread=" + thread + "]";
}

}

+ 46
- 0
src/main/java/com/github/bluesbruce/ffch/data/TaskDao.java Ver fichero

@@ -0,0 +1,46 @@
package com.github.bluesbruce.ffch.data;

import java.util.Collection;

/**
* 任务信息持久层接口
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public interface TaskDao {
/**
* 通过id查询任务信息
* @param id - 任务ID
* @return CommandTasker -任务实体
*/
public CommandTasker get(String id);
/**
* 查询全部任务信息
* @return Collection<CommandTasker>
*/
public Collection<CommandTasker> getAll();
/**
* 增加任务信息
* @param CommandTasker -任务信息实体
* @return 增加数量:<1-增加失败,>=1-增加成功
*/
public int add(CommandTasker CommandTasker);
/**
* 删除id对应的任务信息
* @param id
* @return 数量:<1-操作失败,>=1-操作成功
*/
public int remove(String id);
/**
* 删除全部任务信息
* @return 数量:<1-操作失败,>=1-操作成功
*/
public int removeAll();
/**
* 是否存在某个ID
* @param id - 任务ID
* @return true:存在,false:不存在
*/
public boolean isHave(String id);
}

+ 69
- 0
src/main/java/com/github/bluesbruce/ffch/data/TaskDaoImpl.java Ver fichero

@@ -0,0 +1,69 @@
package com.github.bluesbruce.ffch.data;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* 任务信息持久层实现
*
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public class TaskDaoImpl implements TaskDao {
// 存放任务信息
private ConcurrentMap<String, CommandTasker> map = null;

public TaskDaoImpl(int size) {
map = new ConcurrentHashMap<>(size);
}

@Override
public CommandTasker get(String id) {
return map.get(id);
}

@Override
public Collection<CommandTasker> getAll() {
return map.values();
}

@Override
public int add(CommandTasker CommandTasker) {
String id = CommandTasker.getId();
if (id != null && !map.containsKey(id)) {
map.put(CommandTasker.getId(), CommandTasker);
if(map.get(id)!=null)
{
return 1;
}
}
return 0;
}

@Override
public int remove(String id) {
if(map.remove(id) != null){
return 1;
};
return 0;
}

@Override
public int removeAll() {
int size = map.size();
try {
map.clear();
} catch (Exception e) {
return 0;
}
return size;
}

@Override
public boolean isHave(String id) {
return map.containsKey(id);
}

}

+ 41
- 0
src/main/java/com/github/bluesbruce/ffch/data/TaskerEventMsg.java Ver fichero

@@ -0,0 +1,41 @@
package com.github.bluesbruce.ffch.data;

import com.github.bluesbruce.ffch.callback.EventCallBackType;

/**
* 命令行事件消息
* @author eguid
*
*/
public class TaskerEventMsg {
EventCallBackType ecbt;
CommandTasker tasker;

public TaskerEventMsg(EventCallBackType ecbt, CommandTasker tasker) {
super();
this.ecbt = ecbt;
this.tasker = tasker;
}

public EventCallBackType getEcbt() {
return ecbt;
}

public void setEcbt(EventCallBackType ecbt) {
this.ecbt = ecbt;
}

public CommandTasker getTasker() {
return tasker;
}

public void setTasker(CommandTasker tasker) {
this.tasker = tasker;
}

@Override
public String toString() {
return "CommandEventMsg [ecbt=" + ecbt + ", tasker=" + tasker + "]";
}

}

+ 47
- 0
src/main/java/com/github/bluesbruce/ffch/handler/DefaultOutHandlerMethod.java Ver fichero

@@ -0,0 +1,47 @@
package com.github.bluesbruce.ffch.handler;

import lombok.extern.slf4j.Slf4j;

/**
* 默认任务消息输出处理
* @author eguid
* @since jdk1.7
* @version 2017年10月13日
*/
@Slf4j
public class DefaultOutHandlerMethod implements OutHandlerMethod{

/**
* 任务是否异常中断,如果
*/
public boolean isBroken=false;
@Override
public void parse(String id,String msg) {
//过滤消息
if (msg.indexOf("fail") != -1) {
log.info(id + "任务可能发生故障:" + msg);
log.info("失败,设置中断状态");
isBroken=true;
}else if(msg.indexOf("miss")!= -1) {
log.info(id + "任务可能发生丢包:" + msg);
log.info("失败,设置中断状态");
isBroken=true;
}else if(msg.indexOf("occurred")!= -1) {
log.info(id + "任务可能发生丢包10054:" + msg);
log.info("失败,设置中断状态");
isBroken=false;
}else {
isBroken=false;
log.info(id + "消息:" + msg);
}

}

@Override
public boolean isbroken() {
return isBroken;
}
}

+ 81
- 0
src/main/java/com/github/bluesbruce/ffch/handler/KeepAliveHandler.java Ver fichero

@@ -0,0 +1,81 @@
package com.github.bluesbruce.ffch.handler;

import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.data.TaskDao;
import com.github.bluesbruce.ffch.util.ExecUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* 任务保活处理器(一个后台保活线程,用于处理异常中断的持久任务)
* @author eguid
*
*/
@Slf4j
public class KeepAliveHandler extends Thread{
/**待处理队列*/
private static Queue<String> queue=null;
public int err_index=0;//错误计数
public volatile int stop_index=0;//安全停止线程标记
/** 任务持久化器*/
private TaskDao taskDao = null;
public KeepAliveHandler(TaskDao taskDao) {
super();
this.taskDao=taskDao;
queue=new ConcurrentLinkedQueue<>();
}

public static void add(String id ) {
if(queue!=null) {
queue.offer(id);
}
}
public boolean stop(Process process) {
if (process != null) {
process.destroy();
return true;
}
return false;
}
@Override
public void run() {
for(;stop_index==0;) {
if(queue==null) {
continue;
}
String id=null;
CommandTasker task=null;
try {
while(queue.peek() != null) {
log.info("准备重启任务:"+queue);
id=queue.poll();
task=taskDao.get(id);
//重启任务
ExecUtil.restart(task);
}
}catch(IOException e) {
log.info(id+" 任务重启失败,详情:"+task);
//重启任务失败
err_index++;
}catch(Exception e) {
}
}
}
@Override
public void interrupt() {
stop_index=1;
}
}

+ 116
- 0
src/main/java/com/github/bluesbruce/ffch/handler/OutHandler.java Ver fichero

@@ -0,0 +1,116 @@
package com.github.bluesbruce.ffch.handler;

import com.github.bluesbruce.ffch.CommandManager;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
* 任务消息输出处理器
* @author eguid
* @since jdk1.7
* @version 2017年10月13日
*/
@Slf4j
public class OutHandler extends Thread {
/**控制状态 */
private volatile boolean desstatus = true;
/**读取输出流*/
private BufferedReader br = null;
/**任务ID*/
private String id = null;
/**消息处理方法*/
private OutHandlerMethod ohm;

/**
* 创建输出线程(默认立即开启线程)
* @param is
* @param id
* @param ohm
* @return
*/
public static OutHandler create(InputStream is, String id,OutHandlerMethod ohm) {
return create(is, id, ohm,true);
}
/**
* 创建输出线程
* @param is
* @param id
* @param ohm
* @param start-是否立即开启线程
* @return
*/
public static OutHandler create(InputStream is, String id,OutHandlerMethod ohm,boolean start) {
OutHandler out= new OutHandler(is, id, ohm);
if(start)
out.start();
return out;
}
public void setOhm(OutHandlerMethod ohm) {
this.ohm = ohm;
}
public void setDesStatus(boolean desStatus) {
this.desstatus = desStatus;
}

public void setId(String id) {
this.id = id;
}

public OutHandlerMethod getOhm() {
return ohm;
}

public OutHandler(InputStream is, String id,OutHandlerMethod ohm) {
br = new BufferedReader(new InputStreamReader(is));
this.id = id;
this.ohm=ohm;
}
/**
* 重写线程销毁方法,安全的关闭线程
*/
@Override
public void destroy() {
setDesStatus(false);
}

/**
* 执行输出线程
*/
@Override
public void run() {
String msg = null;
try {
if (CommandManager.config.isDebug()) {
log.info(id + "开始推流!");
}
while (desstatus && (msg = br.readLine()) != null) {
ohm.parse(id,msg);
if(ohm.isbroken()) {
log.info("检测到中断,提交重启任务给保活处理器");
//如果发生异常中断,立即进行保活
//把中断的任务交给保活处理器进行进一步处理
KeepAliveHandler.add(id);
}
}
} catch (IOException e) {
log.info("发生内部异常错误,自动关闭[" + this.getId() + "]线程");
destroy();
} finally {
if (this.isAlive()) {
destroy();
}
}
}

}

+ 21
- 0
src/main/java/com/github/bluesbruce/ffch/handler/OutHandlerMethod.java Ver fichero

@@ -0,0 +1,21 @@
package com.github.bluesbruce.ffch.handler;
/**
* 输出消息处理
* @author eguid
* @since jdk1.7
* @version 2017年10月13日
*/
public interface OutHandlerMethod {
/**
* 解析消息
* @param id-任务ID
* @param msg -消息
*/
public void parse(String id, String msg);
/**
* 任务是否异常中断
* @return
*/
public boolean isbroken();
}

+ 45
- 0
src/main/java/com/github/bluesbruce/ffch/handler/TaskHandler.java Ver fichero

@@ -0,0 +1,45 @@
package com.github.bluesbruce.ffch.handler;

import com.github.bluesbruce.ffch.data.CommandTasker;

/**
* 任务执行接口
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
public interface TaskHandler {
/**
* 按照命令执行主进程和输出线程
*
* @param id
* @param command
* @return
*/
public CommandTasker process(String id, String command);

/**
* 停止主进程(停止主进程需要保证输出线程已经关闭,否则输出线程会出错)
*
* @param process
* @return
*/
public boolean stop(Process process);

/**
* 停止输出线程
*
* @param thread
* @return
*/
public boolean stop(Thread thread);

/**
* 正确的停止输出线程和主进程
*
* @param process
* @param thread
* @return
*/
public boolean stop(Process process, Thread thread);
}

+ 69
- 0
src/main/java/com/github/bluesbruce/ffch/handler/TaskHandlerImpl.java Ver fichero

@@ -0,0 +1,69 @@
package com.github.bluesbruce.ffch.handler;

import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.util.ExecUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
* 任务处理实现
* @author eguid
* @since jdk1.7
* @version 2016年10月29日
*/
@Slf4j
public class TaskHandlerImpl implements TaskHandler {
private OutHandlerMethod ohm=null;
public TaskHandlerImpl(OutHandlerMethod ohm) {
this.ohm = ohm;
}

public void setOhm(OutHandlerMethod ohm) {
this.ohm = ohm;
}

@Override
public CommandTasker process(String id, String command) {
CommandTasker tasker = null;
try {
log.info(command);
tasker =ExecUtil.createTasker(id,command,ohm);

if(CommandManager.config.isDebug())
log.info(id+" 执行命令行:"+command);
return tasker;
} catch (IOException e) {
//运行失败,停止任务
ExecUtil.stop(tasker);
if(CommandManager.config.isDebug())
log.info(id+" 执行命令失败!进程和输出线程已停止");
// 出现异常说明开启失败,返回null
return null;
}
}
@Override
public boolean stop(Process process) {
return ExecUtil.stop(process);
}

@Override
public boolean stop(Thread outHandler) {
return ExecUtil.stop(outHandler);
}

@Override
public boolean stop(Process process, Thread thread) {
boolean ret=false;
ret=stop(thread);
ret=stop(process);
return ret;
}
}

+ 13
- 0
src/main/java/com/github/bluesbruce/ffch/loadFFmpeg.properties Ver fichero

@@ -0,0 +1,13 @@
#ffmpeg执行路径,一般为ffmpeg的安装目录,该路径只能是目录,不能为具体文件路径,否则会报错
path=Z://ffmpeg/bin/
#存放任务的默认Map的初始化大小
size=10
#事件回调通知接口地址
callback=http://127.0.0.1/callback
#网络超时设置(毫秒)
timeout=300

#开启保活线程
keepalive=true
#是否输出debug消息
debug=true

+ 200
- 0
src/main/java/com/github/bluesbruce/ffch/test/Test.java Ver fichero

@@ -0,0 +1,200 @@
package com.github.bluesbruce.ffch.test;

import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.CommandManagerImpl;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory;
import com.github.bluesbruce.ffch.data.CommandTasker;
import lombok.extern.slf4j.Slf4j;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;

/**
* 测试
* @author eguid
* @since jdk1.7
* @version 2017年10月13日
*/
@Slf4j
public class Test {
/**
* 命令组装器测试
* @throws InterruptedException
*/
public static void test1() throws InterruptedException{
CommandManager manager = new CommandManagerImpl();
Map<String,String> map = new HashMap<String,String>();
map.put("appName", "test123");
//FFCmdParamConfig Paramconfig=load("cc/eguid/cc.eguid.commandManager/loadCmdParam.yml", FFCmdParamConfig.class);
map.put("input", "");
map.put("output", "");
map.put("codec", "h264");
map.put("fmt", "flv");
map.put("fps", "25");
map.put("rs", "640x360");
map.put("twoPart", "2");
// 执行任务,id就是appName,如果执行失败返回为null
String id = manager.start(map);
log.info(id);
// 通过id查询
CommandTasker info = manager.query(id);
log.info(info.toString());
// 查询全部
Collection<CommandTasker> infoList = manager.queryAll();
log.info(infoList.toString());
Thread.sleep(30000);
// 停止id对应的任务
manager.stop(id);
}
/**
* 默认方式,rtsp->rtmp转流单个命令测试
* @throws InterruptedException
*/
public static void test2() throws InterruptedException{
CommandManager manager = new CommandManagerImpl();
// -rtsp_transport tcp
//测试多个任何同时执行和停止情况
//默认方式发布任务
manager.start("tomcat", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat");
Thread.sleep(30000);
// 停止全部任务
manager.stopAll();
}
/**
* 完整ffmpeg路径测试
* @throws InterruptedException
*/
public static void test4() throws InterruptedException{
CommandManager manager = new CommandManagerImpl();
// -rtsp_transport tcp
//测试多个任何同时执行和停止情况
//默认方式发布任务
manager.start("tomcat", "D:/TestWorkspaces/FFmpegCommandHandler/src/cc/eguid/FFmpegCommandManager/ffmpeg/ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat",true);
Thread.sleep(30000);
// 停止全部任务
manager.stopAll();
}
/**
* rtsp-rtmp转流多任务测试
* @throws InterruptedException
*/
public static void test3() throws InterruptedException{
CommandManager manager = new CommandManagerImpl();
// -rtsp_transport tcp
//测试多个任何同时执行和停止情况
//false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径
//manager.start("tomcat", "ffmpeg -i http://192.168.10.101:18000/flv/live/34020000001110000002_34020000001320000071_0200000071.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_9?sign=f8a15b6n",false);
manager.start("tomcat", "ffmpeg -i rtsp://192.168.144.25:554 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n",false);
//manager.start("tomcat1", "ffmpeg -i rtsp://192.168.144.25:554 -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_11?sign=rHtBg3sz",false);
/*manager.start("tomcat2", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat2",false);
manager.start("tomcat3", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat3",false);
manager.start("tomcat4", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat4",false);
manager.start("tomcat5", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat5",false);
manager.start("tomcat6", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat6",false);
manager.start("tomcat7", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat7",false);
manager.start("tomcat8", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat8",false);
manager.start("tomcat9", "ffmpeg -i rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov -vcodec copy -acodec copy -f flv -y rtmp://106.14.182.20:1935/rtmp/tomcat9",false);*/
Thread.sleep(60000);
// 停止全部任务
manager.stopAll();
}
/**
* 测试流式命令行构建器
* @throws InterruptedException
*/
public static void testStreamCommandAssmbly() throws InterruptedException {
CommandManager manager = new CommandManagerImpl();
manager.start("test1", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test1"));
Thread.sleep(30000);
// 停止全部任务
manager.stopAll();
}
/**
* 测试任务中断自动重启任务
* @throws InterruptedException
*/
public static void testBroken() throws InterruptedException {
CommandManager manager = new CommandManagerImpl();
manager.start("test1", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test1"));
Thread.sleep(30000);
// 停止全部任务
manager.stopAll();
manager.destory();
}
/**
* 批量测试任务中断自动重启任务
* @throws InterruptedException
*/
public static void testBrokenMuti() throws InterruptedException {
CommandManager manager = new CommandManagerImpl();
manager.start("test1", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://admin:HuaWei123@218.94.150.122:554/LiveMedia/ch1/Media1")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://live.push.t-aaron.com/live/whr"));
/*manager.start("test2", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test2"));
manager.start("test3", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test3"));
manager.start("test4", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test4"));
manager.start("test5", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i","rtsp://184.72.239.149/vod/mp4://BigBuckBunny_175k.mov")
// .add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add("rtmp://106.14.182.20:1935/rtmp/test5"));*/
Thread.sleep(30000);
// 停止全部任务
manager.stopAll();
manager.destory();
}
public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
test3();
// test4();
// testStreamCommandAssmbly();
// testBroken();
//testBrokenMuti();
}
}

+ 70
- 0
src/main/java/com/github/bluesbruce/ffch/util/CommonUtil.java Ver fichero

@@ -0,0 +1,70 @@
package com.github.bluesbruce.ffch.util;

import java.io.File;
import java.util.UUID;

/**
* 公共常用方法工具
* @author eguid
*
*/
public class CommonUtil {
/**
* 当前项目根路径
*/
public static final String rootPath = getProjectRootPath();
public static final String TRUE="true";
public static final String NULL_STRING="";
public static final String H_LINE="-";
public static String getUUID(){
return UUID.randomUUID().toString().trim().replaceAll(H_LINE, NULL_STRING);
}
/**
* 是否为空
*
* @param str
* @return boolean true:为空,false:不为空
*/
public static boolean isNull(String str) {
return str == null || NULL_STRING.equals(str.trim());
}
/**
* 字符串是否是"true"
* @param str
* @return
*/
public static boolean isTrue(String str){
return TRUE.equals(str)?true:false;
}
/**
* 获取项目根目录(静态)
* @return
*/
public static String getRootPath() {
return rootPath;
}
/**
* 获取项目根目录(动态)
* @return
*/
public static String getProjectRootPath() {
String path=null;
try{
path =CommonUtil.class.getResource("/").getPath();
}catch(Exception e){
File directory = new File(NULL_STRING);
path= directory.getAbsolutePath()+File.separator;
}
return path;
}
/**
* 获取类路径
* @param cla
* @return
*/
public static String getClassPath(Class<?> cla){
return cla.getResource("").getPath();
}
}

+ 110
- 0
src/main/java/com/github/bluesbruce/ffch/util/ExecUtil.java Ver fichero

@@ -0,0 +1,110 @@
package com.github.bluesbruce.ffch.util;

import com.github.bluesbruce.ffch.data.CommandTasker;
import com.github.bluesbruce.ffch.handler.OutHandler;
import com.github.bluesbruce.ffch.handler.OutHandlerMethod;

import java.io.IOException;

/**
* 命令行操作工具类
* @author eguid
*
*/
public class ExecUtil {

/**
* 执行命令行并获取进程
* @param cmd
* @return
* @throws IOException
*/
public static Process exec(String cmd) throws IOException {
Runtime runtime = Runtime.getRuntime();
Process process = runtime.exec(cmd);// 执行命令获取主进程
return process;
}
/**
* 销毁进程
* @param process
* @return
*/
public static boolean stop(Process process) {
if (process != null) {
process.destroy();
return true;
}
return false;
}

/**
* 销毁输出线程
* @param outHandler
* @return
*/
public static boolean stop(Thread outHandler) {
if (outHandler != null && outHandler.isAlive()) {
outHandler.stop();
outHandler.destroy();
return true;
}
return false;
}
/**
* 销毁
* @param process
* @param outHandler
*/
public static void stop(CommandTasker tasker) {
if(tasker!=null) {
stop(tasker.getThread());
stop(tasker.getProcess());
}
}

/**
* 创建命令行任务
* @param id
* @param command
* @return
* @throws IOException
*/
public static CommandTasker createTasker(String id, String command, OutHandlerMethod ohm) throws IOException {
// 执行本地命令获取任务主进程
Process process=exec(command);
// 创建输出线程
OutHandler outHandler=OutHandler.create(process.getErrorStream(), id,ohm);
CommandTasker tasker = new CommandTasker(id,command, process, outHandler);
return tasker;
}
/**
* 中断故障缘故重启
* @param tasker
* @return
* @throws IOException
*/
public static CommandTasker restart(CommandTasker tasker) throws IOException {
if(tasker!=null) {
String id=tasker.getId(),command=tasker.getCommand();
OutHandlerMethod ohm=null;
if(tasker.getThread()!=null) {
ohm=tasker.getThread().getOhm();
}
//安全销毁命令行进程和输出子线程
stop(tasker);
// 执行本地命令获取任务主进程
Process process=exec(command);
tasker.setProcess(process);
// 创建输出线程
OutHandler outHandler=OutHandler.create(process.getErrorStream(), id,ohm);
tasker.setThread(outHandler);
}
return tasker;
}
}

+ 144
- 0
src/main/java/com/github/bluesbruce/ffch/util/PropertiesUtil.java Ver fichero

@@ -0,0 +1,144 @@
package com.github.bluesbruce.ffch.util;

import lombok.extern.slf4j.Slf4j;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

/**
* properties配置文件读取
* @author eguid
*
*/
@Slf4j
public class PropertiesUtil {
/**
* 加载properties配置文件并读取配置项
* @param path
* @param cl
* @return
*/
@SuppressWarnings("unchecked")
public static <T>T load(String path, Class<T> cl) {
InputStream is = null;
try {
is = getInputStream(path);
} catch (FileNotFoundException e) {
//尝试从web目录读取
String newpath=System.getProperty("user.dir")+path;
log.info("尝试从web目录读取配置文件:"+newpath);
try {
is = getInputStream(newpath);
log.info("web目录读取到配置文件:"+newpath);
} catch (FileNotFoundException e1) {
log.info("没找到配置文件,读取默认配置文件");
//尝试从jar包中读取默认配置文件
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
try {
is = classloader.getResourceAsStream("cc/eguid/FFmpegCommandManager/config/defaultFFmpegConfig.properties");
log.info("读取默认配置文件:defaultFFmpegConfig.properties");
} catch (Exception e2) {
log.info("没找到默认配置文件:defaultFFmpegConfig.properties");
return null;
}
}
}
if (is != null) {
Properties pro = new Properties();
try {
log.info("加载配置文件...");
pro.load(is);
log.info("加载配置文件完毕");
return (T)load(pro, cl);
} catch (IOException e) {
log.info("加载配置文件失败");
return null;
}

}
return null;
}
/**
* 读取配置项并转换为对应对象
* @param pro
* @param cl
* @return
*/
public static Object load(Properties pro, Class<?> cl) {
try {
Map<String, Object> map = getMap(pro);
log.info("读取的配置项:" + map);
Object obj = ReflectUtil.mapToObj(map, cl);
log.info("转换后的对象:" + obj);
return obj;
} catch (InstantiationException e) {
log.info("加载配置文件失败");
return null;
} catch (IllegalAccessException e) {
log.info("加载配置文件失败");
return null;
} catch (IllegalArgumentException e) {
log.info("加载配置文件失败");
return null;
} catch (InvocationTargetException e) {
log.info("加载配置文件失败");
return null;
}
}
/**
* 获取对应文件路径下的文件流
* @param path
* @return
* @throws FileNotFoundException
*/
public static InputStream getInputStream(String path) throws FileNotFoundException {
return new FileInputStream(path);
}
/**
* 根据路径获取properties的Map格式内容
* @param path
* @return
*/
public static Map<String, Object> getMap(String path){
Properties pro=new Properties();
try {
pro.load(getInputStream(path));
return getMap(pro);
} catch (IOException e) {
return null;
}
}
/**
* 根据路径获取properties的Map格式内容
* @param path
* @param isRootPath -是否在项目根目录中
* @return
*/
public static Map<String, Object> getMap(String path,boolean isRootPath){
return getMap(isRootPath?CommonUtil.getProjectRootPath()+path:path);
}
/**
* Properties配置项转为Map<String, Object>
* @param pro
* @return
*/
public static Map<String, Object> getMap(Properties pro) {
if (pro == null || pro.isEmpty() || pro.size() < 1) {
return null;
}
Map<String, Object> map = new HashMap<String, Object>();
for (Entry<Object, Object> en : pro.entrySet()) {
String key = (String) en.getKey();
Object value = en.getValue();
map.put(key, value);
}
return map;
}
}

+ 166
- 0
src/main/java/com/github/bluesbruce/ffch/util/ReflectUtil.java Ver fichero

@@ -0,0 +1,166 @@
package com.github.bluesbruce.ffch.util;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

/**
* 反射操作工具
*
* @author eguid
*
*/
public class ReflectUtil {

public static final String SET = "set";
public static final String GET = "get";

public static Object mapToObj(Map<String, Object> map, Class<?> oc)
throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Method[] ms = oc.getDeclaredMethods();
if (ms == null || ms.length < 1) {
return null;
}
Object obj = getObject(oc);
for (Method m : ms) {
String methodName = m.getName();
String fieldName = getMethodField(methodName, SET);
Object value = map.get(fieldName);
if (value != null) {
setMethodValue(m, obj, typeConvert(value, m));
}
}
return obj;
}

public static Object typeConvert(Object obj, Method m) {
return typeConvert(obj, m.getParameterTypes()[0].getName());
}

public static Object typeConvert(Object obj, Field f) {
return typeConvert(obj, f.getType().getName());
}
/**
* 基础数据转换
* @param obj
* @param typeName
* @return
*/
public static Object typeConvert(Object obj, String typeName) {
// 基础数据都可以转为String
String str = String.valueOf(obj);
if ("int".equals(typeName) || "java.lang.Integer".equals(typeName)) {
return Integer.valueOf(str.trim());
} else if ("long".equals(typeName) || "java.lang.Long".equals(typeName)) {
return Long.valueOf(str.trim());
} else if ("byte".equals(typeName) || "java.lang.Byte".equals(typeName)) {
return Byte.valueOf(str.trim());
} else if ("short".equals(typeName) || "java.lang.Short".equals(typeName)) {
return Short.valueOf(str.trim());
} else if ("float".equals(typeName) || "java.lang.Float".equals(typeName)) {
return Float.valueOf(str.trim());
} else if ("double".equals(typeName) || "java.lang.Double".equals(typeName)) {
return Double.valueOf(str.trim());
} else if ("boolean".equals(typeName) || "java.lang.Boolean".equals(typeName)) {
return CommonUtil.TRUE.equals(str)?true:false;
} else if ("char".equals(typeName) || "java.lang.Character".equals(typeName)) {
return Character.valueOf(str.trim().charAt(0));
} else if ("java.lang.String".equals(typeName)) {
return str;
}
return null;
}

public static Class<?> getFieldType(Class<?> cl, String fieldName) throws NoSuchFieldException, SecurityException {
Field f = cl.getDeclaredField(fieldName);
return f.getType();
}

public static Field findField(Class<?> cl, String fieldName) throws NoSuchFieldException, SecurityException {
return cl.getDeclaredField(fieldName);
}

/**
* 执行方法
*
* @param m
* - 方法
* @param obj
* - 对象
* @param value
* - 参数
* @throws IllegalAccessException
* @throws IllegalArgumentException
* @throws InvocationTargetException
*/
public static Object setMethodValue(Method m, Object obj, Object... value)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
m.getParameterTypes();
return m.invoke(obj, value);
}

public static Object getFieldValue(Class<?> obj, String FieldName) throws NoSuchFieldException, SecurityException {
return obj.getDeclaredField(FieldName);
}

/**
* 通过class实例化
*
* @param oc
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
public static Object getObject(Class<?> oc) throws InstantiationException, IllegalAccessException {
return oc.newInstance();
}

/**
* 获取方法字段
*
* @param methodName
* @param prefix
* @param lowercase
* @return
*/
public static String getMethodField(String methodName, String prefix) {
String m = null;
if (prefix != null) {
if (methodName.indexOf(prefix) >= 0) {
m = methodName.substring(prefix.length());
return stringFirstLower(m);
}
}
return m;
}

/**
* 首字母大写
*
* @param str
* @return
*/
public static String stringFirstUpper(String str) {
char[] ch = str.toCharArray();
if (ch[0] >= 'a' && ch[0] <= 'z') {
ch[0] = (char) (ch[0] - 32);
}
return new String(ch);
}

/**
* 首字母小写
*
* @param str
* @return
*/
public static String stringFirstLower(String str) {
char[] ch = str.toCharArray();
if (ch[0] >= 'A' && ch[0] <= 'Z') {
ch[0] = (char) (ch[0] + 32);
}
return new String(ch);
}

}

+ 34
- 0
src/main/java/com/github/bluesbruce/spring/Application.java Ver fichero

@@ -0,0 +1,34 @@
package com.github.bluesbruce.spring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Profile;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;

/**
* SpringBoot 入口类
*
* @author BBF
*/

@EnableCaching
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class})
public class Application extends SpringBootServletInitializer {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Profile(value = {"war"})
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(Application.class);
}

}

+ 74
- 0
src/main/java/com/github/bluesbruce/spring/config/CmdParam.java Ver fichero

@@ -0,0 +1,74 @@
package com.github.bluesbruce.spring.config;

public class CmdParam {
private String pushUrl;//
private String pushUrl2;//
private String playUrl;//
private int type;//1 固定通道 2 通道服务
String mqttUrl;
private String mqttTopic;

public String getPushUrl2() {
return pushUrl2;
}

public void setPushUrl2(String pushUrl2) {
this.pushUrl2 = pushUrl2;
}

public String getMqttUrl() {
return mqttUrl;
}

public void setMqttUrl(String mqttUrl) {
this.mqttUrl = mqttUrl;
}

public String getMqttTopic() {
return mqttTopic;
}

public void setMqttTopic(String mqttTopic) {
this.mqttTopic = mqttTopic;
}

public String getPushUrl() {
return pushUrl;
}

public int getType() {
return type;
}

public void setType(int type) {
this.type = type;
}

public CmdParam(String pushUrl, String playUrl, int type, String mqttUrl, String mqttTopic) {
this.pushUrl = pushUrl;
this.playUrl = playUrl;
this.type = type;
this.mqttUrl = mqttUrl;
this.mqttTopic = mqttTopic;
}

public CmdParam() {

}
public void setPushUrl(String pushUrl) {
this.pushUrl = pushUrl;
}

public String getPlayUrl() {
return playUrl;
}

@Override
public String toString() {
return "CmdParam [pushUrl=" + pushUrl + ", playUrl=" + playUrl + ",type=" + type + "]";
}

public void setPlayUrl(String playUrl) {
this.playUrl = playUrl;
}
}

+ 41
- 0
src/main/java/com/github/bluesbruce/spring/config/Readproperties.java Ver fichero

@@ -0,0 +1,41 @@
package com.github.bluesbruce.spring.config;


import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
* @author 白豆五
* @version 2023/07/16
* @since JDK8
*/

public class Readproperties {
public void test() {
// 配置对象
Properties props = new Properties();
InputStreamReader input = null;
try {
// 输入流 (字节流转字符流)
input = new InputStreamReader(
this.getClass().getClassLoader().getResourceAsStream(System.getProperty("user.dir")+"/loadFFmpeg.properties"),//通过类加载器来获取指定路径下的资源文件,并返回一个InputStream对象
StandardCharsets.UTF_8); //指定编码格式
// 加载配置
props.load(input);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (input!=null)
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}

}
// 获取配置
System.out.println("id:" + props.getProperty("student.id") + ", name:" + props.getProperty("student.name"));
}
}

+ 295
- 0
src/main/java/com/github/bluesbruce/spring/mqttService/HttpURLConnectionUtil.java Ver fichero

@@ -0,0 +1,295 @@
package com.github.bluesbruce.spring.mqttService;
import org.springframework.lang.Nullable;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

/**
* @author ww
* @date 2022/07/8 10:42
*/
public class HttpURLConnectionUtil {

/**
* Http get请求
* @param httpUrl 连接
* @return 响应数据
*/
public static String doGet(String httpUrl){
//链接
HttpURLConnection connection = null;
InputStream is = null;
BufferedReader br = null;
StringBuffer result = new StringBuffer();
try {
//创建连接
URL url = new URL(httpUrl);
connection = (HttpURLConnection) url.openConnection();
//设置请求方式
connection.setRequestMethod("GET");

connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
connection.setRequestProperty("Content-Type", "application/json;charset=utf-8");
connection.setRequestProperty("Authorization", "Basic YWRtaW46cHVibGlj");
connection.setRequestProperty("Cookie", "jenkins-timestamper-offset=-28800000; order=id%20desc; serverType=nginx; ltd_end=-1; pro_end=-1; memSize=7768; bt_user_info=%7B%22status%22%3Atrue%2C%22msg%22%3A%22%u83B7%u53D6%u6210%u529F%21%22%2C%22data%22%3A%7B%22username%22%3A%22139****4314%22%7D%7D; sites_path=/www/wwwroot; distribution=centos8; p-1=1; lang=en-US; force=0; b6e922e8cf76dea67efb3a9292724aee=acc59469-46e3-4576-b216-81b907d80a7f.rB3Plw7uc301E0qFoEBeoP_fL4w; load_search=undefined; remember-me=YWRtaW46MTY1ODIxMDYwODA2MDoxY2ZhOTBhMzFlN2IzZTg1YzM5MWQ3Y2M4MWEzYzVlZmYzM2UzOGQ4MTVjZDZhNTUwNjQ4YjFhMjVlYTM3NWJj; JSESSIONID.fc8332ab=node01bo1jsydkd8ql1oo3srksa2k6w288.node0; screenResolution=1920x1080; b2651c13496a99bc2899b024b99bca4f=b4e55bb9-4a3e-44b6-ae8b-c054e7115182.SEaDzMgHxI4d2CGoGwNPnj3L2y4; request_token=hH1LwVtazrgz4sMI4CE1SwEVpCYgiQcHNp4EfyvPni9C32PJ; soft_remarks=%7B%22list%22%3A%5B%22%u4F01%u4E1A%u7248%u3001%u4E13%u4E1A%u7248%u63D2%u4EF6%22%2C%2215%u5929%u65E0%u7406%u7531%u9000%u6B3E%22%2C%22%u53EF%u66F4%u6362IP%22%2C%22%u5E74%u4ED8%u8D60%u90012%u5F20SSL%u8BC1%u4E66%22%2C%22%u5E74%u4ED8%u8D60%u90011000%u6761%u77ED%u4FE1%22%2C%22%u4F4E%u81F32.43%u5143/%u5929%22%2C%22%u5546%u7528%u9632%u706B%u5899%u6388%u6743%22%2C%22%u5E74%u4ED8%u53EF%u5165%u4F01%u4E1A%u7248%u670D%u52A1%u7FA4%22%2C%22%u4EA7%u54C1%u6388%u6743%u8BC1%u4E66%22%5D%2C%22pro_list%22%3A%5B%22%u4E13%u4E1A%u7248%u63D2%u4EF6%22%2C%2215%u5929%u65E0%u7406%u7531%u9000%u6B3E%22%2C%22%u53EF%u66F4%u6362IP%22%2C%22%u4F4E%u81F31.18%u5143/%u5929%22%2C%22%u5546%u7528%u9632%u706B%u5899%u6388%u6743%22%2C%22%u4EA7%u54C1%u6388%u6743%u8BC1%u4E66%22%5D%2C%22kfqq%22%3A%223007255432%22%2C%22kf%22%3A%22http%3A//q.url.cn/CDfQPS%3F_type%3Dwpa%26qidian%3Dtrue%22%2C%22qun%22%3A%22%22%2C%22kf_list%22%3A%5B%7B%22qq%22%3A%223007255432%22%2C%22kf%22%3A%22http%3A//q.url.cn/CDfQPS%3F_type%3Dwpa%26qidian%3Dtrue%22%7D%2C%7B%22qq%22%3A%222927440070%22%2C%22kf%22%3A%22http%3A//wpa.qq.com/msgrd%3Fv%3D3%26uin%3D2927440070%26site%3Dqq%26menu%3Dyes%26from%3Dmessage%26isappinstalled%3D0%22%7D%5D%2C%22wx_list%22%3A%5B%7B%22ps%22%3A%22%u552E%u524D%u54A8%u8BE2%22%2C%22kf%22%3A%22https%3A//work.weixin.qq.com/kfid/kfc72fcbde93e26a6f3%22%7D%5D%7D; load_page=null; load_type=null");
//设置连接超时时间
connection.setReadTimeout(15000);
//开始连接
connection.connect();
//获取响应数据
if (connection.getResponseCode() == 200) {
//获取返回的数据
is = connection.getInputStream();
if (null != is) {
br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String temp = null;
while (null != (temp = br.readLine())) {
result.append(temp);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != br) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != is) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//关闭远程连接
connection.disconnect();
}
return result.toString();
}

/**
* Http post请求
* @param httpUrl 连接
* @param param 参数
* @return
*/
public static String doPost(String httpUrl, @Nullable String param) {
StringBuffer result = new StringBuffer();
//连接
HttpURLConnection connection = null;
OutputStream os = null;
InputStream is = null;
BufferedReader br = null;
trustAllHosts();
try {
//创建连接对象
URL url = new URL(httpUrl);
//创建连接
connection = (HttpURLConnection) url.openConnection();
//设置请求方法
connection.setRequestMethod("POST");
//设置连接超时时间
connection.setConnectTimeout(15000);
//设置读取超时时间
connection.setReadTimeout(15000);
//DoOutput设置是否向httpUrlConnection输出,DoInput设置是否从httpUrlConnection读入,此外发送post请求必须设置这两个
//设置是否可读取
connection.setDoOutput(true);
connection.setDoInput(true);
//设置通用的请求属性
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
connection.setRequestProperty("Content-Type", "application/json;charset=utf-8");

//拼装参数
if (null != param && (!param.equals(""))) {
//设置参数
os = connection.getOutputStream();
//拼装参数
os.write(param.getBytes("UTF-8"));
}
//设置权限
//设置请求头等
//开启连接
//connection.connect();
//读取响应
if (connection.getResponseCode() == 200) {
is = connection.getInputStream();
if (null != is) {
//br = new BufferedReader(new InputStreamReader(is, "GBK"));
br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String temp = null;
while (null != (temp = br.readLine())) {
result.append(temp);
result.append("\r\n");
}
}
}

} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
//关闭连接
if(br!=null){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(os!=null){
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(is!=null){
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//关闭连接
connection.disconnect();
}
return result.toString();
}
public static String doPut(String httpUrl, @Nullable String param) {
StringBuffer result = new StringBuffer();
//连接
HttpURLConnection connection = null;
OutputStream os = null;
InputStream is = null;
BufferedReader br = null;
try {
//创建连接对象
URL url = new URL(httpUrl);
//创建连接
connection = (HttpURLConnection) url.openConnection();
//设置请求方法
connection.setRequestMethod("PUT");
//设置连接超时时间
connection.setConnectTimeout(15000);
//设置读取超时时间
connection.setReadTimeout(15000);
//DoOutput设置是否向httpUrlConnection输出,DoInput设置是否从httpUrlConnection读入,此外发送post请求必须设置这两个
//设置是否可读取
connection.setDoOutput(true);
connection.setDoInput(true);
//设置通用的请求属性
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
connection.setRequestProperty("Content-Type", "application/json;charset=utf-8");

//拼装参数
if (null != param && (!param.equals(""))) {
//设置参数
os = connection.getOutputStream();
//拼装参数
os.write(param.getBytes("UTF-8"));
}
//设置权限
//设置请求头等
//开启连接
//connection.connect();
//读取响应
if (connection.getResponseCode() == 200) {
is = connection.getInputStream();
if (null != is) {
//br = new BufferedReader(new InputStreamReader(is, "GBK"));
br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String temp = null;
while (null != (temp = br.readLine())) {
result.append(temp);
result.append("\r\n");
}
}
}

} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
//关闭连接
if(br!=null){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(os!=null){
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(is!=null){
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//关闭连接
connection.disconnect();
}
return result.toString();
}
public static void main(String[] args) {
String message = doPost("https://tcc.taobao.com/cc/json/mobile_tel_segment.htm?tel=13026194071", "");
System.out.println(message);
}

/**
* Trust every server - dont check for any certificate
*/
private static void trustAllHosts() {
final String TAG = "trustAllHosts";
// Create a trust manager that does not validate certificate chains
TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[]{};
}

public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
//Log.i(TAG, "checkClientTrusted");
}

public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
//Log.i(TAG, "checkServerTrusted");
}
}};
// Install the all-trusting trust manager
try {
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
} catch (Exception e) {
e.printStackTrace();
}
}

}

+ 73
- 0
src/main/java/com/github/bluesbruce/spring/mqttService/consumer/MqttConsumerCallBack.java Ver fichero

@@ -0,0 +1,73 @@
package com.github.bluesbruce.spring.mqttService.consumer;

import com.github.bluesbruce.spring.service.MqttLiveHandle;
import com.github.bluesbruce.spring.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@Component
public class MqttConsumerCallBack implements MqttCallback {

@Autowired
private MqttLiveHandle mqttLiveHandle;
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器断开连接,可重连");
/*MqttProviderConfig client = SpringUtil.getBean(MqttProviderConfig.class);
System.out.println(client.isconnect());
if (!client.isconnect()) {
client.connect();
System.out.println("重连成功!");
}*/
}

DateFormat bf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
log.info(String.format("接收消息主题 : %s", topic) + "-" + bf.format(new Date()));
log.info(String.format("接收消息Qos : %d", message.getQos()));
log.info(String.format("接收消息内容 : %s", new String(message.getPayload())));
log.info(String.format("接收消息retained : %b", message.isRetained()));
if (topic.contains("/rtmp/live")){
if (ObjectUtils.isEmpty(mqttLiveHandle)){
mqttLiveHandle = SpringUtil.getBean(MqttLiveHandle.class);
}
mqttLiveHandle.handleLive(topic,message);
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

}





}


+ 99
- 0
src/main/java/com/github/bluesbruce/spring/mqttService/send/MqttProviderConfig.java Ver fichero

@@ -0,0 +1,99 @@
package com.github.bluesbruce.spring.mqttService.send;

import com.github.bluesbruce.spring.config.CmdParam;
import com.github.bluesbruce.spring.mqttService.consumer.MqttConsumerCallBack;
import com.github.bluesbruce.spring.utils.SnowId;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;


@Configuration
@Slf4j
public class MqttProviderConfig {

public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class);
/**
* 客户端对象
*/
public MqttClient client;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init(){
//connect();
}

/**
* 客户端连接服务端
*/
public void connect(){
try{
//创建MQTT客户端对象
client = new MqttClient(cmdParam.getMqttUrl(), String.valueOf(SnowId.snowId()),new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName("");
//设置连接密码
options.setPassword("".toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(cmdParam.getMqttUrl() + "与服务器断开连接").getBytes(),0,false);

//防止 ERROR o.e.p.c.mqttv3.internal.ClientState - Timed out as no activity 错误
options.setConnectionTimeout(0);

//mqttClient.reconnect(); 这个方法或者回调已经设置了重连
options.setAutomaticReconnect(true);
//设置回调
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
client.subscribe(cmdParam.getMqttTopic(),0);
} catch(MqttException e){
e.printStackTrace();
}
}

public MqttDeliveryToken publish(int qos,boolean retained,String topic,String message){
log.info("服务推送MQTT消息,topic:{};qos:{};message:{}",topic,qos,message);
if (!client.isConnected()){
connect();
}
MqttDeliveryToken token=null;
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一种机制来跟踪消息的传递进度
//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度

try {
//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
log.error("机场平台推送MQTT消息,topic:{};qos:{};message:{}异常:{}",topic,qos,message,e);
}
return token;
}

}


+ 64
- 0
src/main/java/com/github/bluesbruce/spring/service/FFrtmpServer.java Ver fichero

@@ -0,0 +1,64 @@
package com.github.bluesbruce.spring.service;


import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.CommandManagerImpl;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory;
import com.github.bluesbruce.spring.config.CmdParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;

@Slf4j
@Component
@Order(value=2)
public class FFrtmpServer implements ApplicationRunner {

public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class);
@Override
public void run(ApplicationArguments args) {
try {
CommandManager manager = new CommandManagerImpl();
// -rtsp_transport tcp
//测试多个任何同时执行和停止情况
//false表示使用配置文件中的ffmpeg路径,true表示本条命令已经包含ffmpeg所在的完整路径
//manager.start("tomcat", "ffmpeg -i http://192.168.10.101:18000/flv/live/34020000001110000002_34020000001320000071_0200000071.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.10.101:19350/rlive/stream_9?sign=f8a15b6n",false);
//manager.start("tomcat", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_9?sign=f8a15b6n",false);
//manager.start("tomcat1", "ffmpeg -i rtsp://192.168.144.25:554/stream=0 -vcodec copy -acodec copy -f flv -y rtmp://221.226.114.142:19350/rlive/stream_11?sign=rHtBg3sz",false);
manager.start("test1", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i",cmdParam.getPlayUrl())
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
.add("-y").add(cmdParam.getPushUrl()));
Thread.sleep(10000);
manager.start("test2", CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i",cmdParam.getPlayUrl())
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-b:v", "2M")
.add("-maxrate", "2M")
.add("-bufsize", "1M")
.add("-y").add(cmdParam.getPushUrl2()));
Thread.sleep(300000);
// 停止全部任务
manager.stopAll();
}catch (Exception e){
log.error("",e);
}
//ds.close();
}



}

+ 34
- 0
src/main/java/com/github/bluesbruce/spring/service/MqttLiveHandle.java Ver fichero

@@ -0,0 +1,34 @@
package com.github.bluesbruce.spring.service;

import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
public class MqttLiveHandle {
@Autowired
private RtmpLiveService rtmpLiveService;

public void handleLive(final String topic, MqttMessage message){
String objmsg = new String(message.getPayload());
final JSONObject jsonObject = JSONObject.parseObject(objmsg);
if (!ObjectUtils.isEmpty(jsonObject.get("command"))){
String cmdoperat = jsonObject.get("command").toString();
if (cmdoperat.equals("start")){
Thread thread = new Thread(new Runnable() {
String code= jsonObject.get("code")==null?topic.split("/")[2]:jsonObject.get("code").toString();
public void run() {
rtmpLiveService.pushServer(code);
}
});
thread.start();
}else if (cmdoperat.equals("stop")){
rtmpLiveService.stopRtmp();
}
}
}


}

+ 222
- 0
src/main/java/com/github/bluesbruce/spring/service/RtmpLiveService.java Ver fichero

@@ -0,0 +1,222 @@
package com.github.bluesbruce.spring.service;

import com.github.bluesbruce.ffch.CommandManager;
import com.github.bluesbruce.ffch.CommandManagerImpl;
import com.github.bluesbruce.ffch.commandbuidler.CommandBuidlerFactory;
import com.github.bluesbruce.ffch.data.CommandTasker;
import com.alibaba.fastjson.JSONObject;
import com.github.bluesbruce.spring.config.CmdParam;
import com.github.bluesbruce.spring.mqttService.HttpURLConnectionUtil;
import com.github.bluesbruce.spring.mqttService.send.MqttProviderConfig;
import lombok.extern.slf4j.Slf4j;
import org.h2.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.util.Collection;
import java.util.List;

import static com.github.bluesbruce.ffch.util.PropertiesUtil.load;

@Service
@Slf4j
public class RtmpLiveService {
/**
* 加载配置文件的推拉流地址
*/
public static final CmdParam cmdParam=load("/loadCmd.properties", CmdParam.class);

@Autowired
private MqttProviderConfig mqttProviderConfig;

public static final CommandManager manager = new CommandManagerImpl();
/**
* h获取通道并推送
*/
public void pushServer(String code){
//CommandManager manager = new CommandManagerImpl();
String reTopic = cmdParam.getMqttTopic().replace("live","result");
JSONObject jsonObject = new JSONObject();
Collection<CommandTasker> infoList = manager.queryAll();
log.info(infoList.toString());
if (infoList.size()>0){
jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务已启动"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
return;
}
String pushUrl="";
if (cmdParam.getType()==2){
log.info("获取流媒体通道");
//TODO 获取通道服务推拉流地址
JSONObject object = getChenl();
if (!ObjectUtils.isEmpty(object)){
object.put("code", 0);
object.put("msg", "获取通道成功");
mqttProviderConfig.publish(2,false,reTopic,object.toJSONString());
}else{
log.info("获取通道失败");
object.put("code", -1);
object.put("msg", "获取通道失败");
mqttProviderConfig.publish(2,false,reTopic,object.toJSONString());
return;
}
//TODO end
}else{
log.info("使用固定流媒体通道");
pushUrl= cmdParam.getPushUrl();
}
runRtmp(pushUrl,cmdParam.getPlayUrl(),code);

}

/**
* 获取阿里云通道
* @return
*/
private JSONObject getChenl() {
String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
if (!StringUtils.isNullOrEmpty(result)) {
try {
JSONObject object = JSONObject.parseObject(result);
List<JSONObject> objectList = (List<JSONObject>) object.get("data");
if (objectList.size() > 0) {
log.info("获取到通道列表{}", objectList);
for (int i = 0; i < objectList.size(); i++) {
JSONObject chenl = objectList.get(i);
//设置通道占用
JSONObject param = new JSONObject();
param.put("code", chenl.get("code").toString());
String resultOn = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", param.toJSONString());
if (!StringUtils.isNullOrEmpty(resultOn)) {
JSONObject objectResult = JSONObject.parseObject(resultOn);
if (objectResult.get("code").toString().equals("0")) {
log.info("占用通道{}", objectList.get(i));
return objectList.get(i);
}
}
}

}
}catch (Exception e){
log.error("",e);
}
}else{
return null;
}
return null;
}


/**
* 根据推拉流地址推动
* @param pushUrl
* @param playUrl
*/
public void runRtmp(String pushUrl,String playUrl,String code) {
try {
log.info("获取播流地址:{}");
//CommandManager manager = new CommandManagerImpl();
String taskId = manager.start(code, CommandBuidlerFactory.createBuidler()
.add("ffmpeg").add("-i",playUrl)
.add("-rtsp_transport","tcp")
.add("-vcodec","copy")
.add("-acodec","copy")
.add("-f","flv")
.add("-y").add(pushUrl));
cmdParam.getMqttTopic();
String reTopic = cmdParam.getMqttTopic().replace("live","result");

log.info(manager.queryAll().toString());
JSONObject jsonObject = new JSONObject();
if (StringUtils.isNullOrEmpty(taskId)){
jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务失败"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
// 停止全部任务
manager.stopAll();
return;
}else {
jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务启动成功");
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
}
Thread.sleep(36000);
log.info(manager.queryAll().toString());
// 停止全部任务
manager.stopAll();
manager.destory();
}catch (Exception e){
log.error("",e);
}
//ds.close();
}

public void stopRtmp() {
try {
log.info("停止推流");
//CommandManager manager = new CommandManagerImpl();
// 停止全部任务
int index = manager.stopAll();
manager.destory();
JSONObject jsonObject = new JSONObject();
String reTopic = cmdParam.getMqttTopic().replace("live","result");
if (index==0){
jsonObject.put("code", -1);
jsonObject.put("msg", "推流服务关闭失败"); //推流失败
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
// 停止全部任务
manager.stopAll();
return;
}else if (index>0){
jsonObject.put("code", 0);
jsonObject.put("msg", "推流服务关闭成功");
returnChenl();
mqttProviderConfig.publish(2,false,reTopic,jsonObject.toJSONString());
}
}catch (Exception e){
log.error("",e);
}
//ds.close();
}


/**
* 释放阿里云通道
* @return
*/
private JSONObject returnChenl() {
try {
String result = HttpURLConnectionUtil.doGet("https://streaming.t-aaron.com/livechannel/getLiveChannelList");
if (!StringUtils.isNullOrEmpty(result)) {
JSONObject object = JSONObject.parseObject(result);
List<JSONObject> objectList = (List<JSONObject>) object.get("data");
if (objectList.size() > 0) {
log.info("获取到通道列表{}", objectList);
for (int i = 0; i < objectList.size(); i++) {
JSONObject chenl = objectList.get(i);
//设置通道占用
JSONObject param = new JSONObject();
param.put("code", chenl.get("code").toString());
String resultOn = HttpURLConnectionUtil.doPut("https://streaming.t-aaron.com/livechannel/useLiveChannel", param.toJSONString());
if (!StringUtils.isNullOrEmpty(resultOn)) {
JSONObject objectResult = JSONObject.parseObject(resultOn);
if (objectResult.get("code").toString().equals("0")) {
log.info("占用通道{}", objectList.get(i));
return objectList.get(i);
}
}
}

}
} else {
return null;
}
return null;
}catch (Exception e){
log.error("",e);
}
return null;
}
}

+ 137
- 0
src/main/java/com/github/bluesbruce/spring/utils/SnowId.java Ver fichero

@@ -0,0 +1,137 @@
package com.github.bluesbruce.spring.utils;


public class SnowId {

// ==============================Fields===========================================
/** 开始时间截 (2015-01-01) */
private final long twepoch = 1420041600000L;

/** 机器id所占的位数 */
private final long workerIdBits = 5L;

/** 数据标识id所占的位数 */
private final long datacenterIdBits = 5L;

/** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/** 支持的最大数据标识id,结果是31 */
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

/** 序列在id中占的位数 */
private final long sequenceBits = 12L;

/** 机器ID向左移12位 */
private final long workerIdShift = sequenceBits;

/** 数据标识id向左移17位(12+5) */
private final long datacenterIdShift = sequenceBits + workerIdBits;

/** 时间截向左移22位(5+5+12) */
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

/** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/** 工作机器ID(0~31) */
private long workerId;

/** 数据中心ID(0~31) */
private long datacenterId;

/** 毫秒内序列(0~4095) */
private long sequence = 0L;

/** 上次生成ID的时间截 */
private long lastTimestamp = -1L;

//==============================Constructors=====================================
/**
* 构造函数
* @param workerId 工作ID (0~31)
* @param datacenterId 数据中心ID (0~31)
*/
public SnowId(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format
("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format
("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
* @return SnowflakeId
*/
public synchronized long nextId() {
long timestamp = timeGen();

//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format
("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}

//上次生成ID的时间截
lastTimestamp = timestamp;

//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift) //
| (datacenterId << datacenterIdShift) //
| (workerId << workerIdShift) //
| sequence;
}

/**
* 阻塞到下一个毫秒,直到获得新的时间戳
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

/**
* 返回以毫秒为单位的当前时间
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}

//==============================Test=============================================
/** 测试 */
public static long snowId() {
SnowId idWorker = new SnowId(0, 0);
long id = idWorker.nextId();
return id;
// log.info("id:"+id);
//id:768842202204864512
}
}

+ 60
- 0
src/main/java/com/github/bluesbruce/spring/utils/SpringUtil.java Ver fichero

@@ -0,0 +1,60 @@
package com.github.bluesbruce.spring.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* 实现ApplicationContextAware接口,并加入Component注解,让spring扫描到该bean
* 该类用于在普通Java类中注入bean,普通Java类中用@Autowired是无法注入bean的
*/
@Component
public class SpringUtil implements ApplicationContextAware {

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
}

/**
* 获取applicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

/**
*通过name获取 Bean.
* @param name
* @return
*/
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}

/**
* 通过class获取Bean.
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}

/**
* 通过name,以及Clazz返回指定的Bean
* @param name
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}

+ 34
- 0
src/main/java/com/github/bluesbruce/spring/web/SubController.java Ver fichero

@@ -0,0 +1,34 @@
package com.github.bluesbruce.spring.web;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

@Controller
public class SubController {
@Resource
private ObjectMapper objectMapper;

@Value("${spring.mqtt.client.id}")
private String clientId;

@Value("${spring.mqtt.default.topic}")
private String defaultTopic;

@RequestMapping("/init")
@ResponseBody
public String subject(String topic, int qos) {
try {
return "发送成功";
} catch (Exception e) {
e.printStackTrace();
return "发送失败";
}
}

}

+ 71
- 0
src/main/resources/application.properties Ver fichero

@@ -0,0 +1,71 @@
# IDENTITY (ContextIdApplicationContextInitializer)
spring.application.name=${project.name}
# 项目contextPath,一般在正式发布版本中,我们不配置
#server.servlet.context-path=/
# 错误页,指定发生错误时,跳转的URL。请查看BasicErrorController源码便知
server.error.path=/errors
# 空白错误页激活
server.error.whitelabel.enabled=true
# 是否包含错误堆栈,默认never,可配置always、on_trace_param
# 其中always为总是显示错误堆栈
# on_trace_param 只有当request的parameter包含trace=true的时候显示
server.error.include-stacktrace=on_trace_param
# 开启GZIP
server.compression.enabled=true
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
# 超过2KB的文件进行压缩
server.compression.min-response-size=2KB
# 服务端口
server.port=8032
# session最大超时时间(秒),默认为1800秒(30分钟)
server.servlet.session.timeout=3600s
# 设置cookie的HttpOnly标识(true - 通过将无法读取到cookie信息)
server.servlet.session.cookie.http-only=false
# session的cookie名,要设置不同,防止浏览器打开多个项目,session名相同
server.servlet.session.cookie.name=JSESSIONID_${project.artifactId}
# 单个文件上传大小
spring.servlet.multipart.max-file-size=5MB
# 设置总上传的数据大小
spring.servlet.multipart.max-request-size=10MB
# LOG
logging.config=classpath:log4j2.xml
# 使用CGLIB实现切面
spring.aop.proxy-target-class=true
# 资源映射路径为/static/**
spring.mvc.static-path-pattern=/static/**
# SpringBoot视图配置
spring.mvc.view.prefix=/
spring.mvc.view.suffix=.jsp
# 资源映射地址
spring.web.resources.static-locations=classpath:/static/
############################################
# MyBatis-Spring-Boot-Starter 相关配置
############################################
mybatis.type-aliases-package=com.github.bluesbruce.h2.service.dao
mybatis.mapper-locations=classpath:/mapper/*Mapper.xml
mybatis.config-location=classpath:/config/mybatis-config.xml
############################################
# 配置H2数据库
############################################
spring.h2.console.enabled=true
# h2的web管理界面
spring.h2.console.path=/h2console
spring.h2.console.settings.trace=true
# 允许外部访问
spring.h2.console.settings.web-allow-others=true
# 用来检测连接是否有效的sql
spring.datasource.druid.validation-query=SELECT 'H'
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.client.id=123456789999
spring.mqtt.default.topic=123456789999


spring.devtools.restart.enabled=true
spring.devtools.restart.additional-exclude=src/main/java
spring.devtools.restart.exclude=webapp/**
spring.freemarker.cache=false





+ 61
- 0
src/main/resources/config/mybatis-config.xml Ver fichero

@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<properties/>
<!-- 详细配置:http://www.mybatis.org/mybatis-3/configuration.html#settings -->
<settings>
<!-- 指定MyBatis的日志实现使用 -->
<!-- SLF4J | LOG4J | LOG4J2 | JDK_LOGGING | COMMONS_LOGGING | STDOUT_LOGGING | NO_LOGGING -->
<setting name="logImpl" value="SLF4J"/>

<!-- 全局的映射器启用或禁用缓存 -->
<setting name="cacheEnabled" value="true"/>

<!-- 全局启用或禁用延迟加载。当禁用时,所有关联对象都会即时加载 -->
<setting name="lazyLoadingEnabled" value="true"/>

<!-- 允许或不允许多种结果集从一个单独的语句中返回(需要适合的驱动) -->
<setting name="multipleResultSetsEnabled" value="true"/>

<!-- 使用列标签代替列名。不同的驱动在这方便表现不同。参考驱动文档或充分测试两种方法来决定所使用的驱动 -->
<setting name="useColumnLabel" value="true"/>

<!-- 允许JDBC支持生成的键。需要适合的驱动。
如果设置为true则这个设置强制生成的键被使用,尽管一些驱动拒绝兼容但仍然有效(比如Derby) -->
<setting name="useGeneratedKeys" value="false"/>

<!-- 指定MyBatis如何自动映射列到字段/属性。PARTIAL只会自动映射简单,
没有嵌套的结果。FULL会自动映射任意复杂的结果(嵌套的或其他情况) -->
<setting name="autoMappingBehavior" value="PARTIAL"/>

<!-- 配置默认的执行器。SIMPLE执行器没有什么特别之处。
REUSE执行器重用预处理语句。BATCH执行器重用语句和批量更新 -->
<!-- <setting name="defaultExecutorType" value="BATCH" /> -->

<!-- 设置超时时间,它决定驱动等待一个数据库响应的时间 -->
<!-- <setting name="defaultStatementTimeout" value="25000" /> -->

<!-- 允许在嵌套语句中使用分页-->
<setting name="safeRowBoundsEnabled" value="false"/>

<!--是否开启自动驼峰命名规则(camel case)映射,
即从经典数据库列名 A_COLUMN 到经典 Java 属性名 aColumn 的类似映射。-->
<setting name="mapUnderscoreToCamelCase" value="true"/>

<!--MyBatis 利用本地缓存机制(Local Cache)防止循环引用(circular references)和加速重复嵌套查询。
默认值为 SESSION,这种情况下会缓存一个会话中执行的所有查询。
若设置值为 STATEMENT,本地会话仅用在语句执行上,对相同 SqlSession 的不同调用将不会共享数据。-->
<setting name="localCacheScope" value="STATEMENT"/>

<!-- 当没有为参数提供特定的 JDBC 类型时,为空值指定 JDBC 类型。
某些驱动需要指定列的 JDBC 类型,多数情况直接用一般类型即可,比如 NULL、VARCHAR、OTHER。-->
<setting name="jdbcTypeForNull" value="OTHER"/>

<!-- 指定对象的方法触发一次延迟加载。-->
<setting name="lazyLoadTriggerMethods" value="equals,clone,hashCode,toString"/>

<!-- 设置关联对象加载的形态,此处为按需加载字段(加载字段由SQL指 定),不会加载关联表的所有字段,以提高性能 -->
<setting name="aggressiveLazyLoading" value="false"/>
</settings>
</configuration>

+ 31
- 0
src/main/resources/log4j2.xml Ver fichero

@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="warn" debug="true" packages="qs.config">
<Properties>
<!-- 配置日志文件输出目录 ${sys:user.home} -->
<Property name="LOG_HOME">D:\FFrtmp-test</Property>
<property name="PATTERN">%d{MM-dd HH:mm:ss.SSS} [%t-%L] %-5level %logger{36} - %msg%n</property>
</Properties>

<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="[%d{HH:mm:ss.SSS}] [%t] %-5level %class{36} %L %M - %msg%xEx%n"/>
</Console>

<RollingFile name="RollingFileInfo" fileName="${LOG_HOME}/info.log" filePattern="${LOG_HOME}/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log">
<ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
</RollingFile>
</appenders>

<loggers>
<root level="info">
<appender-ref ref="RollingFileInfo"/>
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>

+ 10
- 0
src/main/resources/static/index.html Ver fichero

@@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
fgsagfs
</body>
</html>

+ 476
- 0
src/main/webapp/sub.jsp Ver fichero

@@ -0,0 +1,476 @@
<!DOCTYPE html>
<!--依赖Bootstrap4.0和Jquery3.2.1-->
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device initial-scale=1,shrink-to-fit=no" >
<meta name="template" content="album static web page">
<meta name="author" content="https://gitee.com/PirateFlag">

<link rel="icon" href="">
<title>mqtt工具</title>

<!-- 引入 Bootstrap -->
<link href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
<!--Custom CSS,目前没有使用到,计划用于微调位置-->
<link rel="stylesheet" href="album.css">
</head>
<body>
<%--<header>
<nav>
<!--class表示属性,id表示具体,同时下划线命名法-->

<!--下拉内容-->
&lt;%&ndash;<div class="collapse navbar-collapse bg-dark" id="navbar_header">
<div class="container">
<div class="row">
<div class="col-sm-8 col-md-7 py-4">
<h4 class="text-white">详细介绍</h4>
<p class="text-muted">可以向相册加入信息,作者名称背景等,然后分享到社交网络上,目前仅仅是静态页面</p>
</div>
<!--col-sm-4 offset-md-1 py-4使用的是bootstrap的栅格偏移系统,用于适配不同页面-->
<div class="col-sm-4 offset-md-1 py-4">
<h4>联系作者</h4>
<ul class="list-unstyled">
<li><a href="#" class="text-white" title="277625446">通过QQ联系</a></li>
<li><a href="https://gitee.com/PirateFlag" class="text-white" title="PirateFlag">通过码云联系</a></li>
<li><a href="https://github.com/athanasiaorange" class="text-white" title="athanasiaorange">通过Github联系</a></li>
</ul>
</div>
</div>
</div>
</div>&ndash;%&gt;

<!--标志内容-->
<div class="navbar navbar-dark bg-dark box-shadow">
<div class="container d-flex justify-content-between">
<!--主页logo-->
&lt;%&ndash;<a href="#" class="navbar-brand d-flex">
<!--引用一只来自外部的svg图像,你也可以更换成自己的图像,这是适配页面的矢量图-->
<svg xmlns="http://www.w3.org/200/svg" width="35" height="35" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" class="mr-2">
<path xmlns="http://www.w3.org/2000/svg" d="M 23 19 a 2 2 0 0 1 -2 2 H 3 a 2 2 0 0 1 -2 -2 V 8 a 2 2 0 0 1 2 -2 h 4 l 2 -3 h 6 l 2 3 h 4 a 2 2 0 0 1 2 2 Z">
</path>
<circle cx="12" cy="13" r="4"></circle>
</svg>
<strong>我的相册</strong>
</a>&ndash;%&gt;
<!--搜索框-->
<!-- <div class="form-group">
<label for=""></label>
<input type="text" name="" id="" class="form-control" placeholder="" aria-describedby="helpId">
<small id="helpId" class="text-muted">Help text</small>
</div> -->
<!--按钮内容-->
<button class="navbar-toggler collapsed" type="button" data-toggle="collapse" data-target="#navbar_header">
<span class="navbar-toggler-icon"></span>
</button>
</div>
</div>
</nav>
</header>--%>

<div role="">
<!--主要介绍-->
<div class="container-fluid" style="height: 150px;">
<div class="row" style="margin-top: 25px">
<div class="col-md-6">
<select type="" class="form-control" id="selectV" >
<option value="0">请选择</option>
<%--<option value="tcp://106.15.120.154:1883">测试环境</option>
<option value="tcp://192.168.11.22:1883">开发环境</option>--%>
</select>

</div>
<div class="col-md-3">
<input type="text" class="form-control" id="clientId" placeholder="请输入客户端id">
</div>
<button type="" class="btn btn-success" onclick="selectV()" style="">切换</button>

<label class="remoteclass" style="margin-left: 20%;font-size: 22px">kaifa</label>
<%--<a href="#" class="btn btn-primary my-2">主要跳转按钮</a>
<a href="#" class="btn btn-warning my-2">次要跳转按钮</a>--%>
</div>
<div class="row" style="margin-top: 25px">
<div class="col-md-6">
<input type="text" class="form-control" id="subAdd1" placeholder="请输入订阅">
</div>
<div class="col-md-1">
<input type="text" class="form-control" id="pxAdd1" placeholder="请输入排序">
</div>
<button type="" class="btn btn-success" onclick="subAdd1()" style="">提交</button>

<button type="" class="btn btn-success" onclick="loadSubList()" style="margin-left: 20%">订阅列表</button>
<%--<a href="#" class="btn btn-primary my-2">主要跳转按钮</a>
<a href="#" class="btn btn-warning my-2">次要跳转按钮</a>--%>
</div>
</div>

<div class="" >
<div class="container-fluid">

<!--单个流卡-->
<div class="row">
<div class="col-md-6">
<!--使用card式布局-->
<div class="card mb-6 box-shadow">
<div class="card body">
<%--<h5 class="card-title">订阅</h5>--%>
<a href="#" onclick="subAll()" class="btn btn-success my-2" style="float: right">全部订阅</a>
<div class="d-flex justify-content-between aligin-item-center">
<table id="reportTable1" class="table table-bordered table-condensed">
<thead>
<th width="10%" style="display:none;">id</th><th width="10%">排序</th><th width="10%">订阅</th><th width="10%">消息</th><th width="10%">备注</th><th width="10%" colspan="2">操作</th>
</thead>
<tbody id="subTr">
</tbody>
<tfoot>

</tfoot>
</table>
</div>
</div>
</div>
</div>
<div class="col-md-6" style="height:800px !important;overflow: auto">
<!--使用card式布局-->
<div class="card mb-4 box-shadow">
<div class="card body" style="background-color: #c2bded">
<h5 class="card-title"></h5>
<div class="col-md-12 classmsg" >
<%--<div style="background-color: green" class="col-md-12"><span>sdfa</span><pre style='white-space: pre-wrap;'>啊手动阀 </pre></div>
<div class="col-md-12"><pre style='white-space: pre-wrap;'> 啊手动阀</pre></div>
<small class="text-muted"></small>--%>
</div>
</div>
</div>
</div>

</div>
</div>
</div>

</div>
<footer class="text-muted">
<div class="container">
<p class="float-right">
<a href="#">回到顶部</a>
</p>
<p></p>
</div>
</footer>

<script src="http://code.jquery.com/jquery-2.1.4.min.js"></script>
<!--popper的CDN依赖-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script>
<!-- 最新的 Bootstrap 核心 JavaScript 文件 -->
<script src="https://cdn.staticfile.org/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>

</body>

<!-- 模态框(Modal) -->
<div class="modal fade" id="myModal" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true" style="height: 500px;scroll:auto ">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
<h4 class="modal-title" id="myModalLabel">模态框(Modal)标题</h4>
</div>
<div class="modal-body" >
<table class="table" >
<thead>
<tr>
<th>主题</th><th>Qos</th><th>节点</th>
</tr>
</thead>
<tbody id="Subtable">

</tbody>
</table>

</div>
<div class="modal-footer">
<button type="button" class="btn btn-default" data-dismiss="modal">关闭</button>

</div>
</div><!-- /.modal-content -->
</div><!-- /.modal -->

</div>

<script>
$(function () {
sub();
indexMsg();
remoteloadSelect();
remote()
/*var s="{\n" +
" \"mid\": 1000000000020033,\n" +
" \"deviceid\": \"1000000001154323\",\n" +
" \"timestamp\": 1652248505,\n" +
" \"param\": {\n" +
" \"Action\": 0\n" +
" }\n" +
" }";
$("pre").html(s);*/
window.setInterval(indexMsg,2000)

})
function subAll(){
$.ajax({
type:"get",
url:"/subAll",
//data:{"checkName":$("#userName").val()}, //以键值对的方式传数据到后台 Servlet
dataType:"json",
success:function(mydata){
if (mydata=="success") {
alert("操作成功!"); //获取回调值mydata中数据:mydata.键《find》------>的值表示方法 该回调函数值mydata可以是JSON格式的:对象或者数组
// console.log(mydata.find);
}else{
alert("操作失败!");
}
}
});
}
function sub() {
$.ajax({
type:"get",
url:"/indexsub",
data:{"checkName":$("#userName").val()}, //以键值对的方式传数据到后台 Servlet
dataType:"json",
success:function(mydata){
// alert(mydata.find); //获取回调值mydata中数据:mydata.键《find》------>的值表示方法 该回调函数值mydata可以是JSON格式的:对象或者数组
// console.log(mydata.find);
if(mydata!==null){
var str ="";
for (var i = 0; i <mydata.length ; i++) {

var ex01=mydata[i].ex01==null?"":mydata[i].ex01;
var ex02=mydata[i].ex02==null?"":mydata[i].ex02;
var ex03=mydata[i].ex03==null?"":mydata[i].ex03;
str+="<tr class=\"info\"><td style='display: none'>"+mydata[i].id+"</td>" +
"<td style='background-color: #d3e4db' ondblclick='msgClick(this)'><label >"+ex03+"</label>" +
"<input class='tdedit' onchange='msgvalue(this)' onblur='ex02value(this)' type='text' style='display: none' ></td>" +
"<td>"+mydata[i].sub+"</td>" +
"<td style='background-color: #d3e4db' ondblclick='msgClick(this)'><label >"+ex01+"</label>" +
"<input class='tdedit' onchange='msgvalue(this)' onblur='msgvalue(this)' type='text' style='display: none' ></td>" +
"<td style='background-color: #c9d4e4' ondblclick='msgClick(this)'><label >"+ex02+"</label >" +
"<input class='tdedit' onchange='ex02value(this)' onblur='ex02value(this)' type='text' style='display: none' ></td>" +
"<td><button class='btn-success' onclick='subsave(this)'>保存</button><button class='btn-danger' onclick='delsave(this)'>删除</button></td>" +
"<td><button class='btn-info' onclick='subsub(this)'>发送</button></td>" +
"</tr>";
}
$("#subTr").html(str);
}
}
});
}

function msgClick(obj) {
//if($(this).find("input").css("display")=="none"){
$(obj).find("label").text("");
$(obj).find("input").css("display","");
// }
}

function msgvalue(obj) {
$(obj).hide();
$(obj).closest("td").find("label").text($(obj).val())
}

function ex02value(obj) {
$(obj).hide();
$(obj).closest("td").find("label").text($(obj).val())
}

function subsub(obj) {
var sub= $(obj).closest("tr").find("td").eq(2).text();
var ex01= $(obj).closest("tr").find("td").eq(3).find("label").text();
$.ajax({
type: "post",
url: "/sendMessage",
data: JSON.stringify({ "sub":sub,"ex01":ex01}), //以键值对的方式传数据到后台 Servlet
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata=="success"){
alert("操作成功!")
}
}
})
}


function subsave(obj) {
var id= $(obj).closest("tr").find("td").eq(0).text();
var ex03= $(obj).closest("tr").find("td").eq(1).find("label").text();
var sub= $(obj).closest("tr").find("td").eq(2).text();
var ex01= $(obj).closest("tr").find("td").eq(3).find("label").text();
var ex02= $(obj).closest("tr").find("td").eq(4).find("label").text();

$.ajax({
type: "post",
url: "/editsub",
data: JSON.stringify({"id": id, "sub":sub,"ex01":ex01,"ex02":ex02,"ex03":ex03}), //以键值对的方式传数据到后台 Servlet
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata=="success"){
alert("操作成功!")
}
}
})
}
function delsave(obj) {
var id= $(obj).closest("tr").find("td").eq(0).text();
var ex03= $(obj).closest("tr").find("td").eq(1).find("label").text();
var sub= $(obj).closest("tr").find("td").eq(2).text();
var ex01= $(obj).closest("tr").find("td").eq(3).find("label").text();
var ex02= $(obj).closest("tr").find("td").eq(4).find("label").text();

$.ajax({
type: "post",
url: "/delsub",
data: JSON.stringify({"id": id, "sub":sub,"ex01":ex01,"ex02":ex02,"ex03":ex03}), //以键值对的方式传数据到后台 Servlet
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata=="success"){
alert("操作成功!")
window.location.reload();
}
}
})
}
function subAdd1() {
if ($("#subAdd1").val().trim()=="") {
alert("不能为空!")
return;
}
$.ajax({
type: "post",
url: "/addsub1",
data: JSON.stringify({"sub":$("#subAdd1").val(),"ex03":$("#pxAdd1").val()}), //以键值对的方式传数据到后台 Servlet
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata=="success"){
alert("操作成功!")
window.location.reload();
}
}
})
}
function indexMsg() {
$.ajax({
type: "post",
url: "/indexMsg",
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
var divmsg="";
if (mydata.length>0){
for (var i = 0; i < mydata.length; i++) {
var msg=mydata[i].msg;
divmsg+="<div style=\"background-color: #a8c8fc;border: 1px solid #000;\" class=\"col-md-12\"><span style='font-weight: bold'>"+mydata[i].sub+"</span><pre style='white-space: pre-wrap;background-color: #a8c8fc'>"+mydata[i].msg+"</pre><span>"+mydata[i].time+"</span>" +
"<span style='float: right'>"+mydata[i].ex01+"</span></div>";
}
$(".classmsg").html(divmsg);
}
}
})
}

function selectV() {
$.ajax({
type: "post",
url: "/connectMqtt",
data: JSON.stringify({"ip":$("#selectV").val(),"clientId":$("#clientId").val()}),
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {

if (mydata=="success"){
alert("连接成功!")
window.location.reload();
}else{
alert("连接失败!")
}
}
})
}

function loadSubList() {
$("#myModal").modal("show");
$.ajax({
type: "post",
url: "/doGetSub",
data: JSON.stringify({"ip":$("#selectV").val()}),
dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
var substr="";
if (mydata.length>0){
var obj= JSON.parse(mydata);
for (var i = 0; i < obj.data.length; i++) {
substr+="<tr><td>"+obj.data[i].topic+"</td><td>"+obj.data[i].qos+"</td><td>"+obj.data[i].node+"</td></tr>";
}
$("#Subtable").html(substr);
}else{
alert("连接失败!")
}
}
})
}

function remote() {

$.ajax({
type: "get",
url: "/getRemote",
//data: JSON.stringify({"sub":$("#subAdd1").val(),"ex03":$("#pxAdd1").val()}), //以键值对的方式传数据到后台 Servlet
//dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata!=null){
$(".remoteclass").html(mydata.ex01)
$("#selectV").val(mydata.url)
}
}
})
}

function remoteloadSelect() {
$.ajax({
type: "get",
url: "/getAllRemote",
//data: JSON.stringify({"sub":$("#subAdd1").val(),"ex03":$("#pxAdd1").val()}), //以键值对的方式传数据到后台 Servlet
//dataType: "json",
contentType: 'application/json',
cache: false,
success: function (mydata) {
if (mydata!=null){
var str="";
var urls ="";
for (var i = 0; i < mydata.length; i++) {
str+="<option value=\""+mydata[i].url+"\">"+mydata[i].ex01+"</option>";
if(mydata[i].status=="1"){
urls=mydata[i].url;
}
}
$("#selectV").html(str);
$("#selectV").val(urls)
}
}
})
}
</script>
</html>

Cargando…
Cancelar
Guardar