瀏覽代碼

机场推流服务

undefined
chenyukun 1 年之前
父節點
當前提交
606c231a45
共有 100 個檔案被更改,包括 0 行新增1530 行删除
  1. +0
    -141
      .gitignore
  2. +0
    -8
      .idea/.gitignore
  3. +0
    -14
      .idea/inspectionProfiles/Project_Default.xml
  4. +0
    -6
      .idea/misc.xml
  5. +0
    -8
      .idea/modules.xml
  6. +0
    -9
      .idea/tuoheng_push_stream.iml
  7. +0
    -6
      .idea/vcs.xml
  8. +0
    -11
      README.md
  9. +0
    -40
      airportMedia.py
  10. +0
    -43
      bean/Feedback.py
  11. +0
    -20
      bean/File.py
  12. +0
    -24
      bean/Result.py
  13. +0
    -0
      bean/__init__.py
  14. 二進制
      bean/__pycache__/Feedback.cpython-38.pyc
  15. 二進制
      bean/__pycache__/Feedback.cpython-39.pyc
  16. 二進制
      bean/__pycache__/File.cpython-38.pyc
  17. 二進制
      bean/__pycache__/File.cpython-39.pyc
  18. 二進制
      bean/__pycache__/Result.cpython-38.pyc
  19. 二進制
      bean/__pycache__/Result.cpython-39.pyc
  20. 二進制
      bean/__pycache__/__init__.cpython-38.pyc
  21. 二進制
      bean/__pycache__/__init__.cpython-39.pyc
  22. +0
    -22
      common/Constant.py
  23. +0
    -0
      common/__init__.py
  24. 二進制
      common/__pycache__/Constant.cpython-38.pyc
  25. 二進制
      common/__pycache__/Constant.cpython-39.pyc
  26. 二進制
      common/__pycache__/__init__.cpython-38.pyc
  27. 二進制
      common/__pycache__/__init__.cpython-39.pyc
  28. +0
    -0
      concurrency/__init__.py
  29. 二進制
      concurrency/__pycache__/__init__.cpython-38.pyc
  30. 二進制
      concurrency/__pycache__/__init__.cpython-39.pyc
  31. +0
    -61
      concurrency/http/HttpFeedbackThread.py
  32. +0
    -95
      concurrency/http/HttpServiceImpl.py
  33. +0
    -389
      concurrency/http/HttpUploadFileProcess.py
  34. +0
    -0
      concurrency/http/__init__.py
  35. 二進制
      concurrency/http/__pycache__/HttpFeedbackThread.cpython-38.pyc
  36. 二進制
      concurrency/http/__pycache__/HttpFeedbackThread.cpython-39.pyc
  37. 二進制
      concurrency/http/__pycache__/HttpServiceImpl.cpython-38.pyc
  38. 二進制
      concurrency/http/__pycache__/HttpServiceImpl.cpython-39.pyc
  39. 二進制
      concurrency/http/__pycache__/HttpUploadFileProcess.cpython-38.pyc
  40. 二進制
      concurrency/http/__pycache__/HttpUploadFileProcess.cpython-39.pyc
  41. 二進制
      concurrency/http/__pycache__/__init__.cpython-38.pyc
  42. 二進制
      concurrency/http/__pycache__/__init__.cpython-39.pyc
  43. +0
    -35
      concurrency/mqtt/MqttFeedbackThread.py
  44. +0
    -390
      concurrency/mqtt/MqttUploadFileProcess.py
  45. +0
    -0
      concurrency/mqtt/__init__.py
  46. 二進制
      concurrency/mqtt/__pycache__/MqttFeedbackThread.cpython-38.pyc
  47. 二進制
      concurrency/mqtt/__pycache__/MqttFeedbackThread.cpython-39.pyc
  48. 二進制
      concurrency/mqtt/__pycache__/MqttUploadFileProcess.cpython-38.pyc
  49. 二進制
      concurrency/mqtt/__pycache__/MqttUploadFileProcess.cpython-39.pyc
  50. 二進制
      concurrency/mqtt/__pycache__/__init__.cpython-38.pyc
  51. 二進制
      concurrency/mqtt/__pycache__/__init__.cpython-39.pyc
  52. +0
    -12
      config/aliyun.yml
  53. +0
    -20
      config/logger.yml
  54. +0
    -25
      config/mqtt.yml
  55. +0
    -12
      config/service.yml
  56. +0
    -25
      enums/ExceptionEnum.py
  57. +0
    -25
      enums/HttpExceptionEnum.py
  58. +0
    -34
      enums/StatusEnum.py
  59. +0
    -0
      enums/__init__.py
  60. 二進制
      enums/__pycache__/ExceptionEnum.cpython-38.pyc
  61. 二進制
      enums/__pycache__/ExceptionEnum.cpython-39.pyc
  62. 二進制
      enums/__pycache__/HttpExceptionEnum.cpython-38.pyc
  63. 二進制
      enums/__pycache__/HttpExceptionEnum.cpython-39.pyc
  64. 二進制
      enums/__pycache__/StatusEnum.cpython-38.pyc
  65. 二進制
      enums/__pycache__/StatusEnum.cpython-39.pyc
  66. 二進制
      enums/__pycache__/__init__.cpython-38.pyc
  67. 二進制
      enums/__pycache__/__init__.cpython-39.pyc
  68. +0
    -19
      exception/CustomerException.py
  69. +0
    -0
      exception/__init__.py
  70. 二進制
      exception/__pycache__/CustomerException.cpython-38.pyc
  71. 二進制
      exception/__pycache__/CustomerException.cpython-39.pyc
  72. 二進制
      exception/__pycache__/__init__.cpython-38.pyc
  73. 二進制
      exception/__pycache__/__init__.cpython-39.pyc
  74. 二進制
      media.ico
  75. +0
    -36
      osssdk/__init__.py
  76. 二進制
      osssdk/__pycache__/__init__.cpython-38.pyc
  77. 二進制
      osssdk/__pycache__/__init__.cpython-39.pyc
  78. 二進制
      osssdk/__pycache__/api.cpython-38.pyc
  79. 二進制
      osssdk/__pycache__/api.cpython-39.pyc
  80. 二進制
      osssdk/__pycache__/auth.cpython-38.pyc
  81. 二進制
      osssdk/__pycache__/auth.cpython-39.pyc
  82. 二進制
      osssdk/__pycache__/compat.cpython-38.pyc
  83. 二進制
      osssdk/__pycache__/compat.cpython-39.pyc
  84. 二進制
      osssdk/__pycache__/crc64_combine.cpython-38.pyc
  85. 二進制
      osssdk/__pycache__/crc64_combine.cpython-39.pyc
  86. 二進制
      osssdk/__pycache__/credentials.cpython-38.pyc
  87. 二進制
      osssdk/__pycache__/credentials.cpython-39.pyc
  88. 二進制
      osssdk/__pycache__/crypto.cpython-38.pyc
  89. 二進制
      osssdk/__pycache__/crypto.cpython-39.pyc
  90. 二進制
      osssdk/__pycache__/crypto_bucket.cpython-38.pyc
  91. 二進制
      osssdk/__pycache__/crypto_bucket.cpython-39.pyc
  92. 二進制
      osssdk/__pycache__/defaults.cpython-38.pyc
  93. 二進制
      osssdk/__pycache__/defaults.cpython-39.pyc
  94. 二進制
      osssdk/__pycache__/exceptions.cpython-38.pyc
  95. 二進制
      osssdk/__pycache__/exceptions.cpython-39.pyc
  96. 二進制
      osssdk/__pycache__/headers.cpython-38.pyc
  97. 二進制
      osssdk/__pycache__/headers.cpython-39.pyc
  98. 二進制
      osssdk/__pycache__/http.cpython-38.pyc
  99. 二進制
      osssdk/__pycache__/http.cpython-39.pyc
  100. +0
    -0
      osssdk/__pycache__/iterators.cpython-38.pyc

