learning, progress, future.

skydh


  • 首页

  • 归档

kafka入门005 -主题和分区

发表于 2019-06-27

脚本管理主题信息

创建主题

  在集群配置的时候就说过了 auto.create.topics.enable这个值设置为true时。生产者发送一个没有被创建主题的消息时,会自动创建一个分区数为num.partition默认为1,副本因子为default.replication.factory默认为1的主题。消费者请求某个主题时也会创建一个类似的主题。这样主题不利于管理,因此我们需要将 auto.create.topics.enable设置为false。
  
  我们一般通过这个命令在服务器通过kafka-topics.sh脚本来创建主题。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1

  这个命令是创建一个叫做topic-dh的主题,分区有4个,副本因子2个。
  配置文件里面有个logs的配置,里面配置了主题和分区。前面也说过了,每个副本必须在不同的broke。我们不仅可以通过刚刚的路径查看log日志文件,也可以通过。
  我们不仅可以通过上述的日志文件查看主题,我们还可以通过zk客户端查看,每创建一个主题就会在zk的/brokers/topics上创建一个同名的实节点。该节点记录了创建该主题分区的分配方案。首先启动这个zk客户端,启动命令如下:
  ./zkCli.sh –server 127.0.0.1:2181。
  然后在命令行界面输入获取节点。和书上不一样的是,我的节点信息存放在 /kafka/brokers/topics/topic-demo111。这个目录下面,没有存放在 /brokers/topics/topic-demo111里面。

  我们可以通过下面命令查看分区细节信息。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create 

  来查看信息。

  我们可以更加详细的分配主题副本不同节点的分配。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-createsss --replica-assignment 2:0,0:1,1:2,2:1

  这很好理解,2个副本,4个分区,分区1的副本是2,0,分区2的副本是0,2之类的。前面说过了,副本必须在不同节点上面,因此如果你设置的节点是一样的,会报错。同时如果设置的分区的副本数不一样也不行,也会报错。比如:2:0,0,2:1这种情况,第二个分区的副本是1个其余分区的副本是2个。2:0,,2:1跳过某个分区也是不行的。
  其次,我在集群配置说了,我们可以在主题里面设置参数从而覆盖broker参数。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000

  同时我们创建主题时不能同名。可以加这么一个参数来限定。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1 --if-not-exists

  kafka内部做埋点处理时会根据主题的名称来命名metrics的名称,且将”.”改成”_”,因此topic.1_2和topic_1_2的metrics的名称都是topic_1_2,因此注意是否重名。
  broke支持指定机架信息,如果制定了机架信息,则在分区副本分配是会尽可能的让分区副本分配到不同机架上,通过参数broke.rack=RACK1来配置的。如果集群里面部分broke指定了机架,部分没有指定,那么依旧会报错。

  使用这个脚本创建主题,也就是这个kafka-topic.sh,本质上是调用kafka.admin.TopicCommand这个类实现对主题的管理,我们可以直接使用这个类来创建主题。demo如下:

String[] opts = new String[] { "--zookeeper", ":/2181/kafka", "--create", "--replication-factor", "1",
            "--topic", "topic-create-api" };
    kafka.admin.TopicCommand.main(opts);

分区副本的分配

  除了前面说的–replica-assignment参数来直接指定副本分配情况,如果没有的话,则是按照内部逻辑进行处理的,有2个方案。有前面我说的机架信息,和没有机架信息。
  没有机架信息的方案如下:(ps 这是scala代码)

val rand = new Random
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分区数
replicationFactor: Int, //副本因子
brokerList: Seq[Int], //集群中broker列表
fixedStartIndex: Int, //起始索引。默认为-1
startPartitionId: Int //起始分区编号,默认-1
): Map[Int, Seq[Int]] //返回值类型
= {
val ret = mutable.Map[Int, Seq[Int]]() //声明一个map
val brokerArray = brokerList.toArray //获取brokerid的列表
//起始索引小于0,那么从brokerid列表里面获取一个随机的有效值
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
//确保起始分区大于0
var currentPartitionId = math.max(0, startPartitionId)
//指定副本间隔
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
//这是一个for循环,scala语法看着好难受,遍历所有分区。
for (_ <- 0 until nPartitions) {
  if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
    nextReplicaShift += 1
  //获取第一个副本索引
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
  //生成一个该分区副本集合
  val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
  //保存该分区的所有副本分配的broker集合
  for (j <- 0 until replicationFactor - 1)
    //为其余副本分配broker
    replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
  //保存该分区的副本分配信息
  ret.put(currentPartitionId, replicaBuffer)
  //继续下一个分区
  currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}

  这个算法使得分区副本分配的很均匀。差不多正好。
  指定机架信息和没指定机架信息本质上差不多。一个机架可以分配多个broker节点,但是满足下面条件的broker不可哟添加到当前分区的副本列表里面。
  1.此broker所在机架已经有一个broker在这个分区的副本列表里面,且其他机架中没有任何的broken在该分区的副本列表里面。

  2.此broker已经在在列表,且其他broker不在。创建主题时实质上是在zookeeper中的/kafka/brokers/topics节点下创建和该主题对应的子节点,并且写入副本分配信息,且在/config/topics节点下创建该节点对应的子节点并且主题配置信息。kafka创建主题的实质上动作是交给控制器异步完成的。
  因此我们可以直接通过创建规则下的节点,来直接创建一个新的主题。这样我们可以绕过一些规则,比如我们创建主题分区的时候都是从0开始计数。我们通过创建zookeeper节点就不用从0开始累加了。

查看主题

  kafka-topics.sh这个命令有5个指令类型:create,list,describe,alter和delete。其中list和describe是查看主题信息的。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka -list

  这个命令是查看当前kafka当前所有可用主题。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create 

  这个是查看topic-create这个主题的详细信息,可以接多个主题,一次查看多个主题信息。如果没有–topic这个参数则是查看所有主题信息。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe  --topics-with-overrides

  –topics-with-overrides加这个参数则是查看所有使用了覆盖配置的主题。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create --under-replicated-partitions

  这个参数是查询当前主题所有包含失效副本的分区

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create -unavaliable-partitions 

  这个参数查看主题中没有lead副本的分区。

修改主题

  当一个主题被创建之后,依然允许我们对其做一定修改,比如修改分区个数,修改配置,通过alter指令来完成的。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create --partitions 3

  这个是将topic-create的分区修改为3.如此的话可能会有影响。
  目前是不支持分区从多变少的。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create   --config max.message.bytes=10000

  这个命令是修改这个主题的配置。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create   --delete-config max.message.bytes

  这个是删除之前的配置。使其恢复默认配置。

  我们一般通过kafka-configs.sh脚本来执行修改主题配置信息。

配置管理

  有个脚本kafka-configs.sh是专门对配置进行操作的。可以在运行时修改原有的配置。相对于之前的脚本,主要是可以修改broker,client,users这些配置的配置

./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name topic-create

  这个脚本支持查询主题,broker,client,users这些配置的配置,根据–entity-type来区分。–entity-name 显然指的是类型名字。这个查出来的仅仅是配置信息。和之前脚本不一样,这个命令本质上是从zookeeper上读取相关节点信息,/config/type/name

./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name topic-create --add-config cleanup.policy=compact,max.message.bytes=10000

  修改主题使用–add-config来增,改,覆盖之前的配置。

./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name topic-create --delete-config cleanup.policy

  删除原配置–delete-config,删除之前被覆盖的配置,恢复为默认配置。

  使用kafka-configs.sh来修改脚本时,会在对应zookeeper中创建一个节点,并且将变更的配置写入到这个节点。

删除主题

  当某个主题已确定不在使用时,为了节约资源,我们最好删除。

./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic topic-create   

  这个参数和delete.topic.enable有关,默认为true,如果为false,那么删除操作就会被忽略。同时我们删除内部主题,不存在的主题时都会报错。当然加了–if-exists这个命令后就会忽略报错。
  使用这个命令的本质是在zookeeper上的/admin/delete-topics路径下创建一个和待删除主题同名的节点,和创建主题一样,真正的删除动作是kafka的控制器完成的。

  因此我们可以通过创建一个这样的节点来删除主题。

create /admin/delete_topics/topic_delete ""

  如此就删除了这个叫做topic_delete的主题,同理,我们也可以创建按照规则的主题。

  更加手动的方式,一个主题其信息元数据存在zookeeper的 /brokers/topics和config/topics路径下的,消息数据则是存在log.dir我们配置的路径下面,我们只需要删除这些东西即可,规则如下:先删除brokers/topics和config/topics路径下的节点,2者顺序任意,然后删除其数据文件。

KafkaAdminClient

package com.dh.kafka;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

/**
 * 创建主题试试 使用KafkaAdminClient来创建主题。
* 
 * @author Lenovo
 *
 */
public class CreateTopic {
public static void main(String[] args) {
    String brokerList = "192.168.147.132:9092";
    String topic = "topic-admin";
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    AdminClient client = AdminClient.create(props);
    NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
    CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
    try {
        result.all().get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    client.close();
}
}

  创建主题,给出了案例,很简单,创建主题时,有很多构造方法。我们先看其属性

public class NewTopic {
private final String name;
private final int numPartitions;
private final short replicationFactor;
private final Map<Integer, List<Integer>> replicasAssignments;
private Map<String, String> configs = null;
}

  replicasAssignments这个参数是分区编号-broke列表。可以手动指定分区和broke的分配。configs则是配置的设定,我们可以给主题设置config.从而覆盖broke的配置。

  AdminClient使用自己内置的协议来管理发送请求等功能。自己使用相关协议发送,然后再用相关协议解析。

主题的合法性

  我们一般禁止客户端直接创建主题,不利于运维维护。但是前面AdminClient却可以直接创建。kafka有一个参数,叫做creat.topic.policy.class.name默认为null.提供了一个入口用来验证主题创建的合法性。我们自定义一个实现CreateTopicPolicy接口的类,然后让上面的参数指向我们这个类的全限定名。在启动服务,即可。这个类要在服务端,打个jar扔到classpath里面

优先副本选举

  我们创建一个分区为3,副本为3的主题,必须要大于3的broke来支持。然后重启其中一个broke,那么lead副本可能不均衡了。由于消费者都是直接从lead副本交互数据,所以影响蛮大的。而创建和修改主题时,会有一个叫做优先副本的概念,kafka会通过一定的方式促使优先副本的选举为lead副本,从而使得分区平衡。当然不同broke的负载是不一样的,有的高,有的低。
  这个方式是在broke端配置的,将auto.leader.rebalance.enable设置为true(默认也是true),开启后,kafka的控制器会启动一个定时任务来轮训所有broke节点,计算一个值(非优先副本leader副本/分区总数)超过leader.imbalance.per.broker.percentage默认是0.1,超过这个值就会开启优先副本选举以来分区平衡,定时器的周期是leader.imbalance.check.interval.seconds控制,默认300秒。
  但是生产环境是不建议开启的,选举优先节点时会阻塞业务,不好,而且分区平衡也不是负载均衡。我们可以在一个时间内,手动去执行分区平衡。是通过执行这个脚本命令。

./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181/kafka 

  这个命令是扫描集群里面所有分区,如果分区过多,可能执行失败,因为在选举过程,具体的元数据信息会被存入到zookeeper的/admin/preferred-replica-election节点,如果这些数据超过了zk默认节点大小(默认1M)因此我们可以path-to-json-file参数来小批量的对部分分区执行优先副本选举,通过path-to-json-file来指定一个json文件。

分区重分配

  前面我们创建主题时,下线其中一个broke,这个节点的副本都会变得不可用,如果不修复,这个分区负载会一直这样,新增一个节点,也是如此。我们可以用kafka-reassign-partitions.sh来执行分区重分配的任务。原理就是复制,然后删除。

kafka入门003 -消费者

发表于 2019-06-25

消费者与消费组

  每个消费者都有一个对应的消费组,当消息发送到主题后。只会被投递给订阅他的每个消费组中的某一个消费者。

  按照kafka默认原则。主题X有a,b,c,d。4个分区,有2个消费者组A(C0,C1,C2,C3)4个消费者,B(C4,C5)2个消费者都订阅了这个主题,那么消费者A组中每个消费者都分配一个分区,消费者B每个消费者分配2个分区。消费者只能消费自己所分配分区的消息。换句话说,消费者组之间是共享的,消费组中的消费者是互斥的。

  同时我们可以动态的增加消费者增加消费能力,比如B消费者组,我给他再加2个消费者,那么这个B消费者组就会的4个消费者就会被一个分区一个消费者,如果再加一个消费者没什么卵用,因为分区数目<消费者数目,没有多余的分区可以分配给消费者了。同时,假设组内某个实例挂掉了,Kafka能够自动检测到,然后把这个Failed实例之前负责的分区转移给其他活着的消费者。这个过程就是Kafka中大名鼎鼎的“重平衡”(Rebalance)。

  消息中间件有2种消息投递方式:即点对点模型(Peer to Peer,P2P)和发布订阅模型。这里面的点对点指的是同一条消息只能被下游的一个消费者消费,其他消费者则不能染指。而发布订阅则是将消息发送到某个主题,消息订阅者都可以从这个主题里面获取消息。
  对于kafka来说。所有的消费者都是一个消费组的时候就是点对点模型,消费者隶属于不同消费者组时,则是发布订阅模型。

消息格式

public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
}

  简单的我就不说了和生产者差不多,offset表示消息所属分区的偏移量。

参数配置

  bootstrap.servers:消费者客户端连接kafka集群所需的broker地址清单,一般写2个,host:port,host1:port1,不要全部写出来,因为生产者会从给定的broke里面查出其他broker的信息,写2个是为了防止一个宕机了,生产者依旧可以连接到kafak。

  key.deserializer,value.deserializer:和生产者的key.serializer,value.serializer相对应,消费者从broker服务器接受的消息是字节数组存在的,这个就是指定,key和value的序列化操作的反序列化器,来进行数据的反序列化。

  group.id:消费者隶属的消费组的名称。

  client.id:对应kafka客户端的id,也就是客户端的名字。

  fetch.min.bytes:表示poll能从kafka拉取的最小数据量,默认1b

  fetch.max.bytes:最大,是50m,但是不是绝对的。真正最大值是message.max.bytes来配置

  fetch.max.wait.ms:默认500ms,在这个时间内,如果没有fetch.min.bytes的限定值,还是返回。

  max.partition.fetch.bytes:每个分区返回给consumer最大的数据量,默认1m。

  max.poll.records:一次poll最大的消息数。默认500.

  connections.max.ide.ms:多久之后干不限制的连接,默认9分钟。

  exclude.internal.topic:默认true,没法通过正则来订阅内部主题。

  receive.buffer.bytes:设定socket的接受缓冲区大小,默认大小:64kb.

  send.buffer.bytes:设定sockect发送缓冲区大小,默认是128kb.

  request.timeout.ms:配置consumer等待请求响应时间。默认是30000ms。
  
  metadata.max.age.ms:配置元数据更新时间,默认5分钟,即便元数据没有任何变化。

  reconnect.backoff.ms:配置尝试重新连接指定主机之前的等待时间。避免某些故障的情况下,频繁发送。默认50ms

  retry.backoff.ms:配置尝试重新发送失败到指定主题分区的等待时间,避免某些故障的情况下,频繁发送。默认100ms
  1

  isolation.level:配置消费者的事务隔离级别。默认是read_uncommitted,可以消费到HW.      

代码拼写

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  我们写这些参数时可能某个字符写错了。因此可以用ConsumerConfig来替代。   

订阅主题和分区

  通过KafkaConsumer的subscribe方法来订阅主题。可以订阅多个主题。因为接收对象是个集合。下面是常见的一个接口方法。

public void subscribe(Collection<String> topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}

  还有一个接口是通过正则表达式来订阅的,只要是符合要求的,都会被订阅,即便是后面新增的也是如此。

@Override
public void subscribe(Pattern pattern) {
    subscribe(pattern, new NoOpConsumerRebalanceListener());
}

  比如

consumer.subscribe(Pattern.compile("topic-.*"));

  这个就是以topic开头的主题都会被订阅。
  在kafka客户端里面TopicPartition类表示分区。

public final class TopicPartition implements Serializable {

private int hash = 0;
private final int partition;
private final String topic;
}

  主要2个属性,主题名称,和分区编号。
  
consumer.assign(Collections.singletonList(new TopicPartition(topic,0)));
  我们可以通过assign方法订阅主题里面的某个分区。
  但是主题里面有哪些分区时元数据信息。我们可以获取到这些信息,通过生产者的partitionsFor获取分区元数据信息。

List<PartitionInfo> list=consumer.partitionsFor(topic);

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}

  topic是主题,partition分区号,leader分区的lead副本,replicas分区的AR集合,inSyncReplicas是ISR集合,offlineReplicas是OSR集合。
  我们可以先通过partitionsFor方法获取到元数据信息,在进行分区订阅。

