阿八博客
  • 100000+

    文章

  • 23

    评论

  • 20

    友链

  • 最近新加了很多技术文章,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

聊聊NacosNamingService的selectOneHealthyInstance

欢迎来到阿八个人博客网站。本 阿八个人博客 网站提供最新的站长新闻,各种互联网资讯。 喜欢本站的朋友可以收藏本站,或者加QQ:我们大家一起来交流技术! URL链接:https://www.abboke.com/jsh/2019/1010/116479.html

本文主要研究一下NacosNamingService的selectOneHealthyInstance

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {    private static final String DEFAULT_PORT = "8080";    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);    /**     * Each Naming instance should have different namespace.     */    private String namespace;    private String endpoint;    private String serverList;    private String cacheDir;    private String logName;    private HostReactor hostReactor;    private BeatReactor beatReactor;    private EventDispatcher eventDispatcher;    private NamingProxy serverProxy;    //......    @Override    public Instance selectOneHealthyInstance(String serviceName) throws NacosException {        return selectOneHealthyInstance(serviceName, new ArrayList<String>());    }    @Override    public Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException {        return selectOneHealthyInstance(serviceName, groupName, true);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {        return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException {        return selectOneHealthyInstance(serviceName, groupName, new ArrayList<String>(), subscribe);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {        return selectOneHealthyInstance(serviceName, clusters, true);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException {        return selectOneHealthyInstance(serviceName, groupName, clusters, true);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe)        throws NacosException {        return selectOneHealthyInstance(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);    }    @Override    public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {        if (subscribe) {            return Balancer.RandomByWeight.selectHost(                hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));        } else {            return Balancer.RandomByWeight.selectHost(                hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));        }    }    //......}
selectOneHealthyInstance跟selectInstances类似,只不过它返回的是单个instance;selectOneHealthyInstance也是先从hostReactor获取serviceInfo如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo获取到serviceInfo之后,selectOneHealthyInstance通过Balancer.RandomByWeight.selectHost方法来选取单个healthy的instance

Balancer

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/Balancer.java

public class Balancer {    /**     * report status to server     */    public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();    public static class RandomByWeight {        public static List<Instance> selectAll(ServiceInfo serviceInfo) {            List<Instance> hosts = serviceInfo.getHosts();            if (CollectionUtils.isEmpty(hosts)) {                throw new IllegalStateException("no host to srv for serviceInfo: " + serviceInfo.getName());            }            return hosts;        }        public static Instance selectHost(ServiceInfo dom) {            List<Instance> hosts = selectAll(dom);            if (CollectionUtils.isEmpty(hosts)) {                throw new IllegalStateException("no host to srv for service: " + dom.getName());            }            return getHostByRandomWeight(hosts);        }    }    /**     * Return one host from the host list by random-weight.     *     * @param hosts The list of the host.     * @return The random-weight result of the host     */    protected static Instance getHostByRandomWeight(List<Instance> hosts) {        NAMING_LOGGER.debug("entry randomWithWeight");        if (hosts == null || hosts.size() == 0) {            NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");            return null;        }        Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");        NAMING_LOGGER.debug("new Chooser");        List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();        for (Instance host : hosts) {            if (host.isHealthy()) {                hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));            }        }        NAMING_LOGGER.debug("for (Host host : hosts)");        vipChooser.refresh(hostsWithWeight);        NAMING_LOGGER.debug("vipChooser.refresh");        return vipChooser.randomWithWeight();    }}
Balancer的RandomByWeight提供了selectAll及selectHost方法;selectAll针对serviceInfo.getHosts()进行了空判断,空的话会抛出IllegalStateExceptionselectHost方法内部调用了selectAll方法,其最后通过getHostByRandomWeight来选取单个healthy的instancegetHostByRandomWeight方法首先创建一个Chooser,然后选取healthy的instance构造hostsWithWeight,再通过vipChooser.refresh(hostsWithWeight)进行refresh,最后通过vipChooser.randomWithWeight()选取单个healthy的instance

