Browse Source

上传

pull/1/head
chenyukun 1 year ago
parent
commit
71018cd7ba
5 changed files with 50 additions and 25 deletions
  1. +1
    -1
      config/application.json
  2. +3
    -3
      config/mqtt_config.json
  3. +8
    -2
      service/PushStreamThread.py
  4. +9
    -12
      service/Service.py
  5. +29
    -7
      util/MqttUtil.py

+ 1
- 1
config/application.json View File

@@ -1,4 +1,4 @@
{
"pullUrl": "rtsp://localhost:8554/live",
"pushUrl": "rtmp://192.168.10.101:19350/rlive/stream_127?sign=dueyaUFe"
"pushUrl": "rtmp://221.226.114.142:19350/rlive/stream_127?sign=dueyaUFe"
}

+ 3
- 3
config/mqtt_config.json View File

@@ -1,10 +1,10 @@
{
"client_id": "THOBS@THJSQ03A23045BLPGYC5",
"client_id": "THOBS@00000THJSQ232001",
"username": "",
"password": "",
"host": "127.0.0.1",
"port": 1883,
"keepalive": 60,
"sub_topic": "/v1/THJSQ03A23045BLPGYC5/stream/push",
"res_topic": "/v1/THJSQ03A23045BLPGYC5/stream/result"
"sub_topic": "/v1/00000THJSQ232001/stream/push",
"res_topic": "/v1/00000THJSQ232001/stream/result"
}

+ 8
- 2
service/PushStreamThread.py View File

@@ -9,7 +9,9 @@ from loguru import logger
from enums.ExceptionEnum import ExceptionType
from enums.StatusEnum import StatusType
from exception.CustomerException import ServiceException
from util.PushStreamUtils import PushStreamUtil
from util.QueUtil import put_queue, get_no_block_queue
from util.RWUtils import getConfigs
from util.TimeUtils import now_date_to_str

