docker单点kafka-broker扩容

背景

公司为了快速上线,几个月前通过docker镜像 kafka 部署了一台kafka-borker。Dockerfile如下

FROM wurstmeister/kafka:latest

ENV KAFKA_MESSAGE_MAX_BYTES=21474836
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
ENV KAFKA_CREATE_TOPICS=test:1:1
ENV KAFKA_ZOOKEEPER_CONNECT=zk1:2181

VOLUME ["/kafka"]

随着业务的增长,单节点的kafka-broker已经成为一个潜在的可靠性隐患。所以借着N年难得一遇的停机维护,决定把kafka-broker扩容为主备集群模式,提升稳定性,并为日后的分区扩容提供可能(不停服)。

问题

除了单节点问题,docker的镜像配置也存在几个问题。

未指定broker.id

broker.id是kafka-broker的唯一标识,选举、服务查找等操作都是基于此标识。如果没有显式的指定,kafka-broker会由zk生成并下发一个大于1000的id,如:(1001),并保存在kafka-broker本地的meta.properties文件中。如果手动指定broker.id是无法指定大于1000的值。测试环境每次会执行rm -rf操作,但是zk不会同时操作。所以每次重启的brokerId都会+1。为了方便后续管理,需要指定brokerId。

object KafkaServer {
    if (brokerIdSet.size > 1)
          throw new InconsistentBrokerIdException(
            s"Failed to match broker.id across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
            s"or partial data was manually copied from another broker. Found $brokerIdSet")
        else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
          throw new InconsistentBrokerIdException(
            s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
            s"If you moved your data, make sure your configured broker.id matches. " +
            s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
        else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
          brokerId = generateBrokerId
        else if (brokerIdSet.size == 1) // pick broker.id from meta.properties
          brokerId = brokerIdSet.last

  /**
    * Return a sequence id generated by updating the broker sequence id path in ZK.
    * Users can provide brokerId in the config. To avoid conflicts between ZK generated
    * sequence id and configured brokerId, we increment the generated sequence id by KafkaConfig.MaxReservedBrokerId.
    */
  private def generateBrokerId: Int = {
    try {
      zkClient.generateBrokerSequenceId() + config.maxReservedBrokerId
    } catch {
      case e: Exception =>
        error("Failed to generate broker.id due to ", e)
        throw new GenerateBrokerIdException("Failed to generate broker.id", e)
    }
  }
}

开启自动创建topic

通过KAFKA_AUTO_CREATE_TOPICS_ENABLE开启自动创建topic,会在producer发送消息时创建topic,但是不会指定partition和replica。需要关闭此配置,统一由运维人员审核创建。

镜像采用了latest标签

开发环境可以使用到最近的镜像,但是生产环境使用此tag,容易造成版本不一致。本次升级过程中遇到的问题,就是基于此,后面会详细介绍。

方案

  1. 制作Dockerfile,修改环境变量。
  2. 停止producer应用
  3. 观察consumer应用消费完成
  4. 部署broker2
  5. 修改测试topic的replica并同步数据
  6. 观察测试topic在broker2中的数据及zk节点元数据
  7. 测试producer及consumer
  8. 批量执行replica脚本
  9. 删除broker1数据并重新部署
  10. 启动应用

制作Dockerfile,修改环境变量。

FROM wurstmeister/kafka:latest

BROKER_ID_COMMAND="echo 2"
ENV KAFKA_MESSAGE_MAX_BYTES=21474836
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
ENV KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181

VOLUME ["/kafka"]

停止producer应用 & 观察consumer应用消费完成

进入broker docker容器

docker exec -it kafka-broker1 bash

查看group列表

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看group的消费情况

kafka-consumer-groups.sh --bootstrap-server localhost:9092  --group group_name --describe

可以看到

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 6901 6901 0 consumer-1-80496f05-3be9-49c3-8b84-45c26f07a4aa /172.20.0.1 consumer-1 consumer-1

LAG = LOG-END-OFFSET - CURRENT-OFFSET,如果LAG=0,则表示消息已经消费完成。

部署broker2

可以将BROKER_ID_COMMAND等环境变量信息,通过 docker run -e的形式执行,本次通过Dockerfile设置,每个broker对应一个Dockerfile。

修改测试topic的replica并同步数据

通过kafka-reassign-partitions.sh脚本进行replica重分配,分配规则以json文件保存。

    "version":1,
    "partitions":
    [
        {
            "topic": "test.topic.1",
            "partition": 0,
            "replicas": [1001,2]
        },
        {
            "topic": "test.topic.1",
            "partition": 1,
            "replicas": [1001,2]
        }
    ]
}
/opt/kafka/bin/kafka-reassign-partitions.sh  --zookeeper  zk1:2181,zk2:2181,zk3:2181 --reassignment-json-file 
test.topic.1.json --execute

此时出现了故障,因为Dockerfile使用了lastest标签FROM wurstmeister/kafka:latest导致获取到的版本高于生产环境的broker版本,此时日志提示版本问题,无法同步,且kafka-reassign-partitions一直处于pending状态,无法再次执行。

There is an existing assignment running.

解决方案

查找kafka-reassign-partitions强制终止命令

查看源码ReassignPartitionsCommand,只找到verify,没有关闭操作。

删除zk对应节点

kafka-reassign-partitions指定了zk地址,查找在zk中保存的元数据信息,并删除。

kafka/core/src/main/scala/kafka/zk/ZkData.scala

object ReassignPartitionsZNode {
  def path = s"${AdminZNode.path}/reassign_partitions"
}

zk命令

delete /admin/reassign_partitions
修改dockerfile版本后重新执行kafka-reassign-partitions

再次执行kafka-reassign-partitions,很快执行完成,并在broker2中观察到对应数据。

批量执行replica脚本

批量脚本内容如下

#!/bin/bash
KFK_CMD='/opt/kafka/bin/kafka-reassign-partitions.sh  --zookeeper  zk1:2181,zk2:2181,zk3:2181 --reassignment-json-file'
CHK_CMD='/opt/kafka/bin/kafka-topics.sh --describe --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic '

topic_list='
test.topic.1
test.topic.2
test.topic.3
test.topic.4
test.topic.5
__consumer_offsets
'

for i in $topic_list
do
    $KFK_CMD $i.json --execute
    sleep 2
    $CHK_CMD i
done

需要提前准备好分配json,并已topic命名,如 __consumer_offsets.json

{
    "version":1,
    "partitions":
    [
        {
            "topic": "__consumer_offsets",
            "partition": 0,
            "replicas": [1001,2]
        },
        {
            "topic": "__consumer_offsets",
            "partition": 1,
            "replicas": [1001,2]
        }
    ]
}

删除broker1数据并重新部署

删除broker1镜像数据并重新制作Dockerfile升级。一定要删除镜像数据,否者会优先读取本地的meta.properties,环境变量不会生效。

结论

至此,docker下单节点kafka扩容有惊无险的完成了。

wangyuheng wechat
扫码关注我的公众号,获取更多文章推送