admin 管理员组文章数量: 1086019
如何限流,具体实现有哪些?
什么是限流
限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统 的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
比如场景:
某天小明突然发现自己的接口请求突然之间涨到了原来的10倍,接口几乎不能使用,产生了一系列连锁反应,导致了整个系统崩溃。这就好比,老电闸中都安装了保险丝,一旦使用大功率设备,保险丝就会熔断,保证各个电器不被强电流烧坏,系统也同样安装保险丝,防止非预期请求过大,引起系统瘫痪。
限流方法
常用的限流算法有:计数法,滑动窗口计数法,漏桶算法和令牌桶算法。
漏桶算法思路
水(请求)进入到漏桶里,漏桶以一定的速度流出,当水流的速度过大会直接溢出, 漏桶是强行限制了数据的传输速率。
令牌桶算法
除了要能够限制数据的平均传输速率外,还需要允许某种程度的突发请求,令牌桶更为合适。
令牌桶的思路是以一个恒定的速率往桶里放令牌,如果请求需要被处理,则需要从桶里取出一个令牌,如果没有令牌可取,那么就拒绝服务。
Google开源工具包Guava提供了限流工具类RateLimiter是基于令牌桶算法来实现的。
public double acquire() {return acquire(1);}public double acquire(int permits) {checkPermits(permits); //检查参数是否合法(是否大于0)long microsToWait;synchronized (mutex) { //应对并发情况需要同步microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间 }ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);}private long reserveNextTicket(double requiredPermits, long nowMicros) {resync(nowMicros); //补充令牌long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目double freshPermits = requiredPermits - storedPermitsToSpend;long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)+ (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌return microsToNextFreeTicket;}private void resync(long nowMicros) {// if nextFreeTicket is in the past, resync to nowif (nowMicros > nextFreeTicketMicros) {storedPermits = Math.min(maxPermits,storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);nextFreeTicketMicros = nowMicros;}}
计数器
控制单位时间内的请求数量
import java.util.concurrent.atomic.AtomicInteger;public class Counter {/*** 最大访问数量*/private final int limit = 10;/*** 访问时间差*/private final long timeout = 1000;/*** 请求时间*/private long time;/*** 当前计数器*/private AtomicInteger reqCount = new AtomicInteger(0);public boolean limit() {long now = System.currentTimeMillis();if (now < time + timeout) {// 单位时间内reqCount.getAndAdd(1);return reqCount.get() <= limit;} else {// 超出单位时间time = now;reqCount = new AtomicInteger(0);return true;}}public static void main(String[] args) {}}
计数方式有没有问题?
假设每分钟请求数量为 60 个,每秒钟系统可以处理1个请求,用户在00:59 发送了60 个请求,然后在 1:00 发生了60个请求,此时 2 秒内有120个请求(每秒60)个请求,这样的方式并没有实现限制流量,因为每分钟可以处理60个,但是实际上这60个是一秒钟发过来的。
滑动窗口计数
滑动窗口是对计数方式对改进,增加一个时间粒度的度量单位。
把一分钟分成了若干等份,比如分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,限流统计越详细。
package ratelimit;import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;public class TimeWindow {private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();/**a* 间隔秒数*/private int seconds;/*** 最大限流*/private int max;public TimeWindow(int max, int seconds) {this.seconds = seconds;this.max = max;new Thread(() -> {while (true) {try {Thread.sleep((seconds - 1) * 1000);} catch (InterruptedException e) {e.printStackTrace();}clean();}}).start();}public static void main(String[] args) {final TimeWindow timeWindow = new TimeWindow(10, 1);IntStream.range(0, 3).forEach((i) -> {new Thread(() -> {try {while (true) {Thread.sleep(new Random().nextInt(20) * 100);}} catch (InterruptedException e) {e.printStackTrace();}timeWindow.take();}).start();});}public void take() {long start = System.currentTimeMillis();int size = sizeOfValid();if (size > max) {System.out.println("超限");}synchronized (queue) {if (sizeOfValid() > max) {System.err.println("超限");System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);}this.queue.offer(System.currentTimeMillis());}System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);}private int sizeOfValid() {Iterator<Long> it = queue.iterator();Long ms = System.currentTimeMillis() - seconds * 1000;int count = 0;while (it.hasNext()) {long t = it.next();if (t > ms) {//在时间窗口范围内count++;}}return count;}public void clean() {Long c = System.currentTimeMillis() - seconds * 1000;Long t1 = null;while ((t1 = queue.peek()) != null && t1 < c) {System.out.println("数据清理");queue.poll();}}
}
令牌桶问题
令牌桶规定固定容量的桶,令牌 token 以固定速度往桶内填充,当桶填满时 token 不会继续放入,每过来一个请求把 token 从桶中移除,当没有 token 可以获取时,拒绝请求。
令牌桶算法
当网络设备衡量流量是否超过额定带宽时,需要查看令牌桶,而令牌桶中会放置一定数量的令牌,一个令牌允许接口发送或接收1bit数据(有时是1 Byte数据),当接口通过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流量都被视为超过额定带宽,只有当桶中有令牌时,数据才可以通过接口。令牌桶中的令牌不仅仅可以被移除,同样也可以往里添加,所以为了保证接口随时有数据通过,就必须不停地往桶里加令牌,由此可见,往桶里加令牌的速度,就决定了数据通过接口的速度。 因此,我们通过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),通常以秒为单位。比如我们设置用户的带宽为1000 bit每秒,只要保证每秒钟往桶里添加1000个令牌即可。
令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是不让自己的系统垮掉。
令牌桶算法代码
package com.netease.datastream.util.flowcontrol;import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** <pre>* Created by inter12 on 15-3-18.* </pre>*/
public class TokenBucket {// 默认桶大小个数 即最大瞬间流量是64Mprivate static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;// 一个桶的单位是1字节private int everyTokenSize = 1;// 瞬间最大流量private int maxFlowRate;// 平均流量private int avgFlowRate;// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *// 1024 * 64private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();private volatile boolean isStart = false;private ReentrantLock lock = new ReentrantLock(true);private static final byte A_CHAR = 'a';public TokenBucket() {}public TokenBucket(int maxFlowRate, int avgFlowRate) {this.maxFlowRate = maxFlowRate;this.avgFlowRate = avgFlowRate;}public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {this.everyTokenSize = everyTokenSize;this.maxFlowRate = maxFlowRate;this.avgFlowRate = avgFlowRate;}public void addTokens(Integer tokenNum) {// 若是桶已经满了,就不再家如新的令牌for (int i = 0; i < tokenNum; i++) {tokenQueue.offer(Byte.valueOf(A_CHAR));}}public TokenBucket build() {start();return this;}/*** 获取足够的令牌个数* * @return*/public boolean getTokens(byte[] dataSize) {// Preconditions.checkNotNull(dataSize);
// Preconditions.checkArgument(isStart,
// "please invoke start method first !");int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数final ReentrantLock lock = this.lock;lock.lock();try {boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量if (!result) {return false;}int tokenCount = 0;for (int i = 0; i < needTokenNum; i++) {Byte poll = tokenQueue.poll();if (poll != null) {tokenCount++;}}return tokenCount == needTokenNum;} finally {lock.unlock();}}public void start() {// 初始化桶队列大小if (maxFlowRate != 0) {tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);}// 初始化令牌生产者TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,TimeUnit.SECONDS);isStart = true;}public void stop() {isStart = false;scheduledExecutorService.shutdown();}public boolean isStarted() {return isStart;}class TokenProducer implements Runnable {private int avgFlowRate;private TokenBucket tokenBucket;public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {this.avgFlowRate = avgFlowRate;this.tokenBucket = tokenBucket;}@Overridepublic void run() {tokenBucket.addTokens(avgFlowRate);}}public static TokenBucket newBuilder() {return new TokenBucket();}public TokenBucket everyTokenSize(int everyTokenSize) {this.everyTokenSize = everyTokenSize;return this;}public TokenBucket maxFlowRate(int maxFlowRate) {this.maxFlowRate = maxFlowRate;return this;}public TokenBucket avgFlowRate(int avgFlowRate) {this.avgFlowRate = avgFlowRate;return this;}private String stringCopy(String data, int copyNum) {StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);for (int i = 0; i < copyNum; i++) {sbuilder.append(data);}return sbuilder.toString();}public static void main(String[] args) throws IOException,InterruptedException {tokenTest();}private static void arrayTest() {ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);tokenQueue.offer(1);tokenQueue.offer(1);tokenQueue.offer(1);System.out.println(tokenQueue.size());System.out.println(tokenQueue.remainingCapacity());}private static void tokenTest() throws InterruptedException, IOException {TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("D:/ds_test")));String data = "xxxx";// 四个字节for (int i = 1; i <= 1000; i++) {Random random = new Random();int i1 = random.nextInt(100);boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,i1).getBytes());TimeUnit.MILLISECONDS.sleep(100);if (tokens) {bufferedWriter.write("token pass --- index:" + i1);System.out.println("token pass --- index:" + i1);} else {bufferedWriter.write("token rejuect --- index" + i1);System.out.println("token rejuect --- index" + i1);}bufferedWriter.newLine();bufferedWriter.flush();}bufferedWriter.close();}}
令牌桶和漏桶的选择问题
如果要让自己的系统不被打垮,用令牌桶,如果保证别人的系统不被打垮,用漏桶算法。
内推链接:=MzsxNjIwMzgzNzA2MzYyOzY5Mzk2OTkyMjAwODk2NjkxNTE7MA
欢迎关注公众号:程序员开发者社区
参考资料
- .html
本文标签: 如何限流,具体实现有哪些
版权声明:本文标题:如何限流,具体实现有哪些? 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1686559009a10197.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论