消费者与消费组
每个消费者都有一个对应的消费组,当消息发送到主题后。只会被投递给订阅他的每个消费组中的某一个消费者。
按照kafka默认原则。主题X有a,b,c,d。4个分区,有2个消费者组A(C0,C1,C2,C3)4个消费者,B(C4,C5)2个消费者都订阅了这个主题,那么消费者A组中每个消费者都分配一个分区,消费者B每个消费者分配2个分区。消费者只能消费自己所分配分区的消息。换句话说,消费者组之间是共享的,消费组中的消费者是互斥的。
同时我们可以动态的增加消费者增加消费能力,比如B消费者组,我给他再加2个消费者,那么这个B消费者组就会的4个消费者就会被一个分区一个消费者,如果再加一个消费者没什么卵用,因为分区数目<消费者数目,没有多余的分区可以分配给消费者了。同时,假设组内某个实例挂掉了,Kafka能够自动检测到,然后把这个Failed实例之前负责的分区转移给其他活着的消费者。这个过程就是Kafka中大名鼎鼎的“重平衡”(Rebalance)。
消息中间件有2种消息投递方式:即点对点模型(Peer to Peer,P2P)和发布订阅模型。这里面的点对点指的是同一条消息只能被下游的一个消费者消费,其他消费者则不能染指。而发布订阅则是将消息发送到某个主题,消息订阅者都可以从这个主题里面获取消息。
对于kafka来说。所有的消费者都是一个消费组的时候就是点对点模型,消费者隶属于不同消费者组时,则是发布订阅模型。
消息格式
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
}
简单的我就不说了和生产者差不多,offset表示消息所属分区的偏移量。
参数配置
bootstrap.servers:消费者客户端连接kafka集群所需的broker地址清单,一般写2个,host:port,host1:port1,不要全部写出来,因为生产者会从给定的broke里面查出其他broker的信息,写2个是为了防止一个宕机了,生产者依旧可以连接到kafak。
key.deserializer,value.deserializer:和生产者的key.serializer,value.serializer相对应,消费者从broker服务器接受的消息是字节数组存在的,这个就是指定,key和value的序列化操作的反序列化器,来进行数据的反序列化。
group.id:消费者隶属的消费组的名称。
client.id:对应kafka客户端的id,也就是客户端的名字。
fetch.min.bytes:表示poll能从kafka拉取的最小数据量,默认1b
fetch.max.bytes:最大,是50m,但是不是绝对的。真正最大值是message.max.bytes来配置
fetch.max.wait.ms:默认500ms,在这个时间内,如果没有fetch.min.bytes的限定值,还是返回。
max.partition.fetch.bytes:每个分区返回给consumer最大的数据量,默认1m。
max.poll.records:一次poll最大的消息数。默认500.
connections.max.ide.ms:多久之后干不限制的连接,默认9分钟。
exclude.internal.topic:默认true,没法通过正则来订阅内部主题。
receive.buffer.bytes:设定socket的接受缓冲区大小,默认大小:64kb.
send.buffer.bytes:设定sockect发送缓冲区大小,默认是128kb.
request.timeout.ms:配置consumer等待请求响应时间。默认是30000ms。
metadata.max.age.ms:配置元数据更新时间,默认5分钟,即便元数据没有任何变化。
reconnect.backoff.ms:配置尝试重新连接指定主机之前的等待时间。避免某些故障的情况下,频繁发送。默认50ms
retry.backoff.ms:配置尝试重新发送失败到指定主题分区的等待时间,避免某些故障的情况下,频繁发送。默认100ms
1
isolation.level:配置消费者的事务隔离级别。默认是read_uncommitted,可以消费到HW.
代码拼写
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
我们写这些参数时可能某个字符写错了。因此可以用ConsumerConfig来替代。
订阅主题和分区
通过KafkaConsumer的subscribe方法来订阅主题。可以订阅多个主题。因为接收对象是个集合。下面是常见的一个接口方法。
public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
还有一个接口是通过正则表达式来订阅的,只要是符合要求的,都会被订阅,即便是后面新增的也是如此。
@Override
public void subscribe(Pattern pattern) {
subscribe(pattern, new NoOpConsumerRebalanceListener());
}
比如
consumer.subscribe(Pattern.compile("topic-.*"));
这个就是以topic开头的主题都会被订阅。
在kafka客户端里面TopicPartition类表示分区。
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
}
主要2个属性,主题名称,和分区编号。
consumer.assign(Collections.singletonList(new TopicPartition(topic,0)));
我们可以通过assign方法订阅主题里面的某个分区。
但是主题里面有哪些分区时元数据信息。我们可以获取到这些信息,通过生产者的partitionsFor获取分区元数据信息。
List<PartitionInfo> list=consumer.partitionsFor(topic);
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}
topic是主题,partition分区号,leader分区的lead副本,replicas分区的AR集合,inSyncReplicas是ISR集合,offlineReplicas是OSR集合。
我们可以先通过partitionsFor方法获取到元数据信息,在进行分区订阅。
consumer.unsubscribe();
我们可以通过上述方法取消订阅。
我们订阅主题消息的方式有3个,分别代表不一样的状态,他们是互斥的。
反序列化
和生产者一文的大致类似,只是反着来了,没啥好说的。
消息消费
消息的消费有2个模式,推:服务器主动把消息推给消费者,拉:消费者主动向服务器发起请求拉取消息。
kafka则是通过poll方法去拉取的。
在我们代码里面,我们就是循环不断调用poll来拉取数据的。poll方法有个参数是时间,表示消费者缓冲区里面没有数据时阻塞。
我们可以根据消息分区维度进行消费。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> record : records.records(tp))
System.out.printf(record.value());
}
同理,也可以根据消息主题进行区分消费。主要用到了下面的方法:
public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
List<List<ConsumerRecord<K, V>>> recs = new ArrayList<>();
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().topic().equals(topic))
recs.add(entry.getValue());
}
return new ConcatenatedIterable<>(recs);
}
位移提交
在kafka服务器端每个消息有个offset用来表示消息所在分区的位置,我们用偏移量来表示,而消费者端也有一个offset,表示消费者消费到分区的哪个消息了,我们用位移来表示。
消费者消费的消息都是没有消费的,因此这个位移必须保存且持久化,这个位移保存在kafka内部的主题_consumer_offsets中,每个消费组一个消费位移。消费者在消费完消息后,需要再这里提交消费位移的提交。记住消费者要提交的消费位移不是已经消费的消息位置的最后一个,而是已消费消息位置+1,这个没有消费的位置。而committed offset则表示已经提交过的消费位移。
@Override
public long position(TopicPartition partition) {
return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
消费者的2个接口可以分别获取到上述的2个值。这2个值在某些情况是一样的,但是在某些情况是不一样的。
对于位移提交是要把握时机的,比如你拉取了一大堆数据,还没有消费完就出了bug,你却已经提交了消费位移,那么就会造成消息丢失。如果全部不提交,那也可能造成消息重复。
kafka消息位移提交的策略是自动提交,定期提交,每隔5秒。是以下参数配置
enable.auto.commit默认是true,
auto.commit.interval.ms默认是5s。
在默认情况下,刚提交完一次位移,然后拉取一批消息消费,在下一次自动提交消费位移钱,消费者宕机了,那么重启后,或者转给别的消费者后,就要重复消费。
而消息丢失呢,主要是你代码造成的,比如a线程拉取数据到本地缓存,b线程处理缓存的数据,某个点,提交了位移,因为数据确实消费完了,但是b线程没有消费到缓存中对应的那个点就宕机了,那么就造成了数据丢失。
当然上述情况都是异常情况导致的。正常情况不会如此,因此kafka提供了手动提交位移的操作。
手动提交可以是同步或者异步的。
consumer.commitAsync();
consumer.commitSync();
对于同步提交依然可能造成消息重复。这个还是批量提交。因为可能中间有异常,但是我们可以把异常捕捉,在针对性的处理。
更加精细的提交方式
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
}
用这个接口的方法,可以实现每消费一个消息就提交一次位移。一般来说不会这么做,太消耗性能了。一般来说,我们大多是根据分区的粒度来划分提交位移的界限,代码如下:看了你就懂了。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> pRecords = records.records(tp);
for (ConsumerRecord<String, String> record : pRecords)
System.out.printf(record.value());
long lastOffset = pRecords.get(pRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
}
异步提交则是不一样的,执行时,消费者线程不会被阻塞,可能提交消费位移的结果还没有返回之前就开始了新一轮的拉取操作。他有个回调方法很不错。
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
//这里操作
}
});
由于是异步,可能出现下面问题consumer.commitAsync失败了,你直接重试,然后下面一个成功了,你重试也成功了,然后宕机重启,完了,消息又重复消费了。一般情况,不需要重试,很少发生,后面的会补上。同时我们可以在异步提交之后,在try一下,为了保证消息位移的提交在加个同步提交。
控制消费
有时我们需要暂停某些分区的消费,当达到某些时间时再恢复。
consumer.pause(partitions);
consumer.resume(partitions);
这个是暂停和恢复2个方法。
Set<TopicPartition> set=consumer.paused();
这个是查看被暂停分区的方法。
除了while (isRunning.get()) {}这个方法来设定是否关闭连接外,还可以同期线程外的wakeup方法来退出poll的逻辑。他会抛出一个可捕捉的异常。我们要处理。但是要注意同时关闭资源,调用close方法。
指定位移消费
这些情况下,可能找不到消息位移。
1.新建立的消费组。
2.消费组新的一个消费者订阅了一个新的主题。
3._consumer_offsets这个主题中有关这个消费组的位移信息因为过期被删除。
当kafka找不到消息消费位移后,就会根据消费者客户端参数auto.offset.reset来读取,这个配置默认读取latest,表示从分区末尾开始读取消息。如果改成earlist则是从起始处读取配置。如果改成none,则是抛出异常。只能是这3个值。
除了找不到消费位移,位移越界也会触发auto.offset.reset操作。
我们使用poll 来批量抓取消息时是黑盒的,但是我们可以通过seek细致的抓取消息处理。
public void seek(TopicPartition partition, long offset)
他可以为每个分区设定位移。
Set<TopicPartition> set=consumer.assignment();
获取当前消费者被分配的分区。
我们先poll一次,获取到元数据等信息,然后对分区设置位移,开始消费(ps,不要担心poll会造成消息丢失,因为我们要设定自己的新位移,那次poll的消息是无意义的)
需求来了,我们需要消费昨天8点之后的消息。
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
}
参数是map,key是分区,value是时间戳。这个方法会返回时间戳大于等于待查询时间的第一条消息的位置和时间戳。我们可以利用这个位置来针对性的消费。
位移越界也会造成auto.offset.reset启用。
我们同时可以把消费位移保存到db里面,然后下次消费时从db读取位移,再消费。
再均衡
前面说了,基于主题的可以再均衡,直接订阅分区的无法进行再均衡。就是更前面我们说的,一个消费组,新增或者减少消费者,会进行均衡处理。一个分区被重新分配给另一个消费者时,消费者当前的状态会丢失。同时 再进行再均衡期间,消息是无法被读取的。如果某个消费者消费了消息,消息位移还没有提交,那么这个分区被分配给新的消费者时,那么就会造成消息重复消费。
这个再均衡和我们之前说的NoOpConsumerRebalanceListener相关。这个监听器是用来设定发生再均衡前后的一些准备和收尾工作。主要下面2个方法处理
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
这个方法是在均衡之前,消费者停止读取消息之后被调用。可以用这个方法来提交消息位移。
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
这个方法在重新分配分区之后和消费者重新开始读取之前调用。
Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<>();
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets);
offsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
我们需要把消费位移记录到map里面。每一次消费消息时,把这个消息的分区和分区位移加到这个map里面。
消费者拦截器
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
这个拦截器,必须实现这个接口,主要2个方法,onConsume方法是poll方法返回之前调用的,onCommit则是提交完消费位移后调用的。
注意要在配置里面添加:
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, value)
和生产者一样,可以写成链式。
多线程实现
生产者是线程安全的,但是消费者不是线程安全的。
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
他有个方法是专门检测当前是否只有一个线程操作。
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
和这个方法相互配合,类似加锁和解锁。注意compareAndSet(a,b)这个方法是a先和这个类的数据比较,如果一样则把b放进去替代数据。
为了提高消费者的消费能力,防止broke里面堆积的消息过多。从而导致消息还没被消费就丢失。下面几个策略可以提高消费能力。
1.每个线程实现一个KafkaConsumer,但是线程数目不能超过分区数,前面解释过。
2.去除再平衡效果,直接以分区作为基本单位进行消费。多个线程消费一个分区。一般不会推荐这么做的。
直接启动多个消费者线程,有个缺点就是,每个线程都要维护一个tcp连接。这会造成很大的系统开销。
3.对于kafka来说poll的速度是很快的,但是一般是处理消息是很消耗性能。如果改动了这一块,那么对性能的提升很大。因此我们可以吧消息处理模块变成多线程处理。但是对顺序消费有问题。同时位移提交也要处理。可以考虑共享变量Map
可以考虑tcp的滑动窗口模式。把数据丢到一个缓存队列里面。然后多个设定一个固定的大小的窗口,也就是,最小游标和最大游标。然后多线程(业务线程)充窗口拉数据,处理完了就回去报告,接着拉取数据,同时窗口判断自己是否可移动。