联博开奖网:你还在忧郁rpc接口超时吗

admin 2个月前 (08-08) 科技 50 1

在使用dubbo时,通常会遇到timeout这个属性,timeout属性的作用是:给某个服务挪用设置超时时间,若是服务在设置的时间内未返回效果,则会抛出挪用超时异常:TimeoutException,在使用的历程中,我们有时会对provider和consumer两个设置都市设置timeout值,那么服务挪用历程中会以哪个为准?橘子同砚今天主要针对这个问题举行剖析和扩展。

三种设置方式

以provider设置为例:

#### 方式级别
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl">
   <dubbo:method name="test" timeout="10000"/>
</dubbo:service>
#### 接口级别
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
#### 全局级别
<dubbo:service ="10000"/>

优先级选择

在dubbo中若是provider和consumer都设置了相同的一个属性,好比本文剖析的timeout,实在它们是有优先级的,consumer方式设置 > provider方式设置 > consumer接口设置 > provider接口设置 > consumer全局设置 > provider全局设置。以是对于小橘最先的提出的问题就有了效果,会以消费者设置的为准,接下连系源码来举行剖析,实在源码很简单,在RegistryDirectory类中将服务列表转换为DubboInvlker方式中举行了处置:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 重点就是下面这个方式
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
     

重点就是上面mergeUrl方式,将provider和comsumer的url参数举行了整合,在mergeUrl方式有会挪用ClusterUtils.mergeUrl方式举行整合,由于这个方式比较简单,就是对一些参数举行了整合了,会用consumer参数举行笼罩,这里就不剖析了,若是感兴趣的同砚可以去研究一下。

超时处置

在设置设置了超时timeout,那么代码中是若何处置的,这里咱们在举行一下扩展,剖析一下dubbo中是若何处置超时的,在挪用服务方式,最后都市挪用DubboInvoker.doInvoke方式,咱们就从这个方式最先剖析:

  @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                // 异步处置
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the async result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 同步处置
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在这个方式中,就以同步模式举行剖析,看request方式,request()方式会返回一个DefaultFuture类,在去挪用DefaultFuture.get()方式,这里实在涉及到一个在异步中实现同步的技巧,咱们这里不做剖析,以是重点就在get()方式里:

 @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

在挪用get()方式时,会去挪用get(timeout)这个方式,在这个方式中会传一个timeout字段,在和timeout就是咱们设置的谁人参数,在这个方式中咱们要关注下面一个代码块:

 if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 线程壅闭
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 在超时时间里,还没有效果,则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

重点看await()方式,会举行壅闭timeout时间,若是壅闭时间到了,则会叫醒往下执行,超时跳出while循环中,判断是否有效果返回,若是没有(这个地方要注重:只有有效果返回,或超时才跳出循环中),则抛出超时异常。讲到这里,超时原理基本上实在差不多了,DefaultFuture这个类另有个地方需要注重,在初始化DefaultFuture工具时,会去建立一个超时的延迟义务,延迟时间就是timeout值,在这个延迟义务中也会挪用signal()方式叫醒壅闭。

分批挪用

不外在挪用rpc远程接口,若是对方的接口不能一次承载返回请求效果能力,我们一样平常做法是分批挪用,将挪用一次分成挪用多次,然后对每次效果举行汇聚,固然也可以做用行使多线程的能力去执行。后面文章小橘将会先容这种模式,敬请关注哦!

/**
 * Description:通用 分批挪用工具类
 * 场景:
 * <pre>
 *     好比List参数的size可能为 几十个甚至上百个
 *     若是invoke接口比较慢,传入50个以上会超时,那么可以每次传入20个,分批执行。
 * </pre>
 * Author: OrangeCsong
 */
public class ParallelInvokeUtil {

    private ParallelInvokeUtil() {}

    /**
     * @param sourceList 源数据
     * @param size 分批巨细
     * @param buildParam 构建函数
     * @param processFunction 处置函数
     * @param <R> 返回值
     * @param <T> 入参\
     * @param <P> 构建参数
     * @return
     */
    public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size,
                                                           Function<List<T>, P> buildParam,
                                                           Function<P, List<R>> processFunction) {

        if (CollectionUtils.isEmpty(sourceList)) {
            return new ArrayList<>(0);
        }
        Preconditions.checkArgument(size > 0, "size巨细必须大于0");

        return Lists.partition(sourceList, size).stream()
                .map(buildParam)
                .map(processFunction)
                .filter(Objects::nonNull)
                .reduce(new ArrayList<>(),
                        (resultList1, resultList2) -> {
                            resultList1.addAll(resultList2);
                            return resultList1;
                        });

    }
}

本文由博客群发一文多发等运营工具平台 OpenWrite 公布

,

欧博开户www.allbetgame.us

欧博开户www.allbetgame.us

Allbet声明:该文看法仅代表作者自己,与本平台无关。转载请注明:联博开奖网:你还在忧郁rpc接口超时吗

网友评论

  • (*)

最新评论

  • Allbet客户端下载 2020-08-08 00:10:39 回复

    欧博网址www.ludiealliedinstitute.com欢迎进入欧博网址(Allbet Gaming),欧博网址开放会员注册、代理开户、电脑客户端下载、苹果安卓下载等业务。蛮用心的

    1

站点信息

  • 文章总数:437
  • 页面总数:0
  • 分类总数:8
  • 标签总数:970
  • 评论总数:117
  • 浏览总数:3358