请教大家一个关于 netty 多线程安全的一个问题,麻烦各位大佬解答一下! - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
congzhou
V2EX    Java

请教大家一个关于 netty 多线程安全的一个问题,麻烦各位大佬解答一下!

  •  
  •   congzhou Jul 19, 2019 3867 views
    This topic created in 2477 days ago, the information mentioned may be changed or developed.

    netty 4.x

    rocketmq NettyServer#doOpen 添加 ChannelHandler 时,使用了 ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...) 方法,这个方法在每个 ChannelHandler 执行时都会使用 EventExecutorGroup

    ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 就是这个 addLast 方法 .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); 
    1. 假设 nio 触发了读事件
    2. 第一次 NioByteUnsafe#read 里面的 AbstractNioByteChannel#doReadBytes 完毕
    3. 然后执行 ChannelPipeline#fireChannelRead。
    4. 调用 AbstractChannelHandlerContext#invokeChannelRead(AbstractChannelHandlerContext, Object),每次执行的时候都会分发一个新的线程去解码 ByteToMessageDecoder#channelRead
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 拿到当前 ChannelHandler 指定的线程池 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { // 因为这个线程池不是 eventLoopGroupSelector 的,所以会走这边 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } 

    然后执行完这个方法就返回了,所以又会去执行 NioByteUnsafe#read 里面的 do while,重新执行上面的 1.2.3.4。然后又去执行 ByteToMessageDecoder#channelRead。 假设第一次 ByteToMessageDecoder#channelRead 还没执行,第二次 ByteToMessageDecoder#channelRead 也有可能会先执行了吧?

    Supplement 1    Jul 19, 2019

    第二行复制错了是 rocketmq NettyRemotingServer#start 最后一段 重新执行上面的 1.2.3.4 写错了,是 重新执行上面的 2.3.4 第一次写有点紧张,sorry

    Supplement 2    Jul 19, 2019

    打扰大家了,已经看到为什么了
    DefaultChannelPipeline#childExecutor

    private EventExecutor childExecutor(EventExecutorGroup group) { if (group == null) { return null; } Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); if (pinEventExecutor != null && !pinEventExecutor) { return group.next(); } Map&l;EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors; if (childExecutors == null) { // Use size of 4 as most people only use one extra EventExecutor. childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4); } // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. EventExecutor childExecutor = childExecutors.get(group); if (childExecutor == null) { childExecutor = group.next(); childExecutors.put(group, childExecutor); } return childExecutor; } 

    应该一个 channel 会指定一个 SingleThreadEventExecutor

    1 replies    2019-12-02 17:21:17 +08:00
    mazai
        1
    mazai  
       Dec 2, 2019
    netty 是不存在多线程的问题的,因为一个 Channel 注册的时候只会只会注册到一个 EventLoop 上( SingleThreadEventExecutor 本质上就是 EventLoop ),在注册的时候会判断当前执行注册的线程是不是 EventLoop 所在的线程,如果是就注册,否则就使用 EventLoop 所在的线程发起一个任务执行注册。一个 EventLoop 上会注册多个 Channel,所以在 EventLoop 维护的 Channel 生命周期中只会使用一个线程执行,因此也就不存在多线程的问题!

    注册的部分可以看 AbstractUnsafe.register();
    About     Help     Advertise     Blog     API     FAQ     Solana     2744 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 13:11 PVG 21:11 LAX 06:11 JFK 09:11
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86