package com.sandu.ximon.admin.manager.iot.amqp;
|
|
import com.alibaba.fastjson.JSON;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.*;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5AtmosphereHeartbeatReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3CommonReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3ErrorCodeReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3HeartbeatReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.*;
|
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
|
import com.sandu.ximon.admin.service.C3ChargingService;
|
import com.sandu.ximon.admin.service.C3mOrderService;
|
import com.sandu.ximon.admin.service.PoleBindingService;
|
import com.sandu.ximon.admin.service.PoleService;
|
import com.sandu.ximon.admin.utils.LogUtils;
|
import com.sandu.ximon.admin.utils.RedisUtils;
|
import com.sandu.ximon.admin.vo.C3mOrderVO;
|
import com.sandu.ximon.dao.domain.C3mCharging;
|
import com.sandu.ximon.dao.domain.C3mOrder;
|
import com.sandu.ximon.dao.domain.PoleBinding;
|
import com.sandu.ximon.dao.enums.OrderStatus;
|
import com.sandu.ximon.dao.enums.OrderType;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import javax.jms.Message;
|
import javax.jms.MessageListener;
|
import java.util.Date;
|
import java.util.Map;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* @author chenjiantian
|
* @date 2021/12/2 17:33
|
* 处理amqp订阅消息
|
*/
|
@Slf4j
|
public class AmqpMessageListener implements MessageListener {
|
|
|
protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
Runtime.getRuntime().availableProcessors() * 2,
|
Runtime.getRuntime().availableProcessors() * 4, 60, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(50000), new NameTreadFactory());
|
|
static class NameTreadFactory implements ThreadFactory {
|
|
private final AtomicInteger mThreadNum = new AtomicInteger(1);
|
|
@Override
|
public Thread newThread(Runnable r) {
|
return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement());
|
}
|
}
|
|
@Override
|
public void onMessage(Message message) {
|
EXECUTOR_SERVICE.submit(() -> processMessage(message));
|
}
|
|
/**
|
* 在这里处理您收到消息后的具体业务逻辑。
|
*/
|
private void processMessage(Message message) {
|
try {
|
|
byte[] body = message.getBody(byte[].class);
|
String content = new String(body);
|
Map map = JSON.parseObject(content, Map.class);
|
String topic = message.getStringProperty("topic");
|
String messageId = message.getStringProperty("messageId");
|
|
// log.info("收到订阅" + topic + "," + messageId);
|
// log.info(content);
|
|
if (null != map.get("status")) {
|
// 上下线上报处理
|
} else {
|
// 设备数据上报
|
CommonReportMessage commonReportMessage = JSON.parseObject(content, CommonReportMessage.class);
|
|
CommonFrame connectFrame = FrameUtils.transformMessageToFrame(commonReportMessage.getItems().getParams().getValue());
|
|
processTask(commonReportMessage.getProductKey(), commonReportMessage.getDeviceName(), connectFrame);
|
}
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
} finally {
|
|
}
|
}
|
|
/**
|
* 处理任务
|
*
|
* @param productKey 产品码
|
* @param deviceName 产品名称
|
* @param frame 上报帧
|
*/
|
private void processTask(String productKey, String deviceName, CommonFrame frame) {
|
if (frame == null) {
|
return;
|
}
|
// if (!deviceName.equals("3930364d485010ff803affff")){
|
// return;
|
// }
|
log.info("处理订阅:\nmac:{},frame:{}",deviceName,frame.toString());
|
if (frame.getFunctionCode().equals("A5")){
|
log.info("A5");
|
if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
|
// 单灯数据上报处理
|
LightDataProcessor.getInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
|
// C3充电桩上报处理
|
c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
|
// 大气数据指令上报
|
AirDataProcessor.getInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
|
PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
|
}
|
} else if (frame.getFunctionCode().equals("A7")) {
|
log.info("A7");
|
|
if (frame.getOrderType().equals(A7OrderEnum.RESPONSE_PLC_DATA.getCode())){
|
// PLC
|
PlcDataProcessor.getInstance().process(productKey, deviceName,frame);
|
}
|
}
|
|
}
|
}
|