业务流程
在上一章,我们实现了代理RPC的泛化调用功能,但是
本章主要是对向一章的内容进行一个分治重构.
上一章节的实现相当于是直接在网络会话中调用我们的绑定关系PRC泛化调用,
但是现在有有一个问题就是我们的网络会话其实是将网络socket通信和session会话耦合在一起的,如果我们未来想在网络通信这一块上加上流量整形,熔断或者鉴权的机制,这个时候整个会话的代码会显得非常庞大,但是本身按照设计模式的单一职责而言,会话这里本身应该不会收到这部分网络添加的功能而影响。而本身像流量整形,鉴权这些本身是没有语义的,就是写功能性的技术点,甚至很多可能Netty本身就有所提供,这些本身也不需要涉及到网关的场景,只是一些基本的功能,这种放在网络通信部分较为合适。另外的关于RPC泛化调用的部分,对于映射的RPC泛化调用,上一章也只有一个绑定关系,即将一个网关接口和RPC进行一个绑定,但是实际来讲,RPC并不需要在这个时候进行一个泛化调用的绑定,泛化调用更多是一种通用的业务型逻辑,我们或许可以把它当成一种资源池进行使用。
这里再解释一下会话的含义
会话(session):这里的会话指的是我们专门为这个业务客户端与服务端的功能调用做的统称,我们不希望再使用netty中的channel来表示,也不希望将网关的功能代码置于netty的通信方法之中美因茨我们单独在BaseHandler中定义了一个session方法,来表示业务的处理,与业务所有有关的handler都会实现这个session
所以现在我们希望对业务项目进行重构,对模块进行新的划分
我们将原本的两个模块拆分为四个模块,分别是:网络通信(socket),网络会话(session),映射器(mapper),泛化调用。这里如上图在泛化调用和执行器那块为虚线,这部分内容暂时由于功能不足,不好直接拆分,我们将在后续章节进行完善。
注意:这种重构的代码,代码本身可能变化不大,但是为什么要这么拆分,拆分后的流程发生了什么变化,调用顺序发生了什么改变,重构的思想是什么,这些才是最难以理解的,这块本节学习暂时学到的经验就是不断的debug调试,然后不断画图去看
在拆分后,我们的业务流程将会变为
这里由上图看到,其实代码的执行逻辑与第二章的梳理总结部分没有太多差别,主要时重构了代码模块,调整了代码位置所引起了不同,接下来我们将依次解释重构的思路。
这里重构后的项目如下图,主要拆分出来了以下几个包
Socket(网络通信):主要负责使用netty进行外部到网关的通信请求,接收客户端的HTTP请求
Session(网络会话):创建会话接口,绑定配置项,同时提供了泛化调用的真正实现。
Mapping(映射模块):这是本节唯一新增的代码,将原本的HTTP请求,进行映射我们后续进行泛化调用需要的配置,将其抽出来,变成一个类。
bind(绑定模块):主要负责在网关运行时进行动态代理类的创建,这个动态代理类是个空壳子,在这里我们主要的是将这个空壳子内的方法与真正的本地引用缓存中的Dubbo泛化调用建立联系,使得本地动态代理类执行方法时可以变向为执行泛化调用的的方法。
业务重构
网络会话模块
这里主要分为3个类
GatewaySession接口:定义session接口的模版,其中包含了真正的泛化调用和Dubbo的引用缓存,默认实现类DefaultGatewaySession
GatewaySessionFactory:网关会话工厂接口,用于创建开启会话,默认实现类DefaultGatewaySessionFactory
Configuration类:会话的配置类,里面记录了泛化调用需要的信息
代码如下
Configuration.java
/**
* @program: api-gateway-core
* @ClassName Configuration
* @description:
* @author: zs宝
* @create: 2025-08-04 14:56
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.bind.MapperRegistry;
import com.zshunbao.gateway.mapping.HttpStatement;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;
import java.util.HashMap;
import java.util.Map;
public class Configuration {
//对应的注册器
private final MapperRegistry mapperRegistry=new MapperRegistry(this);
private final Map<String, HttpStatement> httpStatements = new HashMap<>();
//根据官方文档的示例,dubbo的泛化调用的API调用方式主要有3个东西需要配置
//分别是ApplicationConfig,RegistryConfig,ReferenceConfig<GenericService>
//但是我们这是一个网关,被调用的服务方可能不止一个,因此,我们需要按照对应的服务名和配置名,将其对应起来保存
//RPC 应用服务配置项
//注意键为应用名称;值为应用服务配置项
private final Map<String, ApplicationConfig> applicationConfigMap=new HashMap<>();
//RPC 注册中心配置项
//键为应用名称;值为注册中心配置项
private final Map<String, RegistryConfig> registryConfigMap=new HashMap<>();
//RPC 泛化服务配置项
//注意这里的键:远程服务接口的全限定类名 ; 值为泛化引用配置对象
private final Map<String, ReferenceConfig<GenericService>> referenceConfigMap=new HashMap<>();
public Configuration(){
//TODO 后期从配置中获取,本节主要内容是泛化服务的调用
ApplicationConfig applicationConfig=new ApplicationConfig();
applicationConfig.setName("api-gateway-test");
applicationConfig.setQosEnable(false);
RegistryConfig registryConfig=new RegistryConfig();
//配置应用在那个注册中心可以被调用
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
registryConfig.setRegister(false);
//对应的泛化服务配置
ReferenceConfig<GenericService> reference=new ReferenceConfig<>();
//配置泛化调用的服务方接口
reference.setInterface("cn.bugstack.gateway.rpc.IActivityBooth");
reference.setVersion("1.0.0");
reference.setGeneric("true");
//加入缓存中去
applicationConfigMap.put("api-gateway-test",applicationConfig);
registryConfigMap.put("api-gateway-test",registryConfig);
referenceConfigMap.put("cn.bugstack.gateway.rpc.IActivityBooth",reference);
}
//对应的服务配置项对外get方法
public ApplicationConfig getApplicationConfig(String applicationName) {
return applicationConfigMap.get(applicationName);
}
//对应的对外注册中心的get方法
public RegistryConfig getRegistryConfig(String applicationName) {
return registryConfigMap.get(applicationName);
}
//对应的对外泛化调用配置项get方法
public ReferenceConfig<GenericService> getReferenceConfig(String interfaceName) {
return referenceConfigMap.get(interfaceName);
}
//添加泛化调用方法
public void addMapper(HttpStatement httpStatement) {
mapperRegistry.addMapper(httpStatement);
}
public IGenericReference getMapper(String uri, GatewaySession gatewaySession) {
return mapperRegistry.getMapper(uri, gatewaySession);
}
public void addHttpStatement(HttpStatement httpStatement) {
httpStatements.put(httpStatement.getUri(), httpStatement);
}
public HttpStatement getHttpStatement(String uri) {
return httpStatements.get(uri);
}
}
GatewaySession.java
/**
* @program: api-gateway-core
* @ClassName GatewaySession
* @description:用户处理网关 HTTP 请求
* @author: zs宝
* @create: 2025-08-04 14:47
* @Version 1.0
**/
package com.zshunbao.gateway.session;
import com.zshunbao.gateway.bind.IGenericReference;
public interface GatewaySession {
public Object get(String uri, Object args);
public Configuration getConfiguration();
IGenericReference getMapper(String uri);
}
DefaultGatewaySession.java
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySession
* @description:
* @author: zs宝
* @create: 2025-08-04 15:18
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;
public class DefaultGatewaySession implements GatewaySession {
private Configuration configuration;
public DefaultGatewaySession(Configuration configuration) {
this.configuration = configuration;
}
@Override
public Object get(String uri, Object args) {
/* 以下这部分内容,后续拆到执行器中处理 */
//配置信息拿到
HttpStatement httpStatement = configuration.getHttpStatement(uri);
String application = httpStatement.getApplication();
String interfaceName = httpStatement.getInterfaceName();
//获取基础服务
ApplicationConfig applicationConfig = configuration.getApplicationConfig(application);
RegistryConfig registryConfig = configuration.getRegistryConfig(application);
ReferenceConfig<GenericService> reference = configuration.getReferenceConfig(interfaceName);
//启动Dubbo服务
DubboBootstrap bootstrap=DubboBootstrap.getInstance();
bootstrap.application(applicationConfig).registry(registryConfig).reference(reference).start();
//获取对应的泛化调用服务
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
GenericService genericService = cache.get(reference);
return genericService.$invoke(httpStatement.getMethodName(),new String[]{"java.lang.String"},new Object[]{"小傅哥"});
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public IGenericReference getMapper(String uri) {
return configuration.getMapper(uri,this);
}
}
GatewaySessionFactory.java
/**
* @program: api-gateway-core
* @ClassName GatewaySessionFactory
* @description:网关会话工厂接口
* @author: zs宝
* @create: 2025-08-04 15:12
* @Version 1.0
**/
package com.zshunbao.gateway.session;
public interface GatewaySessionFactory {
GatewaySession openSession();
}
DefaultGatewaySessionFactory.java
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySessionFactory
* @description:
* @author: zs宝
* @create: 2025-08-04 15:27
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.GatewaySessionFactory;
public class DefaultGatewaySessionFactory implements GatewaySessionFactory {
private final Configuration configuration;
public DefaultGatewaySessionFactory(Configuration configuration) {
this.configuration = configuration;
}
@Override
public GatewaySession openSession() {
return new DefaultGatewaySession(configuration);
}
}
网络通信模块
这部分的主要功能就是上线客户端与服务端的通信服务,接收HTTP请求,并对请求进行处理(handler)
GatewaySocketServer.java
/**
* @program: api-gateway-core
* @ClassName GatewaySocketServer
* @description: 网关会话服务端
* @author: zs宝
* @create: 2025-07-25 16:48
* @Version 1.0
**/
package com.zshunbao.gateway.socket;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
//实现异步调用接口,使整个会话服务端的过程异步建立
public class GatewaySocketServer implements Callable<Channel> {
private final Logger logger = LoggerFactory.getLogger(GatewaySocketServer.class);
private final EventLoopGroup boss=new NioEventLoopGroup(1);
private final EventLoopGroup worker=new NioEventLoopGroup();
private Channel channel;
private DefaultGatewaySessionFactory gatewaySessionFactory;
public GatewaySocketServer(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
public Channel call() throws Exception {
ChannelFuture channelFuture=null;
try {
ServerBootstrap b=new ServerBootstrap();
b.group(boss,worker)
.channel(NioServerSocketChannel.class)
//设置 TCP 最大连接等待队列
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new SessionChannelInitializer(gatewaySessionFactory));
//InetSocketAddress 是 Java 用于表示“IP + 端口”的地址对象
//syncUninterruptibly() 会阻塞直到启动完成(不抛出异常)
channelFuture = b.bind(new InetSocketAddress(7397)).syncUninterruptibly();
this.channel=channelFuture.channel();
}catch (Exception e){
logger.error("socket server start error.", e);
}finally {
if(null!=channelFuture && channelFuture.isSuccess()){
logger.info("socket server start done.");
}else{
logger.error("socket server start error.");
}
}
return channel;
}
}
SessionChannelInitializer.java
/**
* @program: api-gateway-core
* @ClassName SessionChannelInitializer
* @description:自定义netty服务端链接的childHandler的初始化工具
* @author: zs宝
* @create: 2025-07-25 16:56
* @Version 1.0
**/
package com.zshunbao.gateway.socket;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.handlers.GatewayServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
public class SessionChannelInitializer extends ChannelInitializer<SocketChannel>{
private DefaultGatewaySessionFactory gatewaySessionFactory;
public SessionChannelInitializer(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//得到处理的流水线
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpObjectAggregator(1024*1024));
pipeline.addLast(new GatewayServerHandler(gatewaySessionFactory));
}
}
BaseHandler.java
/**
* @program: api-gateway-core
* @ClassName BaseHandler
* @description:
* @author: zs宝
* @create: 2025-07-25 17:10
* @Version 1.0
**/
package com.zshunbao.gateway.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
session(ctx,ctx.channel(),msg);
}
protected abstract void session(ChannelHandlerContext ctx, final Channel channel,T request);
}
GatewayServerHandler.java
/**
* @program: api-gateway-core
* @ClassName GatewayServerHandler
* @description:
* @author: zs宝
* @create: 2025-08-04 15:33
* @Version 1.0
**/
package com.zshunbao.gateway.socket.handlers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.BaseHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GatewayServerHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class);
private final DefaultGatewaySessionFactory gatewaySessionFactory;
public GatewayServerHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
String uri = request.getUri();
if (uri.equals("/favicon.ico")) return;
//创建会话
GatewaySession gatewaySession = gatewaySessionFactory.openSession();
//根据uri获得对应的mapper映射,即本地动态创建的代理类
IGenericReference reference = gatewaySession.getMapper(uri);
//执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
String result = reference.$invoke("test") + " " + System.currentTimeMillis();
// 返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
// 设置回写数据
response.content().writeBytes(JSON.toJSONBytes(result, SerializerFeature.PrettyFormat));
// 头部信息设置
HttpHeaders heads = response.headers();
// 返回内容类型
heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
// 响应体的长度
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 配置持久连接
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// 配置跨域访问
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
channel.writeAndFlush(response);
}
}
映射模块
这个模块主要时将HTTP请求映射为我们后续泛化调用需要的信息
HttpCommandType.java
/**
* @program: api-gateway-core
* @ClassName HttpCommandType
* @description: HTTP调用类型
* @author: zs宝
* @create: 2025-08-04 14:41
* @Version 1.0
**/
package com.zshunbao.gateway.mapping;
public enum HttpCommandType {
UNKNOWN,
GET,
POST,
PUT,
DELETE
}
HttpStatement.java
/**
* @program: api-gateway-core
* @ClassName HttpStatement
* @description: 网关接口映射信息
* @author: zs宝
* @create: 2025-08-04 14:40
* @Version 1.0
**/
package com.zshunbao.gateway.mapping;
public class HttpStatement {
/** 应用名称; */
private String application;
/** 服务接口;RPC、其他 */
private String interfaceName;
/** 服务方法;RPC#method */
private String methodName;
/** 网关接口 */
private String uri;
/** 接口类型;GET、POST、PUT、DELETE */
private HttpCommandType httpCommandType;
public HttpStatement(String application, String interfaceName, String methodName, String uri, HttpCommandType httpCommandType){
this.application = application;
this.interfaceName = interfaceName;
this.methodName = methodName;
this.uri = uri;
this.httpCommandType = httpCommandType;
}
public String getApplication() {
return application;
}
public String getInterfaceName() {
return interfaceName;
}
public String getMethodName() {
return methodName;
}
public String getUri() {
return uri;
}
public HttpCommandType getHttpCommandType() {
return httpCommandType;
}
}
绑定模块
这里就是我们动态创建代理对象,并让代理对象与泛化调用代理对象建立联系的地方.这一块的参数由于我们已经对于请求做了映射,因此后续的参数大多做了修改
IGenericReference.java:定义了一个泛化调用接口,包含一个
$invoke
方法。
MapperProxy.java:实现了
MethodInterceptor
接口,用于拦截代理对象的方法调用。
MapperProxyFactory.java:是一个静态代理工厂,用于创建泛化调用的代理对象。
MapperMethod.java:用于执行具体的网关调用方法,如GET、POST、PUT、DELETE。
MapperRegistry.java:用于注册和管理泛化调用的静态代理工厂。
IGenericReference.java
/**
* @program: api-gateway-core
* @ClassName IGenericReference
* @description: 统一泛化调用接口,无论怎样的HTTP请求,我们暴露出去的都是这个接口,一致
* @author: zs宝
* @create: 2025-07-29 15:53
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
public interface IGenericReference {
String $invoke(String args);
}
MapperProxy.java
/**
* @program: api-gateway-core
* @ClassName MapperProxy
* @description: 泛化调用静态代理,每发起一个HTTP请求,网关就会给对应的HTTP方法执行对应的PRC远程调用
* 这里只是进行对应的映射,具体的调用我们现在解耦在另一个MapperMethod类中
* 继承MethodInterceptor接口,让本地创建的动态代理对象的执行方法被拦截,走这里定义的方法,从而走真正的Dubbo泛化调用的代理
* @author: zs宝
* @create: 2025-08-04 14:50
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.session.GatewaySession;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import java.lang.reflect.Method;
public class MapperProxy implements MethodInterceptor {
private GatewaySession gatewaySession;
private final String uri;
public MapperProxy(GatewaySession gatewaySession, String uri) {
this.gatewaySession = gatewaySession;
this.uri = uri;
}
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
MapperMethod linkMethod=new MapperMethod(uri,method,gatewaySession.getConfiguration());
return linkMethod.execute(gatewaySession,args);
}
}
这个类在第二章对应于GenericReferenceProxy.java
,但是第二章的类里面直接就实现了动态代理方法到泛化调用的映射,这里我们进一步解耦,将其作为一个映射,具体的方法再做进一步判断
MapperMethod.java
/**
* @program: api-gateway-core
* @ClassName MapperMethod
* @description: 调用绑定的对应方法
* @author: zs宝
* @create: 2025-08-04 14:45
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import java.lang.reflect.Method;
public class MapperMethod {
private String uri;
private final HttpCommandType command;
public MapperMethod(String uri, Method method, Configuration configuration) {
this.uri=uri;
this.command = configuration.getHttpStatement(uri).getHttpCommandType();;
}
public Object execute(GatewaySession session, Object args) {
Object result=null;
switch (command){
case GET:
result=session.get(uri,args);
break;
case POST:
break;
case PUT:
break;
case DELETE:
break;
default:
throw new RuntimeException("Unknown execution method for: " + command);
}
return result;
}
}
MapperProxyFactory.java
/**
* @program: api-gateway-core
* @ClassName MapperProxyFactory
* @description: 泛化调用静态代理工厂
* 每一个应用的接口的方法对应一个运行时创建的动态代理,注意这个动态代理只是个空壳子,是为了解耦而创建存在
* @author: zs宝
* @create: 2025-08-04 14:59
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.GatewaySession;
import net.sf.cglib.core.Signature;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.InterfaceMaker;
import org.objectweb.asm.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MapperProxyFactory {
private String uri;
public MapperProxyFactory(String uri) {
this.uri = uri;
}
private final Map<String, IGenericReference> genericReferenceCache = new ConcurrentHashMap<>();
public IGenericReference newInstance(GatewaySession gatewaySession) {
return genericReferenceCache.computeIfAbsent(uri,k->{
//得到自定义的映射的HTTP对象
HttpStatement httpStatement = gatewaySession.getConfiguration().getHttpStatement(uri);
InterfaceMaker interfaceMaker=new InterfaceMaker();
interfaceMaker.add(
new Signature(httpStatement.getMethodName(), Type.getType(String.class),new Type[]{Type.getType(String.class)})
,
null
);
Class <?> interfaceClass = interfaceMaker.create();
//开始创建动态代理对象
Enhancer enhancer=new Enhancer();
enhancer.setSuperclass(Object.class);
enhancer.setInterfaces(new Class[]{IGenericReference.class,interfaceClass});
//这里很关键,设置回调函数,即拦截函数,我们本地创建的动态代理类根本不知道远程服务要做怎样的处理
//因此内部方法什么的都是空壳子,但是要有这个东西进行解耦,避免直接调用Dubbo的泛化调用,且利用IGenericReference统一执行函数入口
MapperProxy genericReferenceProxy=new MapperProxy(gatewaySession,uri);
enhancer.setCallback(genericReferenceProxy);
return (IGenericReference) enhancer.create();
});
}
}
MapperRegistry.java
/**
* @program: api-gateway-core
* @ClassName MapperRegistry
* @description:泛化调用注册器
* @author: zs宝
* @create: 2025-08-04 15:14
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import java.util.HashMap;
import java.util.Map;
public class MapperRegistry {
private final Configuration configuration;
public MapperRegistry(Configuration configuration) {
this.configuration = configuration;
}
// 泛化调用静态代理工厂
private final Map<String, MapperProxyFactory> knownMappers = new HashMap<>();
public void addMapper(HttpStatement httpStatement) {
String uri = httpStatement.getUri();
// 如果重复注册则报错
if(hasMapper(uri)){
throw new RuntimeException("Uri " + uri + " is already known to the MapperRegistry.");
}
knownMappers.put(uri,new MapperProxyFactory(uri));
//保存接口映射信息
configuration.addHttpStatement(httpStatement);
}
public IGenericReference getMapper(String uri, GatewaySession gatewaySession) {
MapperProxyFactory mapperProxyFactory = knownMappers.get(uri);
if (mapperProxyFactory == null) {
throw new RuntimeException("Uri " + uri + " is not known to the MapperRegistry.");
}
try {
return mapperProxyFactory.newInstance(gatewaySession);
} catch (Exception e) {
throw new RuntimeException("Error getting mapper instance. Cause: " + e, e);
}
}
public <T> boolean hasMapper(String uri) {
return knownMappers.containsKey(uri);
}
}
测试
package com.zshunbao.gateway.test;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
/**
* 测试:http://localhost:7397/wg/activity/sayHi
*/
@Test
public void test_gateway() throws InterruptedException, ExecutionException {
// 1. 创建配置信息加载注册
Configuration configuration = new Configuration();
HttpStatement httpStatement = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"sayHi",
"/wg/activity/sayHi",
HttpCommandType.GET);
configuration.addMapper(httpStatement);
// 2. 基于配置构建会话工厂
DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);
// 3. 创建启动网关网络服务
GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if (null == channel) throw new RuntimeException("netty server start error channel is null");
while (!channel.isActive()) {
logger.info("netty server gateway start Ing ...");
Thread.sleep(500);
}
logger.info("netty server gateway start Done! {}", channel.localAddress());
Thread.sleep(Long.MAX_VALUE);
}
}
启动第二章的自定义远程服务和zookeeper。
启动本节测试类,然后取浏览器访问测试地址