From b531004a3cbd33d7bb9f5b7dce06ddd4f5e7ec9a Mon Sep 17 00:00:00 2001
From: liuhaonan <31457034@qq.com>
Date: 星期五, 18 十一月 2022 11:57:02 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/xm-20221107' into xm-20221107

---
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java                              |   31 +
 ximon-admin/src/main/resources/application.yml                                                                 |    5 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java                     |   90 ++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java                           |    1 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java                    |   40 ++
 ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java             |    5 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java              |   72 +++
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java                             |   92 ++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java |   13 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java                              |   80 ++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java                    |   98 +++-
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java                              |   32 +
 ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java                                      |   23 
 sandu-common/src/main/resources/logback-spring.xml                                                             |    5 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java                        |  119 +++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java                                  |   20 
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java                         |   51 ++
 ximon-admin/pom.xml                                                                                            |   12 
 ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java                                       |   21 
 ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java                                 |    2 
 ximon-admin/src/main/resources/application-xm.yml                                                              |  121 ++++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java                        |   94 ++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java                               |   30 +
 ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java                        |   89 ++++
 ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java                         |    2 
 25 files changed, 1,086 insertions(+), 62 deletions(-)

diff --git a/sandu-common/src/main/resources/logback-spring.xml b/sandu-common/src/main/resources/logback-spring.xml
index a64bb41..ddde766 100644
--- a/sandu-common/src/main/resources/logback-spring.xml
+++ b/sandu-common/src/main/resources/logback-spring.xml
@@ -74,6 +74,11 @@
         </root>
     </springProfile>
 
+    <springProfile name="xm">
+        <root level="info" >
+            <appender-ref ref="STDOUT"/>
+        </root>
+    </springProfile>
     <!-- 鐢熶骇鐜. 鏃ュ織绾у埆涓篧ARN涓斿啓鏃ュ織鏂囦欢-->
     <springProfile name="prod,docker">
         <root level="warn">
diff --git a/ximon-admin/pom.xml b/ximon-admin/pom.xml
index bcecb14..cd497b3 100644
--- a/ximon-admin/pom.xml
+++ b/ximon-admin/pom.xml
@@ -121,6 +121,18 @@
             <version>5.6.0</version>
         </dependency>
 
