@@ -0,0 +1,68 @@ | |||
package com.tuoheng.admin.dto; | |||
import lombok.Data; | |||
/** | |||
* <p> | |||
* 通道Dto | |||
* </p> | |||
* | |||
* @author 拓恒 | |||
* @since 2021-12-16 | |||
*/ | |||
@Data | |||
public class StreamingDto { | |||
/** | |||
* 通道id | |||
*/ | |||
private Integer id; | |||
/** | |||
* 通道名称 | |||
*/ | |||
private String name; | |||
/** | |||
* 通道编号 | |||
*/ | |||
private String code; | |||
/** | |||
* 原视频推流地址 | |||
*/ | |||
private String pushUrl; | |||
/** | |||
* 原视频拉流地址 | |||
*/ | |||
private String pullUrl; | |||
/** | |||
* 原视频拉流地址M3U8 | |||
*/ | |||
private String pullM3u8; | |||
/** | |||
* 通道状态:5空闲 10使用中 15停用 | |||
*/ | |||
private Integer status; | |||
/** | |||
* 通道状态描述 | |||
*/ | |||
private String statusName; | |||
/** | |||
* 描述 | |||
*/ | |||
private String note; | |||
public StreamingDto() { | |||
} | |||
public StreamingDto(String code) { | |||
this.code = code; | |||
} | |||
} |
@@ -0,0 +1,27 @@ | |||
package com.tuoheng.admin.enums; | |||
import lombok.Getter; | |||
/** | |||
* 通道状态 | |||
*/ | |||
public enum ChannelStatusEnum { | |||
IDLE(5,"空闲"), | |||
USING(10,"使用中"), | |||
DEACTIVATING(15,"停用"), | |||
; | |||
ChannelStatusEnum(int code, String description){ | |||
this.code = code; | |||
this.description = description; | |||
} | |||
@Getter | |||
private int code; | |||
@Getter | |||
private String description; | |||
} |
@@ -10,6 +10,7 @@ import com.tuoheng.admin.service.inspection.resubmit.ResubmitInspectionService; | |||
import com.tuoheng.admin.service.inspection.delete.DeleteInspectionService; | |||
import com.tuoheng.admin.service.inspection.query.*; | |||
import com.tuoheng.admin.service.inspection.update.UpdateInspectionService; | |||
import com.tuoheng.admin.service.inspection.update.flyer.UpdateFlyerService; | |||
import com.tuoheng.admin.service.inspection.update.status.UpdateInspectionStatusService; | |||
import com.tuoheng.admin.service.inspection.upload.UploadFlightUrlService; | |||
import com.tuoheng.common.core.common.OperationEnum; | |||
@@ -80,6 +81,9 @@ public class InspectionServiceImpl implements IInspectionService { | |||
@Autowired | |||
private UploadFlightUrlService uploadFlightUrlService; | |||
@Autowired | |||
private UpdateFlyerService updateFlyerService; | |||
/** | |||
* 查询巡检任务分页分页列表 | |||
* | |||
@@ -261,14 +265,14 @@ public class InspectionServiceImpl implements IInspectionService { | |||
} | |||
/** | |||
* 修改巡检任务 | |||
* 修改任务(修改任务状态和飞手信息) | |||
* | |||
* @param request | |||
* @return | |||
*/ | |||
@Override | |||
public JsonResult updateTaskByCode(UpdateTaskByCodeRequest request) { | |||
return null; | |||
return updateFlyerService.update(request); | |||
} | |||
} |
@@ -0,0 +1,80 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import com.tuoheng.admin.dto.StreamingDto; | |||
import com.tuoheng.admin.enums.ChannelStatusEnum; | |||
import com.tuoheng.common.core.exception.ServiceException; | |||
import com.tuoheng.common.core.utils.JsonResult; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.beans.factory.annotation.Qualifier; | |||
import org.springframework.beans.factory.annotation.Value; | |||
import org.springframework.http.HttpEntity; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.http.ResponseEntity; | |||
import org.springframework.stereotype.Service; | |||
import org.springframework.web.client.RestTemplate; | |||
import java.util.Locale; | |||
@Slf4j | |||
@Service | |||
public class DeliverStreamingService { | |||
@Autowired | |||
@Qualifier("restTemplate") | |||
private RestTemplate restTemplate; | |||
@Autowired | |||
private GetLiveStatusService getLiveStatusService; | |||
@Value("${tuoheng.live-channel-domain-url:}") | |||
private String channel_url; | |||
/** | |||
* 根据通道编码获取 | |||
*/ | |||
private final static String GET_LIVE_CHANNEL_INFO = "streaming/getInfo/{code}"; | |||
/** | |||
* 直播通道占用设置地址 | |||
*/ | |||
public final static String LIVE_CHANNEL_USELIVECHANNEL_URL = "streaming/setUseing"; | |||
/** | |||
* 释放通道URL请求地址 | |||
*/ | |||
private final static String STREAMING_DELIVER_URL = "streaming/setDeliver"; | |||
/** | |||
* 释放小程序选择的通道 | |||
* | |||
* @param code 通道编码 | |||
*/ | |||
public void deliver(String code) { | |||
try { | |||
StreamingDto streaming = getLiveStatusService.getLiveStatus(code); | |||
if (ChannelStatusEnum.USING.getCode() != streaming.getStatus() | |||
|| ChannelStatusEnum.DEACTIVATING.getCode() == streaming.getStatus()) { | |||
return; | |||
} | |||
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, STREAMING_DELIVER_URL); | |||
StreamingDto streamingDto = new StreamingDto(); | |||
streamingDto.setCode(code); | |||
HttpEntity<StreamingDto> httpEntity = new HttpEntity<>(streamingDto); | |||
ResponseEntity<JsonResult> response = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, JsonResult.class); | |||
if (response == null || !response.hasBody()) { | |||
log.error("释放通道失败,响应体为空!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { | |||
log.error("释放通道失败,响应状态码:{}", response.getBody().getCode()); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
} catch (Exception e) { | |||
log.error("释放通道失败", e); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
} | |||
} |
@@ -0,0 +1,64 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import com.tuoheng.admin.dto.StreamingDto; | |||
import com.tuoheng.common.core.exception.ServiceException; | |||
import com.tuoheng.common.core.utils.JsonResult; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.beans.factory.annotation.Qualifier; | |||
import org.springframework.beans.factory.annotation.Value; | |||
import org.springframework.core.ParameterizedTypeReference; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.http.ResponseEntity; | |||
import org.springframework.stereotype.Service; | |||
import org.springframework.web.client.RestTemplate; | |||
import java.util.Locale; | |||
@Slf4j | |||
@Service | |||
public class GetLiveStatusService { | |||
@Autowired | |||
@Qualifier("restTemplate") | |||
private RestTemplate restTemplate; | |||
@Value("${tuoheng.live-channel-domain-url:}") | |||
private String channel_url; | |||
/** | |||
* 根据通道编码获取 | |||
*/ | |||
private final static String GET_LIVE_CHANNEL_INFO = "streaming/getInfo/{code}"; | |||
public StreamingDto getLiveStatus(String code) { | |||
if (org.apache.commons.lang3.StringUtils.isEmpty(code)) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "通道编码不能为空!"); | |||
} | |||
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, GET_LIVE_CHANNEL_INFO); | |||
ParameterizedTypeReference<JsonResult<StreamingDto>> parameterizedTypeReference = | |||
new ParameterizedTypeReference<JsonResult<StreamingDto>>() { | |||
}; | |||
ResponseEntity<JsonResult<StreamingDto>> response; | |||
try { | |||
response = restTemplate.exchange(url, HttpMethod.GET, null, parameterizedTypeReference, code); | |||
} catch (Exception e) { | |||
log.error("", e); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态失败!"); | |||
} | |||
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态失败!"); | |||
} | |||
StreamingDto liveChannelDto = response.getBody().getData(); | |||
if (liveChannelDto == null || org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getCode()) | |||
|| liveChannelDto.getStatus() == null | |||
|| org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getPushUrl()) | |||
|| org.apache.commons.lang3.StringUtils.isEmpty(liveChannelDto.getPullUrl())) { | |||
log.error("获取通道状态信息不完整!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道状态信息不完整!"); | |||
} | |||
return liveChannelDto; | |||
} | |||
} |
@@ -0,0 +1,26 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import com.tuoheng.admin.entity.Inspection; | |||
import com.tuoheng.admin.mapper.InspectionMapper; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.stereotype.Service; | |||
@Slf4j | |||
@Service | |||
public class OffLineCompleteService { | |||
@Autowired | |||
private InspectionMapper inspectionMapper; | |||
public void handle(Inspection inspection) { | |||
log.info("离线完成飞行,任务编号:{}", inspection.getId()); | |||
Inspection inspectionUpdate = new Inspection(); | |||
inspectionUpdate.setId(inspection.getId()); | |||
inspectionUpdate.setAnalyseStatus(1); | |||
inspectionUpdate.setHeartbeatTime(System.currentTimeMillis()); | |||
inspectionMapper.updateById(inspectionUpdate); | |||
log.info("离线完成飞行,设置分析状态:{}", inspection.getId()); | |||
} | |||
} |
@@ -0,0 +1,112 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | |||
import com.tuoheng.admin.dto.StreamingDto; | |||
import com.tuoheng.admin.entity.Business; | |||
import com.tuoheng.admin.entity.Inspection; | |||
import com.tuoheng.admin.entity.InspectionFile; | |||
import com.tuoheng.admin.entity.LiveChannel; | |||
import com.tuoheng.admin.enums.ChannelStatusEnum; | |||
import com.tuoheng.admin.mapper.BusinessMapper; | |||
import com.tuoheng.admin.mapper.InspectionFileMapper; | |||
import com.tuoheng.admin.mapper.InspectionMapper; | |||
import com.tuoheng.admin.mapper.LiveChannelMapper; | |||
import com.tuoheng.admin.request.inspection.UpdateTaskByCodeRequest; | |||
import com.tuoheng.admin.vo.DspCallbackVo; | |||
import com.tuoheng.common.core.common.BaseEntity; | |||
import com.tuoheng.common.core.constant.CommonConstants; | |||
import com.tuoheng.common.core.enums.HttpStatusEnum; | |||
import com.tuoheng.common.core.exception.ServiceException; | |||
import com.tuoheng.common.core.utils.DateUtils; | |||
import com.tuoheng.common.core.utils.JacksonUtil; | |||
import com.tuoheng.common.core.utils.JsonResult; | |||
import com.tuoheng.common.core.utils.StringUtils; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.beans.factory.annotation.Qualifier; | |||
import org.springframework.beans.factory.annotation.Value; | |||
import org.springframework.core.ParameterizedTypeReference; | |||
import org.springframework.http.HttpEntity; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.http.ResponseEntity; | |||
import org.springframework.stereotype.Service; | |||
import org.springframework.web.client.RestTemplate; | |||
import java.util.*; | |||
@Slf4j | |||
@Service | |||
public class OnlineCompleteService { | |||
@Autowired | |||
private InspectionMapper inspectionMapper; | |||
@Autowired | |||
private InspectionFileMapper inspectionFileMapper; | |||
@Autowired | |||
private LiveChannelMapper liveChannelMapper; | |||
@Autowired | |||
private BusinessMapper businessMapper; | |||
@Autowired | |||
private DeliverStreamingService deliverStreamingService; | |||
@Autowired | |||
@Qualifier("restTemplate") | |||
private RestTemplate restTemplate; | |||
@Value("${tuoheng.live-channel-domain-url:}") | |||
private String channel_url; | |||
/** | |||
* DSP服务API接口域名 | |||
*/ | |||
@Value("${tuoheng.dsp-domain-url:}") | |||
private String dspDomainUrl; | |||
@Value("${tuoheng.dsp-callback-url:}") | |||
private String dspCallbackUrl; | |||
public void handle(UpdateTaskByCodeRequest request, Inspection inspection) { | |||
LiveChannel channel = liveChannelMapper.selectOne(new LambdaQueryWrapper<LiveChannel>() | |||
.eq(LiveChannel::getInspectionId, inspection.getId()) | |||
.eq(LiveChannel::getMark, 1)); | |||
Optional.ofNullable(channel).orElseThrow( | |||
() -> new ServiceException(HttpStatus.BAD_REQUEST.value(), "获取通道信息失败!")); | |||
try { | |||
log.info("=========================发送DSP停止业务========================="); | |||
// 查询业务 | |||
Business business = businessMapper.selectOne(new LambdaQueryWrapper<Business>() | |||
.eq(Business::getType, 1) | |||
.eq(Business::getTypeId, inspection.getId()) | |||
.orderByDesc(BaseEntity::getCreateTime) | |||
.last("limit 1")); | |||
if (StringUtils.isNotNull(business)) { | |||
// 发送停止DSP分析服务 | |||
log.info("调用DSP服务"); | |||
String url = String.format("%s/api/web/serviceInst/%s/stop/", dspDomainUrl, business.getMsgId()); | |||
ResponseEntity<JsonResult> response = restTemplate.exchange(url, HttpMethod.PUT, null, new ParameterizedTypeReference<JsonResult>() { | |||
}); | |||
log.info(String.format("请求结果:%s", response.getBody())); | |||
if (response == null || !response.hasBody()) { | |||
log.error("停止DSP分析服务失败,响应体为空!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "停止DSP分析服务失败,请重试"); | |||
} | |||
JsonResult jsonResult = response.getBody(); | |||
log.info(String.format("结果对象转换:%s", jsonResult.getData())); | |||
if (jsonResult.getCode() != 0) { | |||
log.error("停止DSP分析服务失败, response:{}", JacksonUtil.obj2json(jsonResult)); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "停止DSP分析服务失败,请重试"); | |||
} | |||
} | |||
} finally { | |||
// 释放小程序选择的通道 | |||
deliverStreamingService.deliver(channel.getChannelCode()); | |||
} | |||
} | |||
} |
@@ -0,0 +1,292 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | |||
import com.tuoheng.admin.dto.StreamingDto; | |||
import com.tuoheng.admin.entity.Business; | |||
import com.tuoheng.admin.entity.Inspection; | |||
import com.tuoheng.admin.entity.InspectionFile; | |||
import com.tuoheng.admin.entity.LiveChannel; | |||
import com.tuoheng.admin.enums.ChannelStatusEnum; | |||
import com.tuoheng.admin.mapper.BusinessMapper; | |||
import com.tuoheng.admin.mapper.InspectionFileMapper; | |||
import com.tuoheng.admin.mapper.InspectionMapper; | |||
import com.tuoheng.admin.mapper.LiveChannelMapper; | |||
import com.tuoheng.admin.request.inspection.UpdateTaskByCodeRequest; | |||
import com.tuoheng.admin.vo.DspCallbackVo; | |||
import com.tuoheng.common.core.constant.CommonConstants; | |||
import com.tuoheng.common.core.enums.HttpStatusEnum; | |||
import com.tuoheng.common.core.exception.ServiceException; | |||
import com.tuoheng.common.core.utils.DateUtils; | |||
import com.tuoheng.common.core.utils.JacksonUtil; | |||
import com.tuoheng.common.core.utils.JsonResult; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.beans.factory.annotation.Qualifier; | |||
import org.springframework.beans.factory.annotation.Value; | |||
import org.springframework.core.ParameterizedTypeReference; | |||
import org.springframework.http.HttpEntity; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.http.ResponseEntity; | |||
import org.springframework.stereotype.Service; | |||
import org.springframework.web.client.RestTemplate; | |||
import java.util.*; | |||
@Slf4j | |||
@Service | |||
public class OnlineIdentifService { | |||
@Autowired | |||
private InspectionMapper inspectionMapper; | |||
@Autowired | |||
private InspectionFileMapper inspectionFileMapper; | |||
@Autowired | |||
private LiveChannelMapper liveChannelMapper; | |||
@Autowired | |||
private BusinessMapper businessMapper; | |||
@Autowired | |||
private GetLiveStatusService getLiveStatusService; | |||
@Autowired | |||
@Qualifier("restTemplate") | |||
private RestTemplate restTemplate; | |||
@Value("${tuoheng.live-channel-domain-url:}") | |||
private String channel_url; | |||
/** | |||
* DSP服务API接口域名 | |||
*/ | |||
@Value("${tuoheng.dsp-domain-url:}") | |||
private String dspDomainUrl; | |||
@Value("${tuoheng.dsp-callback-url:}") | |||
private String dspCallbackUrl; | |||
/** | |||
* 根据通道编码获取 | |||
*/ | |||
private final static String GET_LIVE_CHANNEL_INFO = "streaming/getInfo/{code}"; | |||
/** | |||
* 直播通道占用设置地址 | |||
*/ | |||
public final static String LIVE_CHANNEL_USELIVECHANNEL_URL = "streaming/setUseing"; | |||
/** | |||
* 释放通道URL请求地址 | |||
*/ | |||
private final static String STREAMING_DELIVER_URL = "streaming/setDeliver"; | |||
public void handle(UpdateTaskByCodeRequest request, Inspection inspection) { | |||
log.info("=========================在线识别开始========================="); | |||
if (request.getChannelCode() == null) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "请选择通道"); | |||
} | |||
StreamingDto dto = getLiveStatusService.getLiveStatus(request.getChannelCode()); | |||
useLiveChannel(new StreamingDto(request.getChannelCode())); | |||
log.info("更新巡检任务状态"); | |||
Inspection updateAnalyseStatus = new Inspection(); | |||
//原本是请求id->(任务id) | |||
updateAnalyseStatus.setId(inspection.getId()); | |||
updateAnalyseStatus.setAnalyseStatus(2); | |||
updateAnalyseStatus.setAiVideoUrl(""); | |||
updateAnalyseStatus.setHeartbeatTime(System.currentTimeMillis()); | |||
inspectionMapper.updateById(updateAnalyseStatus); | |||
//查询问题数 | |||
Integer num = inspectionFileMapper.selectCount(new LambdaQueryWrapper<InspectionFile>() | |||
.eq(InspectionFile::getTenantId, inspection.getTenantId()) | |||
.eq(InspectionFile::getInspectionId, inspection.getId()) | |||
.eq(InspectionFile::getMark, 1)); | |||
if (num > 0) { | |||
// 清空旧数据 | |||
inspectionFileMapper.deleteByInspectionId(inspection.getId()); | |||
} | |||
log.info("开始调用DSP服务"); | |||
// 调用DSP服务 | |||
String url = String.format(Locale.ENGLISH, "%s/api/web/serviceInst/827938791cf98ec2dfceea59726e5085/application", dspDomainUrl, ""); | |||
try { | |||
Map<String, Object> paramMap = new HashMap<>(); | |||
List<Map<String, Object>> configMapList = new ArrayList<>(); | |||
// 运行模式 | |||
Map<String, Object> serviceModeMap = new HashMap<>(); | |||
serviceModeMap.put("ename", "service_mode"); | |||
serviceModeMap.put("evalue", 1); | |||
configMapList.add(serviceModeMap); | |||
// 回调地址 | |||
Map<String, Object> callbackUrlMap = new HashMap<>(); | |||
callbackUrlMap.put("ename", "callback_url"); | |||
callbackUrlMap.put("evalue", dspCallbackUrl); | |||
configMapList.add(callbackUrlMap); | |||
paramMap.put("serviceInstConfigList", configMapList); | |||
// 请求入参 | |||
List<Map<String, Object>> reqMapList = new ArrayList<>(); | |||
Map<String, Object> pushUrlMap = new HashMap<>(); | |||
pushUrlMap.put("ename", "push_url"); | |||
pushUrlMap.put("evalue", dto.getPushUrl()); | |||
reqMapList.add(pushUrlMap); | |||
Map<String, Object> map3 = new HashMap<>(); | |||
map3.put("ename", "pull_url"); | |||
map3.put("evalue", dto.getPullUrl()); | |||
reqMapList.add(map3); | |||
paramMap.put("serviceInstReqList", reqMapList); | |||
log.info(String.format("DSP入参:%s", paramMap)); | |||
log.info("发起DSP服务调用网络请求"); | |||
HttpEntity<Map<String, Object>> httpEntity = new HttpEntity<>(paramMap); | |||
ResponseEntity<JsonResult<DspCallbackVo>> response = restTemplate.exchange(url, HttpMethod.POST, httpEntity, new ParameterizedTypeReference<JsonResult<DspCallbackVo>>() { | |||
}); | |||
log.info(String.format("请求结果:%s", response.getBody())); | |||
if (response == null || !response.hasBody()) { | |||
log.error("调用DSP只能分析服务失败,响应体为空!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "调用DSP只能分析服务失败,请重试"); | |||
} | |||
JsonResult<DspCallbackVo> jsonResult = response.getBody(); | |||
log.info(String.format("结果对象转换:%s", jsonResult.getData())); | |||
if (jsonResult.getCode() == HttpStatusEnum.NO_RESOURCES.getCode()) { | |||
log.warn("调用DSP只能分析服务失败, response:{}", JacksonUtil.obj2json(jsonResult)); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), HttpStatusEnum.NO_RESOURCES.getDescription()); | |||
} | |||
log.info("创建或更新通道记录"); | |||
// 创建通道记录数据 | |||
DspCallbackVo dspCallbackVo = jsonResult.getData(); | |||
log.info("dsp响应体:{}", JacksonUtil.obj2StringPretty(dspCallbackVo)); | |||
//通道表新增 | |||
LiveChannel liveChannel = new LiveChannel(); | |||
liveChannel.setInspectionId(inspection.getId()); | |||
liveChannel.setChannelCode(request.getChannelCode()); | |||
//修改通道名 | |||
liveChannel.setName(dto.getName()); | |||
liveChannel.setTenantId(inspection.getTenantId()); | |||
liveChannel.setPushUrl(dspCallbackVo.getPushUrl()); | |||
liveChannel.setPullUrl(dspCallbackVo.getPlayUrl()); | |||
liveChannel.setAipushUrl(dspCallbackVo.getAipushUrl()); | |||
liveChannel.setAipullUrl(dspCallbackVo.getAiplayUrl()); | |||
liveChannel.setCreateTime(DateUtils.now()); | |||
liveChannel.setUpdateTime(DateUtils.now()); | |||
// 查验是否存在 | |||
LiveChannel liveChannelTmp = liveChannelMapper.selectOne(new LambdaQueryWrapper<LiveChannel>() | |||
.eq(LiveChannel::getInspectionId, inspection.getId()) | |||
.eq(LiveChannel::getMark, 1)); | |||
if (liveChannelTmp == null) { | |||
// 添加通道使用记录 | |||
liveChannelMapper.insert(liveChannel); | |||
} else { | |||
// 更新通道使用记录 | |||
liveChannel.setId(liveChannelTmp.getId()); | |||
liveChannelMapper.updateById(liveChannel); | |||
} | |||
log.info("创建业务表数据"); | |||
// 添加业务表数据 | |||
insertBusiness(dspCallbackVo.getRequestId(), inspection.getId(), inspection.getTenantId()); | |||
} catch (Exception e) { | |||
log.error("DSP服务调用失败", e); | |||
// 释放小程序选择的通道 | |||
deliverStreaming(request.getChannelCode()); | |||
throw e; | |||
} | |||
log.info("=========================在线识别调用结束========================="); | |||
} | |||
private JsonResult<Void> useLiveChannel(StreamingDto liveChannelDto) { | |||
StreamingDto channelDto = getLiveStatusService.getLiveStatus(liveChannelDto.getCode()); | |||
// 通道只有空闲状态才能设置占用中 | |||
if (ChannelStatusEnum.IDLE.getCode() != channelDto.getStatus()) { | |||
log.error("通道状态不是空闲中,不能设置通道状态为占用状态!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "通道不是空闲状态,请稍后再试!"); | |||
} | |||
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, LIVE_CHANNEL_USELIVECHANNEL_URL); | |||
HttpEntity httpEntity = new HttpEntity(liveChannelDto); | |||
ResponseEntity<JsonResult> response; | |||
try { | |||
response = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, JsonResult.class); | |||
} catch (Exception e) { | |||
deliverStreaming(liveChannelDto.getCode()); | |||
log.error("", e); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "设置通道状态异常!"); | |||
} | |||
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { | |||
deliverStreaming(liveChannelDto.getCode()); | |||
log.error("设置占用通道状态失败!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "设置通道状态异常!"); | |||
} | |||
return response.getBody(); | |||
} | |||
/** | |||
* 保存kafka信息id和任务id,用于后期取出消息处理任务逻辑 | |||
* | |||
* @param msgId kafka消息id | |||
* @param id 任务id | |||
*/ | |||
private void insertBusiness(String msgId, String id, String tenantId) { | |||
// 将消息任务和kafka任务id消息存入数据库,用于监听获取任务信息 | |||
Business business = new Business(); | |||
business.setMsgId(msgId); | |||
business.setTenantId(tenantId); | |||
business.setType(CommonConstants.INSPECTION_TASK_TYPE); | |||
business.setTypeId(id); | |||
business.setCreateTime(DateUtils.now()); | |||
business.setUpdateTime(DateUtils.now()); | |||
business.setMark(1); | |||
// 查询任务是否存在 | |||
int result = businessMapper.selectCount(new LambdaQueryWrapper<Business>() | |||
.eq(Business::getMsgId, msgId) | |||
.eq(Business::getMark, 1)); | |||
if (result > 0) { // 做了一层保护,可有可无 | |||
businessMapper.update(business, new LambdaQueryWrapper<Business>() | |||
.eq(Business::getMsgId, msgId) | |||
.eq(Business::getMark, 1)); | |||
} else { | |||
businessMapper.insert(business); | |||
} | |||
} | |||
/** | |||
* 释放小程序选择的通道 | |||
* | |||
* @param code 通道编码 | |||
*/ | |||
private void deliverStreaming(String code) { | |||
try { | |||
StreamingDto streaming = getLiveStatusService.getLiveStatus(code); | |||
if (ChannelStatusEnum.USING.getCode() != streaming.getStatus() | |||
|| ChannelStatusEnum.DEACTIVATING.getCode() == streaming.getStatus()) { | |||
return; | |||
} | |||
String url = String.format(Locale.ENGLISH, "%s%s", channel_url, STREAMING_DELIVER_URL); | |||
StreamingDto streamingDto = new StreamingDto(); | |||
streamingDto.setCode(code); | |||
HttpEntity<StreamingDto> httpEntity = new HttpEntity<>(streamingDto); | |||
ResponseEntity<JsonResult> response = restTemplate.exchange(url, HttpMethod.PUT, httpEntity, JsonResult.class); | |||
if (response == null || !response.hasBody()) { | |||
log.error("释放通道失败,响应体为空!"); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
if (response == null || !response.hasBody() || response.getBody().getCode() != JsonResult.SUCCESS) { | |||
log.error("释放通道失败,响应状态码:{}", response.getBody().getCode()); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
} catch (Exception e) { | |||
log.error("释放通道失败", e); | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "释放通道失败,请重试"); | |||
} | |||
} | |||
} |
@@ -0,0 +1,141 @@ | |||
package com.tuoheng.admin.service.inspection.update.flyer; | |||
import cn.hutool.core.util.ObjectUtil; | |||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | |||
import com.tuoheng.admin.entity.Inspection; | |||
import com.tuoheng.admin.enums.TaskStatusEnum; | |||
import com.tuoheng.admin.mapper.InspectionFileMapper; | |||
import com.tuoheng.admin.mapper.InspectionMapper; | |||
import com.tuoheng.admin.request.inspection.UpdateTaskByCodeRequest; | |||
import com.tuoheng.admin.utils.CurrentUserUtil; | |||
import com.tuoheng.common.core.exception.ServiceException; | |||
import com.tuoheng.common.core.utils.JacksonUtil; | |||
import com.tuoheng.common.core.utils.JsonResult; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.stereotype.Service; | |||
import org.springframework.transaction.annotation.Transactional; | |||
/** | |||
* 修改任务(修改任务状态和飞手信息) | |||
* | |||
* @author wanjing | |||
* @team tuoheng | |||
* @date 2022-12-23 | |||
*/ | |||
@Slf4j | |||
@Service | |||
public class UpdateFlyerService { | |||
@Autowired | |||
private InspectionMapper inspectionMapper; | |||
@Autowired | |||
private InspectionFileMapper inspectionFileMapper; | |||
@Autowired | |||
private OnlineIdentifService onlineIdentifService; | |||
@Autowired | |||
private OnlineCompleteService onlineCompleteService; | |||
@Autowired | |||
private OffLineCompleteService offLineCompleteService; | |||
/** | |||
* 修改任务(修改任务状态和飞手信息) | |||
* | |||
* @return | |||
*/ | |||
@Transactional(rollbackFor = Exception.class) | |||
public JsonResult update(UpdateTaskByCodeRequest request) { | |||
log.info("进入修改任务状态和飞手信息业务接口", JacksonUtil.obj2StringPretty(request)); | |||
String tenantId = CurrentUserUtil.getTenantId(); | |||
JsonResult result = this.check(request); | |||
if (0 != result.getCode()) { | |||
log.info("修改任务状态和飞手信息业务接口:校验失败:{}", result.getMsg()); | |||
return result; | |||
} | |||
Inspection inspection = (Inspection) result.getData(); | |||
//1 直播 2离线=非直播 | |||
Integer isLive = inspection.getIsLive(); | |||
Integer status = request.getStatus(); | |||
// 如果任务审核未通过,状态改为待审核 | |||
if (status == TaskStatusEnum.AUDITFAILED.getCode()) { | |||
request.setStatus(1); | |||
} | |||
//开始飞行 | |||
if (status == TaskStatusEnum.INFLIGHT.getCode()) { | |||
//直播填充盒子设备 | |||
if (1 == isLive) { | |||
inspection.setCloudBoxId(request.getCloudBoxId()); | |||
inspection.setCloudBoxName(request.getCloudBoxName()); | |||
inspection.setBoxSn(request.getBoxSn()); | |||
} | |||
inspection.setEquipmentId(request.getEquipmentId()); | |||
inspection.setEquipmentName(request.getEquipmentName()); | |||
inspection.setEquipmentMountId(request.getEquipmentMountId()); | |||
inspection.setEquipmentMountName(request.getEquipmentMountName()); | |||
inspection.setStatus(request.getStatus()); | |||
inspection.setFlightHand(request.getFlightHand()); | |||
inspection.setFlightHandName(request.getFlightHandName()); | |||
inspection.setExecutionStartTime(request.getFlightStartTime()); | |||
//更改 | |||
inspectionMapper.updateById(inspection); | |||
} | |||
//飞行完成 | |||
if (status == TaskStatusEnum.COMPLETE.getCode()) { | |||
inspection.setStatus(request.getStatus()); | |||
inspection.setExecutionEndTime(request.getFlightEndTime()); | |||
inspectionMapper.updateById(inspection); | |||
} | |||
// 直播 | |||
if (1 == inspection.getIsLive() && status == TaskStatusEnum.INFLIGHT.getCode()) { | |||
onlineIdentifService.handle(request, inspection); | |||
} | |||
//离线 | |||
if (2 == inspection.getIsLive() && status == TaskStatusEnum.COMPLETE.getCode()) { | |||
offLineCompleteService.handle(inspection); | |||
} | |||
// 如果直播状态,飞行完成修改状态为待关闭(如果任务分析状态为失败或成功,不修改通道状态) | |||
if (1 == inspection.getIsLive() && status == TaskStatusEnum.COMPLETE.getCode() | |||
&& 4 != inspection.getAnalyseStatus() | |||
&& 5 != inspection.getAnalyseStatus() | |||
&& 6 != inspection.getAnalyseStatus()) { | |||
onlineCompleteService.handle(request, inspection); | |||
} | |||
return JsonResult.success(); | |||
} | |||
/** | |||
* 检查参数 | |||
* | |||
* @param request | |||
* @return | |||
*/ | |||
private JsonResult check(UpdateTaskByCodeRequest request) { | |||
if (ObjectUtil.isNull(request.getStatus())) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "变量status不能为空"); | |||
} | |||
if (ObjectUtil.isNull(request.getTaskCode())) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "变量id不能为空"); | |||
} | |||
// 检查任务是否存在 | |||
Inspection inspection = inspectionMapper.selectOne(new LambdaQueryWrapper<Inspection>() | |||
.eq(Inspection::getCode, request.getTaskCode()) | |||
.eq(Inspection::getMark, 1)); | |||
if (inspection == null) { | |||
throw new ServiceException(HttpStatus.BAD_REQUEST.value(), "当前任务不存在"); | |||
} | |||
return JsonResult.success(inspection); | |||
} | |||
} |
@@ -0,0 +1,47 @@ | |||
package com.tuoheng.admin.vo.inspection; | |||
import lombok.Data; | |||
@Data | |||
public class InspectionChannelVo { | |||
/** | |||
* 任务唯一键 | |||
*/ | |||
private Integer id; | |||
/** | |||
* 任务编号 | |||
*/ | |||
private String code; | |||
/** | |||
* 通道编号 | |||
*/ | |||
private String channelCode; | |||
/** | |||
* 无人机推流地址 | |||
*/ | |||
private String push_url; | |||
/** | |||
* 无人机拉流地址 | |||
*/ | |||
private String pull_url; | |||
/** | |||
* AI推流地址 | |||
*/ | |||
private String aipush_url; | |||
/** | |||
* AI拉流地址 | |||
*/ | |||
private String aipull_url; | |||
/** | |||
* live_channel_id | |||
*/ | |||
private String live_channel_id; | |||
} |