业务流程

在上一章节中我们已经实现了在api-gateway-assist中从网关中心api-gateway-center中拉取配置,但是我们在上一节中并没有对这个配置信息做任何处理,仅仅是拉取了下来。但是我们要记住我们的目的时用这个配置在网关核心通信组件上执行,来进行远程服务的调用。那么如何将拉取下来的配置运用到我们的网关核心通信组件api-gateway-core上呢?在之前的设计中我们提到过,我们的api-gateway-assist是一个类似于starter的对api-gateway-core的包装,而无论是是springboot的starter还是Dubbo的starter,这个starter只要引入,那么就可以通过这个starter使用starter包装的方法,并且查看相关的starter资源resource包,会发现你只引入了一个starter却同时也引入了其starter包装的内容进来。这就表示我们在starter中的pom依赖其实是引入了我们被包装的那一部分内容,而后续的对starter的打包时也会将包装内容一起打包引入,后续的东西只需要引入starter就可以获得对应的所有功能。

因此,这里api-gateway-assist拉取到到的配置如何放入通信组件api-gateway-core呢?

  • 将通信组件api-gateway-core打包引入api-gateway-assist中,api-gateway-assist解析配置信息后直接调用通信组件api-gateway-core即可。

但是这里还有一个问题,我们之前对于 api-gateway-core的开发中,对于配置下 的Netty 服务的IP地址和端口信息是写死的,而我们在api-gateway-assist中的application.xml中确实有配置文件指定的(这也是之前写 api-gateway-core中一直提到的要用配置文件来配置Configuration),因此

  • 在本章节的第16章中,我们先提取 Netty 通信服务配置信息,包括:IP、端口、线程等,到会话的 Configuration 配置类中进行统一的维护和管理,便于外部调用方向内部传递信息。

  • 在下一章节的第17章中,我们再来使用相关的拉取到的配置信息。

  • 第16,17章最好一起看,深度关联

业务实现

由于我们会将api-gateway-core引入api-gateway-assist中,但是我们之前对于自身的包结构一直是

而我们现在写的api-gateway-center,api-gateway-assist,api-gateway-core都是以com.zshunbao.gateway的包结构开头,因此这里我们要首先对api-gateway-core调整包结构,如下图

修改分层结构。之后对 socket 通信提取配置,放到 session 会话的 Configuration 类中维护即可

本章节项目结构如下