+        <!--鏈湴mqtt渚濊禆-->
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+            <version>0.16</version>
+        </dependency>
+
     </dependencies>
 
 
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
new file mode 100644
index 0000000..3d9361d
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
@@ -0,0 +1,89 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT鍥炶皟鎶借薄绫�
+ * @date 2022/11/9 16:23
+ */
+@Slf4j
+public abstract class AbsMqttCallBack implements MqttCallback {
+    private String clientId;
+    private MqttConnectOptions connectOptions;
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public MqttConnectOptions getConnectOptions() {
+        return connectOptions;
+    }
+
+    public void setConnectOptions(MqttConnectOptions connectOptions) {
+        this.connectOptions = connectOptions;
+    }
+
+    /**
+     * 澶卞幓杩炴帴鎿嶄綔,杩涜閲嶈繛
+     *
+     * @param throwable 寮傚父
+     */
+    @Override
+    public void connectionLost(Throwable throwable) {
+        try {
+            if (null != clientId) {
+                if (null != connectOptions) {
+                    MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions);
+                } else {
+                    MqttClientManager.getInstance().getMqttClientById(clientId).connect();
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("{} reconnect failed!", e);
+        }
+    }
+
+    /**
+     * 鎺ユ敹璁㈤槄娑堟伅
+     *
+     * @param topic           涓婚
+     * @param mqttMessage 鎺ユ敹娑堟伅
+     */
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) {
+        String content = new String(mqttMessage.getPayload());
+        log.info("Receive topic[{}],message={}", topic, content);
+        handleReceiveMessage(topic, content);
+    }
+
+    /**
+     * 娑堟伅鍙戦�佹垚鍔�
+     *
+     * @param iMqttDeliveryToken toke
+     */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+        log.info("娑堟伅鍙戦�佹垚鍔�");
+    }
+
+
+    /**
+     * 澶勭悊鎺ユ敹鐨勬秷鎭�
+     *
+     * @param topic   涓婚
+     * @param message 娑堟伅鍐呭
+     */
+    protected abstract void handleReceiveMessage(String topic, String message);
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
new file mode 100644
index 0000000..687a11f
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
@@ -0,0 +1,40 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT璁㈤槄鍥炶皟鐜绫�
+ * @date 2022/11/9 16:25
+ */
+@Component
+@Slf4j
+public class MqttCallBackContext {
+    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
+
+    /**
+     * 榛樿鏋勯�犲嚱鏁�
+     *
+     * @param callBackMap 鍥炶皟闆嗗悎
+     */
+    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
+        this.callBackMap.clear();
+        callBackMap.forEach((k, v) -> this.callBackMap.put(k, v));
+    }
+
+    /**
+     * 鑾峰彇MQTT鍥炶皟绫�
+     *
+     * @param clientId 瀹㈡埛绔疘D
+     * @return MQTT鍥炶皟绫�
+     */
+    public AbsMqttCallBack getCallBack(String clientId) {
+        return this.callBackMap.get(clientId);
+    }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java
new file mode 100644
index 0000000..82fc40f
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java
@@ -0,0 +1,119 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
+import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
+import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
+import com.sandu.ximon.admin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.jms.Message;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氶粯璁ゅ洖璋�
+ * @date 2022/11/9 16:24
+ */
+@Slf4j
+@Component("java_server_msg")
+public class MsgMqttCallBack extends AbsMqttCallBack {
+
+    private static final String localMqttConnectTypeOfSync = "1";
+
+    private static final String localMqttConnectTypeOfAsync = "2";
+
+    public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
+
+
+    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors(),
+            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(50000), new MsgMqttCallBack.NameTreadFactory());
+
+    static class NameTreadFactory implements ThreadFactory {
+
+        private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
+        }
+    }
+
+    @Override
+    protected void handleReceiveMessage(String topic, String message) {
+        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
+
+        log.info("鎺ユ敹鍒版秷鎭�---MsgMqttCallBack:topic={},message={}", topic, message);
+    }
+
+
+    /**
+     * 鍦ㄨ繖閲屽鐞嗘偍鏀跺埌娑堟伅鍚庣殑鍏蜂綋涓氬姟閫昏緫銆�
+     */
+    private void processMessage(String topic,String messageString) {
+        try {
+
+            String mac = topic.split("/")[3];
+            LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
+
+            if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
+                // 璁惧鏁版嵁涓婃姤
+                processTask(null,mac, localMqttMsg);
+            }
+            else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
+                System.out.println(localMqttMsg.getPayload());
+                MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
+            }
+
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 澶勭悊浠诲姟
+     *
+     * @param productKey 浜у搧鐮�
+     * @param deviceName 浜у搧鍚嶇О
+     * @param localMqttMsg      涓婃姤娑堟伅localMqttMsg
+     */
+    private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
+        if (localMqttMsg == null) {
+            return;
+        }
+        log.info("澶勭悊璁㈤槄");
+        log.info(localMqttMsg.toString());
+        CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
+        if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
+            // 鍗曠伅鏁版嵁涓婃姤澶勭悊
+            LightDataProcessor.getInstance().process(productKey, deviceName, frame);
+        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
+            // C3鍏呯數妗╀笂鎶ュ鐞�
+            c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
+        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
+            // 澶ф皵鏁版嵁鎸囦护涓婃姤
+            AirDataProcessor.getInstance().process(productKey, deviceName, frame);
+        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
+            PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
+        }
+    }
+}
+
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java
new file mode 100644
index 0000000..f3cf820
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java
@@ -0,0 +1,90 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氶粯璁ゅ洖璋�
+ * @date 2022/11/9 16:24
+ */
+@Slf4j
+@Component("java_server_status")
+public class StatusMqttCallBack extends AbsMqttCallBack {
+
+    private static final String localMqttConnectStatusConnected = "connected";
+
+    private static final String localMqttConnectStatusDisconnected = "disconnected";
+
+    public static final Map<String, Integer> localMqttConnectStatusMap = new ConcurrentHashMap<>();
+
+
+    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            Runtime.getRuntime().availableProcessors(),
+            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory());
+
+    static class NameTreadFactory implements ThreadFactory {
+
+        private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
+        }
+    }
+
+    @Override
+    protected void handleReceiveMessage(String topic, String message) {
+        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
+
+        log.info("鎺ユ敹鍒版秷鎭�---StatusMqttCallBack:topic={},message={}", topic, message);
+    }
+
+
+    /**
+     * 鍦ㄨ繖閲屽鐞嗘偍鏀跺埌娑堟伅鍚庣殑鍏蜂綋涓氬姟閫昏緫銆�
+     */
+    private void processMessage(String topic,String messageString) {
+        try {
+
+            String mac = topic.split("/")[4];
+            String status = topic.split("/")[5];
+
+            System.out.println("----------------------");
+            System.out.println(mac);
+            System.out.println(status);
+            if (status.equals(localMqttConnectStatusConnected)){
+                // 璁惧鏁版嵁涓婃姤
+                localMqttConnectStatusMap.put(mac,1);
+            }
+            else if (status.equals(localMqttConnectStatusDisconnected)){
+                localMqttConnectStatusMap.put(mac,0);
+            }
+
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}
+
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
new file mode 100644
index 0000000..93cd4fd
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
@@ -0,0 +1,51 @@
+package com.sandu.ximon.admin.localMQTT.client;
+
+
+import com.sandu.ximon.admin.localMQTT.config.MqttConfig;
+import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔垱寤�
+ * @date 2022/11/9 16:31
+ */
+@Component
+@Slf4j
+public class MqttClientCreate {
+    @Autowired
+    private MqttConfig mqttConfig;
+
+    @Resource
+    private MqttClientManager mqttClientManager;
+
+    /**
+     * 瀛樺偍MQTT瀹㈡埛绔�
+     */
+    public static final Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
+
+    /**
+     * 鍒涘缓MQTT瀹㈡埛绔�
+     */
+    @PostConstruct
+    public void createMqttClient() {
+        System.out.println("createMqttClient");
+        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
+
+        for (MqttClientVO mqttClient : mqttClientList) {
+            //鍒涘缓瀹㈡埛绔紝瀹㈡埛绔疘D锛歞emo锛屽洖璋冪被璺熷鎴风ID涓�鑷�
+            System.out.println(mqttClient.getClientId());
+            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
+        }
+    }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
new file mode 100644
index 0000000..b35af27
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
@@ -0,0 +1,94 @@
+package com.sandu.ximon.admin.localMQTT.client;
+
+import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
+import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔鐞嗙被,濡傛灉瀹㈡埛绔潪甯稿鍚庣画鍙叆redis缂撳瓨
+ * @date 2022/11/9 16:30
+ */
+
+@Slf4j
+@Component
+public class MqttClientManager {
+    @Value("${customer.mqtt.broker}")
+    private String mqttBroker;
+    @Resource
+    private MqttCallBackContext mqttCallBackContext;
+
+
+    public MqttClient getMqttClientById(String clientId) {
+        return MQTT_CLIENT_MAP.get(clientId);
+    }
+
+    private static MqttClientManager mqttClientManager = new MqttClientManager();
+    public static MqttClientManager getInstance() {
+        return mqttClientManager;
+    }
+    /**
+     * 鍒涘缓mqtt瀹㈡埛绔�
+     *
+     * @param clientId       瀹㈡埛绔疘D
+     * @param subscribeTopic 璁㈤槄涓婚锛屽彲涓虹┖
+     * @param userName       鐢ㄦ埛鍚嶏紝鍙负绌�
+     * @param password       瀵嗙爜锛屽彲涓虹┖
+     * @return mqtt瀹㈡埛绔�
+     */
+    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
+        MemoryPersistence persistence = new MemoryPersistence();
+
+        try {
+            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
+            MqttConnectOptions connOpts = new MqttConnectOptions();
+            if (null != userName && !"".equals(userName)) {
+                connOpts.setUserName(userName);
+            }
+
+            if (null != password && !"".equals(password)) {
+                connOpts.setPassword(password.toCharArray());
+            }
+
+            connOpts.setCleanSession(true);
+            connOpts.setKeepAliveInterval(10);
+
+            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
+                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
+
+                if (null == callBack) {
+                    callBack = mqttCallBackContext.getCallBack("default");
+                }
+
+                callBack.setClientId(clientId);
+                callBack.setConnectOptions(connOpts);
+                client.setCallback(callBack);
+            }
+
+            //杩炴帴mqtt鏈嶅姟绔痓roker
+            client.connect(connOpts);
+
+            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
+                client.subscribe(subscribeTopic);
+            }
+
+            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
+
+
+
+        } catch (MqttException e) {
+            log.error("Create mqttClient failed!", e);
+        }
+    }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
new file mode 100644
index 0000000..a5bedc5
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
@@ -0,0 +1,30 @@
+package com.sandu.ximon.admin.localMQTT.config;
+
+import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁qtt閰嶇疆绫�
+ * @date 2022/11/9 16:21
+ */
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "customer.mqtt")
+public class MqttConfig {
+    /**
+     * mqtt broker鍦板潃
+     */
+    String broker;
+    /**
+     * 闇�瑕佸垱寤虹殑MQTT瀹㈡埛绔�
+     */
+    List<MqttClientVO> clientList;
+
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
new file mode 100644
index 0000000..9fcf3dd
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
@@ -0,0 +1,72 @@
+package com.sandu.ximon.admin.localMQTT.controller;
+
+import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+import static java.lang.Thread.sleep;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛�
+ * @date 2022/11/11 14:18
+ */
+@RestController
+@RequestMapping("/localMQTT")
+@Slf4j
+public class localMQTTTestController {
+
+
+    @RequestMapping("/test")
+    public String localMQTT() throws InterruptedException {
+
+        for (int i = 0; i < 100; i++) {
+
+            /*
+            * 寮�鐏�100
+            * FEA501000BFE010003FFFF0045971477B6D8C2CA
+            * 10
+            * FEA501000BFE010003FFFF0AA542FD69D4E6194E
+            * 鍏崇伅
+            * FEA501000BFE0100030001007130ECA9150640E6
+            * 鏌ヨ蹇冭烦鏃堕棿
+            * FEA501000AFE110002FFFF26008FBE3DAC7C0D
+            * 璁剧疆蹇冭烦30绉�
+            * FEA501000CFE210004FFFF001E9BB444E9C75BDB49
+            * 5鍒嗛挓
+            * FEA501000CFE210004FFFF012C4A7824285825CB53
+            * */
+//            寮�10
+            String result1 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+                            "FEA501000BFE010003FFFF0AA542FD69D4E6194E");
+            log.info("寮�鐏繑鍥炵粨鏋�:"+result1);
+            sleep(3000);
+//            鍏�
+            String result2 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+                    "FEA501000BFE0100030001007130ECA9150640E6");
+            log.info("鍏崇伅杩斿洖缁撴灉:"+result2);
+            sleep(3000);
+////            蹇冭烦鏌ヨ
+//            String result3 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+//                    "FEA501000AFE110002FFFF26008FBE3DAC7C0D");
+//            log.info("蹇冭烦鏌ヨ杩斿洖缁撴灉:"+result3);
+//            sleep(3000);
+//            String result4 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+//                    "FEA501001AFE230012FE23000A00017F1019647F111E005428F600EC64EC194EA28A7C");
+//            log.info("瀹氭椂浠诲姟杩斿洖缁撴灉:"+result4);
+//            sleep(3000);
+
+//            System.out.println("閾炬帴鐘舵�侊細---"+i+"---:");
+//            System.out.println(localMqttConnectStatusMap.get("363832544e5008ff3a32ffff"));
+//            sleep(10000);
+
+        }
+
+
+        return "OK";
+    }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
new file mode 100644
index 0000000..866f971
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
@@ -0,0 +1,31 @@
+package com.sandu.ximon.admin.localMQTT.model;
+
+import lombok.Data;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氭湰鍦癿qtt娑堟伅缁撴瀯
+ * @date 2022/11/9 13:55
+ */
+@Data
+public class LocalMqttMsg {
+    private String timestamp;
+    private String connectType;
+    private String msgType;
+    private String payload;
+
+    public LocalMqttMsg(String timestamp, String connectType, String msgType, String payload) {
+        this.timestamp = timestamp;
+        this.connectType = connectType;
+        this.msgType = msgType;
+        this.payload = payload;
+    }
+
+    public LocalMqttMsg() {
+        this.timestamp = String.valueOf(System.currentTimeMillis());
+        this.connectType = "1";
+        this.msgType = "1";
+        this.payload = "";
+    }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
new file mode 100644
index 0000000..2824c89
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
@@ -0,0 +1,32 @@
+package com.sandu.ximon.admin.localMQTT.model;
+
+import lombok.Data;
+/**
+ * @author van
+ * @version 1.0
+ * msg锛� MQTT瀹㈡埛绔�
+ * @date 2022/11/9 16:20
+ */
+@Data
+public class MqttClientVO {
+    /**
+     * 瀹㈡埛绔疘D
+     */
+    private String clientId;
+    /**
+     * 鐩戝惉涓婚
+     */
+    private String subscribeTopic;
+    /**
+     * 鐢ㄦ埛鍚�
+     */
+    private String userName;
+    /**
+     * 瀵嗙爜
+     */
+    private String password;
+    /**
+     * 鎺ㄩ�佷富棰�
+     * */
+    private String publishTopic;
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
new file mode 100644
index 0000000..321fe44
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
@@ -0,0 +1,80 @@
+package com.sandu.ximon.admin.localMQTT.util;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.util.HexUtil;
+import cn.hutool.core.util.StrUtil;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.util.CRC32Utils;
+
+/**
+ * @author chenjiantian
+ * @date 2021/12/3 17:36
+ */
+public class HexFrameUtils {
+
+    /**
+     * 鏈湴MQTT 娑堟伅涓嶉渶瑕乥ase64杞寲锛岀洿鎺ヤ娇鐢╤ex閫氫俊
+     * 鎶婁笂鎶ョ殑鍐呭 杞寲鎴愬抚瀹炰綋绫�
+     * @param hex 涓婃姤鍐呭 {{@link com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage.Params}}
+     * @return 甯у疄浣撶被
+     */
+    public static CommonFrame transformMessageToFrame(String hex) {
+        if (hex == null || hex.length() < 18) {
+            return null;
+        }
+        //  灏嗗崄鍏繘鍒舵暟杞负澶у啓
+        hex = hex.toUpperCase();
+        CommonFrame frame = new CommonFrame();
+        // MQTT閫氫俊鏂瑰紡(1)
+        frame.setConnectType(hex.substring(0, 2));
+        //  鍔熻兘鐮�(1)
+        frame.setFunctionCode(hex.substring(2, 4));
+        //  鍛戒护绫诲瀷(1)
+        frame.setOrderType(hex.substring(4, 6));
+        //  璐熻嵎闀垮害(2)
+        frame.setPayloadLength(hex.substring(6, 10));
+        //  鍝嶅簲payload
+        frame.setPayload(hex.substring(10, hex.length() - 8));
+        //  鏍¢獙鐮�(4)
+        frame.setCrc32(hex.substring(hex.length() - 8));
+        //  鍒ゆ柇鏍¢獙鏄惁閫氳繃
+        String content = hex.substring(2, hex.length() - 8);
+        frame.setValidate(CRC32Utils.validateFrame(content, frame.getCrc32()));
+
+        return frame;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(HexUtil.hexToInt("FF"));
+        System.out.println(transformMessageToFrame("/qWEACrwAQAi//8BAQEYAB4CAAAAAAAAAApcAAAAEAAVAAAAAwAQAagAAMPfU3KbHyny").toString());
+    }
+
+    /**
+     * 灏嗚澶囦笂鎶ョ殑鍐呭瑙g爜
+     *
+     * @param value 璁惧涓婃姤鐨勫唴瀹�
+     * @return 瑙g爜鍚庣殑鍐呭锛�16杩涘埗
+     */
+    public static String decodeReportMessage(String value) {
+        if (StrUtil.isBlank(value)) {
+            return null;
+        }
+        byte[] decode = Base64.decode(value.getBytes());
+        return HexUtil.encodeHexStr(decode);
+    }
+
+    /**
+     * 灏嗗噯澶囧彂閫佺殑鍐呭缂栫爜
+     * @param value 寰呯紪鐮佸唴瀹�
+     * @return 缂栫爜鍚庣殑鍐呭 瀛楃涓�
+     */
+    public static String encodeReportMessage(String value) {
+        if (StrUtil.isBlank(value)) {
+            return null;
+        }
+        byte[] bytes = HexUtil.decodeHex(value);
+        String encode = Base64.encode(bytes);
+        return encode;
+    }
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
new file mode 100644
index 0000000..5b9a528
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
@@ -0,0 +1,92 @@
+package com.sandu.ximon.admin.localMQTT.util;
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔伐鍏风被
+ * @date 2022/11/9 16:32
+ */
+@Slf4j
+public class MqttClientUtil {
+
+    private static MqttClientUtil mqttClientUtil = new MqttClientUtil();
+
+    public static MqttClientUtil getInstance() {
+        return mqttClientUtil;
+    }
+
+    public static String publishPrefix = "v1/devices/request/";
+
+
+    public static String clientId = "java_server_msg";
+
+    public static final Map<String, String> MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>();
+
+    public static String sendMqttMsg(String topic, String content) {
+        try {
+            LocalMqttMsg localMqttMsg = new LocalMqttMsg();
+            localMqttMsg.setPayload(content);
+
+            MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes());
+
+             message.setQos(0);
+            MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId);
+
+            if (null == mqttClient) {
+                log.error("Not exist mqttClient where it's clientId is {}", clientId);
+                return topic;
+            }
+
+            long start = System.currentTimeMillis();
+            mqttClient.publish(publishPrefix+topic, message);
+            log.info("{}",publishPrefix+topic);
+            log.info("{}", message);
+
+            /**
+             * 瀹炵幇浼悓姝�
+             * */
+            try {
+                String returnFrame = null ;
+                returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
+
+                for (int i = 0;i < 50;i++){
+                    if (StringUtil.strIsNullOrEmpty(returnFrame)){
+                        sleep(100);
+                        returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
+                    }else {
+                        log.info("杩斿洖鏃堕棿锛歿} ms",System.currentTimeMillis() - start) ;
+                        String remove = MQTT_RETURN_FRAME_MAP.remove(topic);
+                        log.info("remove缁撴灉锛歿} ",remove);
+                        return returnFrame;
+                    }
+                }
+
+
+            } catch (InterruptedException e) {
+
+            }
+
+        } catch (MqttException e) {
+            log.error("MqttClient send msg faild!", e);
+            return("閫氫俊瓒呮椂");
+        }
+        return null;
+    }
+
+
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
index e433eac..28cee03 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
@@ -39,13 +39,12 @@
             log.info("蹇冭烦鐩稿簲");
             A5LightHeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5LightHeartbeatReportInnerFrame().transformFrame(frame.getPayload());
 
-            if ("363832544e5008ff1741ffff".equals(deviceName)) {
                 System.out.println("蹇冭烦鍖�: " + JSON.toJSONString(heartbeatReportInnerFrame));
-            }
+
             if (heartbeatReportInnerFrame.isValidate()) {
                 SpringContextHolder.getBean(LightReportDataService.class).saveReportData(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
                 //蹇冭烦鍖呬笂鎶ヤ笉淇濆瓨纭欢璁惧淇℃伅
-//                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
+                SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
             }
 
         } else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) {
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
index 602b658..43e3d3c 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
@@ -48,4 +48,5 @@
      * @return 璐熻嵎
      */
     String getPayload();
+
 }
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
index 5cc7f92..c821ef3 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
@@ -18,9 +18,7 @@
  */
 public class A5LightTimerReqInnerFrame implements IRequestInnerFrame {
 
-    private final String payload;
-    private final String functionCode = A5LightDataEnum.LightTimer.getCode();
-    private final String payloadLength;
+    private String payload;
 
     /**
      * @param framePayload 澶氫釜璺伅瀹氭椂鎸囦护锛�
@@ -32,13 +30,18 @@
         } else {
             destinationAddress = lightAddress;
         }
-        payload = destinationAddress + framePayload;
-        this.payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
+        payload = destinationAddress + framePayload ;
+//        payload = getEncodeFrame();
+
+
     }
 
     @Override
     public String getEncodeFrame() {
+        String functionCode = A5LightDataEnum.LightTimer.getCode();
+        String payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
         String frame = functionCode + payloadLength + payload;
+
         return MQTTConnectTypeEnum.SYNCHRONIZATION.getCode() + frame.toUpperCase() + CRC32Utils.getCRC32(frame.toUpperCase());
     }
 
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
index 6e93fc5..c78d4a0 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
@@ -6,19 +6,26 @@
 import com.aliyuncs.exceptions.ClientException;
 import com.aliyuncs.iot.model.v20180120.*;
 import com.sandu.common.execption.BusinessException;
+import com.sandu.ximon.admin.dto.DeviceStatus;
+import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
 import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
 import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame;
 import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
 import com.sandu.ximon.admin.manager.iot.rrpc.dto.InvokeParam;
 import com.sandu.ximon.admin.manager.iot.rrpc.dto.WrapResponseCommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.DeviceStateEnum;
 import com.sandu.ximon.admin.manager.iot.rrpc.topic.IBaseTopic;
 import com.sandu.ximon.admin.manager.iot.rrpc.topic.ICustomizeTopic;
 import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 
+import java.util.ArrayList;
+import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
 
 /**
  * @author chenjiantian
@@ -52,46 +59,52 @@
 
     @Override
     public CommonFrame sendRRPC(String deviceName, InvokeParam invokeParam) {
-        InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
-        if (data == null) {
-            return null;
-        }
-        String result = data.getResult();
-        result = result.replace("\\", "");
-        Map map = JSON.parseObject(result, Map.class);
-        result = (String) map.get("msg");
+//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
+//        if (data == null) {
+//            return null;
+//        }
+//        String result = data.getResult();
+//        result = result.replace("\\", "");
+//        Map map = JSON.parseObject(result, Map.class);
+//        result = (String) map.get("msg");
+        String result = MqttClientUtil.sendMqttMsg(deviceName,invokeParam.getFrame());
         return FrameUtils.transformMessageToFrame(result);
     }
 
     @Override
     public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
-        InvokeParam param = new InvokeParam();
-        param.setOperate("1001");
-        param.setFrame(iRequestFrame.getEncodeFrame());
-        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
-        if (data == null) {
-            return null;
-        }
-        String result = data.getResult();
-        result = result.replace("\\", "");
-        Map map = JSON.parseObject(result, Map.class);
-        result = (String) map.get("msg");
+//        InvokeParam param = new InvokeParam();
+//        param.setOperate("1001");
+//        param.setFrame(iRequestFrame.getEncodeFrame());
+//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
+//        if (data == null) {
+//            return null;
+//        }
+//        String result = data.getResult();
+//        result = result.replace("\\", "");
+//        Map map = JSON.parseObject(result, Map.class);
+//        result = (String) map.get("msg");
+        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
+        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
+        log.info("鑷畾涔塻endRRPC:璇锋眰甯э細{},\n,鍝嶅簲甯�:{}",iRequestFrame.toString(),result);
         return FrameUtils.transformMessageToFrame(result);
     }
 
     @Override
     public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) {
-        InvokeParam param = new InvokeParam();
-        param.setOperate("1001");
-        param.setFrame(iRequestFrame.getEncodeFrame());
-        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
-        if (data == null) {
-            return null;
-        }
-        String result = data.getResult();
-        result = result.replace("\\", "");
-        Map map = JSON.parseObject(result, Map.class);
-        result = (String) map.get("msg");
+//        InvokeParam param = new InvokeParam();
+//        param.setOperate("1001");
+//        param.setFrame(iRequestFrame.getEncodeFrame());
+//        InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
+//        if (data == null) {
+//            return null;
+//        }
+//        String result = data.getResult();
+//        result = result.replace("\\", "");
+//        Map map = JSON.parseObject(result, Map.class);
+//        result = (String) map.get("msg");
+        String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
+        String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
         return FrameUtils.transformMessageToFrame(result);
     }
 
@@ -175,14 +188,23 @@
 
     @Override
     public List<BatchGetDeviceStateResponse.DeviceStatus> batchGetDeviceState(List<String> deviceNames) {
-        BatchGetDeviceStateRequest request = new BatchGetDeviceStateRequest();
-        request.setDeviceNames(deviceNames);
-        request.setProductKey(getProductKey());
-        BatchGetDeviceStateResponse response = invokeSync(request);
-        if (response != null && response.getSuccess()) {
-            return response.getDeviceStatusList();
-        }
-        return null;
+        List<BatchGetDeviceStateResponse.DeviceStatus> statusList = new ArrayList<>();
+
+        deviceNames.forEach(l -> {
+            BatchGetDeviceStateResponse.DeviceStatus deviceStatus = new BatchGetDeviceStateResponse.DeviceStatus();
+                    deviceStatus.setDeviceName(l);
+
+            if (localMqttConnectStatusMap.get(l)!=null &&
+                    localMqttConnectStatusMap.get(l)== 1){
+                deviceStatus.setStatus("ONLINE");
+            }else {
+                deviceStatus.setStatus("OFFLINE");
+            }
+            statusList.add(deviceStatus);
+        });
+
+
+        return statusList;
     }
 
     /**
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
index 027274f..621feec 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
@@ -20,7 +20,7 @@
     private final C3mOrderService c3mOrderService;
 
     //姣�10鍒嗛挓杩愯涓�娆�   鍒犻櫎瓒呮椂璁㈠崟
-    @Scheduled(cron = "0 0/10 * * * ? ")
+//    @Scheduled(cron = "0 0/10 * * * ? ")
     public void deleteOvertimeOrders() {
         c3mOrderService.deleteOrderListByCreateTime();
         log.error("鍒犻櫎瓒呮椂璁㈠崟瀹氭椂浠诲姟鎵ц鎴愬姛");
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
index b16f604..ac7bf17 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
@@ -24,7 +24,7 @@
     private WaterQualityEquipmentService waterQualityEquipmentService;
     private WaterQualityDataService waterQualityDataService;
 
-    @Scheduled(cron = "0 0 0/1 * * ?")
+//    @Scheduled(cron = "0 0 0/1 * * ?")
 //    @Scheduled(cron = "0 0/2 * * * ? ")
     public void UserSubjectRefund() {
         List<WaterQualityEquipmentBo> waterQualityEquipmentList = waterQualityEquipmentService.listWaterQualityEquipment();
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
index 998120c..e00f1f8 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
@@ -51,6 +51,8 @@
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+
 /**
  * @author chenjiantian
  * @date 2021/12/13 16:00
@@ -144,7 +146,8 @@
 
         // 鑾峰彇鏈�杩戠殑涓婃姤鏃堕棿
         List<String> deviceCodeList = listLight.stream().map(Light::getDeviceCode).collect(Collectors.toList());
-        //鎷嗗垎list
+
+//        //鎷嗗垎list
         List<List<String>> split = CollectionUtil.split(deviceCodeList, 100);
 
         List<BatchGetDeviceStateResponse.DeviceStatus> deviceStatuses = null;
@@ -265,13 +268,17 @@
 
         List<Map<String, Object>> resultList = new ArrayList<>();
         for (LightControlParam param : paramList) {
-            A5LightBrightnessReqInnerFrame lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
-            A5Frame a5Frame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), lightControlFrame);
+            A5LightBrightnessReqInnerFrame
+                    lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
+            A5Frame a5Frame = new A5Frame(
+                    A5OrderEnum.REQUEST_LIGHT_DATA.getCode(),
+                    lightControlFrame);
             Map<String, Object> map = new HashMap<>();
             try {
                 map.put("deviceCode", param.getDeviceCode());
                 WrapResponseCommonFrame<A5LightBrightnessRespInnerFrame> frame
-                        = MainBoardInvokeSyncService.getInstance().sendRRPC(param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
+                        = MainBoardInvokeSyncService.getInstance().sendRRPC
+                        (param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
                 //瀛樺偍鎺у埗甯ф寚浠�
                 StoreOperationRecordsUtils.storeInnerFrameData(param.getDeviceCode(), "鍗曠伅甯�-浜害鎺у埗", a5Frame, frame);
 
@@ -1027,10 +1034,10 @@
     public void timeSynchronizationInitiative(String deviceCode, String lightAddress) {
         //鍗曠伅淇℃伅
         Light light = getLight(deviceCode);
-        if (light == null) {
-            log.error("鍗曠伅涓诲姩鍚屾鏃堕棿璇锋眰寮傚父锛屽崟鐏俊鎭笉瀛樺湪锛�");
-            return;
-        }
+//        if (light == null) {
+//            log.error("鍗曠伅涓诲姩鍚屾鏃堕棿璇锋眰寮傚父锛屽崟鐏俊鎭笉瀛樺湪锛�");
+//            return;
+//        }
         //鍗曠伅浠诲姟淇℃伅
         LightTaskPoleRelation lightTaskPoleRelation = SpringContextHolder.getBean(LightTaskPoleRelationService.class)
                 .getOne(Wrappers.lambdaQuery(LightTaskPoleRelation.class)
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
index c528e04..2920df7 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
@@ -16,6 +16,7 @@
 import com.sandu.common.util.SpringContextHolder;
 import com.sandu.ximon.admin.dto.LightTaskDto;
 import com.sandu.ximon.admin.dto.SingleLightOrderDto;
+import com.sandu.ximon.admin.manager.iot.frame.A5Frame;
 import com.sandu.ximon.admin.manager.iot.frame.FrameBuilder;
 import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
 import com.sandu.ximon.admin.manager.iot.frame.inner.request.A5LightTimerReqInnerFrame;
@@ -431,10 +432,25 @@
      * @return 杩斿洖甯�
      */
     public A5LightTimerRespInnerFrame sendTimeRRpc(String framePayload, String deviceCode, String lightAddress) {
-        IRequestFrame requestFrame = FrameBuilder.builderA5().innerFrame(new A5LightTimerReqInnerFrame(framePayload, lightAddress)).orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).build();
+//        IRequestFrame requestFrame = FrameBuilder.
+//                builderA5().
+//                innerFrame(
+//                        new A5LightTimerReqInnerFrame
+//                                (framePayload, lightAddress)
+//                                ).
+//                orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).
+//                build();
+
+        A5LightTimerReqInnerFrame
+                a5LightTimerReqInnerFrame = new A5LightTimerReqInnerFrame(framePayload, lightAddress);
+        System.out.println(JSON.toJSONString(a5LightTimerReqInnerFrame) + "          --------a5LightTimerReqInnerFrame");
+
+        A5Frame requestFrame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), a5LightTimerReqInnerFrame);
         System.out.println(requestFrame + "          --------requestFrame");
 
