package com.sandu.ximon.admin.manager.iot.amqp;

import com.alibaba.fastjson.JSON;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonReportMessage;
import com.sandu.ximon.admin.manager.iot.rrpc.dto.CommonFrame;
import com.sandu.ximon.admin.manager.iot.rrpc.util.FrameUtils;
import lombok.extern.slf4j.Slf4j;

import javax.jms.Message;
import javax.jms.MessageListener;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Handles messages received from the AMQP subscription.
 *
 * <p>Each incoming message is handed to a shared, bounded thread pool so that the
 * listener callback returns quickly; the payload is then parsed and dispatched on a
 * pool thread.
 *
 * @author chenjiantian
 * @date 2021/12/2 17:33
 */
@Slf4j
public class AmqpMessageListener implements MessageListener {

    /**
     * Shared worker pool: core size = available processors, max size = 2x that,
     * 60s keep-alive, and a bounded queue of 50000 pending tasks.
     *
     * <p>No rejection handler is supplied, so the JDK default (abort) applies once the
     * queue is full and the pool is at its maximum size.
     */
    protected final static ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000),
            new NameTreadFactory());

    /**
     * Thread factory that gives worker threads a sequentially numbered name.
     */
    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "ampq-msg-thread-" + mThreadNum.getAndIncrement());
        }
    }

    /**
     * Queues the message for asynchronous processing on {@link #EXECUTOR_SERVICE}.
     *
     * @param message the message delivered by the subscription
     */
    @Override
    public void onMessage(Message message) {
        EXECUTOR_SERVICE.submit(() -> processMessage(message));
    }

    /**
     * Parses one received message and runs the business logic for it.
     *
     * <p>The body is read as bytes and decoded as UTF-8 JSON. A payload containing a
     * non-null {@code status} entry is treated as an online/offline report (currently
     * no handling); anything else is treated as a device data report, converted into a
     * {@link CommonFrame} and passed to {@link #processTask}.
     *
     * <p>Failures are logged and never propagate, since this runs as a pool task.
     *
     * @param message the message to process
     */
    private void processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            log.info("收到订阅{},{}", topic, messageId);

            if (body == null) {
                log.warn("AMQP message has no body, topic={}, messageId={}", topic, messageId);
                return;
            }

            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            String content = new String(body, StandardCharsets.UTF_8);
            log.info(content);

            Map<?, ?> map = JSON.parseObject(content, Map.class);
            if (map == null) {
                log.warn("AMQP message body is not a JSON object, topic={}, messageId={}",
                        topic, messageId);
                return;
            }

            if (null != map.get("status")) {
                // Online/offline status report handling (not implemented yet).
            } else {
                // Device data report.
                CommonReportMessage commonReportMessage =
                        JSON.parseObject(content, CommonReportMessage.class);
                CommonFrame connectFrame = FrameUtils.transformMessageToFrame(
                        commonReportMessage.getItems().getParams().getValue());
                processTask(commonReportMessage.getProductKey(),
                        commonReportMessage.getDeviceName(), connectFrame);
            }
        } catch (Exception e) {
            // Task boundary: log through the class logger instead of printing to stderr.
            log.error("Failed to process AMQP message", e);
        }
    }

    /**
     * Processes a reported frame. Currently a no-op placeholder.
     *
     * @param productKey   product key
     * @param deviceName   device name
     * @param connectFrame reported frame
     */
    private void processTask(String productKey, String deviceName, CommonFrame connectFrame) {
    }
}