中间件-dubbo之远程交互

中间件之dubbo整理总结二

Posted by Kang on September 16, 2019

  承接RPC模型,对consumer而言,Dubbo协议对每个Service默认是基于Netty单一长连接和NIO异步通讯的,适合小数据大并发的服务调用。在consumer端,会对需要调用的每个服务都创建一个服务代理bean,即Reference,该服务代理bean在consumer就是一个跟使用Spring的@Service注解修饰的普通Service类bean一样注册到spring容器,也是单例的。
Dubbo是如何实现单一长连接并发请求时包干扰(“串线”)的呢;

Dubbo调用流程(Dubbo.version = 2.6.4)

1. 远程接口代理

  对于每一个远程调用的接口,根据配置会产生一个ReferenceBean实例,调用方将以调用本地服务的方式无感知的调用ReferenceBean实例;

1.1 URL总线

  一个URL示例:

1
dubbo://127.0.0.1:9090/com.example.service1?param1=value1&param2=value2

  其包含了协议、主机、端口、交互参数等几部分

2. ReferenceBean的解析生成(全透明融入spring)

  我们知道spring的入口方法就是refresh(),从该方法跟踪进去:obtainFreshBeanFactory()->refreshBeanFactory()->loadBeanDefinitions(beanFactory)->loadBeanDefinitions(beanDefinitionReader)->..->XmlBeanDefinitionReader#loadBeanDefinitions->registerBeanDefinitions:

1
2
3
4
5
6
public int registerBeanDefinitions(Document doc, Resource resource) throws BeanDefinitionStoreException {
  BeanDefinitionDocumentReader documentReader = this.createBeanDefinitionDocumentReader();
  int countBefore = this.getRegistry().getBeanDefinitionCount();
  documentReader.registerBeanDefinitions(doc, this.createReaderContext(resource)); //上下文中封装外部处理器
  return this.getRegistry().getBeanDefinitionCount() - countBefore;
}

createReaderContext通过外部classPath方法生成引入了处理器:

1
2
3
4
protected NamespaceHandlerResolver createDefaultNamespaceHandlerResolver() {
        ClassLoader cl = this.getResourceLoader() != null ? this.getResourceLoader().getClassLoader() : this.getBeanClassLoader();
        return new DefaultNamespaceHandlerResolver(cl);
    }
1
2
3
public DefaultNamespaceHandlerResolver(@Nullable ClassLoader classLoader) {
     this(classLoader, "META-INF/spring.handlers");  //classpath目录,也即外部如dubbo等框架的解析器扩展
 }

  接着看代码到DefaultBeanDefinitionDocumentReader#parseBeanDefinitions:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void parseBeanDefinitions(Element root, BeanDefinitionParserDelegate delegate) {
  if (delegate.isDefaultNamespace(root)) {
      NodeList nl = root.getChildNodes();

      for(int i = 0; i < nl.getLength(); ++i) {
          Node node = nl.item(i);
          if (node instanceof Element) {
              Element ele = (Element)node;
              if (delegate.isDefaultNamespace(ele)) {
                  this.parseDefaultElement(ele, delegate);
              } else {
                  delegate.parseCustomElement(ele);
              }
          }
      }
  } else {
      delegate.parseCustomElement(root);
  }

}

  可以看到,当不为默认Namespace命名空间(spring)时,走的上面上下文中存储的外部定制处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public BeanDefinition parseCustomElement(Element ele, @Nullable BeanDefinition containingBd) {
  String namespaceUri = this.getNamespaceURI(ele);
  if (namespaceUri == null) {
      return null;
  } else {
      NamespaceHandler handler = this.readerContext.getNamespaceHandlerResolver().resolve(namespaceUri);
      if (handler == null) {
          this.error("Unable to locate Spring NamespaceHandler for XML schema namespace [" + namespaceUri + "]", ele);
          return null;
      } else {
          return handler.parse(ele, new ParserContext(this.readerContext, this, containingBd));
      }
  }
}

根据元素的namespaceUri(也就是xml文件头那一串schema)找到对应的NamespaceHandler.【这里就是扩展自定义配置的入口】,然后调用解析器进行parse解析出BeanDefinition。

3. 代用总出口

  ReferenceAnnotationBeanPostProcessor实现了@Reference解析管理,其核心方法就是调用到ReferenceBean.get(),在通过FactoryBean接管了Dubbo代理对象的创建。
  ReferenceBean实例为一个服务代理refer,该refer包含一个DubboInvoker调用器invoker实例;

  1. ReferenceBean继承了ReferenceConfig类并实现了FactoryBean接口,ReferenceConfig实现了FactoryBean.getObject()转调用的get()方法,其中get–>init中存在:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 private void init() {
  ...
  StaticContext.getSystemContext().putAll(attributes);
  this.ref = this.createProxy(map);
  ...
 }
  private T createProxy(Map<String, String> map) {
    ...
    if(this.urls.size() == 1) {
       this.invoker = refprotocol.refer(this.interfaceClass, (URL)this.urls.get(0));  //默认为dubbo协议,即invoker默认为DubboInvoker
    } 
     ...
     return proxyFactory.getProxy(this.invoker);   //对Dubbo中使用的协议Invoker对象进行代理
  }
   public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
  }