-        WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame = MainBoardInvokeSyncService.getInstance().sendRRPC(deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
+        WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame
+                = MainBoardInvokeSyncService.getInstance().sendRRPC
+                (deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
         System.out.println(responseCommonFrame + "         -----------responseCommonFrame");
         StoreOperationRecordsUtils.storeInnerFrameData(deviceCode, "鍗曠伅甯�-鎺х伅", requestFrame, responseCommonFrame);
         return Optional.ofNullable(responseCommonFrame).map(WrapResponseCommonFrame::getResponseInnerFrame).orElse(null);
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
index 6d90489..0de67ea 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
@@ -60,6 +60,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+
 /**
  * 鐏潌鐩稿叧
  *
@@ -1050,7 +1052,7 @@
      * @return 璁惧鐘舵�佸垪琛�
      */
     public List<DeviceStatus> listStatusByDeviceCode(ArrayList<String> deviceCodeList) {
-        // 鏈�澶у彧鑳芥煡50涓�
+//         鏈�澶у彧鑳芥煡50涓�
         List<List<String>> split = CollectionUtil.split(deviceCodeList, 50);
         List<DeviceStatus> statusList = new ArrayList<>();
         for (List<String> list : split) {
@@ -1064,6 +1066,21 @@
                 }
             }
         }
+//        List<DeviceStatus> statusList = new ArrayList<>();
+//
+//        deviceCodeList.forEach(l -> {
+//                    DeviceStatus deviceStatus = new DeviceStatus();
+//                    deviceStatus.setDeviceCode(l);
+//
+//            if (localMqttConnectStatusMap.get(l)!=null &&
+//                    localMqttConnectStatusMap.get(l)== 1){
+//                deviceStatus.setStatus(1);
+//            }else {
+//                deviceStatus.setStatus(0);
+//            }
+//            statusList.add(deviceStatus);
+//        });
+
         return statusList;
     }
 
