| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Apache Pulsar基本理论 -> 正文阅读 |
|
[大数据]Apache Pulsar基本理论 |
前言最近apache pulsar出镜率挺高的,这里新开一篇文档记录一下pulsar的学习之路参考链接: 什么是Pulsar?Pulsar是由雅虎创建的开源的、 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。现在是Apache基金会的一个孵化项目。 1 Pulsar特性
Pulsar的关键特性:
2 Pulsar消息队列2.1 生产者生产者是一个附加到topic并将消息发布到 Pulsar broker的进程。 Pulsar broker 处理消息。 2.1.1 发送模式Producer 可以以同步(sync) 或 异步(async) 的方式发布消息到 broker
2.1.2 访问模式对于消息生产者来说在主题上你可以有不同类型的访问模式
2.1.3 压缩Pulsar 生产者目前支持以下类型的压缩:
2.1.4 批量处理当批量处理启用时,producer 会在单个请求中积累并发送一批消息。 批量处理的量大小由最大消息数和最大发布延迟定义。 因此,积压数量是分批处理的总数,而不是信息总数。 2.1.5 分块(chunking)
当启用分块时(chunkingEnabled=true) ,如果消息大小大于允许的最大发布有效载荷大小,则 producer 将原始消息分割成分块的消息,并将它们与块状的元数据一起单独和按顺序发布到 broker。 在 broker 中,分块的消息将和普通的消息以相同的方式存储在 Managed Ledger 上。 唯一的区别是,consumer 需要缓冲分块消息,并在收集完所有分块消息后将其合并成真正的消息。 Managed Ledger上的分块消息可以和普通消息交织在一起。 如果 producer 未能发布消息的所有分块,则当 consumer 未能在过期时间(expire time) 内接收所有分块时,consumer可以过期未完成的分块。默认情况下,过期时间设置为1小时。
如下图所示,当生产者向topic发送一批大的分块消息和普通的非分块消息时。 假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。 这个 broker 在其管理的ledger里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。 消费者将在内存缓存所有的块消息,直到收到所有的消息块。将这些消息合并成为原始的消息M1,发送给处理进程。
当多个生产者发布块消息到单个topic,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。 如下所示,生产者1发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。 生产者2发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。 这些特定消息的所有分块是顺序排列的,但是其在 ledger 里面可能不是连续的。 这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。 2.2 消费者在Consumer端有一个队列,用于接收从broker推送来的消息。通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。 2.2.1 接收模式可以通过同步(sync) 或者异步(async)的方式从brokers接受消息。
2.2.2 确认当消费者成功的消费了一条消息,这个消费者会发送一个确认信息给broker。 这个消息时是永久保存的,只有在收到订阅者消费成功的消息确认后才会被删除。 如果希望消息被 Consumer 确认后仍然保留下来,可配置消息保留策略实现。
2.2.3 取消确认当消费者在某个时间没有成功的消费某条消息,消费者想重新消费到这条消息,这个消费者可以发送一条取消确认消息到broker,broker会将这条消息重新发给消费者。 2.2.4 确认超时如果消息没有被成功消费,你想去让 broker 自动重新交付这个消息, 你可以采用未确认消息自动重新交付机制。 客户端会跟踪 超时 时间范围内所有未确认的消息。 并且在指定超时时间后会发送一个 重发未确认的消息 请求到 broker。 2.2.5 死信主题死信主题使您可以在消费者无法成功消费某些消息时消费新消息。 在这种机制中,消费失败的消息存储在一个单独的主题中,称为死信主题。 您可以决定如何处理死信主题中的消息。
默认死信主题使用以下格式:
如果要指定死信主题的名称,请使用此 Java 客户端示例:
死信主题取决于消息重新传递。 由于确认超时或取消确认,消息被重新传送。 如果打算对消息使用取消确认,请确保在确认超时之前对其进行取消确认。 2.2.6 Retry letter topic很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。
2.3 topic2.3.1 分区topic单个topic的消息一般是由单个broker处理,为了提高topic的消息处理能力,pulsar提供了partitioned topic的支持,与kafka和rocketmq一样,每个partition由不同的broker处理,在消费时,单个partition可选择exclusive, failover和shared模式 2.3.2 消息的保留和过期默认情况上,当broker会立刻删除所有收到了ack的消息,没有被ack的消息会持久化存储,但是我们可以修改pulsar的行为,pulsar允许我们存储已经收到ack了的消息,也可以给未收到ack的消息设置过期时间(TTL) 2.3.3 消息去重Pulsar支持在broker端对消息做去重,当打开消息去重后,重发的消息(重试等产生的)不会被重新存储,这个特性使得pulsar对流式计算引擎(例如flink)更加友好,流式计算引擎更容易实现exactly-once语义的计算任务,消息去重的存储示意图如下: 2.3.4 消息延迟传递延时消息功能允许你能够过一段时间才能消费到这条消息,而不是消息发布后,就马上可以消费到。 默认情况下启用延迟消息传递。 您可以在代理配置文件中更改它
下面是 Java 当中生产延时消息一个例子
2.4 命名空间命名空间是租户内部逻辑上的命名术语。 可以通过admin API在租户下创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1 ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic。 2.5 订阅Pulsar支持exclusive、shared和failover三种消息订阅模式,这三种模式的示意图如下:
pulsar默认的消息订阅模式,在这种模式下,中能有一个consumer消息消息,一个订阅关系中只能有一台机器消费每个topic,如果有多于一个consumer消费此topic则会出错,消费示意图如下:
一个topic也是只有单个消费消费一个订阅关系的消息,与exclusive模式不同之处在于,failover模式下,每个消费者会被排序,当前面的消费者无法连接上broker后,消息会由下一个消费者消费,消费示意图如下: 消息可被多个consumer同时消费,这种模式下,无法保证消息的顺序,并且无法使用one by one和cumulative的ack模式,消息通过roundrobin的方式投递到每一个消费者,消费示意图如下: 3 Pulsar的架构单个 Pulsar 集群由以下三部分组成:
下图为一个 Pulsar 集群: 3.1 BrokerPulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:
最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域。 3.2 Cluster一个 Pulsar 实例由一个或多个 Pulsar 集群组成。 反过来,集群包括:
集群间可以通过异地复制进行消息同步。 3.3 元数据存储Pulsar 元数据存储维护一个 Pulsar 集群的所有元数据,例如topic元数据、schema、broker加载数据等。Pulsar 使用 Apache ZooKeeper 进行元数据存储、集群配置和协调。Pulsar 元数据存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上。您可以将一个 ZooKeeper 集群同时用于 Pulsar 元数据存储和 BookKeeper 元数据存储。如果要部署连接到现有 BookKeeper 集群的 Pulsar broker,则需要分别为 Pulsar 元数据存储和 BookKeeper 元数据存储部署单独的 ZooKeeper 集群。
存储配置 3.4 持久化存储Pulsar 为应用程序提供有保证的消息传递。 如果消息成功到达 Pulsar broker,它将被传递到其预期目标。 3.4.1 Apache BookkeeperPulsar用 Apache BookKeeper作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:
除了消息数据,cursors也持久存储在 BookKeeper 中。Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。
Pulsar也支持临时消息( (non-persistent) )存储 下图展示了brokers和bookies是如何交互的: 3.4.2 LedgersLedger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:
3.4.3 Ledger读一致性BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。 3.4.4 ledgers管理(managed ledger)鉴于 Bookkeeper Ledger提供单一日志抽象,在ledger之上开发了一个库,称为managed ledger,代表单个主题的存储层。managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。
3.5 日志存储在 BookKeeper 中,日志文件包含 BookKeeper 事务日志。在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的日志文件会被创建。 3.6 Pulsar协议Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。
关于Pulsar proxy有一些比较重要的注意点:
4 Pulsar与kafka的区别4.1 性能对比Pulsar 表现最出色的就是性能,Pulsar 的速度比 Kafka 快得多,与 Kafka 相比,Pulsar 的速度提升了 2.5 倍,延迟降低了 40%。 4.2 对比总结
TTL是订阅的消费时间限制。如果在配置的TTL时间段内没有任何消费者使用消息,则消息将自动标记为已确认。 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/18 16:55:35- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |