读书笔记系列,一章一篇。
取自深入理解kafka和kafka专栏(极客时间)
基本概念
特点
消息系统
和传统的消息系统一样,但是额外提供,顺序消费,回溯消费。
存储系统
消息是持久化存储的。
流式平台处理
kafka不仅为多个流行的流式处理框架提供了可靠的数据来源,还提供了完整的流式处理类库。
概念
producer
生产者:生产消息发送给kakfa实例。
Consumer
消费者:负责消费kafka上面的消息。消费者端采用pull的方式拉去消息,保存了数据的偏移量,当消费者宕机后重新上线,可以根据之前保存的消费位置重新进行消费。
Broker
一个kafka实例,一个kafka集群里面有多个Broker,也就是多个kafka实例
ZooKeeper
Zk负责管理kafka集群的的元数据,控制器的选举等。它是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。
主题
kafka的消息是根据主题来分类。生产者发送的消息都要指定一个主题,消费者消费消息也是订阅主题。
分区
主题是逻辑上的概念,一个主题可能是多个分区组成的,一个分区只可能会是一个主题的。分区可以看做是一个可追加的日志文件,每个消息在分区里面都有一个偏移量,kafka通过这个偏移量保证顺序消费,但是kafka保证的是分区顺序消费,而不是主题顺序消费。
副本
一个分区有多个副本,其中一个是lead副本,其余是follow副本,分布在不同的broker里面,lead副本负责读写,其余副本负责和lead副本消息同步,当lead副本故障时,可以选举follow副本继续提供服务。由于副本的同步是异步的,因此可能不一致,所有副本统称为AR,所有和lead副本保持一致的是统称为ISR,与lead副本滞后的副本则统称为osr副本。这2个集合的副本是动态的,当ISR不满足时则将其提到OSR,当OSR更上时,则将其丢到ISR,副本数据的限制和你的当前borker相关,因为多个副本放在一台broken上毫无意思,因此,每个副本在不同的broken上。
HW
高水位。这个是消费者目前只能拉倒的消息偏移量。这个值是所有副本AR里面保存数据最小值offset。
LEO
则是记录当前下一条数据写入的offset.
版本号
kafka-2.11-2.1.1,这个里面2.11是scala的编译器版本,2.1.1是kafka真实的版本。2是大版本,第一个1是小版本号,第二个1是修订号。
0.7远古版本。
0.8加了副本机制,但是api是远古版本客户端api,且和新版本不兼容。
0.9加入了安全认证,权限的功能,新版本生产者api稳定,但是消费者api极其不稳定。
0.10:加了了kafka Streams。消费者api较为稳定。
0.11提供幂等性produceapi和事务api.以及对消息做了重构。
1.0和2.0对stream做了优化,消息没有什么变化。
简单java代码
ps:注意kafka的server.properrties的listeners要开放出来,其格式为protocoll://hostname:port1。协议+域名ip+端口。第一个协议有很多,默认是PLAINTEXT不开启安全校验,域名要写出来,不写则是默认网卡,但是可能为127.0.0.1导致外部无法访问。还有advertised.listeners作用和listeners类似,主要用于IaaS环境,比如公有云上配备多个网卡,包含了私网网卡和公网网卡。对于这种情况,可以设置advertised.listeners绑定公网ip对外客户端使用。使用listeners绑定私网ip给broker之间通信。
maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产者:
package com.dh.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
*
* @author Lenovo
*
*/
public class SimpleKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
// broker地址
props.put("bootstrap.servers", "192.168.147.131:9092");
// 指定消息key序列化方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定消息本身的序列化方式
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo111", "hello, kafka");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Message sent successfully");
producer.close();
}
}
消费者:
package com.dh.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.147.131:9092");
// 每个消费者分配独立的组号
props.put("group.id", "group.demo");
// 设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic-demo111"));
System.out.println("Subscribed to topic " + "topic-demo111");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
System.out.printf(record.value());
}
}
}