2021与蓝度共同重构项目,服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package com.sandu.ximon.admin.localMQTT.callback;
 
 
import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import com.sandu.ximon.admin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import javax.jms.Message;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
 
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
 
/**
 * @author van
 * @version 1.0
 * msg:默认回调
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("java_server_msg")
public class MsgMqttCallBack extends AbsMqttCallBack {
 
    private static final String localMqttConnectTypeOfSync = "1";
 
    private static final String localMqttConnectTypeOfAsync = "2";
 
    public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
 
 
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000), new MsgMqttCallBack.NameTreadFactory());
 
    static class NameTreadFactory implements ThreadFactory {
 
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
 
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }
 
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
 
        log.info("接收到消息---MsgMqttCallBack:topic={},message={}", topic, message);
    }
 
 
    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private void processMessage(String topic,String messageString) {
        try {
 
            String mac = topic.split("/")[3];
            LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
 
            if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
                // 设备数据上报
                processTask(null,mac, localMqttMsg);
            }
            else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
                System.out.println(localMqttMsg.getPayload());
                MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
            }
 
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 处理任务
     *
     * @param productKey 产品码
     * @param deviceName 产品名称
     * @param localMqttMsg      上报消息localMqttMsg
     */
    private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
        if (localMqttMsg == null) {
            return;
        }
        log.info("处理订阅");
        log.info(localMqttMsg.toString());
        CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
        if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
            // 单灯数据上报处理
            LightDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
            // C3充电桩上报处理
            c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
            // 大气数据指令上报
            AirDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
            PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
        }
    }
}