更新时间:2021年03月26日 13时44分35秒 来源:黑马程序员
因为微服务是目前互联网公司比较流行的架构,所以spring就提供了一个顶级框架-spring cloud,来解决我们在开发微服务架构中遇到的各种各样的问题,今天的主角是spring cloud 框架中集成的组件Ribbon,那么Ribbon能解决什么问题呢,我们来思考下面的问题。
微服务架构中的每个服务为了高可用,很大程度上都会进行集群,我们假设现在集群了3个user服务,同时能提供相同的服务,问题来了,我们如何决定调用这3个user服务中的哪一个呢?
根据不同分析角度,会有不同的答案,也可以理解为根据不同的情况,我们可以写不同的算法,来决定到底此时此刻,调用这3个user服务的哪一个,那么,Ribbon就给我们提供了不同的算法,我们可以根据业务场景,调整配置文件,决定到底使用哪个算法,这样,算法中就会计算出调用哪个user服务了。
1)我们准备一个eureka注册中心
2)再准备一个order服务
3)再准备3个相同代码的user服务,这样,order服务通过eureka注册中心,就可以发现user的3个服务
Ribbon是通过IRule的这个接口来选择3个user服务中的哪个的,但是实际执行的代码肯定是继承了这个接口的实现类,所以选择不同的实现类,就会选择不同负载均衡策略
public interface IRule { Server choose(Object var1); void setLoadBalancer(ILoadBalancer var1); ILoadBalancer getLoadBalancer(); }
此策略是Ribbon的默认策略,是按照顺序,依次对所有的user服务进行访问。
通过重写IRule的choose方法,来选择并返回决定调用的user服务,在下面的源码中,List
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } else { Server server = null; int count = 0; while(true) { if (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); //总服务实例数量 int serverCount = allServers.size(); if (upCount != 0 && serverCount != 0) { int nextServerIndex = this.incrementAndGetModulo(serverCount); server = (Server)allServers.get(nextServerIndex); if (server == null) { Thread.yield(); } else { if (server.isAlive() && server.isReadyToServe()) { return server; } server = null; } continue; } log.warn("No up servers available from load balancer: " + lb); return null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } } }
debug的图例:
第一次访问:访问的是第一个实例
第二次访问:访问的是第二个实例
就和这个策略的名字一样,是对user的3个服务的随机调用,所以不存在规律,如下源码中int index = this.chooseRandomInt(serverCount); 通过随机数来选择下标,所以对user服务的调用是随机的
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int index = this.chooseRandomInt(serverCount); server = (Server)upList.get(index); if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; Thread.yield(); } } return server; } }
debug的图例:
第一次访问:访问的是第一个实例
第二次访问:访问的还是第一个实例
第三次访问:访问的是第三个实例
根据user的3个服务的响应时间来分配权重,响应时间越长的服务,权重越低,那么被调用的概率也就越低。相反,响应时间越短的服务,权重越高,被调用的概率也就越高
响应时间加权重策略的实现分为两步:
extends RoundRobinRule
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { List<Double> currentWeights = this.accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1); //在30秒之内,maxTotalWeight变量会一直是0.0 if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) { double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if (d >= randomWeight) { serverIndex = n; break; } } server = (Server)allList.get(serverIndex); } else { server = super.choose(this.getLoadBalancer(), key); if (server == null) { return server; } } if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; } } return server; } }
debug的图例:
前几次访问:maxTotalWeight都是0.0,使用轮询策略,但是开始缓存权重数据
30秒之后:开始根据权重数据来分配权重,选择实例
如下图:8081端口的权重显然没有8082的权重大,所以8082端口的user服务实例被访问的次数多
重试策略是指通过轮询策略选出一个实例,然后去访问,如果此实例为null或者已经失效,那么会重试其他的实例,answer = this.subRule.choose(key); 会根据轮询策略选择一个实例,然后if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline)判断如果实例为null或者失效,那么会重新选择
public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + this.maxRetryMillis; Server answer = null; answer = this.subRule.choose(key); if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while(!Thread.interrupted()) { answer = this.subRule.choose(key); if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) { break; } Thread.yield(); } task.cancel(); } return answer != null && answer.isAlive() ? answer : null; }
会根据每个服务实例的并发数量来决定,访问并发数最少的那个服务,int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); 会获得当前遍历的实例的并发数,然后和其他的实例的并发数进行判断,最终访问并发量最少的那个实例
public Server choose(Object key) { if (this.loadBalancerStats == null) { return super.choose(key); } else { List<Server> serverList = this.getLoadBalancer().getAllServers(); int minimalConcurrentConnections = 2147483647; long currentTime = System.currentTimeMillis(); Server chosen = null; Iterator var7 = serverList.iterator(); while(var7.hasNext()) { //遍历所有的实例 Server server = (Server)var7.next(); ServerStats serverStats = this.loadBalancerStats.getSingleServerStat(server); if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); //判断并发数,并和已经判断出的最少的并发数比较 if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } } }
此策略会聪明的过滤掉一直失败并被标记为circuit tripped的user服务,而且会过滤掉那些高并发的user服务
public Server choose(Object key) { int count = 0; for(Server server = this.roundRobinRule.choose(key); count++ <= 10; server = this.roundRobinRule.choose(key)) { //通过predicate来过滤 if (this.predicate.apply(new PredicateKey(server))) { return server; } } //过滤掉一些服务之后,会采用轮询的方式调用剩下的服务 return super.choose(key); }
此策略本身并没有实现什么特殊的处理逻辑,但是可以通过重置LoadBalancer来达到自定义一些高级策略的目的,可以重写initWithNiwsConfig和setLoadBalancer
public void initWithNiwsConfig(IClientConfig clientConfig) { this.roundRobinRule = new RoundRobinRule(); } public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); this.roundRobinRule.setLoadBalancer(lb); } public Server choose(Object key) { if (this.roundRobinRule != null) { return this.roundRobinRule.choose(key); } else { throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class"); } }
猜你喜欢: