2021与蓝度共同重构项目,服务端
liuhaonan
2022-11-18 b531004a3cbd33d7bb9f5b7dce06ddd4f5e7ec9a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.sandu.ximon.admin.localMQTT.callback;
 
 
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
 
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
 
/**
 * @author van
 * @version 1.0
 * msg:默认回调
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("java_server_status")
public class StatusMqttCallBack extends AbsMqttCallBack {
 
    private static final String localMqttConnectStatusConnected = "connected";
 
    private static final String localMqttConnectStatusDisconnected = "disconnected";
 
    public static final Map<String, Integer> localMqttConnectStatusMap = new ConcurrentHashMap<>();
 
 
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory());
 
    static class NameTreadFactory implements ThreadFactory {
 
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
 
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }
 
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
 
        log.info("接收到消息---StatusMqttCallBack:topic={},message={}", topic, message);
    }
 
 
    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private void processMessage(String topic,String messageString) {
        try {
 
            String mac = topic.split("/")[4];
            String status = topic.split("/")[5];
 
            System.out.println("----------------------");
            System.out.println(mac);
            System.out.println(status);
            if (status.equals(localMqttConnectStatusConnected)){
                // 设备数据上报
                localMqttConnectStatusMap.put(mac,1);
            }
            else if (status.equals(localMqttConnectStatusDisconnected)){
                localMqttConnectStatusMap.put(mac,0);
            }
 
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
}