Netty客户端与服务器端的回顾

在之前的黑马Netty课程学习Netty的学习中,我们发现,对于客户端与服务端的代码总是总体结构上非常相似的

对于服务端的代码骨架总是为

 /**
  * @program: Netty-learn
  * @ClassName HelloServer
  * @description:
  * @author: zs宝
  * @create: 2025-07-19 16:06
  * @Version 1.0
  **/
 ​
 package com.netty.helloworld;
 ​
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringDecoder;
 ​
 public class HelloServer {
     public static void main(String[] args) {
         //1、启动器,负责组装netty组件,启动服务器
         new ServerBootstrap()
                 //2、BossEventLoop WorkerEventLoop(selector,thread) ,group 组
                 .group(new NioEventLoopGroup())
                 //3、选择 服务器的 ServerSocketChannel实现
                 .channel(NioServerSocketChannel.class)
                 //4、boss 负责处理连接 worker(child) 负责处理读写,决定了worker(child)能执行那些操作
                 .childHandler(
                         //5、channel 代表和客户端进行数据读写的通道 Initializer 初始化 负责添加别的handler
                         new ChannelInitializer<NioSocketChannel>() {
 ​
                     @Override
                     protected void initChannel(NioSocketChannel ch) throws Exception {
                         //6、添加具体的handler
                         ch.pipeline().addLast(new StringDecoder());//将ByteBuf转换为字符串
                         ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){//自定义handler
 ​
                             @Override
                             public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                 System.out.println(msg);
                             }
                         });
                     }
                 })
                 //绑定端口号
                 .bind(8080);
     }
 }
 ​

这里Netty将NIO进行封装,开发者只需要关注对于业务的处理,而业务的处理就集中在了其中的childHandler这一快的部分,而在childHandler中最主要进行业务处理的就是(也是主要由我们开发人员定义的就是)ch.pipeline().addLast这一部分,这一部分就是自定义handler的关键部分。如下例子

 public class SimpleServer {
 ​
     public static void main(String[] args) throws Exception {
         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new SimpleServerInitializer());
 ​
             b.bind(8080).sync().channel().closeFuture().sync();
         } finally {
             bossGroup.shutdownGracefully();
             workerGroup.shutdownGracefully();
         }
     }
 }
 ​
 class SimpleServerInitializer extends ChannelInitializer<SocketChannel> {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         
         // 将 Handler 添加到 ChannelPipeline
         pipeline.addLast(new SimpleServerHandler());
     }
 }
 ​
 class SimpleServerHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         // 处理接收到的消息
         String message = (String) msg;
         System.out.println("Received message: " + message);
         
         // 响应消息
         ctx.writeAndFlush("Hello, client!");
     }
 }

也就是说,我们对于网络编程的部分,在使用Netty的框架后,最主要的就集中在了自定义handler上面。

而对于客户端,其大致骨架总是为

 /**
  * @program: Netty-learn
  * @ClassName HelloClient
  * @description:
  * @author: zs宝
  * @create: 2025-07-19 16:19
  * @Version 1.0
  **/
 ​
 package com.netty.helloworld;
 ​
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 ​
 import java.net.InetSocketAddress;
 ​
 public class HelloClient {
     public static void main(String[] args) throws InterruptedException {
         //1、启动类
         new Bootstrap()
                 //添加group
                 .group(new NioEventLoopGroup())
                 //选择客户端channel实现
                 .channel(NioSocketChannel.class)
                 //4、添加处理器
                 .handler(new ChannelInitializer<NioSocketChannel>() {
                     //在连接建立后期被调用
                     @Override
                     protected void initChannel(NioSocketChannel ch) throws Exception {
                         ch.pipeline().addLast(new StringEncoder());
                     }
                 })
                 //5、连接服务器
                 .connect(new InetSocketAddress("localhost",8080))
                 .sync()//阻塞方法,直到连接建立
                 .channel()//代表连接对象
                 //6、向服务器发送数据
                 .writeAndFlush("hello world");
     }
 }
 ​

可见,在使用Netty框架后,对于整个有关Java的网络编程部分,无论是客户端还是服务端,与业务核心相关连的处理,绝大多数都始终聚焦在自定义handler的处理上。

