2021与蓝度共同重构项目,服务端
zhanzhiqin
2022-04-01 f0a75a44b34f34a31a035e71ff05f2890fe121c5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.sandu.ximon.admin.manager.iot.amqp;
 
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
 
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
 
/**
 * @author chenjiantian
 * @date 2021/12/2 17:18
 * ampq订阅客户端
 */
public class AmqpClient {
 
    private final AmqpConnectionListener amqpConnectionListener = new AmqpConnectionListener();
    private final AmqpMessageListener amqpMessageListener = new AmqpMessageListener();
 
    private IAmqpSubscriptionConfig config;
 
    public AmqpClient(IAmqpSubscriptionConfig config) {
        this.config = config;
    }
 
    /**
     * 启动ampq订阅客户端
     *
     * @throws Exception
     */
    public void execute() throws Exception {
        for (int i = 0; i < config.getConnectionCount(); i++) {
            long timeStamp = System.currentTimeMillis();
            //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
            String signMethod = "hmacsha1";
 
            // userName组装方法,请参见AMQP客户端接入说明文档。
            String userName = config.getClientId() + "-" + i + "|authMode=aksign"
                    + ",signMethod=" + signMethod
                    + ",timestamp=" + timeStamp
                    + ",authId=" + config.getAccessKey()
                    + ",iotInstanceId=" + config.getIotInstanceId()
                    + ",consumerGroupId=" + config.getConsumerGroupId()
                    + "|";
            //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
            String signContent = "authId=" + config.getAccessKey() + "&timestamp=" + timeStamp;
            String password = doSign(signContent, config.getAccessSecret(), signMethod);
            String connectionUrl = "failover:(amqps://" + config.getHost() + ":5671?amqp.idleTimeout=80000)"
                    + "?failover.reconnectDelay=30";
 
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF", connectionUrl);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
            Destination queue = (Destination) context.lookup("QUEUE");
            // 创建连接。
            Connection connection = cf.createConnection(userName, password);
//            connections.add(connection);
 
            ((JmsConnection) connection).addConnectionListener(amqpConnectionListener);
            // 创建会话。
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
            // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
            connection.start();
            // 创建Receiver连接。
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(amqpMessageListener);
        }
    }
 
    /**
     * 签名校验
     */
    public String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }
 
}