1.微服务篇
1.1.SpringCloud常见组件有哪些?
问题说明 :这个题目主要考察对SpringCloud的组件基本了解
难易程度 :简单
参考话术 :
SpringCloud包含的组件很多,有很多功能是重复的。其中最常用组件包括:
•注册中心组件:Eureka、Nacos等
•负载均衡组件:Ribbon
•远程调用组件:OpenFeign
•网关组件:Zuul、Gateway
•服务保护组件:Hystrix、Sentinel
•服务配置管理组件:SpringCloudConfig、Nacos
1.2.Nacos的服务注册表结构是怎样的?
问题说明 :考察对Nacos数据分级结构的了解,以及Nacos源码的掌握情况
难易程度 :一般
参考话术 :
Nacos采用了数据的分级存储模型,最外层是Namespace,用来隔离环境。然后是Group,用来对服务分组。接下来就是服务(Service)了,一个服务包含多个实例,但是可能处于不同机房,因此Service下有多个集群(Cluster),Cluster下是不同的实例(Instance)。
对应到Java代码中,Nacos采用了一个多层的Map来表示。结构为Map<String,
Map<String,
Service>>,其中最外层Map的key就是namespaceId,值是一个Map。内层Map的key是group拼接serviceName,值是Service对象。Service对象内部又是一个Map,key是集群名称,值是Cluster对象。而Cluster对象内部维护了Instance的集合。
如图:
1.3.Nacos如何支撑阿里内部数十万服务注册压力?
问题说明 :考察对Nacos源码的掌握情况
难易程度 :难
参考话术 :
Nacos内部接收到注册的请求时,不会立即写数据,而是将服务注册的任务放入一个阻塞队列就立即响应给客户端。然后利用线程池读取阻塞队列中的任务,异步来完成实例更新,从而提高并发写能力。
1.4.Nacos如何避免并发读写冲突问题?
问题说明 :考察对Nacos源码的掌握情况
难易程度 :难
参考话术 :
Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将旧的实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。
这样在更新的过程中,就不会对读实例列表的请求产生影响,也不会出现脏读问题了。
1.5.Nacos与Eureka的区别有哪些?
问题说明 :考察对Nacos、Eureka的底层实现的掌握情况
难易程度 :难
参考话术 :
Nacos与Eureka有相同点,也有不同之处,可以从以下几点来描述:
接口方式 :Nacos与Eureka都对外暴露了Rest风格的API接口,用来实现服务注册、发现等功能
实例类型 :Nacos的实例有永久和临时实例之分;而Eureka只支持临时实例
健康检测 :Nacos对临时实例采用心跳模式检测,对永久实例采用主动请求来检测;Eureka只支持心跳模式
服务发现 :Nacos支持定时拉取和订阅推送两种模式;Eureka只支持定时拉取模式
1.6.Sentinel的限流与Gateway的限流有什么差别?
问题说明 :考察对限流算法的掌握情况
难易程度 :难
参考话术 :
限流算法常见的有三种实现:滑动时间窗口、令牌桶算法、漏桶算法。Gateway则采用了基于Redis实现的令牌桶算法。
而Sentinel内部却比较复杂:
默认限流模式是基于滑动时间窗口算法
排队等待的限流模式则基于漏桶算法
而热点参数限流则是基于令牌桶算法
1.7.Sentinel的线程隔离与Hystix的线程隔离有什么差别?
问题说明 :考察对线程隔离方案的掌握情况
难易程度 :一般
参考话术 :
Hystix默认是基于线程池实现的线程隔离,每一个被隔离的业务都要创建一个独立的线程池,线程过多会带来额外的CPU开销,性能一般,但是隔离性更强。
Sentinel是基于信号量(计数器)实现的线程隔离,不用创建线程池,性能较好,但是隔离性一般。
2.MQ篇
2.1.你们为什么选择了RabbitMQ而不是其它的MQ?
如图:
话术:
kafka是以吞吐量高而闻名,不过其数据稳定性一般,而且无法保证消息有序性。我们公司的日志收集也有使用,业务模块中则使用的RabbitMQ。
阿里巴巴的RocketMQ基于Kafka的原理,弥补了Kafka的缺点,继承了其高吞吐的优势,其客户端目前以Java为主。但是我们担心阿里巴巴开源产品的稳定性,所以就没有使用。
RabbitMQ基于面向并发的语言Erlang开发,吞吐量不如Kafka,但是对我们公司来讲够用了。而且消息可靠性较好,并且消息延迟极低,集群搭建比较方便。支持多种协议,并且有各种语言的客户端,比较灵活。Spring对RabbitMQ的支持也比较好,使用起来比较方便,比较符合我们公司的需求。
综合考虑我们公司的并发需求以及稳定性需求,我们选择了RabbitMQ。
2.2.RabbitMQ如何确保消息的不丢失?
话术:
RabbitMQ针对消息传递过程中可能发生问题的各个地方,给出了针对性的解决方案:
生产者发送消息时可能因为网络问题导致消息没有到达交换机:
RabbitMQ提供了publisher confirm机制
生产者发送消息后,可以编写ConfirmCallback函数
消息成功到达交换机后,RabbitMQ会调用ConfirmCallback通知消息的发送者,返回ACK
消息如果未到达交换机,RabbitMQ也会调用ConfirmCallback通知消息的发送者,返回NACK
消息超时未发送成功也会抛出异常
消息到达交换机后,如果未能到达队列,也会导致消息丢失:
RabbitMQ提供了publisher return机制
生产者可以定义ReturnCallback函数
消息到达交换机,未到达队列,RabbitMQ会调用ReturnCallback通知发送者,告知失败原因
消息到达队列后,MQ宕机也可能导致丢失消息:
RabbitMQ提供了持久化功能,集群的主从备份功能
消息持久化,RabbitMQ会将交换机、队列、消息持久化到磁盘,宕机重启可以恢复消息
镜像集群,仲裁队列,都可以提供主从备份功能,主节点宕机,从节点会自动切换为主,数据依然在
消息投递给消费者后,如果消费者处理不当,也可能导致消息丢失
SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略:
消费者的确认机制:
消费者处理消息成功,未出现异常时,Spring返回ACK给RabbitMQ,消息才被移除
消费者处理消息失败,抛出异常,宕机,Spring返回NACK或者不返回结果,消息不被异常
消费者重试机制:
默认情况下,消费者处理失败时,消息会再次回到MQ队列,然后投递给其它消费者。Spring提供的消费者重试机制,则是在处理失败后不返回NACK,而是直接在消费者本地重试。多次重试都失败后,则按照消费者失败处理策略来处理消息。避免了消息频繁入队带来的额外压力。
消费者失败策略:
当消费者多次本地重试失败时,消息默认会丢弃。
Spring提供了Republish策略,在多次重试都失败,耗尽重试次数后,将消息重新投递给指定的异常交换机,并且会携带上异常栈信息,帮助定位问题。
2.3.RabbitMQ如何避免消息堆积?
话术:
消息堆积问题产生的原因往往是因为消息发送的速度超过了消费者消息处理的速度。因此解决方案无外乎以下三点:
提高消费者处理速度
增加更多消费者
增加队列消息存储上限
1)提高消费者处理速度
消费者处理速度是由业务代码决定的,所以我们能做的事情包括:
尽可能优化业务代码,提高业务性能
接收到消息后,开启线程池,并发处理多个消息
优点:成本低,改改代码即可
缺点:开启线程池会带来额外的性能开销,对于高频、低时延的任务不合适。推荐任务执行周期较长的业务。
2)增加更多消费者
一个队列绑定多个消费者,共同争抢任务,自然可以提供消息处理的速度。
优点:能用钱解决的问题都不是问题。实现简单粗暴
缺点:问题是没有钱。成本太高
3)增加队列消息存储上限
在RabbitMQ的1.8版本后,加入了新的队列模式:Lazy Queue
这种队列不会将消息保存在内存中,而是在收到消息后直接写入磁盘中,理论上没有存储上限。可以解决消息堆积问题。
优点:磁盘存储更安全;存储无上限;避免内存存储带来的Page
Out问题,性能更稳定;
缺点:磁盘存储受到IO性能的限制,消息时效性不如内存模式,但影响不大。
2.4.RabbitMQ如何保证消息的有序性?
话术:
其实RabbitMQ是队列存储,天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证了。
因此,要保证消息的有序性,需要做的下面几点:
保证消息发送的有序性
保证一组有序的消息都发送到同一个队列
保证一个队列只包含一个消费者
2.5.如何防止MQ消息被重复消费?
话术:
消息重复消费的原因多种多样,不可避免。所以只能从消费者端入手,只要能保证消息处理的幂等性就可以确保消息不被重复消费。
而幂等性的保证又有很多方案:
给每一条消息都添加一个唯一id,在本地记录消息表及消息状态,处理消息时基于数据库表的id唯一性做判断
同样是记录消息表,利用消息状态字段实现基于乐观锁的判断,保证幂等
基于业务本身的幂等性。比如根据id的删除、查询业务天生幂等;新增、修改等业务可以考虑基于数据库id唯一性、或者乐观锁机制确保幂等。本质与消息表方案类似。
2.6.如何保证RabbitMQ的高可用?
话术:
要实现RabbitMQ的高可用无外乎下面两点:
做好交换机、队列、消息的持久化
搭建RabbitMQ的镜像集群,做好主从备份。当然也可以使用仲裁队列代替镜像集群。
2.7.使用MQ可以解决那些问题?
话术:
RabbitMQ能解决的问题很多,例如:
解耦合:将几个业务关联的微服务调用修改为基于MQ的异步通知,可以解除微服务之间的业务耦合。同时还提高了业务性能。
流量削峰:将突发的业务请求放入MQ中,作为缓冲区。后端的业务根据自己的处理能力从MQ中获取消息,逐个处理任务。流量曲线变的平滑很多
延迟队列:基于RabbitMQ的死信队列或者DelayExchange插件,可以实现消息发送后,延迟接收的效果。
3.Redis篇
3.1.Redis与Memcache的区别?
redis支持更丰富的数据类型
(支持更复杂的应用场景):Redis不仅仅支持简单的k/v类型的数据,同时还提供list,set,zset,hash等数据结构的存储。memcache支持简单的数据类型,String。
Redis支持数据的持久化
,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用,而Memecache把数据全部存在内存之中。
集群模式
:memcached没有原生的集群模式,需要依靠客户端来实现往集群中分片写入数据;但是
redis 目前是原生支持 cluster 模式的.
Redis使用单线程
:Memcached是多线程,非阻塞IO复用的网络模型;Redis使用单线程的多路
IO 复用模型。
1574821356723
3.2.Redis的单线程问题
面试官 :Redis采用单线程,如何保证高并发?
面试话术 :
Redis快的主要原因是:
完全基于内存
数据结构简单,对数据操作也简单
使用多路 I/O 复用模型,充分利用CPU资源
面试官 :这样做的好处是什么?
面试话术 :
单线程优势有下面几点:
代码更清晰,处理逻辑更简单
不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为锁而导致的性能消耗
不存在多进程或者多线程导致的CPU切换,充分利用CPU资源
3.2.Redis的持久化方案由哪些?
相关资料:
1)RDB 持久化
RDB持久化可以使用save或bgsave,为了不阻塞主进程业务,一般都使用bgsave,流程:
Redis 进程会 fork 出一个子进程(与父进程内存数据一致)。
父进程继续处理客户端请求命令
由子进程将内存中的所有数据写入到一个临时的 RDB 文件中。
完成写入操作之后,旧的 RDB 文件会被新的 RDB 文件替换掉。
下面是一些和 RDB 持久化相关的配置:
save 60 10000
:如果在 60 秒内有 10000 个 key
发生改变,那就执行 RDB 持久化。
stop-writes-on-bgsave-error yes
:如果 Redis 执行 RDB
持久化失败(常见于操作系统内存不足),那么 Redis 将不再接受 client
写入数据的请求。
rdbcompression yes
:当生成 RDB
文件时,同时进行压缩。
dbfilename dump.rdb
:将 RDB 文件命名为 dump.rdb。
dir /var/lib/redis
:将 RDB
文件保存在/var/lib/redis
目录下。
当然在实践中,我们通常会将stop-writes-on-bgsave-error
设置为false
,同时让监控系统在
Redis 执行 RDB 持久化失败时发送告警,以便人工介入解决,而不是粗暴地拒绝
client 的写入请求。
RDB持久化的优点:
RDB持久化文件小,Redis数据恢复时速度快
子进程不影响父进程,父进程可以持续处理客户端命令
子进程fork时采用copy-on-write方式,大多数情况下,没有太多的内存消耗,效率比较好。
RDB 持久化的缺点:
子进程fork时采用copy-on-write方式,如果Redis此时写操作较多,可能导致额外的内存占用,甚至内存溢出
RDB文件压缩会减小文件体积,但通过时会对CPU有额外的消耗
如果业务场景很看重数据的持久性 (durability),那么不应该采用 RDB
持久化。譬如说,如果 Redis 每 5 分钟执行一次 RDB 持久化,要是 Redis
意外奔溃了,那么最多会丢失 5 分钟的数据。
2)AOF 持久化
可以使用appendonly yes
配置项来开启 AOF 持久化。Redis
执行 AOF 持久化时,会将接收到的写命令追加到 AOF 文件的末尾,因此 Redis
只要对 AOF 文件中的命令进行回放,就可以将数据库还原到原先的状态。 与
RDB 持久化相比,AOF 持久化的一个明显优势就是,它可以提高数据的持久性
(durability)。因为在 AOF 模式下,Redis 每次接收到 client
的写命令,就会将命令write()
到 AOF 文件末尾。 然而,在
Linux
中,将数据write()
到文件后,数据并不会立即刷新到磁盘,而会先暂存在
OS 的文件系统缓冲区。在合适的时机,OS
才会将缓冲区的数据刷新到磁盘(如果需要将文件内容刷新到磁盘,可以调用fsync()
或fdatasync()
)。
通过appendfsync
配置项,可以控制 Redis
将命令同步到磁盘的频率:
always
:每次 Redis 将命令write()
到 AOF
文件时,都会调用fsync()
,将命令刷新到磁盘。这可以保证最好的数据持久性,但却会给系统带来极大的开销。
no
:Redis 只将命令write()
到 AOF
文件。这会让 OS 决定何时将命令刷新到磁盘。
everysec
:除了将命令write()
到 AOF
文件,Redis
还会每秒执行一次fsync()
。在实践中,推荐使用这种设置,一定程度上可以保证数据持久性,又不会明显降低
Redis 性能。
然而,AOF 持久化并不是没有缺点的:Redis
会不断将接收到的写命令追加到 AOF 文件中,导致 AOF 文件越来越大。过大的
AOF 文件会消耗磁盘空间,并且导致 Redis
重启时更加缓慢。为了解决这个问题,在适当情况下,Redis 会对 AOF
文件进行重写,去除文件中冗余的命令,以减小 AOF 文件的体积。在重写 AOF
文件期间, Redis 会启动一个子进程,由子进程负责对 AOF 文件进行重写。
可以通过下面两个配置项,控制 Redis 重写 AOF 文件的频率:
auto-aof-rewrite-min-size 64mb
auto-aof-rewrite-percentage 100
上面两个配置的作用:当 AOF 文件的体积大于 64MB,并且 AOF
文件的体积比上一次重写之后的体积大了至少一倍,那么 Redis 就会执行 AOF
重写。
优点:
持久化频率高,数据可靠性高
没有额外的内存或CPU消耗
缺点:
面试话术:
Redis 提供了两种数据持久化的方式,一种是 RDB,另一种是
AOF。默认情况下,Redis 使用的是 RDB 持久化。
RDB持久化文件体积较小,但是保存数据的频率一般较低,可靠性差,容易丢失数据。另外RDB写数据时会采用Fork函数拷贝主进程,可能有额外的内存消耗,文件压缩也会有额外的CPU消耗。
ROF持久化可以做到每秒钟持久化一次,可靠性高。但是持久化文件体积较大,导致数据恢复时读取文件时间较长,效率略低
3.3.Redis的集群方式有哪些?
面试话术:
Redis集群可以分为主从集群 和分片集群 两类。
主从集群 一般一主多从,主库用来写数据,从库用来读数据。结合哨兵,可以再主库宕机时从新选主,目的是保证Redis的高可用 。
分片集群 是数据分片,我们会让多个Redis节点组成集群,并将16383个插槽分到不同的节点上。存储数据时利用对key做hash运算,得到插槽值后存储到对应的节点即可。因为存储数据面向的是插槽而非节点本身,因此可以做到集群动态伸缩。目的是让Redis能存储更多数据。
1)主从集群
主从集群,也是读写分离集群。一般都是一主多从方式。
Redis 的复制(replication)功能允许用户根据一个 Redis
服务器来创建任意多个该服务器的复制品,其中被复制的服务器为主服务器(master),而通过复制创建出来的服务器复制品则为从服务器(slave)。
只要主从服务器之间的网络连接正常,主从服务器两者会具有相同的数据,主服务器就会一直将发生在自己身上的数据更新同步
给从服务器,从而一直保证主从服务器的数据相同。
写数据时只能通过主节点完成
读数据可以从任何节点完成
如果配置了哨兵节点
,当master宕机时,哨兵会从salve节点选出一个新的主。
主从集群分两种:
带有哨兵的集群:
2)分片集群
主从集群中,每个节点都要保存所有信息,容易形成木桶效应。并且当数据量较大时,单个机器无法满足需求。此时我们就要使用分片集群了。
集群特征:
每个节点都保存不同数据
所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽.
节点的fail是通过集群中超过半数的节点检测失效时才生效.
客户端与redis节点直连,不需要中间proxy层连接集群中任何一个可用节点都可以访问到数据
redis-cluster把所有的物理节点映射到[0-16383]slot(插槽)上,实现动态伸缩
为了保证Redis中每个节点的高可用,我们还可以给每个节点创建replication(slave节点),如图:
出现故障时,主从可以及时切换:
3.4.Redis的常用数据类型有哪些?
支持多种类型的数据结构,主要区别是value存储的数据格式不同:
3.5.聊一下Redis事务机制
相关资料:
参考:http://redisdoc.com/topic/transaction.html
Redis事务功能是通过MULTI、EXEC、DISCARD和WATCH
四个原语实现的。Redis会将一个事务中的所有命令序列化,然后按顺序执行。但是Redis事务不支持回滚操作,命令运行出错后,正确的命令会继续执行。
MULTI
: 用于开启一个事务,它总是返回OK。
MULTI执行之后,客户端可以继续向服务器发送任意多条命令,这些命令不会立即被执行,而是被放到一个待执行命令队列 中
EXEC
:按顺序执行命令队列内的所有命令。返回所有命令的返回值。事务执行过程中,Redis不会执行其它事务的命令。
DISCARD
:清空命令队列,并放弃执行事务,
并且客户端会从事务状态中退出
WATCH
:Redis的乐观锁机制,利用compare-and-set(CAS)原理,可以监控一个或多个键,一旦其中有一个键被修改,之后的事务就不会执行
使用事务时可能会遇上以下两种错误:
执行 EXEC
之前,入队的命令可能会出错。比如说,命令可能会产生语法错误(参数数量错误,参数名错误,等等),或者其他更严重的错误,比如内存不足(如果服务器使用
maxmemory
设置了最大内存限制的话)。
Redis 2.6.5
开始,服务器会对命令入队失败的情况进行记录,并在客户端调用 EXEC
命令时,拒绝执行并自动放弃这个事务。
命令可能在 EXEC
调用之后失败。举个例子,事务中的命令可能处理了错误类型的键,比如将列表命令用在了字符串键上面,诸如此类。
即使事务中有某个/某些命令在执行时产生了错误,
事务中的其他命令仍然会继续执行,不会回滚。
为什么 Redis 不支持回滚(roll back)?
以下是这种做法的优点:
Redis
命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误 造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。
鉴于没有任何机制能避免程序员自己造成的错误,
并且这类错误通常不会在生产环境中出现, 所以 Redis
选择了更简单、更快速的无回滚方式来处理事务。
面试话术:
Redis事务其实是把一系列Redis命令放入队列,然后批量执行,执行过程中不会有其它事务来打断。不过与关系型数据库的事务不同,Redis事务不支持回滚操作,事务中某个命令执行失败,其它命令依然会执行。
为了弥补不能回滚的问题,Redis会在事务入队时就检查命令,如果命令异常则会放弃整个事务。
因此,只要程序员编程是正确的,理论上说Redis会正确执行所有事务,无需回滚。
面试官:如果事务执行一半的时候Redis宕机怎么办?
Redis有持久化机制,因为可靠性问题,我们一般使用AOF持久化。事务的所有命令也会写入AOF文件,但是如果在执行EXEC命令之前,Redis已经宕机,则AOF文件中事务不完整。使用
redis-check-aof
程序可以移除 AOF
文件中不完整事务的信息,确保服务器可以顺利启动。
3.6.Redis的Key过期策略
参考资料:
为什么需要内存回收?
1、在Redis中,set指令可以指定key的过期时间,当过期时间到达以后,key就失效了;
2、Redis是基于内存操作的,所有的数据都是保存在内存中,一台机器的内存是有限且很宝贵的。
基于以上两点,为了保证Redis能继续提供可靠的服务,Redis需要一种机制清理掉不常用的、无效的、多余的数据,失效后的数据需要及时清理,这就需要内存回收了。
Redis的内存回收主要分为过期删除策略和内存淘汰策略两部分。
过期删除策略
删除达到过期时间的key。
对于每一个设置了过期时间的key都会创建一个定时器,一旦到达过期时间就立即删除。该策略可以立即清除过期的数据,对内存较友好,但是缺点是占用了大量的CPU资源去处理过期的数据,会影响Redis的吞吐量和响应时间。
当访问一个key时,才判断该key是否过期,过期则删除。该策略能最大限度地节省CPU资源,但是对内存却十分不友好。有一种极端的情况是可能出现大量的过期key没有被再次访问,因此不会被清除,导致占用了大量的内存。
在计算机科学中,懒惰删除(英文:lazy
deletion)指的是从一个散列表(也称哈希表)中删除元素的一种方法。在这个方法中,删除仅仅是指标记一个元素被删除,而不是整个清除它。被删除的位点在插入时被当作空元素,在搜索之时被当作已占据。
每隔一段时间,扫描Redis中过期key字典,并清除部分过期的key。该策略是前两者的一个折中方案,还可以通过调整定时扫描的时间间隔和每次扫描的限定耗时,在不同情况下使得CPU和内存资源达到最优的平衡效果。
在Redis中,同时使用了定期删除和惰性删除
。不过Redis定期删除采用的是随机抽取的方式删除部分Key,因此不能保证过期key
100%的删除。
Redis结合了定期删除和惰性删除,基本上能很好的处理过期数据的清理,但是实际上还是有点问题的,如果过期key较多,定期删除漏掉了一部分,而且也没有及时去查,即没有走惰性删除,那么就会有大量的过期key堆积在内存中,导致redis内存耗尽,当内存耗尽之后,有新的key到来会发生什么事呢?是直接抛弃还是其他措施呢?有什么办法可以接受更多的key?
内存淘汰策略
Redis的内存淘汰策略,是指内存达到maxmemory极限时,使用某种算法来决定清理掉哪些数据,以保证新数据的存入。
Redis的内存淘汰机制包括:
noeviction: 当内存不足以容纳新写入数据时,新写入操作会报错。
allkeys-lru:当内存不足以容纳新写入数据时,在键空间(server.db[i].dict
)中,移除最近最少使用的
key(这个是最常用的)。
allkeys-random:当内存不足以容纳新写入数据时,在键空间(server.db[i].dict
)中,随机移除某个
key。
volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires
)中,移除最近最少使用的
key。
volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires
)中,随机移除某个
key。
volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires
)中,有更早过期时间的
key 优先移除。
在配置文件中,通过maxmemory-policy可以配置要使用哪一个淘汰机制。
什么时候会进行淘汰?
Redis会在每一次处理命令的时候(processCommand函数调用freeMemoryIfNeeded)判断当前redis是否达到了内存的最大限制,如果达到限制,则使用对应的算法去处理需要删除的key。
在淘汰key时,Redis默认最常用的是LRU算法(Latest Recently
Used)。Redis通过在每一个redisObject保存lru属性来保存key最近的访问时间,在实现LRU算法时直接读取key的lru属性。
具体实现时,Redis遍历每一个db,从每一个db中随机抽取一批样本key,默认是3个key,再从这3个key中,删除最近最少使用的key。
面试话术:
Redis过期策略包含定期删除和惰性删除两部分。定期删除是在Redis内部有一个定时任务,会定期删除一些过期的key。惰性删除是当用户查询某个Key时,会检查这个Key是否已经过期,如果没过期则返回用户,如果过期则删除。
但是这两个策略都无法保证过期key一定删除,漏网之鱼越来越多,还可能导致内存溢出。当发生内存不足问题时,Redis还会做内存回收。内存回收采用LRU策略,就是最近最少使用。其原理就是记录每个Key的最近使用时间,内存回收时,随机抽取一些Key,比较其使用时间,把最老的几个删除。
Redis的逻辑是:最近使用过的,很可能再次被使用
3.7.Redis在项目中的哪些地方有用到?
(1)共享session
在分布式系统下,服务会部署在不同的tomcat,因此多个tomcat的session无法共享,以前存储在session中的数据无法实现共享,可以用redis代替session,解决分布式系统间数据共享问题。
(2)数据缓存
Redis采用内存存储,读写效率较高。我们可以把数据库的访问频率高的热点数据存储到redis中,这样用户请求时优先从redis中读取,减少数据库压力,提高并发能力。
(3)异步队列
Reids在内存存储引擎领域的一大优点是提供 list 和 set
操作,这使得Redis能作为一个很好的消息队列平台来使用。而且Redis中还有pub/sub这样的专用结构,用于1对N的消息通信模式。
(4)分布式锁
Redis中的乐观锁机制,可以帮助我们实现分布式锁的效果,用于解决分布式系统下的多线程安全问题
3.8.Redis的缓存击穿、缓存雪崩、缓存穿透
1)缓存穿透
参考资料:
什么是缓存穿透
正常情况下,我们去查询数据都是存在。那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。这种查询不存在数据的现象我们称为缓存穿透 。
穿透带来的问题
试想一下,如果有黑客会对你的系统进行攻击,拿一个不存在的id
去查询数据,会产生大量的请求到数据库去查询。可能会导致你的数据库由于压力过大而宕掉。
解决办法
缓存空值:之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。那么我们就可以为这些key对应的值设置为null
丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null
。这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间。
BloomFilter(布隆过滤):将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被
这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。在缓存之前在加一层
BloomFilter ,在查询的时候先去 BloomFilter 去查询 key
是否存在,如果不存在就直接返回,存在再走查缓存 -> 查 DB。
话术:
缓存穿透有两种解决方案:其一 是把不存在的key设置null值到缓存中。其二 是使用布隆过滤器,在查询缓存前先通过布隆过滤器判断key是否存在,存在再去查询缓存。
设置null值可能被恶意针对,攻击者使用大量不存在的不重复key
,那么方案一就会缓存大量不存在key数据。此时我们还可以对Key规定格式模板,然后对不存在的key做正则规范 匹配,如果完全不符合就不用存null值到redis,而是直接返回错误。
2)缓存击穿
相关资料 :
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
当这个key在失效的瞬间,redis查询失败,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
解决方案:
使用互斥锁(mutex
key):mutex,就是互斥。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load
db,而是先使用Redis的SETNX去set一个互斥key,当操作返回成功时,再进行load
db的操作并回设缓存;否则,就重试整个get缓存的方法。SETNX,是「SET if Not
eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现互斥的效果。
软过期:也就是逻辑过期,不使用redis提供的过期时间,而是业务层在数据中存储过期时间信息。查询时由业务程序判断是否过期,如果数据即将过期时,将缓存的时效延长,程序可以派遣一个线程去数据库中获取最新的数据,其他线程这时看到延长了的过期时间,就会继续使用旧数据,等派遣的线程获取最新数据后再更新缓存。
推荐使用互斥锁,因为软过期会有业务逻辑侵入和额外的判断。
面试话术 :
缓存击穿主要担心的是某个Key过期,更新缓存时引起对数据库的突发高并发访问。因此我们可以在更新缓存时采用互斥锁控制,只允许一个线程去更新缓存,其它线程等待并重新读取缓存。例如Redis的setnx命令就能实现互斥效果。
3)缓存雪崩
相关资料 :
缓存雪崩,是指在某一个时间段,缓存集中过期失效。对这批数据的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。
解决方案:
数据分类分批处理:采取不同分类数据,缓存不同周期
相同分类数据:采用固定时长加随机数方式设置缓存
热点数据缓存时间长一些,冷门数据缓存时间短一些
避免redis节点宕机引起雪崩,搭建主从集群,保证高可用
面试话术:
解决缓存雪崩问题的关键是让缓存Key的过期时间分散。因此我们可以把数据按照业务分类,然后设置不同过期时间。相同业务类型的key,设置固定时长加随机数。尽可能保证每个Key的过期时间都不相同。
另外,Redis宕机也可能导致缓存雪崩,因此我们还要搭建Redis主从集群及哨兵监控,保证Redis的高可用。
3.9.缓存冷热数据分离
背景资料 :
Redis使用的是内存存储,当需要海量数据存储时,成本非常高。
经过调研发现,当前主流DDR3内存和主流SATA
SSD的单位成本价格差距大概在20倍左右,为了优化redis机器综合成本,我们考虑实现基于热度统计
的数据分级存储 及数据在RAM/FLASH之间的动态交换,从而大幅度降低成本,达到性能与成本的高平衡。
基本思路:基于key访问次数(LFU)的热度统计算法识别出热点数据,并将热点数据保留在redis中,对于无访问/访问次数少的数据则转存到SSD上,如果SSD上的key再次变热,则重新将其加载到redis内存中。
目前流行的高性能磁盘存储,并且遵循Redis协议的方案包括:
SSDB:http://ssdb.io/zh_cn/
RocksDB:https://rocksdb.org.cn/
因此,我们就需要在应用程序与缓存服务之间引入代理,实现Redis和SSD之间的切换,如图:
这样的代理方案阿里云提供的就有。当然也有一些开源方案,例如:https://github.com/JingchengLi/swapdb
3.10.Redis实现分布式锁
分布式锁要满足的条件:
多进程互斥:同一时刻,只有一个进程可以获取锁
保证锁可以释放:任务结束或出现异常,锁一定要释放,避免死锁
阻塞锁(可选):获取锁失败时可否重试
重入锁(可选):获取锁的代码递归调用时,依然可以获取锁
1)最基本的分布式锁:
利用Redis的setnx命令,这个命令的特征时如果多次执行,只有第一次执行会成功,可以实现互斥
的效果。但是为了保证服务宕机时也可以释放锁,需要利用expire命令给锁设置一个有效期
1 2 setnx lock thread-01 # 尝试获取锁 expire lock 10 # 设置有效期
面试官问题1 :如果expire之前服务宕机怎么办?
要保证setnx和expire命令的原子性。redis的set命令可以满足:
1 set key value [NX] [EX time]
需要添加nx和ex的选项:
NX:与setnx一致,第一次执行成功
EX:设置过期时间
面试官问题2 :释放锁的时候,如果自己的锁已经过期了,此时会出现安全漏洞,如何解决?
在锁中存储当前进程和线程标识,释放锁时对锁的标识判断,如果是自己的则删除,不是则放弃操作。
但是这两步操作要保证原子性,需要通过Lua脚本来实现。
1 2 3 if redis.call("get",KEYS[1]) == ARGV[1] then redis.call("del",KEYS[1]) end
2)可重入分布式锁
如果有重入的需求,则除了在锁中记录进程标识,还要记录重试次数,流程如下:
下面我们假设锁的key为“lock
”,hashKey是当前线程的id:“threadId
”,锁自动释放时间假设为20
获取锁的步骤:
1、判断lock是否存在 EXISTS lock
存在,说明有人获取锁了,下面判断是不是自己的锁
判断当前线程id作为hashKey是否存在:HEXISTS lock threadId
不存在,说明锁已经有了,且不是自己获取的,锁获取失败,end
存在,说明是自己获取的锁,重入次数+1:HINCRBY lock threadId 1
,去到步骤3
2、不存在,说明可以获取锁,HSET key threadId 1
3、设置锁自动释放时间,EXPIRE lock 20
释放锁的步骤:
1、判断当前线程id作为hashKey是否存在:HEXISTS lock threadId
不存在,说明锁已经失效,不用管了
存在,说明锁还在,重入次数减1:HINCRBY lock threadId -1
,获取新的重入次数
2、判断重入次数是否为0:
为0,说明锁全部释放,删除key:DEL lock
大于0,说明锁还在使用,重置有效时间:EXPIRE lock 20
对应的Lua脚本如下:
首先是获取锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('exists' , key) == 0 ) then redis.call('hset' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;if (redis.call('hexists' , key, threadId) == 1 ) then redis.call('hincrby' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;return 0 ;
然后是释放锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('HEXISTS' , key, threadId) == 0 ) then return nil ; end ;local count = redis.call('HINCRBY' , key, threadId, -1 ); if (count > 0 ) then redis.call('EXPIRE' , key, releaseTime); return nil ; else redis.call('DEL' , key); return nil ; end ;
3)高可用的锁
面试官问题
:redis分布式锁依赖与redis,如果redis宕机则锁失效。如何解决?
此时大多数同学会回答说:搭建主从集群,做数据备份。
这样就进入了陷阱,因为面试官的下一个问题就来了:
面试官问题
:如果搭建主从集群做数据备份时,进程A获取锁,master还没有把数据备份到slave,master宕机,slave升级为master,此时原来锁失效,其它进程也可以获取锁,出现安全问题。如何解决?
关于这个问题,Redis官网给出了解决方案,使用RedLock思路可以解决:
在Redis的分布式环境中,我们假设有N个Redis
master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis
master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
获取当前Unix时间,以毫秒为单位。
依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有 在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
3.11.如何实现数据库与缓存数据一致?
面试话术:
实现方案有下面几种:
本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A,服务A修改Redis缓存数据
通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据
Nacos源码分析
1.下载Nacos源码并运行
要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行。
1.1.下载Nacos源码
Nacos的GitHub地址:https://github.com/alibaba/nacos
课前资料中已经提供了下载好的1.4.2版本的Nacos源码:
如果需要研究其他版本的同学,也可以自行下载:
大家找到其release页面:https://github.com/alibaba/nacos/tags,找到其中的1.4.2.版本:
点击进入后,下载Source code(zip):
1.2.导入Demo工程
我们的课前资料提供了一个微服务Demo,包含了服务注册、发现等业务。
导入该项目后,查看其项目结构:
结构说明:
cloud-source-demo:项目父目录
cloud-demo:微服务的父工程,管理微服务依赖
order-service:订单微服务,业务中需要访问user-service,是一个服务消费者
user-service:用户微服务,对外暴露根据id查询用户的接口,是一个服务提供者
1.3.导入Nacos源码
将之前下载好的Nacos源码解压到cloud-source-demo项目目录中:
然后,使用IDEA将其作为一个module来导入:
1)选择项目结构选项:
然后点击导入module:
在弹出窗口中,选择nacos源码目录:
然后选择maven模块,finish:
最后,点击OK即可:
导入后的项目结构:
1.4.proto编译
Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化。并将对应的proto文件定义在了consistency这个子模块中:
我们需要先将proto文件编译为对应的Java代码。
1.4.1.什么是protobuf
protobuf的全称是Protocol
Buffer,是Google提供的一种数据序列化协议,这是Google官方的定义:
Protocol Buffers
是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或
RPC
数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
可以简单理解为,是一种跨语言、跨平台的数据传输格式。与json的功能类似,但是无论是性能,还是数据大小都比json要好很多。
protobuf的之所以可以跨语言,就是因为数据定义的格式为.proto
格式,需要基于protoc编译为对应的语言。
1.4.2.安装protoc
Protobuf的GitHub地址:https://github.com/protocolbuffers/protobuf/releases
我们可以下载windows版本的来使用:
另外,课前资料也提供了下载好的安装包:
解压到任意非中文目录下,其中的bin目录中的protoc.exe可以帮助我们编译:
然后将这个bin目录配置到你的环境变量path中,可以参考JDK的配置方式:
1.4.3.编译proto
进入nacos-1.4.2的consistency模块下的src/main目录下:
然后打开cmd窗口,运行下面的两个命令:
1 2 protoc --java_out =./java ./proto/consistency.proto protoc --java_out =./java ./proto/Data.proto
如图:
会在nacos的consistency模块中编译出这些java代码:
1.5.运行
nacos服务端的入口是在console模块中的Nacos类:
我们需要让它单机启动:
然后新建一个SpringBootApplication:
然后填写应用信息:
然后运行Nacos这个main函数:
将order-service和user-service服务启动后,可以查看nacos控制台:
2.服务注册
服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:
首先最外层是一个Map,结构为:Map<String, Map<String, Service>>
:
key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group
value:又是一个Map<String, Service>
,代表分组及组内的服务。一个组内可以有多个服务
key:代表group分组,不过作为key时格式是group_name:service_name
value:分组下的某一个服务,例如userservice,用户服务。类型为Service
,内部也包含一个Map<String,Cluster>
,一个服务下可以有多个集群
key:集群名称
value:Cluster
类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>
,就是该集群下的实例的集合
Instance:实例信息,包含实例的IP、Port、健康状态、权重等等信息
每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。
2.1.服务注册接口
Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。
接口说明: 注册一个实例到Nacos服务。
请求类型 :POST
请求路径 :/nacos/v1/ns/instance
请求参数 :
ip
字符串
是
服务实例IP
port
int
是
服务实例port
namespaceId
字符串
否
命名空间ID
weight
double
否
权重
enabled
boolean
否
是否上线
healthy
boolean
否
是否健康
metadata
字符串
否
扩展信息
clusterName
字符串
否
集群名
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
ephemeral
boolean
否
是否临时实例
错误编码 :
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
2.2.客户端
首先,我们需要找到服务注册的入口。
2.2.1.NacosServiceRegistryAutoConfiguration
因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:
spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar
这个包中找到Nacos自动装配信息:
可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。
可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:
2.2.2.NacosAutoServiceRegistration
NacosAutoServiceRegistration
源码如图:
可以看到在初始化时,其父类AbstractAutoServiceRegistration
也被初始化了。
AbstractAutoServiceRegistration
如图:
可以看到它实现了ApplicationListener
接口,监听Spring容器启动过程中的事件。
在监听到WebServerInitializedEvent
(web服务初始化完成)的事件后,执行了bind
方法。
其中的bind方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void bind (WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management" .equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return ; } } this .port.compareAndSet(0 , event.getWebServer().getPort()); this .start(); }
其中的start方法流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void start () { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting" ); } return ; } if (!this .running.get()) { this .context.publishEvent( new InstancePreRegisteredEvent (this , getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this .context.publishEvent( new InstanceRegisteredEvent <>(this , getConfiguration())); this .running.compareAndSet(false , true ); } }
其中最关键的register()方法就是完成服务注册的关键,代码如下:
1 2 3 protected void register () { this .serviceRegistry.register(getRegistration()); }
此处的this.serviceRegistry就是NacosServiceRegistry:
2.2.3.NacosServiceRegistry
NacosServiceRegistry
是Spring的ServiceRegistry
接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。
而NacosServiceRegistry
对register
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override public void register (Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..." ); return ; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished" , group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{}," , serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{}," , serviceId, registration.toString(), e); } } }
可以看到方法中最终是调用NamingService的registerInstance方法实现注册的。
而NamingService接口的默认实现就是NacosNamingService。
2.2.4.NacosNamingService
NacosNamingService提供了服务注册、订阅等功能。
其中registerInstance就是注册服务实例,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void registerInstance (String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
最终,由NacosProxy的registerService方法,完成服务注册。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void registerService (String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}" , namespaceId, serviceName, instance); final Map<String, String> params = new HashMap <String, String>(16 ); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip" , instance.getIp()); params.put("port" , String.valueOf(instance.getPort())); params.put("weight" , String.valueOf(instance.getWeight())); params.put("enable" , String.valueOf(instance.isEnabled())); params.put("healthy" , String.valueOf(instance.isHealthy())); params.put("ephemeral" , String.valueOf(instance.isEphemeral())); params.put("metadata" , JacksonUtils.toJson(instance.getMetadata())); reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:
namespace_id:环境
service_name:服务名称
group_name:组名称
cluster_name:集群名称
ip: 当前实例的ip地址
port: 当前实例的端口
而在NacosNamingService的registerInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。
2.2.5.客户端注册的流程图
如图:
2.3.服务端
在nacos-console的模块中,会引入nacos-naming这个模块:
模块结构如下:
其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController
类中:
2.3.1.InstanceController
进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register (HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok" ; }
这里,进入到了serviceManager.registerInstance()方法中。
2.3.2.ServiceManager
ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:
而其中的registerInstance方法就是注册服务实例的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void registerInstance (String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null ) { throw new NacosException (NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
创建好了服务,接下来就要添加实例到服务中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void addInstance (String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances (); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } }
该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:
1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
2)然后将更新后的数据封装到Instances
对象中,后面更新注册表时使用
3)最后,调用consistencyService.put()
方法完成Nacos集群的数据同步,保证集群一致性。
注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。
1)更服务列表
我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);
:
1 2 3 private List<Instance> addIpAddresses (Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); }
继续进入updateIpAddresses
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public List<Instance> updateIpAddresses (Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap <>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap <>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster (instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration." , instance.getClusterName(), instance.toJson()); } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null ) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException ( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } return new ArrayList <>(instanceMap.values()); }
简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID。然后返回最新的实例列表。
2)Nacos集群一致性
在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:
consistencyService.put(key, instances);
这里的ConsistencyService接口,代表集群一致性的接口,有很多中不同实现:
我们进入DelegateConsistencyServiceImpl来看:
1 2 3 4 5 @Override public void put (String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); }
其中的mapConsistencyService(key)
方法就是选择委托方式的:
1 2 3 4 5 6 private ConsistencyService mapConsistencyService (String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
默认情况下,所有实例都是临时实例,我们关注DistroConsistencyServiceImpl即可。
2.3.4.DistroConsistencyServiceImpl
我们来看临时实例的一致性实现:DistroConsistencyServiceImpl类的put方法:
1 2 3 4 5 6 7 public void put (String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey (key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2 ); }
这里方法只有两行:
onPut(key, value)
:其中value就是Instances,要更新的服务信息。这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)
distroProtocol.sync()
:就是通过Distro协议将数据同步给集群中的其它Nacos节点
我们先看onPut方法
2.3.4.1.更新本地实例列表
1)放入阻塞队列
onPut方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void onPut (String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum <>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return ; } notifier.addTask(key, DataOperation.CHANGE); }
notifier的类型就是DistroConsistencyServiceImpl.Notifier
,内部维护了一个阻塞队列,存放服务列表变更的事件:
addTask时,将任务加入该阻塞队列:
1 2 3 4 5 6 7 8 9 10 11 12 public void addTask (String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return ; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); }
2)Notifier异步更新
同时,notifier还是一个Runnable,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新。来看下其中的run方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void run () { Loggers.DISTRO.info("distro notifier started" ); for (; ; ) { try { Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task" , e); } } }
来看看handle方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private void handle (Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0 ; if (!listeners.containsKey(datumKey)) { return ; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue ; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue ; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}" , datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}" , datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task" , e); } }
3)覆盖实例列表
而在Service的onChange方法中,就可以看到更新实例列表的逻辑了:
1 2 3 4 5 6 7 8 9 10 @Override public void onChange (String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}" , key, value); updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
updateIPs方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public void updateIPs (Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap <>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList <>()); } for (Instance instance : instances) { try { if (instance == null ) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null" ); continue ; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration." , instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster (instance.getClusterName(), this ); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null ) { clusterIPs = new LinkedList <>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { List<Instance> entryIPs = entry.getValue(); clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this ); StringBuilder stringBuilder = new StringBuilder (); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_" ).append(instance.isHealthy()).append("," ); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}" , getNamespaceId(), getName(), stringBuilder.toString()); }
在第45行的代码中:clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
就是在更新注册表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void updateIps (List<Instance> ips, boolean ephemeral) { Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap <>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0 ) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}" , getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0 ) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}" , getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet <>(ips); if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } }
2.3.4.2.集群数据同步
在DistroConsistencyServiceImpl的put方法中分为两步:
其中的onPut方法已经分析过了。
下面的distroProtocol.sync()就是集群同步的逻辑了。
DistroProtocol类的sync方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void sync (DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey (distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask (distroKeyWithTarget, action, delay); distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}" , distroKey, each.getAddress()); } } }
其中同步的任务封装为一个DistroDelayTask
对象。
交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()
执行,这行代码的返回值是:
NacosDelayTaskExecuteEngine
,这个类维护了一个线程池,并且接收任务,执行任务。
执行任务的方法为processTasks()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected void processTasks () { Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue ; } NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue ; } try { if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。
2.3.5.服务端流程图
2.4.总结
3.服务心跳
Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:
1 2 3 4 5 6 7 8 spring: application: name: order-service cloud: nacos: discovery: ephemeral: false server-addr: 192.168 .150 .1 :8845
临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。
其中Nacos提供的心跳的API接口为:
接口描述 :发送某个实例的心跳
请求类型 :PUT
请求路径 :
1 /nacos/v1/ns/instance/beat
请求参数 :
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
ephemeral
boolean
否
是否临时实例
beat
JSON格式字符串
是
实例心跳内容
错误编码 :
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
3.1.客户端
在2.2.4.服务注册这一节中,我们说过NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void registerInstance (String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
3.1.1.BeatInfo
这里的BeanInfo就包含心跳需要的各种信息:
3.1.2.BeatReactor
而BeatReactor
这个类则维护了一个线程池:
当调用BeatReactor
的.addBeatInfo(groupedServiceName, beatInfo)
方法时,就会执行心跳:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void addBeatInfo (String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map." , beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null ; if ((existBeat = dom2Beat.remove(key)) != null ) { existBeat.setStopped(true ); } dom2Beat.put(key, beatInfo); executorService.schedule(new BeatTask (beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
心跳周期的默认值在com.alibaba.nacos.api.common.Constants
类中:
可以看到是5秒,默认5秒一次心跳。
3.1.3.BeatTask
心跳的任务封装在BeatTask
这个类中,是一个Runnable,其run方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public void run () { if (beatInfo.isStopped()) { return ; } long nextTime = beatInfo.getPeriod(); try { JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this .lightBeatEnabled); long interval = result.get("clientBeatInterval" ).asLong(); boolean lightBeatEnabled = false ; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this .lightBeatEnabled = lightBeatEnabled; if (interval > 0 ) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance (); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true ); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}" , JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } catch (Exception unknownEx) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}" , JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx); } finally { executorService.schedule(new BeatTask (beatInfo), nextTime, TimeUnit.MILLISECONDS); } }
3.1.5.发送心跳
最终心跳的发送还是通过NamingProxy
的sendBeat
方法来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public JsonNode sendBeat (BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}" , namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap <String, String>(8 ); Map<String, String> bodyMap = new HashMap <String, String>(2 ); if (!lightBeatEnabled) { bodyMap.put("beat" , JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip" , beatInfo.getIp()); params.put("port" , String.valueOf(beatInfo.getPort())); String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat" , params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
3.2.服务端
对于临时实例,服务端代码分两部分:
1)InstanceController提供了一个接口,处理客户端的心跳请求
2)定时检测实例心跳是否按期执行
3.2.1.InstanceController
与服务注册时一样,在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 @CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat (HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat" , StringUtils.EMPTY); RsInfo clientBeat = null ; if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip" , StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port" , "0" )); if (clientBeat != null ) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}" , clientBeat, serviceName); Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); if (instance == null ) { if (clientBeat == null ) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}" , clientBeat, serviceName); instance = new Instance (); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); serviceManager.registerInstance(namespaceId, serviceName, instance); } Service service = serviceManager.getService(namespaceId, serviceName); if (service == null ) { throw new NacosException (NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } if (clientBeat == null ) { clientBeat = new RsInfo (); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } service.processClientBeat(clientBeat); result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给Service类处理这次心跳请求。调用了Service的processClientBeat方法
3.2.2.处理心跳请求
查看Service
的service.processClientBeat(clientBeat);
方法:
1 2 3 4 5 6 public void processClientBeat (final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor (); clientBeatProcessor.setService(this ); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); }
可以看到心跳信息被封装到了
ClientBeatProcessor类中,交给了HealthCheckReactor处理,HealthCheckReactor就是对线程池的封装,不用过多查看。
关键的业务逻辑都在ClientBeatProcessor这个类中,它是一个Runnable,其中的run方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public void run () { Service service = this .service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}" , rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true ); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}" , rsInfo.toString()); } instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true ); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok" , cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } } }
处理心跳请求的核心就是更新心跳实例的最后一次心跳时间,lastBeat,这个会成为判断实例心跳是否过期的关键指标!
3.3.3.心跳异常检测
在服务注册时,一定会创建一个Service
对象,而Service
中有一个init
方法,会在注册时被调用:
1 2 3 4 5 6 7 8 public void init () { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this ); entry.getValue().init(); } }
其中HealthCheckReactor.scheduleCheck就是执行心跳检测的定时任务:
可以看到,该任务是5000ms执行一次,也就是5秒对实例的心跳状态做一次检测。
此处的ClientBeatCheckTask同样是一个Runnable,其中的run方法为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override public void run () { try { List<Instance> instances = service.allIPs(true ); for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false ); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent (this , instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return ; } for (Instance instance : instances) { if (instance.isMarked()) { continue ; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}" , service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out." , e); } }
其中的超时时间同样是在com.alibaba.nacos.api.common.Constants
这个类中:
3.3.4.主动健康检测
对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。
入口在2.3.2小节的ServiceManager
类中的registerInstance方法:
创建空服务时:
1 2 3 4 public void createEmptyService (String namespaceId, String serviceName, boolean local) throws NacosException { createServiceIfAbsent(namespaceId, serviceName, local, null ); }
创建服务流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void createServiceIfAbsent (String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null ) { Loggers.SRV_LOG.info("creating empty service {}:{}" , namespaceId, serviceName); service = new Service (); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null ) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
关键在putServiceAndInit(service)
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 private void putServiceAndInit (Service service) throws NacosException { putService(service); service = getService(service.getNamespaceId(), service.getName()); service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true ), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false ), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}" , service.toJson()); }
进入初始化逻辑:service.init()
,这个会进入Service类中:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void init () { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this ); entry.getValue().init(); } }
这里集群的初始化entry.getValue().init();
会进入Cluster
类型的init()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void init () { if (inited) { return ; } checkTask = new HealthCheckTask (this ); HealthCheckReactor.scheduleCheck(checkTask); inited = true ; }
这里的HealthCheckReactor.scheduleCheck(checkTask);
会开启定时任务,对非临时实例做健康检测。检测逻辑定义在HealthCheckTask
这个类中,是一个Runnable,其中的run方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void run () { try { if (distroMapper.responsible(cluster.getService().getName()) && switchDomain .isHealthCheckEnabled(cluster.getService().getName())) { healthCheckProcessor.process(this ); } } catch (Throwable e) { } finally { if (!cancelled) { HealthCheckReactor.scheduleCheck(this ); } } }
健康检测逻辑定义在healthCheckProcessor.process(this);
方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor
:
进入TcpSuperSenseProcessor
的process方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void process (HealthCheckTask task) { List<Instance> ips = task.getCluster().allIPs(false ); if (CollectionUtils.isEmpty(ips)) { return ; } for (Instance ip : ips) { Beat beat = new Beat (ip, task); taskQueue.add(beat); MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet(); } }
可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到Nacos中大量这样的设计。
而TcpSuperSenseProcessor
本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:
1 2 3 4 5 6 7 8 9 10 11 public void run () { while (true ) { try { processTask(); } catch (Throwable e) { SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task" , e); } } }
通过processTask来处理健康检测的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void processTask () throws Exception { Collection<Callable<Void>> tasks = new LinkedList <>(); do { Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2 , TimeUnit.MILLISECONDS); if (beat == null ) { return ; } tasks.add(new TaskProcessor (beat)); } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64 ); for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) { f.get(); } }
任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override public Void call () { long waited = System.currentTimeMillis() - beat.getStartTime(); if (waited > MAX_WAIT_TIME_MILLISECONDS) { Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms" ); } SocketChannel channel = null ; try { Instance instance = beat.getIp(); channel = SocketChannel.open(); channel.configureBlocking(false ); channel.socket().setSoLinger(false , -1 ); channel.socket().setReuseAddress(true ); channel.socket().setKeepAlive(true ); channel.socket().setTcpNoDelay(true ); Cluster cluster = beat.getTask().getCluster(); int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport(); channel.connect(new InetSocketAddress (instance.getIp(), port)); SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(beat); keyMap.put(beat.toString(), new BeatKey (key)); beat.setStartTime(System.currentTimeMillis()); GlobalExecutor .scheduleTcpSuperSenseTask(new TimeOutTask (key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { beat.finishCheck(false , false , switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); if (channel != null ) { try { channel.close(); } catch (Exception ignore) { } } } return null ; }
3.3.总结
Nacos的健康检测有两种模式:
临时实例:
采用客户端心跳检测模式,心跳周期5秒
心跳间隔超过15秒则标记为不健康
心跳间隔超过30秒则从服务列表删除
永久实例:
采用服务端主动健康检测方式
周期为2000 + 5000毫秒内的随机数
检测异常只会标记为不健康,不会删除
那么为什么Nacos有临时和永久两种实例呢?
以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例 比较合适。而对于服务的一些常备实例,则使用永久实例 更合适。
与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。
另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。
4.服务发现
Nacos提供了一个根据serviceId查询实例列表的接口:
接口描述 :查询服务下的实例列表
请求类型 :GET
请求路径 :
1 /nacos/v1/ns/instance/list
请求参数 :
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
namespaceId
字符串
否
命名空间ID
clusters
字符串,多个集群用逗号分隔
否
集群名称
healthyOnly
boolean
否,默认为false
是否只返回健康实例
错误编码 :
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
4.1.客户端
4.1.1.定时更新服务列表
4.1.1.1.NacosNamingService
在2.2.4小节中,我们讲到一个类NacosNamingService
,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。
多个重载的方法最终都会进入一个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public List<Instance> getAllInstances (String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, "," )); } else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, "," )); } List<Instance> list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList <Instance>(); } return list; }
4.1.1.2.HostReactor
进入1.1.订阅服务消息,这里是由HostReactor
类的getServiceInfo()
方法来实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public ServiceInfo getServiceInfo (final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo (serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object ()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0 ) { synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }
基本逻辑就是先从本地缓存读,根据结果来选择:
如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)
如果本地缓存有,则开启定时更新功能,并返回缓存结果:
scheduleUpdateIfAbsent(serviceName, clusters)
在UpdateTask中,最终还是调用updateService方法:
不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void updateService (String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false ); if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { if (oldService != null ) { synchronized (oldService) { oldService.notifyAll(); } } } }
4.1.1.3.ServerProxy
而ServerProxy的queryList方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public String queryList (String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap <String, String>(8 ); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters" , clusters); params.put("udpPort" , String.valueOf(udpPort)); params.put("clientIP" , NetUtils.localIP()); params.put("healthyOnly" , String.valueOf(healthyOnly)); return reqApi(UtilAndComs.nacosUrlBase + "/instance/list" , params, HttpMethod.GET); }
4.1.2.处理服务变更通知
除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。
在HostReactor类的构造函数中,有非常重要的几个步骤:
基本思路是:
通过PushReceiver监听服务端推送的变更数据
解析数据后,通过NotifyCenter发布服务变更的事件
InstanceChangeNotifier监听变更事件,完成对服务列表的更新
4.1.2.1.PushReceiver
我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。
先看构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public PushReceiver (HostReactor hostReactor) { try { this .hostReactor = hostReactor; String udpPort = getPushReceiverUdpPort(); if (StringUtils.isEmpty(udpPort)) { this .udpSocket = new DatagramSocket (); } else { this .udpSocket = new DatagramSocket (new InetSocketAddress (Integer.parseInt(udpPort))); } this .executorService = new ScheduledThreadPoolExecutor (1 , new ThreadFactory () { @Override public Thread newThread (Runnable r) { Thread thread = new Thread (r); thread.setDaemon(true ); thread.setName("com.alibaba.nacos.naming.push.receiver" ); return thread; } }); this .executorService.execute(this ); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed" , e); } }
PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void run () { while (!closed) { try { byte [] buffer = new byte [UDP_MSS]; DatagramPacket packet = new DatagramPacket (buffer, buffer.length); udpSocket.receive(packet); String json = new String (IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if ("dom" .equals(pushPacket.type) || "service" .equals(pushPacket.type)) { hostReactor.processServiceJson(pushPacket.data); } catch (Exception e) { if (closed) { return ; } NAMING_LOGGER.error("[NA] error while receiving push data" , e); } } }
4.1.2.2.HostReactor
通知数据的处理由交给了HostReactor
的processServiceJson
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public ServiceInfo processServiceJson (String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); String serviceKey = serviceInfo.getKey(); if (serviceKey == null ) { return null ; } ServiceInfo oldService = serviceInfoMap.get(serviceKey); boolean changed = false ; if (oldService != null ) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0 ) { NotifyCenter.publishEvent(new InstancesChangeEvent ( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } else { changed = true ; serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); NotifyCenter.publishEvent(new InstancesChangeEvent ( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } return serviceInfo; }
4.2.服务端
4.2.1.拉取服务列表接口
在2.3.1小节介绍的InstanceController中,提供了拉取服务列表的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public ObjectNode list (HttpServletRequest request) throws Exception { String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); String agent = WebUtils.getUserAgent(request); String clusters = WebUtils.optional(request, "clusters" , StringUtils.EMPTY); String clientIP = WebUtils.optional(request, "clientIP" , StringUtils.EMPTY); int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort" , "0" )); String env = WebUtils.optional(request, "env" , StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck" , "false" )); String app = WebUtils.optional(request, "app" , StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid" , StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly" , "false" )); return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly); }
进入doSrvIpxt()
方法来获取服务列表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public ObjectNode doSrvIpxt (String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo (agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); try { if (udpPort > 0 && pushService.canEnablePush(agent)) { pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress (clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}" , clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null ) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}" , serviceName); } result.put("name" , serviceName); result.put("clusters" , clusters); result.put("cacheMillis" , cacheMillis); result.replace("hosts" , JacksonUtils.createEmptyArrayNode()); return result; } result.replace("hosts" , hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0" )) >= 0 ) { result.put("dom" , serviceName); } else { result.put("dom" , NamingUtils.getServiceName(serviceName)); } result.put("name" , serviceName); result.put("cacheMillis" , cacheMillis); result.put("lastRefTime" , System.currentTimeMillis()); result.put("checksum" , service.getChecksum()); result.put("useSpecifiedURL" , false ); result.put("clusters" , clusters); result.put("env" , env); result.replace("metadata" , JacksonUtils.transferToJsonNode(service.getMetadata())); return result; }
4.2.2.发布服务变更的UDP通知
在上一节中,InstanceController
中的doSrvIpxt()
方法中,有这样一行代码:
1 2 3 pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress (clientIP, udpPort), pushDataSource, tid, app);
其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。
PushService类本身实现了ApplicationListener
接口:
这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)。
当服务列表变化时,就会通知我们:
4.3.总结
Nacos的服务发现分为两种模式:
模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。
模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。
与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。
Sentinel源码分析
1.Sentinel的基本概念
Sentinel实现限流、隔离、降级、熔断等功能,本质要做的就是两件事情:
统计数据:统计某个资源的访问数据(QPS、RT等信息)
规则判断:判断限流规则、隔离规则、降级规则、熔断规则是否满足
这里的资源 就是希望被Sentinel保护的业务,例如项目中定义的controller方法就是默认被Sentinel保护的资源。
1.1.ProcessorSlotChain
实现上述功能的核心骨架是一个叫做ProcessorSlotChain的类。这个类基于责任链模式来设计,将不同的功能(限流、降级、系统保护)封装为一个个的Slot,请求进入后逐个执行即可。
其工作流如图:
责任链中的Slot也分为两大类:
统计数据构建部分(statistic)
NodeSelectorSlot:负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树
ClusterBuilderSlot:负责构建某个资源的ClusterNode,ClusterNode可以保存资源的运行信息(响应时间、QPS、block
数目、线程数、异常数等)以及来源信息(origin名称)
StatisticSlot:负责统计实时调用数据,包括运行信息、来源信息等
规则判断部分(rule checking)
AuthoritySlot:负责授权规则(来源控制)
SystemSlot:负责系统保护规则
ParamFlowSlot:负责热点参数限流规则
FlowSlot:负责限流规则
DegradeSlot:负责降级规则
1.2.Node
Sentinel中的簇点链路是由一个个的Node组成的,Node是一个接口,包括下面的实现:
所有的节点都可以记录对资源的访问统计数据,所以都是StatisticNode的子类。
按照作用分为两类Node:
DefaultNode:代表链路树中的每一个资源,一个资源出现在不同链路中时,会创建不同的DefaultNode节点。而树的入口节点叫EntranceNode,是一种特殊的DefaultNode
ClusterNode:代表资源,一个资源不管出现在多少链路中,只会有一个ClusterNode。记录的是当前资源被访问的所有统计数据之和。
DefaultNode记录的是资源在当前链路中的访问数据,用来实现基于链路模式的限流规则。ClusterNode记录的是资源在所有链路中的访问数据,实现默认模式、关联模式的限流规则。
例如:我们在一个SpringMVC项目中,有两个业务:
业务1:controller中的资源/order/query
访问了service中的资源/goods
业务2:controller中的资源/order/save
访问了service中的资源/goods
创建的链路图如下:
1.3.Entry
默认情况下,Sentinel会将controller中的方法作为被保护资源,那么问题来了,我们该如何将自己的一段代码标记为一个Sentinel的资源呢?
Sentinel中的资源用Entry来表示。声明Entry的API示例:
1 2 3 4 5 6 7 8 try (Entry entry = SphU.entry("resourceName" )) { } catch (BlockException ex) { }
1.3.1.自定义资源
例如,我们在order-service服务中,将OrderService
的queryOrderById()
方法标记为一个资源。
1)首先在order-service中引入sentinel依赖
1 2 3 4 5 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency >
2)然后配置Sentinel地址
1 2 3 4 5 spring: cloud: sentinel: transport: dashboard: localhost:8089
3)修改OrderService类的queryOrderById方法
代码这样来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public Order queryOrderById (Long orderId) { try (Entry entry = SphU.entry("resource1" )) { Order order = Order.build(101L , 4999L , "小米 MIX4" , 1 , 1L , null ); User user = userClient.findById(order.getUserId()); order.setUser(user); return order; }catch (BlockException e){ log.error("被限流或降级" , e); return null ; } }
4)访问
打开浏览器,访问order服务:http://localhost:8080/order/101
然后打开sentinel控制台,查看簇点链路:
1.3.2.基于注解标记资源
在之前学习Sentinel的时候,我们知道可以通过给方法添加@SentinelResource注解的形式来标记资源。
这个是怎么实现的呢?
来看下我们引入的Sentinel依赖包:
其中的spring.factories声明需要就是自动装配的配置类,内容如下:
我们来看下SentinelAutoConfiguration
这个类:
可以看到,在这里声明了一个Bean,SentinelResourceAspect
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport { @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut () { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel (ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null ) { throw new IllegalStateException ("Wrong state for SentinelResource annotation" ); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null ; try { entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); Object result = pjp.proceed(); return result; } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable >[] exceptionsToIgnore = annotation.exceptionsToIgnore(); if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } throw ex; } finally { if (entry != null ) { entry.exit(1 , pjp.getArgs()); } } } }
简单来说,@SentinelResource注解就是一个标记 ,而Sentinel基于AOP思想,对被标记的方法做环绕增强,完成资源(Entry
)的创建。
1.4.Context
上一节,我们发现簇点链路中除了controller方法、service方法两个资源外,还多了一个默认的入口节点:
sentinel_spring_web_context,是一个EntranceNode类型的节点
这个节点是在初始化Context的时候由Sentinel帮我们创建的。
1.4.1.什么是Context
那么,什么是Context呢?
Context 代表调用链路上下文,贯穿一次调用链路中的所有资源(
Entry
),基于ThreadLocal。
Context 维持着入口节点(entranceNode
)、本次调用链路的
curNode(当前资源节点)、调用来源(origin
)等信息。
后续的Slot都可以通过Context拿到DefaultNode或者ClusterNode,从而获取统计数据,完成规则判断
Context初始化的过程中,会创建EntranceNode,contextName就是EntranceNode的名称
对应的API如下:
1 2 ContextUtil.enter("contextName" , "originName" );
1.4.2.Context的初始化
那么这个Context又是在何时完成初始化的呢?
1.4.2.1.自动装配
来看下我们引入的Sentinel依赖包:
其中的spring.factories声明需要就是自动装配的配置类,内容如下:
我们先看SentinelWebAutoConfiguration这个类:
这个类实现了WebMvcConfigurer,我们知道这个是SpringMVC自定义配置用到的类,可以配置HandlerInterceptor:
可以看到这里配置了一个SentinelWebInterceptor
的拦截器。
SentinelWebInterceptor
的声明如下:
发现它继承了AbstractSentinelInterceptor
这个类。
HandlerInterceptor
拦截器会拦截一切进入controller的方法,执行preHandle
前置拦截方法,而Context的初始化就是在这里完成的。
1.4.2.2.AbstractSentinelInterceptor
HandlerInterceptor
拦截器会拦截一切进入controller的方法,执行preHandle
前置拦截方法,而Context的初始化就是在这里完成的。
我们来看看这个类的preHandle
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { try { String resourceName = getResourceName(request); if (StringUtil.isEmpty(resourceName)) { return true ; } String origin = parseOrigin(request); String contextName = getContextName(request); ContextUtil.enter(contextName, origin); Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN); request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry); return true ; } catch (BlockException e) { try { handleBlockException(request, response, e); } finally { ContextUtil.exit(); } return false ; } }
1.4.2.3.ContextUtil
创建Context的方法就是ContextUtil.enter(contextName, origin);
我们进入该方法:
1 2 3 4 5 6 7 public static Context enter (String name, String origin) { if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) { throw new ContextNameDefineException ( "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!" ); } return trueEnter(name, origin); }
进入trueEnter
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected static Context trueEnter (String name, String origin) { Context context = contextHolder.get(); if (context == null ) { Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; DefaultNode node = localCacheNameMap.get(name); if (node == null ) { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null ) { node = new EntranceNode (new StringResourceWrapper (name, EntryType.IN), null ); Constants.ROOT.addChild(node); Map<String, DefaultNode> newMap = new HashMap <>(contextNameNodeMap.size() + 1 ); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } finally { LOCK.unlock(); } } context = new Context (node, name); context.setOrigin(origin); contextHolder.set(context); } return context; }
2.ProcessorSlotChain执行流程
接下来我们跟踪源码,验证下ProcessorSlotChain的执行流程。
2.1.入口
首先,回到一切的入口,AbstractSentinelInterceptor
类的preHandle
方法:
还有,SentinelResourceAspect
的环绕增强方法:
可以看到,任何一个资源必定要执行SphU.entry()
这个方法:
1 2 3 4 public static Entry entry (String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException { return Env.sph.entryWithType(name, resourceType, trafficType, 1 , args); }
继续进入Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
:
1 2 3 4 5 6 7 8 @Override public Entry entryWithType (String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { StringResourceWrapper resource = new StringResourceWrapper (name, entryType, resourceType); return entryWithPriority(resource, count, prioritized, args); }
进入entryWithPriority
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private Entry entryWithPriority (ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context == null ) { context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } 、 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry (resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null , count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { RecordLog.info("Sentinel unexpected exception" , e1); } return e; }
在这段代码中,会获取ProcessorSlotChain
对象,然后基于chain.entry()开始执行slotChain中的每一个Slot.
而这里创建的是其实现类:DefaultProcessorSlotChain.
获取ProcessorSlotChain以后会保存到一个Map中,key是ResourceWrapper,值是ProcessorSlotChain.
所以,一个资源只会有一个ProcessorSlotChain .
2.2.DefaultProcessorSlotChain
我们进入DefaultProcessorSlotChain的entry方法:
1 2 3 4 5 6 @Override public void entry (Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { first.transformEntry(context, resourceWrapper, t, count, prioritized, args); }
这里的first,类型是AbstractLinkedProcessorSlot:
看下继承关系:
因此,first一定是这些实现类中的一个,按照最早讲的责任链顺序,first应该就是
NodeSelectorSlot
。
不过,既然是基于责任链模式,所以这里只要记住下一个slot就可以了,也就是next:
next确实是NodeSelectSlot类型。
而NodeSelectSlot的next一定是ClusterBuilderSlot,依次类推:
责任链就建立起来了。
2.3.NodeSelectorSlot
NodeSelectorSlot负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树。
核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void entry (Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); if (node == null ) { synchronized (this ) { node = map.get(context.getName()); if (node == null ) { node = new DefaultNode (resourceWrapper, null ); HashMap<String, DefaultNode> cacheMap = new HashMap <String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; ((DefaultNode) context.getLastNode()).addChild(node); } } } context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
这个Slot完成了这么几件事情:
为当前资源创建 DefaultNode
将DefaultNode放入缓存中,key是contextName,这样不同链路入口的请求,将会创建多个DefaultNode,相同链路则只有一个DefaultNode
将当前资源的DefaultNode设置为上一个资源的childNode
将当前资源的DefaultNode设置为Context中的curNode(当前节点)
下一个slot,就是ClusterBuilderSlot
2.4.ClusterBuilderSlot
ClusterBuilderSlot负责构建某个资源的ClusterNode,核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null ) { synchronized (lock) { if (clusterNode == null ) { clusterNode = new ClusterNode (resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap <>(Math.max(clusterNodeMap.size(), 16 )); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } node.setClusterNode(clusterNode); if (!"" .equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); }
2.5.StatisticSlot
StatisticSlot负责统计实时调用数据,包括运行信息(访问次数、线程数)、来源信息等。
StatisticSlot是实现限流的关键,其中基于滑动时间窗口算法 维护了计数器,统计进入某个资源的请求次数。
核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { fireEntry(context, resourceWrapper, node, count, prioritized, args); node.increaseThreadNum(); node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null ) { context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (Throwable e) { context.getCurEntry().setError(e); throw e; } }
另外,需要注意的是,所有的计数+1动作都包括两部分,以node.addPassRequest(count);
为例:
1 2 3 4 5 6 7 @Override public void addPassRequest (int count) { super .addPassRequest(count); this .clusterNode.addPassRequest(count); }
具体计数方式,我们后续再看。
接下来,进入规则校验的相关slot了,依次是:
AuthoritySlot:负责授权规则(来源控制)
SystemSlot:负责系统保护规则
ParamFlowSlot:负责热点参数限流规则
FlowSlot:负责限流规则
DegradeSlot:负责降级规则
2.6.AuthoritySlot
负责请求来源origin的授权规则判断,如图:
核心API:
1 2 3 4 5 6 7 8 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
黑白名单校验的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void checkBlackWhiteAuthority (ResourceWrapper resource, Context context) throws AuthorityException { Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules(); if (authorityRules == null ) { return ; } Set<AuthorityRule> rules = authorityRules.get(resource.getName()); if (rules == null ) { return ; } for (AuthorityRule rule : rules) { if (!AuthorityRuleChecker.passCheck(rule, context)) { throw new AuthorityException (context.getOrigin(), rule); } } }
再看下AuthorityRuleChecker.passCheck(rule, context)
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 static boolean passCheck (AuthorityRule rule, Context context) { String requester = context.getOrigin(); if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) { return true ; } int pos = rule.getLimitApp().indexOf(requester); boolean contain = pos > -1 ; if (contain) { boolean exactlyMatch = false ; String[] appArray = rule.getLimitApp().split("," ); for (String app : appArray) { if (requester.equals(app)) { exactlyMatch = true ; break ; } } contain = exactlyMatch; } int strategy = rule.getStrategy(); if (strategy == RuleConstant.AUTHORITY_BLACK && contain) { return false ; } if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) { return false ; } return true ; }
2.7.SystemSlot
SystemSlot是对系统保护的规则校验:
核心API:
1 2 3 4 5 6 7 8 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { SystemRuleManager.checkSystem(resourceWrapper); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
来看下SystemRuleManager.checkSystem(resourceWrapper);
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public static void checkSystem (ResourceWrapper resourceWrapper) throws BlockException { if (resourceWrapper == null ) { return ; } if (!checkSystemStatus.get()) { return ; } if (resourceWrapper.getEntryType() != EntryType.IN) { return ; } double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps(); if (currentQps > qps) { throw new SystemBlockException (resourceWrapper.getName(), "qps" ); } int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum(); if (currentThread > maxThread) { throw new SystemBlockException (resourceWrapper.getName(), "thread" ); } double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt(); if (rt > maxRt) { throw new SystemBlockException (resourceWrapper.getName(), "rt" ); } if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) { if (!checkBbr(currentThread)) { throw new SystemBlockException (resourceWrapper.getName(), "load" ); } } if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) { throw new SystemBlockException (resourceWrapper.getName(), "cpu" ); } }
2.8.ParamFlowSlot
ParamFlowSlot就是热点参数限流,如图:
是针对进入资源的请求,针对不同的请求参数值分别统计QPS的限流方式。
含义是每隔duration时间长度内,最多生产maxCount个令牌,上图配置的含义是每1秒钟生产2个令牌。
核心API:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { fireEntry(context, resourceWrapper, node, count, prioritized, args); return ; } checkFlow(resourceWrapper, count, args); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
2.8.1.令牌桶
热点规则判断采用了令牌桶算法来实现参数限流,为每一个不同参数值设置令牌桶,Sentinel的令牌桶有两部分组成:
这两个Map的key都是请求的参数值,value却不同,其中:
tokenCounters:用来记录剩余令牌数量
timeCounters:用来记录上一个请求的时间
当一个携带参数的请求到来后,基本判断流程是这样的:
sentinel
2.9.FlowSlot
FlowSlot是负责限流规则的判断,如图:
包括:
三种流控模式:直接模式、关联模式、链路模式
三种流控效果:快速失败、warm up、排队等待
三种流控模式,从底层数据统计 角度,分为两类:
对进入资源的所有请求(ClusterNode)做限流统计:直接模式、关联模式
对进入资源的部分链路(DefaultNode)做限流统计:链路模式
三种流控效果,从限流算法 来看,分为两类:
滑动时间窗口算法:快速失败、warm up
漏桶算法:排队等待效果
2.9.1.核心流程
核心API如下:
1 2 3 4 5 6 7 8 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
checkFlow方法:
1 2 3 4 5 void checkFlow (ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }
跟入FlowRuleChecker:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void checkFlow (Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null ) { return ; } Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null ) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException (rule.getLimitApp(), rule); } } } }
这里的FlowRule就是限流规则接口,其中的几个成员变量,刚好对应表单参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class FlowRule extends AbstractRule { private int grade = RuleConstant.FLOW_GRADE_QPS; private double count; private int strategy = RuleConstant.STRATEGY_DIRECT; private String refResource; private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; private int warmUpPeriodSec = 10 ; private int maxQueueingTimeMs = 500 ; }
校验的逻辑定义在FlowRuleChecker
的canPassCheck
方法中:
1 2 3 4 5 6 7 8 9 10 public boolean canPassCheck ( FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { String limitApp = rule.getLimitApp(); if (limitApp == null ) { return true ; } return passLocalCheck(rule, context, node, acquireCount, prioritized); }
进入passLocalCheck()
:
1 2 3 4 5 6 7 8 9 10 11 private static boolean passLocalCheck (FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null ) { return true ; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
这里对规则的判断先要通过FlowRule#getRater()
获取流量控制器TrafficShapingController
,然后再做限流。
而TrafficShapingController
有3种实现:
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
RateLimiterController:排队等待模式,基于漏桶算法
最终的限流判断都在TrafficShapingController的canPass方法中。
2.9.2.滑动时间窗口
滑动时间窗口的功能分两部分来看:
一是时间区间窗口的QPS计数功能,这个是在StatisticSlot中调用的
二是对滑动窗口内的时间区间窗口QPS累加,这个是在FlowRule中调用的
先来看时间区间窗口的QPS计数功能。
2.9.2.1.时间窗口请求量统计
回顾2.5章节中的StatisticSlot部分,有这样一段代码:
就是在统计通过该节点的QPS,我们跟入看看,这里进入了DefaultNode内部:
发现同时对DefaultNode
和ClusterNode
在做QPS统计,我们知道DefaultNode
和ClusterNode
都是StatisticNode
的子类,这里调用addPassRequest()
方法,最终都会进入StatisticNode
中。
随便跟入一个:
这里有秒、分两种纬度的统计,对应两个计数器。找到对应的成员变量,可以看到:
两个计数器都是ArrayMetric类型,并且传入了两个参数:
1 2 3 4 5 public ArrayMetric (int sampleCount, int intervalInMs) { this .data = new OccupiableBucketLeapArray (sampleCount, intervalInMs); }
如图:
接下来,我们进入ArrayMetric
类的addPass
方法:
1 2 3 4 5 6 7 @Override public void addPass (int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }
那么,计数器如何知道当前所在的窗口是哪个呢?
这里的data是一个LeapArray:
LeapArray的四个属性:
1 2 3 4 5 6 7 8 9 10 public abstract class LeapArray <T> { protected int windowLengthInMs; protected int sampleCount; protected int intervalInMs; private double intervalInSecond; }
LeapArray是一个环形数组,因为时间是无限的,数组长度不可能无限,因此数组中每一个格子放入一个时间窗(window),当数组放满后,角标归0,覆盖最初的window。
因为滑动窗口最多分成sampleCount数量的小窗口,因此数组长度只要大于sampleCount,那么最近的一个滑动窗口内的2个小窗口就永远不会被覆盖,就不用担心旧数据被覆盖的问题了。
我们跟入data.currentWindow();
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public WindowWrap<T> currentWindow (long timeMillis) { if (timeMillis < 0 ) { return null ; } int idx = calculateTimeIdx(timeMillis); long windowStart = calculateWindowStart(timeMillis); while (true ) { WindowWrap<T> old = array.get(idx); if (old == null ) { WindowWrap<T> window = new WindowWrap <T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null , window)) { return window; } else { Thread.yield (); } } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield (); } } else if (windowStart < old.windowStart()) { return new WindowWrap <T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
找到当前时间所在窗口(WindowWrap)后,只要调用WindowWrap对象中的add方法,计数器+1即可。
这里只负责统计每个窗口的请求量,不负责拦截。限流拦截要看FlowSlot中的逻辑。
2.9.2.2.滑动窗口QPS计算
在2.9.1小节我们讲过,FlowSlot的限流判断最终都由TrafficShapingController
接口中的canPass
方法来实现。该接口有三个实现类:
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
RateLimiterController:排队等待模式,基于漏桶算法
因此,我们跟入默认的DefaultController中的canPass方法来分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public boolean canPass (Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); throw new PriorityWaitException (waitInMs); } } return false ; } return true ; }
因此,判断的关键就是int curCount = avgUsedTokens(node);
1 2 3 4 5 6 private int avgUsedTokens (Node node) { if (node == null ) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int )(node.passQps()); }
因为我们采用的是限流,走node.passQps()
逻辑:
1 2 3 4 5 6 @Override public double passQps () { return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); }
那么rollingCounterInSecond.pass()
是如何得到请求量的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public long pass () { data.currentWindow(); long pass = 0 ; List<MetricBucket> list = data.values(); for (MetricBucket window : list) { pass += window.pass(); } return pass; }
来看看data.values()
如何获取 滑动窗口范围内
的所有小窗口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public List<T> values (long timeMillis) { if (timeMillis < 0 ) { return new ArrayList <T>(); } int size = array.length(); List<T> result = new ArrayList <T>(size); for (int i = 0 ; i < size; i++) { WindowWrap<T> windowWrap = array.get(i); if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { continue ; } result.add(windowWrap.value()); } return result; }
那么,isWindowDeprecated(timeMillis, windowWrap)
又是如何判断窗口是否符合要求呢?
1 2 3 4 5 public boolean isWindowDeprecated (long time, WindowWrap<T> windowWrap) { return time - windowWrap.windowStart() > intervalInMs; }
2.9.3.漏桶
上一节我们讲过,FlowSlot的限流判断最终都由TrafficShapingController
接口中的canPass
方法来实现。该接口有三个实现类:
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
RateLimiterController:排队等待模式,基于漏桶算法
因此,我们跟入默认的RateLimiterController中的canPass方法来分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Override public boolean canPass (Node node, int acquireCount, boolean prioritized) { if (acquireCount <= 0 ) { return true ; } if (count <= 0 ) { return false ; } long currentTime = TimeUtil.currentTimeMillis(); long costTime = Math.round(1.0 * (acquireCount) / count * 1000 ); long expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true ; } else { long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { return false ; } else { long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false ; } if (waitTime > 0 ) { Thread.sleep(waitTime); } return true ; } catch (InterruptedException e) { } } } return false ; }
与我们之前分析的漏桶算法基本一致:
2.10.DegradeSlot
最后一关,就是降级规则判断了。
Sentinel的降级是基于状态机来实现的:
对应的实现在DegradeSlot类中,核心API:
1 2 3 4 5 6 7 8 @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { performChecking(context, resourceWrapper); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
继续进入performChecking
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 void performChecking (Context context, ResourceWrapper r) throws BlockException { List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return ; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException (cb.getRule().getLimitApp(), cb.getRule()); } } }
2.10.1.CircuitBreaker
我们进入CircuitBreaker的tryPass方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public boolean tryPass (Context context) { if (currentState.get() == State.CLOSED) { return true ; } if (currentState.get() == State.OPEN) { return retryTimeoutArrived() && fromOpenToHalfOpen(context); } return false ; }
有关时间窗的判断在retryTimeoutArrived()
方法:
1 2 3 4 protected boolean retryTimeoutArrived () { return TimeUtil.currentTimeMillis() >= nextRetryTimestamp; }
OPEN到HALF_OPEN切换在fromOpenToHalfOpen(context)
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected boolean fromOpenToHalfOpen (Context context) { if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) { notifyObservers(State.OPEN, State.HALF_OPEN, null ); Entry entry = context.getCurEntry(); entry.whenTerminate(new BiConsumer <Context, Entry>() { @Override public void accept (Context context, Entry entry) { if (entry.getBlockError() != null ) { currentState.compareAndSet(State.HALF_OPEN, State.OPEN); notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d ); } } }); return true ; } return false ; }
这里出现了从OPEN到HALF_OPEN、从HALF_OPEN到OPEN的变化,但是还有几个没有:
从CLOSED到OPEN
从HALF_OPEN到CLOSED
2.10.2.触发断路器
请求经过所有插槽
后,一定会执行exit方法,而在DegradeSlot的exit方法中:
会调用CircuitBreaker的onRequestComplete方法。而CircuitBreaker有两个实现:
我们这里以异常比例熔断为例来看,进入ExceptionCircuitBreaker
的onRequestComplete
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void onRequestComplete (Context context) { Entry entry = context.getCurEntry(); if (entry == null ) { return ; } Throwable error = entry.getError(); SimpleErrorCounter counter = stat.currentWindow().value(); if (error != null ) { counter.getErrorCount().add(1 ); } counter.getTotalCount().add(1 ); handleStateChangeWhenThresholdExceeded(error); }
来看阈值判断的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private void handleStateChangeWhenThresholdExceeded (Throwable error) { if (currentState.get() == State.OPEN) { return ; } if (currentState.get() == State.HALF_OPEN) { if (error == null ) { fromHalfOpenToClose(); } else { fromHalfOpenToOpen(1.0d ); } return ; } List<SimpleErrorCounter> counters = stat.values(); long errCount = 0 ; long totalCount = 0 ; for (SimpleErrorCounter counter : counters) { errCount += counter.errorCount.sum(); totalCount += counter.totalCount.sum(); } if (totalCount < minRequestAmount) { return ; } double curCount = errCount; if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) { curCount = errCount * 1.0d / totalCount; } if (curCount > threshold) { transformToOpen(curCount); } }