其中,refprotocol为SPI注入,根据Protocol类的@SPI(“dubbo”),取默认为DubboProtocol实例。DubboProtocol#refer返回DubboInvoker实例对象。需要注意的是:代理对象接口为业务接口,其处理类Handler中包装了传入的invoker对象,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);  
        }
        ...
        return invoker.invoke(new RpcInvocation(method, args)).recreate();// 参数转调到invoker对象
    }

}

4. 底层通讯

  DubboInvoker实例包含一个或多个ExchangerClient实例,默认为1个,通过DubboProtocol#refer#getClients中的connections参数指定,其默认实现类为HeaderExchangeClient,初始化方式为DubboInvoker生成实例时,ExchangeClient[]取值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private ExchangeClient[] getClients(URL url) {
  // whether to share connection
  boolean service_share_connect = false;
  int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  // if not configured, connection is shared, otherwise, one connection for one service
  if (connections == 0) { //默认
      service_share_connect = true;
      connections = 1;
  }

  ExchangeClient[] clients = new ExchangeClient[connections];
  for (int i = 0; i < clients.length; i++) {
      if (service_share_connect) {
          clients[i] = getSharedClient(url); //默认
      } else {
          clients[i] = initClient(url);
      }
  }
  return clients;
}

一路追踪:getSharedClient->initClient->Exchangers.connect->HeaderExchanger#connect,在connect中给出了默认HeaderExchangeClient类型的client对象,其最终在getSharedClient方法中被ReferenceCountExchangeClient进行了装饰包装。

1
2
3
4
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
     return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
 }

对Transporters.connect进行追踪,可以看到下面NettyTransporter#connect的方法:

1
2
3
4
 @Override
 public Client connect(URL url, ChannelHandler listener) throws RemotingException {
     return new NettyClient(url, listener);
 }

故可以知道,默认情况下使用NettyClient进行通讯
  HeaderExchangeClient使用Netty的Bootstrap作为远程通讯client。并使用HeaderExchangeChannel作为channel作为通讯通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public HeaderExchangeClient(Client client) {
        if(client == null) {
            throw new IllegalArgumentException("client == null");
        } else {
            this.client = client;
            this.channel = new HeaderExchangeChannel(client); // 使用HeaderExchangeChannel作为底层通讯通道
            String dubbo = client.getUrl().getParameter("dubbo");
            this.heartbeat = client.getUrl().getParameter("heartbeat", dubbo != null && dubbo.startsWith("1.0.")?'\uea60':0);
            this.heartbeatTimeout = client.getUrl().getParameter("heartbeat.timeout", this.heartbeat * 3);
            if(this.heartbeatTimeout < this.heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            } else {
                this.startHeatbeatTimer();
            }
        }
    }

5. 服务调用

前面介绍到,在Spring启动的时候,在Spring容器中注入的是proxyFactory动态创建的一个远程服务的代理对象。在调用动态代理对象时,将调用代理InvokerInvocationHandler对象invoke方法,也即如前面描述,转调到AbstractInvoker#invoke,而调用其子类DubboInvoker(默认)的doInvoke方法:

1
2
3
4
5
6
7
public Result invoke(Invocation inv) throws RpcException {
...
try {
    return this.doInvoke(invocation);
}
...
}

6. 远程返回

DubboInvoker的doInvoke如下,其中参数Invocation封装了接口、方法、参数、附加参数(Attachments)等信息。其中,DefaultFuture构造函数中存在Static属性FUTURES存放了请求任务。

1
2
3
4
5
6
7
8
9
public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
ExchangeClient currentClient;
if (clients.length == 1) {
   currentClient = clients[0];
} else {
   currentClient = clients[index.getAndIncrement() % clients.length];
}
...
try {
   boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
   boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
   int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
   if (isOneway) { //无结果
       boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
       currentClient.send(inv, isSent);
       RpcContext.getContext().setFuture(null);
       return new RpcResult();
   } else if (isAsync) { //异步方式
       ResponseFuture future = currentClient.request(inv, timeout);// 前面分析指知currentClient为ReferenceCountExchangeClient,检查超时时间线程为DefaultFuture
       RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 放在ThreadLocal中,FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。
       return new RpcResult(); //异步返回结果
   } else { //同步调用
       RpcContext.getContext().setFuture(null);
       return (Result) currentClient.request(inv, timeout).get();
   }
}
...
}

