# 一、微服务保护🎄
- Sentinel
学习内容:
- 初始 Sentinel
- 流量控制
- 隔离和降级
- 授权规则
- 规则持久化
# 1.1、初始 Sentinel🌳
- 雪崩问题及解决方案
- 服务保护技术对比
- Sentinel 介绍和安装
- 微服务整合 Sentinel
# 1.1.1、雪崩问题🌲
微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩。
场景:
比方说在服务 A 内部依赖于服务 B,而服务 A 内部可能还有一些其它的业务比如说它依赖于服务 C 或者是依赖于服务 D
现在假设说服务 D 出现了故障,那服务 A 内部依赖与服务 D 的业务请求就不能正常访问了
因为服务 A 访问服务 D 就必然要等待服务 D 的响应结果,而因为服务 D 出现了故障不可能返回结果它会阻塞,那就导致了服务 A 内部业务也会阻塞,阻塞 它就不会释放 tomcat 的链接。当然其它业务请求不受影响还可以正常工作。
但是既然有第一个依赖于服务 D 这样的请求,那一定还会有第二个甚至第三个。这样依赖于服务 D 的业务请求越来越多,而它们都不会释放连接,那么时久一定会把服务 A 内部所有的链接都给占用了
也就是说 tomcat 资源耗尽了
此时再有服务进来哪怕不是依赖于服务 D 的而是服务 B 的,是不是也进不来了。那就可以认为服务 A 也出现了故障
这就造成了一个服务的故障导致了依赖于它的服务最终也出现了故障。在微服务里这种调用关系可不止这么简单
那么服务 A 依赖于服务 D 导致最后给服务 D 拖垮了,那肯定还会有其它的服务也依赖于服务 D,最终也会被拖垮,将来其它依赖于服务 A 的也会出现故障,最终故障的服务越来越多那么整个微服务群就不可用了。
这不就是雪崩了吗!
# 1.1.2、解决雪崩问题🌲
解决雪崩问题的常见方式有四种:
1、超时处理:设定超时时间,请求超过一定时间没有响应结果就返回错误信息,不会无休止等待
会在调用业务时加上一个超时时间比如说 1 秒钟,当服务 A 依赖于服务 C 时请求最多等待 1 秒钟,如果请求超过 1 秒钟就会立即结束这个请求,不再等待,返回给用户提示信息 (不好意思失败了)
缺点:只能起到缓解作用,不能解决根本问题,因为请求速度大于等待时间就会出现问题
2、舱壁模式:限定每个业务能使用的线程数,避免耗尽整个 tomcat 的资源,因此也叫线程隔离。
舱壁模式是来自于现实生活中的船舱的设计,一些大型的轮船它都会把船体利用隔板分隔成独立的小的空间这样的隔板就叫做舱壁。因为这些空间之间是相互隔离的,假设说船体的某个部位撞上了冰山漏水了。那么最多只是把部分船舱填满水,因为是隔离的所以其它船舱不受影响。这样就提高了整艘船的容灾能力
这种模式延续到程序设计里边是怎么做的呢?
这是服务 A 里面的资源也就是 tomcat 就可以看做成整艘船。我们可以把 tomcat 里面的资源 (线程) 划分成一个一个独立的线程池。给每个业务分配一个线程池。现在业务 1 进来后它依赖于服务 B,它最多使用十个限制,访问业务 2 比方说它依赖于服务 C,它也最多使用十线程。
现在假设说服务 C 出现故障了,这个业务 2 就会阻塞占用线程,但是它最多占用十个,这时它能够使用 tomcat 的资源是有限的,这样就把故障隔离到十个线程内了,因此也叫线程隔离。因此就避免了整个 tomcat 被耗尽的情况
缺点:资源有一定的浪费,比如说服务 C 真的宕机了,现在每次请求来还让它尝试着去访问一下服务 C 还要占用十个线程也是一种浪费
3、熔断降级:由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求。
统计服务 A 里面的业务,比方说服务 A 里第一次业务访问是正常的,结果后面两次都出现了故障。这时断路器就会统计比例,三个请求一个正常两个故障,故障比例高达 60%。假设说阈值是 50% 超出了阈值,此时就会出现熔断。
一旦被熔断如果在服务 A 内部还想要访问服务 D 的业务就无法再去访问服务 D 了,只要是访问服务 D 的业务就会被拦截
4、流量控制:限制业务访问 QPS,避免服务因流量的突增而故障。
QPS:每秒钟处理的请求数量
比方说有一个微服务,它能承受的最大 QPS 为 2,也就是每秒钟最多处理两个请求。
但是现在有无数个请求涌过来,就会出现故障,一旦这个服务出现故障,而依赖于这个服务的其它服务也就跟着出现故障就会出现雪崩问题
所以我们要避免服务因为流量过高而引起故障,这时就需要用到 Sentinel 了
现在假设说有无数个请求过来而 Sentinel 可以按照这个服务所能够承受的频率去释放请求,这时微服务就能从容应对这些请求了就避免了出现故障
流量控制是预防雪崩,前面三种是已经有服务故障了我怎么样去避免这个故障传递给其它服务
但是也不能说,那我就只用 流量控制呗,其它的我就不用了,这样显然是不对的。
因为高并发引起的服务故障只是故障的原因之一,往往服务还会因为其它问题而出现故障。比方说网络问题或者说 fgc 引起的假死问题,这时我们就需要用到其它的解决方案了
总结:
什么是雪崩问题?
- 微服务之间相互调用,因为调用链中一个服务故障,引起整个链路都无法访问的情况
如何避免因瞬间高并发流量而导致服务故障?
- 流量控制
如何避免因服务故障引起的雪崩问题?
- 超时处理
- 线程隔离
- 降级熔断
# 1.1.3、服务保护技术对比🌲
Sentinel | Hystrix | |
---|---|---|
隔离策略 | 信号量隔离 | 线程池隔离 / 信号量隔离 |
熔断降级策略 | 基于慢调用比例或异常比例 | 基于失败比例 |
实时指标实现 | 滑动窗口 | 滑动窗口 (基于 RxJava) |
规则配置 | 支持多种数据源 | 支持多种数据源 |
扩展性 | 多个扩展点 | 插件的形式 |
基于注解的支持 | 支持 | 支持 |
限流 | 基于 QPS,支持基于调用关系的限流 | 有限的支持 |
流量整形 | 支持慢启动,匀速排队模式 | 不支持 |
系统自适应保护 | 支持 | 不支持 |
控制台 | 开箱即用,可配置规则,查看秒级监控,机器发现等 | 不完善 |
常见框架的适配 | Servlet,Spring Cloud,Dubbo,gRPC 等 | Servlet,Spring Cloud Netflix |
# 1.2、认识 Sentinel🌳
Sentinel 是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/
Sentinel 具有以下特征:
1、丰富的应用场景:Sentinel 承接了阿里巴巴进 10 年的双十一大促流量的核心场景,例如秒杀 (即突发流量控制在系统容量可以承受的范围),消息削峰添谷,集群流量控制,实时熔断下游不可用应用等。
2、完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台集群秒级数据,甚至 500 台以下规模的集群的汇总运行情况
3、广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架 / 库的整合模块,例如与 Spring cloud,Dubbo,gRPC 的整合。您只需要引入相应的依赖并运行简单的配置即可快速的接入 Sentinel
4、完善的 SPI 扩展点:Sentinel 提供简单易用,完善的 SPI 扩展接口。您可以通过实现扩展接口来快速的定制逻辑。例如定制规则管理,适配动态数据源等。
# 1.2.1、安装 Sentinel 控制台🌲
sentinel 官方提供了 UI 控制台,方便我们对系统做限流设置。大家可以在 Github 下载。
1、将其拷贝到一个非中文目录,然后运行命令
java -jar sentinel-dashboard-1.8.6.jar |
2、然后访问:localhost:8080 即可看到控制台页面,默认的账号和密码都是 sentinel
# 1.3、安装 Sentinel 控制台🌳
如果要修改 Sentinel 的默认端口,账号,密码,可以通过下列配置:
配置项 | 默认值 | 说明 |
---|---|---|
server.port | 8080 | 服务端口 |
sentinel.dashboard.auth.username | sentinel | 默认用户名 |
sentinel.dashboard.auth.password | sentinel | 默认密码 |
但是它已经是一个 jar 包了怎么改它的配置文件呢?
修改配置方式:
举例说明:
sentinel-dashboard-1.8.6.jar -Dserver.port=8090
# 1.4、微服务与 Sentinel 整合🌳
# 1.4.1、引入 cloud-demo🌲
要使用 Sentinel 肯定要结合微服务,这里我们使用 SpringCloud 工程,项目地址:https://gitee.com/doukaixin/typora.git
项目结构如下:
启动项目进行测试看看是否还可以正常使用:
访问页面
一切正常后我们就开始整合
# 1.4.2、微服务整合 Sentinel🌲
我们在 order-service 中整合 Sentinel,并且链接 Sentinel 的控制台,步骤如下:
1、引入 Sentinel 依赖:
<dependency> | |
<groupId>com.alibaba.cloud</groupId> | |
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> | |
</dependency> |
2、配置控制台地址
spring: | |
cloud: | |
sentinel: | |
transport: | |
dashboard: localhost:8080 |
3、访问微服务的任意端点,触发 Sentinel 监控
# 1.5、限流规则🌳
- 快速入门
- 流控模式
- 流控效果
- 热点参数限流
# 1.5.1、蔟点链路🌲
蔟点链路:就是项目内的调用链路,链路中被监控的每个接口就是一个资源。默认情况下 Sentinel 会监控 SpringMVC 的每一个端点 (Endpoint),因此 SpringMVC 的每一个端点 (Endpoint),就是调用链路中的一个资源。
流控,熔断等都是针对蔟点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:
点击资源 /order/{orderId} 后面的流控按钮,就可以弹出表单。表单中可以添加流控规则,如下图所示:
其含义是限制 /order/{orderId} 这个资源的单击 QPS 为 1,即每秒只允许 1 次请求,超出的请求会被拦截并报错。
针对来源中 defalt 表示一切请求都限流
# 1.5.1.1、案例,流控规则入门案例🌴
需求:给 /order/{orderId} 这个资源设置流控规则,QPS 不能超过 5。然后利用 jmeter 测试。
这里使用 jemeter 进行并发的测试工具安装文章:jmeter 安装及使用.
测试文件文章地址:sentinel 测试.jmx.
导入到 jmeter 中
如果点击没有反应就直接将 jmx 文件拖到蓝瓶子里面就算导入了
右键,流控入门,点击启动,之后可以点击查看结果树来查看请求的情况
这是设置 QPS 后的请求情况,没有限流的时候全是通过的
查看控制台的情况
# 1.5.2、流控模式🌲
在添加限流规则时,点击高级选项,可以选择三种流控模式:
直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式
关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
两个资源,A 触发了阈值,但我却对 B 做了限流
链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流
# 1.5.2.1、流控模式 - 关联🌴
- 关联模式:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
- 使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是有限支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流。
当 /write 资源访问量触发阈值时,就会对 /read 资源限流,避免影响 /write 资源。
# 1.5.2.1.1、案例,流控模式 - 关联🎋
需求:
- 在 OrderController 新建两个端点:/order/query 和 /order/update,无需实现业务
- 配置流控规则,当 /order/update 资源被访问的 QPS 超过 5 个时,对 /order/query 请求限流
重启服务后蔟点链路里面就会被清空了,我们需要重新去请求一下
分别请求一下 query 和 update 的接口
再去查看蔟点链路就可以看到有信息了
配置流控规则,当 /order/update 资源被访问的 QPS 超过 5 个时,对 /order/query 请求限流。
我们要对谁进行限流就对谁进行流控规则
点击 query 的流控
通过 jmeter 进行测试:
请求的地址就是 /order/update
可以看到这个请求不会受到任何影响
但,当访问 /order/query 时
就发生了异常,update 触发阈值时对 query 限流就实现了这样一种关联模式了
小结:
满足下面条件可以使用关联模式:
- 两个有竞争关系的资源
- 一个优先级较高,一个优先级较低
# 1.5.2.3、流控模式 - 链路🌴
链路模式:只针对从指定链路访问到本资源的请求做统计,判断是否超过阈值。
例如:有两条请求链路一个是从 test1 访问 common 资源另一个是从 test2 访问 common 资源:
- /test1 -> /common
- /test2 -> /common
如果希望统计从 /test2 进入到 /common 的请求,则可以这样配置:
这个配置的意思:
在做限流统计时,只统计从 test2 进入 common 的请求,test1 进来的不管。所以这种统计是对请求来源的一种统计
什么情况下我们会用到这样的模式呢?通过如下案例了解
# 1.5.2.3.1、案例,流控模式 - 链路🎋
需求:有查询订单和创建订单业务,两者需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流。
步骤:
1、在 OrderService 中添加一个 queryGoods 方法,不用实现业务
@Service | |
public class OrderService { | |
... | |
@SentinelResource("goods") | |
public void queryGoods() { | |
System.out.println("查询商品"); | |
} | |
... | |
} |
2、在 OrderController 中,改造 /order/query 端点,调用 OrderService 中的 queryGoods 方法
@GetMapping("query") | |
public String queryOrder() | |
{ | |
// 查询商品 | |
orderService.queryGoods(); | |
// 查询订单 | |
System.out.println("查询订单"); | |
return "查询订单成功"; | |
} |
3、在 OrderController 中添加一个 /order/save 的端点,调用 OrderService 的 queryGoods 方法
@GetMapping("save") | |
public String saveOrder() | |
{ | |
orderService.queryGoods(); | |
System.out.println("新增订单"); | |
return "新增订单成功"; | |
} |
4、给 queryGoods 设置限流规则,从 /order/query 进入 queryGoods 的方法限制 QPS 必须小于 2
Sentinel 默认只标记 Controller 中的方法为资源,如果要标记其它方法,需要利用 @SentinelResource 注解,示例:
@SentinelResource("goods")
public void queryGoods() {
System.out.println("查询商品");
}
Sentinel 默认会将 Controller 方法做 context 整合,导致链路模式的流控失败,需要修改 application.yml,添加配置:
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080
web-context-unify: false // 关闭context整合
访问 order/query 和 order/save
查看 sentinel 的蔟点链路信息
可以看到 /order/query 与 /order/save 变成了两个独立的链路了,在之前没有关闭 context 整合时它俩是同一个跟链路下的子链路
现在我们就可以对 goods 添加流控规则了,也就是上面的第 4 步
使用 jmeter 进行测试
发起 200 个请求每秒发起 4 个 ,而且同时发起两个请求
可以看到请求 save 没问题
可以看到请求 query 时是两个两个的。
总结:
流控模式有哪些?
- 直接:对当前资源限流
- 关联:高优先级资源触发阈值,对低优先级资源限流
- 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
# 1.5.2.4、流控效果🌲
流控效果是指请求达到流控阈值时应该采取的措施,包括三种:
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出 FlowException 异常。是默认的处理方式。
- warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
- 排队等待:让所有的请求按照先后次序排队执行,两个请求的剪个不能小于指定时长
# 1.5.2.4.1、流控效果 - warm up🌴
warm up 也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是 threshold 除以 coldFactor,持续指定时长后,逐渐提高到 threshold 值。而 clodFactor 的默认值是 3.
例如,我设置 QPS 的 threshold 为 10,预热时间为 5 秒,那么初始阈值就是 10 / 3,也就是 3,然后在 5 秒后逐渐增长到 10
# 1.5.2.4.1.1、案例,流控效果 - warm up🎋
需求:给 /order/{orderId} 这个资源设置限流,最大 QPS 为 10,利用 warm up 效果,预热时长为 5 秒
请求一下通过 id 查询的 uri:http://localhost:8088/order/101
然后回到 Sentinel 刷新页面,就可以看到 orderId 了
给 orderId 添加一个流控规则
使用 jmeter 进行测试:
可以看到通过的 QPS 越来越多
# 1.5.2.4.2、流控效果 - 排队等待🌴
当请求超过 QPS 阈值时,快速失败和 warm up 会拒绝新的请求并抛出异常。而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成。如果请求预期的等待时间超出最大时长,则会被拒绝。
例如:QPS=5,意味着每 200ms 处理一个队列中的请求;timeout=2000,意味着预期等待超过 2000ms 的请求会被拒绝并抛出异常
比如下面的时间线,有无数个请求要进入队列执行,那第一个进入队列的请求它的等待时间一定是 0ms。但是同一时刻又来了一个请求那么第二个请求它一定要等待至少 200ms,所以它的预期等待时间就是 2 秒,后面的请求以此类推。直到等到时间最多为 2000ms 时,再来一个请求它的等待时间就是超出这 2000ms 的时间了,而超出的请求就会被拒绝
好处:假设说请求 QPS 是波动型的,比如说第一秒钟一个请求也没来,这时队列是空的。结果第二秒一下来了 10 个请求,放到队列里然后每 200ms 放行一个换算成 QPS 就是 5,所以不管进入的 QPS 是怎样波动的出去的 QPS 一定是稳定的按照 200ms 一个的速度去放。所以就起到了流量整形的作用
# 1.5.2.4.2.1、案例,流控效果 - 排队等待🎋
需求:给 /order/{orderId} 这个资源设置限流,最大 QPS 为 10,利用排队的流控效果,超时时长设置为 5ss
使用 jmeter 进行测试:
总结:
流控效果有哪些?
- 快速失败:QPS 超过阈值时,拒绝新的请求
- warm up:QPS 超过阈值时,拒绝新的请求;QPS 阈值是逐渐提升的,可以避免冷启动时高并发导致服务宕机。
- 排队等待:请求会进入队列,按照阈值允许的时间间隔依次执行请求;如果请求预期等待时长大于超时时间,直接拒绝
# 1.5.3、热点参数限流🌲
之前的限流是统计访问某个资源的所有请求,判断是否超过 QPS 阈值。而热点参数限流是分别统计参数值相同的请求,判读是否超过 QPS 阈值。
比如说:有一个资源是根据 id 查询商品,共有 4 个请求。如果按照原来的统计方式那 QPS 就是 4。而按照热点参数它会根据参数值来判断,前三个请求传递的 id 为 1,而最后一个传递的 id 为 2。所以 QPS 统计就会分开统计了,id 为 1 的统计为 3 个 QPS 就为 3,id 为 2 的统计为 1 个 QPS 就为 1
# 1.5.3.1、配置热点限流🌴
参数索引:代表当前列表中索引为 0 的参数也就是第一个
单机阈值 + 统计窗口时长 = n 秒钟最多处理 5 个请求
代表的含义是:对 hot 这个资源的 0 号参数 (第一个参数) 做统计,每 1 秒相同参数值的请求数不能超过 5
# 1.5.3.2、配置热点限流高级项🌴
在热点参数限流的高级选项中,可以对部分参数设置例外配置:
结合上一个配置,这里的含义是对 0 号的 long 类型参数限流,每 1 秒相同参数的 QPS 不能超过 5,有两个例外:
1、如果参数值是 100,则每 1 秒允许的 QPS 为 10
2、如果参数值是 101,则每 1 秒允许的 QPS 为 15
# 1.5.3.3、案例,热点参数限流🌴
给 /order/{orderId} 这个资源添加热点参数限流,规则如下:
- 默认的热点参数规则是每 1 秒请求量不超过 2
- 给 102 这个参数设置例外:每 1 秒请求量不超过 4
- 给 103 这个参数设置例外:每 1 秒请求量不超过 10
<font color='red'> 注意 </font>:
热点参数限流对默认的 SpringMVC 资源无效
步骤:
1、热点参数限流对默认的 SpringMVC 资源无效,我们 order/{orderId} 这个资源恰好就是 Sentinel 默认对 SpringMVC 监控的一个资源,所以即便配置了热点参数也不会生效,只有通过 @SentinelResource 去声明的资源才可以配置热点参数限流
@SentinelResource("hot") | |
@GetMapping("{orderId}") | |
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) { | |
// 根据 id 查询订单并返回 | |
return orderService.queryOrderById(orderId); | |
} |
这时资源就同时具备了两个名称了,一个是 order/{orderId} 这是默认的 SpringMVC 资源名称。
另外一个就是我们自己起的 hot
重启 orderService 服务后访问页面 uri:http://localhost:8088/order/101
这样就可以看到蔟点链路了
2、配置热点限流,不要点击树形列表中的热点这个表单里面没有高级配置
我们点击左边的选项栏中的热点规则,进行配置
点击新增热点限流规则就会弹出一个表单进行配置
使用 jmeter 进行测试:
order/101 的请求结果
order/102 请求的结果
order/103 请求结果
Sentinel 的控制台情况
# 1.6、隔离和降级🌳
- FeignClient 整合 Sentinel
- 线程隔离 (舱壁模式)
- 熔断降级
回顾,隔离和降级的原理
虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离 (舱壁模式) 和熔断降级手段了。
给每个业务划分线程池,当有请求访问业务 1 时最多使用十个线程,虽然服务 C 故障了请求过程中会阻塞但是最多只占用 10 个线程资源
熔断降级会统计故障服务的比例,比如说服务 A 访问服务 D 成功的只有一个,故障的有两个这时的比例就是 60%。此时 断路器就会熔断业务,再有要请求服务 D 的业务就会快速失败
不管是线程隔离还是熔断降级,都是对 <font color='red'> 客户端 (调用方) </font>. 的保护
# 1.6.1、Feign 整合 Sentinel🌲
SpringCloud 中,微服务调用都是通过 Feign 来实现的,因此做客户端保护必须整合 Feign 和 Sentinel
1、修改 OrderService 的 applicaiton.yml 文件,开启 Feign 的 Sentinel 功能
feign: | |
sentinel: | |
enabled: true # 开启 feign 对 sentinel 的支持 |
2、给 FeignClient 编写失败后的降级逻辑
2.1、方式一:FallbackClass,无法对远程调用的异常做处理
2.2、方式二:FallbackFactory,可以对远程调用的异常做处理,我们选择这种
2.2.1、在 feign-api 项目中定义类,实现 FallbackFactory:
@Slf4j | |
public class UserClientFallbackFactory implements FallbackFactory<UserClient> { | |
@Override | |
public UserClient create(Throwable throwable) { | |
// 创建 UserClient 接口实现类,实现其中的方法,编写失败降级的处理逻辑 | |
return new UserClient() { | |
@Override | |
public User findById(Long id) { | |
// 记录异常信息 | |
log.error("查询用户异常", throwable); | |
// 根据业务需求返回默认的数据,这里是空用户 | |
return new User(); | |
} | |
}; | |
} | |
} |
2.2.2、在 feign-api 项目中的 DefaultFeignConfigration 类中将 UserClientFallbackFactory 注册为一个 Bean:
声明 Bean 很简单,定义函数返回 new 出来的对象并注册 Bean 就行了
@Bean | |
public UserClientFallbackFactory userClientFallbackFactory() | |
{ | |
return new UserClientFallbackFactory(); | |
} |
2.2.3、在 feign-api 项目中的 UserClient 接口中使用 UserClientFallbackFactory:
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class) | |
public interface UserClient { | |
@GetMapping("user/{id}") | |
User findById(@PathVariable Long id); | |
} |
3、重启 orderService 服务
重复完成后查看 Sentinel 的蔟点链路情况,下面的情况说明完成了 Feign 与 Sentinel 的整合了
总结:
Sentinel 支持的雪崩解决方案:
- 线程隔离 (仓壁模式)
- 降级熔断
Feign 整合 Sentinel 的步骤:
- 在 application.yml 中配置:feign.sentinel.enable = true
- 给 FeignClient 编写 FallbackFactory 并注册为 Bean
- 将 FallbackFactory 配置到 FeignClient
# 1.6.2、线程隔离🌲
线程隔离有两种方式实现:
- 线程池隔离
- 信号量隔离 (Sentinel 默认采用)
通过案例来查看两个线程隔离的差别:
假设说有四个服务 i,a,b,c 。服务 i 里面的业务依赖于服务 a,b,c。比方说来了一个请求,它的业务依赖于服务 a,b。如果说我们现在用的是线程池隔离那么它就会给这个业务所依赖的每个服务都创建线程池,请求来了以后不会使用请求本身的线程,而是去创建的线程池中分别取一个线程而用这个线程去调用 Feign 的客户端,发起远程调用。这样呢就把两个服务隔离了
而如果采用的是信号量的模式就简单多了。比如说来了一个请求,要访问服务 c。它会使用请求本身的线程直接去调用 Feign 的客户端去调用服务 c。那它怎么做隔离呢?它会在请求进入时做一个判断,维持了一个计数器。判断现在计数器还有没有。比如说计数器总数为 10 每进入一个请求计数器就会减 1,然后就可以去访问服务 c 了。当来了十个请求计数器被取完后再来新请求就会被拒绝。这样就等于利用计数器限制了最终线程的数量了。如果处理完请求计数器还是要还回去的
# 1.6.2.1、两者的优缺点:
# 线程池隔离
优点:
1、支持主动超时
如果发现请求有点久了可以手动的终止线程
2、支持异步调用
每一次请求都是一个独立的线程而不是 tomcat 请求的线程
缺点:
1、线程的额外开销比较大
场景:
1、低扇出
扇出:比方说请求到我这个服务,而我这个服务依赖于其它的 n 个服务。就是从我这来了,来了一个而后我扇出了好几个。如果依赖的服务越多,那扇出也就越高,而扇出越高调用的越多我需要开启的线程也越多,消耗也就越大。所以它不适用于高扇出的场景!
# 信号量隔离
优点:
1、轻量级,无额外开销
缺点:
1、不支持主动超时
2、不支持异步调用
场景:
1、高频调用
2、高扇出
# 1.6.2.2、线程隔离 (舱壁模式)🌴
在添加限流规则时,可以选择两种阈值类型:
- QPS:就是每秒的请求数,在快速入门中已经演示过
- 线程数:是该资源能使用的 tomcat 线程数的最大值。也就是通过限制线程数量,实现 <font color='red'> 舱壁模式 </font>.
# 1.6.2.2.1、案例,线程隔离 (舱壁模式)🎋
需求:给 UserClient 的查询用户接口设置流控规则,线程数不能超过 2。然后利用 jmeter 测试。
配置完成!使用 jmeter 进行高并发测试:
其中的线程数为 10,时间为 0。表示一瞬间发 10 个线程请求。
测试结果:
但是怎么没有看到被拒绝的请求呢?
其实我们做了降级策略了
在 FeignClient 注解中加了 fallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class) | |
public interface UserClient { | |
@GetMapping("user/{id}") | |
User findById(@PathVariable Long id); | |
} |
所以当线程隔离被降级以后它不是报错,而是会走这个降级逻辑返回一个空对象
@Slf4j | |
public class UserClientFallbackFactory implements FallbackFactory<UserClient> { | |
@Override | |
public UserClient create(Throwable throwable) { | |
// 创建 UserClient 接口实现类,实现其中的方法,编写失败降级的处理逻辑 | |
return new UserClient() { | |
@Override | |
public User findById(Long id) { | |
// 记录异常信息 | |
log.error("查询用户异常", throwable); | |
// 根据业务需求返回默认的数据,这里是空用户 | |
return new User(); | |
} | |
}; | |
} | |
} |
但是在 idea 的控制台会打印异常的信息
10-17 13:40:38:046 ERROR 1804 --- [nio-8088-exec-3] c.i.f.c.f.UserClientFallbackFactory : 查询用户异常
我们在回到 jmeter 点击请求查看详细情况,有的获取的数据是空的
总结:
线程隔离的两种手段是?
- 信号隔离
- 线程池隔离
信号量隔离的特点是?
- 基于计数器模式,简单,开销小
线程池隔离的特点是?
- 基于线程池模式,有额外开销,但隔离控制更强
# 1.6.3、熔断降级🌲
熔断降级是解决雪崩问题的重要手段。其思路是由 <font color='red'> 断路器 </font > 统计服务调用的异常比例,慢请求比例,如果超出阈值则会 < font color='red'> 熔断 </font > 该服务。即拦截访问该服务的一切请求;而当请求恢复时,断路器会放行访问该服务的请求。
断路器内部由一个状态机来实现的,这个状态机包含三种状态,分别是:closed,open,Half-Open
closed 状态下,断路器不会拦截任何请求。不管请求是正常的还是异常的都可以访问,它会去统计调用异常的比例,如果统计过程中发现异常比例过高达到了阈值就会从 closed 状态切换到 open 状态、这时就会拦截进入该服务的一些请求了也就相当于是熔断了。熔断有一个持续的时间,当熔断时间结束就会从 open 状态切换到 half-open 状态 (半开状态)。half-open 状态会放行一次请求,然后根据这次请求的结果来判断下一步。
比如说放行一次请求发现这个请求依赖是失败的那么就会再次进入 open 状态
如果放行一次请求执行完了发现是成功的那么就会从 half-open 切换到 closed 状态
我们知道断路器要想从 closed 状态进入到 open 状态需要判断服务有没有触发熔断的条件。而熔断判断的条件就是依据熔断策略来完成的,而在 Sentinel 里熔断的策略有三种:慢调用,异常比例,异常数
# 1.6.3.1、熔断策略 - 慢调用🌴
慢调用:业务的响应时长 (RT) 大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。例如:
解读:RT 超过 500ms 的调用是慢调用,统计最近 10000ms 内的请求,如果请求量超过 10 次,并且慢调用比例不低于 0.5,则触发熔断,熔断时长为 5 秒。然后进入 half-open 状态,放行一次请求做测试。
# 1.6.3.1.1、案例,熔断策略 - 慢调用🎋
需求:给 UserClient 的查询用户接口设置降级规则,慢调用的 RT 阈值为 50ms,统计时间为 1 秒, 最小请求数量为 5,失败阈值比例为 0.4,熔断时长为 5
问题:本地调用时间不可能那么长所以为了能够触发慢调用需要修改一下业务代码让其超过响应时间
@GetMapping("/{id}") | |
public User queryById(@PathVariable("id") Long id, @RequestHeader(value = "Truth", required = false) String truth) | |
{ | |
if(id == 1) | |
{ | |
// 休眠,触发熔断 | |
try { | |
Thread.sleep(60); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
System.out.println("truth: " + truth); | |
return userService.queryById(id); | |
} |
重启 userService 服务
访问 order/101 时就会去请求 userId 为 1 的用户
order/102
order/101
可以看到 102 请求时长基本不超过 10 毫秒,而 101 请求时长基本不下 70 毫秒。这样就能满足我们触发阈值的情况了
然后就可以去配置熔断降级策略了,去 Sentinel 控制台中
然后我们就可以去进行测试了:
这次的测试可以不用 jmeter 了,因为需要一秒内 5 次请求,有 2 次触发就行了很容易可以去刷新 order/{orderId} 的请求页面
现在访问 order/102,是正常可以访问的
连续请求 5 次 order/101,也就是刷新 5 次页面
再去访问 order/102 就会发现 user 中没有数据了,当 order 去查 user 的那一刻直接被熔断了,压根没有去查直接走降级逻辑返回 null 了
等待 5 秒后再去刷新一个 order/102 页面,熔断就取消了可以请求到 user 的数据了
# 1.6.3.2、熔断策略 - 异常比例,异常数🌴
- 异常比例或异常数:统计指定时间内的 调用,如果调用次数超过指定的请求数,并且出现异常的比例达到设定的比例阈值 (或超过指定异常数) ,则触发熔断。例如:
按照异常的比例熔断
达到异常数量熔断
解读:统计最近 1000ms 内的请求,如果请求量超过 10 次,并且异常比例不低于 0.5,则触发熔断,熔断时长为 5 秒。然后进入 half-open 状态,放行一次请求做测试。
# 1.6.3.2.1、案例,熔断策略 - 异常比例🎋
需求:给 UserClient 的查询用户接口设置降级规则,统计时间为 1 秒,最小请求数量为 5,失败阈值比例为 0.4,熔断时长为 5
<font color='red'> 提示 </font>:为了触发异常统计,我们需要修改 UserService 中的业务,抛出异常如下代码:
@GetMapping("/{id}") | |
public User queryById(@PathVariable("id") Long id, @RequestHeader(value = "Truth", required = false) String truth) | |
{ | |
if(id == 1) | |
{ | |
throw new RuntimeException("故意抛出异常,触发异常比例熔断"); | |
} | |
System.out.println("truth: " + truth); | |
return userService.queryById(id); | |
} |
给 UserClient 查询用户接口设置降级规则
访问 order/102
然后连续访问 n 次 order/101
在访问 order/102 就发现被熔断了
总结:
Sentinel 熔断降级的策略有哪些?
1、慢调用比例:超过指定时长的调用为慢调用,统计单位时长内慢调用的比例,超过阈值则熔断
2、异常比例:统计单位时长内异常调用的比例,超过阈值则熔断
3、异常数:统计单位时长内异常调用的次数,超过阈值则熔断
# 1.7、授权规则🌳
- 授权规则
- 自定义异常结果
# 1.7.1、授权规则🌲
授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。
我们学习网关的时候讲过,gateway 它不就是把门的吗。所有请求都要经过网关去做身份的认证,怎么到这儿又要整一个呢?
万一公司出了一个内鬼,它把微服务地址泄露给了外边哪些不怀好意的人,那这些哥们就可以绕过网关直接访问微服务了。那网关里做的再严密也都没用了,所以 Sentinel 授权规则可以解决这个问题。授权规则会去验证你请求从哪来的。如果说你是从网关过来的那就放行,如果说你从别的地方来的就拦截
- 白名单:来源 (origin) 在白名单内的调用者允许访问
- 黑名单:来源 (origin) 在黑名单内的调用者不允许访问
例如,我们限定只允许从网关来的请求访问 order-service,那么流控应用中就填写网关的名称
那么这时
资源名填的就是:order-service 里面的受保护资源,比方说 order/
流控应用填的就是:允许的调用者的名字 origin,请求来源名称,那么请求来源名称是怎么得到的呢?
Sentinel 是通过 RequestOriginParser 这个接口的 parseOrigin 来获取请求的来源的。
但是这个方法的返回结果只能是 default,也就是说无论是从网关过来还是浏览器过来它的来源名称都叫 default
public interface RequestOriginParser { | |
/** | |
* 从请求 request 对象中获取 origin,获取方式自定义 | |
*/ | |
String parseOrigin(HttpServletRequest request); | |
} |
所以我们必须自己想办法实现这个接口编写它的业务逻辑,让从网关过来的请求和从浏览器过来的请求返回不同的结果,这样来源名称不同就可以编写授权规则了
例如,我们尝试从 reqeust 中获取一个名为 origin 的请求头,作为 origin 的值:
这里 gateway 访问消费者也就是 orderservice 服务所以下面业务代码是写到 order 服务中的
@Component | |
public class HeaderOriginParser implements RequestOriginParser { | |
@Override | |
public String parseOrigin(HttpServletRequest request) { | |
// 1. 获取请求头 | |
String header = request.getHeader("origin"); | |
// 2. 非空判断 | |
if(StringUtils.isEmpty(header)) | |
{ | |
header = "blank"; | |
} | |
return header; | |
} | |
} |
我们还需要在 gateway 服务中,利用网关的过滤器添加名为 gateway 的 origin 头
spring: | |
cloud: | |
gateway: | |
default-filters: | |
- AddRequestHeader=origin,gateway # 添加名为 origin 的请求头,值为 gateway |
重启 orderservice 服务和 gateway 服务
随便访问一个 uri:http://localhost:8088/order/101
然后查看 Sentinel 控制台中的蔟点链路
然后我们就可以给 /order/{orderId} 配置授权规则:
此时我们再去访问 order/101 服务
注意:这里 order/101 是绕过了网关的
访问是不成功的。
那我们使用网关进行访问的
可以正常访问
但是由于上面直接跳过网关访问 order/101 的时候就会报错,页面的报错提示如下:
说是限流异常这就不合理了,明明是授权异常怎么就限流异常了呢
所以我们可以对其进行自定义异常:
# 1.7.2、自定义异常结果🌲
默认情况下,发生限流,降级,授权拦截时,都会抛出异常到调用方。如果要自定义异常时的返回结果,需要实现 BlockExceptionHanlder 接口:
public interface BlockExcpetionHandler { | |
/** | |
* 处理请求被限流,降级,授权拦截时抛出的异常:BlockException | |
*/ | |
void handle(HttpServletRequest request, HttpServletResponse response, BlockExcpetion e) throws Exception; | |
} |
而 BlockException 包含很多个子类,分别对应不同的场景:
异常 | 说明 |
---|---|
FlowException | 限流异常 |
ParamFlowException | 热点参数限流的异常 |
DegradeException | 降级异常 |
AuthorityException | 授权规则异常 |
SystemBlockException | 系统规则异常 |
我们在 order-service 中定义类,实现 BlockExceptionHandler 接口:
@Component | |
public class SentinelBlockHandler implements BlockExceptionHandler { | |
@Override | |
public void handle(HttpServletRequest httpServletRequest, HttpServletResponse response, BlockException e) throws Exception { | |
String msg = "未知异常"; | |
int status = 429; | |
if(e instanceof FlowException) | |
{ | |
msg = "请求被限流了 !"; | |
}else if(e instanceof DegradeException) | |
{ | |
msg = "请求被降级了 !"; | |
}else if(e instanceof ParamFlowException) | |
{ | |
msg = "热点参数限流 !"; | |
}else if(e instanceof AuthorityException) | |
{ | |
msg = "请求没有权限 !"; | |
status = 401; | |
} | |
response.setContentType("application/json;charset=utf-8"); | |
response.setStatus(status); | |
response.getWriter().println("{\"message\": \"" + msg + "\",\"status\": " + status + "}"); | |
} | |
} |
重启 orderservice 服务,然后访问一下 order/101。再去 Sentinel 控制台查看蔟点链路并添加授权规则:
然后再对 order/101 进行直接访问
可以看到自定义异常成功了!
总结:
获取请求来源的接口是什么?
- RequestOriginParser
处理 BlockException 的接口是什么
- BlockExcpetionHandler
# 1.8、规则持久化🌳
- 规则管理模式
- 实现 push 模式
Sentinel 的控制台规则管理有三种模式:
- 原始模式:Sentinel 的默认模式,将规则保存在内存,重启服务会丢失。
- pull 模式
- push 模式
# 1.8.1、规则管理模式 - pull 模式🌲
pull 模式:控制台将配置的规则推送到 Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。
缺点:存在时效性问题,从而导致数据的不一致问题
# 1.8.2、规则管理模式 - push 模式🌲
push 模式:控制台将配置规则推送到远程配置中心,例如 Nacos。Sentinel 客户端监听 Nacos,获取配置变更的推送消息,完成本地配置更新
总结:
Sentinel 的三种配置管理模式是什么?
原始模式:保存在内存
不支持持久化
pull 模式:保存在本地文件或数据库,定时去读取
定时轮询存在时效性问题,导致数据不一致
push 模式:保存在 Nacos,监听变更实时更新
# 1.8.3、实现 push 模式🌲
push 模式实现最为复杂,依赖于 Nacos,并且需要修改 Sentinel 控制台源码。
详细步骤可以参考文章 sentinel 规则持久化.
# 二、分布式事务🎄
- seata
事务的 ACID 原则
# 2.1、分布式服务案例🌳
微服务下单业务,在下单时会调用订单服务,创建订单并写入数据库。然后订单服务调用账户服务和库存服务:
- 账户服务负责扣减用户余额
- 库存服务负责扣减商品库存
比方说以下微服务:里面包含三个服务 订单服务,账户服务,库存服务。现在有一个用户下单的业务,用户下单时我希望订单服务去创建订单并且写入数据库,而后它再去调用账户服务和库存服务。账户服务去扣减用户的余额,而库存服务则去扣减商品库存。里面就包含了三个不同的微服务调用,而每个微服务都有自己独立的数据库,也就是独立的事务。最终希望的肯定是下单业务一旦执行每一个服务都要成功,如果失败都失败。但是能不能达到这样的结果呢下面进行验证
# 2.2、演示分布式事务问题 🌳
在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致这样的事务就是 <font color='red'> 分布式事务 </font>.
1、导入微服务项目:https://gitee.com/doukaixin/typora.git
2、创建数据库,名为 seata_demo,然后导入 SQL 文件,改 SQL 文件在 seata-demo 项目中的 SQL 文件夹中
3、启动 nacos,所有微服务
4、测试下单功能,发送 Post 请求
发送请求后查看数据库
账户表
库存表
可以发现,账户余额扣了 200 变成了 800,而库存扣了 2 变成了 8
那么我们让扣除库存的时候报错看看会发生什么情况。
现在库存不够 10 个了我请求扣 10 个就会发生报错。
可以看到响应结果 500 了
此时我们再查看数据库的情况
账户表
库存表
账户表的余额被扣了 200,但是库存表的数量却没有发生变化,这样显然是不合理的。
下面我们来分析下这件事儿。
在上面的业务中,订单服务去创建了订单然后去调用了账户服务和库存服务完成余额扣减和库存扣减,其中订单服务和账户服务都创建成功了。而库存服务在执行的时候却因为库存不足而失败了。那按照理论上讲这里报错前面都应该跟着回滚
原因:
1、每个服务都是独立的库存服务抛出异常其它服务也不知道
2、每个服务都是独立的所以它们的事务也是独立的
如果要解决分布式事务的问题,首先要考虑四点:
首先第一点:分布式事务产生的原因,这个上述分析过了。
第二点:理解理论基础
第三点:弄清 seata 原理
第四点:利用 seata 手动实践,解决分布式事务问题
学习内容:
- 理论基础
- 初始 Seata
- 手动实践
- 高可用
# 2.3、理论基础🌳
- CAP 定理
- BASE 理论
# 2.3.1、CAP 定理🌲
1998 年,加州大学的计算机科学 Eric Brewer 提出,分布式系统有三个指标:
1、Consistency (一致性)
2、Availability (可用性)
3、Partition tolerance (分区容错性)
Eric Brewer 说,分布式系统无法同时满足这三个指标。
这个结论就叫做 CAP 定理。
三个圆不会同时重叠,最多两两重叠,也就是说同时满足两个
# 2.3.1.1、CAP 定理 - Consistency🌴
Consistency (一致性) :用户访问分布式系统中的任意节点,得到的数据必须一致
比方说有两个节点,第一个节点上有个数据叫 data 值为 0,第二个节点也是如此。现在用户不管是访问哪个节点,结果都是一样的。
但是,如果我对节点 node1 的数据进行了修改,这时两个节点的数据就不一样了。
为了满足一致性就一定要把 node1 的数据同步到 node2 中,一旦数据同步完成,数据就再此一致了
所以,作为一个分布式系统在做数据备份的时候一定要及时完成数据同步这样才能满足一致性
# 2.3.1.2、CAP 定理 - Availability🌴
Availability (可用性) :用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
比如说 node3 的请求被阻塞或者拒绝了,那所有的请求进来无法访问了,这时 node3 就是不可用了
所以可用性是指这个节点能不能被正常的访问
# 2.3.1.3、CAP 定理 - Partition tolerance🌴
Partition (分区) :因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。
Tolerance (容错) :在集群出现分区时,整个系统也要持续对外提供服务
比如说 node3 因为网络故障与 node1 与 node2 断开了连接。node1 与 node2 之间正常访问。
此时整个集群就会被划分成两个区了。node1 和 node2 它俩是一个区,node3 自己是一个区,这时如果有用户向 node2 写入了一个数据,node2 可以把数据同步给 node1 的但是 node3 上不能同步到数据,因此这两个区的数据就不一致了
那如果我一定要满足数据的一致性呢,那我是不是可以让 node3 等待 node2 网络的恢复和数据的同步,在恢复之前所有来访问我的请求我都阻塞在这里。如果这么做可能会满足数据的一致性,但是 node3 明明是一个健康的节点结果进来的请求你都卡在这里不让人家访问,那 node3 不就是不可用了吗,所以它就不满足 可用性了。
我要想满足可用性,我就没办法保证一致性。
我要保证数据一致性,我就没办法让 node3 是可用性的
当网络出现分区时,可用性和一致性没有办法同时满足。但是网络分区也是不可避免的。
当 p 一定要实现,那么这时 c 和 a 之间就要做出抉择了。要么 c 要么 a 没有办法同时满足。这就是 CAP 定理了的原因了
总结:
简述 CAP 定理内容?
- 分布式系统节点通过网络连接,一定会出现分区问题 (P)
- 当分区出现时,系统的一致性 (C) 和可用性 (A) 就无法同时满足
思考:elasticsearch 集群是 CP 还是 AP?
答:ES 集群当网络出现故障时,有节点与其它节点断开连接的时候。es 集群处于一个警告状态,出现故障的节点过了一段时间后就会从集群中剔除,而这个节点上原来的数据分片。会分散到其它健康的节点上,而故障节点从集群中剔除用户无法访问了因此就牺牲了 可用性了。而数据会负责其它节点数据同步可以正常进行保证了数据的一致性。
因此 es 集群显然是一个 CP。满足高一致性低可用性
我们知道在分布式系统下网络分区是不可避免的,所以不得不在一致性和可用性之间做出一个选择。
但是这两个特性都很重要我一个都不想放弃我该怎么办呢?Base 理论正好可以解决这个问题
# 2.3.2、BASE 理论🌲
BASE 理论是对 CAP 的一种解决思路,包含三个思想:
- Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
- Soft State (软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
- Eventually Consistent (最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。
而分布式事务最大的问题是各个子事务的一致性问题,因此可以借签 CAP 定理的 BASE 理论:
- AP 模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现 <font color='red'> 最终一致 </font>.
- CP 模式:各个子事务执行后互相等待,同时提交,同时回滚,达成 <font color='red'> 强一致 </font>。但事务等待过程中,处于弱可用状态。
# 2.3.2.1、分布式事务模型🌴
解决分布式事务,各个子系统之间必须能感知到彼此的事务状态,才能保证状态一致,因此需要一个事务协调者来协调每一个事务的参与者 (子系统事务)。
用户下单调用订单服务,然后去调用账户服务和库存服务。那这个地方我们就需要一个事务的协调者了,每个微服务都跟事务协调者保持联系,如果你现在要做强一致。订单服务执行的时候不要提交,扣款服务,扣库存服务,但是库存服务执行完后发现失败了。怎么知道的。它们要把自己的执行结果告知给协调者,然后协调者一看有人失败了将来再通知其它服务让它们做回滚,这样大家就能保持一致了
每一个事务的参与者 (子系统事务)。
这里的子系统事务,称为 <font color='red'> 分支事务 </font>.;有关联的各个分支事务 在一起称为 <font color='red'> 全局事务 </font>.
所以事务协调者,就是去协调各个分支事物的状态让它们达成一致
总结:
简述 BASE 理论三个思想:
- 基本可用
- 软状态
- 最终一致
解决分布式事务的思想和模型:
- 全局事务:整个分布式事务
- 分支事务:分布式事务中包含的每个子系统的事务
- 最终一致思想:各分支事务分别执行并提交,如果有不一致的情况,再想办法恢复数据
- 强一致思想:各分支事务执行完业务不要提交,等待彼此结果。而后统一提交或回滚
# 2.4、初始 Seata🌳
- Seata 的架构
- 部署 TC 服务
- 微服务集成 Seata
Seata 是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。
官网地址:https://seata.io/zh-cn/
Seata 事务管理中有三个重要的角色:
1、TC (Transaction Coordinator) - 事务协调者:维护全局和分布式事务的状态,协调全局事务提交和回滚。
2、TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务,提交或回滚全局事务。
3、RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata 根据要做强一致还是最终一致又延伸出了好几种不同的解决方案:
Seata 提供了四个不同的分布式解决方案:
1、XA 模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
2、TCC 模式:最终一致的分阶段事务模式,有业务侵入
3、AT 模式:最终一致的分阶段事务模式,无业务侵入,也是 Seata 的默认模式
4、SAGA 模式:长事务模式,有业务侵入
# 2.4.1、部署 TC 服务🌲
参考文章:seata 的部署和集成.
# 2.4.2、手动实践🌲
- XA 模式
- AT 模式
- TCC 模式
- SAGA 模式
# 2.4.2.1、XA 模式原理🌴
XA 规则 是 X/Open 组织定义的分布式事务处理 (DTP,Distributed Transaction Processing) 标准,XA 规范 描述了全局的 TM 与局部的 RM 之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。
这种模式,它把分布式事务定义成了两个阶段。第一阶段为 “准备阶段” 在此阶段中实物协调者会向事务参与者 (RM) 去发起一个准备的请求,在上面讲述中 RM 是资源管理器,但是在 XA 标准中这个 RM 其实都是由数据区实现的。也就是说数据库本身实现了 RM 功能。事务协调者通知这些数据库去执行自己的业务,但是执行完不要提交,然后把执行结果告知事务协调者
然后进行 “第二阶段” 事务协调者通知 RM 可以提交了。事务结束
如果 第一阶段 有任意一个服务失败了
那么第二阶段,事务协调者就会通知所有的 RM 进行回滚
XA 模式,其实就是基于数据库本身的特性去实现的一种分布式事务
# 2.4.2.2、seata 的 XA 模式🌴
seata 的 XA 模式做了一些调整,但大体相似:
TM 注册全局事务,TM 作为分布式入口自然去调用微服务,调用微服务里面的 RM 拦截请求,seata 里面也实现了 RM,数据库也有 RM,在这个情况下 seata 的 RM 仅仅是代理你的请求。然后做一下分支事务的注册,接下来的事就是直接调用数据库了执行业务 SQL,但是执行完不提交,去报告一下事务的状态到 TC 就行了。所以 TC 此时就冲当了事务协调的作用了
至此,第一阶段结束
第二阶段 TM 发现业务结束了就去通知 TC,然后 TC 去检查事务的状态,都成功就成功。有一个失败就要回滚
如果我们只看核心部分就和 XA 没有什么太大的区别了
RM 一阶段的工作:
1、注册分支实物到 TC
2、执行分支业务 sql 但不提交
3、报告执行状态到 TC
TC 二阶段的工作:
- TC 检测各分支事务执行状态
- 如果都成功,通过所有 RM 提交事务
- 如果有失败,通过所有 RM 回滚事务
RM 二阶段的工作:
- 接收 TC 指令,提交或回滚事务
总结:
XA 模式的优点是什么?
- 事务的强一致性,满足 ACID 原则
- 常用数据库都支持,实现简单,并且没有代码侵入
XA 模式的缺点是什么?
- 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
- 依赖关系型数据库实现事务
# 2.4.2.3、实现 XA 模式🌴
Seata 的 starter 已经完成了 XA 模式的自动装配,实现非常简单,步骤如下:
1、修改 application.yml 文件 (每个参与事务的微服务),开启 XA 模式
seata: | |
data-source-proxy-mode: XA # 开启数据源代理的 XA 模式 |
2、给发起全局事务的入口方法添加 @GlobalTransactional 注解,本例中是 OrderServiceImpl 中的 create 方法:
@Override | |
@GlobalTransactional | |
public Long create(Order order) { | |
// 创建订单 | |
orderMapper.insert(order); | |
try { | |
// 扣用户余额 | |
accountClient.deduct(order.getUserId(), order.getMoney()); | |
// 扣库存 | |
storageClient.deduct(order.getCommodityCode(), order.getCount()); | |
} catch (FeignException e) { | |
log.error("下单失败,原因:{}", e.contentUTF8(), e); | |
throw new RuntimeException(e.contentUTF8(), e); | |
} | |
return order.getId(); | |
} |
3、重启服务并测试
目前的 account_tbl 表单信息
目前 storage_tbl 表单信息
使用 postman 进行发送请求测试
测试后 account_tbl 表单信息
测试后 storage_tbl 表单信息
可以看到没有问题,下面测试 异常 的情况
postman 测试如下:
将存库数量改为 10,当前的库存数据是不足 10 个的因此执行就会出问题可以看到状态码 500 已经异常了
我们看下数据库有没有回滚啊
测试后 account_tbl 表单信息
测试后 storage_tbl 表单信息
可以看到没有任何变化
看下 idea 的控制台的打印信息:
accountApplication 打印出了事务回滚的信息,其它的是报错的信息,因为报错本身就没有执行成功
10-18 11:48:06:076 INFO 18016 --- [h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid=192.168.45.1:8091:4251845158843265029,branchId=4251845158843265031,branchType=XA,resourceId=jdbc:mysql:///seata_demo,applicationData=null
10-18 11:48:06:080 INFO 18016 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.45.1:8091:4251845158843265029 4251845158843265031 jdbc:mysql:///seata_demo
10-18 11:48:06:083 INFO 18016 --- [h_RMROLE_1_2_16] i.s.rm.datasource.xa.ResourceManagerXA : 192.168.45.1:8091:4251845158843265029-4251845158843265031 was rollbacked
10-18 11:48:06:084 INFO 18016 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
# 2.4.2.4、AT 模式原理🌴
AT 模式同样是分阶段提交的事务模型,不过却弥补了 XA 模型中资源锁定周期过长的缺陷。
AT 模式执行完业务 sql 后会直接提交事务,而不是等待对方的执行。在执行业务 sql 的时候由 RM 拦截这次的执行并且给数据形成一个快照,快照名为:undo log,如果失败了就可以根据快照进行恢复了。在第一阶段就已经提交了,第二阶段将 log 给删除
阶段一 RM 的工作:
1、注册分支事务
2、<font color='red'> 记录 undo-log (数据快照)</font>.
3、执行业务 sql 并 <font color='red'> 提交 </font>.
4、报告事务状态
阶段二提交时 RM 的工作:
1、删除 undo-log 即可
阶段二回滚时 RM 的工作:
1、根据 undo-log 恢复数据到更新前
下面通过一个例子来进行比喻:
例如,一个分支业务的 SQL 是这样的:update tb_account set money = money - 10 where id = 1
总结:
简述 AT 模式与 XA 模式最大的区别是什么?
- XA 模式一阶段不提交事务,锁定资源;AT 模式一阶段直接提交,不锁定资源
- XA 模式依赖数据库机制实现回滚;AT 模式利用数据快照实现数据回滚
- XA 模式强一致;AT 模式最终一致
# 2.4.2.4.1、AT 模式的脏写问题🎋
上述得知,AT 模式相对于 XA 模式来讲,性能得到了一些提升,因为 AT 模式在第一阶段的时候执行事务会直接提交。而不是等待各个分支一起执行完,因此资源锁定周期短 性能就比较好了。但是也正是因为它提前释放了资源没有去做锁,这就导致了在并发访问情况下会存在一些安全问题。
比如说有个 account 表,业务是修改 money 字段让其减 10,那么有一个线程开启了这个样事务。事务开启第一件事就是获取数据库锁,并生成一个快照。然后执行 sql 从 100 改为 90,然后提交事务,事务一旦提交锁就释放了,数据也就改了。第一阶段完成就轮到第二阶段,但同一时刻因为是并发现在有另外一个线程也开启了事务执行这个业务。事务 2 等待事务 1 释放锁了,然后拿到锁才能去保存快照并执行业务 sql 将 90 改 为 80,然后提交事务并释放锁。
如果这时事务 1 拿到锁后它要做回滚,因为事务 1 的第二阶段还没做完,这时事务 1 的快照数据为 money=100 这是回滚就出问题了。
为了解决这个问题 AT 模式引入了一个叫 “全局锁” 的东西
全局锁:由 TC 记录当前正在操作某行数据的事务,改事务持有全局锁,具备执行权。
TC 通过一张表来进行记录,分别记录:当前由哪个事务在操作,操作的哪张表,主键 操作这张表的哪一行数据。通过记录查看这个表的这行数据就只能被这个事务进行操作,其它事务不能操作。
尽管如此,事务 1 在操作 money 字段的时候也可能会有一个没有被 seata 管理的业务来操作 money 字段此时就会产生脏写的问题了。分布式事务我们应该尽量的避免两个操作同一个字段。但是再怎么避免也不是不可能的情况
AT 模式也对这种方式做了一些处理。
当事务 1 第二阶段进行回滚时,会拿到 更新前的快照和更新后的快照。然后用更新后的快照数据去对当前的数据库数据做对比,判断是否一致,如果不一致就会记录异常,发送警告,人工介入
总结:
AT 模式的优点:
- 一阶段完成直接提交事务,释放数据库资源,性能比较好
- 利用全局锁实现读写隔离
- 没有代码侵入,框架自动完成回滚和提交
AT 模式的缺点:
- 两阶段之间属于软状态,属于最终一致
- 框架的快照功能会影响性能,但比 XA 模式好很多
# 2.4.2.5、实现 AT 模式🌴
AT 模式中的快照生成,回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。
1、导入 SQL 文件:seata-at.sql。打开 sql 文件其中 lock_table 导入到 TC 服务关联的数据库,undo_log 表导入到微服务管理的数据库:
2、修改 application.yml 文件,将事务模式修改为 AT 模式即可:
seata: | |
data-source-proxy-mode: AT # 开启数据源代理的 AT 模式 |
3、重启服务并测试
目前的数据库数据
account_tbl 表的数据
storage_tbl 表的数据
使用 postman 进行测试
测试后的 account_tbl 表的数据
测试后的 storage_tbl 表的数据
使用 postman 进行一次失败的测试:
库存数据现在不够 10 个我们扣减 10 个此时就会产生异常
测试后的 account_tbl 表的数据
测试后的 storage_tbl 表的数据
可以发现没有发生任何的变化啊。说明事务生效了,整个分布式事务都回滚了
accountApplication 服务在 idea 控制台中打印的信息如下:
可以看到最终删除了一个叫 undo_log 的表的数据也就是快照数据
10-18 14:32:15:309 INFO 19080 --- [h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid=192.168.45.1:8091:4251845158843265037,branchId=4251845158843265039,branchType=AT,resourceId=jdbc:mysql:///seata_demo,applicationData=null
10-18 14:32:15:310 INFO 19080 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.45.1:8091:4251845158843265037 4251845158843265039 jdbc:mysql:///seata_demo
10-18 14:32:15:347 INFO 19080 --- [h_RMROLE_1_2_16] i.s.r.d.undo.AbstractUndoLogManager : xid 192.168.45.1:8091:4251845158843265037 branch 4251845158843265039, undo_log deleted with GlobalFinished
10-18 14:32:15:348 INFO 19080 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
# 2.4.2.6、TCC 模式原理🌴
TCC 模式与 AT 模式非常相似,每阶段都是独立事务,不同的是 TCC 通过人工编码来实现数据恢复。需要实现三个方法:
1、Try:资源的检测和预留
2、Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功
3、Cancel:预留资源释放,可以理解为 try 的反向操作
举例,一个扣减用户余额的业务。假设账户 A 原来余额是 100,需要余额扣减 30 元
阶段一 (Try):检查余额是否充足,如果充足则冻结金额增加 30 元,可用余额扣除 30
阶段二 (Confirm):假如要提交,则冻结金额扣减 30
阶段二 (Cancel):如果要回滚,则冻结金额扣减 30,可用余额增加 30
TCC 的工作模型图:
由 TM 开启并注册全局事务到 TC 上,然后 TM 通知每一个分支事务去执行,分支事务被 RM 做拦截,RM 先去注册一下分支事务。然后再执行操作,第一阶段进行 Try。Try 是一个独立的资源会直接提交,提交后报告事务的状态。二阶段:TM 通知 TC 事务结束了,然后 TC 做事务状态的判断,看看分支资源够不够。如果够直接 Confirm 提交,如果不够就执行 Cancel 逻辑进行回滚。
而这三个 Try,Confirm,Cancel 都是需要人工编写的
总结:
TCC 模式的每个阶段是做什么的?
- Try:资源检查和预留
- Confirm:业务执行和提交
- Cancel:预留资源的释放
TCC 的优点是什么?
- 一阶段完成直接提交事务,释放数据库资源,性能好
- 相比 AT 模型,无需生成快照,无需使用全局锁,性能最强
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库比如:redis
TCC 的缺点是什么?
- 有代码侵入,需要认为编写 Try,Confirm 和 Cancel 接口,太麻烦
- 软状态,事务是最终一致
- 需要考虑 Confirm 和 Cancel 的失败情况,做好幂等处理
# 2.4.2.6.1、案例,改造 account-service 服务,利用 TCC 实现分布式事务🎋
需求如下:
1、修改 acoount-service,编写 Try,Confirm,Cancel 逻辑
2、Try 业务:添加冻结金额,扣减可用金额
3、Confirm 业务:删除冻结金额
4、Cancel 业务:删除冻结金额,恢复可用金额
5、保证 Confirm,Cancel 接口的 <font color='red'> 幂等性 </font>.
6、允许 <font color='red'> 空回滚 </font>.
7、拒绝 <font color='red'> 业务悬挂 </font>.
# 2.4.2.6.2、TCC 的空回滚和业务悬挂🎋
当某分支事务的 try 阶段阻塞时,可能导致全局事务超时而触发二阶段的 cancel 操作。在未执行 try 操作时先执行了 cancel 操作,这时 cancel 不能做回滚,就是 <font color='red'> 空回滚 </font>.
因此也不能报错。报错就会重试就会陷入一个死循环中。
对于已经空回滚的业务,如果以后继续执行 try,就永远不可能 Confirm 或 Cancel,这就是 <font color='red'> 业务悬挂 </font>。应当阻止执行空回滚后的 try 操作,避免悬挂
当第二个事务去执行的时候就会被阻塞,阻塞后得不到执行 TM 就会卡在这里,等待超时后就会报一个错误给 TC 了,TC 此时就会返回一个回滚的通知于是 RM 就会执行 Cancel 的逻辑了。
第一个分支执行 Cancel 没问题因为它已经 Try 了,但是第二个分支就出问题了 还没冻结金额就去恢复金额去了
业务分析
为了实现空回滚,防止业务悬挂,以及幂等性要求。我们必须在数据库记录冻结金额的同时,记录当前事务 id 和执行状态,为此我们设计了一张表 (account_freeze_tbl 这个表在项目中 sql 文件夹已经提供了):
有了表后我们还需要创建实体类和 mapper
实体类
@Data | |
@TableName("account_freeze_tbl") | |
public class AccountFreeze { | |
@TableId(type = IdType.INPUT) | |
private String xid; | |
private String userId; | |
private Integer freezeMoney; | |
private Integer state; | |
public static abstract class State { | |
public final static int TRY = 0; | |
public final static int CONFIRM = 1; | |
public final static int CANCEL = 2; | |
} | |
} |
mapper
public interface AccountFreezeMapper extends BaseMapper<AccountFreeze> { | |
} |
# 2.4.2.6.3、声明 TCC 接口:
TCC 的 Try,Confirm,Cancel 方法都需要在接口中基于注解来声明,语法如下:
@LocalTCC | |
public interface AccountTCCService { | |
// Try 逻辑,@TwoPhaseBusinessAction 中的 name 属性要与当前方法名一致,用于指定 Try 逻辑对应方法 | |
@TwoPhaseBusinessAction(name = "deduct", | |
commitMethod = "confirm", rollbackMethod = "cancel") | |
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId, | |
@BusinessActionContextParameter(paramName = "money") int money); | |
// 二阶段 confirm 确认方法,可以另命名,但要保证与 commitMethod 一致 | |
// @Param context 上下文,可以传递 Try 方法的参数 | |
// @return boolean 执行是否成功 | |
boolean confirm(BusinessActionContext ctx); | |
// 二阶段回滚方法,要保证与 rollbackMethod 一致 | |
boolean cancel(BusinessActionContext ctx); | |
} |
实现类:
@Slf4j | |
@Service | |
public class AccountTccServiceImpl implements AccountTCCService { | |
private AccountMapper mapper; | |
private AccountFreezeMapper freezeMapper; | |
public AccountTccServiceImpl(AccountMapper mapper, | |
AccountFreezeMapper freezeMapper) | |
{ | |
this.mapper = mapper; | |
this.freezeMapper = freezeMapper; | |
} | |
@Override | |
@Transactional | |
public void deduct(String userId, int money) { | |
// 1. 获取事务 id | |
String xid = RootContext.getXID(); | |
// 1.1. 判断 freeze 中是否有冻结记录,如果有,一定是 CANCEL 执行过,要拒绝业务 | |
AccountFreeze freeze1 = freezeMapper.selectById(xid); | |
if(freeze1 != null) | |
{ | |
// CANCEL 执行过,要拒绝业务 | |
return; | |
} | |
// 2. 扣减可用余额 | |
mapper.deduct(userId, money); | |
// 3. 记录冻结金额,事务状态 | |
AccountFreeze freeze = new AccountFreeze(); | |
freeze.setUserId(userId); | |
freeze.setFreezeMoney(money); | |
freeze.setState(AccountFreeze.State.TRY); | |
freeze.setXid(xid); | |
freezeMapper.insert(freeze); | |
} | |
@Override | |
public boolean confirm(BusinessActionContext ctx) { | |
// 1. 获取事务 id | |
String xid = ctx.getXid(); | |
// 2. 根据 id 删除冻结记录 | |
int count = freezeMapper.deleteById(xid); | |
return count == 1; | |
} | |
@Override | |
public boolean cancel(BusinessActionContext ctx) { | |
// 1. 从上下文中获取 用户的 id 和余额 | |
String xid = ctx.getXid(); | |
String userId = ctx.getActionContext("userId").toString(); | |
AccountFreeze freeze = freezeMapper.selectById(xid); | |
// 1.1. 空回滚的判断,判断 freeze 是否为 null,为 null 证明 try 没有执行,需要空回滚 | |
if(freeze == null) | |
{ | |
// 证明 try 没有执行,需要回滚 | |
freeze = new AccountFreeze(); | |
freeze.setUserId(userId); | |
freeze.setFreezeMoney(0); | |
freeze.setState(AccountFreeze.State.CANCEL); | |
freeze.setXid(xid); | |
freezeMapper.insert(freeze); | |
return true; | |
} | |
// 1.2. 幂等判断 | |
if(freeze.getState() == AccountFreeze.State.CANCEL) | |
{ | |
// 已经处理过一次 CANCEL 了,无需重复处理 | |
return true; | |
} | |
// 2. 恢复可用余额 | |
mapper.refund(freeze.getUserId(), freeze.getFreezeMoney()); | |
// 3. 将冻结金额清零,状态改为 CANCEL | |
freeze.setFreezeMoney(0); | |
freeze.setState(AccountFreeze.State.CANCEL); | |
int count = freezeMapper.updateById(freeze); | |
return count == 1; | |
} | |
} |
目前的数据库数据
account_tbl 表的数据
storage_tbl 表的数据
使用 postman 进行测试
请求完后查看数据库的数据
account_tbl 表的数据
storage_tbl 表的数据
测试失败的 postman 请求
因为此时库存的数量不够 10 个,我们减去 10 个就会产生异常
查看数据库的数据情况
测试后的 account_tbl 表的数据
测试后的 storage_tbl 表的数据
可以看到完成了数据的回滚
# 2.4.2.7、Saga 模式🌴
Saga 模式是 SEATA 提供的长事务解决方案。也分为两个阶段:
- 一阶段:直接提交本地事务
- 二阶段:成功则什么都不做,失败则通过编写补偿业务来回滚
Saga 模式优点:
1、事务参与者可以基于事务驱动实现异步调用,吞吐高
2、一阶段直接提交事务,无锁,性能好
3、不用编写 TCC 中的三个阶段,实现简单
缺点:
1、软状态支持时间不确定,时效性差
2、没有锁,没有事务隔离,会有脏写
# 2.4.2.8、四种模式对比🌴
XA | AT | TCC | SAGA | |
---|---|---|---|---|
一致性 | 强一致 | 弱一致 | 弱一致 | 最终一致 |
隔离性 | 完全隔离 | 基于全局锁隔离 | 基于资源预留隔离 | 无隔离 |
代码侵入 | 无 | 无 | 有,要编写三个接口 | 有,要编写状态机和补偿业务 |
性能 | 差 | 好 | 非常好 | 非常好 |
场景 | 对一致性,隔离性有高要求的业务 | 基于关系型数据库的大多数分布式场景都可以 | 对性能要求较高的事务 <br /> 有非关系型数据库要参与的事务 | 业务流程长,业务流程多 <br /> 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口 |
# 三、高可用🎄
- 高可用集群结构
- 实现高可用集群
TC 的异地多机房容灾架构
TC 服务作为 Seata 的核心服务,一定要保证高可用和异地容灾.
具体实现查看文章:seata 的部署和集成.md.
# 四,多级缓存🎄
亿级流量的缓存方案
# 4.1、传统缓存的问题🌳
传统的缓存策略一般是请求到达 Tomcat 后,先查询 redis,如果未命中则查询数据库,存在下面的问题:
1、请求要经过 Tomcat 处理,Tomcat 的性能成为整个系统的瓶颈
2、Redis 缓存失效时,会对数据库产生冲击
# 4.2、多级缓存方案🌳
多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻 Tomcat 压力,提升服务性能:
用作缓存的 Nginx 是业务 Nginx,需要部署为集群,再有专门的 Nginx 用来做反向代理:
# 4.3、JVM 进程缓存🌳
- 导入商品案例
- 初始 Caffeine
- 实现进程缓存
# 4.3.1、导入商品案例🌲
查看文章:案例导入说明.
# 4.3.2、初始 Caffeine🌲
# 4.3.2.1、本地进程缓存🌴
缓存在日常开发中起到至关重要的作用。由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们吧缓存分为两类:
1、分布式缓存,例如 Redis:
- 优点:存储容量更大,可靠性更好,可以在集群间共享
- 缺点:访问缓存有网络开销
- 场景:缓存数据量较大,可靠性要求较高,需要在集群间共享
2、进程本地缓存,例如 HashMap,GuavaCache:
- 优点:读取本地内存,没有网络开销,速度更快
- 缺点:存储容量有限,可靠性较低,无法共享
- 场景:性能要求较高,缓存数据量较小
Caffeine 是一个基于 java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前 Spring 内部的缓存使用的就是 Caffeine。Github 地址:https://github.com/ben-manes/caffeine
# 4.3.2.1.1、Caffeine 示例🎋
可以通过 item-service 项目中的单元测试来学习 Caffeine 的使用:
@Test | |
void testBasicOpsTwo() | |
{ | |
// 1. 构建 cache 对象 | |
Cache<String, String> cache = Caffeine.newBuilder().build(); | |
// 2. 存数据 | |
cache.put("gf", "郭明然"); | |
// 3. 取数据 | |
String gf = cache.getIfPresent("gf"); | |
System.out.println("gf = " + gf); | |
// 4. 取数据,如果未命中,则查询数据库 | |
String defaultGF = cache.get("defaultGF", key -> { | |
// 4.1. 根据 key 去查询数据库 | |
return "是个傻子"; | |
}); | |
System.out.println("defaultGF = " + defaultGF); | |
} | |
//------------------ 执行结果 ------------------ | |
gf = 郭明然 | |
defaultGF = 是个傻子 |
Caffeine 提供了三种缓存驱逐策略:
1、基于容量:设置缓存的数量上限
@Test | |
void testBasicOpsTwo() | |
{ | |
// 1. 构建 cache 对象 | |
Cache<String, String> cache = Caffeine.newBuilder() | |
.maximumSize(1) // 设置缓存大小上限为 1 | |
.build(); | |
// 2. 存数据 | |
cache.put("gf", "郭明然"); | |
cache.put("gf1", "郭明然1"); | |
cache.put("gf2", "郭明然2"); | |
// try { | |
// // 延迟 10ms,给清理线程一点时间 | |
// Thread.sleep(100); | |
// } catch (InterruptedException e) { | |
// throw new RuntimeException(e); | |
// } | |
// 3. 取数据 | |
System.out.println(cache.getIfPresent("gf")); | |
System.out.println(cache.getIfPresent("gf1")); | |
System.out.println(cache.getIfPresent("gf2")); | |
} | |
//---------------- 打印结果 ---------------- | |
郭明然 | |
郭明然1 | |
郭明然2 |
使用线程睡眠让其等待 10ms 给清理线程一点时间
@Test | |
void testBasicOpsTwo() | |
{ | |
// 1. 构建 cache 对象 | |
Cache<String, String> cache = Caffeine.newBuilder() | |
.maximumSize(1) // 设置缓存大小上限为 1 | |
.build(); | |
// 2. 存数据 | |
cache.put("gf", "郭明然"); | |
cache.put("gf1", "郭明然1"); | |
cache.put("gf2", "郭明然2"); | |
try { | |
// 延迟 10ms,给清理线程一点时间 | |
Thread.sleep(10l); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
// 3. 取数据 | |
System.out.println(cache.getIfPresent("gf")); | |
System.out.println(cache.getIfPresent("gf1")); | |
System.out.println(cache.getIfPresent("gf2")); | |
} | |
//---------------- 打印结果 ---------------- | |
null | |
null | |
郭明然2 |
2、基于时间:设置缓存的有效时间
@Test | |
void testBasicOpsTwo() | |
{ | |
// 1. 构建 cache 对象 | |
Cache<String, String> cache = Caffeine.newBuilder() | |
.expireAfterWrite(Duration.ofSeconds(10)) // 设置缓存有效期为 10 秒,从最后一次写入开始计时 | |
.build(); | |
// 2. 存数据 | |
cache.put("gf", "郭明然"); | |
cache.put("gf1", "郭明然1"); | |
cache.put("gf2", "郭明然2"); | |
// 3. 取数据 | |
System.out.println(cache.getIfPresent("gf")); | |
try { | |
// 延迟 10ms,给清理线程一点时间 | |
Thread.sleep(11000); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
System.out.println(cache.getIfPresent("gf1")); | |
System.out.println(cache.getIfPresent("gf2")); | |
} | |
//------------------- 打印结果 ------------------- | |
郭明然 | |
null | |
null |
3、基于引用:设置缓存为软引用或弱引用,利用 GC 来回收缓存数据。性能较差,不建议使用。
软引用:内存不足时才会回收; 弱引用:就算内存充足时也会回收;
在默认情况下,当一个缓存元素过期的时候,Caffeine 不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。
# 4.3.3、实现进程缓存🌲
# 4.3.3.1、案例,实现商品的查询的本地进程缓存🌴
利用 Caffeine 实现下列需求:
1、给根据 id 查询商品的业务添加缓存,缓存未命中时查询数据库
2、给根据 id 查询商品存库的业务添加缓存,缓存未命中时查询数据库
3、缓存初始大小为 100
4、缓存上限为 10000
步骤:
1、给根据 id 查询商品的业务添加缓存,缓存未命中时查询数据库
2、给根据 id 查询商品存库的业务添加缓存,缓存未命中时查询数据库
3、缓存初始大小为 100
4、缓存上限为 10000
@Configuration | |
public class CaffeineConfig { | |
@Bean | |
public Cache<Long, Item> itemCache() | |
{ | |
return Caffeine.newBuilder() | |
.initialCapacity(100) // 设置缓存初始化大小 | |
.maximumSize(10_000) // 设置缓存最大上限值 | |
.build(); | |
} | |
@Bean | |
public Cache<Long, ItemStock> itemStockCache() | |
{ | |
return Caffeine.newBuilder() | |
.initialCapacity(100) // 设置缓存初始化大小 | |
.maximumSize(10_000) // 设置缓存最大上限值 | |
.build(); | |
} | |
} |
5、在 controller 层进行两个缓存对象的自动装配
@Autowired | |
private Cache<Long, Item> itemCache; | |
@Autowired | |
private Cache<Long, ItemStock> itemStockCache; |
6、编写缓存代码
@GetMapping("/{id}") | |
public Item findById(@PathVariable("id") Long id){ | |
// 优先根据缓存查,如果未命中则查询数据库 | |
return itemCache.get(id, key -> { | |
return itemService.query() | |
.ne("status", 3).eq("id", key) | |
.one(); | |
}); | |
} | |
@GetMapping("/stock/{id}") | |
public ItemStock findStockById(@PathVariable("id") Long id){ | |
return itemStockCache.get(id, key -> { | |
return stockService.getById(id); | |
}); | |
} |
请求接口进行测试:http://localhost:8081/item/10001
第一次请求我们可以在 idea 控制台中看到打印了一段执行 SQL 的日志,说明查了一下数据库
17:31:53:355 DEBUG 5692 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : ==> Preparing: SELECT id,name,title,price,image,category,brand,spec,status,create_time,update_time FROM tb_item WHERE (status <> ? AND id = ?)
17:31:53:383 DEBUG 5692 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : ==> Parameters: 3(Integer), 10001(Long)
17:31:53:405 DEBUG 5692 --- [nio-8081-exec-1] c.h.item.mapper.ItemMapper.selectOne : <== Total: 1
清空 idea 控制台再对该页面进行一次访问,因为缓存一开始就是空的
当我们执行过一次后数据就会被缓存起来,第二次执行就直接拿缓存的数据了可以看到控制台很干净没有一点打印信息
当然库存也是一样的道理
访问 uri:http://localhost:8081/item/stock/10001
清空控制台信息进行第二次访问,此时数据已经被缓存起来了。
二次依旧很干净
# 五、Lua 语言入门🎄
因为 Nginx 开发需要使用 Lua 语言来进行编程,所以我们就需要学习 Lua 语言
学习内容:
- 初始 Lua
- 变量和循环
- 条件控制,函数
# 5.1、初始 Lua🌳
Lua 是一种轻量级小巧的脚本语言,用标准 C 语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:http://www.lua.org/
# 5.1.1、HelloWorld🌲
1、在 Linux 虚拟机的任意目录下,新建一个 hello.lua 文件
[root@localhost ~]# vim hello.lua |
2、添加下面的内容
print("Hello World!") |
3、运行
[root@localhost ~]# lua hello.lua | |
Hello World! | |
[root@localhost ~]# |
# 5.1.2、数据类型🌲
数据类型 | 描述 |
---|---|
nil | 这个最简单,只有值 nil 属于该类,表示一个无效值 (在条件表达式中相当于 false) |
boolean | 包含两个值:false 和 true |
number | 表示双精度类型的实浮点数 |
string | 字符串由一对双引号或单引号来表示 |
function | 由 C 或 Lua 编写的函数 |
table | Lua 中的表 (table) 其实是一个 “关联数组” (associative arrays),数组的索引可以是数字,字符串或表类型。在 Lua 里,table 的创建是通过 “构造表达式” 来完成,最简单构造表达式是 { },用来创建一个空表。 |
可以利用 type 函数测试给定变量或者值的类型
[root@localhost ~]# vim hello.lua | |
[root@localhost ~]# lua hello.lua | |
string | |
[root@localhost ~]# cat hello.lua | |
print(type("Hello World!")) | |
[root@localhost ~]# |
[root@localhost ~]# vim hello.lua | |
[root@localhost ~]# lua hello.lua | |
number | |
[root@localhost ~]# cat hello.lua | |
print(type(10.4 * 3)) | |
[root@localhost ~]# |
# 5.1.3、变量🌲
Lua 声明变量的时候,并不需要指定数据类型:
-- 声明字符串 | |
local str = 'hello' | |
-- 声明数字 | |
local num = 21 | |
-- 声明布尔类型 | |
local flag = true | |
-- 声明数组 key 为索引的 table | |
local arr = {'java', 'python', 'lua'} | |
-- 声明 table ,类似 java 的 map | |
local map = {name = 'Jack', age = 21} |
访问 table:
-- 访问数组,lua 数组的角标从 1 开始 | |
print(arr[1]) | |
-- 访问 table | |
print(map['name']) | |
print(map.name) |
字符串拼接使用 ..
# 5.1.4、循环🌲
数组,table 都可以利用 for 循环来遍历
- 遍历数组:
-- 声明数组 key 为索引的 table | |
local arr = {'java', 'python', 'lua'} | |
-- 遍历数组 | |
for index, value in ipairs(arr) do | |
print(index, value) | |
end |
- 遍历 table
-- 声明 map,也就是 table | |
local map = {name = 'Jack', age = 21} | |
-- 遍历 table | |
for key,value in pairs(map) do | |
print(key, value) | |
end |
# 5.1.5、函数🌲
定义函数的语法:
function 函数名( argument1, argument2, argument3 ...) | |
-- 函数体 | |
return 返回值 | |
end |
例如,定义一个函数,用来打印数组:
function printArr(arr) | |
for index, value in ipairs(arr) do | |
print(value) | |
end | |
end |
# 5.1.6、条件控制🌲
类似 Java 的条件控制,例如 if , else 语法:
if(布尔表达式) | |
then | |
-- [布尔表达式为 true 时执行该语句块 --] | |
else | |
-- [布尔表达式为 false 时执行该语句块 --] | |
end |
与 java 不同,布尔表达式中的逻辑运算是基于英文单词:
操作符 | 描述 | 实例 |
---|---|---|
and | 逻辑与操作符。若 A 为 false,则返回 A,否则返回 B | (A and B) 为 false |
or | 逻辑或操作符。若 A 为 true,则返回 A,否则返回 B | (A or B) 为 true |
not | 逻辑非操作符。与逻辑运算结果相反,如果条件为 true,逻辑非为 false | not (A and B) 为 true |
# 六、多级缓存🎄
- 安装 OpenResty
- OpenResty 快速入门
- 请求参数处理
- 查询 Tomcat
- Redis 缓存预热
- 查询 Redis 缓存
- Nginx 本地缓存
# 6.1、初始 OpenResty🌳
OpenResty 是一个基于 Nginx 的高性能 Web 平台,用于方便地搭建能够处理高并发,扩展性极高的动态 Web 应用,Web 服务和动态网关。具备下列特点:
- 具备 Nginx 的完整功能
- 基于 Lua 语言进行扩展,集成了大量精良的 Lua 库,第三方模块
- 允许使用 Lua 自定义业务逻辑,自定义库
官方网站:https://openresty.org/cn/
# 6.1.1、OpenResty 安装🌲
查看文章:安装 OpenResty.
# 6.1.2、案例,OpenResty 快速入门,实现商品详情页数据查询🌲
商品详情页目前展示的是假数据,在浏览器的控制台可以看到查询商品信息的请求:
而这个请求最终被反向代理到虚拟机的 OpenResty 集群:
需求:在 OpenResty 中接收这个请求,并返回一段商品的假数据。
步骤:一。
1、在 nginx.conf 的 http 下面,添加对 OpenResty 的 Lua 模块的加载
# lua 模块 | |
lua_package_path "/usr/local/openresty/lualib/?.lua;;"; | |
# c 模块 | |
lua_package_cpath "/usr/local/openresty/lualib/?.so;;"; |
2、在 nginx.conf 的 server 下面,添加对 /api/item 这个路径的监听
# 这里可以理解为就是 SpringMVC 中的 Controller | |
location /api/item { | |
# 默认的响应类型 | |
default_type application/json; | |
# 这里可以理解为就是 Service 业务逻辑层 | |
# 响应结果由 lua/item.lua 下的文件决定 | |
content_by_lua_file lua/item.lua; | |
} |
步骤:二。
1、在 nginx 目录创建文件夹:lua
[root@localhost nginx]# mkdir lua |
2、在 lua 文件夹下,新建文件:item.lua
[root@localhost nginx]# touch lua/item.lua |
3、内容如下:
-- 返回假数据,这里的 ngx.say () 函数,就是写数据到 Response 中 | |
ngx.say('{"id":10001, "name":"SALSA AIR"}') |
这里面的假数据去 uri:http://localhost/item.html?id=10001 中打开控制台选择 vue 然后赋值 Item 也就是 Object 对象,将里面的数据作为假数据使用
这个页面是从 nginx 代理里面访问过来的
将假数据拷贝到 ngx.say (‘’) 里面
为了能区分一下是否成功,我们将里面的一些数据进行一下修改
此处只是修改了尺寸和价格
ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 31寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":27900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}') |
完整的 nginx.conf 配置内容:
#user nobody; | |
worker_processes 1; | |
error_log logs/error.log; | |
events { | |
worker_connections 1024; | |
} | |
http { | |
include mime.types; | |
default_type application/octet-stream; | |
sendfile on; | |
keepalive_timeout 65; | |
# lua 模块 | |
lua_package_path "/usr/local/openresty/lualib/?.lua;;"; | |
# c 模块 | |
lua_package_cpath "/usr/local/openresty/lualib/?.so;;"; | |
server { | |
listen 8081; | |
server_name localhost; | |
location /api/item { | |
# 默认的响应类型 | |
default_type application/json; | |
# 响应结果由 lua/item.lua 下的文件决定 | |
content_by_lua_file lua/item.lua; | |
} | |
location / { | |
root html; | |
index index.html index.htm; | |
} | |
error_page 500 502 503 504 /50x.html; | |
location = /50x.html { | |
root html; | |
} | |
} | |
} |
4、重新加载配置
nginx -s reload |
查看商品页面,没有刷新前
刷新后
可以看到尺寸和价格被修改了但是中文也乱码了。。。
# 6.2、OpenResty 获取请求参数🌳
OpenResty 提供了各种 API 用来获取不同类型的请求参数:
# 6.2.1、案例,获取请求路径中的商品 id 信息,拼接到 json 结果中返回🌲
在查询商品信息的请求中,通过路径占位符的方式,传递了商品 id 到后台:
需求:在 OpenResty 中接收这个请求,并获取路径中的 id 信息,拼接到结果的 json 字符串中返回
OpenResty 中接受请求
location ~ /api/item(\d+) { | |
# 默认的响应类型 | |
default_type application/json; | |
# 响应结果由 lua/item.lua 下的文件决定 | |
content_by_lua_file lua/item.lua; | |
} |
并获取路径中的 id 信息,拼接到结果的 json 字符串中返回
-- 获取路径参数 | |
local id = ngx.var[1] | |
-- 返回结果 | |
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 31寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":27900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}') |
重启 Linux 中的 nginx 服务器
nginx -s reload |
访问页面进行测试:
http://localhost/item.html?id=10001
请求结果
http://localhost/item.html?id=10002
请求结果
可以看到 id 就不再是写死的了,而是可以事实变化的
# 6.3、查询 Tomcat🌳
先看下多级缓存的架构。目前已经准备好了 nginx 反向代理服务器,它去接受前端的请求并且转到 OpenResty,服务端也编写好了进程缓存 OpenResty 也搭好了,但是 OpenResty 接收到请求后要查询数据。我们先实现从 Tomcat 去查然后再存储到缓存中。
需要注册 OPenResty 在虚拟机上,而 Tomcat 在本机上,ip 地址不同
两个 ip 地址访问的便捷方式:不管虚拟机的 ip 地址是什么只要官前三位就行了,然后把最后一位替换成 1 一定得到的就是 windows 电脑的位置。前提是需要关闭防火墙
# 6.3.1、案例,获取请求路径中的商品 id 信息,根据 id 向 Tomcat 查询商品信息🌲
这里要修改 item.lua,满足下面的需求:
1、获取请求参数中的 id
2、根据 id 向 Tomcat 服务发送请求,查询商品信息
3、根据 id 向 Tomcat 服务发送请求,查询库存信息
4、组装商品信息,库存信息,序列化为 Json 格式并返回
步骤:
一,nginx 内部发送 Http 请求
nginx 提供了内部 API 用以发送 http 请求:
# 注意:这里的 path 是路径,并不包含 ip 和端口。这个请求会被 nginx 内部的 server 监听并处理 | |
# 这肯定不行,因为我们希望可以发送给 Tomcat 所以就要指定 Tomcat 的 ip 和端口 | |
local resp = ngx.location.capture("/path", { | |
method = ngx.HTTP_GET, -- 请求方式 | |
args = {a = 1, b = 2}, -- get方式传参数 | |
body = "c=3&d=4" -- post方式传参数 | |
}) |
返回的响应内容包括:
- resp.status:响应状态码
- resp.header:响应头,是一个 table
- resp.body:响应体,就是响应数据
我们希望这个请求发送到 Tomcat 服务器,所以还需要编写一个 server 来对这个路径做反向代理:
location /path { | |
# 这里是 windows 电脑的 ip 和 Java 服务端口,需要确保 windows 防火墙处于关闭状态 | |
proxy_pass http://192.168.137.169:8081; | |
} |
编写代码:
server { | |
listen 8081; | |
server_name localhost; | |
location /item { | |
proxy_pass http://192.168.137.1:8081; | |
} | |
location ~ /api/item/(\d+) { | |
# 默认的响应类型 | |
default_type application/json; | |
# 响应结果由 lua/item.lua 下的文件决定 | |
content_by_lua_file lua/item.lua; | |
} | |
location / { | |
root html; | |
index index.html index.htm; | |
} | |
error_page 500 502 503 504 /50x.html; | |
location = /50x.html { | |
root html; | |
} | |
} |
Tomcat 的反向代理搞定了,以后只要调用 capture API 请求是 item 开头就一定能达到 Tomcat。所以以后就可以放心的使用这套 API 了,那么既然这个 API 经常要使用。那我们可以将发请求的代码封装成一个函数
# 6.3.1.1、封装 HTTP 查询的函数🌴
我们可以吧 http 查询的请求封装为一个函数,放到 OPenResty 函数库中,方便后期使用。
1、在 /usr/local/openresty/lualib 目录下创建 common.lua 文件:
为什么是这个位置? 因为在上述配置 nginx 加载模块中指定的路径就是加载这个位置的任何.lua 文件
vim /usr/local/openresty/lualib/common.lua |
2、在 common.lua 中封装 http 查询的函数
-- 封装函数,发送 http 请求,并解析响应 | |
local function read_http(path, params) | |
local resp = ngx.location.capture(path,{ | |
method = ngx.HTTP_GET, | |
args = params, | |
}) | |
if not resp then | |
-- 记录错误信息,返回 404 | |
ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args) | |
ngx.exit(404) | |
end | |
return resp.body | |
end | |
-- 将方法导出 | |
local _M = { | |
read_http = read_http | |
} | |
return _M |
在 item.lua 文件中编写代码如下:
-- 导入 common 函数库 | |
local common = require('common') | |
local read_http = common.read_http | |
-- 获取路径参数 | |
local id = ngx.var[1] | |
-- 查询商品信息 | |
local itemJSON = read_http("/item/" .. id, nil) | |
-- 查询库存信息 | |
local stockJSON = read_http("/item/stock" .. id, nil) | |
-- 返回结果 | |
ngx.say(itemJSON) |
访问 uri 进行一下请求测试:http://localhost/item.html?id=10003
返回的数据
{
"id": 10003,
"name": "韩版牛仔裤",
"title": "唐狮新品牛仔裤女学生韩版宽松裤子 A款/中牛仔蓝(无绒款) 26",
"price": 84600,
"image": "https://m.360buyimg.com/mobilecms/s720x720_jfs/t26989/116/124520860/644643/173643ea/5b860864N6bfd95db.jpg!q70.jpg.webp",
"category": "牛仔裤",
"brand": "唐狮",
"spec": "{\"颜色\": \"蓝色\", \"尺码\": \"26\"}",
"status": 1,
"createTime": "2019-05-01T00:00:00.000+00:00",
"updateTime": "2019-05-01T00:00:00.000+00:00",
"stock": null,
"sold": null
}
现在所有的商品都能正常查询了,但是!
里面的,销量和存库的数据还没有。
但是如果要是有 3 张表,4 张表,5 张表都需要把它们拼接起来,所以最终需要的是完整的数据
但是 itemJSON 得到的数据是一个 Json 数据,我们需要转换为 Lua 里面的对象,使用 table 这个数据类型来进行存储
如下就是将 JSON 反序列化为 Lua 对象的方式:
# 6.3.1.2、JSON 结果处理🌴
OPenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化。
官方地址:https://github.com/openresty/lua-cjson/
该模块的目录位置如下:/usr/local/openresty/lualib/
使用:
1、导入 cjson 模块:
local cjson = require('cjson') |
2、序列化:
local obj = { | |
name = 'jack', | |
age = 21 | |
} | |
local json = cjson.encode(obj) |
3、反序列化
local json = '{"name": "jack", "age": 21}' | |
-- 反序列化 | |
local obj = cjson.decode(json) | |
print(obj.name) |
继续下面的代码步骤:
1、将请求到的 json 数据序列化为 lua 中的 table 数据
2、再将序列化的数据进行调用拼接
3、拼接完毕后再将 lua 中 table 数据序列化为 json 响应到前端
-- 导入 common 函数库 | |
local common = require('common') | |
local read_http = common.read_http | |
-- 导入 cjson 库 | |
local cjson = require('cjson') | |
-- 获取路径参数 | |
local id = ngx.var[1] | |
-- 查询商品信息 | |
local itemJSON = read_http("/item/" .. id, nil) | |
-- 查询库存信息 | |
local stockJSON = read_http("/item/stock/" .. id, nil) | |
-- JSON 转化为 lua 的 table | |
local item = cjson.decode(itemJSON) | |
local sto = cjson.decode(stockJSON) | |
-- 组合数据 | |
item.stock = sto.stock | |
item.sold = sto.sold | |
-- 把 item 序列化为 json 返回结果 | |
ngx.say(cjson.encode(item)) |
请求 uri 进行测试:http://localhost/item.html?id=10004
请求到的数据
{
"spec": "{\"颜色\": \"白色\", \"尺码\": \"36\"}",
"sold": 974,
"status": 1,
"updateTime": "2019-05-01T00:00:00.000+00:00",
"title": "森马(senma)休闲鞋女2019春季新款韩版系带板鞋学生百搭平底女鞋 黄色 36",
"stock": 99999,
"price": 10400,
"image": "https://m.360buyimg.com/mobilecms/s720x720_jfs/t1/29976/8/2947/65074/5c22dad6Ef54f0505/0b5fe8c5d9bf6c47.jpg!q70.jpg.webp",
"createTime": "2019-05-01T00:00:00.000+00:00",
"category": "休闲鞋",
"name": "休闲板鞋",
"brand": "森马",
"id": 10004
}
可以看到有库存和销量的数据了
# 6.3.2、Tomcat 集群的负载均衡🌲
我们已经实现了,从 OPenResty 向 tomcat 发送一个 http 请求去查询商品信息返回页面,完成渲染。但是在上述实现案例中 Tomcat 只有一台
在实际生产中 Tomcat 一定是一个集群。所以 OPenResty 再发送请求时必须得对多台 Tomcat 实现一个负载均衡
比方说 负载均衡挑选了 8081,然后请求就会到达 8081 然后去查询数据库。
查询完数据库后会形成一个 jvm 进程缓存,保存在 8081 这台服务器上,缓存有了以后下次可以直接读取缓存就不用查询数据库了性能就非常好了。但是非常可惜的是进程缓存是不能共享的 8081 有这个缓存,8082 并没有,而负载均衡的规则是 轮询。如果这样访问缓存就会在多台 Tomcat 去冗余的保存。第一个占用额外空间,第二是缓存的命中率也有点惨
如果说我想要商品 10001 第一次查询完以后永远都有缓存。那么我必须让 item/10001 这个请求每次都指向同一个服务器,这样才能保证这个缓存一直生效。假如说又来一个 item/10002 那我就让它指向下一个服务器。
不同商品永远访问不同服务器,相同商品永远访问同一个服务器,这样就能保证缓存永远命中。
这样 jvm 进程缓存才有意义。但是现在是轮询肯定做不到这一点,所以我们需要修改 nginx 负载均衡的算法了。
# tomcat 集群配置 | |
upstream tomcat-cluster { | |
# 根据请求过来的路径来计算 hash 值然后决定访问哪个服务器 | |
hash $request_uri; | |
server 192.168.249.128:8081; | |
server 192.168.249.128:8082; | |
} |
代码演示如下:
http { | |
include mime.types; | |
default_type application/octet-stream; | |
sendfile on; | |
keepalive_timeout 65; | |
# lua 模块 | |
lua_package_path "/usr/local/openresty/lualib/?.lua;;"; | |
# c 模块 | |
lua_package_cpath "/usr/local/openresty/lualib/?.so;;"; | |
upstream tomcat-cluster { | |
hash $request_uri; | |
server 192.168.249.128:8081; | |
server 192.168.249.128:8082; | |
} | |
server { | |
listen 8081; | |
server_name localhost; | |
location /item { | |
proxy_pass http://tomcat-cluster; | |
} | |
location ~ /api/item/(\d+) { | |
# 默认的响应类型 | |
default_type application/json; | |
# 响应结果由 lua/item.lua 下的文件决定 | |
content_by_lua_file lua/item.lua; | |
} | |
location / { | |
root html; | |
index index.html index.htm; | |
} | |
error_page 500 502 503 504 /50x.html; | |
location = /50x.html { | |
root html; | |
} | |
} | |
} |
在 idea 中开启 8082 端口的服务
启动
访问 uri 进行测试:http://localhost/item.html?id=10001
这个 uri 访问请求到的 Tomcat 服务为如下:
idea 控制台中打印信息:
访问 uri 进行测试:http://localhost/item.html?id=10002
这个 uri 访问请求到的 Tomcat 服务为如下:
idea 控制台中打印信息:
将连个服务的控制台打印的信息都清空然后再去访问一下 10001 和 10002
控制台都没有打印任何的信息说明缓存起作用了。
# 6.4、添加 Redis 缓存的需求🌳
请求成功后不应该直接查询 jvm 进程缓存而是应该先到 redis 中看看有没有缓存再去看 jvm 进程缓存
# 6.4.1、添加 Redis 缓存要面临的问题🌲
# 6.4.1.1、冷启动与缓存预热🌴
冷启动:服务刚刚启动时,Redis 中并没有缓存,如果所有商品数据都在第一次查询是添加缓存,可能会给数据库带来较大压力
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到 Redis 中
我们数据流量较少,可以在启动时将所有数据都存入缓存中。
# 6.4.1.2、缓存预热🌴
步骤:
1、利用 Docker 安装 Redis
[root@localhost nginx]# docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes |
2、在 item-service 服务中引入 redis 依赖
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-data-redis</artifactId> | |
</dependency> |
3、配置 Redis 地址
spring: | |
redis: | |
host: 192.168.249.128 | |
port: 6379 |
4、编写初始化类
@Component | |
public class RedisHandler implements InitializingBean { | |
@Autowired | |
private StringRedisTemplate redisTemplate; | |
@Autowired | |
private ItemService itemService; | |
@Autowired | |
private IItemStockService stockService; | |
private static final ObjectMapper MAPPER = new ObjectMapper(); | |
@Override | |
public void afterPropertiesSet() throws Exception { | |
// 初始化缓存 | |
// 1. 查询商品信息 | |
List<Item> list = itemService.list(); | |
// 2. 放入缓存 | |
for (Item item : list) { | |
// 2.1. 序列化为 json | |
String json = MAPPER.writeValueAsString(item); | |
// 2.2. 存入 redis | |
redisTemplate.opsForValue().set("item:id:" + item.getId(), json); | |
} | |
// 1. 查询库存信息 | |
List<ItemStock> list1 = stockService.list(); | |
// 2. 放入缓存 | |
for (ItemStock itemStock : list1) { | |
// 2.1. 序列化为 json | |
String json = MAPPER.writeValueAsString(itemStock); | |
// 2.2. 存入 redis | |
redisTemplate.opsForValue().set("item:stock:id:" + itemStock.getId(), json); | |
} | |
} | |
} |
重启或启动 8081 和 8082 两个服务器
可以看到启动时 redis 初始化缓存去查询了数据库此时可以看下 redis 中的数据情况
[root@localhost nginx]# docker exec -it redis redis-cli | |
127.0.0.1:6379> keys * | |
1) "item:id:10004" | |
2) "item:stock:id:10004" | |
3) "item:id:10002" | |
4) "item:stock:id:10005" | |
5) "item:id:10001" | |
6) "item:stock:id:10001" | |
7) "item:stock:id:10003" | |
8) "item:id:10003" | |
9) "item:stock:id:10002" | |
10) "item:id:10005" | |
127.0.0.1:6379> |
# 6.5、查询 Redis 缓存🌳
# 6.5.1、OpenResty 的 Redis 模块🌲
OPenResty 提供了操作 Redis 的模块,我们只要引入该模块就能直接使用:
在 common.lua 文件中进行编写
- 引入 Redis 模块,并初始化 Redis 对象
为什么引入模块中文件名中间需要一个 . ?
解释:因为我们上述文章中都是引入的 lualib 目录下的模块因为直接在 lua 目录所以可以直接写文件名就行了。但是这个 redis 可不再 lualib 根目录中而是在 resty 目录下
-- 引入 redis 模块 | |
local redis = require('resty.redis') | |
-- 初始化 redis 对象 | |
local red = redis:new() | |
-- 设置 redis 超时时间 | |
red:set_timeouts(1000, 1000, 1000) |
- 封装函数,用来释放 Redis 链接,其实是放入连接池
-- 关闭 redis 连接的工具方法,其实是放入连接池 | |
local function close_redis(red) | |
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒 | |
local pool_size = 100 -- 连接池大小 | |
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size) | |
if not ok then | |
ngx.log(ngx.ERR, "放入redis连接池失败: ", err) | |
end | |
end |
OPenResty 提供了操作 Redis 的模块,我们只要引入该模块就能直接使用:
- 封装函数,从 Redis 读数据并返回:
-- 查询 redis 的方法 ip 和 port 是 redis 地址,key 是查询的 key | |
local function read_redis(ip, port, key) | |
-- 获取一个连接 | |
local ok, err = red:connect(ip, port) | |
if not ok then | |
ngx.log(ngx.ERR, "连接redis失败 : ", err) | |
return nil | |
end | |
-- 查询 redis | |
local resp, err = red:get(key) | |
-- 查询失败处理 | |
if not resp then | |
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key) | |
end | |
-- 得到的数据为空处理 | |
if resp == ngx.null then | |
resp = nil | |
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key) | |
end | |
close_redis(red) | |
return resp | |
end |
完整的代码编写:
-- 引入 redis 模块 | |
local redis = require('resty.redis') | |
-- 初始化 redis 对象 | |
local red = redis:new() | |
-- 设置 redis 超时时间 | |
red:set_timeouts(1000, 1000, 1000) | |
-- 关闭 redis 连接的工具方法,其实是放入连接池 | |
local function close_redis(red) | |
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒 | |
local pool_size = 100 -- 连接池大小 | |
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size) | |
if not ok then | |
ngx.log(ngx.ERR, "放入redis连接池失败: ", err) | |
end | |
end | |
-- 查询 redis 的方法 ip 和 port 是 redis 地址,key 是查询的 key | |
local function read_redis(ip, port, key) | |
-- 获取一个连接 | |
local ok, err = red:connect(ip, port) | |
if not ok then | |
ngx.log(ngx.ERR, "连接redis失败 : ", err) | |
return nil | |
end | |
-- 查询 redis | |
local resp, err = red:get(key) | |
-- 查询失败处理 | |
if not resp then | |
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key) | |
end | |
-- 得到的数据为空处理 | |
if resp == ngx.null then | |
resp = nil | |
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key) | |
end | |
close_redis(red) | |
return resp | |
end | |
-- 封装函数,发送 http 请求,并解析响应 | |
local function read_http(path, params) | |
local resp = ngx.location.capture(path,{ | |
method = ngx.HTTP_GET, | |
args = params, | |
}) | |
if not resp then | |
-- 记录错误信息,返回 404 | |
ngx.log(ngx.ERR, "http 查询失败了~, path: ", path , ", args: ", args) | |
ngx.exit(404) | |
end | |
return resp.body | |
end | |
-- 将方法导出 | |
local _M = { | |
read_http = read_http, | |
read_redis = read_redis | |
} | |
return _M |
在 item.lua 文件中进行导入 redis 工具函数
local read_redis = common.read_redis |
由于添加了 redis 缓存那么之前的查询商品信息和库存的函数就需要修改了。
但是需要修改两个太麻烦了所以对其进行封装函数:
需求:
1、修改 item.lua,封装一个函数 read_data,实现先查询 redis,如果未命中,再查询 tomcat
2、修改 item.lua,查询商品和库存时都调用 read_data 这个函数
-- 封装查询函数,先查询 redis,如果未命中再查询 tomcat | |
local function read_data(key, path, params) | |
-- 查询 redis | |
local resp = read_redis("192.168.249.128", 6379, key) | |
-- 判断 redis 是否命中 | |
if not resp then | |
-- redis 查询失败,再查询 tomcat | |
resp = read_http(path, params) | |
end | |
return resp | |
end |
编写完封装函数后将原来的请求函数进行替换
-- 查询商品信息 | |
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil) | |
-- 查询库存信息 | |
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil) |
item.lua 文件的完整代码:
-- 导入 common 函数库 | |
local common = require('common') | |
local read_http = common.read_http | |
local read_redis = common.read_redis | |
-- 导入 cjson 库 | |
local cjson = require('cjson') | |
-- 封装查询函数,先查询 redis,如果未命中再查询 tomcat | |
local function read_data(key, path, params) | |
-- 查询 redis | |
local resp = read_redis("192.168.249.128", 6379, key) | |
-- 判断 redis 是否命中 | |
if not resp then | |
-- redis 查询失败,再查询 tomcat | |
ngx.log("redis查询失败,尝试查询http, key :", key) | |
resp = read_http(path, params) | |
end | |
return resp | |
end | |
-- 获取路径参数 | |
local id = ngx.var[1] | |
-- 查询商品信息 | |
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil) | |
-- 查询库存信息 | |
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil) | |
-- JSON 转化为 lua 的 table | |
local item = cjson.decode(itemJSON) | |
local sto = cjson.decode(stockJSON) | |
-- 组合数据 | |
item.stock = sto.stock | |
item.sold = sto.sold | |
-- 把 item 序列化为 json 返回结果 | |
ngx.say(cjson.encode(item)) |
重启虚拟机中的 nginx
[root@localhost nginx]# nginx -s reload |
访问 uri 进行测试:http://localhost/item.html?id=10002
查看 idea 控制台
将控制台信息清空后再次访问 uri:http://localhost/item.html?id=10002
控制台情况
说明数据已经被缓存了
我们把两个服务器全关了
再访问一下 uri:http://localhost/item.html?id=10002
可以看到如果没有启动 Tomcat 也没事儿因为先去看了下 Redis 发现有相关数据直接拿来就不找服务器了。
# 6.6、Nginx 本地缓存🌳
OPenResty 为 Nginx 提供了 shard dict 的功能,可以在 Nginx 的多个 worker 之间共享数据,实现缓存功能。
1、开启共享字典,在 nginx.conf 的 http 下添加配置
# 共享字典,也就是本地缓存,名称叫做:item_cache,大小 150m | |
lua_shared_dict item_cache 150m; |
2、操作共享字典
在 item.lua 文件中导入
-- 导入共享词典,本地缓存 | |
local item_cache = ngx.shared.item_cache |
# 6.6.1、案例,在查询商品时,优先查询 OPenResty 的本地缓存🌲
需求:
1、修改 item.lua 中的 read_data 函数,优先查询本地缓存,未命中时在查询 Redis,Tomcat
2、查询 Redis 或 Tomcat 成功后,将数据写入本地缓存,并设置有效期
3、商品基本信息,有效期 30 分钟
4、库存信息,有效期 1 分钟
-- 封装查询函数,先查询 redis,如果未命中再查询 tomcat | |
function read_data(key, expire, path, params) | |
-- 查询本地缓存 | |
local val = item_cache:get(key) | |
if not val then | |
-- 查询 redis | |
val = read_redis("192.168.249.128", 6379, key) | |
-- 判断查询结果 | |
if not val then | |
-- redis 查询失败,查询 tomcat | |
val = read_http(path, params) | |
end | |
end | |
-- 查询成功,把数据写入本地缓存 | |
item_cache:set(key, val, expire) | |
-- 返回数据 | |
return val | |
end |
测试
切换到 nginx 目录下来监控 error.log 文件的打印情况
[root@localhost nginx]# nginx -s reload | |
[root@localhost nginx]# pwd | |
/usr/local/openresty/nginx | |
[root@localhost nginx]# tail -f logs/error.log |
# 七、缓存同步🎄
- 数据同步策略
- 安装 Canal
- 监听 Canal
# 7.1、数据同步策略🌳
缓存数据同步的常见方式有三种:
1、设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
1.1、优势:简单,方便
1.2、缺点:时效性差,缓存过期之前可能不一致
1.3、场景:更新频率较低,时效性要求低的业务
2、同步双写:在修改数据库的同时,直接修改缓存
2.1、优势:时效性强,缓存与数据库强一致
2.2、缺点:有代码侵入,耦合度高
2.3、场景:对一致性,时效性要求较高的缓存数据
3、异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
3.1、优势:低耦合,可以同时通知多个缓存服务
3.2、缺点:时效性一般,可能存在终极爱你不一致状态
3.3、场景:时效性要求一般,有多个服务需要同步
# 7.1.1、缓存同步策略🌲
异步通知可以使用基于 MQ 的异步通知
但是基于 MQ 还是要修改里面的代码发送消息,而下面要讲的是基于 Canal 的异步通知
基于 Canal 的异步通知
canal 可以去监听数据库的变化
# 7.1.2、初始 Canal🌲
Canal 意译为 水道 / 管道 / 沟渠,Canal 是阿里巴巴旗下的一款开源项目,基于 Java 开发。基于数据库增量日志解析,提供增量数据订阅 & 消费。GiHub 地址:https://github.com/alibaba/canal
Canal 是基于 mysql 的主从同步来实现的,MySQL 主从同步的原理如下:
master 就是 mysql 的主节点,slave 就是 mysql 的从节点。主从数据是要做同步的,那它怎么做到的呢?首先 mysql 的主节点在做增删改查时就回去记录日志到 binary.log 文件中这个文件称为二进制日志文件。其中记录的就是 binary log events,里面记录的就是业务 sql。slave 会开启一个线程不断的来读取这个日志文件把这个文件读过来放到一个 relay log 文件中。这样主节点做了哪些 sql 从节点 也做哪些 sql
而 Canal 就是把自己伪装成 MySQL 的一个 Slave 节点,从而监听 master 的 binary log 变化。再把得到的变化信息通知给 Canal 的客户端,进而完成对其它数据库的同步。
# 7.1.3、Canal 安装🌲
查看文章:安装 Canal.
# 7.1.4、Canal 客户端🌲
Canal 提供了各种语言的客户端,当 Canal 监听到 binlog 变化时,会通知 Canal 的客户端。不过这里我们会使用 Github 上的第三方开源的 canal-stater。地址:https://github.com/NormanGyllenhaal/canal-client
引入依赖:
<dependency> | |
<groupId>top.javatool</groupId> | |
<artifactId>canal-spring-boot-starter</artifactId> | |
<version>1.2.1-RELEASE</version> | |
</dependency> |
编写配置:
canal: | |
destination: dkx1 # canal 实例名称,要跟 canal-server 运行时设置的 destiation 一致 | |
server: 192.168.249.128:11111 # canal 地址 |
编写监听器,监听 Canal 消息:
// 指定要监听的表 | |
@CanalTable("tb_item") | |
@Component // 指定表关联的实体类 | |
public class ItemHandler implements EntryHandler<Item> { | |
// 下面三个重写方法是,监听到数据库的增删改的消息的 | |
@Override | |
public void insert(Item item) { | |
// 写数据到 redis | |
// 写数据到 jvm 进程缓存 | |
EntryHandler.super.insert(item); | |
} | |
@Override | |
public void update(Item before, Item after) { | |
// 写数据到 redis | |
// 写数据到 jvm 进程缓存 | |
EntryHandler.super.update(before, after); | |
} | |
@Override | |
public void delete(Item item) { | |
// 写数据到 redis | |
// 写数据到 jvm 进程缓存 | |
EntryHandler.super.delete(item); | |
} | |
} |
Canal 推送给 canal-client 的是被修改的这一行数据 (row) ,而我们引入的 canal-client 则会帮我们把行数据封装到 item 实体类中。这个过程中需要知道数据库与实体的映射关系,要用到 JPA 的几个注解:
@Data | |
@TableName("tb_item") | |
public class Item { | |
@TableId(type = IdType.AUTO) | |
// 标记表中的 id 字段 | |
@Id | |
private Long id;// 商品 id | |
... 其它字段 省略 | |
private Date updateTime;// 更新时间 | |
// @Transient 标记不属于表中的字段 忽略 | |
@TableField(exist = false) | |
@Transient | |
private Integer stock; | |
@TableField(exist = false) | |
@Transient | |
private Integer sold; | |
} |
编写 redis 的缓存类 ,添加两个函数,一个新增或更新,一个是删除
public void saveItem(Item item) | |
{ | |
try { | |
String json = MAPPER.writeValueAsString(item); | |
redisTemplate.opsForValue().set("item:id:" + item.getId(), json); | |
} catch (JsonProcessingException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void deleteItemById(Long id) | |
{ | |
redisTemplate.delete("item:id:" + id); | |
} |
定义完 redis 的缓存类里面的函数后再回到编写监听器的类里面编写代码:
@CanalTable("tb_item") | |
@Component | |
public class ItemHandler implements EntryHandler<Item> { | |
@Autowired | |
private RedisHandler redisHandler; | |
@Autowired | |
private Cache<Long, Item> cache; | |
@Override | |
public void insert(Item item) { | |
// 写数据到 jvm 进程缓存 | |
cache.put(item.getId(), item); | |
// 写数据到 redis | |
redisHandler.saveItem(item); | |
} | |
@Override | |
public void update(Item before, Item after) { | |
// 写数据到 jvm 进程缓存 | |
cache.put(after.getId(), after); | |
// 写数据到 redis | |
redisHandler.saveItem(after); | |
EntryHandler.super.update(before, after); | |
} | |
@Override | |
public void delete(Item item) { | |
// 写数据到 jvm 进程缓存 | |
cache.invalidate(item.getId()); | |
// 写数据到 redis | |
redisHandler.deleteItemById(item.getId()); | |
EntryHandler.super.delete(item); | |
} | |
} |
启动服务查看 idea 打印信息:
meters: 10005(Long)
14:50:32:387 DEBUG 4956 --- [nio-8081-exec-7] c.h.i.mapper.ItemStockMapper.selectById : <== Total: 1
14:50:32:472 INFO 4956 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
14:50:34:474 INFO 4956 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
14:50:36:477 INFO 4956 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
14:50:38:478 INFO 4956 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
14:50:40:472 INFO 4956 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=false,rawEntries=[]]
表示 canal 与 java 成功的连接到了
那么测试一下数据同步打开 8081 端口的页面
比如说修改 10001 这个商品的数据:
原来的数据如下:
修改后的数据如下:
点击确定
再查看 idea 的控制台信息:
可以看到打印了一堆日志信息
version: 1
logfileName: "mysql-bin.000006"
logfileOffset: 1173
serverId: 1000
serverenCode: "UTF-8"
executeTime: 1697871191000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 91
}
entryType: TRANSACTIONBEGIN
storeValue: " H"
, header {
version: 1
logfileName: "mysql-bin.000006"
logfileOffset: 1344
serverId: 1000
serverenCode: "UTF-8"
executeTime: 1697871191000
sourceType: MYSQL
schemaName: "dkx"
tableName: "tb_item"
eventLength: 620
eventType: UPDATE
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\b_\020\002P\000b\324\n\n&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\nd\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\n)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\n)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0000\000B\00516900R\006bigint\n\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\n-\b\005\020\f\032\bcategory \000(\0000\000B\t\346\213\211\346\235\206\347\256\261R\fvarchar(200)\n\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\nG\b\a\020\f\032\004spec \000(\0000\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"26\345\257\270\"}R\fvarchar(200)\n\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\n6\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\n6\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\022&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\006bigint\022d\b\001\020\f\032\005title \000(\0000\000BCRIMOWA 21\345\257\270\346\211\230\350\277\220\347\256\261\346\213\211\346\235\206\347\256\261 SALSA AIR\347\263\273\345\210\227\346\236\234\347\273\277\350\211\262 820.70.36.4R\fvarchar(264)\022)\b\002\020\f\032\004name \000(\0000\000B\tSALSA AIRR\fvarchar(128)\022)\b\003\020\373\377\377\377\377\377\377\377\377\001\032\005price \000(\0010\000B\00528900R\006bigint\022\226\001\b\004\020\f\032\005image \000(\0000\000Buhttps://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webpR\fvarchar(200)\022-\b\005\020\f\032\bcategory \000(\0000\000B\t\346\213\211\346\235\206\347\256\261R\fvarchar(200)\022\'\b\006\020\f\032\005brand \000(\0000\000B\006RIMOWAR\fvarchar(100)\022G\b\a\020\f\032\004spec \000(\0010\000B\'{\"\351\242\234\350\211\262\": \"\347\272\242\350\211\262\", \"\345\260\272\347\240\201\": \"33\345\257\270\"}R\fvarchar(200)\022\032\b\b\020\004\032\006status \000(\0000\000B\0011R\003int\0226\b\t\020]\032\vcreate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime\0226\b\n\020]\032\vupdate_time \000(\0000\000B\0232019-05-01 00:00:00R\bdatetime"
],raw=false,rawEntries=[]]
我们再去访问商品数据的展示页面查看数据是否被修改了
访问 uri:http://localhost/item.html?id=10001#index
可以看到这个页面的数据也被修改了说明缓存同步成功了
# 八、总结多级缓存架构🎄
有一个页面叫 item.html 放到了 windows 的 nginx 服务器上。它的 主要作用就是一个静态资源服务器和反向代理服务器。当用户通过浏览器来请求它时它就可以把页面返回给用户而浏览器在渲染时发现缺少数据就会发送 ajax 请求来查询数据。查询地址就是 item/10001 这时的 nginx 反向代理服务器不回去处理业务,只是做反向代理,所以它会把请求代理给 OPenResty 集群,想要去查询数据又有先去本地如果有则直接返回,本地共享词典数据只能在当前 nginx 中使用。如果我们部署成 OPenResty 集群它们之间的内存是不共享的,所以这里也可以采用通过 id 计算 hash 来路由指定集群服务器的方式这样就会保证 OPenResty 缓存一直生效。本地缓存如果未命中则去查询 redis 缓存,命中则返回,未命中则查询 tomcat。查询 tomcat 也是集群也做了 jvm 进程缓存,tomcat 服务器之间是不共享内存的所以还需要使用通过 id 计算 hash 值的方式来路由指定 tomcat 服务器
那么多级缓存完成后就会面临一个问题就是数据同步的问题。
mysql 的数据进行了修改 redis 和 tomcat 进程缓存和 nginx 缓存都需要做同步
在 OPenResty 中使用的超时同步,设置过期时间过期自动删除就变成新数据了,这种方式适合于更新频率较低的数据
而 redis 与 mysql 之间的数据同步采用了 canal 监听 mysql 的方式
# 九、服务异步通讯🎄
高级篇 - rabbitmq 的高级特性
我们已经学习过它的基本用法了,学习了如何利用 springamqp 收和发消息,但是收和发消息只是 mq 的最基本的功能。因为在收和发消息的过程当中还有很多问题需要思考需要去解决这时就需要使用 mq 的高级特性去解决了
MQ 的一些常见问题
学习内容:
- 消息可靠性
- 死信交换机
- 惰性队列
- MQ 集群
# 9.1、消息可靠性问题🌳
消息从生产者发送到 exchange,再到 queue,再到消费者,有哪些导致消息丢失的可能性?
先来温习一下 mq 的整个流程
消息的发送者称为 publisher,它把消息投递给交换机 exchange,而交换机会根据 routingkey 将消息路由到队列 queue 中,而队列再把消息投递给消费者 consumer
在整个这样的流程当中消息可能会发生丢失
- 发送时丢失:
- 生产者发送的消息未送达 exchange
- 消息到达 exchange 后未达到 queue
- MQ 宕机,queue 将消息丢失
- consumer 接收到消息后未消息就宕机
针对上述问题我们来学习如下内容:
- 生产者消息确认
- 消息持久化
- 消费者消息确认
- 消费失败重试机制
# 9.1.1、生产者确认机制🌲
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回 ack
- 消息未投递到交换机,返回 nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回 ACK,及路由失败原因。
<font color='red'> 注意 </font>:
确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突
引入 demo 工程
首先,需要引入项目:https://gitee.com/doukaixin/typora.git
拉取分支:mq-advanced-demo 代码演示
# 9.2、SpringAMQP 实现生产者确认🌳
1、在 publisher 这个微服务的 applicaiton.yml 中添加配置:
spring: | |
rabbitmq: | |
host: 192.168.249.128 # rabbitMQ 的 ip 地址 | |
port: 5672 # 端口 | |
username: itcast | |
password: 123321 | |
virtual-host: / | |
#------------------------------------ | |
publisher-confirm-type: correlated | |
publisher-returns: true | |
template: | |
mandatory: true |
配置说明:
- publishe-confirm-type:开启 publisher-confirm,这里支持两种类型:
- simple:同步等待 confirm 结果,直到超时
- correlated:异步回调,定义 ConfirmCallback,MQ 返回结果时会回调这个 ConfirmCallback
- publish-returns:开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallback
- template.mandatory:定义消息路由失败时的策略。true,则调用 ReturnCallback; false:则直接丢弃消息
2、每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j | |
@Configuration | |
// 实现 Spring 工厂的通知 | |
public class CommonConfig implements ApplicationContextAware { | |
// 重写 Bean 工厂准备好后调用的函数 | |
@Override | |
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
// 取出 RabbitTempalte 的 Bean | |
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); | |
// 配置 ReturnCallback | |
rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { | |
// 记录日志 {} 为占位符 | |
log.error("消息发送到队列失败,响应码:{},失败原因:{}," + | |
"交换机:{},路由key:{},消息:{}", | |
i, s, s1, s2, message.toString()); | |
// 如果有需要的话,重发消息 | |
}); | |
} | |
} |
returncallback 是 exchange 路由不到 queue 时,才触发的回调
confirmcallback 是在消息达不到交换机时,才回调
3、发送消息,指定消息 ID,消息 ConfirmCallback
先使用 test 代码进行一下测试
发送消息的代码中,有 交换机,但是 Routingkey 不存在,所以我们需要去创建一下
点击 bind
但是!在发送消息的测试代码中以前是那样写的,但是现在要做消息确认机制所以就要多传递一个参数了
测试代码:
@Slf4j | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
public class SpringAmqpTest { | |
@Autowired | |
private RabbitTemplate rabbitTemplate; | |
@Test | |
public void testSendMessage2SimpleQueue() throws InterruptedException { | |
String routingKey = "simple.test"; | |
// 1. 准备消息 | |
String message = "hello, spring amqp!"; | |
// 1.1. 准备 CorrelationData | |
// 1.2. 消息 ID | |
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); | |
// 1.3. 准备 ConfirmCallback | |
// 成功的回调函数 | |
correlationData.getFuture().addCallback(confirm -> { | |
// 判断结果 | |
if(confirm.isAck()) | |
{ | |
// ACK | |
log.debug("消息成功投递到交换机 !消息ID: {}", correlationData.getId()); | |
}else | |
{ | |
// NACK | |
log.error("消息投递到交换机失败了 ! 消息ID: {}", correlationData.getId()); | |
// 重发消息 | |
} | |
// 失败的回调函数 | |
}, throwable -> { | |
// 记录日志 | |
log.error("消息发送失败 ! ", throwable); | |
// 重发消息 | |
}); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData ); | |
} | |
} |
运行这个测试代码
idea 控制台打印信息如下:
消息成功投递到交换机 !消息ID: a4abf2f4-85f5-4fdc-bb73-7be9ab91b6bd
查看消息队列是否接受到了一个消息并查看内容
总结:
SpringAMQP 中处理消息确认的几种情况:
- publisher-confirm
- 消息成功发送到 exchange,返回 ack
- 消息发送失败,没有到达交换机,返回 nack
- 消息发送过程中出现异常,没有收到回执
- 消息成功发送到 exchange,但没有路由到 queue
- 调用 ReturnCallback
以上就是生产确认的机制了。通过这些机制就能够确认消息 能够到达消息队列
# 9.3、消息持久化🌳
由于 RabbitMQ 默认是内存存储,如果此时 mq 发生了宕机,消息也是有可能丢失的。要想让消息真正安全,我们必须确保这个消息能够做到持久化。也就是把它写入到磁盘中
在消费者的 config 类中进行配置
1、交换机持久化:
@Bean | |
public DirectExchange simpleExchange() | |
{ | |
// 三个参数:交换机名称,是否持久化,当没有 queue 与其绑定时是否自动删除 | |
return new DirectExchange("simple.direct", true, false); | |
} |
2、队列持久化
@Bean | |
public Queue simpleQueue() | |
{ | |
// 使用 QueueBuilder 构建队列,durable 就是持久化的 | |
return QueueBuilder.durable("simple.queue").build(); | |
} |
启动服务进行测试,结果如下:
队列
其中的 D 就是 durable 就是持久化的意思
交换机
关闭所有服务器
发送一下消息进行测试一下消息是否也能持久化
我们向 simple.queue 中发送一条消息
此时队列中就会存留一条消息
那么现在我们队列里面有消息了,并且队列已经持久化了,我们再去重启 mq 来测试一下
[root@localhost nginx]# docker restart mq | |
mq | |
[root@localhost nginx]# |
回到页面中进行查看
可以看到消息 没了!
这就证明消息没有持久化,那我们的目的是让消息也能持久啊,那怎么办?
3、消息持久化,SpringAMQP 中的消息默认是持久的,可以通过 MessageProperties 中的 DeliveryMode 来指定:
在测试代码中进行编写:
@Test | |
public void testDurableMessage() | |
{ | |
// 1. 准备消息 | |
Message message = MessageBuilder.withBody("hello, spring". | |
getBytes(StandardCharsets.UTF_8)) // 消息体 | |
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 | |
.build(); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("simple.queue", message); | |
} |
启动成功后查看队列情况:
可以看到队列中就有了一条消息了
在队列的详情页面里面的 get message 中可以看到 delivery_mode:2 表示消息持久化
重启 mq 测试消息是否真的就是持久化了:
[root@localhost nginx]# docker restart mq | |
mq | |
[root@localhost nginx]# |
访问 mq 队里情况:
可以看到这条消息还存在着
# 9.4、消费者确认🌳
RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除消息。而 SpringAMQP 则允许配置三种确认模式:
- manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack
- auto:自动 ack,由 Spring 检测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack
- none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除
配置方式是修改 application.yml 文件,添加下面配置:
spring: | |
rabbitmq: | |
host: 192.168.249.128 # rabbitMQ 的 ip 地址 | |
port: 5672 # 端口 | |
username: itcast | |
password: 123321 | |
virtual-host: / | |
listener: | |
simple: | |
prefetch: 1 | |
acknowledge-mode: auto # 自动 ack |
先看下 mq 的队列情况
队列中有一条消息
我们在 consumer 的消费代码编写一个异常
@Slf4j | |
@Component | |
public class SpringRabbitListener { | |
@RabbitListener(queues = "simple.queue") | |
public void listenSimpleQueue(String msg) { | |
* System.out.println("消费者接收到simple.queue的消息:【" + msg + "】"); | |
System.out.println(1/0); | |
log.info("消费者处理消息成功 !"); | |
} | |
} |
配置完成后启动 consumer 服务进行测试
此时程序走到 debug 处,没有执行完毕我们看下现在的队列情况
可以看到它没有删除这条消息而是标记为了 Unacked
如果程序不进行一直等待则会变回原来的状态
java 代码程序往下走就会抛出异常,但是它会进行重新投递
但是我们并不希望它就这样失败了就无脑重试那怎么办呢?看下面的内容。
# 9.5、失败重试机制🌳
当消费者出现异常后,消费会不断 requue (重新入队) 到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力:
我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
当然本地重试也不是无限让它一直重试
在消费者服务中进行配置:
spring: | |
rabbitmq: | |
host: 192.168.249.128 # rabbitMQ 的 ip 地址 | |
port: 5672 # 端口 | |
username: itcast | |
password: 123321 | |
virtual-host: / | |
listener: | |
simple: | |
prefetch: 1 | |
acknowledge-mode: auto | |
#------------------------------------------------------ | |
retry: | |
enabled: true # 开启消费者失败重试 | |
initial-interval: 1000 # 初始的失败等待时长为 1 秒 | |
multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier *last-interval | |
max-attempts: 4 # 最大重试次数 | |
stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false |
启动消费者服务后进行测试 idea 控制台打印如下:
报了一个异常,说重试次数耗尽了
mqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-2.2.15.RELEASE.jar:2.2.15.RELEASE]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.lambda$createRecoverer$0(StatelessRetryOperationsInterceptorFactoryBean.java:74) ~[spring-rabbit-2.2.15.RELEASE.jar:2.2.15.RELEASE]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:141) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.5.RELEASE.jar:na]
这条消息就会被丢弃掉
# 9.5.1、消费者失败消息处理策略🌲
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:
1、RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
2、ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
3、RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
测试下 RepublishMessageRcoverer 处理模式:
1、首先,定义接收失败消息的交换机,队列及其绑定关系:
2、然后定义 RepublishMessageRecoverer:
@Configuration | |
public class ErrorMessageConfig { | |
@Bean | |
public DirectExchange errorMessageExchange() | |
{ | |
return new DirectExchange("error.direct"); | |
} | |
@Bean | |
public Queue errorQueue() | |
{ | |
return new Queue("error.queue"); | |
} | |
@Bean | |
public Binding errorMessageBinding() | |
{ | |
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); | |
} | |
@Bean | |
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) | |
{ | |
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); | |
} | |
} |
启动消费者服务查看 mq 的交换机和队列情况
发送一条消息进行测试,找到 simple.queue 进行发送消息
idea 控制台打印情况
消费者接收到simple.queue的消息:【hello spring !】
消费者接收到simple.queue的消息:【hello spring !】
消费者接收到simple.queue的消息:【hello spring !】
10:30:18:538 WARN 6996 --- [ntContainer#0-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.direct' with routing key error
然后再查看队列情况
在 error.queue 队列的详细消息中可以看到报错的信息
总结:
如何确保 RabbitMQ 消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为 auto,由 spring 确认消息处理成功后完成 ack
- 开启消费者失败重试机制,并设置 MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
# 十、死信交换机🎄
- 初识死信交换机
- TTL
- 延迟队列
# 10.1、初识死信交换机🌳
当一个队列中的消息满足下列情况之一时,可以称为 <font color='red'> 死信 (dead letter)</font>:
1、消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
2、消息是一个过期消息,超时无人消费
3、要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为 <font color='red'> 死信交换机 </font>. (Dead Letter Exchange , 简称 DLX)。
如果有一个消息发送到了消费者,假设消费者采用默认的重试机制不断重试直到次数耗尽就会将消息拒绝,而消息一旦被拒绝默认情况下就会被丢弃了。
如果说不想丢失就必须给 simple.queue 这个队列绑定死信交换机,这时消息就不会被丢弃了而是变成一个死信再回到队列,队列再投递到死信交换机。
交换机不能存储消息那么为了保证消息不丢失,还需要绑定一个队列,这个消息就会到达 dl.queue 队列中称为死信队列
这里与上述的 消费者失败消息处理策略 一样,但是还是有差异的,就是在上述方式中,所有的失败消息都是有消费者来做投递的。而在现在的方式中是由队列做投递的
如果只是做消息的兜底处理建议使用上述方式,如果是做消息兜底处理以及额外的功能建议使用当前方式
总结:
什么样的消息会成为死信?
1、消息被消费者 reject 或者返回 nack
2、消息超时未消费
3、队列满了
如何给队列绑定死信交换机?
1、给队列设置 dead-letter-exchange 属性,指定一个交换机
2、给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey
# 10.2、TTL🌳
TTL,也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则会变为死信,TTL 超时分为两种情况:
1、消息所在的队列设置了存活时间
2、消息本身设置了存活时间
假如一个消息自己设置了时间为 5000 毫秒到消息发出去,到达了队列的时候就会开始计时,计时结束后这个消息就会成为死信,从而投递到死信交换机 最终到达了死信队列。这时如果恰好有一个消费者在监听死信队列它就会收到这个死信消息
我们声明一组死信交换机和队列,基于注解方式:
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue(name = "dl.queue", durable = "true"), | |
exchange = @Exchange(name = "dl.direct"), | |
key = "dl" | |
)) | |
public void listenDlQueue(String msg) | |
{ | |
log.info("消费者接收到了dl.queue的延迟消息"); | |
} |
要给队列设置超时时间,需要在声明队列的类中编写如下代码:
@Configuration | |
public class TTLMessageConfig { | |
@Bean | |
public DirectExchange ttlDirectExchange() | |
{ | |
return new DirectExchange("ttl.direct"); | |
} | |
@Bean | |
public Queue ttlQueue() | |
{ | |
return QueueBuilder | |
.durable("ttl.queue") // 指定队列名称,并持久化 | |
.ttl(10000) // 设置队列的超时时间,10 秒 | |
.deadLetterExchange("dl.direct") // 指定死信交换机 | |
.deadLetterRoutingKey("dl") // 指定死信 RoutingKey | |
.build(); | |
} | |
@Bean | |
public Binding ttlBinding() | |
{ | |
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); | |
} | |
} |
编写发布者代码:
@Test | |
public void testTTLMessage() | |
{ | |
// 1. 准备消息 | |
Message message = MessageBuilder.withBody("hello, ttl message". | |
getBytes(StandardCharsets.UTF_8)) | |
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) | |
.build(); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message); | |
// 3. 记录日志 | |
log.info("消息已经成功发送了 !"); | |
} |
启动服务进行测试:
消息发布者发送出去的时间为:46
消息接收者接收到消息时间为:56
延迟了 10 秒钟的时间,这样延迟消息就实现了
上述是基于队列实现的延迟消息,下面来学习给消息设置延迟时间
发送消息时,给消息本身设置超时时间:
@Test | |
public void testTTLMessage() | |
{ | |
// 1. 准备消息 | |
Message message = MessageBuilder.withBody("hello, ttl message". | |
getBytes(StandardCharsets.UTF_8)) | |
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) | |
.setExpiration("5000") // 设置消息的延迟时间为 5 秒 | |
.build(); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message); | |
// 3. 记录日志 | |
log.info("消息已经成功发送了 !"); | |
} |
启动服务器测试查看消息延迟多久时间:
发送者发送消息时间为:16
接受者收到消息时间为:21
消息延迟了 5 秒的时间,当队列与消息本身都设置了延迟时间时,以最短的时间为准。
总结:
消息超时的两种方式是?
1、给队列设置 ttl 属性,进入队列后超过 ttl 时间的消息变为死信
2、给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信
3、两者共存时,以时间短的 ttl 为准
如何实现发送一个消息 20 秒后消费者才收到消息?
1、给消息的目标队列指定死信交换机
2、消费者监听与死信交换机绑定的队列
3、发送消息时给消息设置 tll 为 20 秒
下面学习延迟队列的插件来实现延迟消息
# 10.3、延迟队列🌳
利用 TTL 结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为 <font color='red'> 延迟队列 (Delay Queue) 模式 </font>.
延迟队列的使用场景包括:
1、延迟发送短信
2、用户下单,如果用户在 15 分钟内未支付,则自动取消
3、预约工作会议,20 分钟后自动通知所有参会人员
# 10.3.1、延迟队列插件🌲
因为延迟队列的需求非常多,所以 RabbitMQ 的官方也推出了一个插件,原生支持延迟队列效果。
详细安装查看文章:RabbitMQ 部署指南.
# 10.3.2、SpringAMQP 使用延迟队列插件🌲
DelayExchange 的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定 delayed 属性为 true 即可
基于注解方式:
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue(name = "delay.queue", durable = "true"), | |
exchange = @Exchange(name = "delay.direct", delayed = "true"), | |
key = "delay" | |
)) | |
public void listenDelayExchange(String msg) | |
{ | |
log.info("消费者接收到了delay.queue的延迟消息 , {}", msg); | |
} |
先启动消费者服务让它先等着
查看 mq 的交换机情况:
然后我们向这个 delay 为 true 的交换机中发送消息,一定要给消息添加一个 header: x-delay,值为延迟的时间,单位为毫秒:
@Test | |
public void testSndDelayMessage() throws InterruptedException { | |
Message message = MessageBuilder.withBody("hello, ttl message". | |
getBytes(StandardCharsets.UTF_8)) | |
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) | |
.setHeader("x-delay", 5000) // 设置消息的延迟时间为 5 秒 | |
.build(); | |
// 1.1. 准备 CorrelationData | |
// 1.2. 消息 ID | |
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData ); | |
log.info("消息发送成功"); | |
} |
再启动发布者测试代码进行发布:
发布后测试代码会提示一个错误但是这个错误是没有问题的,因为延迟它会误以为需要重试,就会触发重发机制。但是我们不需要对延迟消息进行重试,所以我们需要对其做一下判断
ERROR 17588 --- [nectionFactory1] cn.itcast.mq.config.CommonConfig : 消息发送到队列失败,响应码:312,失败原因:NO_ROUTE,交换机:delay.direct,路由key:delay,消息:(Body:'[B@71c09e9d(byte[18])' MessageProperties [headers={spring_returned_message_correlation=b85f2836-22b6-406e-9e49-d5f44a07af32}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, receivedDelay=5000, deliveryTag=0])
对 receivedDelay 进行判断是否存在值
@Slf4j | |
@Configuration | |
// 实现 Spring 工厂的通知 | |
public class CommonConfig implements ApplicationContextAware { | |
// 重写 Bean 工厂准备好后调用的函数 | |
@Override | |
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
// 取出 RabbitTempalte 的 Bean | |
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); | |
// 配置 ReturnCallback | |
rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { | |
// 判断是否是延迟消息 | |
if(message.getMessageProperties().getReceivedDelay() > 0) | |
{ | |
// 是一个延迟消息,忽略这个错误提示 | |
return; | |
} | |
// 记录日志 {} 为占位符 | |
log.error("消息发送到队列失败,响应码:{},失败原因:{}," + | |
"交换机:{},路由key:{},消息:{}", | |
i, s, s1, s2, message.toString()); | |
// 如果有需要的话,重发消息 | |
}); | |
} | |
} |
在运行发布者测试代码后打印如下:
可以看到就没有错误提示了,也就解决了明明没有错误却重试的问题了
16:22:35:703 INFO 2568 --- [ main] cn.itcast.mq.spring.SpringAmqpTest : 消息发送成功
总结:
延迟队列插件的使用步骤包括哪些?
1、声明一个交换机,添加 delayed 属性为 true
2、发送消息时,添加 x-delay 头,值为超时时间
# 十一、惰性队列🎄
- 消息堆积问题
- 惰性队列
# 11.1、消息堆积问题🌳
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是 <font color='red'> 消息堆积 </font > 问题。
解决消息堆积有三种思路:
1、增加更多消费者,提高消费速度
2、在消费者内开启线程池加快消息处理速度
3、扩大队列容积,提高堆积上限
# 11.2、惰性队列🌳
从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是 <font color='red'> 惰性队列 </font>.
惰性队列的特征如下:
1、接受到消息后直接存入磁盘而非内存
2、消费者要消费时才会从磁盘中读取并加载到内存
3、支持数百万条的消息存储
而要设置一个队列为惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。可以通过命令行将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode": "lazy"}' --apply-to queues |
用 SpringAMQP 声明惰性队列分两种方式:
@Bean 方式
@Configuration | |
public class LazyConfig { | |
@Bean | |
public Queue lazyQueue() | |
{ | |
return QueueBuilder | |
.durable("lazy.queue") | |
.lazy() // 开启 x-queue-mode 为 lazy | |
.build(); | |
} | |
} |
注解方式:
@RabbitListener(queuesToDeclare = @Queue( | |
name = "lazy.queue", | |
durable = "true", | |
"arguments" = @Argument(name = "x-queue-mode", value = "lazy") | |
)) | |
public void listenLazyQueue(String msg) | |
{ | |
log.info("接收到 lazy.queue的消息: {}" , msg); | |
} |
编写测试代码进行百万消息测试:
先对 delay.direct 消息队列进行百万消息的测试
@Test | |
public void testLazyQueue() throws InterruptedException { | |
for(int i = 0;i < 1000000;i ++) | |
{ | |
// 1. 准备消息 | |
Message message = MessageBuilder.withBody("hello, ttl message". | |
getBytes(StandardCharsets.UTF_8)) | |
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) | |
.build(); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("delay.direct", message); | |
} | |
} |
可以看到它没有存储磁盘而却呢爆红了
下面测试惰性队列的情况:
@Test | |
public void testLazyQueue() throws InterruptedException { | |
for(int i = 0;i < 1000000;i ++) | |
{ | |
// 1. 准备消息 | |
Message message = MessageBuilder.withBody("hello, ttl message". | |
getBytes(StandardCharsets.UTF_8)) | |
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) | |
.build(); | |
// 2. 发送消息 | |
rabbitTemplate.convertAndSend("lazy.queue", message); | |
} | |
} |
可以看到消息都存储到了磁盘中,这样性能就会比较好
总结:
消息堆积问题的解决方案?
1、队列上绑定多个消费者,提高消费速度
2、给消费者开启线程池,提高消费速度
3、使用惰性队列,可以再 mq 中保存更多消息
惰性队列的优点有哪些?
1、基于磁盘存储,消息上限高
2、没有间歇性的 page-out,性能比较稳定
惰性队列的缺点有哪些?
1、基于磁盘存储,消息时效性会降低
2、性能受限于磁盘的 IO
# 十二、MQ 集群🎄
- 集群分类
- 普通集群
- 镜像集群
- 仲载队列
# 12.1、集群分类🌳
RabbitMQ 是基于 Erlang 语言编写,而 Erlang 又是一个面向并发的语言,天然支持集群模式,搭建比较简单。RabbitMQ 的集群有两种模式:
1、普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
2、镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在 RabbitMQ 的 3.8 版本以后,推出了新的功能:仲载队列 来代替镜像集群,底层采用 Raft 协议确保主从的数据一致性。
# 12.1.1、普通集群🌲
普通集群,或者叫标准集群 (classic cluster) ,具备下列特征:
- 会在集群的各个节点间共享部分数据, 包括:交换机,队列元信息。不包含队列中的消息。
比如说有三台 RabbitMQ,然后创建一个交换机
由于交换机可以在各个节点间共享,所以将来这三个节点都能看到
但是,现在要创建一个队列这个队列叫 queue1 它是在第一个交换机上声明的,此时队列是不共享的。但是有队列元信息,比如说 queue1 它会有其它节点上有 queue1 的名字,位置等信息。相当于是一个引用
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
比如说现在有一个消费者绑定到了 queue1 但是在访问的时候不小心访问到了第三个节点。这个节点上没有 queue1 但是有 queue1 的元信息,当获取数据的时候就会根据第三节点有 queue1 的位置信息来找到 queue1 然后拿到数据
- 队列所在节点宕机,队列中的消息就会丢失
比如说 queue1 挂了,因为第三节点和 queue1 是引用关系再来取数据的时候就取不到了
# 12.1.1.1、搭建普通集群🌴
搭建普通集群可以参考文章:RabbitMQ 部署指南.
# 12.1.2、镜像集群🌲
比方说,三个节点,交换机都能看到。那队列呢这时就要看你是不是镜像了。比方说在 queue1 这个节点上创建了一个队列。因为是在节点 1 上创建的 queue1 队列所以节点 1 就是 queue1 的主节点,然后我们还可以挑出一个镜像节点。比如说在节点二上做一个镜像,那么节点二就会找节点一去同步 queue1 的所有数据。这样它俩的数据就共享了。
而主节点和镜像节点是可以互相备份的
镜像集群:本质是主从模式,具备下面的特征:
1、交换机,队列,队列中的消息会在各个 mq 的镜像节点之间同步备份。
2、创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
3、一个队列的主节点可能是另一个队列的镜像节点
4、所有操作都是主节点完成,然后同步给镜像节点
5、主宕机后,镜像节点会替代成新的主
比方说 queue1 它的镜像节点是在 queue2 上,queue2 这个队列是在节点 2 上的,但是它可以再节点 3 上备份,而 queue3 是在节点 3 上,而它的镜像节点是在节点 1 上。
# 12.1.2.1、搭建镜像集群🌴
搭建镜像集群可以参考文章:RabbitMQ 部署指南.
# 12.1.3、仲载队列🌲
仲载队列:仲载队列是在 3.8 版本以后才有的新功能,用来替代镜像队列,具备下列特征:
1、与镜像队列一样,都是主从模式,支持主从数据同步
2、使用非常简单,没有复杂的配置
3、主从同步基于 Raft 协议,强一致
# 12.1.3.1、仲载队列搭建🌴
仲载队列搭建可以参考文章:RabbitMQ 部署指南.