| 
                        副标题[/!--empirenews.page--]
                           
Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。 
大家可能会问:Sentinel 和之前常用的熔断降级库 Netflix Hystrix 有什么异同呢?Sentinel官网有一个对比的文章,这里摘抄一个总结的表格,具体的对比可以点此 链接 查看。 
 
从对比的表格可以看到,Sentinel比Hystrix在功能性上还要强大一些,本文让我们一起来了解下Sentinel的源码,揭开Sentinel的神秘面纱。 
项目结构 
将Sentinel的源码fork到自己的github库中,接着把源码clone到本地,然后开始源码阅读之旅吧。 
首先我们看一下Sentinel项目的整个结构: 
 
    - sentinel-core 核心模块,限流、降级、系统保护等都在这里实现
 
    - sentinel-dashboard 控制台模块,可以对连接上的sentinel客户端实现可视化的管理
 
    - sentinel-transport 传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
 
    - sentinel-extension 扩展模块,主要对DataSource进行了部分扩展实现
 
    - sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配
 
    - sentinel-demo 样例模块,可参考怎么使用sentinel进行限流、降级等
 
    - sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试
 
 
运行样例 
基本上每个框架都会带有样例模块,有的叫example,有的叫demo,sentinel也不例外。 
那我们从sentinel的demo中找一个例子运行下看看大致的情况吧,上面说过了sentinel主要的核心功能是做限流、降级和系统保护,那我们就从“限流”开始看sentinel的实现原理吧。 
 
