FlowSlot获取全部流控规则 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null ) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException (rule.getLimitApp(), rule); } } } ------------------------------------------------------------------------------------------- private final Function<String, Collection<FlowRule>> ruleProvider = new Function <String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply (String resource) { Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } };
那我们来看apply方法,要注意的是其中的FlowRuleManager其实如果我们通过代码的方式来设置流控就会使用到它,因为它提供了对应的方法FlowRuleManager.loadRules()
1 2 3 4 5 6 7 8 public static void loadRules (List<FlowRule> rules) { currentProperty.updateValue(rules); }
但是现在这个位置源码用的FlowRuleManager.getFlowRuleMap(),其实通过这个方法我们就可以得知,FlowRuleManager的作用就是设置对应资源的流控规则,资源对应的流控规则(list集合)组成一个Map
1 2 Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
那我们现在来看FlowRuleChecker.checkFlow()方法如何应用规则,这里我们要注意遍历
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void checkFlow (Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null ) { return ; } Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null ) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException (rule.getLimitApp(), rule); } } } }
那我们需要跟进去看一下canPassCheck方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean canPassCheck ( FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) { String limitApp = rule.getLimitApp(); if (limitApp == null ) { return true ; } if (rule.isClusterMode()) { return passClusterCheck(rule, context, node, acquireCount, prioritized); } return passLocalCheck(rule, context, node, acquireCount, prioritized); }
那么这里我们要关注的是处理单机流控的方法passLocalCheck
1 2 3 4 5 6 7 8 9 10 11 private static boolean passLocalCheck (FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null ) { return true ; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
在这里我们来看一下根据请求来选择节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static Node selectNodeByRequesterAndStrategy ( FlowRule rule, Context context, DefaultNode node) { String limitApp = rule.getLimitApp(); int strategy = rule.getStrategy(); String origin = context.getOrigin(); if (limitApp.equals(origin) && filterOrigin(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return node.getClusterNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } return null ; }
那到这为止,我们就知道如何获取流控规则和针对不同来源的不同操作,那么现在还有一个问题就是,就是这个位置的解析
1 2 return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
流控效果对应的Controller 其实这个位置的rule.getRater()的返回值类型为TrafficShapingController,而这里的类结构为:
1 2 3 TrafficShapingController getRater () { return controller; }
其实从这里我们就能看出针对不同的流控模式,针对不同的控制器来进行处理,那么我们这里演示一个DefaultController
那我们现在来看一下具体DefaultController的操作,在DefaultController中,首先获取当前的线程数或者QPS数,如果当前的线程数或者QPS+申请的数量>配置的总数,则不通过,如果当前线程数或者QPS+申请的数量<=配置的总数,则直接通过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class DefaultController implements TrafficShapingController { private static final int DEFAULT_AVG_USED_TOKENS = 0 ; private double count; private int grade; public DefaultController (double count, int grade) { this .count = count; this .grade = grade; } @Override public boolean canPass (Node node, int acquireCount) { return canPass(node, acquireCount, false ); } @Override public boolean canPass (Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); throw new PriorityWaitException (waitInMs); } } return false ; } return true ; } private int avgUsedTokens (Node node) { if (node == null ) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int )(node.passQps()); } private void sleep (long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { } } }
流程总结