文章

Netty 教程 | Netty 框架结构介绍

介绍 :

netty 作为 java 方面的网络通信框架一直在各个系统中被广泛应用。 在这里总结整理了下它的内部原理以及架构组织。

netty 主要用来应用在 socket 通信方面,其它基础方面的资料可以参考我之前的文档:

在这里我们主要从TCP应用部分讲述 Netty 的架构体系。

它涉及到的内容主要分为三个部分 : bootstrap, channel, eventLoop.

image.png

bootstrap 主要负责服务建立与发布 channel 主要负责协议建立与协议事件处理 eventloop 主要负责任务执行与事件监听

在基于 TCP 的 socket 程序里面, 我们的协议主要是指一个 socket 的建立, listen, accept, connect, read, write 等。

netty 使用协议与逻辑分离的思想, 允许我们通过接口来实现, 当协议事件发生时, 我们改如何处理我们的逻辑。


channel

channel 作为 netty 的核心对象之一, 其内部主要封装了用于实现连接与通信的主要属性与方法,它是我们使用 netty 的核心操作对象。

为了使整体的架构思路更加清洗, 我就把负责 socket 通信接口跟消息处理放在了一个地方(统归在 channel 中)。

在架构体系上(Nio模型) :

image.png

从整体来看 :

右侧的 channel 模块主要负责维护 socket 通信接口, 左侧的 pipeline 主要维护负责对于通信事件的逻辑处理。 依赖来分离我们的协议与事件逻辑。

最上层 ChannelOutboundInvokerChannelInboundInvoker 定义了我们的基本协议事件接口, ChannelOutboundInvoker 为主动事件,也就是我们通过操作 socket 对外发出的协议内容。 ChannelInboundInvoker 为被动事件, 及来自对端发出的协议请求被感知。

每一个对应的协议事件, 我们都可以实现相应的接口来针对我们的事件做出处理 : ChannelOutboundInvoker -> ChannelOutboundHandler, ChannelInboundInvoker -> ChannelInboundHandler.

在右侧, Channel 建立在 ChannelOutboundInvoker 之上, 允许后续实现, 建立基于 Channel 的 socket 主动协议方法。 AbstractChannel 在后续实现了 socket 大致过程。

AbstractNioChannel 在此之上实现了对接 Selector 的多路复用功能。 至此服务端与客户端的共性功能基本完备, 后面的 Channel 分为两部分, 分别实现了 Server 角色与 client 角色的Channel.

在左侧 ChannelPipeline 同时继承了ChannelOutboundInvokerChannelInboundInvoker 。 目的是传递来自 channel 的主动事件与被动事件。主动事件来自程序触发, 被动事件来自 select 监听。 两者都允许转移给 pipeline 去处理。

? 为啥 ChannelPipeline 继承自 Invoker 而不是 handler. 这里的 pipeline 担任的是中转的角色, 而不是处理的角色。 pipeline 内部维护一个事件处理队列,允许我们将事件在队列中依次处理, 这样 pipeline 的名字也由此而来。

但是特殊的是对于对外的主动事件, 队列最终节点会转交给 Channel 内部去处理。 因为主动事件诸如connect等, 需要最终通过Channel 去完成实际的 协议建立,让对端感知。

事件类型

有关 channel 的逻辑与事件使用 ChannelOutboundInvokerChannelInboundInvoker 定义, 从名字我们可以了解, ChannelInboundInvoker 主要用来定义被动监听到的从对端过来的事件, 而 ChannelOutboundInvoker 中主要定义来自本端的主动发起的事件。

ChannelInboundInvoker

方法说明
fireChannelRegistered当 channel 被注册到 select 监听池中时触发
fireChannelActive当 channel 变成可用状态时触发
fireChannelInactive当 channel 变为不可用时触发
fireExceptionCaught当事件处理过程中发生异常后触发
fireUserEventTriggered用来用户自定义消息传递
fireChannelRead当有数据到达时触发
fireChannelReadComplete当这批报文被读完时触发
fireChannelWritabilityChanged当写缓冲区不足时触发
fireChannelUnregistered当 channel 从 select 监听池中移出时触发

ChannelOutboundInvoker

