package com.sandu.ximon.admin.manager.iot.amqp;
|
|
import org.apache.commons.codec.binary.Base64;
|
import org.apache.qpid.jms.JmsConnection;
|
|
import javax.crypto.Mac;
|
import javax.crypto.spec.SecretKeySpec;
|
import javax.jms.*;
|
import javax.naming.Context;
|
import javax.naming.InitialContext;
|
import java.util.Hashtable;
|
|
/**
|
* @author chenjiantian
|
* @date 2021/12/2 17:18
|
* ampq订阅客户端
|
*/
|
public class AmqpClient {
|
|
private final AmqpConnectionListener amqpConnectionListener = new AmqpConnectionListener();
|
private final AmqpMessageListener amqpMessageListener = new AmqpMessageListener();
|
|
private IAmqpSubscriptionConfig config;
|
|
public AmqpClient(IAmqpSubscriptionConfig config) {
|
this.config = config;
|
}
|
|
/**
|
* 启动ampq订阅客户端
|
*
|
* @throws Exception
|
*/
|
public void execute() throws Exception {
|
for (int i = 0; i < config.getConnectionCount(); i++) {
|
long timeStamp = System.currentTimeMillis();
|
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
|
String signMethod = "hmacsha1";
|
|
// userName组装方法,请参见AMQP客户端接入说明文档。
|
String userName = config.getClientId() + "-" + i + "|authMode=aksign"
|
+ ",signMethod=" + signMethod
|
+ ",timestamp=" + timeStamp
|
+ ",authId=" + config.getAccessKey()
|
+ ",iotInstanceId=" + config.getIotInstanceId()
|
+ ",consumerGroupId=" + config.getConsumerGroupId()
|
+ "|";
|
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
|
String signContent = "authId=" + config.getAccessKey() + "×tamp=" + timeStamp;
|
String password = doSign(signContent, config.getAccessSecret(), signMethod);
|
String connectionUrl = "failover:(amqps://" + config.getHost() + ":5671?amqp.idleTimeout=80000)"
|
+ "?failover.reconnectDelay=30";
|
|
Hashtable<String, String> hashtable = new Hashtable<>();
|
hashtable.put("connectionfactory.SBCF", connectionUrl);
|
hashtable.put("queue.QUEUE", "default");
|
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
|
Context context = new InitialContext(hashtable);
|
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
|
Destination queue = (Destination) context.lookup("QUEUE");
|
// 创建连接。
|
Connection connection = cf.createConnection(userName, password);
|
// connections.add(connection);
|
|
((JmsConnection) connection).addConnectionListener(amqpConnectionListener);
|
// 创建会话。
|
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
|
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
connection.start();
|
// 创建Receiver连接。
|
MessageConsumer consumer = session.createConsumer(queue);
|
consumer.setMessageListener(amqpMessageListener);
|
}
|
}
|
|
/**
|
* 签名校验
|
*/
|
public String doSign(String toSignString, String secret, String signMethod) throws Exception {
|
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
|
Mac mac = Mac.getInstance(signMethod);
|
mac.init(signingKey);
|
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
|
return Base64.encodeBase64String(rawHmac);
|
}
|
|
}
|