Skip to main content

从零到桶,来一场和令牌的浪漫邂逅

· 20 min read

如果你在上海待过,“死亡九号线”一定不陌生。听说2024年会好些,但回想2021年的早晨八点,我站在泗泾地铁站,队伍常常能排到地铁站门口,加上错综复杂的围栏,东拐西拐,就像我的人生一样曲折坎坷。我不甘心,想起我的计算机生涯,同样如此,让我心碎不已。

看到地铁限流,我不禁想起了程序限流。工作了这么久,我竟然从未涉足过这方面,也对此一无所知,人生更是失败透顶。

我意识到,要改变人生,就得从生活中的每个细节开始落实。光想不行,还得付诸行动。

所以这次我打算自己动手,写一个Java的限流算法。限流,顾名思义,就是限制某一时刻能够访问的流量大小。比如说,本来100M/s的网速,被某云服务限制到100k/s,让人痛苦不堪。而服务器中的限流也是同样的道理,高峰期服务器扛不住压力,所以限制每个IP在5秒内只能访问一次,等等等等。

俗话说得好,知己知彼,方能百战百胜。这里我查了一下市面上常见的限流算法,如下:

特性/算法类型基于计数的限流算法(固定窗口)滑动窗口算法漏桶算法令牌桶算法
原理固定时间段内计数,超限则限流时间窗口内细分计数,逐个窗口检查请求进入队列并按恒定速率流出按固定速率填充令牌,请求需消耗令牌才能处理
特点粗粒度限流,易实现精细化限流,更平滑控制流出速率,无视突发请求允许一定突发流量,同时保持平均速率
灵活性较差,窗口切换时可能出现突刺较好,连续性和稳定性较好良好,恒定处理速率最佳,可调节限流速率和突发处理能力
突发处理不允许有一定处理能力不允许允许
平滑性较差较好很好很好
实现难度简单中等简单到中等中等到复杂
适用场景对简单限流需求,如基础并发控制需要更精细流量控制的场景稳定性要求高,流量整形网络传输、接口限流、既要限速又要允许突发流量

说实话,看得我是直流口水,如此多的算法,除了计数以外,似乎都很不错。考虑下来,最后还是选择了令牌桶,不为别的,就凭他的实现难度为最高,就该选它! 人生就是充满挑战性

按照令牌桶的原理所说,首先我们要实现按照固定速率填充令牌,这样好像我们只要考虑两个参数即可,速率?

说白了就是某刻时间消耗的令牌数量,消耗的令牌数量越大,速率越高。当我们明白这点以后,我们就可以先创建出来两个变量

public class TokenBucketContext {
/**
* 每周期令牌数(tokensPerPeriod): 在每个填充周期内向桶中添加的令牌数量。
*/
int tokensPerPeriod;

/**
* 填充间隔毫秒数(refillIntervalMillis): 两次填充操作之间的时间间隔,以毫秒为单位。
*/
int refillIntervalMillis;
}

让我在完善一下刚刚脑子中一些不成熟的说话,某一刻表示的实际上是一个周期,一个周期有多长呢?应该交给使用者来决定吧,比如说,十秒钟允许3次访问,10秒钟就代表一个周期,填充间隔应该就是10秒,一个周期有3次,每周期令牌数就应该是3。看到这里,我相信大家一定都能够明白

再让我们继续往下看 允许一定突发流量,同时保持平均速率,平均速率,我们已经实现了,但允许一定的突发流量,这个我们还没有实现,想要达到这个效果,我意识到仅仅只有周期令牌数和填充间隔时间是不够的,所以我们需要再次升级,这一次我加入了一个容量

public class TokenBucketParam {

/**
* 容量(capacity): 令牌桶的最大容量,表示最多可以存储的令牌数量。
*/
int capacity;
/**
* 每周期令牌数(tokensPerPeriod): 在每个填充周期内向桶中添加的令牌数量。
*/
int tokensPerPeriod;

/**
* 填充间隔毫秒数(refillIntervalMillis): 两次填充操作之间的时间间隔,以毫秒为单位。
*/
int refillIntervalMillis;
}

