一、简介

Pulsar 是一个多用户,高性能的服务间消息通信的解决方案。最早是雅虎开发的,目前已经变成了 Apache 顶级项目。

Pulsar 的关键特性:

  • 单个 Pulsar 实例原生支持多个集群,并且支持对在集群中的消息有跨地区备份。
  • 非常非常低的发布和端到端的延迟。
  • 无缝扩展到百万级别的 topics。
  • 方便上手的客户端 API(Java,Go,Python,C++)
  • 对于 topics 拥有多种订阅模式。
  • 通过 Apache BookKeeper 保证消息分发和存储。
  • 拥有一个轻量级的无服务的计算框架来提供流式数据处理。
  • 拥有一个基于无服务计算框架的连接框架可以非常方便的将数据从 Pulsar 输入和输出。
  • Tiered Storage 将热点数据转移到冷数据区。

二、消息

2.1 messaging concepts

Pulsar 是基于发布-订阅模式的。一旦订阅被创建了,所有的消息就会被 Pulsar 保存,即使消费者失联。只有消息被消费者确认这些消息已经处理完毕了,这些消息才会被丢弃。

2.1.1 消息

消息是 Pulsar 中的最基本单元。

Component Purpose
Value / data payload The data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data schemas 序列化与反序列化.
Key Messages can optionally be tagged with keys, which can be useful for things like topic compaction.
Properties An optional key/value map of user-defined properties.
Producer name The name of the producer that produced the message (producers are automatically given default names, but you can apply your own explicitly as well).
Sequence ID Each Pulsar message belongs to an ordered sequence on its topic. A message’s sequence ID is its ordering in that sequence.
Publish time The timestamp of when the message was published (automatically applied by the producer).
Event time An optional timestamp that applications can attach to the message representing when something happened, for example, when the message was processed. The event time of a message is 0 if none is explicitly set.
TypedMessageBuilder TypedMessageBuilder is used to construct a message. You can set message properties like the message key, message value with TypedMessageBuilder. When you set TypedMessageBuilder, the best practice is to set the key as a string. If you set the key as other types, for example, an AVRO object, the key is sent as bytes, and it is difficult to get the AVRO object back on the consumer.

2.2 生产者

一个生产者是一段将关联了 topic 的消息发布到 Pulsar broker 做处理的程序。

2.2.1 发送模式

Mode Description
Sync send The producer will wait for acknowledgement from the broker after sending each message. If acknowledgment isn’t received then the producer will consider the send operation a failure.
Async send The producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.

2.2.2 压缩

消息可以压缩以节省带宽,有以下几种方式来来压缩消息:

2.2.3 Batching

当 batching 被打开了,生产者可以一个请求中发送多条消息。batch size 依赖于消息的最大大小和最大发送延迟。因此,backlog 的 size 代表了所有 batches 的大小而不是消息的总大小。

在 Pulsar 中,batches 被当成一个单独的集合而不是单个的消息。在这个条件下,消费者通常会把一个 batch 拆分成单独的消息。然而,延迟消息通常发送的是单个消息即使 batching 被打开了。

只有在 batch 中的所有消息都被消费者确认消费了,消费者才会回复 batch 被消费的消息。这也就意味着 batch 中的任何一个消息失败了,所有在 batch 中的消息就会重发,即使其中一部分消息已经确认过了。

为了解决这个问题,在 2.6.0 版本中加入了 batch index 的概念, broker 保存 batch index 确认状态避免消息被重复发送,当所有在 batch 中的消息被确认消费了,这个 batch 就会被删除。

默认 batch index 确认是关闭的,可以使用 batchIndexAcknowledgeEnable 这个参数打开。但是打开这个通知会带来更多的内存消耗,所以要小心这个操作。

2.3消费者

一个消费者是订阅了一个 topic 并不断接受该 topic 消息的程序。

2.3.1 接受模式

Mode Description
Sync receive A sync receive will be blocked until a message is available.
Async receive An async receive will return immediately with a future value—a CompletableFuture in Java, for example—that completes once a new message is available.