+ 0
- 141
.gitignore 查看文件

@@ -1,141 +0,0 @@
## ---> Java
## Compiled class file
#*.class
#
## Log file
#*.log
#
## BlueJ files
#*.ctxt
#
## Mobile Tools for Java (J2ME)
#.mtj.tmp/
#
## Package Files #
#*.jar
#*.war
#*.nar
#*.ear
#*.zip
#*.tar.gz
#*.rar
#
## virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
#hs_err_pid*
#
## ---> Python
## Byte-compiled / optimized / DLL files
#__pycache__/
#*.py[cod]
#*$py.class
#
## C extensions
#*.so
#
## Distribution / packaging
#.Python
#build/
#develop-eggs/
#dist/
#downloads/
#eggs/
#.eggs/
#lib/
#lib64/
#parts/
#sdist/
#var/
#wheels/
#*.egg-info/
#.installed.cfg
#*.egg
#MANIFEST
#
## PyInstaller
## Usually these files are written by a python script from a template
## before PyInstaller builds the exe, so as to inject date/other infos into it.
#*.manifest
#*.spec
#
## Installer logs
#pip-log.txt
#pip-delete-this-directory.txt
#
## Unit test / coverage reports
#htmlcov/
#.tox/
#.nox/
#.coverage
#.coverage.*
#.cache
#nosetests.xml
#coverage.xml
#*.cover
#.hypothesis/
#.pytest_cache/
#
## Translations
#*.mo
#*.pot
#
## Django stuff:
#*.log
#local_settings.py
#db.sqlite3
#
## Flask stuff:
#instance/
#.webassets-cache
#
## Scrapy stuff:
#.scrapy
#
## Sphinx documentation
#docs/_build/
#
## PyBuilder
#target/
#
## Jupyter Notebook
#.ipynb_checkpoints
#
## IPython
#profile_default/
#ipython_config.py
#
## pyenv
#.python-version
#
## celery beat schedule file
#celerybeat-schedule
#
## SageMath parsed files
#*.sage.py
#
## Environments
#.env
#.venv
#env/
#venv/
#ENV/
#env.bak/
#venv.bak/
#
## Spyder project settings
#.spyderproject
#.spyproject
#
## Rope project settings
#.ropeproject
#
## mkdocs documentation
#/site
#
## mypy
#.mypy_cache/
#.dmypy.json
#dmypy.json
#
## Pyre type checker
#.pyre/
#

+ 0
- 8
.idea/.gitignore 查看文件

@@ -1,8 +0,0 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

+ 0
- 14
.idea/inspectionProfiles/Project_Default.xml 查看文件

@@ -1,14 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N803" />
<option value="N806" />
<option value="N802" />
</list>
</option>
</inspection_tool>
</profile>
</component>

+ 0
- 6
.idea/misc.xml 查看文件

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" languageLevel="JDK_19" project-jdk-name="Python 3.10" project-jdk-type="Python SDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

+ 0
- 8
.idea/modules.xml 查看文件

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/tuoheng_push_stream.iml" filepath="$PROJECT_DIR$/.idea/tuoheng_push_stream.iml" />
</modules>
</component>
</project>

+ 0
- 9
.idea/tuoheng_push_stream.iml 查看文件

@@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.8 (test)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

+ 0
- 6
.idea/vcs.xml 查看文件

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

+ 0
- 11
README.md 查看文件

@@ -1,11 +0,0 @@
## tuoheng_airprot_media

# 机场媒体服务说明文档
https://docs.qq.com/doc/DYmhNc0xZR29NVnhJ?u=8c0e18a5a26b43ae9410f6c2d6192683

# 生产mqttt
url: tcp://mqtt.t-aaron.com:10883
#用户名
username: admin
#密码
password: admin##123

+ 0
- 40
airportMedia.py 查看文件

@@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
from multiprocessing import freeze_support
from os.path import dirname, realpath, join
from loguru import logger

from service.MqttDisService import MqttDispatcherService
from service.HttpDisService import HttpDispatcherService
from util.LogUtils import init_log
from util.RWUtils import getConfigs

'''
主程序入口
'''

if __name__ == '__main__':
freeze_support()
base_dir = dirname(realpath(__file__))
init_log(base_dir)
logger.info("(♥◠‿◠)ノ゙ 【机场媒体服务】开始启动 ლ(´ڡ`ლ)゙")
print("(♥◠‿◠)ノ゙ 【机场媒体服务】开始启动 ლ(´ڡ`ლ)゙")
print("############################################################")
print("配置文件路径: ", join(base_dir, "config"))
print("服务配置文件: service.yml")
print("mqtt配置文件: mqtt.yml")
print("日志配置文件: logger.yml")
print("阿里云配置文件: aliyun.yml")
print("日志文件路径: ", join(base_dir, "logs"))
print("############################################################")
# mqtt交互
service_config = getConfigs(base_dir, "config/service.yml")
service_config['base_dir'] = base_dir
if 1 == service_config['docking_method']:
print("当前使用的交互模式是mqtt交互!!")
print("############################################################")
MqttDispatcherService(service_config)
elif 2 == service_config['docking_method']:
print("当前使用的交互模式是接口交互!!")
print("############################################################")
HttpDispatcherService(service_config)


+ 0
- 43
bean/Feedback.py 查看文件

@@ -1,43 +0,0 @@
# -*-coding:utf-8 -*-
from enums.StatusEnum import UploadTaskStatusType
from util.QueUtil import put_queue
from util.TimeUtils import now_date_to_str


def upload_result(fb_queue, requestId, errorCode="", errorMsg="", status=UploadTaskStatusType.RUNNING.value[0],
imageList=[], videoList=[]):
if requestId is not None:
put_queue(fb_queue, ('upload',
{
"requestId": requestId,
"errorCode": errorCode,
"errorMsg": errorMsg,
"status": status,
"imageList": imageList,
"videoList": videoList,
"currentTime": now_date_to_str()
}),
timeout=2)


def rtmp_result(fb_queue, code, msg, data=None):
put_queue(fb_queue, ('rtmp', {"code": code, "msg": msg, "data": data}), timeout=2)


def upload_http_result(fb_queue, callback_url=None, requestId=None, errorCode="", errorMsg="",
status=UploadTaskStatusType.RUNNING.value[0], imageList=[], videoList=[]):
if callback_url is not None and requestId is not None:
put_queue(fb_queue, ('upload',
{
"callback_url": callback_url,
"data": {
"requestId": requestId,
"errorCode": errorCode,
"errorMsg": errorMsg,
"status": status,
"imageList": imageList,
"videoList": videoList,
"currentTime": now_date_to_str()
}
}),
timeout=2)

+ 0
- 20
bean/File.py 查看文件

@@ -1,20 +0,0 @@
# -*-coding:utf-8 -*-
from typing import Union

from fastapi._compat import Required
from pydantic import BaseModel, Field, HttpUrl


class UploadRequest(BaseModel):
requestId: str = Field(default=Required, title="请求id", pattern="^[a-zA-Z0-9]{1,36}$")
callbackUrl: HttpUrl = Field(default=Required, title="回调地址")


class CallbackRequest(BaseModel):
requestId: Union[str, None]
errorCode: Union[str, None]
errorMsg: Union[str, None]
status: Union[int, None]
imageList: Union[list, None]
videoList: Union[list, None]
currentTime: Union[str, None]

+ 0
- 24
bean/Result.py 查看文件

@@ -1,24 +0,0 @@
# -*-coding:utf-8 -*-
from pydantic import BaseModel


class JsonResult(BaseModel):
code: int
msg: str
data: None

@staticmethod
def success(code=0, msg="操作成功!", data=None):
return {
"code": code,
"msg": msg,
"data": data
}

@staticmethod
def error(code=-1, msg="操作失败!", data=None):
return {
"code": code,
"msg": msg,
"data": data
}

+ 0
- 0
bean/__init__.py 查看文件


二進制
bean/__pycache__/Feedback.cpython-38.pyc 查看文件


二進制
bean/__pycache__/Feedback.cpython-39.pyc 查看文件


二進制
bean/__pycache__/File.cpython-38.pyc 查看文件


二進制
bean/__pycache__/File.cpython-39.pyc 查看文件


二進制
bean/__pycache__/Result.cpython-38.pyc 查看文件


二進制
bean/__pycache__/Result.cpython-39.pyc 查看文件


二進制
bean/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
bean/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 22
common/Constant.py 查看文件

@@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
from multiprocessing import Queue

SHARE_QUEUE = Queue()


def get_share_queue():
return SHARE_QUEUE


TASK_RECORD = {
"upload": None
}


def get_task_record():
return TASK_RECORD


# EasyRtmpLive
CHANNEL_LIST_URL = "http://localhost:19610/api/v1/getChannelList?offset=0&row=50"
UPDATE_CHANNEL_URL = 'http://localhost:19610/api/v1/updateChannel?%s'

+ 0
- 0
common/__init__.py 查看文件


二進制
common/__pycache__/Constant.cpython-38.pyc 查看文件


二進制
common/__pycache__/Constant.cpython-39.pyc 查看文件


二進制
common/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
common/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 0
concurrency/__init__.py 查看文件


二進制
concurrency/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
concurrency/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 61
concurrency/http/HttpFeedbackThread.py 查看文件

@@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
import time
from json import loads
from threading import Thread
from traceback import format_exc

from loguru import logger

from util.QueUtil import get_block_queue
from util.RequestUtil import HttpRequests


class HttpFeedbackThread(Thread):
__slots__ = '__fb_queue'

def __init__(self, fb_queue):
super().__init__()
self.__fb_queue = fb_queue

def run(self):
logger.info("启动反馈线程")
fb_queue = self.__fb_queue
request = HttpRequests()
try:
headers = {'content-type': "application/json"}
while True:
try:
fb = get_block_queue(fb_queue)
if fb is not None and len(fb) > 0:
if fb[0] == "upload":
upload_req(request, fb[1], headers)
del fb
else:
time.sleep(1)
except Exception:
logger.error("反馈异常:{}", format_exc())
if request:
request.close_session()
request = HttpRequests()
finally:
request.close_session()
logger.info("反馈线程执行完成")


def upload_req(request, upload, headers):
logger.info("开始发送文件上传回调请求, 回调地址: {}", upload['callback_url'])
logger.info("开始发送文件上传回调请求, 回调请求体: {}", upload['data'])
try:
response = request.send_request('POST', upload['callback_url'], data=upload['data'], headers=headers, timeout=3)
if response.status_code != 200:
logger.error("上传回调请求失败! 状态码: {}, {}", response.status_code, response.__dict__)
else:
content = response.content.decode('utf-8')
if content is not None and len(content) > 0:
content = loads(content)
code = content.get('code')
if code is not None and code != 0:
logger.error("上传回调请求失败! 失败描述: {}", content.get('msg'))
except Exception:
logger.error("回调请求失败, 请求体: {}, 回调地址: {}, 异常信息: {}", upload['data'], upload['callback_url'],
format_exc())

+ 0
- 95
concurrency/http/HttpServiceImpl.py 查看文件

@@ -1,95 +0,0 @@
# -*- coding: utf-8 -*-
import sys
from multiprocessing import Queue
from threading import Thread
from time import sleep
from traceback import format_exc

from loguru import logger

from bean.Feedback import upload_http_result
from common.Constant import get_task_record, get_share_queue
from concurrency.http.HttpFeedbackThread import HttpFeedbackThread
from concurrency.http.HttpUploadFileProcess import HttpUploadFileProcess
from enums.ExceptionEnum import UploadStrExceptionType
from enums.StatusEnum import UploadTaskStatusType
from exception.CustomerException import ServiceException
from util.QueUtil import get_no_block_queue


class HttpServiceImpl:
__slots__ = ()

def __init__(self, service_config):
service_thread = Thread(target=self.start_service, args=(service_config,))
service_thread.setDaemon(True)
service_thread.start()

@staticmethod
def start_service(service_config):
fb_queue = Queue()
task, msg_queue = get_task_record(), get_share_queue()
handle_method = {
"upload": lambda x, y, z, h: handle_upload(x, y, z, h)
}
feedbackThread = None
while True:
try:
if task["upload"] is not None and not task["upload"].is_alive():
task["upload"] = None
feedbackThread = start_feedback_thread(feedbackThread, fb_queue)
message = get_no_block_queue(msg_queue)
if message is not None and isinstance(message, tuple) and len(message) == 2:
if handle_method.get(message[0]) is not None:
handle_method[message[0]](message[1], service_config, task, fb_queue)
else:
sleep(1)
except Exception:
logger.error("服务异常: {}", format_exc())


def handle_upload(msg, service_config, task, fb_queue):
try:
command = msg["command"]
if 'start' == command:
if task["upload"] is not None:
logger.warning("上传任务已存在!!!")
upload_http_result(fb_queue, msg["callback_url"], msg["request_id"],
errorCode=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[0],
errorMsg=UploadStrExceptionType.UPLOAD_TASK_IS_AREADLY.value[1],
status=UploadTaskStatusType.FAILED.value[0])
return
upload_p = HttpUploadFileProcess(fb_queue, service_config, msg["callback_url"], msg["request_id"])
upload_p.start()
task["upload"] = upload_p
elif 'stop' == command:
if task["upload"] is None:
logger.error("任务不存在, 任务无法停止!")
return
task["upload"].sendEvent({"command": "stop"})
except ServiceException as s:
logger.error("文件上传请求异常: {}", s.msg)
upload_http_result(fb_queue, msg["callback_url"], msg["request_id"],
errorCode=s.code,
errorMsg=s.msg,
status=UploadTaskStatusType.FAILED.value[0])
except Exception:
logger.error("消息处理异常: {}", format_exc())
upload_http_result(fb_queue, msg["callback_url"], msg["request_id"],
errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
status=UploadTaskStatusType.FAILED.value[0])
finally:
del msg


def start_feedback_thread(feedbackThread, fb_queue):
if feedbackThread is None:
feedbackThread = HttpFeedbackThread(fb_queue)
feedbackThread.setDaemon(True)
feedbackThread.start()
else:
if not feedbackThread.is_alive():
logger.error("反馈线程异常停止! 开始终止程序!")
sys.exit()
return feedbackThread

+ 0
- 389
concurrency/http/HttpUploadFileProcess.py 查看文件

@@ -1,389 +0,0 @@
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue
from os.path import join, exists
from threading import Thread
from time import sleep, time
from traceback import format_exc

from loguru import logger

from bean.Feedback import upload_http_result
from enums.ExceptionEnum import UploadStrExceptionType
from enums.StatusEnum import UploadTaskStatusType, UploadStatusType
from exception.CustomerException import ServiceException
from util.AliyunUtil import OssUtil, VodUtil
from util.FileUtil import get_all_images, get_all_videos, get_file_name, delete_file, create_dir, \
move_file, remove_after_create
from util.LogUtils import init_log
from util.QueUtil import get_no_block_queue, put_queue, clear_queue
from util.ThreadUtil import ThreadPoolExecutorNew
from util.TimeUtils import now_date_to_str, YMDHMS


