业务流程
在Dubbo泛化调用的官方文档处中有这样的案例(这里只粘贴与本节业务相关的部分,详情请看官方文档)
import org.apache.dubbo.rpc.service.GenericService;
...
// 引用远程服务
// 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
// 弱类型接口名
reference.setInterface("com.xxx.XxxService");
reference.setVersion("1.0.0");
// 声明为泛化接口
reference.setGeneric(true);
// 用org.apache.dubbo.rpc.service.GenericService可以替代所有接口引用
GenericService genericService = reference.get();
// 基本类型以及Date,List,Map等不需要转换,直接调用
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"});
// 用Map表示POJO参数,如果返回值为POJO也将自动转成Map
Map<String, Object> person = new HashMap<String, Object>();
person.put("name", "xxx");
person.put("password", "yyy");
// 如果返回POJO将自动转成Map
Object result = genericService.$invoke("findPerson", new String[]
{"com.xxx.Person"}, new Object[]{person});
这里面有一个很重要的地方在于,
// 基本类型以及Date,List,Map等不需要转换,直接调用
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"});
但是一旦不是基本类型,对于泛化调用传递进去的参数就要进行map包装
// 用Map表示POJO参数,如果返回值为POJO也将自动转成Map
Map<String, Object> person = new HashMap<String, Object>();
person.put("name", "xxx");
person.put("password", "yyy");
// 如果返回POJO将自动转成Map
Object result = genericService.$invoke("findPerson", new String[]
{"com.xxx.Person"}, new Object[]{person});
但是对于我们在前面几节的代码对于HTTP请求传递进来的参数一直是硬编码如下
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
String uri = request.getUri();
if (uri.equals("/favicon.ico")) return;
//创建会话
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
//根据uri获得对应的mapper映射,即本地动态创建的代理类
IGenericReference reference = gatewaySession.getMapper();
//执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
String result = reference.$invoke("test") + " " + System.currentTimeMillis();
这明显不符合我们日常所用的网关对于接收到的请求的处理,因此本节的重点就在于对HTTP请求进行解析,将请求的参数传递进去进行Dubbo远程泛化调用。
现在我们需要想想我们对于HTTP请求参数解析需要考虑设计那些东西?
HTTP请求有哪些类型?GET,POST,DELETE,PUT
不同的HTTP请求的参数是怎样的,与请求头中的那些字段有关?
解析后对于泛化调用的参数需要做那些修改?
这里提供一个有关RPC接口入参信息的规范:HTTP 接口请求的参数需要解析成可以匹配到 RPC 接口的入参信息,所以通常为了方便控制一般只支持 RPC 接口单个对象入参,并且不允许同名不同参数的重载方法出现,这些会在 RPC 方法注册阶段进行报错提醒。
本节暂时只对GET,POST请求进行参数解析(这两个最常见,用的也最多),流程图如下
从网络请求到会话,需要对 GET/POST 的请求,以及请求的参数类型 Content-Type 做细化的参数解析操作。
同时按照 RPC 泛化调用的入参方式,将解析的参数封装处理。
数据源服务核心链路关系图(图片来源于小傅哥)如下
本节业务代码目录
本节的主要代码也就在参数解析的那个类上,然后是关于参数类型的判断上,最后是一些代码的参数细微调整
业务实现
Dubbo官方文档的泛化调用有介绍,当数据类型不为基本类型时,需要用map进行包装,因此当请求进来时,我们肯定需要判断这个请求参数类型是不是基本类型,因此这个里我们定义一个类来帮助我们快速根据映射信息判断需要的参数是不是基本类型
SimpleTypeRegistry.java
/**
* @program: api-gateway-core
* @ClassName SimpleTypeRegistry
* @description: 基本类型注册器
* @author: zs宝
* @create: 2025-08-09 16:27
* @Version 1.0
**/
package com.zshunbao.gateway.type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
public class SimpleTypeRegistry {
//存储Java常见的基本类型参数
private static final Set<String> SIMPLE_TYPE_SET = new HashSet<>();
static {
SIMPLE_TYPE_SET.add(String.class.getName());
SIMPLE_TYPE_SET.add(Byte.class.getName());
SIMPLE_TYPE_SET.add(Short.class.getName());
SIMPLE_TYPE_SET.add(Character.class.getName());
SIMPLE_TYPE_SET.add(Integer.class.getName());
SIMPLE_TYPE_SET.add(Long.class.getName());
SIMPLE_TYPE_SET.add(Float.class.getName());
SIMPLE_TYPE_SET.add(Double.class.getName());
SIMPLE_TYPE_SET.add(Boolean.class.getName());
SIMPLE_TYPE_SET.add(Date.class.getName());
SIMPLE_TYPE_SET.add(Class.class.getName());
SIMPLE_TYPE_SET.add(BigInteger.class.getName());
SIMPLE_TYPE_SET.add(BigDecimal.class.getName());
}
private SimpleTypeRegistry(){}
/**
* 用于判断某种参数类型是否是常见的基本参数类型
* @param clazz 参数类型的名称
* @return
*/
public static boolean isSimpleType(String clazz) {
return SIMPLE_TYPE_SET.contains(clazz);
}
}
现在关于参数类型的判断已经完成,接下来就是本节最重要的参数解析上
RequestParser.java
/**
* @program: api-gateway-core
* @ClassName RequestParser
* @description:请求解析器,解析HTTP请求,GET/POST form-data\raw-json
* @author: zs宝
* @create: 2025-08-09 16:33
* @Version 1.0
**/
package com.zshunbao.gateway.socket.agreement;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import com.alibaba.fastjson.JSON;
public class RequestParser {
//请求参数
private final FullHttpRequest request;
public RequestParser(FullHttpRequest request){
this.request = request;
}
/**
* 根据请求是post还是get,对参数进行解析为map集合
* @return 参数的map集合<属性,值>
*/
public Map<String, Object> parse() {
//获取请求类型
HttpMethod method = request.method();
if(HttpMethod.GET==method){
Map<String, Object> parameterMap=new HashMap<>();
//直接用netty提供的编码器处理
//由于是get请求,那么所有的参数都在uri中
QueryStringDecoder decoder=new QueryStringDecoder(request.uri());
decoder.parameters().forEach((key,value)->parameterMap.put(key,value.get(0)));
return parameterMap;
} else if (HttpMethod.POST==method) {
// 获取 Content-type
//注意get请求是没有这个参数的
String contentType = getContentType();
switch (contentType){
case "multipart/form-data":
Map<String, Object> parameterMap=new HashMap<>();
HttpPostRequestDecoder decoder=new HttpPostRequestDecoder(request);
//将整个请求体给解析器
decoder.offer(request);
decoder.getBodyHttpDatas().forEach(data->{
//拿到普通表单字段
//一个 HTTP POST 表单中的普通键值对字段(非文件),值是文本。
Attribute attr = (Attribute) data;
try {
parameterMap.put(data.getName(), attr.getValue());
}catch (IOException ignore){
ignore.printStackTrace();
}
});
return parameterMap;
case "application/json":
//解析json类型的数据
//直接拿到请求体中的内容,并复制一份
ByteBuf byteBuf = request.content().copy();
//若不为空
if(byteBuf.isReadable()){
//转为String类型
String content = byteBuf.toString(StandardCharsets.UTF_8);
//将json字符串转为json形式的map集合
return JSON.parseObject(content);
}
break;
default:
throw new RuntimeException("未实现的协议类型 Content-Type:" + contentType);
}
}
throw new RuntimeException("未实现的请求类型 HttpMethod:" + method);
}
/**
* 得到请求参数中的ContentType中的值
* @return
*/
private String getContentType() {
//从请求参数中取得对应的Content-Type键值
Optional<Map.Entry<String, String>> header = request.headers().entries().stream().filter(
val -> val.getKey().equals("Content-Type")
).findAny();
//判断是否为空,有没有
Map.Entry<String, String> entry = header.orElse(null);
assert entry!=null;
//不为空,则只需要取得参数的类型是什么如application/json、multipart/form-data
//先从中取得对应的值
String entryValue = entry.getValue();
//找到第一个分号位置
int index = entryValue.indexOf(";");
//说明其中除了参数类型,还有其他值,我们只需要参数类型这一个即可
if(index>0){
return entryValue.substring(0,index);
}else {
return entryValue;
}
}
}
到此本节的业务代码其实已经完成,还剩下的就是一些细微的调整
代码调整
首先是我们需要将我们需要对于网关接口映射的信息进行新增一个字段parameterType
/**
* @program: api-gateway-core
* @ClassName HttpStatement
* @description: 网关接口映射信息
* @author: zs宝
* @create: 2025-08-04 14:40
* @Version 1.0
**/
package com.zshunbao.gateway.mapping;
public class HttpStatement {
/** 应用名称; */
private String application;
/** 服务接口;RPC、其他 */
private String interfaceName;
/** 服务方法;RPC#method */
private String methodName;
/** 参数类型(RPC 限定单参数注册);new String[]{"java.lang.String"}、new String[]{"cn.bugstack.gateway.rpc.dto.XReq"} */
private String parameterType;
/** 网关接口 */
private String uri;
/** 接口类型;GET、POST、PUT、DELETE */
private HttpCommandType httpCommandType;
public HttpStatement(String application, String interfaceName, String methodName, String parameterType, String uri, HttpCommandType httpCommandType){
this.application = application;
this.interfaceName = interfaceName;
this.methodName = methodName;
this.parameterType = parameterType;
this.uri = uri;
this.httpCommandType = httpCommandType;
}
public String getApplication() {
return application;
}
public String getInterfaceName() {
return interfaceName;
}
public String getMethodName() {
return methodName;
}
public String getUri() {
return uri;
}
public HttpCommandType getHttpCommandType() {
return httpCommandType;
}
public String getParameterType() {
return parameterType;
}
}
然后对于网络通信出处的关于请求的处理需要进行修改,让泛化调用参数不再是被写死的
/**
* @program: api-gateway-core
* @ClassName GatewayServerHandler
* @description:
* @author: zs宝
* @create: 2025-08-04 15:33
* @Version 1.0
**/
package com.zshunbao.gateway.socket.handlers;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.BaseHandler;
import com.zshunbao.gateway.socket.agreement.RequestParser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class GatewayServerHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class);
private final DefaultGatewaySessionFactory gatewaySessionFactory;
public GatewayServerHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
//解析请求参数
Map<String, Object> requestObj = new RequestParser(request).parse();
String uri = request.uri();
int index = uri.indexOf("?");
uri=index>0?uri.substring(0,index):uri;
if (uri.equals("/favicon.ico")) return;
//创建会话
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
//根据uri获得对应的mapper映射,即本地动态创建的代理类
IGenericReference reference = gatewaySession.getMapper();
//执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
String result = reference.$invoke(requestObj) + " " + System.currentTimeMillis();
// 返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
// 设置回写数据
response.content().writeBytes(JSON.toJSONBytes(result, SerializerFeature.PrettyFormat));
// 头部信息设置
HttpHeaders heads = response.headers();
// 返回内容类型
heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
// 响应体的长度
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 配置持久连接
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// 配置跨域访问
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
channel.writeAndFlush(response);
}
}
由于所有参数都被包装为了map,所以我们在bind的代理和方法处的调用方法参数也需要进行修改,同时添加有关post的处理
/**
* @program: api-gateway-core
* @ClassName MapperProxy
* @description: 泛化调用静态代理,每发起一个HTTP请求,网关就会给对应的HTTP方法执行对应的PRC远程调用
* 这里只是进行对应的映射,具体的调用我们现在解耦在另一个MapperMethod类中
* 继承MethodInterceptor接口,让本地创建的动态代理对象的执行方法被拦截,走这里定义的方法,从而走真正的Dubbo泛化调用的代理
* @author: zs宝
* @create: 2025-08-04 14:50
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.session.GatewaySession;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import java.lang.reflect.Method;
import java.util.Map;
public class MapperProxy implements MethodInterceptor {
private GatewaySession gatewaySession;
private final String uri;
public MapperProxy(GatewaySession gatewaySession, String uri) {
this.gatewaySession = gatewaySession;
this.uri = uri;
}
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
MapperMethod linkMethod=new MapperMethod(uri,method,gatewaySession.getConfiguration());
//暂时只获取第0个参数
return linkMethod.execute(gatewaySession,(Map<String, Object>) args[0]);
}
}
/**
* @program: api-gateway-core
* @ClassName MapperMethod
* @description: 调用绑定的对应方法
* @author: zs宝
* @create: 2025-08-04 14:45
* @Version 1.0
**/
package com.zshunbao.gateway.bind;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import java.lang.reflect.Method;
import java.util.Map;
public class MapperMethod {
private String methodName;
private final HttpCommandType command;
public MapperMethod(String uri, Method method, Configuration configuration) {
this.methodName=configuration.getHttpStatement(uri).getMethodName();
this.command = configuration.getHttpStatement(uri).getHttpCommandType();;
}
public Object execute(GatewaySession session, Map<String, Object> params) {
Object result=null;
switch (command){
case GET:
result=session.get(methodName,params);
break;
case POST:
result=session.post(methodName,params);
break;
case PUT:
break;
case DELETE:
break;
default:
throw new RuntimeException("Unknown execution method for: " + command);
}
return result;
}
}
紧接着的调用默认session处的泛化调用传递参数需要修改,同时添加有关post的处理
/**
* @program: api-gateway-core
* @ClassName DefaultGatewaySession
* @description:
* @author: zs宝
* @create: 2025-08-04 15:18
* @Version 1.0
**/
package com.zshunbao.gateway.session.defaults;
import com.zshunbao.gateway.bind.IGenericReference;
import com.zshunbao.gateway.datasource.Connection;
import com.zshunbao.gateway.datasource.DataSource;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.GatewaySession;
import com.zshunbao.gateway.type.SimpleTypeRegistry;
import java.util.Map;
public class DefaultGatewaySession implements GatewaySession {
private Configuration configuration;
private String uri;
private DataSource dataSource;
public DefaultGatewaySession(Configuration configuration, String uri, DataSource dataSource) {
this.configuration = configuration;
this.uri=uri;
this.dataSource=dataSource;
}
@Override
public Object get(String methodName, Map<String, Object> params) {
//现在一切的泛化调用都在对应的connection中
Connection connection = dataSource.getConnection();
HttpStatement httpStatement = configuration.getHttpStatement(uri);
String parameterType = httpStatement.getParameterType();
//这里需要判断参数的类型是不是基本类型
return connection.execute(
methodName,
new String[]{parameterType},
new String[]{"ignore"},
SimpleTypeRegistry.isSimpleType(parameterType)?params.values().toArray() :new Object[]{params});
}
@Override
public Object post(String methodName, Map<String, Object> params) {
return get(methodName, params);
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public IGenericReference getMapper() {
return configuration.getMapper(uri,this);
}
}
测试
/**
* @program: api-gateway-core
* @ClassName ApiTest
* @description:
* @author: zs宝
* @create: 2025-08-09 17:21
* @Version 1.0
**/
package com.zshunbao.gateway.test;
import com.zshunbao.gateway.mapping.HttpCommandType;
import com.zshunbao.gateway.mapping.HttpStatement;
import com.zshunbao.gateway.session.Configuration;
import com.zshunbao.gateway.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
/**
* 测试:
* http://localhost:7397/wg/activity/sayHi
* 参数:
* {
* "str": "10001"
* }
*
* http://localhost:7397/wg/activity/insert
* 参数:
* {
* "name":"zshunbao",
* "uid":"10001"
* }
*/
@Test
public void test_gateway() throws InterruptedException, ExecutionException {
// 1. 创建配置信息加载注册
Configuration configuration = new Configuration();
HttpStatement httpStatement01 = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"sayHi",
"java.lang.String",
"/wg/activity/sayHi",
HttpCommandType.GET);
HttpStatement httpStatement02 = new HttpStatement(
"api-gateway-test",
"cn.bugstack.gateway.rpc.IActivityBooth",
"insert",
"cn.bugstack.gateway.rpc.dto.XReq",
"/wg/activity/insert",
HttpCommandType.POST);
configuration.addMapper(httpStatement01);
configuration.addMapper(httpStatement02);
// 2. 基于配置构建会话工厂
DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);
// 3. 创建启动网关网络服务
GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if (null == channel) throw new RuntimeException("netty server start error channel is null");
while (!channel.isActive()) {
logger.info("netty server gateway start Ing ...");
Thread.sleep(500);
}
logger.info("netty server gateway start Done! {}", channel.localAddress());
Thread.sleep(Long.MAX_VALUE);
}
}
启动zookeeper,运行自定义的测试远程服务,启动测试类
使用API-POST测试