业务流程

本章节主要做一些功能细节的完善

  • 网关的注册中心需要提供一个网关算力与RPC服务的分配关系。groupId --1vn--> gatewayId --1vn--> systemId

    • 10001 -> api-gateway-g3 -> api-gateway-test-01-provider

    • 10001 -> api-gateway-g3 -> api-gateway-test-02-provider

    • 10001 -> api-gateway-g4 -> api-gateway-test-03-provider

    • 10001 -> api-gateway-g4 -> api-gateway-test-04-provider

    • 10002 -> api-gateway-g5 -> api-gateway-test-05-provider

    • 10002 -> api-gateway-g5 -> api-gateway-test-06-provider

  • RPC 应用上报的 SDK 中需要添加一个开关,用于控制是否允许上报。这样可以更方便地从应用中摘除 SDK 的功能,也更便于测试。

  • 核心通信组件 CORE 中需要在返回的通信协议中携带处理该 HTTP 请求的网关算力节点信息,这样可以更直观地看到负载均衡起的作用。

业务实现

首先是网关的注册中心需要提供一个网关算力与RPC服务的分配关系,代码如下

GatewayConfigManage.java

 /**
      * 网关算力与系统挂载配置
      * groupId --1vn--> gatewayId --1vn--> systemId
      * 10001 -> api-gateway-g3 -> api-gateway-test-01-provider
      * 10001 -> api-gateway-g3 -> api-gateway-test-02-provider
      * 10001 -> api-gateway-g4 -> api-gateway-test-03-provider
      * 10001 -> api-gateway-g4 -> api-gateway-test-04-provider
      * 10002 -> api-gateway-g5 -> api-gateway-test-05-provider
      * 10002 -> api-gateway-g5 -> api-gateway-test-06-provider
      */
     @PostMapping(value = "distributionGatewayServerNode",produces = "application/json;charset=utf-8")
     public Result<Boolean> distributionGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId,@RequestParam String systemId) {
         try {
             logger.info("网关算力与系统挂载配置。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId);
             configManageService.distributionGatewayServerNode(groupId,gatewayId,systemId);
             return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true);
         }catch (DuplicateKeyException e){
             logger.warn("查询应用服务配置项信息失败,唯一索引冲突。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
             return new Result<>(ResponseCode.INDEX_DUP.getCode(), e.getMessage(), true);
         }catch (Exception e){
             logger.error("网关算力与系统挂载配置。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
             return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false);
         }
     }

ConfigManageService.java

 ​
 @Override
     public void distributionGatewayServerNode(String groupId, String gatewayId, String systemId) {
         String systemName=configManageRepository.queryApplicationSystemName(systemId);
         if(StringUtils.isEmpty(systemName)){
             throw new RuntimeException("网关算力与系统挂载配置失败,systemId:" + systemId + " 在数据库表「application_system」中不存在!");
         }
         configManageRepository.distributionGatewayServerNode(groupId,gatewayId,systemId,systemName);
     }

ConfigManageRepository.java

 @Override
     public String queryApplicationSystemName(String systemId) {
         return applicationSystemDao.queryApplicationSystemName(systemId);
     }
 ​
     @Override
     public void distributionGatewayServerNode(String groupId, String gatewayId, String systemId, String systemName) {
         GatewayDistribution gatewayDistribution=new GatewayDistribution();
         gatewayDistribution.setGroupId(groupId);
         gatewayDistribution.setGatewayId(gatewayId);
         gatewayDistribution.setSystemId(systemId);
         gatewayDistribution.setSystemName(systemName);
         gatewayDistributionDao.insert(gatewayDistribution);
     }

application_system.xml

 <!-- Selects system_name from application_system for the row(s) whose system_id equals the
      String parameter bound as #{systemId}; the result is mapped to java.lang.String. -->
 <select id="queryApplicationSystemName" parameterType="java.lang.String" resultType="java.lang.String">
         select system_name
         from application_system
         where system_id = #{systemId}
     </select>

gateway_distribution.xml

 <!-- Inserts one gateway_distribution row from a GatewayDistribution parameter object
      (groupId, gatewayId, systemId, systemName); create_time and update_time are both
      filled with the database's now(). -->
 <insert id="insert" parameterType="com.zshunbao.gateway.center.infrastructure.po.GatewayDistribution">
         INSERT into gateway_distribution(group_id, gateway_id, system_id, system_name, create_time, update_time)
         VALUES (#{groupId}, #{gatewayId}, #{systemId}, #{systemName}, now(), now())
     </insert>

然后是api-gateway-sdk增加开关,是否允许上报,修改代码如下

GatewaySDKServiceProperties.java

 package com.zshunbao.gateway.sdk.config;
 ​
 import org.springframework.boot.context.properties.ConfigurationProperties;
 ​
 /**
  * @program: api-gateway-sdk
  * @ClassName GatewaySDKServiceProperties
  * @description: 定义整个服务系统相关配置,这种配置一般都是既定好的,即就是application_system表的相关内容
  * @author: zs宝
  * @create: 2025-08-29 16:18
  * @Version 1.0
  **/
 ​
 @ConfigurationProperties("api-gateway-sdk")
 public class GatewaySDKServiceProperties {
     /** 网关注册中心地址 */
     private String address;
     /** 系统标识 */
     private String systemId;
     /** 系统名称 */
     private String systemName;
     /** RPC注册中心;zookeeper://127.0.0.1:2181*/
     private String systemRegistry;
 ​
     /** 程序是否启用 */
     private boolean enabled = true;
 ​
     public String getAddress() {
         return address;
     }
 ​
     public void setAddress(String address) {
         this.address = address;
     }
 ​
     public String getSystemId() {
         return systemId;
     }
 ​
     public void setSystemId(String systemId) {
         this.systemId = systemId;
     }
 ​
     public String getSystemName() {
         return systemName;
     }
 ​
     public void setSystemName(String systemName) {
         this.systemName = systemName;
     }
 ​
     public String getSystemRegistry() {
         return systemRegistry;
     }
 ​
     public void setSystemRegistry(String systemRegistry) {
         this.systemRegistry = systemRegistry;
     }
 ​
     public boolean isEnabled() {
         return enabled;
     }
 ​
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
     }
 }
 ​

GatewaySDKAutoConfig.java

 package com.zshunbao.gateway.sdk.config;
 ​
 import com.zshunbao.gateway.sdk.application.GatewaySDKApplication;
 import com.zshunbao.gateway.sdk.domain.service.GatewayCenterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 ​
 /**
  * @program: api-gateway-sdk
  * @ClassName GatewaySDKAutoConfig
  * @description: 网关sdk配置服务
  * @author: zs宝
  * @create: 2025-08-29 16:25
  * @Version 1.0
  **/
 @Configuration
 @EnableConfigurationProperties(GatewaySDKServiceProperties.class)
 @ConditionalOnProperty(
         prefix = "api-gateway-sdk",
         name = "enabled",
         havingValue = "true",
         matchIfMissing = true
 )
 public class GatewaySDKAutoConfig {
     private Logger logger = LoggerFactory.getLogger(GatewaySDKAutoConfig.class);
 ​
     @Bean
     public GatewayCenterService gatewayCenterService(){
         return new GatewayCenterService();
     }
 ​
     @Bean
     public GatewaySDKApplication gatewaySDKApplication(GatewaySDKServiceProperties properties,GatewayCenterService gatewayCenterService) {
         logger.info("构建 GatewaySDKApplication bean");
         return new GatewaySDKApplication(properties,gatewayCenterService);
     }
 }
 ​

最后是api-gateway-core返回结果带上算力节点的IP端口信息,修改代码如下

GatewayResultMessage.java

 package com.zshunbao.gateway.core.socket.agreement;
 ​
 /**
  * @program: api-gateway-core
  * @ClassName GatewayResultMessage
  * @description: 网关结果封装
  * @author: zs宝
  * @create: 2025-08-15 15:26
  * @Version 1.0
  **/
 public class GatewayResultMessage {
     private String code;
     private String info;
     private Object data;
     private String node;
 ​
     protected GatewayResultMessage(String code, String info, Object data) {
         this.code = code;
         this.info = info;
         this.data = data;
     }
     public static GatewayResultMessage buildSuccess(Object data) {
         return new GatewayResultMessage(AgreementConstants.ResponseCode._200.getCode(), AgreementConstants.ResponseCode._200.getInfo(), data );
     }
 ​
     public static GatewayResultMessage buildError(String code, String info) {
         return new GatewayResultMessage(code, info, null);
     }
 ​
     public GatewayResultMessage setNode(String node){
         this.node=node;
         return this;
     }
 ​
     public String getCode() {
         return code;
     }
 ​
     public String getInfo() {
         return info;
     }
 ​
     public Object getData() {
         return data;
     }
 ​
     public String getNode() {
         return node;
     }
 ​
 }
 ​

ProtocolDataHandler.java

 /**
  * @program: api-gateway-core
  * @ClassName ProtocolDataHandler
  * @description:
  * @author: zs宝
  * @create: 2025-08-04 15:33
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.core.socket.handlers;
 ​
 ​
 import com.zshunbao.gateway.core.bind.IGenericReference;
 import com.zshunbao.gateway.core.executor.result.SessionResult;
 import com.zshunbao.gateway.core.session.GatewaySession;
 import com.zshunbao.gateway.core.session.defaults.DefaultGatewaySessionFactory;
 import com.zshunbao.gateway.core.socket.BaseHandler;
 import com.zshunbao.gateway.core.socket.agreement.AgreementConstants;
 import com.zshunbao.gateway.core.socket.agreement.GatewayResultMessage;
 import com.zshunbao.gateway.core.socket.agreement.RequestParser;
 import com.zshunbao.gateway.core.socket.agreement.ResponseParser;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ​
 import java.util.Map;
 ​
 public class ProtocolDataHandler extends BaseHandler<FullHttpRequest> {
     private final Logger logger = LoggerFactory.getLogger(ProtocolDataHandler.class);
 ​
     private final DefaultGatewaySessionFactory gatewaySessionFactory;
 ​
     public ProtocolDataHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
         this.gatewaySessionFactory = gatewaySessionFactory;
     }
 ​
     @Override
     protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
         logger.info("网关接收请求 uri:{} method:{}", request.uri(), request.method());
         try {
             //1、解析请求参数
             RequestParser requestParser = new RequestParser(request);
             String uri= requestParser.getUri();
             if (uri==null || uri.equals("/favicon.ico")) return;
             Map<String, Object> args = requestParser.parse();
             //2、创建会话
             GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
             //根据uri获得对应的mapper映射,即本地动态创建的代理类
             IGenericReference reference = gatewaySession.getMapper();
             //执行代理类的方法,这个方法会被拦截,然后执行Dubbo引用缓存到本地真正的泛化代理对象,从而调用远程方法执行,得到结果
             SessionResult result = reference.$invoke(args);
 ​
             //3、封装返回结果:注意由sessionResult到网关定义的返回结果的转化
             DefaultFullHttpResponse response = new ResponseParser().parse("0000".equals(result.getCode()) ? GatewayResultMessage.buildSuccess(result.getData()).setNode(node()) : GatewayResultMessage.buildError(AgreementConstants.ResponseCode._404.getCode(), "网关协议调用失败!").setNode(node()));
 ​
             channel.writeAndFlush(response);
         } catch (Exception e) {
             //4封装错误的结果
             DefaultFullHttpResponse response=new ResponseParser().parse(GatewayResultMessage.buildError(AgreementConstants.ResponseCode._502.getCode(),"网关调用失败"+e.getMessage()).setNode(node()));
             channel.writeAndFlush(response);
         }
     }
 ​
     /**
      * 获取当前节点的算力网关的IP端口信息
      * @return
      */
     private String node(){
         return gatewaySessionFactory.getConfiguration().getHostName()+":"+gatewaySessionFactory.getConfiguration().getPort();
     }
 }
 ​

测试

依照第28章的内容,将所有模块重新执行 maven clean install,删除原有的 docker 部署及对应镜像,重新构建镜像,重新部署实例

到此基础内容已经全部完成

参考资料