业务流程
本章节主要做一些功能细节的完善
网关的注册中心需要提供一个网关算力与RPC服务的分配关系。groupId --1vn--> gatewayId --1vn--> systemId
10001 -> api-gateway-g3 -> api-gateway-test-01-provider
10001 -> api-gateway-g3 -> api-gateway-test-02-provider
10001 -> api-gateway-g4 -> api-gateway-test-03-provider
10001 -> api-gateway-g4 -> api-gateway-test-04-provider
10002 -> api-gateway-g5 -> api-gateway-test-05-provider
10002 -> api-gateway-g5 -> api-gateway-test-06-provider
RPC 应用上报的 SDK 中需要添加一个开关,是否允许上报。这样可以更方便的从应用中摘除 SDK 的功能,可以更便于的测试。
核心通信组件 CORE 中需要在返回的通信协议中携带上是哪台网关算力处理的HTTP协议,这样可以更方便的看到负载起的作用。
业务实现
首先是网关的注册中心需要提供一个网关算力与RPC服务的分配关系,代码如下
GatewayConfigManage.java
/**
* 网关算力与系统挂载配置
* groupId --1vn--> gatewayId --1vn--> systemId
* 10001 -> api-gateway-g3 -> api-gateway-test-01-provider
* 10001 -> api-gateway-g3 -> api-gateway-test-02-provider
* 10001 -> api-gateway-g4 -> api-gateway-test-03-provider
* 10001 -> api-gateway-g4 -> api-gateway-test-04-provider
* 10002 -> api-gateway-g5 -> api-gateway-test-05-provider
* 10002 -> api-gateway-g5 -> api-gateway-test-06-provider
*/
@PostMapping(value = "distributionGatewayServerNode",produces = "application/json;charset=utf-8")
public Result<Boolean> distributionGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId,@RequestParam String systemId) {
try {
logger.info("网关算力与系统挂载配置。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId);
configManageService.distributionGatewayServerNode(groupId,gatewayId,systemId);
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true);
}catch (DuplicateKeyException e){
logger.warn("查询应用服务配置项信息失败,唯一索引冲突。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
return new Result<>(ResponseCode.INDEX_DUP.getCode(), e.getMessage(), true);
}catch (Exception e){
logger.error("网关算力与系统挂载配置。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false);
}
}
ConfigManageService.java
@Override
public void distributionGatewayServerNode(String groupId, String gatewayId, String systemId) {
String systemName=configManageRepository.queryApplicationSystemName(systemId);
if(StringUtils.isEmpty(systemName)){
throw new RuntimeException("网关算力与系统挂载配置失败,systemId:" + systemId + " 在数据库表「application_system」中不存在!");
}
configManageRepository.distributionGatewayServerNode(groupId,gatewayId,systemId,systemName);
}
ConfigManageRepository.java
@Override
public String queryApplicationSystemName(String systemId) {
return applicationSystemDao.queryApplicationSystemName(systemId);
}
@Override
public void distributionGatewayServerNode(String groupId, String gatewayId, String systemId, String systemName) {
GatewayDistribution gatewayDistribution=new GatewayDistribution();
gatewayDistribution.setGroupId(groupId);
gatewayDistribution.setGatewayId(gatewayId);
gatewayDistribution.setSystemId(systemId);
gatewayDistribution.setSystemName(systemName);
gatewayDistributionDao.insert(gatewayDistribution);
}
application_system.xml
<select id="queryApplicationSystemName" parameterType="java.lang.String" resultType="java.lang.String">
select system_name
from application_system
where system_id = #{systemId}
</select>
gateway_distribution.xml
<insert id="insert" parameterType="com.zshunbao.gateway.center.infrastructure.po.GatewayDistribution">
INSERT into gateway_distribution(group_id, gateway_id, system_id, system_name, create_time, update_time)
VALUES (#{groupId}, #{gatewayId}, #{systemId}, #{systemName}, now(), now())
</insert>
然后是api-gateway-sdk增加开关,是否允许上报,修改代码如下
GatewaySDKServiceProperties.java
package com.zshunbao.gateway.sdk.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @program: api-gateway-sdk
* @ClassName GatewaySDKServiceProperties
* @description: 定义整个服务系统相关配置,这种配置一般都是既定好的,即就是application_system表的相关内容
* @author: zs宝
* @create: 2025-08-29 16:18
* @Version 1.0
**/
@ConfigurationProperties("api-gateway-sdk")
public class GatewaySDKServiceProperties {
/** 网关注册中心地址 */
private String address;
/** 系统标识 */
private String systemId;
/** 系统名称 */
private String systemName;
/** RPC注册中心;zookeeper://127.0.0.1:2181*/
private String systemRegistry;
/** 程序是否启用 */
private boolean enabled = true;
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getSystemId() {
return systemId;
}
public void setSystemId(String systemId) {
this.systemId = systemId;
}
public String getSystemName() {
return systemName;
}
public void setSystemName(String systemName) {
this.systemName = systemName;
}
public String getSystemRegistry() {
return systemRegistry;
}
public void setSystemRegistry(String systemRegistry) {
this.systemRegistry = systemRegistry;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
GatewaySDKAutoConfig.java
package com.zshunbao.gateway.sdk.config;
import com.zshunbao.gateway.sdk.application.GatewaySDKApplication;
import com.zshunbao.gateway.sdk.domain.service.GatewayCenterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @program: api-gateway-sdk
* @ClassName GatewaySDKAutoConfig
* @description: 网关sdk配置服务
* @author: zs宝
* @create: 2025-08-29 16:25
* @Version 1.0
**/
@Configuration
@EnableConfigurationProperties(GatewaySDKServiceProperties.class)
@ConditionalOnProperty(
prefix = "api-gateway-sdk",
name = "enabled",
havingValue = "true",
matchIfMissing = true
)
public class GatewaySDKAutoConfig {
private Logger logger = LoggerFactory.getLogger(GatewaySDKAutoConfig.class);
@Bean
public GatewayCenterService gatewayCenterService(){
return new GatewayCenterService();
}
@Bean
public GatewaySDKApplication gatewaySDKApplication(GatewaySDKServiceProperties properties,GatewayCenterService gatewayCenterService) {
logger.info("构建 GatewaySDKApplication bean");
return new GatewaySDKApplication(properties,gatewayCenterService);
}
}
最后是api-gateway-core返回结果带上算力节点的IP端口信息,修改代码如下
GatewayResultMessage.java
package com.zshunbao.gateway.core.socket.agreement;
/**
* @program: api-gateway-core
* @ClassName GatewayResultMessage
* @description: 网关结果封装
* @author: zs宝
* @create: 2025-08-15 15:26
* @Version 1.0
**/
public class GatewayResultMessage {
private String code;
private String info;
private Object data;
private String node;
protected GatewayResultMessage(String code, String info, Object data) {
this.code = code;
this.info = info;
this.data = data;
}
public static GatewayResultMessage buildSuccess(Object data) {
return new GatewayResultMessage(AgreementConstants.ResponseCode._200.getCode(), AgreementConstants.ResponseCode._200.getInfo(), data );
}
public static GatewayResultMessage buildError(String code, String info) {
return new GatewayResultMessage(code, info, null);
}
public GatewayResultMessage setNode(String node){
this.node=node;
return this;
}
public String getCode() {
return code;
}
public String getInfo() {
return info;
}
public Object getData() {
return data;
}
public String getNode() {
return node;
}
}
ProtocolDataHandler.java
/**
* @program: api-gateway-core
* @ClassName ProtocolDataHandler
* @description:
* @author: zs宝
* @create: 2025-08-04 15:33
* @Version 1.0
**/
package com.zshunbao.gateway.core.socket.handlers;
import com.zshunbao.gateway.core.bind.IGenericReference;
import com.zshunbao.gateway.core.executor.result.SessionResult;
import com.zshunbao.gateway.core.session.GatewaySession;
import com.zshunbao.gateway.core.session.defaults.DefaultGatewaySessionFactory;
import com.zshunbao.gateway.core.socket.BaseHandler;
import com.zshunbao.gateway.core.socket.agreement.AgreementConstants;
import com.zshunbao.gateway.core.socket.agreement.GatewayResultMessage;
import com.zshunbao.gateway.core.socket.agreement.RequestParser;
import com.zshunbao.gateway.core.socket.agreement.ResponseParser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class ProtocolDataHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(ProtocolDataHandler.class);
private final DefaultGatewaySessionFactory gatewaySessionFactory;
public ProtocolDataHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
try {
//1、解析请求参数
RequestParser requestParser = new RequestParser(request);
String uri= requestParser.getUri();
if (uri==null || uri.equals("/favicon.ico")) return;
Map<String, Object> args = requestParser.parse();
//2、创建会话
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
//根据uri获得对应的mapper映射,即本地动态创建的代理类
IGenericReference reference = gatewaySession.getMapper();
//执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
SessionResult result = reference.$invoke(args);
//3、封装返回结果:注意由sessionResult到网关定义的返回结果的转化
DefaultFullHttpResponse response = new ResponseParser().parse("0000".equals(result.getCode()) ? GatewayResultMessage.buildSuccess(result.getData()).setNode(node()) : GatewayResultMessage.buildError(AgreementConstants.ResponseCode._404.getCode(), "网关协议调用失败!").setNode(node()));
channel.writeAndFlush(response);
} catch (Exception e) {
//4封装错误的结果
DefaultFullHttpResponse response=new ResponseParser().parse(GatewayResultMessage.buildError(AgreementConstants.ResponseCode._502.getCode(),"网关调用失败"+e.getMessage()).setNode(node()));
channel.writeAndFlush(response);
}
}
/**
* 获取当前节点的算力网关的IP端口信息
* @return
*/
private String node(){
return gatewaySessionFactory.getConfiguration().getHostName()+":"+gatewaySessionFactory.getConfiguration().getPort();
}
}
测试
依照第28章的内容,将所有模块重新mavean clean install,删除原有的docker部署及对应镜像,重新构造镜像,重新部署实例
到此基础内容已近全部完成