Dyno-queues 分布式延迟队列 之 辅助功能

0x00 摘要

本系列我们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,通过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。

0x01 前文回顾

前面两篇文章介绍了设计思路,消息的产生和消费。本文介绍一些辅助功能,有了这些功能可以让系统更加完善。

0x2 Ack机制

前面提到,从Redis角度来看,Dyno-queues 对于每个队列,维护三组Redis数据结构:

  • 包含队列元素和分数的有序集合;
  • 包含消息内容的Hash集合,其中key为消息ID;
  • 包含客户端已经消费但尚未确认的消息有序集合,Un-ack集合

这里的第三组数据结构,就是支持我们的 Ack 机制。

2.1 加入Un-ack集合

前面提到,_pop 是消费消息,具体 _pop 的逻辑如下:

  • 计算当前时间为最大分数。
  • 获取分数在 0 和 最大分数 之间的消息。
  • 将 messageID 添加到 unack 集合中,并从队列的有序集中删除这个 messageID
  • 如果上一步成功,则根据messageID从Redis集合中检索消息。

这就是涉及到 包含客户端已经消费但尚未确认的消息有序集合,Un-ack集合

代码如下:

private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // https://redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (;popped.size() != messageCount;) {
        String msgId = prefetchedIdQueue.poll();

        //将messageID添加到unack集合中
        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if(added == 0){
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }
        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}

此时逻辑如下:

                  message list



zset  +----------+----------+----------+-----+----------+  _pop (msg id 9)
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 | +----+
      |          |          |          |     |          |      |
      +---+------+----+-----+----+-----+-----+----+-----+      |
          |           |          |                |            |
          |           |          |                |            |
          v           v          v                v            |
hash  +---+---+   +---+---+   +--+----+        +--+--+         |
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|         |
      +-------+   +-------+   +-------+        +-----+         |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                  unack list                                   |
       +------------+-------------+--------------+             |
zset   |            |             |              |             |
       |  msg id 11 |   msg id 12 |   msg id 13  |  <----------+
       |            |             |              |
       +------------+-------------+--------------+

2.2 ACK

用户当得到消息之后,需要Ack消息,比如:

List pushed_msgs = V1Queue.push(payloads);

Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);

V1Queue.ack(poppedWithPredicate.getId());

Ack的逻辑是:

  • 从unack集合中删除messageID。
  • 因为此时已经是ack了,所以此消息就彻底没有意义了,所以从Message有效集合中删除messageID。

代码如下:

@Override
public boolean ack(String messageId) {
    try {
        return execute("ack", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {
                String unackShardKey = getUnackKey(queueName, shard);
                Long removed = quorumConn.zrem(unackShardKey, messageId);
                if (removed > 0) {
                    quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
    }
}

private String getUnackKey(String queueName, String shard) {
		return redisKeyPrefix + ".UNACK." + queueName + "." + shard;
}

具体如下:

                  message list



zset  +----------+----------+----------+------
      |          |          |          |     |
      | msg id 1 | msg id 2 | msg id 3 | ... |
      |          |          |          |     |
      +---+------+----+-----+----+-----+-----+
          |           |          |
          |           |          |
          v           v          v                         delete
hash  +---+---+   +---+---+   +--+----+        +-----+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|    <----+  ACK(msg id 9)
      +-------+   +-------+   +-------+        +-----+                  +
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                  unack list                                            |
       +------------+-------------+--------------+-------------+  delete|
zset   |            |             |              |             |        |
       |  msg id 11 |   msg id 12 |   msg id 13  |   msg id 9  |  <-----+
       |            |             |              |             |
       +------------+-------------+--------------+-------------+

2.3 处理Un-ACK的消息

后台进程会定时做检测,即 监视 UNACK 集合中的消息,这些消息在给定时间内未被客户端确认(每个队列可配置)。这些消息将移回到队列中。

2.3.1 定时任务

定时任务是如下代码来启动:

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}

2.3.2 Un-ACK

如下代码,就是把未确认消息退回到队列中。

@Override
public void processUnacks() {
    try {

        long queueDepth = size();
        monitor.queueDepth.record(queueDepth);

        String keyName = getUnackKey(queueName, shardName);

        execute("processUnacks", keyName, () -> {

            int batchSize = 1_000;
            String unackShardName = getUnackKey(queueName, shardName);

            double now = Long.valueOf(clock.millis()).doubleValue();
            int num_moved_back = 0;
            int num_stale = 0;

            Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);

            for (Tuple unack : unacks) {

                double score = unack.getScore();
                String member = unack.getElement();

                String payload = quorumConn.hget(messageStoreKey, member);
                if (payload == null) {
                    quorumConn.zrem(unackShardName, member);
                    ++num_stale;
                    continue;
                }

                long added_back = quorumConn.zadd(localQueueShard, score, member);
                long removed_from_unack = quorumConn.zrem(unackShardName, member);
                if (added_back > 0 && removed_from_unack > 0) ++num_moved_back;
            }
            return null;
        });

    }
}

此时逻辑如下:

                             message list



           zset  +----------+----------+----------+-----+
                 |          |          |          |     |
+------------->  | msg id 1 | msg id 2 | msg id 3 | ... |
|                |          |          |          |     |
|                +---+------+----+-----+----+-----+-----+
|                    |           |          |
|                    |           |          |
|                    v           v          v
|          hash  +---+---+   +---+---+   +--+----+
|                | msg 1 |   | msg 2 |   | msg 3 |
|                +-------+   +-------+   +-------+
|
|
|
|                           unack list
|                +------------+-------------+--------------+
|         zset   |            |             |              |
|                |  msg id 11 |   msg id 12 |   msg id 13  |
+-------------+  |            |             |              |
  msg id 11      +-------+----+-------------+--------------+
                         ^
                         |  msg id 11
                         |
                 +-------+---------+
                 |                 |
                 | ScheduledThread |
                 |                 |
                 +-----------------+

0x03 防止重复消费

对于防止重复消费,系统做了如下努力:

Dyno-queues 分布式延迟队列 之 辅助功能-LMLPHP

  • 每个节点(上图中的N1...Nn)与可用性区域具有关联性,并且与该区域中的redis服务器进行通信。
  • Dynomite / Redis节点一次只能提供一个请求,Dynomite可以允许数千个并发连接,但是请求是由Redis中的单个线程处理,这确保了当发出两个并发调用从队列轮询元素时,是由Redis服务器顺序执行,从而避免任何本地或分布式锁。
  • 在发生故障转移的情况下,确保没有两个客户端连接从队列中获取相同的消息。

0x04 防止消息丢失

4.1 消息丢失的可能

4.1.1 生产者弄丢了数据

生产者将数据发送到 MQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

比如,如下就是简单的插入,缺少必要的保证。

List pushed_msgs = V1Queue.push(payloads);

4.1.2 MQ 弄丢了数据

这种情况就是 MQ 自己弄丢了数据,这个你必须开启MQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 MQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

4.2 Dyno-queues 保证

Dyno-queues 使用ensure来确认消息完全写入到所有分区

简单来说,就是:

  1. 对于所有分区,逐一进行:"写数据(就是message id),读出写入的数据" 这样的操作。如果有一个分区写出错,就返回失败。
  2. 如果把 message id 都已经写入到所有的分区,再写入消息内容。

Enqueues 'message' if it doesn't exist in any of the shards or unack sets.

@Override
public boolean ensure(Message message) {
    return execute("ensure", "(a shard in) " + queueName, () -> {

        String messageId = message.getId();
        for (String shard : allShards) {

            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                return false;
            }
            String unackShardKey = getUnackKey(queueName, shard);
            score = quorumConn.zscore(unackShardKey, messageId);
            if (score != null) {
                return false;
            }

        }
        push(Collections.singletonList(message));
        return true;
    });
}

0x05 过期消息

针对过期消息,Dyno-queues 的处理方式是一次性找出过期消息给用户处理,其中过期时间由用户在参数中设定。

所以 findStaleMessages 就是利用 lua 脚本找出过期消息。

@Override
public List<Message> findStaleMessages() {
    return execute("findStaleMessages", localQueueShard, () -> {

        List<Message> stale_msgs = new ArrayList<>();

        int batchSize = 10;

        double now = Long.valueOf(clock.millis()).doubleValue();
        long num_stale = 0;

        for (String shard : allShards) {
            String queueShardName = getQueueShardKey(queueName, shard);

            Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);

            if (elems.size() == 0) {
                continue;
            }

            String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
                    "local queue_shard=ARGV[1]\n" +
                    "local unack_shard=ARGV[2]\n" +
                    "local num_msgs=ARGV[3]\n" +
                    "\n" +
                    "local stale_msgs={}\n" +
                    "local num_stale_idx = 1\n" +
                    "for i=0,num_msgs-1 do\n" +
                    "  local msg_id=ARGV[4+i]\n" +
                    "\n" +
                    "  local exists_hash = redis.call('hget', hkey, msg_id)\n" +
                    "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
                    "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
                    "\n" +
                    "  if (exists_hash and exists_queue) then\n" +
                    "  elseif (not (exists_unack)) then\n" +
                    "    stale_msgs[num_stale_idx] = msg_id\n" +
                    "    num_stale_idx = num_stale_idx + 1\n" +
                    "  end\n" +
                    "end\n" +
                    "\n" +
                    "return stale_msgs\n";

            String unackKey = getUnackKey(queueName, shard);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add(queueShardName);
            builder.add(unackKey);
            builder.add(Integer.toString(elems.size()));
            for (String msg : elems) {
                builder.add(msg);
            }

            ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
            num_stale = stale_msg_ids.size();

            for (String m : stale_msg_ids) {
                Message msg = new Message();
                msg.setId(m);
                stale_msgs.add(msg);
            }
        }

        return stale_msgs;
    });
}

0x6 消息删除

Dyno-queues 支持消息删除:业务使用方可以随时删除指定消息。

具体删除是 从 unack队列 和 正常队列中删除。

