1.微服务篇

1.1.SpringCloud常见组件有哪些?

问题说明:这个题目主要考察对SpringCloud的组件基本了解

难易程度:简单

参考话术

SpringCloud包含的组件很多,有很多功能是重复的。其中最常用组件包括:

•注册中心组件:Eureka、Nacos等

•负载均衡组件:Ribbon

•远程调用组件:OpenFeign

•网关组件:Zuul、Gateway

•服务保护组件:Hystrix、Sentinel

•服务配置管理组件:SpringCloudConfig、Nacos

1.2.Nacos的服务注册表结构是怎样的?

问题说明:考察对Nacos数据分级结构的了解,以及Nacos源码的掌握情况

难易程度:一般

参考话术

Nacos采用了数据的分级存储模型,最外层是Namespace,用来隔离环境。然后是Group,用来对服务分组。接下来就是服务(Service)了,一个服务包含多个实例,但是可能处于不同机房,因此Service下有多个集群(Cluster),Cluster下是不同的实例(Instance)。

对应到Java代码中,Nacos采用了一个多层的Map来表示。结构为Map<String, Map<String, Service>>,其中最外层Map的key就是namespaceId,值是一个Map。内层Map的key是group拼接serviceName,值是Service对象。Service对象内部又是一个Map,key是集群名称,值是Cluster对象。而Cluster对象内部维护了Instance的集合。

如图:

1.3.Nacos如何支撑阿里内部数十万服务注册压力?

问题说明:考察对Nacos源码的掌握情况

难易程度:难

参考话术

Nacos内部接收到注册的请求时,不会立即写数据,而是将服务注册的任务放入一个阻塞队列就立即响应给客户端。然后利用线程池读取阻塞队列中的任务,异步来完成实例更新,从而提高并发写能力。

1.4.Nacos如何避免并发读写冲突问题?

问题说明:考察对Nacos源码的掌握情况

难易程度:难

参考话术

Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将旧的实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

这样在更新的过程中,就不会对读实例列表的请求产生影响,也不会出现脏读问题了。

1.5.Nacos与Eureka的区别有哪些?

问题说明:考察对Nacos、Eureka的底层实现的掌握情况

难易程度:难

参考话术

Nacos与Eureka有相同点,也有不同之处,可以从以下几点来描述:

  • 接口方式:Nacos与Eureka都对外暴露了Rest风格的API接口,用来实现服务注册、发现等功能
  • 实例类型:Nacos的实例有永久和临时实例之分;而Eureka只支持临时实例
  • 健康检测:Nacos对临时实例采用心跳模式检测,对永久实例采用主动请求来检测;Eureka只支持心跳模式
  • 服务发现:Nacos支持定时拉取和订阅推送两种模式;Eureka只支持定时拉取模式

1.6.Sentinel的限流与Gateway的限流有什么差别?

问题说明:考察对限流算法的掌握情况

难易程度:难

参考话术

限流算法常见的有三种实现:滑动时间窗口、令牌桶算法、漏桶算法。Gateway则采用了基于Redis实现的令牌桶算法。

而Sentinel内部却比较复杂:

  • 默认限流模式是基于滑动时间窗口算法
  • 排队等待的限流模式则基于漏桶算法
  • 而热点参数限流则是基于令牌桶算法

1.7.Sentinel的线程隔离与Hystix的线程隔离有什么差别?

问题说明:考察对线程隔离方案的掌握情况

难易程度:一般

参考话术

Hystix默认是基于线程池实现的线程隔离,每一个被隔离的业务都要创建一个独立的线程池,线程过多会带来额外的CPU开销,性能一般,但是隔离性更强。

Sentinel是基于信号量(计数器)实现的线程隔离,不用创建线程池,性能较好,但是隔离性一般。

2.MQ篇

2.1.你们为什么选择了RabbitMQ而不是其它的MQ?

如图:

话术:

kafka是以吞吐量高而闻名,不过其数据稳定性一般,而且无法保证消息有序性。我们公司的日志收集也有使用,业务模块中则使用的RabbitMQ。

阿里巴巴的RocketMQ基于Kafka的原理,弥补了Kafka的缺点,继承了其高吞吐的优势,其客户端目前以Java为主。但是我们担心阿里巴巴开源产品的稳定性,所以就没有使用。

RabbitMQ基于面向并发的语言Erlang开发,吞吐量不如Kafka,但是对我们公司来讲够用了。而且消息可靠性较好,并且消息延迟极低,集群搭建比较方便。支持多种协议,并且有各种语言的客户端,比较灵活。Spring对RabbitMQ的支持也比较好,使用起来比较方便,比较符合我们公司的需求。

综合考虑我们公司的并发需求以及稳定性需求,我们选择了RabbitMQ。

2.2.RabbitMQ如何确保消息的不丢失?

话术:

RabbitMQ针对消息传递过程中可能发生问题的各个地方,给出了针对性的解决方案:

  • 生产者发送消息时可能因为网络问题导致消息没有到达交换机:
    • RabbitMQ提供了publisher confirm机制
      • 生产者发送消息后,可以编写ConfirmCallback函数
      • 消息成功到达交换机后,RabbitMQ会调用ConfirmCallback通知消息的发送者,返回ACK
      • 消息如果未到达交换机,RabbitMQ也会调用ConfirmCallback通知消息的发送者,返回NACK
      • 消息超时未发送成功也会抛出异常
  • 消息到达交换机后,如果未能到达队列,也会导致消息丢失:
    • RabbitMQ提供了publisher return机制
      • 生产者可以定义ReturnCallback函数
      • 消息到达交换机,未到达队列,RabbitMQ会调用ReturnCallback通知发送者,告知失败原因
  • 消息到达队列后,MQ宕机也可能导致丢失消息:
    • RabbitMQ提供了持久化功能,集群的主从备份功能
      • 消息持久化,RabbitMQ会将交换机、队列、消息持久化到磁盘,宕机重启可以恢复消息
      • 镜像集群,仲裁队列,都可以提供主从备份功能,主节点宕机,从节点会自动切换为主,数据依然在
  • 消息投递给消费者后,如果消费者处理不当,也可能导致消息丢失
    • SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略:
      • 消费者的确认机制:
        • 消费者处理消息成功,未出现异常时,Spring返回ACK给RabbitMQ,消息才被移除
        • 消费者处理消息失败,抛出异常,宕机,Spring返回NACK或者不返回结果,消息不被异常
      • 消费者重试机制:
        • 默认情况下,消费者处理失败时,消息会再次回到MQ队列,然后投递给其它消费者。Spring提供的消费者重试机制,则是在处理失败后不返回NACK,而是直接在消费者本地重试。多次重试都失败后,则按照消费者失败处理策略来处理消息。避免了消息频繁入队带来的额外压力。
      • 消费者失败策略:
        • 当消费者多次本地重试失败时,消息默认会丢弃。
        • Spring提供了Republish策略,在多次重试都失败,耗尽重试次数后,将消息重新投递给指定的异常交换机,并且会携带上异常栈信息,帮助定位问题。

2.3.RabbitMQ如何避免消息堆积?

话术:

消息堆积问题产生的原因往往是因为消息发送的速度超过了消费者消息处理的速度。因此解决方案无外乎以下三点:

  • 提高消费者处理速度
  • 增加更多消费者
  • 增加队列消息存储上限

1)提高消费者处理速度

消费者处理速度是由业务代码决定的,所以我们能做的事情包括:

  • 尽可能优化业务代码,提高业务性能
  • 接收到消息后,开启线程池,并发处理多个消息

优点:成本低,改改代码即可

缺点:开启线程池会带来额外的性能开销,对于高频、低时延的任务不合适。推荐任务执行周期较长的业务。

2)增加更多消费者

一个队列绑定多个消费者,共同争抢任务,自然可以提供消息处理的速度。

优点:能用钱解决的问题都不是问题。实现简单粗暴

缺点:问题是没有钱。成本太高

3)增加队列消息存储上限

在RabbitMQ的1.8版本后,加入了新的队列模式:Lazy Queue

这种队列不会将消息保存在内存中,而是在收到消息后直接写入磁盘中,理论上没有存储上限。可以解决消息堆积问题。

优点:磁盘存储更安全;存储无上限;避免内存存储带来的Page Out问题,性能更稳定;

缺点:磁盘存储受到IO性能的限制,消息时效性不如内存模式,但影响不大。

2.4.RabbitMQ如何保证消息的有序性?

话术:

其实RabbitMQ是队列存储,天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证了。

因此,要保证消息的有序性,需要做的下面几点:

  • 保证消息发送的有序性
  • 保证一组有序的消息都发送到同一个队列
  • 保证一个队列只包含一个消费者

2.5.如何防止MQ消息被重复消费?

话术:

消息重复消费的原因多种多样,不可避免。所以只能从消费者端入手,只要能保证消息处理的幂等性就可以确保消息不被重复消费。

而幂等性的保证又有很多方案:

  • 给每一条消息都添加一个唯一id,在本地记录消息表及消息状态,处理消息时基于数据库表的id唯一性做判断
  • 同样是记录消息表,利用消息状态字段实现基于乐观锁的判断,保证幂等
  • 基于业务本身的幂等性。比如根据id的删除、查询业务天生幂等;新增、修改等业务可以考虑基于数据库id唯一性、或者乐观锁机制确保幂等。本质与消息表方案类似。

2.6.如何保证RabbitMQ的高可用?

话术:

要实现RabbitMQ的高可用无外乎下面两点:

  • 做好交换机、队列、消息的持久化
  • 搭建RabbitMQ的镜像集群,做好主从备份。当然也可以使用仲裁队列代替镜像集群。

2.7.使用MQ可以解决那些问题?

话术:

RabbitMQ能解决的问题很多,例如:

  • 解耦合:将几个业务关联的微服务调用修改为基于MQ的异步通知,可以解除微服务之间的业务耦合。同时还提高了业务性能。
  • 流量削峰:将突发的业务请求放入MQ中,作为缓冲区。后端的业务根据自己的处理能力从MQ中获取消息,逐个处理任务。流量曲线变的平滑很多
  • 延迟队列:基于RabbitMQ的死信队列或者DelayExchange插件,可以实现消息发送后,延迟接收的效果。

3.Redis篇

3.1.Redis与Memcache的区别?

  • redis支持更丰富的数据类型(支持更复杂的应用场景):Redis不仅仅支持简单的k/v类型的数据,同时还提供list,set,zset,hash等数据结构的存储。memcache支持简单的数据类型,String。
  • Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用,而Memecache把数据全部存在内存之中。
  • 集群模式:memcached没有原生的集群模式,需要依靠客户端来实现往集群中分片写入数据;但是 redis 目前是原生支持 cluster 模式的.
  • Redis使用单线程:Memcached是多线程,非阻塞IO复用的网络模型;Redis使用单线程的多路 IO 复用模型。
1574821356723

3.2.Redis的单线程问题

面试官:Redis采用单线程,如何保证高并发?

面试话术

Redis快的主要原因是:

  1. 完全基于内存
  2. 数据结构简单,对数据操作也简单
  3. 使用多路 I/O 复用模型,充分利用CPU资源

面试官:这样做的好处是什么?

面试话术

单线程优势有下面几点:

  • 代码更清晰,处理逻辑更简单
  • 不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为锁而导致的性能消耗
  • 不存在多进程或者多线程导致的CPU切换,充分利用CPU资源

3.2.Redis的持久化方案由哪些?

相关资料:

1)RDB 持久化

RDB持久化可以使用save或bgsave,为了不阻塞主进程业务,一般都使用bgsave,流程:

  • Redis 进程会 fork 出一个子进程(与父进程内存数据一致)。
  • 父进程继续处理客户端请求命令
  • 由子进程将内存中的所有数据写入到一个临时的 RDB 文件中。
  • 完成写入操作之后,旧的 RDB 文件会被新的 RDB 文件替换掉。