有些朋友可能就开始困惑了呢,你加入一个容量,怎么就能实现允许一定的突发流量呢?别急,让我细细道来。

假如A系统,预计1个小时,有10wTPS。也就是一个小时有10w次修改,正常情况下,都是十分顺利的。但是A系统依赖的B系统,宕机了1小时,导致阻塞,无法更新,那么在下一个时刻,假设会有20wTPS过来。在这种极端场景下,我对于这种最大的突发流量评估下来,我设置最大容量为20w,每周期令牌数为10w,填充时间就为1小时

在T1时刻(故障的时刻)没有流量进来,我T1时刻产生的10w令牌,没有被用。到了T2时刻,由于我们最大容量为20w,所以T2产生的10w令牌,又可以继续存放起来,所以目前系统中的令牌为20w,T2时刻,对应的20wTPS 就能刚刚好承载住,所以我们只需要加入最大容量,就可以通过代码来完成实现啦,是不是很激动人心,不过别着急。我们距离完美还需要最后一个参数,那就是初始令牌数,毕竟,你也不想一上来,就必须得过一个周期才能使用吧?

public class TokenBucketContext {

/**
* 容量(capacity): 令牌桶的最大容量,表示最多可以存储的令牌数量。
*/
int capacity;
/**
* 每周期令牌数(tokensPerPeriod): 在每个填充周期内向桶中添加的令牌数量。
*/
int tokensPerPeriod;

/**
* 填充间隔毫秒数(refillIntervalMillis): 两次填充操作之间的时间间隔,以毫秒为单位。
*/
int refillIntervalMillis;
/**
* 初始令牌数(initialTokens): 令牌桶初始化时的令牌数量。
*/
int initialTokens;
}

至此,我们可以松一口气了,完成了第一个阶段,参数定义。在我们的设想中,这四个参数,足以满足我们实现令牌桶的特性了,剩下的,我们就需要开发实现细节了

public class TokenBucket {

private final TokenBucketContext context;

private int tokens;

@Getter
private long lastRefillTime;

public TokenBucket(TokenBucketContext context) {
this.context = context;
tokens = context.initialTokens;
lastRefillTime = System.currentTimeMillis();
}

// 简化的尝试获取令牌方法
public boolean tryAcquire() {
if (tokens > 0) {
tokens--; // 有令牌则减少一个令牌
return true;
}
return false; // 没有令牌则返回false
}
}

一口气,先让我们来做一个最最最最简单的demo,接下来,我们要尝试思考一个问题,tryAcquire会有线程安全问题吗,这取决于我们接下来该如何开发

答案是有的,如果你不理解的话,我可以举一个例子给你,假设A、B两个用户同时进来,他们都会操作这个类中的变量tokens。A用户先将tokens更改为10,而B进来的时候,tokens还处于11,A用户将tokens-1 也就是9,对于B来说,他完全不清楚,于是他继续-1,最后结果又变成了10。因此就造成了线程安全的问题

只有tokens会有线程安全问题吗?换句话说,只要是tryAcquire 有涉及到的类中的变量,存在并发更新,都会有线程安全问题。

public synchronized boolean tryAcquire(long time) {
int tokenCount = tokens;
// 大于0 表示令牌桶中有令牌
if (tokenCount > 0) {
// 减少令牌计数,由于已经同步,无需CAS操作
tokens = tokenCount - 1;
return true;
}
// 填充间隔毫秒数
int refillInterval = context.refillIntervalMillis;
// 小于等于0 => 不进行填充
if (refillInterval <= 0) {
return false;
}

int periods = (int) (time - lastRefillTime) / refillInterval;
// 计算从上一次填充令牌到现在经过的填充周期数periods。如果periods小于等于0,则表示还未到填充时间
if (periods <= 0) {
return false;
}
int capacity = context.capacity;
// 填充周期数 * 每个周期需要填充的数量,就等于填充的令牌数量
tokenCount = Math.min(MathUtil.multiply(periods, context.tokensPerPeriod, capacity) - 1, capacity - 1);
// 更新令牌数量和最后填充时间,这里直接操作因为方法已经是同步的
tokens = tokenCount;
lastRefillTime = time;
return true;
}