consumer.unsubscribe();

  我们可以通过上述方法取消订阅。 
  我们订阅主题消息的方式有3个,分别代表不一样的状态,他们是互斥的。

反序列化

  和生产者一文的大致类似,只是反着来了,没啥好说的。

消息消费

  消息的消费有2个模式,推:服务器主动把消息推给消费者,拉:消费者主动向服务器发起请求拉取消息。
  kafka则是通过poll方法去拉取的。
  在我们代码里面,我们就是循环不断调用poll来拉取数据的。poll方法有个参数是时间,表示消费者缓冲区里面没有数据时阻塞。
  我们可以根据消息分区维度进行消费。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (TopicPartition tp : records.partitions()) {
                for (ConsumerRecord<String, String> record : records.records(tp))
                    System.out.printf(record.value());
            }

  同理,也可以根据消息主题进行区分消费。主要用到了下面的方法:

  public Iterable<ConsumerRecord<K, V>> records(String topic) {
    if (topic == null)
        throw new IllegalArgumentException("Topic must be non-null.");
    List<List<ConsumerRecord<K, V>>> recs = new ArrayList<>();
    for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
        if (entry.getKey().topic().equals(topic))
            recs.add(entry.getValue());
    }
    return new ConcatenatedIterable<>(recs);
}

