publicclassRandomLoadBalanceextendsAbstractLoadBalance{ publicstaticfinal String NAME = "random";
/** * Select one invoker between a list using a random criteria */ @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ // Number of invokers int length = invokers.size(); // Every invoker has the same weight? boolean sameWeight = true; // the weight of every invokers int[] weights = newint[length]; // the first invoker's weight int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // The sum of weights int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // save for later use weights[i] = weight; // Sum totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
publicclassLeastActiveLoadBalanceextendsAbstractLoadBalance{ publicstaticfinal String NAME = "leastactive";
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ // Number of invokers int length = invokers.size(); // The least active value of all invokers int leastActive = -1; // The number of invokers having the same least active value (leastActive) int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexes = newint[length]; // the weight of every invokers int[] weights = newint[length]; // The sum of the warmup weights of all the least active invokers int totalWeight = 0; // The weight of the first least active invoker int firstWeight = 0; // Every least active invoker has the same weight value? boolean sameWeight = true; // 选择出所有最小活跃数 invokers // Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // Get the active number of the invoker int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoker's configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); // save for later use weights[i] = afterWarmup; // If it is the first invoker or the active number of the invoker is less than the current least active number // 当出现一个比所有都小的一个活跃数,则重新设置一些属性 if (leastActive == -1 || active < leastActive) { // Reset the active number of the current invoker to the least active number leastActive = active; // Reset the number of least active invokers leastCount = 1; // Put the first least active invoker first in leastIndexes leastIndexes[0] = i; // Reset totalWeight totalWeight = afterWarmup; // Record the weight the first least active invoker firstWeight = afterWarmup; // Each invoke has the same weight (only one invoker here) sameWeight = true; // If current invoker's active value equals with leaseActive, then accumulating. 
// 如果最小活跃数相同,则将最小活跃数的 invoker 的索引放入数组,然后累加总权重 } elseif (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && afterWarmup != firstWeight) { sameWeight = false; } } } // Choose an invoker from all the least active invokers if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexes[0]); } // 权重不同,并且有多个最小活跃数,则进行随机加权算法选出一个 invoker if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. // 所有权重都一样,则随机选取一个 invoker return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
privatefinal ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ String methodName = RpcUtils.getMethodName(invocation); // 类似:org.apache.dubbo.demo.DemoService.sayHello String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // using the hashcode of list to compute the hash only pay attention to the elements in the list // 获取 集合 invokers 的原始 hash 值,其目的用来判断集合是否有改变 int invokersHashCode = invokers.hashCode(); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); // 如果 selector 为 null 或者 invokers 集合有变动,则进行重新hash计算 if (selector == null || selector.identityHashCode != invokersHashCode) { // 对服务列表 invokers 进行创建一个 hash 节点 ConsistentHashSelector<T> hashSelector = new ConsistentHashSelector<>(invokers, methodName, invokersHashCode); selectors.put(key, hashSelector); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }
privatestaticfinalclassConsistentHashSelector<T> { privatefinal TreeMap<Long, Invoker<T>> virtualInvokers; privatefinalint replicaNumber; privatefinalint identityHashCode; privatefinalint[] argumentIndex; // 初始化 hash 虚拟节点 ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { // new 一个 TreeMap 用来存储 hash 和 invoker 映射 this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取要创建的节点个数,缺省配置为160 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); // 获取参与 hash 计算的参数下标 String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = newint[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 进行循环服务列表,计算 hash 并设置到虚拟节点 virtualInvokers 中 for (Invoker<T> invoker : invokers) { // 获取服务器地址,类似:127.0.0.1:8082 String address = invoker.getUrl().getAddress(); // 循环节点的四分之一次,因为每次会进行虚拟4个节点 for (int i = 0; i < replicaNumber / 4; i++) { // 对地址进行+i,然后做md5计算:127.0.0.1:80820 // 对 address + i 进行 md5 运算,得到一个长度为16的字节数组 byte[] digest = md5(address + i); // 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数 for (int h = 0; h < 4; h++) { // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算 // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算 // h = 2, h = 3 时过程同上 long m = hash(digest, h); // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中, // virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构 virtualInvokers.put(m, invoker); } } } } // public Invoker<T> select(Invocation invocation){ String key = toKey(invocation.getArguments()); byte[] digest = md5(key); // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法, // 寻找合适的 Invoker return selectForKey(hash(digest, 0)); }
/**
 * Builds the hash key by concatenating, in the order given by {@code argumentIndex},
 * the string form of each selected argument. Indexes outside the bounds of
 * {@code args} are skipped.
 *
 * @param args the invocation arguments
 * @return the concatenated key; empty when no configured index is in range
 */
private String toKey(Object[] args) {
    StringBuilder keyBuilder = new StringBuilder();
    for (int position : argumentIndex) {
        // Ignore configured indexes that this invocation does not have
        if (position < 0 || position >= args.length) {
            continue;
        }
        keyBuilder.append(args[position]);
    }
    return keyBuilder.toString();
}
publicstaticfinal String NAME = "shortestresponse";
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ // Number of invokers int length = invokers.size(); // Estimated shortest response time of all invokers long shortestResponse = Long.MAX_VALUE; // The number of invokers having the same estimated shortest response time int shortestCount = 0; // The index of invokers having the same estimated shortest response time int[] shortestIndexes = newint[length]; // the weight of every invokers int[] weights = newint[length]; // The sum of the warmup weights of all the shortest response invokers int totalWeight = 0; // The weight of the first shortest response invokers int firstWeight = 0; // Every shortest response invoker has the same weight value? boolean sameWeight = true;
// Filter out all the shortest response invokers for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // Calculate the estimated response time from the product of active connections and succeeded average elapsed time. long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); int active = rpcStatus.getActive(); long estimateResponse = succeededAverageElapsed * active; int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; // Same as LeastActiveLoadBalance if (estimateResponse < shortestResponse) { shortestResponse = estimateResponse; shortestCount = 1; shortestIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } elseif (estimateResponse == shortestResponse) { shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } if (shortestCount == 1) { return invokers.get(shortestIndexes[0]); } if (!sameWeight && totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0) { return invokers.get(shortestIndex); } } } return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); } }