package com.sandu.ximon.admin.manager.iot.amqp; import com.alibaba.fastjson.JSON; import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor; import com.sandu.ximon.admin.manager.iot.frame.inner.report.*; import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage; import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5C3ReportEnum; import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5LightReportEnum; import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum; import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils; import lombok.extern.slf4j.Slf4j; import javax.jms.Message; import javax.jms.MessageListener; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author chenjiantian * @date 2021/12/2 17:33 * 处理amqp订阅消息 */ @Slf4j public class AmqpMessageListener implements MessageListener { protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50000), new NameTreadFactory()); static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement()); } } @Override public void onMessage(Message message) { EXECUTOR_SERVICE.submit(() -> processMessage(message)); } /** * 在这里处理您收到消息后的具体业务逻辑。 */ private void processMessage(Message message) { try { byte[] body = message.getBody(byte[].class); String content = new String(body); Map map = JSON.parseObject(content, Map.class); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); // log.info("收到订阅" + topic + "," + messageId); // log.info(content); if (null != map.get("status")) { // 上下线上报处理 } else { // 设备数据上报 CommonReportMessage commonReportMessage = JSON.parseObject(content, CommonReportMessage.class); CommonFrame connectFrame = FrameUtils.transformMessageToFrame(commonReportMessage.getItems().getParams().getValue()); processTask(commonReportMessage.getProductKey(), commonReportMessage.getDeviceName(), connectFrame); } } catch (Exception e) { e.printStackTrace(); } finally { } } /** * 处理任务 * * @param productKey 产品码 * @param deviceName 产品名称 * @param frame 上报帧 */ private void processTask(String productKey, String deviceName, CommonFrame frame) { if (frame == null) { return; } log.info("处理订阅"); log.info(frame.toString()); if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) { // 单灯数据上报处理 LightDataProcessor.getInstance().process(productKey,deviceName,frame); } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) { // C3充电桩上报处理 c3ChargingReportAnalysis(productKey, deviceName, frame); } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) { // 大气数据指令上报 atmosphereAnalysis(productKey, deviceName, frame); } } private void c3ChargingReportAnalysis(String productKey, String deviceName, CommonFrame frame) { String functionCode = frame.getPayload().substring(2, 4); if (A5C3ReportEnum.NETWORK_REQUEST.getCode().equals(functionCode)) { A5C3CommonReportInnerFrame netRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); } else if (A5C3ReportEnum.QR_CODE_REQUEST.getCode().equals(functionCode)) { A5C3CommonReportInnerFrame codeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); } else if (A5C3ReportEnum.HEART_BEAT.getCode().equals(functionCode)) { A5C3HeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5C3HeartbeatReportInnerFrame().transformFrame(frame.getPayload()); } else if (A5C3ReportEnum.CHARGE_COMPLETE.getCode().equals(functionCode)) { A5C3CommonReportInnerFrame completeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); } else if (A5C3ReportEnum.CHARGE_STOP.getCode().equals(functionCode)) { A5C3CommonReportInnerFrame stopRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); } else if (A5C3ReportEnum.ERROR_CODE.getCode().equals(functionCode)) { A5C3ErrorCodeReportInnerFrame errorCodeRequestFrame = new A5C3ErrorCodeReportInnerFrame().transformFrame(frame.getPayload()); } } private void atmosphereAnalysis(String productKey, String deviceName, CommonFrame frame) { A5AtmosphereHeartbeatReportInnerFrame transformFrame = new A5AtmosphereHeartbeatReportInnerFrame().transformFrame(frame.getPayload()); log.info("大气心跳上报"); log.info(transformFrame.toString()); } }