Dubbo 服务降级分析

Posted by Jay on May 7, 2019

Dubbo 服务降级分析

Dubbo 客户端发起请求过程分析中截取客户端请求总体流程中的一部分:

//代理发出请求
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  -->new RpcInvocation(method, args)
  -->MockClusterInvoker.invoke(Invocation invocation)//服务降级的地方

Dubbo是通过MockClusterInvoker来实现服务降级的。

一、实例

public interface DemoService {
		// String sayHello(String name);
		Car sayHello(String name);
}

将dubbo-demo中的服务接口定义改为返回Car对象。提供者实现如下:

public class DemoServiceImpl implements DemoService {
    public Car sayHello(String name) {
        Car car = new Car();
        car.setCarNum("浙A10000");
        car.setGoMile(100);
        return car;
    }
}

消费者使用如下:

public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        while (true) {
            try {
                Thread.sleep(1000);
                Car hello = demoService.sayHello("world"); // call remote method
                System.out.println(hello.getCarNum() + "-" + hello.getGoMile()); // get result
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
}

二、使用方式

实际使用中,会通过直接在dubbo-admin中设置服务降级策略,这里使用dubbo用户手册中的方式来更清晰的看一下服务降级的配置(实际上就是进行配置覆盖(消费者))。

配置规则

1、使用自定义mock类(接口名+Mock)

  • mock = default => DemoServiceMock
  • mock = true => DemoServiceMock
  • mock = fail => DemoServiceMock
  • mock = force => DemoServiceMock

2、先普通执行,执行失败之后再执行相应的mock逻辑

  • mock = fail:throw => throw new RpcException(“ mocked exception for Service degradation. “);
  • mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
  • mock = fail:return => return null
  • mock = fail:return xxx => return xxx
  • mock = fail:return empty => return new Car()

3、直接执行相应的mock逻辑

  • mock = force:throw => throw new RpcException(“ mocked exception for Service degradation. “);
  • mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
  • mock = force:return => return null
  • mock = force:return xxx => return xxx
  • mock = force:return empty => return new Car()

进行配置:

public class DegradeTest {
    public static void main(String[] args) {
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
        Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.211.55.5:2181"));
        // return null;
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return"));
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+null"));
        // return 空对象;
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+empty"));
        // return value;
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+hello"));
        // throw exception
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw"));
        // throw custom-msg exception
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));
        // 执行mock类
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force"));
    }
}

上述需要注意的是需要配置为“force:return+null”的格式而非“force:return null”。(实际上空格的url encode就是+号),上述代码的执行,实际上是在zk上创建configurators的子节点:

关于覆盖配置:配置规则

  • override:// 表示数据采用覆盖方式,支持 overrideabsent,可扩展,必填。
  • 0.0.0.0 表示对所有 IP 地址生效,如果只想覆盖某个 IP 的数据,请填入具体 IP,必填。
  • com.alibaba.dubbo.demo.DemoService表示只对指定服务生效,必填。
  • category=configurators 表示该数据为动态配置类型,必填。
  • dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。
  • enabled=true 覆盖规则是否生效,可不填,缺省生效。
  • application=demo-consumer 表示只对指定应用生效,可不填,表示对所有应用生效。
  • mock=force:return+null表示将满足以上条件的 mock 参数的值覆盖为 force:return+null。如果想覆盖其它参数,直接加在 override 的 URL 参数上。

三、源码分析

public class MockClusterInvoker<T> implements Invoker<T> {
    private final Directory<T> directory; //RegistryDirectory:存储invoker列表
    private final Invoker<T> invoker; //FailoverClusterInvoker:容错策略

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            ...
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    ...
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
}

首先去获取mock参数,

  • 如果没有配置,则直接使用FailoverClusterInvoker去正常的向provider发出请求;
  • 如果配置为以force开头的,则直接执行doMockInvoke(Invocation invocation, RpcException e),不再向provider发送请求;
  • 如果配置为以fail开头的,则先使用FailoverClusterInvoker去正常的向provider发出请求,如果失败抛出了非业务异常,则执行doMockInvoke(Invocation invocation, RpcException e);
private Result doMockInvoke(Invocation invocation, RpcException e) {
    Result result = null;
    Invoker<T> minvoker;

    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); //获取mock类型的Invoker
    if (mockInvokers == null || mockInvokers.size() == 0) {
        minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); //如果没有配置mock类型的Invoker,则自己创建一个MockInvoker
    } else {
        minvoker = mockInvokers.get(0);
    }
    try {
        result = minvoker.invoke(invocation); //执行MockInvoker的invoke(Invocation invocation)方法
    } catch (RpcException me) {
        if (me.isBiz()) {
            result = new RpcResult(me.getCause());
        } else { //非业务异常
            throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
        }
    } catch (Throwable me) {
        throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
}

