您现在的位置是:网站首页> 内容页

Netty源码—四、事件处理

  • uedbet新版
  • 2019-04-04
  • 284人已阅读
简介前面经过channel初始化、注册,所需要的数据结构(epoll_event)基本上准备好了,serverSocket也处于监听状态,可以接收来自客户端的请求了。NioServerSo

前面经过channel初始化、注册,所需要的数据结构(epoll_event)基本上准备好了,serverSocket也处于监听状态,可以接收来自客户端的请求了。NioServerSocketChannel注册在了NioEventLoop#selector,在注册过程中启动了NioEventLoop,run方法会循环执行,每次循环都会执行select和执行所有的task。如果select有事件,则会处理收到的事件。

private void processSelectedKeys() { if (selectedKeys != null) { // 是否使用优化过的selectionKey processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); }}

前面在NioEventLoop初始化的时候说过关于selectionKey优化的问题,这里不再赘述。两种方式主要是遍历selectionKey的方式不同,具体处理事件的调用是一样的。这里以processSelectedKeysOptimized为例。

accept

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // channel是NioServerSocketChannel // unsafe是NioMessageUnsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 省略中间代码... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 调用NioMessageUnsafe.read unsafe.read(); }} catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise());}public void read() { // 省略中间代码... // 由于是ServerSocket,只负责accept,如果有IO事件说明就是有新的客户端连接,所以这里就是创建NioSocketChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 注册刚刚创建的NioSocketChannel pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); // 省略中间代码... }}protected int doReadMessages(List<Object> buf) throws Exception { // 调用java.nio.channels.ServerSocketChannel#accept来创建SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // 创建NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { // 省略中间代码... } return 0;}

上面创建了NioSocketChannel之后,接下来注册所有客户端连接的NioSocketChannel,调用的是DefaultChannelPipeline#fireChannelRead方法,接下来是执行pipeline中的handler,在初始化的时候添加了LoggingHandler (如果启动的时候配置了的话),那么目前pipeline中的handler有

io.netty.channel.DefaultChannelPipeline$HeadContext:pipeline创建的时候默认的第一个handlerio.netty.handler.logging.LoggingHandler:启动的时候用户配置的handlerio.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptorio.netty.channel.DefaultChannelPipeline$TailContext:pipeline创建的时候默认的最后一个handler

下面看下ServerBootstrap$ServerBootstrapAcceptor是什么时候添加到handler的

// io.netty.bootstrap.ServerBootstrap#init// 这个方法是NioServerSocketChannel初始化的时候调用的void init(Channel channel) throws Exception { // 省略中间代码... p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 在pipeline中添加ServerBootstrapAcceptor pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}

之所以说ServerBootstrapAcceptor,是因为NioSocketChannel的register过程是这个handler的channelRead方法开始的

public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); // 配置NioSocketChannel for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // 这里childGroup就是一开始我们配置的workerGroup // 所以调用的是io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}

接下来的注册过程和NioServerSocketChannel的注册过程是类似的,创建socket,创建SelectionKeyImpl等。只不过NioSocketChannel不监听accept事件。

read

上面在接收到来自客户端的连接请求后,将NioSocketChannel注册到selector上,这个selector也是在NioEventLoop里面的,后面和这个客户端的通信都会通过这个channel进行,如果客户端发送来数据,也是selector收到读事件通知,然后调用processSelectedKey来处理read事件。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // channel是NioSocketChannel // unsafe是NioSocketChannelUnsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 省略中间代码... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 调用NioByteUnsafe.read unsafe.read(); }} catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise());}public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // PooledByteBufAllocator,默认的内存申请管理器 final ByteBufAllocator allocator = config.getAllocator(); // AdaptiveRecvByteBufAllocator$HandleImpl final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请内存 byteBuf = allocHandle.allocate(allocator); // 读取数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // 执行pipeline中的handler pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 省略中间代码 }}

一般来说NioSocketChannel中的handler包括

io.netty.channel.DefaultChannelPipeline$HeadContextorg.lep.test.netty.protocol.custom.codec.NettyMessageDecoder:自定义的解码器org.lep.test.netty.protocol.custom.codec.NettyMessageEncoder:自定义的编码器org.lep.test.netty.protocol.custom.server.LoginAuthRespHandler:自定义的handlerorg.lep.test.netty.protocol.custom.server.HeartBeatRespHandler:自定义的handlerio.netty.channel.DefaultChannelPipeline$TailContext

netty提供了一些基本的编解码功能,自己可以根据实际需要扩展使用,然后自定义自己的逻辑处理handler。

上面还涉及到内存的分配部分留在下一节介绍。

总结

read事件处理过程:

    接收到read事件分配内存,初始化buffer调用channel.read将数据读取到buffer中执行pipeline中的handler,包括了编解码的handler,自定义的handler来处理数据
, 1, 0, 9);

文章评论

Top