2.3.2 监听器

客户端库中为消费者提供了监听器的实现。例如 Java 客户端提供了 MesssageListener 接口,在这个接口中当有消息接受到就会调用received方法。

2.3.3 消息确认

当一个消费者成功的消费了一个消息,这个消费者就会发送一个确认通知请求给 broker。这个消息会被永久保存在 broker,只有所有的订阅都回复了确认通知。如果想要在确认通知后也存储消息,则需要配置 消息保存策略

消息可以被一个一个的确认,也可以积累到一起进行确认。积累确认时只需要确认最后一个它收到的消息就可以。在消息流中的消息直到(包括)确认的消息都不会重新发送给那个消费者。

消息确认通知有两种方式:

  • 单独确认。每一个消息单独确认。
  • 积累确认。确认最后一个消息。(积累确认不能应用在共享订阅模式[shared subscription mode,因为该模式中多个消费者访问了同一个订阅)

2.3.4 未确认通知

当一个消费者在一个时刻没有成功地消费一个消息并且想重新消费消息时,这个消费者可以发送一个 negative acknowledgement 给 broker,然后 broker 会重新发送这个消息。

同样的未确认通知也可以一个一个发和累积发。在排他和容错订阅模式下,消费者只发送最后一个消息的未确认通知,在共享模式中,可以单独地发送未确认消息。

在 batch 模式中,所有消费者将会重新收到 batch 只要发送了未确认通知。

2.3.5 确认通知超时

当想在消息未消费成功时自动触发 broker 重发消息,可以使用未确认消息时重发机制。客户端会在acktimeout时间区间内追踪未确认的消息,然后重新发送未确认消息请求给 broker。

如果 batching 被打开了,其他在同一个 batch 中的消息也会被重新发送。

倾向于使用未确认通知而不是依靠超时机制,未确认通知更准确,而且避免了因为消息处理时间过长而导致的非合法重发。

2.3.6 Dead letter topic

Dead letter topic 可以使消费者在不能成功消费一些消息的时候继续消费新消息。在这个机制中,消费失败的消息会被存在一个单独的 topic 中,这个 topic 就被称作 dead letter topic。

示例:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
              .subscribe();

// 默认 dead letter topic 格式
<topicname>-<subscriptionname>-DLQ

//如果想要为 dead letter topic 指定名称 使用如下方式
  Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .deadLetterTopic("your-topic-name")
                    .build())
              .subscribe();

当前只有共享订阅模式支持 dead letter topic。

2.3.7 Retry letter topic

在很多线上业务系统中,消息需要被重复消费因为业务逻辑遇到了异常。通常,用户希望能够配置重复消费失败消息的延迟时间。在这个用例中,可以配置生产者同时向业务 topic 和 retry letter topic 发送消息,并且可以在消费者开启自动重试消息。

自动重试默认是关闭的,可以设置 enableRetrytrue打开自动重试机制。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

2.4 Topics

topic 在订阅系统中就是从生产者到消费者传递消息的通道。在 Pulsar 中 topic 的名称有一个良好定义的结构

{persistent|non-persistent}://tenant/namespace/topic
Topic name component Description
persistent / non-persistent This identifies the type of topic. Pulsar supports two kind of topics: persistent and non-persistent (persistent is the default, so if you don’t specify a type the topic will be persistent). With persistent topics, all messages are durably persisted on disk (that means on multiple disks unless the broker is standalone), whereas data for non-persistent topics isn’t persisted to storage disks.
tenant The topic’s tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters.
namespace The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the namespace level. Each tenant can have multiple namespaces.
topic The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance.

不用明确地在 Pulsar 中创建一个 topic。当消费者尝试消费一个不存在的 topic 时,Pulsar 会自动地创建一这个 topic。如果 tenant 和 namespace 没有指定,这个 topic 就会被声称在默认的 tenant 和 namespace 下。

2.5 Namespaces