class HttpUploadFileProcess(Process):
__slots__ = ('__fb_queue', '__requestId', 'videoPath', 'imagePath', 'image_backup', 'video_backup',
'__current_time', '__callback_url', '__service_config', '__event_queue')

def __init__(self, fb_queue, service_config, callback_url, request_id):
super().__init__()
self.__fb_queue = fb_queue
self.__requestId = request_id
self.__event_queue = Queue()
self.__service_config = service_config
self.__callback_url = callback_url
self.__current_time = now_date_to_str(YMDHMS)
backup = join(service_config["file"]["backup"], self.__current_time)
self.videoPath, self.imagePath = service_config["file"]['videoPath'], service_config["file"]['imagePath']
self.image_backup, self.video_backup = join(backup, 'images'), join(backup, 'videos')
upload_http_result(fb_queue, callback_url, request_id, status=UploadTaskStatusType.WAITING.value[0])

def sendEvent(self, result):
put_queue(self.__event_queue, result, timeout=2, is_throw_ex=True)

@staticmethod
def uploadImage(oss, uploadPath, filePath):
try:
oss.resumable_upload(uploadPath, filePath)
except Exception as e:
oss.exception = e
logger.error("oss上传文件异常: {}, uploadPath:{}, filePath:{}", format_exc(), uploadPath, filePath)
raise e

@staticmethod
def uploadVideo(vod, file_title, filePath):
try:
vod.get_play_url(filePath, file_title)
except Exception as e:
vod.exception = e
logger.error("oss上传文件异常: {}, file_title:{}, filePath:{}", format_exc(), file_title, filePath)
raise e

@staticmethod
def move_method(file, backup):
move_file(file, backup)

@staticmethod
def hdThread(imageList, videoList, fb_queue, callback_url, hb_queue, requestId):
try:
logger.info("启动文件上传心跳线程, requestId:{}", requestId)
start_time = time()
count = 0
while True:
if time() - start_time > 43200:
logger.error("心跳线程运行超时!!!!requestId:{}", requestId)
break
command = get_no_block_queue(hb_queue)
if command is not None:
if "stop" == command.get("command"):
logger.info("开始停止心跳线程!!!!requestId:{}", requestId)
break
if count % 5 == 0:
upload_http_result(fb_queue, callback_url, requestId, status=UploadTaskStatusType.RUNNING.value[0],
imageList=imageList, videoList=videoList)
count = 0
count += 1
sleep(1)
except Exception:
logger.error("心跳线程异常:{}, requestId:{}", format_exc(), requestId)
logger.info("心跳线程停止完成!requestId:{}", requestId)

def upload_method(self, service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
imageList, it):
current_time = self.__current_time
for i in video_path_array:
vod = VodUtil(service_config["base_dir"])
filename = get_file_name(i)
video_result = vt.submit(self.uploadVideo, vod, filename, i)
videoTask[filename] = [video_result, vod, i]
videoList.append({"fileName": filename, "videoUrl": "", "status": UploadStatusType.WAITING.value[0],
"progress": vod.get_progress()})
for i in image_path_array:
oss = OssUtil(service_config["base_dir"])
fileName = get_file_name(i)
uploadPath = "%s/%s" % (current_time, fileName)
image_result = it.submit(self.uploadImage, oss, uploadPath, i)
imageTask[fileName] = [image_result, oss, i]
imageList.append({"fileName": fileName, "imageUrl": "", "status": UploadStatusType.WAITING.value[0],
"progress": oss.progress})

def start_hd_thread(self, imageList, videoList, fb_queue, callback_url, hb_queue, requestId):
hd = Thread(target=self.hdThread, args=(imageList, videoList, fb_queue, callback_url, hb_queue, requestId))
hd.setDaemon(True)
hd.start()
return hd

@staticmethod
def stop_hd_thread(hb, hb_queue, requestId):
if hb is not None and hb.is_alive():
put_queue(hb_queue, {"command": "stop"}, timeout=2, is_throw_ex=False)
start = time()
hb.join(60)
if time() - start > 60:
logger.error("心跳线程停止超时, requestId:{}", requestId)
clear_queue(hb_queue)

@staticmethod
def change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList, deleteVideoList):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
num += 1
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
if not image_task[0].done():
image["status"] = UploadStatusType.RUNNING.value[0]
image["progress"] = image_task[1].get_progress()
continue
if image_task[0].done():
try:
image_task[0].result()
image["imageUrl"] = image_task[1].get_image_url()
image["status"] = UploadStatusType.SUCCESS.value[0]
image["progress"] = "1.0000"
deleteImageList.append(image_task[2])
# delete_file(image_task[2])
except Exception:
logger.error("文件{}上传失败, 异常: {}, requestId: {}", image_task[2], format_exc(),
requestId)
image["status"] = UploadStatusType.FAILED.value[0]
image["progress"] = image_task[1].get_progress()
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
num += 1
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
if not video_task[0].done():
video["status"] = UploadStatusType.RUNNING.value[0]
video["progress"] = video_task[1].get_progress()
continue
if video_task[0].done():
try:
video_task[0].result()
video["videoUrl"] = video_task[1].get_video_url()
video["status"] = UploadStatusType.SUCCESS.value[0]
video["progress"] = "1.0000"
deleteVideoList.append(video_task[2])
# delete_file(video_task[2])
except Exception:
logger.error("文件{}上传失败, 异常:{}, requestId: {}", video_task[2], format_exc(),
requestId)
video["status"] = UploadStatusType.FAILED.value[0]
video["progress"] = video_task[1].get_progress()
return num

@staticmethod
def stop_all_task(imageList, imageTask, videoList, videoTask):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
if not image_task[0].done():
# 如果任务没有结束, 停止设置取消任务
image_task[1].status = True
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
if not video_task[0].done():
video_task[1].status = True
if video_task[1].uploader is not None:
if video_task[1].uploader.uploader is not None:
video_task[1].uploader.uploader.status = True

@staticmethod
def stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
oss = image_task[1]
status = oss.status
image_url = oss.get_image_url()
if image_url is not None and len(image_url) > 0:
image["imageUrl"] = image_url
image["status"] = UploadStatusType.SUCCESS.value[0]
image["progress"] = "1.0000"
deleteImageList.append(image_task[2])
# delete_file(image_task[2])
else:
if status:
image["status"] = UploadStatusType.CANCEL.value[0]
image["progress"] = oss.get_progress()
else:
image["status"] = UploadStatusType.FAILED.value[0]
image["progress"] = oss.get_progress()
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
vod = video_task[1]
status = vod.status
video_url = vod.get_video_url()
if video_url is not None and len(video_url) > 0:
video["videoUrl"] = video_url
video["status"] = UploadStatusType.SUCCESS.value[0]
video["progress"] = "1.0000"
deleteVideoList.append(video_task[2])
# delete_file(video_task[2])
else:
if status:
video["status"] = UploadStatusType.CANCEL.value[0]
video["progress"] = vod.get_progress()
else:
video["status"] = UploadStatusType.FAILED.value[0]
video["progress"] = vod.get_progress()

@staticmethod
def wait_thread(it, vt, requestId):
if it:
it.shutdown()
logger.info("it线程池关闭完成m, requestId:{}", requestId)
if vt:
vt.shutdown()
logger.info("vt线程池关闭完成m, requestId:{}", requestId)

