package com.sandu.ximon.admin.localMQTT.callback;
|
|
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
/**
|
* @author van
|
* @version 1.0
|
* msg:MQTT回调抽象类
|
* @date 2022/11/9 16:23
|
*/
|
@Slf4j
|
public abstract class AbsMqttCallBack implements MqttCallback {
|
private String clientId;
|
private MqttConnectOptions connectOptions;
|
|
public String getClientId() {
|
return clientId;
|
}
|
|
public void setClientId(String clientId) {
|
this.clientId = clientId;
|
}
|
|
public MqttConnectOptions getConnectOptions() {
|
return connectOptions;
|
}
|
|
public void setConnectOptions(MqttConnectOptions connectOptions) {
|
this.connectOptions = connectOptions;
|
}
|
|
/**
|
* 失去连接操作,进行重连
|
*
|
* @param throwable 异常
|
*/
|
@Override
|
public void connectionLost(Throwable throwable) {
|
try {
|
if (null != clientId) {
|
if (null != connectOptions) {
|
MqttClientManager.getInstance().getMqttClientById(clientId).connect(connectOptions);
|
} else {
|
MqttClientManager.getInstance().getMqttClientById(clientId).connect();
|
}
|
}
|
|
} catch (Exception e) {
|
log.error("{} reconnect failed!", e);
|
}
|
}
|
|
/**
|
* 接收订阅消息
|
*
|
* @param topic 主题
|
* @param mqttMessage 接收消息
|
*/
|
@Override
|
public void messageArrived(String topic, MqttMessage mqttMessage) {
|
String content = new String(mqttMessage.getPayload());
|
log.info("Receive topic[{}],message={}", topic, content);
|
handleReceiveMessage(topic, content);
|
}
|
|
/**
|
* 消息发送成功
|
*
|
* @param iMqttDeliveryToken toke
|
*/
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
log.info("消息发送成功");
|
}
|
|
|
/**
|
* 处理接收的消息
|
*
|
* @param topic 主题
|
* @param message 消息内容
|
*/
|
protected abstract void handleReceiveMessage(String topic, String message);
|
}
|