首先我们来重新定义原先的Configuration.java(本章的主要内容就集中在Configuration.java上)

 /**
  * @program: api-gateway-core
  * @ClassName Configuration
  * @description:
  * @author: zs宝
  * @create: 2025-08-04 14:56
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.core.session;
 ​
 import com.zshunbao.gateway.core.authorization.IAuth;
 import com.zshunbao.gateway.core.authorization.auth.AuthService;
 import com.zshunbao.gateway.core.bind.IGenericReference;
 import com.zshunbao.gateway.core.bind.MapperRegistry;
 import com.zshunbao.gateway.core.datasource.Connection;
 import com.zshunbao.gateway.core.executor.Executor;
 import com.zshunbao.gateway.core.executor.SimpleExecutor;
 import com.zshunbao.gateway.core.mapping.HttpStatement;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ReferenceConfig;
 import org.apache.dubbo.config.RegistryConfig;
 import org.apache.dubbo.rpc.service.GenericService;
 ​
 import java.util.HashMap;
 import java.util.Map;
 ​
 public class Configuration {
     /**
      * 增添对于netty服务地址,端口,服务线程数的配置,以便这部分内容在以后都可以由用户指定
      */
     // 网关 Netty 服务地址,默认为本机地址
     private String hostName = "127.0.0.1";
     //网关Netty服务端口,默认为7397
     private int port=7397;
     //网关Netty 服务线程数配置
     private int bossNThreads=1;
     private int workNThreads=4;
 ​
     //对应的注册器
     private final MapperRegistry mapperRegistry=new MapperRegistry(this);
     private final Map<String, HttpStatement> httpStatements = new HashMap<>();
 ​
     //根据官方文档的示例,dubbo的泛化调用的API调用方式主要有3个东西需要配置
     //分别是ApplicationConfig,RegistryConfig,ReferenceConfig<GenericService>
     //但是我们这是一个网关,被调用的服务方可能不止一个,因此,我们需要按照对应的服务名和配置名,将其对应起来保存
     //RPC 应用服务配置项
     //注意键为应用名称;值为应用服务配置项
     private final Map<String, ApplicationConfig> applicationConfigMap=new HashMap<>();
     //RPC 注册中心配置项
     //键为应用名称;值为注册中心配置项
     private final Map<String, RegistryConfig> registryConfigMap=new HashMap<>();
     //RPC 泛化服务配置项
     //注意这里的键:远程服务接口的全限定类名 ; 值为泛化引用配置对象
     private final Map<String, ReferenceConfig<GenericService>> referenceConfigMap=new HashMap<>();
     //配置鉴权
     private IAuth auth=new AuthService();
 ​
     /*
     public Configuration(){
         //TODO 后期从配置中获取,本节主要内容是泛化服务的调用
         ApplicationConfig applicationConfig=new ApplicationConfig();
         applicationConfig.setName("api-gateway-test");
         applicationConfig.setQosEnable(false);
 ​
         RegistryConfig registryConfig=new RegistryConfig();
         //配置应用在那个注册中心可以被调用
         registryConfig.setAddress("zookeeper://127.0.0.1:2181");
         registryConfig.setRegister(false);
 ​
         //对应的泛化服务配置
         ReferenceConfig<GenericService> reference=new ReferenceConfig<>();
         //配置泛化调用的服务方接口
         reference.setInterface("cn.bugstack.gateway.rpc.IActivityBooth");
         reference.setVersion("1.0.0");
         reference.setGeneric("true");
 ​
         //加入缓存中去
         applicationConfigMap.put("api-gateway-test",applicationConfig);
         registryConfigMap.put("api-gateway-test",registryConfig);
         referenceConfigMap.put("cn.bugstack.gateway.rpc.IActivityBooth",reference);
 ​
     }
     */
 ​
     /**
      * 由以往的构造函数改造而来的配置注册,因为以后的配置都是外部配置加载而来传入,不会直接写死
      * @param applicationName
      * @param address
      * @param interfaceName
      * @param version
      */
     public synchronized void registryConfig(String applicationName,String address,String interfaceName,String version){
         if(applicationConfigMap.get(applicationName)==null){
             ApplicationConfig applicationConfig=new ApplicationConfig();
             applicationConfig.setName(applicationName);
             applicationConfig.setQosEnable(false);
             applicationConfigMap.put(applicationName,applicationConfig);
         }
 ​
         if(registryConfigMap.get(applicationName)==null){
             RegistryConfig registryConfig=new RegistryConfig();
             //配置应用在那个注册中心可以被调用
             registryConfig.setAddress(address);
             registryConfig.setRegister(false);
             registryConfigMap.put(applicationName,registryConfig);
         }
         if(referenceConfigMap.get(interfaceName)==null){
             ReferenceConfig<GenericService>referenceConfig=new ReferenceConfig<>();
             referenceConfig.setInterface(interfaceName);
             referenceConfig.setVersion(version);
             referenceConfig.setGeneric("true");
             referenceConfigMap.put(interfaceName,referenceConfig);
         }
     }
 ​
 ​
     //对应的服务配置项对外get方法
     public ApplicationConfig getApplicationConfig(String applicationName) {
         return applicationConfigMap.get(applicationName);
     }
 ​
     //对应的对外注册中心的get方法
     public RegistryConfig getRegistryConfig(String applicationName) {
         return registryConfigMap.get(applicationName);
     }
 ​
     //对应的对外泛化调用配置项get方法
     public ReferenceConfig<GenericService> getReferenceConfig(String interfaceName) {
         return referenceConfigMap.get(interfaceName);
     }
 ​
     //添加泛化调用方法
     public void addMapper(HttpStatement httpStatement) {
         mapperRegistry.addMapper(httpStatement);
     }
 ​
     public IGenericReference getMapper(String uri, GatewaySession gatewaySession) {
         return mapperRegistry.getMapper(uri, gatewaySession);
     }
 ​
     public void addHttpStatement(HttpStatement httpStatement) {
         httpStatements.put(httpStatement.getUri(), httpStatement);
     }
 ​
     public HttpStatement getHttpStatement(String uri) {
         return httpStatements.get(uri);
     }
 ​
     public Executor newExecutor(Connection connection) {
         return new SimpleExecutor(this,connection);
     }
 ​
     public boolean validate(String uid, String token) {
         return auth.validate(uid,token);
     }
 ​
 ​
     public String getHostName() {
         return hostName;
     }
 ​
     public void setHostName(String hostName) {
         this.hostName = hostName;
     }
 ​
     public int getPort() {
         return port;
     }
 ​
     public void setPort(int port) {
         this.port = port;
     }
 ​
     public int getBossNThreads() {
         return bossNThreads;
     }
 ​
     public void setBossNThreads(int bossNThreads) {
         this.bossNThreads = bossNThreads;
     }
 ​
     public int getWorkNThreads() {
         return workNThreads;
     }
 ​
     public void setWorkNThreads(int workNThreads) {
         this.workNThreads = workNThreads;
     }
 }
 ​
  • 主要增加了一些在GatewaySocketServer配置信息,包括:服务启动IP、服务端口、线程数,进行提取,并赋予一定的默认值

  • 增加了registryConfig函数,这个函数是有原本的构造函数改造而来,主要是调用Dubbo远程的引用缓存,但是后续的进行引用缓存的远程RPC服务信息不会再由我们写死,而是由api-gateway-assist从网关注册中心拉取而来。