def run(self):
# 文件
imageList, videoList = [], []
deleteImageList, deleteVideoList = [], []
# 线程任务
imageTask, videoTask = {}, {}
# 备份路径
image_backup, video_backup = self.image_backup, self.video_backup
imagePath, videoPath = self.imagePath, self.videoPath
# 队列
fb_queue, hb_queue, event_queue = self.__fb_queue, Queue(), self.__event_queue
requestId, service_config, callback_url = self.__requestId, self.__service_config, self.__callback_url
it, vt, ex, hd = None, None, None, None
try:
init_log(service_config["base_dir"])
logger.info("启动文件上传进程!requestId:{}", requestId)
# 检查文件路径是否存在,不存在则路径有问题
if not exists(imagePath) or not exists(videoPath):
logger.error("图片、视频路径检测失败, 图片或视频路径不存在!requestId:{}", requestId)
logger.error("图片路径: {}, 视频路径: {}, requestId:{}", imagePath, videoPath, requestId)
raise ServiceException(UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[0],
UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[1])
# 检查是否有视频和图片,没有直接返回
image_path_array, video_path_array = get_all_images(imagePath), get_all_videos(videoPath)
if len(image_path_array) == 0 and len(video_path_array) == 0:
logger.info("未查询到本地视频及图片文件!!!requestId:{}", requestId)
# upload_http_result(fb_queue, callback_url, requestId, status=UploadTaskStatusType.SUCCESS.value[0])
return
# 启动心跳线程
hd = self.start_hd_thread(imageList, videoList, fb_queue, callback_url, hb_queue, requestId)
# 初始化图片、视频线程池
it = ThreadPoolExecutorNew(max_workers=10, thread_name_prefix='图片线程池')
vt = ThreadPoolExecutorNew(max_workers=2, thread_name_prefix='视频线程线程池')
self.upload_method(service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
imageList, it)
start_time = time()
while True:
# 超时检查
if time() - start_time >= 43200:
logger.error("上传文件任务执行超时!requestId: {}", requestId)
raise ServiceException(UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[0],
UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[1])
# 心跳检查
if hd is not None and not hd.is_alive():
logger.error("心跳线程异常停止, requestId:{}", requestId)
raise Exception("心跳线程异常停止!")
event_result = get_no_block_queue(event_queue)
# 终止任务
if event_result is not None and event_result.get("command") == "stop":
# 清空线程队列
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
# 等待线程任务完成
it.shutdown()
vt.shutdown()
# 更新任务状态
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
break
num = 0
num = self.change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList,
deleteVideoList)
if num == 0:
break
sleep(1)
except ServiceException as s:
ex = s.code, s.msg
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
it.shutdown()
vt.shutdown()
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId)
except Exception:
ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \
UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1]
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
it.shutdown()
vt.shutdown()
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId)
finally:
try:
if len(deleteImageList) > 0 or len(deleteVideoList) > 0:
with ThreadPoolExecutor(max_workers=20) as de:
for imageFile in deleteImageList:
logger.error("删除图片文件, 文件名: {}, requestId: {}", imageFile, requestId)
de.submit(delete_file, imageFile)
for videoFile in deleteVideoList:
logger.error("删除视频文件, 文件名: {}, requestId: {}", videoFile, requestId)
de.submit(delete_file, videoFile)
de.shutdown(wait=True)
self.wait_thread(it, vt, requestId)
image_path_array1, video_path_array1 = get_all_images(imagePath), get_all_videos(videoPath)
if len(image_path_array1) > 0 or len(video_path_array1) > 0:
with ThreadPoolExecutor(max_workers=20) as ec:
for i in image_path_array1:
create_dir(image_backup)
logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
ec.submit(move_file, i, image_backup)
for i in video_path_array1:
create_dir(video_backup)
logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
ec.submit(move_file, i, video_backup)
ec.shutdown(wait=True)
self.stop_hd_thread(hd, hb_queue, requestId)
if ex:
code, msg = ex
upload_http_result(fb_queue, callback_url, requestId, errorCode=code, errorMsg=msg,
status=UploadTaskStatusType.FAILED.value[0], imageList=imageList,
videoList=videoList)
else:
upload_http_result(fb_queue, callback_url, requestId, status=UploadTaskStatusType.SUCCESS.value[0],
imageList=imageList, videoList=videoList)
except Exception:
logger.error("未知异常: {}", format_exc())
self.stop_hd_thread(hd, hb_queue, requestId)
upload_http_result(fb_queue, callback_url, requestId,
errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
status=UploadTaskStatusType.FAILED.value[0],
imageList=imageList,
videoList=videoList)
finally:
self.stop_hd_thread(hd, hb_queue, requestId)
clear_queue(event_queue, is_ex=False)
clear_queue(hb_queue, is_ex=False)
tmp = join(service_config["base_dir"], r'tmp\oss')
remove_after_create(tmp)
logger.info("上传文件任务完成, requestId: {}", requestId)

+ 0
- 0
concurrency/http/__init__.py 查看文件


二進制
concurrency/http/__pycache__/HttpFeedbackThread.cpython-38.pyc 查看文件


二進制
concurrency/http/__pycache__/HttpFeedbackThread.cpython-39.pyc 查看文件


二進制
concurrency/http/__pycache__/HttpServiceImpl.cpython-38.pyc 查看文件


二進制
concurrency/http/__pycache__/HttpServiceImpl.cpython-39.pyc 查看文件


二進制
concurrency/http/__pycache__/HttpUploadFileProcess.cpython-38.pyc 查看文件


二進制
concurrency/http/__pycache__/HttpUploadFileProcess.cpython-39.pyc 查看文件


二進制
concurrency/http/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
concurrency/http/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 35
concurrency/mqtt/MqttFeedbackThread.py 查看文件

@@ -1,35 +0,0 @@
# -*- coding: utf-8 -*-
import time
from threading import Thread
from traceback import format_exc

from loguru import logger

from util.QueUtil import get_block_queue

'''
问题反馈线程
'''


class FeedbackThread(Thread):
__slots__ = ('__fb_queue', '__mq')

def __init__(self, fb_queue, mq):
super().__init__()
self.__fb_queue = fb_queue
self.__mq = mq

def run(self):
logger.info("启动反馈线程")
fb_queue, mq = self.__fb_queue, self.__mq
while True:
try:
fb = get_block_queue(fb_queue)
if fb is not None and len(fb) > 0:
mq.publish(fb)
else:
time.sleep(1)
except Exception:
logger.error("反馈异常:{}", format_exc())
logger.info("反馈线程执行完成")

+ 0
- 390
concurrency/mqtt/MqttUploadFileProcess.py 查看文件

@@ -1,390 +0,0 @@
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue
from os.path import join, exists
from threading import Thread
from time import sleep, time
from traceback import format_exc

from loguru import logger

from bean.Feedback import upload_result
from enums.ExceptionEnum import UploadStrExceptionType
from enums.StatusEnum import UploadTaskStatusType, UploadStatusType
from exception.CustomerException import ServiceException
from util.AliyunUtil import OssUtil, VodUtil
from util.FileUtil import get_all_images, get_all_videos, get_file_name, delete_file, create_dir, \
move_file, remove_after_create
from util.LogUtils import init_log
from util.QueUtil import put_queue, get_no_block_queue, clear_queue
from util.ThreadUtil import ThreadPoolExecutorNew
from util.TimeUtils import now_date_to_str, YMDHMS

'''
视频上传
'''


class MqttUploadFileProcess(Process):
__slots__ = ('__fb_queue', '__event_queue', '__service_config', '__requestId', '__current_time', 'videoPath',
'imagePath', 'image_backup', 'video_backup')