命名空间是在租户下的逻辑命名法,一个租户可以创建多个命名空间。比如一个租户可以为两个不同的应用创建完全不同的命名空间。

2.6 Subscriptions

订阅是一个命名配置规则,这个规则决定了消息怎么推送到消费者。目前在 Pulsar 中有四种消费模式:独占,共享,容错和 key 共享。如下图所示:

Subscription modes

组合订阅是非常方便的。

  • 如果想在消费者中达到传统的发布订阅模式,可以为每一个消费者起一个唯一的订阅名称。
  • 如果想在多个消费者间使用消息队列模式,可以让多个消费者拥有同一个订阅名称(共享,容错,key_shared)。
  • 如果想同时组合使用,可以让一部分消费者使用独占模式,其他选择另外的模式。

2.6.1 独占 Exclusive

在独占模式中,只有一个消费者允许添加到一个订阅中。如果超过一个消费者尝试使用同一个订阅订阅一个主题,这个消费者会报错。默认情况下开启的是独占模式。如下图所示,只有 Consumer A-0 允许消费这个订阅。

Exclusive subscriptions

2.6.2 容错 FailOver

在容错模式中,多个消费者可以绑定到同一个订阅中。在该模式中, broker 会基于优先级和消费者名称的字母排序选择一个主消费者。一开始只有主消费者会收到消息,当主消费者失联后,消息会被按顺序发送到下一个消费者。

对于 partitioned topics 而言,broker 分配 partioned topics 给拥有最高优先级的消费者。如果多个消费者都是最高优先级,则 broker 公平的分配 topics 给这些消费者。

如下图所示,Consumer B-0 是主消费者,如果 Consumer B-0 挂了,则 Consumer B-1会变成下一个消费者。

Failover subscriptions

2.6.3 共享模式 Shared

在共享或轮转模式中,多个消费者可以绑定到同一个订阅。消息被轮流发送到这些 Consumer 中,并且每一个消息只会发送给一个消费者。当一个消费者失联,所有未被确认的消息将会被重新安排发送到剩下的消费者中。

当使用共享模式时,需要注意以下几点:

  • 消息是无序的
  • 不能使用累积确认策略

如下图所示,Consumer C-1、Consumer C-2、Consumer C-3 都可以绑定到这个订阅。

Shared subscriptions

2.6.4 Key_Shared

在 key_shared 模式中,多个消费者可以绑定到同一个订阅中。消息在多个消费者间进行分发,拥有同一个key 或者同种顺序的 key 的消息会被发送到同一个消费者。不管消息被重发了多少次,他总是被重发到同一个消费者。当一个消费者加入或者离开订阅,会导致当前的消费者接受到的消息发生变化。

当使用 Key_Shared 模式时,需要注意以下几点:

  • 需要为消息指定一个 key 或者有序的 key
  • 在 key_shared 模式中不能使用累积确认

可以在 broker.config 模式中屏蔽掉 key_shared 订阅。

Key_Shared subscriptions

2.7 多主题订阅 Multi-topic subscriptions

Pulsar 中的消费者可以同时订阅不同的主题,可以用如下两种方式定义一个主题列表

  • 依靠正则表达式,比如 persistent://public/default/finance-.* (所有主题必须拥有相同的命名空间)
  • 明确定义一个列表。

当订阅了多个主题后, Pulsar 客户端会自动调用 Pulsar API 去发现匹配的主题并订阅它们。当某些主题当前不存在时,消费者会在它们被创建时自动订阅。

要注意的是多个主题间的消息不保证顺序。

如下是多主题订阅的 Java 版本的示例。

