序
本文主要研究一下rocketmq的AllocateMessageQueueConsistentHash
AllocateMessageQueueStrategy
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup current consumer group
* @param currentCID current consumer id
* @param mqAll message queue set in current topic
* @param cidAll consumer set in current consumer group
* @return The allocate result of given strategy
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* Algorithm name
*
* @return The strategy name
*/
String getName();
}
- AllocateMessageQueueStrategy定义了allocate、getName方法
AllocateMessageQueueConsistentHash
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
public AllocateMessageQueueConsistentHash() {
this(10);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
this(virtualNodeCnt, null);
}
public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
if (virtualNodeCnt < 0) {
throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
}
this.virtualNodeCnt = virtualNodeCnt;
this.customHashFunction = customHashFunction;
}
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
@Override
public String getName() {
return "CONSISTENT_HASH";
}
private static class ClientNode implements Node {
private final String clientID;
public ClientNode(String clientID) {
this.clientID = clientID;
}
@Override
public String getKey() {
return clientID;
}
}
}
- AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(
默认为10
)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,然后创建ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASH
ConsistentHashRouter
rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
public class ConsistentHashRouter<T extends Node> {
private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();
private final HashFunction hashFunction;
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
this(pNodes, vNodeCount, new MD5Hash());
}
/**
* @param pNodes collections of physical nodes
* @param vNodeCount amounts of virtual nodes
* @param hashFunction hash Function to hash Node instances
*/
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
if (hashFunction == null) {
throw new NullPointerException("Hash Function is null");
}
this.hashFunction = hashFunction;
if (pNodes != null) {
for (T pNode : pNodes) {
addNode(pNode, vNodeCount);
}
}
}
/**
* add physic node to the hash ring with some virtual nodes
*
* @param pNode physical node needs added to hash ring
* @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
*/
public void addNode(T pNode, int vNodeCount) {
if (vNodeCount < 0)
throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
int existingReplicas = getExistingReplicas(pNode);
for (int i = 0; i < vNodeCount; i++) {
VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
ring.put(hashFunction.hash(vNode.getKey()), vNode);
}
}
/**
* remove the physical node from the hash ring
*/
public void removeNode(T pNode) {
Iterator<Long> it = ring.keySet().iterator();
while (it.hasNext()) {
Long key = it.next();
VirtualNode<T> virtualNode = ring.get(key);
if (virtualNode.isVirtualNodeOf(pNode)) {
it.remove();
}
}
}
/**
* with a specified key, route the nearest Node instance in the current hash ring
*
* @param objectKey the object key to find a nearest Node
*/
public T routeNode(String objectKey) {
if (ring.isEmpty()) {
return null;
}
Long hashVal = hashFunction.hash(objectKey);
SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal);
Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
return ring.get(nodeHashVal).getPhysicalNode();
}
public int getExistingReplicas(T pNode) {
int replicas = 0;
for (VirtualNode<T> vNode : ring.values()) {
if (vNode.isVirtualNodeOf(pNode)) {
replicas++;
}
}
return replicas;
}
//default hash function
private static class MD5Hash implements HashFunction {
MessageDigest instance;
public MD5Hash() {
try {
instance = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
}
}
@Override
public long hash(String key) {
instance.reset();
instance.update(key.getBytes());
byte[] digest = instance.digest();
long h = 0;
for (int i = 0; i < 4; i++) {
h <<= 8;
h |= ((int) digest[i]) & 0xFF;
}
return h;
}
}
}
- ConsistentHashRouter使用virtual node来进行一致性哈希,默认的hashFunction为MD5Hash;它提供了addNode、removeNode、routeNode、getExistingReplicas方法;其routeNode方法使用hashFunction.hash(objectKey)计算hashVal,然后使用ring.tailMap(hashVal)获取tailMap,若tailMap不为空则取tailMap.firstKey(),否则取ring.firstKey()作为nodeHashVal,最后根据该值取ring中的VirtualNode再取其physicalNode
小结
AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(默认为10
)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,然后创建ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASH