redis -stream

诞生

  Redis5.0出来的,新的数据结构 Stream 支持多播的可持久化的消息队列。其设计借鉴了kafka的设计。
  这个存储类型是一个数据链表,将所有加入的消息都串起来。每个消息都有唯一的id和对应的内容,消息是持久化的,redis重启后,内容还在,其持久化方式,类似于rdb+aof。
  每个stream都有唯一的一个名称就是其key,我们首次使用xadd指令追加消息时,自动创建,同时每个strean可以同时挂多个消费组,每个消费组都有游标last_delivered_id在Stream数组之上向前移动,表示当前消费组已经消费到哪条消息了,每个消费组都有一个stream内唯一的名称,消费组不会自动创建,需要单独的指令xgroup create进行创建,需要制定从这个stream的某个消息id,表示从这个stream开始消费,这个id用来初始化last_delivered_id变量。
  每个消费组的状态都是独立的。相互不影响,也就是说同一个stream内部的消息会被每个消费组消费到。
  每个消费组可以挂起多个消费者,这些消费者是竞争关系的,一个消息被一个消费者消化了,那就无法被另一个消化。每个消费者组内是唯一的。
  消费者内部有个状态变量pending_ids,它记录了客户端读取的消息,但是没有ack的,如果客户端发了ack,那么这个消息id就会被去掉,它来表示消息是否被至少消费一次。
  消息id是timestampInMillis-sequence,当前时间戳,的第几条消息,同时也可以客户端自己指定,但是形式必须是整数-整数,消息内容是键值对。

操作

  xadd:追加消息。
  xadd codehole * name laoqian age 30
  xdel:删除消息这里的删除仅仅是设置了标志位,不影响消息总长度。
  xrange:获取消息队列,自动过滤被删除消息。
  xrange codehole - +
  xlen:消息长度。
  xlen codehole
  del:删除stream。

独立消费

  我们可以不建立消费组的情况下对stream独立消费,其有一个指令叫做xread,可以将stream当做普通的消息队列(list)来使用,使用xread的时候我们可以忽略消费组的存在,好比stream就是一个普通的列表(list)。xread count 2 streams codehole 0-0表示从头读2个数据,没有数据就一直阻塞。但是我们可以用block来设置阻塞时间,0表示永久阻塞。1000表示1秒。 xread block 1000 count 1 streams codehole $。

创建消费组

  Stream通过xgroup createn 创建消费组。需要传递起始消息id作为参数来初始化last_delivered_id。
  xgroup create codehole cg1 0-0:从头开始消费。
  xgroup create codehole cg2 $:从尾部开始消费,只接受新消息。
   xinfo stream codehole:获取stream信息。

消费

  Stream指令xreadgroup可以进行消费组的组内消费,需要消费组名称,消费者名称,起始消息id,读取到新消息后,对应的消息id就会进入消费者的PEL结构里面,等待ack,来删除。

   xreadgroup GROUP cg1 c1 count 1 streams codehole >
命令 消费组 消费者 读几个 那个stream 表示从这个之后读取

限制长度

  stream消息要是太多怎么办,一个链表太长了,会导致性能下降很多,而且,删除也是逻辑删除,这边我们可以在创建这个stream的时候设置默认长度,新的覆盖旧的。
  xadd codehole maxlen 3 * name xiaorui age 1 长度为3

pel如何避免消息丢失

  当客户端突然断掉了来自服务端的消息,消息丢失了,但是pel里面已经保存了发消息的id,待客户端连上了,我们可以读取pel里面的消息列表,来保证消息不丢失。不过此时 xreadgroup 的起始消息 ID 不能为参数>,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息。

stream的高可用

  这个的高可用是建立在主从复制的基础上的,他和其他数据的复制机制没区别。

分区 Partition

  Redis不支持分区功能,如果要分区,那就配置多个stream,然后客户端根据一定策略生成消息到不同stream。