而对于自定义的handler,可以这样理解:

  • 首先每一个客户端与服务端建立连接,这个连接可以比作一个通道(channel),这个通道是可以全双工通信

  • 客户端可以用这个通道向服务端发送信息,这个信息可以比作一个快递,最开始发送出去的信息就是快递中的物品,但是快递不能直接就这样发送出去,在到达服务端之前,它会被层层包装好,以保护其中的物品的安全以及带上一些能够让其顺利到达的附加信息,而这个包装和附加其它信息的过程就是我们的handler处理过程,在信息到达服务端后,服务端会将快递拆开,这个拆开的过程也是handler的处理过程,而对于不同的快递,拆开的方式肯定是有区别的,在快递最终被拆开的原本物件到达服务端后,我们就看到了对应的信息。

  • 服务端拿到原本信息后,也可以向客户端返还信息,这个返还信息的流程与上面客户端发送信息的流程基本一致,也就是我们购物完成后要求有一个确认收货的信息。

  • 这里无论是客户端还是服务端都有入站和出站的流程,也就是有对应入站和出站的handler。

业务流程

那么本节作为网关项目的HTTP请求会话协议的处理,主要集中在运用Netty对HTTP请求信息的协议转换,构建网关会话服务,简单响应HTTP请求信息到页面。即客户端发送信息到到网关这个服务端,服务端要解析客户端的请求,并作出简单的响应告诉客户端,服务端确认收货。

仔细看这个流程忽略其中的API网关内部的处理部分,这很像一个会话session的处理,一次 HTTP 请求,就要完成建立连接、协议转换、方法映射、泛化调用、返回结果等一些列操作。而在这些操作过程中的各类行为处理,其实也类似于ORM框架,只不过一个是对数据库的处理,一个是对 RPC服务的处理。

这里我们做一个测试,打开网络调试助手,关注好本地的7397端口,用谷歌浏览器访问对应的端口,随意发送一段信息

结果:

image-20250726152244624

收集到信息如下:

 【Receive from 127.0.0.1 : 54134】:GET /syhello HTTP/1.1
 Host: localhost:7397
 Connection: keep-alive
 Cache-Control: max-age=0
 sec-ch-ua: "Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"
 sec-ch-ua-mobile: ?0
 sec-ch-ua-platform: "Windows"
 Upgrade-Insecure-Requests: 1
 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36
 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
 Sec-Fetch-Site: none
 Sec-Fetch-Mode: navigate
 Sec-Fetch-User: ?1
 Sec-Fetch-Dest: document
 Accept-Encoding: gzip, deflate, br, zstd
 Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
 Cookie: pma_lang=zh_CN
 ​

本节我们要做的就是拿到这些信息,同时对客户端返回一写简单的响应

服务端搭建