def __init__(self, fb_queue, service_config, request_id):
super().__init__()
self.__fb_queue = fb_queue
self.__requestId = request_id
self.__event_queue = Queue()
self.__service_config = service_config
self.__current_time = now_date_to_str(YMDHMS)
backup = join(service_config["file"]["backup"], self.__current_time)
self.videoPath, self.imagePath = service_config["file"]['videoPath'], service_config["file"]['imagePath']
self.image_backup, self.video_backup = join(backup, 'images'), join(backup, 'videos')
upload_result(fb_queue, request_id, status=UploadTaskStatusType.WAITING.value[0])

def sendEvent(self, result):
put_queue(self.__event_queue, result, timeout=2, is_throw_ex=True)

@staticmethod
def uploadImage(oss, uploadPath, filePath):
try:
oss.resumable_upload(uploadPath, filePath)
except Exception as e:
oss.exception = e
logger.error("oss上传文件异常: {}, uploadPath:{}, filePath:{}", format_exc(), uploadPath, filePath)
raise e

@staticmethod
def uploadVideo(vod, file_title, filePath):
try:
vod.get_play_url(filePath, file_title)
except Exception as e:
vod.exception = e
logger.error("oss上传文件异常: {}, file_title:{}, filePath:{}", format_exc(), file_title, filePath)
raise e

@staticmethod
def move_method(file, backup):
move_file(file, backup)

@staticmethod
def hdThread(imageList, videoList, fb_queue, hb_queue, requestId):
try:
logger.info("启动文件上传心跳线程, requestId:{}", requestId)
start_time = time()
count = 0
while True:
if time() - start_time > 43200:
logger.error("心跳线程运行超时!!!!requestId:{}", requestId)
break
command = get_no_block_queue(hb_queue)
if command is not None:
if "stop" == command.get("command"):
logger.info("开始停止心跳线程!!!!requestId:{}", requestId)
break
if count % 5 == 0:
upload_result(fb_queue, requestId, status=UploadTaskStatusType.RUNNING.value[0],
imageList=imageList, videoList=videoList)
count = 0
count += 1
sleep(1)
except Exception:
logger.error("心跳线程异常:{}, requestId:{}", format_exc(), requestId)
logger.info("心跳线程停止完成!requestId:{}", requestId)

def upload_method(self, service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
imageList, it):
current_time = self.__current_time
for i in video_path_array:
vod = VodUtil(service_config["base_dir"])
filename = get_file_name(i)
video_result = vt.submit(self.uploadVideo, vod, filename, i)
videoTask[filename] = [video_result, vod, i]
videoList.append({"fileName": filename, "videoUrl": "", "status": UploadStatusType.WAITING.value[0],
"progress": vod.get_progress()})
for i in image_path_array:
oss = OssUtil(service_config["base_dir"])
fileName = get_file_name(i)
uploadPath = "%s/%s" % (current_time, fileName)
image_result = it.submit(self.uploadImage, oss, uploadPath, i)
imageTask[fileName] = [image_result, oss, i]
imageList.append({"fileName": fileName, "imageUrl": "", "status": UploadStatusType.WAITING.value[0],
"progress": oss.progress})

def start_hd_thread(self, imageList, videoList, fb_queue, hb_queue, requestId):
hd = Thread(target=self.hdThread, args=(imageList, videoList, fb_queue, hb_queue, requestId))
hd.setDaemon(True)
hd.start()
return hd

@staticmethod
def stop_hd_thread(hb, hb_queue, requestId):
if hb is not None and hb.is_alive():
put_queue(hb_queue, {"command": "stop"}, timeout=2, is_throw_ex=False)
start = time()
hb.join(60)
if time() - start > 60:
logger.error("心跳线程停止超时, requestId:{}", requestId)
clear_queue(hb_queue)

@staticmethod
def change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList, deleteVideoList):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
num += 1
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
if not image_task[0].done():
image["status"] = UploadStatusType.RUNNING.value[0]
image["progress"] = image_task[1].get_progress()
continue
if image_task[0].done():
try:
image_task[0].result()
image["imageUrl"] = image_task[1].get_image_url()
image["status"] = UploadStatusType.SUCCESS.value[0]
image["progress"] = "1.0000"
deleteImageList.append(image_task[2])
# delete_file(image_task[2])
except Exception:
logger.error("文件{}上传失败, 异常: {}, requestId: {}", image_task[2], format_exc(),
requestId)
image["status"] = UploadStatusType.FAILED.value[0]
image["progress"] = image_task[1].get_progress()
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
num += 1
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
if not video_task[0].done():
video["status"] = UploadStatusType.RUNNING.value[0]
video["progress"] = video_task[1].get_progress()
continue
if video_task[0].done():
try:
video_task[0].result()
video["videoUrl"] = video_task[1].get_video_url()
video["status"] = UploadStatusType.SUCCESS.value[0]
video["progress"] = "1.0000"
deleteVideoList.append(video_task[2])
# delete_file(video_task[2])
except Exception:
logger.error("文件{}上传失败, 异常:{}, requestId: {}", video_task[2], format_exc(),
requestId)
video["status"] = UploadStatusType.FAILED.value[0]
video["progress"] = video_task[1].get_progress()
return num

@staticmethod
def stop_all_task(imageList, imageTask, videoList, videoTask):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
if not image_task[0].done():
# 如果任务没有结束, 停止设置取消任务
image_task[1].status = True
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
if not video_task[0].done():
video_task[1].status = True
if video_task[1].uploader is not None:
if video_task[1].uploader.uploader is not None:
video_task[1].uploader.uploader.status = True

@staticmethod
def stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList):
for image in imageList:
if image["status"] < UploadStatusType.SUCCESS.value[0]:
image_task = imageTask.get(image.get("fileName"))
# (image_result, oss, i)
if image_task:
oss = image_task[1]
status = oss.status
image_url = oss.get_image_url()
if image_url is not None and len(image_url) > 0:
image["imageUrl"] = image_url
image["status"] = UploadStatusType.SUCCESS.value[0]
image["progress"] = "1.0000"
deleteImageList.append(image_task[2])
# delete_file(image_task[2])
else:
if status:
image["status"] = UploadStatusType.CANCEL.value[0]
image["progress"] = oss.get_progress()
else:
image["status"] = UploadStatusType.FAILED.value[0]
image["progress"] = oss.get_progress()
for video in videoList:
if video["status"] < UploadStatusType.SUCCESS.value[0]:
video_task = videoTask.get(video.get("fileName"))
if video_task:
# 如果任务已经完成
# (video_result, vod, i)
vod = video_task[1]
status = vod.status
video_url = vod.get_video_url()
if video_url is not None and len(video_url) > 0:
video["videoUrl"] = video_url
video["status"] = UploadStatusType.SUCCESS.value[0]
video["progress"] = "1.0000"
deleteVideoList.append(video_task[2])
# delete_file(video_task[2])
else:
if status:
video["status"] = UploadStatusType.CANCEL.value[0]
video["progress"] = vod.get_progress()
else:
video["status"] = UploadStatusType.FAILED.value[0]
video["progress"] = vod.get_progress()

@staticmethod
def wait_thread(it, vt, requestId):
if it:
it.shutdown()
logger.info("it线程池关闭完成m, requestId:{}", requestId)
if vt:
vt.shutdown()
logger.info("vt线程池关闭完成m, requestId:{}", requestId)