方法说明
bind绑定本地端口, 建立 listen
connect连接远程服务端口
disconnect请求断开连接
close关闭本地socket
deregister取消注册 channel
write写数据到本地缓存
flush将本地缓存数据发送到远程
writeAndFlush合并 writeflush 调用

Channel 作为对接用户的网络解析类,它在 ChannelOutboundInvoker 的基础上实现了操作网络协议的相关字段。 将协议的实现实体逻辑放在 Unsafe 中,而后将接口暴露在 Channel 中便于用户调用, 每次调用都会通过 Unsafe 来处理。

AbstractChannel 中实现了基本的 Channel 关于网络协议的主要操作(来自 ChannelOutboundInvoker), 同时对于任何协议方法诸如 bind 等, 都会有对应的 doXxxx 接口暴露出来用于后面实现,提供给 Unsafe 去调用。

image.png

在上层实现 AbstractNioChannel 中, 再次实现了对接 Selector 的功能。 往后分别实现了 AbstractNioMessageChannel, NioMessageUnsafe 用于定义服务端与客户端不同角色的实现。

当一个连接产生时,事件会首先触发 fireChannelRegistered 然后触发 fireChannelActive. 详细信息可以参照

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
AbstractUnsafe.class

private void register0(ChannelPromise promise) {
    try {
        ....
        ....
        boolean firstRegistration = neverRegistered;
        doRegister();   // 调用注册逻辑实现 将 channel 注册到监听池中
        ...
        ...
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        ...
        ...
        if (isActive()) {  
            ...
            pipeline.fireChannelActive();   // 触发 active
            
        }
    } catch (Throwable t) {
        ...
        ...
    }
}

当客户端被关闭时, 事件会首先触发 fireChannelInactive 后触发 fireChannelUnregistered. 详情信息参照 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
AbstractUnsafe.class

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    ...
    ...
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                doDeregister();   // 定义卸载注册实现
            } catch (Throwable t) {
                ...
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();
                }
                ...
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered();
                }
                ...
            }
        }
    });
}

ChannelPipeline

对于 Channel 的每一个事件, Unsafe 都会提交给 ChannelPipeline 来处理, 每一个 Channel 都拥有自己的 ChannelPipeline

ChannelPipeline 的事件模型为 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 +----------------------------------------------------------------+
 |                        ChannelPipeline                         |
 |                               |                                |
 |                              \|/                               |
 |                    +---------------------+                     |
 |                    |      TailContext    |                     |
 |                    +---+-------------+---+                     |
 |                       /|\            |                         |
 |                        |            \|/                        |
 |                +-------+-------------+---------+               |
 |                |  DefaultChannelHandlerContext |               | 
 |                +-------+-------------+---------+               |
 |                       /|\            .                         |
 |                        .             .                         |
 |                        .             .                         |
 |                        .            \|/                        |
 |                +-------+-------------+---------+               |
 |                | DefaultChannelHandlerContext  |               | 
 |                +-------+-------------+---------+               |
 |                       /|\            |                         |
 |                        |            \|/                        |
 |                    +---+-------------+---+                     |
 |                    |     HeadContext     |                     |
 |                    +---+-------------+---+                     |
 |                       /|\            |                         |
 +------------------------+-------------+-------------------------+
                          |            \|/
 +------------------------+-------------+-------------------------+
 |                        |             |                         |
 |              [ Socket.read() ]   [ Socket.write() ]            |
 |                                                                |
 |  Netty Internal I/O Threads (Transport Implementation)         |
 +----------------------------------------------------------------+

ChannelPipeline 中提供了以时间流的方式来处理 Channel 的每一个事件。事件处理处理都是继承自 AbstractChannelHandlerContext, 这是一个双向链表结构, 用于事件在两个方向上进行传递.

在链表节点的实现中, 首尾分别使用 HeadContextTailContext 来实现。 而中间使用 DefaultChannelHandlerContext 来实现。

  • 被动事件(ChannelInboundInvoker) 的传递方式是从 HeadContextTailContext.
  • 而主动事件(ChannelOutboundInvoker)的传递方式是从 TailContextHeadContext.

HeadContext 主要完成了对主动事件形如 bind 等操作的最终socket协议建立。 而对于被动事件形如 read 的数据向后传递。 TailContext 只做了对于主动事件的向前传递工作。

