redis 队列推送消息的疑问 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
uoddsa
V2EX    PHP

redis 队列推送消息的疑问

  •  
  •   uoddsa 2019-05-09 15:58:16 +08:00 7456 次点击
    这是一个创建于 2416 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。

    第 1 条附言    2019-05-09 17:32:51 +08:00
    问题 ,怎么取队列里面的消息 while(true) ?还是有其他好的方法。
    29 条回复    2019-05-10 10:52:47 +08:00
    coffeSlider
        1
    coffeSlider  
       2019-05-09 16:10:31 +08:00
    取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。
    rpdict
        2
    rpdict  
       2019-05-09 16:44:33 +08:00
    用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了
    yxjn
        3
    yxjn  
       2019-05-09 16:52:25 +08:00
    redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。
    julyclyde
        4
    julyclyde  
       2019-05-09 16:56:04 +08:00
    @rpdict zset 很容易慢的
    iyaozhen
        5
    iyaozhen  
       2019-05-09 16:56:10 +08:00 via Android
    redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费

    用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。
    reus
        6
    reus  
       2019-05-09 17:04:40 +08:00
    如果 redis 崩了呢?你怎么知道那些发过哪些没发过?
    rpdict
        7
    rpdict  
       2019-05-09 17:12:45 +08:00
    @julyclyde 有序集合的好处就是读取的时候读第一个,如果时间没到就不用接着读了,节约的时间在这里
    julyclyde
        8
    julyclyde  
       2019-05-09 17:25:39 +08:00
    @rpdict 我是指 zset 的排序速度慢。尤其是没按顺序插入的时候
    uoddsa
        9
    uoddsa  
    OP
       2019-05-09 17:32:33 +08:00
    @iyaozhen 问题就在这里 怎么取队列里面的消息 while(true) ?还是有其他好的方法。
    yxjn
        10
    yxjn  
       2019-05-09 17:38:25 +08:00
    @uoddsa blpop 看看能不能满足你的需求
    lestat
        11
    lestat  
       2019-05-09 17:42:52 +08:00
    @rpdict laravel 里面的延时队列好像就是这么设计的
    rpdict
        12
    rpdict  
       2019-05-09 17:54:58 +08:00
    @uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快
    rpdict
        13
    rpdict  
       2019-05-09 17:57:26 +08:00
    @lestat 我是参考了有赞的延时队列,感觉大家做法都差不多?感谢回复,我去看看 laravel 怎么实现的
    Evilk
        14
    Evilk  
       2019-05-09 18:29:37 +08:00
    @yxjn 同意,redis 还是适合缓存,需要队列的话,还是选择专业的吧,比如 rabbitMQ
    strive
        15
    strive  
       2019-05-09 18:39:13 +08:00
    可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了
    brickyang
        16
    brickyang  
       2019-05-09 18:44:47 +08:00 via iPhone
    lovedebug
        17
    lovedebug  
       2019-05-09 18:58:27 +08:00 via Android
    这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。
    ericliu001
        18
    ericliu001  
       2019-05-09 19:08:25 +08:00
    lpoprpush 看下这个命令
    runnerlee
        19
    runnerlee  
       2019-05-09 19:35:27 +08:00   2
    laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).

    消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现:

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54
    ```
    -- Pop the first job off of the queue...
    local job = redis.call('lpop', KEYS[1])
    local reserved = false
    if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
    end
    return {job, reserved}
    ```

    而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167

    同样也是使用 lua 进行操作
    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105
    ```
    -- Get all of the jobs with an expired "score"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto the destination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
    redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    -- Push a notification for every job that was migrated...
    for j = i, math.min(i+99, #val) do
    redis.call('rpush', KEYS[3], 1)
    end
    end
    end

    return val
    ```

    同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84

    在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111

    所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费.
    iyaozhen
        20
    iyaozhen  
       2019-05-09 19:40:58 +08:00 via Android
    @uoddsa 死循环就行了,做好异常处理。后台运行呗
    runnerlee
        21
    runnerlee  
       2019-05-09 19:54:43 +08:00   1
    忘了补充一个细节,

    在用 lua 调用 lpop 之后, 会将消息 json decode 出来然后自增 attempts 字段, 再放到备份队列.

    https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L62

    这样是为了实现最大重试次数, 当失败到配置的最大次数之后, 会把消息保存到 mysql 后从 redis 里丢弃掉.
    fishioon
        22
    fishioon  
       2019-05-09 20:12:32 +08:00
    可以考虑下 redis 5.0 中的 stream 结构
    ihipop
        23
    ihipop  
       2019-05-09 23:56:26 +08:00 via Android
    NSQ
    scnace
        24
    scnace  
       2019-05-10 00:25:47 +08:00 via Android
    di.....disque ?
    zk123
        25
    zk123  
       2019-05-10 07:59:15 +08:00 via iPhone
    发布订阅模式的数据可靠性不保证,它的数据不保存到快照或者 aof 中,发布即焚,也没有 ack 机制,不适用这样消息队列场景。

    List 队列模式比较适合,不过自己要补偿做许多 ack 以及失败机制。建议还是考虑 MQ 或者其他 push。
    zchlwj
        26
    zchlwj  
       2019-05-10 08:54:43 +08:00
    redis 消息队列不存盘的哦,这种场景还是考虑 mq 把
    polebug
        27
    polebug  
       2019-05-10 09:03:58 +08:00 via Android
    我上次写业务 也是用的 zset 慢点无所谓 反正并发推送
    fuxinya
        28
    fuxinya  
       2019-05-10 10:13:41 +08:00 via Android
    如果是 spring 项目可以去看看 redisson
    ducklyl
        29
    ducklyl  
       2019-05-10 10:52:47 +08:00
    别用 redis 做队列,用 mq 或 rq
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2675 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 00:18 PVG 08:18 LAX 16:18 JFK 19:18
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86