2021与蓝度共同重构项目,服务端
liuhaonan
2022-11-18 b531004a3cbd33d7bb9f5b7dce06ddd4f5e7ec9a
Merge remote-tracking branch 'origin/xm-20221107' into xm-20221107
已修改12个文件
已添加13个文件
1148 ■■■■■ 文件已修改
sandu-common/src/main/resources/logback-spring.xml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/pom.xml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/resources/application-xm.yml 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ximon-admin/src/main/resources/application.yml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sandu-common/src/main/resources/logback-spring.xml
@@ -74,6 +74,11 @@
        </root>
    </springProfile>
    <springProfile name="xm">
        <root level="info" >
            <appender-ref ref="STDOUT"/>
        </root>
    </springProfile>
    <!-- ç”Ÿäº§çŽ¯å¢ƒ. æ—¥å¿—级别为WARN且写日志文件-->
    <springProfile name="prod,docker">
        <root level="warn">
ximon-admin/pom.xml
@@ -121,6 +121,18 @@
            <version>5.6.0</version>
        </dependency>
        <!--本地mqtt依赖-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.16</version>
        </dependency>
    </dependencies>
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,89 @@
package com.sandu.ximon.admin.localMQTT.callback;
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @author van
 * @version 1.0
 * msg:MQTT回调抽象类
 * @date 2022/11/9 16:23
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
    private String clientId;
    private MqttConnectOptions connectOptions;
    public String getClientId() {
        return clientId;
    }
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }
    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }
    /**
     * å¤±åŽ»è¿žæŽ¥æ“ä½œ,进行重连
     *
     * @param throwable å¼‚常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions);
                } else {
                    MqttClientManager.getInstance().getMqttClientById(clientId).connect();
                }
            }
        } catch (Exception e) {
            log.error("{} reconnect failed!", e);
        }
    }
    /**
     * æŽ¥æ”¶è®¢é˜…消息
     *
     * @param topic           ä¸»é¢˜
     * @param mqttMessage æŽ¥æ”¶æ¶ˆæ¯
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        String content = new String(mqttMessage.getPayload());
        log.info("Receive topic[{}],message={}", topic, content);
        handleReceiveMessage(topic, content);
    }
    /**
     * æ¶ˆæ¯å‘送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }
    /**
     * å¤„理接收的消息
     *
     * @param topic   ä¸»é¢˜
     * @param message æ¶ˆæ¯å†…容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,40 @@
package com.sandu.ximon.admin.localMQTT.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author van
 * @version 1.0
 * msg:MQTT订阅回调环境类
 * @date 2022/11/9 16:25
 */
