package com.sandu.ximon.admin.manager.iot.amqp; import org.apache.commons.codec.binary.Base64; import org.apache.qpid.jms.JmsConnection; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.util.Hashtable; /** * @author chenjiantian * @date 2021/12/2 17:18 * ampq订阅客户端 */ public class AmqpClient { private final AmqpConnectionListener amqpConnectionListener = new AmqpConnectionListener(); private final AmqpMessageListener amqpMessageListener = new AmqpMessageListener(); private IAmqpSubscriptionConfig config; public AmqpClient(IAmqpSubscriptionConfig config) { this.config = config; } /** * 启动ampq订阅客户端 * * @throws Exception */ public void execute() throws Exception { for (int i = 0; i < config.getConnectionCount(); i++) { long timeStamp = System.currentTimeMillis(); //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 String signMethod = "hmacsha1"; // userName组装方法,请参见AMQP客户端接入说明文档。 String userName = config.getClientId() + "-" + i + "|authMode=aksign" + ",signMethod=" + signMethod + ",timestamp=" + timeStamp + ",authId=" + config.getAccessKey() + ",iotInstanceId=" + config.getIotInstanceId() + ",consumerGroupId=" + config.getConsumerGroupId() + "|"; //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 String signContent = "authId=" + config.getAccessKey() + "×tamp=" + timeStamp; String password = doSign(signContent, config.getAccessSecret(), signMethod); String connectionUrl = "failover:(amqps://" + config.getHost() + ":5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30"; Hashtable hashtable = new Hashtable<>(); hashtable.put("connectionfactory.SBCF", connectionUrl); hashtable.put("queue.QUEUE", "default"); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); Context context = new InitialContext(hashtable); ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF"); Destination queue = (Destination) context.lookup("QUEUE"); // 创建连接。 Connection connection = cf.createConnection(userName, password); // connections.add(connection); ((JmsConnection) connection).addConnectionListener(amqpConnectionListener); // 创建会话。 // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // 创建Receiver连接。 MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(amqpMessageListener); } } /** * 签名校验 */ public String doSign(String toSignString, String secret, String signMethod) throws Exception { SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); Mac mac = Mac.getInstance(signMethod); mac.init(signingKey); byte[] rawHmac = mac.doFinal(toSignString.getBytes()); return Base64.encodeBase64String(rawHmac); } }