位移提交

  在kafka服务器端每个消息有个offset用来表示消息所在分区的位置,我们用偏移量来表示,而消费者端也有一个offset,表示消费者消费到分区的哪个消息了,我们用位移来表示。
  消费者消费的消息都是没有消费的,因此这个位移必须保存且持久化,这个位移保存在kafka内部的主题_consumer_offsets中,每个消费组一个消费位移。消费者在消费完消息后,需要再这里提交消费位移的提交。记住消费者要提交的消费位移不是已经消费的消息位置的最后一个,而是已消费消息位置+1,这个没有消费的位置。而committed offset则表示已经提交过的消费位移。

@Override
public long position(TopicPartition partition) {
    return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
}

 

@Override
public OffsetAndMetadata committed(TopicPartition partition) {
    return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
}

  消费者的2个接口可以分别获取到上述的2个值。这2个值在某些情况是一样的,但是在某些情况是不一样的。

  对于位移提交是要把握时机的,比如你拉取了一大堆数据,还没有消费完就出了bug,你却已经提交了消费位移,那么就会造成消息丢失。如果全部不提交,那也可能造成消息重复。
  kafka消息位移提交的策略是自动提交,定期提交,每隔5秒。是以下参数配置
  enable.auto.commit默认是true,
  auto.commit.interval.ms默认是5s。

  在默认情况下,刚提交完一次位移,然后拉取一批消息消费,在下一次自动提交消费位移钱,消费者宕机了,那么重启后,或者转给别的消费者后,就要重复消费。
  而消息丢失呢,主要是你代码造成的,比如a线程拉取数据到本地缓存,b线程处理缓存的数据,某个点,提交了位移,因为数据确实消费完了,但是b线程没有消费到缓存中对应的那个点就宕机了,那么就造成了数据丢失。

  当然上述情况都是异常情况导致的。正常情况不会如此,因此kafka提供了手动提交位移的操作。
  手动提交可以是同步或者异步的。

consumer.commitAsync();
consumer.commitSync();

  对于同步提交依然可能造成消息重复。这个还是批量提交。因为可能中间有异常,但是我们可以把异常捕捉,在针对性的处理。

  更加精细的提交方式

@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
}

  用这个接口的方法,可以实现每消费一个消息就提交一次位移。一般来说不会这么做,太消耗性能了。一般来说,我们大多是根据分区的粒度来划分提交位移的界限,代码如下:看了你就懂了。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(tp);
                for (ConsumerRecord<String, String> record : pRecords)
                    System.out.printf(record.value());
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
            }

  异步提交则是不一样的,执行时,消费者线程不会被阻塞,可能提交消费位移的结果还没有返回之前就开始了新一轮的拉取操作。他有个回调方法很不错。

consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        //这里操作

                    }
                });

  由于是异步,可能出现下面问题consumer.commitAsync失败了,你直接重试,然后下面一个成功了,你重试也成功了,然后宕机重启,完了,消息又重复消费了。一般情况,不需要重试,很少发生,后面的会补上。同时我们可以在异步提交之后,在try一下,为了保证消息位移的提交在加个同步提交。

控制消费

  有时我们需要暂停某些分区的消费,当达到某些时间时再恢复。

consumer.pause(partitions);
consumer.resume(partitions);

  这个是暂停和恢复2个方法。

Set<TopicPartition> set=consumer.paused();

  这个是查看被暂停分区的方法。

  除了while (isRunning.get()) {}这个方法来设定是否关闭连接外,还可以同期线程外的wakeup方法来退出poll的逻辑。他会抛出一个可捕捉的异常。我们要处理。但是要注意同时关闭资源,调用close方法。

指定位移消费

  这些情况下,可能找不到消息位移。
  1.新建立的消费组。
  2.消费组新的一个消费者订阅了一个新的主题。
  3._consumer_offsets这个主题中有关这个消费组的位移信息因为过期被删除。
  当kafka找不到消息消费位移后,就会根据消费者客户端参数auto.offset.reset来读取,这个配置默认读取latest,表示从分区末尾开始读取消息。如果改成earlist则是从起始处读取配置。如果改成none,则是抛出异常。只能是这3个值。

  除了找不到消费位移,位移越界也会触发auto.offset.reset操作。
  我们使用poll 来批量抓取消息时是黑盒的,但是我们可以通过seek细致的抓取消息处理。

public void seek(TopicPartition partition, long offset)

  他可以为每个分区设定位移。

Set<TopicPartition> set=consumer.assignment();

  
  获取当前消费者被分配的分区。

  我们先poll一次,获取到元数据等信息,然后对分区设置位移,开始消费(ps,不要担心poll会造成消息丢失,因为我们要设定自己的新位移,那次poll的消息是无意义的)

  需求来了,我们需要消费昨天8点之后的消息。

@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
}

  参数是map,key是分区,value是时间戳。这个方法会返回时间戳大于等于待查询时间的第一条消息的位置和时间戳。我们可以利用这个位置来针对性的消费。

  位移越界也会造成auto.offset.reset启用。

  我们同时可以把消费位移保存到db里面,然后下次消费时从db读取位移,再消费。

再均衡

  前面说了,基于主题的可以再均衡,直接订阅分区的无法进行再均衡。就是更前面我们说的,一个消费组,新增或者减少消费者,会进行均衡处理。一个分区被重新分配给另一个消费者时,消费者当前的状态会丢失。同时 再进行再均衡期间,消息是无法被读取的。如果某个消费者消费了消息,消息位移还没有提交,那么这个分区被分配给新的消费者时,那么就会造成消息重复消费。
  这个再均衡和我们之前说的NoOpConsumerRebalanceListener相关。这个监听器是用来设定发生再均衡前后的一些准备和收尾工作。主要下面2个方法处理

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}

  这个方法是在均衡之前,消费者停止读取消息之后被调用。可以用这个方法来提交消息位移。

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

  这个方法在重新分配分区之后和消费者重新开始读取之前调用。

Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<>();
    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            consumer.commitSync(offsets);
            offsets.clear();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

        }
    });

  我们需要把消费位移记录到map里面。每一次消费消息时,把这个消息的分区和分区位移加到这个map里面。

