聊聊rocketmq的RequestTask

聊聊rocketmq的RequestTask —— 序:本文主要研究一下rocketmq的RequestTask(org/apache/rocketmq/remoting/netty/RequestTask.java)

大家好,欢迎来到IT知识分享网。

本文主要研究一下rocketmq的RequestTask

RequestTask

org/apache/rocketmq/remoting/netty/RequestTask.java

/**
 * A {@link Runnable} wrapper that keeps the handler for one request together with the channel and the
 * request command it belongs to, plus the time the task was created and a flag that lets the task be
 * cancelled before it runs.
 *
 * <p>Equality is based on the creation timestamp, the stop flag, the channel and the request's opaque
 * id; {@link #hashCode()} is derived from exactly the same components.
 *
 * <p>NOTE(review): {@code stopRun} is a plain field; if it is set from a thread other than the one
 * executing {@link #run()}, visibility is not guaranteed — confirm against callers.
 */
public class RequestTask implements Runnable {

    private final Runnable runnable;
    /** Wall-clock time (milliseconds) captured when this task object was constructed. */
    private final long createTimestamp = System.currentTimeMillis();
    private final Channel channel;
    private final RemotingCommand request;
    /** When {@code true}, {@link #run()} does nothing. */
    private boolean stopRun = false;

    /**
     * @param runnable the work to execute in {@link #run()}
     * @param channel  the channel used by {@link #returnResponse(int, String)}
     * @param request  the request whose opaque id is copied into responses
     */
    public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) {
        this.runnable = runnable;
        this.channel = channel;
        this.request = request;
    }

    /**
     * Hashes the same components that {@link #equals(Object)} compares, so that equal tasks always
     * produce equal hash codes. The wrapped runnable is intentionally not part of the hash because it is
     * not part of equality, and the request contributes only its opaque id.
     *
     * <p>The value depends on the mutable stop flag, so it changes after {@link #setStopRun(boolean)}.
     */
    @Override
    public int hashCode() {
        final long timestamp = getCreateTimestamp();
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + (channel != null ? channel.hashCode() : 0);
        result = 31 * result + (request != null ? request.getOpaque() : 0);
        result = 31 * result + (isStopRun() ? 1 : 0);
        return result;
    }

    /**
     * Two tasks are equal when they have the same creation timestamp, the same stop flag, equal
     * channels, and requests that are either both {@code null} or carry the same opaque id.
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RequestTask)) {
            return false;
        }

        final RequestTask that = (RequestTask) o;

        if (getCreateTimestamp() != that.getCreateTimestamp()) {
            return false;
        }
        if (isStopRun() != that.isStopRun()) {
            return false;
        }
        if (channel != null ? !channel.equals(that.channel) : that.channel != null) {
            return false;
        }
        // Guard both sides: a task with a request is never equal to one without, and neither side may
        // be dereferenced when null.
        if (request == null || that.request == null) {
            return request == that.request;
        }
        return request.getOpaque() == that.request.getOpaque();
    }

    /** @return the creation time of this task in milliseconds */
    public long getCreateTimestamp() {
        return createTimestamp;
    }

    /** @return {@code true} if this task has been marked not to run */
    public boolean isStopRun() {
        return stopRun;
    }

    /** @param stopRun {@code true} to make a later {@link #run()} a no-op */
    public void setStopRun(final boolean stopRun) {
        this.stopRun = stopRun;
    }

    /** Runs the wrapped runnable unless the task has been stopped. */
    @Override
    public void run() {
        if (!this.stopRun) {
            this.runnable.run();
        }
    }

    /**
     * Creates a response command with the given code and remark, tags it with the opaque id of the
     * original request, and writes it to the channel.
     *
     * @param code   response code
     * @param remark response remark
     */
    public void returnResponse(int code, String remark) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(code, remark);
        response.setOpaque(request.getOpaque());
        this.channel.writeAndFlush(response);
    }
}

  • 里头有一个runnable、channel以及remotingCommand

NettyRemotingAbstract.processRequestCommand

org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

/**
 * This container holds all processors per request code: for each incoming request, the corresponding
 * processor (paired with the executor it should run on) is looked up in this map by the request's code.
 * When no entry matches, {@code processRequestCommand} falls back to {@code defaultRequestProcessor}.
 */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

//……

/**

* Process incoming request command issued by remote peer.

*

* @param ctx channel handler context.

* @param cmd request command.

*/

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

final int opaque = cmd.getOpaque();

if (pair != null) {

Runnable run = new Runnable() {

@Override

public void run() {

try {

RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();

if (rpcHook != null) {

rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);

}

final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

if (rpcHook != null) {

rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

}

if (!cmd.isOnewayRPC()) {

if (response != null) {

response.setOpaque(opaque);

response.markResponseType();

try {

ctx.writeAndFlush(response);

} catch (Throwable e) {

log.error(“process request over, but response failed”, e);

log.error(cmd.toString());

log.error(response.toString());

}

} else {

}

}

} catch (Throwable e) {

log.error(“process request exception”, e);

log.error(cmd.toString());

if (!cmd.isOnewayRPC()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,

RemotingHelper.exceptionSimpleDesc(e));

response.setOpaque(opaque);

ctx.writeAndFlush(response);

}

}

}

};

if (pair.getObject1().rejectRequest()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

“[REJECTREQUEST]system busy, start flow control for a while”);

response.setOpaque(opaque);

ctx.writeAndFlush(response);

return;

}

try {

final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);

