微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Thrift源码分析--TThreadedSelectorServer线程模型

Thrift的网路服务模型之一的 TThreadedSelectorServer 实现了多Reactor线程模型,请先理解线程模型再深入分析源码。线程模型的讲解此处不再赘述,给出一张总结图如下:

截图来自:Thrift 服务模型和序列化机制深入学习

下面从一次完整的服务调用过程来分析源码:

1. 服务入口

先来看看服务入口函数 serve() ,TThreadedSelectorServer 的 serve() 方法继承自 AbstractNoblockingServer 抽象类。

其中startThread() 方法为抽象方法,在 TThreadedSelectorServer 中实现如下:

在 startThread() 方法中启动了一个 AcceptThread(用来接收网络socket)和多个 SelectorThread(进行网络I/O操作)。接下来具体分析这两个线程对象。

2. Acceptor

直接看其 run() 方法可知,内部不断的循环监听新来的网络连接:

将就绪的连接分配到对应 SelectorThread 中的 BlockingQueue 队列中:

在 handleAccept() 方法中使用了 SelectorThreadLoadBalancer 负载均衡器轮循分发就绪的连接到对应的 SelectorThread 上去:

接下来看 Selector 如何处理连接。

3. Selector

同样线程对象直接看其 run() 方法:可以看到待网络 I/O 事件发生时处理对应的 I/O 事件。

深入对应读写方法可以发现,真正数据的读写操作是调用 FrameBuffer 的 read()/write() 方法,后续我们再对 FrameBuffer 类做讲解,接下来看下工作线程池部分的实现。

4. 任务工作线程池

使用的是Java 提供的 ExecutorService 线程池来管理工作线程。线程池配置来至 Args 配置管理类中:

认实现是 FixedThreadPool 大小为 5,可以自定义调节大小和修改线程池实现。

接下来我们根据其调用处来跟进线程池的用出:

使用处一:Acceptor中的 handleAccept 方法用来处理 FAIR_ACCEPT 策略的新连接。
使用处二:当SelectorThread 完成 read 操作后,回调 frameBuffer.invoke() 方法 -- 最后回调业务处理方法

从线程池中取出线程去执行 Invocation 线程对象:

Invocation 封装 FrameBuffer 的 invoke() 方法到单独的线程中。

FrameBuffer 中的 invoke() 方法

从图中可以看到回调 TProcessor 接口的 process(in,out) 方法。 Thrift IDL 生成代码中的 Processor 类实现 TProcessor 接口,Processor 中对应的业务方法实际是调用 Iface 接口定义的业务方法,我们自己的业务代码实现 Iface 接口完成自己的业务处理逻辑。

完成业务逻辑处理后,会调用 responseReady() 方法,其内部判断数据读取完成和业务处理完成后,将更改 FrameBufferState 为 write 状态,然后开始回写结果给调用端。

至此完成一次完整的服务调用过程。

原文地址:https://www.jb51.cc/react/304571.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