Browse Source

任务来源:机场

任务描述:告警通知对接硬件MQTT
develop
wubin 1 year ago
parent
commit
a823264156
4 changed files with 178 additions and 105 deletions
  1. +18
    -17
      tuoheng-admin/sql/V2/tuoheng_airport_platform_DDL.sql
  2. +2
    -2
      tuoheng-admin/src/main/java/com/tuoheng/admin/mapper/AlarmMapper.java
  3. +3
    -3
      tuoheng-admin/src/main/java/com/tuoheng/admin/pojo/entity/AlarmLog.java
  4. +155
    -83
      tuoheng-admin/src/main/java/com/tuoheng/admin/service/mqttService/consumer/topicHandle/alarm/AlarmTopicHandleService.java

+ 18
- 17
tuoheng-admin/sql/V2/tuoheng_airport_platform_DDL.sql View File

@@ -1,17 +1,18 @@
DROP TABLE IF EXISTS `th_alarm`;
CREATE TABLE `th_alarm` (
`id` int(0) UNSIGNED NOT NULL AUTO_INCREMENT,
`airport_id` int(0) NULL DEFAULT NULL COMMENT '机场ID',
`edge_Id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '控制板id',
`timestamp` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '时间戳,日志发送的时间 (单位: S)',
`type` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '日志类型(log: 常规日志;error: 错误日志; critical: 紧急日志)',
`data` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息主体',
`critical` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '紧急消息主体',
`result` int(0) NULL DEFAULT NULL COMMENT '是否处理, 1 : 已处理, 0: 未处理',
`create_user` int(0) NULL DEFAULT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
`update_user` int(0) NULL DEFAULT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
`mark` int(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 15 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '机场告警记录表' ROW_FORMAT = Dynamic;
DROP TABLE IF EXISTS `th_alarm_log`;
CREATE TABLE `th_alarm_log` (
`id` int(0) UNSIGNED NOT NULL AUTO_INCREMENT,
`airport_id` int(0) NULL DEFAULT NULL COMMENT '机场ID',
`edge_Id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '控制板id',
`timestamp` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '时间戳,日志发送的时间 (单位: S)',
`type` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '日志类型(log: 常规日志;error: 错误日志; critical: 紧急日志)',
`data` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息主体',
`critical` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '紧急消息主体',
`result` int(0) NULL DEFAULT NULL COMMENT '是否处理, 1 : 已处理, 0: 未处理',
`create_user` int(0) NULL DEFAULT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
`update_user` int(0) NULL DEFAULT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
`mark` int(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_airport_id`(`airport_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 20 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '机场告警记录表' ROW_FORMAT = Dynamic;

+ 2
- 2
tuoheng-admin/src/main/java/com/tuoheng/admin/mapper/AlarmMapper.java View File

@@ -1,7 +1,7 @@
package com.tuoheng.admin.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.tuoheng.admin.pojo.entity.Alarm;
import com.tuoheng.admin.pojo.entity.AlarmLog;


/**
@@ -10,6 +10,6 @@ import com.tuoheng.admin.pojo.entity.Alarm;
* @Description: 机场告警数据 Mapper
* @Version: 1.0
*/
public interface AlarmMapper extends BaseMapper<Alarm> {
public interface AlarmMapper extends BaseMapper<AlarmLog> {

}

tuoheng-admin/src/main/java/com/tuoheng/admin/pojo/entity/Alarm.java → tuoheng-admin/src/main/java/com/tuoheng/admin/pojo/entity/AlarmLog.java View File

@@ -9,14 +9,14 @@ import lombok.experimental.Accessors;
/**
* @Author: 吴彬
* @CreateTime: 2023-06-12 11:31
* @Description: 机场告警数据
* @Description: 机场告警记录
* @Version: 1.0
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("th_alarm")
public class Alarm extends BaseEntity {
@TableName("th_alarm_log")
public class AlarmLog extends BaseEntity {

private static final long serialVersionUID = 1L;


+ 155
- 83
tuoheng-admin/src/main/java/com/tuoheng/admin/service/mqttService/consumer/topicHandle/alarm/AlarmTopicHandleService.java View File

@@ -3,14 +3,12 @@ package com.tuoheng.admin.service.mqttService.consumer.topicHandle.alarm;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tuoheng.admin.enums.AlarmTypeEnum;
import com.tuoheng.admin.enums.MQTTTopicEnum;
import com.tuoheng.admin.enums.ServiceExceptionEnum;
import com.tuoheng.admin.mapper.AirportMapper;
import com.tuoheng.admin.mapper.AlarmMapper;
import com.tuoheng.admin.pojo.entity.Airport;
import com.tuoheng.admin.pojo.entity.Alarm;
import com.tuoheng.admin.pojo.entity.AlarmLog;
import com.tuoheng.admin.pojo.entity.Bnconnect;
import com.tuoheng.admin.service.bnconnect.IBnconnectService;
import com.tuoheng.admin.service.mqttpush.IMqttPushService;
@@ -18,17 +16,18 @@ import com.tuoheng.admin.service.mqttService.HttpURLConnectionUtil;
import com.tuoheng.admin.service.mqttService.consumer.topicHandle.ITopicHandleService;
import com.tuoheng.admin.utils.SpringUtil;
import com.tuoheng.common.enums.MarkStatusEnum;
import com.tuoheng.common.utils.DateUtils;
import com.tuoheng.common.utils.JsonResult;
import com.tuoheng.common.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* @Author: 吴彬
@@ -43,11 +42,24 @@ public class AlarmTopicHandleService implements ITopicHandleService {
@Resource
private AlarmMapper alarmMapper;

@Autowired
private StringRedisTemplate redisTemplate;

/**
* DSP 告警服务URI
*/
private final static String DSP_ALARM_URI = "/alarm/alarmData";

/**
* DSP 平台编码
*/
private final static String DSP_PLATFORMCODE = "001";

/**
* DSP 告警模板
*/
private final static String DSP_ALARM_TEMPLATE = "您好,您所管辖的机场:${name},正在发生告警,告警内容是:${msg},请及时处理!";

/**
* 机场日志记录topic
*/
@@ -68,94 +80,154 @@ public class AlarmTopicHandleService implements ITopicHandleService {
@Override
public void topicHandle(MqttMessage message, String edgId) {
/**
* 1、记录告警日志
* 2、调用DSP接口上报告警数据
* 3、发送MQTT消息告警已处理
* 4、DSP调用成功更新告警记录为已处理
* 常规日志:
* 打印日志即可
* 错误日志:
* 记录数据库
* 紧急日志:
* 1、记录告警日志
* 2、调用DSP接口上报告警数据
* 3、发送MQTT消息告警已处理
* 4、DSP调用成功更新告警记录为已处理
*/
JSONObject jsonObject = (JSONObject) JSONObject.parse(message.getPayload());

if (ObjectUtil.isEmpty(jsonObject.get("type")) || ObjectUtil.isEmpty(jsonObject.get("critical"))) {
log.error("告警日志类型或紧急告警内容为空,消息无需处理!");
if (ObjectUtil.isEmpty(jsonObject.get("type"))) {
log.error("告警日志类型为空,消息无需处理!");
return;
}

if (AlarmTypeEnum.ALARM_LOG.getType().equals(jsonObject.get("type").toString())) {
log.info("常规日志无需处理!");
log.info("常规日志内容:{}", jsonObject);
}

//获取机场信息
AirportMapper airportMapper = SpringUtil.getBean(AirportMapper.class);
IBnconnectService bnconnectService = SpringUtil.getBean(IBnconnectService.class);
List<Airport> airports = airportMapper.selectList(new LambdaQueryWrapper<Airport>()
.eq(Airport::getMark, MarkStatusEnum.VALID.getCode())
.eq(Airport::getEdgeId, edgId));
if (CollectionUtils.isEmpty(airports)) {
log.error("控制板ID:{},对应机场信息不存在!", edgId);
return;
}
if (AlarmTypeEnum.ALARM_CRITICAL.getType().equals(jsonObject.get("type").toString())) {
//获取机场信息
AirportMapper airportMapper = SpringUtil.getBean(AirportMapper.class);
IBnconnectService bnconnectService = SpringUtil.getBean(IBnconnectService.class);
List<Airport> airports = airportMapper.selectList(new LambdaQueryWrapper<Airport>()
.eq(Airport::getMark, MarkStatusEnum.VALID.getCode())
.eq(Airport::getEdgeId, edgId));
Bnconnect bnconnect = bnconnectService.selectByCode("alarmDsp");

String critical = jsonObject.get("critical") == null ? "" : jsonObject.get("critical").toString();
String data = jsonObject.get("critical") == null ? "" : jsonObject.get("critical").toString();
String time = jsonObject.get("timestamp") == null ? "0" : jsonObject.get("timestamp").toString();
Bnconnect bnconnect = bnconnectService.selectByCode("alarmDsp");

String critical = jsonObject.get("critical") == null ? "" : jsonObject.get("critical").toString();
String data = jsonObject.get("critical") == null ? "" : jsonObject.get("critical").toString();
String time = jsonObject.get("timestamp") == null ? "0" : jsonObject.get("timestamp").toString();

//记录告警日志
AlarmLog alarmLog = new AlarmLog();
alarmLog.setAirportId(airports.get(0).getId());
alarmLog.setEdgeId(edgId);
alarmLog.setCritical(critical);
alarmLog.setData(data);
alarmLog.setTimestamp(time);
Date createDate = new Date(Long.valueOf(time) * 1000L);//注意硬件传过来的时间戳是10位的,故需要乘以1000
alarmLog.setCreateTime(createDate);
alarmLog.setUpdateTime(createDate);

if (AlarmTypeEnum.ALARM_ERROR.getType().equals(jsonObject.get("type").toString())) {
/**
* 错误日志只记录数据库
*/
alarmLog.setType(AlarmTypeEnum.ALARM_ERROR.getType());
this.saveAlarmLog(alarmLog);
} else if (AlarmTypeEnum.ALARM_CRITICAL.getType().equals(jsonObject.get("type").toString())) {
if(ObjectUtil.isEmpty(jsonObject.get("critical"))){
log.info("紧急告警内容为空,消息无需处理!");
return;
}
/**
* 告警日志记录数据库+上报DSP
*/
IMqttPushService mqttService = SpringUtil.getBean(IMqttPushService.class);
try {
if (airports.size() > 0 && bnconnect != null) {
String airportName = airports.get(0).getName();

//记录告警日志
Alarm alarm = new Alarm();
alarm.setAirportId(airports.get(0).getId());
alarm.setEdgeId(edgId);
alarm.setCritical(critical);
alarm.setData(data);
alarm.setType(AlarmTypeEnum.ALARM_CRITICAL.getType());
alarm.setTimestamp(time);
alarm.setResult(Alarm.HANDLE_NOT);
Integer insertResult = alarmMapper.insert(alarm);
if (insertResult <= 0) {
log.error("异常告警数据:{},记录入库异常:", time);
return;
}
log.info("异常告警数据:{},记录入库成功!", time);
//调用DSP,上报异常告警数据
HttpURLConnectionUtil httpURLConnectionUtil = new HttpURLConnectionUtil();
//dps请求地址
String url = bnconnect.getApiUrl() + DSP_ALARM_URI;
//请求参数
List parm = new ArrayList();
Map parmMap = new HashMap();
parmMap.put("platformCode", "001");
Map parmMapTemplate = new HashMap();
parmMapTemplate.put("name", airportName);
parmMapTemplate.put("msg", critical);
parmMap.put("alarmData", JSONObject.toJSONString(parmMapTemplate));
parmMap.put("alarmTemplate", "您好,您所管辖的机场:${name},正在发生告警,告警内容是:${msg},请及时处理!");
parm.add(parmMap);
String jsonparm = JSONObject.toJSONString(parm);
log.info("请求地址{},请求类型put,请求体{}", url, jsonparm);
String dspResponse = httpURLConnectionUtil.doPut(url, jsonparm);
log.info("请求返回值{}", dspResponse);
if (StringUtils.isEmpty(dspResponse)) {
log.error("异常告警数据上报调用DSP返回结果为空!");
throw new RuntimeException("异常告警数据上报调用DSP返回结果为空!");
String airportName = airports.get(0).getName();
alarmLog.setType(AlarmTypeEnum.ALARM_CRITICAL.getType());
alarmLog.setResult(AlarmLog.HANDLE_NOT);
Integer insertResult = this.saveAlarmLog(alarmLog);
if (insertResult <= 0) {
log.error("异常告警数据:{},记录入库异常:", time);
return;
}
log.info("异常告警数据:{},记录入库成功!", time);
//调用DSP,上报异常告警数据
HttpURLConnectionUtil httpURLConnectionUtil = new HttpURLConnectionUtil();
//dps请求地址
String url = bnconnect.getApiUrl() + DSP_ALARM_URI;
//请求参数
List parm = new ArrayList();
Map parmMap = new HashMap();
parmMap.put("platformCode", DSP_PLATFORMCODE);
Map parmMapTemplate = new HashMap();
parmMapTemplate.put("name", airportName);
parmMapTemplate.put("msg", critical);
parmMap.put("alarmData", JSONObject.toJSONString(parmMapTemplate));
parmMap.put("alarmTemplate", DSP_ALARM_TEMPLATE);
parm.add(parmMap);
String jsonparm = JSONObject.toJSONString(parm);
log.info("请求地址{},请求类型put,请求体{}", url, jsonparm);
String dspResponse = httpURLConnectionUtil.doPut(url, jsonparm);
log.info("请求返回值{}", dspResponse);
if (StringUtils.isEmpty(dspResponse)) {
log.error("异常告警数据上报调用DSP返回结果为空!");
throw new RuntimeException("异常告警数据上报调用DSP返回结果为空!");
} else {
JSONObject dspJSObject = JSONObject.parseObject(dspResponse);
if (ObjectUtil.isNotEmpty(dspJSObject) && JsonResult.SUCCESS == dspJSObject.getInteger("code")) {
//发送MQTT消息告警已处理
mqttService.alarmSend(edgId, AlarmLog.HANDLE_AGO, Long.parseLong(time));
log.info("告警数据:{},控制面板ID:{},告警处理已回复", time, edgId);
alarmLog.setResult(AlarmLog.HANDLE_AGO);
alarmLog.setUpdateTime(DateUtils.now());
//更新告警记录已处理
alarmMapper.updateById(alarmLog);
} else {
JSONObject dspJSObject = JSONObject.parseObject(dspResponse);
if (ObjectUtil.isNotEmpty(dspJSObject) && JsonResult.SUCCESS == dspJSObject.getInteger("code")) {
//发送MQTT消息告警已处理
mqttService.alarmSend(edgId, Alarm.HANDLE_AGO, Long.parseLong(time));
log.info("告警数据:{},控制面板ID:{},告警处理已回复", time, edgId);
alarm.setResult(Alarm.HANDLE_AGO);
//更新告警记录已处理
alarmMapper.updateById(alarm);
} else {
//请求返回失败
log.error("异常告警数据:{},上报调用DSP异常,返回值:{}", time, dspJSObject);
throw new RuntimeException("异常告警数据上报调用DSP返回结果异常!");
}
//请求返回失败
log.error("异常告警数据:{},上报调用DSP异常,返回值:{}", time, dspJSObject);
throw new RuntimeException("异常告警数据上报调用DSP返回结果异常!");
}

try{
sendAlarmLog(airports.get(0).getCode(),airports.get(0),alarmLog);
}catch (Exception e){
log.error("发送告警日志:{},到redis通道:{},异常:{}",alarmLog,airports.get(0).getCode(),e);
}
} else {
log.error("告警数据:{},控制板ID:{}告警,未查询到机场信息或dsp信息", time, edgId);
throw new RuntimeException("未查询到机场信息或dsp信息!");
}
}catch (Exception e){
log.error("告警数据:{},处理异常,通知MQTT消息未处理",time,e);
} catch (Exception e) {
log.error("告警数据:{},处理异常,通知MQTT消息未处理", time, e);
//发送MQTT消息告警未处理
mqttService.alarmSend(edgId, Alarm.HANDLE_NOT, Long.parseLong(time));
mqttService.alarmSend(edgId, AlarmLog.HANDLE_NOT, Long.parseLong(time));
}
}
}

/**
* 记录告警日志
*
* @param alarmLog
*/
private int saveAlarmLog(AlarmLog alarmLog) {
return alarmMapper.insert(alarmLog);
}

/**
* 发送告警日志到redis通道
* @param deviceId 地面站ID
* @param airport 机场信息
* @param alarmLog 告警信息
*/
private void sendAlarmLog(String deviceId,Airport airport,AlarmLog alarmLog){
JSONObject jsonObject = new JSONObject();
JSONObject alarmData = new JSONObject();
alarmData.put("airportName",airport.getName());//机场名称
alarmData.put("alarmMsg",alarmLog.getData());//告警内容
alarmData.put("alarmDate",DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS,alarmLog.getCreateTime()));//告警时间
jsonObject.put("alarmData",alarmData);
log.info("发送告警日志:{},到redis通道:{}",jsonObject.toJSONString(),deviceId);
redisTemplate.convertAndSend(deviceId, jsonObject.toJSONString());
}
}

Loading…
Cancel
Save