下面是一些和 RDB 持久化相关的配置:

  • save 60 10000:如果在 60 秒内有 10000 个 key 发生改变,那就执行 RDB 持久化。
  • stop-writes-on-bgsave-error yes:如果 Redis 执行 RDB 持久化失败(常见于操作系统内存不足),那么 Redis 将不再接受 client 写入数据的请求。
  • rdbcompression yes:当生成 RDB 文件时,同时进行压缩。
  • dbfilename dump.rdb:将 RDB 文件命名为 dump.rdb。
  • dir /var/lib/redis:将 RDB 文件保存在/var/lib/redis目录下。

  当然在实践中,我们通常会将stop-writes-on-bgsave-error设置为false,同时让监控系统在 Redis 执行 RDB 持久化失败时发送告警,以便人工介入解决,而不是粗暴地拒绝 client 的写入请求。

RDB持久化的优点:

  • RDB持久化文件小,Redis数据恢复时速度快
  • 子进程不影响父进程,父进程可以持续处理客户端命令
  • 子进程fork时采用copy-on-write方式,大多数情况下,没有太多的内存消耗,效率比较好。

RDB 持久化的缺点:

  • 子进程fork时采用copy-on-write方式,如果Redis此时写操作较多,可能导致额外的内存占用,甚至内存溢出
  • RDB文件压缩会减小文件体积,但通过时会对CPU有额外的消耗
  • 如果业务场景很看重数据的持久性 (durability),那么不应该采用 RDB 持久化。譬如说,如果 Redis 每 5 分钟执行一次 RDB 持久化,要是 Redis 意外奔溃了,那么最多会丢失 5 分钟的数据。

2)AOF 持久化

  可以使用appendonly yes配置项来开启 AOF 持久化。Redis 执行 AOF 持久化时,会将接收到的写命令追加到 AOF 文件的末尾,因此 Redis 只要对 AOF 文件中的命令进行回放,就可以将数据库还原到原先的状态。   与 RDB 持久化相比,AOF 持久化的一个明显优势就是,它可以提高数据的持久性 (durability)。因为在 AOF 模式下,Redis 每次接收到 client 的写命令,就会将命令write()到 AOF 文件末尾。   然而,在 Linux 中,将数据write()到文件后,数据并不会立即刷新到磁盘,而会先暂存在 OS 的文件系统缓冲区。在合适的时机,OS 才会将缓冲区的数据刷新到磁盘(如果需要将文件内容刷新到磁盘,可以调用fsync()fdatasync())。   通过appendfsync配置项,可以控制 Redis 将命令同步到磁盘的频率:

  • always:每次 Redis 将命令write()到 AOF 文件时,都会调用fsync(),将命令刷新到磁盘。这可以保证最好的数据持久性,但却会给系统带来极大的开销。
  • no:Redis 只将命令write()到 AOF 文件。这会让 OS 决定何时将命令刷新到磁盘。
  • everysec:除了将命令write()到 AOF 文件,Redis 还会每秒执行一次fsync()。在实践中,推荐使用这种设置,一定程度上可以保证数据持久性,又不会明显降低 Redis 性能。

  然而,AOF 持久化并不是没有缺点的:Redis 会不断将接收到的写命令追加到 AOF 文件中,导致 AOF 文件越来越大。过大的 AOF 文件会消耗磁盘空间,并且导致 Redis 重启时更加缓慢。为了解决这个问题,在适当情况下,Redis 会对 AOF 文件进行重写,去除文件中冗余的命令,以减小 AOF 文件的体积。在重写 AOF 文件期间, Redis 会启动一个子进程,由子进程负责对 AOF 文件进行重写。   可以通过下面两个配置项,控制 Redis 重写 AOF 文件的频率:

  • auto-aof-rewrite-min-size 64mb
  • auto-aof-rewrite-percentage 100

  上面两个配置的作用:当 AOF 文件的体积大于 64MB,并且 AOF 文件的体积比上一次重写之后的体积大了至少一倍,那么 Redis 就会执行 AOF 重写。

优点:

  • 持久化频率高,数据可靠性高
  • 没有额外的内存或CPU消耗

缺点:

  • 文件体积大
  • 文件大导致服务数据恢复时效率较低

面试话术:

Redis 提供了两种数据持久化的方式,一种是 RDB,另一种是 AOF。默认情况下,Redis 使用的是 RDB 持久化。

RDB持久化文件体积较小,但是保存数据的频率一般较低,可靠性差,容易丢失数据。另外RDB写数据时会采用Fork函数拷贝主进程,可能有额外的内存消耗,文件压缩也会有额外的CPU消耗。

ROF持久化可以做到每秒钟持久化一次,可靠性高。但是持久化文件体积较大,导致数据恢复时读取文件时间较长,效率略低

3.3.Redis的集群方式有哪些?

面试话术:

Redis集群可以分为主从集群分片集群两类。

主从集群一般一主多从,主库用来写数据,从库用来读数据。结合哨兵,可以再主库宕机时从新选主,目的是保证Redis的高可用

分片集群是数据分片,我们会让多个Redis节点组成集群,并将16383个插槽分到不同的节点上。存储数据时利用对key做hash运算,得到插槽值后存储到对应的节点即可。因为存储数据面向的是插槽而非节点本身,因此可以做到集群动态伸缩。目的是让Redis能存储更多数据。

1)主从集群

主从集群,也是读写分离集群。一般都是一主多从方式。

Redis 的复制(replication)功能允许用户根据一个 Redis 服务器来创建任意多个该服务器的复制品,其中被复制的服务器为主服务器(master),而通过复制创建出来的服务器复制品则为从服务器(slave)。

只要主从服务器之间的网络连接正常,主从服务器两者会具有相同的数据,主服务器就会一直将发生在自己身上的数据更新同步 给从服务器,从而一直保证主从服务器的数据相同。

  • 写数据时只能通过主节点完成
  • 读数据可以从任何节点完成
  • 如果配置了哨兵节点,当master宕机时,哨兵会从salve节点选出一个新的主。

主从集群分两种:

带有哨兵的集群:

2)分片集群

主从集群中,每个节点都要保存所有信息,容易形成木桶效应。并且当数据量较大时,单个机器无法满足需求。此时我们就要使用分片集群了。

集群特征:

  • 每个节点都保存不同数据

  • 所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽.

  • 节点的fail是通过集群中超过半数的节点检测失效时才生效.

  • 客户端与redis节点直连,不需要中间proxy层连接集群中任何一个可用节点都可以访问到数据

  • redis-cluster把所有的物理节点映射到[0-16383]slot(插槽)上,实现动态伸缩

为了保证Redis中每个节点的高可用,我们还可以给每个节点创建replication(slave节点),如图:

出现故障时,主从可以及时切换:

3.4.Redis的常用数据类型有哪些?

支持多种类型的数据结构,主要区别是value存储的数据格式不同:

  • string:最基本的数据类型,二进制安全的字符串,最大512M。

  • list:按照添加顺序保持顺序的字符串列表。

  • set:无序的字符串集合,不存在重复的元素。

  • sorted set:已排序的字符串集合。

  • hash:key-value对格式

3.5.聊一下Redis事务机制

相关资料:

参考:http://redisdoc.com/topic/transaction.html

Redis事务功能是通过MULTI、EXEC、DISCARD和WATCH 四个原语实现的。Redis会将一个事务中的所有命令序列化,然后按顺序执行。但是Redis事务不支持回滚操作,命令运行出错后,正确的命令会继续执行。

  • MULTI: 用于开启一个事务,它总是返回OK。 MULTI执行之后,客户端可以继续向服务器发送任意多条命令,这些命令不会立即被执行,而是被放到一个待执行命令队列
  • EXEC:按顺序执行命令队列内的所有命令。返回所有命令的返回值。事务执行过程中,Redis不会执行其它事务的命令。
  • DISCARD:清空命令队列,并放弃执行事务, 并且客户端会从事务状态中退出
  • WATCH:Redis的乐观锁机制,利用compare-and-set(CAS)原理,可以监控一个或多个键,一旦其中有一个键被修改,之后的事务就不会执行

使用事务时可能会遇上以下两种错误:

  • 执行 EXEC 之前,入队的命令可能会出错。比如说,命令可能会产生语法错误(参数数量错误,参数名错误,等等),或者其他更严重的错误,比如内存不足(如果服务器使用 maxmemory 设置了最大内存限制的话)。
    • Redis 2.6.5 开始,服务器会对命令入队失败的情况进行记录,并在客户端调用 EXEC 命令时,拒绝执行并自动放弃这个事务。
  • 命令可能在 EXEC 调用之后失败。举个例子,事务中的命令可能处理了错误类型的键,比如将列表命令用在了字符串键上面,诸如此类。
    • 即使事务中有某个/某些命令在执行时产生了错误, 事务中的其他命令仍然会继续执行,不会回滚。

为什么 Redis 不支持回滚(roll back)?

以下是这种做法的优点:

  • Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
  • 因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。

鉴于没有任何机制能避免程序员自己造成的错误, 并且这类错误通常不会在生产环境中出现, 所以 Redis 选择了更简单、更快速的无回滚方式来处理事务。

面试话术:

Redis事务其实是把一系列Redis命令放入队列,然后批量执行,执行过程中不会有其它事务来打断。不过与关系型数据库的事务不同,Redis事务不支持回滚操作,事务中某个命令执行失败,其它命令依然会执行。

为了弥补不能回滚的问题,Redis会在事务入队时就检查命令,如果命令异常则会放弃整个事务。

因此,只要程序员编程是正确的,理论上说Redis会正确执行所有事务,无需回滚。

面试官:如果事务执行一半的时候Redis宕机怎么办?

Redis有持久化机制,因为可靠性问题,我们一般使用AOF持久化。事务的所有命令也会写入AOF文件,但是如果在执行EXEC命令之前,Redis已经宕机,则AOF文件中事务不完整。使用 redis-check-aof 程序可以移除 AOF 文件中不完整事务的信息,确保服务器可以顺利启动。

3.6.Redis的Key过期策略

参考资料:

为什么需要内存回收?

  • 1、在Redis中,set指令可以指定key的过期时间,当过期时间到达以后,key就失效了;
  • 2、Redis是基于内存操作的,所有的数据都是保存在内存中,一台机器的内存是有限且很宝贵的。

基于以上两点,为了保证Redis能继续提供可靠的服务,Redis需要一种机制清理掉不常用的、无效的、多余的数据,失效后的数据需要及时清理,这就需要内存回收了。

Redis的内存回收主要分为过期删除策略和内存淘汰策略两部分。

过期删除策略

删除达到过期时间的key。

  • 1)定时删除

对于每一个设置了过期时间的key都会创建一个定时器,一旦到达过期时间就立即删除。该策略可以立即清除过期的数据,对内存较友好,但是缺点是占用了大量的CPU资源去处理过期的数据,会影响Redis的吞吐量和响应时间。

  • 2)惰性删除

当访问一个key时,才判断该key是否过期,过期则删除。该策略能最大限度地节省CPU资源,但是对内存却十分不友好。有一种极端的情况是可能出现大量的过期key没有被再次访问,因此不会被清除,导致占用了大量的内存。

在计算机科学中,懒惰删除(英文:lazy deletion)指的是从一个散列表(也称哈希表)中删除元素的一种方法。在这个方法中,删除仅仅是指标记一个元素被删除,而不是整个清除它。被删除的位点在插入时被当作空元素,在搜索之时被当作已占据。

  • 3)定期删除

每隔一段时间,扫描Redis中过期key字典,并清除部分过期的key。该策略是前两者的一个折中方案,还可以通过调整定时扫描的时间间隔和每次扫描的限定耗时,在不同情况下使得CPU和内存资源达到最优的平衡效果。

在Redis中,同时使用了定期删除和惰性删除。不过Redis定期删除采用的是随机抽取的方式删除部分Key,因此不能保证过期key 100%的删除。

Redis结合了定期删除和惰性删除,基本上能很好的处理过期数据的清理,但是实际上还是有点问题的,如果过期key较多,定期删除漏掉了一部分,而且也没有及时去查,即没有走惰性删除,那么就会有大量的过期key堆积在内存中,导致redis内存耗尽,当内存耗尽之后,有新的key到来会发生什么事呢?是直接抛弃还是其他措施呢?有什么办法可以接受更多的key?

