IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka详解(原理、集群、API操作) -> 正文阅读

[大数据]kafka详解(原理、集群、API操作)

cd /export/servers
scp -r kafka/ node2:$PWD
scp -r kafka/ node3:$PWD

1. 消息队列(MQ)

1.1 什么是消息队列

消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术。消息队列,一般我们会简称它为MQ(Message Queue). 消息队列是一种帮助开发人员解决系统间异步通信的中间件,常用于解决系统解耦和请求的削峰平谷的问题。

队列(Queue):

????????Queue 是一种先进先出的数据结构,容器

?消息(Message):

????????不同应用之间传送的数据。

消息队列:

????????我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要 的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。队列 Queue 是一种先进先出的数据结构,所以消 费消息时也是按照顺序来消费的。比如生产者发送消息1,2,3...对于消费者就会按照1,2,3...的顺序来消费。

MQ方式

?

1.2 消息队列的应用场景

消息队列在实际应用中包括如下四个场景:

  • 1) 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
  • 2) 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
  • 3) 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
  • 4) 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理

下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用

1.2.1 异步处理

具体场景:

????????用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。

????????对这两个子系统操作的处理方式有两种:串行及并行。

????????涉及到三个子系统:注册系统、邮件系统、短信系统

1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信;

2) 并行处理:新注册信息写入后,由发短信和发邮件并行处理

在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:

????????串行:50+50+50=150ms

????????并行:50+50 = 100ms

如果引入消息队列, 在来看整体的执行效率:

?????????在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本 可以忽略不计,因此总的处理时间为50ms,相比串行提高了2倍,相比并行提高了一倍;

1.2.2 应用耦合

具体场景:

????????用户使用QQ相册上传一张图片,人脸识别系统会对该图片进行人脸识别。

????????一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,

????????调用完成后再返回成功,如下图所示: 调用方式:webService、Http协议(HttpClient、RestTemplate)、Tcp协议(Dubbo)

?该方法有如下缺点:

1) 人脸识别系统被调失败,导致图片上传失败;

2) 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果;

3) 图片上传系统与人脸识别系统之间互相调用,需要做耦合;

若使用消息队列:

?客户端上传图片后,图片上传系统将图片信息批次写入消息队列,直接返回成功;

人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。

图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。

事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时间,对队列中的图 片信息进行处理。

1.2.3 限流削峰

具体场景:

????????购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。

???????? 而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。

?该方法有如下优点

???????? 请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;

????????队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信 息;

1.2.4 消息事件驱动的系统

具体场景:

????????用户新上传了一批照片 ---->人脸识别系统需要对这个用户的所有照片进行聚类 -------> 由对账系统重新生成用户的人脸索引(加快查 询)。

????????这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。

?该方法有如下优点:

????????避免了直接调用下一个系统导致当前系统失败;

????????每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处 理;

1.3 消息队列的两种模式

消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)

1.3.1 点对点模式

点对点模式下包括三个角色

  • 消息队列
  • 发送者 (生产者)
  • 接收者(消费者)

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,可以放在内存 中也可以持久化,直到他们被消费或超 时。

?点对点模式特点:

  • 每个消息只有一个消费者,一旦被消费,消息就不再在消息队列中;
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

1.3.2 发布/订阅模式

发布/订阅模式下包括三个角色:

  • 角色主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被多个 订阅者消费。

?发布/订阅模式特点:

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖性
  • 为了消费消息,订阅者必须保持在线运行。

1.4 消息队列实现机制

1.4.1 JMS

JMS(JAVA Message Service,Java消息服务)是一个Java平台中关于面向消息中间件的API 允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息

是一个消息服务的标准或者说是规范,是 Java 平台上有关面向消息中间件的技术规范

便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

JMS 消息机制主要分为两种模型:PTP 模型和 Pub/Sub 模型。

实现产品:Apache ActiveMQ

1.4.2 AMQP

????????AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开 放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条 件的限制。Erlang中的实现有RabbitMQ等。

1.4.2 JMS VS AMQP

?1.5常见的消息队列产品

?

综合上面的材料得出以下两点: (

1)中小型软件公司:

????????建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的 弊端也在这里,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸,RabbitMQ的社区十分活跃,可以解决 开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司, 数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除。不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里 放弃维护rocketmq,中小型公司一般抽不出人来进行rocketmq的定制化开发,因此不推荐。

