package com.sandu.ximon.admin.localMQTT.util; import com.alibaba.fastjson.JSON; import com.sandu.ximon.admin.localMQTT.client.MqttClientManager; import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg; import com.sandu.ximon.admin.utils.StringUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static java.lang.Thread.sleep; /** * @author van * @version 1.0 * msg:MQTT客户端工具类 * @date 2022/11/9 16:32 */ @Slf4j public class MqttClientUtil { private static MqttClientUtil mqttClientUtil = new MqttClientUtil(); public static MqttClientUtil getInstance() { return mqttClientUtil; } public static String publishPrefix = "v1/devices/request/"; public static String clientId = "java_server_msg"; public static final Map MQTT_RETURN_FRAME_MAP = new ConcurrentHashMap<>(); public static String sendMqttMsg(String topic, String content) { try { LocalMqttMsg localMqttMsg = new LocalMqttMsg(); localMqttMsg.setPayload(content); MqttMessage message = new MqttMessage(JSON.toJSONString(localMqttMsg).getBytes()); message.setQos(0); MqttClient mqttClient = MqttClientManager.getInstance().getMqttClientById(clientId); if (null == mqttClient) { log.error("Not exist mqttClient where it's clientId is {}", clientId); return topic; } long start = System.currentTimeMillis(); mqttClient.publish(publishPrefix+topic, message); log.info("{}",publishPrefix+topic); log.info("{}", message); /** * 实现伪同步 * */ try { String returnFrame = null ; returnFrame = MQTT_RETURN_FRAME_MAP.get(topic); for (int i = 0;i < 50;i++){ if (StringUtil.strIsNullOrEmpty(returnFrame)){ sleep(100); returnFrame = MQTT_RETURN_FRAME_MAP.get(topic); }else { log.info("返回时间:{} ms",System.currentTimeMillis() - start) ; String remove = MQTT_RETURN_FRAME_MAP.remove(topic); log.info("remove结果:{} ",remove); return returnFrame; } } } catch (InterruptedException e) { } } catch (MqttException e) { log.error("MqttClient send msg faild!", e); return("通信超时"); } return null; } }