前几天出去面试,大家都喜欢聊redis,一个是底层数据结构的实现,一个是在业务层的使用,这里就结合一些简单的python代码,讲下怎样用redis 做应用层面的定时器。

    首先,当大批量任务做超时管理,就会涉及到如何实现定时器,使系统开销最小的问题。通常的底层是一个timer 加一个最小堆(有个自己的实现,可以参考:https://my.oschina.net/u/2950272/blog/1822495),或者是环形数组,或者是红黑树。每种方式都用的还比较多的,nginx 使用的是红黑树,golang 底层time 使用的是最小堆。当业务层面要自己实现定时回调,其实使用redis 也可以做,而且还蛮简单的,就是zset 就ok。

    具体的思路是怎样呢?zadd 的时候,val 设置成taskid, score 设置成为timestamp 即可。然后主代码,开个线程,隔一段时间sleep 去zRangeByScore 当前时间戳到之前标记时间的值,然后就可以简单的用redis 管理超时任务了。

    具体例子的可以看下面的一个demo:

import redis
import time
import threading

host = "127.0.0.1"
port = "6379"
password = "123123"
db = 0

pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
rd = redis.StrictRedis(connection_pool=pool)

def put_task(task_id, timeout):
    due_time = int(time.time() + timeout)*100
    print "put", task_id
    rd.zadd("task", task_id, due_time)

def callback(task_id):
    print "consume taskid", task_id

def get_task():
    now = int(time.time() * 100)
    tasks = rd.zrangebyscore("task", 0, now)
    if tasks:
        rd.zremrangebyscore("task", 0, now)
    return tasks

def produce_task():
    while True:
        task_id = int(time.time() * 100)
        put_task(task_id, 1)
        time.sleep(1)

def consume_task():
    print "start loop"

    while True:
        task_ids = get_task()
        if task_ids:
            for task_id in task_ids:
                callback(task_id)

        time.sleep(1)

if __name__ == "__main__":
    t1 = threading.Thread(target=produce_task)
    t2 = threading.Thread(target=consume_task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    多个线程注册回调,一个线程隔一段时间去轮训数据库即可,具体时间精度,可以控制时间戳和和循环最小周期,再明白思路的情况下看这个demo 应该是很好理解的。

    在我们线上测试,内网环境下,qps 在2w 是没有压力的。

09-01 20:08