[前提]
[正文]
在org.apache.dubbo.rpc.FutureContext内部的result属性
当业务线程需要调用外部dubbo接口的时候, 会创建一个DefaultFuture, 每个DefaultFuture对象都会有唯一的一个Id与之对应, 并把这个关系放到Map中
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 存储 id <-> DefaultFuture关系
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
由于接口调用都会有超时, 那么如何实现这个超时机制呢?
将一个超时任务放入到时间轮上.
// org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// 超时检查
timeoutCheck(future);
return future;
}
// org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
在我之前的 Netty中的时间轮(v3.10.7) 文章中介绍了时间轮, 通过时间轮的方式, 检测任务是否超时到期了.
接下来就是将DefaultFuture等信息组装成一个FutureContext放入到线程的ThreadLocalmap中.
// org.apache.dubbo.rpc.protocol.dubbo.dubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getmethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getmethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeto(responseFuture);
//
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
}
}
// org.apache.dubbo.rpc.FutureContext
public class FutureContext {
private static InternalThreadLocal<FutureContext> futureTL = new InternalThreadLocal<FutureContext>() {
@Override
protected FutureContext initialValue() {
return new FutureContext();
}
};
public static FutureContext getContext() {
return futureTL.get();
}
}
综上, id和DefaultFuture存到Map中, 设置好定时任务, DefaultFuture放到线程ThreadLocalmap中之后, 线程就可以被阻塞了
当dubbo的提供方返回数据之后, dubbo调用方的线程就可以处理响应了.
如上图, dubbo调用方的dubbo线程开始处理响应.// org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)
public static void received(Channel channel, Response response, boolean timeout) {
try {
// 从Map中移除id <-> DefaultFuture的关系
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// 将时间轮上的超时任务取消掉
t.cancel();
}
//
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
首先从Map中移除id <-> DefaultFuture的关系, 将时间轮上的超时任务取消掉.
接下来就是把响应数据设置到DefaultFuture上, 并唤醒之前阻塞的线程.
// org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived
private void doReceived(Response res) {
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
}
}
// java.util.concurrent.CompletableFuture#complete
public boolean complete(T value) {
// 将结果设置到DefaultFuture
boolean triggered = completeValue(value);
// 唤醒阻塞线程
postComplete();
return triggered;
}
使用到了异步编程
被唤醒的阻塞线程就可以从DefaultFuture中拿到已设置好的数据,继续后续的业务处理.
如上图, 消费者的线程ThreadLocalmap中的FutureContext中的result值却一直留在线程的ThreadLocalmap中了,并不会被释放掉, 造成了内存泄漏.
公众号
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。