@@ -319,23 +319,31 @@ class HttpUploadFileProcess(Process): | |||
sleep(1) | |||
except ServiceException as s: | |||
ex = s.code, s.msg | |||
it.clear_work_queue() | |||
vt.clear_work_queue() | |||
if it: | |||
it.clear_work_queue() | |||
if vt: | |||
vt.clear_work_queue() | |||
# 停止所有任务 | |||
self.stop_all_task(imageList, imageTask, videoList, videoTask) | |||
it.shutdown() | |||
vt.shutdown() | |||
if it: | |||
it.shutdown() | |||
if vt: | |||
vt.shutdown() | |||
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList) | |||
logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId) | |||
except Exception: | |||
ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \ | |||
UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1] | |||
it.clear_work_queue() | |||
vt.clear_work_queue() | |||
if it: | |||
it.clear_work_queue() | |||
if vt: | |||
vt.clear_work_queue() | |||
# 停止所有任务 | |||
self.stop_all_task(imageList, imageTask, videoList, videoTask) | |||
it.shutdown() | |||
vt.shutdown() | |||
if it: | |||
it.shutdown() | |||
if vt: | |||
vt.shutdown() | |||
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList) | |||
logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId) | |||
finally: |
@@ -322,23 +322,31 @@ class MqttUploadFileProcess(Process): | |||
sleep(1) | |||
except ServiceException as s: | |||
ex = s.code, s.msg | |||
it.clear_work_queue() | |||
vt.clear_work_queue() | |||
if it: | |||
it.clear_work_queue() | |||
if vt: | |||
vt.clear_work_queue() | |||
# 停止所有任务 | |||
self.stop_all_task(imageList, imageTask, videoList, videoTask) | |||
it.shutdown() | |||
vt.shutdown() | |||
if it: | |||
it.shutdown() | |||
if vt: | |||
vt.shutdown() | |||
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList) | |||
logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId) | |||
except Exception: | |||
ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \ | |||
UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1] | |||
it.clear_work_queue() | |||
vt.clear_work_queue() | |||
if it: | |||
it.clear_work_queue() | |||
if vt: | |||
vt.clear_work_queue() | |||
# 停止所有任务 | |||
self.stop_all_task(imageList, imageTask, videoList, videoTask) | |||
it.shutdown() | |||
vt.shutdown() | |||
if it: | |||
it.shutdown() | |||
if vt: | |||
vt.shutdown() | |||
self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList) | |||
logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId) | |||
finally: |
@@ -1,6 +1,8 @@ | |||
mqtt: | |||
# mqtt客户端id对应每个机场平台的编码 | |||
client_id: "THOBS@0000THJSQ232003" | |||
client_id: "THOBS@0000THJSQ232004" | |||
# 机场平台编号 | |||
airport_code: "0000THJSQ232004" | |||
# mqtt用户(根据对接环境修改) | |||
username: "admin" | |||
# mqtt密码(根据对接环境修改) | |||
@@ -13,13 +15,13 @@ mqtt: | |||
keepalive: 60 | |||
topic: | |||
upload: | |||
# 文件上传订阅topic(不修改) | |||
# 文件上传订阅topic(不修改) %s代码会自动替换为airport_code | |||
sub_topic: "/v1/%s/media/upload" | |||
# 文件上传响应topic(不修改) | |||
# 文件上传响应topic(不修改)%s代码会自动替换为airport_code | |||
res_topic: "/v1/%s/media/result" | |||
easy_rtmp_live: | |||
# EasyRtmpLive启动停止订阅topic(不修改) | |||
# EasyRtmpLive启动停止订阅topic(不修改) %s代码会自动替换为airport_code | |||
sub_topic: "/v1/%s/rtmp/live" | |||
# EasyRtmpLive响应topic(不修改) | |||
# EasyRtmpLive响应topic(不修改) %s代码会自动替换为airport_code | |||
res_topic: "/v1/%s/rtmp/result" | |||
@@ -100,6 +100,7 @@ async def stopPushStream(): | |||
channel = channelList[0] | |||
if channel["enable"] == "false": | |||
logger.info("EasyRtmpLive任务已停止") | |||
return JsonResult.success() | |||
channel["enable"] = "false" | |||
channel["srcURL"] = '"%s"' % channel["srcURL"] | |||
req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \ |
@@ -142,6 +142,8 @@ def handle_rtmp(msg, task, fb_queue, service_config): | |||
if 'stop' == command: | |||
if channel["enable"] == "false": | |||
logger.info("EasyRtmpLive任务已停止!") | |||
rtmp_result(fb_queue, 0, "操作成功!") | |||
return | |||
channel["enable"] = "false" | |||
channel["srcURL"] = '"%s"' % channel["srcURL"] | |||
req_param = 'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s' % \ |
@@ -20,8 +20,8 @@ class MqttClient: | |||
self.client = None | |||
self.config = getConfigs(base_dir, "config/mqtt.yml") | |||
self.res_topic = { | |||
"upload": self.config["topic"]["upload"]["res_topic"] % self.config["mqtt"]["client_id"], | |||
"rtmp": self.config["topic"]["easy_rtmp_live"]["res_topic"] % self.config["mqtt"]["client_id"] | |||
"upload": self.config["topic"]["upload"]["res_topic"] % self.config["mqtt"]["airport_code"], | |||
"rtmp": self.config["topic"]["easy_rtmp_live"]["res_topic"] % self.config["mqtt"]["airport_code"] | |||
} | |||
self.msg_queue = msg_queue | |||
self.fb_queue = fb_queue | |||
@@ -30,6 +30,7 @@ class MqttClient: | |||
if self.client is None: | |||
try: | |||
client_id = self.config["mqtt"]["client_id"] | |||
airport_code = self.config["mqtt"]["airport_code"] | |||
self.client = mqtt.Client(client_id=client_id, clean_session=True) | |||
# self.client.enable_logger(logger=logging) | |||
self.client.disable_logger() | |||
@@ -38,8 +39,8 @@ class MqttClient: | |||
self.client.on_message = self.on_message | |||
# self.client.on_disconnect = self.on_disconnect | |||
# self.client.on_log = self.on_log | |||
upload_topic = self.config["topic"]["upload"]["sub_topic"] % client_id | |||
rtmp_topic = self.config["topic"]["easy_rtmp_live"]["sub_topic"] % client_id | |||
upload_topic = self.config["topic"]["upload"]["sub_topic"] % airport_code | |||
rtmp_topic = self.config["topic"]["easy_rtmp_live"]["sub_topic"] % airport_code | |||
self.client.user_data_set({ | |||
"sub_upload_topic": upload_topic, | |||
"sub_rtmp_topic": rtmp_topic, |