Sentinel 是面向分布式服务架构的轻量级流量控制框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助您保护服务的稳定性。

 

1. Sentinel资源&规则

我们说的资源,可以是任何东西,服务,服务里的方法,甚至是一段代码。使用 Sentinel 来进行资源保护,主要分为两个步骤:

  1. 定义资源
  2. 定义规则

先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。

2. Demo分析

以QPS流控为分析样例

定义规则

private static void initFlowQpsRule() {
    //可以看出规则是个链表,那么意味着可以一个资源对应多个规则
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    //规则设置资源名字
    rule1.setResource("abc");
    //设置现在qps为20
    rule1.setCount(20);
    //设置流控的规则的以QPS为准,还有以线程为准
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    //根据调用方进行流量控制,默认就是全部生效
    rule1.setLimitApp("default");
    rules.add(rule1);
    //添加进流控规则管理中
    FlowRuleManager.loadRules(rules);
}

定义资源

Entry entry = null;
try {
    entry = SphU.entry(”abc“);
    //意味着通过
} catch (BlockException e1) {
    //意味着限流了
} catch (Exception e2) {
    // 业务异常
} finally {
    //确保这里一定要执行
    if (entry != null) {
        entry.exit();
    }
}

可以看到这个限制了只能20个pass,其他block

Debug分析

entry = SphU.entry(”abc“);

SphU.entry

public static Entry entry(String name) throws BlockException {

    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);

}

这里是sph.entry实际上是CtSph.entry方法

CtSph.entry

public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {

    StringResourceWrapper resource = new StringResourceWrapper(name, type);

    return entry(resource, count, args);

}

这里根据name和type创建个StringResourceWrapper,name是我们传递进去的abc,type是EntryType.OUT表示出站流量Outbound traffic

CtSph.entry

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {

    return entryWithPriority(resourceWrapper, count, false, args);

}

CtSph.entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)

    throws BlockException {

    Context context = ContextUtil.getContext();

    if (context instanceof NullContext) {

        //进入这里代表Context超过阈值,这里只初始化entry,没有规则检验

        return new CtEntry(resourceWrapper, null, context);

    }



    if (context == null) {

        //使用默认的context

        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());

    }



    // 全局开关是不是关闭了,不然也不会进行规则检查

    if (!Constants.ON) {

        return new CtEntry(resourceWrapper, null, context);

    }

    //这里生成一个链表,责任链的体现

    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);



    //在生成chain的里面有个判断,如果chainMap.size大于一个值就返回null,也不进行规则检测

    if (chain == null) {

        return new CtEntry(resourceWrapper, null, context);

    }

    //下面这里才真正开始,生成个entry

    Entry e = new CtEntry(resourceWrapper, chain, context);

    try {

        //开始检验规则

        chain.entry(context, resourceWrapper, null, count, prioritized, args);

    } catch (BlockException e1) {

        //限流了,往上抛,这里exit了,而外界finally需要判空再exit

        e.exit(count, args);

        throw e1;

    } catch (Throwable e1) {

        // This should not happen, unless there are errors existing in Sentinel internal.

        RecordLog.info("Sentinel unexpected exception", e1);

    }

    //正常pass情况

    return e;

}

这里分为几个步骤

  1. 做一些检查,全局开关,是不是超过Context阈值,entry数量是不是超过阈值等等,符合就返回个CtEntry不做后面的规则检查
  2. 根据resourceWrapper生成个slot责任链
    1. 如果抛出BlockException,entry就是exit然后抛给上层异常,让上层感知到block了,限流了
    2. 正常通过那就返回entry,上层就知道没有被限流

 

CtSph. lookProcessChain

//这里是线程不安全的Map,而且加了volatile