//工具类
public final class MathUtil {

private MathUtil() {
}

/**
* @see Math#addExact(int, int)
*/
public static int add(int x, int y, int fallbackValue) {
int r = x + y;
if (((x ^ r) & (y ^ r)) < 0) {
return fallbackValue;
}
return r;
}

/**
* @see Math#addExact(long, long)
*/
public static long add(long x, long y, long fallbackValue) {
long r = x + y;
if (((x ^ r) & (y ^ r)) < 0) {
return fallbackValue;
}
return r;
}

/**
* @see Math#multiplyExact(int, int)
*/
public static int multiply(int x, int y, int fallbackValue) {
long r = (long) x * (long) y;
if ((int) r != r) {
return fallbackValue;
}
return (int) r;
}

/**
* @see Math#multiplyExact(long, long)
*/
public static long multiply(long x, long y, long fallbackValue) {
long r = x * y;
long ax = Math.abs(x);
long ay = Math.abs(y);
if (((ax | ay) >>> 31 != 0)) {
if (((y != 0) && (r / y != x)) || (x == Long.MIN_VALUE && y == -1)) {
return fallbackValue;
}
}
return r;
}

public static int toInt(long value) {
if (value > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
if (value < Integer.MIN_VALUE) {
return Integer.MIN_VALUE;
}
return (int) value;
}

}

第一版本,我直接上了synchronized,简单、方便,解决掉了线程安全的问题,在我心里,第一个版本,性能不是关键的,能实现功能才是真的,所以你可以看到我又补充了一些逻辑,比如 计算填充周期数,以及计算tokenCount

让我来尝试解释的更详细一些,计算填充周期数,通过传入的time,一般为当前时间戳,减去上一次填充时间,得到中间未填充时间,再除去refillInterval(填充间隔数) 这样我们就得到了周期,还记得我们之前说的允许一定突出流量吗?也就是当我们计算出多个周期时,可以试着填充更多的令牌进去,但不能超过令牌,否则就坏啦~~

至此我们第一个版本的完整功能就开发完成了

在第二个版本,我将着重优化性能,作为有追求的程序员,肯定不会满足止步于此的对吧?相信你和我一样,我要改革的第一步,就是砍掉synchronized,尽管他十分的方便,但我还是认为他太重了,所以我选择了cas,使用cas我就需要重新考虑变量线程安全问题了,因为不加锁了嘛。还好java为我们提供了volatile,他可以保证可见性

public class TokenBucket {

private static final AtomicIntegerFieldUpdater<TokenBucket> TOKENS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TokenBucket.class, "tokens");

private final TokenBucketContext context;

@Getter
private volatile int tokens;
@Getter
private volatile long lastRefillTime;

/**
* @implNote We don't validate properties here, and it should be validated when the properties
* are updated
*/
public TokenBucket(TokenBucketContext context) {
this.context = context;
tokens = context.initialTokens;
lastRefillTime = System.currentTimeMillis();
}
public boolean tryAcquire(long time) {
int tokenCount = tokens;
//大于0 表示令牌桶中有令牌
if (tokenCount > 0) {
//cas更新,能否成功获取到令牌
if (TOKENS_UPDATER.compareAndSet(this, tokenCount, tokenCount - 1)) {
return true;
}
//否则继续递归调用,只要桶内令牌存在,就继续递归调用 因此tokenCount是一个需要保证线程安全的变量 所以加了volatile
return tryAcquire(time);
}
// 填充间隔毫秒数
int refillInterval = context.refillIntervalMillis;
// 小于等于0 => 不进行填充
if (refillInterval <= 0) {
return false;
}

int periods = (int) (time - lastRefillTime) / refillInterval;
//计算从上一次填充令牌到现在经过的填充周期数periods。如果periods小于等于0,则表示还未到填充时间
//也就是必须经过一个填充周期数period
if (periods <= 0) {
return false;
}
int capacity = context.capacity;
//填充周期数 * 每个周期需要填充的数量,就等于填充的令牌数量
tokenCount = MathUtil.multiply(periods, context.tokensPerPeriod, capacity) - 1;
if (tokenCount > capacity) {
tokenCount = capacity - 1;
}
if (TOKENS_UPDATER.compareAndSet(this, 0, tokenCount)) {
lastRefillTime = time;
return true;
}
return tryAcquire(time);
}
}

