package com.sandu.ximon.admin.manager.iot.amqp;
|
|
import com.alibaba.fastjson.JSON;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.AirDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.LightDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.PoleMonitorDataProcessor;
|
import com.sandu.ximon.admin.manager.iot.amqp.processor.c3ChargingProcessor;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5AtmosphereHeartbeatReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3CommonReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3ErrorCodeReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.frame.inner.report.A5C3HeartbeatReportInnerFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
|
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.A5OrderEnum;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3ChargingEnum;
|
import com.sandu.ximon.admin.manager.iot.rrpc.enums.C3mRedisConstant;
|
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
|
import com.sandu.ximon.admin.service.C3ChargingService;
|
import com.sandu.ximon.admin.service.C3mOrderService;
|
import com.sandu.ximon.admin.service.PoleBindingService;
|
import com.sandu.ximon.admin.service.PoleService;
|
import com.sandu.ximon.admin.utils.LogUtils;
|
import com.sandu.ximon.admin.utils.RedisUtils;
|
import com.sandu.ximon.admin.vo.C3mOrderVO;
|
import com.sandu.ximon.dao.domain.C3mCharging;
|
import com.sandu.ximon.dao.domain.C3mOrder;
|
import com.sandu.ximon.dao.domain.PoleBinding;
|
import com.sandu.ximon.dao.enums.OrderStatus;
|
import com.sandu.ximon.dao.enums.OrderType;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import javax.jms.Message;
|
import javax.jms.MessageListener;
|
import java.util.Date;
|
import java.util.Map;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* @author chenjiantian
|
* @date 2021/12/2 17:33
|
* 处理amqp订阅消息
|
*/
|
@Slf4j
|
public class AmqpMessageListener implements MessageListener {
|
|
@Autowired
|
private C3ChargingService c3ChargingService;
|
@Autowired
|
private PoleBindingService bindingService;
|
@Autowired
|
private PoleService poleService;
|
@Autowired
|
private C3mOrderService orderService;
|
|
|
protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
Runtime.getRuntime().availableProcessors(),
|
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(50000), new NameTreadFactory());
|
|
static class NameTreadFactory implements ThreadFactory {
|
|
private final AtomicInteger mThreadNum = new AtomicInteger(1);
|
|
@Override
|
public Thread newThread(Runnable r) {
|
return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement());
|
}
|
}
|
|
@Override
|
public void onMessage(Message message) {
|
EXECUTOR_SERVICE.submit(() -> processMessage(message));
|
}
|
|
/**
|
* 在这里处理您收到消息后的具体业务逻辑。
|
*/
|
private void processMessage(Message message) {
|
try {
|
|
byte[] body = message.getBody(byte[].class);
|
String content = new String(body);
|
Map map = JSON.parseObject(content, Map.class);
|
String topic = message.getStringProperty("topic");
|
String messageId = message.getStringProperty("messageId");
|
|
log.info("收到订阅" + topic + "," + messageId);
|
log.info(content);
|
|
if (null != map.get("status")) {
|
// 上下线上报处理
|
} else {
|
// 设备数据上报
|
CommonReportMessage commonReportMessage = JSON.parseObject(content, CommonReportMessage.class);
|
|
CommonFrame connectFrame = FrameUtils.transformMessageToFrame(commonReportMessage.getItems().getParams().getValue());
|
|
processTask(commonReportMessage.getProductKey(), commonReportMessage.getDeviceName(), connectFrame);
|
}
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
} finally {
|
|
}
|
}
|
|
/**
|
* 处理任务
|
*
|
* @param productKey 产品码
|
* @param deviceName 产品名称
|
* @param frame 上报帧
|
*/
|
private void processTask(String productKey, String deviceName, CommonFrame frame) {
|
if (frame == null) {
|
return;
|
}
|
log.info("处理订阅");
|
log.info(frame.toString());
|
if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_LIGHT_DATA.getCode())) {
|
// 单灯数据上报处理
|
LightDataProcessor.getInstance().process(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_C3_DATA.getCode())) {
|
// C3充电桩上报处理
|
c3ChargingReportAnalysis(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_ATMOSPHERE_DATA.getCode())) {
|
// //测试用的
|
// String s = "{\"connectType\":\"FE\",\"crc32\":\"8685DF1B\",\"functionCode\":\"A5\",\"orderType\":\"84\",\"payload\":\"F0010022FFFF0101010907640000000000000000419D000000360089000000000002019300008CA14C69\",\"payloadLength\":\"002A\",\"validate\":true}";
|
// CommonFrame commonFrame1 = JSON.parseObject(s, CommonFrame.class);
|
// 大气数据指令上报
|
AirDataProcessor.getInstance().process(productKey, deviceName, frame);
|
// atmosphereAnalysis(productKey, deviceName, frame);
|
} else if (frame.getOrderType().equals(A5OrderEnum.RESPONSE_POLE_MONITOR_DATA.getCode())) {
|
PoleMonitorDataProcessor.getInstance().process(productKey, deviceName, frame);
|
}
|
}
|
|
private void c3ChargingReportAnalysis(String productKey, String deviceName, CommonFrame frame) {
|
String functionCode = frame.getPayload().substring(2, 4);
|
if (C3ChargingEnum.NETWORK_REQUEST.getCode().equals(functionCode)) {
|
// A5C3CommonReportInnerFrame netRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload());
|
// log.info("C3充电桩上报处理_netRequestFrame");
|
// log.info(netRequestFrame.toString());
|
//网络请求
|
//
|
c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
|
|
} else if (C3ChargingEnum.QR_CODE_REQUEST.getCode().equals(functionCode)) {
|
// 网页操作二维码请求(41)
|
A5C3CommonReportInnerFrame codeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload());
|
log.info("C3充电桩上报处理_codeRequestFrame");
|
log.info(codeRequestFrame.toString());
|
} else if (C3ChargingEnum.HEART_BEAT.getCode().equals(functionCode)) {
|
// 心跳包上报(42) 若五分钟无心跳包数据,则判断离线
|
c3ChargingProcessor.c3ChargingProcessorgetInstance().process(productKey, deviceName, frame);
|
} else if (C3ChargingEnum.CHARGE_COMPLETE.getCode().equals(functionCode)) {
|
A5C3CommonReportInnerFrame completeRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload());
|
log.info("C3充电桩上报处理_completeRequestFrame");
|
log.info(completeRequestFrame.toString());
|
// 充电结束上报(43)
|
/**
|
* 读取心跳包,判断剩余金额和已充电量,统计到缓存中正在进行的订单。
|
*/
|
A5C3HeartbeatReportInnerFrame.HeartBeatDataPackage aPackage = c3ChargingService.ReadTheHeartbeatPackage(completeRequestFrame.getDestinationAddress());
|
if (aPackage == null) {
|
LogUtils.error("{ 充电桩(" + completeRequestFrame.getDestinationAddress() + ")充电结束上报读取心跳包失败,请检查充电桩是否出现故障! }");
|
return;
|
}
|
refund(aPackage);
|
|
} else if (C3ChargingEnum.CHARGE_STOP.getCode().equals(functionCode)) {
|
A5C3CommonReportInnerFrame stopRequestFrame = new A5C3CommonReportInnerFrame().transformFrame(frame.getPayload());
|
log.info("C3充电桩上报处理_stopRequestFrame");
|
log.info(stopRequestFrame.toString());
|
A5C3HeartbeatReportInnerFrame.HeartBeatDataPackage aPackage = c3ChargingService.ReadTheHeartbeatPackage(stopRequestFrame.getDestinationAddress());
|
if (aPackage == null) {
|
LogUtils.error("{ 充电桩(" + stopRequestFrame.getDestinationAddress() + ")充电结束上报读取心跳包失败,请检查充电桩是否出现故障! }");
|
return;
|
}
|
refund(aPackage);
|
|
} else if (C3ChargingEnum.ERROR_CODE.getCode().equals(functionCode)) {
|
A5C3ErrorCodeReportInnerFrame errorCodeRequestFrame = new A5C3ErrorCodeReportInnerFrame().transformFrame(frame.getPayload());
|
log.info("C3充电桩上报处理_errorCodeRequestFrame");
|
log.info(errorCodeRequestFrame.toString());
|
A5C3HeartbeatReportInnerFrame.HeartBeatDataPackage aPackage = c3ChargingService.ReadTheHeartbeatPackage(errorCodeRequestFrame.getDestinationAddress());
|
if (aPackage == null) {
|
LogUtils.error("{ 充电桩(" + errorCodeRequestFrame.getDestinationAddress() + ")充电结束上报读取心跳包失败,请检查充电桩是否出现故障! }");
|
return;
|
}
|
refund(aPackage);
|
}
|
}
|
|
private void atmosphereAnalysis(String productKey, String deviceName, CommonFrame frame) {
|
A5AtmosphereHeartbeatReportInnerFrame transformFrame = new A5AtmosphereHeartbeatReportInnerFrame().transformFrame(frame.getPayload());
|
log.info("大气心跳上报");
|
log.info(transformFrame.toString());
|
|
}
|
|
|
private void refund(A5C3HeartbeatReportInnerFrame.HeartBeatDataPackage aPackage) {
|
// private void refund(A5C3CommonReportInnerFrame completeRequestFrame,String code){
|
/**
|
* 读取心跳包,判断剩余金额和已充电量,统计到缓存中正在进行的订单。
|
*/
|
String c3Mac = aPackage.getC3Mac();
|
// 获取心跳包中的剩余金额和已充电量,与缓存中正在进行的订单进行对比
|
String chargingOrderJson = RedisUtils.getBean().get(C3mRedisConstant.C3_CHARGING_ORDER.getCode() + c3Mac);
|
if (chargingOrderJson.isEmpty() || null == chargingOrderJson) {
|
C3mCharging c3m = c3ChargingService.getOne(Wrappers.lambdaQuery(C3mCharging.class).eq(C3mCharging::getC3Mac, c3Mac));
|
PoleBinding binding = bindingService.getOne(Wrappers.lambdaQuery(PoleBinding.class)
|
.eq(PoleBinding::getDeviceCode,aPackage.getC3Mac())
|
.eq(PoleBinding::getDeviceType, 2));
|
// 生成订单,并加载到redis缓存,设置超时时间为5分钟
|
C3mOrder order = new C3mOrderVO().generateOrder(binding.getPoleId() == null ? 0L : binding.getPoleId()
|
, c3m.getPoleDevicesCode(), c3m.getC3Mac(), OrderType.ERROR, Double.valueOf(aPackage.getRemainingAmount()),
|
Integer.parseInt(new java.text.DecimalFormat("0").format(aPackage.getReservedCapacity()))
|
);
|
order.setActualChargingCapacity(Double.valueOf(aPackage.getChargedCapacity()));
|
order.setOrderStatus(OrderStatus.REFUNDING.getStatus());
|
order.setRefundAmount(Double.valueOf(aPackage.getRemainingAmount()));
|
orderService.save(order);
|
String s = c3ChargingService.finishCharging(c3Mac);
|
// if(C3ChargingEnum.CHARGE_COMPLETE.getCode().equals(code)){
|
//
|
// }
|
//是否需要区分log类型 待定
|
LogUtils.error("{ 充电桩(" + c3Mac + ")不存在正在进行的订单,请检查充电桩是否出现故障!结束订单,结果为" + s + "}");
|
} else {
|
C3mOrder c3mOrderEntity = JSON.parseObject(chargingOrderJson, C3mOrder.class);
|
// 设置已充电量,订单状态,退款金额,订单退款时间戳,订单退款说明,结束充电时间戳,
|
c3mOrderEntity.setActualChargingCapacity(Double.valueOf(aPackage.getChargedCapacity()));
|
c3mOrderEntity.setOrderStatus(OrderStatus.REFUNDING.getStatus());
|
c3mOrderEntity.setRefundAmount(Double.valueOf(aPackage.getRemainingAmount()));
|
// 获取剩余金额进行退款,并写入当前正在进行的订单
|
boolean b = orderService.orderRefund(c3mOrderEntity.getOutTradeNo(), c3mOrderEntity.getRefundAmount());
|
c3mOrderEntity.setRefundTimestamp(new Date().getTime());
|
if (b) {
|
c3mOrderEntity.setOrderStatus(OrderStatus.REFUND.getStatus());
|
c3mOrderEntity.setRefundMsg("充电结束,订单退款成功");
|
} else {
|
c3mOrderEntity.setOrderStatus(OrderStatus.REFUND_FAILED.getStatus());
|
c3mOrderEntity.setRefundMsg(
|
"充电结束,订单退款失败,请进行手动退款(订单号(" +
|
c3mOrderEntity.getOutTradeNo() + "),总金额(" +
|
c3mOrderEntity.getTotalAmount() + ",退款金额(" +
|
aPackage.getRemainingAmount() + "))"
|
);
|
}
|
c3mOrderEntity.setStopChargingTimestamp(new Date().getTime());
|
orderService.updateById(c3mOrderEntity);
|
// 清除缓存中正在进行中的订单
|
b = RedisUtils.getBean().delete(C3mRedisConstant.C3_CHARGING_ORDER.getCode() + c3mOrderEntity.getC3Mac());
|
if (!b) {
|
try {
|
Thread.sleep(2000);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
RedisUtils.getBean().delete(C3mRedisConstant.C3_CHARGING_ORDER.getCode() + c3mOrderEntity.getC3Mac());
|
}
|
// 发送结束订单
|
String s = c3ChargingService.finishCharging(c3Mac);
|
LogUtils.error("{ 充电桩(" + c3Mac + ")不存在正在进行的订单,请检查充电桩是否出现故障!结束订单,结果为" + s + "}");
|
}
|
|
}
|
}
|