@Override
public boolean remove(String messageId) {
		return execute("remove", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {

                String unackShardKey = getUnackKey(queueName, shard);
                quorumConn.zrem(unackShardKey, messageId);

                String queueShardKey = getQueueShardKey(queueName, shard);
                Long removed = quorumConn.zrem(queueShardKey, messageId);

                if (removed > 0) {
                    // Ignoring return value since we just want to get rid of it.
                    Long msgRemoved = quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
}

0x07 批量处理以增加吞吐

Dyno-queues 利用lua脚本来进行批量处理,这样可以增加吞吐。

7.1 Lua脚本

Redis中为什么引入Lua脚本?

Redis提供了非常丰富的指令集,官网上提供了200多个命令。但是某些特定领域,需要扩充若干指令原子性执行时,仅使用原生命令便无法完成。

Redis 为这样的用户场景提供了 lua 脚本支持,用户可以向服务器发送 lua 脚本来执行自定义动作,获取脚本的响应数据。Redis 服务器会单线程原子性执行 lua 脚本,保证 lua 脚本在处理的过程中不会被任意其它请求打断。

使用脚本的好处如下:

  • 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延。
  • 原子操作。Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。
  • 复用。客户端发送的脚本会永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。

7.2 实现

具体代码如下,可以看到就是采用了lua脚本一次性写入:

// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
                      ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {

    double now = Long.valueOf(clock.millis() + 1).doubleValue();
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // The script requires the scores as whole numbers
    NumberFormat fmt = NumberFormat.getIntegerInstance();
    fmt.setGroupingUsed(false);
    String nowScoreString = fmt.format(now);
    String unackScoreString = fmt.format(unackScore);

    List<String> messageIds = new ArrayList<>();
    for (int i = 0; i < messageCount; ++i) {
        messageIds.add(prefetchedIdQueue.poll());
    }

    String atomicBulkPopScriptLocalOnly="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local peek_until=ARGV[2]\n" +
            "local unack_score=ARGV[3]\n" +
            "local queue_shard_name=ARGV[4]\n" +
            "local unack_shard_name=ARGV[5]\n" +
            "local msg_start_idx = 6\n" +
            "local idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "  if (exists) then\n" +
            "    if (exists <=peek_until) then\n" +
            "      local value = redis.call('hget', hkey, message_id)\n" +
            "      if (value) then\n" +
            "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "        if (zadd_ret) then\n" +
            "          redis.call('zrem', queue_shard_name, message_id)\n" +
            "          return_vals[idx]=value\n" +
            "          idx=idx+1\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  else\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    String atomicBulkPopScript="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local num_shards=ARGV[2]\n" +
            "local peek_until=ARGV[3]\n" +
            "local unack_score=ARGV[4]\n" +
            "local shard_start_idx = 5\n" +
            "local msg_start_idx = 5 + (num_shards * 2)\n" +
            "local out_idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local found_msg=false\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  for j=0,num_shards-1 do\n" +
            "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
            "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
            "    local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "    if (exists) then\n" +
            "      found_msg=true\n" +
            "      if (exists <=peek_until) then\n" +
            "        local value = redis.call('hget', hkey, message_id)\n" +
            "        if (value) then\n" +
            "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "          if (zadd_ret) then\n" +
            "            redis.call('zrem', queue_shard_name, message_id)\n" +
            "            return_vals[out_idx]=value\n" +
            "            out_idx=out_idx+1\n" +
            "            break\n" +
            "          end\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  end\n" +
            "  if (found_msg == false) then\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    List<Message> payloads = new ArrayList<>();
    if (localShardOnly) {
        String unackShardName = getUnackKey(queueName, shardName);

        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        builder.add(localQueueShard);
        builder.add(unackShardName);
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    } else {
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(Integer.toString(allShards.size()));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            String unackShardName = getUnackKey(queueName, shard);
            builder.add(queueShard);
            builder.add(unackShardName);
        }
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    }

    return payloads;
}

0x08 V2

最新版本是 V2,有三个类,我们看看具体是什么作用。

  • QueueBuilder

  • MultiRedisQueue

  • RedisPipelineQueue

8.1 QueueBuilder

就是封装,对外统一提供API。

public class QueueBuilder {

    private Clock clock;

    private String queueName;

    private String redisKeyPrefix;

    private int unackTime;

    private String currentShard;

    private ShardSupplier shardSupplier;

    private HostSupplier hs;

    private EurekaClient eurekaClient;

    private String applicationName;

    private Collection<Host> hosts;

    private JedisPoolConfig redisPoolConfig;

    private DynoJedisClient dynoQuorumClient;

    private DynoJedisClient dynoNonQuorumClient;
}

8.2 MultiRedisQueue

该类也是为了提高速度,其内部包括多个RedisPipelineQueue,每个queue代表一个分区,利用 round robin 方式写入。

/**
 * MultiRedisQueue exposes a single queue using multiple redis queues.  Each RedisQueue is a shard.
 * When pushing elements to the queue, does a round robin to push the message to one of the shards.
 * When polling, the message is polled from the current shard (shardName) the instance is associated with.
 */
public class MultiRedisQueue implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisPipelineQueue> queues = new HashMap<>();
    private RedisPipelineQueue me;
}

8.3 RedisPipelineQueue

这个类就是使用pipeline来提升吞吐。

Queue implementation that uses Redis pipelines that improves the throughput under heavy load.。

public class RedisPipelineQueue implements DynoQueue {

    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);

    private final Clock clock;

    private final String queueName;

    private final String shardName;

    private final String messageStoreKeyPrefix;

    private final String myQueueShard;

    private final String unackShardKeyPrefix;

    private final int unackTime;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private final RedisConnection connPool;

    private volatile RedisConnection nonQuorumPool;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final HashPartitioner partitioner = new Murmur3HashPartitioner();

    private final int maxHashBuckets = 32;

    private final int longPollWaitIntervalInMillis = 10;
}

0xFF 参考

干货分享 | 如何从零开始设计一个消息队列

消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列

消息队列设计精要

有赞延迟队列设计

基于Dynomite的分布式延迟队列

http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

http://activemq.apache.org/delay-and-schedule-message-delivery.html

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)

原创 Amazon Dynamo系统架构

Netlix Dynomite性能基准测试,基于AWS和Redis

为什么分布式一定要有延时任务?

02-26 06:55