可以看到sentinel-demo模块中有很多不同的样例,我们找到basic模块下的flow包,这个包下面就是对应的限流的样例,但是限流也有很多种类型的限流,我们就找根据qps限流的类看吧,其他的限流方式原理上都大差不差。 
- public class FlowQpsDemo { 
 -  
 - private static final String KEY = "abc"; 
 -  
 - private static AtomicInteger pass = new AtomicInteger(); 
 -  
 - private static AtomicInteger block = new AtomicInteger(); 
 -  
 - private static AtomicInteger total = new AtomicInteger(); 
 -  
 - private static volatile boolean stop = false; 
 -  
 - private static final int threadCount = 32; 
 -  
 - private static int seconds = 30; 
 -  
 - public static void main(String[] args) throws Exception { 
 -  
 - initFlowQpsRule(); 
 - tick(); 
 -  
 - // first make the system run on a very low condition 
 -  
 - simulateTraffic(); 
 -  
 - System.out.println("===== begin to do flow control"); 
 -  
 - System.out.println("only 20 requests per second can pass"); 
 -  
 - } 
 -  
 - private static void initFlowQpsRule() { 
 -  
 - List<FlowRule> rules = new ArrayList<FlowRule>(); 
 -  
 - FlowRule rule1 = new FlowRule(); 
 -  
 - rule1.setResource(KEY); 
 -  
 - // set limit qps to 20 
 -  
 - rule1.setCount(20); 
 -  
 - // 设置限流类型:根据qps 
 -  
 - rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); 
 -  
 - rule1.setLimitApp("default"); 
 -  
 - rules.add(rule1); 
 -  
 - // 加载限流的规则 
 -  
 - FlowRuleManager.loadRules(rules); 
 -  
 - } 
 -  
 - private static void simulateTraffic() { 
 -  
 - for (int i = 0; i < threadCount; i++) { 
 -  
 - Thread t = new Thread(new RunTask()); 
 -  
 - t.setName("simulate-traffic-Task"); 
 -  
 - t.start(); 
 -  
 - } 
 -  
 - } 
 -  
 - private static void tick() { 
 -  
 - Thread timer = new Thread(new TimerTask()); 
 -  
 - timer.setName("sentinel-timer-task"); 
 -  
 - timer.start(); 
 -  
 - } 
 -  
 - static class TimerTask implements Runnable { 
 -  
 - @Override 
 -  
 - public void run() { 
 -  
 - long start = System.currentTimeMillis(); 
 -  
 - System.out.println("begin to statistic!!!"); 
 -  
 - long oldTotal = 0; 
 -  
 - long oldPass = 0; 
 -  
 - long oldBlock = 0; 
 -  
 - while (!stop) { 
 -  
 - try { 
 -  
 - TimeUnit.SECONDS.sleep(1); 
 -  
 - } catch (InterruptedException e) { 
 -  
 - } 
 -  
 - long globalTotal = total.get(); 
 -  
 - long oneSecondTotal = globalTotal - oldTotal; 
 -  
 - oldTotal = globalTotal; 
 -  
 - long globalPass = pass.get(); 
 -  
 - long oneSecondPass = globalPass - oldPass; 
 -  
 - oldPass = globalPass; 
 -  
 - long globalBlock = block.get(); 
 -  
 - long oneSecondBlock = globalBlock - oldBlock; 
 -  
 - oldBlock = globalBlock; 
 -  
 - System.out.println(seconds + " send qps is: " + oneSecondTotal); 
 -  
 - System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal 
 -  
 - + ", pass:" + oneSecondPass 
 -  
 - + ", block:" + oneSecondBlock); 
 -  
 - if (seconds-- <= 0) { 
 -  
 - stop = true; 
 -  
 - } 
 -  
 - } 
 -  
 - long cost = System.currentTimeMillis() - start; 
 -  
 - System.out.println("time cost: " + cost + " ms"); 
 -  
 - System.out.println("total:" + total.get() + ", pass:" + pass.get() 
 -  
 - + ", block:" + block.get()); 
 -  
 - System.exit(0); 
 -  
 - } 
 -  
 - } 
 -  
 - static class RunTask implements Runnable { 
 -  
 - @Override 
 -  
 - public void run() { 
 -  
 - while (!stop) { 
 -  
 - Entry entry = null; 
 -  
 - try { 
 -  
 - entry = SphU.entry(KEY); 
 -  
 - // token acquired, means pass 
 -  
 - pass.addAndGet(1); 
 -  
 - } catch (BlockException e1) { 
 -  
 - block.incrementAndGet(); 
 -  
 - } catch (Exception e2) { 
 -  
 - // biz exception 
 -  
 - } finally { 
 -  
 - total.incrementAndGet(); 
 -  
 - if (entry != null) { 
 -  
 - entry.exit(); 
 -  
 - } 
 -  
 - } 
 -  
 - Random random2 = new Random(); 
 -  
 - try { 
 -  
 - TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); 
 -  
 - } catch (InterruptedException e) { 
 -  
 - // ignore 
 -  
 - } 
 -  
 - } 
 -  
 - } 
 -  
 - } 
 -  
 - } 
 
  
 
执行上面的代码后,打印出如下的结果: 
 
可以看到,上面的结果中,pass的数量和我们的预期并不相同,我们预期的是每秒允许pass的请求数是20个,但是目前有很多pass的请求数是超过20个的。 
原因是,我们这里测试的代码使用了多线程,注意看 threadCount 的值,一共有32个线程来模拟,而在RunTask的run方法中执行资源保护时,即在 SphU.entry 的内部是没有加锁的,所以就会导致在高并发下,pass的数量会高于20。 
可以用下面这个模型来描述下,有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模拟请求,被访问的business code被资源key保护着,根据规则,每秒只允许20个请求通过。 
由于pass、block、total等计数器是全局共享的,而多个RunTask线程在执行SphU.entry申请获取entry时,内部没有锁保护,所以会存在pass的个数超过设定的阈值。 
 
那为了证明在单线程下限流的正确性与可靠性,那我们的模型就应该变成了这样: 
 
那接下来我把 threadCount 的值改为1,只有一个线程来执行这个方法,看下具体的限流结果,执行上面的代码后打印的结果如下: 
 
