微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Pulsar启动了哪些服务

这篇文章主要讲解了“Apache Pulsar启动了哪些服务”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Apache Pulsar启动了哪些服务”吧!

1.启动入口

PulsarStandalonestarter
在standalone模式下,主要启动了以下几个服务

    @H_404_10@

    PulsarService

    @H_404_10@

    PulSaradmin

    @H_404_10@

    LocalBookeeperEnsemble

    @H_404_10@

    WorkerService

PulsarbrokerStarter.brokerStarter
在普通模式下,启动了以下几个服务

    @H_404_10@

    PulsarService

    @H_404_10@

    BookieServer

    @H_404_10@

    AutoRecoveryMain

    @H_404_10@

    StatsProvider

    @H_404_10@

    WorkerService

简单说一些这几个服务

    @H_404_10@

    WorkerService: Pulsar function 相关,可以不启动

    @H_404_10@

    PulsarService: 主要的Pulsarbroker相关

    @H_404_10@

    BookieServer: Bookeeper相关

    @H_404_10@

    AutoRecoveryMain: Bookeeper autorecovery相关

    @H_404_10@

    StatsProvider: Metric Exporter类似的功能

2. PulsarService

PulsarService.start

    @H_404_10@

    ProtocolHandlers
    支持不同protocol处理(kafka协议等)

    @H_404_10@

    localZookeeperConnectionProvider
    维护zk session 和zk连接

    @H_404_10@

    startZkCacheService

    @H_404_10@

    LocalZooKeeperCache => LocalZooKeeperCacheService

    @H_404_10@

    GlobalZooKeeperCache => ConfigurationCacheService

@H_404_10@

BookkeeperClientFactory
创建配置Bookkeeper 客户端

@H_404_10@

managedLedgerClientFactory
维护一个ManagedLedger的客户端,借用BookkeeperClient

@H_404_10@

brokerService
这个是服务器的主要逻辑了,这个放在后面说

@H_404_10@

loadManager
收集集群机器负载,并根据负载情况均衡负载

@H_404_10@

startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相关

@H_404_10@

schemaStorage

@H_404_10@

schemaRegistryService
上面2个都是和Schema相关的

@H_404_10@

defaultOffloader
LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中

    @H_404_10@

    WebService

    @H_404_10@

    webSocketService
    http,websocket相关

    @H_404_10@

    leaderElectionService
    和LoadManager有关,如果是集中方式的话需要选出一个leader定期根据集群情况进行均衡负载

    @H_404_10@

    transactionMetadataStoreService
    事务相关

    @H_404_10@

    metricGenerator
    metric相关

    @H_404_10@

    WorkerService
    pulsar function 相关

3. brokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new distributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 网络层配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 绑定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 启动了一堆需要定期执行的任务
        this.startInactivityMonitor();
       // 启动3个schedule任务分别检测
       // 1. 长时间无效的topic
       // 2. 长时间无效的producer(和message去重相关)
       // 3. 长时间无效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updatebrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。

protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter

而PulSaraPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义

感谢各位的阅读,以上就是“Apache Pulsar启动了哪些服务”的内容了,经过本文的学习后,相信大家对Apache Pulsar启动了哪些服务这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程之家,小编将为大家推送更多相关知识点的文章,欢迎关注!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