学学Kafka

265次阅读
没有评论

共计 5865 个字符,预计需要花费 15 分钟才能阅读完成。

内容目录

Kafka 使用

前言

  最近一直在准备秋招,在这周,我顺利拿到两家 offer,一家浩鲸(听说是阿里的外包大厂),一家北京中科全安(主要业务是网安),都远远达不到我的预期,本想着秋招冲大厂的,但是一直没有收到过大厂的面试邀请。于是我也不想在现在的实习公司干等着了,打算先找个北京的实习干着,到我后面毕业了也好跳槽找工作。
  上面两家 offer 都是一把过的,浩鲸晚上面的,第二天上午 HR 电话打过来说面试过了;中科全安面了两场(技术面+场景面),当天晚上 HR 就打电话过来说面试过了。薪资方面,针对大三实习生的薪资感觉除了大厂外,普遍在 3k ~ 4k 之间。浩鲸:3.7k + 每天 10 块补助(记得不是很清楚),中科全安:4.3k + 每月 500 绩效 或 4.3k + 实习满 6 个月奖 2k(二选一)。
  结合我个人未来规划,最终选择去中科全安实习。面试官跟我说实习的工作差不多也是写一些接口,然后 ES 和 Kafka 会用得比较多,之前在我学习微服务时,学过 ES 的使用,并且在学校的实训项目中使用 ES 做文章搜索,而 Kafka 之前一直没有接触过,只知道是用作消息队列,在大数据中用得比较多。因此在本周末,准备花几天时间学学 Kafka 怎么用。

消息队列应用

  企业中比较常见的消息队列产品主要有:Kafka、ActiveMQ、RabbitMQ、RocketMQ 等。在大数据场景下主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

  传统的消息队列的主要应用场景包括:缓冲/消峰、解耦、异步通信

缓冲/消峰

  控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

学学Kafka

解耦

  允许我们独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

学学Kafka

异步通信

  允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

学学Kafka

之前在学校的企业实训中,我曾使用过 RabbitMQ 针对用户注册和邮箱验证码发送两个业务进行解耦。用户注册时需要发送验证码,当用户点击发送验证码时,用户注册业务会将需要发送验证码的消息发送到 RabbitMQ,然后直接响应发送成功。至于验证码最终有没有发送成功,是由验证码发送模块保证的,它既可能成功,也可能失败(在实际生活中,这是有体现的,比如我们注册时发送邮箱验证码,网站可能会提示: 若长时间没有接收到验证码,请重试),这就是两个业务模块直接异步通信的体现。

消息队列的模式

点对点模式

  消费者主动拉取数据,消息接收后消除消息。

学学Kafka

发布/订阅模式

  • 可以有多个 topic 主题(浏览、点赞等);
  • 消费者消费数据后,不删除数据;
  • 每个消费者相互独立,都可以消费到数据;

学学Kafka

Kafka 定义

传统定义

  Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

最新定义

  Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

Kafka 基础架构

学学Kafka

  • Producer(生产者): 消息生产者,将消息 push 到 Kafka 集群中的 Broker。

  • Consumer(消费者):消息消费者,从 Kafka 集群中 pull 消息,消费消息。

  • Consumer Group(消费组): 由一个或多个 Consumer 组成,每个 Consumer 都属于一个 Consumer Group。消费者组在逻辑上是一个订阅者。组内每个消费者负责消费不同分区的数据,一个分区只能由组内的一个消费者消费;消费组之间互不影响。

    每条消息只能被消费组中的一个消费者消费,但是可以被多个消费组消费,这样就实现了单播和多播。

  • Broker(代理): 一台 Kafka 服务器就是一个 Broker,一个集群可以由多个 Brker 组成,每个 Broker 可以容纳多个 Topic.

  • Topic(主题): 消费的类别或者主题,逻辑上可以理解为队列。Producer 只关注 push 消息到哪个 Topic,Consumer 只关注订阅哪个 Topic 以获取消息。

  • Partition(分区):基于负载均衡与扩展性考虑,一个 Topic 可以分为多个 Partition,物理存储在 Kafka 集群中的多个 Broker 上。从可靠性上角度考虑,每个 Partition 都会有备份 Replica。

  • Replica(拷贝): Partition 的副本,为了保证集群中的某个节点发生故障时,该节点上的 Partition 数据不会丢失,且 Kafka 仍能正常工作,所以 Kafka 提供了副本机制,一个 Topic 的每个 Partition 都有若干个副本,一个 Leader 和若干个 Follower。

  • Leader:Replica 的主要角色,Producer 和 Consumer 只跟 Leader 交互。

  • Follower: Replica 的从角色,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会变为新的 Leader 跟 Producer 和 Consumer 交互。

  • Controller: Kafka 集群中的其中一台机器,用来进行 Leader election 以及各种 Failover(故障转移).

  • Zookeeper: Kafka 通过 Zookeeper 存储集群的 meta 等信息。

