|
- using Newtonsoft.Json;
- using System;
- using log4net;
- using log4net.Config;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using uPLibrary.Networking.M2Mqtt.Messages;
- using uPLibrary.Networking.M2Mqtt;
- using System.Runtime.InteropServices;
- using System.Reflection;
- using System.Net.Http;
- using System.Web.UI.WebControls;
- using System.Threading;
- using static THUploadOss.Common;
- using System.Net;
- using System.IO;
- using Newtonsoft.Json.Linq;
-
- namespace THUploadOss
- {
- public class THMqttClinet
- {
- public MqttClient mqttClient;
- public string m_brokerAddress; // MQTT Broker的地址 106.15.120.154 "192.168.253.130"
- public int m_brokerPort; // MQTT Broker的端口号
- public string m_clientId; // 客户端的ID,机场编号
- public string m_airportCode; // 控制编号
- public string m_username; // MQTT Broker的用户名
- public string m_password; // MQTT Broker的密码
- public string m_uploadSubTopic; // 文件上传订阅 topic
- public string m_uploadResTopic; // 文件上传响应 topic
- public string m_rtmpSubTic; // rtmp订阅 topic
- public string m_rtmpResTopic; // rtmp响应 topic
-
- private static readonly ILog log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- public void MqttClient_MqttMsgPublished(object sender, MqttMsgPublishedEventArgs e)
- {
- //Console.WriteLine("[" + DateTime.Now.ToString() + "]" + "消息已发布!");
- //log.Info("[" + DateTime.Now.ToString() + "]" + "主题:" + msg.Topic + " 收到消息:" + jsonObj.command + " " + jsonObj.requestId);
- //发布时会触发:用来记录发布的日志??????????????????????
- }
-
- public void MqttClient_MqttMsgReceived(object sender, MqttMsgPublishEventArgs msg)
- {
- string message = Encoding.UTF8.GetString(msg.Message);
- dynamic jsonObj = JsonConvert.DeserializeObject(message);
-
- Logger.WriteInfo("THMqttClinet: MqttClient_MqttMsgReceived-主题:" + msg.Topic + " 收到消息:" + message);
- if (msg.Topic == m_uploadSubTopic && jsonObj.command == "start" )//有任务正在执行中,则不能执行此操作需要返回????????????????
- {
- if (StsrtDownload != -1)
- {
- Logger.WriteInfo("THMqttClinet: 有任务正在进行中,MqttClient_MqttMsgReceived-主题:" + msg.Topic + " 收到消息:" + message);
- return;
- }
- RequestId = jsonObj.requestId;
- DownloadFinsh = 0;
- StsrtDownload = 1;
- Status = (int)UPLOAD_STATUS.Preparation;
- lock (lockFileList)
- {
- for (int i = FileList.Count - 1; i >= 0; i--)
- {
- FileList.Remove(FileList[i]);
- }
- }
- }
- if (msg.Topic == m_uploadSubTopic && jsonObj.command == "stop")
- {
- DownloadFinsh = -1;
- StsrtDownload = -1;
- //Status = (int)UPLOAD_STATUS.Ccancellation;
- }
-
- if (msg.Topic == m_rtmpSubTic && jsonObj.command == "start")
- {
- if (OpenRtmpChannel())
- PublishRtmpTopic((int)TRMP.RTMP_OK);
- else
- PublishRtmpTopic((int)TRMP.RTMP_START_ACTION_FAIL);
- }
- if (msg.Topic == m_rtmpSubTic && jsonObj.command == "stop")
- {
- if (CloseRtmpChannel())
- PublishRtmpTopic((int)TRMP.RTMP_OK);
- else
- PublishRtmpTopic((int)TRMP.RTMP_STOP_ACTION_FAIL);
- }
- }
- public bool InitMqttClient()
- {
- if (!getMqttConfig())
- return false;
- try
- {
- mqttClient = new MqttClient(m_brokerAddress, m_brokerPort, false, null, null, MqttSslProtocols.None);
-
- mqttClient.MqttMsgPublished += MqttClient_MqttMsgPublished;
- mqttClient.MqttMsgPublishReceived += MqttClient_MqttMsgReceived;
-
- if(MqttConnect())
- return true;
- else
- return false;
- }
- catch (Exception ex)
- {
- Logger.WriteError("THMqttClinet: InitMqttClient-MQTT发生错误:" + ex.Message.ToString());
- System.Threading.Thread.Sleep(1000);
- return false;
- }
- }
-
- public bool MqttConnect()
- {
- try
- {
- mqttClient.Connect(m_clientId, m_username, m_password);
- System.Threading.Thread.Sleep(2000);
- if (mqttClient.IsConnected)
- {
- mqttClient.Subscribe(new string[] { m_uploadSubTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
- mqttClient.Subscribe(new string[] { m_rtmpSubTic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
- Logger.WriteInfo("THMqttClinet: MqttConnect-连接到MQTT Broker");
- return true;
- }
- else
- {
- Logger.WriteError("THMqttClinet: MqttConnect-无法连接到MQTT Broker!");
- System.Threading.Thread.Sleep(1000);
- return false;
- }
- }
- catch (Exception ex)
- {
- Logger.WriteError("THMqttClinet: MqttConnect-MQTT发生错误:" + ex.Message);
- System.Threading.Thread.Sleep(1000);
- return false;
- }
- }
-
- public int PublishUploadTopic()
- {
- Logger.WriteError("MQTT PublishUploadTopic: StsrtDownload:" + StsrtDownload + "Status:"+ Status);
- if (StsrtDownload != -1 || Status != (int)UPLOAD_STATUS.Preparation)
- {
- JObject data = new JObject
- {
- { "requestId", RequestId },
- { "errorCode", "" },
- { "errorMsg", "" },
- { "status", Status },// 状态:待执行 5 执行中 10 完成 15 超时 20 失败 25
- { "currentTime", DateTime.Now.ToString() }
- };
- JArray imgjArray = new JArray();
- JArray videojArray = new JArray();
- lock (lockFileList)
- {
- foreach (FileStates file in FileList)
- {
- if (file.fileType == "pic")
- {
- JObject img = new JObject
- {
- { "fileName", file.fileName},
- { "imageUrl",file.url },
- { "status",file.status },
- { "progress",file.progress}
- };
- imgjArray.Add(img);
- }
- if (file.fileType == "video")
- {
- JObject video = new JObject
- {
- { "fileName", file.fileName},
- { "videoUrl",file.url },
- { "status",file.status },
- { "progress",file.progress }
- };
- videojArray.Add(video);
- }
- }
- }
- data.Add("imageList", new JArray(imgjArray));
- data.Add("videoList", new JArray(videojArray));
- // 将对象序列化为 JSON 字符串
- string json = JsonConvert.SerializeObject(data);
-
- Logger.WriteInfo("THMqttClinet: PublishUploadTopic:" + json);
- mqttClient.Publish(m_uploadResTopic, Encoding.UTF8.GetBytes(json), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
- if (Status == (int)UPLOAD_STATUS.Complete)
- Status = (int)UPLOAD_STATUS.Preparation;
- if (Status == (int)UPLOAD_STATUS.Fail)
- Status = (int)UPLOAD_STATUS.Preparation;
- if (Status == (int)UPLOAD_STATUS.Ccancellation)
- Status = (int)UPLOAD_STATUS.Preparation;
- }
- System.Threading.Thread.Sleep(5000);
- return 0;
- }
- public int PublishRtmpTopic(int ret)
- {
- var data = new { code = ret , msg = RTMP_RESULT[ret], data = "null" };
-
- string json = JsonConvert.SerializeObject(data);
-
- Logger.WriteInfo("THMqttClinet: PublishRtmpTopic:" + json);
- mqttClient.Publish(m_rtmpResTopic, Encoding.UTF8.GetBytes(json), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
-
- return 0;
- }
- public bool CloseRtmpChannel()
- {
- // 创建HttpClient实例
- HttpClient client = new HttpClient();
- try
- {
- // 发送GET请求并获取响应
- HttpResponseMessage response = client.GetAsync("http://localhost:19610/api/v1/getChannelList?offset=0&row=50").Result;
- // 检查响应是否成功
- if (response.IsSuccessStatusCode)
- {
- // 读取响应内容
- string content = response.Content.ReadAsStringAsync().Result;
- Logger.WriteInfo("CloseRtmpChannel: "+content);
-
- dynamic jsonObj = JsonConvert.DeserializeObject(content);
- string srcURL = jsonObj.ChannelList[0].srcURL;
- srcURL = WebUtility.UrlEncode(srcURL).Replace("%3A", ":");
- string dstURL = jsonObj.ChannelList[0].dstURL;
- dstURL = WebUtility.UrlEncode(dstURL).Replace("%3A", ":");
-
- string closeUrl = "http://127.0.0.1:19610/api/v1/updateChannel?indexCode=" + jsonObj.ChannelList[0].indexcode + "&name=" + jsonObj.ChannelList[0].name + "&srcURL=%22" + srcURL + "%22&connectType=" + jsonObj.ChannelList[0].connectType + "&timeout=" + jsonObj.ChannelList[0].connectTimeout + "&mediaType=video" + "&dstURL=" + dstURL + "&dstFormat=rtmp&enable=false";
-
- // 输出响应内容
- HttpResponseMessage closeResponse = client.GetAsync(closeUrl).Result;
- if (closeResponse.IsSuccessStatusCode)
- {
- string closeContent = closeResponse.Content.ReadAsStringAsync().Result;
- Logger.WriteInfo("CloseRtmpChannel: "+closeContent);
- if (closeContent == "OK")
- return true;
- else
- return false;
- }
- }
- else
- {
- Logger.WriteError("THMqttClinet: CloseRtmpChannel-请求失败,状态码:" + response.StatusCode);
- return false;
- }
- }
- catch (Exception e)
- {
- Logger.WriteError($"THMqttClinet: CloseRtmpChannel-其他错误发生: {e.Message}");
- return false;
- }
- return false;
- }
- public bool OpenRtmpChannel()
- {
- // 创建HttpClient实例
- HttpClient client = new HttpClient();
- try
- {
- // 发送GET请求并获取响应
- HttpResponseMessage response = client.GetAsync("http://localhost:19610/api/v1/getChannelList?offset=0&row=50").Result;
-
- // 检查响应是否成功
- if (response.IsSuccessStatusCode)
- {
- // 读取响应内容
- string content = response.Content.ReadAsStringAsync().Result;
- // 输出响应内容
- Logger.WriteInfo("OpenRtmpChannel: "+content);
- dynamic jsonObj = JsonConvert.DeserializeObject(content);
- string srcURL = jsonObj.ChannelList[0].srcURL;
- srcURL = WebUtility.UrlEncode(srcURL).Replace("%3A", ":");
- string dstURL = jsonObj.ChannelList[0].dstURL;
- dstURL = WebUtility.UrlEncode(dstURL).Replace("%3A", ":");
-
- string openUrl = "http://127.0.0.1:19610/api/v1/updateChannel?indexCode=" + jsonObj.ChannelList[0].indexcode + "&name=" + jsonObj.ChannelList[0].name + "&srcURL=%22" + srcURL + "%22&connectType=" + jsonObj.ChannelList[0].connectType + "&timeout=" + jsonObj.ChannelList[0].connectTimeout + "&mediaType=video" + "&dstURL=" + dstURL + "&dstFormat=rtmp&enable=true";
- // 输出响应内容
- HttpResponseMessage openResponse = client.GetAsync(openUrl).Result;
- if (openResponse.IsSuccessStatusCode)
- {
- string closeContent = openResponse.Content.ReadAsStringAsync().Result;
- Logger.WriteInfo("OpenRtmpChannel: "+closeContent);
- if (closeContent == "OK")
- return true;
- else
- return false;
- }
- }
- else
- {
- Logger.WriteError("THMqttClinet: OpenRtmpChannel-请求失败,状态码:" + response.StatusCode);
- return false;
- }
- }
- catch (Exception e)
- {
- Logger.WriteError($"THMqttClinet: OpenRtmpChannel-其他错误发生: {e.Message}");
- return false;
- }
- return false;
- }
- public bool getMqttConfig()
- {
- if (System.IO.File.Exists(ConfigPath))
- {
- try {
- StringBuilder brokerAddress = new StringBuilder(20);
- GetPrivateProfileString("MQTT", "brokerAddress", "配置文件存在,读取未成功!", brokerAddress, 255, ConfigPath);
- m_brokerAddress = brokerAddress.ToString();
-
- StringBuilder brokerPort = new StringBuilder(20);
- GetPrivateProfileString("MQTT", "brokerPort", "配置文件存在,读取未成功!", brokerPort, 255, ConfigPath);
- m_brokerPort = int.Parse(brokerPort.ToString());
-
- StringBuilder airportCode = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "airportCode", "配置文件存在,读取未成功!", airportCode, 255, ConfigPath);
- m_airportCode = airportCode.ToString();
-
- StringBuilder clientId = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "clientId", "配置文件存在,读取未成功!", clientId, 255, ConfigPath);
- m_clientId = clientId.ToString();
-
- StringBuilder username = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "username", "配置文件存在,读取未成功!", username, 255, ConfigPath);
- m_username = username.ToString();
-
- StringBuilder password = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "password", "配置文件存在,读取未成功!", password, 255, ConfigPath);
- m_password = password.ToString();
-
- StringBuilder uploadSubTopic = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "uploadSubTopic", "配置文件存在,读取未成功!", uploadSubTopic, 255, ConfigPath);
- m_uploadSubTopic = uploadSubTopic.ToString();
- m_uploadSubTopic = m_uploadSubTopic.Replace("&", m_airportCode);
-
- StringBuilder uploadResTopic = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "uploadResTopic", "配置文件存在,读取未成功!", uploadResTopic, 255, ConfigPath);
- m_uploadResTopic = uploadResTopic.ToString();
- m_uploadResTopic = m_uploadResTopic.Replace("&", m_airportCode);
-
- StringBuilder rtmpSubTic = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "rtmpSubTic", "配置文件存在,读取未成功!", rtmpSubTic, 255, ConfigPath);
- m_rtmpSubTic = rtmpSubTic.ToString();
- m_rtmpSubTic = m_rtmpSubTic.Replace("&", m_airportCode);
-
- StringBuilder rtmpResTopic = new StringBuilder(100);
- GetPrivateProfileString("MQTT", "rtmpResTopic", "配置文件存在,读取未成功!", rtmpResTopic, 255, ConfigPath);
- m_rtmpResTopic = rtmpResTopic.ToString();
- m_rtmpResTopic = m_rtmpResTopic.Replace("&", m_airportCode);
-
- return true;
- }
- catch (Exception ex)
- {
- Logger.WriteError("THMqttClinet: getMqttConfig" + ex.Message.ToString());
- return false;
- }
- }
- return false;
- }
- }
- }
|