package com.sandu.ximon.admin.localMQTT.util;
|
|
import com.alibaba.fastjson.JSON;
|
import com.sandu.ximon.admin.localMQTT.client.MqttClientManager;
|
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
|
import com.sandu.ximon.admin.utils.StringUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import static java.lang.Thread.sleep;
|
|
/**
|
* @author van
|
* @version 1.0
|
* msg:MQTT客户端工具类
|
* @date 2022/11/9 16:32
|
*/
|
@Slf4j
|
public class MqttClientUtil {
|
|
private static MqttClientUtil mqttClientUtil = new MqttClientUtil();
|
|
public static MqttClientUtil getInstance() {
|
return mqttClientUtil;
|
}
|
|
public static String publishPrefix = "v1/devices/request/";
|
|
|
public static String clientId = "java_server_msg";
|
|
public static final Map<String, String> MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>();
|
|
public static String sendMqttMsg(String topic, String content) {
|
try {
|
LocalMqttMsg localMqttMsg = new LocalMqttMsg();
|
localMqttMsg.setPayload(content);
|
|
MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes());
|
|
message.setQos(0);
|
MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId);
|
|
if (null == mqttClient) {
|
log.error("Not exist mqttClient where it's clientId is {}", clientId);
|
return topic;
|
}
|
|
long start = System.currentTimeMillis();
|
mqttClient.publish(publishPrefix+topic, message);
|
log.info("{}",publishPrefix+topic);
|
log.info("{}", message);
|
|
/**
|
* 实现伪同步
|
* */
|
try {
|
String returnFrame = null ;
|
returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
|
|
for (int i = 0;i < 50;i++){
|
if (StringUtil.strIsNullOrEmpty(returnFrame)){
|
sleep(100);
|
returnFrame = MQTT_RETURN_FRAME_MAP.get(topic);
|
}else {
|
log.info("返回时间:{} ms",System.currentTimeMillis() - start) ;
|
String remove = MQTT_RETURN_FRAME_MAP.remove(topic);
|
log.info("remove结果:{} ",remove);
|
return returnFrame;
|
}
|
}
|
|
|
} catch (InterruptedException e) {
|
|
}
|
|
} catch (MqttException e) {
|
log.error("MqttClient send msg faild!", e);
|
return("通信超时");
|
}
|
return null;
|
}
|
|
|
|
}
|