内存淘汰策略

Redis的内存淘汰策略,是指内存达到maxmemory极限时,使用某种算法来决定清理掉哪些数据,以保证新数据的存入。

Redis的内存淘汰机制包括:

  • noeviction: 当内存不足以容纳新写入数据时,新写入操作会报错。
  • allkeys-lru:当内存不足以容纳新写入数据时,在键空间(server.db[i].dict)中,移除最近最少使用的 key(这个是最常用的)。
  • allkeys-random:当内存不足以容纳新写入数据时,在键空间(server.db[i].dict)中,随机移除某个 key。
  • volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires)中,移除最近最少使用的 key。
  • volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires)中,随机移除某个 key。
  • volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间(server.db[i].expires)中,有更早过期时间的 key 优先移除。

在配置文件中,通过maxmemory-policy可以配置要使用哪一个淘汰机制。

什么时候会进行淘汰?

Redis会在每一次处理命令的时候(processCommand函数调用freeMemoryIfNeeded)判断当前redis是否达到了内存的最大限制,如果达到限制,则使用对应的算法去处理需要删除的key。

在淘汰key时,Redis默认最常用的是LRU算法(Latest Recently Used)。Redis通过在每一个redisObject保存lru属性来保存key最近的访问时间,在实现LRU算法时直接读取key的lru属性。

具体实现时,Redis遍历每一个db,从每一个db中随机抽取一批样本key,默认是3个key,再从这3个key中,删除最近最少使用的key。

面试话术:

Redis过期策略包含定期删除和惰性删除两部分。定期删除是在Redis内部有一个定时任务,会定期删除一些过期的key。惰性删除是当用户查询某个Key时,会检查这个Key是否已经过期,如果没过期则返回用户,如果过期则删除。

但是这两个策略都无法保证过期key一定删除,漏网之鱼越来越多,还可能导致内存溢出。当发生内存不足问题时,Redis还会做内存回收。内存回收采用LRU策略,就是最近最少使用。其原理就是记录每个Key的最近使用时间,内存回收时,随机抽取一些Key,比较其使用时间,把最老的几个删除。

Redis的逻辑是:最近使用过的,很可能再次被使用

3.7.Redis在项目中的哪些地方有用到?

(1)共享session

在分布式系统下,服务会部署在不同的tomcat,因此多个tomcat的session无法共享,以前存储在session中的数据无法实现共享,可以用redis代替session,解决分布式系统间数据共享问题。

(2)数据缓存

Redis采用内存存储,读写效率较高。我们可以把数据库的访问频率高的热点数据存储到redis中,这样用户请求时优先从redis中读取,减少数据库压力,提高并发能力。

(3)异步队列

Reids在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得Redis能作为一个很好的消息队列平台来使用。而且Redis中还有pub/sub这样的专用结构,用于1对N的消息通信模式。

(4)分布式锁

Redis中的乐观锁机制,可以帮助我们实现分布式锁的效果,用于解决分布式系统下的多线程安全问题

3.8.Redis的缓存击穿、缓存雪崩、缓存穿透

1)缓存穿透

参考资料:

  • 什么是缓存穿透
    • 正常情况下,我们去查询数据都是存在。那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。这种查询不存在数据的现象我们称为缓存穿透
  • 穿透带来的问题
    • 试想一下,如果有黑客会对你的系统进行攻击,拿一个不存在的id 去查询数据,会产生大量的请求到数据库去查询。可能会导致你的数据库由于压力过大而宕掉。
  • 解决办法
    • 缓存空值:之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。那么我们就可以为这些key对应的值设置为null 丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null 。这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间。
    • BloomFilter(布隆过滤):将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。在缓存之前在加一层 BloomFilter ,在查询的时候先去 BloomFilter 去查询 key 是否存在,如果不存在就直接返回,存在再走查缓存 -> 查 DB。

话术:

缓存穿透有两种解决方案:其一是把不存在的key设置null值到缓存中。其二是使用布隆过滤器,在查询缓存前先通过布隆过滤器判断key是否存在,存在再去查询缓存。

设置null值可能被恶意针对,攻击者使用大量不存在的不重复key ,那么方案一就会缓存大量不存在key数据。此时我们还可以对Key规定格式模板,然后对不存在的key做正则规范匹配,如果完全不符合就不用存null值到redis,而是直接返回错误。

2)缓存击穿

相关资料

  • 什么是缓存击穿?

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

当这个key在失效的瞬间,redis查询失败,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

  • 解决方案:
    • 使用互斥锁(mutex key):mutex,就是互斥。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用Redis的SETNX去set一个互斥key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现互斥的效果。
    • 软过期:也就是逻辑过期,不使用redis提供的过期时间,而是业务层在数据中存储过期时间信息。查询时由业务程序判断是否过期,如果数据即将过期时,将缓存的时效延长,程序可以派遣一个线程去数据库中获取最新的数据,其他线程这时看到延长了的过期时间,就会继续使用旧数据,等派遣的线程获取最新数据后再更新缓存。

推荐使用互斥锁,因为软过期会有业务逻辑侵入和额外的判断。

面试话术

缓存击穿主要担心的是某个Key过期,更新缓存时引起对数据库的突发高并发访问。因此我们可以在更新缓存时采用互斥锁控制,只允许一个线程去更新缓存,其它线程等待并重新读取缓存。例如Redis的setnx命令就能实现互斥效果。

3)缓存雪崩

相关资料

缓存雪崩,是指在某一个时间段,缓存集中过期失效。对这批数据的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。

解决方案:

  • 数据分类分批处理:采取不同分类数据,缓存不同周期
  • 相同分类数据:采用固定时长加随机数方式设置缓存
  • 热点数据缓存时间长一些,冷门数据缓存时间短一些
  • 避免redis节点宕机引起雪崩,搭建主从集群,保证高可用

面试话术:

解决缓存雪崩问题的关键是让缓存Key的过期时间分散。因此我们可以把数据按照业务分类,然后设置不同过期时间。相同业务类型的key,设置固定时长加随机数。尽可能保证每个Key的过期时间都不相同。

另外,Redis宕机也可能导致缓存雪崩,因此我们还要搭建Redis主从集群及哨兵监控,保证Redis的高可用。

3.9.缓存冷热数据分离

背景资料

Redis使用的是内存存储,当需要海量数据存储时,成本非常高。

经过调研发现,当前主流DDR3内存和主流SATA SSD的单位成本价格差距大概在20倍左右,为了优化redis机器综合成本,我们考虑实现基于热度统计 的数据分级存储及数据在RAM/FLASH之间的动态交换,从而大幅度降低成本,达到性能与成本的高平衡。

基本思路:基于key访问次数(LFU)的热度统计算法识别出热点数据,并将热点数据保留在redis中,对于无访问/访问次数少的数据则转存到SSD上,如果SSD上的key再次变热,则重新将其加载到redis内存中。

目前流行的高性能磁盘存储,并且遵循Redis协议的方案包括:

  • SSDB:http://ssdb.io/zh_cn/
  • RocksDB:https://rocksdb.org.cn/

因此,我们就需要在应用程序与缓存服务之间引入代理,实现Redis和SSD之间的切换,如图:

这样的代理方案阿里云提供的就有。当然也有一些开源方案,例如:https://github.com/JingchengLi/swapdb

3.10.Redis实现分布式锁

分布式锁要满足的条件:

  • 多进程互斥:同一时刻,只有一个进程可以获取锁
  • 保证锁可以释放:任务结束或出现异常,锁一定要释放,避免死锁
  • 阻塞锁(可选):获取锁失败时可否重试
  • 重入锁(可选):获取锁的代码递归调用时,依然可以获取锁

1)最基本的分布式锁:

利用Redis的setnx命令,这个命令的特征时如果多次执行,只有第一次执行会成功,可以实现互斥的效果。但是为了保证服务宕机时也可以释放锁,需要利用expire命令给锁设置一个有效期

1
2
setnx lock thread-01 # 尝试获取锁
expire lock 10 # 设置有效期

面试官问题1:如果expire之前服务宕机怎么办?

要保证setnx和expire命令的原子性。redis的set命令可以满足:

1
set key value [NX] [EX time] 

需要添加nx和ex的选项:

  • NX:与setnx一致,第一次执行成功
  • EX:设置过期时间

面试官问题2:释放锁的时候,如果自己的锁已经过期了,此时会出现安全漏洞,如何解决?

在锁中存储当前进程和线程标识,释放锁时对锁的标识判断,如果是自己的则删除,不是则放弃操作。

但是这两步操作要保证原子性,需要通过Lua脚本来实现。

1
2
3
if redis.call("get",KEYS[1]) == ARGV[1] then
redis.call("del",KEYS[1])
end

2)可重入分布式锁

如果有重入的需求,则除了在锁中记录进程标识,还要记录重试次数,流程如下:

下面我们假设锁的key为“lock”,hashKey是当前线程的id:“threadId”,锁自动释放时间假设为20

获取锁的步骤:

  • 1、判断lock是否存在 EXISTS lock
    • 存在,说明有人获取锁了,下面判断是不是自己的锁
      • 判断当前线程id作为hashKey是否存在:HEXISTS lock threadId
        • 不存在,说明锁已经有了,且不是自己获取的,锁获取失败,end
        • 存在,说明是自己获取的锁,重入次数+1:HINCRBY lock threadId 1,去到步骤3
    • 2、不存在,说明可以获取锁,HSET key threadId 1
    • 3、设置锁自动释放时间,EXPIRE lock 20

释放锁的步骤:

  • 1、判断当前线程id作为hashKey是否存在:HEXISTS lock threadId
    • 不存在,说明锁已经失效,不用管了
    • 存在,说明锁还在,重入次数减1:HINCRBY lock threadId -1,获取新的重入次数
  • 2、判断重入次数是否为0:
    • 为0,说明锁全部释放,删除key:DEL lock
    • 大于0,说明锁还在使用,重置有效时间:EXPIRE lock 20

对应的Lua脚本如下:

首先是获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

if(redis.call('exists', key) == 0) then -- 判断是否存在
redis.call('hset', key, threadId, '1'); -- 不存在, 获取锁
redis.call('expire', key, releaseTime); -- 设置有效期
return 1; -- 返回结果
end;

if(redis.call('hexists', key, threadId) == 1) then -- 锁已经存在,判断threadId是否是自己
redis.call('hincrby', key, threadId, '1'); -- 不存在, 获取锁,重入次数+1
redis.call('expire', key, releaseTime); -- 设置有效期
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

然后是释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间

if (redis.call('HEXISTS', key, threadId) == 0) then -- 判断当前锁是否还是被自己持有
return nil; -- 如果已经不是自己,则直接返回
end;
local count = redis.call('HINCRBY', key, threadId, -1); -- 是自己的锁,则重入次数-1

if (count > 0) then -- 判断是否重入次数是否已经为0
redis.call('EXPIRE', key, releaseTime); -- 大于0说明不能释放锁,重置有效期然后返回
return nil;
else
redis.call('DEL', key); -- 等于0说明可以释放锁,直接删除
return nil;
end;

3)高可用的锁

面试官问题:redis分布式锁依赖与redis,如果redis宕机则锁失效。如何解决?

此时大多数同学会回答说:搭建主从集群,做数据备份。

这样就进入了陷阱,因为面试官的下一个问题就来了:

面试官问题:如果搭建主从集群做数据备份时,进程A获取锁,master还没有把数据备份到slave,master宕机,slave升级为master,此时原来锁失效,其它进程也可以获取锁,出现安全问题。如何解决?

关于这个问题,Redis官网给出了解决方案,使用RedLock思路可以解决:

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

3.11.如何实现数据库与缓存数据一致?

面试话术:

实现方案有下面几种:

  • 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
  • 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A,服务A修改Redis缓存数据
  • 通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据

Nacos源码分析

1.下载Nacos源码并运行

要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行。

1.1.下载Nacos源码

Nacos的GitHub地址:https://github.com/alibaba/nacos

课前资料中已经提供了下载好的1.4.2版本的Nacos源码:

如果需要研究其他版本的同学,也可以自行下载:

大家找到其release页面:https://github.com/alibaba/nacos/tags,找到其中的1.4.2.版本:

点击进入后,下载Source code(zip):

1.2.导入Demo工程

我们的课前资料提供了一个微服务Demo,包含了服务注册、发现等业务。

导入该项目后,查看其项目结构:

结构说明:

  • cloud-source-demo:项目父目录
    • cloud-demo:微服务的父工程,管理微服务依赖
      • order-service:订单微服务,业务中需要访问user-service,是一个服务消费者
      • user-service:用户微服务,对外暴露根据id查询用户的接口,是一个服务提供者

1.3.导入Nacos源码

将之前下载好的Nacos源码解压到cloud-source-demo项目目录中:

然后,使用IDEA将其作为一个module来导入:

1)选择项目结构选项:

然后点击导入module:

在弹出窗口中,选择nacos源码目录:

然后选择maven模块,finish:

最后,点击OK即可:

导入后的项目结构:

1.4.proto编译

Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化。并将对应的proto文件定义在了consistency这个子模块中:

我们需要先将proto文件编译为对应的Java代码。

1.4.1.什么是protobuf

protobuf的全称是Protocol Buffer,是Google提供的一种数据序列化协议,这是Google官方的定义:

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

可以简单理解为,是一种跨语言、跨平台的数据传输格式。与json的功能类似,但是无论是性能,还是数据大小都比json要好很多。

protobuf的之所以可以跨语言,就是因为数据定义的格式为.proto格式,需要基于protoc编译为对应的语言。

1.4.2.安装protoc

Protobuf的GitHub地址:https://github.com/protocolbuffers/protobuf/releases

我们可以下载windows版本的来使用:

另外,课前资料也提供了下载好的安装包:

解压到任意非中文目录下,其中的bin目录中的protoc.exe可以帮助我们编译:

然后将这个bin目录配置到你的环境变量path中,可以参考JDK的配置方式:

1.4.3.编译proto

进入nacos-1.4.2的consistency模块下的src/main目录下:

然后打开cmd窗口,运行下面的两个命令:

1
2
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto

如图:

会在nacos的consistency模块中编译出这些java代码:

1.5.运行

nacos服务端的入口是在console模块中的Nacos类:

我们需要让它单机启动:

然后新建一个SpringBootApplication:

然后填写应用信息:

然后运行Nacos这个main函数:

将order-service和user-service服务启动后,可以查看nacos控制台:

2.服务注册

服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:

首先最外层是一个Map,结构为:Map<String, Map<String, Service>>

  • key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group
  • value:又是一个Map<String, Service>,代表分组及组内的服务。一个组内可以有多个服务
    • key:代表group分组,不过作为key时格式是group_name:service_name
    • value:分组下的某一个服务,例如userservice,用户服务。类型为Service,内部也包含一个Map<String,Cluster>,一个服务下可以有多个集群
      • key:集群名称
      • value:Cluster类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>,就是该集群下的实例的集合
        • Instance:实例信息,包含实例的IP、Port、健康状态、权重等等信息

每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。

2.1.服务注册接口

Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。

接口说明:注册一个实例到Nacos服务。

请求类型POST

请求路径/nacos/v1/ns/instance

请求参数

名称 类型 是否必选 描述
ip 字符串 服务实例IP
port int 服务实例port
namespaceId 字符串 命名空间ID
weight double 权重
enabled boolean 是否上线
healthy boolean 是否健康
metadata 字符串 扩展信息
clusterName 字符串 集群名
serviceName 字符串 服务名
groupName 字符串 分组名
ephemeral boolean 是否临时实例

错误编码

错误代码 描述 语义
400 Bad Request 客户端请求中的语法错误
403 Forbidden 没有权限
404 Not Found 无法找到资源
500 Internal Server Error 服务器内部错误
200 OK 正常

2.2.客户端

首先,我们需要找到服务注册的入口。

2.2.1.NacosServiceRegistryAutoConfiguration

因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:

spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar

这个包中找到Nacos自动装配信息:

可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。

可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:

2.2.2.NacosAutoServiceRegistration

NacosAutoServiceRegistration源码如图:

可以看到在初始化时,其父类AbstractAutoServiceRegistration也被初始化了。

AbstractAutoServiceRegistration如图:

可以看到它实现了ApplicationListener接口,监听Spring容器启动过程中的事件。

在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法。

其中的bind方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void bind(WebServerInitializedEvent event) {
// 获取 ApplicationContext
ApplicationContext context = event.getApplicationContext();
// 判断服务的 namespace,一般都是null
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
// 记录当前 web 服务的端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 启动当前服务注册流程
this.start();
}

其中的start方法流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}

// 当前服务处于未运行状态时,才进行初始化
if (!this.running.get()) {
// 发布服务开始注册的事件
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
// ☆☆☆☆开始注册☆☆☆☆
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// 发布注册完成事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
// 服务状态设置为运行状态,基于AtomicBoolean
this.running.compareAndSet(false, true);
}

}

其中最关键的register()方法就是完成服务注册的关键,代码如下:

1
2
3
protected void register() {
this.serviceRegistry.register(getRegistration());
}

此处的this.serviceRegistry就是NacosServiceRegistry:

2.2.3.NacosServiceRegistry

NacosServiceRegistry是Spring的ServiceRegistry接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。

NacosServiceRegistryregister的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void register(Registration registration) {
// 判断serviceId是否为空,也就是spring.application.name不能为空
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 获取Nacos的命名服务,其实就是注册中心服务
NamingService namingService = namingService();
// 获取 serviceId 和 Group
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
Instance instance = getNacosInstanceFromRegistration(registration);

try {
// 开始注册服务
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}

可以看到方法中最终是调用NamingService的registerInstance方法实现注册的。

而NamingService接口的默认实现就是NacosNamingService。

2.2.4.NacosNamingService

NacosNamingService提供了服务注册、订阅等功能。

其中registerInstance就是注册服务实例,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
NamingUtils.checkInstanceIsLegal(instance);
// 拼接得到新的服务名,格式为:groupName@@serviceId
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否为临时实例,默认为 true。
if (instance.isEphemeral()) {
// 如果是临时实例,需要定时向 Nacos 服务发送心跳
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 发送注册服务实例的请求
serverProxy.registerService(groupedServiceName, groupName, instance);
}

最终,由NacosProxy的registerService方法,完成服务注册。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 组织请求参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:

  • namespace_id:环境
  • service_name:服务名称
  • group_name:组名称
  • cluster_name:集群名称
  • ip: 当前实例的ip地址
  • port: 当前实例的端口

而在NacosNamingService的registerInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。

2.2.5.客户端注册的流程图

如图:

2.3.服务端

在nacos-console的模块中,会引入nacos-naming这个模块:

模块结构如下:

其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController类中:

2.3.1.InstanceController

进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 尝试获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 尝试获取serviceName,其格式为 group_name@@service_name
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 解析出实例信息,封装为Instance对象
final Instance instance = parseInstance(request);
// 注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}

这里,进入到了serviceManager.registerInstance()方法中。

2.3.2.ServiceManager

ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:

而其中的registerInstance方法就是注册服务实例的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
// 此时不包含实例信息
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 拿到创建好的service
Service service = getService(namespaceId, serviceName);
// 拿不到则抛异常
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加要注册的实例到service中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

创建好了服务,接下来就要添加实例到服务中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral whether instance is ephemeral
* @param ips instances
* @throws NacosException nacos exception
*/
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取服务
Service service = getService(namespaceId, serviceName);
// 同步锁,避免并发修改的安全问题
synchronized (service) {
// 1)获取要更新的实例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 2)封装实例列表到Instances对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 3)完成 注册表更新 以及 Nacos集群的数据同步
consistencyService.put(key, instances);
}
}

该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:

  • 1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
  • 2)然后将更新后的数据封装到Instances对象中,后面更新注册表时使用
  • 3)最后,调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。

注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。

1)更服务列表

我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);

1
2
3
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

继续进入updateIpAddresses方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum
// 第一次来,肯定是null
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// 得到服务中现有的实例列表
List<Instance> currentIPs = service.allIPs(ephemeral);
// 创建map,保存实例列表,key为ip地址,value是Instance对象
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
// 创建Set集合,保存实例的instanceId
Set<String> currentInstanceIds = Sets.newHashSet();
// 遍历要现有的实例列表
for (Instance instance : currentIPs) {
// 添加到map中
currentInstances.put(instance.toIpAddr(), instance);
// 添加instanceId到set中
currentInstanceIds.add(instance.getInstanceId());
}

// 创建map,用来保存更新后的实例列表
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
// 如果服务中已经有旧的数据,则先保存旧的实例列表
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
// 如果没有旧数据,则直接创建新的map
instanceMap = new HashMap<>(ips.length);
}
// 遍历实例列表
for (Instance instance : ips) {
// 判断服务中是否包含要注册的实例的cluster信息
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
// 如果不包含,创建新的cluster
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
// 将集群放入service的注册表
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
// 删除实例 or 新增实例 ?
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
// 新增实例,instance生成全新的instanceId
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
// 放入instance列表
instanceMap.put(instance.getDatumKey(), instance);
}

}

if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
// 将instanceMap中的所有实例转为List返回
return new ArrayList<>(instanceMap.values());
}

简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID。然后返回最新的实例列表。

2)Nacos集群一致性

在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:

consistencyService.put(key, instances);

这里的ConsistencyService接口,代表集群一致性的接口,有很多中不同实现:

我们进入DelegateConsistencyServiceImpl来看:

1
2
3
4
5
@Override
public void put(String key, Record value) throws NacosException {
// 根据实例是否是临时实例,判断委托对象
mapConsistencyService(key).put(key, value);
}

其中的mapConsistencyService(key)方法就是选择委托方式的:

1
2
3
4
5
6
private ConsistencyService mapConsistencyService(String key) {
// 判断是否是临时实例:
// 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
// 否,选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

默认情况下,所有实例都是临时实例,我们关注DistroConsistencyServiceImpl即可。

2.3.4.DistroConsistencyServiceImpl

我们来看临时实例的一致性实现:DistroConsistencyServiceImpl类的put方法:

1
2
3
4
5
6
7
public void put(String key, Record value) throws NacosException {
// 先将要更新的实例信息写入本地实例列表
onPut(key, value);
// 开始集群同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}

这里方法只有两行:

  • onPut(key, value):其中value就是Instances,要更新的服务信息。这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)
  • distroProtocol.sync():就是通过Distro协议将数据同步给集群中的其它Nacos节点

我们先看onPut方法

2.3.4.1.更新本地实例列表

1)放入阻塞队列

onPut方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void onPut(String key, Record value) {
// 判断是否是临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 封装 Instances 信息到 数据集:Datum
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 放入DataStore
dataStore.put(key, datum);
}

if (!listeners.containsKey(key)) {
return;
}
// 放入阻塞队列,这里的 notifier维护了一个阻塞队列,并且基于线程池异步执行队列中的任务
notifier.addTask(key, DataOperation.CHANGE);
}

notifier的类型就是DistroConsistencyServiceImpl.Notifier,内部维护了一个阻塞队列,存放服务列表变更的事件:

addTask时,将任务加入该阻塞队列:

1
2
3
4
5
6
7
8
9
10
11
12
// DistroConsistencyServiceImpl.Notifier类的 addTask 方法:
public void addTask(String datumKey, DataOperation action) {

if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 任务放入阻塞队列
tasks.offer(Pair.with(datumKey, action));
}
2)Notifier异步更新