至此,代码已经相当完美了,使用了cas无锁并发的我们感觉自己写的程序碉堡了!!!但是,请先别激动,代码仍然还有性能优化的空间。问题就出在了如果当tokenCount为0的时候,他需要重新填充令牌,才可以继续使用。如果我们有一种办法,提前填充,那我是不是就可以规避掉这部分的时间耗时呢?答案是可以的,我们需要新开一个方法,如下:

    /**
* 重新填充令牌桶。
* 根据过去的时间计算需要添加的令牌数量,并更新令牌桶中的令牌数量。
* 如果令牌数量超过令牌桶的容量,则将其限制在容量范围内。
*
* @param time 当前时间,用于计算自上次填充以来的间隔。
*/
public void refill(long time) {
// 获取填充间隔,如果间隔不合法,则直接返回。
int refillInterval = context.refillIntervalMillis;
if (refillInterval <= 0) {
return;
}

// 计算自上次填充以来的间隔段数。
int periods = (int) (time - lastRefillTime) / refillInterval;
if (periods <= 0) {
return;
}

// 获取当前令牌数和令牌桶容量。
int tokenCount = tokens;
int capacity = context.capacity;

// 计算新的令牌数量,可能超过容量。
int newTokenCount = MathUtil.add(tokenCount,
MathUtil.multiply(periods, context.tokensPerPeriod, capacity),
capacity);
if (newTokenCount > capacity) {
newTokenCount = capacity;
}

// 尝试原子方式更新令牌数,如果成功,则更新上次填充时间。
if (TOKENS_UPDATER.compareAndSet(this, tokenCount, newTokenCount)) {
lastRefillTime = time;
} else {
// 如果更新失败,则重新尝试填充。
refill(time);
}
}

他将专门用来填充令牌,同时,我们开一个定时任务,用来更新他,这里,大家可以用不同的方案进行实现~

 public class RateLimitManager{
//限流的令牌
private final Map<String, TokenBucket> ipToTokenBucket = new ConcurrentHashMap<>(32);

public static void start(){
// 假设 properties 和 ipToTokenBucket 已经正确定义
TokenBucketContext tokenBucketContext = new TokenBucketContext(properties);

// 创建一个定时任务执行器
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

// 使用CronUtil解析cron表达式得到对应的调度时间规则(此部分需引入第三方库如cron-utils)
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));
Cron cron = parser.parse(CronConst.EXPIRED_ADMIN_API_ACCESS_INFO_CLEANUP_CRON);

// 计算首次执行时间
ZonedDateTime nextExecution = cron.nextExecution(ZonedDateTime.now(ZoneId.systemDefault()));
long initialDelay = nextExecution.toInstant().toEpochMilli() - System.currentTimeMillis();

// 定义清理任务
Runnable cleanupTask = () -> {
// 获取当前时间
long now = System.currentTimeMillis();

// 遍历并处理每个令牌桶
List<TokenBucket> toRemove = new ArrayList<>();
for (Map.Entry<String, TokenBucket> entry : ipToTokenBucket.entrySet()) {
TokenBucket bucket = entry.getValue();
bucket.refill(now); // 根据当前时间补充令牌

// 检查是否达到最大容量,如果是,则计划移除
if (bucket.getTokens() >= tokenBucketContext.getCapacity()) {
toRemove.add(bucket);
}
}

// 移除符合条件的桶
toRemove.forEach(bucket -> ipToTokenBucket.values().remove(bucket));

// 重新调度下一次执行
ZonedDateTime nextRun = cron.nextExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.systemDefault()));
long delay = nextRun.toInstant().toEpochMilli() - System.currentTimeMillis();
scheduler.schedule(cleanupTask, delay, TimeUnit.MILLISECONDS);
};

// 初次调度任务
scheduler.scheduleAtFixedRate(cleanupTask, initialDelay, 1, TimeUnit.HOURS);
}

}


至此,我们的高性能令牌桶就完成了,我距离Java入门,又更近了一步!