| | |
| | | package com.sandu.ximon.admin.manager.iot.amqp; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.*; |
| | | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
| | | import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor; |
| | | import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor; |
| | | import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor; |
| | | import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5AtmosphereHeartbeatReportInnerFrame; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3CommonReportInnerFrame; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3ErrorCodeReportInnerFrame; |
| | | import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3HeartbeatReportInnerFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5C3ReportEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5LightReportEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3ChargingEnum; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3mRedisConstant; |
| | | import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils; |
| | | import com.sandu.ximon.admin.service.C3ChargingService; |
| | | import com.sandu.ximon.admin.service.C3mOrderService; |
| | | import com.sandu.ximon.admin.service.PoleBindingService; |
| | | import com.sandu.ximon.admin.service.PoleService; |
| | | import com.sandu.ximon.admin.utils.LogUtils; |
| | | import com.sandu.ximon.admin.utils.RedisUtils; |
| | | import com.sandu.ximon.admin.vo.C3mOrderVO; |
| | | import com.sandu.ximon.dao.domain.C3mCharging; |
| | | import com.sandu.ximon.dao.domain.C3mOrder; |
| | | import com.sandu.ximon.dao.domain.PoleBinding; |
| | | import com.sandu.ximon.dao.enums.OrderStatus; |
| | | import com.sandu.ximon.dao.enums.OrderType; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | |
| | | import javax.jms.Message; |
| | | import javax.jms.MessageListener; |
| | | import java.util.Date; |
| | | import java.util.Map; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | */ |
| | | @Slf4j |
| | | public class AmqpMessageListener implements MessageListener { |
| | | |
| | | |
| | | protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor( |
| | | Runtime.getRuntime().availableProcessors(), |
| | |
| | | String topic = message.getStringProperty("topic"); |
| | | String messageId = message.getStringProperty("messageId"); |
| | | |
| | | // log.info("收到订阅" + topic + "," + messageId); |
| | | // log.info(content); |
| | | log.info("收到订阅" + topic + "," + messageId); |
| | | log.info(content); |
| | | |
| | | if (null != map.get("status")) { |
| | | // 上下线上报处理 |
| | |
| | | log.info(frame.toString()); |
| | | if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) { |
| | | // 单灯数据上报处理 |
| | | lightDataReportAnalysis(productKey, deviceName, frame); |
| | | LightDataProcessor.getInstance().process(productKey, deviceName, frame); |
| | | } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) { |
| | | // C3充电桩上报处理 |
| | | c3ChargingReportAnalysis(productKey, deviceName, frame); |
| | | c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame); |
| | | } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) { |
| | | // 大气数据指令上报 |
| | | atmosphereAnalysis(productKey, deviceName, frame); |
| | | } |
| | | } |
| | | |
| | | private void c3ChargingReportAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | String functionCode = frame.getPayload().substring(2, 4); |
| | | if (A5C3ReportEnum.NETWORK_REQUEST.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame netRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.QR_CODE_REQUEST.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame codeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.HEART_BEAT.getCode().equals(functionCode)) { |
| | | A5C3HeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5C3HeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | } else if (A5C3ReportEnum.CHARGE_COMPLETE.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame completeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } else if (A5C3ReportEnum.CHARGE_STOP.getCode().equals(functionCode)) { |
| | | A5C3CommonReportInnerFrame stopRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } else if (A5C3ReportEnum.ERROR_CODE.getCode().equals(functionCode)) { |
| | | A5C3ErrorCodeReportInnerFrame errorCodeRequestFrame = new A5C3ErrorCodeReportInnerFrame().transformFrame(frame.getPayload()); |
| | | |
| | | } |
| | | } |
| | | |
| | | private void atmosphereAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | A5AtmosphereHeartbeatReportInnerFrame transformFrame = new A5AtmosphereHeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | log.info("大气心跳上报"); |
| | | log.info(transformFrame.toString()); |
| | | } |
| | | |
| | | private void lightDataReportAnalysis(String productKey, String deviceName, CommonFrame frame) { |
| | | String functionCode = frame.getPayload().substring(2, 4); |
| | | if (A5LightReportEnum.HeartBeat_Data.getCode().equals(functionCode)) { |
| | | // log.info("心跳相应"); |
| | | A5LightHeartbeatReportInnerFrame heartbeatReportInnerFrame = new A5LightHeartbeatReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(heartbeatReportInnerFrame.toString()); |
| | | } else if (A5LightReportEnum.Time_Synchronized.getCode().equals(functionCode)) { |
| | | // log.info("请求时间同步"); |
| | | A5LightTimeSyncReportInnerFrame syncRespInnerFrame = new A5LightTimeSyncReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(syncRespInnerFrame.toString()); |
| | | } else if (A5LightReportEnum.Error_Code.getCode().equals(functionCode)) { |
| | | // log.info("故障码上报"); |
| | | A5LightErrorCodeReportInnerFrame codeRespInnerFrame = new A5LightErrorCodeReportInnerFrame().transformFrame(frame.getPayload()); |
| | | // log.info(codeRespInnerFrame.toString()); |
| | | AirDataProcessor.getInstance().process(productKey, deviceName, frame); |
| | | } else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) { |
| | | PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame); |
| | | } |
| | | } |
| | | } |