kafka入门001 -基本概念

读书笔记系列,一章一篇。
取自深入理解kafka和kafka专栏(极客时间)

基本概念

特点

消息系统

  和传统的消息系统一样,但是额外提供,顺序消费,回溯消费。

存储系统

  消息是持久化存储的。

流式平台处理

  kafka不仅为多个流行的流式处理框架提供了可靠的数据来源,还提供了完整的流式处理类库。

概念

producer

  生产者:生产消息发送给kakfa实例。

Consumer

  消费者:负责消费kafka上面的消息。消费者端采用pull的方式拉去消息,保存了数据的偏移量,当消费者宕机后重新上线,可以根据之前保存的消费位置重新进行消费。  

Broker

  一个kafka实例,一个kafka集群里面有多个Broker,也就是多个kafka实例

ZooKeeper

  Zk负责管理kafka集群的的元数据,控制器的选举等。它是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。

主题

  kafka的消息是根据主题来分类。生产者发送的消息都要指定一个主题,消费者消费消息也是订阅主题。

分区

  主题是逻辑上的概念,一个主题可能是多个分区组成的,一个分区只可能会是一个主题的。分区可以看做是一个可追加的日志文件,每个消息在分区里面都有一个偏移量,kafka通过这个偏移量保证顺序消费,但是kafka保证的是分区顺序消费,而不是主题顺序消费。

副本

  一个分区有多个副本,其中一个是lead副本,其余是follow副本,分布在不同的broker里面,lead副本负责读写,其余副本负责和lead副本消息同步,当lead副本故障时,可以选举follow副本继续提供服务。由于副本的同步是异步的,因此可能不一致,所有副本统称为AR,所有和lead副本保持一致的是统称为ISR,与lead副本滞后的副本则统称为osr副本。这2个集合的副本是动态的,当ISR不满足时则将其提到OSR,当OSR更上时,则将其丢到ISR,副本数据的限制和你的当前borker相关,因为多个副本放在一台broken上毫无意思,因此,每个副本在不同的broken上。

HW

  高水位。这个是消费者目前只能拉倒的消息偏移量。这个值是所有副本AR里面保存数据最小值offset。

LEO

  则是记录当前下一条数据写入的offset.

版本号

  kafka-2.11-2.1.1,这个里面2.11是scala的编译器版本,2.1.1是kafka真实的版本。2是大版本,第一个1是小版本号,第二个1是修订号。
  0.7远古版本。
  0.8加了副本机制,但是api是远古版本客户端api,且和新版本不兼容。
  0.9加入了安全认证,权限的功能,新版本生产者api稳定,但是消费者api极其不稳定。
  0.10:加了了kafka Streams。消费者api较为稳定。
  0.11提供幂等性produceapi和事务api.以及对消息做了重构。
  1.0和2.0对stream做了优化,消息没有什么变化。

简单java代码

  ps:注意kafka的server.properrties的listeners要开放出来,其格式为protocoll://hostname:port1。协议+域名ip+端口。第一个协议有很多,默认是PLAINTEXT不开启安全校验,域名要写出来,不写则是默认网卡,但是可能为127.0.0.1导致外部无法访问。还有advertised.listeners作用和listeners类似,主要用于IaaS环境,比如公有云上配备多个网卡,包含了私网网卡和公网网卡。对于这种情况,可以设置advertised.listeners绑定公网ip对外客户端使用。使用listeners绑定私网ip给broker之间通信。

  maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

  生产者:

package com.dh.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 * @author Lenovo
  *
 */
public class SimpleKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker地址
        props.put("bootstrap.servers", "192.168.147.131:9092");
        // 指定消息key序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定消息本身的序列化方式
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo111", "hello, kafka");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

  消费者:

package com.dh.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.147.131:9092");
        // 每个消费者分配独立的组号
        props.put("group.id", "group.demo");
        // 设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic-demo111"));
        System.out.println("Subscribed to topic " + "topic-demo111");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf(record.value());
        }
    }
}