消费者拦截器

public interface ConsumerInterceptor<K, V> extends Configurable {
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    public void close();
}

  这个拦截器,必须实现这个接口,主要2个方法,onConsume方法是poll方法返回之前调用的,onCommit则是提交完消费位移后调用的。
  注意要在配置里面添加:

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, value)

  和生产者一样,可以写成链式。

多线程实现

  生产者是线程安全的,但是消费者不是线程安全的。

 private void acquire() {
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}

  他有个方法是专门检测当前是否只有一个线程操作。

  private void release() {
    if (refcount.decrementAndGet() == 0)
        currentThread.set(NO_CURRENT_THREAD);
}

  和这个方法相互配合,类似加锁和解锁。注意compareAndSet(a,b)这个方法是a先和这个类的数据比较,如果一样则把b放进去替代数据。
  为了提高消费者的消费能力,防止broke里面堆积的消息过多。从而导致消息还没被消费就丢失。下面几个策略可以提高消费能力。
  1.每个线程实现一个KafkaConsumer,但是线程数目不能超过分区数,前面解释过。
  2.去除再平衡效果,直接以分区作为基本单位进行消费。多个线程消费一个分区。一般不会推荐这么做的。
  直接启动多个消费者线程,有个缺点就是,每个线程都要维护一个tcp连接。这会造成很大的系统开销。
  
  3.对于kafka来说poll的速度是很快的,但是一般是处理消息是很消耗性能。如果改动了这一块,那么对性能的提升很大。因此我们可以吧消息处理模块变成多线程处理。但是对顺序消费有问题。同时位移提交也要处理。可以考虑共享变量Map offsets=new HashMap<>()同时注意覆盖问题,调用时;还要注意消息丢失问题,比如2个处理业务的线程,a发生异常,没有提交,b在a后面的,然后成功提交,那么a的数据就丢失了。
  可以考虑tcp的滑动窗口模式。把数据丢到一个缓存队列里面。然后多个设定一个固定的大小的窗口,也就是,最小游标和最大游标。然后多线程(业务线程)充窗口拉数据,处理完了就回去报告,接着拉取数据,同时窗口判断自己是否可移动。

kafka入门004 -集群配置

发表于 2019-06-24

配置存储的参数

  log.dirs:这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径。这个参数是没有默认值的,必须由你亲自指定。样例:比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。多个磁盘可以提高读写性能,实现故障转移。
  log.dir:能表示单个路径,它是补充上一个参数用的。一般不需要设置

zookeeper参数

  zookeeper.connect:比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181是ZooKeeper的默认端口如果多个kafka集群共用一个zookeeper集群时,如何配置处理。在最后的一个zookeeper后加个别名例如:zk1:2181,zk2:2181,zk3:2181/kafka

broker通信相关

  listeners这个配置的,前文由于写demo介绍过。

  listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的Kafka服务。
  advertised.listeners:和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,就是说这组监听器是Broker用于对外发布的。
  host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

topic

  auto.create.topics.enable:是否允许自动创建Topic。一般为false,让运维管控。免得出现一堆奇葩命名的
  unclean.leader.election.enable:是否允许Unclean Leader选举。每个分区存在多个副本,有的副本落后lead太多数据,但是lead和其他不落后的都挂了,如果为true,那么这个分区可能数据丢失,因为会选举落后很多的副本作为lead
  auto.leader.rebalance.enable:是否允许定期进行Leader选举。它的值为true表示允许Kafka定期地对一些Topic分区进行Leader重选举,这个是强行选举,因此需要设置为false.

数据留存

  log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hour最低。
  log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。
  message.max.bytes:控制Broker能够接收的最大消息大小

参数级别

  我们可以在broke端加参数,也可以在topic端添加参数。参数意义一样的,topic端会覆盖broke端。不同消息主题有不同的需求,比如A主题需要消息留存时间2h,B主题则仅仅需要5m,那么们broke端只能设置一个最长的参数来确定,但是有了topic主题后,我们可以针对性设置。

  retention.ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值。

  retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。

  我们可以在用户创建kafka的topic,和修改topic时可以增加修改topic参数。

jvm参数

  一般最好是java1.8,堆大小最好6GB。我们必须在启动kafka之前加好这些环境变量。
  KAFKA_HEAP_OPTS:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。
  然后正常启动即可。

操作系统参数

  文件描述符,可以将其设置的大一点。
  文件系统类型,XFS性能更高。
  swap,内存交换空间,这个值设置的尽量小一点,但是不要设置为0。因为那样内存不够了,会直接杀死任意一个进程。
  flush落盘时间,这个值可以大一点。

kafka入门002 -生产者

发表于 2019-06-20

消息格式

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

  topic是消息主题,partition是分区号,headers一般没用,key是消息的键,同一个key的消息会被发到一个分区里面。

参数配置

  bootstrap.servers:生成者客户端连接kafka集群所需的broker地址清单,一般写2个,host:port,host1:port1,不要全部写出来,因为生产者会从给定的broke里面查出其他broker的信息,写2个是为了防止一个宕机了,生产者依旧可以连接到kafak。

  key.serializer,value.serializer:broker服务器接受的消息是字节数组存在的,这个就是指定,key和value的序列化操作的序列化器。

  client.id:对应kafka客户端的id,也就是客户端的名字,如果不设置则是默认取名,produce-i

  retries:对于可重试异常,可重试次数配置,默认是0,ps(KafkaProducer一般有2种异常,一个是可重试异常,比如NetworkException,表示网络异常,可以通过重试解决,一个是不可重试异常,比如RecordToolargeException,表示消息太大,不会重试。),重试次数超过了配置的值。也会抛出异常。
  retry.backoff.ms:这个主要是和上面的retries相对应的。是重试时间间隔。默认是100

  props.put(ProducerConfig.ACKS_CONFIG, “0”);
  当为0时,发到服务端就不管了。
  当为1时,只要leader副本写入,就返回,但是可能leader奔溃了,但是其他副本没有拉取到消息。造成数据丢失
  当为-1或者all时,必须所有副本都要同步完才返回。

  max.request.size:默认值是1m,主要要和broke端配置的message.max.bytes相对应。

  max.in.flight.requests.per.connection为1时,可以保证顺序消费。
  compression.type:默认为none,可以配置为gzip,lz4等压缩方式,可以减小网络io但是会造成时延。

  connection.max.idle.ms:默认9分钟,多久后关闭限制的连接。

  Linger.ms:这个参数是指ProducerBatch(看后文)等待更多ProducerRecord加入的时间,默认为0,生产者会在这个ProducerBatch被填满,或者等待时间超过Linger.ms时发送出去

  send.buffer.bytes:socket接收消息缓冲区大小。默认32k。
  
  send.buffer.bytes:socket发送消息缓冲区大小。默认128k。

  request.timeout.ms:等待请求响应最长时间,默认30000ms,超时后可以选择重试。但是注意了,这个值要比broker端的replica.lag.time.max.ms的值要大,可以减小因为客户端重试导致的消息重复。