@Component
@Slf4j
public class MqttCallBackContext {
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
    /**
     * é»˜è®¤æž„造函数
     *
     * @param callBackMap å›žè°ƒé›†åˆ
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.clear();
        callBackMap.forEach((k, v) -> this.callBackMap.put(k, v));
    }
    /**
     * èŽ·å–MQTT回调类
     *
     * @param clientId å®¢æˆ·ç«¯ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,119 @@
package com.sandu.ximon.admin.localMQTT.callback;
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import com.sandu.ximon.admin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
/**
 * @author van
 * @version 1.0
 * msg:默认回调
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("java_server_msg")
public class MsgMqttCallBack extends AbsMqttCallBack {
    private static final String localMqttConnectTypeOfSync = "1";
    private static final String localMqttConnectTypeOfAsync = "2";
    public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000), new MsgMqttCallBack.NameTreadFactory());
    static class NameTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
        log.info("接收到消息---MsgMqttCallBack:topic={},message={}", topic, message);
    }
    /**
     * åœ¨è¿™é‡Œå¤„理您收到消息后的具体业务逻辑。
     */
    private void processMessage(String topic,String messageString) {
        try {
            String mac = topic.split("/")[3];
            LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
            if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
                // è®¾å¤‡æ•°æ®ä¸ŠæŠ¥
                processTask(null,mac, localMqttMsg);
            }
            else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
                System.out.println(localMqttMsg.getPayload());
                MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * å¤„理任务
     *
     * @param productKey äº§å“ç 
     * @param deviceName äº§å“åç§°
     * @param localMqttMsg      ä¸ŠæŠ¥æ¶ˆæ¯localMqttMsg
     */
    private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
        if (localMqttMsg == null) {
            return;
        }
        log.info("处理订阅");
        log.info(localMqttMsg.toString());
        CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
        if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
            // å•灯数据上报处理
            LightDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
            // C3充电桩上报处理
            c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
            // å¤§æ°”数据指令上报
            AirDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
            PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,90 @@
package com.sandu.ximon.admin.localMQTT.callback;
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
/**
 * @author van
 * @version 1.0
 * msg:默认回调
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("java_server_status")
public class StatusMqttCallBack extends AbsMqttCallBack {
    private static final String localMqttConnectStatusConnected = "connected";
    private static final String localMqttConnectStatusDisconnected = "disconnected";
    public static final Map<String, Integer> localMqttConnectStatusMap = new ConcurrentHashMap<>();
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory());
    static class NameTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
        log.info("接收到消息---StatusMqttCallBack:topic={},message={}", topic, message);
    }
    /**
     * åœ¨è¿™é‡Œå¤„理您收到消息后的具体业务逻辑。
     */
    private void processMessage(String topic,String messageString) {
        try {
            String mac = topic.split("/")[4];
            String status = topic.split("/")[5];
            System.out.println("----------------------");
            System.out.println(mac);
            System.out.println(status);
            if (status.equals(localMqttConnectStatusConnected)){
                // è®¾å¤‡æ•°æ®ä¸ŠæŠ¥
                localMqttConnectStatusMap.put(mac,1);
            }
            else if (status.equals(localMqttConnectStatusDisconnected)){
                localMqttConnectStatusMap.put(mac,0);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,51 @@
package com.sandu.ximon.admin.localMQTT.client;
import com.sandu.ximon.admin.localMQTT.config.MqttConfig;
import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端创建
 * @date 2022/11/9 16:31
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Autowired
    private MqttConfig mqttConfig;
    @Resource
    private MqttClientManager mqttClientManager;
    /**
     * å­˜å‚¨MQTT客户端
     */
    public static final Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
    /**
     * åˆ›å»ºMQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        System.out.println("createMqttClient");
        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
        for (MqttClientVO mqttClient : mqttClientList) {
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            System.out.println(mqttClient.getClientId());
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,94 @@
package com.sandu.ximon.admin.localMQTT.client;
import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 * @date 2022/11/9 16:30
 */
@Slf4j
@Component
public class MqttClientManager {
    @Value("${customer.mqtt.broker}")
    private String mqttBroker;
    @Resource
    private MqttCallBackContext mqttCallBackContext;
    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }
    private static MqttClientManager mqttClientManager = new MqttClientManager();
    public static MqttClientManager getInstance() {
        return mqttClientManager;
    }
    /**
     * åˆ›å»ºmqtt客户端
     *
     * @param clientId       å®¢æˆ·ç«¯ID
     * @param subscribeTopic è®¢é˜…主题,可为空
     * @param userName       ç”¨æˆ·åï¼Œå¯ä¸ºç©º
     * @param password       å¯†ç ï¼Œå¯ä¸ºç©º
     * @return mqtt客户端
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !"".equals(userName)) {
                connOpts.setUserName(userName);
            }
            if (null != password && !"".equals(password)) {
                connOpts.setPassword(password.toCharArray());
            }
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(10);
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }
                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }
            //连接mqtt服务端broker
            client.connect(connOpts);
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                client.subscribe(subscribeTopic);
            }
            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,30 @@
package com.sandu.ximon.admin.localMQTT.config;
import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * @author van
 * @version 1.0
 * msg:Mqtt配置类
 * @date 2022/11/9 16:21
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * éœ€è¦åˆ›å»ºçš„MQTT客户端
     */
    List<MqttClientVO> clientList;
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,72 @@
package com.sandu.ximon.admin.localMQTT.controller;
import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
import static java.lang.Thread.sleep;
/**
 * @author van
 * @version 1.0
 * msg:
 * @date 2022/11/11 14:18
 */
@RestController
@RequestMapping("/localMQTT")
@Slf4j
public class localMQTTTestController {
    @RequestMapping("/test")
    public String localMQTT() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            /*
            * å¼€ç¯100
            * FEA501000BFE010003FFFF0045971477B6D8C2CA
            * 10
            * FEA501000BFE010003FFFF0AA542FD69D4E6194E
            * å…³ç¯
            * FEA501000BFE0100030001007130ECA9150640E6
            * æŸ¥è¯¢å¿ƒè·³æ—¶é—´
            * FEA501000AFE110002FFFF26008FBE3DAC7C0D
            * è®¾ç½®å¿ƒè·³30秒
            * FEA501000CFE210004FFFF001E9BB444E9C75BDB49
            * 5分钟
            * FEA501000CFE210004FFFF012C4A7824285825CB53
            * */
//            å¼€10
            String result1 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
                            "FEA501000BFE010003FFFF0AA542FD69D4E6194E");
            log.info("开灯返回结果:"+result1);
            sleep(3000);
//            å…³
            String result2 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
                    "FEA501000BFE0100030001007130ECA9150640E6");
            log.info("关灯返回结果:"+result2);
            sleep(3000);
////            å¿ƒè·³æŸ¥è¯¢
//            String result3 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
//                    "FEA501000AFE110002FFFF26008FBE3DAC7C0D");
//            log.info("心跳查询返回结果:"+result3);
//            sleep(3000);
//            String result4 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
//                    "FEA501001AFE230012FE23000A00017F1019647F111E005428F600EC64EC194EA28A7C");
//            log.info("定时任务返回结果:"+result4);
//            sleep(3000);
//            System.out.println("链接状态:---"+i+"---:");
//            System.out.println(localMqttConnectStatusMap.get("363832544e5008ff3a32ffff"));
//            sleep(10000);
        }
        return "OK";
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,31 @@
package com.sandu.ximon.admin.localMQTT.model;
import lombok.Data;
/**
 * @author van
 * @version 1.0
 * msg:本地mqtt消息结构
 * @date 2022/11/9 13:55
 */
@Data
public class LocalMqttMsg {
    private String timestamp;
    private String connectType;
    private String msgType;
    private String payload;
    public LocalMqttMsg(String timestamp, String connectType, String msgType, String payload) {
        this.timestamp = timestamp;
        this.connectType = connectType;
        this.msgType = msgType;
        this.payload = payload;
    }
    public LocalMqttMsg() {
        this.timestamp = String.valueOf(System.currentTimeMillis());
        this.connectType = "1";
        this.msgType = "1";
        this.payload = "";
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,32 @@
package com.sandu.ximon.admin.localMQTT.model;
import lombok.Data;
/**
 * @author van
 * @version 1.0
 * msg: MQTT客户端
 * @date 2022/11/9 16:20
 */
@Data
public class MqttClientVO {
    /**
     * å®¢æˆ·ç«¯ID
     */
    private String clientId;
    /**
     * ç›‘听主题
     */
    private String subscribeTopic;
    /**
     * ç”¨æˆ·å
     */
    private String userName;
    /**
     * å¯†ç 
     */
    private String password;
    /**
     * æŽ¨é€ä¸»é¢˜
     * */
    private String publishTopic;
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,80 @@
package com.sandu.ximon.admin.localMQTT.util;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.util.CRC32Utils;
/**
 * @author chenjiantian
 * @date 2021/12/3 17:36
 */
public class HexFrameUtils {
    /**
     * æœ¬åœ°MQTT æ¶ˆæ¯ä¸éœ€è¦base64转化,直接使用hex通信
     * æŠŠä¸ŠæŠ¥çš„内容 è½¬åŒ–成帧实体类
     * @param hex ä¸ŠæŠ¥å†…容 {{@link com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage.Params}}
     * @return å¸§å®žä½“ç±»
     */
    public static CommonFrame transformMessageToFrame(String hex) {
        if (hex == null || hex.length() < 18) {
            return null;
        }
        //  å°†åå…­è¿›åˆ¶æ•°è½¬ä¸ºå¤§å†™
        hex = hex.toUpperCase();
        CommonFrame frame = new CommonFrame();
        // MQTT通信方式(1)
        frame.setConnectType(hex.substring(0, 2));
        //  åŠŸèƒ½ç (1)
        frame.setFunctionCode(hex.substring(2, 4));
        //  å‘½ä»¤ç±»åž‹(1)
        frame.setOrderType(hex.substring(4, 6));
        //  è´Ÿè·é•¿åº¦(2)
        frame.setPayloadLength(hex.substring(6, 10));
        //  å“åº”payload
        frame.setPayload(hex.substring(10, hex.length() - 8));
        //  æ ¡éªŒç (4)
        frame.setCrc32(hex.substring(hex.length() - 8));
        //  åˆ¤æ–­æ ¡éªŒæ˜¯å¦é€šè¿‡
        String content = hex.substring(2, hex.length() - 8);
        frame.setValidate(CRC32Utils.validateFrame(content, frame.getCrc32()));
        return frame;
    }
    public static void main(String[] args) {
        System.out.println(HexUtil.hexToInt("FF"));
        System.out.println(transformMessageToFrame("/qWEACrwAQAi//8BAQEYAB4CAAAAAAAAAApcAAAAEAAVAAAAAwAQAagAAMPfU3KbHyny").toString());
    }
    /**
     * å°†è®¾å¤‡ä¸ŠæŠ¥çš„内容解码
     *
     * @param value è®¾å¤‡ä¸ŠæŠ¥çš„内容
     * @return è§£ç åŽçš„内容,16进制
     */
    public static String decodeReportMessage(String value) {
        if (StrUtil.isBlank(value)) {
            return null;
        }
        byte[] decode = Base64.decode(value.getBytes());
        return HexUtil.encodeHexStr(decode);
    }
    /**
     * å°†å‡†å¤‡å‘送的内容编码
     * @param value å¾…编码内容
     * @return ç¼–码后的内容 å­—符串
     */
    public static String encodeReportMessage(String value) {
        if (StrUtil.isBlank(value)) {
            return null;
        }
        byte[] bytes = HexUtil.decodeHex(value);
        String encode = Base64.encode(bytes);
        return encode;
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,92 @@
package com.sandu.ximon.admin.localMQTT.util;
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static java.lang.Thread.sleep;
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端工具类
 * @date 2022/11/9 16:32
 */
@Slf4j
public class MqttClientUtil {
    private static MqttClientUtil mqttClientUtil = new MqttClientUtil();
    public static MqttClientUtil getInstance() {
        return mqttClientUtil;
    }
    public static String publishPrefix = "v1/devices/request/";
    public static String clientId = "java_server_msg";
    public static final Map<String, String> MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>();
    public static String sendMqttMsg(String topic, String content) {
        try {
            LocalMqttMsg localMqttMsg = new LocalMqttMsg();
            localMqttMsg.setPayload(content);
            MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes());
             message.setQos(0);
            MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId);
            if (null == mqttClient) {
                log.error("Not exist mqttClient where it's clientId is {}", clientId);
                return topic;
            }
            long start = System.currentTimeMillis();
            mqttClient.publish(publishPrefix+topic, message);
            log.info("{}",publishPrefix+topic);
            log.info("{}", message);
            /**
             * å®žçŽ°ä¼ªåŒæ­¥
             * */
            try {
                String returnFrame = null ;
                returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
                for (int i = 0;i < 50;i++){
                    if (StringUtil.strIsNullOrEmpty(returnFrame)){
                        sleep(100);
                        returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
                    }else {
                        log.info("返回时间:{} ms",System.currentTimeMillis() - start) ;
                        String remove = MQTT_RETURN_FRAME_MAP.remove(topic);
                        log.info("remove结果:{} ",remove);
                        return returnFrame;
                    }
                }
            } catch (InterruptedException e) {
            }
        } catch (MqttException e) {
            log.error("MqttClient send msg faild!", e);
            return("通信超时");
        }
        return null;
    }
}
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
@@ -39,13 +39,12 @@
            log.info("心跳相应");
            A5LightHeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5LightHeartbeatReportInnerFrame().transformFrame(frame.getPayload());
            if ("363832544e5008ff1741ffff".equals(deviceName)) {
                System.out.println("心跳包: " + JSON.toJSONString(heartbeatReportInnerFrame));
            }
            if (heartbeatReportInnerFrame.isValidate()) {
                SpringContextHolder.getBean(LightReportDataService.class).saveReportData(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
                //心跳包上报不保存硬件设备信息
//                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
            }
        } else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) {
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
@@ -48,4 +48,5 @@
     * @return è´Ÿè·
     */
    String getPayload();
}
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
@@ -18,9 +18,7 @@
 */
public class A5LightTimerReqInnerFrame implements IRequestInnerFrame {
    private final String payload;
    private final String functionCode = A5LightDataEnum.LightTimer.getCode();
    private final String payloadLength;
    private String payload;
    /**
     * @param framePayload å¤šä¸ªè·¯ç¯å®šæ—¶æŒ‡ä»¤ï¼Œ
@@ -32,13 +30,18 @@
        } else {
            destinationAddress = lightAddress;
        }
        payload = destinationAddress + framePayload;
        this.payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
        payload = destinationAddress + framePayload ;
//        payload = getEncodeFrame();
    }
    @Override
    public String getEncodeFrame() {
        String functionCode = A5LightDataEnum.LightTimer.getCode();
        String payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
        String frame = functionCode + payloadLength + payload;
        return MQTTConnectTypeEnum.SYNCHRONIZATION.getCode() + frame.toUpperCase() + CRC32Utils.getCRC32(frame.toUpperCase());
    }
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
@@ -6,19 +6,26 @@
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20180120.*;
import com.sandu.common.execption.BusinessException;
import com.sandu.ximon.admin.dto.DeviceStatus;
import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.InvokeParam;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.WrapResponseCommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.DeviceStateEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.topic.IBaseTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.topic.ICustomizeTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
/**
 * @author chenjiantian
@@ -52,46 +59,52 @@
    @Override
    public CommonFrame sendRRPC(String deviceName, InvokeParam invokeParam) {
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
        if (data == null) {
            return null;
        }
        String result = data.getResult();
        result = result.replace("\\", "");
        Map map = JSON.parseObject(result, Map.class);
        result = (String) map.get("msg");
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String result = MqttClientUtil.sendMqttMsg(deviceName,invokeParam.getFrame());
        return FrameUtils.transformMessageToFrame(result);
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
        InvokeParam param = new InvokeParam();
        param.setOperate("1001");
        param.setFrame(iRequestFrame.getEncodeFrame());
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
        if (data == null) {
            return null;
        }
        String result = data.getResult();
        result = result.replace("\\", "");
        Map map = JSON.parseObject(result, Map.class);
        result = (String) map.get("msg");
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        log.info("自定义sendRRPC:请求帧:{},\n,响应帧:{}",iRequestFrame.toString(),result);
        return FrameUtils.transformMessageToFrame(result);
    }
    @Override
    public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) {
        InvokeParam param = new InvokeParam();
        param.setOperate("1001");
        param.setFrame(iRequestFrame.getEncodeFrame());
        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
        if (data == null) {
            return null;
        }
        String result = data.getResult();
        result = result.replace("\\", "");
        Map map = JSON.parseObject(result, Map.class);
        result = (String) map.get("msg");
//        InvokeParam param = new InvokeParam();
//        param.setOperate("1001");
//        param.setFrame(iRequestFrame.getEncodeFrame());
//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
//        if (data == null) {
//            return null;
//        }
//        String result = data.getResult();
//        result = result.replace("\\", "");
//        Map map = JSON.parseObject(result, Map.class);
//        result = (String) map.get("msg");
        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
        return FrameUtils.transformMessageToFrame(result);
    }
@@ -175,14 +188,23 @@
    @Override
    public List<BatchGetDeviceStateResponse.DeviceStatus> batchGetDeviceState(List<String> deviceNames) {
        BatchGetDeviceStateRequest request = new BatchGetDeviceStateRequest();
        request.setDeviceNames(deviceNames);
        request.setProductKey(getProductKey());
        BatchGetDeviceStateResponse response = invokeSync(request);
        if (response != null && response.getSuccess()) {
            return response.getDeviceStatusList();
        }
        return null;
        List<BatchGetDeviceStateResponse.DeviceStatus> statusList = new ArrayList<>();
        deviceNames.forEach(l -> {
            BatchGetDeviceStateResponse.DeviceStatus deviceStatus = new BatchGetDeviceStateResponse.DeviceStatus();
                    deviceStatus.setDeviceName(l);
            if (localMqttConnectStatusMap.get(l)!=null &&
                    localMqttConnectStatusMap.get(l)== 1){
                deviceStatus.setStatus("ONLINE");
            }else {
                deviceStatus.setStatus("OFFLINE");
            }
            statusList.add(deviceStatus);
        });
        return statusList;
    }
    /**
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
@@ -20,7 +20,7 @@
    private final C3mOrderService c3mOrderService;
    //每10分钟运行一次   åˆ é™¤è¶…时订单
    @Scheduled(cron = "0 0/10 * * * ? ")
//    @Scheduled(cron = "0 0/10 * * * ? ")
    public void deleteOvertimeOrders() {
        c3mOrderService.deleteOrderListByCreateTime();
        log.error("删除超时订单定时任务执行成功");
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
@@ -24,7 +24,7 @@
    private WaterQualityEquipmentService waterQualityEquipmentService;
    private WaterQualityDataService waterQualityDataService;
    @Scheduled(cron = "0 0 0/1 * * ?")
//    @Scheduled(cron = "0 0 0/1 * * ?")
//    @Scheduled(cron = "0 0/2 * * * ? ")
    public void UserSubjectRefund() {
        List<WaterQualityEquipmentBo> waterQualityEquipmentList = waterQualityEquipmentService.listWaterQualityEquipment();
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
@@ -51,6 +51,8 @@
import java.util.*;
import java.util.stream.Collectors;
import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
/**
 * @author chenjiantian
 * @date 2021/12/13 16:00
@@ -144,7 +146,8 @@
        // èŽ·å–æœ€è¿‘çš„ä¸ŠæŠ¥æ—¶é—´
        List<String> deviceCodeList = listLight.stream().map(Light::getDeviceCode).collect(Collectors.toList());
        //拆分list
//        //拆分list
        List<List<String>> split = CollectionUtil.split(deviceCodeList, 100);
        List<BatchGetDeviceStateResponse.DeviceStatus> deviceStatuses = null;
@@ -265,13 +268,17 @@
        List<Map<String, Object>> resultList = new ArrayList<>();
        for (LightControlParam param : paramList) {
            A5LightBrightnessReqInnerFrame lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
            A5Frame a5Frame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), lightControlFrame);
            A5LightBrightnessReqInnerFrame
                    lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
            A5Frame a5Frame = new A5Frame(
                    A5OrderEnum.REQUEST_LIGHT_DATA.getCode(),
                    lightControlFrame);
            Map<String, Object> map = new HashMap<>();
            try {
                map.put("deviceCode", param.getDeviceCode());
                WrapResponseCommonFrame<A5LightBrightnessRespInnerFrame> frame
                        = MainBoardInvokeSyncService.getInstance().sendRRPC(param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
                        = MainBoardInvokeSyncService.getInstance().sendRRPC
                        (param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
                //存储控制帧指令
                StoreOperationRecordsUtils.storeInnerFrameData(param.getDeviceCode(), "单灯帧-亮度控制", a5Frame, frame);
@@ -1027,10 +1034,10 @@
    public void timeSynchronizationInitiative(String deviceCode, String lightAddress) {
        //单灯信息
        Light light = getLight(deviceCode);
        if (light == null) {
            log.error("单灯主动同步时间请求异常,单灯信息不存在!");
            return;
        }
//        if (light == null) {
//            log.error("单灯主动同步时间请求异常,单灯信息不存在!");
//            return;
//        }
        //单灯任务信息
        LightTaskPoleRelation lightTaskPoleRelation = SpringContextHolder.getBean(LightTaskPoleRelationService.class)
                .getOne(Wrappers.lambdaQuery(LightTaskPoleRelation.class)
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
@@ -16,6 +16,7 @@
import com.sandu.common.util.SpringContextHolder;
import com.sandu.ximon.admin.dto.LightTaskDto;
import com.sandu.ximon.admin.dto.SingleLightOrderDto;
import com.sandu.ximon.admin.manager.iot.frame.A5Frame;
import com.sandu.ximon.admin.manager.iot.frame.FrameBuilder;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.request.A5LightTimerReqInnerFrame;
@@ -431,10 +432,25 @@
     * @return è¿”回帧
     */
    public A5LightTimerRespInnerFrame sendTimeRRpc(String framePayload, String deviceCode, String lightAddress) {
        IRequestFrame requestFrame = FrameBuilder.builderA5().innerFrame(new A5LightTimerReqInnerFrame(framePayload, lightAddress)).orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).build();
//        IRequestFrame requestFrame = FrameBuilder.
//                builderA5().
//                innerFrame(
//                        new A5LightTimerReqInnerFrame
//                                (framePayload, lightAddress)
//                                ).
//                orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).
//                build();
        A5LightTimerReqInnerFrame
                a5LightTimerReqInnerFrame = new A5LightTimerReqInnerFrame(framePayload, lightAddress);
        System.out.println(JSON.toJSONString(a5LightTimerReqInnerFrame) + "          --------a5LightTimerReqInnerFrame");
        A5Frame requestFrame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), a5LightTimerReqInnerFrame);
        System.out.println(requestFrame + "          --------requestFrame");
        WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame = MainBoardInvokeSyncService.getInstance().sendRRPC(deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
        WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame
                = MainBoardInvokeSyncService.getInstance().sendRRPC
                (deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
        System.out.println(responseCommonFrame + "         -----------responseCommonFrame");
        StoreOperationRecordsUtils.storeInnerFrameData(deviceCode, "单灯帧-控灯", requestFrame, responseCommonFrame);
        return Optional.ofNullable(responseCommonFrame).map(WrapResponseCommonFrame::getResponseInnerFrame).orElse(null);
ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
@@ -60,6 +60,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
/**
 * ç¯æ†ç›¸å…³
 *
@@ -1050,7 +1052,7 @@
     * @return è®¾å¤‡çŠ¶æ€åˆ—è¡¨
     */
    public List<DeviceStatus> listStatusByDeviceCode(ArrayList<String> deviceCodeList) {
        // æœ€å¤§åªèƒ½æŸ¥50个
//         æœ€å¤§åªèƒ½æŸ¥50个
        List<List<String>> split = CollectionUtil.split(deviceCodeList, 50);
        List<DeviceStatus> statusList = new ArrayList<>();
        for (List<String> list : split) {
@@ -1064,6 +1066,21 @@
                }
            }
        }
//        List<DeviceStatus> statusList = new ArrayList<>();
//
//        deviceCodeList.forEach(l -> {
//                    DeviceStatus deviceStatus = new DeviceStatus();
//                    deviceStatus.setDeviceCode(l);
//
//            if (localMqttConnectStatusMap.get(l)!=null &&
//                    localMqttConnectStatusMap.get(l)== 1){
//                deviceStatus.setStatus(1);
//            }else {
//                deviceStatus.setStatus(0);
//            }
//            statusList.add(deviceStatus);
//        });
        return statusList;
    }
@@ -1419,4 +1436,4 @@
    }
}
}
ximon-admin/src/main/resources/application-xm.yml
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,121 @@
spring:
  datasource:
    # æ•°æ®åº“用户名
    username: root
    # æ•°æ®åº“密码
    password: zhxm2512209
    url: jdbc:mysql://39.103.154.108:2512/xm_dev?useUnicode=true&autoReconnect=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      connection-init-sqls: set names utf8mb4
      driver-class-name: com.mysql.cj.jdbc.Driver
  redis:
    host: 39.103.154.108
    password: zhxm2512209
    port: 6379
    database: 0
server:
  port: 20017
sandu:
  jwt:
    header: Authorization
    # ä»¤ç‰Œå‰ç¼€
    token-start-with: Bearer
    # å¿…须使用最少88位的Base64对该令牌进行编码
    base64-secret: U1GZNSKOH43V19GNIVE8HUYM9H86K657V1D66EAVSL9Q023J4JNWE44BNHCS6V9E66BPKF0KXUI5R1ZOYK2OWZZYALPD07JHOYUROL930UGJQUJDNAEYNTMUS27BHKTJEF9011DGGQ4QT9BN6CM2P9SY2VV2MZKJPCOW9YIGN0VJ
    # ä»¤ç‰Œè¿‡æœŸæ—¶é—´ æ­¤å¤„单位/毫秒 ï¼Œå¯åœ¨æ­¤ç½‘站生成 https://www.convertworld.com/zh-hans/time/milliseconds.html 1个月
    token-validity-in-seconds: 2629800000
    # åœ¨çº¿ç”¨æˆ·key
    online-key: online-token
    # æ˜¯å¦å¯åЍredis缓存用户信息
    cache-online: false
    #************************本地上传文件配置************************
  upload:
    #文件服务器路径
    upload-root-path: E:\file\novafile
    storage: local
    #服务器文件前缀
    real-url: http://localhost/
  common:
    urlPrefix: http://localhost/
  quartz:
    enable: true
listenter:
  isOpen: false
minio:
  endpoint: 47.106.172.9
  port: 9000
  accessKey: minioadmin
  secretKey: zhxm2512209
  secure: false
# led屏幕服务器地址(更改需要同时更改)
realtime-server:
  #  command: http://101.132.131.91:8081/payload/
  #  url: http://101.132.131.91:8081/
  command: http://112.74.63.130:20018/command/
  url: http://112.74.63.130:20018/
server-conf:
  ip: 127.0.0.1 # 47.106.172.9/101.132.131.91
nova-conf:   #诺瓦回调
  notify-url: http://39.103.154.108:20018/serv/vnnox/progress
  screen-shot-notify-url: http://39.103.154.108:20018/serv/vnnox/screenshot
  status-notify-url: http://39.103.154.108:20018/serv/vnnox/asyncStatus
  #iot产品秘钥
iot:
  access_key: LTAI4G27Af8MZEF55phdMQ4y
  access_secret: KUc2yOtr7TRB4FuF5Wr0dWeTblbEuh
#阿里云oss配置
oss-conf:
  end-point: oss-cn-shanghai.aliyuncs.com
  key-id: LTAI5tPdpt5wvJyLipRijFSP
  key-secret: 1ahYfCKd0yTddsUnuDLQzI23MLh4VQ
  bucket-name: ximonsmart
#新诺瓦
new-nova:
  #依赖地址
  string-path: C:\Users\Administrator\Desktop\novaWin\bin\viplexcore.dll
new-nova-file:
  upload:
    #文件服务器路径
    upload-root-path: E:\file\novafile
    storage: local
    #服务器文件前缀
    real-url: http://localhost/
customer:
  mqtt:
    broker: tcp://127.0.0.1:1883
    clientList:
      #客户端ID
      - clientId: java_server_msg
        #监听主题
        subscribeTopic: v1/devices/response/+
        #用户名
        userName: server_admin
        #密码
        password: zhxm2512209
        #下发主题
        publishTopic: v1/devices/request/
#监听上下线
      - clientId: java_server_status
        #监听主题
        subscribeTopic: $SYS/brokers/+/clients/#
        #用户名
        userName: server_admin
        #密码
        password: zhxm2512209
        #下发主题
        publishTopic: v1/devices/request/
ximon-admin/src/main/resources/application.yml
@@ -1,6 +1,6 @@
spring:
  profiles:
    active: prod
    active: xm
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
@@ -53,4 +53,5 @@
#主板rrpc通信PRODUCT_KEY
#rrpc:
#  key: a1JsfPG4iKW
#  key: a1JsfPG4iKW