同时,notifier还是一个Runnable,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新。来看下其中的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// DistroConsistencyServiceImpl.Notifier类的run方法:
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// 死循环,不断执行任务。因为是阻塞队列,不会导致CPU负载过高
for (; ; ) {
try {
// 从阻塞队列中获取任务
Pair<String, DataOperation> pair = tasks.take();
// 处理任务,更新服务列表
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}

来看看handle方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// DistroConsistencyServiceImpl.Notifier类的 handle 方法:
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();

services.remove(datumKey);

int count = 0;

if (!listeners.containsKey(datumKey)) {
return;
}
// 遍历,找到变化的service,这里的 RecordListener就是 Service
for (RecordListener listener : listeners.get(datumKey)) {

count++;

try {
// 服务的实例列表CHANGE事件
if (action == DataOperation.CHANGE) {
// 更新服务列表
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
// 服务的实例列表 DELETE 事件
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}

if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
3)覆盖实例列表

而在Service的onChange方法中,就可以看到更新实例列表的逻辑了:

1
2
3
4
5
6
7
8
9
10
@Override
public void onChange(String key, Instances value) throws Exception {

Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

// 更新实例列表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

recalculateChecksum();
}

updateIPs方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 准备一个Map,key是cluster,值是集群下的Instance集合
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 获取服务的所有cluster名称
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 遍历要更新的实例
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 判断实例是否包含clusterName,没有的话用默认cluster
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 判断cluster是否存在,不存在则创建新的cluster
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 获取当前cluster实例的集合,不存在则创建新的
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 添加新的实例到 Instance 集合
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}

for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 将实例集合更新到 clusterMap(注册表)
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}

setLastModifiedMillis(System.currentTimeMillis());
// 发布服务变更的通知消息
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();

for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}

Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());

}

在第45行的代码中:clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);

就是在更新注册表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void updateIps(List<Instance> ips, boolean ephemeral) {
// 获取旧实例列表
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}

// 检查新加入实例的状态
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());

for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
// 移除要删除的实例
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());

for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}

toUpdateInstances = new HashSet<>(ips);
// 直接覆盖旧实例列表
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}

2.3.4.2.集群数据同步

在DistroConsistencyServiceImpl的put方法中分为两步:

其中的onPut方法已经分析过了。

下面的distroProtocol.sync()就是集群同步的逻辑了。

DistroProtocol类的sync方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 遍历 Nacos 集群中除自己以外的其它节点
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 定义一个Distro的同步任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// 交给线程池去执行
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}

其中同步的任务封装为一个DistroDelayTask对象。

交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()执行,这行代码的返回值是:

NacosDelayTaskExecuteEngine,这个类维护了一个线程池,并且接收任务,执行任务。

执行任务的方法为processTasks()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// 尝试执行同步任务,如果失败会重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}

可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。

2.3.5.服务端流程图

2.4.总结

  • Nacos的注册表结构是什么样的?

    • 答:Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。因此其注册表结构为一个Map,类型是:

      Map<String, Map<String, Service>>

      外层key是namespace_id,内层key是group+serviceName.

      Service内部维护一个Map,结构是:Map<String,Cluster>,key是clusterName,值是集群信息

      Cluster内部维护一个Set集合,元素是Instance类型,代表集群中的多个实例。

  • Nacos如何保证并发写的安全性?

    • 答:首先,在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响。相同service时通过锁来互斥。并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.
  • Nacos如何避免并发读写的冲突?

    • 答:Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。
  • Nacos如何应对阿里内部数十万服务的并发写请求?

    • 答:Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。

3.服务心跳

Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:

1
2
3
4
5
6
7
8
spring:
application:
name: order-service
cloud:
nacos:
discovery:
ephemeral: false # 设置实例为永久实例。true:临时; false:永久
server-addr: 192.168.150.1:8845

临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。

其中Nacos提供的心跳的API接口为:

接口描述:发送某个实例的心跳

请求类型:PUT

请求路径

1
/nacos/v1/ns/instance/beat

请求参数

名称 类型 是否必选 描述
serviceName 字符串 服务名
groupName 字符串 分组名
ephemeral boolean 是否临时实例
beat JSON格式字符串 实例心跳内容

错误编码

错误代码 描述 语义
400 Bad Request 客户端请求中的语法错误
403 Forbidden 没有权限
404 Not Found 无法找到资源
500 Internal Server Error 服务器内部错误
200 OK 正常

3.1.客户端

在2.2.4.服务注册这一节中,我们说过NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否是临时实例。
if (instance.isEphemeral()) {
// 如果是临时实例,则构建心跳信息BeatInfo
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳任务
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}

3.1.1.BeatInfo

这里的BeanInfo就包含心跳需要的各种信息:

3.1.2.BeatReactor

BeatReactor这个类则维护了一个线程池:

当调用BeatReactor.addBeatInfo(groupedServiceName, beatInfo)方法时,就会执行心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod()
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

心跳周期的默认值在com.alibaba.nacos.api.common.Constants类中:

可以看到是5秒,默认5秒一次心跳。

3.1.3.BeatTask

心跳的任务封装在BeatTask这个类中,是一个Runnable,其run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
// 获取心跳周期
long nextTime = beatInfo.getPeriod();
try {
// 发送心跳
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
// 判断心跳结果
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
// 如果失败,则需要 重新注册实例
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}

3.1.5.发送心跳

最终心跳的发送还是通过NamingProxysendBeat方法来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
// 组织请求参数
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
// 发送请求,这个地址就是:/v1/ns/instance/beat
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}

3.2.服务端

对于临时实例,服务端代码分两部分:

  • 1)InstanceController提供了一个接口,处理客户端的心跳请求
  • 2)定时检测实例心跳是否按期执行

3.2.1.InstanceController

与服务注册时一样,在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
// 解析心跳的请求参数
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息
// 从Nacos的注册表中 获取实例
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// 如果获取失败,说明心跳失败,实例尚未注册
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}

Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
// 这里重新注册一个实例
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());

serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// 尝试基于namespaceId和serviceName从 注册表中获取Service服务
Service service = serviceManager.getService(namespaceId, serviceName);
// 如果不存在,说明服务不存在,返回404
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// 如果心跳没问题,开始处理心跳结果
service.processClientBeat(clientBeat);

result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}

最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给Service类处理这次心跳请求。调用了Service的processClientBeat方法

3.2.2.处理心跳请求

查看Serviceservice.processClientBeat(clientBeat);方法:

1
2
3
4
5
6
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

可以看到心跳信息被封装到了 ClientBeatProcessor类中,交给了HealthCheckReactor处理,HealthCheckReactor就是对线程池的封装,不用过多查看。

关键的业务逻辑都在ClientBeatProcessor这个类中,它是一个Runnable,其中的run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}

String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
// 获取集群信息
Cluster cluster = service.getClusterMap().get(clusterName);
// 获取集群中的所有实例信息
List<Instance> instances = cluster.allIPs(true);

for (Instance instance : instances) {
// 找到心跳的这个实例
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// 更新实例的最后一次心跳时间 lastBeat
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
}
}
}
}
}

处理心跳请求的核心就是更新心跳实例的最后一次心跳时间,lastBeat,这个会成为判断实例心跳是否过期的关键指标!

3.3.3.心跳异常检测

在服务注册时,一定会创建一个Service对象,而Service中有一个init方法,会在注册时被调用:

1
2
3
4
5
6
7
8
public void init() {
// 开启心跳检测的任务
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}

其中HealthCheckReactor.scheduleCheck就是执行心跳检测的定时任务:

可以看到,该任务是5000ms执行一次,也就是5秒对实例的心跳状态做一次检测。

此处的ClientBeatCheckTask同样是一个Runnable,其中的run方法为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public void run() {
try {
// 找到所有临时实例的列表
List<Instance> instances = service.allIPs(true);

// first set health status of instances:
for (Instance instance : instances) {
// 判断 心跳间隔(当前时间 - 最后一次心跳时间) 是否大于 心跳超时时间,默认15秒
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
// 如果超时,标记实例为不健康 healthy = false
instance.setHealthy(false);

// 发布实例状态变更的事件
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

// then remove obsolete instances:
for (Instance instance : instances) {

if (instance.isMarked()) {
continue;
}
// 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于 实例被删除的最长超时时间,默认30秒
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// 如果是超过了30秒,则删除实例
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}

} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}

}

其中的超时时间同样是在com.alibaba.nacos.api.common.Constants这个类中:

3.3.4.主动健康检测

对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。

入口在2.3.2小节的ServiceManager类中的registerInstance方法:

创建空服务时:

1
2
3
4
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
// 如果服务不存在,创建新的服务
createServiceIfAbsent(namespaceId, serviceName, local, null);
}

创建服务流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 尝试获取服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
// 发现服务不存在,开始创建新服务
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// ** 写入注册表并初始化 **
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}

关键在putServiceAndInit(service)方法中:

1
2
3
4
5
6
7
8
9
10
11
12
private void putServiceAndInit(Service service) throws NacosException {
// 将服务写入注册表
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// 完成服务的初始化
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

进入初始化逻辑:service.init(),这个会进入Service类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Init service.
*/
public void init() {
// 开启临时实例的心跳监测任务
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
// 遍历注册表中的集群
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
// 完成集群初识化
entry.getValue().init();
}
}

这里集群的初始化entry.getValue().init();会进入Cluster类型的init()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Init cluster.
*/
public void init() {
if (inited) {
return;
}
// 创建健康检测的任务
checkTask = new HealthCheckTask(this);
// 这里会开启对 非临时实例的 定时健康检测
HealthCheckReactor.scheduleCheck(checkTask);
inited = true;
}

这里的HealthCheckReactor.scheduleCheck(checkTask);会开启定时任务,对非临时实例做健康检测。检测逻辑定义在HealthCheckTask这个类中,是一个Runnable,其中的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void run() {

try {
if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
.isHealthCheckEnabled(cluster.getService().getName())) {
// 开始健康检测
healthCheckProcessor.process(this);
// 记录日志 。。。
}
} catch (Throwable e) {
// 记录日志 。。。
} finally {
if (!cancelled) {
// 结束后,再次进行任务调度,一定延迟后执行
HealthCheckReactor.scheduleCheck(this);

// 。。。
}
}
}

健康检测逻辑定义在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor

进入TcpSuperSenseProcessor的process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void process(HealthCheckTask task) {
// 获取所有 非临时实例的 集合
List<Instance> ips = task.getCluster().allIPs(false);

if (CollectionUtils.isEmpty(ips)) {
return;
}

for (Instance ip : ips) {
// 封装健康检测信息到 Beat
Beat beat = new Beat(ip, task);
// 放入一个阻塞队列中
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
}

可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到Nacos中大量这样的设计。

TcpSuperSenseProcessor本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:

1
2
3
4
5
6
7
8
9
10
11
public void run() {
while (true) {
try {
// 处理任务
processTask();
// ...
} catch (Throwable e) {
SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
}
}
}

通过processTask来处理健康检测的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void processTask() throws Exception {
// 将任务封装为一个 TaskProcessor,并放入集合
Collection<Callable<Void>> tasks = new LinkedList<>();
do {
Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
if (beat == null) {
return;
}

tasks.add(new TaskProcessor(beat));
} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
// 批量处理集合中的任务
for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
f.get();
}
}

任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public Void call() {
// 获取检测任务已经等待的时长
long waited = System.currentTimeMillis() - beat.getStartTime();
if (waited > MAX_WAIT_TIME_MILLISECONDS) {
Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
}

SocketChannel channel = null;
try {
// 获取实例信息
Instance instance = beat.getIp();
// 通过NIO建立TCP连接
channel = SocketChannel.open();
channel.configureBlocking(false);
// only by setting this can we make the socket close event asynchronous
channel.socket().setSoLinger(false, -1);
channel.socket().setReuseAddress(true);
channel.socket().setKeepAlive(true);
channel.socket().setTcpNoDelay(true);

Cluster cluster = beat.getTask().getCluster();
int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
channel.connect(new InetSocketAddress(instance.getIp(), port));
// 注册连接、读取事件
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
key.attach(beat);
keyMap.put(beat.toString(), new BeatKey(key));

beat.setStartTime(System.currentTimeMillis());

GlobalExecutor
.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
"tcp:error:" + e.getMessage());

if (channel != null) {
try {
channel.close();
} catch (Exception ignore) {
}
}
}

