kafka入门002 -生产者

消息格式

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

  topic是消息主题,partition是分区号,headers一般没用,key是消息的键,同一个key的消息会被发到一个分区里面。

参数配置

  bootstrap.servers:生成者客户端连接kafka集群所需的broker地址清单,一般写2个,host:port,host1:port1,不要全部写出来,因为生产者会从给定的broke里面查出其他broker的信息,写2个是为了防止一个宕机了,生产者依旧可以连接到kafak。

  key.serializer,value.serializer:broker服务器接受的消息是字节数组存在的,这个就是指定,key和value的序列化操作的序列化器。

  client.id:对应kafka客户端的id,也就是客户端的名字,如果不设置则是默认取名,produce-i

  retries:对于可重试异常,可重试次数配置,默认是0,ps(KafkaProducer一般有2种异常,一个是可重试异常,比如NetworkException,表示网络异常,可以通过重试解决,一个是不可重试异常,比如RecordToolargeException,表示消息太大,不会重试。),重试次数超过了配置的值。也会抛出异常。
  retry.backoff.ms:这个主要是和上面的retries相对应的。是重试时间间隔。默认是100

  props.put(ProducerConfig.ACKS_CONFIG, “0”);
  当为0时,发到服务端就不管了。
  当为1时,只要leader副本写入,就返回,但是可能leader奔溃了,但是其他副本没有拉取到消息。造成数据丢失
  当为-1或者all时,必须所有副本都要同步完才返回。

  max.request.size:默认值是1m,主要要和broke端配置的message.max.bytes相对应。

  max.in.flight.requests.per.connection为1时,可以保证顺序消费。
  compression.type:默认为none,可以配置为gzip,lz4等压缩方式,可以减小网络io但是会造成时延。

  connection.max.idle.ms:默认9分钟,多久后关闭限制的连接。

  Linger.ms:这个参数是指ProducerBatch(看后文)等待更多ProducerRecord加入的时间,默认为0,生产者会在这个ProducerBatch被填满,或者等待时间超过Linger.ms时发送出去

  send.buffer.bytes:socket接收消息缓冲区大小。默认32k。
  
  send.buffer.bytes:socket发送消息缓冲区大小。默认128k。

  request.timeout.ms:等待请求响应最长时间,默认30000ms,超时后可以选择重试。但是注意了,这个值要比broker端的replica.lag.time.max.ms的值要大,可以减小因为客户端重试导致的消息重复。

代码拼写

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  我们写这些参数时可能某个字符写错了。因此可以用ProducerConfig来替代。
  KafkaProducer是线程安全的。

发送消息

  3个模式
  1.发后即忘
  这个模式不关心消息是否正确到达,大多数也没什么问题,但是存在不可重试异常时,会造成消息的丢失,这个方式效率最高,可靠性最低。实现方式为

try {
        producer.send(record);
    } catch (Exception e) {
        e.printStackTrace();
    }

  2.同步
  这个模式是直接链式调用send方法返回对象的get方法来阻塞等待kafka的响应。直到消息发送成功,或者发生异常被捕捉处理。实现方式如下:

try {
        Future<RecordMetadata> future=producer.send(record);
        RecordMetadata recordMetadata=future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }

  3.异步
  这个解决了上面的性能问题,上面的future也是异步的逻辑处理。但是写法没有下面的方便:

producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace();
            }
        });

  这样就不会阻塞程序的执行了,等道kafka响应时就会做出相关操作处理。前面说过kafka通过偏移量保证顺序消费,响应如果是同一个分区,那么也是顺序的。

  close方法,会阻塞之前所有请求后再关闭KafkaProducer。来进行资源回收。

序列化

  除了上面说的StringDeserializer这个string类型的序列化器之外,还有Integer,Long,Double,Bytes,都实现了序列化接口,主要有2个方法,解析下:

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
    Object encodingValue = configs.get(propertyName);
    if (encodingValue == null)
        encodingValue = configs.get("serializer.encoding");
    if (encodingValue instanceof String)
        encoding = (String) encodingValue;
}

  这个方法是KafkaProducer创建实例时调用的,来确定编码类型。

@Override
public byte[] serialize(String topic, String data) {
    try {
        if (data == null)
            return null;
        else
            return data.getBytes(encoding);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
    }
}

  很简单,序列化。
  我们也可以自定义序列化器。只需要实现org.apache.kafka.common.serialization.Serializer接口即可。然后替换下即可。

分区器

  生成者send数据到broke时,需要经过拦截器,序列化器,和分区器等,在消息对象的partition不为空时,则不走分区器,为空则需要走分区器,分区器根据key来进行分区。
  默认的分区器是DefaultPartitioner,实现了Partitioner这个接口。在这个类里面主要方法。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

  
  代码很简单,key不为空时根据hash算法,算出key对应的编号。为空时则是获取这个topic的所有分区轮训。中间用到了currenthashmap
  我们也可以通过实现Partitioner这个接口来自定义分区器,只需要早参数配置时加入

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());

  来设定你的分区器。

生产者拦截器

  需要实现ProducerInterceptor这个接口即可。KafkaProducer的send方法

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

  会在第一时间先调用拦截器的send方法来对消息进行相应的处理。

  同时会在消息应答或者消息发送失败时,调用拦截器onAcknowledgement方法。这个调用在Callback之前发生。

public class ProduceInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
    // TODO Auto-generated method stub
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // TODO Auto-generated method stub

}
@Override
public void close() {
    // TODO Auto-generated method stub
}
}

  简单如上实现即可。 
  然后在prop上注册即可。

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProduceInterceptor.class.getName());

  同理,这边也支持职责链模式。也就是多个拦截器。

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProduceInterceptor.class.getName()+","+ProduceInterceptor.class.getName());

整体流程

  1.主线程生产者调用send()方法后,先经过拦截器,序列化器和分区器后,在缓存到RecordAccumulator(消息累加器)中。
  2.Send线程负责从RecordAccumulator获取消息,批量发送,减少网络传输的资源消耗。
  3.有2个生产者参数,buffer.memory 这个是RecordAccumulator的大小默认是32m, 如果1的速度大于2,那么可能空间不足,因此1要么阻塞,要么抛出异常,取决于max.block.ms的配置,这个值默认是60s。
  4.RecordAccumulator内部为每个分区维护了一个双向队列。ConcurrentMap> batches,写入消息是追加到尾部,send读消息时从头部读取,这里面的ProducerBatch是一个或者多个ProducerRecord的合成。
  5.当一个消息ProducerRecord被append到RecordAccumulator时会根据一个叫做batch.size默认参数是16kb进行区分,首先判断尾部的ProducerBatch是否可以继续添加ProducerRecord,不可以的话,判断是否大于batch.size,如果不大于则按照16kb的大小创建ProducerBatch,因为RecordAccumulator内部有个BufferPool这个是缓存了固定大小的ByteBuffer对象。
  6.InFlightRequests,在send线程发送到kafka之前,会把对象保存到这个对象里面Map>,这个里面request保存了发出去了,但是没有收到回复的响应请求。
  7.元数据的更新
  当客户端没有所需要的元数据时,就会向其中一个node发送请求,获取元数据(元数据包括:集群有哪些主题,哪些分区,lead副本在哪个节点上等)。