package com.sandu.ximon.admin.localMQTT.callback; import com.sandu.ximon.admin.localMQTT.client.MqttClientManager; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * @author van * @version 1.0 * msg:MQTT回调抽象类 * @date 2022/11/9 16:23 */ @Slf4j public abstract class AbsMqttCallBack implements MqttCallback { private String clientId; private MqttConnectOptions connectOptions; public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public MqttConnectOptions getConnectOptions() { return connectOptions; } public void setConnectOptions(MqttConnectOptions connectOptions) { this.connectOptions = connectOptions; } /** * 失去连接操作,进行重连 * * @param throwable 异常 */ @Override public void connectionLost(Throwable throwable) { try { if (null != clientId) { if (null != connectOptions) { MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions); } else { MqttClientManager.getInstance().getMqttClientById(clientId).connect(); } } } catch (Exception e) { log.error("{} reconnect failed!", e); } } /** * 接收订阅消息 * * @param topic 主题 * @param mqttMessage 接收消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { String content = new String(mqttMessage.getPayload()); log.info("Receive topic[{}],message={}", topic, content); handleReceiveMessage(topic, content); } /** * 消息发送成功 * * @param iMqttDeliveryToken toke */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("消息发送成功"); } /** * 处理接收的消息 * * @param topic 主题 * @param message 消息内容 */ protected abstract void handleReceiveMessage(String topic, String message); }