Sven

限流


令牌桶和漏桶算法的差异

令牌桶和漏桶是两种常用的限流算法,它们在实现和特性上有一些关键的差异:

令牌桶算法

  • 原理: 以固定速率向桶中添加令牌,请求到来时消耗令牌。如果桶中有足够的令牌,则允许请求;否则,请求被拒绝或等待。
  • 特点: 允许一定程度的突发流量,因为令牌可以在桶中累积。
  • 优势: 对于需要处理突发流量的场景更为友好。

漏桶算法

  • 原理: 水(请求)以不固定的速率流入桶中,而桶以固定的速率漏水(处理请求)。如果桶满了,则多余的水(请求)会溢出(被拒绝)。
  • 特点: 强制限制了请求的最大处理速率,即使在突发流量情况下也保持稳定的输出。
  • 优势: 更适合需要稳定输出率的场景。

主要差异

  • 突发流量处理: 令牌桶允许短时间的突发流量,而漏桶始终保持固定的处理速率。
  • 实现复杂度: 令牌桶通常实现起来较为简单,而漏桶可能需要更复杂的队列管理。
  • 应用场景: 令牌桶适用于需要允许某些突发流量的场景,漏桶适用于需要严格控制处理速率的场景。

选择使用哪种算法取决于具体的应用需求和流量特征。在实际应用中,有时也会结合两种算法的特点来设计更复杂的限流策略。


RateLimiter 常用方法

Google Guava 的 RateLimiter 类提供了一些常用方法来实现限流功能:

  • create(double permitsPerSecond):创建一个稳定输出令牌的 RateLimiter,保证了平均每秒不超过 permitsPerSecond 个请求
  • acquire():从 RateLimiter 获取一个许可,该方法会被阻塞直到获取到许可
  • acquire(int permits):从 RateLimiter 获取指定数量的许可
  • tryAcquire():尝试获取一个许可,如果获取成功则立即返回 true,否则立即返回 false
  • tryAcquire(int permits):尝试获取指定数量的许可
  • tryAcquire(long timeout, TimeUnit unit):在指定的时间内尝试获取一个许可
  • setRate(double permitsPerSecond):更新 RateLimiter 的许可发放速率
  • getRate():返回 RateLimiter 的许可发放速率

这些方法可以根据不同的场景和需求来灵活使用,实现精确的限流控制。

import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TokenRateLimiterService {
    // 每个用户对应一个限流器
    private final Map<String, RateLimiter> userLimiters = new ConcurrentHashMap<>();
    
    // 每分钟10次,换算成每秒约0.1667次
    private double permitsPerSecond = 10.0 / 60.0;

    public TokenRateLimiterService(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    private RateLimiter getRateLimiterForUser(String token) {
        return userLimiters.computeIfAbsent(token, k -> RateLimiter.create(permitsPerSecond));
    }

    public boolean isRequestAllowed(String token) {
        RateLimiter rateLimiter = getRateLimiterForUser(token);
        // 尝试获取一个令牌,如果获取成功则允许请求
        return rateLimiter.tryAcquire();
    }
}
public class ApiService {
    private final TokenRateLimiterService rateLimiterService = new TokenRateLimiterService(1.0); // 每秒1次

    public String handleRequest(String token) {
        if (rateLimiterService.isRequestAllowed(token)) {
            return "Request is allowed";
        } else {
            return "Too many requests, please try again later.";
        }
    }
}

Bucket4j 常用方法(java8收费)

Bucket4j 是一个基于令牌桶算法的 Java 限流库,提供了以下常用方法:

  • Bucket.builder():创建一个 Bucket 构建器
  • addLimit(Bandwidth.classic(long capacity, Refill refill)):添加带有容量和填充策略的限制
  • build():构建 Bucket 实例
  • tryConsume(long tokens):尝试消费指定数量的令牌
  • tryConsumeAndReturnRemaining(long tokens):尝试消费令牌并返回剩余的令牌数
  • consumeAndReturnRemaining(long tokens):消费令牌并返回剩余的令牌数,可能会阻塞
  • getAvailableTokens():获取当前可用的令牌数

这些方法使得 Bucket4j 能够灵活地应用于各种限流场景,提供了精确的控制和监控能力。


使用计数器加时间戳实现限流

计数器加时间戳是一种简单而有效的限流方法。这种方法通过记录每个用户在特定时间窗口内的请求次数来实现限流。以下是这种方法的基本原理:

  • 计数器:为每个用户维护一个请求计数器
  • 时间戳:记录用户最后一次请求的时间
  • 时间窗口:设定一个固定的时间窗口(如1分钟)
  • 限制阈值:在时间窗口内允许的最大请求次数

实现步骤:

  1. 当收到请求时,检查当前时间与用户最后请求时间的差值
  2. 如果差值大于时间窗口,重置计数器和时间戳
  3. 如果在时间窗口内,增加计数器
  4. 如果计数器超过阈值,拒绝请求;否则允许请求

下面的代码示例展示了这种方法的具体实现:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TokenRateLimiterService {
    private final Map<String, UserRequestInfo> requestCounts = new ConcurrentHashMap<>();
    private final int MAX_REQUESTS_PER_MINUTE = 10;

    private static class UserRequestInfo {
        int requestCount;
        long lastRequestTime;
    }

    public synchronized boolean isRequestAllowed(String token) {
        long currentTime = System.currentTimeMillis();
        UserRequestInfo info = requestCounts.computeIfAbsent(token, k -> new UserRequestInfo());

        // 如果超过1分钟,重置计数
        if ((currentTime - info.lastRequestTime) > 60 * 1000) {
            info.requestCount = 0;
            info.lastRequestTime = currentTime;
        }

        // 判断请求是否超过限制
        if (info.requestCount < MAX_REQUESTS_PER_MINUTE) {
            info.requestCount++;
            return true;
        } else {
            return false;
        }
    }
}

使用 Spring AOP 实现限流

使用 Spring AOP 实现限流是一种非常灵活和可扩展的方法。以下是使用 Spring AOP 实现限流的示例:

首先,我们定义一个自定义注解 @RateLimit:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    int value() default 10; // 默认每分钟允许10次请求
}