可以看到pass数基本上维持在20,但是第一次统计的pass值还是超过了20。这又是什么原因导致的呢? 
其实仔细看下Demo中的代码可以发现,模拟请求是用的一个线程,统计结果是用的另外一个线程,统计线程每1秒钟统计一次结果,这两个线程之间是有时间上的误差的。从TimeTicker线程打印出来的时间戳可以看出来,虽然每隔一秒进行统计,但是当前打印时的时间和上一次的时间还是有误差的,不完全是1000ms的间隔。 
要真正验证每秒限制20个请求,保证数据的精准性,需要做基准测试,这个不是本篇文章的重点,有兴趣的同学可以去了解下jmh,sentinel中的基准测试也是通过jmh做的。 
深入原理 
通过一个简单的示例程序,我们了解了sentinel可以对请求进行限流,除了限流外,还有降级和系统保护等功能。那现在我们就拨开云雾,深入源码内部去一窥sentinel的实现原理吧。 
首先从入口开始: SphU.entry() 。这个方法会去申请一个entry,如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表面已经被限流了。 
从 SphU.entry() 方法往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph ,在CtSph中最终会执行到 entry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException 这个方法。 
我们来看一下这个方法的具体实现: 
- public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { 
 -  
 - Context context = ContextUtil.getContext(); 
 -  
 - if (context instanceof NullContext) { 
 -  
 - // Init the entry only. No rule checking will occur. 
 -  
 - return new CtEntry(resourceWrapper, null, context); 
 -  
 - } 
 -  
 - if (context == null) { 
 -  
 - context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); 
 -  
 - } 
 -  
 - // Global switch is close, no rule checking will do. 
 -  
 - if (!Constants.ON) { 
 -  
 - return new CtEntry(resourceWrapper, null, context); 
 -  
 - } 
 -  
 - // 获取该资源对应的SlotChain 
 -  
 - ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); 
 -  
 - /* 
 -  
 - * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no 
 -  
 - * rule checking will be done. 
 -  
 - */ 
 -  
 - if (chain == null) { 
 -  
 - return new CtEntry(resourceWrapper, null, context); 
 -  
 - } 
 -  
 - Entry e = new CtEntry(resourceWrapper, chain, context); 
 -  
 - try { 
 -  
 - // 执行Slot的entry方法 
 -  
 - chain.entry(context, resourceWrapper, null, count, args); 
 -  
 - } catch (BlockException e1) { 
 -  
 - e.exit(count, args); 
 -  
 - // 抛出BlockExecption 
 -  
 - throw e1; 
 -  
 - } catch (Throwable e1) { 
 -  
 - RecordLog.info("Sentinel unexpected exception", e1); 
 -  
 - } 
 -  
 - return e; 
 -  
 - } 
 
  
 
这个方法可以分为以下几个部分: 
    - 1.对参数和全局配置项做检测,如果不符合要求就直接返回了一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。
 
    - 2.根据包装过的资源对象获取对应的SlotChain
 
    - 3.执行SlotChain的entry方法
 
    - 3.1.如果SlotChain的entry方法抛出了BlockException,则将该异常继续向上抛出
 
    - 3.2.如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
 
    - 4.如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行
 
 
其中比较重要的是第2、3两个步骤,我们来分解一下这两个步骤。 
创建SlotChain 
首先看一下lookProcessChain的方法实现: 
- private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { 
 -  
 - ProcessorSlotChain chain = chainMap.get(resourceWrapper); 
 -  
 - if (chain == null) { 
 -  
 - synchronized (LOCK) { 
 -  
 - chain = chainMap.get(resourceWrapper); 
 -  
 - if (chain == null) 
 -  
 - // Entry size limit. 
 -  
 - if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { 
 -  
 - return null; 
 -  
 - } 
 -  
 - // 具体构造chain的方法 
 -  
 - chain = Env.slotsChainbuilder.build(); 
 -  
 - Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); 
 -  
 - newMap.putAll(chainMap); 
 -  
 - newMap.put(resourceWrapper, chain); 
 -  
 - chainMap = newMap; 
 -  
 - } 
 -  
 - } 
 -  
 - } 
 -  
 - return chain; 
 -  
 - } 
 
  
 
                                                (编辑:我爱故事小小网_铜陵站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |