定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
目标
- 解决什么问题?
- 瓶颈及限制:
- 优化及参数配置:???
- 安装、启动:
- 单机版
- 集群版
使用消息队列的好处
- 解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 - 缓冲
有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况。 - 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
概念
- Producer 消息生产者,就是向 kafka broker发消息的客户端
- Consumer 消息消费者,向 kafka broker取消息的客户端
- Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Broker 一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个topic。
- Topic 可以理解为一个队列,生产者和消费者面向的都是一个 topic
- Partition 为了实现扩展性,一个非常大的 topic可以分布到多个broker(即服务器)上,一个topic可以分为多个 partition,每个partition是一个有序的队列;
- Replica 副本,为保证集群中的某个节点发生故障时, 该节点上的partition 数据不丢失,且kafka仍然能够继续工作, kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
- leader 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
- follower 每个分区多个副本中的“从”,实时从 leader中同步数据,保持和 leader数据的同步。leader发生故障时,某个 follower会成为新的 follower。
架构图
安装
安装过程:
解压
tar zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module修改配置
vi server.properties- broker.id=XXX (修改成自己ip的最后一个数字)
- delete.topic.enable=true (修改成true)
- log.dirs=/opt/data/kafka_2.11-0.11.0.0-logs (kafka数据目录)
- zookeeper.connect=h101:2181,h102:2181,h103:2181 (配置zookeeper)
启动
/opt/module/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-0.11.0.0/config/server.properties-
单机版
- tar.gz
- docker spotify/kafka
- docker-compose
`text
version: “3”
services:
zookeeper1:
image: wurstmeister/zookeeper
container_name: zookeeper1
ports:
- "2181:2181"
environment:
- TZ=Asia/Shanghai
- LANG=en_US.utf8 kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
- TZ=Asia/Shanghai
- LANG=en_US.utf8
- KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
- KAFKA_ADVERTISED_HOST_NAME=192.168.0.204
- KAFKA_ADVERTISED_PORT=9092`
- 集群版
命令行连接
创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
查看topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092- 查看topic的信息
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic` - 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test-topic
生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
修改分区数
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test1 --partitions 8–replication-factor 2
数据丢失问题
?
java代码连接(作为生产者与消费者)
- 直接使用kafka-client
- 集成到springboot中
如下使用的kafka的版本为kafka_2.11-0.10.2.2。
- 创建topic,名称是test-topic
bin/kafka-topics.sh --zookeeper zookeeper1:2181 --create --topic test-topic --partitions 1 --replication-factor 1
- 查看topic列表
bin/kafka-topics.sh --zookeeper zookeeper1:2181 --list
- 查看某个topic的信息
bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe --topic test-topic
往指定topic生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic从指定topic消费数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
最后编辑:张三 更新时间:2026-03-12 12:10