业务流程
在上一节中,我们使用分治重构了会话流程,但是对于其中的Dubbo泛化调用,其实现仍然放于session之中
这样的调用耦合性太大且不适合以后对于功能的扩展。
本节我们就把API网关中的RPC泛化调用提炼出来,比照ORM框架中对数据库的调用,将 RPC 的泛化调用抽象为一种数据连接资源进行使用。这样既可以方便我们后面扩展新的连接方式,也可以让RPC泛化调用与session进行进一步的解耦操作。
本节的主要内容就在于如何将RPC的泛化调用抽象提炼为数据资源,同时为其他连接方式调用留足扩展空间。
其大致过程如下
到这里整个API网关的通信结构已经逐渐清晰,从网络协议转换->开启通信会话->获取映射关系->执行具体的请求方案,到本章要实现的抽象的数据源。把 RPC、HTTP 当做数据源来维护。
注意:本节开发主要是学习职责分离,功能模块的解耦
业务实现
本节代码结构如下,其主要在datasource包下(将泛化调用抽象了出来),session和bind包略有参数的修改
数据资源抽象
本小节主要进行各种连接(HTTP、RPC)进行数据资源抽象化。
总体设计思路
由于未来的连接大概率会多样,那么我们首先就要定义有一个枚举类,来为以后的各种连接进行准备
如下
DataSourceType.java
/**
* @program: api-gateway-core
* @ClassName DataSourceType
* @description:数据源类型
* @author: zs宝
* @create: 2025-08-08 14:57
* @Version 1.0
**/
package com.zshunbao.gateway.datasource;
public enum DataSourceType {
Dubbo,
HTTP
}
同时这里我们需要思考一种问题,我们的数据抽象改进行如何设计?
由于我们这里是对照的ORM框架对数据库的调用设计思想进行抽象设计
ORM 框架对数据库的调用设计思想是:通过对象化的 API 将业务操作延迟转换为适配不同数据库方言的 SQL,并结合连接池、事务与缓存机制,高效、安全、透明地完成数据读写。
对比类推,这里我们的抽象是希望让开发者用面向对象的方式像操作数据库资源那样操作各种连接调用,将连接调用转为类似数据库连接一样的操作,这就非常类似于ORM框架在JDBC中的操作。对于其中JDBC的操作,一般会有几个重要配置:
数据源工厂(DataSource Factory):负责根据配置创建并初始化 数据源
数据源(DataSource):统一管理数据库连接(含连接池)
数据源连接(Connection):ORM 执行 SQL、完成数据读写的实际会话通道
类比这里我们也将设计上述三个层次,进行连接调用的抽象化:
数据源工厂(DataSource Factory):负责根据配置创建并初始化 数据源
数据源(DataSource):统一管理数据库连接
数据源连接(Connection):执行各种连接调用如RPC的泛化调用
数据源连接抽象
首先是对于连接的抽象
Connection.java
/**
* @program: api-gateway-core
* @ClassName Connection
* @description: 连接接口
* @author: zs宝
* @create: 2025-08-08 15:10
* @Version 1.0
**/
package com.zshunbao.gateway.datasource;
public interface Connection {
/**
* 连接后,对于每种请求http/RPC的执行方法
* @param methodName 方法名
* @param parameterTypes 参数类型
* @param parameterNames 参数名称
* @param args 参数
* @return 执行结果
*/
Object execute(String methodName,String[] parameterTypes,String[] parameterNames,Object[] args);
}
接下来分别是对于RPC连接的实现以及HTTP的post连接实现
DubboConnection.java
/**
* @program: api-gateway-core
* @ClassName DubboConnection
* @description: Dubbo泛化调用连接
* @author: zs宝
* @create: 2025-08-08 15:15
* @Version 1.0
**/
package com.zshunbao.gateway.datasource.connection;
import com.zshunbao.gateway.datasource.Connection;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;
public class DubboConnection implements Connection {
private final GenericService genericService;
/**
* 这里我们希望的是将Dubbo的远程泛化调用抽象为一种数据源进行使用
* 那么对于这种数据源是哪一个那种类型,应该由外界传入,而非我们在数据源中硬编码
* 因此这里的泛化调用封装,关于Dubbo泛化调用的几个参数应该由外界传入
* 传入后我们在数据源建立连接时就应该与Dubbbo远程泛化调用进行引用缓存,并启动dubbo服务
* @param applicationConfig 应用配置
* @param registryConfig 注册中心配置
* @param reference 引用配置
*/
public DubboConnection(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ReferenceConfig<GenericService> reference) {
//连接远程服务
DubboBootstrap bootstrap=DubboBootstrap.getInstance();
bootstrap.application(applicationConfig).registry(registryConfig).reference(reference).start();
//获取泛化接口
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
genericService = cache.get(reference);
}
/**
* 执行对应的泛化调用
* @param methodName 方法名
* @param parameterTypes 参数类型
* @param parameterNames 参数名称
* @param args 参数
* @return
*/
@Override
public Object execute(String methodName, String[] parameterTypes, String[] parameterNames, Object[] args) {
return genericService.$invoke(methodName,parameterTypes,args);
}
}
HTTPConnection
(这里对于HTTP连接的post请求处理不是本节重点,所以这里只是完成了个大致响应,后续章节再进行完善)
/**
* @program: api-gateway-core
* @ClassName HTTPConnection
* @description: 封装http的post请求
* @author: zs宝
* @create: 2025-08-08 15:28
* @Version 1.0
**/
package com.zshunbao.gateway.datasource.connection;
import com.zshunbao.gateway.datasource.Connection;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.HttpClient;
import java.io.IOException;
public class HTTPConnection implements Connection {
private final HttpClient httpClient;
private PostMethod postMethod;
public HTTPConnection(String uri){
httpClient=new HttpClient();
postMethod=new PostMethod(uri);
postMethod.addRequestHeader("accept", "*/*");
postMethod.addRequestHeader("connection", "Keep-Alive");
postMethod.addRequestHeader("Content-Type", "application/json;charset=GBK");
postMethod.addRequestHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.81 Safari/537.36");
}
@Override
public Object execute(String methodName, String[] parameterTypes, String[] parameterNames, Object[] args) {
String res="";
try {
//执行对应的post方法
int code = httpClient.executeMethod(postMethod);
//如果方法执行成功,拿到响应体
if(code==200){
res=postMethod.getResponseBodyAsString();
}
}catch (IOException e){
e.printStackTrace();
}
return res;
}
}
数据源抽象
数据源主要时用于管理数据源连接,即这里的主要功能就是获得数据源连接资源
DataSource.java
/**
* @program: api-gateway-core
* @ClassName DataSource
* @description:数据源接口,将http,RPC都当作连接的数据资源使用
* @author: zs宝
* @create: 2025-08-08 15:56
* @Version 1.0
**/
package com.zshunbao.gateway.datasource;
public interface DataSource {
//获取具体的数据源连接
Connection getConnection();
}
其非池化的实现为根据不同的连接类型拿到不同的连接资源
UnpooledDataSource.java
/**
* @program: api-gateway-core
* @ClassName UnpooledDataSource
* @description: 无池化的连接池
* @author: zs宝
* @create: 2025-08-08 16:00
* @Version 1.0
**/
package com.zshunbao.gateway.datasource.unpooled;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.datasource.DataSourceType;
import com.zshunbao.gateway.datasource.connection.DubboConnection;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;
public class UnpooledDataSource implements DataSource {
//数据源需要的相关配置属性
private Configuration configuration;
private HttpStatement httpStatement;
private DataSourceType dataSourceType;
@Override
public Connection getConnection() {
switch (dataSourceType){
//TODO 这里暂时先留着,本节重点在于将Dubbo的调用抽象为数据源
case HTTP:
break;
case Dubbo:
//获取存储在本地hash map中映射的键
String applicationName = httpStatement.getApplication();
String interfaceName = httpStatement.getInterfaceName();
//获取DubboConnect需要的参数配置
ApplicationConfig applicationConfig = configuration.getApplicationConfig(applicationName);
RegistryConfig registryConfig = configuration.getRegistryConfig(applicationName);
ReferenceConfig<GenericService> reference = configuration.getReferenceConfig(interfaceName);
return new DubboConnection(applicationConfig,registryConfig,reference);
default:
break;
}
throw new RuntimeException("DataSourceType:" + dataSourceType + "没有对应的数据源实现");
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
public void setHttpStatement(HttpStatement httpStatement) {
this.httpStatement = httpStatement;
}
public void setDataSourceType(DataSourceType dataSourceType) {
this.dataSourceType = dataSourceType;
}
}
数据源工厂抽象
主要用于获取数据源,同时设置数据源的配置
DataSourceFactory.java
/**
* @program: api-gateway-core
* @ClassName DataSourceFactory
* @description:数据源工厂
* @author: zs宝
* @create: 2025-08-08 15:58
* @Version 1.0
**/
package com.zshunbao.gateway.datasource;
import com.zshunbao.gateway.session.Configuration;
public interface DataSourceFactory {
//获取数据源
DataSource getDataSource();
/**
* 设置数据源配置
* @param configuration 配置类
* @param uri http请求
*/
void setProperties(Configuration configuration, String uri);
}
其非池化的实现为
/**
* @program: api-gateway-core
* @ClassName UnpooledDataSourceFactory
* @description: 无池化的数据源工厂
* @author: zs宝
* @create: 2025-08-08 16:16
* @Version 1.0
**/
package com.zshunbao.gateway.datasource.unpooled;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.datasource.DataSourceFactory;
import com.zshunbao.gateway.datasource.DataSourceType;
import com.zshunbao.gateway.session.Configuration;
public class UnpooledDataSourceFactory implements DataSourceFactory {
protected UnpooledDataSource dataSource;
public UnpooledDataSourceFactory(){
this.dataSource=new UnpooledDataSource();
}
@Override
public DataSource getDataSource() {
return dataSource;
}
@Override
public void setProperties(Configuration configuration, String uri) {
this.dataSource.setConfiguration(configuration);
this.dataSource.setDataSourceType(DataSourceType.Dubbo);
this.dataSource.setHttpStatement(configuration.getHttpStatement(uri));
}
}
session与bind的小修改
这一部分主要时我们将泛化调用抽象出去后,原本在session处实现的泛化调用将被修,同时对于bind出的部分也会修改
对于默认的DefaultGatewaySession.java
将修改如下
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySession
* @description:
* @author: zs宝
* @create: 2025-08-04 15:18
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
public class DefaultGatewaySession implements GatewaySession {
private Configuration configuration;
private String uri;
private DataSource dataSource;
public DefaultGatewaySession(Configuration configuration, String uri, DataSource dataSource) {
this.configuration = configuration;
this.uri=uri;
this.dataSource=dataSource;
}
@Override
public Object get(String methodName,Object parameter) {
//现在一切的泛化调用都在对应的connection中
Connection connection = dataSource.getConnection();
return connection.execute(methodName, new String[]{"java.lang.String"}, new String[]{"name"}, new Object[]{parameter});
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public IGenericReference getMapper() {
return configuration.getMapper(uri,this);
}
}
由于session的设计处我们也有个DefaultGatewaySessionFactory.java
,因此对于其中的默认实现有如下调整
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySessionFactory
* @description: 默认网关会话工厂
* @author: zs宝
* @create: 2025-08-04 15:27
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.datasource.DataSourceFactory;
import com.zshunbao.gateway.datasource.unpooled.UnpooledDataSourceFactory;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.GatewaySessionFactory;
public class DefaultGatewaySessionFactory implements GatewaySessionFactory {
private final Configuration configuration;
public DefaultGatewaySessionFactory(Configuration configuration) {
this.configuration = configuration;
}
@Override
public GatewaySession openSession(String uri) {
// 获取数据源连接信息:这里把 Dubbo、HTTP 抽象为一种连接资源
DataSourceFactory dataSourceFactory=new UnpooledDataSourceFactory();
dataSourceFactory.setProperties(configuration,uri);
DataSource dataSource = dataSourceFactory.getDataSource();
return new DefaultGatewaySession(configuration,uri,dataSource);
}
}
最后还有关于bind层下的部分修(主要集中在MapperProxy.java
处)
/**
* @program: api-gateway-core
* @ClassName MapperProxy
* @description: 泛化调用静态代理,每发起一个HTTP请求,网关就会给对应的HTTP方法执行对应的PRC远程调用
* 这里只是进行对应的映射,具体的调用我们现在解耦在另一个MapperMethod类中
* 继承MethodInterceptor接口,让本地创建的动态代理对象的执行方法被拦截,走这里定义的方法,从而走真正的Dubbo泛化调用的代理
* @author: zs宝
* @create: 2025-08-04 14:50
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.session.GatewaySession;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import java.lang.reflect.Method;
public class MapperProxy implements MethodInterceptor {
private GatewaySession gatewaySession;
private final String uri;
public MapperProxy(GatewaySession gatewaySession, String uri) {
this.gatewaySession = gatewaySession;
this.uri = uri;
}
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
MapperMethod linkMethod=new MapperMethod(uri,method,gatewaySession.getConfiguration());
//暂时只获取第0个参数
return linkMethod.execute(gatewaySession,args[0]);
}
}
注意:这里仅仅放了最主要的修改,其余如调用对应类中的函数的参数问题,修改下即可使用,绝对不复杂。
测试
这里本节主要是将泛化调用进一步抽象出去,因此测试类与上一章的一模一样
package com.zshunbao.gateway.test;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
/**
* 测试:http://localhost:7397/wg/activity/sayHi
*/
@Test
public void test_gateway() throws InterruptedException, ExecutionException {
// 1. 创建配置信息加载注册
Configuration configuration = new Configuration();
HttpStatement httpStatement = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"sayHi",
"/wg/activity/sayHi",
HttpCommandType.GET);
configuration.addMapper(httpStatement);
// 2. 基于配置构建会话工厂
DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);
// 3. 创建启动网关网络服务
GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if (null == channel) throw new RuntimeException("netty server start error channel is null");
while (!channel.isActive()) {
logger.info("netty server gateway start Ing ...");
Thread.sleep(500);
}
logger.info("netty server gateway start Done! {}", channel.localAddress());
Thread.sleep(Long.MAX_VALUE);
}
}