DefaultChannelHandlerContext 用于封装用户通过 ChannelHandler 来实现的自定义事件处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;   // 用户自定义的逻辑实现
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}

当我们通过 addFirst, addLast 等方式插入我们的 ChannelHandler 时, 实际插入到事件流中的是 DefaultChannelHandlerContext.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        。。。
        // 构建 DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);
        // 插入到事件流
        addLast0(newCtx);
        。。。。
    }
    。。。
}

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

这样只需要实现 ChannelHandler 即可。

但在实际使用过程中, 我们却不能直接继承 ChannelHandler, 因为在pipeline的事件处理与传递中都是通过 ChannelInboundHandlerChannelOutboundHandler, 来定义实现。

  • 对于每一个 ChannelInboundInvoker 的事件, 我们可以通过 ChannelInboundHandler 来处理。
  • 对于每一个 ChannelOutboundInvoker 的事件, 我们可以通过 ChannelOutboundHandler 来处理。

ChannelInboundHandler

方法对应的 invoker 的事件说明
channelRegisteredfireChannelRegistered注册事件处理逻辑实现方法
channelUnregisteredfireChannelUnregistered取消注册事件逻辑处理
channelActivefireChannelActiveChannel 激活事件处理逻辑实现方法
channelInactivefireChannelInactiveChannel 失效时的逻辑实现方法
channelReadfireChannelReadChannel 有数据可读的逻辑实现方法
channelReadCompletefireChannelReadCompleteChannel 的一个数据报文读完时的逻辑实现方法
userEventTriggeredfireUserEventTriggered用户自定义事件触发的逻辑处理
channelWritabilityChangedfireChannelWritabilityChanged当写缓冲区不足时的逻辑实现方法
exceptionCaughtfireExceptionCaught当处理事件发生异常时的逻辑处理

ChannelOutboundHandler

方法对应 invoke 的事件说明
bindbind绑定前的逻辑处理实现
connectconnect连接前的逻辑处理实现
disconnectdisconnect请求断开连接前的逻辑处理实现
closeclose关闭连接前的逻辑处理实现
deregisterderegister取消注册前的逻辑处理实现
writewrite数据刷到缓存前的逻辑处理实现
flushflush缓存数据发送到对端前的逻辑处理实现

同时上层又暴露了对应的 ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter 适配器, 让我们可以只实现自己关注的方法。

事件传递流

image.png

ChannelPipeline 提供了基于继承接口的过滤功能, 允许我们的事件在链表中只在对应的实现中传递。

我们通过一个实际的事件来说明事件传递方式 :

image.png

Unsafe 触发一个可读数据时, 会调用 ChannelPipeline.fireChannelRead(msg) 方法进去事件流. 它会调用 AbstractChannelHandlerContext 的静态方法 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)。 虽然这个参数叫 next 但它实际是用来调用当前对象的invokeChannelRead(Object msg) 方法。 比如在ChannelPipeline中, 将 header 对象传递进去,便调用了 header.invokeChannelRead(msg).

invokeChannelRead(msg)中, 会通过 ((ChannelInboundHandler) handler()).channelRead(this, msg); 来调用 header 的 channelRead ,channelRead 用于实现对于 read 事件的具体控制逻辑。

而在 HeadContext 中, 并没有对数据做特殊处理, 而是仅仅进行了后向传递。

1
2
3
4
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

这样在通过 ctx.fireChannelRead(msg); 来调用 AbstractChannelHandlerContext.fireChannelRead 实现去寻找下一个 AbstractChannelHandlerContext 执行触发 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg), 以此来实现循环递归调用。

由此可见, 对于ChannelInboundHandler我们也可以通过 ctx.fireXxxxxx 来进行事件的后向传递。 ChannelOutboundHandler 稍有不同, 我们直接通过 ctx.xxxx 便可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//ChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    ...
    next.invokeChannelRead(m);
    ...
}

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {   // DefaultChannelHandlerContext
        ...
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
        ....
    } else {  // header or tail
        fireChannelRead(msg);
    }
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

AbstractChannelHandlerContext 中有两个方法用来通过事件处理链表,来根据当前的context 寻找下一个合适的 context 的方法。 他们分别是 findContextInboundfindContextOutbound