接下来还有修改的,一个是网关Netty的服务端相关配置要从直接写死改为从配置类中获取,从而会话初始化也会有所修改(都只是小修改)

GatewaySocketServer.java

 /**
  * @program: api-gateway-core
  * @ClassName GatewaySocketServer
  * @description: 网关会话服务端
  * @author: zs宝
  * @create: 2025-07-25 16:48
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.core.socket;
 ​
 import com.zshunbao.gateway.core.session.Configuration;
 import com.zshunbao.gateway.core.session.defaults.DefaultGatewaySessionFactory;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ​
 import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
 ​
 //实现异步调用接口,使整个会话服务端的过程异步建立
 public class GatewaySocketServer implements Callable<Channel> {
     private final Logger logger = LoggerFactory.getLogger(GatewaySocketServer.class);
     private EventLoopGroup boss;
     private EventLoopGroup worker;
     private Channel channel;
 ​
     private DefaultGatewaySessionFactory gatewaySessionFactory;
     private final Configuration configuration;
 ​
     public GatewaySocketServer(Configuration configuration,DefaultGatewaySessionFactory gatewaySessionFactory) {
         this.gatewaySessionFactory = gatewaySessionFactory;
         this.configuration = configuration;
         this.initEventLoopGroup();
     }
 ​
     private void initEventLoopGroup() {
         boss=new NioEventLoopGroup(configuration.getBossNThreads());
         worker=new NioEventLoopGroup(configuration.getWorkNThreads());
     }
 ​
     @Override
     public Channel call() throws Exception {
         ChannelFuture channelFuture=null;
         try {
             ServerBootstrap b=new ServerBootstrap();
             b.group(boss,worker)
                     .channel(NioServerSocketChannel.class)
                     //设置 TCP 最大连接等待队列
                     .option(ChannelOption.SO_BACKLOG,128)
                     .childHandler(new SessionChannelInitializer(gatewaySessionFactory,configuration));
             //InetSocketAddress 是 Java 用于表示“IP + 端口”的地址对象
             //syncUninterruptibly() 会阻塞直到启动完成(不抛出异常)
             channelFuture = b.bind(new InetSocketAddress(configuration.getHostName(),configuration.getPort())).syncUninterruptibly();
             this.channel=channelFuture.channel();
         }catch (Exception e){
             logger.error("socket server start error.", e);
         }finally {
             if(null!=channelFuture && channelFuture.isSuccess()){
                 logger.info("socket server start done.");
             }else{
                 logger.error("socket server start error.");
             }
         }
         return channel;
     }
 }
 ​
  • 主要是服务启动IP、服务端口、线程数改为从配置中获取

然后由于服务端的修涂,会话初始化的类也会有修改(都不涉及到业务上的,只是一些字段的获取方式改为从配置类中拿取)

SessionChannelInitializer.java

 /**
  * @program: api-gateway-core
  * @ClassName SessionChannelInitializer
  * @description:自定义netty服务端链接的childHandler的初始化工具
  * @author: zs宝
  * @create: 2025-07-25 16:56
  * @Version 1.0
  **/
 ​
 package com.zshunbao.gateway.core.socket;
 ​
 ​
 ​
 import com.zshunbao.gateway.core.session.Configuration;
 import com.zshunbao.gateway.core.session.defaults.DefaultGatewaySessionFactory;
 ​
 import com.zshunbao.gateway.core.socket.handlers.AuthorizationHandler;
 import com.zshunbao.gateway.core.socket.handlers.GatewayServerHandler;
 import com.zshunbao.gateway.core.socket.handlers.ProtocolDataHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 ​
 public class SessionChannelInitializer extends ChannelInitializer<SocketChannel>{
 ​
     private DefaultGatewaySessionFactory gatewaySessionFactory;
     private final Configuration configuration;
 ​
     public SessionChannelInitializer(DefaultGatewaySessionFactory gatewaySessionFactory, Configuration configuration) {
         this.gatewaySessionFactory = gatewaySessionFactory;
         this.configuration = configuration;
     }
 ​
     @Override
     protected void initChannel(SocketChannel channel) throws Exception {
         //得到处理的流水线
         ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast(new HttpRequestDecoder());
         pipeline.addLast(new HttpResponseEncoder());
         pipeline.addLast(new HttpObjectAggregator(1024*1024));
         pipeline.addLast(new GatewayServerHandler(configuration));
         pipeline.addLast(new AuthorizationHandler(configuration));
         pipeline.addLast(new ProtocolDataHandler(gatewaySessionFactory));
     }
 }

