IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 消息队列中间件之Kafka(一) -> 正文阅读

[大数据]消息队列中间件之Kafka(一)

一、消息队列

1.概述

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

2.应用场景

以下介绍消息队列在实际应用中常用的异步处理、应用解耦、流量削峰、日志处理等场景。

2.1 异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种

  1. 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
    在这里插入图片描述
  2. 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

在这里插入图片描述
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑进行异步处理。改造后的架构如下:
在这里插入图片描述

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2.2 应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
在这里插入图片描述

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:
在这里插入图片描述

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.3 流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
在这里插入图片描述
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理

2.4 日志处理

2.4日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
在这里插入图片描述
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据

3.JMS消息服务

JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

3.1 消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

P2P模式

  • P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

  • 接收者在成功接收消息之后需向队列应答成功

  • 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

Pub/Sub模式

  • 包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

  • 每个消息可以有多个消费者

  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息

  • 为了消费消息,订阅者必须保持运行的状态

  • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

  • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

3.2 消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步
订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

(2)异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

二、Kafka

1.概述

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息
  • 支持通过Kafka服务器和消费机集群来分区消息
  • 支持Hadoop并行数据加载
  • 一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。

2.Kafka相关概念

  • Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer
    负责发布消息到Kafka broker
  • Consumer
    消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

具体流程如下图:
在这里插入图片描述

三、Docker环境下Zookeeper集群环境搭建

1.概述

ZooKeeper 是一个开源的分布式协调服务,是 Hadoop,HBase 和其他分布式框架使用的有组织服务的标准。

分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

2.Docker 配置

在/mydata/zookeeper路径下创建docker-compose-zookeeper-cluster.yml文件

version: '3.3'

networks:
  docker_net:
    external: true

services:
  zoo1:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - ./zoo1/data:/data
      - ./zoo1/datalog:/datalog
    networks:
      - docker_net

  zoo2:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - ./zoo2/data:/data
      - ./zoo2/datalog:/datalog
    networks:
      - docker_net

  zoo3:
    image: zookeeper
    restart: unless-stopped
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2184:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - ./zoo3/data:/data
      - ./zoo3/datalog:/datalog
    networks:
      - docker_net

3.启动集群

docker-compose -f docker-compose-zookeeper-cluster.yml up -d

在这里插入图片描述

4.查看信息

1.分别查看各个节点的角色
在这里插入图片描述
由上结果可知,zoo2 是 leader。负责集群的读写。

2.查看 zoo2 选举数据
在这里插入图片描述

5.选举演练

1.模拟 Leader 掉线,发现zoo3已选举为Leader。
在这里插入图片描述
2. zoo2 节点重新上线,恢复为follow节点
在这里插入图片描述

6.常用操作

  • 查看文件目录 ls /
  • 创建顺序节点 create -s /zk-test 123
  • 创建临时节点 create -e /zk-temp 123 当会话过期或客户端断开连接时,临时节点将被自动删除
  • 创建永久节点 create /zk-permanent 123
  • 读取节点 get /
  • 更新节点 set /zk-permanent 456
  • 检查节点状态 stat /zk-permanent
  • 删除节点 rmr /zk-permanent

四、Docker环境下Kafka集群环境搭建

1.在/mydata/kafka目录下创建docker-compose-kafka-cluster.yml文件

version: '3.3'

networks:
  docker_net:
    external: true

services:
  kafka1:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka1
    ports:
      - "9093:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93                   ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9093                                 ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9093    ## 绑定发布订阅的端口。修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
    volumes:
      - "./kafka1/docker.sock:/var/run/docker.sock"
      - "./kafka1/data/:/kafka"
    networks:
      - docker_net

  kafka2:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka2
    ports:
      - "9094:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93                 ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9094                               ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9094   ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
    volumes:
      - "./kafka2/docker.sock:/var/run/docker.sock"
      - "./kafka2/data/:/kafka"
    networks:
      - docker_net

  kafka3:
    image: wurstmeister/kafka
    restart: unless-stopped
    container_name: kafka3
    ports:
      - "9095:9092"
    external_links:
      - zoo1
      - zoo2
      - zoo3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93                 ## 修改:宿主机IP
      KAFKA_ADVERTISED_PORT: 9095                              ## 修改:宿主机映射port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9095   ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
    volumes:
      - "./kafka3/docker.sock:/var/run/docker.sock"
      - "./kafka3/data/:/kafka"
    networks:
      - docker_net

  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    restart: unless-stopped
    container_name: kafka-manager
    hostname: kafka-manager
    ports:
      - "9000:9000"
    links:            # 连接本compose文件创建的container
      - kafka1
      - kafka2
      - kafka3
    external_links:   # 连接本compose文件以外的container
      - zoo1
      - zoo2
      - zoo3
    environment:
      ZK_HOSTS: 47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184                 ## 修改:宿主机IP
      TZ: CST-8
    networks:
      - docker_net

启动kafka集群

docker-compose -f docker-compose-kafka-cluster.yml up -d

在这里插入图片描述

2.在上面启动时,也启动了可视化管理模块kafka-manager,因为 kafka 的元数据、配置信息由 Zookeeper 管理,这里我们在 UI 页面做下相关配置。

  1. 访问 http:47.94.93.93:9000,按图示添加相关配置
    在这里插入图片描述
    2.配置后我们可以看到默认有一个 topic(__consumer_offsets),3 个 brokers。该 topic 分 50 个 partition,用于记录 kafka 的消费偏移量。(其他topic都为后续生成的)
    在这里插入图片描述
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:50:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 11:46:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码