然后,我们创建一个 AOP 切面来实现限流逻辑:

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Aspect
@Component
public class RateLimitAspect {

    private final Map<String, UserRequestInfo> requestCounts = new ConcurrentHashMap<>();

    private static class UserRequestInfo {
        int requestCount;
        long lastRequestTime;
    }

    @Around("@annotation(rateLimit) && args(token,..)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit, String token) throws Throwable {
        long currentTime = System.currentTimeMillis();
        UserRequestInfo info = requestCounts.computeIfAbsent(token, k -> new UserRequestInfo());

        // 超过一分钟则重置计数
        if ((currentTime - info.lastRequestTime) > 60 * 1000) {
            info.requestCount = 0;
            info.lastRequestTime = currentTime;
        }

        // 判断请求是否在限制范围内
        if (info.requestCount < rateLimit.maxRequests()) {
            info.requestCount++;
            return joinPoint.proceed();
        } else {
            throw new RuntimeException("Too many requests, please wait a minute.");
        }
    }
}

最后,我们可以在需要限流的方法上使用 @RateLimit 注解:

@RestController
public class MyController {
    @RateLimit(maxRequests = 10) // 每分钟最多10次
    @GetMapping("/my-endpoint")
    public String myEndpoint(@RequestHeader("Authorization") String token) {
        return "Request allowed";
    }
}

这种方法的优点是:

  • 可以方便地应用到任何需要限流的方法上
  • 通过修改注解参数,可以为不同的方法设置不同的限流规则
  • 实现了关注点分离,限流逻辑与业务逻辑分开
  • 可以轻松扩展,例如添加更复杂的限流策略或日志记录

需要注意的是,这个实现是基于内存的,对于分布式系统可能需要使用分布式缓存(如Redis)来存储限流信息。


使用 Redis 和 Lua 脚本实现分布式限流

在分布式系统中,使用 Redis 实现限流是一种常见的做法。结合 Lua 脚本可以确保原子性操作,有效解决并发问题。以下是一个使用 Redis 和 Lua 脚本实现分布式限流的示例:

首先,我们定义一个 Lua 脚本来实现限流逻辑:

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = tonumber(ARGV[2])

local current = redis.call('incr', key)
if current == 1 then
    redis.call('expire', key, expire_time)
end

if current > limit then
    return 0
else
    return 1
end

然后,我们可以在 Java 中使用 Jedis 客户端来执行这个 Lua 脚本:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class RedisRateLimiter {
    private final JedisPool jedisPool;
    private final String luaScript;

    public RedisRateLimiter(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
        this.luaScript = loadLuaScript();
    }

    private String loadLuaScript() {
        // 加载上面定义的 Lua 脚本
        return "local key = KEYS[1] ...";  // 完整的 Lua 脚本内容
    }

    public boolean isAllowed(String key, int limit, int expireTime) {
        try (Jedis jedis = jedisPool.getResource()) {
            String sha = jedis.scriptLoad(luaScript);
            Object result = jedis.evalsha(sha, 1, key, String.valueOf(limit), String.valueOf(expireTime));
            return Long.parseLong(result.toString()) == 1;
        }
    }
}

最后,我们可以在 Spring Boot 应用中使用这个限流器:

@Service
public class ApiService {
    private final RedisRateLimiter rateLimiter;

    public ApiService(JedisPool jedisPool) {
        this.rateLimiter = new RedisRateLimiter(jedisPool);
    }

    public String accessApi(String userId) {
        String key = "rate_limit:" + userId;
        if (rateLimiter.isAllowed(key, 10, 60)) {
            return "Access granted";
        } else {
            throw new RuntimeException("Rate limit exceeded");
        }
    }
}

这种实现方式的优点包括:

  • 分布式:可以在多个应用实例间共享限流状态
  • 原子性:使用 Lua 脚本确保了增加计数和设置过期时间的原子性
  • 高性能:Redis 的高性能特性使得限流检查非常快速
  • 灵活性:可以轻松调整限流的键、限制和时间窗口

需要注意的是,这种方法依赖于 Redis 的可用性。在实际应用中,应考虑 Redis 的高可用性配置,如使用 Redis Cluster 或 Sentinel。

On this page