编解码器

解码器

ByteToMessageDecoder

数据通过网络传输,最终会缓存在一个字节数组里

所以就会可能出现传输:

批注 2020-05-18 160509

接收:

批注 2020-05-18 160541

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 如果缓冲区没有足够的数据,不进行处理,只有缓冲区累积一定的数据时,才将数据添加到out
        if (in.readableBytes() < 4){
            return;
        }
        // 添加到out后,代表解码器成功解码了一条消息
        out.add(in.readBytes(4));
    }
}
...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
    }
});

当然这种粘包也可以通过创建一个缓冲区,每次数据到来时,将数据放入到缓冲区,如果缓冲区超过一定大小则就进行处理

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
}

ReplayingDecoder

使用了一个自定义的ByteBuf 支持更简单的操作

MessageToMessageDecoder

编码器

MessageToByteEncoder

public class ShortToByteEncoder extends MessageToByteEncoder<Short> { //← --  扩展了MessageToByteEncoder
    @Override
    public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)
      throws Exception {
        out.writeShort(msg); // ← --  将Short 写入ByteBuf 中
    }
}

MessageToMessageEncoder

编解码器

netty 内置的 Handler 以及 编解码器

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ch.pipeline().addLast(
      new HttpServerCodec(),
      new HttpObjectAggregator(65536),   ← --  为握手提供聚合的HttpRequest
       new WebSocketServerProtocolHandler("/websocket"), ← --  如果被请求的端点是"/websocket",则处理该升级握手 
      new TextFrameHandler(),  ← --  TextFrameHandler 处理TextWebSocketFrame
       new BinaryFrameHandler(), ← -- BinaryFrameHandler 处理BinaryWebSocketFrame 
       new ContinuationFrameHandler()); ← -- ContinuationFrameHandler 处理ContinuationWebSocketFrame  
  }
FileInputStream in = new FileInputStream(file);   ← -- 创建一个FileInputStream 
FileRegion region = new DefaultFileRegion(  ← -- 以该文件的完整长度创建一个新的DefaultFileRegion
  in.getChannel(), 0, file.length());
channel.writeAndFlush(region);
pipeline.addLast(new ChunkedWriteHandler());  ← --  添加Chunked-WriteHandler以处理作为ChunkedInput传入的数据
    pipeline.addLast(new WriteStreamHandler());   ← --  一旦连接建立,WriteStreamHandler就开始写文件数据  

数据传输前置长度

无论使用什么分割符代表消息间隔,数据中都会可能出现这样的符号,为了避免这个问题,可以通过使用固定的字节长度代表下一条消息长度来解决

03 下雨天 03 留客天 02 天留 03 我不留

序列化

JDK

名称 描述
CompatibleObjectDecoder 和使用JDK序列化的非基于Netty的远程节点进行互操作的解码器
CompatibleObjectEncoder 和使用JDK序列化的非基于Netty的远程节点进行互操作的编码器
ObjectDecoder 构建于JDK序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取
ObjectEncoder 构建于JDK序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取

JBoss Marshalling

名称 描述
CompatibleMarshallingDecoder,CompatibleMarshallingEncoder 与只使用JDK序列化的远程节点兼容
MarshallingDecoder, MarshallingEncoder 适用于使用JBoss Marshalling的节点。这些类必须一起使用

Protocol Buffers

名称 描述
ProtobufDecoder 使用protobuf对消息进行解码
ProtobufEncoder 使用protobuf对消息进行编码
ProtobufVarint32FrameDecoder 根据消息中的Google Protocol Buffers的"Base 128 Varints"a整型长度字段值动态地分割所接收到的ByteBuf
ProtobufVarint32LengthFieldPrepender ByteBuf前追加一个Google Protocal Buffers的"Base 128 Varints"整型的长度字段值