Netty客户端与服务器端的回顾
在之前的黑马Netty课程学习Netty的学习中,我们发现,对于客户端与服务端的代码总是总体结构上非常相似的
对于服务端的代码骨架总是为
/**
* @program: Netty-learn
* @ClassName HelloServer
* @description:
* @author: zs宝
* @create: 2025-07-19 16:06
* @Version 1.0
**/
package com.netty.helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//1、启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
//2、BossEventLoop WorkerEventLoop(selector,thread) ,group 组
.group(new NioEventLoopGroup())
//3、选择 服务器的 ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
//4、boss 负责处理连接 worker(child) 负责处理读写,决定了worker(child)能执行那些操作
.childHandler(
//5、channel 代表和客户端进行数据读写的通道 Initializer 初始化 负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//6、添加具体的handler
ch.pipeline().addLast(new StringDecoder());//将ByteBuf转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){//自定义handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//绑定端口号
.bind(8080);
}
}
这里Netty将NIO进行封装,开发者只需要关注对于业务的处理,而业务的处理就集中在了其中的childHandler
这一快的部分,而在childHandler
中最主要进行业务处理的就是(也是主要由我们开发人员定义的就是)ch.pipeline().addLast
这一部分,这一部分就是自定义handler
的关键部分。如下例子
public class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleServerInitializer());
b.bind(8080).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class SimpleServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 将 Handler 添加到 ChannelPipeline
pipeline.addLast(new SimpleServerHandler());
}
}
class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 处理接收到的消息
String message = (String) msg;
System.out.println("Received message: " + message);
// 响应消息
ctx.writeAndFlush("Hello, client!");
}
}
也就是说,我们对于网络编程的部分,在使用Netty的框架后,最主要的就集中在了自定义handler上面。
而对于客户端,其大致骨架总是为
/**
* @program: Netty-learn
* @ClassName HelloClient
* @description:
* @author: zs宝
* @create: 2025-07-19 16:19
* @Version 1.0
**/
package com.netty.helloworld;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//1、启动类
new Bootstrap()
//添加group
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//4、添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
//在连接建立后期被调用
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//5、连接服务器
.connect(new InetSocketAddress("localhost",8080))
.sync()//阻塞方法,直到连接建立
.channel()//代表连接对象
//6、向服务器发送数据
.writeAndFlush("hello world");
}
}
可见,在使用Netty框架后,对于整个有关Java的网络编程部分,无论是客户端还是服务端,与业务核心相关连的处理,绝大多数都始终聚焦在自定义handler的处理上。
而对于自定义的handler,可以这样理解:
首先每一个客户端与服务端建立连接,这个连接可以比作一个通道(channel),这个通道是可以全双工通信的
客户端可以用这个通道向服务端发送信息,这个信息可以比作一个快递,最开始发送出去的信息就是快递中的物品,但是快递不能直接就这样发送出去,在到达服务端之前,它会被层层包装好,以保护其中的物品的安全以及带上一些能够让其顺利到达的附加信息,而这个包装和附加其它信息的过程就是我们的handler处理过程,在信息到达服务端后,服务端会将快递拆开,这个拆开的过程也是handler的处理过程,而对于不同的快递,拆开的方式肯定是有区别的,在快递最终被拆开的原本物件到达服务端后,我们就看到了对应的信息。
服务端拿到原本信息后,也可以向客户端返还信息,这个返还信息的流程与上面客户端发送信息的流程基本一致,也就是我们购物完成后要求有一个确认收货的信息。
这里无论是客户端还是服务端都有入站和出站的流程,也就是有对应入站和出站的handler。
业务流程
那么本节作为网关项目的HTTP请求会话协议的处理,主要集中在运用Netty对HTTP请求信息的协议转换,构建网关会话服务,简单响应HTTP请求信息到页面。即客户端发送信息到到网关这个服务端,服务端要解析客户端的请求,并作出简单的响应告诉客户端,服务端确认收货。
仔细看这个流程忽略其中的API网关内部的处理部分,这很像一个会话session的处理,一次 HTTP 请求,就要完成建立连接、协议转换、方法映射、泛化调用、返回结果等一些列操作。而在这些操作过程中的各类行为处理,其实也类似于ORM框架,只不过一个是对数据库的处理,一个是对 RPC服务的处理。
这里我们做一个测试,打开网络调试助手,关注好本地的7397端口,用谷歌浏览器访问对应的端口,随意发送一段信息
结果:
收集到信息如下:
【Receive from 127.0.0.1 : 54134】:GET /syhello HTTP/1.1
Host: localhost:7397
Connection: keep-alive
Cache-Control: max-age=0
sec-ch-ua: "Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br, zstd
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: pma_lang=zh_CN
本节我们要做的就是拿到这些信息,同时对客户端返回一写简单的响应
服务端搭建
由于netty的服务端代码骨架是基本确定的,因此我们有如下代码
/**
* @program: api-gateway-core
* @ClassName SessionServer
* @description: 网关会话服务端
* @author: zs宝
* @create: 2025-07-25 16:48
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
//实现异步调用接口,使整个会话服务端的过程异步建立
public class SessionServer implements Callable<Channel> {
private final Logger logger = LoggerFactory.getLogger(SessionServer.class);
private final EventLoopGroup boss=new NioEventLoopGroup(1);
private final EventLoopGroup worker=new NioEventLoopGroup();
private Channel channel;
@Override
public Channel call() throws Exception {
ChannelFuture channelFuture=null;
try {
ServerBootstrap b=new ServerBootstrap();
b.group(boss,worker)
.channel(NioServerSocketChannel.class)
//设置 TCP 最大连接等待队列
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new SessionChannelInitializer());
//InetSocketAddress 是 Java 用于表示“IP + 端口”的地址对象
//syncUninterruptibly() 会阻塞直到启动完成(不抛出异常)
channelFuture = b.bind(new InetSocketAddress(7397)).syncUninterruptibly();
this.channel=channelFuture.channel();
}catch (Exception e){
logger.error("socket server start error.", e);
}finally {
if(null!=channelFuture && channelFuture.isSuccess()){
logger.info("socket server start done.");
}else{
logger.error("socket server start error.");
}
}
return channel;
}
}
这里我们将其定义为处理请求会话的服务,并让其继承Callable
类,这是Java并发编程的一个基础包java.util.concurrent
下的一个接口,用于表示当前任务是可以在别的线程中被执行,并且有返回值的任务。由此客户端与服务端的连接过程就可以交给线程池中的某一个线程执行,实现异步处理的操作。
一旦实现异步操作机会意味着当前服务端类可以作为一个“带返回值的异步任务”被提交到线程池中运行。
具体行为:
当
submit(server)
时,它的call()
方法就会被执行call()
方法负责启动 Netty(绑定端口、配置处理器链)Channel
是 Netty 启动成功后监听的服务通道
这在和客户端的连接上。通过异步线程的方式,我们可以保证这个服务是启动可控的(客户端只有在服务端绑定成功,返回通道后才可以连接),并且这将允许服务端并发启动,进行多端口的监听。
同时相比于Netty客户端与服务器端的回顾
部分的普通服务端,我们异步启动后可以获取启动状态,获得并监听通道channel。可以控制这个服务端
在多线程中启动
主线程中等待启动结果
随时取消、回收、关闭
相比直接 new Thread(() -> { ... })
,可以精细控制执行流程与阻塞行为。
正如Netty客户端与服务器端的回顾
写到的,我们与网络编程的主要业务都集中在childHandler
中,因此我们在服务端中将其进行解耦,对应的处理部分将放在其它类中,尽量保证设计模式中的单一职责原则
服务处理
在每个channel的处理上,这很像现实工厂的流水线一样,每一个handler都是一道工序,而工序的顺序流程则由childHnadler
中的参数ChannelInitializer
来定义
/**
* @program: api-gateway-core
* @ClassName SessionChannelInitializer
* @description:自定义netty服务端链接的childHandler的初始化工具
* @author: zs宝
* @create: 2025-07-25 16:56
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import com.zshunbao.gateway.session.handlers.SessionServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
public class SessionChannelInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//得到处理的流水线
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpObjectAggregator(1024*1024));
pipeline.addLast(new SessionServerHandler());
}
}
在这其中,由于我们这个会话是对http请求进行处理,因此,我们定义了几到工序(handler)
HttpRequestDecoder
:HTTP 请求解码器将底层字节流(ByteBuf)解析成 Netty 的 HTTP 请求对象(如
HttpRequest
,FullHttpRequest
)是 inbound 处理器(接收客户端请求)
支持 HTTP/1.0、HTTP/1.1 等文本协议格式
HttpResponseEncoder
:HTTP 响应编码器将
HttpResponse
、DefaultFullHttpResponse
等对象 编码为字节流 发送到客户端是 outbound 处理器(处理写回响应)
HttpObjectAggregator
:聚合器(将多个 HTTP 片段合成完整请求)Netty 中 HTTP 请求可能是多个部分(头、体)分片发送的
聚合器的作用是:将多个 HTTP 对象(如
HttpRequest
,HttpContent
,LastHttpContent
)合并为一个FullHttpRequest
或FullHttpResponse
才能在SessionServerHandler
中拿到完整的请求体
如果不加聚合器,的
channelRead0()
可能只会收到碎片,不好处理SessionServerHandler
:自定义业务处理器
前面的几个handler都是为了将http协议进行解析,以方便进行后续的业务处理,现在就让我们来定义好我们的业务处理handler
在Netty客户端与服务器端的回顾
中对于无论是客户端还是服务端,自定义handler都有这样一段操作
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){//自定义handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
其中对于其中的参数
对于我们常用的ChannelHandler
对于Netty中入站的操作是ChannelInboundHandlerAdapter
,对应的出站的操作是ChannelInboundHandlerAdapter
,对于这两个类又有如下实现
其中入站操作的类,我们希望定义一个模版,让所有业务处理的类都遵循这个模版,于是利用模版设计模式
,我们定义BaseHandler
/**
* @program: api-gateway-core
* @ClassName BaseHandler
* @description:
* @author: zs宝
* @create: 2025-07-25 17:10
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
session(ctx,ctx.channel(),msg);
}
protected abstract void session(ChannelHandlerContext ctx, final Channel channel,T request);
}
其中的channelRead0()
是 SimpleChannelInboundHandler
中必须实现的方法,专门用来接收并处理来自客户端的消息,这里我们进一步封装,将所有业务请求都不再放于channelRead0()
方法中,而放于我们自定义的session
方法中,这样也是出于模版模式的抽象父类控制流程,子类实现细节的设计风格。
其中的T
表示不同业务的自定义请求消息类型,这里我们用泛型来表示,使得该类可以复用于不同类型的消息协议处理
最后我们来实现本节的自定义SessionServerHandler
/**
* @program: api-gateway-core
* @ClassName SessionServerHandler
* @description:
* @author: zs宝
* @create: 2025-07-25 17:09
* @Version 1.0
**/
package com.zshunbao.gateway.session.handlers;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.JSON;
import com.zshunbao.gateway.session.BaseHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SessionServerHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(SessionServerHandler.class);
/**
* 网关接受到外部请求的会话信息处理,这里主要是向外部返回一部分信息,表示网关接收到了
* @param ctx
* @param channel
* @param request
*/
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
//返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//设置响应体的内容
response.content().writeBytes(JSON.toJSONBytes("你访问路径被api-gateway管理了 URL:"+request.uri(), SerializerFeature.PrettyFormat));
//接下来进行响应头信息的设置
//获得响应头
HttpHeaders headers = response.headers();
//配置响应体类型
headers.add(HttpHeaderNames.CONTENT_TYPE,HttpHeaderValues.APPLICATION_JSON+"; charset=UTF-8");
//配置响应体的长度
headers.add(HttpHeaderNames.CONTENT_LENGTH,response.content().readableBytes());
//配置响应的持久链接
headers.add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
//配置跨域访问
headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN,"*");
headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS,"*");
headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS,"GET, POST, PUT, DELETE");
headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS,"true");
//最后向客户端通道返回网关的响应
channel.writeAndFlush(response);
}
}
其中
FullHttpRequest
:完整的 HTTP 请求,它包含了 HTTP 请求的全部组成部分HttpHeaderNames
:HTTP 头字段的常量集,定义的 所有标准 HTTP Header 字段名的常量集合,目的是避免手写字符串(更安全、更快)DefaultFullHttpResponse
:完整的 HTTP 响应体,表示一个完整的 HTTP 响应,包含:响应行(协议版本、状态码)
响应头
响应体(Body)
至此本节的业务开发已全部完成
测试
/**
* @program: api-gateway-core
* @ClassName ApiTest
* @description:HTTP请求会话协议处理测试类
* @author: zs宝
* @create: 2025-07-25 17:32
* @Version 1.0
**/
package com.zshunbao.gateway.test;
import com.zshunbao.gateway.session.SessionServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
@Test
public void test() throws ExecutionException, InterruptedException {
SessionServer server=new SessionServer();
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if(null==channel){
throw new RuntimeException("netty server start error channel is null");
}
while (!channel.isActive()){
logger.info("NettyServer启动服务 ...");
Thread.sleep(500);
}
logger.info("NettyServer启动服务完成 {}", channel.localAddress());
Thread.sleep(Long.MAX_VALUE);
}
}
到此API网关第一章完成