
netty 4.x
rocketmq NettyServer#doOpen 添加 ChannelHandler 时,使用了 ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...) 方法,这个方法在每个 ChannelHandler 执行时都会使用 EventExecutorGroup
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 就是这个 addLast 方法 .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 拿到当前 ChannelHandler 指定的线程池 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { // 因为这个线程池不是 eventLoopGroupSelector 的,所以会走这边 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } 然后执行完这个方法就返回了,所以又会去执行 NioByteUnsafe#read 里面的 do while,重新执行上面的 1.2.3.4。然后又去执行 ByteToMessageDecoder#channelRead。 假设第一次 ByteToMessageDecoder#channelRead 还没执行,第二次 ByteToMessageDecoder#channelRead 也有可能会先执行了吧?
第二行复制错了是 rocketmq NettyRemotingServer#start 最后一段 重新执行上面的 1.2.3.4 写错了,是 重新执行上面的 2.3.4 第一次写有点紧张,sorry
打扰大家了,已经看到为什么了
DefaultChannelPipeline#childExecutor
private EventExecutor childExecutor(EventExecutorGroup group) { if (group == null) { return null; } Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); if (pinEventExecutor != null && !pinEventExecutor) { return group.next(); } Map&l;EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors; if (childExecutors == null) { // Use size of 4 as most people only use one extra EventExecutor. childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4); } // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. EventExecutor childExecutor = childExecutors.get(group); if (childExecutor == null) { childExecutor = group.next(); childExecutors.put(group, childExecutor); } return childExecutor; } 应该一个 channel 会指定一个 SingleThreadEventExecutor
1 mazai Dec 2, 2019 netty 是不存在多线程的问题的,因为一个 Channel 注册的时候只会只会注册到一个 EventLoop 上( SingleThreadEventExecutor 本质上就是 EventLoop ),在注册的时候会判断当前执行注册的线程是不是 EventLoop 所在的线程,如果是就注册,否则就使用 EventLoop 所在的线程发起一个任务执行注册。一个 EventLoop 上会注册多个 Channel,所以在 EventLoop 维护的 Channel 生命周期中只会使用一个线程执行,因此也就不存在多线程的问题! 注册的部分可以看 AbstractUnsafe.register(); |