前面讲到ReferenceCountExchangeClient对HeaderExchangeClient进行了包装,故最终request底层通讯在HeaderExchangeChannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ResponseFuture request(Object request, int timeout) throws RemotingException {
  if (closed) {
      throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
  }
  // 其中,Request中mId为当前应用的全局id,生成方法见newId()
  Request req = new Request();
  req.setVersion(Version.getProtocolVersion());
  req.setTwoWay(true);
  req.setData(request);
  DefaultFuture future = new DefaultFuture(channel, req, timeout); //异步响应对象,返回给调用方,携带了唯一ID
  try {
      channel.send(req); //最终调用底层Netty等通讯框架进行数据传输
  } catch (RemotingException e) {
      future.cancel();
      throw e;
  }
  return future;
}

//INVOKE_ID为AtomicLong
private static long newId() {
    // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
    return INVOKE_ID.getAndIncrement();
}

7. 结果获取

此处讲两种处理方式:

  1. 主动调用模式 异步调用模式下,则由用户调用ResponseFuture的 get 方法。通过RpcContext来获取上述DefaultFuture对象来获取请求结果,会阻塞至服务器端返产生结果给客户端。
1
2
3
4
5
6
//配置需要设置异步标识
<dubbo:reference id="xxx" ....>
    <dubbo:method name="method1" async="true" />
</dubbo:reference>

RpcContext.getContext().getFuture().get();
  1. 事件通知模式 为了支持时间回调,dubbo引入了特定的过滤器FutureFilter来处理异步调用相关的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//需要配置异步回调处理对象
<bean id="notify" class="com.alibaba.dubbo.callback.implicit.NofifyImpl" />
<dubbo:reference >
   <dubbo:method name="method1" async="true" onreturn="notify.onreturn" onthrow="notify.onthrow" oninvoke="notify.invoke" />
</dubbo:reference>
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {

    @Override
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
        fireInvokeCallback(invoker, invocation);// 反射调用配置的通知对象,如:上面配置的Notify对象处理方法(--怎么感觉不是有结果后通知,而是发生远程调用回调通知?)
        Result result = invoker.invoke(invocation);
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        return result;
    }
    
}

在过滤器中,fireInvokeCallback去调用回调对象的方法做通知。asyncCallback进入口可以明显的看到和主动调用一样的代码,getFuture()异步等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
  Future<?> f = RpcContext.getContext().getFuture();
  if (f instanceof FutureAdapter) {
      ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
      future.setCallback(new ResponseCallback() {
          @Override
          public void done(Object rpcResult) {
              if (rpcResult == null) {
                  logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                  return;
              }
              ///must be rpcResult
              if (!(rpcResult instanceof Result)) {
                  logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                  return;
              }
              Result result = (Result) rpcResult;
              if (result.hasException()) {
                  fireThrowCallback(invoker, invocation, result.getException());
              } else {
                  fireReturnCallback(invoker, invocation, result.getValue());
              }
          }

          @Override
          public void caught(Throwable exception) {
              fireThrowCallback(invoker, invocation, exception);
          }
      });
  }
}

  从上面可以看到,ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); 和主动调用一样先获取Futrue,然后在当前未拿到响应的时候,先赋值给DefaultFuture的实例ResponseCallback对象作为异步处理回调。
  那么之后会在什么时候执行回调函数的方法呢?当consumer接收到provider的响应的时候!

8. 异步回调结果接收

  前面提到,客户端底层默认通过NettyClient来进行网络传输,在NettyClient#doOpen打开网络时,底层handler传入了NettyHandler/NettyClientHandler(netty4,同样可知,服务端为NettyServerHandler)对象,NettyHandler接收网络消息:

1
2
3
4
5
6
7
8
9
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
  NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
  try {
      handler.received(channel, e.getMessage());
  } finally {
      NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
  }
}

HeaderExchangeHandler的received处理方法对于Response响应进行转调handleResponse:

1
2
3
4
5
static void handleResponse(Channel channel, Response response) throws RemotingException {
  if (response != null && !response.isHeartbeat()) {
      DefaultFuture.received(channel, response);
  }
}

此处调用了DefaultFuture,也即在received方法中进行了异步转同步,其中,根据请求-响应头中携带的ID作为标识,获取Task任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void received(Channel channel, Response response) {
  try {
      DefaultFuture future = FUTURES.remove(response.getId()); 
      if (future != null) {
          future.doReceived(response);
      } else {
          logger.warn("The timeout response finally returned at "
                  + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                  + ", response " + response
                  + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                  + " -> " + channel.getRemoteAddress()));
      }
  } finally {
      CHANNELS.remove(response.getId());
  }
}

其中FUTURES为ConcurrentHashMap类型,通过唯一id进行查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void doReceived(Response res) {
  lock.lock();
  try {
      response = res;
      if (done != null) {
          done.signal();
      }
  } finally {
      lock.unlock();
  }
  if (callback != null) {
      invokeCallback(callback);
  }
}

doReceived中invokeCallback调用了前面FutureFilter#asyncCallback方法设置回调对象,进而最终在fireReturnCallback进行反射回调。