脚本管理主题信息
创建主题
在集群配置的时候就说过了 auto.create.topics.enable这个值设置为true时。生产者发送一个没有被创建主题的消息时,会自动创建一个分区数为num.partition默认为1,副本因子为default.replication.factory默认为1的主题。消费者请求某个主题时也会创建一个类似的主题。这样主题不利于管理,因此我们需要将 auto.create.topics.enable设置为false。
我们一般通过这个命令在服务器通过kafka-topics.sh脚本来创建主题。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1
这个命令是创建一个叫做topic-dh的主题,分区有4个,副本因子2个。
配置文件里面有个logs的配置,里面配置了主题和分区。前面也说过了,每个副本必须在不同的broke。我们不仅可以通过刚刚的路径查看log日志文件,也可以通过。
我们不仅可以通过上述的日志文件查看主题,我们还可以通过zk客户端查看,每创建一个主题就会在zk的/brokers/topics上创建一个同名的实节点。该节点记录了创建该主题分区的分配方案。首先启动这个zk客户端,启动命令如下:
./zkCli.sh –server 127.0.0.1:2181。
然后在命令行界面输入获取节点。和书上不一样的是,我的节点信息存放在 /kafka/brokers/topics/topic-demo111。这个目录下面,没有存放在 /brokers/topics/topic-demo111里面。
我们可以通过下面命令查看分区细节信息。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create
来查看信息。
我们可以更加详细的分配主题副本不同节点的分配。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-createsss --replica-assignment 2:0,0:1,1:2,2:1
这很好理解,2个副本,4个分区,分区1的副本是2,0,分区2的副本是0,2之类的。前面说过了,副本必须在不同节点上面,因此如果你设置的节点是一样的,会报错。同时如果设置的分区的副本数不一样也不行,也会报错。比如:2:0,0,2:1这种情况,第二个分区的副本是1个其余分区的副本是2个。2:0,,2:1跳过某个分区也是不行的。
其次,我在集群配置说了,我们可以在主题里面设置参数从而覆盖broker参数。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
同时我们创建主题时不能同名。可以加这么一个参数来限定。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-dh --partitions 4 --replication-factor 1 --if-not-exists
kafka内部做埋点处理时会根据主题的名称来命名metrics的名称,且将”.”改成”_”,因此topic.1_2和topic_1_2的metrics的名称都是topic_1_2,因此注意是否重名。
broke支持指定机架信息,如果制定了机架信息,则在分区副本分配是会尽可能的让分区副本分配到不同机架上,通过参数broke.rack=RACK1来配置的。如果集群里面部分broke指定了机架,部分没有指定,那么依旧会报错。
使用这个脚本创建主题,也就是这个kafka-topic.sh,本质上是调用kafka.admin.TopicCommand这个类实现对主题的管理,我们可以直接使用这个类来创建主题。demo如下:
String[] opts = new String[] { "--zookeeper", ":/2181/kafka", "--create", "--replication-factor", "1",
"--topic", "topic-create-api" };
kafka.admin.TopicCommand.main(opts);
分区副本的分配
除了前面说的–replica-assignment参数来直接指定副本分配情况,如果没有的话,则是按照内部逻辑进行处理的,有2个方案。有前面我说的机架信息,和没有机架信息。
没有机架信息的方案如下:(ps 这是scala代码)
val rand = new Random
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分区数
replicationFactor: Int, //副本因子
brokerList: Seq[Int], //集群中broker列表
fixedStartIndex: Int, //起始索引。默认为-1
startPartitionId: Int //起始分区编号,默认-1
): Map[Int, Seq[Int]] //返回值类型
= {
val ret = mutable.Map[Int, Seq[Int]]() //声明一个map
val brokerArray = brokerList.toArray //获取brokerid的列表
//起始索引小于0,那么从brokerid列表里面获取一个随机的有效值
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
//确保起始分区大于0
var currentPartitionId = math.max(0, startPartitionId)
//指定副本间隔
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
//这是一个for循环,scala语法看着好难受,遍历所有分区。
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
//获取第一个副本索引
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
//生成一个该分区副本集合
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
//保存该分区的所有副本分配的broker集合
for (j <- 0 until replicationFactor - 1)
//为其余副本分配broker
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
//保存该分区的副本分配信息
ret.put(currentPartitionId, replicaBuffer)
//继续下一个分区
currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
这个算法使得分区副本分配的很均匀。差不多正好。
指定机架信息和没指定机架信息本质上差不多。一个机架可以分配多个broker节点,但是满足下面条件的broker不可哟添加到当前分区的副本列表里面。
1.此broker所在机架已经有一个broker在这个分区的副本列表里面,且其他机架中没有任何的broken在该分区的副本列表里面。
2.此broker已经在在列表,且其他broker不在。创建主题时实质上是在zookeeper中的/kafka/brokers/topics节点下创建和该主题对应的子节点,并且写入副本分配信息,且在/config/topics节点下创建该节点对应的子节点并且主题配置信息。kafka创建主题的实质上动作是交给控制器异步完成的。
因此我们可以直接通过创建规则下的节点,来直接创建一个新的主题。这样我们可以绕过一些规则,比如我们创建主题分区的时候都是从0开始计数。我们通过创建zookeeper节点就不用从0开始累加了。
查看主题
kafka-topics.sh这个命令有5个指令类型:create,list,describe,alter和delete。其中list和describe是查看主题信息的。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka -list
这个命令是查看当前kafka当前所有可用主题。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create
这个是查看topic-create这个主题的详细信息,可以接多个主题,一次查看多个主题信息。如果没有–topic这个参数则是查看所有主题信息。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topics-with-overrides
–topics-with-overrides加这个参数则是查看所有使用了覆盖配置的主题。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create --under-replicated-partitions
这个参数是查询当前主题所有包含失效副本的分区
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create -unavaliable-partitions
这个参数查看主题中没有lead副本的分区。
修改主题
当一个主题被创建之后,依然允许我们对其做一定修改,比如修改分区个数,修改配置,通过alter指令来完成的。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create --partitions 3
这个是将topic-create的分区修改为3.如此的话可能会有影响。
目前是不支持分区从多变少的。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create --config max.message.bytes=10000
这个命令是修改这个主题的配置。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-create --delete-config max.message.bytes
这个是删除之前的配置。使其恢复默认配置。
我们一般通过kafka-configs.sh脚本来执行修改主题配置信息。
配置管理
有个脚本kafka-configs.sh是专门对配置进行操作的。可以在运行时修改原有的配置。相对于之前的脚本,主要是可以修改broker,client,users这些配置的配置
./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name topic-create
这个脚本支持查询主题,broker,client,users这些配置的配置,根据–entity-type来区分。–entity-name 显然指的是类型名字。这个查出来的仅仅是配置信息。和之前脚本不一样,这个命令本质上是从zookeeper上读取相关节点信息,/config/type/name
./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name topic-create --add-config cleanup.policy=compact,max.message.bytes=10000
修改主题使用–add-config来增,改,覆盖之前的配置。
./bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name topic-create --delete-config cleanup.policy
删除原配置–delete-config,删除之前被覆盖的配置,恢复为默认配置。
使用kafka-configs.sh来修改脚本时,会在对应zookeeper中创建一个节点,并且将变更的配置写入到这个节点。
删除主题
当某个主题已确定不在使用时,为了节约资源,我们最好删除。
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic topic-create
这个参数和delete.topic.enable有关,默认为true,如果为false,那么删除操作就会被忽略。同时我们删除内部主题,不存在的主题时都会报错。当然加了–if-exists这个命令后就会忽略报错。
使用这个命令的本质是在zookeeper上的/admin/delete-topics路径下创建一个和待删除主题同名的节点,和创建主题一样,真正的删除动作是kafka的控制器完成的。
因此我们可以通过创建一个这样的节点来删除主题。
create /admin/delete_topics/topic_delete ""
如此就删除了这个叫做topic_delete的主题,同理,我们也可以创建按照规则的主题。
更加手动的方式,一个主题其信息元数据存在zookeeper的 /brokers/topics和config/topics路径下的,消息数据则是存在log.dir我们配置的路径下面,我们只需要删除这些东西即可,规则如下:先删除brokers/topics和config/topics路径下的节点,2者顺序任意,然后删除其数据文件。
KafkaAdminClient
package com.dh.kafka;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
/**
* 创建主题试试 使用KafkaAdminClient来创建主题。
*
* @author Lenovo
*
*/
public class CreateTopic {
public static void main(String[] args) {
String brokerList = "192.168.147.132:9092";
String topic = "topic-admin";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
result.all().get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
client.close();
}
}
创建主题,给出了案例,很简单,创建主题时,有很多构造方法。我们先看其属性
public class NewTopic {
private final String name;
private final int numPartitions;
private final short replicationFactor;
private final Map<Integer, List<Integer>> replicasAssignments;
private Map<String, String> configs = null;
}
replicasAssignments这个参数是分区编号-broke列表。可以手动指定分区和broke的分配。configs则是配置的设定,我们可以给主题设置config.从而覆盖broke的配置。
AdminClient使用自己内置的协议来管理发送请求等功能。自己使用相关协议发送,然后再用相关协议解析。
主题的合法性
我们一般禁止客户端直接创建主题,不利于运维维护。但是前面AdminClient却可以直接创建。kafka有一个参数,叫做creat.topic.policy.class.name默认为null.提供了一个入口用来验证主题创建的合法性。我们自定义一个实现CreateTopicPolicy接口的类,然后让上面的参数指向我们这个类的全限定名。在启动服务,即可。这个类要在服务端,打个jar扔到classpath里面
优先副本选举
我们创建一个分区为3,副本为3的主题,必须要大于3的broke来支持。然后重启其中一个broke,那么lead副本可能不均衡了。由于消费者都是直接从lead副本交互数据,所以影响蛮大的。而创建和修改主题时,会有一个叫做优先副本的概念,kafka会通过一定的方式促使优先副本的选举为lead副本,从而使得分区平衡。当然不同broke的负载是不一样的,有的高,有的低。
这个方式是在broke端配置的,将auto.leader.rebalance.enable设置为true(默认也是true),开启后,kafka的控制器会启动一个定时任务来轮训所有broke节点,计算一个值(非优先副本leader副本/分区总数)超过leader.imbalance.per.broker.percentage默认是0.1,超过这个值就会开启优先副本选举以来分区平衡,定时器的周期是leader.imbalance.check.interval.seconds控制,默认300秒。
但是生产环境是不建议开启的,选举优先节点时会阻塞业务,不好,而且分区平衡也不是负载均衡。我们可以在一个时间内,手动去执行分区平衡。是通过执行这个脚本命令。
./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181/kafka
这个命令是扫描集群里面所有分区,如果分区过多,可能执行失败,因为在选举过程,具体的元数据信息会被存入到zookeeper的/admin/preferred-replica-election节点,如果这些数据超过了zk默认节点大小(默认1M)因此我们可以path-to-json-file参数来小批量的对部分分区执行优先副本选举,通过path-to-json-file来指定一个json文件。
分区重分配
前面我们创建主题时,下线其中一个broke,这个节点的副本都会变得不可用,如果不修复,这个分区负载会一直这样,新增一个节点,也是如此。我们可以用kafka-reassign-partitions.sh来执行分区重分配的任务。原理就是复制,然后删除。