api-gateway-core其余代码与之前一样,未做任何修改

测试

     @Test
     public void test_gateway() throws InterruptedException, ExecutionException {
         // 1. 创建配置信息加载注册
         Configuration configuration = new Configuration();
         configuration.setHostName("127.0.0.1");
         configuration.setPort(7397);
 ​
         // 2. 基于配置构建会话工厂
         DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);
 ​
         // 3. 创建启动网关网络服务
         GatewaySocketServer server = new GatewaySocketServer(configuration, gatewaySessionFactory);
 ​
         Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
         Channel channel = future.get();
 ​
         if (null == channel) throw new RuntimeException("netty server start error channel is null");
 ​
         while (!channel.isActive()) {
             logger.info("netty server gateway start Ing ...");
             Thread.sleep(500);
         }
         logger.info("netty server gateway start Done! {}", channel.localAddress());
 ​
         // 4. 注册接口
         configuration.registryConfig("api-gateway-test", "zookeeper://127.0.0.1:2181", "cn.bugstack.gateway.rpc.IActivityBooth", "1.0.0");
 ​
         HttpStatement httpStatement01 = new HttpStatement(
                 "api-gateway-test",
                 "cn.bugstack.gateway.rpc.IActivityBooth",
                 "sayHi",
                 "java.lang.String",
                 "/wg/activity/sayHi",
                 HttpCommandType.GET,
                 false);
 ​
         HttpStatement httpStatement02 = new HttpStatement(
                 "api-gateway-test",
                 "cn.bugstack.gateway.rpc.IActivityBooth",
                 "insert",
                 "cn.bugstack.gateway.rpc.dto.XReq",
                 "/wg/activity/insert",
                 HttpCommandType.POST,
                 true);
 ​
         configuration.addMapper(httpStatement01);
         configuration.addMapper(httpStatement02);
 ​
         Thread.sleep(Long.MAX_VALUE);
     }
  • 注意这个测试我们之前的章节都是是先加载映射内容,但是这里我们是先启动网关Netty服务端,再加载的映射内容,也就是说我们的网关通信组件启动完成前后我们都可以添加映射接口,也就是当有新的接口注册上来以后,也可以随时映射到网关中。

本章第16章是为17章做准备

参考资料