package com.sandu.ximon.admin.localMQTT.callback;

import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import com.sandu.ximon.admin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;

/**
 * Default callback for messages received from the local MQTT connection.
 *
 * <p>Each received message is handed to a shared worker pool. On the worker thread the JSON body
 * is parsed into a {@link LocalMqttMsg} and routed by its connect type: asynchronous messages
 * (device data reports) are dispatched to the matching data processor, synchronous messages have
 * their payload stored in {@code MQTT_RETURN_FRAME_MAP} keyed by the MAC taken from the topic.
 *
 * @author van
 * @version 1.0
 * msg: default callback
 * @date 2022/11/9 16:24
 */
@Slf4j
@Component("java_server_msg")
public class MsgMqttCallBack extends AbsMqttCallBack {

    /** Connect type marking a synchronous message (its payload is stored as a return frame). */
    private static final String CONNECT_TYPE_SYNC = "1";

    /** Connect type marking an asynchronous message (device data report). */
    private static final String CONNECT_TYPE_ASYNC = "2";

    /** Zero-based index of the device MAC within the '/'-separated topic. */
    private static final int MAC_TOPIC_INDEX = 3;

    public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";

    /**
     * Worker pool that processes received messages off the calling thread.
     *
     * <p>Core size is the number of available processors, maximum size is twice that, idle
     * threads above the core size are released after 60 seconds, and the work queue holds at
     * most 50000 pending tasks. No rejection handler is supplied, so the executor's default
     * policy applies once both the queue and the pool are full.
     */
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000),
            new MsgMqttCallBack.NameTreadFactory());

    /** Thread factory that names worker threads {@code local-MQTT-msg-thread-N}, N starting at 1. */
    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }

    /**
     * Logs the received message and queues it for processing on the worker pool.
     *
     * @param topic   topic the message arrived on
     * @param message raw message body
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        // Log before submitting so this line always precedes the processing logs.
        log.info("接收到消息---MsgMqttCallBack:topic={},message={}", topic, message);
        try {
            EXECUTOR_SERVICE.submit(() -> processMessage(topic, message));
        } catch (RejectedExecutionException e) {
            // The bounded queue is full (or the pool was shut down). Report it here instead of
            // letting the exception escape into the caller's callback thread.
            // NOTE(review): the message is dropped in this case - confirm that is acceptable.
            log.error("Local MQTT worker pool rejected message, topic={}, message={}", topic, message, e);
        }
    }

    /**
     * Parses one received message and routes it by connect type.
     *
     * <p>The device MAC is read from the fourth '/'-separated segment of the topic. Messages
     * that cannot be routed (topic too short, body that parses to {@code null}, unknown connect
     * type) are logged and skipped. Any exception raised while processing is logged and not
     * propagated, so a bad message cannot terminate the worker task abnormally.
     *
     * @param topic         topic the message arrived on
     * @param messageString raw JSON message body
     */
    private void processMessage(String topic, String messageString) {
        try {
            if (topic == null) {
                log.warn("Ignoring local MQTT message without topic, message={}", messageString);
                return;
            }
            String[] topicSegments = topic.split("/");
            if (topicSegments.length <= MAC_TOPIC_INDEX) {
                log.warn("Ignoring local MQTT message: topic has no MAC segment, topic={}", topic);
                return;
            }
            String mac = topicSegments[MAC_TOPIC_INDEX];

            LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
            if (localMqttMsg == null) {
                log.warn("Ignoring local MQTT message with empty body, topic={}", topic);
                return;
            }

            // Constant-first comparison: a missing connect type must not raise an NPE.
            if (CONNECT_TYPE_ASYNC.equals(localMqttMsg.getConnectType())) {
                // Device data report. No product key is available here, so null is passed and
                // the MAC is used as the device name.
                processTask(null, mac, localMqttMsg);
            } else if (CONNECT_TYPE_SYNC.equals(localMqttMsg.getConnectType())) {
                log.debug("Local MQTT sync frame received, mac={}, payload={}", mac, localMqttMsg.getPayload());
                MQTT_RETURN_FRAME_MAP.put(mac, localMqttMsg.getPayload());
            } else {
                log.debug("Ignoring local MQTT message with unsupported connectType={}, topic={}",
                        localMqttMsg.getConnectType(), topic);
            }
        } catch (Exception e) {
            log.error("Failed to process local MQTT message, topic={}, message={}", topic, messageString, e);
        }
    }

    /**
     * Converts a reported message into a frame and dispatches it to the processor that matches
     * the frame's order type. Frames with any other order type are ignored.
     *
     * @param productKey   product key passed through to the processor (the caller in this class
     *                     passes {@code null})
     * @param deviceName   device name passed through to the processor (the caller in this class
     *                     passes the MAC taken from the topic)
     * @param localMqttMsg reported message; nothing is done when it is {@code null}
     */
    private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
        if (localMqttMsg == null) {
            return;
        }
        log.info("处理订阅");
        log.info("{}", localMqttMsg);

        CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
        if (frame == null || frame.getOrderType() == null) {
            log.warn("Ignoring local MQTT report without a usable frame, deviceName={}", deviceName);
            return;
        }

        if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
            // Single-lamp data report
            LightDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
            // C3 charging pile report
            c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
            // Atmosphere data report
            AirDataProcessor.getInstance().process(productKey, deviceName, frame);
        } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
            // Pole monitor data report
            PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
        }
    }
}