From b531004a3cbd33d7bb9f5b7dce06ddd4f5e7ec9a Mon Sep 17 00:00:00 2001
From: liuhaonan <31457034@qq.com>
Date: 星期五, 18 十一月 2022 11:57:02 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/xm-20221107' into xm-20221107
---
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java | 31 +
ximon-admin/src/main/resources/application.yml | 5
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java | 90 ++++
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java | 1
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java | 40 ++
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java | 5
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java | 72 +++
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java | 92 ++++
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java | 13
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java | 80 ++++
ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java | 98 +++-
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java | 32 +
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java | 23
sandu-common/src/main/resources/logback-spring.xml | 5
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java | 119 +++++
ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java | 20
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java | 51 ++
ximon-admin/pom.xml | 12
ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java | 21
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java | 2
ximon-admin/src/main/resources/application-xm.yml | 121 ++++++
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java | 94 ++++
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java | 30 +
ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java | 89 ++++
ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java | 2
25 files changed, 1,086 insertions(+), 62 deletions(-)
diff --git a/sandu-common/src/main/resources/logback-spring.xml b/sandu-common/src/main/resources/logback-spring.xml
index a64bb41..ddde766 100644
--- a/sandu-common/src/main/resources/logback-spring.xml
+++ b/sandu-common/src/main/resources/logback-spring.xml
@@ -74,6 +74,11 @@
</root>
</springProfile>
+ <springProfile name="xm">
+ <root level="info" >
+ <appender-ref ref="STDOUT"/>
+ </root>
+ </springProfile>
<!-- 鐢熶骇鐜. 鏃ュ織绾у埆涓篧ARN涓斿啓鏃ュ織鏂囦欢-->
<springProfile name="prod,docker">
<root level="warn">
diff --git a/ximon-admin/pom.xml b/ximon-admin/pom.xml
index bcecb14..cd497b3 100644
--- a/ximon-admin/pom.xml
+++ b/ximon-admin/pom.xml
@@ -121,6 +121,18 @@
<version>5.6.0</version>
</dependency>
+ <!--鏈湴mqtt渚濊禆-->
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jol</groupId>
+ <artifactId>jol-core</artifactId>
+ <version>0.16</version>
+ </dependency>
+
</dependencies>
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
new file mode 100644
index 0000000..3d9361d
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/AbsMqttCallBack.java
@@ -0,0 +1,89 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT鍥炶皟鎶借薄绫�
+ * @date 2022/11/9 16:23
+ */
+@Slf4j
+public abstract class AbsMqttCallBack implements MqttCallback {
+ private String clientId;
+ private MqttConnectOptions connectOptions;
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public MqttConnectOptions getConnectOptions() {
+ return connectOptions;
+ }
+
+ public void setConnectOptions(MqttConnectOptions connectOptions) {
+ this.connectOptions = connectOptions;
+ }
+
+ /**
+ * 澶卞幓杩炴帴鎿嶄綔,杩涜閲嶈繛
+ *
+ * @param throwable 寮傚父
+ */
+ @Override
+ public void connectionLost(Throwable throwable) {
+ try {
+ if (null != clientId) {
+ if (null != connectOptions) {
+ MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions);
+ } else {
+ MqttClientManager.getInstance().getMqttClientById(clientId).connect();
+ }
+ }
+
+ } catch (Exception e) {
+ log.error("{} reconnect failed!", e);
+ }
+ }
+
+ /**
+ * 鎺ユ敹璁㈤槄娑堟伅
+ *
+ * @param topic 涓婚
+ * @param mqttMessage 鎺ユ敹娑堟伅
+ */
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ String content = new String(mqttMessage.getPayload());
+ log.info("Receive topic[{}],message={}", topic, content);
+ handleReceiveMessage(topic, content);
+ }
+
+ /**
+ * 娑堟伅鍙戦�佹垚鍔�
+ *
+ * @param iMqttDeliveryToken toke
+ */
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ log.info("娑堟伅鍙戦�佹垚鍔�");
+ }
+
+
+ /**
+ * 澶勭悊鎺ユ敹鐨勬秷鎭�
+ *
+ * @param topic 涓婚
+ * @param message 娑堟伅鍐呭
+ */
+ protected abstract void handleReceiveMessage(String topic, String message);
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
new file mode 100644
index 0000000..687a11f
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MqttCallBackContext.java
@@ -0,0 +1,40 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT璁㈤槄鍥炶皟鐜绫�
+ * @date 2022/11/9 16:25
+ */
+@Component
+@Slf4j
+public class MqttCallBackContext {
+ private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
+
+ /**
+ * 榛樿鏋勯�犲嚱鏁�
+ *
+ * @param callBackMap 鍥炶皟闆嗗悎
+ */
+ public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
+ this.callBackMap.clear();
+ callBackMap.forEach((k, v) -> this.callBackMap.put(k, v));
+ }
+
+ /**
+ * 鑾峰彇MQTT鍥炶皟绫�
+ *
+ * @param clientId 瀹㈡埛绔疘D
+ * @return MQTT鍥炶皟绫�
+ */
+ public AbsMqttCallBack getCallBack(String clientId) {
+ return this.callBackMap.get(clientId);
+ }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java
new file mode 100644
index 0000000..82fc40f
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/MsgMqttCallBack.java
@@ -0,0 +1,119 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
+import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
+import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
+import com.sandu.ximon.admin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.jms.Message;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氶粯璁ゅ洖璋�
+ * @date 2022/11/9 16:24
+ */
+@Slf4j
+@Component("java_server_msg")
+public class MsgMqttCallBack extends AbsMqttCallBack {
+
+ private static final String localMqttConnectTypeOfSync = "1";
+
+ private static final String localMqttConnectTypeOfAsync = "2";
+
+ public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
+
+
+ protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(50000), new MsgMqttCallBack.NameTreadFactory());
+
+ static class NameTreadFactory implements ThreadFactory {
+
+ private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
+ }
+ }
+
+ @Override
+ protected void handleReceiveMessage(String topic, String message) {
+ EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
+
+ log.info("鎺ユ敹鍒版秷鎭�---MsgMqttCallBack:topic={},message={}", topic, message);
+ }
+
+
+ /**
+ * 鍦ㄨ繖閲屽鐞嗘偍鏀跺埌娑堟伅鍚庣殑鍏蜂綋涓氬姟閫昏緫銆�
+ */
+ private void processMessage(String topic,String messageString) {
+ try {
+
+ String mac = topic.split("/")[3];
+ LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
+
+ if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
+ // 璁惧鏁版嵁涓婃姤
+ processTask(null,mac, localMqttMsg);
+ }
+ else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
+ System.out.println(localMqttMsg.getPayload());
+ MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 澶勭悊浠诲姟
+ *
+ * @param productKey 浜у搧鐮�
+ * @param deviceName 浜у搧鍚嶇О
+ * @param localMqttMsg 涓婃姤娑堟伅localMqttMsg
+ */
+ private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
+ if (localMqttMsg == null) {
+ return;
+ }
+ log.info("澶勭悊璁㈤槄");
+ log.info(localMqttMsg.toString());
+ CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
+ if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
+ // 鍗曠伅鏁版嵁涓婃姤澶勭悊
+ LightDataProcessor.getInstance().process(productKey, deviceName, frame);
+ } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
+ // C3鍏呯數妗╀笂鎶ュ鐞�
+ c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
+ } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
+ // 澶ф皵鏁版嵁鎸囦护涓婃姤
+ AirDataProcessor.getInstance().process(productKey, deviceName, frame);
+ } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
+ PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
+ }
+ }
+}
+
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java
new file mode 100644
index 0000000..f3cf820
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/callback/StatusMqttCallBack.java
@@ -0,0 +1,90 @@
+package com.sandu.ximon.admin.localMQTT.callback;
+
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
+import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氶粯璁ゅ洖璋�
+ * @date 2022/11/9 16:24
+ */
+@Slf4j
+@Component("java_server_status")
+public class StatusMqttCallBack extends AbsMqttCallBack {
+
+ private static final String localMqttConnectStatusConnected = "connected";
+
+ private static final String localMqttConnectStatusDisconnected = "disconnected";
+
+ public static final Map<String, Integer> localMqttConnectStatusMap = new ConcurrentHashMap<>();
+
+
+ protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory());
+
+ static class NameTreadFactory implements ThreadFactory {
+
+ private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
+ }
+ }
+
+ @Override
+ protected void handleReceiveMessage(String topic, String message) {
+ EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
+
+ log.info("鎺ユ敹鍒版秷鎭�---StatusMqttCallBack:topic={},message={}", topic, message);
+ }
+
+
+ /**
+ * 鍦ㄨ繖閲屽鐞嗘偍鏀跺埌娑堟伅鍚庣殑鍏蜂綋涓氬姟閫昏緫銆�
+ */
+ private void processMessage(String topic,String messageString) {
+ try {
+
+ String mac = topic.split("/")[4];
+ String status = topic.split("/")[5];
+
+ System.out.println("----------------------");
+ System.out.println(mac);
+ System.out.println(status);
+ if (status.equals(localMqttConnectStatusConnected)){
+ // 璁惧鏁版嵁涓婃姤
+ localMqttConnectStatusMap.put(mac,1);
+ }
+ else if (status.equals(localMqttConnectStatusDisconnected)){
+ localMqttConnectStatusMap.put(mac,0);
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
+
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
new file mode 100644
index 0000000..93cd4fd
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientCreate.java
@@ -0,0 +1,51 @@
+package com.sandu.ximon.admin.localMQTT.client;
+
+
+import com.sandu.ximon.admin.localMQTT.config.MqttConfig;
+import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔垱寤�
+ * @date 2022/11/9 16:31
+ */
+@Component
+@Slf4j
+public class MqttClientCreate {
+ @Autowired
+ private MqttConfig mqttConfig;
+
+ @Resource
+ private MqttClientManager mqttClientManager;
+
+ /**
+ * 瀛樺偍MQTT瀹㈡埛绔�
+ */
+ public static final Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * 鍒涘缓MQTT瀹㈡埛绔�
+ */
+ @PostConstruct
+ public void createMqttClient() {
+ System.out.println("createMqttClient");
+ List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
+
+ for (MqttClientVO mqttClient : mqttClientList) {
+ //鍒涘缓瀹㈡埛绔紝瀹㈡埛绔疘D锛歞emo锛屽洖璋冪被璺熷鎴风ID涓�鑷�
+ System.out.println(mqttClient.getClientId());
+ mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
+ }
+ }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
new file mode 100644
index 0000000..b35af27
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/client/MqttClientManager.java
@@ -0,0 +1,94 @@
+package com.sandu.ximon.admin.localMQTT.client;
+
+import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
+import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔鐞嗙被,濡傛灉瀹㈡埛绔潪甯稿鍚庣画鍙叆redis缂撳瓨
+ * @date 2022/11/9 16:30
+ */
+
+@Slf4j
+@Component
+public class MqttClientManager {
+ @Value("${customer.mqtt.broker}")
+ private String mqttBroker;
+ @Resource
+ private MqttCallBackContext mqttCallBackContext;
+
+
+ public MqttClient getMqttClientById(String clientId) {
+ return MQTT_CLIENT_MAP.get(clientId);
+ }
+
+ private static MqttClientManager mqttClientManager = new MqttClientManager();
+ public static MqttClientManager getInstance() {
+ return mqttClientManager;
+ }
+ /**
+ * 鍒涘缓mqtt瀹㈡埛绔�
+ *
+ * @param clientId 瀹㈡埛绔疘D
+ * @param subscribeTopic 璁㈤槄涓婚锛屽彲涓虹┖
+ * @param userName 鐢ㄦ埛鍚嶏紝鍙负绌�
+ * @param password 瀵嗙爜锛屽彲涓虹┖
+ * @return mqtt瀹㈡埛绔�
+ */
+ public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
+ MemoryPersistence persistence = new MemoryPersistence();
+
+ try {
+ MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ if (null != userName && !"".equals(userName)) {
+ connOpts.setUserName(userName);
+ }
+
+ if (null != password && !"".equals(password)) {
+ connOpts.setPassword(password.toCharArray());
+ }
+
+ connOpts.setCleanSession(true);
+ connOpts.setKeepAliveInterval(10);
+
+ if (null != subscribeTopic && !"".equals(subscribeTopic)) {
+ AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
+
+ if (null == callBack) {
+ callBack = mqttCallBackContext.getCallBack("default");
+ }
+
+ callBack.setClientId(clientId);
+ callBack.setConnectOptions(connOpts);
+ client.setCallback(callBack);
+ }
+
+ //杩炴帴mqtt鏈嶅姟绔痓roker
+ client.connect(connOpts);
+
+ if (null != subscribeTopic && !"".equals(subscribeTopic)) {
+ client.subscribe(subscribeTopic);
+ }
+
+ MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
+
+
+
+ } catch (MqttException e) {
+ log.error("Create mqttClient failed!", e);
+ }
+ }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
new file mode 100644
index 0000000..a5bedc5
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/config/MqttConfig.java
@@ -0,0 +1,30 @@
+package com.sandu.ximon.admin.localMQTT.config;
+
+import com.sandu.ximon.admin.localMQTT.model.MqttClientVO;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁qtt閰嶇疆绫�
+ * @date 2022/11/9 16:21
+ */
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "customer.mqtt")
+public class MqttConfig {
+ /**
+ * mqtt broker鍦板潃
+ */
+ String broker;
+ /**
+ * 闇�瑕佸垱寤虹殑MQTT瀹㈡埛绔�
+ */
+ List<MqttClientVO> clientList;
+
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
new file mode 100644
index 0000000..9fcf3dd
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/controller/localMQTTTestController.java
@@ -0,0 +1,72 @@
+package com.sandu.ximon.admin.localMQTT.controller;
+
+import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+import static java.lang.Thread.sleep;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛�
+ * @date 2022/11/11 14:18
+ */
+@RestController
+@RequestMapping("/localMQTT")
+@Slf4j
+public class localMQTTTestController {
+
+
+ @RequestMapping("/test")
+ public String localMQTT() throws InterruptedException {
+
+ for (int i = 0; i < 100; i++) {
+
+ /*
+ * 寮�鐏�100
+ * FEA501000BFE010003FFFF0045971477B6D8C2CA
+ * 10
+ * FEA501000BFE010003FFFF0AA542FD69D4E6194E
+ * 鍏崇伅
+ * FEA501000BFE0100030001007130ECA9150640E6
+ * 鏌ヨ蹇冭烦鏃堕棿
+ * FEA501000AFE110002FFFF26008FBE3DAC7C0D
+ * 璁剧疆蹇冭烦30绉�
+ * FEA501000CFE210004FFFF001E9BB444E9C75BDB49
+ * 5鍒嗛挓
+ * FEA501000CFE210004FFFF012C4A7824285825CB53
+ * */
+// 寮�10
+ String result1 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+ "FEA501000BFE010003FFFF0AA542FD69D4E6194E");
+ log.info("寮�鐏繑鍥炵粨鏋�:"+result1);
+ sleep(3000);
+// 鍏�
+ String result2 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+ "FEA501000BFE0100030001007130ECA9150640E6");
+ log.info("鍏崇伅杩斿洖缁撴灉:"+result2);
+ sleep(3000);
+//// 蹇冭烦鏌ヨ
+// String result3 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+// "FEA501000AFE110002FFFF26008FBE3DAC7C0D");
+// log.info("蹇冭烦鏌ヨ杩斿洖缁撴灉:"+result3);
+// sleep(3000);
+// String result4 = MqttClientUtil.sendMqttMsg("363832544e5008ff3a32ffff",
+// "FEA501001AFE230012FE23000A00017F1019647F111E005428F600EC64EC194EA28A7C");
+// log.info("瀹氭椂浠诲姟杩斿洖缁撴灉:"+result4);
+// sleep(3000);
+
+// System.out.println("閾炬帴鐘舵�侊細---"+i+"---:");
+// System.out.println(localMqttConnectStatusMap.get("363832544e5008ff3a32ffff"));
+// sleep(10000);
+
+ }
+
+
+ return "OK";
+ }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
new file mode 100644
index 0000000..866f971
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/LocalMqttMsg.java
@@ -0,0 +1,31 @@
+package com.sandu.ximon.admin.localMQTT.model;
+
+import lombok.Data;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛氭湰鍦癿qtt娑堟伅缁撴瀯
+ * @date 2022/11/9 13:55
+ */
+@Data
+public class LocalMqttMsg {
+ private String timestamp;
+ private String connectType;
+ private String msgType;
+ private String payload;
+
+ public LocalMqttMsg(String timestamp, String connectType, String msgType, String payload) {
+ this.timestamp = timestamp;
+ this.connectType = connectType;
+ this.msgType = msgType;
+ this.payload = payload;
+ }
+
+ public LocalMqttMsg() {
+ this.timestamp = String.valueOf(System.currentTimeMillis());
+ this.connectType = "1";
+ this.msgType = "1";
+ this.payload = "";
+ }
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
new file mode 100644
index 0000000..2824c89
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/model/MqttClientVO.java
@@ -0,0 +1,32 @@
+package com.sandu.ximon.admin.localMQTT.model;
+
+import lombok.Data;
+/**
+ * @author van
+ * @version 1.0
+ * msg锛� MQTT瀹㈡埛绔�
+ * @date 2022/11/9 16:20
+ */
+@Data
+public class MqttClientVO {
+ /**
+ * 瀹㈡埛绔疘D
+ */
+ private String clientId;
+ /**
+ * 鐩戝惉涓婚
+ */
+ private String subscribeTopic;
+ /**
+ * 鐢ㄦ埛鍚�
+ */
+ private String userName;
+ /**
+ * 瀵嗙爜
+ */
+ private String password;
+ /**
+ * 鎺ㄩ�佷富棰�
+ * */
+ private String publishTopic;
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
new file mode 100644
index 0000000..321fe44
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/HexFrameUtils.java
@@ -0,0 +1,80 @@
+package com.sandu.ximon.admin.localMQTT.util;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.util.HexUtil;
+import cn.hutool.core.util.StrUtil;
+import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.util.CRC32Utils;
+
+/**
+ * @author chenjiantian
+ * @date 2021/12/3 17:36
+ */
+public class HexFrameUtils {
+
+ /**
+ * 鏈湴MQTT 娑堟伅涓嶉渶瑕乥ase64杞寲锛岀洿鎺ヤ娇鐢╤ex閫氫俊
+ * 鎶婁笂鎶ョ殑鍐呭 杞寲鎴愬抚瀹炰綋绫�
+ * @param hex 涓婃姤鍐呭 {{@link com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage.Params}}
+ * @return 甯у疄浣撶被
+ */
+ public static CommonFrame transformMessageToFrame(String hex) {
+ if (hex == null || hex.length() < 18) {
+ return null;
+ }
+ // 灏嗗崄鍏繘鍒舵暟杞负澶у啓
+ hex = hex.toUpperCase();
+ CommonFrame frame = new CommonFrame();
+ // MQTT閫氫俊鏂瑰紡(1)
+ frame.setConnectType(hex.substring(0, 2));
+ // 鍔熻兘鐮�(1)
+ frame.setFunctionCode(hex.substring(2, 4));
+ // 鍛戒护绫诲瀷(1)
+ frame.setOrderType(hex.substring(4, 6));
+ // 璐熻嵎闀垮害(2)
+ frame.setPayloadLength(hex.substring(6, 10));
+ // 鍝嶅簲payload
+ frame.setPayload(hex.substring(10, hex.length() - 8));
+ // 鏍¢獙鐮�(4)
+ frame.setCrc32(hex.substring(hex.length() - 8));
+ // 鍒ゆ柇鏍¢獙鏄惁閫氳繃
+ String content = hex.substring(2, hex.length() - 8);
+ frame.setValidate(CRC32Utils.validateFrame(content, frame.getCrc32()));
+
+ return frame;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(HexUtil.hexToInt("FF"));
+ System.out.println(transformMessageToFrame("/qWEACrwAQAi//8BAQEYAB4CAAAAAAAAAApcAAAAEAAVAAAAAwAQAagAAMPfU3KbHyny").toString());
+ }
+
+ /**
+ * 灏嗚澶囦笂鎶ョ殑鍐呭瑙g爜
+ *
+ * @param value 璁惧涓婃姤鐨勫唴瀹�
+ * @return 瑙g爜鍚庣殑鍐呭锛�16杩涘埗
+ */
+ public static String decodeReportMessage(String value) {
+ if (StrUtil.isBlank(value)) {
+ return null;
+ }
+ byte[] decode = Base64.decode(value.getBytes());
+ return HexUtil.encodeHexStr(decode);
+ }
+
+ /**
+ * 灏嗗噯澶囧彂閫佺殑鍐呭缂栫爜
+ * @param value 寰呯紪鐮佸唴瀹�
+ * @return 缂栫爜鍚庣殑鍐呭 瀛楃涓�
+ */
+ public static String encodeReportMessage(String value) {
+ if (StrUtil.isBlank(value)) {
+ return null;
+ }
+ byte[] bytes = HexUtil.decodeHex(value);
+ String encode = Base64.encode(bytes);
+ return encode;
+ }
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
new file mode 100644
index 0000000..5b9a528
--- /dev/null
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/localMQTT/util/MqttClientUtil.java
@@ -0,0 +1,92 @@
+package com.sandu.ximon.admin.localMQTT.util;
+
+import com.alibaba.fastjson.JSON;
+import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
+import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
+import com.sandu.ximon.admin.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * @author van
+ * @version 1.0
+ * msg锛歁QTT瀹㈡埛绔伐鍏风被
+ * @date 2022/11/9 16:32
+ */
+@Slf4j
+public class MqttClientUtil {
+
+ private static MqttClientUtil mqttClientUtil = new MqttClientUtil();
+
+ public static MqttClientUtil getInstance() {
+ return mqttClientUtil;
+ }
+
+ public static String publishPrefix = "v1/devices/request/";
+
+
+ public static String clientId = "java_server_msg";
+
+ public static final Map<String, String> MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>();
+
+ public static String sendMqttMsg(String topic, String content) {
+ try {
+ LocalMqttMsg localMqttMsg = new LocalMqttMsg();
+ localMqttMsg.setPayload(content);
+
+ MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes());
+
+ message.setQos(0);
+ MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId);
+
+ if (null == mqttClient) {
+ log.error("Not exist mqttClient where it's clientId is {}", clientId);
+ return topic;
+ }
+
+ long start = System.currentTimeMillis();
+ mqttClient.publish(publishPrefix+topic, message);
+ log.info("{}",publishPrefix+topic);
+ log.info("{}", message);
+
+ /**
+ * 瀹炵幇浼悓姝�
+ * */
+ try {
+ String returnFrame = null ;
+ returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
+
+ for (int i = 0;i < 50;i++){
+ if (StringUtil.strIsNullOrEmpty(returnFrame)){
+ sleep(100);
+ returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
+ }else {
+ log.info("杩斿洖鏃堕棿锛歿} ms",System.currentTimeMillis() - start) ;
+ String remove = MQTT_RETURN_FRAME_MAP.remove(topic);
+ log.info("remove缁撴灉锛歿} ",remove);
+ return returnFrame;
+ }
+ }
+
+
+ } catch (InterruptedException e) {
+
+ }
+
+ } catch (MqttException e) {
+ log.error("MqttClient send msg faild!", e);
+ return("閫氫俊瓒呮椂");
+ }
+ return null;
+ }
+
+
+
+}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
index e433eac..28cee03 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/amqp/processor/LightDataProcessor.java
@@ -39,13 +39,12 @@
log.info("蹇冭烦鐩稿簲");
A5LightHeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5LightHeartbeatReportInnerFrame().transformFrame(frame.getPayload());
- if ("363832544e5008ff1741ffff".equals(deviceName)) {
System.out.println("蹇冭烦鍖�: " + JSON.toJSONString(heartbeatReportInnerFrame));
- }
+
if (heartbeatReportInnerFrame.isValidate()) {
SpringContextHolder.getBean(LightReportDataService.class).saveReportData(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
//蹇冭烦鍖呬笂鎶ヤ笉淇濆瓨纭欢璁惧淇℃伅
-// SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
+ SpringContextHolder.getBean(LightService.class).saveLight(deviceName, heartbeatReportInnerFrame.getHeartBeatDataPackage());
}
} else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) {
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
index 602b658..43e3d3c 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/IRequestFrame.java
@@ -48,4 +48,5 @@
* @return 璐熻嵎
*/
String getPayload();
+
}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
index 5cc7f92..c821ef3 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/frame/inner/request/A5LightTimerReqInnerFrame.java
@@ -18,9 +18,7 @@
*/
public class A5LightTimerReqInnerFrame implements IRequestInnerFrame {
- private final String payload;
- private final String functionCode = A5LightDataEnum.LightTimer.getCode();
- private final String payloadLength;
+ private String payload;
/**
* @param framePayload 澶氫釜璺伅瀹氭椂鎸囦护锛�
@@ -32,13 +30,18 @@
} else {
destinationAddress = lightAddress;
}
- payload = destinationAddress + framePayload;
- this.payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
+ payload = destinationAddress + framePayload ;
+// payload = getEncodeFrame();
+
+
}
@Override
public String getEncodeFrame() {
+ String functionCode = A5LightDataEnum.LightTimer.getCode();
+ String payloadLength = SupplementUtils.suppleZero(Integer.toHexString((payload.length() / 2)).toUpperCase(), 4);
String frame = functionCode + payloadLength + payload;
+
return MQTTConnectTypeEnum.SYNCHRONIZATION.getCode() + frame.toUpperCase() + CRC32Utils.getCRC32(frame.toUpperCase());
}
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
index 6e93fc5..c78d4a0 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/manager/iot/rrpc/BaseInvokeSyncService.java
@@ -6,19 +6,26 @@
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.iot.model.v20180120.*;
import com.sandu.common.execption.BusinessException;
+import com.sandu.ximon.admin.dto.DeviceStatus;
+import com.sandu.ximon.admin.localMQTT.util.MqttClientUtil;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.BaseResponseInnerFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.InvokeParam;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.WrapResponseCommonFrame;
+import com.sandu.ximon.admin.manager.iot.rrpc.enums.DeviceStateEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.topic.IBaseTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.topic.ICustomizeTopic;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
+import java.util.ArrayList;
+import java.util.Base64;
import java.util.List;
import java.util.Map;
+
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
/**
* @author chenjiantian
@@ -52,46 +59,52 @@
@Override
public CommonFrame sendRRPC(String deviceName, InvokeParam invokeParam) {
- InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
- if (data == null) {
- return null;
- }
- String result = data.getResult();
- result = result.replace("\\", "");
- Map map = JSON.parseObject(result, Map.class);
- result = (String) map.get("msg");
+// InvokeThingServiceResponse.Data data = invokeThing(deviceName, invokeParam, false);
+// if (data == null) {
+// return null;
+// }
+// String result = data.getResult();
+// result = result.replace("\\", "");
+// Map map = JSON.parseObject(result, Map.class);
+// result = (String) map.get("msg");
+ String result = MqttClientUtil.sendMqttMsg(deviceName,invokeParam.getFrame());
return FrameUtils.transformMessageToFrame(result);
}
@Override
public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame) {
- InvokeParam param = new InvokeParam();
- param.setOperate("1001");
- param.setFrame(iRequestFrame.getEncodeFrame());
- InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
- if (data == null) {
- return null;
- }
- String result = data.getResult();
- result = result.replace("\\", "");
- Map map = JSON.parseObject(result, Map.class);
- result = (String) map.get("msg");
+// InvokeParam param = new InvokeParam();
+// param.setOperate("1001");
+// param.setFrame(iRequestFrame.getEncodeFrame());
+// InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, false);
+// if (data == null) {
+// return null;
+// }
+// String result = data.getResult();
+// result = result.replace("\\", "");
+// Map map = JSON.parseObject(result, Map.class);
+// result = (String) map.get("msg");
+ String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
+ String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
+ log.info("鑷畾涔塻endRRPC:璇锋眰甯э細{},\n,鍝嶅簲甯�:{}",iRequestFrame.toString(),result);
return FrameUtils.transformMessageToFrame(result);
}
@Override
public CommonFrame sendRRPC(String deviceName, IRequestFrame iRequestFrame, boolean resendFlag) {
- InvokeParam param = new InvokeParam();
- param.setOperate("1001");
- param.setFrame(iRequestFrame.getEncodeFrame());
- InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
- if (data == null) {
- return null;
- }
- String result = data.getResult();
- result = result.replace("\\", "");
- Map map = JSON.parseObject(result, Map.class);
- result = (String) map.get("msg");
+// InvokeParam param = new InvokeParam();
+// param.setOperate("1001");
+// param.setFrame(iRequestFrame.getEncodeFrame());
+// InvokeThingServiceResponse.Data data = invokeThing(deviceName, param, true);
+// if (data == null) {
+// return null;
+// }
+// String result = data.getResult();
+// result = result.replace("\\", "");
+// Map map = JSON.parseObject(result, Map.class);
+// result = (String) map.get("msg");
+ String frame = FrameUtils.transformMessageToFrame(iRequestFrame.getEncodeFrame()).toString();
+ String result = MqttClientUtil.sendMqttMsg(deviceName,frame);
return FrameUtils.transformMessageToFrame(result);
}
@@ -175,14 +188,23 @@
@Override
public List<BatchGetDeviceStateResponse.DeviceStatus> batchGetDeviceState(List<String> deviceNames) {
- BatchGetDeviceStateRequest request = new BatchGetDeviceStateRequest();
- request.setDeviceNames(deviceNames);
- request.setProductKey(getProductKey());
- BatchGetDeviceStateResponse response = invokeSync(request);
- if (response != null && response.getSuccess()) {
- return response.getDeviceStatusList();
- }
- return null;
+ List<BatchGetDeviceStateResponse.DeviceStatus> statusList = new ArrayList<>();
+
+ deviceNames.forEach(l -> {
+ BatchGetDeviceStateResponse.DeviceStatus deviceStatus = new BatchGetDeviceStateResponse.DeviceStatus();
+ deviceStatus.setDeviceName(l);
+
+ if (localMqttConnectStatusMap.get(l)!=null &&
+ localMqttConnectStatusMap.get(l)== 1){
+ deviceStatus.setStatus("ONLINE");
+ }else {
+ deviceStatus.setStatus("OFFLINE");
+ }
+ statusList.add(deviceStatus);
+ });
+
+
+ return statusList;
}
/**
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
index 027274f..621feec 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/C3mOrderSchedule.java
@@ -20,7 +20,7 @@
private final C3mOrderService c3mOrderService;
//姣�10鍒嗛挓杩愯涓�娆� 鍒犻櫎瓒呮椂璁㈠崟
- @Scheduled(cron = "0 0/10 * * * ? ")
+// @Scheduled(cron = "0 0/10 * * * ? ")
public void deleteOvertimeOrders() {
c3mOrderService.deleteOrderListByCreateTime();
log.error("鍒犻櫎瓒呮椂璁㈠崟瀹氭椂浠诲姟鎵ц鎴愬姛");
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
index b16f604..ac7bf17 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/schedule/WaterQualityDataSchedule.java
@@ -24,7 +24,7 @@
private WaterQualityEquipmentService waterQualityEquipmentService;
private WaterQualityDataService waterQualityDataService;
- @Scheduled(cron = "0 0 0/1 * * ?")
+// @Scheduled(cron = "0 0 0/1 * * ?")
// @Scheduled(cron = "0 0/2 * * * ? ")
public void UserSubjectRefund() {
List<WaterQualityEquipmentBo> waterQualityEquipmentList = waterQualityEquipmentService.listWaterQualityEquipment();
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
index 998120c..e00f1f8 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightService.java
@@ -51,6 +51,8 @@
import java.util.*;
import java.util.stream.Collectors;
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+
/**
* @author chenjiantian
* @date 2021/12/13 16:00
@@ -144,7 +146,8 @@
// 鑾峰彇鏈�杩戠殑涓婃姤鏃堕棿
List<String> deviceCodeList = listLight.stream().map(Light::getDeviceCode).collect(Collectors.toList());
- //鎷嗗垎list
+
+// //鎷嗗垎list
List<List<String>> split = CollectionUtil.split(deviceCodeList, 100);
List<BatchGetDeviceStateResponse.DeviceStatus> deviceStatuses = null;
@@ -265,13 +268,17 @@
List<Map<String, Object>> resultList = new ArrayList<>();
for (LightControlParam param : paramList) {
- A5LightBrightnessReqInnerFrame lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
- A5Frame a5Frame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), lightControlFrame);
+ A5LightBrightnessReqInnerFrame
+ lightControlFrame = new A5LightBrightnessReqInnerFrame(param.getBrightness(), param.getLightAddress());
+ A5Frame a5Frame = new A5Frame(
+ A5OrderEnum.REQUEST_LIGHT_DATA.getCode(),
+ lightControlFrame);
Map<String, Object> map = new HashMap<>();
try {
map.put("deviceCode", param.getDeviceCode());
WrapResponseCommonFrame<A5LightBrightnessRespInnerFrame> frame
- = MainBoardInvokeSyncService.getInstance().sendRRPC(param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
+ = MainBoardInvokeSyncService.getInstance().sendRRPC
+ (param.getDeviceCode(), a5Frame, A5LightBrightnessRespInnerFrame.class);
//瀛樺偍鎺у埗甯ф寚浠�
StoreOperationRecordsUtils.storeInnerFrameData(param.getDeviceCode(), "鍗曠伅甯�-浜害鎺у埗", a5Frame, frame);
@@ -1027,10 +1034,10 @@
public void timeSynchronizationInitiative(String deviceCode, String lightAddress) {
//鍗曠伅淇℃伅
Light light = getLight(deviceCode);
- if (light == null) {
- log.error("鍗曠伅涓诲姩鍚屾鏃堕棿璇锋眰寮傚父锛屽崟鐏俊鎭笉瀛樺湪锛�");
- return;
- }
+// if (light == null) {
+// log.error("鍗曠伅涓诲姩鍚屾鏃堕棿璇锋眰寮傚父锛屽崟鐏俊鎭笉瀛樺湪锛�");
+// return;
+// }
//鍗曠伅浠诲姟淇℃伅
LightTaskPoleRelation lightTaskPoleRelation = SpringContextHolder.getBean(LightTaskPoleRelationService.class)
.getOne(Wrappers.lambdaQuery(LightTaskPoleRelation.class)
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
index c528e04..2920df7 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/LightTaskService.java
@@ -16,6 +16,7 @@
import com.sandu.common.util.SpringContextHolder;
import com.sandu.ximon.admin.dto.LightTaskDto;
import com.sandu.ximon.admin.dto.SingleLightOrderDto;
+import com.sandu.ximon.admin.manager.iot.frame.A5Frame;
import com.sandu.ximon.admin.manager.iot.frame.FrameBuilder;
import com.sandu.ximon.admin.manager.iot.frame.IRequestFrame;
import com.sandu.ximon.admin.manager.iot.frame.inner.request.A5LightTimerReqInnerFrame;
@@ -431,10 +432,25 @@
* @return 杩斿洖甯�
*/
public A5LightTimerRespInnerFrame sendTimeRRpc(String framePayload, String deviceCode, String lightAddress) {
- IRequestFrame requestFrame = FrameBuilder.builderA5().innerFrame(new A5LightTimerReqInnerFrame(framePayload, lightAddress)).orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).build();
+// IRequestFrame requestFrame = FrameBuilder.
+// builderA5().
+// innerFrame(
+// new A5LightTimerReqInnerFrame
+// (framePayload, lightAddress)
+// ).
+// orderType(A5OrderEnum.REQUEST_LIGHT_DATA.getCode()).
+// build();
+
+ A5LightTimerReqInnerFrame
+ a5LightTimerReqInnerFrame = new A5LightTimerReqInnerFrame(framePayload, lightAddress);
+ System.out.println(JSON.toJSONString(a5LightTimerReqInnerFrame) + " --------a5LightTimerReqInnerFrame");
+
+ A5Frame requestFrame = new A5Frame(A5OrderEnum.REQUEST_LIGHT_DATA.getCode(), a5LightTimerReqInnerFrame);
System.out.println(requestFrame + " --------requestFrame");
- WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame = MainBoardInvokeSyncService.getInstance().sendRRPC(deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
+ WrapResponseCommonFrame<A5LightTimerRespInnerFrame> responseCommonFrame
+ = MainBoardInvokeSyncService.getInstance().sendRRPC
+ (deviceCode, requestFrame, A5LightTimerRespInnerFrame.class);
System.out.println(responseCommonFrame + " -----------responseCommonFrame");
StoreOperationRecordsUtils.storeInnerFrameData(deviceCode, "鍗曠伅甯�-鎺х伅", requestFrame, responseCommonFrame);
return Optional.ofNullable(responseCommonFrame).map(WrapResponseCommonFrame::getResponseInnerFrame).orElse(null);
diff --git a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
index 6d90489..0de67ea 100644
--- a/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
+++ b/ximon-admin/src/main/java/com/sandu/ximon/admin/service/PoleService.java
@@ -60,6 +60,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static com.sandu.ximon.admin.localMQTT.callback.StatusMqttCallBack.localMqttConnectStatusMap;
+
/**
* 鐏潌鐩稿叧
*
@@ -1050,7 +1052,7 @@
* @return 璁惧鐘舵�佸垪琛�
*/
public List<DeviceStatus> listStatusByDeviceCode(ArrayList<String> deviceCodeList) {
- // 鏈�澶у彧鑳芥煡50涓�
+// 鏈�澶у彧鑳芥煡50涓�
List<List<String>> split = CollectionUtil.split(deviceCodeList, 50);
List<DeviceStatus> statusList = new ArrayList<>();
for (List<String> list : split) {
@@ -1064,6 +1066,21 @@
}
}
}
+// List<DeviceStatus> statusList = new ArrayList<>();
+//
+// deviceCodeList.forEach(l -> {
+// DeviceStatus deviceStatus = new DeviceStatus();
+// deviceStatus.setDeviceCode(l);
+//
+// if (localMqttConnectStatusMap.get(l)!=null &&
+// localMqttConnectStatusMap.get(l)== 1){
+// deviceStatus.setStatus(1);
+// }else {
+// deviceStatus.setStatus(0);
+// }
+// statusList.add(deviceStatus);
+// });
+
return statusList;
}
@@ -1419,4 +1436,4 @@
}
-}
\ No newline at end of file
+}
diff --git a/ximon-admin/src/main/resources/application-xm.yml b/ximon-admin/src/main/resources/application-xm.yml
new file mode 100644
index 0000000..a582ce5
--- /dev/null
+++ b/ximon-admin/src/main/resources/application-xm.yml
@@ -0,0 +1,121 @@
+spring:
+ datasource:
+ # 鏁版嵁搴撶敤鎴峰悕
+ username: root
+ # 鏁版嵁搴撳瘑鐮�
+ password: zhxm2512209
+ url: jdbc:mysql://39.103.154.108:2512/xm_dev?useUnicode=true&autoReconnect=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
+ type: com.alibaba.druid.pool.DruidDataSource
+ druid:
+ connection-init-sqls: set names utf8mb4
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ redis:
+ host: 39.103.154.108
+ password: zhxm2512209
+ port: 6379
+ database: 0
+server:
+ port: 20017
+sandu:
+ jwt:
+ header: Authorization
+ # 浠ょ墝鍓嶇紑
+ token-start-with: Bearer
+ # 蹇呴』浣跨敤鏈�灏�88浣嶇殑Base64瀵硅浠ょ墝杩涜缂栫爜
+ base64-secret: U1GZNSKOH43V19GNIVE8HUYM9H86K657V1D66EAVSL9Q023J4JNWE44BNHCS6V9E66BPKF0KXUI5R1ZOYK2OWZZYALPD07JHOYUROL930UGJQUJDNAEYNTMUS27BHKTJEF9011DGGQ4QT9BN6CM2P9SY2VV2MZKJPCOW9YIGN0VJ
+ # 浠ょ墝杩囨湡鏃堕棿 姝ゅ鍗曚綅/姣 锛屽彲鍦ㄦ缃戠珯鐢熸垚 https://www.convertworld.com/zh-hans/time/milliseconds.html 1涓湀
+ token-validity-in-seconds: 2629800000
+ # 鍦ㄧ嚎鐢ㄦ埛key
+ online-key: online-token
+ # 鏄惁鍚姩redis缂撳瓨鐢ㄦ埛淇℃伅
+ cache-online: false
+ #************************鏈湴涓婁紶鏂囦欢閰嶇疆************************
+ upload:
+ #鏂囦欢鏈嶅姟鍣ㄨ矾寰�
+ upload-root-path: E:\file\novafile
+ storage: local
+ #鏈嶅姟鍣ㄦ枃浠跺墠缂�
+ real-url: http://localhost/
+ common:
+ urlPrefix: http://localhost/
+ quartz:
+ enable: true
+
+listenter:
+ isOpen: false
+
+minio:
+ endpoint: 47.106.172.9
+ port: 9000
+ accessKey: minioadmin
+ secretKey: zhxm2512209
+ secure: false
+
+
+# led灞忓箷鏈嶅姟鍣ㄥ湴鍧�锛堟洿鏀归渶瑕佸悓鏃舵洿鏀癸級
+realtime-server:
+ # command: http://101.132.131.91:8081/payload/
+ # url: http://101.132.131.91:8081/
+ command: http://112.74.63.130:20018/command/
+ url: http://112.74.63.130:20018/
+
+
+server-conf:
+ ip: 127.0.0.1 # 47.106.172.9/101.132.131.91
+
+
+
+nova-conf: #璇虹摝鍥炶皟
+ notify-url: http://39.103.154.108:20018/serv/vnnox/progress
+ screen-shot-notify-url: http://39.103.154.108:20018/serv/vnnox/screenshot
+ status-notify-url: http://39.103.154.108:20018/serv/vnnox/asyncStatus
+
+ #iot浜у搧绉橀挜
+iot:
+ access_key: LTAI4G27Af8MZEF55phdMQ4y
+ access_secret: KUc2yOtr7TRB4FuF5Wr0dWeTblbEuh
+
+#闃块噷浜憃ss閰嶇疆
+oss-conf:
+ end-point: oss-cn-shanghai.aliyuncs.com
+ key-id: LTAI5tPdpt5wvJyLipRijFSP
+ key-secret: 1ahYfCKd0yTddsUnuDLQzI23MLh4VQ
+ bucket-name: ximonsmart
+
+#鏂拌鐡�
+new-nova:
+ #渚濊禆鍦板潃
+ string-path: C:\Users\Administrator\Desktop\novaWin\bin\viplexcore.dll
+
+new-nova-file:
+ upload:
+ #鏂囦欢鏈嶅姟鍣ㄨ矾寰�
+ upload-root-path: E:\file\novafile
+ storage: local
+ #鏈嶅姟鍣ㄦ枃浠跺墠缂�
+ real-url: http://localhost/
+
+customer:
+ mqtt:
+ broker: tcp://127.0.0.1:1883
+ clientList:
+ #瀹㈡埛绔疘D
+ - clientId: java_server_msg
+ #鐩戝惉涓婚
+ subscribeTopic: v1/devices/response/+
+ #鐢ㄦ埛鍚�
+ userName: server_admin
+ #瀵嗙爜
+ password: zhxm2512209
+ #涓嬪彂涓婚
+ publishTopic: v1/devices/request/
+#鐩戝惉涓婁笅绾�
+ - clientId: java_server_status
+ #鐩戝惉涓婚
+ subscribeTopic: $SYS/brokers/+/clients/#
+ #鐢ㄦ埛鍚�
+ userName: server_admin
+ #瀵嗙爜
+ password: zhxm2512209
+ #涓嬪彂涓婚
+ publishTopic: v1/devices/request/
diff --git a/ximon-admin/src/main/resources/application.yml b/ximon-admin/src/main/resources/application.yml
index 0b58b1a..c1e7fa6 100644
--- a/ximon-admin/src/main/resources/application.yml
+++ b/ximon-admin/src/main/resources/application.yml
@@ -1,6 +1,6 @@
spring:
profiles:
- active: prod
+ active: xm
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
@@ -53,4 +53,5 @@
#涓绘澘rrpc閫氫俊PRODUCT_KEY
#rrpc:
-# key: a1JsfPG4iKW
\ No newline at end of file
+# key: a1JsfPG4iKW
+
--
Gitblit v1.9.3