| | |
| | | import com.aliyuncs.exceptions.ClientException; |
| | | import com.aliyuncs.iot.model.v20180120.*; |
| | | import com.sandu.common.execption.BusinessException; |
| | | import com.sandu.ximon.admin.dto.DeviceStatus; |
| | | import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil; |
| | | import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.InvokeParam; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.WrapResponseCommonFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.DeviceStateEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.topic.IBaseTopic; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.topic.ICustomizeTopic; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.BeanUtils; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Base64; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap; |
| | | |
| | | /** |
| | | * @author chenjiantian |
| | | * @date 2021/12/2 18:21 |
| | | */ |
| | | @Slf4j |
| | | public abstract class BaseInvokeSyncService implements IInvoke, IBaseTopic, ICustomizeTopic { |
| | | |
| | | /** |
| | |
| | | * @param clz response entity class for the inner frame; pass it if you do not want to parse the inner frame yourself |
| | | * @return the decoded frame entity, which also carries the decoded inner frame |
| | | */ |
| | | public <T extends BaseResponseInnerFrame<T>> WrapResponseCommonFrame<T> sendRRPC(String deviceName, IRequestFrame iRequestFrame, Class<T> clz){ |
| | | public <T extends BaseResponseInnerFrame<T>> WrapResponseCommonFrame<T> sendRRPC(String deviceName, IRequestFrame iRequestFrame, Class<T> clz) { |
| | | CommonFrame commonFrame = sendRRPC(deviceName, iRequestFrame); |
| | | if (commonFrame == null) { |
| | | return null; |
| | |
| | | } |
| | | |
| | | @Override |
| | | public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) { |
| | | InvokeParam param = new InvokeParam(); |
| | | param.setOperate("1001"); |
| | | param.setFrame(iRequestFrame.getEncodeFrame()); |
| | | InvokeThingServiceResponse.Data data = invokeThing(deviceName, param); |
| | | if (data == null) { |
| | | return null; |
| | | } |
| | | String result = data.getResult(); |
| | | result = result.replace("\\", ""); |
| | | Map map = JSON.parseObject(result, Map.class); |
| | | result = (String) map.get("msg"); |
| | | CommonFrame connectFrame = FrameUtils.transformMessageToFrame(result); |
| | | return connectFrame; |
| | | /** |
| | |  * Sends the frame held by {@code invokeParam} to the device through |
| | |  * {@code MqttClientUtil.sendMqttMsg} and decodes the reply with |
| | |  * {@code FrameUtils.transformMessageToFrame}. |
| | |  * |
| | |  * <p>Only {@code invokeParam.getFrame()} is read; the former Aliyun {@code invokeThing} |
| | |  * round trip is no longer used here. |
| | |  * |
| | |  * @param deviceName  name of the target device |
| | |  * @param invokeParam carrier of the frame to send |
| | |  * @return the frame decoded from the device reply |
| | |  */ |
| | | public CommonFrame sendRRPC(String deviceName, InvokeParam invokeParam) { |
| | |     return FrameUtils.transformMessageToFrame( |
| | |             MqttClientUtil.sendMqttMsg(deviceName, invokeParam.getFrame())); |
| | | } |
| | | |
| | | /** |
| | |  * Sends a request frame to the device over the local MQTT client and decodes the reply. |
| | |  * |
| | |  * <p>The encoded request is first passed through {@code FrameUtils.transformMessageToFrame} |
| | |  * and its {@code toString()} form is what gets published. The request and the raw reply are |
| | |  * logged at info level before the reply is decoded. |
| | |  * |
| | |  * @param deviceName    name of the target device |
| | |  * @param iRequestFrame request whose encoded frame is sent |
| | |  * @return the frame decoded from the device reply |
| | |  */ |
| | | @Override |
| | | public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) { |
| | |     final String outbound = |
| | |             FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString(); |
| | |     final String inbound = MqttClientUtil.sendMqttMsg(deviceName, outbound); |
| | | |
| | |     log.info("自定义sendRRPC:请求帧:{},\n,响应帧:{}", iRequestFrame.toString(), inbound); |
| | | |
| | |     return FrameUtils.transformMessageToFrame(inbound); |
| | | } |
| | | |
| | | /** |
| | |  * Sends a request frame to the device over the local MQTT client and decodes the reply. |
| | |  * |
| | |  * <p>{@code resendFlag} is accepted for interface compatibility but is not read by this |
| | |  * implementation: the message is sent exactly once, without logging. |
| | |  * |
| | |  * @param deviceName    name of the target device |
| | |  * @param iRequestFrame request whose encoded frame is sent |
| | |  * @param resendFlag    not consulted by this implementation |
| | |  * @return the frame decoded from the device reply |
| | |  */ |
| | | @Override |
| | | public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) { |
| | |     return FrameUtils.transformMessageToFrame( |
| | |             MqttClientUtil.sendMqttMsg( |
| | |                     deviceName, |
| | |                     FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString())); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * Basic communication: synchronous invocation |
| | |
| | | |
| | | @Override |
| | | public List<BatchGetDeviceStateResponse.DeviceStatus> batchGetDeviceState(List<String> deviceNames) { |
| | | BatchGetDeviceStateRequest request = new BatchGetDeviceStateRequest(); |
| | | request.setDeviceNames(deviceNames); |
| | | request.setProductKey(getProductKey()); |
| | | BatchGetDeviceStateResponse response = invokeSync(request); |
| | | if (response != null && response.getSuccess()) { |
| | | return response.getDeviceStatusList(); |
| | | } |
| | | return null; |
| | | List<BatchGetDeviceStateResponse.DeviceStatus> statusList = new ArrayList<>(); |
| | | |
| | | deviceNames.forEach(l -> { |
| | | BatchGetDeviceStateResponse.DeviceStatus deviceStatus = new BatchGetDeviceStateResponse.DeviceStatus(); |
| | | deviceStatus.setDeviceName(l); |
| | | |
| | | if (localMqttConnectStatusMap.get(l)!=null && |
| | | localMqttConnectStatusMap.get(l)== 1){ |
| | | deviceStatus.setStatus("ONLINE"); |
| | | }else { |
| | | deviceStatus.setStatus("OFFLINE"); |
| | | } |
| | | statusList.add(deviceStatus); |
| | | }); |
| | | |
| | | |
| | | return statusList; |
| | | } |
| | | |
| | | /** |
| | | /** |
| | | * Invokes the custom topic. |
| | | * https://help.aliyun.com/document_detail/69584.htm?spm=a2c4g.11186623.0.0.25b33982bSQSom#reference-snk-mrz-wdb |
| | | * |
| | |
| | | * @param param the invocation arguments |
| | | * @return the result returned by the device |
| | | */ |
| | | protected InvokeThingServiceResponse.Data invokeThing(String deviceName, InvokeParam param) { |
| | | protected InvokeThingServiceResponse.Data invokeThing(String deviceName, InvokeParam param, boolean resendFlag) { |
| | | // Populate the service-invocation request |
| | | InvokeThingServiceRequest request = new InvokeThingServiceRequest(); |
| | | // productKey from the device certificate |
| | | request.setProductKey(getProductKey()); |
| | | // deviceName from the device certificate |
| | | request.setDeviceName(deviceName); |
| | | request.setSysReadTimeout(5000); |
| | | // Identifier of the service to invoke, as defined on the server side; "rrpc" currently denotes a synchronous call |
| | | request.setIdentifier("rrpc"); |
| | | request.setArgs(JSON.toJSONString(param)); |
| | | |
| | | // Obtain the service-invocation response |
| | | InvokeThingServiceResponse response = null; |
| | | try { |
| | | response = getClient().getAcsResponse(request); |
| | | } catch (ClientException e) { |
| | | e.printStackTrace(); |
| | | if (resendFlag) { |
| | | return getAcsResponse(request, true, 1); |
| | | |
| | | } else { |
| | | return getAcsResponse(request, false, null); |
| | | |
| | | } |
| | | if (response == null) { |
| | | throw new BusinessException("调用rrpc失败"); |
| | | } |
| | | |
| | | /** |
| | | * 获得服务调用响应 |
| | | * |
| | | * @param request |
| | | * @return |
| | | */ |
| | | private InvokeThingServiceResponse.Data getAcsResponse(InvokeThingServiceRequest request, boolean resendFlag, Integer time) { |
| | | |
| | | if (resendFlag) { |
| | | //需要重发 |
| | | if (time <= 2) { |
| | | InvokeThingServiceResponse response = null; |
| | | try { |
| | | response = getClient().getAcsResponse(request); |
| | | if (response == null) { |
| | | log.error("调用rrpc失败,响应为空!"); |
| | | return getAcsResponse(request, true, time + 1); |
| | | } |
| | | if (!response.getSuccess()) { |
| | | log.error("调用rrpc失败," + response.getErrorMessage()); |
| | | return getAcsResponse(request, true, time + 1); |
| | | } else { |
| | | // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功 |
| | | return response.getData(); |
| | | } |
| | | } catch (Exception e) { |
| | | return getAcsResponse(request, true, time + 1); |
| | | // throw new BusinessException("错误信息:" + e.getMessage()); |
| | | } |
| | | } else { |
| | | throw new BusinessException("硬件请求失败,请检查硬件设备!"); |
| | | } |
| | | |
| | | } else { |
| | | //不需要重发 |
| | | InvokeThingServiceResponse response = null; |
| | | try { |
| | | response = getClient().getAcsResponse(request); |
| | | } catch (ClientException e) { |
| | | e.printStackTrace(); |
| | | throw new BusinessException("硬件请求失败,请检查硬件设备!"); |
| | | } |
| | | if (response == null) { |
| | | throw new BusinessException("调用rrpc失败,响应为空!"); |
| | | } |
| | | if (!response.getSuccess()) { |
| | | throw new BusinessException("调用rrpc失败," + response.getErrorMessage()); |
| | | } |
| | | // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功 |
| | | if (response.getSuccess()) { |
| | | // 仅同步服务有result |
| | | return response.getData(); |
| | | } |
| | | return null; |
| | | } |
| | | if (!response.getSuccess()) { |
| | | throw new BusinessException("调用rrpc失败," + response.getErrorMessage()); |
| | | } |
| | | // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功 |
| | | if (response.getSuccess()) { |
| | | // 仅同步服务有result |
| | | return response.getData(); |
| | | } |
| | | return null; |
| | | |
| | | |
| | | } |
| | | } |