package com.sandu.ximon.admin.localMQTT.callback; import com.alibaba.fastjson.JSON; import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg; import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils; import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor; import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum; import com.sandu.ximon.admin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP; /** * @author van * @version 1.0 * msg:默认回调 * @date 2022/11/9 16:24 */ @Slf4j @Component("java_server_status") public class StatusMqttCallBack extends AbsMqttCallBack { private static final String localMqttConnectStatusConnected = "connected"; private static final String localMqttConnectStatusDisconnected = "disconnected"; public static final String localMqttConnectStatus = "localMqttConnectStatus."; protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50000), new StatusMqttCallBack.NameTreadFactory()); static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement()); } } @Override protected void handleReceiveMessage(String topic, String message) { EXECUTOR_SERVICE.submit(() -> processMessage(topic,message)); // log.info("接收到消息---StatusMqttCallBack:topic={},message={}", topic, message); } /** * 在这里处理您收到消息后的具体业务逻辑。 */ private void processMessage(String topic,String messageString) { try { String mac = topic.split("/")[4]; String status = topic.split("/")[5]; if (status.equals(localMqttConnectStatusConnected)){ // 设备数据上报 boolean set = RedisUtils.getBean().set(localMqttConnectStatus + mac, 1); log.error("设备上线------,mac:{},上线,{}",mac,set); } else if (status.equals(localMqttConnectStatusDisconnected)){ boolean set = RedisUtils.getBean().set(localMqttConnectStatus+mac,0); log.error("设备下线------,mac:{},下线.{}",mac,set); } } catch (Exception e) { e.printStackTrace(); } } }