代码拼写

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  我们写这些参数时可能某个字符写错了。因此可以用ProducerConfig来替代。
  KafkaProducer是线程安全的。

发送消息

  3个模式
  1.发后即忘
  这个模式不关心消息是否正确到达,大多数也没什么问题,但是存在不可重试异常时,会造成消息的丢失,这个方式效率最高,可靠性最低。实现方式为

try {
        producer.send(record);
    } catch (Exception e) {
        e.printStackTrace();
    }

  2.同步
  这个模式是直接链式调用send方法返回对象的get方法来阻塞等待kafka的响应。直到消息发送成功,或者发生异常被捕捉处理。实现方式如下:

try {
        Future<RecordMetadata> future=producer.send(record);
        RecordMetadata recordMetadata=future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }

  3.异步
  这个解决了上面的性能问题,上面的future也是异步的逻辑处理。但是写法没有下面的方便:

producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace();
            }
        });

  这样就不会阻塞程序的执行了,等道kafka响应时就会做出相关操作处理。前面说过kafka通过偏移量保证顺序消费,响应如果是同一个分区,那么也是顺序的。

  close方法,会阻塞之前所有请求后再关闭KafkaProducer。来进行资源回收。

序列化

  除了上面说的StringDeserializer这个string类型的序列化器之外,还有Integer,Long,Double,Bytes,都实现了序列化接口,主要有2个方法,解析下:

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
    Object encodingValue = configs.get(propertyName);
    if (encodingValue == null)
        encodingValue = configs.get("serializer.encoding");
    if (encodingValue instanceof String)
        encoding = (String) encodingValue;
}

  这个方法是KafkaProducer创建实例时调用的,来确定编码类型。

@Override
public byte[] serialize(String topic, String data) {
    try {
        if (data == null)
            return null;
        else
            return data.getBytes(encoding);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
    }
}

  很简单,序列化。
  我们也可以自定义序列化器。只需要实现org.apache.kafka.common.serialization.Serializer接口即可。然后替换下即可。

分区器

  生成者send数据到broke时,需要经过拦截器,序列化器,和分区器等,在消息对象的partition不为空时,则不走分区器,为空则需要走分区器,分区器根据key来进行分区。
  默认的分区器是DefaultPartitioner,实现了Partitioner这个接口。在这个类里面主要方法。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

  
  代码很简单,key不为空时根据hash算法,算出key对应的编号。为空时则是获取这个topic的所有分区轮训。中间用到了currenthashmap
  我们也可以通过实现Partitioner这个接口来自定义分区器,只需要早参数配置时加入

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());

  来设定你的分区器。

生产者拦截器

  需要实现ProducerInterceptor这个接口即可。KafkaProducer的send方法

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

  会在第一时间先调用拦截器的send方法来对消息进行相应的处理。

  同时会在消息应答或者消息发送失败时,调用拦截器onAcknowledgement方法。这个调用在Callback之前发生。

public class ProduceInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
    // TODO Auto-generated method stub
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // TODO Auto-generated method stub

}
@Override
public void close() {
    // TODO Auto-generated method stub
}
}

  简单如上实现即可。 
  然后在prop上注册即可。

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProduceInterceptor.class.getName());

  同理,这边也支持职责链模式。也就是多个拦截器。

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProduceInterceptor.class.getName()+","+ProduceInterceptor.class.getName());

整体流程

  1.主线程生产者调用send()方法后,先经过拦截器,序列化器和分区器后,在缓存到RecordAccumulator(消息累加器)中。
  2.Send线程负责从RecordAccumulator获取消息,批量发送,减少网络传输的资源消耗。
  3.有2个生产者参数,buffer.memory 这个是RecordAccumulator的大小默认是32m, 如果1的速度大于2,那么可能空间不足,因此1要么阻塞,要么抛出异常,取决于max.block.ms的配置,这个值默认是60s。
  4.RecordAccumulator内部为每个分区维护了一个双向队列。ConcurrentMap> batches,写入消息是追加到尾部,send读消息时从头部读取,这里面的ProducerBatch是一个或者多个ProducerRecord的合成。
  5.当一个消息ProducerRecord被append到RecordAccumulator时会根据一个叫做batch.size默认参数是16kb进行区分,首先判断尾部的ProducerBatch是否可以继续添加ProducerRecord,不可以的话,判断是否大于batch.size,如果不大于则按照16kb的大小创建ProducerBatch,因为RecordAccumulator内部有个BufferPool这个是缓存了固定大小的ByteBuffer对象。
  6.InFlightRequests,在send线程发送到kafka之前,会把对象保存到这个对象里面Map>,这个里面request保存了发出去了,但是没有收到回复的响应请求。
  7.元数据的更新
  当客户端没有所需要的元数据时,就会向其中一个node发送请求,获取元数据(元数据包括:集群有哪些主题,哪些分区,lead副本在哪个节点上等)。

springboot单应用脚手架

发表于 2019-06-19

地址

  脚手架地址

简介