return null;
}

3.3.总结

Nacos的健康检测有两种模式:

  • 临时实例:
    • 采用客户端心跳检测模式,心跳周期5秒
    • 心跳间隔超过15秒则标记为不健康
    • 心跳间隔超过30秒则从服务列表删除
  • 永久实例:
    • 采用服务端主动健康检测方式
    • 周期为2000 + 5000毫秒内的随机数
    • 检测异常只会标记为不健康,不会删除

那么为什么Nacos有临时和永久两种实例呢?

以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例比较合适。而对于服务的一些常备实例,则使用永久实例更合适。

与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。

4.服务发现

Nacos提供了一个根据serviceId查询实例列表的接口:

接口描述:查询服务下的实例列表

请求类型:GET

请求路径

1
/nacos/v1/ns/instance/list

请求参数

名称 类型 是否必选 描述
serviceName 字符串 服务名
groupName 字符串 分组名
namespaceId 字符串 命名空间ID
clusters 字符串,多个集群用逗号分隔 集群名称
healthyOnly boolean 否,默认为false 是否只返回健康实例

错误编码

错误代码 描述 语义
400 Bad Request 客户端请求中的语法错误
403 Forbidden 没有权限
404 Not Found 无法找到资源
500 Internal Server Error 服务器内部错误
200 OK 正常

4.1.客户端

4.1.1.定时更新服务列表

4.1.1.1.NacosNamingService

在2.2.4小节中,我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。

多个重载的方法最终都会进入一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;
// 1.判断是否需要订阅服务信息(默认为 true)
if (subscribe) {
// 1.1.订阅服务信息
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
// 1.2.直接去nacos拉取服务信息
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
// 2.从服务信息中获取实例列表并返回
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}

4.1.1.2.HostReactor

进入1.1.订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
// 由 服务名@@集群名拼接 key
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 判断缓存是否存在
if (null == serviceObj) {
// 不存在,创建空ServiceInfo
serviceObj = new ServiceInfo(serviceName, clusters);
// 放入缓存
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 放入待更新的服务列表(updatingMap)中
updatingMap.put(serviceName, new Object());
// 立即更新服务列表
updateServiceNow(serviceName, clusters);
// 从待更新列表中移除
updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {
// 缓存中有,但是需要更新
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish 等待5秒中,待更新完成
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 开启定时更新服务列表的功能
scheduleUpdateIfAbsent(serviceName, clusters);
// 返回缓存中的服务信息
return serviceInfoMap.get(serviceObj.getKey());
}

基本逻辑就是先从本地缓存读,根据结果来选择:

  • 如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)

  • 如果本地缓存有,则开启定时更新功能,并返回缓存结果:

    • scheduleUpdateIfAbsent(serviceName, clusters)

    在UpdateTask中,最终还是调用updateService方法:

不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 基于ServerProxy发起远程调用,查询服务列表
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

if (StringUtils.isNotEmpty(result)) {
// 处理查询结果
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}

4.1.1.3.ServerProxy

而ServerProxy的queryList方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
// 准备请求参数
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
// 发起请求,地址与API接口一致
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

4.1.2.处理服务变更通知

除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。

在HostReactor类的构造函数中,有非常重要的几个步骤:

基本思路是:

  • 通过PushReceiver监听服务端推送的变更数据
  • 解析数据后,通过NotifyCenter发布服务变更的事件
  • InstanceChangeNotifier监听变更事件,完成对服务列表的更新

4.1.2.1.PushReceiver

我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。

先看构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// 创建 UDP客户端
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
// 准备线程池
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
// 开启线程任务,准备接收变更数据
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}

PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 接收推送数据
udpSocket.receive(packet);
// 解析为json字符串
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// 反序列化为对象
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// 交给 HostReactor去处理
hostReactor.processServiceJson(pushPacket.data);

// send ack to server 发送ACK回执,略。。
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}

4.1.2.2.HostReactor

通知数据的处理由交给了HostReactorprocessServiceJson方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public ServiceInfo processServiceJson(String json) {
// 解析出ServiceInfo信息
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
// 查询缓存中的 ServiceInfo
ServiceInfo oldService = serviceInfoMap.get(serviceKey);

// 如果缓存存在,则需要校验哪些数据要更新
boolean changed = false;
if (oldService != null) {
// 拉取的数据是否已经过期
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
}
// 放入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

// 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
// modHosts:需要修改的实例
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
// 发布实例变更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}

} else {
// 本地缓存不存在
changed = true;
// 放入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 直接发布实例变更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
// 。。。
return serviceInfo;
}

4.2.服务端

4.2.1.拉取服务列表接口

在2.3.1小节介绍的InstanceController中,提供了拉取服务列表的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Get all instance of input service.
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// 从request中获取namespaceId和serviceName
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);

String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
// 获取客户端的 UDP端口
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

// 获取服务列表
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}

进入doSrvIpxt()方法来获取服务列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
String clusters, String clientIP,
int udpPort, String env, boolean isCheck,
String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 获取服务列表信息
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();

// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
// 添加当前客户端 IP、UDP端口到 PushService 中
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}

if (service == null) {
// 如果没找到,返回空
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
// 结果的检测,异常实例的剔除等逻辑省略
// 最终封装结果并返回 。。。

result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}

4.2.2.发布服务变更的UDP通知

在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:

1
2
3
pushService.addClient(namespaceId, serviceName, clusters, agent,
new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);

其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。

PushService类本身实现了ApplicationListener接口:

这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)。

当服务列表变化时,就会通知我们:

4.3.总结

Nacos的服务发现分为两种模式:

  • 模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。
  • 模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。

与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。

Sentinel源码分析

1.Sentinel的基本概念

Sentinel实现限流、隔离、降级、熔断等功能,本质要做的就是两件事情:

  • 统计数据:统计某个资源的访问数据(QPS、RT等信息)
  • 规则判断:判断限流规则、隔离规则、降级规则、熔断规则是否满足

这里的资源就是希望被Sentinel保护的业务,例如项目中定义的controller方法就是默认被Sentinel保护的资源。

1.1.ProcessorSlotChain

实现上述功能的核心骨架是一个叫做ProcessorSlotChain的类。这个类基于责任链模式来设计,将不同的功能(限流、降级、系统保护)封装为一个个的Slot,请求进入后逐个执行即可。

其工作流如图:

责任链中的Slot也分为两大类:

  • 统计数据构建部分(statistic)
    • NodeSelectorSlot:负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树
    • ClusterBuilderSlot:负责构建某个资源的ClusterNode,ClusterNode可以保存资源的运行信息(响应时间、QPS、block 数目、线程数、异常数等)以及来源信息(origin名称)
    • StatisticSlot:负责统计实时调用数据,包括运行信息、来源信息等
  • 规则判断部分(rule checking)
    • AuthoritySlot:负责授权规则(来源控制)
    • SystemSlot:负责系统保护规则
    • ParamFlowSlot:负责热点参数限流规则
    • FlowSlot:负责限流规则
    • DegradeSlot:负责降级规则

1.2.Node

Sentinel中的簇点链路是由一个个的Node组成的,Node是一个接口,包括下面的实现:

所有的节点都可以记录对资源的访问统计数据,所以都是StatisticNode的子类。

按照作用分为两类Node:

  • DefaultNode:代表链路树中的每一个资源,一个资源出现在不同链路中时,会创建不同的DefaultNode节点。而树的入口节点叫EntranceNode,是一种特殊的DefaultNode
  • ClusterNode:代表资源,一个资源不管出现在多少链路中,只会有一个ClusterNode。记录的是当前资源被访问的所有统计数据之和。

DefaultNode记录的是资源在当前链路中的访问数据,用来实现基于链路模式的限流规则。ClusterNode记录的是资源在所有链路中的访问数据,实现默认模式、关联模式的限流规则。

例如:我们在一个SpringMVC项目中,有两个业务:

  • 业务1:controller中的资源/order/query访问了service中的资源/goods
  • 业务2:controller中的资源/order/save访问了service中的资源/goods

创建的链路图如下:

1.3.Entry

默认情况下,Sentinel会将controller中的方法作为被保护资源,那么问题来了,我们该如何将自己的一段代码标记为一个Sentinel的资源呢?

Sentinel中的资源用Entry来表示。声明Entry的API示例:

1
2
3
4
5
6
7
8
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try (Entry entry = SphU.entry("resourceName")) {
// 被保护的业务逻辑
// do something here...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
}

1.3.1.自定义资源

例如,我们在order-service服务中,将OrderServicequeryOrderById()方法标记为一个资源。

1)首先在order-service中引入sentinel依赖

1
2
3
4
5
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

2)然后配置Sentinel地址

1
2
3
4
5
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8089 # 这里我的sentinel用了8089的端口

3)修改OrderService类的queryOrderById方法

代码这样来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Order queryOrderById(Long orderId) {
// 创建Entry,标记资源,资源名为resource1
try (Entry entry = SphU.entry("resource1")) {
// 1.查询订单,这里是假数据
Order order = Order.build(101L, 4999L, "小米 MIX4", 1, 1L, null);
// 2.查询用户,基于Feign的远程调用
User user = userClient.findById(order.getUserId());
// 3.设置
order.setUser(user);
// 4.返回
return order;
}catch (BlockException e){
log.error("被限流或降级", e);
return null;
}
}

4)访问

打开浏览器,访问order服务:http://localhost:8080/order/101

然后打开sentinel控制台,查看簇点链路:

1.3.2.基于注解标记资源

在之前学习Sentinel的时候,我们知道可以通过给方法添加@SentinelResource注解的形式来标记资源。

这个是怎么实现的呢?

来看下我们引入的Sentinel依赖包:

其中的spring.factories声明需要就是自动装配的配置类,内容如下:

我们来看下SentinelAutoConfiguration这个类:

可以看到,在这里声明了一个Bean,SentinelResourceAspect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Aspect for methods with {@link SentinelResource} annotation.
*
* @author Eric Zhao
*/
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 切点是添加了 @SentinelResource注解的类
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}

// 环绕增强
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
// 获取受保护的方法
Method originMethod = resolveMethod(pjp);
// 获取 @SentinelResource注解
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
// 获取注解上的资源名称
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
Entry entry = null;
try {
// 创建资源 Entry
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 执行受保护的方法
Object result = pjp.proceed();
return result;
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
return handleFallback(pjp, annotation, ex);
}

// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}

简单来说,@SentinelResource注解就是一个标记,而Sentinel基于AOP思想,对被标记的方法做环绕增强,完成资源(Entry)的创建。

1.4.Context

上一节,我们发现簇点链路中除了controller方法、service方法两个资源外,还多了一个默认的入口节点:

sentinel_spring_web_context,是一个EntranceNode类型的节点

这个节点是在初始化Context的时候由Sentinel帮我们创建的。

1.4.1.什么是Context

那么,什么是Context呢?

  • Context 代表调用链路上下文,贯穿一次调用链路中的所有资源( Entry),基于ThreadLocal。
  • Context 维持着入口节点(entranceNode)、本次调用链路的 curNode(当前资源节点)、调用来源(origin)等信息。
  • 后续的Slot都可以通过Context拿到DefaultNode或者ClusterNode,从而获取统计数据,完成规则判断
  • Context初始化的过程中,会创建EntranceNode,contextName就是EntranceNode的名称

对应的API如下:

1
2
// 创建context,包含两个参数:context名称、 来源名称
ContextUtil.enter("contextName", "originName");

1.4.2.Context的初始化

那么这个Context又是在何时完成初始化的呢?

1.4.2.1.自动装配

来看下我们引入的Sentinel依赖包:

其中的spring.factories声明需要就是自动装配的配置类,内容如下:

我们先看SentinelWebAutoConfiguration这个类:

这个类实现了WebMvcConfigurer,我们知道这个是SpringMVC自定义配置用到的类,可以配置HandlerInterceptor:

