|
|
@@ -1,33 +1,31 @@ |
|
|
|
package com.tuoheng.admin.service.mqttService.consumer; |
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|
|
|
import com.tuoheng.admin.pojo.entity.Airport; |
|
|
|
import com.tuoheng.admin.service.IAirportService; |
|
|
|
import com.tuoheng.admin.utils.SpringUtil; |
|
|
|
import com.tuoheng.common.utils.RedisUtils; |
|
|
|
import com.tuoheng.admin.service.mqttService.consumer.topicHandle.ITopicHandleService; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttCallback; |
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import java.text.DateFormat; |
|
|
|
import java.text.SimpleDateFormat; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* @Author: 吴彬 |
|
|
|
* @CreateTime: 2023-05-22 19:09 |
|
|
|
* @Description: MQTT 消息监听回调 |
|
|
|
* @Version: 1.0 |
|
|
|
*/ |
|
|
|
@Component |
|
|
|
@Slf4j |
|
|
|
public class MqttConsumerCallBack implements MqttCallback{ |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private List<ITopicHandleService> iTopicHandleServices; |
|
|
|
|
|
|
|
/** |
|
|
|
* 客户端断开连接的回调 |
|
|
|
*/ |
|
|
@@ -41,182 +39,32 @@ public class MqttConsumerCallBack implements MqttCallback{ |
|
|
|
System.out.println("重连成功!"); |
|
|
|
}*/ |
|
|
|
} |
|
|
|
DateFormat bf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
|
|
|
|
|
|
/** |
|
|
|
* 消息到达的回调 |
|
|
|
* 消费消息回调 |
|
|
|
* @param topic name of the topic on the message was published to |
|
|
|
* @param message the actual message. |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void messageArrived(String topic, MqttMessage message) throws Exception { |
|
|
|
public void messageArrived(String topic, MqttMessage message){ |
|
|
|
try { |
|
|
|
log.info("接收消息主题 : {}\n{}",topic,JSONObject.parse(message.getPayload())); |
|
|
|
RedisUtils redisUtils = SpringUtil.getBean(RedisUtils.class); |
|
|
|
String edgId = topic.split("/")[2]; |
|
|
|
String command = topic.substring(edgId.length() + 4, topic.length()); |
|
|
|
if (!(command.contains("data/")||command.contains("alarm/"))) { |
|
|
|
redisUtils.hset(edgId, command, message, 100); |
|
|
|
}else{ |
|
|
|
//上报数据换算 |
|
|
|
message = getMathValue(message,command,redisUtils,edgId); |
|
|
|
redisUtils.hset(edgId, command, message); |
|
|
|
redisUtils.hset(edgId+"_data", command, message); |
|
|
|
if(command.contains("alarm")) { |
|
|
|
command = command.replace("alarm", "data"); |
|
|
|
redisUtils.hset(edgId, command, message); |
|
|
|
redisUtils.hset(edgId+"_data", command, message); |
|
|
|
//将topic中的设备编号替换成通用标识 |
|
|
|
String topicSource = topic.replace(edgId,"%s"); |
|
|
|
|
|
|
|
/** |
|
|
|
* 根据topic获取对应的消息处理类,后期扩展只需定义对应topic的处理类即可,注意:每个处理类对应的topic全局唯一 |
|
|
|
*/ |
|
|
|
for(ITopicHandleService iTopicHandleService : iTopicHandleServices){ |
|
|
|
if (topicSource.equals(iTopicHandleService.getTopic())){ |
|
|
|
iTopicHandleService.topicHandle(message,edgId); |
|
|
|
break; |
|
|
|
} |
|
|
|
getWth(command,redisUtils,edgId, message); |
|
|
|
} |
|
|
|
//saveStatus(edgId,command,message); |
|
|
|
//消息日志入库 |
|
|
|
//saveLog(edgId, command, message); |
|
|
|
//保存机场的运行状态 |
|
|
|
saveAirportRedisStatus(command,redisUtils,edgId, message); |
|
|
|
}catch (Exception e){ |
|
|
|
log.error("",e); |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 保存机场的运行状态 |
|
|
|
* @param command |
|
|
|
* @param redisUtils |
|
|
|
* @param edgId |
|
|
|
* @param message |
|
|
|
*/ |
|
|
|
private void saveAirportRedisStatus(String command, RedisUtils redisUtils, String edgId, MqttMessage message) { |
|
|
|
JSONObject jsonObject = (JSONObject)JSONObject.parse(message.getPayload()); |
|
|
|
IAirportService airportService = SpringUtil.getBean(IAirportService.class); |
|
|
|
QueryWrapper queryWrapper = new QueryWrapper(); |
|
|
|
queryWrapper.eq("edge_id",edgId); |
|
|
|
queryWrapper.eq("mark",1); |
|
|
|
List<Airport> list = airportService.list(queryWrapper); |
|
|
|
|
|
|
|
if (list.size()>0){ |
|
|
|
edgId = list.get(0).getCode(); |
|
|
|
} |
|
|
|
/** |
|
|
|
* 1,2 舱门(命令接收,动作完成) |
|
|
|
* 3,4 升降器(命令接收,动作完成) |
|
|
|
* 5,6 固定器(命令接收,动作完成) |
|
|
|
* 7,8 出舱(命令接收,动作完成) |
|
|
|
* 9,10 回舱(命令接收,动作完成) |
|
|
|
* 11 任务中 |
|
|
|
* 12 返航中 |
|
|
|
* 13 悬停中 |
|
|
|
*/ |
|
|
|
if(command.contains("confirm/DoorMotor")&&jsonObject.get("msg").toString().contains("SUCCESS")) { |
|
|
|
if(jsonObject.get("msg")!=null&&jsonObject.get("msg").toString().contains("SUCCESS")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"1"); |
|
|
|
} |
|
|
|
}else if (command.contains("data/DoorMotor")){ |
|
|
|
if(jsonObject.get("parm")!=null&&(JSONObject.toJSONString(jsonObject.get("parm"))).contains("true")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"2"); |
|
|
|
} |
|
|
|
}else if(command.contains("confirm/LiftMotor")) { |
|
|
|
if(jsonObject.get("msg")!=null&&jsonObject.get("msg").toString().contains("SUCCESS")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"3"); |
|
|
|
} |
|
|
|
}else if (command.contains("data/LiftMotor")){ |
|
|
|
if(jsonObject.get("parm")!=null&&(JSONObject.toJSONString(jsonObject.get("parm"))).contains("true")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"4"); |
|
|
|
} |
|
|
|
}else if(command.contains("confirm/FixedMotor")) { |
|
|
|
if(jsonObject.get("msg")!=null&&jsonObject.get("msg").toString().contains("SUCCESS")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"5"); |
|
|
|
} |
|
|
|
}else if (command.contains("data/FixedMotor")){ |
|
|
|
if(jsonObject.get("parm")!=null&&(JSONObject.toJSONString(jsonObject.get("parm"))).contains("true")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"6"); |
|
|
|
} |
|
|
|
}else if(command.contains("confirm/DroneGoAway")) { |
|
|
|
if(jsonObject.get("msg")!=null&&jsonObject.get("msg").toString().contains("SUCCESS")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"7"); |
|
|
|
} |
|
|
|
}else if (command.contains("data/DroneGoAway")){ |
|
|
|
if(jsonObject.get("parm")!=null&&(JSONObject.toJSONString(jsonObject.get("parm"))).contains("true")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"8"); |
|
|
|
} |
|
|
|
}else if(command.contains("confirm/DroneGoHome")) { |
|
|
|
if(jsonObject.get("msg")!=null&&jsonObject.get("msg").toString().contains("SUCCESS")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"9"); |
|
|
|
} |
|
|
|
}else if (command.contains("data/DroneGoHome")){ |
|
|
|
if(jsonObject.get("parm")!=null&&(JSONObject.toJSONString(jsonObject.get("parm"))).contains("true")){ |
|
|
|
redisUtils.hset("airportRunStatus",edgId,"10"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//上报数据换算封装 |
|
|
|
private MqttMessage getMathValue(MqttMessage message,String command,RedisUtils redisUtils,String edgeId) { |
|
|
|
JSONObject jsonObject = (JSONObject)JSONObject.parse(message.getPayload()); |
|
|
|
|
|
|
|
JSONObject parmObjectList = (JSONObject)jsonObject.get("parm"); |
|
|
|
if(command.contains("/WTH")) { |
|
|
|
JSONObject parmNew = (JSONObject) parmObjectList.clone(); |
|
|
|
parmNew.put("WDIRNAME", getMathValueWDIR(parmObjectList.get("WDIR")==null?"0":parmObjectList.get("WDIR").toString())); |
|
|
|
parmNew.put("WDIR", String.format("%.2f",Double.parseDouble(parmObjectList.get("WDIR")==null?"0":parmObjectList.get("WDIR").toString())/10)); |
|
|
|
parmNew.put("WSPD", String.format("%.2f", Double.parseDouble(parmObjectList.get("WSPD")==null?"0":parmObjectList.get("WSPD").toString()) / 100)); |
|
|
|
parmNew.put("Rainfull", String.format("%.2f",Double.parseDouble(parmObjectList.get("Rainfull")==null?"0":parmObjectList.get("Rainfull").toString())*0.3)); |
|
|
|
parmNew.put("Hpa", String.format("%.2f",Double.parseDouble(parmObjectList.get("Hpa")==null?"0":parmObjectList.get("Hpa").toString())*0.001)); |
|
|
|
parmNew.put("Hum", String.format("%.2f",Double.parseDouble(parmObjectList.get("Hum")==null?"0":parmObjectList.get("Hum").toString())*0.1)); |
|
|
|
parmNew.put("Tmp", String.format("%.2f",Double.parseDouble(parmObjectList.get("Tmp")==null?"0":parmObjectList.get("Tmp").toString())*0.1)); |
|
|
|
parmNew.put("Lux", String.format("%.2f",Double.parseDouble(parmObjectList.get("Lux")==null?"0":parmObjectList.get("Lux").toString())*0.1)); |
|
|
|
parmNew.put("Nosie", String.format("%.2f",Double.parseDouble(parmObjectList.get("Nosie")==null?"0":parmObjectList.get("Nosie").toString())*0.1)); |
|
|
|
parmNew.put("Dew", String.format("%.2f",Double.parseDouble(parmObjectList.get("Dew")==null?"0":parmObjectList.get("Dew").toString())*0.1)); |
|
|
|
String acd = redisUtils.hget("acd",edgeId)==null?"":redisUtils.hget("acd",edgeId).toString(); |
|
|
|
parmNew.put("ACDTmp",acd); |
|
|
|
jsonObject.put("parmNew",parmNew); |
|
|
|
}else if(command.contains("/TAH")) { |
|
|
|
JSONObject parmNew = (JSONObject) parmObjectList.clone(); |
|
|
|
parmNew.put("Hum", String.format("%.2f",Double.parseDouble(parmObjectList.get("Hum")==null?"0":parmObjectList.get("Hum").toString())*0.1)); |
|
|
|
parmNew.put("Tmp", String.format("%.2f",Double.parseDouble(parmObjectList.get("Tmp")==null?"0":parmObjectList.get("Tmp").toString())*0.1)); |
|
|
|
jsonObject.put("parmNew",parmNew); |
|
|
|
} |
|
|
|
if (command.contains("/ACD")){ |
|
|
|
JSONObject parmNew = (JSONObject) parmObjectList.clone(); |
|
|
|
parmNew.put("ACDTmp", String.format("%.2f",Double.parseDouble(parmObjectList.get("Tmp")==null?"0":parmObjectList.get("Tmp").toString())*0.1)); |
|
|
|
jsonObject.put("parmNew",parmNew); |
|
|
|
|
|
|
|
redisUtils.hset("acd",edgeId,String.format("%.2f",Double.parseDouble(parmObjectList.get("Tmp")==null?"0":parmObjectList.get("Tmp").toString())*0.1),180); |
|
|
|
} |
|
|
|
if (command.contains("DoorMotor")||command.contains("DroneGoAway")){ |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
message.setPayload(jsonObject.toJSONString().getBytes()); |
|
|
|
return message; |
|
|
|
} |
|
|
|
|
|
|
|
private Object getMathValueWDIR(String wdir) { |
|
|
|
double dr = Double.parseDouble(wdir)/10; |
|
|
|
double resultde=dr-180; |
|
|
|
if (resultde==0){ |
|
|
|
return "北风"; |
|
|
|
} |
|
|
|
if (resultde>0&&resultde<90){ |
|
|
|
return "东北风"; |
|
|
|
log.error("接收消息主题 : {},异常:{}",topic,e); |
|
|
|
} |
|
|
|
if (resultde==90){ |
|
|
|
return "东风"; |
|
|
|
} |
|
|
|
if (resultde>90&&resultde<180){ |
|
|
|
return "东南风"; |
|
|
|
} |
|
|
|
if (resultde==180){ |
|
|
|
return "南风"; |
|
|
|
} |
|
|
|
if (resultde>-90&&resultde<0){ |
|
|
|
return "西北风"; |
|
|
|
} |
|
|
|
if (resultde==-90){ |
|
|
|
return "西风"; |
|
|
|
} |
|
|
|
if (resultde>-180&&resultde<-90){ |
|
|
|
return "西南风"; |
|
|
|
} |
|
|
|
return ""; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
@@ -227,23 +75,5 @@ public class MqttConsumerCallBack implements MqttCallback{ |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
public void getWth(String command,RedisUtils redisUtils,String edgId,MqttMessage message){ |
|
|
|
if(command.contains("data/WTH")||command.contains("data/TAH")){ |
|
|
|
|
|
|
|
JSONObject json = (JSONObject) JSON.parse(message.getPayload()); |
|
|
|
List list = (List) redisUtils.get(edgId+command+"list"); |
|
|
|
if(list==null) { |
|
|
|
list = new ArrayList(); |
|
|
|
} |
|
|
|
if(list.size()>3){ |
|
|
|
list.remove(0); |
|
|
|
} |
|
|
|
JSONObject map = (JSONObject)json.get("parm"); |
|
|
|
map.put("date", new Date()); |
|
|
|
list.add(map); |
|
|
|
redisUtils.set(edgId+command+"list",list); |
|
|
|
System.out.println(json.get("parm")); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|