
有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。
1 coffeSlider 2019-05-09 16:10:31 +08:00 取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。 |
2 rpdict 2019-05-09 16:44:33 +08:00 用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了 |
3 yxjn 2019-05-09 16:52:25 +08:00 redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。 |
5 iyaozhen 2019-05-09 16:56:10 +08:00 via Android redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费 用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。 |
6 reus 2019-05-09 17:04:40 +08:00 如果 redis 崩了呢?你怎么知道那些发过哪些没发过? |
12 rpdict 2019-05-09 17:54:58 +08:00 @uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快 |
15 strive 2019-05-09 18:39:13 +08:00 可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了 |
16 brickyang 2019-05-09 18:44:47 +08:00 via iPhone |
17 lovedebug 2019-05-09 18:58:27 +08:00 via Android 这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。 |
18 ericliu001 2019-05-09 19:08:25 +08:00 lpoprpush 看下这个命令 |
19 runnerlee 2019-05-09 19:35:27 +08:00 laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset). 消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现: https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54 ``` -- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) redis.call('lpop', KEYS[3]) end return {job, reserved} ``` 而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167 同样也是使用 lua 进行操作 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105 ``` -- Get all of the jobs with an expired "score"... local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto the destination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) -- Push a notification for every job that was migrated... for j = i, math.min(i+99, #val) do redis.call('rpush', KEYS[3], 1) end end end return val ``` 同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84 在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111 所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费. |
21 runnerlee 2019-05-09 19:54:43 +08:00 忘了补充一个细节, 在用 lua 调用 lpop 之后, 会将消息 json decode 出来然后自增 attempts 字段, 再放到备份队列. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L62 这样是为了实现最大重试次数, 当失败到配置的最大次数之后, 会把消息保存到 mysql 后从 redis 里丢弃掉. |
22 fishioon 2019-05-09 20:12:32 +08:00 可以考虑下 redis 5.0 中的 stream 结构 |
23 ihipop 2019-05-09 23:56:26 +08:00 via Android NSQ |
24 scnace 2019-05-10 00:25:47 +08:00 via Android di.....disque ? |
25 zk123 2019-05-10 07:59:15 +08:00 via iPhone 发布订阅模式的数据可靠性不保证,它的数据不保存到快照或者 aof 中,发布即焚,也没有 ack 机制,不适用这样消息队列场景。 List 队列模式比较适合,不过自己要补偿做许多 ack 以及失败机制。建议还是考虑 MQ 或者其他 push。 |
26 zchlwj 2019-05-10 08:54:43 +08:00 redis 消息队列不存盘的哦,这种场景还是考虑 mq 把 |
27 polebug 2019-05-10 09:03:58 +08:00 via Android 我上次写业务 也是用的 zset 慢点无所谓 反正并发推送 |
28 fuxinya 2019-05-10 10:13:41 +08:00 via Android 如果是 spring 项目可以去看看 redisson |
29 ducklyl 2019-05-10 10:52:47 +08:00 别用 redis 做队列,用 mq 或 rq |