业务流程
在前面的所有章节中,我们已经将api-gateway-core,api-gateway-assist,api-gateway-engine,api-gateway-center,api-gateway-sdk的流程全部打通,如下图
这里面RPC服务引入了api-gateway-sdk组件会在启动的过程中将自身信息注册到网关注册中心去,而api-gateway-core,api-gateway-assist,api-gateway-engine这一套的网关服务会在api-gateway-engine启动的时候,将网关算力节点api-gateway-core的相关信息通过api-gateway-assist这个starter注册到网关中心,然后就会依照api-gateway-core的gateway_id信息通过网关注册中心关联网关算力服务和RPC远程服务的gateway_distribution表进行对应的RPC服务配置的拉取,拉取下来后,网关就可以将对应对网关的http请求通过api-gateway-core进行RPC泛化调用,拿到结果后再将结果从网关返回到用户端。这一套操作下来无论内部有多少服务,都只需要访问对应的网关请求即可,有效的减少大量访问请求的请求地址数量,以及不同域名的管理,并且通过网关也大大增强了内部服务的安全保障。
但是上面的流程还有一个不足,就是我们的网关算力服务在启动过程中会拉取注册中心的接口信息,到网关算法上进行注册操作。通过这样的一个步骤,才能让我们在访问网关接口的时候,泛化调用到对应的 RPC 服务。那么如果当网关算力服务已经是部署在Docker容器中后,再有新的服务或者接口注册到网关注册中心的时候,那么网关算力该如何把这些信息获取到并完成网关的映射呢?
这也是本节需要完成的业务需求
当有新的RPC服务加入网关注册中心时,如何通过我们已经提前定义好的gateway_distribution表的信息让对应的算力节点拉取相关配置呢?
这里有以下几个方案可以考虑:
网关算力节点的接口不断的轮训探测,向注册中心不断发起拉取配置的请求。接口的轮训,在网关算力 api-gateway-assist 助手服务中通过不断的像网关中心请求接口的方式,拉取到所有需要被注册的接口。这里可以在已经拉取的服务接口上,在Redis中做数据的记录,减少重复拉取。不过这样的方式会给网关中心带来不小的压力。
保持网关注册中心与网关算力服务的长连接。服务的连接,在网关助手类与网关中心建立一个 Netty 的服务,由网关中心接收到新的接口注册时候进行信息通知。但这样的长链接,已经会占用不少的资源。
事件通知,当有新的服务注册上来后,向对应的网关算力服务发出消息,算力服务收到消息后,进行拉取配置。可以使用 MQ、ZK等方式,也可以使用 Redis 的发布和订阅。在有接口变化的时候,可以通过消息的推送,让网关算力获取到变化的接口信息进行注册处理。那么本文就是通过 Redis 的发布消息/消息订阅方式进行处理。如下图
其详细的数据流向,具体流程如下图
本章节的业务涉及到以下模块的新增功能
api-gateway-sdk:新增向网关注册中心发起事件通知的请求,具体体现在,当当前RPC服务完成注册后,请求网关注册中心发起对应新增服务的事件通知。
api-gateway-center:新增对外的redis配置拉取接口,新增redis发布消息模块功能,并提供应用服务注册后的事件通知操作。这个通知只会通知给对应的网关算力服务,不会全局通知。
api-gateway-assist:新增Redis 订阅消息模块,当收到注册中心的消息推送时,则根据RPC服务系统的标识信息进行拉取服务配置。
参考资料
由于本节涉及到Redis的发布订阅消息的使用,所以非常必要阅读对应的官方文档,如下资料
Redis 原理 & 命令
Redis 官方 Pub/Sub 文档 👉 https://redis.io/docs/latest/develop/pubsub/ (PUBLISH、SUBSCRIBE、PSUBSCRIBE 的详细说明)
Redis 命令参考(中文版) 👉 http://redis.cn/commands.html (搜索 PUBLISH、SUBSCRIBE)
Spring Data Redis
Spring Data Redis 官方文档 👉 https://docs.spring.io/spring-data/redis/docs/current/reference/html/
Section 4: RedisTemplate API
Section 6: Messaging with Redis (Pub/Sub)
Spring 官方指南:Messaging with Redis 👉 https://spring.io/guides/gs/messaging-redis/ (快速上手示例,和项目类似)
书籍 & 系统学习
《Redis 设计与实现》(黄健宏)
适合深入理解 Redis 的内部机制(含 Pub/Sub 部分)。
《Redis 实战》 (Redis in Action)
偏工程应用,有 Pub/Sub 的案例。
Spring Boot 实战类书籍(如《Spring Boot 实战》)
里面有 Redis 集成示例,可以结合你的项目看。
进阶方向
如果想做可靠消息(保证送达 + 回放): 👉 学习 Redis Streams(
XADD
、XREADGROUP
),官方文档: https://redis.io/docs/latest/develop/data-types/streams/如果想和 MQ 对比学习:可以对照 RabbitMQ、Kafka 的 publish/subscribe 模型。
业务实现
环境搭建
首先docker安装redis环境
version: '1.0'
services:
zookeeper:
image: zookeeper:3.4.13
container_name: zookeeper
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zookeeper:2888:3888
networks:
- api-gateway
mysql:
image: mysql:8.0.32
container_name: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: 123456
ports:
- "13306:3306"
volumes:
- ./mysql/my.cnf:/etc/mysql/conf.d/mysql.cnf:ro
- ./mysql/sql:/docker-entrypoint-initdb.d
networks:
- api-gateway
# Redis
redis:
image: redis:6.2
container_name: redis
restart: always
hostname: redis
privileged: true
ports:
- 16379:6379
volumes:
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
- redis-data:/data # 数据持久化卷
command: redis-server /usr/local/etc/redis/redis.conf
networks:
- api-gateway # 统一网络
environment:
- TZ=Asia/Shanghai
networks:
api-gateway:
driver: bridge
volumes: # 添加数据卷定义
redis-data:
redis.conf
bind 0.0.0.0
port 6379
# 数据持久化目录(与下面的 data 卷挂载路径 /data 对应)
dir /data
api-gateway-center
首先我们来完成网关注册中心api-gateway-center这部分的新增功能。首先我们需要定义对应的Redis发布消息的领域,如下图
我们想想我们本节在Redis的应用上主要就两个,一个查询redis配置并对外提供拉取借口,一个redis发布订阅消息并对外提供请求接口,因此领域需要有这两个接口
IMessageService.java
package com.zshunbao.gateway.center.application;
import java.util.Map;
/**
* @program: api-gateway-center
* @ClassName IMessageService
* @description: 消息服务
* @author: zs宝
* @create: 2025-09-04 15:11
* @Version 1.0
**/
public interface IMessageService {
Map<String,String> queryRedisConfig();
void pushMessage(String gatewayId, Object message);
}
接下来就是主要实现这两个接口的功能
首先是最重要的发布订阅消息。在以往的spring 集成redis的使用中,我们一般都利用Spring Data Redis提供的RedisTemplate对redis的各种命令功能进行快速便捷的使用。而在本节中我们涉及到了消息的发送,不可避免的这就涉及到了消息实体的类型(键肯定是String),由于无法判定以后的消息究竟是什么类型,同时为了给以后留足扩展的空间,这里我们需要对使用的RedisTemplate的键值做定义,如下
PublisherConfig.java
设置业务使用的RedisTemplate键为String,值为Object。传输过程中进行要序列化,我们设置序列化工具
package com.zshunbao.gateway.center.domain.message.config;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
/**
* @program: api-gateway-center
* @ClassName PublisherConfig
* @description: Redis 消息监听推送配置
* @author: zs宝
* @create: 2025-09-04 15:13
* @Version 1.0
**/
@Configuration
public class PublisherConfig {
@Bean
public RedisTemplate<String,Object> redisMessageTemplate(RedisConnectionFactory connectionFactory){
RedisTemplate<String,Object> redisMessageTemplate=new RedisTemplate<>();
redisMessageTemplate.setConnectionFactory(connectionFactory);
//设置序列化工具
redisMessageTemplate.setDefaultSerializer(new FastJsonRedisSerializer<Object>(Object.class));
return redisMessageTemplate;
}
}
接着就是使用这个RedisTemplate进行消息的发布Publisher.java
package com.zshunbao.gateway.center.domain.message.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
/**
* @program: api-gateway-center
* @ClassName Publisher
* @description: 消息推送
* @author: zs宝
* @create: 2025-09-04 15:17
* @Version 1.0
**/
@Service
public class Publisher {
private final RedisTemplate<String,Object> redisMessageTemplate;
@Autowired
public Publisher(RedisTemplate<String, Object> redisMessageTemplate) {
this.redisMessageTemplate = redisMessageTemplate;
}
public void pushMessage(String topic,Object message){
redisMessageTemplate.convertAndSend(topic,message);
}
}
最后对本章节的有关redis的服务做对领域层之外调用的服务实现IMessageServiceImpl.java
package com.zshunbao.gateway.center.domain.message.service;
import com.zshunbao.gateway.center.application.IMessageService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* @program: api-gateway-center
* @ClassName IMessageServiceImpl
* @description:
* @author: zs宝
* @create: 2025-09-04 15:20
* @Version 1.0
**/
@Service
public class IMessageServiceImpl implements IMessageService {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private Integer port;
@Resource
private Publisher publisher;
@Override
public Map<String, String> queryRedisConfig() {
Map<String, String>config=new HashMap<>();
config.put("host",host);
config.put("port",String.valueOf(port));
return config;
}
@Override
public void pushMessage(String gatewayId, Object message) {
publisher.pushMessage(gatewayId,message);
}
}
其中有关redis配置信息的拉取主要时redis的ip和端口号,要让网关知道是从哪一个redis中获得消息
综上,网关注册中心的有关redis服务的领域层充血模型基本已经完成,现在我们要让他对外提供访问的http接口,好让sdk组件可以在服务注册完成后,可以发起事件通知,可以让网关算力服务拉取到监听的redis配置。
首先是sdk的RPC服务发起事件通知的请求接口RpcRegisterManage.java
@PostMapping(value = "registerEvent", produces = "application/json;charset=utf-8")
public Result<Boolean> registerEvent(@RequestParam String systemId) {
try {
logger.info("应用信息注册完成通知 systemId:{}", systemId);
String gatewayId =configManageService.queryGatewayDistribution(systemId);
//发布新的服务注册的订阅到redis
messageService.pushMessage(gatewayId,systemId);
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true);
}catch (Exception e){
logger.error("应用信息注册完成通知失败 systemId:{}", systemId, e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(),false);
}
}
其中对应的queryGatewayDistribution函数也只是一个最简单的查询,没有任何其它逻辑,只有顺着充血模型->仓储服务->dao的查询
<select id="queryGatewayDistribution" parameterType="java.lang.String" resultType="java.lang.String">
SELECT gateway_id FROM gateway_distribution WHERE system_id = #{systemId}
</select>
然后是有关api-gateway-assist对redis配置的拉取接口。
这里有个问题:为什么我们要提供这样一个接口?直接在对应的组件的application配置不就好了吗?这是因为未来我们的redis可能会很多,网关算力服务实例也会很多,如果每个都去配置,很有可能配置错误,但是由网关注册中心直接提供可以大大简化各个实例的配置服务与压力。我们只用在网关注册中心配置一次即可,其余直接从这里拉取就可以了
如下GatewayConfigManage.java
@PostMapping(value = "queryRedisConfig", produces = "application/json;charset=utf-8")
public Result<Map<String, String>> queryRedisConfig() {
try {
logger.info("查询配置中心Redis配置信息");
Map<String, String> redisConfig = messageService.queryRedisConfig();
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), redisConfig);
}catch (Exception e){
logger.error("查询配置中心Redis配置信息失败", e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), null);
}
}
以上就是对外的redis配置拉取接口
但是做到这里还是有一个问题,我们的流程图显示是通过gateway_id,system_id来拉取配置,但是我们之前提供的就只是通过gateway_id来拉取所有配置
这里我们能不能即兼容之前的功能,即api-gateway-assist启动时直接通过gateway_id拉取所有的对应的RPC服务配置,而当有新服务注册通知时,只通过gateway_id,system_id拉取对应的服务,而不是全部对应的再拉取一次。
这里我们可以仔细看下流程,只有当有新服务注册成功后,才会有对应的system_id传送出来,这个信息蕴含在redis消息之中,但是api-gateway-assist启动时是一定没有这个system_id信息的,即当拉取配置请求没有system_id时,我们拉取对应的所有服务配置,当system_id有时,说明这是有新服务出现,我们只需要拉取对应的这一个服务配置即可。
因此针对原先的拉取配置接口,服务,有如下调整
GatewayConfigManage.java
@PostMapping(value = "queryApplicationSystemRichInfo",produces ="application/json;charset=utf-8" )
public Result<ApplicationSystemRichInfo> queryApplicationSystemRichInfo(@RequestParam String gatewayId,@RequestParam String systemId){
try {
logger.info("查询分配到网关下的待注册系统信息(系统、接口、方法) gatewayId:{}", gatewayId);
ApplicationSystemRichInfo applicationSystemRichInfo = configManageService.queryApplicationSystemRichInfo(gatewayId,systemId);
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), applicationSystemRichInfo);
}catch (Exception e){
logger.error("查询分配到网关下的待注册系统信息(系统、接口、方法)异常 gatewayId:{}",gatewayId,e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(),null);
}
}
ConfigManageService.java
@Override
public ApplicationSystemRichInfo queryApplicationSystemRichInfo(String gatewayId,String systemId) {
logger.info("domain 开始查询网关对应的所有服务 gatewayId:{}",gatewayId);
// 1. 查询出网关ID对应的关联系统ID集合。也就是一个网关ID会被分配一些系统RPC服务注册进来,需要把这些服务查询出来。
List<String>systemIdList=new ArrayList<>();
if(null==systemId){
systemIdList = configManageRepository.queryGatewayDistributionSystemIdList(gatewayId);
}else {
systemIdList.add(systemId);
}
ApplicationSystemRichInfo applicationSystemRichInfo = new ApplicationSystemRichInfo();
applicationSystemRichInfo.setGatewayId(gatewayId);
if(systemIdList==null || systemIdList.size()==0){
return applicationSystemRichInfo;
}
//2、根据系统id查询出所有的对应的系统信息
List<ApplicationSystemVO> applicationSystemVOList=configManageRepository.queryApplicationSystemList(systemIdList);
//3、现在就根据这个系统的信息,不断的去查询对应的接口表,方法表中的信息,包装进去
for(ApplicationSystemVO applicationSystemVO:applicationSystemVOList){
List<ApplicationInterfaceVO> applicationInterfaceVOList =configManageRepository.queryApplicationInterfaceList(applicationSystemVO.getSystemId());
for(ApplicationInterfaceVO applicationInterfaceVO:applicationInterfaceVOList){
List<ApplicationInterfaceMethodVO> applicationInterfaceMethodVOList=configManageRepository.queryApplicationInterfaceMethodList(applicationSystemVO.getSystemId(),applicationInterfaceVO.getInterfaceId());
applicationInterfaceVO.setMethodList(applicationInterfaceMethodVOList);
}
applicationSystemVO.setInterfaceList(applicationInterfaceVOList);
}
applicationSystemRichInfo.setApplicationSystemVOList(applicationSystemVOList);
return applicationSystemRichInfo;
}
如上服务代码做了小修改,拉取配置请求没有system_id时,我们拉取对应的所有服务配置,当system_id有时,说明这是有新服务出现,我们只需要拉取对应的这一个服务配置即可。
api-gateway-sdk
在之前的api-gateway-sdk组件中,我们已经提供了注册功能,这里只是需要添加一个注册完成后,请求网关注册中心发起事件通知即可
GatewayCenterService.java
//注册完成后发布的redis消息
public void doRegisterEvent(String address, String systemId) {
Map<String,Object>paramMap=new HashMap<>();
paramMap.put("systemId", systemId);
String resultStr;
try {
resultStr=HttpUtil.post(address+"/wg/admin/register/registerEvent",paramMap,1200);
}catch (Exception e){
logger.error("应用服务接口事件方法异常,链接资源不可用:{}", address + "/wg/admin/register/registerEvent");
throw e;
}
Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() {
});
logger.info("应用服务接口事件方法 systemId:{} 注册结果:{}", systemId, resultStr);
if (!"0000".equals(result.getCode()))
throw new GatewayException("向网关中心注册应用接口服务异常 [systemId:" + systemId + "] ");
}
GatewaySDKApplication.java
package com.zshunbao.gateway.sdk.application;
import com.alibaba.fastjson.JSON;
import com.zshunbao.gateway.sdk.GatewayException;
import com.zshunbao.gateway.sdk.annotation.ApiProducerClazz;
import com.zshunbao.gateway.sdk.annotation.ApiProducerMethod;
import com.zshunbao.gateway.sdk.config.GatewaySDKServiceProperties;
import com.zshunbao.gateway.sdk.domain.service.GatewayCenterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import java.lang.reflect.Method;
/**
* @program: api-gateway-sdk
* @ClassName GatewaySDKApplication
* @description: 应用服务注册
* @author: zs宝
* @create: 2025-08-29 16:24
* @Version 1.0
**/
public class GatewaySDKApplication implements BeanPostProcessor {
private Logger logger = LoggerFactory.getLogger(GatewaySDKApplication.class);
private GatewaySDKServiceProperties properties;
private GatewayCenterService gatewayCenterService;
public GatewaySDKApplication(GatewaySDKServiceProperties properties, GatewayCenterService gatewayCenterService) {
this.properties = properties;
this.gatewayCenterService = gatewayCenterService;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//先查看bean中是否有网关专门标注接口的注解
ApiProducerClazz apiProducerClazz = bean.getClass().getAnnotation(ApiProducerClazz.class);
//如果没有相关注解,直接返回bean即可
if(apiProducerClazz==null){
return bean;
}
//注册系统信息
logger.info("\n应用注册:系统信息 \nsystemId: {} \nsystemName: {} \nsystemType: {} \nsystemRegistry: {}", properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry());
gatewayCenterService.doRegisterApplication(properties.getAddress(),
properties.getSystemId(),
properties.getSystemName(),
"RPC",
properties.getSystemRegistry());
//注册接口信息
Class<?>[] interfaces = bean.getClass().getInterfaces();
if(interfaces.length!=1){
throw new GatewayException(bean.getClass().getName() + "interfaces not one this is " + JSON.toJSONString(interfaces));
}
String interfaceId = interfaces[0].getName();
logger.info("\n应用注册:接口信息 \nsystemId: {} \ninterfaceId: {} \ninterfaceName: {} \ninterfaceVersion: {}", properties.getSystemId(), interfaceId, apiProducerClazz.interfaceName(), apiProducerClazz.interfaceVersion());
gatewayCenterService.doRegisterApplicationInterface(properties.getAddress(),
properties.getSystemId(),
interfaceId,
apiProducerClazz.interfaceName(),
apiProducerClazz.interfaceVersion());
//有网关标注在接口上的注解,那么就一定有相应的标注方法的接口,接下来,我们要将其找到,取出对应的信息
//先从字节码中拿到所有方法
Method[] methods = bean.getClass().getMethods();
//挨个遍历方法寻找有无标注网关在方法上的注解
for(Method method:methods){
ApiProducerMethod apiProducerMethod = method.getAnnotation(ApiProducerMethod.class);
//若没有,则直接下一个
if(apiProducerMethod==null){
continue;
}
//如果有,提取这个方法的相关信息
//但其实这个方法的相关信息,我们在注解中就已近标注完成,只需要从注解中拿就可以了
//但是我们在网关后续利用对RPC进行泛化调用的时候,还需要一个方法中的参数类型信息,这个是没有在注解中标注的
//需要我们动态提取
Class<?>[] parameterTypes = method.getParameterTypes();
StringBuilder parameters = new StringBuilder();
for(Class<?> clazz:parameterTypes){
//将每个参数类型名用字符串存储,中间用","隔开
parameters.append(clazz.getName()).append(",");
}
//将收集到的参数类型名字符串,根据","做拆分(由于最后多了一个","号)
String parameterType = parameters.toString().substring(0, parameters.toString().lastIndexOf(","));
//方法信息
logger.info(
"\n应用注册:方法信息 \nsystemId: {} \ninterfaceId: {} \nmethodId: {} \nmethodName: {} \nparameterType: {} \nuri: {} \nhttpCommandType: {} \nauth: {}",
properties.getSystemId(),
bean.getClass().getName(),
method.getName(),
apiProducerMethod.methodName(),
parameterType,
apiProducerMethod.uri(),
apiProducerMethod.httpCommandType(),
apiProducerMethod.auth()
);
gatewayCenterService.doRegisterApplicationInterfaceMethod(properties.getAddress(),
properties.getSystemId(),
interfaceId,
method.getName(),
apiProducerMethod.methodName(),
parameterType,
apiProducerMethod.uri(),
apiProducerMethod.httpCommandType(),
apiProducerMethod.auth());
}
//注册完成后,向redis的发布消息,从而使网管服务api-gateway-assist接受到新的服务注册上去的消息
gatewayCenterService.doRegisterEvent(properties.getAddress(), properties.getSystemId());
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
}
api-gateway-assist
api-gateway-assist在本章节中相当于redis消息服务的订阅者(消费者),当监听到对应的通知消息时,将会拉取新的服务配置
根据官方提供的示例Spring 官方指南:Messaging with Redis
我们需要作如下准备。
由于我么在api-gateway-assist中不会通过配置文件的方式配置redis,因此我们需要单独拉取到的配置创建对应的redis连接,以便后续服务。
RegisterGatewayService.java
public Map<String,String>queryRedisConfig(String address){
String resultStr;
try{
resultStr=HttpUtil.post(address+"/wg/admin/config/queryRedisConfig","",1550);
}catch (Exception e){
logger.error("网关服务拉取配置异常,链接资源不可用:{}", address + "/wg/admin/config/queryRedisConfig", e);
throw e;
}
Result<Map<String, String>> result = JSON.parseObject(resultStr, new TypeReference<Result<Map<String, String>>>() {
});
logger.info("从网关中心拉取Redis配置信息完成。result:{}", resultStr);
if (!"0000".equals(result.getCode()))
throw new GatewayException("从网关中心拉取Redis配置信息异常");
return result.getData();
}
GatewayAutoConfig.java
@Bean
public RedisConnectionFactory redisConnectionFactory(GatewayServiceProperties properties, RegisterGatewayService gatewayService) {
//拉取网关注册中心连接的redis配置
Map<String, String> redisConfig = gatewayService.queryRedisConfig(properties.getAddress());
//构建Redis服务
RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
standaloneConfig.setHostName(redisConfig.get("host"));
standaloneConfig.setPort(Integer.parseInt(redisConfig.get("port")));
//默认配置信息;一般这些配置可以被抽取出来
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100);
poolConfig.setMaxWaitMillis(30 * 1000);
poolConfig.setMinIdle(20);
poolConfig.setMaxIdle(40);
poolConfig.setTestWhileIdle(true);
//创建Redis配置
JedisClientConfiguration clientConfig = JedisClientConfiguration.builder()
.connectTimeout(Duration.ofSeconds(2))
.clientName("api-gateway-assist-redis-" + properties.getGatewayId())
.usePooling().poolConfig(poolConfig).build();
//实例化Redis链接对象
return new JedisConnectionFactory(standaloneConfig,clientConfig);
}
@Bean
public RedisMessageListenerContainer container(GatewayServiceProperties properties,RedisConnectionFactory connectionFactory,
MessageListenerAdapter msgAgreementListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(msgAgreementListenerAdapter,new PatternTopic(properties.getGatewayId()));
return container;
}
@Bean
public MessageListenerAdapter msgAgreementListenerAdapter(GatewayApplication gatewayApplication){
return new MessageListenerAdapter(gatewayApplication,"receiveMessage");
}
根据官方文档:
https://spring.io/guides/gs/messaging-redis?utm_source=chatgpt.com
Spring Data Redis 提供了使用 Redis 发送和接收消息所需的所有组件。具体来说,您需要配置:
连接工厂
消息监听器容器
Redis 模板
您将使用 Redis 模板发送消息,并将
Receiver
注册到消息监听器容器,以便它接收消息。连接工厂驱动模板和消息监听器容器,使它们连接到 Redis 服务器。https://docs.spring.io/spring-data/redis/reference/redis/pubsub.html
消息监听容器:Spring Data 提供了
RedisMessageListenerContainer
来承担所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会觉得这些概念很熟悉,因为它的设计尽可能接近 Spring Framework 及其消息驱动的 POJO(MDP)中的支持。
RedisMessageListenerContainer
充当消息监听器容器。它用于从 Redis 通道接收消息,并驱动注入到其中的MessageListener
实例。监听器容器负责所有消息接收线程,并将消息分发到监听器进行处理。消息监听器容器是 MDP 和消息提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等操作。这让应用程序开发者可以编写与接收消息(并对其做出响应)相关的(可能很复杂的)业务逻辑,并将 Redis 基础设施的样板问题委托给框架。
MessageListener
还可以实现SubscriptionListener
,以便在订阅/取消订阅确认时接收通知。监听订阅通知在同步调用时非常有用。此外,为了最大限度地减少应用程序占用空间,
RedisMessageListenerContainer
允许多个侦听器共享一个连接和一个线程,即使它们不共享订阅。因此,无论应用程序跟踪多少个侦听器或通道,运行时成本在其整个生命周期内保持不变。此外,该容器允许更改运行时配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重启。此外,该容器采用惰性订阅方法,仅在需要时使用RedisConnection
。如果所有侦听器都取消订阅,则会自动执行清理操作并释放线程。为了支持消息的异步特性,容器需要一个
java.util.concurrent.Executor
(或 Spring 的TaskExecutor
)来调度消息。根据负载、监听器数量或运行时环境,您可以更改或调整执行器以更好地满足您的需求。尤其是在托管环境(例如应用服务器)中,强烈建议选择合适的TaskExecutor
以充分利用其运行时环境。
注意其中new MessageListenerAdapter(gatewayApplication,"receiveMessage")中的gatewayApplication被定义为Receiver,receiveMessage被定义为默认的监听到消息的处理方法。
所以最后我们来看下这个默认的消息处理GatewayApplication.java
package com.zshunbao.gateway.assist.application;
import com.alibaba.fastjson.JSON;
import com.zshunbao.gateway.assist.config.GatewayServiceProperties;
import com.zshunbao.gateway.assist.domain.model.aggregates.ApplicationSystemRichInfo;
import com.zshunbao.gateway.assist.domain.model.vo.ApplicationInterfaceMethodVO;
import com.zshunbao.gateway.assist.domain.model.vo.ApplicationInterfaceVO;
import com.zshunbao.gateway.assist.domain.model.vo.ApplicationSystemVO;
import com.zshunbao.gateway.assist.domain.service.RegisterGatewayService;
import com.zshunbao.gateway.core.mapping.HttpCommandType;
import com.zshunbao.gateway.core.mapping.HttpStatement;
import com.zshunbao.gateway.core.session.Configuration;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
/**
* @program: api-gateway-assist
* @ClassName GatewayApplication
* @description: 网关应用;与 Spring 链接,调用网关注册和接口拉取
* @author: zs宝
* @create: 2025-08-21 15:26
* @Version 1.0
**/
public class GatewayApplication implements ApplicationContextAware, ApplicationListener<ContextClosedEvent> {
private Logger logger = LoggerFactory.getLogger(GatewayApplication.class);
private GatewayServiceProperties properties;
private RegisterGatewayService registerGatewayService;
private Configuration configuration;
private Channel gatewaySocketServerChannel;
public GatewayApplication(GatewayServiceProperties properties, RegisterGatewayService registerGatewayService, Configuration configuration, Channel gatewaySocketServerChannel) {
this.properties = properties;
this.registerGatewayService = registerGatewayService;
this.configuration = configuration;
this.gatewaySocketServerChannel = gatewaySocketServerChannel;
}
//在 Bean 实例化并且完成依赖注入后,Spring 会调用 setApplicationContext 方法,把当前上下文传进去
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
try {
// 1. 注册网关服务;每一个用于转换 HTTP 协议泛化调用到 RPC 接口的网关都是一个算力,这些算力需要注册网关配置中心
registerGatewayService.doRegister(properties.getAddress(),
properties.getGroupId(),
properties.getGatewayId(),
properties.getGatewayName(),
properties.getGatewayAddress());
addMappers("");
}catch (Exception e){
logger.error("网关服务启动失败,停止服务。{}", e.getMessage(), e);
throw e;
}
}
/**
* 从网关注册中心拉取配置
* @param systemId
*/
public void addMappers(String systemId) {
//2、从注册中心拉取映射信息配置
ApplicationSystemRichInfo applicationSystemRichInfo = registerGatewayService.pullApplicationSystemRichInfo(properties.getAddress(), properties.getGatewayId(),systemId);
//System.out.println(JSON.toJSONString(applicationSystemRichInfo));
//现在我们就要从这些信息中提取出来,网关核心通信需要的映射信息,包括构建Dubbo引用缓存需要的内容,以及HTTPStatement
List<ApplicationSystemVO> systemList = applicationSystemRichInfo.getApplicationSystemVOList();
if (systemList.isEmpty()) {
logger.warn("网关{}服务注册映射为空,请排查 gatewayCenterService.pullApplicationSystemRichInfo 是否检索到此网关算力需要拉取的配置数据。", systemId);
return;
}
for(ApplicationSystemVO systemVO:systemList) {
List<ApplicationInterfaceVO> interfaceList = systemVO.getInterfaceList();
for (ApplicationInterfaceVO interfaceVO : interfaceList) {
//配置中加入可以构建Dubbo 引用缓存的内容
configuration.registryConfig(systemVO.getSystemId(), systemVO.getSystemRegistry(), interfaceVO.getInterfaceId(), interfaceVO.getInterfaceVersion());
//接下来获得HttpStatement的信息,并加入网关通信配置中去
List<ApplicationInterfaceMethodVO> methodList = interfaceVO.getMethodList();
for (ApplicationInterfaceMethodVO methodVO : methodList) {
HttpStatement httpStatement = new HttpStatement(
methodVO.getSystemId(),
methodVO.getInterfaceId(),
methodVO.getMethodId(),
methodVO.getParameterType(),
methodVO.getUri(),
HttpCommandType.valueOf(methodVO.getHttpCommandType()),
methodVO.isAuth()
);
configuration.addMapper(httpStatement);
logger.info("网关服务注册映射 系统:{} 接口:{} 方法:{}", systemVO.getSystemId(), interfaceVO.getInterfaceId(), methodVO.getMethodId());
}
}
}
}
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
try {
if (gatewaySocketServerChannel.isActive()) {
logger.info("应用容器关闭,Api网关服务关闭。localAddress:{}", gatewaySocketServerChannel.localAddress());
gatewaySocketServerChannel.close();
}
} catch (Exception e) {
logger.error("应用容器关闭,Api网关服务关闭失败", e);
}
}
public void receiveMessage(Object message){
logger.info("【事件通知】接收注册中心推送消息 message:{}", message);
addMappers(message.toString().substring(1,message.toString().length()-1));
}
}
注意看,我们将原本的从网关注册中心拉取配置的代码抽取了出来为addMappers(String systemId)方法,当最开始容器初始化的时候调用的是
addMappers("");
这其中system_id为空,一旦到了网管注册中心的拉取配置服务上面,就会直接拉取gateway_id对应的所有服务配置,而当监听到消息时调用的是addMappers(message.toString().substring(1,message.toString().length()-1))
,一旦到了网关注册中心的拉取配置服务上面,就只会拉取gateway_id对应的这一个system_id服务配置。
测试
本次测试在本地进行连通测试
启动 Docker 容器,启动 ZK、启动 Redis 服务
删除数据库表application_system,application_interface, application_interface_method中的所有信息,为的就是在 api-gateway-engine 启动的时候,不让它拉取到任何接口信息,当后续注册新的接口信息时候,在拉取
启动 api-gateway-center 注册中心,保证后续的流程可以启动
打包install最新 api-gateway-assist-05 文件,启动本地的api-gateway-engine
启动 api-gateway-test-provider 这个时候观察 api-gateway-engine 的日志信息,是否有映射完成操作。
再次发起请求
测试通过