(2)大型软件公司:

????????根据具体使用在rocketMq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针 对rocketMQ,大型软件公司也可以抽出人手对rocketMQ进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于kafka,根 据业务场景选择,如果有日志采集功能,肯定是首选kafka了。

2. kafka的基本介绍

2.1 什么是Kafka

官网:http://kafka.apache.org/

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做 MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
    • 算法复杂度:时间复杂度和空间复杂度? ?
    • 以时间复杂度为O(1)的方式:常数时间运行和数据量的增长无关,假如操作一个链表,那么无论链表的大还是小,操作时间是一 样的
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
    • 支持普通服务器每秒百万级写入请求
    • Memory mapped Files
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

2.2 kafka的特点

(1)解耦。Kafka具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程。

(2)高吞吐量、低延迟。即使在非常廉价的机器上,Kafka也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。 (

3)持久性。Kafka可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。

(4)扩展性。Kafka集群支持热扩展,Kaka集群启动运行后,用户可以直接向集群添。

(5)容错性。Kafka会将数据备份到多台服务器节点中,即使Kafka集群中的某一台加新的Kafka服务节点宕机,也不会影响整个系统的功 能。

(6)支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

(7) 支持多生产者和多消费者。

2.3 kafka的主要应用场景

3. kafka的架构

3.1 架构案例

?

?Kafka Cluster:由多个服务器组成。每个服务器单独的名字broker(掮客)。

kafka broker:kafka集群中包含的服务器

Kafka Producer:消息生产者、发布消息到 kafka 集群的终端或服务。

Kafka consumer:消息消费者、负责消费数据。

Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topci下,消费数据也是消费一类数据。

订单系统:创建一个topic,叫做order。

用户系统:创建一个topic,叫做user。

商品系统:创建一个topic,叫做product。

注意:Kafka的元数据都是存放在zookeeper中。

3.2 架构剖析

kafka架构的内部细节剖析:

?

?说明:kafka支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系有客户端负责维护,消息消费完 后,不会立即删除,会保 留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

  • Broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker
  • Topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic
  • Partition:分区,物理上的概念,每个topic包含一个或多个partition,一个partition对应一个文件夹,这个文件夹下存储partition的 数据和索引文件,每个partition内部是有序的

3.3 关系解释

Topic & Partition

  • Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。
  • Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
  • 一个topic为一类消息,每条消息必须指定一个topic。
  • 对于每一个topic, Kafka集群都会维持一个分区日志。
  • 如下图 每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。
  • 分区中的每一个记录都会分配一个id号来表示顺序,称之为offset,offset用来唯一的标识分区中每一条记录。

?

?在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置,偏移量由消费者所控制:通常在读取记录后,消费者会以线 性的方式增加偏移量 ,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从 而重新处理过去的数 据;也可以跳过最近的记录,从"现在"开始消费。

这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。

4. kafka集群环境搭建

ZooKeeper 作为给分布式系统提供协调服务的工具被 kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果 每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和 消费者的订阅关系,实现负载均衡。

?4.1 准备工作

kafka集群所需包

链接:https://pan.baidu.com/s/1rX6LuEbRk6fAYxiZ107cSg?
提取码:27fh?
?

环境准备:

???????? 准备三台服务器, 安装jdk1.8 ,其中每一台虚拟机的hosts文件中都需要配置如下的内容

192.168.200.11 node1

192.168.200.12 node2

192.168.200.13 node3

1、修改IP地址

修改网卡配置文件 vi /etc/sysconfig/network-scripts/ifcfg-ens33

(1)bootproto=static,表示使用静态IP

(2)onboot=yes,表示将网卡设置为开机启用

(3)将原有的原有IP修改为192.168.200.11

注意: 修改ip地址后,还需修改ip映射与主机名
?

?(4)重启网络服务

service network restart

(5)安装目录创建

安装包存放的目录:/export/software
安装程序存放的目录:/export/servers
数据目录:/export/data
日志目录:/export/logs
创建各级目录命令:
mkdir -p /export/servers/
mkdir -p /export/software/
mkdir -p /export/data/
mkdir -p /export/logs/

(6)修改host

?(1)vi /etc/sysconfig/network
修改HOSTNAME=node1?
(2)vim /etc/hostname

填写主机名:node1

(3)vim /etc/hosts

