// 2021 joint refactoring project with 蓝度 (Landu) — server side
package com.sandu.ximon.admin.localMQTT.client;
 
import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
 
/**
 * @author van
 * @version 1.0
 * msg:MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 * @date 2022/11/9 16:30
 */
 
@Slf4j
@Component
public class MqttClientManager {
    @Value("${customer.mqtt.broker}")
    private String mqttBroker;
    @Resource
    private MqttCallBackContext mqttCallBackContext;
 
 
    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }
 
    private static MqttClientManager mqttClientManager = new MqttClientManager();
    public static MqttClientManager getInstance() {
        return mqttClientManager;
    }
    /**
     * 创建mqtt客户端
     *
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     * @return mqtt客户端
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
 
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !"".equals(userName)) {
                connOpts.setUserName(userName);
            }
 
            if (null != password && !"".equals(password)) {
                connOpts.setPassword(password.toCharArray());
            }
 
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(10);
 
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
 
                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }
 
                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }
 
            //连接mqtt服务端broker
            client.connect(connOpts);
 
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                client.subscribe(subscribeTopic);
            }
 
            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
 
 
 
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}