'''
@@ -21,12 +23,16 @@ from util.TimeUtils import now_date_to_str
class PushStreamThread(Thread):
__slots__ = ('__fbQueue', '__event', '__push_stream_tool', '__hb_status')

def __init__(self, fbQueue, push_stream_tool):
def __init__(self, fbQueue, base_dir, pullUrl, pushUrl):
super().__init__()
self.__fb_queue = fbQueue
self.__event = Queue()
self.__push_stream_tool = push_stream_tool
self.__hb_status = StatusType.WAITTING.value[0]
application_config = getConfigs(base_dir, 'config/application.json')
push_stream_tool = PushStreamUtil(application_config.get("pullUrl"), application_config.get("pushUrl"))
if pullUrl is not None and len(pullUrl) > 0 and pushUrl is not None and len(pushUrl) > 0:
push_stream_tool.set_url(pullUrl, pushUrl)
self.__push_stream_tool = push_stream_tool
put_queue(self.__fb_queue, {
"errorCode": "",
"errorMsg": "",

+ 9
- 12
service/Service.py View File

@@ -11,9 +11,7 @@ from exception.CustomerException import ServiceException
from service.FeedbackThread import FeedbackThread
from service.PushStreamThread import PushStreamThread
from util.MqttUtil import MqttClient
from util.PushStreamUtils import PushStreamUtil
from util.QueUtil import get_no_block_queue, put_queue
from util.RWUtils import getConfigs
from util.TimeUtils import now_date_to_str


@@ -29,33 +27,33 @@ class DispatcherService:
self.start_service()

def start_service(self):
mq = MqttClient(self.__base_dir, self.__msg_queue)
mq = MqttClient(self.__base_dir, self.__msg_queue, self.__fb_queue)
mq.start()
sleep(1)
retry_count = 0
start_time = time()
application_config = getConfigs(self.__base_dir, 'config/application.json')
push_stream_tool = PushStreamUtil(application_config.get("pullUrl"), application_config.get("pushUrl"))

while True:
try:
if self.__task is not None and not self.__task.is_alive():
self.__task = None
retry_count, start_time = self.start_feedback_thread(mq, retry_count, start_time)
if not mq.client.is_connected():
logger.info("mqtt重连中")
mq.stop()
sleep(2)
sleep(4)
mq.client.reconnect()
mq.client.loop_start()
# 订阅消息处理
message = get_no_block_queue(self.__msg_queue)
if message is not None and len(message) > 0:
self.handle_message(message, push_stream_tool)
self.handle_message(message)
else:
sleep(1)
except Exception:
logger.error("推流服务异常: {}", format_exc())

def handle_message(self, message, push_stream_tool):
def handle_message(self, message):
try:
self.check_msg(message)
command = message.get("command")
@@ -70,14 +68,12 @@ class DispatcherService:
return
pullUrl = message.get("pullUrl")
pushUrl = message.get("pushUrl")
if pullUrl is not None and len(pullUrl) > 0 and pushUrl is not None and len(pushUrl) > 0:
push_stream_tool.set_url(message.get("pullUrl"), message.get("pullUrl"))
put_queue(self.__fb_queue, {
"errorCode": "",
"errorMsg": "",
"status": StatusType.INIT.value[0],
"current_time": now_date_to_str()}, is_throw_ex=False)
p_thread = PushStreamThread(self.__fb_queue, push_stream_tool)
p_thread = PushStreamThread(self.__fb_queue, self.__base_dir, pullUrl, pushUrl)
p_thread.setDaemon(True)
p_thread.start()
self.__task = p_thread
@@ -111,7 +107,8 @@ class DispatcherService:
result = v.validate(msg)
if not result:
logger.error("参数校验异常: {}", v.errors)
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0], v.errors)
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
except ServiceException as s:
raise s
except Exception:

+ 29
- 7
util/MqttUtil.py View File

@@ -1,33 +1,40 @@
from json import load, dump, loads, dumps
from json import loads, dumps
from traceback import format_exc

import paho.mqtt.client as mqtt
import time

from loguru import logger

import logging
from enums.ExceptionEnum import ExceptionType
from enums.StatusEnum import StatusType
from util.QueUtil import put_queue
from util.RWUtils import getConfigs
from util.TimeUtils import now_date_to_str


class MqttClient:
__slots__ = ("client", "config", "queue")
__slots__ = ("client", "config", "queue", "fb_queue")

def __init__(self, base_dir, queue):
def __init__(self, base_dir, queue, fb_queue):
self.client = None
self.config = getConfigs(base_dir, "config/mqtt_config.json")
self.queue = queue
self.fb_queue = fb_queue

def create_client(self):
if self.client is None:
try:
self.client = mqtt.Client(client_id=self.config["client_id"], clean_session=True)
self.client.enable_logger(logger=logger)
self.client.enable_logger(logger=logging)
# self.client.disable_logger()
self.client.on_connect = self.on_connect
self.client.on_subscribe = self.on_subscribe
self.client.on_message = self.on_message
# self.client.on_disconnect = self.on_disconnect
# self.client.on_log = self.on_log
self.client.user_data_set({"sub_topic": self.config["sub_topic"], "queue": self.queue})
self.client.user_data_set({"sub_topic": self.config["sub_topic"], "queue": self.queue,
"fb_queue": self.fb_queue})
self.client.username_pw_set(self.config["username"], self.config["password"])
# self.client.will_set("willTopic", dumps({"message", "与服务器断开连接!!"}), 0, False)
self.client.connect(host=self.config["host"], port=self.config["port"], keepalive=self.config["keepalive"])
@@ -60,6 +67,13 @@ class MqttClient:
def on_subscribe(client, userdata, mid, granted_qos):
logger.info("mqtt开始订阅: {},{}!", mid, granted_qos)

# @staticmethod
# def on_disconnect(client, userdata, rc):
# if rc != 0:
# logger.info("mqtt端口连接!开始重新连接!!!!")
# # 重新连接
# client.reconnect()

@staticmethod
def on_publish(client, obj, mid):
logger.info("mqtt开始发布: {},{}!", mid, obj)
@@ -67,13 +81,21 @@ class MqttClient:
# 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
@staticmethod
def on_message(client, userdata, msg):
fb_queue = userdata.get("fb_queue")
try:
message = loads(msg.payload.decode("utf-8"))
if msg.payload is None or len(msg.payload) == 0:
raise Exception("消息不能为空!")
message = loads(msg.payload.decode())
logger.info("mqtt消息监听, topic:{}, message: {}!", msg.topic, message)
queue = userdata.get("queue")
queue.put(message, timeout=20)
except Exception:
logger.error("mqtt消息监听异常, topic:{}, message: {}, 异常: {}!", msg.topic, msg.payload, format_exc())
put_queue(fb_queue, {
"errorCode": ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
"errorMsg": ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1],
"status": StatusType.FAILED.value[0],
"current_time": now_date_to_str()}, is_throw_ex=False)

# 当客户端有日志信息时调用
@staticmethod

Loading…
Cancel
Save