• [Dubbo 3.0.8 Source Code Analysis Series] 26: How a Consumer's Service Call Reaches the Provider via RPC


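    26 How a Consumer's Service Call Reaches the Provider via RPC

    26.1 Introduction

    To answer this question we go back to the consumer's calling code. The following snippet comes from my consumer sample: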

    // DemoService is an interface; the actual object is a dynamic proxy created inside Dubbo, of type DemoServiceDubboProxy1
    DemoService demoService = bootstrap.getCache().get(reference);
    // The consumer calls the provider
    String message = demoService.sayHello("dubbo");
    

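    We cannot see the source of the generated sayHello proxy method in DemoServiceDubboProxy1, so we go straight to the method that it calls:

    26.2 The invoke method of InvokerInvocationHandler

    Parameters:

    • proxy: the DemoServiceDubboProxy1 instance
    • method: the java.lang.reflect.Method reflection metadata for sayHello
    • args: the call arguments; here there is a single argument with the value "dubbo"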
    
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // The method name is sayHello
            String methodName = method.getName();
            // There is a single parameter type: java.lang.String
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 0) {
                if ("toString".equals(methodName)) {
                    return invoker.toString();
                } else if ("$destroy".equals(methodName)) {
                    invoker.destroy();
                    return null;
                } else if ("hashCode".equals(methodName)) {
                    return invoker.hashCode();
                }
            } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
                return invoker.equals(args[0]);
            }
            // RpcInvocation matters here: it wraps the metadata of the method being called
            RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
    
            if (serviceModel instanceof ConsumerModel) {
                rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
                rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
            }
            // invoker is the object that performs the call; we follow it next
            return InvocationUtil.invoke(invoker, rpcInvocation);
        }
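
To make the dispatch above concrete, here is a minimal sketch that uses the JDK's java.lang.reflect.Proxy as a stand-in for the proxy class that Dubbo generates. GreetingService, ProxyDispatchSketch and RecordingHandler are hypothetical names used only for illustration. The handler answers the Object methods locally and records each interface call instead of sending it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for DemoService.
interface GreetingService {
    String sayHello(String name);
}

class ProxyDispatchSketch {

    // Plays the role of InvokerInvocationHandler: every call on the proxy lands in invoke().
    static class RecordingHandler implements InvocationHandler {
        String lastMethod; // name of the last interface method that was forwarded

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // Methods declared on Object are answered locally and never leave the JVM.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "GreetingService-proxy";
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default: // equals
                        return proxy == args[0];
                }
            }
            // A real handler would wrap these values in an invocation object
            // and hand it to the invoker chain; here we only record and answer.
            lastMethod = method.getName();
            return "Hello " + args[0];
        }
    }

    static GreetingService newProxy(InvocationHandler handler) {
        return (GreetingService) Proxy.newProxyInstance(
            GreetingService.class.getClassLoader(),
            new Class<?>[] {GreetingService.class},
            handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        GreetingService service = newProxy(handler);
        System.out.println(service.sayHello("dubbo")); // Hello dubbo
        System.out.println(handler.lastMethod);        // sayHello
        System.out.println(service);                   // GreetingService-proxy
    }
}
```

The caller only ever sees the interface; which method was called and with which arguments is recovered inside the handler from the Method object and the args array.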
    

    Next we look at InvocationUtil.invoke(invoker, rpcInvocation).

    26.3 The invoke method of InvocationUtil

    public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
            // Example url: consumer://192.168.1.169/link.elastic.dubbo.entity.DemoService?application=dubbo-demo-api-consumer&background=false&dubbo=2.0.2&interface=link.elastic.dubbo.entity.DemoService&methods=sayHello,sayHelloAsync&pid=99368&qos.enable=false&qos.port=-1&register.ip=192.168.1.169&release=3.0.10&side=consumer&sticky=false&timestamp=1661595732778
            URL url = invoker.getUrl();
            // No group is configured here, so the value is link.elastic.dubbo.entity.DemoService
            String serviceKey = url.getServiceKey();
            rpcInvocation.setTargetServiceUniqueName(serviceKey);
    
            // invoker.getUrl() returns consumer url.
            RpcServiceContext.getServiceContext().setConsumerUrl(url);
    
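            // The simple profiler is enabled by default
            if (ProfilerSwitch.isEnableSimpleProfiler()) {
                // bizProfiler is kept in an InternalThreadLocal that stores the profiling data
                // On the first entry this profiler entry is null
                ProfilerEntry parentProfiler = Profiler.getBizProfiler();
                ProfilerEntry bizProfiler;

                // When it is null, the else branch below creates a new profiler entry.
                // On a nested invoke, the new entry is linked to the earlier one as its parent; ProfilerEntry uses a linked structure internally to form a profiling trace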
                if (parentProfiler != null) {
                    bizProfiler = Profiler.enter(parentProfiler,
                        "Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());
                } else {
                    bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());
                }
                rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
                try {
                    // The first invoker is a MigrationInvoker
                    return invoker.invoke(rpcInvocation).recreate();
                } finally {
                    Profiler.release(bizProfiler);
                    int timeout;
                    Object timeoutKey = rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
                    if (timeoutKey instanceof Integer) {
                        timeout = (Integer) timeoutKey;
                    } else {
                        timeout = url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
                            TIMEOUT_KEY,
                            DEFAULT_TIMEOUT);
                    }
                    long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
                    if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
                        StringBuilder attachment = new StringBuilder();
                        rpcInvocation.foreachAttachment((entry) -> {
                            attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
                        });
    
                        logger.warn(String.format(
                            "[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
                            rpcInvocation.getProtocolServiceKey(),
                            rpcInvocation.getMethodName(),
                            usage / 1000_000,
                            usage % 1000_000,
                            timeout,
                            attachment,
                            Profiler.buildDetail(bizProfiler)));
                    }
                }
            }
            return invoker.invoke(rpcInvocation).recreate();
        }
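
The warning condition in the finally block is easy to misread, so here is a small worked sketch of the arithmetic. It assumes that the warn percent is a fraction such as 0.75 (an assumption, not taken from the code above); SlowCallWarningSketch and shouldWarn are hypothetical names. The elapsed time is in nanoseconds and the timeout in milliseconds, so the check fires once the call has used more than that fraction of its timeout.

```java
class SlowCallWarningSketch {

    // Same shape as the condition above: usage in nanoseconds, timeout in milliseconds.
    static boolean shouldWarn(long usageNanos, double warnPercent, int timeoutMillis) {
        return (usageNanos / (1000_000L * warnPercent)) > timeoutMillis;
    }

    public static void main(String[] args) {
        double warnPercent = 0.75; // assumed value for illustration
        int timeoutMillis = 1000;
        // 700 ms elapsed: 700 / 0.75 = 933.3, not above 1000
        System.out.println(shouldWarn(700_000_000L, warnPercent, timeoutMillis)); // false
        // 800 ms elapsed: 800 / 0.75 = 1066.7, above 1000
        System.out.println(shouldWarn(800_000_000L, warnPercent, timeoutMillis)); // true
    }
}
```

With these numbers the warning starts once a call takes longer than 750 ms of its 1000 ms timeout, which matches the "almost (maybe already) timeout" wording of the log message.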
    

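    26.4 The invoke method of MigrationInvoker

    An earlier chapter already covered how the choice between application-level and interface-level invocation is made. Application-level is the default, so below we follow the application-level invoker chain directly.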

     public Result invoke(Invocation invocation) throws RpcException {
            if (currentAvailableInvoker != null) {
                if (step == APPLICATION_FIRST) {
                    // call ratio calculation based on random value
                    if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                        // fall back to interface mode
                        return invoker.invoke(invocation);
                    }
                    // check if invoker available for each time
                
                    return decideInvoker().invoke(invocation);
                }
                return currentAvailableInvoker.invoke(invocation);
            }
    
            switch (step) {
                case APPLICATION_FIRST:
                    currentAvailableInvoker = decideInvoker();
                    break;
                case FORCE_APPLICATION:
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                    break;
                case FORCE_INTERFACE:
                default:
                    currentAvailableInvoker = invoker;
            }
    
            return currentAvailableInvoker.invoke(invocation);
        }
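
The promotion check in the APPLICATION_FIRST branch splits traffic by a random roll. The following sketch isolates that condition so it can be tried with fixed inputs; PromotionSplitSketch and fallsBackToInterfaceMode are hypothetical names, and the roll parameter stands in for ThreadLocalRandom.current().nextDouble(100).

```java
import java.util.concurrent.ThreadLocalRandom;

class PromotionSplitSketch {

    // Same shape as the condition above; roll is a random value in [0, 100).
    static boolean fallsBackToInterfaceMode(int promotion, double roll) {
        return promotion < 100 && roll > promotion;
    }

    public static void main(String[] args) {
        int promotion = 30;
        int calls = 100_000;
        int fallbacks = 0;
        for (int i = 0; i < calls; i++) {
            double roll = ThreadLocalRandom.current().nextDouble(100);
            if (fallsBackToInterfaceMode(promotion, roll)) {
                fallbacks++;
            }
        }
        // The share varies from run to run, so no fixed output is shown here.
        System.out.printf("interface-mode share: %.1f%%%n", 100.0 * fallbacks / calls);
    }
}
```

With promotion set to 30, roughly 70% of the calls fall back to the interface-level invoker and the rest go on to decideInvoker(); with promotion at 100 the fallback branch is never taken.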
    

    26.5 The invoke method of MockClusterInvoker

    The next invoker in the chain is MockClusterInvoker, which carries the mock-based fault-tolerance logic:

       public Result invoke(Invocation invocation) throws RpcException {
           Result result;
           // Check whether the mock fault-tolerance logic is enabled; it is disabled by default
           String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
           if (ConfigUtils.isEmpty(value)) {
               //no mock
               result = this.invoker.invoke(invocation);
           } else if (value.startsWith(FORCE_KEY)) {
               if (logger.isWarnEnabled()) {
                   logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
               }
               //force:direct mock 
               result = doMockInvoke(invocation, null);
           } else {
               //fail-mock
               try {
                   result = this.invoker.invoke(invocation);
    
                   //fix:#4585
                   if (result.getException() != null && result.getException() instanceof RpcException) {
                       RpcException rpcException = (RpcException) result.getException();
                       if (rpcException.isBiz()) {
                           throw rpcException;
                       } else {
                           result = doMockInvoke(invocation, rpcException);
                       }
                   }
       
               } catch (RpcException e) {
                   if (e.isBiz()) {
                       throw e;
                   }
       
                   if (logger.isWarnEnabled()) {
                       logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                   }
                   result = doMockInvoke(invocation, e);
               }
           }
           return result;
       }
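
The three branches above can be summarised as a classification of the mock parameter. The sketch below is only an illustration: it assumes that FORCE_KEY is the string "force" and that ConfigUtils.isEmpty treats values such as "false", "0", "null" and "N/A" as not configured. MockModeSketch, MockMode, isEmptyConfig and classify are hypothetical names.

```java
class MockModeSketch {

    enum MockMode { NO_MOCK, FORCE_MOCK, FAIL_MOCK }

    // Assumption: the values below are treated as "mock not configured".
    static boolean isEmptyConfig(String value) {
        return value == null || value.isEmpty()
            || "false".equalsIgnoreCase(value)
            || "0".equals(value)
            || "null".equalsIgnoreCase(value)
            || "N/A".equalsIgnoreCase(value);
    }

    // Maps the raw mock parameter to the branch that invoke() would take.
    static MockMode classify(String rawValue) {
        String value = rawValue == null ? "false" : rawValue.trim();
        if (isEmptyConfig(value)) {
            return MockMode.NO_MOCK;      // call the real invoker only
        }
        if (value.startsWith("force")) {  // assumed value of FORCE_KEY
            return MockMode.FORCE_MOCK;   // skip the remote call, mock directly
        }
        return MockMode.FAIL_MOCK;        // call first, mock on non-business failure
    }

    public static void main(String[] args) {
        System.out.println(classify("false"));             // NO_MOCK
        System.out.println(classify("force:return null")); // FORCE_MOCK
        System.out.println(classify("return null"));       // FAIL_MOCK
    }
}
```

In the fail-mock case, business exceptions are still rethrown to the caller; only non-business RpcExceptions lead to doMockInvoke.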

    Next comes the invoke method of the default class AbstractCluster:

        public Result invoke(Invocation invocation) throws RpcException {
               return filterInvoker.invoke(invocation);
           }
    
    

    26.7 The invoke method of CallbackRegistrationInvoker

    Next comes the part of the chain that delivers callback notifications, the invoke method of CallbackRegistrationInvoker:

     public Result invoke(Invocation invocation) throws RpcException {
                Result asyncResult = filterInvoker.invoke(invocation);
                asyncResult.whenCompleteWithContext((r, t) -> {
                    for (int i = filters.size() - 1; i >= 0; i--) {
                        FILTER filter = filters.get(i);
                        try {
                            InvocationProfilerUtils.releaseDetailProfiler(invocation);
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                Filter.Listener listener = listenableFilter.listener(invocation);
                                try {
                                    if (listener != null) {
                                        if (t == null) {
                                            listener.onResponse(r, filterInvoker, invocation);
                                        } else {
                                            listener.onError(t, filterInvoker, invocation);
                                        }
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof FILTER.Listener) {
                                FILTER.Listener listener = (FILTER.Listener) filter;
                                if (t == null) {
                                    listener.onResponse(r, filterInvoker, invocation);
                                } else {
                                    listener.onError(t, filterInvoker, invocation);
                                }
                            }
                        } catch (Throwable filterThrowable) {
                            LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
                            }
                            throw filterThrowable;
                        }
                    }
                });
    
                return asyncResult;
            }
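
The method above registers a single completion callback and then walks the filter list backwards. The sketch below shows the same idea with java.util.concurrent.CompletableFuture standing in for the Dubbo result type; Listener, CallbackOrderSketch and the listener names are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CallbackOrderSketch {

    // Hypothetical listener, loosely modelled on the onResponse/onError pair used above.
    interface Listener {
        void onResponse(String result);
        void onError(Throwable error);
    }

    // Registers one completion callback that notifies the listeners from last to first.
    static CompletableFuture<String> register(CompletableFuture<String> future, List<Listener> listeners) {
        return future.whenComplete((result, error) -> {
            for (int i = listeners.size() - 1; i >= 0; i--) {
                if (error == null) {
                    listeners.get(i).onResponse(result);
                } else {
                    listeners.get(i).onError(error);
                }
            }
        });
    }

    static Listener named(String name, List<String> log) {
        return new Listener() {
            @Override public void onResponse(String result) { log.add(name + ":response"); }
            @Override public void onError(Throwable error) { log.add(name + ":error"); }
        };
    }

    // Completes the future after registration and returns the order of notifications.
    static List<String> demo(boolean fail) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        register(future, Arrays.asList(named("first", log), named("second", log), named("third", log)));
        if (fail) {
            future.completeExceptionally(new IllegalStateException("boom"));
        } else {
            future.complete("Hello dubbo");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [third:response, second:response, first:response]
        System.out.println(demo(true));  // [third:error, second:error, first:error]
    }
}
```

Walking the list backwards means the filter closest to the actual call is notified first, mirroring how a response travels back out through the chain.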
    

    26.8 The invoke method of CopyOfFilterChainNode

    Next comes the filter chain itself, the invoke method of CopyOfFilterChainNode:

          public Result invoke(Invocation invocation) throws RpcException {
                Result asyncResult;
                try {
                    InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
                    asyncResult = filter.invoke(nextNode, invocation);
                } catch (Exception e) {
                    InvocationProfilerUtils.releaseDetailProfiler(invocation);
                    if (filter instanceof ListenableFilter) {
                        ListenableFilter listenableFilter = ((ListenableFilter) filter);
                        try {
                            Filter.Listener listener = listenableFilter.listener(invocation);
                            if (listener != null) {
                                listener.onError(e, originalInvoker, invocation);
                            }
                        } finally {
                            listenableFilter.removeListener(invocation);
                        }
                    } else if (filter instanceof FILTER.Listener) {
                        FILTER.Listener listener = (FILTER.Listener) filter;
                        listener.onError(e, originalInvoker, invocation);
                    }
                    throw e;
                } finally {
        
                }
                return asyncResult;
            }
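
Each chain node holds one filter and a reference to the next node, which is what filter.invoke(nextNode, invocation) relies on. A minimal sketch of how such a chain can be assembled follows; it is not Dubbo's builder code, and Invoker, Filter, FilterChainSketch and the filter labels are simplified hypothetical types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {

    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wraps the target from the last filter to the first, so the first filter runs first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker next = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Invoker nextNode = next;
            next = request -> filter.invoke(nextNode, request);
        }
        return next;
    }

    // A filter that records its name and then passes the call on.
    static Filter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    // Runs one call through three filters and returns the execution trace plus the result.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = request -> {
            trace.add("target");
            return "Hello " + request;
        };
        Invoker chain = buildChain(target, Arrays.asList(
            tracing("first", trace),
            tracing("second", trace),
            tracing("third", trace)));
        trace.add(chain.invoke("dubbo"));
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, second, third, target, Hello dubbo]
    }
}
```

Any filter can short-circuit the call simply by not invoking the next node.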
    

    26.9 The first filter: ConsumerContextFilter

    The first filter in the chain is ConsumerContextFilter:

            public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
            try {
                RpcContext.getServiceContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0);
        
                RpcContext context = RpcContext.getClientAttachment();
                context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
                if (invocation instanceof RpcInvocation) {
                    ((RpcInvocation) invocation).setInvoker(invoker);
                }
        
                if (CollectionUtils.isNotEmpty(supportedSelectors)) {
                    for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
                        Map<String, Object> selected = supportedSelector.select();
                        if (CollectionUtils.isNotEmptyMap(selected)) {
                            ((RpcInvocation) invocation).addObjectAttachments(selected);
                        }
                    }
                } else {
                    ((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
                }
        
                Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
                if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
                    /**
                     * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
                     * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
                     * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
                     * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
                     */
                    ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
                }
        
                // pass default timeout set by end user (ReferenceConfig)
                Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
                if (countDown != null) {
                    TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
                    if (timeoutCountDown.isExpired()) {
                        return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
                            "No time left for making the following call: " + invocation.getServiceName() + "."
                                + invocation.getMethodName() + ", terminate directly."), invocation);
                    }
                }
        
                RpcContext.removeServerContext();
                return invoker.invoke(invocation);
            } finally {
                RpcContext.restoreServiceContext(originServiceContext);
            }
        }
    

    26.10 FutureFilter

    The chain continues with FutureFilter:

        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            fireInvokeCallback(invoker, invocation);
            // need to configure if there's return value before the invocation in order to help invoker to judge if it's
            // necessary to return future.
            return invoker.invoke(invocation);
        }
    
    

    26.11 The MonitorFilter filter

    Next is MonitorFilter:

     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
                invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
                invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
                // count up
                getConcurrent(invoker, invocation).incrementAndGet();
            }
            // proceed invocation chain
            return invoker.invoke(invocation);
        }
    

    26.12 The RouterSnapshotFilter filter

    Then comes RouterSnapshotFilter:

     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            if (!switcher.isEnable()) {
                return invoker.invoke(invocation);
            }
    
            if (!logger.isInfoEnabled()) {
                return invoker.invoke(invocation);
            }
    
            if (!switcher.isEnable(invocation.getServiceModel().getServiceKey())) {
                return invoker.invoke(invocation);
            }
    
            RpcContext.getServiceContext().setNeedPrintRouterSnapshot(true);
            return invoker.invoke(invocation);
        }
    

    26.13 AbstractClusterInvoker

    The fault-tolerance template class AbstractClusterInvoker:

    public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // binding attachments into invocation.
    //        Map contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
    //        if (contextAttachments != null && contextAttachments.size() != 0) {
    //            ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
    //        }
    
            InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");
            // Look up the provider invokers in the service directory, filtered dynamically by the routing rules
            List<Invoker<T>> invokers = list(invocation);
            InvocationProfilerUtils.releaseDetailProfiler(invocation);
            // Get the load-balancing strategy; the default is random (RandomLoadBalance)
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");
            try {
                return doInvoke(invocation, invokers, loadbalance);
            } finally {
                InvocationProfilerUtils.releaseDetailProfiler(invocation);
            }
        }
    

    26.14 The doInvoke method of FailoverClusterInvoker

    Next, the doInvoke method of FailoverClusterInvoker, which implements failover:

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // Number of invocation attempts; 3 by default (the first call plus 2 retries)
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                Result result = invokeWithContext(invoker, invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                success = true;
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                if (!success) {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
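
Stripped of logging and context handling, the retry loop above comes down to a few lines. The following sketch is a simplified illustration rather than Dubbo's implementation: providers are plain strings, the remote call is a Function, and selection simply prefers providers that have not been tried yet. FailoverSketch, Outcome and the addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {

    static class Outcome {
        final String result;
        final List<String> tried;

        Outcome(String result, List<String> tried) {
            this.result = result;
            this.tried = tried;
        }
    }

    // Calls providers until one succeeds or the attempt budget (retries + 1) is used up.
    static Outcome invoke(List<String> providers, Function<String, String> call, int retries) {
        int attempts = retries + 1;
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = select(providers, tried);
            tried.add(provider);
            try {
                return new Outcome(call.apply(provider), tried);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to another provider
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times, providers " + tried, last);
    }

    // Simplified selection: the first provider not tried yet, otherwise the first one.
    static String select(List<String> providers, List<String> tried) {
        for (String provider : providers) {
            if (!tried.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    // Two providers refuse the call, the third one answers.
    static Outcome demo() {
        List<String> providers = Arrays.asList("10.0.0.1:20880", "10.0.0.2:20880", "10.0.0.3:20880");
        return invoke(providers, provider -> {
            if (!provider.startsWith("10.0.0.3")) {
                throw new RuntimeException("connection refused: " + provider);
            }
            return "Hello from " + provider;
        }, 2);
    }

    public static void main(String[] args) {
        Outcome outcome = demo();
        System.out.println(outcome.result);       // Hello from 10.0.0.3:20880
        System.out.println(outcome.tried.size()); // 3
    }
}
```

With 2 retries the budget is 3 attempts, so the call in demo() succeeds on its last attempt; one more failing provider would have produced the final exception instead.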
    
    
    26.15 The select method of AbstractClusterInvoker

    The select method of AbstractClusterInvoker runs the load-balancing algorithm:

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    
            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
    
            boolean sticky = invokers.get(0).getUrl()
                .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
    
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore concurrency problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availableCheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
    
            Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
            if (sticky) {
                stickyInvoker = invoker;
            }
    
            return invoker;
        }
    
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                    List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            if (invokers.size() == 1) {
                Invoker<T> tInvoker = invokers.get(0);
                checkShouldInvalidateInvoker(tInvoker);
                return tInvoker;
            }
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
            boolean isSelected = selected != null && selected.contains(invoker);
            boolean isUnavailable = availableCheck && !invoker.isAvailable() && getUrl() != null;
    
            if (isUnavailable) {
                invalidateInvoker(invoker);
            }
            if (isSelected || isUnavailable) {
                try {
                    Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availableCheck);
                    if (rInvoker != null) {
                        invoker = rInvoker;
                    } else {
                        //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                        int index = invokers.indexOf(invoker);
                        try {
                            //Avoid collision
                            invoker = invokers.get((index + 1) % invokers.size());
                        } catch (Exception e) {
                            logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                        }
                    }
                } catch (Throwable t) {
                    logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
                }
            }
    
            return invoker;
        }
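
The collision handling in doSelect can be illustrated with a reduced sketch. It leaves out the sticky and availability checks and is not the actual reselect implementation: providers are plain strings, the load balancer is a Function that picks from whatever list it is given, and ReselectSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ReselectSketch {

    // loadBalance picks one entry from the list it receives; selected holds providers already tried.
    static String doSelect(Function<List<String>, String> loadBalance,
                           List<String> invokers, List<String> selected) {
        if (invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        String invoker = loadBalance.apply(invokers);
        if (selected.contains(invoker)) {
            // Reselect among the providers that have not been tried yet.
            List<String> remaining = new ArrayList<>(invokers);
            remaining.removeAll(selected);
            if (!remaining.isEmpty()) {
                invoker = loadBalance.apply(remaining);
            } else {
                // Everything was tried already: step to the next index, wrapping around.
                invoker = invokers.get((invokers.indexOf(invoker) + 1) % invokers.size());
            }
        }
        return invoker;
    }

    public static void main(String[] args) {
        Function<List<String>, String> alwaysFirst = list -> list.get(0);
        List<String> invokers = Arrays.asList("a", "b", "c");
        System.out.println(doSelect(alwaysFirst, invokers, Arrays.asList()));              // a
        System.out.println(doSelect(alwaysFirst, invokers, Arrays.asList("a")));           // b
        System.out.println(doSelect(alwaysFirst, invokers, Arrays.asList("a", "b", "c"))); // b
    }
}
```

The last case shows the index + 1 fallback: the load balancer keeps returning "a", nothing untried is left, so the neighbouring entry is used to avoid hitting the same provider again.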
    

    26.16 The invokeWithContext method of AbstractClusterInvoker

    AbstractClusterInvoker then performs the call through its invokeWithContext method.

    The invoke method of ListenerInvokerWrapper:

     public Result invoke(Invocation invocation) throws RpcException {
            return invoker.invoke(invocation);
        }
    

    26.18 The invoke method of AbstractInvoker

         public Result invoke(Invocation inv) throws RpcException {
            // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
            if (isDestroyed()) {
                logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
            }
    
            RpcInvocation invocation = (RpcInvocation) inv;
        
        // prepare rpc invocation (initializes some variables before the call)
            prepareInvocation(invocation);
        
            // do invoke rpc invocation and return async result 执行调用
            AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
        
            // wait rpc result if sync
            waitForResultIfSync(asyncResult, invocation);
        
            return asyncResult;
        }
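
    The three steps above (prepare, invoke asynchronously, wait only when the call is synchronous) can be sketched in plain Java. This is a minimal sketch, not Dubbo's code: the class SyncOverAsyncSketch, the Mode enum, the fake doInvoke and the choice to report failures as IllegalStateException are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncOverAsyncSketch {
    enum Mode { SYNC, ASYNC }

    // Hypothetical stand-in for the remote call: it always answers asynchronously.
    static CompletableFuture<String> doInvoke(String arg) {
        return CompletableFuture.supplyAsync(() -> "Hello " + arg);
    }

    // Invoke, block only when the mode is SYNC, then hand the same future back.
    static CompletableFuture<String> invoke(String arg, Mode mode, long timeoutMillis) {
        CompletableFuture<String> future = doInvoke(arg);
        if (mode == Mode.SYNC) {
            try {
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the result", e);
            } catch (ExecutionException | TimeoutException e) {
                // Assumption of this sketch: surface both cases as one unchecked exception.
                throw new IllegalStateException("call failed or timed out", e);
            }
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = invoke("dubbo", Mode.SYNC, 1000);
        System.out.println(result.isDone() + " " + result.join());
    }
}
```

    The point of returning the future even in the synchronous case is that the caller gets one result type for both modes; in SYNC mode it is simply already complete when invoke returns.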
    
    

    29.19 AbstractInvoker's doInvokeAndReturn

    private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
            AsyncRpcResult asyncResult;
            try {
                asyncResult = (AsyncRpcResult) doInvoke(invocation);
            } catch (InvocationTargetException e) {
                Throwable te = e.getTargetException();
                if (te != null) {
                    // if biz exception
                    if (te instanceof RpcException) {
                        ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                    }
                    asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
                } else {
                    asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
                }
            } catch (RpcException e) {
                // if biz exception
                if (e.isBiz()) {
                    asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
                } else {
                    throw e;
                }
            } catch (Throwable e) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            }
    
            if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {
                // set server context
                RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));
            }
    
            return asyncResult;
        }
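
    The error handling above follows one rule: business exceptions and unexpected errors are folded into the returned result, while non-business RPC exceptions are rethrown to the caller. Here is a small sketch of that rule. CallException and its biz flag are hypothetical stand-ins for RpcException, and the sketch catches RuntimeException rather than Throwable because a Supplier cannot throw checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ExceptionToResultSketch {
    // Hypothetical exception type standing in for RpcException with a "biz" flag.
    static final class CallException extends RuntimeException {
        final boolean biz;

        CallException(String message, boolean biz) {
            super(message);
            this.biz = biz;
        }
    }

    static CompletableFuture<String> invokeAndReturn(Supplier<CompletableFuture<String>> call) {
        try {
            return call.get();
        } catch (CallException e) {
            if (e.biz) {
                return failed(e);   // business failure travels inside the result
            }
            throw e;                // framework-level failure goes straight to the caller
        } catch (RuntimeException e) {
            return failed(e);       // anything else is also wrapped into the result
        }
    }

    private static CompletableFuture<String> failed(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```

    Calling invokeAndReturn with a supplier that throws new CallException("biz", true) yields a future that is completed exceptionally, while the same call with biz set to false throws to the caller.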
    

    29.20 DubboInvoker's doInvoke method

        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            inv.setAttachment(VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = calculateTimeout(invocation, methodName);
                invocation.setAttachment(TIMEOUT_KEY, timeout);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
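                    // Focus on the default path, the synchronous call. In sync mode the callback executor is a ThreadlessExecutor, an executor that owns no threads of its own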
                    ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                    CompletableFuture<AppResponse> appResponseFuture =
                            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                    FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                    AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                    result.setExecutor(executor);
                    return result;
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
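
    The client selection at the top of doInvoke is a round-robin over the connection array: a shared counter is incremented on every call and taken modulo the array length. Below is a minimal sketch of that idea. RoundRobinSketch and the String "clients" are hypothetical, and the sketch uses a plain AtomicInteger with Math.floorMod so the slot stays non-negative after the counter overflows, which is my own choice here and not something shown in the source above.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    // Picks the next client; safe to call from many threads at once.
    String next() {
        if (clients.length == 1) {
            return clients[0]; // single connection: no counter needed
        }
        // floorMod keeps the slot in [0, length) even when the counter has wrapped to a negative value
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }
}
```

    With three clients "a", "b", "c", four consecutive calls to next() return a, b, c and then a again.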
    

    29.21 ReferenceCountExchangeClient's request

        public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
            return client.request(request, timeout, executor);
        }
    

    29.22 HeaderExchangeClient's request

         public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
           return channel.request(request, timeout, executor);
       }

    29.22 HeaderExchangeChannel's request method

       public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
           if (closed) {
               throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
           }
           // create request.
           Request req = new Request();
           req.setVersion(Version.getProtocolVersion());
           req.setTwoWay(true);
           req.setData(request);
           DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
           try {
               channel.send(req);
           } catch (RemotingException e) {
               future.cancel();
               throw e;
           }
           return future;
       }
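
    The pattern in this method is: create the future first, then send, and cancel the future if the send fails. The internals of DefaultFuture are not shown above, so the following is only a sketch under the assumption that pending futures are kept in a map keyed by request id and completed when a response with the same id comes back. PendingRequestsSketch, the sender callback and received are all hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

final class PendingRequestsSketch {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // sender is a hypothetical transport callback: (requestId, payload) -> write to the wire.
    CompletableFuture<Object> request(Object payload, BiConsumer<Long, Object> sender) {
        long id = ids.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);          // register before sending so a fast response is never lost
        try {
            sender.accept(id, payload);
        } catch (RuntimeException e) {
            pending.remove(id);           // send failed: nobody will ever answer this id
            future.cancel(false);
            throw e;
        }
        return future;
    }

    // Called by the transport when a response carrying the given id arrives.
    boolean received(long id, Object response) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(response);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

    Registering the future before the send matters: if the order were reversed, a response could arrive before there is anything in the map to complete.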
    
    

    29.23 AbstractPeer's send

     public void send(Object message) throws RemotingException {
            send(message, url.getParameter(Constants.SENT_KEY, false));
        }
    
    
        public void send(Object message, boolean sent) throws RemotingException {
            if (needReconnect && !isConnected()) {
                connect();
            }
            Channel channel = getChannel();
            //TODO Can the value returned by getChannel() be null? need improvement.
            if (channel == null || !channel.isConnected()) {
                throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
            }
            channel.send(message, sent);
        }
    

    29.24 NettyChannel's send (in Dubbo's netty4 package)

       public void send(Object message, boolean sent) throws RemotingException {
           // whether the channel is closed
           super.send(message, sent);
    
           boolean success = true;
           int timeout = 0;
           try {
               ChannelFuture future = channel.writeAndFlush(message);
               if (sent) {
                   // wait timeout ms
                   timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                   success = future.await(timeout);
               }
               Throwable cause = future.cause();
               if (cause != null) {
                   throw cause;
               }
           } catch (Throwable e) {
               removeChannelIfDisconnected(channel);
               throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
           }
           if (!success) {
               throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                       + "in timeout(" + timeout + "ms) limit");
           }
       }
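
    The sent flag decides whether the caller waits for the write to finish: with sent=false the message is handed over and the method returns, with sent=true the caller waits up to the timeout and fails if the write has not completed by then. A rough sketch of that behaviour follows. SentFlagSketch is hypothetical, a CompletableFuture stands in for Netty's ChannelFuture, and unlike the method above the sketch does not inspect an already failed write when sent is false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SentFlagSketch {
    // writeFuture is a hypothetical stand-in for the future returned by an asynchronous write.
    static void send(CompletableFuture<Void> writeFuture, boolean sent, long timeoutMillis) {
        if (!sent) {
            return; // fire and forget: do not wait for the write to complete
        }
        try {
            writeFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Failed to send message in timeout(" + timeoutMillis + "ms) limit", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(
                    "Failed to send message, cause: " + e.getCause().getMessage(), e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the write", e);
        }
    }
}
```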
    

    29.25 AbstractChannel's send

         public void send(Object message, boolean sent) throws RemotingException {
            if (isClosed()) {
                throw new RemotingException(this, "Failed to send message "
                        + (message == null ? "" : message.getClass().getName()) + ":" + PayloadDropper.getRequestWithoutData(message)
                        + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
            }
        }
    
    

    29.26 AbstractChannel's writeAndFlush method (in the Netty package)

    public ChannelFuture writeAndFlush(Object msg) {
            return pipeline.writeAndFlush(msg);
        }
    
    

    29.27 DefaultChannelPipeline's writeAndFlush

       public final ChannelFuture writeAndFlush(Object msg) {
            return tail.writeAndFlush(msg);
        }
    
    

    29.28 AbstractChannelHandlerContext's writeAndFlush

     public ChannelFuture writeAndFlush(Object msg) {
            return writeAndFlush(msg, newPromise());
        }
     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            write(msg, true, promise);
            return promise;
        }
    

    29.29 AbstractChannelHandlerContext's write method

    private void write(Object msg, boolean flush, ChannelPromise promise) {
            ObjectUtil.checkNotNull(msg, "msg");
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
    
            final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                    (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
            final Object m = pipeline.touch(msg, next);
            // executor is a NioEventLoop instance
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                // create the WriteTask
                final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
                // hand the WriteTask to the event executor
                if (!safeExecute(executor, task, promise, m, !flush)) {
                    // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                    // and put it back in the Recycler for re-use later.
                    //
                    // See https://github.com/netty/netty/issues/8343.
                    task.cancel();
                }
            }
        }
    
         private static boolean safeExecute(EventExecutor executor, Runnable runnable,
                ChannelPromise promise, Object msg, boolean lazy) {
            try {
                if (lazy && executor instanceof AbstractEventExecutor) {
                    ((AbstractEventExecutor) executor).lazyExecute(runnable);
                } else {
                    executor.execute(runnable);
                }
                return true;
            } catch (Throwable cause) {
                try {
                    if (msg != null) {
                        ReferenceCountUtil.release(msg);
                    }
                } finally {
                    promise.setFailure(cause);
                }
                return false;
            }
        }
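
    The branch on executor.inEventLoop() is the key decision in write: if the caller is already the event loop thread, the write runs inline, otherwise it is wrapped in a task and queued for the loop thread. In our case the caller is the business thread, so the task path is taken. The sketch below imitates that decision with a JDK single-thread executor. EventLoopDispatchSketch and its method names are hypothetical and this is not how Netty implements it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EventLoopDispatchSketch {
    private volatile Thread loopThread;

    // One dedicated thread plays the role of the event loop; the factory remembers which thread it is.
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the write inline when called on the loop thread, otherwise queues it as a task.
    // Returns true when the write ran inline.
    boolean write(Runnable writeTask) {
        if (inEventLoop()) {
            writeTask.run();
            return true;
        }
        loop.execute(writeTask);
        return false;
    }

    void shutdown() {
        loop.shutdown();
    }
}
```

    Called from the main thread, write returns false because the task is queued; a write issued from inside a queued task returns true because it is already running on the loop thread.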
    

    29.21 SingleThreadEventExecutor's execute

     public void execute(Runnable task) {
            ObjectUtil.checkNotNull(task, "task");
            execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
        }
    
        private void execute(Runnable task, boolean immediate) {
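            // whether the current thread is the event loop thread
            boolean inEventLoop = inEventLoop();
            // add the task to SingleThreadEventExecutor's taskQueue, which is an MpscUnboundedArrayQueue
            addTask(task);
            if (!inEventLoop) {
                // start the event loop thread if it is not running yet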
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && immediate) {
                wakeup(inEventLoop);
            }
        }
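
    What execute does boils down to two things: put the task on the queue, and make sure the single worker thread exists if the caller is not that thread. A stripped-down sketch of this lazy-start idea is below. LazyStartExecutorSketch is hypothetical, it uses a LinkedBlockingQueue instead of Netty's MPSC queue, a blocking take() stands in for the select-plus-wakeup mechanism, and shutdown handling is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazyStartExecutorSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        taskQueue.add(task); // enqueue first, as addTask(task) does above
        // Only a caller outside the loop can need to start it; the CAS lets exactly one caller win.
        if (Thread.currentThread() != thread && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "sketch-loop");
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    private void runLoop() {
        try {
            while (true) {
                taskQueue.take().run(); // blocks until a task is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as the signal to stop
        }
    }
}
```

    No thread exists until the first execute call, and every task, including the first one, runs on the same "sketch-loop" thread.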
    

    29.22 SingleThreadEventExecutor's wakeup

     protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
                selector.wakeup();
            }
        }
    

    29.23 SelectedSelectionKeySetSelector's wakeup

    public Selector wakeup() {
            return delegate.wakeup();
        }
    
    

    29.24 NioEventLoop's wakeup

     protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
                selector.wakeup();
            }
        }
    

    29.25 On my Mac, the call goes through KQueueSelectorImpl

      public Selector wakeup() {
            synchronized (interruptLock) {
                if (!interruptTriggered) {
                    try {
                        IOUtil.write1(fd1, (byte)0);
                    } catch (IOException ioe) {
                        throw new InternalError(ioe);
                    }
                    interruptTriggered = true;
                }
            }
            return this;
        }
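
    Whatever the platform implementation is, the effect of Selector.wakeup() is the same: a thread blocked in select() returns, which is how the event loop gets to the task we just queued. The following sketch shows that effect with the plain JDK API. SelectorWakeupSketch is a hypothetical name and no channel is registered, so select() has nothing to report.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.CompletableFuture;

final class SelectorWakeupSketch {
    // Blocks a background thread in select() and releases it from the calling thread.
    // Returns the number of ready keys reported by select().
    static int wakeBlockedThread() {
        try (Selector selector = Selector.open()) {
            CompletableFuture<Integer> ready = CompletableFuture.supplyAsync(() -> {
                try {
                    return selector.select(); // would block indefinitely without a wakeup
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            // Works whether or not the other thread has reached select() yet:
            // a wakeup with no select in progress makes the next select() return immediately.
            selector.wakeup();
            return ready.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Since nothing is registered with the selector, the woken select() reports zero ready keys; the interesting part is only that it returns at all.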
    
    
  • Original article: https://blog.csdn.net/songjunyan/article/details/126573647