执行命令”/etc/init.d/network restart“ 重启hosts;

执行命令”cat /etc/hosts“可以查看到hosts文件修改成功。

4.2 Zookeeper集群搭建

1. JDK安装(略)

2.Linux 安装Zookeeper,三台Linux都安装,以搭建Zookeeper集群

  • 上传zookeeper-3.4.14.tar.gz
  • 解压并配置zookeeper(配置data目录,集群节点)
# node1操作
# 解压到/opt目录
tar -zxf zookeeper-3.4.14.tar.gz -C /opt
# 配置
cd /opt/zookeeper-3.4.14/conf
# 配置文件重命名后生效
cp zoo_sample.cfg zoo.cfg
#编辑
vi zoo.cfg
# 设置数据目录
dataDir=/var/lagou/zookeeper/data
# 添加
server.1=node1:2881:3881
server.2=node2:2881:3881
server.3=node3:2881:3881
# 退出vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid
# 配置环境变量
vi /etc/profile
# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,让配置生效
source /etc/profile

node2 、node3 略...
zookeeper集群部署可查看??Zookeeper详解--(入门、原理及实战)_程序猿二鍋頭的博客-CSDN博客

4.3 下载安装包

中文网站: http://kafka.apachecn.org/

英文网站: http://kafka.apache.org/

4.4 上传安装包并解压

使用 ftp 将安装包上传至 /export/software
1) 切换目录上传安装包
cd /export/software

2) 解压安装包到指定目录下
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/

3) 重命名(由于名称太长)
mv kafka_2.11-1.0.0 kafka
?

4.5 修改kafka的核心配置文件

cd /export/servers/kafka/config/
vi server.properties
主要修改一下6个地方:
    1) broker.id 需要保证每一台kafka都有一个独立的broker
    2) log.dirs 数据存放的目录
    3) zookeeper.connect zookeeper的连接地址信息
    4) delete.topic.enable 是否直接删除topic
    5) host.name 主机的名称
    6) 修改: listeners=PLAINTEXT://node1:9092
#broker.id 标识了kafka集群中一个唯一broker。
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 存放生产者生产的数据 数据一般以topic的方式存放
log.dirs=/export/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zk的信息
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node1

4.6 将配置好的kafka分发到其他二台主机

cd /export/servers
scp -r kafka/ node2:$PWD
scp -r kafka/ node3:$PWD

Linux scp 命令用于 Linux 之间复制文件和目录。

scp 是 secure copy 的缩写, scp 是 linux 系统下基于 ssh 登陆进行安全的远程文件拷贝命令。

  • 拷贝后, 需要修改每一台的broker.id 和 host.name和listeners

ip为11的服务器: broker.id=0 , host.name=node1 listeners=PLAINTEXT://node1:9092
ip为12的服务器: broker.id=1 , host.name=node2 listeners=PLAINTEXT://node2:9092
ip为13的服务器: broker.id=2 , host.name=node3 listeners=PLAINTEXT://node3:9092

  • 在每一台的服务器执行创建数据文件的命令

mkdir -p /export/data/kafka

4.7 启动集群

注意事项:在kafka启动前,一定要让zookeeper启动起来

cd /export/servers/kafka/bin


#前台启动
./kafka-server-start.sh /export/servers/kafka/config/server.properties


#后台启动
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &

?

注意:可以启动一台broker,单机版。也可以同时启动三台broker,组成一个kafka集群版
#kafka停止


./kafka-server-stop.sh

登录zookeeper: zkCli.sh

执行: ls /brokers/ids

4.8 安装kafka-manager (后续完善的,与前面IP有些不同,请自行更改)

链接:https://pan.baidu.com/s/1rX6LuEbRk6fAYxiZ107cSg?
提取码:27fh?

unzip -o kafka-manager-1.3.3.7.zip

cd kafka-manager-1.3.3.7

vi conf/application.conf

修改配置文件的下面一行

kafka-manager.zkhosts="IP:2181,IP2:2181"

basicAuthentication.enabled=true

?

启动kafka-manager

nohup bin/kafka-manager >/dev/null 2>&1 &

需要注意:

kafka-manager默认的端口是9000,可以在配置文件中修改

测试访问:http://192.168.40.171:9000/

?

4.9 Docker环境下的Kafka集群搭建

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-13 09:20:26  更:2021-09-13 09:22:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 13:43:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码