可以看到这里配置了一个SentinelWebInterceptor的拦截器。

SentinelWebInterceptor的声明如下:

发现它继承了AbstractSentinelInterceptor这个类。

HandlerInterceptor拦截器会拦截一切进入controller的方法,执行preHandle前置拦截方法,而Context的初始化就是在这里完成的。

1.4.2.2.AbstractSentinelInterceptor

HandlerInterceptor拦截器会拦截一切进入controller的方法,执行preHandle前置拦截方法,而Context的初始化就是在这里完成的。

我们来看看这个类的preHandle实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// 获取资源名称,一般是controller方法的@RequestMapping路径,例如/order/{orderId}
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// 从request中获取请求来源,将来做 授权规则 判断时会用
String origin = parseOrigin(request);

// 获取 contextName,默认是sentinel_spring_web_context
String contextName = getContextName(request);
// 创建 Context
ContextUtil.enter(contextName, origin);
// 创建资源,名称就是当前请求的controller方法的映射路径
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}

1.4.2.3.ContextUtil

创建Context的方法就是ContextUtil.enter(contextName, origin);

我们进入该方法:

1
2
3
4
5
6
7
public static Context enter(String name, String origin) {
if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
throw new ContextNameDefineException(
"The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
}
return trueEnter(name, origin);
}

进入trueEnter方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected static Context trueEnter(String name, String origin) {
// 尝试获取context
Context context = contextHolder.get();
// 判空
if (context == null) {
// 如果为空,开始初始化
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 尝试获取入口节点
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
// 入口节点为空,初始化入口节点 EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 添加入口节点到 ROOT
Constants.ROOT.addChild(node);
// 将入口节点放入缓存
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
} finally {
LOCK.unlock();
}
}
// 创建Context,参数为:入口节点 和 contextName
context = new Context(node, name);
// 设置请求来源 origin
context.setOrigin(origin);
// 放入ThreadLocal
contextHolder.set(context);
}
// 返回
return context;
}

2.ProcessorSlotChain执行流程

接下来我们跟踪源码,验证下ProcessorSlotChain的执行流程。

2.1.入口

首先,回到一切的入口,AbstractSentinelInterceptor类的preHandle方法:

还有,SentinelResourceAspect的环绕增强方法:

可以看到,任何一个资源必定要执行SphU.entry()这个方法:

1
2
3
4
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}

继续进入Env.sph.entryWithType(name, resourceType, trafficType, 1, args);

1
2
3
4
5
6
7
8
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
// 将 资源名称等基本信息 封装为一个 StringResourceWrapper对象
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
// 继续
return entryWithPriority(resource, count, prioritized, args);
}

进入entryWithPriority方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取 Context
Context context = ContextUtil.getContext();

if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 获取 Slot执行链,同一个资源,会创建一个执行链,放入缓存
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

// 创建 Entry,并将 resource、chain、context 记录在 Entry中
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 执行 slotChain
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}

在这段代码中,会获取ProcessorSlotChain对象,然后基于chain.entry()开始执行slotChain中的每一个Slot. 而这里创建的是其实现类:DefaultProcessorSlotChain.

获取ProcessorSlotChain以后会保存到一个Map中,key是ResourceWrapper,值是ProcessorSlotChain.

所以,一个资源只会有一个ProcessorSlotChain.

2.2.DefaultProcessorSlotChain

我们进入DefaultProcessorSlotChain的entry方法:

1
2
3
4
5
6
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
// first,就是责任链中的第一个 slot
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}

这里的first,类型是AbstractLinkedProcessorSlot:

看下继承关系:

因此,first一定是这些实现类中的一个,按照最早讲的责任链顺序,first应该就是 NodeSelectorSlot

不过,既然是基于责任链模式,所以这里只要记住下一个slot就可以了,也就是next:

next确实是NodeSelectSlot类型。

而NodeSelectSlot的next一定是ClusterBuilderSlot,依次类推:

责任链就建立起来了。

2.3.NodeSelectorSlot

NodeSelectorSlot负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树。

核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 尝试获取 当前资源的 DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 如果为空,为当前资源创建一个新的 DefaultNode
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
// 放入缓存中,注意这里的 key是contextName,
// 这样不同链路进入相同资源,就会创建多个 DefaultNode
cacheMap.put(context.getName(), node);
map = cacheMap;
// 当前节点加入上一节点的 child中,这样就构成了调用链路树
((DefaultNode) context.getLastNode()).addChild(node);
}

}
}
// context中的curNode(当前节点)设置为新的 node
context.setCurNode(node);
// 执行下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

这个Slot完成了这么几件事情:

  • 为当前资源创建 DefaultNode
  • 将DefaultNode放入缓存中,key是contextName,这样不同链路入口的请求,将会创建多个DefaultNode,相同链路则只有一个DefaultNode
  • 将当前资源的DefaultNode设置为上一个资源的childNode
  • 将当前资源的DefaultNode设置为Context中的curNode(当前节点)

下一个slot,就是ClusterBuilderSlot

2.4.ClusterBuilderSlot

ClusterBuilderSlot负责构建某个资源的ClusterNode,核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args)
throws Throwable {
// 判空,注意ClusterNode是共享的成员变量,也就是说一个资源只有一个ClusterNode,与链路无关
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 创建 cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// 放入缓存,可以是nodeId,也就是resource名称
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 将资源的 DefaultNode与 ClusterNode关联
node.setClusterNode(clusterNode);
// 记录请求来源 origin 将 origin放入 entry
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
// 继续下一个slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

2.5.StatisticSlot

StatisticSlot负责统计实时调用数据,包括运行信息(访问次数、线程数)、来源信息等。

StatisticSlot是实现限流的关键,其中基于滑动时间窗口算法维护了计数器,统计进入某个资源的请求次数。

核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
try {
// 放行到下一个 slot,做限流、降级等判断
fireEntry(context, resourceWrapper, node, count, prioritized, args);

// 请求通过了, 线程计数器 +1 ,用作线程隔离
node.increaseThreadNum();
// 请求计数器 +1 用作限流
node.addPassRequest(count);

if (context.getCurEntry().getOriginNode() != null) {
// 如果有 origin,来源计数器也都要 +1
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}

if (resourceWrapper.getEntryType() == EntryType.IN) {
// 如果是入口资源,还要给全局计数器 +1.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}

// 请求通过后的回调.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (Throwable e) {
// 各种异常处理就省略了。。。
context.getCurEntry().setError(e);

throw e;
}
}

另外,需要注意的是,所有的计数+1动作都包括两部分,以node.addPassRequest(count);为例:

1
2
3
4
5
6
7
@Override
public void addPassRequest(int count) {
// DefaultNode的计数器,代表当前链路的 计数器
super.addPassRequest(count);
// ClusterNode计数器,代表当前资源的 总计数器
this.clusterNode.addPassRequest(count);
}

具体计数方式,我们后续再看。

接下来,进入规则校验的相关slot了,依次是:

  • AuthoritySlot:负责授权规则(来源控制)
  • SystemSlot:负责系统保护规则
  • ParamFlowSlot:负责热点参数限流规则
  • FlowSlot:负责限流规则
  • DegradeSlot:负责降级规则

2.6.AuthoritySlot

负责请求来源origin的授权规则判断,如图:

核心API:

1
2
3
4
5
6
7
8
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// 校验黑白名单
checkBlackWhiteAuthority(resourceWrapper, context);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

黑白名单校验的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// 获取授权规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

if (authorityRules == null) {
return;
}

Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// 遍历规则并判断
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
// 规则不通过,直接抛出异常
throw new AuthorityException(context.getOrigin(), rule);
}
}
}

再看下AuthorityRuleChecker.passCheck(rule, context)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static boolean passCheck(AuthorityRule rule, Context context) {
// 得到请求来源 origin
String requester = context.getOrigin();

// 来源为空,或者规则为空,都直接放行
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}

// rule.getLimitApp()得到的就是 白名单 或 黑名单 的字符串,这里先用 indexOf方法判断
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;

if (contain) {
// 如果包含 origin,还要进一步做精确判断,把名单列表以","分割,逐个判断
boolean exactlyMatch = false;
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 如果是黑名单,并且包含origin,则返回false
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 如果是白名单,并且不包含origin,则返回false
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
// 其它情况返回true
return true;
}

2.7.SystemSlot

SystemSlot是对系统保护的规则校验:

核心API:

1
2
3
4
5
6
7
8
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count,boolean prioritized, Object... args) throws Throwable {
// 系统规则校验
SystemRuleManager.checkSystem(resourceWrapper);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

来看下SystemRuleManager.checkSystem(resourceWrapper);的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}

// 只针对入口资源做校验,其它直接返回
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}

// 全局 QPS校验
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}

// 全局 线程数 校验
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 全局平均 RT校验
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}

// 全局 系统负载 校验
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}

// 全局 CPU使用率 校验
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}

2.8.ParamFlowSlot

ParamFlowSlot就是热点参数限流,如图:

是针对进入资源的请求,针对不同的请求参数值分别统计QPS的限流方式。

  • 这里的单机阈值,就是最大令牌数量:maxCount

  • 这里的统计窗口时长,就是统计时长:duration

含义是每隔duration时间长度内,最多生产maxCount个令牌,上图配置的含义是每1秒钟生产2个令牌。

核心API:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
// 如果没有设置热点规则,直接放行
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// 热点规则判断
checkFlow(resourceWrapper, count, args);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

2.8.1.令牌桶

热点规则判断采用了令牌桶算法来实现参数限流,为每一个不同参数值设置令牌桶,Sentinel的令牌桶有两部分组成:

这两个Map的key都是请求的参数值,value却不同,其中:

  • tokenCounters:用来记录剩余令牌数量
  • timeCounters:用来记录上一个请求的时间

当一个携带参数的请求到来后,基本判断流程是这样的:

sentinel

2.9.FlowSlot

FlowSlot是负责限流规则的判断,如图:

包括:

  • 三种流控模式:直接模式、关联模式、链路模式
  • 三种流控效果:快速失败、warm up、排队等待

三种流控模式,从底层数据统计角度,分为两类:

  • 对进入资源的所有请求(ClusterNode)做限流统计:直接模式、关联模式
  • 对进入资源的部分链路(DefaultNode)做限流统计:链路模式

三种流控效果,从限流算法来看,分为两类:

  • 滑动时间窗口算法:快速失败、warm up
  • 漏桶算法:排队等待效果

2.9.1.核心流程

核心API如下:

1
2
3
4
5
6
7
8
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 限流规则检测
checkFlow(resourceWrapper, context, node, count, prioritized);
// 放行
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

checkFlow方法:

1
2
3
4
5
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// checker是 FlowRuleChecker 类的一个对象
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

跟入FlowRuleChecker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, 
ResourceWrapper resource,Context context, DefaultNode node,
int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 获取当前资源的所有限流规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 遍历,逐个规则做校验
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}

这里的FlowRule就是限流规则接口,其中的几个成员变量,刚好对应表单参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FlowRule extends AbstractRule {
/**
* 阈值类型 (0: 线程, 1: QPS).
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 阈值.
*/
private double count;
/**
* 三种限流模式.
*
* {@link RuleConstant#STRATEGY_DIRECT} 直连模式;
* {@link RuleConstant#STRATEGY_RELATE} 关联模式;
* {@link RuleConstant#STRATEGY_CHAIN} 链路模式.
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 关联模式关联的资源名称.
*/
private String refResource;
/**
* 3种流控效果.
* 0. 快速失败, 1. warm up, 2. 排队等待, 3. warm up + 排队等待
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
// 预热时长
private int warmUpPeriodSec = 10;
/**
* 队列最大等待时间.
*/
private int maxQueueingTimeMs = 500;
// 。。。 略
}

校验的逻辑定义在FlowRuleCheckercanPassCheck方法中:

1
2
3
4
5
6
7
8
9
10
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 获取限流资源名称
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 校验规则
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

进入passLocalCheck()

