package com.sandu.ximon.admin.localMQTT.client; import com.sandu.ximon.admin.localMQTT.callback.AbsMqttCallBack; import com.sandu.ximon.admin.localMQTT.callback.MqttCallBackContext; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import static com.sandu.ximon.admin.localMQTT.client.MqttClientCreate.MQTT_CLIENT_MAP; /** * @author van * @version 1.0 * msg:MQTT客户端管理类,如果客户端非常多后续可入redis缓存 * @date 2022/11/9 16:30 */ @Slf4j @Component public class MqttClientManager { @Value("${customer.mqtt.broker}") private String mqttBroker; @Resource private MqttCallBackContext mqttCallBackContext; public MqttClient getMqttClientById(String clientId) { return MQTT_CLIENT_MAP.get(clientId); } private static MqttClientManager mqttClientManager = new MqttClientManager(); public static MqttClientManager getInstance() { return mqttClientManager; } /** * 创建mqtt客户端 * * @param clientId 客户端ID * @param subscribeTopic 订阅主题,可为空 * @param userName 用户名,可为空 * @param password 密码,可为空 * @return mqtt客户端 */ public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) { MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(mqttBroker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); if (null != userName && !"".equals(userName)) { connOpts.setUserName(userName); } if (null != password && !"".equals(password)) { connOpts.setPassword(password.toCharArray()); } connOpts.setCleanSession(true); connOpts.setKeepAliveInterval(10); if (null != subscribeTopic && !"".equals(subscribeTopic)) { AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId); if (null == callBack) { callBack = mqttCallBackContext.getCallBack("default"); } callBack.setClientId(clientId); callBack.setConnectOptions(connOpts); client.setCallback(callBack); } //连接mqtt服务端broker client.connect(connOpts); if (null != subscribeTopic && !"".equals(subscribeTopic)) { client.subscribe(subscribeTopic); } MQTT_CLIENT_MAP.putIfAbsent(clientId, client); } catch (MqttException e) { log.error("Create mqttClient failed!", e); } } }