由于netty的服务端代码骨架是基本确定的,因此我们有如下代码

 /**
  * @program: api-gateway-core
  * @ClassName SessionServer
  * @description: 网关会话服务端
  * @author: zs宝
  * @create: 2025-07-25 16:48
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.session;
 ​
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.Channel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ​
 ​
 import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
 //实现异步调用接口,使整个会话服务端的过程异步建立
 public class SessionServer implements Callable<Channel> {
     private final Logger logger = LoggerFactory.getLogger(SessionServer.class);
     private final EventLoopGroup boss=new NioEventLoopGroup(1);
     private final EventLoopGroup worker=new NioEventLoopGroup();
     private Channel channel;
 ​
     @Override
     public Channel call() throws Exception {
         ChannelFuture channelFuture=null;
         try {
             ServerBootstrap b=new ServerBootstrap();
             b.group(boss,worker)
                     .channel(NioServerSocketChannel.class)
                     //设置 TCP 最大连接等待队列
                     .option(ChannelOption.SO_BACKLOG,128)
                     .childHandler(new SessionChannelInitializer());
             //InetSocketAddress 是 Java 用于表示“IP + 端口”的地址对象
             //syncUninterruptibly() 会阻塞直到启动完成(不抛出异常)
             channelFuture = b.bind(new InetSocketAddress(7397)).syncUninterruptibly();
             this.channel=channelFuture.channel();
         }catch (Exception e){
             logger.error("socket server start error.", e);
         }finally {
             if(null!=channelFuture && channelFuture.isSuccess()){
                 logger.info("socket server start done.");
             }else{
                 logger.error("socket server start error.");
             }
         }
         return channel;
     }
 }
 ​

这里我们将其定义为处理请求会话的服务,并让其继承Callable类,这是Java并发编程的一个基础包java.util.concurrent下的一个接口,用于表示当前任务是可以在别的线程中被执行,并且有返回值的任务。由此客户端与服务端的连接过程就可以交给线程池中的某一个线程执行,实现异步处理的操作。

一旦实现异步操作机会意味着当前服务端类可以作为一个“带返回值的异步任务”被提交到线程池中运行。

具体行为:

  • submit(server) 时,它的 call() 方法就会被执行

  • call() 方法负责启动 Netty(绑定端口、配置处理器链)

  • Channel 是 Netty 启动成功后监听的服务通道

这在和客户端的连接上。通过异步线程的方式,我们可以保证这个服务是启动可控的(客户端只有在服务端绑定成功,返回通道后才可以连接),并且这将允许服务端并发启动,进行多端口的监听。

同时相比于Netty客户端与服务器端的回顾部分的普通服务端,我们异步启动后可以获取启动状态,获得并监听通道channel。可以控制这个服务端

  • 在多线程中启动

  • 主线程中等待启动结果

  • 随时取消、回收、关闭

相比直接 new Thread(() -> { ... }),可以精细控制执行流程与阻塞行为

正如Netty客户端与服务器端的回顾写到的,我们与网络编程的主要业务都集中在childHandler中,因此我们在服务端中将其进行解耦,对应的处理部分将放在其它类中,尽量保证设计模式中的单一职责原则

服务处理

在每个channel的处理上,这很像现实工厂的流水线一样,每一个handler都是一道工序,而工序的顺序流程则由childHnadler中的参数ChannelInitializer来定义

 /**
  * @program: api-gateway-core
  * @ClassName SessionChannelInitializer
  * @description:自定义netty服务端链接的childHandler的初始化工具
  * @author: zs宝
  * @create: 2025-07-25 16:56
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.session;
 ​
 ​
 import com.zshunbao.gateway.session.handlers.SessionServerHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 ​
 public class SessionChannelInitializer extends ChannelInitializer<SocketChannel>{
 ​
     @Override
     protected void initChannel(SocketChannel channel) throws Exception {
         //得到处理的流水线
         ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast(new HttpRequestDecoder());
         pipeline.addLast(new HttpResponseEncoder());
         pipeline.addLast(new HttpObjectAggregator(1024*1024));
         pipeline.addLast(new SessionServerHandler());
     }
 }
 ​

在这其中,由于我们这个会话是对http请求进行处理,因此,我们定义了几到工序(handler)

  • HttpRequestDecoder:HTTP 请求解码器

    • 将底层字节流(ByteBuf)解析成 Netty 的 HTTP 请求对象(如 HttpRequest, FullHttpRequest

    • inbound 处理器(接收客户端请求)

    • 支持 HTTP/1.0、HTTP/1.1 等文本协议格式

  • HttpResponseEncoder:HTTP 响应编码器

    • HttpResponseDefaultFullHttpResponse 等对象 编码为字节流 发送到客户端

    • outbound 处理器(处理写回响应)

  • HttpObjectAggregator:聚合器(将多个 HTTP 片段合成完整请求)

    • Netty 中 HTTP 请求可能是多个部分(头、体)分片发送的

    • 聚合器的作用是:将多个 HTTP 对象(如 HttpRequest, HttpContent, LastHttpContent)合并为一个 FullHttpRequestFullHttpResponse才能在 SessionServerHandler 中拿到完整的请求体

    如果不加聚合器,的 channelRead0() 可能只会收到碎片,不好处理

  • SessionServerHandler自定义业务处理器

前面的几个handler都是为了将http协议进行解析,以方便进行后续的业务处理,现在就让我们来定义好我们的业务处理handler

Netty客户端与服务器端的回顾中对于无论是客户端还是服务端,自定义handler都有这样一段操作

 ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){//自定义handler
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg);
     }
 });

其中对于其中的参数

对于我们常用的ChannelHandler

对于Netty中入站的操作是ChannelInboundHandlerAdapter,对应的出站的操作是ChannelInboundHandlerAdapter,对于这两个类又有如下实现

其中入站操作的类,我们希望定义一个模版,让所有业务处理的类都遵循这个模版,于是利用模版设计模式,我们定义BaseHandler

 /**
  * @program: api-gateway-core
  * @ClassName BaseHandler
  * @description:
  * @author: zs宝
  * @create: 2025-07-25 17:10
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.session;
 ​
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 ​
 public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
         session(ctx,ctx.channel(),msg);
     }
     protected abstract void session(ChannelHandlerContext ctx, final Channel channel,T request);
 }
 ​

其中的channelRead0()SimpleChannelInboundHandler 中必须实现的方法,专门用来接收并处理来自客户端的消息,这里我们进一步封装,将所有业务请求都不再放于channelRead0()方法中,而放于我们自定义的session方法中,这样也是出于模版模式的抽象父类控制流程,子类实现细节的设计风格。

其中的T表示不同业务的自定义请求消息类型,这里我们用泛型来表示,使得该类可以复用于不同类型的消息协议处理

最后我们来实现本节的自定义SessionServerHandler

 /**
  * @program: api-gateway-core
  * @ClassName SessionServerHandler
  * @description:
  * @author: zs宝
  * @create: 2025-07-25 17:09
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.session.handlers;
 ​
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.fastjson.JSON;
 import com.zshunbao.gateway.session.BaseHandler;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 public class SessionServerHandler extends BaseHandler<FullHttpRequest> {
     private final Logger logger = LoggerFactory.getLogger(SessionServerHandler.class);
 ​
     /**
      * 网关接受到外部请求的会话信息处理,这里主要是向外部返回一部分信息,表示网关接收到了
      * @param ctx
      * @param channel
      * @param request
      */
     @Override
     protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
         logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
         //返回信息处理
         DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         //设置响应体的内容
         response.content().writeBytes(JSON.toJSONBytes("你访问路径被api-gateway管理了 URL:"+request.uri(), SerializerFeature.PrettyFormat));
         //接下来进行响应头信息的设置
         //获得响应头
         HttpHeaders headers = response.headers();
         //配置响应体类型
         headers.add(HttpHeaderNames.CONTENT_TYPE,HttpHeaderValues.APPLICATION_JSON+"; charset=UTF-8");
         //配置响应体的长度
         headers.add(HttpHeaderNames.CONTENT_LENGTH,response.content().readableBytes());
         //配置响应的持久链接
         headers.add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
         //配置跨域访问
         headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN,"*");
         headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS,"*");
         headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS,"GET, POST, PUT, DELETE");
         headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS,"true");
 ​
         //最后向客户端通道返回网关的响应
         channel.writeAndFlush(response);
     }
 }
 ​