从RegistryDirectory中获取MockInvoker:

// 返回MockInvoker
// 契约:
// directory根据invocation中是否有Constants.INVOCATION_NEED_MOCK,来判断获取的是一个normal invoker 还是一个 mock invoker
// 如果directorylist 返回多个mock invoker,只使用第一个invoker.
private List<Invoker<T>> selectMockInvoker(Invocation invocation) {
    List<Invoker<T>> invokers = null;
    //TODO generic invoker?
    if (invocation instanceof RpcInvocation) {
        //存在隐含契约(虽然在接口声明中增加描述,但扩展性会存在问题.同时放在attachement中的做法需要改进
        ((RpcInvocation) invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
        //directory根据invocation中attachment是否有Constants.INVOCATION_NEED_MOCK,来判断获取的是normal invokers or mock invokers
        try {
            invokers = directory.list(invocation);
        } catch (RpcException e) {
            if (logger.isInfoEnabled()) {
                logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
                        + directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        + ", will contruct a new mock with 'new MockInvoker()'.", e);
            }
        }
    }
    return invokers;
}

首先使用RegistryDirectory获取出方法名为sayHello的Invoker列表,之后使用MockInvokersSelector(Router)选取出MockInvoker。

public class MockInvokersSelector implements Router {

    // 根据invocation attachment的属性选择invoker
    // @param invokers
    // @param url        refer url 消费者url
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, final Invocation invocation)
            throws RpcException {
        if (invocation.getAttachments() == null) {
            return getNormalInvokers(invokers); //  去除mock协议的invoker,返回
        } else {
            String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
            if (value == null) {
                return getNormalInvokers(invokers);
            } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
                return getMockedInvokers(invokers); // 获取mock协议的provider
            }
        }
        return invokers;
    }

    private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
        if (!hasMockProviders(invokers)) {
            return null;
        }
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
        for (Invoker<T> invoker : invokers) {
            if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker); // 找出所有mock协议的provider
            }
        }
        return sInvokers;
    }

    private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
        if (!hasMockProviders(invokers)) {
            return invokers; // 没有mock协议的provider,直接返回
        } else {
            List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
            for (Invoker<T> invoker : invokers) {
                if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                    sInvokers.add(invoker); // 去除mock协议的invoker,返回
                }
            }
            return sInvokers;
        }
    }

    // 是否有mock协议的提供者
    // @param invokers
    private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
        boolean hasMockProvider = false;
        for (Invoker<T> invoker : invokers) {
            if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                hasMockProvider = true;
                break;
            }
        }
        return hasMockProvider;
    }

}

这里获取到的是空列表。所以会先创建一个MockInvoker对象,之后执行其invoke方法。

MockInvoker:

public Result invoke(Invocation invocation) throws RpcException {
    String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY); //key=sayHello.mock
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(this);
    }
    if (StringUtils.isBlank(mock)) {
        mock = getUrl().getParameter(Constants.MOCK_KEY); //key=mock
    }

    if (StringUtils.isBlank(mock)) {
        throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
    }
    mock = normallizeMock(URL.decode(mock));
    if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())) { // return
        RpcResult result = new RpcResult();
        result.setValue(null);
        return result;
    } else if (mock.startsWith(Constants.RETURN_PREFIX)) { // return value(包括return null)
        mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
        mock = mock.replace('`', '"');
        try {
            Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
            Object value = parseMockValue(mock, returnTypes);
            return new RpcResult(value);
        } catch (Exception ew) {
            throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
        }
    } else if (mock.startsWith(Constants.THROW_PREFIX)) { // throw xxx
        mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
        mock = mock.replace('`', '"');
        if (StringUtils.isBlank(mock)) { // throw
            throw new RpcException(" mocked exception for Service degradation. ");
        } else { //用户自定义类 throw xxxException
            Throwable t = getThrowable(mock);
            throw new RpcException(RpcException.BIZ_EXCEPTION, t);
        }
    } else { //impl mock 自定义mock类
        try {
            Invoker<T> invoker = getInvoker(mock);
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            throw new RpcException("Failed to create mock implemention class " + mock, t);
        }
    }
}

首先获取到mock配置,例如:mock=force:return+null,之后进行url解码为mock=force:return null,最后进行处理为mock=return null,然后根据规则走分支。