pair.getObject2().submit(requestTask);

} catch (RejectedExecutionException e) {

if ((System.currentTimeMillis() % 10000) == 0) {

log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())

+ “, too many requests and system thread pool busy, RejectedExecutionException “

+ pair.getObject2().toString()

+ ” request code: ” + cmd.getCode());

}

if (!cmd.isOnewayRPC()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

“[OVERLOAD]system busy, start flow control for a while”);

response.setOpaque(opaque);

ctx.writeAndFlush(response);

}

}

} else {

String error = ” request type ” + cmd.getCode() + ” not supported”;

final RemotingCommand response =

RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);

response.setOpaque(opaque);

ctx.writeAndFlush(response);

log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);

}

}

  • 这里根据上下文准备好runnable,然后构建RequestTask,放入到pair的executor中执行
  • 这里的pair为Pair<NettyRequestProcessor, ExecutorService>,每个request code对应一个pair;若该request code没有注册对应的pair,则使用defaultRequestProcessor

NettyRemotingServer.registerProcessor

org/apache/rocketmq/remoting/netty/NettyRemotingServer.java

/**
 * Associates a request code with a processor and the executor its requests are submitted to.
 *
 * @param requestCode the request code to register
 * @param processor   the processor that handles requests carrying that code
 * @param executor    the executor to pair with the processor; when {@code null}, {@code publicExecutor}
 *                    is used instead
 */
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    final ExecutorService effectiveExecutor = (executor != null) ? executor : this.publicExecutor;
    this.processorTable.put(requestCode,
        new Pair<NettyRequestProcessor, ExecutorService>(processor, effectiveExecutor));
}

  • registerProcessor方法会给requestCode添加一个Pair

BrokerController.registerProcessor

org/apache/rocketmq/broker/BrokerController.java

// Passed to registerProcessor() for the send-message request codes and for END_TRANSACTION.
private ExecutorService sendMessageExecutor;
// Passed to registerProcessor() for PULL_MESSAGE.
private ExecutorService pullMessageExecutor;
// Passed to registerProcessor() for QUERY_MESSAGE and VIEW_MESSAGE_BY_ID.
private ExecutorService queryMessageExecutor;
// Passed to registerDefaultProcessor() together with the AdminBrokerProcessor.
private ExecutorService adminBrokerExecutor;
// Passed to registerProcessor() for HEART_BEAT, UNREGISTER_CLIENT and CHECK_CLIENT_CONFIG.
private ExecutorService clientManageExecutor;
// Passed to registerProcessor() for the consumer list / consumer offset request codes.
private ExecutorService consumerManageExecutor;

//……

/**
 * Registers every request processor of the broker, each paired with its dedicated executor, on
 * {@code remotingServer} and (except for PULL_MESSAGE) also on {@code fastRemotingServer}. The admin
 * processor is registered last as the default processor of both servers.
 */
public void registerProcessor() {
    // --- send message -------------------------------------------------------------------------------
    SendMessageProcessor sendMsgHandler = new SendMessageProcessor(this);
    sendMsgHandler.registerSendMessageHook(sendMessageHookList);
    sendMsgHandler.registerConsumeMessageHook(consumeMessageHookList);

    remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMsgHandler, sendMessageExecutor);
    remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMsgHandler, sendMessageExecutor);
    remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMsgHandler, sendMessageExecutor);
    remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMsgHandler, sendMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMsgHandler, sendMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMsgHandler, sendMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMsgHandler, sendMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMsgHandler, sendMessageExecutor);

    // --- pull message (registered on remotingServer only) -------------------------------------------
    remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, pullMessageExecutor);
    pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    // --- query message ------------------------------------------------------------------------------
    NettyRequestProcessor queryMsgHandler = new QueryMessageProcessor(this);
    remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryMsgHandler, queryMessageExecutor);
    remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryMsgHandler, queryMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryMsgHandler, queryMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryMsgHandler, queryMessageExecutor);

    // --- client management --------------------------------------------------------------------------
    ClientManageProcessor clientMgmtHandler = new ClientManageProcessor(this);
    remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientMgmtHandler, clientManageExecutor);
    remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientMgmtHandler, clientManageExecutor);
    remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientMgmtHandler, clientManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientMgmtHandler, clientManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientMgmtHandler, clientManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientMgmtHandler, clientManageExecutor);

    // --- consumer management ------------------------------------------------------------------------
    ConsumerManageProcessor consumerMgmtHandler = new ConsumerManageProcessor(this);
    remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerMgmtHandler, consumerManageExecutor);
    remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerMgmtHandler, consumerManageExecutor);
    remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerMgmtHandler, consumerManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerMgmtHandler, consumerManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerMgmtHandler, consumerManageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerMgmtHandler, consumerManageExecutor);

    // --- end transaction (a separate processor instance per server; runs on the send executor) ------
    remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), sendMessageExecutor);
    fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), sendMessageExecutor);

    // --- default: any request code without its own registration -------------------------------------
    AdminBrokerProcessor adminHandler = new AdminBrokerProcessor(this);
    remotingServer.registerDefaultProcessor(adminHandler, adminBrokerExecutor);
    fastRemotingServer.registerDefaultProcessor(adminHandler, adminBrokerExecutor);
}

  • 这里一共分了sendMessageExecutor、pullMessageExecutor、queryMessageExecutor、adminBrokerExecutor、clientManageExecutor、consumerManageExecutor六个executor

小结

BrokerController给每个命令号注册了对应的executor,然后RequestTask在其命令号对应的executor中执行,不同的命令分开executor执行,进行了隔离,避免相互影响。

doc

  • RequestTask

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/48718.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信