Chooser

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

public class Chooser<K, T> {    private K uniqueKey;    private volatile Ref<T> ref;    public T random() {        List<T> items = ref.items;        if (items.size() == 0) {            return null;        }        if (items.size() == 1) {            return items.get(0);        }        return items.get(ThreadLocalRandom.current().nextInt(items.size()));    }    public T randomWithWeight() {        Ref<T> ref = this.ref;        double random = ThreadLocalRandom.current().nextDouble(0, 1);        int index = Arrays.binarySearch(ref.weights, random);        if (index < 0) {            index = -index - 1;        } else {            return ref.items.get(index);        }        if (index >= 0 && index < ref.weights.length) {            if (random < ref.weights[index]) {                return ref.items.get(index);            }        }        /* This should never happen, but it ensures we will return a correct         * object in case there is some floating point inequality problem         * wrt the cumulative probabilities. */        return ref.items.get(ref.items.size() - 1);    }    public Chooser(K uniqueKey) {        this(uniqueKey, new ArrayList<Pair<T>>());    }    public Chooser(K uniqueKey, List<Pair<T>> pairs) {        Ref<T> ref = new Ref<T>(pairs);        ref.refresh();        this.uniqueKey = uniqueKey;        this.ref = ref;    }    public K getUniqueKey() {        return uniqueKey;    }    public Ref<T> getRef() {        return ref;    }    public void refresh(List<Pair<T>> itemsWithWeight) {        Ref<T> newRef = new Ref<T>(itemsWithWeight);        newRef.refresh();        newRef.poller = this.ref.poller.refresh(newRef.items);        this.ref = newRef;    }    //......}
Chooser的refresh方法会根据itemsWithWeight创建Ref,然后执行Ref的refresh方法;randomWithWeight方法通过Arrays.binarySearch(ref.weights, random)创建初始index,然后根据index从ref.items获取元素

Ref

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java

    public class Ref<T> {        private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();        private List<T> items = new ArrayList<T>();        private Poller<T> poller = new GenericPoller<T>(items);        private double[] weights;        @SuppressWarnings("unchecked")        public Ref(List<Pair<T>> itemsWithWeight) {            this.itemsWithWeight = itemsWithWeight;        }        public void refresh() {            Double originWeightSum = (double) 0;            for (Pair<T> item : itemsWithWeight) {                double weight = item.weight();                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest                if (weight <= 0) {                    continue;                }                items.add(item.item());                if (Double.isInfinite(weight)) {                    weight = 10000.0D;                }                if (Double.isNaN(weight)) {                    weight = 1.0D;                }                originWeightSum += weight;            }            double[] exactWeights = new double[items.size()];            int index = 0;            for (Pair<T> item : itemsWithWeight) {                double singleWeight = item.weight();                //ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest                if (singleWeight <= 0) {                    continue;                }                exactWeights[index++] = singleWeight / originWeightSum;            }            weights = new double[items.size()];            double randomRange = 0D;            for (int i = 0; i < index; i++) {                weights[i] = randomRange + exactWeights[i];                randomRange += exactWeights[i];            }            double doublePrecisionDelta = 0.0001;            if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {                return;            }            throw new IllegalStateException("Cumulative Weight caculate wrong , the sum of probabilities does not equals 1.");        }        //......    }
Ref的refresh方法主要是初始化items及weights

小结

selectOneHealthyInstance跟selectInstances类似,只不过它返回的是单个instance;selectOneHealthyInstance也是先从hostReactor获取serviceInfo如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo获取到serviceInfo之后,selectOneHealthyInstance通过Balancer.RandomByWeight.selectHost方法来选取单个healthy的instance

doc

NacosNamingService

相关文章

暂住......别动,不想说点什么吗?
  • 全部评论(0
    还没有评论,快来抢沙发吧!