mock参数的处理函数:

// 一、使用自定义mock类
// mock = default => DemoServiceMock
// mock = true => DemoServiceMock
// mock = fail => DemoServiceMock
// mock = force => DemoServiceMock
//
// 二、先普通执行,执行失败之后再执行相应的mock逻辑
// mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");
// mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
// mock = fail:return => return null
// mock = fail:return xxx => return xxx
//
// 三、直接执行相应的mock逻辑
// mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");
// mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
// mock = force:return => return null
// mock = force:return xxx => return xxx
//
// @param mock
// @return
private String normallizeMock(String mock) {
    if (mock == null || mock.trim().length() == 0) {
        return mock;
    } else if (ConfigUtils.isDefault(mock) || "fail".equalsIgnoreCase(mock.trim()) || "force".equalsIgnoreCase(mock.trim())) {
        mock = url.getServiceInterface() + "Mock";
    }
    if (mock.startsWith(Constants.FAIL_PREFIX)) {
        mock = mock.substring(Constants.FAIL_PREFIX.length()).trim();
    } else if (mock.startsWith(Constants.FORCE_PREFIX)) {
        mock = mock.substring(Constants.FORCE_PREFIX.length()).trim();
    }
    return mock;
}

我们这里来看一下自定义mock类。消费端编写:

public class DemoServiceMock implements DemoService {

    @Override
    public Car sayHello(String name) {
        Car car = new Car();
        car.setCarNum("mock中");
        car.setGoMile(666);
        return car;
    }
}

配置覆盖:

registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force"));

MockInvoker.invoke:

try {
    Invoker<T> invoker = getInvoker(mock);
    return invoker.invoke(invocation);
} catch (Throwable t) {
    throw new RpcException("Failed to create mock implemention class " + mock, t);
}
private Invoker<T> getInvoker(String mockService) {
    Invoker<T> invoker = (Invoker<T>) mocks.get(mockService);
    if (invoker != null) {
        return invoker;
    } else {
        Class<T> serviceType = (Class<T>) ReflectUtils.forName(url.getServiceInterface());
        if (ConfigUtils.isDefault(mockService)) {
            mockService = serviceType.getName() + "Mock";
        }

        Class<?> mockClass = ReflectUtils.forName(mockService);
        if (!serviceType.isAssignableFrom(mockClass)) {
            throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
        }

        if (!serviceType.isAssignableFrom(mockClass)) {
            throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
        }
        try {
            T mockObject = (T) mockClass.newInstance(); // 获取自定义mock类实例
          	// 和普通类一样创建Invoker
            invoker = proxyFactory.getInvoker(mockObject, (Class<T>) serviceType, url);
            if (mocks.size() < 10000) {
                mocks.put(mockService, invoker);
            }
            return invoker;
        } catch (InstantiationException e) {
            throw new IllegalStateException("No such empty constructor \"public " + mockClass.getSimpleName() + "()\" in mock implemention class " + mockClass.getName(), e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}

上边看了return和自定义mock类,最后来看一下throw异常。

默认抛出RpcException,异常信息:mocked exception for Service degradation. 也可以自定义异常,例如:

public class MyRuntimeException extends RuntimeException {
    private String msg;

    public MyRuntimeException(String msg){
        this.msg = msg;
    }
}

自定义异常必须具有单参构造器且参数为String。

配置覆盖:

registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));

MockInvoker.invoke:

mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
mock = mock.replace('`', '"');
if (StringUtils.isBlank(mock)) {
    throw new RpcException(" mocked exception for Service degradation. ");
} else { //用户自定义类
    Throwable t = getThrowable(mock);
    throw new RpcException(RpcException.BIZ_EXCEPTION, t);
}
private Throwable getThrowable(String throwstr) {
    Throwable throwable = (Throwable) throwables.get(throwstr);
    if (throwable != null) {
        return throwable;
    } else {
        Throwable t = null;
        try {
            Class<?> bizException = ReflectUtils.forName(throwstr);
            Constructor<?> constructor;
            constructor = ReflectUtils.findConstructor(bizException, String.class);
            t = (Throwable) constructor.newInstance(new Object[]{" mocked exception for Service degradation. "});
            if (throwables.size() < 1000) {
                throwables.put(throwstr, t);
            }
        } catch (Exception e) {
            throw new RpcException("mock throw error :" + throwstr + " argument error.", e);
        }
        return t;
    }
}

至此,Dubbo服务降级分析结束。