1
2
3
4
5
6
7
8
9
10
11
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
// 基于限流模式判断要统计的节点,
// 如果是直连模式,关联模式,对ClusterNode统计,如果是链路模式,则对DefaultNode统计
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 判断规则
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

这里对规则的判断先要通过FlowRule#getRater()获取流量控制器TrafficShapingController,然后再做限流。

TrafficShapingController有3种实现:

  • DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
  • WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
  • RateLimiterController:排队等待模式,基于漏桶算法

最终的限流判断都在TrafficShapingController的canPass方法中。

2.9.2.滑动时间窗口

滑动时间窗口的功能分两部分来看:

  • 一是时间区间窗口的QPS计数功能,这个是在StatisticSlot中调用的
  • 二是对滑动窗口内的时间区间窗口QPS累加,这个是在FlowRule中调用的

先来看时间区间窗口的QPS计数功能。

2.9.2.1.时间窗口请求量统计

回顾2.5章节中的StatisticSlot部分,有这样一段代码:

就是在统计通过该节点的QPS,我们跟入看看,这里进入了DefaultNode内部:

发现同时对DefaultNodeClusterNode在做QPS统计,我们知道DefaultNodeClusterNode都是StatisticNode的子类,这里调用addPassRequest()方法,最终都会进入StatisticNode中。

随便跟入一个:

这里有秒、分两种纬度的统计,对应两个计数器。找到对应的成员变量,可以看到:

两个计数器都是ArrayMetric类型,并且传入了两个参数:

1
2
3
4
5
// intervalInMs:是滑动窗口的时间间隔,默认为 1 秒
// sampleCount: 时间窗口的分隔数量,默认为 2,就是把 1秒分为 2个小时间窗
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

如图:

接下来,我们进入ArrayMetric类的addPass方法:

1
2
3
4
5
6
7
@Override
public void addPass(int count) {
// 获取当前时间所在的时间窗
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 计数器 +1
wrap.value().addPass(count);
}

那么,计数器如何知道当前所在的窗口是哪个呢?

这里的data是一个LeapArray:

LeapArray的四个属性:

1
2
3
4
5
6
7
8
9
10
public abstract class LeapArray<T> {
// 小窗口的时间长度,默认是500ms ,值 = intervalInMs / sampleCount
protected int windowLengthInMs;
// 滑动窗口内的 小窗口 数量,默认为 2
protected int sampleCount;
// 滑动窗口的时间间隔,默认为 1000ms
protected int intervalInMs;
// 滑动窗口的时间间隔,单位为秒,默认为 1
private double intervalInSecond;
}

LeapArray是一个环形数组,因为时间是无限的,数组长度不可能无限,因此数组中每一个格子放入一个时间窗(window),当数组放满后,角标归0,覆盖最初的window。

因为滑动窗口最多分成sampleCount数量的小窗口,因此数组长度只要大于sampleCount,那么最近的一个滑动窗口内的2个小窗口就永远不会被覆盖,就不用担心旧数据被覆盖的问题了。

我们跟入data.currentWindow();方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间对应的数组角标
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间所在窗口的开始时间.
long windowStart = calculateWindowStart(timeMillis);

/*
* 先根据角标获取数组中保存的 oldWindow 对象,可能是旧数据,需要判断.
*
* (1) oldWindow 不存在, 说明是第一次,创建新 window并存入,然后返回即可
* (2) oldWindow的 starTime = 本次请求的 windowStar, 说明正是要找的窗口,直接返回.
* (3) oldWindow的 starTime < 本次请求的 windowStar, 说明是旧数据,需要被覆盖,创建
* 新窗口,覆盖旧窗口
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建新 window
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 基于CAS写入数组,避免线程安全问题
if (array.compareAndSet(idx, null, window)) {
// 写入成功,返回新的 window
return window;
} else {
// 写入失败,说明有并发更新,等待其它人更新完成即可
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// 获取并发锁,覆盖旧窗口并返回
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 获取锁失败,等待其它线程处理就可以了
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这种情况不应该存在,写这里只是以防万一。
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

找到当前时间所在窗口(WindowWrap)后,只要调用WindowWrap对象中的add方法,计数器+1即可。

这里只负责统计每个窗口的请求量,不负责拦截。限流拦截要看FlowSlot中的逻辑。

2.9.2.2.滑动窗口QPS计算

在2.9.1小节我们讲过,FlowSlot的限流判断最终都由TrafficShapingController接口中的canPass方法来实现。该接口有三个实现类:

  • DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
  • WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
  • RateLimiterController:排队等待模式,基于漏桶算法

因此,我们跟入默认的DefaultController中的canPass方法来分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 计算目前为止滑动窗口内已经存在的请求量
int curCount = avgUsedTokens(node);
// 判断:已使用请求量 + 需要的请求量(1) 是否大于 窗口的请求阈值
if (curCount + acquireCount > count) {
// 大于,说明超出阈值,返回false
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);

// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
// 小于等于,说明在阈值范围内,返回true
return true;
}

因此,判断的关键就是int curCount = avgUsedTokens(node);

1
2
3
4
5
6
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

因为我们采用的是限流,走node.passQps()逻辑:

1
2
3
4
5
6
// 这里又进入了 StatisticNode类
@Override
public double passQps() {
// 请求量 ÷ 滑动窗口时间间隔 ,得到的就是QPS
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

那么rollingCounterInSecond.pass()是如何得到请求量的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// rollingCounterInSecond 本质是ArrayMetric,之前说过
@Override
public long pass() {
// 获取当前窗口
data.currentWindow();
long pass = 0;
// 获取 当前时间的 滑动窗口范围内 的所有小窗口
List<MetricBucket> list = data.values();
// 遍历
for (MetricBucket window : list) {
// 累加求和
pass += window.pass();
}
// 返回
return pass;
}

来看看data.values()如何获取 滑动窗口范围内 的所有小窗口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 此处进入LeapArray类中:

public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
// 创建空集合,大小等于 LeapArray长度
int size = array.length();
List<T> result = new ArrayList<T>(size);
// 遍历 LeapArray
for (int i = 0; i < size; i++) {
// 获取每一个小窗口
WindowWrap<T> windowWrap = array.get(i);
// 判断这个小窗口是否在 滑动窗口时间范围内(1秒内)
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
// 不在范围内,则跳过
continue;
}
// 在范围内,则添加到集合中
result.add(windowWrap.value());
}
// 返回集合
return result;
}

那么,isWindowDeprecated(timeMillis, windowWrap)又是如何判断窗口是否符合要求呢?

1
2
3
4
5
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
// 当前时间 - 窗口开始时间 是否大于 滑动窗口的最大间隔(1秒)
// 也就是说,我们要统计的时 距离当前时间1秒内的 小窗口的 count之和
return time - windowWrap.windowStart() > intervalInMs;
}

2.9.3.漏桶

上一节我们讲过,FlowSlot的限流判断最终都由TrafficShapingController接口中的canPass方法来实现。该接口有三个实现类:

  • DefaultController:快速失败,默认的方式,基于滑动时间窗口算法
  • WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的
  • RateLimiterController:排队等待模式,基于漏桶算法

因此,我们跟入默认的RateLimiterController中的canPass方法来分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// 阈值小于等于 0 ,阻止请求
if (count <= 0) {
return false;
}
// 获取当前时间
long currentTime = TimeUtil.currentTimeMillis();
// 计算两次请求之间允许的最小时间间隔
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

// 计算本次请求 允许执行的时间点 = 最近一次请求的可执行时间 + 两次请求的最小间隔
long expectedTime = costTime + latestPassedTime.get();
// 如果允许执行的时间点小于当前时间,说明可以立即执行
if (expectedTime <= currentTime) {
// 更新上一次的请求的执行时间
latestPassedTime.set(currentTime);
return true;
} else {
// 不能立即执行,需要计算 预期等待时长
// 预期等待时长 = 两次请求的最小间隔 +最近一次请求的可执行时间 - 当前时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 如果预期等待时间超出阈值,则拒绝请求
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 预期等待时间小于阈值,更新最近一次请求的可执行时间,加上costTime
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 保险起见,再判断一次预期等待时间,是否超过阈值
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
// 如果超过,则把刚才 加 的时间再 减回来
latestPassedTime.addAndGet(-costTime);
// 拒绝
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
// 预期等待时间在阈值范围内,休眠要等待的时间,醒来后继续执行
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}

与我们之前分析的漏桶算法基本一致:

2.10.DegradeSlot

最后一关,就是降级规则判断了。

Sentinel的降级是基于状态机来实现的:

对应的实现在DegradeSlot类中,核心API:

1
2
3
4
5
6
7
8
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
// 熔断降级规则判断
performChecking(context, resourceWrapper);
// 继续下一个slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

继续进入performChecking方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 获取当前资源上的所有的断路器 CircuitBreaker
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
// 遍历断路器,逐个判断
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}

2.10.1.CircuitBreaker

我们进入CircuitBreaker的tryPass方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean tryPass(Context context) {
// 判断状态机状态
if (currentState.get() == State.CLOSED) {
// 如果是closed状态,直接放行
return true;
}
if (currentState.get() == State.OPEN) {
// 如果是OPEN状态,断路器打开
// 继续判断OPEN时间窗是否结束,如果是则把状态从OPEN切换到 HALF_OPEN,返回true
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
// OPEN状态,并且时间窗未到,返回false
return false;
}

有关时间窗的判断在retryTimeoutArrived()方法:

1
2
3
4
protected boolean retryTimeoutArrived() {
// 当前时间 大于 下一次 HalfOpen的重试时间
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}

OPEN到HALF_OPEN切换在fromOpenToHalfOpen(context)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected boolean fromOpenToHalfOpen(Context context) {
// 基于CAS修改状态,从 OPEN到 HALF_OPEN
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
// 状态变更的事件通知
notifyObservers(State.OPEN, State.HALF_OPEN, null);
// 得到当前资源
Entry entry = context.getCurEntry();
// 给资源设置监听器,在资源Entry销毁时(资源业务执行完毕时)触发
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// 判断 资源业务是否异常
if (entry.getBlockError() != null) {
// 如果异常,则再次进入OPEN状态
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}

这里出现了从OPEN到HALF_OPEN、从HALF_OPEN到OPEN的变化,但是还有几个没有:

  • 从CLOSED到OPEN
  • 从HALF_OPEN到CLOSED

2.10.2.触发断路器

请求经过所有插槽 后,一定会执行exit方法,而在DegradeSlot的exit方法中:

会调用CircuitBreaker的onRequestComplete方法。而CircuitBreaker有两个实现:

我们这里以异常比例熔断为例来看,进入ExceptionCircuitBreakeronRequestComplete方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void onRequestComplete(Context context) {
// 获取资源 Entry
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
// 尝试获取 资源中的 异常
Throwable error = entry.getError();
// 获取计数器,同样采用了滑动窗口来计数
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
// 如果出现异常,则 error计数器 +1
counter.getErrorCount().add(1);
}
// 不管是否出现异常,total计数器 +1
counter.getTotalCount().add(1);
// 判断异常比例是否超出阈值
handleStateChangeWhenThresholdExceeded(error);
}

来看阈值判断的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
// 如果当前已经是OPEN状态,不做处理
if (currentState.get() == State.OPEN) {
return;
}
// 如果已经是 HALF_OPEN 状态,判断是否需求切换状态
if (currentState.get() == State.HALF_OPEN) {
if (error == null) {
// 没有异常,则从 HALF_OPEN 到 CLOSED
fromHalfOpenToClose();
} else {
// 有一次,再次进入OPEN
fromHalfOpenToOpen(1.0d);
}
return;
}
// 说明当前是CLOSE状态,需要判断是否触发阈值
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
// 累加计算 异常请求数量、总请求数量
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
// 如果总请求数量未达到阈值,什么都不做
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// 计算请求的异常比例
curCount = errCount * 1.0d / totalCount;
}
// 如果比例超过阈值,切换到 OPEN
if (curCount > threshold) {
transformToOpen(curCount);
}
}