共计 5865 个字符,预计需要花费 15 分钟才能阅读完成。
Kafka 使用
前言
最近一直在准备秋招,在这周,我顺利拿到两家 offer,一家浩鲸(听说是阿里的外包大厂),一家北京中科全安(主要业务是网安),都远远达不到我的预期,本想着秋招冲大厂的,但是一直没有收到过大厂的面试邀请。于是我也不想在现在的实习公司干等着了,打算先找个北京的实习干着,到我后面毕业了也好跳槽找工作。
上面两家 offer 都是一把过的,浩鲸晚上面的,第二天上午 HR 电话打过来说面试过了;中科全安面了两场(技术面+场景面),当天晚上 HR 就打电话过来说面试过了。薪资方面,针对大三实习生的薪资感觉除了大厂外,普遍在 3k ~ 4k 之间。浩鲸:3.7k + 每天 10 块补助(记得不是很清楚),中科全安:4.3k + 每月 500 绩效 或 4.3k + 实习满 6 个月奖 2k(二选一)。
结合我个人未来规划,最终选择去中科全安实习。面试官跟我说实习的工作差不多也是写一些接口,然后 ES 和 Kafka 会用得比较多,之前在我学习微服务时,学过 ES 的使用,并且在学校的实训项目中使用 ES 做文章搜索,而 Kafka 之前一直没有接触过,只知道是用作消息队列,在大数据中用得比较多。因此在本周末,准备花几天时间学学 Kafka 怎么用。
消息队列应用
企业中比较常见的消息队列产品主要有:Kafka、ActiveMQ、RabbitMQ、RocketMQ 等。在大数据场景下主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信。
缓冲/消峰
控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦
允许我们独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
异步通信
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
之前在学校的企业实训中,我曾使用过 RabbitMQ 针对用户注册和邮箱验证码发送两个业务进行解耦。用户注册时需要发送验证码,当用户点击发送验证码时,用户注册业务会将需要发送验证码的消息发送到 RabbitMQ,然后直接响应发送成功。至于验证码最终有没有发送成功,是由验证码发送模块保证的,它既可能成功,也可能失败(在实际生活中,这是有体现的,比如我们注册时发送邮箱验证码,网站可能会提示: 若长时间没有接收到验证码,请重试),这就是两个业务模块直接异步通信的体现。
消息队列的模式
点对点模式
消费者主动拉取数据,消息接收后消除消息。
发布/订阅模式
- 可以有多个 topic 主题(浏览、点赞等);
- 消费者消费数据后,不删除数据;
- 每个消费者相互独立,都可以消费到数据;
Kafka 定义
传统定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
最新定义
Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
Kafka 基础架构
-
Producer(生产者): 消息生产者,将消息 push 到 Kafka 集群中的 Broker。
-
Consumer(消费者):消息消费者,从 Kafka 集群中 pull 消息,消费消息。
-
Consumer Group(消费组): 由一个或多个 Consumer 组成,每个 Consumer 都属于一个 Consumer Group。消费者组在逻辑上是一个订阅者。组内每个消费者负责消费不同分区的数据,一个分区只能由组内的一个消费者消费;消费组之间互不影响。
每条消息只能被消费组中的一个消费者消费,但是可以被多个消费组消费,这样就实现了单播和多播。
-
Broker(代理): 一台 Kafka 服务器就是一个 Broker,一个集群可以由多个 Brker 组成,每个 Broker 可以容纳多个 Topic.
-
Topic(主题): 消费的类别或者主题,逻辑上可以理解为队列。Producer 只关注 push 消息到哪个 Topic,Consumer 只关注订阅哪个 Topic 以获取消息。
-
Partition(分区):基于负载均衡与扩展性考虑,一个 Topic 可以分为多个 Partition,物理存储在 Kafka 集群中的多个 Broker 上。从可靠性上角度考虑,每个 Partition 都会有备份 Replica。
-
Replica(拷贝): Partition 的副本,为了保证集群中的某个节点发生故障时,该节点上的 Partition 数据不会丢失,且 Kafka 仍能正常工作,所以 Kafka 提供了副本机制,一个 Topic 的每个 Partition 都有若干个副本,一个 Leader 和若干个 Follower。
-
Leader:Replica 的主要角色,Producer 和 Consumer 只跟 Leader 交互。
-
Follower: Replica 的从角色,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会变为新的 Leader 跟 Producer 和 Consumer 交互。
-
Controller: Kafka 集群中的其中一台机器,用来进行 Leader election 以及各种 Failover(故障转移).
-
Zookeeper: Kafka 通过 Zookeeper 存储集群的 meta 等信息。
Kafka 安装
Kafka 安装一般是由架构师、运维等人安装的,开发人员无需知道 Kafka 怎么安装以及 Kafka 安装后配置一些什么东西(有追求的人可以学学),因此这里直接使用 Docker 安装单体应用:
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.4
container_name: zookeeper
hostname: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
container_name: kafka
hostname: kafka
depends_on:
- zookeeper
ports:
- 29092:29092
- 9997:9997
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: kafka
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- 8080:8080
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
DYNAMIC_CONFIG_ENABLED: "true"
然后执行:docker compose up -d
即可启动容器。这些容器包括:zookeeper
、kafka
和kafka-ui
,其中kafka-ui
提供了一个用于操作 Kafka 的可视化页面,便于我们添加 topic,对 kafka 进行监控等。
容器启动完成后,访问localhost:8080
即可查看 Kafka 情况,并且可以在 topic 中手动添加一个 Topic。
Kafka 与 SpringBoot 集成
在 SpringBoot 中集成 Kafka,只需引入spring-kafka
即可。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在 Spring Boot 中,不需要指定 spring-kafka 的版本,Spring-boot-parent 内部预先定义了 spring-kafka 适合的版本。
下面将演示简单的消息发送和消息接收程序。
配置文件
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
port: 8892
生产者
这里定义了一个 controller,当访问接口时,会生成一条消息并发送至 Kafka 中。
package com.example.kafkademo.producer;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Project: com.example.kafkademo.producer
* @Author: pgthinker
* @GitHub: https://github.com/ningning0111
* @Date: 2024/8/31 17:51
* @Description:
*/
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class SendMessageController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String TOPIC = "test";
@GetMapping("/send")
public String sendMessage(String message){
kafkaTemplate.send(TOPIC, message);
return "发送成功";
}
}
和大多数组件一样,Spring 为我们提供了 Kafka 的一个抽象操作接口KafkaTemplate
,我们可以使用该接口对 Kafka 进行操作,其中该接口的send
方法就是向 Kafka 中发送消息,在发送时,需要指定该消息的 Topic。
消费者
package com.example.kafkademo.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Project: com.example.kafkademo.consumer
* @Author: pgthinker
* @GitHub: https://github.com/ningning0111
* @Date: 2024/8/31 18:04
* @Description:
*/
@Component
public class ReceiverMessageService {
private final String TOPIC = "test";
@KafkaListener(topics = TOPIC, groupId = "consumer_group_1")
public void consumerTopicMessage(String message){
System.out.println("收到消息:"+message);
}
}
在 Spring 中,我们可以使用@KafkaListener
注解监听 Kafka 上发送过来的消息,在该注解中,需要指定关注的消息主题以及消费组 ID,消费组 ID 可以随便填写。消息在消费组中只会被一个消费者消费,多个消费组对消息的消费互不影响。