package com.sandu.ximon.admin.manager.iot.amqp; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor; import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor; import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5AtmosphereHeartbeatReportInnerFrame; import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3CommonReportInnerFrame; import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3ErrorCodeReportInnerFrame; import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3HeartbeatReportInnerFrame; import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage; import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum; import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3ChargingEnum; import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3mRedisConstant; import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils; import com.sandu.ximon.admin.service.C3ChargingService; import com.sandu.ximon.admin.service.C3mOrderService; import com.sandu.ximon.admin.service.PoleBindingService; import com.sandu.ximon.admin.service.PoleService; import com.sandu.ximon.admin.utils.LogUtils; import com.sandu.ximon.admin.utils.RedisUtils; import com.sandu.ximon.admin.vo.C3mOrderVO; import com.sandu.ximon.dao.domain.C3mCharging; import com.sandu.ximon.dao.domain.C3mOrder; import com.sandu.ximon.dao.domain.PoleBinding; import com.sandu.ximon.dao.enums.OrderStatus; import com.sandu.ximon.dao.enums.OrderType; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Date; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author chenjiantian * @date 2021/12/2 17:33 * 处理amqp订阅消息 */ @Slf4j public class AmqpMessageListener implements MessageListener { protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50000), new NameTreadFactory()); static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement()); } } @Override public void onMessage(Message message) { EXECUTOR_SERVICE.submit(() -> processMessage(message)); } /** * 在这里处理您收到消息后的具体业务逻辑。 */ private void processMessage(Message message) { try { byte[] body = message.getBody(byte[].class); String content = new String(body); Map map = JSON.parseObject(content, Map.class); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); log.info("收到订阅" + topic + "," + messageId); log.info(content); if (null != map.get("status")) { // 上下线上报处理 } else { // 设备数据上报 CommonReportMessage commonReportMessage = JSON.parseObject(content, CommonReportMessage.class); CommonFrame connectFrame = FrameUtils.transformMessageToFrame(commonReportMessage.getItems().getParams().getValue()); processTask(commonReportMessage.getProductKey(), commonReportMessage.getDeviceName(), connectFrame); } } catch (Exception e) { e.printStackTrace(); } finally { } } /** * 处理任务 * * @param productKey 产品码 * @param deviceName 产品名称 * @param frame 上报帧 */ private void processTask(String productKey, String deviceName, CommonFrame frame) { if (frame == null) { return; } log.info("处理订阅"); log.info(frame.toString()); if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) { // 单灯数据上报处理 LightDataProcessor.getInstance().process(productKey, deviceName, frame); } /*else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) { // C3充电桩上报处理 c3ChargingReportAnalysis(productKey, deviceName, frame); //TODO 正式上线打开 } */else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) { // //测试用的 // String s = "{\"connectType\":\"FE\",\"crc32\":\"8685DF1B\",\"functionCode\":\"A5\",\"orderType\":\"84\",\"payload\":\"F0010022FFFF0101010907640000000000000000419D000000360089000000000002019300008CA14C69\",\"payloadLength\":\"002A\",\"validate\":true}"; // CommonFrame commonFrame1 = JSON.parseObject(s, CommonFrame.class); // 大气数据指令上报 AirDataProcessor.getInstance().process(productKey, deviceName, frame); // atmosphereAnalysis(productKey, deviceName, frame); } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) { PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame); } } private void c3ChargingReportAnalysis(String productKey, String deviceName, CommonFrame frame) { String functionCode = frame.getPayload().substring(2, 4); if (C3ChargingEnum.NETWORK_REQUEST.getCode().equals(functionCode)) { c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } else if (C3ChargingEnum.QR_CODE_REQUEST.getCode().equals(functionCode)) { c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } else if (C3ChargingEnum.HEART_BEAT.getCode().equals(functionCode)) { // 心跳包上报(42) 若五分钟无心跳包数据,则判断离线 c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } else if (C3ChargingEnum.CHARGE_COMPLETE.getCode().equals(functionCode)) { c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } else if (C3ChargingEnum.CHARGE_STOP.getCode().equals(functionCode)) { c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } else if (C3ChargingEnum.ERROR_CODE.getCode().equals(functionCode)) { c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); } } }