import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient pulsarClient = // Instantiate Pulsar client object

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(someTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();

2.8 Partitioned topics

一般的主题只能由一个 broker 服务,限制了这个主题的最大流量。分片主题是一种特殊的主题可以被多个 broker 处理。

在这个场景中,一个分片主题其实是由 N 个内部主题实现的,N 是分片数量。当推送消息到一个分片主题时,每一个消息是被路由到其中一个 broker 中的。跨 broker 的分片分布是有 Pulsar 自动处理的。

如下图所示。

img

当前的 Topic1 主题有四个分片(P0~P4)。因为分片数多于 broker 数,所以有两个 broker 拥有两个分片,其中一个拥有一个分片。

消息被广播给了两个消费者。路由模式(routing mode)决定了每个消息应该发布到哪个分片,订阅模式(subscription mode)决定了消息进入哪个消费者。

路由和订阅模式通常是可以分开决定的。通常来讲,流量问题应当考虑分片路由,应用语义应当考虑订阅模式。

订阅模式不受分片与否的影响,分片只决定了消息被生产者发布和消费者处理确认消息之间发生了什么。

分片主题需要通过 admin API 明确创建。分片数量可以在创建主题时决定。

2.8.1 路由模式 (Routing modes)

当推送到分片主题时,必须指定一个路由模式。路由模式决定了每个消息应当发布到哪个分片。有以下三种路由模式

Mode Description
RoundRobinPartition If no key is provided, the producer will publish messages across all partitions in round-robin fashion to achieve maximum throughput. Please note that round-robin is not done per individual message but rather it’s set to the same boundary of batching delay, to ensure batching is effective. While if a key is specified on the message, the partitioned producer will hash the key and assign message to a particular partition. This is the default mode.
SinglePartition If no key is provided, the producer will randomly pick one single partition and publish all the messages into that partition. While if a key is specified on the message, the partitioned producer will hash the key and assign message to a particular partition.
CustomPartition Use custom message router implementation that will be called to determine the partition for a particular message. User can create a custom routing mode by using the Java client and implementing the MessageRouter interface.

2.8.2 顺序保证

消息的顺序取决于 MessageRoutingMode 和 Message Key。通常,用户希望每一个 key 的分片是有序的。

如果一个消息中存在一个 key ,当使用 RoundRobinPartition 和 SinglePartition 模式时,在 ProducerBuilder 中可以指定 HashingScheme 来决定这个消息被路由到哪个分片。

Ordering guarantee Description Routing Mode and Key
Per-key-partition All the messages with the same key will be in order and be placed in same partition. Use either SinglePartition or RoundRobinPartition mode, and Key is provided by each message.
Per-producer All the messages from the same producer will be in order. Use SinglePartition mode, and no Key is provided for each message.

2.8.3 Hashing scheme

Hashing Scheme 是一个在具体消息选择分片时的代表了标准哈希方法的可用集合。

有两种类型的哈希方法是可用的:JavaStringHashMurmur3_32Hash。要注意的是 JavaStringHash在从多种语言的生产者中获取消息不能够生效,这种情况下,推荐使用Murmur3_32Hash方式。

2.9 非持久化 topics

通常,Pulsar 会将为确认的消息存在多个 Bookeeper 存储节点 bookies 上。持久化的消息数据可以在 broker 重启或者订阅失败时保存下来。

同时,Pulsar 支持非持久化消息,只存在内存中,没有持久化到硬盘里。这种模式下,消息会丢失。

可以用如下方式生命非持久化数据。

non-persistent://tenant/namespace/topic

在非持久化主题中, broker 会立刻发送消息到所有的订阅,而不经过将他们持久化到 Bookeeper 上。当订阅者失联了,broker 不能够推送那些即时消息,并且订阅者再也不会受到那些消息了。非持久化主题在某些情况下会比持久化主题快一些,但是丢失了 Pulsar 部分优势。

2.9.1 性能 Performance

性能上通常会快于持久化的主题。

2.9.2 客户端 API

与持久化主题唯一不同的点在于主题名称必须以 non-persistent开头。

以下为示例:

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";

Consumer<byte[]> consumer = client.newConsumer()
        .topic(npTopic)
        .subscriptionName(subscriptionName)
        .subscribe();

Producer<byte[]> producer = client.newProducer()
                .topic(npTopic)
                .create();

2.10 消息保存和过期

通常在 Pulsar 消息 broker 中

  • 会立刻删除被确认过的消息
  • 持久化存储未被确认的消息。

Pulsar 有两个 feature,保证了可以重写以上行为:

  • 消息 retention 保证存储消息即使已经被消费者确认了
  • 消息expiry保证了你可以为未被确认的消息设置 TTL

所有的 retentionexpiry策略都被保存在命名空间级。

概念如下图所示。

Message retention and expiry

2.11 消息去重存储

当一个消息被存储不止一次时会发生消息重复。去重是 Pulsar 的一个可选 feature 防止了不必要的消息重复,即使这个消息被接受到不止一次。下图比较了两种模式:

Pulsar message deduplication

需要注意的是,该设置是在命名空间级别进行处理。

2.11.1 生产者幂等

另一种可用的消息去重途径是保证每条消息被生产一次。这个途径通常被称作生产者幂等,这种方式需要客户端保证。在 Pulsar 中,可以在 broker 层实现消息去重。只需要进行设置就可以了。Managing message deduplication

2.12 延迟消息发送

延迟消息发送保证了你可以不用立刻消费一个消息。在这个机制下,消息存储于 Bookeeper 中,在消息发布到 broker 后,DelayedDeliveryTracker 在内存中保留了时间索引,它会被推送到客户端直到达到延迟的推送时间。延迟消息只在共享订阅模式下能够很好的工作,在独占和容错模式下,消息会被立刻发送。

Delayed Message Delivery

消息会被无检查的存储在 broker 中,当一个消费者消费消息时,如果这个消息被设置了延迟消费,这个消息就会被添加到 DelayedDeliveryTracker。订阅会从 DelayedDeliveryTracker 中检查到时间的消息。

2.12.1 Broker

延迟消息默认是开启的,可以通过如下配置进行更改。

# Whether to enable the delayed delivery for messages.
# If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true

# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

2.12.2 Producer

下面的示例是如何开启延迟消息。

// message to be delivered at the configured delay interval
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();

三、架构

在最高层级,一个 Pulsar 实例有一个或多个 Pulsar 集群组成。实例中的集群可以互相备份数据。

在一个 Pulsar 集群中:

  • 一个或多个 broker 为从生产者产生的消息做负载均衡,转发这些消息给消费者,与 Pulsar 配置中心通信以处理多种协作任务,将消息存储到 Bookeeper 的实例中,依靠为集群指定的 Zookeeper 来处理确切任务。
  • 一个由多个 bookie 组成的 Bookeeper 集群处理消息的持久化存储。
  • 一个全局的 Zookeeper 来处理多个 Pulsar 集群间的协作任务。

下图展示了一个 Pulsar 集群。

Pulsar architecture diagram

3.1 Brokers

Pulsar broker 是一个无状态的基础组件,它是为了另两个组件:

  • 一个为了管理任务和生产者消费者间的 topic 查找的开放 REST API 的 http 服务
  • 转发器,由异步 TCP 服务实现,用来传输所有数据。

broker 只有在 managed ledger 缓存的容量小于 backlog 时,才会从 Bookeeper 中读取 entries,否则为了性能考虑会优先读取缓存。

最后,为了跨地域备份的全局主题,broker 管理备份组件 tail 本地的 entries 发布区域通过 Java client 重新发送到远程区域。

3.2 Clusters

一个 Pulsar 实例可以有一个或多个 Pulsar 集群组成。集群由以下内容组成:

  • 一个或多个 brokers
  • 一个 Zookeeper 用来管理集群级别的配置和协作
  • bookies 集合用来持久化消息。

3.3 元数据存储

Pulsar 用 Zookeeper 来协作通信、存储元数据和集群配置。在一个 Pulsar 实例中:

  • 租户信息,命名空间,还有其他实体需要全局一致地存储在配置存储中。
  • 每一个 cluster 有它自己本地的 Zookeeper 来存储指定的集群配置和协作信息,比如元数据,broker 负载报告,Bookeeper ledger 的元数据。

3.4 Persistent storage

Pulsar 为应用提供了消息发送的保证。如果一个消息顺利到达了 Pulsar broker,他将被发送到他指定的目标。

3.4.1 Apache Bookeeper

Pulsar 用了 Apache Bookeeper 来持久化消息存储。 Bookeeper 是一个分布式的 WAL 系统,提供了如下优势:

  • 它使 Pulsar 可以使用很多独立的日志,称作 ledgers。主题随时可以创建多个 ledgers
  • 它非常方便地提供了数据的顺序存储
  • 它保证了在不同系统失败的情况下 ledgers 的读一致性
  • 它提供了跨 bookies 的 I/O 分布公平
  • 在容量和流量上支持水平扩展。容量可以在增加更多 bookies 时立刻增加。
  • bookies 是为千万级的 ledgers 并发读写设计的。使用多个磁盘,bookies 可以将堵操作独立出来以降低写操作的延迟影响。

同时,对于消息数据,游标也持久化存储到了 Bookeeper 中。游标是消费者的订阅位置。Bookeeper 确保了 Pulsar 可以存储消费者的消费位置。因此,Pulsar 支持了持久化存储。

bookies 和 brokers 的交互如下图所示

Brokers and bookies

3.4.2 Ledgers

一个 ledgers 追加写的数据结构,由一个单独的 writer 分配给多个 Bookeeper 的存储节点。Ledger entries 在多个 bookies 间被复制了多个副本,Ledgers 本身拥有很简单的语义:

  • 一个Pulsar broker 可以创建一个 ledger ,并且往其中追加 entries,和关闭这个 ledger。
  • 当一个 ledger 被关闭了,不管是主动的还是因为 writer 崩溃了,这个 ledger 只能以只读模式再次打开。
  • 最终,当 ledger 中的 entries 不被需要了,整个 ledger 可以从系统中被删除。
3.4.2.1 Ledger 读一致性

Bookeeper 的主要优势在于它保证了 ledgers 的读一致性。而且这个 ledger 只能被一个进程写,这个进程可以非常高效地追加 entries,不用考虑一致性。在失败后,ledger 会经过一个恢复流程,它会终结 ledger 的状态,并且确认那个 entry 是最后一次提交的。在那之后,这个 ledger 的所有 readers 被保证了能够获得同一份内容。

3.4.2.2 管理 ledgers

Bookeeper ledgers 提供了一份日志的抽象,因此开发了一个叫做 managed ledger 库代表了单个主题的存储层。一个 managed ledger 代表了一个消息流的抽象,单个写过程追加内容到流的结尾,和多个消费该消息流的指针。

在内部,一个 manager ledger 使用多个 ledgers 保存数据。有两个原因为什么使用多个 ledgers:

  • 在失败后,一个 ledger 不能够继续写,只能创建一个新的 ledger。
  • 一个 ledger 可以在这个消息的所有游标都被消费后被删除。

3.4.3 Journal storage

在 Bookeeper 中,journal files 包含了 Bookeeper 的事务日志。在对一个 ledger 进行更新操作前,boodie 需要确保此次更新操作的描述被持久化到事务日志中。一个新的 journal 文件会在 bookie 启动时,或者老的文件达到最大值时被创建。

3.5 Pulsar proxy

在不能直接使用 Pulsar 客户端和 Pulsar cluster 交互的时候。考虑使用 Pulsar Proxy。比如客户端不能直接访问 broker 的地址,例如:将Pulsar 运行在一个云环境中,或者 Kubernetes 中。

3.6 服务发现

连接到 Pulsar brokers 的客户端需要能够通过一个 URL 连接到整个 Pulsar 实例。Pulsar 提供了一个内置的服务发现机制,你可以使用 Deploying a Pulsar instance 构建。

下图展示了 Pulsar 的服务发现。

alt-text

图中,Pulsar 集群是通过一个单一的 DNS 名称暴露出来的。客户端可以通过如下方式访问:

from pulsar import Client

client = Client('pulsar://pulsar-cluster.acme.com:6650')


pulsar      工具

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!