2021与蓝度共同重构项目,服务端
新增本地MQTT模块,新增xm本地开发配置环境,启用了心跳包自动插入单灯设备,覆写了原rrpc方法,原阿里云自定义事件调用已注释掉
已修改8个文件
已添加12个文件
894 ■■■■■ 文件已修改
sandu-common/src/main/resources/logback-spring.xml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/pom.xml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/DefaultMqttCallBack.java 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/resources/application-xm.yml 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/resources/application.yml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sandu-common/src/main/resources/logback-spring.xml
@@ -74,6 +74,11 @@
        </root>
    </springProfile>
    <springProfile name="xm">
        <root level="info" >
            <appender-ref ref="STDOUT"/>
        </root>
    </springProfile>
    <!-- ç”Ÿäº§çŽ¯å¢ƒ. æ—¥å¿—级别为WARN且写日志文件-->
    <springProfile name="prod,docker">
        <root level="warn">
ximon-admin/pom.xml
@@ -121,6 +121,18 @@
            <version>5.6.0</version>
        </dependency>
        <!--本地mqtt依赖-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.16</version>
        </dependency>
    </dependencies>
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,89 @@
package com.sandu.ximon.admin.localMQTT.callback;
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @author van
 * @version 1.0
 * msg:MQTT回调抽象类
 * @date 2022/11/9 16:23
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
    private String clientId;
    private MqttConnectOptions connectOptions;
    public String getClientId() {
        return clientId;
    }
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }
    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }
    /**
     * å¤±åŽ»è¿žæŽ¥æ“ä½œ,进行重连
     *
     * @param throwable å¼‚常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions);
                } else {
                    MqttClientManager.getInstance().getMqttClientById(clientId).connect();
                }
            }
        } catch (Exception e) {
            log.error("{} reconnect failed!", e);
        }
    }
    /**
     * æŽ¥æ”¶è®¢é˜…消息
     *
     * @param topic           ä¸»é¢˜
     * @param mqttMessage æŽ¥æ”¶æ¶ˆæ¯
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        String content = new String(mqttMessage.getPayload());
        log.info("Receive topic[{}],message={}", topic, content);
        handleReceiveMessage(topic, content);
    }
    /**
     * æ¶ˆæ¯å‘送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }
    /**
     * å¤„理接收的消息
     *
     * @param topic   ä¸»é¢˜
     * @param message æ¶ˆæ¯å†…容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/DefaultMqttCallBack.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,119 @@
package com.sandu.ximon.admin.localMQTT.callback;
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import com.sandu.ximon.admin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
/**
 * @author van
 * @version 1.0
 * msg:默认回调
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {
    private static final String localMqttConnectTypeOfSync = "1";
    private static final String localMqttConnectTypeOfAsync = "2";
    public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000), new DefaultMqttCallBack.NameTreadFactory());
    static class NameTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
        log.info("接收到消息---DefaultCallBack:topic={},message={}", topic, message);
    }
    /**
     * åœ¨è¿™é‡Œå¤„理您收到消息后的具体业务逻辑。
     */
    private void processMessage(String topic,String messageString) {
        try {
            String mac = topic.split("/")[3];
            LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
            if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
                // è®¾å¤‡æ•°æ®ä¸ŠæŠ¥
                processTask(null,mac, localMqttMsg);
            }
            else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
                System.out.println(localMqttMsg.getPayload());
                MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * å¤„理任务
     *
     * @param productKey äº§å“ç 
     * @param deviceName äº§å“åç§°
     * @param localMqttMsg      ä¸ŠæŠ¥æ¶ˆæ¯localMqttMsg
     */
    private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
        if (localMqttMsg == null) {
            return;
        }
        log.info("处理订阅");
        log.info(localMqttMsg.toString());
        CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
        if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
            // å•灯数据上报处理
            LightDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
            // C3充电桩上报处理
            c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
            // å¤§æ°”数据指令上报
            AirDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
            PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,40 @@
package com.sandu.ximon.admin.localMQTT.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author van
 * @version 1.0
 * msg:MQTT订阅回调环境类
 * @date 2022/11/9 16:25
 */
@Component
@Slf4j
public class MqttCallBackContext {
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
    /**
     * é»˜è®¤æž„造函数
     *
     * @param callBackMap å›žè°ƒé›†åˆ
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.clear();
        callBackMap.forEach((k, v) -> this.callBackMap.put(k, v));
    }
    /**
     * èŽ·å–MQTT回调类
     *
     * @param clientId å®¢æˆ·ç«¯ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,51 @@
package com.sandu.ximon.admin.localMQTT.client;
import com.sandu.ximon.admin.localMQTT.config.MqttConfig;
import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端创建
 * @date 2022/11/9 16:31
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Autowired
    private MqttConfig mqttConfig;
    @Resource
    private MqttClientManager mqttClientManager;
    /**
     * å­˜å‚¨MQTT客户端
     */
    public static final Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
    /**
     * åˆ›å»ºMQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        System.out.println("createMqttClient");
        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
        for (MqttClientVO mqttClient : mqttClientList) {
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            System.out.println(mqttClient.getClientId());
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,94 @@
package com.sandu.ximon.admin.localMQTT.client;
import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 * @date 2022/11/9 16:30
 */
@Slf4j
@Component
public class MqttClientManager {
    @Value("${customer.mqtt.broker}")
    private String mqttBroker;
    @Resource
    private MqttCallBackContext mqttCallBackContext;
    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }
    private static MqttClientManager mqttClientManager = new MqttClientManager();
    public static MqttClientManager getInstance() {
        return mqttClientManager;
    }
    /**
     * åˆ›å»ºmqtt客户端
     *
     * @param clientId       å®¢æˆ·ç«¯ID
     * @param subscribeTopic è®¢é˜…主题,可为空
     * @param userName       ç”¨æˆ·åï¼Œå¯ä¸ºç©º
     * @param password       å¯†ç ï¼Œå¯ä¸ºç©º
     * @return mqtt客户端
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !"".equals(userName)) {
                connOpts.setUserName(userName);
            }
            if (null != password && !"".equals(password)) {
                connOpts.setPassword(password.toCharArray());
            }
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(10);
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }
                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }
            //连接mqtt服务端broker
            client.connect(connOpts);
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                client.subscribe(subscribeTopic);
            }
            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,30 @@
package com.sandu.ximon.admin.localMQTT.config;
import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * @author van
 * @version 1.0
 * msg:Mqtt配置类
 * @date 2022/11/9 16:21
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * éœ€è¦åˆ›å»ºçš„MQTT客户端
     */
    List<MqttClientVO> clientList;
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,42 @@
package com.sandu.ximon.admin.localMQTT.controller;
import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static java.lang.Thread.sleep;
/**
 * @author van
 * @version 1.0
 * msg:
 * @date 2022/11/11 14:18
 */
@RestController
@RequestMapping("/localMQTT")
@Slf4j
public class localMQTTTestController {
    @RequestMapping("/test")
    public static String localMQTT() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            String result1 = MqttClientUtil.sendMqttMsg("363832544E5008FF3A32FFFF",
                    "FEA501000BFE010003FFFF0045971477B6D8C2CA");
            log.info("返回结果:"+result1);
            sleep(1000);
            String result2 = MqttClientUtil.sendMqttMsg("363832544E5008FF3A32FFFF",
                    "FEA501000BFE010003FFFF640F48B1367ABDE0B9");
            log.info("返回结果:"+result2);
            sleep(1000);
        }
        return "OK";
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,31 @@
package com.sandu.ximon.admin.localMQTT.model;
import lombok.Data;
/**
 * @author van
 * @version 1.0
 * msg:本地mqtt消息结构
 * @date 2022/11/9 13:55
 */
@Data
public class LocalMqttMsg {
    private String timestamp;
    private String connectType;
    private String msgType;
    private String payload;
    public LocalMqttMsg(String timestamp, String connectType, String msgType, String payload) {
        this.timestamp = timestamp;
        this.connectType = connectType;
        this.msgType = msgType;
        this.payload = payload;
    }
    public LocalMqttMsg() {
        this.timestamp = String.valueOf(System.currentTimeMillis());
        this.connectType = "1";
        this.msgType = "1";
        this.payload = "FEA501000BFE010003FFFF640F48B1367ABDE0B9";
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,32 @@
package com.sandu.ximon.admin.localMQTT.model;
import lombok.Data;
/**
 * @author van
 * @version 1.0
 * msg: MQTT客户端
 * @date 2022/11/9 16:20
 */
@Data
public class MqttClientVO {
    /**
     * å®¢æˆ·ç«¯ID
     */
    private String clientId;
    /**
     * ç›‘听主题
     */
    private String subscribeTopic;
    /**
     * ç”¨æˆ·å
     */
    private String userName;
    /**
     * å¯†ç 
     */
    private String password;
    /**
     * æŽ¨é€ä¸»é¢˜
     * */
    private String publishTopic;
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,80 @@
package com.sandu.ximon.admin.localMQTT.util;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.util.CRC32Utils;
/**
 * @author chenjiantian
 * @date 2021/12/3 17:36
 */
public class HexFrameUtils {
    /**
     * æœ¬åœ°MQTT æ¶ˆæ¯ä¸éœ€è¦base64转化,直接使用hex通信
     * æŠŠä¸ŠæŠ¥çš„内容 è½¬åŒ–成帧实体类
     * @param hex ä¸ŠæŠ¥å†…容 {{@link com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage.Params}}
     * @return å¸§å®žä½“ç±»
     */
    public static CommonFrame transformMessageToFrame(String hex) {
        if (hex == null || hex.length() < 18) {
            return null;
        }
        //  å°†åå…­è¿›åˆ¶æ•°è½¬ä¸ºå¤§å†™
        hex = hex.toUpperCase();
        CommonFrame frame = new CommonFrame();
        // MQTT通信方式(1)
        frame.setConnectType(hex.substring(0, 2));
        //  åŠŸèƒ½ç (1)
        frame.setFunctionCode(hex.substring(2, 4));
        //  å‘½ä»¤ç±»åž‹(1)
        frame.setOrderType(hex.substring(4, 6));
        //  è´Ÿè·é•¿åº¦(2)
        frame.setPayloadLength(hex.substring(6, 10));
        //  å“åº”payload
        frame.setPayload(hex.substring(10, hex.length() - 8));
        //  æ ¡éªŒç (4)
        frame.setCrc32(hex.substring(hex.length() - 8));
        //  åˆ¤æ–­æ ¡éªŒæ˜¯å¦é€šè¿‡
        String content = hex.substring(2, hex.length() - 8);
        frame.setValidate(CRC32Utils.validateFrame(content, frame.getCrc32()));
        return frame;
    }
    public static void main(String[] args) {
        System.out.println(HexUtil.hexToInt("FF"));
        System.out.println(transformMessageToFrame("/qWEACrwAQAi//8BAQEYAB4CAAAAAAAAAApcAAAAEAAVAAAAAwAQAagAAMPfU3KbHyny").toString());
    }
    /**
     * å°†è®¾å¤‡ä¸ŠæŠ¥çš„内容解码
     *
     * @param value è®¾å¤‡ä¸ŠæŠ¥çš„内容
     * @return è§£ç åŽçš„内容,16进制
     */
    public static String decodeReportMessage(String value) {
        if (StrUtil.isBlank(value)) {
            return null;
        }
        byte[] decode = Base64.decode(value.getBytes());
        return HexUtil.encodeHexStr(decode);
    }
    /**
     * å°†å‡†å¤‡å‘送的内容编码
     * @param value å¾…编码内容
     * @return ç¼–码后的内容 å­—符串
     */
    public static String encodeReportMessage(String value) {
        if (StrUtil.isBlank(value)) {
            return null;
        }
        byte[] bytes = HexUtil.decodeHex(value);
        String encode = Base64.encode(bytes);
        return encode;
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,90 @@
package com.sandu.ximon.admin.localMQTT.util;
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static java.lang.Thread.sleep;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端工具类
 * @date 2022/11/9 16:32
 */
@Slf4j
public class MqttClientUtil {
    private static MqttClientUtil mqttClientUtil = new MqttClientUtil();
    public static MqttClientUtil getInstance() {
        return mqttClientUtil;
    }
    public static String publishPrefix = "v1/devices/request/";
    public static String clientId = "java_server";
    public static final Map<String, String> MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>();
    public static String sendMqttMsg(String topic, String content) {
        try {
            LocalMqttMsg localMqttMsg = new LocalMqttMsg();
            localMqttMsg.setPayload(content);
            MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes());
             message.setQos(0);
            MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId);
            if (null == mqttClient) {
                log.error("Not exist mqttClient where it's clientId is {}", clientId);
                return topic;
            }
            long start = System.currentTimeMillis();
            mqttClient.publish(publishPrefix+topic, message);
            log.info("{}",publishPrefix+topic);
            log.info("{}", message);
            /**
             * å®žçŽ°ä¼ªåŒæ­¥
             * */
            try {
                String returnFrame = null ;
                returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
                for (int i = 0;i < 10;i++){
                    if (!StringUtil.strIsNullOrEmpty(returnFrame)){
                        sleep(500);
                        returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
                    }else {
                        log.info("返回时间:{} ms",System.currentTimeMillis() - start) ;
                        MQTT_RETURN_FRAME_MAP.remove(topic);
                        return returnFrame;
                    }
                }
            } catch (InterruptedException e) {
            }
        } catch (MqttException e) {
            log.error("MqttClient send msg faild!", e);
        }
        return null;
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
@@ -45,7 +45,7 @@
            if (heartbeatReportInnerFrame.isValidate()) {
                SpringContextHolder.getBean(LightReportDataService.class).saveReportData(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
                //心跳包上报不保存硬件设备信息
//                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
            }
        } else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) {
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
@@ -48,4 +48,5 @@
     * @return è´Ÿè·
     */
    String getPayload();
}
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
@@ -6,6 +6,7 @@
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20180120.*;
import com.sandu.common.execption.BusinessException;
import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
@@ -17,6 +18,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -65,33 +67,37 @@
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
        InvokeParam param = new InvokeParam();
        param.setOperate("1001");
        param.setFrame(iRequestFrame.getEncodeFrame());
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
        if (data == null) {
            return null;
        }
        String result = data.getResult();
        result = result.replace("\\", "");
        Map map = JSON.parseObject(result, Map.class);
        result = (String) map.get("msg");
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        return FrameUtils.transformMessageToFrame(result);
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) {
        InvokeParam param = new InvokeParam();
        param.setOperate("1001");
        param.setFrame(iRequestFrame.getEncodeFrame());
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
        if (data == null) {
            return null;
        }
        String result = data.getResult();
        result = result.replace("\\", "");
        Map map = JSON.parseObject(result, Map.class);
        result = (String) map.get("msg");
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        return FrameUtils.transformMessageToFrame(result);
    }
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
@@ -24,7 +24,7 @@
    private WaterQualityEquipmentService waterQualityEquipmentService;
    private WaterQualityDataService waterQualityDataService;
    @Scheduled(cron = "0 0 0/1 * * ?")
//    @Scheduled(cron = "0 0 0/1 * * ?")
//    @Scheduled(cron = "0 0/2 * * * ? ")
    public void UserSubjectRefund() {
        List<WaterQualityEquipmentBo> waterQualityEquipmentList = waterQualityEquipmentService.listWaterQualityEquipment();
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
@@ -1027,10 +1027,10 @@
    public void timeSynchronizationInitiative(String deviceCode, String lightAddress) {
        //单灯信息
        Light light = getLight(deviceCode);
        if (light == null) {
            log.error("单灯主动同步时间请求异常,单灯信息不存在!");
            return;
        }
//        if (light == null) {
//            log.error("单灯主动同步时间请求异常,单灯信息不存在!");
//            return;
//        }
        //单灯任务信息
        LightTaskPoleRelation lightTaskPoleRelation = SpringContextHolder.getBean(LightTaskPoleRelationService.class)
                .getOne(Wrappers.lambdaQuery(LightTaskPoleRelation.class)
ximon-admin/src/main/resources/application-xm.yml
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,111 @@
spring:
  datasource:
    # æ•°æ®åº“用户名
    username: root
    # æ•°æ®åº“密码
    password: zhxm2512209
    url: jdbc:mysql://39.103.154.108:2512/xm_dev?useUnicode=true&autoReconnect=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      connection-init-sqls: set names utf8mb4
      driver-class-name: com.mysql.cj.jdbc.Driver
  redis:
    host: 39.103.154.108
    password: zhxm2512209
    port: 6379
    database: 0
server:
  port: 20017
sandu:
  jwt:
    header: Authorization
    # ä»¤ç‰Œå‰ç¼€
    token-start-with: Bearer
    # å¿…须使用最少88位的Base64对该令牌进行编码
    base64-secret: U1GZNSKOH43V19GNIVE8HUYM9H86K657V1D66EAVSL9Q023J4JNWE44BNHCS6V9E66BPKF0KXUI5R1ZOYK2OWZZYALPD07JHOYUROL930UGJQUJDNAEYNTMUS27BHKTJEF9011DGGQ4QT9BN6CM2P9SY2VV2MZKJPCOW9YIGN0VJ
    # ä»¤ç‰Œè¿‡æœŸæ—¶é—´ æ­¤å¤„单位/毫秒 ï¼Œå¯åœ¨æ­¤ç½‘站生成 https://www.convertworld.com/zh-hans/time/milliseconds.html 1个月
    token-validity-in-seconds: 2629800000
    # åœ¨çº¿ç”¨æˆ·key
    online-key: online-token
    # æ˜¯å¦å¯åЍredis缓存用户信息
    cache-online: false
    #************************本地上传文件配置************************
  upload:
    #文件服务器路径
    upload-root-path: E:\file\novafile
    storage: local
    #服务器文件前缀
    real-url: http://localhost/
  common:
    urlPrefix: http://localhost/
  quartz:
    enable: true
listenter:
  isOpen: false
minio:
  endpoint: 47.106.172.9
  port: 9000
  accessKey: minioadmin
  secretKey: zhxm2512209
  secure: false
# led屏幕服务器地址(更改需要同时更改)
realtime-server:
  #  command: http://101.132.131.91:8081/payload/
  #  url: http://101.132.131.91:8081/
  command: http://112.74.63.130:20018/command/
  url: http://112.74.63.130:20018/
server-conf:
  ip: 127.0.0.1 # 47.106.172.9/101.132.131.91
nova-conf:   #诺瓦回调
  notify-url: http://39.103.154.108:20018/serv/vnnox/progress
  screen-shot-notify-url: http://39.103.154.108:20018/serv/vnnox/screenshot
  status-notify-url: http://39.103.154.108:20018/serv/vnnox/asyncStatus
  #iot产品秘钥
iot:
  access_key: LTAI4G27Af8MZEF55phdMQ4y
  access_secret: KUc2yOtr7TRB4FuF5Wr0dWeTblbEuh
#阿里云oss配置
oss-conf:
  end-point: oss-cn-shanghai.aliyuncs.com
  key-id: LTAI5tPdpt5wvJyLipRijFSP
  key-secret: 1ahYfCKd0yTddsUnuDLQzI23MLh4VQ
  bucket-name: ximonsmart
#新诺瓦
new-nova:
  #依赖地址
  string-path: C:\Users\Administrator\Desktop\novaWin\bin\viplexcore.dll
new-nova-file:
  upload:
    #文件服务器路径
    upload-root-path: E:\file\novafile
    storage: local
    #服务器文件前缀
    real-url: http://localhost/
customer:
  mqtt:
    broker: tcp://127.0.0.1:1883
    clientList:
      #客户端ID
      - clientId: java_server
        #监听主题
        subscribeTopic: v1/devices/response/+
        #用户名
        userName: server_admin
        #密码
        password: zhxm2512209
        #下发主题
        publishTopic: v1/devices/request/
ximon-admin/src/main/resources/application.yml
@@ -1,6 +1,6 @@
spring:
  profiles:
    active: prod
    active: xm
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
@@ -53,4 +53,5 @@
#主板rrpc通信PRODUCT_KEY
#rrpc:
#  key: a1JsfPG4iKW
#  key: a1JsfPG4iKW