2021与蓝度共同重构项目,服务端
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
@@ -6,6 +6,7 @@
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20180120.*;
import com.sandu.common.execption.BusinessException;
import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
@@ -14,8 +15,10 @@
import com.sandu.ximon.admin.manager.iot.rrpc.topic.IBaseTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.topic.ICustomizeTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -23,6 +26,7 @@
 * @author chenjiantian
 * @date 2021/12/2 18:21
 */
@Slf4j
public abstract class BaseInvokeSyncService implements IInvoke, IBaseTopic, ICustomizeTopic {
    /**
@@ -33,7 +37,7 @@
     * @param clz           内部帧响应实体类 如果不想自己解析 可带上
     * @return 返回的解码帧实体类 同时把内部帧解码之后也携带上
     */
    public <T extends BaseResponseInnerFrame<T>> WrapResponseCommonFrame<T> sendRRPC(String deviceName, IRequestFrame iRequestFrame, Class<T> clz){
    public <T extends BaseResponseInnerFrame<T>> WrapResponseCommonFrame<T> sendRRPC(String deviceName, IRequestFrame iRequestFrame, Class<T> clz) {
        CommonFrame commonFrame = sendRRPC(deviceName, iRequestFrame);
        if (commonFrame == null) {
            return null;
@@ -49,11 +53,8 @@
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
        InvokeParam param = new InvokeParam();
        param.setOperate("1001");
        param.setFrame(iRequestFrame.getEncodeFrame());
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param);
    public CommonFrame sendRRPC(String deviceName, InvokeParam invokeParam) {
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
        if (data == null) {
            return null;
        }
@@ -63,6 +64,46 @@
        result = (String) map.get("msg");
        return FrameUtils.transformMessageToFrame(result);
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        return FrameUtils.transformMessageToFrame(result);
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) {
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        return FrameUtils.transformMessageToFrame(result);
    }
    /**
     * 基础通信同步调用
@@ -150,7 +191,7 @@
        return null;
    }
     /**
    /**
     * 调用自定义topic
     * https://help.aliyun.com/document_detail/69584.htm?spm=a2c4g.11186623.0.0.25b33982bSQSom#reference-snk-mrz-wdb
     *
@@ -158,36 +199,84 @@
     * @param param      参数
     * @return 设备返回的结果
     */
    protected InvokeThingServiceResponse.Data invokeThing(String deviceName, InvokeParam param) {
    protected InvokeThingServiceResponse.Data invokeThing(String deviceName, InvokeParam param, boolean resendFlag) {
        // 填充服务调用的参数
        InvokeThingServiceRequest request = new InvokeThingServiceRequest();
        // 设备证书之productKey
        request.setProductKey(getProductKey());
        // 设备证书之deviceName
        request.setDeviceName(deviceName);
        request.setSysReadTimeout(5000);
        // 要调用的服务标识符,取决于服务端定义 目前rrpc标识代表同步调用
        request.setIdentifier("rrpc");
        request.setArgs(JSON.toJSONString(param));
        //todo
        // 获得服务调用响应
        InvokeThingServiceResponse response = null;
        try {
            response = getClient().getAcsResponse(request);
        } catch (ClientException e) {
            e.printStackTrace();
        if (resendFlag) {
            return getAcsResponse(request, true, 1);
        } else {
            return getAcsResponse(request, false, null);
        }
        if (response == null) {
            throw new BusinessException("调用rrpc失败");
    }
    /**
     * 获得服务调用响应
     *
     * @param request
     * @return
     */
    private InvokeThingServiceResponse.Data getAcsResponse(InvokeThingServiceRequest request, boolean resendFlag, Integer time) {
        if (resendFlag) {
            //需要重发
            if (time <= 2) {
                InvokeThingServiceResponse response = null;
                try {
                    response = getClient().getAcsResponse(request);
                    if (response == null) {
                        log.error("调用rrpc失败,响应为空!");
                        return getAcsResponse(request, true, time + 1);
                    }
                    if (!response.getSuccess()) {
                        log.error("调用rrpc失败," + response.getErrorMessage());
                        return getAcsResponse(request, true, time + 1);
                    } else {
                        // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功
                        return response.getData();
                    }
                } catch (Exception e) {
                    return getAcsResponse(request, true, time + 1);
//                throw new BusinessException("错误信息:" + e.getMessage());
                }
            } else {
                throw new BusinessException("硬件请求失败,请检查硬件设备!");
            }
        } else {
            //不需要重发
            InvokeThingServiceResponse response = null;
            try {
                response = getClient().getAcsResponse(request);
            } catch (ClientException e) {
                e.printStackTrace();
                throw new BusinessException("硬件请求失败,请检查硬件设备!");
            }
            if (response == null) {
                throw new BusinessException("调用rrpc失败,响应为空!");
            }
            if (!response.getSuccess()) {
                throw new BusinessException("调用rrpc失败," + response.getErrorMessage());
            }
            // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功
            if (response.getSuccess()) {
                // 仅同步服务有result
                return response.getData();
            }
            return null;
        }
        if (!response.getSuccess()) {
            throw new BusinessException("调用rrpc失败," + response.getErrorMessage());
        }
        // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功
        if (response.getSuccess()) {
            // 仅同步服务有result
            return response.getData();
        }
        return null;
    }
}