一. 前言
RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程
二. 代码流程
在broker启动的时候会拉起相关服务
流程如下:
流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000
三. 代码流程
由于是broker来存储消息,那么消息入口的代码应该是在broker里面,而broker的入口是brokerStartup,以及重要的brokerController。
具体流程可以参考broker启动源代码分析。
broker启动流程
以发送消息为例
1. broker启动注册发送消息处理器
broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}
2. 消息处理器处理发送者发送过来的消息
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand proce***equest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
...
switch (request.getCode()) {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
继续看sendMessage..
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
...
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msginner);
}
调用MessageStore.putMessage(msginner)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。