其中

  • FullHttpRequest:完整的 HTTP 请求,它包含了 HTTP 请求的全部组成部分

  • HttpHeaderNames:HTTP 头字段的常量集,定义的 所有标准 HTTP Header 字段名的常量集合,目的是避免手写字符串(更安全、更快)

  • DefaultFullHttpResponse:完整的 HTTP 响应体,表示一个完整的 HTTP 响应,包含:

    • 响应行(协议版本、状态码)

    • 响应头

    • 响应体(Body)

至此本节的业务开发已全部完成

测试

 /**
  * @program: api-gateway-core
  * @ClassName ApiTest
  * @description:HTTP请求会话协议处理测试类
  * @author: zs宝
  * @create: 2025-07-25 17:32
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.test;
 ​
 import com.zshunbao.gateway.session.SessionServer;
 import io.netty.channel.Channel;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ​
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 ​
 public class ApiTest {
     private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
     @Test
     public void test() throws ExecutionException, InterruptedException {
         SessionServer server=new SessionServer();
         Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
         Channel channel = future.get();
         if(null==channel){
             throw new RuntimeException("netty server start error channel is null");
         }
         while (!channel.isActive()){
             logger.info("NettyServer启动服务 ...");
             Thread.sleep(500);
         }
         logger.info("NettyServer启动服务完成 {}", channel.localAddress());
 ​
         Thread.sleep(Long.MAX_VALUE);
     }
 }
 ​

到此API网关第一章完成