多数据源

  在configuration包里面,我们定义了2个数据源,一个是pgsql,一个是mysql。这边对mysql加了@Primary优先处理,因此mysql可以使用jpa进行操作,而pg由于业务只用其进行批量处理,因此只让其使用springjdbc即可。

  由于jpa对批量处理以及复杂sql的不友好,因此这边建议,简单sql使用jpa,加快开发效率,复杂批量sql使用springjdbc,开发简单高效。我们约定,jpa的dao命名为xxxJpaDao,springjdbc的dao命名为xxxDao。

aop动态代理

  1.非法字符过滤,这个功能主要是检查你的rest接口里面的vo是否存在非法字符,会递归检查到基本类型,但是要检验的vo必须实现一个空接口。支持list,vo,map这3个数据混搭的检查。

  2.任务调度加锁。这个功能主要是用于工程在集群部署的情况下。定时器多次调用做了加锁限制。使用的是redis分布式锁。

  3.数据字段校验,这个主要是使用spring的安全校验来检查表单字段。

  4.ip校验,控制这个游客接口最多访问次数。

上下文

  上下文是用ThreadLocal实现的,默认实现了用户信息的缓存,同时开发也可以自定义往里面丢值进去,但是请记住,我们访问服务的线程是从线程池里面取出来的,因此这个线程里面可能存在旧的值,因此,你需要在代码里面控制,要么每次都覆盖,要么在这个线程或者这个请求结束前将其数据释放。默认使用覆盖策略。

枚举

  常量一律使用枚举。

异常

  使用了全局异常捕捉。

http

  由于是单应用,存在远程调用,为了方便,我们使用http方式调用,采用的是spring的RestTemplate作为接口,httpClient为实现的方式。

拦截器

  拦截器有2个,一个是登录校验的拦截器,一个是基本非法字符校验的拦截器。

国际化

  返回信息需要国际化处理,因为可能是不同国家,需要不同的语言。

分页查询

  基于springjdbc封装了一个分页查询的工具类。ps:可以考虑封装更多的工具类来完全代替jpa。

事务

  定义了2个事务管理器,一个是pgsql的DataSourceTransactionManager,来管理pgsql的springjdbc的事务,一个是JpaTransactionManager的事务管理器,可以管理mysql的jpa和spring jdbc的事务管理。

  ps(This transaction manager also supports direct DataSource access within a transaction
(i.e. plain JDBC code working with the same DataSource).
This allows for mixing services which access JPA and services which use plain JDBC (without being aware of JPA)!
Application code needs to stick to the same simple Connection lookup pattern as with DataSourceTransactionManager
(i.e. DataSourceUtils.getConnection(javax.sql.DataSource) or going through a TransactionAwareDataSourceProxy).
Note that this requires a vendor-specific JpaDialect to be configured.)

返回值

  返回值信息有4个,分别是,是否成功,返回code,返回message,返回vo。

HikariCP 笔记

发表于 2019-06-18

优势

  1.HikariCp优化创建了concurrentBag,增加了并发读写的效率,减少了其他没必要的操作,是的集合性能提高了很多。
  2.使用threadlocal缓存连接以及大量使用cas的机制,避免lock.
  3.使用Javassist来实现动态代理类,实现了字节码的优化。
  4.心跳语句由select 1 变成了ping。
  5.大量的方法内联。效率提高了很多。
  6.代码量少。
  7.FastStatementList代替了Arraylist,去掉了调用get时的范围检查,增加性能。

声明定义

  HikariDataSource:数据源,我们通过其对象来获取连接。

  HikariPool:连接池,对资源进行管理。

  ConcurrentBag:作为物理连接的共享资源站。

  PoolEntry:物理连接的封装。

  获取连接流程getConnection:
  1.HikariDataSource对象调用getConnection来获取连接
  2.调用HikariPool的getConnection获取连接。
  3.调用ConcurrentBag取出一个PoolEntry.然后这个PoolEntry通过createProxyConnection掉用工厂类生成HikariProxyConnection返回。

  连接关闭closeConnection():
  closeConnectionExecutor关闭连接后,会调用fillPool()方法对连接池进行连接填充。同时HikariPool提供evictConnection(Connection)方法对物理连接进行手动关闭。
  连接关闭close():
  HikariProxyConnection调用close方法时调用了PooleEntry的recycle方法,之后通过HikariPool调用了ConcurrentBag的requite放回。(poolEntry通过borrow从bag中取出,再通过requite放回。资源成功回收)。

  创建连接createPoolEntry:
  HikariCP中通过独立的线程池addConnectionExecutor进行新连接的生成,连接生成方法为PoolEntryCreator。物理链接的生成只由PoolBase的newConnection()实现,之后封装成PoolEntry,通过Bag的add方法加入ConcurrentBag。当ConcurrentBag存在等待线程,或者有连接被关闭时,会触发IBagItemListener的addBagItem(wait)方法,调用PoolEntryCreator进行新连接的生成。

连接池到底该多大

  2个参数:

  maxPoolSize:最大连接数。

  minIdle:最小连接数。作者建议这个值和maxPoolSize保持一致作为一个固定大小的连接池。

  连接池大家是综合每个应用系统的业务逻辑特性,加上应用硬件配置,加上应用部署数量,再加上db硬件配置和最大允许连接数测试出来的。很难有一个简单公式进行计算。连接数及超时时间设置不正确经常会带来较大的性能问题,并影响整个服务能力的稳定性。具体设置多少,要看系统的访问量,可通过反复测试,找到最佳点。压测很重要。

  我们进行判断时,可以从以下几个层面分析:
  系统中多少个线程在进行与数据库有关的工作?多少个线程在等待获取数据库连接?获取数据库连接需要的平均时长是多少?如果平均时长较长,如大于 100ms,则可能说明配置的数据库连接数不足,或存在连接泄漏问题。   

     

kafka入门001 -基本概念

发表于 2019-06-03

读书笔记系列,一章一篇。
取自深入理解kafka和kafka专栏(极客时间)

基本概念

特点

消息系统

  和传统的消息系统一样,但是额外提供,顺序消费,回溯消费。

存储系统

  消息是持久化存储的。

流式平台处理

  kafka不仅为多个流行的流式处理框架提供了可靠的数据来源,还提供了完整的流式处理类库。

概念

producer

  生产者:生产消息发送给kakfa实例。

