|
|
@@ -0,0 +1,440 @@ |
|
|
|
package com.tuoheng.admin.service.impl; |
|
|
|
|
|
|
|
import cn.hutool.core.util.IdUtil; |
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
|
|
|
import com.tuoheng.admin.config.CommonConfig; |
|
|
|
import com.tuoheng.admin.dto.StreamingDto; |
|
|
|
import com.tuoheng.admin.entity.*; |
|
|
|
import com.tuoheng.admin.enums.*; |
|
|
|
import com.tuoheng.admin.mapper.*; |
|
|
|
import com.tuoheng.admin.request.CallbackRequest; |
|
|
|
import com.tuoheng.admin.request.DspInspection.VideoCallbackRequest; |
|
|
|
import com.tuoheng.admin.service.IDspCallbackService; |
|
|
|
import com.tuoheng.admin.utils.GaodeUtil; |
|
|
|
import com.tuoheng.common.core.common.BaseEntity; |
|
|
|
import com.tuoheng.common.core.enums.MarkEnum; |
|
|
|
import com.tuoheng.common.core.exception.ServiceException; |
|
|
|
import com.tuoheng.common.core.utils.*; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Qualifier; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.core.ParameterizedTypeReference; |
|
|
|
import org.springframework.http.HttpEntity; |
|
|
|
import org.springframework.http.HttpMethod; |
|
|
|
import org.springframework.http.HttpStatus; |
|
|
|
import org.springframework.http.ResponseEntity; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.util.CollectionUtils; |
|
|
|
import org.springframework.web.client.RestTemplate; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Locale; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
/** |
|
|
|
* DSP回调 服务实现类 |
|
|
|
* |
|
|
|
* @author WangHaoran |
|
|
|
* @since 2021-09-01 |
|
|
|
*/ |
|
|
|
@Slf4j |
|
|
|
@Service |
|
|
|
public class DspCallbackServiceImpl implements IDspCallbackService { |
|
|
|
|
|
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(InspectionServiceImpl.class); |
|
|
|
|
|
|
|
/** |
|
|
|
* 根据通道编码获取 |
|
|
|
*/ |
|
|
|
private final static String GET_LIVE_CHANNEL_INFO = "streaming/getInfo/{code}"; |
|
|
|
|
|
|
|
@Value("${tuoheng.live-channel-domain-url:}") |
|
|
|
private String channel_url; |
|
|
|
|
|
|
|
/** |
|
|
|
* 释放通道URL请求地址 |
|
|
|
*/ |
|
|
|
private final static String STREAMING_DELIVER_URL = "streaming/setDeliver"; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private BusinessMapper businessMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private InspectionMapper inspectionMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private InspectionFileMapper inspectionFileMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private FlightDataMapper flightDataMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private LiveChannelMapper liveChannelMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Qualifier("restTemplate") |
|
|
|
private RestTemplate restTemplate; |
|
|
|
|
|
|
|
/** |
|
|
|
* 保存DSP回调数据 |
|
|
|
* |
|
|
|
* @param requestId 请求id |
|
|
|
* @param callbackRequest 回调请求体 |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public JsonResult saveCallbackData(String requestId, CallbackRequest callbackRequest) { |
|
|
|
log.info("DSP服务回调地址接收成功, requestId:{}, callbackRequest:{}", requestId, JacksonUtil.obj2StringPretty(callbackRequest)); |
|
|
|
log.info("以下为打印的回调参数"); |
|
|
|
log.info("requestId为:{}", requestId); |
|
|
|
log.info(JSON.toJSONString(callbackRequest)); |
|
|
|
|
|
|
|
// 查询业务表数据 |
|
|
|
Business thbusiness = businessMapper.selectOne(new LambdaQueryWrapper<Business>() |
|
|
|
.eq(Business::getMsgId, requestId) |
|
|
|
.eq(Business::getType, 1) |
|
|
|
.last("limit 1")); |
|
|
|
if (StringUtils.isNull(thbusiness)) { |
|
|
|
throw new ServiceException(0, "业务数据不存在"); |
|
|
|
} |
|
|
|
log.info("业务关系表查询成功"); |
|
|
|
// 巡检任务ID |
|
|
|
String inspectionId = thbusiness.getTypeId(); |
|
|
|
// 查询任务 |
|
|
|
Inspection inspection = inspectionMapper.selectById(inspectionId); |
|
|
|
if (inspection == null) { |
|
|
|
log.info("巡检任务不存在"); |
|
|
|
throw new ServiceException(0, "巡检任务不存在"); |
|
|
|
} |
|
|
|
log.info("巡检任务查询成功,分析状态为:{}", callbackRequest.getAnalyseStatus()); |
|
|
|
// 分析状态处理: 5:waiting(等待)、10:running(分析中)、15:success(分析完成)、20:timeout(成功超时)、25:failed(分析失败) |
|
|
|
if (callbackRequest.getAnalyseStatus().equals(5)) { |
|
|
|
// 等待 |
|
|
|
inspection.setAnalyseStatus(2); |
|
|
|
} else if (callbackRequest.getAnalyseStatus().equals(10)) { |
|
|
|
// 分析中 |
|
|
|
inspection.setAnalyseStatus(3); |
|
|
|
} else if (callbackRequest.getAnalyseStatus().equals(15)) { |
|
|
|
// 分析完成 |
|
|
|
inspection.setAnalyseStatus(4); |
|
|
|
} else if (callbackRequest.getAnalyseStatus().equals(20)) { |
|
|
|
// 成功超时 |
|
|
|
inspection.setAnalyseStatus(5); |
|
|
|
} else if (callbackRequest.getAnalyseStatus().equals(25)) { |
|
|
|
// 分析失败 |
|
|
|
inspection.setAnalyseStatus(6); |
|
|
|
} |
|
|
|
// 分析进度 |
|
|
|
log.info("巡检任务查询成功,分析进度为:{}", callbackRequest.getProgress()); |
|
|
|
inspection.setProgressbar(callbackRequest.getProgress()); |
|
|
|
// 视频处理 |
|
|
|
if (callbackRequest.getType().equals(1)) { |
|
|
|
// 实时直播 |
|
|
|
log.info("实时直播视频处理"); |
|
|
|
if (StringUtils.isNotBlank(callbackRequest.getVideoUrl())) { |
|
|
|
inspection.setVideoUrl(StringUtils.removeHost(callbackRequest.getVideoUrl(), CommonConfig.videoURL)); |
|
|
|
} |
|
|
|
if (StringUtils.isNotBlank(callbackRequest.getAiVideoUrl())) { |
|
|
|
inspection.setAiVideoUrl(StringUtils.removeHost(callbackRequest.getAiVideoUrl(), CommonConfig.videoURL)); |
|
|
|
} |
|
|
|
} else if (callbackRequest.getType().equals(2)) { |
|
|
|
// 离线识别 |
|
|
|
log.info("离线检测视频处理"); |
|
|
|
if (StringUtils.isNotBlank(callbackRequest.getAiVideoUrl())) { |
|
|
|
inspection.setAiVideoUrl(StringUtils.removeHost(callbackRequest.getAiVideoUrl(), CommonConfig.videoURL)); |
|
|
|
} |
|
|
|
} |
|
|
|
inspection.setUpdateTime(DateUtils.now()); |
|
|
|
inspectionMapper.updateById(inspection); |
|
|
|
log.info("巡检任务状态更新成功"); |
|
|
|
|
|
|
|
log.info("巡检任务分析问题图片处理开始,此处图片数量为:{}张", StringUtils.isNull(callbackRequest.getQuestionFiles()) ? 0 : callbackRequest.getQuestionFiles().size()); |
|
|
|
|
|
|
|
// 分析处理图片并入库 |
|
|
|
log.info("以下为本地问题图片信息:"); |
|
|
|
log.info(JSON.toJSONString(callbackRequest.getQuestionFiles())); |
|
|
|
List<QuestionFile> questionFiles = callbackRequest.getQuestionFiles(); |
|
|
|
if (!CollectionUtils.isEmpty(questionFiles)) { |
|
|
|
log.info("问题图片处理开始, 图片数量:{}", questionFiles.size()); |
|
|
|
// 坐标 |
|
|
|
boolean online_condition = AiAnalyseTypeEnum.ONLINE.getCode() == inspection.getIsLive(); |
|
|
|
boolean offline_condition = AiAnalyseTypeEnum.OFFLINE.getCode() == inspection.getIsLive(); |
|
|
|
log.info("在线:{}, 离线:{}", online_condition, offline_condition); |
|
|
|
List<FlightData> flightDataList = flightDataMapper.selectList(new LambdaQueryWrapper<FlightData>() |
|
|
|
.eq(FlightData::getInspectionId, thbusiness.getTypeId()) |
|
|
|
.notLike(online_condition, FlightData::getLng, "E-") |
|
|
|
.notLike(online_condition, FlightData::getLng, "0.0") |
|
|
|
.notLike(online_condition, FlightData::getLat, "E-") |
|
|
|
.notLike(online_condition, FlightData::getLat, "0.0") |
|
|
|
.orderByDesc(FlightData::getTimestamp)); |
|
|
|
if (!CollectionUtils.isEmpty(flightDataList)) { |
|
|
|
log.info("飞行坐标大小:{}", flightDataList.size()); |
|
|
|
} |
|
|
|
List<InspectionFile> thirstyQuestionFiles = questionFiles.stream().map(questionFile -> { |
|
|
|
// 遍历创建巡检图片信息 |
|
|
|
InspectionFile inspectionFile = new InspectionFile(); |
|
|
|
// 对象属性拷贝 |
|
|
|
BeanUtils.copyProperties(questionFile, inspectionFile); |
|
|
|
inspectionFile.setId(IdUtil.fastSimpleUUID()); |
|
|
|
//问题编号 |
|
|
|
inspectionFile.setQuestionId(questionFile.getQuestionCode()); |
|
|
|
inspectionFile.setTenantId(inspection.getTenantId()); |
|
|
|
inspectionFile.setInspectionId(inspectionId); |
|
|
|
// 文件类型 |
|
|
|
inspectionFile.setFileType(FileTypeEnum.IMAGE.getCode()); |
|
|
|
// 文件编码 |
|
|
|
inspectionFile.setFileCode(inspectionFile.getFileCode()); |
|
|
|
// 文件名称 |
|
|
|
inspectionFile.setFileName(inspectionFile.getFileName()); |
|
|
|
//图片大小,单位MB,保留两位小数,目前没有获取图片大小的字段 |
|
|
|
inspectionFile.setFileSize(0.00); |
|
|
|
// 原始图片 |
|
|
|
String fileOriginalUrl = questionFile.getFileOriginalUrl(); |
|
|
|
if (StringUtils.isNotEmpty(fileOriginalUrl) && fileOriginalUrl.contains(CommonConfig.imageURL)) { |
|
|
|
fileOriginalUrl = fileOriginalUrl.replaceAll(CommonConfig.imageURL, ""); |
|
|
|
} |
|
|
|
inspectionFile.setFileOriginal(fileOriginalUrl); |
|
|
|
// 标记图片 |
|
|
|
String fileMarkerUrl = questionFile.getFileMarkerUrl(); |
|
|
|
if (StringUtils.isNotEmpty(fileMarkerUrl) && fileMarkerUrl.contains(CommonConfig.imageURL)) { |
|
|
|
fileMarkerUrl = fileMarkerUrl.replaceAll(CommonConfig.imageURL, ""); |
|
|
|
} |
|
|
|
inspectionFile.setFileImage(fileMarkerUrl); |
|
|
|
// 问题名称 |
|
|
|
inspectionFile.setQuestionName(questionFile.getQuestionName()); |
|
|
|
// 问题图片审核状态 |
|
|
|
inspectionFile.setStatus(QuestionEnum.NOTREVIEWED.getCode()); |
|
|
|
// 问题图片来源 |
|
|
|
inspectionFile.setSource(SourceEnum.AI.getCode()); |
|
|
|
inspectionFile.setCreateUser(thbusiness.getCreateUser()); // 设置默认用户 |
|
|
|
inspectionFile.setCreateTime(DateUtils.now()); |
|
|
|
|
|
|
|
if (offline_condition) { |
|
|
|
// 离线分析处理 |
|
|
|
log.info("离线坐标分析处理..."); |
|
|
|
// 离线获取图片坐标信息 |
|
|
|
String frame = StringUtils.subString(questionFile.getFileName(), "_frame-", "_type"); |
|
|
|
String startFrame = frame.substring(0, frame.indexOf("-")); |
|
|
|
//统一按照25帧计算 |
|
|
|
int second = Integer.parseInt(startFrame) / 25; |
|
|
|
//long shootTime = inspection.getExecutionStartTime().getTime() + second; |
|
|
|
long shootTime = Long.parseLong(flightDataList.get(flightDataList.size() - 1).getTimestamp()) + second * 1000; |
|
|
|
FlightData flightData_off = getFlightDataByTime(flightDataList, shootTime); |
|
|
|
JSONObject gaodeCoordinateOff = GaodeUtil.getGaodeCoordinate(flightData_off.getLng(), flightData_off.getLat()); |
|
|
|
inspectionFile.setLongitude(flightData_off.getLng()); |
|
|
|
inspectionFile.setLatitude(flightData_off.getLat()); |
|
|
|
inspectionFile.setGaodeLongitude(gaodeCoordinateOff.getString("longitude")); |
|
|
|
inspectionFile.setGaodeLatitude(gaodeCoordinateOff.getString("latitude")); |
|
|
|
String gaodeAddressOff = GaodeUtil.getGaodeAddress(gaodeCoordinateOff.getString("longitude"), gaodeCoordinateOff.getString("latitude")); |
|
|
|
inspectionFile.setLocation(gaodeAddressOff); |
|
|
|
inspectionFile.setGaodeAddress(gaodeAddressOff); |
|
|
|
} |
|
|
|
if (online_condition) { |
|
|
|
// 实时直播处理 |
|
|
|
log.info("实时直播坐标分析处理..."); |
|
|
|
// long time = DateUtils.dateTime(DateUtils.YYYY_MM_DD_HH_MM_SS, questionFile.getAnalyseTime()).getTime(); |
|
|
|
FlightData flightData_live = getFlightDataByTime(flightDataList, questionFile.getAnalyseTime().getTime()); |
|
|
|
log.info("实时坐标:{}", JacksonUtil.obj2StringPretty(flightData_live)); |
|
|
|
inspectionFile.setLongitude(flightData_live.getLng()); |
|
|
|
inspectionFile.setLatitude(flightData_live.getLat()); |
|
|
|
//坐标转换,GPS转高德 |
|
|
|
JSONObject gaodeCoordinateLive = GaodeUtil.getGaodeCoordinate(flightData_live.getLng(), flightData_live.getLat()); |
|
|
|
inspectionFile.setGaodeLongitude(gaodeCoordinateLive.getString("longitude")); |
|
|
|
inspectionFile.setGaodeLatitude(gaodeCoordinateLive.getString("latitude")); |
|
|
|
String gaodeAddress_live = GaodeUtil.getGaodeAddress(gaodeCoordinateLive.getString("longitude"), gaodeCoordinateLive.getString("latitude")); |
|
|
|
inspectionFile.setLocation(gaodeAddress_live); |
|
|
|
inspectionFile.setGaodeAddress(gaodeAddress_live); |
|
|
|
} |
|
|
|
return inspectionFile; |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
|
|
|
|
log.info("批量插入问题图片数据"); |
|
|
|
CommonUtils.batchOperate((x) -> inspectionFileMapper.batchadd(x), thirstyQuestionFiles, 1000); |
|
|
|
} |
|
|
|
if ((4 == inspection.getAnalyseStatus() || 5 == inspection.getAnalyseStatus() || 6 == inspection.getAnalyseStatus()) && 1 == inspection.getInspectionType()) { |
|
|
|
|
|
|
|
LiveChannel liveChannel = liveChannelMapper.selectOne(Wrappers.<LiveChannel>lambdaQuery() |
|
|
|
.eq(LiveChannel::getInspectionId, inspectionId) |
|
|
|
.eq(LiveChannel::getMark, 1)); |
|
|
|
// 释放小程序选择的通道 |
|
|
|
if (null != liveChannel) { |
|
|
|
deliverStreaming(liveChannel.getChannelCode()); |
|
|
|
} |
|
|
|
} |
|
|
|
return JsonResult.success(); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 回调数据入库 |
|
|
|
* |
|
|
|
* @param dspCallbackRequest |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public JsonResult saveVideoCallbackData(VideoCallbackRequest dspCallbackRequest) { |
|
|
|
log.info("以下为打印的回调参数"); |
|
|
|
log.info("requestId为:{}", dspCallbackRequest.getRequestId()); |
|
|
|
log.info(JSON.toJSONString(dspCallbackRequest)); |
|
|
|
|
|
|
|
// 查询任务 |
|
|
|
Inspection inspection = inspectionMapper.selectOne(Wrappers.<Inspection>lambdaQuery() |
|
|
|
.eq(Inspection::getRequestId, dspCallbackRequest.getRequestId()) |
|
|
|
.eq(BaseEntity::getMark, MarkEnum.VALID.getCode())); |
|
|
|
if (inspection == null) { |
|
|
|
log.info("巡检任务不存在"); |
|
|
|
throw new ServiceException(0, "巡检任务不存在"); |
|
|
|
} |
|
|
|
//填充通道相关url |
|
|
|
//通道表新增 |
|
|
|
LiveChannel liveChannel = new LiveChannel(); |
|
|
|
liveChannel.setTenantId(inspection.getTenantId()); |
|
|
|
liveChannel.setInspectionId(inspection.getId()); |
|
|
|
liveChannel.setChannelCode("0"); |
|
|
|
liveChannel.setPushUrl(dspCallbackRequest.getPushUrl()); |
|
|
|
liveChannel.setPullUrl(dspCallbackRequest.getPlayUrl()); |
|
|
|
liveChannel.setAipushUrl(dspCallbackRequest.getAiPushUrl()); |
|
|
|
liveChannel.setAipullUrl(dspCallbackRequest.getAiPlayUrl()); |
|
|
|
log.info("添加通道相关表单成功"); |
|
|
|
// 查验是否存在 |
|
|
|
LiveChannel liveChannelQuery = liveChannelMapper.selectOne(new LambdaQueryWrapper<LiveChannel>() |
|
|
|
.eq(LiveChannel::getInspectionId, inspection.getId()) |
|
|
|
.eq(LiveChannel::getMark, MarkEnum.VALID.getCode())); |
|
|
|
if (liveChannelQuery == null) { |
|
|
|
// 添加通道使用记录 |
|
|
|
liveChannel.setCreateTime(DateUtils.now()); |
|
|
|
liveChannelMapper.insert(liveChannel); |
|
|
|
} else { |
|
|
|
// 更新通道使用记录 |
|
|
|
liveChannel.setId(liveChannelQuery.getId()); |
|
|
|
liveChannelMapper.updateById(liveChannel); |
|
|
|
} |
|
|
|
// 添加业务表数据 |
|
|
|
insertBusiness(dspCallbackRequest.getRequestId(), inspection.getId(), inspection.getTenantId()); |
|
|
|
return JsonResult.success(); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 获取飞行坐标 |
|
|
|
* |
|
|
|
* @param flightDataList 飞行坐标集合 |
|
|
|
* @param time 当前时间 |
|
|
|
* @return 当前飞行对象 |
|
|
|
*/ |
|
|
|
private FlightData getFlightDataByTime(List<FlightData> flightDataList, long time) { |
|
|
|
if (CollectionUtils.isEmpty(flightDataList)) { |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取无人机飞行坐标失败!"); |
|
|
|
} |
|
|
|
|
|
|
|
List<FlightData> flightData = flightDataList.stream().filter(data -> { |
|
|
|
try { |
|
|
|
return Long.valueOf(data.getTimestamp()) <= time; |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("获取飞行数据异常:", e); |
|
|
|
log.error("批量插入问题图片数据:{}", JacksonUtil.obj2StringPretty(data)); |
|
|
|
return false; |
|
|
|
} |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
if (CollectionUtils.isEmpty(flightData)) { |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取无人机飞行坐标失败!"); |
|
|
|
} |
|
|
|
return flightData.get(0); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 释放小程序选择的通道 |
|
|
|
* |
|
|
|
* @param code 通道编码 |
|
|
|
*/ |
|
|
|
private void deliverStreaming(String code) { |
|
|
|
try { |
|
|
|
StreamingDto streaming = getLiveStatus(code); |
|
|
|
if (ChannelStatusEnum.USING.getCode() != streaming.getStatus() |
|
|
|
|| ChannelStatusEnum.DEACTIVATING.getCode() == streaming.getStatus()) { |
|
|
|
return; |
|
|
|
} |
|
|
|
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, STREAMING_DELIVER_URL); |
|
|
|
StreamingDto streamingDto = new StreamingDto(); |
|
|
|
streamingDto.setCode(code); |
|
|
|
HttpEntity<StreamingDto> httpEntity = new HttpEntity<>(streamingDto); |
|
|
|
ResponseEntity<JsonResult> response = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, JsonResult.class); |
|
|
|
if (response == null || !response.hasBody()) { |
|
|
|
LOGGER.error("释放通道失败,响应体为空!"); |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); |
|
|
|
} |
|
|
|
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { |
|
|
|
LOGGER.error("释放通道失败,响应状态码:{}", response.getBody().getCode()); |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
LOGGER.error("释放通道失败", e); |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public StreamingDto getLiveStatus(String code) { |
|
|
|
if (org.apache.commons.lang3.StringUtils.isEmpty(code)) { |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "通道编码不能为空!"); |
|
|
|
} |
|
|
|
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, |
|
|
|
GET_LIVE_CHANNEL_INFO); |
|
|
|
ParameterizedTypeReference<JsonResult<StreamingDto>> parameterizedTypeReference = |
|
|
|
new ParameterizedTypeReference<JsonResult<StreamingDto>>() { |
|
|
|
}; |
|
|
|
ResponseEntity<JsonResult<StreamingDto>> response; |
|
|
|
try { |
|
|
|
response = restTemplate.exchange(url, HttpMethod.GET, null, parameterizedTypeReference, code); |
|
|
|
} catch (Exception e) { |
|
|
|
LOGGER.error("", e); |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态失败!"); |
|
|
|
} |
|
|
|
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态失败!"); |
|
|
|
} |
|
|
|
StreamingDto liveChannelDto = response.getBody().getData(); |
|
|
|
if (liveChannelDto == null || org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getCode()) |
|
|
|
|| liveChannelDto.getStatus() == null |
|
|
|
|| org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getPushUrl()) |
|
|
|
|| org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getPullUrl())) { |
|
|
|
LOGGER.error("获取通道状态信息不完整!"); |
|
|
|
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态信息不完整!"); |
|
|
|
} |
|
|
|
return liveChannelDto; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 保存kafka信息id和任务id,用于后期取出消息处理任务逻辑 |
|
|
|
* |
|
|
|
* @param msgId kafka消息id |
|
|
|
* @param id 任务id |
|
|
|
*/ |
|
|
|
private void insertBusiness(String msgId, String id, String tenantId) { |
|
|
|
|
|
|
|
// 将消息任务和kafka任务id消息存入数据库,用于监听获取任务信息 |
|
|
|
Business thbusiness = new Business(); |
|
|
|
thbusiness.setMsgId(msgId); |
|
|
|
thbusiness.setTenantId(tenantId); |
|
|
|
thbusiness.setType(1); |
|
|
|
thbusiness.setTypeId(id); |
|
|
|
thbusiness.setCreateTime(DateUtils.now()); |
|
|
|
thbusiness.setUpdateTime(DateUtils.now()); |
|
|
|
thbusiness.setMark(1); |
|
|
|
// 查询任务是否存在 |
|
|
|
int result = businessMapper.selectCount(new LambdaQueryWrapper<Business>() |
|
|
|
.eq(Business::getMsgId, msgId) |
|
|
|
.eq(Business::getMark, 1)); |
|
|
|
if (result > 0) { // 做了一层保护,可有可无 |
|
|
|
businessMapper.update(thbusiness, new LambdaQueryWrapper<Business>() |
|
|
|
.eq(Business::getMsgId, msgId) |
|
|
|
.eq(Business::getMark, 1)); |
|
|
|
} else { |
|
|
|
businessMapper.insert(thbusiness); |
|
|
|
} |
|
|
|
} |
|
|
|
} |