package com.sandu.ximon.admin.localMQTT.callback;
|
|
|
import com.alibaba.fastjson.JSON;
|
import com.sandu.ximon.admin.localMQTT.model.LocalMqttMsg;
|
import com.sandu.ximon.admin.localMQTT.util.HexFrameUtils;
|
import com.sandu.ximon.admin.manager.iot.amqp.AmqpMessageListener;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
|
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
|
import com.sandu.ximon.admin.utils.RedisUtils;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
import javax.jms.Message;
|
import java.util.Map;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import static com.sandu.ximon.admin.localMQTT.util.MqttClientUtil.MQTT_RETURN_FRAME_MAP;
|
|
/**
|
* @author van
|
* @version 1.0
|
* msg:默认回调
|
* @date 2022/11/9 16:24
|
*/
|
@Slf4j
|
@Component("default")
|
public class DefaultMqttCallBack extends AbsMqttCallBack {
|
|
private static final String localMqttConnectTypeOfSync = "1";
|
|
private static final String localMqttConnectTypeOfAsync = "2";
|
|
public static final String localMqttSyncFrame = "local_mqtt_sync_frame:";
|
|
|
protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
Runtime.getRuntime().availableProcessors(),
|
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(50000), new DefaultMqttCallBack.NameTreadFactory());
|
|
static class NameTreadFactory implements ThreadFactory {
|
|
private final AtomicInteger mThreadNum = new AtomicInteger(1);
|
|
@Override
|
public Thread newThread(Runnable r) {
|
return new Thread(r, "local-MQTT-msg-thread-" + mThreadNum.getAndIncrement());
|
}
|
}
|
|
@Override
|
protected void handleReceiveMessage(String topic, String message) {
|
EXECUTOR_SERVICE.submit(() -> processMessage(topic,message));
|
|
log.info("接收到消息---DefaultCallBack:topic={},message={}", topic, message);
|
}
|
|
|
/**
|
* 在这里处理您收到消息后的具体业务逻辑。
|
*/
|
private void processMessage(String topic,String messageString) {
|
try {
|
|
String mac = topic.split("/")[3];
|
LocalMqttMsg localMqttMsg = JSON.parseObject(messageString, LocalMqttMsg.class);
|
|
if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfAsync)){
|
// 设备数据上报
|
processTask(null,mac, localMqttMsg);
|
}
|
else if (localMqttMsg.getConnectType().equals(localMqttConnectTypeOfSync)){
|
System.out.println(localMqttMsg.getPayload());
|
MQTT_RETURN_FRAME_MAP.put(mac,localMqttMsg.getPayload());
|
}
|
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 处理任务
|
*
|
* @param productKey 产品码
|
* @param deviceName 产品名称
|
* @param localMqttMsg 上报消息localMqttMsg
|
*/
|
private void processTask(String productKey, String deviceName, LocalMqttMsg localMqttMsg) {
|
if (localMqttMsg == null) {
|
return;
|
}
|
log.info("处理订阅");
|
log.info(localMqttMsg.toString());
|
CommonFrame frame = HexFrameUtils.transformMessageToFrame(localMqttMsg.getPayload());
|
if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
|
// 单灯数据上报处理
|
LightDataProcessor.getInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
|
// C3充电桩上报处理
|
c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
|
// 大气数据指令上报
|
AirDataProcessor.getInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
|
PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
|
}
|
}
|
}
|