| 对比新文件 |
| | |
| | | package com.sandu.ximon.admin.manager.iot.amqp; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.*; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5C3ReportEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5LightReportEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import javax.jms.Message; |
| | | import javax.jms.MessageListener; |
| | | import java.util.Map; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | /** |
| | | * @author chenjiantian |
| | | * @date 2021/12/2 17:33 |
| | | * å¤çamqp订é
æ¶æ¯ |
| | | */ |
| | | @Slf4j |
| | | public class AmqpMessageListener implements MessageListener { |
| | | |
| | | protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( |
| | | Runtime.getRuntime().availableProcessors(), |
| | | Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, |
| | | new LinkedBlockingQueue<>(50000), new NameTreadFactory()); |
| | | |
| | | static class NameTreadFactory implements ThreadFactory { |
| | | |
| | | private final AtomicInteger mThreadNum = new AtomicInteger(1); |
| | | |
| | | @Override |
| | | public Thread newThread(Runnable r) { |
| | | return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onMessage(Message message) { |
| | | EXECUTOR_SERVICE.submit(() -> processMessage(message)); |
| | | } |
| | | |
| | | /** |
| | | * å¨è¿éå¤çæ¨æ¶å°æ¶æ¯åçå
·ä½ä¸å¡é»è¾ã |
| | | */ |
| | | private void processMessage(Message message) { |
| | | try { |
| | | |
| | | byte[] body = message.getBody(byte[].class); |
| | | String content = new String(body); |
| | | Map map = JSON.parseObject(content, Map.class); |
| | | String topic = message.getStringProperty("topic"); |
| | | String messageId = message.getStringProperty("messageId"); |
| | | |
| | | // log.info("æ¶å°è®¢é
" + topic + "," + messageId); |
| | | // log.info(content); |
| | | |
| | | if (null != map.get("status")) { |
| | | // ä¸ä¸çº¿ä¸æ¥å¤ç |
| | | } else { |
| | | // è®¾å¤æ°æ®ä¸æ¥ |
| | | CommonReportMessage commonReportMessage = JSON.parseObject(content, CommonReportMessage.class); |
| | | |
| | | CommonFrame connectFrame = FrameUtils.transformMessageToFrame(commonReportMessage.getItems().getParams().getValue()); |
| | | |
| | | processTask(commonReportMessage.getProductKey(), commonReportMessage.getDeviceName(), connectFrame); |
| | | } |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * å¤çä»»å¡ |
| | | * |
| | | * @param productKey 产åç |
| | | * @param deviceName 产ååç§° |
| | | * @param frame 䏿¥å¸§ |
| | | */ |
| | | private void processTask(String productKey, String deviceName, CommonFrame frame) { |
| | | if (frame == null) { |
| | | return; |
| | | } |
| | | log.info("å¤ç订é
"); |
| | | log.info(frame.toString()); |
| | | if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) { |
| | | // åç¯æ°æ®ä¸æ¥å¤ç |
| | | lightDataReportAnalysis(productKey, deviceName, frame); |
| | | } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) { |
| | | // C3å
çµæ¡©ä¸æ¥å¤ç |
| | | c3ChargingReportAnalysis(productKey, deviceName, frame); |
| | | } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) { |
| | | // å¤§æ°æ°æ®æä»¤ä¸æ¥ |
| | | atmosphereAnalysis(productKey, deviceName, frame); |
| | | } |
| | | } |
| | | |
| | | private void c3ChargingReportAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | String functionCode = frame.getPayload().substring(2, 4); |
| | | if (A5C3ReportEnum.NETWORK_REQUEST.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame netRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.QR_CODE_REQUEST.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame codeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.HEART_BEAT.getCode().equals(functionCode)) { |
| | | A5C3HeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5C3HeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.CHARGE_COMPLETE.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame completeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } else if (A5C3ReportEnum.CHARGE_STOP.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame stopRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } else if (A5C3ReportEnum.ERROR_CODE.getCode().equals(functionCode)) { |
| | | A5C3ErrorCodeReportInnerFrame errorCodeRequestFrame = new A5C3ErrorCodeReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } |
| | | } |
| | | |
| | | private void atmosphereAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | A5AtmosphereHeartbeatReportInnerFrame transformFrame = new A5AtmosphereHeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info("大æ°å¿è·³ä¸æ¥"); |
| | | // log.info(transformFrame.toString()); |
| | | } |
| | | |
| | | private void lightDataReportAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | String functionCode = frame.getPayload().substring(2, 4); |
| | | if (A5LightReportEnum.HeartBeat_Data.getCode().equals(functionCode)) { |
| | | // log.info("å¿è·³ç¸åº"); |
| | | A5LightHeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5LightHeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(heartbeatReportInnerFrame.toString()); |
| | | } else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) { |
| | | // log.info("è¯·æ±æ¶é´åæ¥"); |
| | | A5LightTimeSyncReportInnerFrame syncRespInnerFrame = new A5LightTimeSyncReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(syncRespInnerFrame.toString()); |
| | | } else if (A5LightReportEnum.Error_Code.getCode().equals(functionCode)) { |
| | | // log.info("æ
éç 䏿¥"); |
| | | A5LightErrorCodeReportInnerFrame codeRespInnerFrame = new A5LightErrorCodeReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(codeRespInnerFrame.toString()); |
| | | } |
| | | } |
| | | } |