def run(self):
imageList, videoList = [], []
deleteImageList, deleteVideoList = [], []
# 线程任务
imageTask, videoTask = {}, {}
# 备份路径
image_backup, video_backup = self.image_backup, self.video_backup
imagePath, videoPath = self.imagePath, self.videoPath
# 队列
fb_queue, hb_queue, event_queue = self.__fb_queue, Queue(), self.__event_queue
requestId, service_config = self.__requestId, self.__service_config
it, vt, ex, hd = None, None, None, None
try:
init_log(service_config["base_dir"])
logger.info("启动文件上传进程!requestId:{}", requestId)
# 检查文件路径是否存在,不存在则路径有问题
if not exists(imagePath) or not exists(videoPath):
logger.error("图片、视频路径检测失败, 图片或视频路径不存在!requestId:{}", requestId)
logger.error("图片路径: {}, 视频路径: {}, requestId:{}", imagePath, videoPath, requestId)
raise ServiceException(UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[0],
UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[1])
# 检查是否有视频和图片,没有直接返回
image_path_array, video_path_array = get_all_images(imagePath), get_all_videos(videoPath)
if len(image_path_array) == 0 and len(video_path_array) == 0:
logger.info("未查询到本地视频及图片文件!!!requestId:{}", requestId)
# upload_result(fb_queue, requestId, status=UploadTaskStatusType.SUCCESS.value[0])
return
# 启动心跳线程
hd = self.start_hd_thread(imageList, videoList, fb_queue, hb_queue, requestId)
# 初始化图片、视频线程池
it = ThreadPoolExecutorNew(max_workers=10, thread_name_prefix='图片线程池')
vt = ThreadPoolExecutorNew(max_workers=2, thread_name_prefix='视频线程线程池')
self.upload_method(service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
imageList, it)
start_time = time()
while True:
# 超时检查
if time() - start_time >= 43200:
logger.error("上传文件任务执行超时!requestId: {}", requestId)
raise ServiceException(UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[0],
UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[1])
# 心跳检查
if hd is not None and not hd.is_alive():
logger.error("心跳线程异常停止, requestId:{}", requestId)
raise Exception("心跳线程异常停止!")
event_result = get_no_block_queue(event_queue)
# 终止任务
if event_result is not None and event_result.get("command") == "stop":
# 清空线程队列
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
# 等待线程任务完成
it.shutdown()
vt.shutdown()
# 更新任务状态
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList,
deleteVideoList)
break
num = 0
num = self.change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList,
deleteVideoList)
if num == 0:
break
sleep(1)
except ServiceException as s:
ex = s.code, s.msg
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
it.shutdown()
vt.shutdown()
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId)
except Exception:
ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \
UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1]
it.clear_work_queue()
vt.clear_work_queue()
# 停止所有任务
self.stop_all_task(imageList, imageTask, videoList, videoTask)
it.shutdown()
vt.shutdown()
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId)
finally:
try:
if len(deleteImageList) > 0 or len(deleteVideoList) > 0:
with ThreadPoolExecutor(max_workers=20) as de:
for imageFile in deleteImageList:
logger.error("删除图片文件, 文件名: {}, requestId: {}", imageFile, requestId)
de.submit(delete_file, imageFile)
for videoFile in deleteVideoList:
logger.error("删除视频文件, 文件名: {}, requestId: {}", videoFile, requestId)
de.submit(delete_file, videoFile)
de.shutdown(wait=True)
self.wait_thread(it, vt, requestId)
image_path_array1, video_path_array1 = get_all_images(imagePath), get_all_videos(videoPath)
if len(image_path_array1) > 0 or len(video_path_array1) > 0:
with ThreadPoolExecutor(max_workers=20) as ec:
for i in image_path_array1:
create_dir(image_backup)
logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
ec.submit(move_file, i, image_backup)
for i in video_path_array1:
create_dir(video_backup)
logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
ec.submit(move_file, i, video_backup)
ec.shutdown(wait=True)
self.stop_hd_thread(hd, hb_queue, requestId)
if ex:
code, msg = ex
upload_result(fb_queue, requestId, errorCode=code, errorMsg=msg,
status=UploadTaskStatusType.FAILED.value[0], imageList=imageList, videoList=videoList)
else:
upload_result(fb_queue, requestId, status=UploadTaskStatusType.SUCCESS.value[0], imageList=imageList,
videoList=videoList)
except Exception:
logger.error("未知异常: {}", format_exc())
self.stop_hd_thread(hd, hb_queue, requestId)
upload_result(fb_queue, requestId,
errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
status=UploadTaskStatusType.FAILED.value[0],
imageList=imageList, videoList=videoList)
finally:
self.stop_hd_thread(hd, hb_queue, requestId)
clear_queue(event_queue, is_ex=False)
clear_queue(hb_queue, is_ex=False)
tmp = join(service_config["base_dir"], r'tmp\oss')
remove_after_create(tmp)
logger.info("上传文件任务完成, requestId: {}", requestId)

+ 0
- 0
concurrency/mqtt/__init__.py 查看文件


二進制
concurrency/mqtt/__pycache__/MqttFeedbackThread.cpython-38.pyc 查看文件


二進制
concurrency/mqtt/__pycache__/MqttFeedbackThread.cpython-39.pyc 查看文件


二進制
concurrency/mqtt/__pycache__/MqttUploadFileProcess.cpython-38.pyc 查看文件


二進制
concurrency/mqtt/__pycache__/MqttUploadFileProcess.cpython-39.pyc 查看文件


二進制
concurrency/mqtt/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
concurrency/mqtt/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 12
config/aliyun.yml 查看文件

@@ -1,12 +0,0 @@
access_key: "LTAI5tSJ62TLMUb4SZuf285A"
access_secret: "MWYynm30filZ7x0HqSHlU3pdLVNeI7"
oss:
endpoint: "http://oss-cn-shanghai.aliyuncs.com"
bucket: "th-airprt-media"
host_url: "https://th-airprt-media.oss-cn-shanghai.aliyuncs.com"
vod:
host_address: "https://vod.play.t-aaron.com/"
ecsRegionId: "cn-shanghai"
cateId: 1000499327



+ 0
- 20
config/logger.yml 查看文件

@@ -1,20 +0,0 @@
# 是否启动日志文件记录日志
enable_file_log: true
# 是否启动控制台打印日志
enable_stderr: true
# 日志相对于根路径下的相对路径
base_path: "logs"
# 日志文件名称
log_name: "airport_media.log"
# 日志打印格式
log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}"
# 日志打印级别
level: "INFO"
# 日志绕接时间
rotation: "00:00"
# 日志保存时间
retention: "7 days"
# 日志编码格式
encoding: "utf8"



+ 0
- 25
config/mqtt.yml 查看文件

@@ -1,25 +0,0 @@
mqtt:
# mqtt客户端id对应每个机场平台的编码
client_id: "THOBS@0000THJSQ232003"
# mqtt用户(根据对接环境修改)
username: "admin"
# mqtt密码(根据对接环境修改)
password: "admin##123"
# mqtt连接地址(根据对接环境修改)
host: "mqtt.t-aaron.com"
# mqtt端口号(根据对接环境修改)
port: 10883
# 长链接时间
keepalive: 60
topic:
upload:
# 文件上传订阅topic(不修改)
sub_topic: "/v1/%s/media/upload"
# 文件上传响应topic(不修改)
res_topic: "/v1/%s/media/result"
easy_rtmp_live:
# EasyRtmpLive启动停止订阅topic(不修改)
sub_topic: "/v1/%s/rtmp/live"
# EasyRtmpLive响应topic(不修改)
res_topic: "/v1/%s/rtmp/result"


+ 0
- 12
config/service.yml 查看文件

@@ -1,12 +0,0 @@
# 1: mqtt消息队列方式对接
# 2: http接口方式对接
docking_method: 1
# 机场视频文件、图片文件、备份路径配置
file:
# 机场录制视频地址
videoPath: "D:\\test\\video"
# 机场截图图片地址
imagePath: "D:\\test\\image"
# 机场视频、图片上传失败备份地址
backup: "D:\\test\\backup"