@@ -1419,4 +1436,4 @@
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/ximon-admin/src/main/resources/application-xm.yml b/ximon-admin/src/main/resources/application-xm.yml
new file mode 100644
index 0000000..a582ce5
--- /dev/null
+++ b/ximon-admin/src/main/resources/application-xm.yml
@@ -0,0 +1,121 @@
+spring:
+  datasource:
+    # 鏁版嵁搴撶敤鎴峰悕
+    username: root
+    # 鏁版嵁搴撳瘑鐮�
+    password: zhxm2512209
+    url: jdbc:mysql://39.103.154.108:2512/xm_dev?useUnicode=true&autoReconnect=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
+    type: com.alibaba.druid.pool.DruidDataSource
+    druid:
+      connection-init-sqls: set names utf8mb4
+      driver-class-name: com.mysql.cj.jdbc.Driver
+  redis:
+    host: 39.103.154.108
+    password: zhxm2512209
+    port: 6379
+    database: 0
+server:
+  port: 20017
+sandu:
+  jwt:
+    header: Authorization
+    # 浠ょ墝鍓嶇紑
+    token-start-with: Bearer
+    # 蹇呴』浣跨敤鏈�灏�88浣嶇殑Base64瀵硅浠ょ墝杩涜缂栫爜
+    base64-secret: U1GZNSKOH43V19GNIVE8HUYM9H86K657V1D66EAVSL9Q023J4JNWE44BNHCS6V9E66BPKF0KXUI5R1ZOYK2OWZZYALPD07JHOYUROL930UGJQUJDNAEYNTMUS27BHKTJEF9011DGGQ4QT9BN6CM2P9SY2VV2MZKJPCOW9YIGN0VJ
+    # 浠ょ墝杩囨湡鏃堕棿 姝ゅ鍗曚綅/姣 锛屽彲鍦ㄦ缃戠珯鐢熸垚 https://www.convertworld.com/zh-hans/time/milliseconds.html 1涓湀
+    token-validity-in-seconds: 2629800000
+    # 鍦ㄧ嚎鐢ㄦ埛key
+    online-key: online-token
+    # 鏄惁鍚姩redis缂撳瓨鐢ㄦ埛淇℃伅
+    cache-online: false
+    #************************鏈湴涓婁紶鏂囦欢閰嶇疆************************
+  upload:
+    #鏂囦欢鏈嶅姟鍣ㄨ矾寰�
+    upload-root-path: E:\file\novafile
+    storage: local
+    #鏈嶅姟鍣ㄦ枃浠跺墠缂�
+    real-url: http://localhost/
+  common:
+    urlPrefix: http://localhost/
+  quartz:
+    enable: true
+
+listenter:
+  isOpen: false
+
+minio:
+  endpoint: 47.106.172.9
+  port: 9000
+  accessKey: minioadmin
+  secretKey: zhxm2512209
+  secure: false
+
+
+# led灞忓箷鏈嶅姟鍣ㄥ湴鍧�锛堟洿鏀归渶瑕佸悓鏃舵洿鏀癸級
+realtime-server:
+  #  command: http://101.132.131.91:8081/payload/
+  #  url: http://101.132.131.91:8081/
+  command: http://112.74.63.130:20018/command/
+  url: http://112.74.63.130:20018/
+
+
+server-conf:
+  ip: 127.0.0.1 # 47.106.172.9/101.132.131.91
+
+
+
+nova-conf:   #璇虹摝鍥炶皟
+  notify-url: http://39.103.154.108:20018/serv/vnnox/progress
+  screen-shot-notify-url: http://39.103.154.108:20018/serv/vnnox/screenshot
+  status-notify-url: http://39.103.154.108:20018/serv/vnnox/asyncStatus
+
+  #iot浜у搧绉橀挜
+iot:
+  access_key: LTAI4G27Af8MZEF55phdMQ4y
+  access_secret: KUc2yOtr7TRB4FuF5Wr0dWeTblbEuh
+
+#闃块噷浜憃ss閰嶇疆
+oss-conf:
+  end-point: oss-cn-shanghai.aliyuncs.com
+  key-id: LTAI5tPdpt5wvJyLipRijFSP
+  key-secret: 1ahYfCKd0yTddsUnuDLQzI23MLh4VQ
+  bucket-name: ximonsmart
+
+#鏂拌鐡�
+new-nova:
+  #渚濊禆鍦板潃
+  string-path: C:\Users\Administrator\Desktop\novaWin\bin\viplexcore.dll
+
+new-nova-file:
+  upload:
+    #鏂囦欢鏈嶅姟鍣ㄨ矾寰�
+    upload-root-path: E:\file\novafile
+    storage: local
+    #鏈嶅姟鍣ㄦ枃浠跺墠缂�
+    real-url: http://localhost/
+
+customer:
+  mqtt:
+    broker: tcp://127.0.0.1:1883
+    clientList:
+      #瀹㈡埛绔疘D
+      - clientId: java_server_msg
+        #鐩戝惉涓婚
+        subscribeTopic: v1/devices/response/+
+        #鐢ㄦ埛鍚�
+        userName: server_admin
+        #瀵嗙爜
+        password: zhxm2512209
+        #涓嬪彂涓婚
+        publishTopic: v1/devices/request/
+#鐩戝惉涓婁笅绾�
+      - clientId: java_server_status
+        #鐩戝惉涓婚
+        subscribeTopic: $SYS/brokers/+/clients/#
+        #鐢ㄦ埛鍚�
+        userName: server_admin
+        #瀵嗙爜
+        password: zhxm2512209
+        #涓嬪彂涓婚
+        publishTopic: v1/devices/request/
diff --git a/ximon-admin/src/main/resources/application.yml b/ximon-admin/src/main/resources/application.yml
index 0b58b1a..c1e7fa6 100644
--- a/ximon-admin/src/main/resources/application.yml
+++ b/ximon-admin/src/main/resources/application.yml
@@ -1,6 +1,6 @@
 spring:
   profiles:
-    active: prod
+    active: xm
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
     time-zone: GMT+8
@@ -53,4 +53,5 @@
 
 #涓绘澘rrpc閫氫俊PRODUCT_KEY
 #rrpc:
-#  key: a1JsfPG4iKW
\ No newline at end of file
+#  key: a1JsfPG4iKW
+

--
Gitblit v1.9.3