findContextInbound 用于沿着 header 到 tail 的方向寻找下一个合适的符合规则的 context. findContextOutbound 用于沿着 tail 到 header 方向寻找下一个合适的符合规则的 context.

方法接受一个掩码, 用来匹配是否符合规则, 比如 channelRead 事件,它的掩码是 MASK_CHANNEL_READ, 这个掩码是如何设置的? 我们可以看 mask0 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;

            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {   // 是否有 @Skip 注解
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
           。。。。。。
        }

        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;

            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            .....
        }

        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
     ....
    return mask;
}

ChannelHandler 最初被 add 到事件流后, 会首先根据 ChannelHandler 的实现类来设置掩码, 比如如果继承 ChannelInboundHandler 则掩码将会设置为与 Inbound 有关的所有掩码。 在这基础上,如果用户在方法使用 @Skip 注解, 则取消对应的掩码。

由此,便实现了 ChannelPipeline 的事件传递与处理。

eventloop

EventLoopGroup 是 Netty 内部定一个一个线程池结构, 用于执行来自 netty 内部的事件监听与逻辑处理。

EventLoopGroup 中每一个线程为 EventLoop 结构。每个 Channel 会关联一个 EventLoop,用于执行这个 Channel 触发的任务。

image.png

这个是 NioEventLoopGroup 的一个实现架构。 NioEventLoopGroup 内部会根据定义的线程池的数量启用制定数量的 NioEventLoop, 这在 MultithreadEventExecutorGroup 完成的初始化。

1
2
3
4
5
6
7
8
9
10
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    。。。
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        。。。
        children[i] = newChild(executor, args);
        。。。
    }
}

具体 newChild 的实现会放在 NioEventLoopGroup 子类中去 new NioEventLoop.

每个 NioEventLoop 内部使用一个 FastThreadLocalThread, 来建立自己的线程, 这个 Thread 是懒加载的,并不会在初始化时调用, 而是在当有任务监听时才开始建立线程。

image.png

如上所示, 当客户端发起连接请求后, 服务器首先会调用 NioEventLoopGroup 从所有的 NioEventLoop 中选择一个, 被选中的 NioEventLoop 会赋给对应的 Channel.

NioEventLoopGroupNioEventLoop 的选择算法时通过 DefaultEventExecutorChooserFactory 工厂产生的, 工厂通过线程池大小是否时 2 的幂级数选择是使用 PowerOfTwoEventExecutorChooser 还是 GenericEventExecutorChooser

1
2
3
4
5
6
7
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

这两者的实现逻辑时一样的, 都是按序,依次循环的从 NioEventLoop 数组中取出一个来, 只是 PowerOfTwoEventExecutorChooser 在实现上使用了位运算代替算术运算, 提高性能。

当选出一个 NioEventLoop 后, 如果 NioEventLoop 这时候没有线程在执行, 会首先调用 ThreadPerTaskExecutor 通过线程工厂类创建一个线程并执行任务。

FastThreadLocalThread 线程运行起来以后, 会进入循环状态, 首先判断当前 NioEventLoop 中有没有被阻塞的任务. 没有任何任务要被执行, 会进入select 阻塞, 等待对端事件到达,或者服务端任务到达被唤醒。

唤醒以后, 通过 processSelectedKeys 遍历来自 select 上的所有监听 channel 判断有无事件。 如果有就处理对端事件。 之后便通过 runAllTasks 来运行阻塞的事件。 系统允许我们设置 ioRatio 来控制 processSelectedKeysrunAllTasks 的执行时间比例。 这个比例时近似的, 因为对于 runAllTasks 来说。 每次都要保证至少执行完一个任务。

阻塞任务使用了线程安全队列来控制。 它主要存放我们主动发起的 channel 事件, 比如我们要通过 writeAndFlush() 向对端发送数据时。 这时候会将任务发送到队列中。 等待被线程执行。

bootstrap

bootstrap 是我们的 netty 服务建造者, 用于定义启动我们的 netty 服务。

image.png

ServerBootstrap 用于定义服务端的netty 程序, Bootstrap 用于定义客户端的 netty 程序。


参考资料 :

本文由作者按照 CC BY 4.0 进行授权

© . 保留部分权利。

本站采用 Jekyll 主题 Chirpy