Netty 框架学习 —— 编解码器框架
编解码器
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式
- 编码器将消息转换为适合于传输的格式(最有可能的就是字节流)
- 解码器则是将 网络字节流转换回应用程序的消息格式
因此,编码器操作出站数据,而解码器处理入站数据
1. 解码器
在这一节,我们将研究 Netty 所提供的解码器类,并提供关于何时以及如何使用它们的具体示例,这些类覆盖了两个不同的用例:
- 将字节解码为消息 —— ByteToMessageDecoder 和 ReplayingDecoder
- 将一种消息类型解码为另一种 —— MessageToMessageDecoder
什么时候会用到解码器呢?很简单,每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑
1.1 抽象类 ByteToMessageDecoder
将字节解码为消息是一项常见的任务,Netty 它提供了一个 抽象基类 ByteToMessageDecoder,这个类会对入站数据进行缓冲,直到它准备好处理
下面举一个如何使用这个类的示例,假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理。在这种情况下,你需要从入站 ByteBuf 中读取每个 int,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。为了解码这个字节流,你要扩展 ByteToMessageDecoder 类(需要注意的是,原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)
// 扩展 ByteToMessageDecoder,以将字节解码为特定的格式
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//检查是否至少有 4 字节可读(1 个int的字节长度)
if (in.readableBytes() >= 4) {
//从入站 ByteBuf 中读取一个 int,并将其添加到解码消息的 List 中
out.add(in.readInt());
}
}
}
虽然 ByteToMessageDecoder 使得可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()方法前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐。下面说的 ReplayingDecoder,它是一个特殊的解码器,以少量的开销消除了这个步骤
1.2 抽象类 ReplayingDecoder
ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使得我们不必调用 readableBytes() 方法。它通过使用一个自定义的 ByteBuf 实现,ReplayingDecoderByteBuf,包装传入的 ByteBuf 实现了这一点,其将在内部执行该调用
这个类的完整声明是:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。下述代码展示了基于 ReplayingDecoder 重新实现的 ToIntegerDecoder
// 扩展ReplayingDecoder<Void> 以将字节解码为消息
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
// 传入的 ByteBuf 是 ReplayingDecoderByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 从入站 ByteBuf 中读取一个 int,并将其添加到解码消息的 List 中
out.add(in.readInt());
}
}
和之前一样,从 ByteBuf 中提取的int将会被添加到List中。如果没有足够的字节可用,这 个 readInt() 方法的实现将会抛出一个 Error,其将在基类中被捕获并处理。当有更多的数据可供读取时,该 decode() 方法将会被再次调用
请注意 ReplayingDecoderByteBuf 的下面这些方面:
- 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个 UnsupportedOperationException
- ReplayingDecoder 稍慢于 ByteToMessageDecoder
下面这些类用于处理更加复杂的用例:
- io.netty.handler.codec.LineBasedFrameDecoder —— 这个类在 Netty 内部也有使用,它使用了行尾控制字符(
或者
)来解析消息数据 - io.netty.handler.codec.http.HttpObjectDecoder —— HTTP 数据解码器
1.3 抽象类 MessageToMessageDecoder
在这一节,我们将解释如何在两个消息格式之间进行转换,例如,从一种 POJO 类型转换为另一种
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
参数类型 I 指定了 decode() 方法的输入参数 msg 的类型,它是你必须实现的唯一方法
我们将编写一个 IntegerToStringDecoder 解码器来扩展 MessageToMessageDecoder,它的 decode() 方法会把 Integer 参数转换为 String 表示。和之前一样,解码的 String 将被添加到传出的 List 中,并转发给下一个 ChannelInboundHandler
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
//将 Integer 消息转换为它的 String 表示,并将其添加到输出的 List 中
out.add(String.valueOf(msg));
}
}
1.4 TooLongFrameException
由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出
为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught() 方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接
下面的示例使用 TooLongFrameException 来通知 ChannelPipeline 中的其他 ChannelHandler 发生了帧大小溢出的。需要注意的是,如果你正在使用一个可变帧大小的协议,那么这种保护措施将是尤为重要的
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
public static final int MAX_FRAME_SIZE = 1024;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
// 检查缓冲区是否有超过 MAX_FRAME_SIZE 个字节
if (readable > MAX_FRAME_SIZE) {
// 跳过所有的可读字节,抛出 TooLongFrameException 并通知 ChannelHandler
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
//do something
}
}
2. 编码器
编码器实现了 ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式,和我们方才学习的解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:
- 将消息编码为字节
- 将消息编码为消息
2.1 抽象类 MessageToByteEncoder
前面我们看到了如何使用 ByteToMessageDecoder 来将字节转换为消息,现在我们使用 MessageToByteEncoder 来做逆向的事情
这个类只有一个方法,而解码器有两个。原因是解码器通常需要在 Channel 关闭之后产生最后一个消息(因此也就有了 decodeLast() 方法。显然这不适用于编码器的场景 —— 在连接被关闭之后仍然产生一个消息是毫无意义的
下述代码展示了 ShortToByteEncoder,其接受一个 Short 类型的实例作为消息,将它编码为Short的原子类型值,并将它写入 ByteBuf 中,其将随后被转发给 ChannelPipeline 中的 下一个 ChannelOutboundHandler。每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节。
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
// 将 Short 写入 ByteBuf
out.writeShort(msg);
}
}
2.2 抽象类 MessageToMessageEncoder
MessageToMessageEncoder 类的 encode() 方法提供了将入站数据从一个消息格式解码为另一种
下述代码使用 IntegerToStringEncoder 扩展了 MessageToMessageEncoder,编码器将每个出站 Integer 的 String 表示添加到了该 List 中
public class IntegerToStringEncoder extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
out.add(String.valueOf(msg));
}
}
抽象的编解码器类
虽然我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理我们一直在学习的这两种类型的操作。正如同你可能已经猜想到的,这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口
为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢?因为通过尽可能地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则
1. 抽象类 ByteToMessageCodec
让我们来研究这样的一个场景:我们需要将字节解码为某种形式的消息,可能是 POJO,随后再次对它进行编码。ByteToMessageCodec 将为我们处理好这一切,因为它结合了 ByteToMessageDecoder 以及它的逆向 —— MessageToByteEncoder
任何的请求/响应协议都可以作为使用 ByteToMessageCodec 的理想选择。例如,在某个 SMTP 的实现中,编解码器将读取传入字节,并将它们解码为一个自定义的消息类型,如 SmtpRequest。而在接收端,当一个响应被创建时,将会产生一个 SmtpResponse,其将被编码回字节以便进行传输
2. 抽象类 MessageToMessageCodec
通过使用 MessageToMessageCodec,我们可以在一个单个的类中实现该转换的往返过程。MessageToMessageCodec 是一个参数化的类,定义如下:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
decode() 方法是将 INBOUND_IN 类型的消息转换为 OUTBOUND_IN 类型的消息,而 encode() 方法则进行它的逆向操作。将 INBOUND_IN 类型的消息看作是通过网络发送的类型, 而将 OUTBOUND_IN 类型的消息看作是应用程序所处理的类型,将可能有所裨益
WebSocket 协议
下面关于 MessageToMessageCodec 的示例引用了一个新出的 WebSocket 协议,这个协议能实现 Web 浏览器和服务器之间的全双向通信
我们的 WebSocketConvertHandler 在参数化 MessageToMessageCodec 时将使用 INBOUND_IN 类型的 WebSocketFrame,以及 OUTBOUND_IN 类型的 MyWebSocketFrame,后者是 WebSocketConvertHandler 本身的一个静态嵌套类
public class WebSocketConvertHandler
extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
// 实例化一个指定子类型的 WebSocketFrame
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
// 将 WebSocketFrame 解码为 MyWebSocketFrame,并设置 FrameType
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf paload = msg.content().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, paload));
} else
if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, paload));
} else
if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, paload));
} else
if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, paload));
} else
if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, paload));
} else
if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, paload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
public static final class MyWebSocketFrame {
public enum FrameType {
BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION
}
private final FrameType type;
private final ByteBuf data;
public MyWebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
}
}
3. CombinedChannelDuplexHandler 类
正如我们前面所提到的,结合一个解码器和编码器可能会对可重用性造成影响。但是,有一 种方法既能够避免这种惩罚,又不会牺牲将一个解码器和一个编码器作为一个单独的单元部署所 带来的便利性。CombinedChannelDuplexHandler 提供了这个解决方案,其声明为:
public class CombinedChannelDuplexHandler
<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
这个类充当了 ChannelInboundHandler 和 ChannelOutboundHandler(该类的类型参数 I 和 O)的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类
首先,让我们研究下述代码,该实现扩展了 ByteToMessageDecoder,因为它要从 ByteBuf 读取字符
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
这里的 decode() 方法一次将从 ByteBuf 中提取 2 字节,并将它们作为 char 写入到 List 中,其将会被自动装箱为 Character 对象
下述代码将 Character 转换回字节。这个类扩展了 MessageToByteEncoder,因为它需要将 char 消息编码到 ByteBuf 中。这是通过直接写入 ByteBuf 做到的
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
out.writeChar(msg);
}
}
既然我们有了解码器和编码器,我们可以结合它们来构建一个编解码器
// 通过该解码器和编码器实现参数化CombinedByteCharCodec
public class CombinedChannelDuplexHandler extends
io.netty.channel.CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedChannelDuplexHandler() {
// 将委托实例传递给父类
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}