Consumer

  消费者:负责消费kafka上面的消息。消费者端采用pull的方式拉去消息,保存了数据的偏移量,当消费者宕机后重新上线,可以根据之前保存的消费位置重新进行消费。  

Broker

  一个kafka实例,一个kafka集群里面有多个Broker,也就是多个kafka实例

ZooKeeper

  Zk负责管理kafka集群的的元数据,控制器的选举等。它是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。

主题

  kafka的消息是根据主题来分类。生产者发送的消息都要指定一个主题,消费者消费消息也是订阅主题。

分区

  主题是逻辑上的概念,一个主题可能是多个分区组成的,一个分区只可能会是一个主题的。分区可以看做是一个可追加的日志文件,每个消息在分区里面都有一个偏移量,kafka通过这个偏移量保证顺序消费,但是kafka保证的是分区顺序消费,而不是主题顺序消费。

副本

  一个分区有多个副本,其中一个是lead副本,其余是follow副本,分布在不同的broker里面,lead副本负责读写,其余副本负责和lead副本消息同步,当lead副本故障时,可以选举follow副本继续提供服务。由于副本的同步是异步的,因此可能不一致,所有副本统称为AR,所有和lead副本保持一致的是统称为ISR,与lead副本滞后的副本则统称为osr副本。这2个集合的副本是动态的,当ISR不满足时则将其提到OSR,当OSR更上时,则将其丢到ISR,副本数据的限制和你的当前borker相关,因为多个副本放在一台broken上毫无意思,因此,每个副本在不同的broken上。

HW

  高水位。这个是消费者目前只能拉倒的消息偏移量。这个值是所有副本AR里面保存数据最小值offset。

LEO

  则是记录当前下一条数据写入的offset.

版本号

  kafka-2.11-2.1.1,这个里面2.11是scala的编译器版本,2.1.1是kafka真实的版本。2是大版本,第一个1是小版本号,第二个1是修订号。
  0.7远古版本。
  0.8加了副本机制,但是api是远古版本客户端api,且和新版本不兼容。
  0.9加入了安全认证,权限的功能,新版本生产者api稳定,但是消费者api极其不稳定。
  0.10:加了了kafka Streams。消费者api较为稳定。
  0.11提供幂等性produceapi和事务api.以及对消息做了重构。
  1.0和2.0对stream做了优化,消息没有什么变化。

简单java代码

  ps:注意kafka的server.properrties的listeners要开放出来,其格式为protocoll://hostname:port1。协议+域名ip+端口。第一个协议有很多,默认是PLAINTEXT不开启安全校验,域名要写出来,不写则是默认网卡,但是可能为127.0.0.1导致外部无法访问。还有advertised.listeners作用和listeners类似,主要用于IaaS环境,比如公有云上配备多个网卡,包含了私网网卡和公网网卡。对于这种情况,可以设置advertised.listeners绑定公网ip对外客户端使用。使用listeners绑定私网ip给broker之间通信。

  maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

  生产者:

package com.dh.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 * @author Lenovo
  *
 */
public class SimpleKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker地址
        props.put("bootstrap.servers", "192.168.147.131:9092");
        // 指定消息key序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定消息本身的序列化方式
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo111", "hello, kafka");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

  消费者:

package com.dh.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.147.131:9092");
        // 每个消费者分配独立的组号
        props.put("group.id", "group.demo");
        // 设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic-demo111"));
        System.out.println("Subscribed to topic " + "topic-demo111");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf(record.value());
        }
    }
}

mysql内部临时表

发表于 2019-05-28

简介

  在mysql里面,join_buffer,sort_buffer,内部临时表这3个都是在执行sql时暂存数据用的。前面2个前面文章都有介绍,本文主要介绍内部临时表。

union

(select 1000 as f) union (select id from t1 order by id desc limit 2);

  比如这个sql,就是用到了内部临时表。这个sql就是查询2个子查询的并集,去重,若是不要去重,则是union all。
  这个语句的操作流程是先创建一个临时表把第一个子查询的数据丢进去,然后再把第二个查询的数据丢进去。如果用union all则不涉及到去重,那么就不会创建临时表,只是直接拼装结果返回。

group by

  1.select id%10 as m, count(*) as c from t1 group by m;
  其操作流程为,先创建一个内部临时表,存放2个字段,m,c然后遍历主键,将计算出来的值丢到m,判断是否存在,若存在则+1,然后输出。

大小参数

  默认16m,由这个参数tmp_table_size确认。

优化 

  select SQL_BIG_RESULT id%100 as m, count(*) as c from t1 group by m;
  如此,可以直接省去其中某些步骤,这是针对数据量大的情况直接执行走磁盘临时表。

maven一更新jdk版本就变化

发表于 2019-05-28

问题

  每次更新maven,项目的jdk版本,编译版本就会变成1.5,很奇怪。

背景

  由于公司私服没有某个jar,只能切换为默认配置,从中央仓库下载后上传到私服,结果更新maven后,编译环境就变化。

方案

  发现是setting文件出现问题,需要添加

<profile>
        <id>jdk</id>
        <activation>
            <activeByDefault>true</activeByDefault>
            <jdk>1.8</jdk>
        </activation>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        </properties>
    </profile>

  进入默认的配置文件即可。

eclipse奇葩缺包异常

发表于 2019-05-27

问题

  早上一来,打开工程,直接报错Missing artifact jdk.tools:jdk.tools:jar:1.8,但是显然其他同事没有报错,网上搜索,需要添加这个包的依赖,不采取。

方案

  tools.jar包是JDK自带的,pom.xml中以来的包隐式依赖tools.jar包,而tools.jar并未在库中。找到答案。eclipse启动的jre不是开发用的jre,而你编译报错则是,在eclipse的jre里面没有找到这个包,我们切换eclipse依赖的jre即可。方法:找到eclipse.ini,在-vmargs这个参数前面添加:
  -vm
  C:\Program Files\Java\jdk1.8.0_172\jre\bin\server\jvm.dll
  即可。重启eclipse,maven更新即可。

1234…13

skydh

skydh

126 日志
© 2020 skydh
本站访客数:
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.3