业务流程
在第3章(分治处理会话流程)的时候,我们就拆分了结构,希望做到
让执行器来完成泛化调用,而不是由session会话层进行完成。本节内容就是来进行进一步的解耦以构造出执行器逻辑,并对整体的调用架构进一步的完善。
在上一节中的HTTP请求参数解析时,我们在会话层中的关于远程泛化调用还是不可避免的使用到了数据源datasource,以及其connection的抽象
我们无论是在第1,2章的业务理解,还是3,4章的业务重构过程中,都表达着这样一种思想(这种思想模仿着mybatis的源码设计):我们定义session以作为我们整个网关功能逻辑的一种概述(用会话来表达一种业务上的概念,从而与直接使用netty进行网络操作的channelRead0作出了区分,主要是表达含义更符合项目的业务),这种业务逻辑属于业务应用层这一快的东西,而将各种连接如RPC,HTTP抽象为一种数据源之列的东西进行使用,我们不希望我们的应用层的东西能够直接调用到数据资源的东西,因为这些数据资源可以放在一个池子中被很多业务使用,那么它就不能和某一个业务进行耦合,也不希望业务可以直接看到有关数据资源的东西,这就需要在中间加一层包装。
同时在原本的会话流程中,由单一职责原则来看,会话流程中还有些内容过于冗余,这些内容就是对数据源(RPC)的调用时入参和出参的封装,它们应该被提取到一个专门的类来处理,这样才能更加方便的管理。会话的职责是负责串联上下文,执行器的职责是负责对数据源的调用信息处理。
其大致逻辑如下
本节的业务也主要在于
添加 executor 执行器模块,封装对数据源(RPC)的调用,以及处理相关的入参、出参信息。同时这里还会把网关的调用结果进行封装到一个标准的类中,类里包括了code码、info描述,以及 data 结果信息。
同时简单的处理下,GatewayServerHandler 网络请求中的内容处理。让整个代码调用能看到主要干路,清晰化流程,方便后续维护。
本节的代码结构主要在于(标注的要么是新增要么是有所修改的,其余不变)
实现后最终的调用图如下
业务实现
执行器
从上面的业务流程可以知道,执行器这块主要时将原来的session会话层的多余内容进行封装处理,同时由执行器来进行对数据源连接的泛化调用。
我们先看原来的代码入参,出参,来确定我们执行器这一层的包装应该如何构建
从这一块的代码我们看到其实执行器最后就是要想办法调用数据资源连接,同时处理好有关数据资源进行泛化调用时的入参信息,并让最后session会话层中不再有datasource这些数据资源,而是执行器的实现类在其中。而查看
connection.execute(
methodName,
new String[]{parameterType},
new String[]{"ignore"},
SimpleTypeRegistry.isSimpleType(parameterType)?params.values().toArray() :new Object[]{params});
这一块的代码可以知道,其中的入参params
是http请求的参数解析,其它的真正使用到的只有methodName
,parameterType
,而这两个入参是我们http请求与远程泛化调用服务的映射类HttpStatement
的属性,最后返回一个Object
的返回值。而这个返回值,我们不在希望直接就这么返回,我们希望更规范化,因此我们首先定义一个远程泛化调用的返回标准类
GatewayResult
package com.zshunbao.gateway.executor.result;
/**
* @program: api-gateway-core
* @ClassName GatewayResult
* @description: 执行器执行后的返回结果
* @author: zs宝
* @create: 2025-08-11 15:06
* @Version 1.0
**/
public class GatewayResult {
private String code;
private String info;
private Object data;
public GatewayResult(String code, String info, Object data) {
this.code = code;
this.info = info;
this.data = data;
}
public static GatewayResult buildSuccess(Object data){
return new GatewayResult("0000","调用成功",data);
}
public static GatewayResult buildError(Object data){
return new GatewayResult("0001","调用失败",data);
}
public String getCode() {
return code;
}
public String getInfo() {
return info;
}
public Object getData() {
return data;
}
}
由上面的分析可以确定我们执行器方法的入参,出参。
Executor.java
package com.zshunbao.gateway.executor;
import com.zshunbao.gateway.executor.result.GatewayResult;
import com.zshunbao.gateway.mapping.HttpStatement;
import java.util.Map;
/**
* @program: api-gateway-core
* @ClassName Executor
* @description: 执行器接口类,用于包装Dubbo的远程泛化调用的调用执行
* @author: zs宝
* @create: 2025-08-11 14:56
* @Version 1.0
**/
public interface Executor {
/**
* 执行方法
* @param httpStatement http请求与远程调用的映射信息
* @param params http请求中所携带的参数
* @return Dubbo远程泛化调用的返回值
*/
GatewayResult exec(HttpStatement httpStatement, Map<String, Object> params);
}
对于泛化调用我们希望有一个基类使用模版设计模式将主体流程定义清楚,至于对数据资源的泛化调用是否有其余的处理,我们希望这个是可根据业务进行扩展的,应当由以后的业务定义子类来实现
于是有
BaseExecutor.java
package com.zshunbao.gateway.executor;
import com.alibaba.fastjson.JSON;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.executor.result.GatewayResult;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.type.SimpleTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @program: api-gateway-core
* @ClassName BaseExecutor
* @description: 执行器抽象基类-采用模版模式将整体的执行器调用业务流程定义清楚
* @author: zs宝
* @create: 2025-08-11 15:10
* @Version 1.0
**/
public abstract class BaseExecutor implements Executor {
private Logger logger = LoggerFactory.getLogger(BaseExecutor.class);
protected Configuration configuration;
protected Connection connection;
public BaseExecutor(Configuration configuration, Connection connection) {
this.configuration = configuration;
this.connection = connection;
}
/**
* 定义泛化调用流程的模版,封装起来
* @param httpStatement http请求与远程调用的映射信息
* @param params http请求中所携带的参数
* @return 执行器的封装结果
*/
@Override
public GatewayResult exec(HttpStatement httpStatement, Map<String, Object> params) {
//获取泛化调用需要的一些参数信息,并提前处理
String methodName = httpStatement.getMethodName();
String parameterType = httpStatement.getParameterType();
String[]parameterTypes=new String[]{parameterType};
Object[]args= SimpleTypeRegistry.isSimpleType(parameterType)?params.values().toArray():new Object[]{params};
logger.info("执行调用 method:{}#{}.{}({}) args:{}", httpStatement.getApplication(), httpStatement.getInterfaceName(), httpStatement.getMethodName(), JSON.toJSONString(parameterTypes), JSON.toJSONString(args));
try {
//真正的调用逻辑,我们用一个抽象方法让子类继承来实现,如果以后业务还有其他处理需求,让对应的业务处理子类来实现,为以后的扩展流出足够的空间
Object data=doExec(methodName,parameterTypes,args);
return GatewayResult.buildSuccess(data);
}catch (Exception e){
return GatewayResult.buildError(e.getMessage());
}
}
protected abstract Object doExec(String methodName, String[] parameterTypes, Object[] args);
}
由于我们现在的业务对于泛化调用只是最简单的调用,因此
SimpleExecutor.java
package com.zshunbao.gateway.executor;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.session.Configuration;
/**
* @program: api-gateway-core
* @ClassName SimpleExecutor
* @description: 基础执行器的子类,最简单的调用泛化调用执行器
* @author: zs宝
* @create: 2025-08-11 15:24
* @Version 1.0
**/
public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Connection connection) {
super(configuration, connection);
}
@Override
protected Object doExec(String methodName, String[] parameterTypes, Object[] args) {
return connection.execute(methodName,parameterTypes,new String[]{"ignore"},args);
}
}
到此执行器这一块完成。
代码重构
会话的职责是负责串联上下文,执行器的职责是负责对数据源的调用信息处理。因此会话相关的业务不应该接触到数据资源的东西。
对于会话一定是有执行器的,这种执行器应当是会话服务的一种默认配置
Configuration,java
/**
* @program: api-gateway-core
* @ClassName Configuration
* @description:
* @author: zs宝
* @create: 2025-08-04 14:56
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.bind.MapperRegistry;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.executor.Executor;
import com.zshunbao.gateway.executor.SimpleExecutor;
import com.zshunbao.gateway.mapping.HttpStatement;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;
import java.util.HashMap;
import java.util.Map;
public class Configuration {
//对应的注册器
private final MapperRegistry mapperRegistry=new MapperRegistry(this);
private final Map<String, HttpStatement> httpStatements = new HashMap<>();
//根据官方文档的示例,dubbo的泛化调用的API调用方式主要有3个东西需要配置
//分别是ApplicationConfig,RegistryConfig,ReferenceConfig<GenericService>
//但是我们这是一个网关,被调用的服务方可能不止一个,因此,我们需要按照对应的服务名和配置名,将其对应起来保存
//RPC 应用服务配置项
//注意键为应用名称;值为应用服务配置项
private final Map<String, ApplicationConfig> applicationConfigMap=new HashMap<>();
//RPC 注册中心配置项
//键为应用名称;值为注册中心配置项
private final Map<String, RegistryConfig> registryConfigMap=new HashMap<>();
//RPC 泛化服务配置项
//注意这里的键:远程服务接口的全限定类名 ; 值为泛化引用配置对象
private final Map<String, ReferenceConfig<GenericService>> referenceConfigMap=new HashMap<>();
public Configuration(){
//TODO 后期从配置中获取,本节主要内容是泛化服务的调用
ApplicationConfig applicationConfig=new ApplicationConfig();
applicationConfig.setName("api-gateway-test");
applicationConfig.setQosEnable(false);
RegistryConfig registryConfig=new RegistryConfig();
//配置应用在那个注册中心可以被调用
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
registryConfig.setRegister(false);
//对应的泛化服务配置
ReferenceConfig<GenericService> reference=new ReferenceConfig<>();
//配置泛化调用的服务方接口
reference.setInterface("cn.bugstack.gateway.rpc.IActivityBooth");
reference.setVersion("1.0.0");
reference.setGeneric("true");
//加入缓存中去
applicationConfigMap.put("api-gateway-test",applicationConfig);
registryConfigMap.put("api-gateway-test",registryConfig);
referenceConfigMap.put("cn.bugstack.gateway.rpc.IActivityBooth",reference);
}
//对应的服务配置项对外get方法
public ApplicationConfig getApplicationConfig(String applicationName) {
return applicationConfigMap.get(applicationName);
}
//对应的对外注册中心的get方法
public RegistryConfig getRegistryConfig(String applicationName) {
return registryConfigMap.get(applicationName);
}
//对应的对外泛化调用配置项get方法
public ReferenceConfig<GenericService> getReferenceConfig(String interfaceName) {
return referenceConfigMap.get(interfaceName);
}
//添加泛化调用方法
public void addMapper(HttpStatement httpStatement) {
mapperRegistry.addMapper(httpStatement);
}
public IGenericReference getMapper(String uri, GatewaySession gatewaySession) {
return mapperRegistry.getMapper(uri, gatewaySession);
}
public void addHttpStatement(HttpStatement httpStatement) {
httpStatements.put(httpStatement.getUri(), httpStatement);
}
public HttpStatement getHttpStatement(String uri) {
return httpStatements.get(uri);
}
public Executor newExecutor(Connection connection) {
return new SimpleExecutor(this,connection);
}
}
会话的能够使用到的资源是由会话工厂进行管理的,因此有
DefaultGatewaySessionFactory.java
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySessionFactory
* @description: 默认网关会话工厂
* @author: zs宝
* @create: 2025-08-04 15:27
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.datasource.DataSourceFactory;
import com.zshunbao.gateway.datasource.unpooled.UnpooledDataSourceFactory;
import com.zshunbao.gateway.executor.Executor;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.GatewaySessionFactory;
public class DefaultGatewaySessionFactory implements GatewaySessionFactory {
private final Configuration configuration;
public DefaultGatewaySessionFactory(Configuration configuration) {
this.configuration = configuration;
}
@Override
public GatewaySession openSession(String uri) {
// 获取数据源连接信息:这里把 Dubbo、HTTP 抽象为一种连接资源
DataSourceFactory dataSourceFactory=new UnpooledDataSourceFactory();
dataSourceFactory.setProperties(configuration,uri);
DataSource dataSource = dataSourceFactory.getDataSource();
//创建执行器
Executor executor = configuration.newExecutor(dataSource.getConnection());
//创建会话
return new DefaultGatewaySession(configuration,uri,executor);
}
}
所以对于会话的修改如下
DefaultGatewaySession.java
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySession
* @description:
* @author: zs宝
* @create: 2025-08-04 15:18
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.executor.Executor;
import com.zshunbao.gateway.executor.SimpleExecutor;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import java.util.Map;
public class DefaultGatewaySession implements GatewaySession {
private Configuration configuration;
private String uri;
private Executor executor;
public DefaultGatewaySession(Configuration configuration, String uri, Executor executor) {
this.configuration = configuration;
this.uri=uri;
this.executor = executor;
}
@Override
public Object get(String methodName, Map<String, Object> params) {
HttpStatement httpStatement = configuration.getHttpStatement(uri);
try {
return executor.exec(httpStatement,params);
}catch (Exception e){
throw new RuntimeException("Error exec get. Cause: "+e);
}
}
@Override
public Object post(String methodName, Map<String, Object> params) {
return get(methodName, params);
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public IGenericReference getMapper() {
return configuration.getMapper(uri,this);
}
}
最后我们希望使的GatewayServerHandler.java
网络请求中的内容处理更加清晰明了
原来在其中既包括了对请求参数的处理,又包含了大量网络请求返回结果的构建,我们将其拆分重构
重构为
/**
* @program: api-gateway-core
* @ClassName GatewayServerHandler
* @description:
* @author: zs宝
* @create: 2025-08-04 15:33
* @Version 1.0
**/
package com.zshunbao.gateway.socket.handlers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.BaseHandler;
import com.zshunbao.gateway.socket.agreement.RequestParser;
import com.zshunbao.gateway.socket.agreement.ResponseParser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class GatewayServerHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class);
private final DefaultGatewaySessionFactory gatewaySessionFactory;
public GatewayServerHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
//1、解析请求参数
RequestParser requestParser = new RequestParser(request);
String uri= requestParser.getUri();
if (uri==null || uri.equals("/favicon.ico")) return;
Map<String, Object> args = requestParser.parse();
//2、创建会话
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
//根据uri获得对应的mapper映射,即本地动态创建的代理类
IGenericReference reference = gatewaySession.getMapper();
//执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
Object result = reference.$invoke(args);
//3、封装返回结果
DefaultFullHttpResponse response = new ResponseParser().parse(result);
channel.writeAndFlush(response);
}
}
其中对于GatewayServerHandler.java
之前的返回结果构建,将其封装为一个专门的处理类
ResponseParser.java
package com.zshunbao.gateway.socket.agreement;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.handler.codec.http.*;
/**
* @program: api-gateway-core
* @ClassName ResponseParser
* @description: 构造网关返回结果
* @author: zs宝
* @create: 2025-08-11 15:55
* @Version 1.0
**/
public class ResponseParser {
public DefaultFullHttpResponse parse(Object result){
// 返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
// 设置回写数据
response.content().writeBytes(JSON.toJSONString(result).getBytes());
// 头部信息设置
HttpHeaders heads = response.headers();
// 返回内容类型
heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
// 响应体的长度
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 配置持久连接
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// 配置跨域访问
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
return response;
}
}
最后还有一个网关统一远程调用接口的修改
/**
* @program: api-gateway-core
* @ClassName IGenericReference
* @description: 统一泛化调用接口,无论怎样的HTTP请求,我们暴露出去的都是这个接口,一致
* @author: zs宝
* @create: 2025-07-29 15:53
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import java.util.Map;
public interface IGenericReference {
Object $invoke(Map<String, Object>params);
}
测试
测试代码与上一节的测试代码一模一样
/**
* @program: api-gateway-core
* @ClassName ApiTest
* @description:
* @author: zs宝
* @create: 2025-08-09 17:21
* @Version 1.0
**/
package com.zshunbao.gateway.test;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
/**
* 测试:
* http://localhost:7397/wg/activity/sayHi
* 参数:
* {
* "str": "10001"
* }
*
* http://localhost:7397/wg/activity/insert
* 参数:
* {
* "name":"zshunbao",
* "uid":"10001"
* }
*/
@Test
public void test_gateway() throws InterruptedException, ExecutionException {
// 1. 创建配置信息加载注册
Configuration configuration = new Configuration();
HttpStatement httpStatement01 = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"sayHi",
"java.lang.String",
"/wg/activity/sayHi",
HttpCommandType.GET);
HttpStatement httpStatement02 = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"insert",
"cn.bugstack.gateway.rpc.dto.XReq",
"/wg/activity/insert",
HttpCommandType.POST);
configuration.addMapper(httpStatement01);
configuration.addMapper(httpStatement02);
// 2. 基于配置构建会话工厂
DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);
// 3. 创建启动网关网络服务
GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if (null == channel) throw new RuntimeException("netty server start error channel is null");
while (!channel.isActive()) {
logger.info("netty server gateway start Ing ...");
Thread.sleep(500);
}
logger.info("netty server gateway start Done! {}", channel.localAddress());
Thread.sleep(Long.MAX_VALUE);
}
}
注意启动zookeeper,远程服务