Kafka 安装

  Kafka 安装一般是由架构师、运维等人安装的,开发人员无需知道 Kafka 怎么安装以及 Kafka 安装后配置一些什么东西(有追求的人可以学学),因此这里直接使用 Docker 安装单体应用:

version: "2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.4
    container_name: zookeeper
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:7.4.4
    container_name: kafka
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9997:9997
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - 8080:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: "true"

  然后执行:docker compose up -d即可启动容器。这些容器包括:zookeeperkafkakafka-ui,其中kafka-ui提供了一个用于操作 Kafka 的可视化页面,便于我们添加 topic,对 kafka 进行监控等。

  容器启动完成后,访问localhost:8080即可查看 Kafka 情况,并且可以在 topic 中手动添加一个 Topic。

学学Kafka

学学Kafka

Kafka 与 SpringBoot 集成

  在 SpringBoot 中集成 Kafka,只需引入spring-kafka即可。

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

在 Spring Boot 中,不需要指定 spring-kafka 的版本,Spring-boot-parent 内部预先定义了 spring-kafka 适合的版本。

  下面将演示简单的消息发送和消息接收程序。

配置文件

spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:
  port: 8892

生产者

 这里定义了一个 controller,当访问接口时,会生成一条消息并发送至 Kafka 中。

package com.example.kafkademo.producer;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Project: com.example.kafkademo.producer
 * @Author: pgthinker
 * @GitHub: https://github.com/ningning0111
 * @Date: 2024/8/31 17:51
 * @Description:
 */
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class SendMessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String TOPIC = "test";

    @GetMapping("/send")
    public String sendMessage(String message){
        kafkaTemplate.send(TOPIC, message);
        return "发送成功";
    }
}

  和大多数组件一样,Spring 为我们提供了 Kafka 的一个抽象操作接口KafkaTemplate,我们可以使用该接口对 Kafka 进行操作,其中该接口的send方法就是向 Kafka 中发送消息,在发送时,需要指定该消息的 Topic。

消费者

package com.example.kafkademo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Project: com.example.kafkademo.consumer
 * @Author: pgthinker
 * @GitHub: https://github.com/ningning0111
 * @Date: 2024/8/31 18:04
 * @Description:
 */
@Component
public class ReceiverMessageService {
    private final String TOPIC = "test";

    @KafkaListener(topics = TOPIC, groupId = "consumer_group_1")
    public void consumerTopicMessage(String message){
        System.out.println("收到消息:"+message);
    }
}

  在 Spring 中,我们可以使用@KafkaListener注解监听 Kafka 上发送过来的消息,在该注解中,需要指定关注的消息主题以及消费组 ID,消费组 ID 可以随便填写。消息在消费组中只会被一个消费者消费,多个消费组对消息的消费互不影响。

测试

学学Kafka

正文完
 
PG Thinker
版权声明:本站原创文章,由 PG Thinker 2024-08-31发表,共计5865字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
热评文章
以太坊网站导航

以太坊网站导航

以太坊网站导航