package com.sandu.ximon.admin.localMQTT.client;
|
|
import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack;
|
import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
|
import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP;
|
|
/**
|
* @author van
|
* @version 1.0
|
* msg:MQTT客户端管理类,如果客户端非常多后续可入redis缓存
|
* @date 2022/11/9 16:30
|
*/
|
|
@Slf4j
|
@Component
|
public class MqttClientManager {
|
@Value("${customer.mqtt.broker}")
|
private String mqttBroker;
|
@Resource
|
private MqttCallBackContext mqttCallBackContext;
|
|
|
public MqttClient getMqttClientById(String clientId) {
|
return MQTT_CLIENT_MAP.get(clientId);
|
}
|
|
private static MqttClientManager mqttClientManager = new MqttClientManager();
|
public static MqttClientManager getInstance() {
|
return mqttClientManager;
|
}
|
/**
|
* 创建mqtt客户端
|
*
|
* @param clientId 客户端ID
|
* @param subscribeTopic 订阅主题,可为空
|
* @param userName 用户名,可为空
|
* @param password 密码,可为空
|
* @return mqtt客户端
|
*/
|
public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
|
MemoryPersistence persistence = new MemoryPersistence();
|
|
try {
|
MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
if (null != userName && !"".equals(userName)) {
|
connOpts.setUserName(userName);
|
}
|
|
if (null != password && !"".equals(password)) {
|
connOpts.setPassword(password.toCharArray());
|
}
|
|
connOpts.setCleanSession(true);
|
connOpts.setKeepAliveInterval(10);
|
|
if (null != subscribeTopic && !"".equals(subscribeTopic)) {
|
AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
|
|
if (null == callBack) {
|
callBack = mqttCallBackContext.getCallBack("default");
|
}
|
|
callBack.setClientId(clientId);
|
callBack.setConnectOptions(connOpts);
|
client.setCallback(callBack);
|
}
|
|
//连接mqtt服务端broker
|
client.connect(connOpts);
|
|
if (null != subscribeTopic && !"".equals(subscribeTopic)) {
|
client.subscribe(subscribeTopic);
|
}
|
|
MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
|
|
|
|
} catch (MqttException e) {
|
log.error("Create mqttClient failed!", e);
|
}
|
}
|
}
|