Skip to content
<

Go 操作 Kafka 之 kafka-go

Kafka 是一种高吞吐量的分布式发布订阅消息系统,本文介绍了如何使用 kafka-go 这个库实现 Go 语言与 kafka 的交互。

Go 社区中目前有三个比较常用的 kafka 客户端库,它们各有特点。

首先是IBM/sarama(这个库已经由 Shopify 转给了 IBM),之前 liwenzhou 写过一篇使用[sarama 操作 Kafka](./08-07Go 操作 kafka 之 sarama)的教程,相较于 sarama,kafka-go更简单、更易用。

segmentio/kafka-go是纯 Go 实现,提供了与 kafka 交互的低级别和高级别两套 API,同时也支持 Context。

此外社区中另一个比较常用的confluentinc/confluent-kafka-go,他是一个基于 cgo 的librdkafka包装,在项目中使用它会引入对 C 库的依赖。

准备 Kafka 环境

这里推荐使用 Docker Compose 快速搭建一套本地开发环境。

以下docker-compose.yml文件用来搭建一套单节点 zookeeper 和单节点 kafka 环境,并且在8080端口提供kafka-ui管理界面。

yaml
version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - kafka1
    environment:
      DYNAMIC_CONFIG_ENABLED: "TRUE"

参考资料

将上述docker-compose.yml文件在本地保存,在同一目录下执行以下命令启动容器。

sh
docker-compose up -d

容器启动后,使用浏览器打开127.0.0.1:8080即可看到如下kafka-ui界面。

点击页面右侧的"Configure new cluster"按钮,配置 kafka 服务连接信息。

填写完信息后,点击页面下方的"Submit"按钮提交即可。

安装 kafka-go

执行以下命令下载kafka-go依赖。

sh
go get github.com/segmentio/kafka-go

注意:kafka-go 需要 Go 1.15或更高版本。

kafka-go 使用指南

kafka-go提供了两套与 Kafka 交互的 API。

  • 低级别(low-level):基于与 Kafka 服务器的原始网络连接实现。
  • 高级别(high-level):对于常用读写操作封装了一套更易用的 API。

通常建议直接使用高级别的交互 API。

Connection

Conn类型是kafka-go包的核心。它代表与 Kafka broker 之间的连接。基于它实现了一套与 Kafka 交互的低级别 API。

发送消息

下面是连接至 Kafka 之后,使用 Conn 发送消息的代码示例。

go
// writeByConn 基于 Conn 发送消息
func writeByConn() {
    topic := "my-topic"
	partition := 0

	// 连接至 Kafka 集群的 Leader 节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
        log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	// 发送消息
	_, err = conn.WriteMessages(
        kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
        log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
	}
}

消费消息

go
// readByConn 连接至 kafka 后接收消息
func readByConn() {
	// 指定要连接的 topic 和 partition
	topic := "my-topic"
	partition := 0

	// 连接至 Kafka 的 leader 节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的 batch 是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		fmt.Println(string(b[:n]))
	}

	// 关闭 batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

使用batch.Read更高效一些,但是需要根据消息长度选择合适的 buffer(上述代码中的 b),如果传入的 buffer 太小(消息装不下)就会返回io.ErrShortBuffer错误。

如果不考虑内存分配的效率问题,也可以按以下代码使用batch.ReadMessage读取消息。

go
for {
  msg, err := batch.ReadMessage()
  if err != nil {
    break
  }
  fmt.Println(string(msg.Value))
}

创建 topic

当 Kafka 关闭自动创建 topic 的设置时,可按如下方式创建 topic。

go
// createTopicByConn 创建 topic
func createTopicByConn() {
	// 指定要创建的 topic 名称
	topic := "my-topic"

	// 连接至任意 kafka 节点
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	// 获取当前控制节点信息
	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	// 连接至 leader 节点
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}

	// 创建 topic
	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

通过非 leader 节点连接 leader 节点

下面的示例代码演示了如何通过已有的非 leader 节点的 Conn,连接至 leader 节点。

go
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close()

获取 topic 列表

go
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}
// 遍历所有分区取 topic
for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}

Reader

Reader是由kafka-go包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典型场景,使用它能够简化代码。Reader 还实现了自动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。

当进程退出时,必须在Reader上调用Close()。Kafka 服务器需要一个优雅的断开连接来组织它继续尝试向已连接的客户端发送消息。如果进程使用 SIGINT(shell 中的 Ctrl-C)或 SIGTERM(如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用Close()。当同一 topic 上有新 Reader 连接时,可能导致延迟(例如,新进程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在进程关闭时关闭 Reader。

消费消息

下面的代码演示了如何使用 Reader 连接 Kafka 消费消息。

go
// readByReader 通过 Reader 接收消息
func readByReader() {
	// 创建 Reader
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		Topic:     "topic-A",
		Partition: 0,
		MaxBytes:  10e6, // 10MB
	})
	r.SetOffset(42) // 设置 Offset

	// 接收消息
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}

	// 程序退出前关闭 Reader
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

消费者组

kafka-go支持消费者组,包括 broker 管理的 offset。要启用消费者组,只需在ReaderConfig中指定GroupID

使用消费者组时,ReadMessage 会自动提交偏移量。

go
// 创建一个 reader,指定 GroupID,从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	GroupID:  "consumer-group-id", // 指定消费者组 id
	Topic:    "topic-A",
	MaxBytes: 10e6, // 10MB
})

// 接收消息
for {
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		break
	}
	fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

// 程序退出前关闭 Reader
if err := r.Close(); err != nil {
	log.Fatal("failed to close reader:", err)
}

在使用消费者组时会有以下限制:

  • (*Reader).SetOffset当设置了 GroupID 时会返回错误
  • (*Reader).Offset当设置了 GroupID 时会永远返回-1
  • (*Reader).Lag当设置了 GroupID 时会永远返回-1
  • (*Reader).ReadLag当设置了 GroupID 时会返回错误
  • (*Reader).Stats当设置了 GroupID 时会返回一个-1的分区

显式提交

kafka-go也支持显式提交。当需要显式提交时不要调用ReadMessage,而是调用FetchMessage获取消息,然后调用CommitMessages显式提交。

go
ctx := context.Background()
for {
    // 获取消息
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    // 处理消息
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    // 显式提交
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("failed to commit messages:", err)
    }
}

在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,如果通过调用FetchMessage获取了单个分区的偏移量为1、2和3的消息,则使用偏移量为3的消息调用CommitMessages也将导致该分区的偏移量为1和2的消息被提交。

管理提交间隔

默认情况下,调用CommitMessages将同步向 Kafka 提交偏移量。为了提高性能,可以在 ReaderConfig 中设置 CommitInterval 来定期向 Kafka 提交偏移。

go
// 创建一个 reader 从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
})

Writer

向 Kafka 发送消息,除了使用基于Conn的低级 API,kafka-go包还提供了更高级别的 Writer 类型。大多数情况下使用Writer即可满足条件,它支持以下特性。

  • 对错误进行自动重试和重新连接。
  • 在可用分区之间可配置的消息分布。
  • 向 Kafka 同步或异步写入消息。
  • 使用 Context 的异步取消。
  • 关闭时清楚挂起的消息以支持正常关闭。
  • 在发布消息之前自动创建不存在的 topic。

发送消息

go
// 创建一个 writer 向 topic-A 发送消息
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:        "topic-A",
	Balancer:     &kafka.LeastBytes{}, // 指定分区的 balancer 模式为最小字节分布
	RequiredAcks: kafka.RequireAll,    // ack 模式
	Async:        true,                // 异步
}

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

创建不存在的 topic

如果给 Writer 配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的 topic 时,则会自动创建 topic。

go
w := &Writer{
    Addr:                   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:                  "topic-A",
    AllowAutoTopicCreation: true,  // 自动创建 topic
}

messages := []kafka.Message{
    {
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
    {
        Key:   []byte("Key-B"),
        Value: []byte("One!"),
    },
    {
        Key:   []byte("Key-C"),
        Value: []byte("Two!"),
    },
}

var err error
const retries = 3
// 重试3次
for i := 0; i < retries; i++ {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    err = w.WriteMessages(ctx, messages...)
    if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
        time.Sleep(time.Millisecond * 250)
        continue
    }

    if err != nil {
        log.Fatalf("unexpected error %v", err)
    }
    break
}

// 关闭 Writer
if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

写入多个 topic

通常,WriterConfig.Topic用于初始化单个 topic 的 Writer。通过去掉 WriterConfig 中的 Topic 配置,分别设置每条消息的message.topic,可以实现将消息发送至多个 topic。

go
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    // 注意: 当此处不设置 Topic 时,后续的每条消息都需要指定 Topic
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // 注意: 每条消息都需要指定一个 Topic, 否则就会报错
	kafka.Message{
        Topic: "topic-A",
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
        Topic: "topic-B",
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
        Topic: "topic-C",
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

注意:Writer 中的 Topic 和 Message 中的 Topic 是互斥的,同一时刻有且只能设置一处。

其他配置

TLS

对于基本的 Conn 类型或在 Reader/Writer 配置中,可以在 Dialer 中设置 TLS 选项。如果 TLS 字段为空,则它将不启用 TLS 连接。

注意:不在 Conn/Reader/Writer 上配置 TLS,连接到启用 TLS 的 Kafka 集群,可能会出现io.ErrUnexpectedEOF错误。

Connection
go
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定 TLS 配置
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
go
dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},  // 指定 TLS 配置
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})
Writer

创建Writer时可以按如下方式指定 TLS 配置。

go
w := kafka.Writer{
    Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), 
    Topic:   "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},  // 指定 TLS 配置
      },
    }

SASL

可以在Dialer上指定一个选项以使用 SASL 身份验证。Dialer可以直接用来打开一个 Conn,也可以通过它们各自的配置传递给一个ReaderWriter。如果SASLMechanism字段为 nil,则不会使用 SASL 进行身份验证。

SASL 身份验证类型
明文
go
mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}
SCRAM
go
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}
Connection
go
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
go
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092","localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})
Writer
go
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

w := kafka.Writer{
	Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:     "topic-A",
	Balancer:  &kafka.Hash{},
	Transport: sharedTransport,
}
Client
go
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

client := &kafka.Client{
    Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Timeout:   10 * time.Second,
    Transport: sharedTransport,
}

Balancer

kafka-go实现了多种负载均衡策略。特别是当你从其他 Kafka 库迁移过来时,你可以按如下说明选择合适的 Balancer 实现。

Sarama

如果从 sarama 切换过来,并且需要/希望使用相同的算法进行消息分区,则可以使用kafka.Hashkafka.ReferenceHash

  • kafka.Hash = sarama.NewHashPartitioner
  • kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
go
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: &kafka.Hash{},
}
librdkafka 和 confluent-kafka-go

kafka.CRC32Balancerlibrdkafka默认的consistent_random策略表现一致。

go
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.CRC32Balancer{},
}
Java

使用kafka.Murmur2Balancer可以获得与默认 Java 客户端相同的策略。

go
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.Murmur2Balancer{},
}

Compression

可以通过设置Compression字段在 Writer 上启用压缩:

go
w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:       "topic-A",
	Compression: kafka.Snappy,
}

Reader将通过检查消息属性来确定消费的消息是否被压缩。

Logging

想要记录 Reader/Writer 类型的操作,可以在创建时配置日志记录器。

kafka-go 中的Logger是一个接口类型。

go
type Logger interface {
	Printf(string, ...interface{})
}

并且提供了一个LoggerFunc类型,帮我们实现了Logger接口。

go
type LoggerFunc func(string, ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
Reader

借助kafka.LoggerFunc我们可以自定义一个Logger

go
// 自定义一个 Logger
func logf(msg string, a ...interface{}) {
	fmt.Printf(msg, a...)
	fmt.Println()
}

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	Topic:       "q1mi-topic",
	Partition:   0,
	Logger:      kafka.LoggerFunc(logf),
	ErrorLogger: kafka.LoggerFunc(logf),
})
Writer

也可以直接使用第三方日志库,例如下面示例代码中使用了 zap 日志库。

go
w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"),
	Topic:       "q1mi-topic",
	Logger:      kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
	ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
}