分布式消息队列服务kafka
Apache Kafka是一个开源分布式流媒体平台,它收集、处理、存储和管理不断流入 Kafka 服务器的实时数据。它是一组协同工作以存储和组织实时数据的集群。用户可以进一步访问此类实时无限数据,以构建数据驱动的应用程序。
Apache Kafka 也称为发布-订阅消息服务,因为用户可以发布和订阅 Kafka 服务器以执行各种与数据相关的操作。
由于如此高效的功能和特性,Kafka 被用于多种用例,包括流处理、实时分析、用户活动跟踪等。
参考资料
Kafka的主要特点
Apache Kafka非常受欢迎,因为它的功能包括确保正常运行时间、简化扩展以及允许它管理大量数据。
让我们看一下它提供的一些强大功能:
- 高可扩展性: Kafka 使用的分区日志模型将数据分布在多台服务器上,使其能够扩展到单个服务器的容量之外。
- 低延迟:Kafka 分离数据流,导致极低的延迟和巨大的吞吐量。
- 容错和耐用:数据写入磁盘,分区分布在多个服务器上并复制。这可以保护数据免受服务器故障的影响,并使其具有容错性和持久性。Kafka 集群可以处理主数据库和数据库中的故障。它能够自行重新启动服务器。
- 可扩展性:自从 Kafka 近几年大受欢迎以来,很多其他应用都构建了连接器。这可以在几秒钟内安装额外的功能,例如与其他系统的集成。
- 指标和监控: Kafka 是一种用于跟踪运营数据的流行工具。这需要从多个应用程序收集数据并将其整合到具有指标的集中式源中。
要了解有关如何在 Kafka 中分析数据的更多信息,您可以参考使用 Kafka Analytics 进行实时报告。
Apache Kafka拥有庞大的环境架构,包括生产者、代理、消费者和 Zookeeper。在 Kafka 架构中,Zookeeper 充当集中控制器,用于管理有关 Kafka 生产者、代理和消费者的所有元数据信息。但是,您可以在没有 Zookeeper 的情况下安装和运行 Kafka。在这种情况下,不是将所有元数据存储在 Zookeeper 中,而是将所有 Kafka 配置数据存储为 Kafka 本身内的一个单独分区。
Kafka+Kraft新模式
KRaft 控制器共同形成一个Kraft quorum,它存储有关 Kafka 集群的所有元数据信息。使用这种方法,您可以消除 Zookeeper 在 Kafka 环境架构中的依赖。此外,在没有 Zookeeper 的情况下运行 Kafka 时,您可以获得各种好处,例如消除系统复杂性和数据冗余。由于 Kafka 计划停止将 Zookeeper 作为集中配置服务,您将拥有一个简化的 Kafka 架构,而无需任何第三方服务依赖项。
Kafka 3.3.1 包含许多重要的新功能。以下是一些显着变化的摘要:
- KIP-833:将 KRaft 标记为生产就绪
- Apache Kafka 支持 Java 17
- 不再支持 Java 8 和 Scala 2.12 (>3.0.0版本)
Kafka的安装和测试
Kafka下载包
安装JDK 11
yum install -y java-11-openjdk-devel.x86_64
Kafka 解压缩
./gradlew jar -PscalaVersion=2.13.8
Kraft 生成不同的配置文件
node.id=1 controller.quorum.voters=1@localhost:19092,2@localhost:19093,3@localhost:19094 listeners=PLAINTEXT://:9092,CONTROLLER://:19092 advertised.listeners=PLAINTEXT://localhost:9092 log.dirs=/tmp/kraft-combined-logs-server1 process.roles=broker,controller inter.broker.listener.name=PLAINTEXT controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000
初始化集群
#!/bin/sh export KAFKA_HEAP_OPTS="-Xmx2G –Xms1G" DIR="/root/kafka-3.3.1-src" cd $DIR KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" for i in {1..3};do ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server$i.properties done for i in {1..3};do ./bin/kafka-server-start.sh config/kraft/server$i.properties done
创建和测试 topic
./bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 ./bin/kafka-topics.sh --list --bootstrap-server localhost:9094