基础概念
为什么需要限流?
外部请求是不可控的,而我们系统的负载是有限的,如果没有限流机制,一旦外部请求超过系统承载的压力,就会出现系统宕机等严重问题。加入限流正是为了保证系统负载在可以承受的范围内。
在高并发系统中,如果没有控制流量的机制,大量请求同时进入可能会导致:
服务过载:服务器CPU、内存或数据库被打爆。
雪崩效应:某个服务过载后挂掉,引发级联故障。
资源浪费:恶意爬虫、异常流量抢占资源,影响正常用户。
限流的目的就是 保护系统稳定性和可用性,让服务在高并发下仍能提供有质量的响应。
什么是单机限流?
单机限流是指在 单个应用实例 内,通过代码或中间件控制请求速率。
优点:
简单、性能高,不依赖外部组件。
适合 QPS 不高的系统或本地调试。
缺点:
不适合集群:如果服务有多个实例,每个实例独立计数,无法全局限流。
无法应对恶意分布式流量。
常见实现方式
计数器法:在单位时间内(比如 1 秒)统计请求数,超过上限直接拒绝。
滑动窗口:更精细化,避免边界突刺问题。
令牌桶/漏桶算法:最常用,支持突发流量和稳定速率控制。
常用工具
Guava RateLimiter:基于令牌桶算法,调用
acquire()
获取令牌。Bucket4j:更强大,支持本地和分布式(配合 Redis)。
Resilience4j:功能更全面(限流、熔断、重试)。
什么是分布式限流?
分布式限流是指在 多实例或集群环境 下,通过集中式存储或网关层来统一控制流量。
优点:
能保证全局一致性,适合大规模分布式服务。
能抵御集群流量放大问题。
缺点:
引入 Redis/网关,增加了系统复杂度。
如果 Redis 挂了,可能影响整个限流逻辑(需做降级策略)。
实现思路
集中式计数器:在 Redis/Memcached 中维护全局计数。
集中式令牌桶:在 Redis 中存放令牌,所有实例共享。
消息队列限流:请求先进入队列,由消费端以稳定速率处理。
API 网关层限流:如 Nginx、Kong、Spring Cloud Gateway。
常见实现方式
(1) 基于 Redis
使用
INCR
/EXPIRE
实现计数器限流。使用 Lua 脚本实现原子性令牌桶。
Redis Cluster 可以支撑高并发,保证一致性。
(2) 基于 API 网关
Nginx 限流:
limit_req
(速率限流)、limit_conn
(并发连接数限制)。Spring Cloud Gateway:基于 Redis 的令牌桶。
Kong、Zuul、Envoy:都有现成的限流插件。
单机限流 VS 分布式限流
限流组件简易实现
项目概述
本次我们主要来实现一个简单的有关限流的可插拔组件,将使用工厂模式+策略模式,使得整体的限流算法选择易于扩充。整体项目结构如下
我们这里主要实现了(注意下述内容,在后续实现中项目将其自定义为不同的限流策略)
基于Guava框架的本地单机限流
基于滑动窗口算法实现的Redis分布式限流
基于令牌桶算法实现的Redis分布式限流
基于Redisson框架的分布式限流
基于Sentinel框架的分布式限流
项目源代码地址:https://github.com/zhaoshunbao/Think-on-a-whim/tree/main/FlowLimit/RateLimiteComponent
功能实现
自定义注解
本项目利用AOP切面编程的方式,进行接口上的限流,定义了两个注解
RateLimiter.java
:自定义限流注解,定义限流的相关信息,让用户可以选择不同的限流策略,定义想要限流的大小
package com.zshunbao.ratelimite.annotation;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* @program: FlowLimit
* @ClassName RateLimiter
* @description: 自定义限流注解
* @author: zs宝
* @create: 2025-09-16 20:30
* @Version 1.0
**/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RateLimiter {
//根据什么东西限流
String key() default "#ip";
//限流策略
String strategy() default RateLimiteConstants.LOCAL_GUAVA;
//如果是滑动窗口算法,就表示一个窗口内允许的最大请求数。
//如果是令牌桶算法,就表示桶的大小
int limit() default 10;
//如果是滑动窗口算法,就表示滑动窗口大小
//如果是令牌桶算法,就表示每秒生成的令牌数permitsPerSecond
int period() default 10;
// 限流触发时调用的兜底方法(可选:类内无参方法或同签名方法名)
String fallbackMethod() default "fallbackMethod";
}
RateLimiteStrategyName
:用于为每种不同的限流策略命名
package com.zshunbao.ratelimite.annotation;
import java.lang.annotation.*;
/**
* @program: FlowLimit
* @ClassName RateLimiteConstants
* @description: 用于标注每种策略容器名
* @author: zs宝
* @create: 2025-09-19 21:17
* @Version 1.0
**/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface RateLimiteStrategyName {
//容器名称
String value();
}
限流策略定义
我们使用策略模式,将每种不同的限流策略实现到不同的类,后续如果有其它限流方式的实现,那么就只需要根据策略接口实现对应的限流类即可,除此外不会一点业务流程的代码
IRateLimiterStrategy.java
:限流策略接口定义如下
package com.zshunbao.ratelimite.strategy;
/**
* @program: FlowLimit
* @ClassName IRateLimiterStrategy
* @description: 限流策略接口,定义所有限流算法的标准
* @author: zs宝
* @create: 2025-09-19 21:36
* @Version 1.0
**/
public interface IRateLimiterStrategy {
/**
* 尝试获取访问权限
* @param key 键
* @param limit 时间窗口内最大请求数
* @param period 时间窗口,单位:秒
* @return 是否允许访问
*/
boolean tryAcquire(String key,int limit,int period);
}
自定义策略工厂
当我们实现了不同的限流策略后,用户可以进行选择不同的策略对接口进行限流。那么这里就有个问题,用户选择了不同的限流策略,我们如何加载到不同的用户想要的限流策略呢?这里我们自定义策略工厂辅助拿到对应的限流策略组件
RateLimiterStrategyFactory.java
package com.zshunbao.ratelimite.factory;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @program: FlowLimit
* @ClassName RateLimiterStrategyFactory
* @description: 限流策略工厂,通过Spring IOC自动注入并注册所有策略
* @author: zs宝
* @create: 2025-09-20 20:39
* @Version 1.0
**/
@Component
public class RateLimiterStrategyFactory {
private static final Logger logger = LoggerFactory.getLogger(RateLimiterStrategyFactory.class);
//存储所有限流策略的HashMap,key为策略名称,value为策略实例
private final Map<String, IRateLimiterStrategy> strategyMap=new HashMap<>();
/**
* 构造函数注入:Spring会自动将所有RateLimiterStrategy接口的实现类注入到该列表
*/
public RateLimiterStrategyFactory(List<IRateLimiterStrategy> rateLimiterStrategyList){
logger.info("RateLimiterStrategyFactory 获取所有限流策略");
for(IRateLimiterStrategy strategy:rateLimiterStrategyList){
String strategyName=getRatelimiteStrategyName(strategy);
strategyMap.put(strategyName,strategy);
System.out.println("已自动注册限流策略: " + strategyName + " -> " +
strategy.getClass().getSimpleName());
}
logger.info("RateLimiterStrategyFactory 获取所有限流策略成功");
}
public String getRatelimiteStrategyName(IRateLimiterStrategy strategy){
//先检查是否有对应的注解RateLimiteStrategyName,有的话用注解中的value值
RateLimiteStrategyName annotation = strategy.getClass().getAnnotation(RateLimiteStrategyName.class);
if (annotation != null && !annotation.value().isEmpty()) {
return annotation.value();
}
//没有该注解,用类名的全大写作为key
String key = strategy.getClass().getSimpleName().toUpperCase();
return key;
}
/**
* 根据策略名称获取策略实例
*/
public IRateLimiterStrategy getStrategy(String strategyName) {
IRateLimiterStrategy strategy = strategyMap.get(strategyName);
if(strategy==null){
throw new IllegalArgumentException("未找到名为[" + strategyName + "]的限流策略");
}
return strategy;
}
/**
* 获取所有已注册的策略名称
*/
public String[] getAllStrategyNames() {
return strategyMap.keySet().toArray(new String[0]);
}
}
将所有的限流策略全部放在
strategyMap
中,key为策略名称即RateLimiteStrategyName
注解中的value函数值
,value为策略实例在工厂类初始化时利用
IOC
容器的构造函数依赖注入的方式,拿到所有的自定义限流策略对外通过策略名拿到对应的限流实例
自定义限流切面
我们的项目希望接口一旦使用我们自定义的限流注解@RateLimiter
便会对对应的接口实现限流,这里通过AOP的方式实现
RateLimitAspect.java
:注意,我们这里由于没有具体的业务语义,因此暂时用的是最粗暴的IP限流
package com.zshunbao.ratelimite.aop;
import com.zshunbao.ratelimite.annotation.RateLimiter;
import com.zshunbao.ratelimite.factory.RateLimiterStrategyFactory;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @program: FlowLimit
* @ClassName RateLimitAspect
* @description: 限流AOP切面,拦截带有@RateLimiter注解的方法并应用限流策略
* @author: zs宝
* @create: 2025-09-16 20:40
* @Version 1.0
**/
@Component
@Aspect
public class RateLimitAspect {
private static final Logger logger = LoggerFactory.getLogger(RateLimitAspect.class);
private RateLimiterStrategyFactory rateLimiterStrategyFactory;
public RateLimitAspect(RateLimiterStrategyFactory rateLimiterStrategyFactory) {
this.rateLimiterStrategyFactory = rateLimiterStrategyFactory;
}
//aop切点
@Pointcut("@annotation(com.zshunbao.ratelimite.annotation.RateLimiter)")
public void aopPoint(){}
@Around("aopPoint() && @annotation(rateLimiter)")
public Object around(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable {
//默认进行ip限流
//获得IP
ServletRequestAttributes attrs = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attrs == null) {
// 非 Web 场景:可选择放行/拒绝/使用默认 key
throw new IllegalStateException("No request context for rate limiting");
}
HttpServletRequest request = attrs.getRequest();
String ip = getClientIp(request);
//获取接口名称用于拼凑出键
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String methodName = signature.getMethod().getName();
String key=methodName+":"+ip;
//通过自定义注解获得策略名称
String strategyName = rateLimiter.strategy();
// 获取对应的限流策略
IRateLimiterStrategy strategy = rateLimiterStrategyFactory.getStrategy(strategyName);
// 尝试获取令牌
boolean allowed = strategy.tryAcquire(key, rateLimiter.limit(), rateLimiter.period());
if(!allowed){
//限流走指定的限流方法
return fallbackMethodResult(joinPoint,rateLimiter.fallbackMethod());
}
//不限流放行
return joinPoint.proceed();
}
/**
* 调用用户配置的回调方法,当拦截后,返回回调结果。
*/
private Object fallbackMethodResult(JoinPoint jp, String fallbackMethod) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Signature signature = jp.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = jp.getTarget().getClass().getMethod(fallbackMethod, methodSignature.getParameterTypes());
return method.invoke(jp.getThis(),jp.getArgs());
}
/**
* 获取客户端真实IP
*/
private String getClientIp(HttpServletRequest request) {
String[] candidates = {
"X-Forwarded-For", "X-Real-IP", "Forwarded",
"Proxy-Client-IP", "WL-Proxy-Client-IP"
};
for (String h : candidates) {
String v = request.getHeader(h);
if (v != null && !v.isEmpty() && !"unknown".equalsIgnoreCase(v)) {
// X-Forwarded-For 可能是 "client, proxy1, proxy2"
String ip = v.split(",")[0].trim();
return "::1".equals(ip) ? "127.0.0.1" : ip;
}
}
String ip = request.getRemoteAddr();
return "::1".equals(ip) ? "127.0.0.1" : ip;
}
}
通过工厂拿到注解中对应的限流策略示例
调用对应限流实例的限流方法
到此,其实整个组件大的限流逻辑框架已经完成,只需要按需实现对应的限流策略即可。
限流策略实现
在实现前我们先引入一个常量类,用于专门定义限流策略名称,以及限流的键的前缀
package com.zshunbao.ratelimite.common;
/**
* @program: FlowLimit
* @ClassName RateLimiteConstants
* @description:
* @author: zs宝
* @create: 2025-09-19 20:56
* @Version 1.0
**/
public class RateLimiteConstants {
//定义限流策略
public static final String LOCAL_GUAVA="GUAVA";
public static final String REDIS_SLIDE_WINDOW="REDIS_SLIDE_WINDOW";
public static final String REDIS_TOKEN_BUCKET="REDIS_TOKEN_BUCKET";
public static final String REDISSION="REDISSION";
public static final String SENTINEL="SENTINEL";
public static final String BUCKET4J="BUCKET4J";
//定义限流存储键
public static final String REDIS_SLIDING_WINDOW_KEY ="rate_limit:sliding_window:";
public static final String REDIS_TOKEN_BUCKET_KEY="rate_limit:token_bucket:";
public static final String REDISSION_KEY="rate_limit:redission:";
public static final String BUCKET4J_KEY="rate_limit:bucket4j:";
}
接下来我们开始实现不同的限流策略
Guava
实现有关Guava的本地单机限流,代码如下
package com.zshunbao.ratelimite.strategy.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @program: FlowLimit
* @ClassName GuavaRateLimiter
* @description: Guvava本地限流
* @author: zs宝
* @create: 2025-09-19 21:40
* @Version 1.0
**/
@Component
@RateLimiteStrategyName(RateLimiteConstants.LOCAL_GUAVA)
public class GuavaRateLimiter implements IRateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(GuavaRateLimiter.class);
//使用 Guava Cache 存储不同 key 对应的 RateLimiter。
//每个 RateLimiter 的生命周期为 1 分钟(写入后 1 分钟自动过期删除)
public final Cache<String, RateLimiter> accessRecord= CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
//limit 表示周期内允许的请求数。
//period 表示时间周期(单位秒)。
//(double) limit / period 计算出每秒允许产生的令牌数。
@Override
public boolean tryAcquire(String key, int limit, int period) {
logger.info("GuavaRateLimiter 开始限流");
RateLimiter rateLimiter = accessRecord.getIfPresent(key);
//如果为空,则重新创建加入
if(rateLimiter==null){
logger.info("GuavaRateLimiter 对于对应key:{} 本地无记录",key);
//基于 令牌桶算法,每秒生成 permitsPerSecond 个令牌。
double permitsPerSecond = (double) limit / period;
rateLimiter=RateLimiter.create(permitsPerSecond);
accessRecord.put(key,rateLimiter);
}
// 尝试获取令牌
return rateLimiter.tryAcquire();
}
}
基于滑动窗口的redis限流
这里要了解滑动窗算法首先要知道固定窗口算法
固定窗口算法核心原理
将时间划分为固定大小的窗口(如 1 分钟、10 秒),每个窗口内维护一个请求计数器。当请求到达时:
判断当前请求是否落在当前窗口内;
若在窗口内,计数器 + 1,若计数器超过阈值则限流;
若窗口过期(进入下一个窗口),重置计数器为 0。
示例:窗口大小 = 1 分钟,阈值 = 100 次请求。0:00-0:01 为窗口 1,0:01-0:02 为窗口 2。若 0:00:59 来了 100 次请求(计数器 = 100,正常处理),0:01:01 又来 100 次请求(窗口 2 计数器 = 100,正常处理),则 2 秒内实际处理 200 次请求,远超 “1 分钟 100 次” 的预期 —— 这就是临界窗口问题。
固定窗口算法无法应对两个时间窗口临界时间内的突发流量,在此基础上优化出了滑动窗口算法
滑动窗口算法核心原理
为解决固定窗口的临界问题,将 “固定窗口” 拆分为多个更小的时间片(如 1 分钟窗口拆分为 6 个 10 秒时间片)。请求到达时,以 “时间片” 为单位滑动窗口,计算当前窗口覆盖的所有时间片内的总请求数,若超过阈值则限流。
示例:窗口大小 = 1 分钟,时间片 = 10 秒,阈值 = 100。0:00-0:01 的窗口覆盖时间片 1(0-10s)~6(50-60s)。当时间到 0:01:01 时,窗口滑动为 0:00:01-0:01:01,覆盖时间片 2(10-20s)~7(60-70s)。此时 0:00:59 的 100 次请求(时间片 6)和 0:01:01 的 100 次请求(时间片 7)会被计入同一窗口,总请求数 = 200>100,触发限流,解决了临界问题。
介绍完相关算法后,我们来实现滑动窗口的redis算法
首先先引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.7.18</version>
</dependency>
然后配置redis
package com.zshunbao.ratelimite.config;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.sync.RedisCommands;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @program: FlowLimit
* @ClassName RedisConfig
* @description: 配置redis
* @author: zs宝
* @create: 2025-09-19 21:21
* @Version 1.0
**/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
//设置序列化
StringRedisSerializer stringRedisSerializer=new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(stringRedisSerializer);
//检查配置是否完整正确
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
最后我们实现基于滑动窗口算法的限流实例类(分布式限流)
package com.zshunbao.ratelimite.strategy.impl;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
/**
* @program: FlowLimit
* @ClassName RedisSlidingWindowLimiter
* @description: 基于redis实现的滑动窗口限流算法
* @author: zs宝
* @create: 2025-09-20 19:43
* @Version 1.0
**/
@Component
@RateLimiteStrategyName(RateLimiteConstants.REDIS_SLIDE_WINDOW)
public class RedisSlidingWindowLimiter implements IRateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(RedisSlidingWindowLimiter.class);
private final RedisTemplate<String,Object>redisTemplate;
//对应的Lua脚本
private final DefaultRedisScript<Long> slidingWindowScript;
public RedisSlidingWindowLimiter(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
this.slidingWindowScript = new DefaultRedisScript<>();;
}
//@PostConstruct:Spring 生命周期注解,Bean 初始化后执行。
@PostConstruct
public void init(){
logger.info("RedisSlidingWindowLimiter 初始化加载lua脚本");
//加载lua脚本
slidingWindowScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/sliding_window.lua")));
slidingWindowScript.setResultType(Long.class);
}
@Override
public boolean tryAcquire(String key, int limit, int window) {
String finalKey=RateLimiteConstants.REDIS_SLIDING_WINDOW_KEY +key;
//时间转为ms
long windowSize=window*1000L;
long currentTime = System.currentTimeMillis();
//构建一个只包含单个元素的不可变 List
List<String>finalKeys= Collections.singletonList(finalKey);
//以字符串传递参数
Object[]args=new Object[]{
String.valueOf(windowSize),
String.valueOf(limit),
String.valueOf(currentTime)
};
logger.info("RedisSlidingWindowLimiter 开始限流 key:{}",finalKey);
//执行脚本
//作为 Redis 脚本执行时的 KEYS 参数。
//在调用 redisTemplate.execute(slidingWindowScript, keys, args); 时,
//keys 对应 Lua 脚本里的 KEYS 数组。
//args 对应 Lua 脚本里的 ARGV 数组。
Long result= null;
try {
logger.info("RedisSlidingWindowLimiter 执行脚本 key:{}",finalKey);
result = redisTemplate.execute(slidingWindowScript,finalKeys,args);
logger.info("RedisSlidingWindowLimiter 执行脚本成功 key:{}",finalKey);
} catch (Exception e) {
logger.info("RedisSlidingWindowLimiter 执行脚本失败 key:{} 错误{}",finalKey,e.getMessage());
}
if(result==null|| result==0){
logger.info("RedisSlidingWindowLimiter 已经限流");
}
return result!=null && result==1;
}
}
由于是分布式限流,因此对于redis相关命令执行要做到原子性,这里采用lua脚本的方式进行解决
数据结构:使用
ZSET
,元素的 score=时间戳,member 也携带时间戳(+ 随机数防重复)。
清理过期:删除所有 score ≤ (now - window) 的成员,保证集合内只剩“当前窗口”的请求。
计数判断:用
ZCARD
得到窗口内请求数,与limit
比较。
放行或拒绝:
未超限 ⇒ 记录本次请求(
ZADD now member
),设置合理 TTL,返回1
超限 ⇒ 不写入(或按需写入)当前请求,更新 TTL(可选),返回
0
原子性:Lua 在 Redis 中执行是原子的,天然避免并发竞态
sliding_window.lua
-- 滑动窗口限流Lua脚本
-- KEYS[1]: 限流键(包含IP)
-- ARGV[1]: 窗口大小(毫秒)
-- ARGV[2]: 限流次数
-- ARGV[3]: 当前时间戳(毫秒)
-- 1、参数校验
if #KEYS ~=1 or #ARGV ~=3 then
-- 参数错误
return 0
end
-- 2、参数解析(由字符串强制转为数字)
local key=KEYS[1];
local windowSize=tonumber(ARGV[1])
local limit=tonumber(ARGV[2])
local currentTime=tonumber(ARGV[3])
-- 3、验证参数有效性
if not windowSize or not limit or not currentTime then
return 0
end
-- 4. 计算窗口边界,移除窗口外的请求记录
local minTime=currentTime-windowSize
-- 删除有序集合中,分数(score)在 [min, max] 区间的所有元素。
redis.call('ZREMRANGEBYSCORE',key,0,minTime)
-- 5. 统计当前窗口请求数,判断是否限流
-- 返回有序集合(Sorted Set)key 中的元素数量。
local count =redis.call('ZCARD',key)
if count < limit then
-- 允许访问:添加当前请求时间戳(加随机数避免重复)
-- currentTime:当前时间戳(毫秒)。
--..:Lua 的字符串拼接运算符。
--math.random(100000):生成 1 ~ 100000 的随机数。
--拼接结果:例如 "1695234567890:12345"。
--目的:保证 member 值唯一。
--如果只用 currentTime 作为 member,同一毫秒内多个请求就会覆盖(因为 ZSET 的 member 必须唯一)。
--加上随机数后,即使同一毫秒有很多请求,也能插入多个不重复的元素。
local member=currentTime .. ":" .. math.random(100000)
redis.call('ZADD',key,currentTime,member)
-- 设置过期时间(毫秒级,窗口+1000ms,自动清理过期键)
redis.call('PEXPIRE',key,windowSize+1000)
-- 1=允许访问
return 1
end
-- 0=拒绝访问(超出限流)
return 0
到此基于滑动窗口的限流策略已经实现
基于令牌桶算法的redis限流
这里要了解令牌桶算法,必须先了解漏桶算法
漏桶算法中将限流器比作一个漏斗,每一个请求到来就会向桶中添加一定的水量,桶底有一个孔,以恒定速度不断的漏出水;当一个请求过来需要向加水时,如果漏桶剩余容积不足以容纳添加的水量,就会触发拒绝策略。
漏桶算法核心原理
类比 “有漏洞的水桶”:
输入:请求以任意速率进入桶中;
存储:桶有固定容量,满了之后新请求会溢出(限流);
输出:桶以固定速率向外 “漏水”(即处理请求),无论桶内有多少请求,处理速率始终不变。
核心特点:输出速率绝对稳定,彻底隔绝突发流量对后端系统的冲击。
示例:桶容量 = 10,漏水速率 = 2 个 / 秒。若 1 秒内涌入 15 个请求:桶先装满 10 个,5 个溢出(限流);之后每秒处理 2 个,10 个请求需 5 秒处理完。期间若新请求进入,需先判断桶是否已满,满则限流。
漏桶是看处理效率和生产效率来控制流速,但是这个流速是静态的,很可能无法充分利用机器的性能。比如,服务器能处理的速率是100qps,但是我们配置的恒定流速只有50qps,这个时候服务器资源还非常地冗余。
在基于漏桶算法的基础上,改进出了令牌桶算法
令牌桶算法能比较灵活的调整以最大化利用资源:系统每接受到一个请求时,都要求有一个令牌,如果拿到令牌就处理,否则就拒绝,处理完以后把令牌丢弃。 桶中能存放的最大令牌数决定了令牌桶算法的最大并发,当桶中放满令牌时,允许达到最大并发。
令牌桶算法核心原理
类比 “发放令牌的水桶”,与漏桶算法互补:
令牌生成:系统以固定速率向桶中放入令牌(如每秒放 2 个);
令牌存储:桶有固定容量,令牌满了之后新令牌会溢出(不存储);
请求处理:请求到达时需从桶中获取 1 个令牌,获取成功则处理请求,无令牌则限流(可选择等待或拒绝)。
核心特点:支持合理的突发流量—— 若桶中积累了令牌(如长时间无请求),突发请求可一次性获取多个令牌,快速处理;长期高流量时,令牌生成速率决定了最大处理速率,仍能限流。
示例:桶容量 = 10,令牌生成速率 = 2 个 / 秒。若长时间无请求,桶中积累 10 个令牌。此时突发 15 个请求:先获取 10 个令牌处理 10 个请求,剩余 5 个请求需等待令牌生成(每秒 2 个,3 秒生成 6 个,处理 5 个);若请求是 8 个,直接获取 8 个令牌处理,桶中剩余 2 个令牌。
这里基于令牌桶的限流策略实现如下
package com.zshunbao.ratelimite.strategy.impl;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
/**
* @program: FlowLimit
* @ClassName RedisTokenBucketLimiter
* @description: 基于redis实现的令牌桶限流算法
* @author: zs宝
* @create: 2025-09-20 20:09
* @Version 1.0
**/
@Component
@RateLimiteStrategyName(RateLimiteConstants.REDIS_TOKEN_BUCKET)
public class RedisTokenBucketLimiter implements IRateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(RedisTokenBucketLimiter.class);
private final RedisTemplate<String,Object>redisTemplate;
private final DefaultRedisScript<Long> redisTokenBucketScript;
public RedisTokenBucketLimiter(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
this.redisTokenBucketScript = new DefaultRedisScript<>();
}
@PostConstruct
public void init(){
logger.info("RedisTokenBucketLimiter 初始化加载lua脚本");
redisTokenBucketScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/token_bucket.lua")));
redisTokenBucketScript.setResultType(Long.class);
}
@Override
public boolean tryAcquire(String key, int capacity, int period) {
if(key==null || capacity<=0 || period<=0){
logger.info("RedisTokenBucketLimiter 传入参数错误 key:{} capacity:{} permitsPerSecond:{}",key,capacity,period);
return false;
}
//组装key
String finalKey=RateLimiteConstants.REDIS_TOKEN_BUCKET_KEY+key;
// 计算每秒生成的令牌数
double tokensPerSecond = (double) capacity / period;
List<String> finalKeys= Collections.singletonList(finalKey);
Object[]args=new Object[]{
//当前时间戳(毫秒)
String.valueOf(System.currentTimeMillis()),
//桶容量
String.valueOf(capacity),
//每秒生成的令牌数
String.valueOf(tokensPerSecond),
//本次请求需要的令牌数
String.valueOf(1)
};
Long result=null;
try {
logger.info("RedisTokenBucketLimiter 执行脚本 key:{}",finalKey);
result = redisTemplate.execute(redisTokenBucketScript,finalKeys,args);
logger.info("RedisTokenBucketLimiter 执行脚本成功 key:{}",finalKey);
}catch (Exception e){
logger.info("RedisTokenBucketLimiter 执行脚本失败 key:{} 错误{}",finalKey,e.getMessage());
}
if(result==null|| result==0){
logger.info("RedisTokenBucketLimiter 已经限流");
}
return result!=null && result==1;
}
}
其lua脚本如下
每次请求先从 Redis 拿到
tokens
和lastTime
。
根据
(now - lastTime)
计算这段时间新增的令牌数。
更新桶的令牌数(不能超过
capacity
)。
如果桶里还有令牌,就
tokens = tokens - 1
,允许请求;否则拒绝。
把新的
tokens
和lastTime
存回 Redis。
token_bucket.lua
-- 令牌桶限流Lua脚本(与Java参数一致,返回1/0)
-- KEYS[1]: 限流键名(包含IP)
-- ARGV[1]: 当前时间戳(毫秒)
-- ARGV[2]: 桶容量(整数)
-- ARGV[3]: 每秒生成的令牌数(可为小数)
-- ARGV[4]: 本次请求需要的令牌数(整数)
-- 1、参数校验
if #KEYS ~=1 or #ARGV ~=4 then
-- 参数错误
return 0
end
-- 2、参数解析(由字符串强制转为数字)
local key = KEYS[1]
local currentTime = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local ratePerSec = tonumber(ARGV[3])
local required = tonumber(ARGV[4])
-- 3、验证参数有效性
if not currentTime or not capacity or not ratePerSec or not required then
return 0
end
if capacity <= 0 or ratePerSec <= 0 or required <= 0 then
return 0
end
if required > capacity then
return 0
end
-- 4、读取当前桶状态
-- HMGET key field1 field2 ...:从哈希表 key 中取多个字段的值。
--这里取 tokens 和 lastTime 两个字段:
--tokens:桶里当前剩余的令牌数。
--lastTime:上一次填充令牌的时间戳。
local vals=redis.call('HMGET',key,'tokens','lastTime')
local tokens = tonumber(vals[1])
local lastTime = tonumber(vals[2])
if tokens == nil then tokens = capacity end
if lastTime == nil then lastTime = currentTime end
-- 5、令牌补充(按时间流逝生成令牌)
local elapsed=(currentTime-lastTime)/1000
if elapsed < 0 then elapsed = 0 end
-- 根据经过的时间,按令牌生成速率 ratePerSec 增加令牌。
local newTokens=tokens+elapsed*ratePerSec
if newTokens > capacity then
newTokens = capacity
end
-- 6、判断是否可以消费
if newTokens >= required then
newTokens=newTokens-required
--写回状态
redis.call('HSET',key,'tokens', newTokens, 'lastTime', currentTime)
-- TTL:桶从当前剩余到满桶所需时间 + 60s,至少60s
local ttl=math.ceil((capacity - newTokens) / ratePerSec)+60
if ttl <60 then ttl=60 end
-- 避免key无限增长
redis.call('EXPIRE', key, ttl)
return 1
else
-- 写回状态(即使失败也记录时间和令牌数)
redis.call('HSET', key, 'tokens', newTokens, 'lastTime', currentTime)
local ttl = math.ceil((capacity - newTokens) / ratePerSec) + 60
if ttl < 60 then ttl = 60 end
redis.call('EXPIRE', key, ttl)
return 0
end
到此基于令牌桶算法的redis限流也已经实现
Redisson
限流
上述两种利用redis
实现的限流策略都是让我们自己实现的限流算法,那么有没有现成的框架可以使用呢?Redisson
首先我们引入相关依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.1</version>
</dependency>
由于Redisson
和Guava
一样内部已经上线了相关的RateLimiter
,因此我们直接使用即可。基于Redisson实现的限流策略如下
package com.zshunbao.ratelimite.strategy.impl;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* @program: FlowLimit
* @ClassName RedissionRateLimiter
* @description: 利用Redission实现限流
* @author: zs宝
* @create: 2025-09-21 10:24
* @Version 1.0
**/
@Component
@RateLimiteStrategyName(RateLimiteConstants.REDISSION)
public class RedissionRateLimiter implements IRateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(RedissionRateLimiter.class);
private final RedissonClient redisson;
public RedissionRateLimiter(RedissonClient redisson) {
this.redisson = redisson;
}
@Override
public boolean tryAcquire(String key, int limit, int period) {
String finalKey=RateLimiteConstants.REDISSION_KEY+key;
//获得ratelimiter,RRateLimiter 是基于 令牌桶算法 实现的分布式限流器。
RRateLimiter rateLimiter = redisson.getRateLimiter(finalKey);
//初始化限流规则(仅首次生效,已存在则返回false)
rateLimiter.trySetRate(
// 所有实例共享
RateType.OVERALL,
// 周期内最大请求数
limit,
// 周期长度
period,
RateIntervalUnit.SECONDS
);
boolean allowed=false;
try {
logger.info("RedissionRateLimiter 执行限流 key:{}",finalKey);
allowed = rateLimiter.tryAcquire();
logger.info("RedissionRateLimiter 执行限流成功 key:{}",finalKey);
} catch (Exception e) {
logger.info("RedissionRateLimiter 执行限流失败 key:{} 错误:{}",finalKey,e.getMessage());
}
return allowed;
}
}
Sentinel限流
这里我们在最后介绍一个限流方案,阿里的sentinel限流。
官网:sentinel 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、流量路由、熔断降级、系统自适应过载保护、热点流量防护等多个维度保护服务的稳定性。
详情参考Sentinel官方文档
其实基于Sentinel的限流策略实现如下
package com.zshunbao.ratelimite.strategy.impl;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.zshunbao.ratelimite.annotation.RateLimiteStrategyName;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import com.zshunbao.ratelimite.strategy.IRateLimiterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @program: FlowLimit
* @ClassName SentinelRateLimiter
* @description:
* @author: zs宝
* @create: 2025-09-21 10:57
* @Version 1.0
**/
@Component// 让工厂用名称拿到它
@RateLimiteStrategyName(RateLimiteConstants.SENTINEL)
public class SentinelRateLimiter implements IRateLimiterStrategy {
private static final Logger logger = LoggerFactory.getLogger(SentinelRateLimiter.class);
/** 本地缓存已设置过的资源阈值,避免每次都重装规则(简单去抖) */
private final Map<String, Double> installedQpsCache = new ConcurrentHashMap<>();
@Override
public boolean tryAcquire(String key, int limit, int period) {
if (period <= 0) period = 1;
double qps = ((double) limit) / period; // 映射为每秒平均速率
//确保该资源已存在或更新到目标 QPS 的规则
ensureFlowRule(key, qps);
//尝试进入名为 resourceKey 的 Sentinel 资源
logger.info("sentinel 限流 key:{}",key);
try (Entry ignored = SphU.entry(key)) {
logger.info("sentinel 限流成功 key:{}",key);
return true; // 放行
} catch (BlockException ex) {
logger.info("sentinel 限流失败 key:{}",key);
return false; // 被限流或降级
}
}
/** 按需创建/更新 QPS 规则(简单合并到全量规则后 load) */
private void ensureFlowRule(String resource, double qps) {
//先查缓存:若该资源的 QPS 与目标 qps 几乎相等(浮点数允许极小差),直接返回以减少不必要的规则装载。
Double old = installedQpsCache.get(resource);
if (old != null && Math.abs(old - qps) < 1e-6) {
return; // 阈值没变,无需更新
}
// 取出现有规则,替换/追加该 resource 的规则
//从 FlowRuleManager 取出当前所有限流规则,复制成可修改的 ArrayList
List<FlowRule> all = new ArrayList<>(Optional.ofNullable(FlowRuleManager.getRules()).orElse(Collections.emptyList()));
boolean replaced = false;
//遍历找同名资源的规则:
//找到 → 用新规则替换(更新 QPS/行为等参数)
for (int i = 0; i < all.size(); i++) {
FlowRule r = all.get(i);
if (resource.equals(r.getResource())) {
FlowRule nr = buildQpsRule(resource, qps);
all.set(i, nr);
replaced = true;
break;
}
}
//没找到 → 说明是首次给该资源加规则,直接追加。
if (!replaced) {
all.add(buildQpsRule(resource, qps));
}
FlowRuleManager.loadRules(all);
installedQpsCache.put(resource, qps);
}
private FlowRule buildQpsRule(String resource, double qps) {
FlowRule rule = new FlowRule();
rule.setResource(resource);
//基于QPS限流(另一种是并发线程数
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//// Sentinel 要求 >0
rule.setCount(Math.max(qps, 0.000001));
// 可选:阈值控制行为(直接拒绝/排队等待/预热)
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); // 直接拒绝
// rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); // 匀速排队
rule.setWarmUpPeriodSec(10);
return rule;
}
}
到此本文文实现的5种限流策略已经全部完成
相关配置
这里主要写一下项目的相关配置依赖等信息
pom文件为
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<!-- Spring Boot 2.7.x 是最后一代支持 JDK8 的版本 -->
<version>2.7.18</version>
<relativePath/>
</parent>
<groupId>com.zshunbao.limit</groupId>
<artifactId>RateLimiteComponent</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- AOP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- Web(若做全局Filter可选) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis 客户端(Lettuce) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.7.18</version>
</dependency>
<!-- Guava(本地令牌桶) -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.13</version>
</dependency>
<!-- 引入redission-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.1</version>
</dependency>
<!-- sentinel-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置yml为application.yml
spring:
redis:
host: localhost
port: 16379
server:
port: 8081
redisson:
singleServerConfig:
address: "redis://127.0.0.1:16379"
database: 0
connectionPoolSize: 64
connectionMinimumIdleSize: 10
threads: 4
nettyThreads: 8
测试
这里我们在controller包下专门定义一个测试的请求,如下
package com.zshunbao.ratelimite.controller;
import com.zshunbao.ratelimite.annotation.RateLimiter;
import com.zshunbao.ratelimite.common.RateLimiteConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @program: FlowLimit
* @ClassName RateLimiteController
* @description:
* @author: zs宝
* @create: 2025-09-20 21:32
* @Version 1.0
**/
@RestController
@RequestMapping("/rateLimiter")
public class RateLimiteController {
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 使用Guava本地限流,60秒内最多5次请求
@RateLimiter(strategy = RateLimiteConstants.LOCAL_GUAVA, limit = 5, period = 60, fallbackMethod = "fallbackMethod")
@GetMapping("/guava")
public String guavaTest() {
return "Guava限流测试:请求成功";
}
// 使用Redis滑动窗口限流,60秒内最多5次请求
@RateLimiter(strategy = RateLimiteConstants.REDIS_SLIDE_WINDOW, limit = 5, period = 60, fallbackMethod = "fallbackMethod")
@GetMapping("/sliding")
public String slidingWindowTest() {
return "滑动窗口限流测试:请求成功";
}
// 使用Redis令牌桶限流,60秒内最多5次请求
@RateLimiter(strategy = RateLimiteConstants.REDIS_TOKEN_BUCKET, limit = 5, period = 60, fallbackMethod = "fallbackMethod")
@GetMapping("/token")
public String tokenBucketTest() {
return "令牌桶限流测试:请求成功";
}
// 使用Redission令牌桶限流,60秒内最多5次请求
@RateLimiter(strategy = RateLimiteConstants.REDISSION, limit = 5, period = 60, fallbackMethod = "fallbackMethod")
@GetMapping("/redission")
public String redisssionTest() {
return "redisssion限流测试:请求成功";
}
// 使用sentinel限流,
@RateLimiter(strategy = RateLimiteConstants.SENTINEL, limit = 2, period = 2, fallbackMethod = "fallbackMethod")
@GetMapping("/sentinel")
public String sentinelTest() {
return "sentinel限流测试:请求成功";
}
@GetMapping("/testRedis")
public String testRedis() {
redisTemplate.opsForValue().set("rateLimiter", "rateLimiter");
return redisTemplate.opsForValue().get("rateLimiter");
}
public String fallbackMethod(){
return "IP已被限流";
}
}
启动类为
package com.zshunbao.ratelimite;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RateLimiterApplication {
public static void main(String[] args) {
SpringApplication.run(RateLimiterApplication.class, args);
}
}
启动项目
测试对应的接口
首先测试redis是否连接成功
然后开始测试各种限流策略
Guava
连续多刷新几次
基于滑动窗口算法的redis限流
多刷新几次
基于令牌桶的redis限流
多刷新几次
Redisson限流
连续多刷新几次
基于Sentinel限流
多连续刷新几次
到次测试完成
其它限流方案
除了上述限流策略外还可以基于以下方式进行限流
1、Redis的redis-cell限流模块 Git地址:redis-cell Redis 4.0提供了一个限流Redis模块,名称为redis-cell,并提供原子的限流指令。 该模块只有一条指令cl.throttle,其参数和返回值比较复杂。
根据官网说明,redis-cell的好处主要是两点: 1)、性能很好,非常快 2)、封装了底层实现,避免自定义Lua出现幼稚写法。
但是由于作者声称redis-cell模块目前已经达到很好的性能和实现,后续已经不再积极维护。
2、Nginx 限流 在Nginx中可以通过limit_req_zone配置限流策略。 limit_req_zone 用来限制单位时间内的请求数,采用的漏桶算法 “leaky bucket”。 limit_req_zone binary_remote_addr zone=test:10m rate=10r/s; #定义限流策略 1 binary_remote_addr 指定按 ip 限流统计。 zone=test:10m 表示生成一个大小为 10M,名字为 one 的内存区域,用来存储访问的频次信息。 rate=10r/s 表示允许同一个客户端的访问频次是每秒 10 次。
3、Gateway网关限流
4、Bucket4j 官方文档:Bucket4j
Bucket4j是一个基于令牌桶算法实现的强大的限流库,它不仅支持单机限流,还支持通过诸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107) 规范的分布式缓存实现分布式限流。 和 Guava 的限流器相比,Bucket4j 的功能显然要更胜一筹,毕竟 Guava 的目的只是用作通用工具类,而不是用于限流的。使用 Bucket4j 基本上可以满足我们的大多数要求,不仅支持单机限流和分布式限流,而且可以很好的集成监控,搭配 Prometheus 和 Grafana 简直完美。值得一提的是,有很多开源项目譬如 JHipster API Gateway 就是使用 Bucket4j 来实现限流的。
Bucket4j 唯一不足的地方是它只支持请求频率限流,不支持并发量限流,另外还有一点,虽然 Bucket4j 支持分布式限流,但它是基于 Hazelcast 这样的分布式缓存系统实现的,不能使用 Redis,这在很多使用 Redis 作缓存的项目中就很不爽,所以我们还需要在开源的世界里继续探索。
5、Resilience4j
官网:Resilience4j Resilience4j 是一款轻量级、易使用的高可用框架。用过 Spring Cloud 早期版本的同学肯定都听过 Netflix Hystrix,Resilience4j 的设计灵感就来自于它。自从 Hystrix 停止维护之后,官方也推荐大家使用 Resilience4j 来代替 Hystrix。
Resilience4j 的底层采用 Vavr,这是一个非常轻量级的 Java 函数式库,使得 Resilience4j 非常适合函数式编程。Resilience4j 以装饰器模式提供对函数式接口或 lambda 表达式的封装,提供了一波高可用机制:重试(Retry)、熔断(Circuit Breaker)、限流(Rate Limiter)、限时(Timer Limiter)、隔离(Bulkhead)、缓存(Caceh) 和 降级(Fallback)。我们重点关注这里的两个功能:限流(Rate Limiter) 和 隔离(Bulkhead),Rate Limiter 是请求频率限流,Bulkhead 是并发量限流。
Resilience4j 提供了两种限流的实现:SemaphoreBasedRateLimiter 和 AtomicRateLimiter。SemaphoreBasedRateLimiter 基于信号量实现,用户的每次请求都会申请一个信号量,并记录申请的时间,申请通过则允许请求,申请失败则限流,另外有一个内部线程会定期扫描过期的信号量并释放,很显然这是令牌桶的算法。AtomicRateLimiter 和上面的经典实现类似,不需要额外的线程,在处理每次请求时,根据距离上次请求的时间和生成令牌的速度自动填充。
参考资料
最后感谢大模型GPT老师,豆包老师