+ 0
- 25
enums/ExceptionEnum.py 查看文件

@@ -1,25 +0,0 @@
from enum import Enum, unique


@unique
class ExceptionType(Enum):
ILLEGAL_PARAMETER_FORMAT = ("CN000", "参数格式错误!")

SERVICE_INNER_EXCEPTION = ("CN999", "系统内部异常!")


@unique
class UploadStrExceptionType(Enum):
UPLOAD_TASK_IS_AREADLY = ("UT000", "上传任务已存在!")

UPLOAD_TASK_IS_NOT_AREADLY = ("UT001", "上传任务不存在!")

FILE_PATH_IS_NOT_AREADLY = ("UT002", "图片或视频路径不存在!")

TASK_IS_RUN_TIMEOUT = ("UT003", "任务执行超时!")

SERVICE_INNER_EXCEPTION = ("UT999", "系统内部异常!")

@unique
class RtmpExceptionType(Enum):
ILLEGAL_PARAMETER_FORMAT = (-1, "参数格式错误!")

+ 0
- 25
enums/HttpExceptionEnum.py 查看文件

@@ -1,25 +0,0 @@
from enum import Enum, unique


@unique
class UploadExceptionType(Enum):
TASK_IS_EXECUTING = (-1, "任务正在执行, 请稍后再试!")

TASK_NOT_EXISTS = (-1, "任务停止失败, 任务不存在!")


@unique
class EasyExceptionType(Enum):
TASK_IS_EXECUTING = (-1, "任务正在执行, 请稍后再试!")

TASK_NOT_EXISTS = (-1, "任务停止失败, 任务已停止!")

EASY_TASK_GET_FAIL = (-1, "获取EasyRtmpLive任务失败!")

EASY_TASK_LIMIT = (-1, "EasyRtmpLive任务限制, 限制1个任务!")

EASY_TASK_START_FAIL = (-1, "EasyRtmpLive启动失败!")

EASY_TASK_STOP_FAIL = (-1, "EasyRtmpLive停止失败!")

SERVICE_INNER_EXCEPTION = (-1, "系统内部异常!")

+ 0
- 34
enums/StatusEnum.py 查看文件

@@ -1,34 +0,0 @@
from enum import Enum, unique


@unique
class UploadStatusType(Enum):

WAITING = (5, "待上传")

RUNNING = (10, "上传中")

SUCCESS = (15, "完成")

FAILED = (20, "失败")

CANCEL = (25, "取消")


@unique
class UploadTaskStatusType(Enum):

WAITING = (5, "待执行")

RUNNING = (10, "执行中")

SUCCESS = (15, "完成")

TIMEOUT = (20, "超时")

FAILED = (25, "失败")






+ 0
- 0
enums/__init__.py 查看文件


二進制
enums/__pycache__/ExceptionEnum.cpython-38.pyc 查看文件


二進制
enums/__pycache__/ExceptionEnum.cpython-39.pyc 查看文件


二進制
enums/__pycache__/HttpExceptionEnum.cpython-38.pyc 查看文件


二進制
enums/__pycache__/HttpExceptionEnum.cpython-39.pyc 查看文件


二進制
enums/__pycache__/StatusEnum.cpython-38.pyc 查看文件


二進制
enums/__pycache__/StatusEnum.cpython-39.pyc 查看文件


二進制
enums/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
enums/__pycache__/__init__.cpython-39.pyc 查看文件


+ 0
- 19
exception/CustomerException.py 查看文件

@@ -1,19 +0,0 @@
# -*- coding: utf-8 -*-
from loguru import logger


"""
自定义异常
"""


class ServiceException(Exception):
def __init__(self, code, msg):
self.code = code
self.msg = msg

def __str__(self):
logger.error("异常编码:{}, 异常描述:{}", self.code, self.msg)




+ 0
- 0
exception/__init__.py 查看文件


二進制
exception/__pycache__/CustomerException.cpython-38.pyc 查看文件


二進制
exception/__pycache__/CustomerException.cpython-39.pyc 查看文件


二進制
exception/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
exception/__pycache__/__init__.cpython-39.pyc 查看文件


二進制
media.ico 查看文件

Before After

+ 0
- 36
osssdk/__init__.py 查看文件

@@ -1,36 +0,0 @@
__version__ = '2.18.0'

from . import models, exceptions, defaults

from .api import Service, Bucket
from .auth import Auth, AuthV2, AuthV4, AnonymousAuth, StsAuth, AUTH_VERSION_1, AUTH_VERSION_2, AUTH_VERSION_4, \
make_auth, ProviderAuth, ProviderAuthV2, ProviderAuthV4
from .http import Session, CaseInsensitiveDict
from .credentials import EcsRamRoleCredentialsProvider, EcsRamRoleCredential, CredentialsProvider, \
StaticCredentialsProvider

from .iterators import (BucketIterator, ObjectIterator, ObjectIteratorV2,
MultipartUploadIterator, ObjectUploadIterator,
PartIterator, LiveChannelIterator)

from .resumable import resumable_upload, resumable_download, ResumableStore, ResumableDownloadStore, determine_part_size
from .resumable import make_upload_store, make_download_store

from .compat import to_bytes, to_string, to_unicode, urlparse, urlquote, urlunquote

from .utils import SizedFileAdapter, make_progress_adapter
from .utils import content_type_by_name, is_valid_bucket_name, is_valid_endpoint
from .utils import http_date, http_to_unixtime, iso8601_to_unixtime, date_to_iso8601, iso8601_to_date

from .models import BUCKET_ACL_PRIVATE, BUCKET_ACL_PUBLIC_READ, BUCKET_ACL_PUBLIC_READ_WRITE
from .models import SERVER_SIDE_ENCRYPTION_AES256, SERVER_SIDE_ENCRYPTION_KMS, SERVER_SIDE_ENCRYPTION_SM4, \
KMS_DATA_ENCRYPTION_SM4
from .models import OBJECT_ACL_DEFAULT, OBJECT_ACL_PRIVATE, OBJECT_ACL_PUBLIC_READ, OBJECT_ACL_PUBLIC_READ_WRITE
from .models import BUCKET_STORAGE_CLASS_STANDARD, BUCKET_STORAGE_CLASS_IA, BUCKET_STORAGE_CLASS_ARCHIVE, \
BUCKET_STORAGE_CLASS_COLD_ARCHIVE
from .models import BUCKET_VERSIONING_ENABLE, BUCKET_VERSIONING_SUSPEND
from .models import BUCKET_DATA_REDUNDANCY_TYPE_LRS, BUCKET_DATA_REDUNDANCY_TYPE_ZRS

from .crypto import LocalRsaProvider, AliKMSProvider, RsaProvider, EncryptionMaterials
from .crypto_bucket import CryptoBucket


二進制
osssdk/__pycache__/__init__.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/__init__.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/api.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/api.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/auth.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/auth.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/compat.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/compat.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/crc64_combine.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/crc64_combine.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/credentials.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/credentials.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/crypto.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/crypto.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/crypto_bucket.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/crypto_bucket.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/defaults.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/defaults.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/exceptions.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/exceptions.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/headers.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/headers.cpython-39.pyc 查看文件


二進制
osssdk/__pycache__/http.cpython-38.pyc 查看文件


二進制
osssdk/__pycache__/http.cpython-39.pyc 查看文件


+ 0
- 0
osssdk/__pycache__/iterators.cpython-38.pyc 查看文件


部分文件因文件數量過多而無法顯示

Loading…
取消
儲存