private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap

    = new HashMap<ResourceWrapper, ProcessorSlotChain>();





ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {

    //先从缓存中获取,看能不能获取到

    ProcessorSlotChain chain = chainMap.get(resourceWrapper);

    //这里做了double check 单例,所以HashMap没有线程不安全,加volatile是为了让其他线程立刻看到

    if (chain == null) {

        synchronized (LOCK) {

            chain = chainMap.get(resourceWrapper);

            if (chain == null) {

                //chainMap大小大于一个值,也就是entry数量大小限制了,一个chain对应一个entry

                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {

                    return null;

                }

                //新建Chain

                chain = SlotChainProvider.newSlotChain();

                //这里是逻辑是,新建一个Map大小是oldMap+1

                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(

                    chainMap.size() + 1);

                //然后先整体放入oldMap,再放新建的chain

                newMap.putAll(chainMap);

                newMap.put(resourceWrapper, chain);

                //替换

                chainMap = newMap;

                //这里的逻辑,应该是避免频繁的扩容!因为Constants.MAX_SLOT_CHAIN_SIZE默认6000,扩容一次的消耗太大

            }

        }

    }

    return chain;

}

SlotChainProvider.newSlotChain

public static ProcessorSlotChain newSlotChain() {

    if (builder != null) {

        return builder.build();

    }



    resolveSlotChainBuilder();



    if (builder == null) {

        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");

        builder = new DefaultSlotChainBuilder();

    }

    return builder.build();

}

这里是SPI扩展点,如果自己扩展,那么builder就是自己的,不走DefaultSlotChainBuilder,先不关注SPI,关注默认的DefaultSlotChainBuilder

DefaultSlotChainBuilder.build

public ProcessorSlotChain build() {

    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    chain.addLast(new NodeSelectorSlot());

    chain.addLast(new ClusterBuilderSlot());

    chain.addLast(new LogSlot());

    chain.addLast(new StatisticSlot());

    chain.addLast(new SystemSlot());

    chain.addLast(new AuthoritySlot());

    chain.addLast(new FlowSlot());

    chain.addLast(new DegradeSlot());



    return chain;

}

很明显Chain是一个责任链模式,本质上是个链表,添加很多的slot

然后我们得进入DefaultProcessorSlotChain看看

3. DefaultProcessorSlotChain

3.1. 初始化

在这里层次图中,以Slot结尾的是Chain中的元素,也就是一个一个的handler,这里叫slot而已,

这里的模式很有意思

在DefaultProcessorSlotChain中有两个元素first和end两个引用,类型是AbstractLinkedProcessorSlot,实际上指向是那些AbstractLinkedProcessorSlot的子类,FlowSlot之类

AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {



    @Override

    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)

        throws Throwable {

        super.fireEntry(context, resourceWrapper, t, count, prioritized, args);

    }



    @Override

    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

        super.fireExit(context, resourceWrapper, count, args);

    }



};

AbstractLinkedProcessorSlot<?> end = first;

刚刚开始的情况,frist和end都指向一个匿名内部类

 添加Slot

最后分析的结果和debug的结果相同

SlotChain的entry方法

下面开始看一个entry走过的流程

CtSph.entryWithPriority

chain.entry(context, resourceWrapper, null, count, prioritized, args);

这里进入的是DefaultProcessorSlotChain的entry方法

public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)

    throws Throwable {

    first.transformEntry(context, resourceWrapper, t, count, prioritized, args);

}

transformEntry方法在AbstractLinkedProcessorSlot中被定义

void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)

    throws Throwable {

    T t = (T)o;

    entry(context, resourceWrapper, t, count, prioritized, args);

}

这里entry方法是AbstractLinkedProcessorSlot$1这个匿名内部类重写的那个方法

@Override

public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)

    throws Throwable {

    super.fireEntry(context, resourceWrapper, t, count, prioritized, args);

}

然后又是调用父类方法super.fireEntry

@Override

public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)

    throws Throwable {

    if (next != null) {

        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);

    }

}

这里就开始调用下一个slot执行逻辑了

这里需要关注一点

entry方法          各个Slot自己实现

fireEntry方法     AbstractLinkedProcessorSlot定义好了   如果next不为空触发transformEntry方法

transformEntry方法    AbstractLinkedProcessorSlot定义好了  触发自定义的entry方法

到此,Sentinel的工作流程架构就梳理完成

具体的功能是Slot的部分

可以看下Sentinel自带提供了那些Slot

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

 

下一章:构建节点资源树