编译自官方文档。
下载最新版本(当前为 v3.3.1)的 Kafka 并解压:
$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1
注意:本地环境必须安装了 Java 8+。
Apache Kafka 可以配套使用 ZooKeeper 或者 KRaft 启动,请参考以下的 2.1 和 2.2 的其中任一小节步骤(二者选其一即可)配置以开始使用。
运行以下命令以按正确顺序启动所有服务:
# 启动 ZooKeeper 服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话并运行:
# 启动 Kafka Broker 服务
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务都启动成功,你将拥有一个正在运行并可以使用的基本 Kafka 环境。
生成集群 UUID:
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
格式化日志目录:
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
启动 Kafka 服务:
$ bin/kafka-server-start.sh config/kraft/server.properties
Kafka 服务启动成功后,你将拥有一个正在运行并可以使用的基本 Kafka 环境。
Kafka 是一个分布式事件流平台,可让你跨多台计算机读取、写入、存储和处理事件(在文档中也称为 记录(Record) 或 消息(Message))。
示例事件包括支付交易、来自手机的地理位置更新、物流运输订单、来自物联网设备或医疗设备的传感器测量数据等等。这些事件统一被组织并存储在 主题 中。简单来说,主题就类似于文件系统中的文件夹,而事件则是该文件夹中的文件。
因此,在你编写第一个事件之前,你必须创建一个主题。打开另一个终端会话并运行:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka 的所有命令行工具都有额外的选项:运行 kafka-topics.sh
不带任何参数的命令以显示使用信息。例如,它还可以向你显示新主题的分区计数等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Kafka 客户端通过网络与 Kafka Broker 通信以读写事件。一旦收到,Broker 将以持久和容错的方式存储事件并保留一段时间(只要你需要,甚至可以一直保留)。
运行控制台生产者客户端以将一些事件写入你的主题。默认情况下,你输入的每一行都会导致一个单独的事件被写入主题中:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
> This is my first event
> This is my second event
>
你可以随时按下 Ctrl + C
以停止生产者客户端。
打开另一个终端会话并运行控制台消费者客户端以读取你刚刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
你可以随时按下 Ctrl + C
以停止消费者客户端。
然后你可以继续尝试:例如,切换回你的生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在你的消费者终端中。
因为事件是持久存储在 Kafka 中的,所以它们可以被任意多次读取,并且可以被任意多的消费者读取。你可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。
你可能在关系型数据库或传统的消息传递系统等现有系统中拥有了大量数据,以及许多已经在使用这些系统的应用程序。 Kafka Connect 允许你不断地将数据从外部系统提取到 Kafka 中,反之亦然。它是一个运行着 连接器(Connector) 的可扩展工具,连接器实现了与外部系统交互的自定义逻辑,因此很容易将现有系统与 Kafka 进行集成。为了使这个过程更容易,现有数百个这样的连接器随时可用。
在本篇文章中,我们将了解到如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka 主题中,再将数据从 Kafka 主题中导出到另一文件。
首先,确保已将 connect-file-3.3.1.jar
添加到 Connect Worker 配置中的 plugin.path
属性下。出于快速入门演示的目的,我们将使用相对路径并将连接器的包视为 超级 Jar 包(uber-jar),当从安装目录运行命令时,它会起作用。但是,值得注意的是,对于生产环境的部署,选择使用绝对路径的建议始终是可取的。有关如何设置此配置的详细说明,请参阅 plugin.path。
编辑 config/connect-standalone.properties
文件,添加或更改 plugin.path
与以下匹配的配置属性,然后保存文件:
> echo "plugin.path=libs/connect-file-3.3.1.jar"
然后,首先创建一些种子数据以方便进行测试:
> echo -e "foo\nbar" > test.txt
或者,在 Windows 系统上时:
> echo foo> test.txt
> echo bar>> test.txt
接下来,我们将启动两个以 单机(Standalone) 模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka Broker 和数据的序列化格式等。其余配置文件分别指定要创建的连接器,这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置:
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件包含在 Kafka 中,使用你之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成事件并导入到 Kafka 主题中,第二个是接收连接器,它从 Kafka 主题中读取消息并在输出文件中将每条消息生成为一行。
在启动过程中,你会看到许多日志消息,包括一些表明正在实例化连接器的消息。一旦 Kafka Connect 进程启动成功,源连接器应该开始从 test.txt
文件中读取每一行并将它们生成事件并导入到主题 connect-test
中,而接收连接器应该开始从主题 connect-test
中读取消息并将它们写入到文件 test.sink.txt
中。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道进行了传送:
> more test.sink.txt
foo
bar
请注意,数据存储在 Kafka 主题 connect-test
中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器继续处理数据,因此我们可以将数据添加到文件中并查看它在管道中的移动结果:
> echo Another line>> test.txt
你应该可以看到该行出现在控制台消费者输出和接收器文件中。
一旦你的数据作为事件存储在 Kafka 中,你就可以使用适用于 Java/Scala 的 Kafka Streams 客户端库处理数据。它允许你实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度的可扩展、弹性、容错和分布式的特性。该库支持精确一次(exactly-once)的语义处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了给你一个初步的体验,下面演示如何实现流行的 WordCount
算法:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams 示例和应用开发教程演示了如何从头到尾编写和运行此类流应用程序。
现在你已经完成了快速入门教程,可以随时删除 Kafka 环境,或者再继续尝试:
Ctrl + C
以停止生产者和消费者客户端(如果你还没有这样做的话);Ctrl + C
以停止 Kafka Broker;Ctrl + C
以停止 ZooKeeper 服务;如果你还想删除本地 Kafka 环境的任何数据,包括你在此过程中创建的任何事件,请运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs
你已成功完成了 Apache Kafka 的快速入门教程。
要了解更多信息,我们建议你可以执行以下后续步骤:
我们相信提高开发和团队协作的生产力可以帮助您产生更好的软件解决方案。这就是为什么 Visual Studio 版本控制团队发布了新特性,简化了内部循环和代码审查体验。