普通视图

发现新文章,点击刷新页面。
昨天以前Max的技术札记

说说 Redis pipeline

作者 Max Fang
2023年1月25日 15:29

Redis 客户端和服务端之间是采用 TCP 协议进行通信的,是基于 Request/Response 这种一问一答的模式,即请求一次响应一次。

普通模式

我们先来看下普通模式下,一条 Redis 命令的简要执行过程:

  • 客户端发送一条命令给 redis-server,阻塞等待 redis-server 应答
  • redis-server 接收到命令,执行命令
  • redis-server 将结果返回给客户端

下面我们来简要了解下一个完整请求的交互过程。

  1. 客户端调用 write () 将消息写入操作系统为 socket 分配的 send buffer 中
  2. 操作系统将 send buffer 中的内容发送到网卡,网卡通过网关路由把内容发送到服务器网卡
  3. 服务器网卡将接受到的消息写入操作系统为 socket 分配的 recv buffer
  4. 服务器进程调用 read () 从 recv buffer 中读取消息进行处理
  5. 处理完成之后,服务器调用 write () 将响应内容发送的 send buffer 中
  6. 服务器将 send buffer 中的内容通过网卡,发送到客户端
  7. 客户端操作系统将网卡中的内容放入 recv buffer 中
  8. 客户端进程调用 read () 从 recv buffer 中读取消息

普通模式的问题

我们来想一下,这种情况下可能导致什么问题。
如果同时执行大量的命令,那对于每一个命令,都要按上面的流程走一次,当前的命令需要等待上一条命令执行应答完毕之后,才会执行。这个过程中会有多次的 RTT ,也还会伴随着很多的 IO 开销,发送网络请求等。每条命令的发送和接收的过程都会占用两边的网络传输。
简单的来说,每个命令的执行时间 = 客户端发送耗时 + 服务器处理耗时 + 服务器返回耗时 + 一个网络来回耗时。
在这里,一个网络来回耗时(RTT) 是不好控制的,也是不稳定的。它的影响因素很多,比如客户端到服务器的网络线路是否拥堵,经过了多少跳。还有就是 IO 系统调用也是耗时的,一个 read 系统调用,需要从用户态,切换到内核态。上文我们讲述一个命令的请求过程时多次降到 read 和 write 系统调用。
可以说一个命令的执行时间,很大程度上受到它们的限制。

pipeline 模式

有没有什么方法来解决这种问题呢。
第一种方法,就是利用多线程机制,并行执行命令。
第二种方法,调用批量命令,例如 mget 等,一次操作多个键。
很多时候我们要执行的命令并不是一样的命令,而是一组命令,这个时候就无法使用类似 mget 这样的批量命令了。那还有其他的方法吗?
回想一下,我们初学编程的时候,老手都会告诉我们,不要在循环里面做查询。我有一个 books 列表数据,要根据 book_id 查询它们的 price,如果我们循环 books 列表,在每次循环里面取查询单个 book_id 的 price,那性能肯定是不理想的。一般我们的优化方式是将多个 book_id 取出来,一次性去查多个 book_id 的 price,这样性能就有明显的提示。即将多次小命令中的耗时操作合并到一次,从而减少总的执行时间。
类似的,Redis pipeline 出现了,一般称之为管道。它允许客户端一次可以发送多条命令,而不用像普通模式那样每次执行一个小命令都要等待前一个小命令执行完,服务器在接收到一堆命令后,会依次执行,然后把结果打包,再一次性返回给客户端。
这样可以避免频繁的命令发送,减少 RTT,减少 IO 调用次数 。前面已经介绍了,IO 调用会涉及到用户态和内核态之间的切换,在高性能的一些系统中,我们都是尽可能的减少 IO 调用。
简要流程如下图:

  • pipeline 的优点

    • 减少 RTT
    • 减少 IO 调用次数
  • 基本使用

    1
    2
    3
    4
    5
    6
    7
    Pipeline pipeline =jedis.pipelined();

    for(int i = 0; i < 100; i++){
    pipeline.rpush("rediskey", i + "");
    }

    pipeline.sync()

    总结一下 pipeline 的核心,就是 客户端将一组 Redis 命令进行组装,通过一次 RTT 发送给服务器,同时服务器再将这组命令的执行结果按照顺序一次返回给客户端

pipeline 注意问题

虽然 pipeline 在某些情况下会带来不小的性能提升,但是,我们在使用的时候也需要注意。

  • pipeline 中的命令数量不宜过多。

客户端会先将多个命令写入内存 buffer 中(打包),命令过多,如果是超过了客户端设置的 buffer 上限,被客户端的处理策略处理了(不同的客户端实现可能会有差异,比如 jedis pipeline ,限制每次最大的发送字节数为 8192,缓冲区满了就发送,然后再写缓冲,最后才处理 Redis 服务器的应答)。如果客户端没有设置 buffer 上限或不支持上限设置,则会占用更多的客户端机器内存,造成客户端瘫痪。官方推荐是每次 10k 个命令。
建议做好规范,遇到一次包含大量命令的 pipeline,可以拆分成多个稍小的 pipeline 来完成。

  • pipeline 一次只能运行在一个 Redis 节点上,一些集群或者 twemproxy 等中间件使用需要注意。

在集群环境下,一次 pipeline 批量执行多个命令,每个命令需要根据 key 计算槽位,然后根据槽位去特定的节点上去执行命令,这样一次 pipeline 就会使用多个节点的 redis 连接,这种当前也是不支持的。

  • pipeline 不保证原子性,如要求原子性,不建议使用 pipeline

它仅是将多个命令打包发送出去而已,如果中间有命令执行异常,也会继续执行剩余命令。

pipeline 与批量操作 mget 等区别

其实 meget 和 pipeline 优化的方向是一致的,即多个命令打包一次发送,减少网络时间。但是也是有区别的。

  • mget 等的场景是一个命令对应多个键值对,而 pipeline 一般是多条命令(不同的命令)
  • mget 操作是一个原子操作,而 pipeline 不是原子操作
  • mget 是服务端实现,而 pipeline 是客户端和服务端共同实现

pipeline 与事务的区别

这两者关注和解决的问题不是一个东西,原理也不一样。

  • pipeline 是一次请求,服务端顺序执行,一次返回。而事务是多次请求(先 multi,再多个操作命令,最后 exec),服务端顺序执行,一次返回
  • pipeline 关注的是 RTT 时间和 IO 调用,事务关注的是一致性问题

总结

本文主要讲了多命令执行时耗时问题,以及 pipeline 的解决方法,和其简单的原理,以及注意点。今天的学习就到这里,改天我们接着肝。

【Prometheus+Grafana 系列】基于 docker-compose 搭建

作者 Max Fang
2022年8月23日 22:40

前言

Prometheus

Prometheus 是有 SoundCloud 开发的开源监控系统和时序数据库,基于 Go 语言开发。通过基于 HTTP 的 pull 方式采集时序数据,通过服务发现或静态配置去获取要采集的目标服务器,支持多节点工作,支持多种可视化图表及仪表盘。
贴一下官方提供的架构图:
image.png

Pormetheus 几个主要模块有,Server,Exporters,Pushgateway,PromQL,Alertmanager,WebUI 等,主要逻辑如下:

  • Prometheus server 定期从静态配置的 targets 或者服务发现的 targets 拉取数据。
  • 当新拉取的数据大于配置内存缓存区时,Prometheus 会将数据持久化到磁盘(如果使用 remote storage 将持久化到云端)。
  • Prometheus 配置 rules,然后定时查询数据,当条件触发时,会将 alert 推送到配置的 Alertmanager。
  • Alertmanager 收到警告时,会根据配置,聚合、去重、降噪等操作,最后发送警告。
  • 可以使用 API,Prometheus Console 或者 Grafana 查询和聚合数据。

Grafana

Grafana 是一个开源的度量分析及可视化套件。通过访问数据库(如 InfluxDB、Prometheus),展示自定义图表。

Exporter

Exporter 是 Prometheus 推出的针对服务器状态监控的 Metrics 工具。目前开发中常见的组件都有对应的 exporter 可以直接使用。常见的有两大类,一种是社区提供的,包含数据库,消息队列,存储,HTTP 服务,日志等,比如 node_exporter,mysqld_exporter 等;还有一种是用户自定义的 exporter,可以基于官方提供的 Client Library 创建自己的 exporter 程序。
每个 exporter 的一个实例被称为 target,Prometheus 通过轮询的方式定期从这些 target 中获取样本数据。
image.png

原理简介

image.png

安装数据收集器 node-exporter

安装 node-exporter

1
2
3
4
5
cd /opt
wget https://github.com/prometheus/node_exporter/releases/download/v1.4.0-rc.0/node_exporter-1.4.0-rc.0.linux-amd64.tar.gz
tar xvf node_exporter-1.4.0-rc.0.linux-amd64.tar.gz
mv node_exporter-1.4.0-rc.0.linux-amd64 node_exporter
mv node_exporter /usr/local/

运行如下命令测试 node-exporter 收集器启动情况,正常情况下会输出服务端口。

1
/usr/local/node_exporter/node_exporter

添加到系统服务

vim /etc/systemd/system/node_exporter.service
添加如下内容

1
2
3
4
5
6
7
8
[Unit]
Description=mysqld_exporter
After=network.target
[Service]
ExecStart=/usr/local/node_exporter/node_exporter
Restart=on-failure
[Install]
WantedBy=multi-user.target

加载并重启服务

1
2
3
4
5
6
7
8
# 加载配置
systemctl daemon-reload
# 启动服务
systemctl restart node_exporter.service
# 查看服务状态
systemctl status node_exporter.service
# 配置开机启动
systemctl enable node_exporter.service

查看数据收集情况

重新起一个终端,查看数据收集情况。也可以在浏览器中查看。

1
curl http://127.0.0.1:9100/metrics

安装 prometheus 和 grafana

安装 docker&docker-compose

本文介绍的安装方法是基于 docker-compose 的,所以需要先安装相关 docker 环境。相关方法可以见笔者的其他文章,本文中不做详细介绍。

安装 prometheus 和 grafana

可以直接 clone 这个项目来快速搭建:
https://github.com/FX-Max/docker-install-everything/tree/master/prometheus

该项目是笔者弄的一个使用 docker-compose 搭建软件开发常见服务的项目,大家觉得有帮助,可以帮忙点个 star,感谢。

根据实际情况,修改 prometheus.yml 文件中的内容,将 ip 修改为上面安装了 node-exporter 的服务器 ip 即可。
然后在该目录下执行 docker-compose up -d即可,docker ps查看服务启动情况。

1
2
3
4
5
CONTAINER ID   IMAGE              COMMAND                  CREATED        STATUS        PORTS                                      NAMES
6f360e9ab242 grafana/grafana "/run.sh" 25 hours ago Up 25 hours 0.0.0.0:3000->3000/tcp, :::3000->3000/tcp grafana
97b92b65aca6 prom/prometheus "/bin/prometheus --c…" 25 hours ago Up 21 hours 0.0.0.0:9090->9090/tcp, :::9090->9090/tcp prometheus
3f5906f07bf6 prom/pushgateway "/bin/pushgateway" 25 hours ago Up 25 hours 0.0.0.0:9091->9091/tcp, :::9091->9091/tcp pushgateway
f556168c1b8b prom/alertmanager "/bin/alertmanager -…" 25 hours ago Up 25 hours 0.0.0.0:9093->9093/tcp, :::9093->9093/tcp alertmanager

docker-compose.yml 内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
version: "3"
services:
prometheus:
image: prom/prometheus
container_name: prometheus
user: root
# restart: always
ports:
- "9090:9090"
volumes:
- ./conf/prometheus:/etc/prometheus
- ./data/prometheus/prometheus_db:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
networks:
- net-prometheus

grafana:
image: grafana/grafana
container_name: grafana
user: root
# restart: always
ports:
- "3000:3000"
volumes:
#- ./conf/grafana:/etc/grafana
- ./data/prometheus/grafana_data:/var/lib/grafana
depends_on:
- prometheus
networks:
- net-prometheus

pushgateway:
image: prom/pushgateway
container_name: pushgateway
user: root
# restart: always
ports:
- "9091:9091"
volumes:
- ./data/prometheus/pushgateway_data:/var/lib/pushgateway

alertmanager:
image: prom/alertmanager
hostname: alertmanager
container_name: alertmanager
user: root
# restart: always
ports:
- "9093:9093"
volumes:
- ./data/prometheus/alertmanager_data:/var/lib/alertmanager

networks:
net-prometheus:

prometheus.yml 内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
global:
scrape_interval: 5s
evaluation_interval: 5s

external_labels:
monitor: 'dashboard'

alerting:
alertmanagers:
- static_configs:
- targets:
- "alertmanager:9093"

rule_files:
#- 'alert.rules'

scrape_configs:
- job_name: 'prometheus'
scrape_interval: 5s
static_configs:
- targets: ['prometheus:9090']

- job_name: node
static_configs:
- targets: ['192.168.0.103:9100','pushgateway:9091']

- job_name: 'mysql-131'
static_configs:
- targets: ['192.168.0.131:9104']
labels:
instance: mysql

查看 prometheus

访问 http://127.0.0.1:9090/targets,效果如下,上面我们通过 node_exporter 收集的节点状态是 up 状态。
image.png

配置 Grafana

访问 http://127.0.0.1:3000,登录 Grafana,默认的账号密码是 admin:admin,首次登录需要修改默认密码。
image.png

按照如下添加 data sources,将 prometheus 添加到 data sources 中。
image.png
image.png
image.png
添加 prometheus 服务地址,此处由于服务是基于 docker-compose 构建的,没有填写 ip,直接填写服务名即可。

添加监控模版

image.png

输入官方模版 id,1860,点击 load。然后按照下图选择确认即可。
image.png

导入成功后,会自动跳转到监控面板页面,如下图。
image.png

结语

本文简单介绍了 prometheus + grafana 服务搭建流程,初步跑通了整个服务。当然它还有很多功能,后续笔者会开新的文章来分享。

参考文档

官方模板库:https://grafana.com/grafana/dashboards/
node 模板:https://grafana.com/grafana/dashboards/1860
MySQL 模板:https://grafana.com/grafana/dashboards/7362
docker 搭建 prometheus&grafana:https://blog.51cto.com/keep11/4261521

简单易用的任务队列 -beanstalkd

作者 Max Fang
2022年7月15日 21:14

概述

beanstalkd 是一个简单快速的分布式工作队列系统,协议基于 ASCII 编码运行在 TCP 上。其最初设计的目的是通过后台异步执行耗时任务的方式降低高容量 Web 应用的页面延时。其具有简单、轻量、易用等特点,也支持对任务优先级、延时 / 超时重发等控制,同时还有众多语言版本的客户端支持,这些优点使得它成为各种需要队列系统场景的一种常见选择。

beanstalkd 优点

  • 如他官网的介绍,simple&fast,使用非常简单,适合需要引入消息队列又不想引入 kafka 这类重型的 mq,维护成本低;同时,它的性能非常高,大部分场景下都可以 cover 住。
  • 支持持久化
  • 支持消息优先级,topic,延时消息,消息重试等
  • 主流语言客户端都支持,还可以根据 beanstalkd 协议自行实现。

beanstalkd 不足

  • 无最大内存控制,当业务消息极多时,服务可能会不稳定。
  • 官方没有提供集群故障切换方案(主从或哨兵等),需要自己解决。

beanstalkd 重点概念

  • job

任务,队列中的基本单元,每个 job 都会有 id 和优先级。有点类似其他消息队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期部分会重点介绍。job 存放在 tube 中。

  • tube

管道,用来存储同一类型的 job。有点类似其他消息队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中可以有多个管道,每个管道有自己的 producer 和 consumer,管道之间互相不影响。

  • producer

job 生产者。通过 put 命令将一个 job 放入到一个 tube 中。

  • consumer

job 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来改变 job 的状态。

beanstalkd 生命周期

上文介绍到,beanstalkd 中 job 有状态区分,在整个生命周期中,job 可能有四种状态:READY, RESERVED, DELAYED, BURIED。只有处于 READY 状态的 job 才能被消费。下图介绍了各状态之间的流转情况。

beanstalkd-status

producer 在创建 job 的时候有两种方式,put 和 put with delay(延时任务)。
如果 producer 使用 put 直接创建一个 job 时,该 job 就处于 READY 状态,等待 consumer 处理。
如果 producer 使用 put with delay 方式创建 job,该 job 的初始状态为 DELAYED 状态,等待延迟时间过后才变更为 READY 状态。
以上两种方式创建的 job 都会传入一个 TTR(超时机制),当 job 处于 RESERVED 状态时,TTR 开始倒计时,当 TTR 倒计时完,job 状态还没有改变,则会认为该 job 处理失败,会被重新放回到队列中。

consumer 获取到(reserve)一个 READY 状态的 job 之后,该 job 的状态就会变更为 RESERVED。此时,其他的 consumer 就不能再操作该 job 了。当 consumer 完成该 job 之后,可以选择 delete,release,或 bury 操作。

  • delete ,job 被删除,从 beanstalkd 中清除,以后也无法再获取到,生命周期结束。
  • release ,可以把该 job 重新变更为 READY 状态,使得其他的 consumer 可以继续获取和执行该 job,也可以使用 release with delay 延时操作,这样会先进入 DELAYED 状态,延迟时间到达后再变为 READY。
  • bury,可以将 job 休眠,等需要的时候,在将休眠的 job 通过 kick 命令变更回 READY 状态,也可以通过 delete 直接删除 BURIED 状态的 job 。

处于 BURIED 状态的 job,可以通过 kick 重回 READY 状态,也可以通过 delete 删除 job。

为什么设计这个 BURIED 状态呢?
一般我们可以用这个状态来做异常捕获,例如执行超时或者异常的 job,我们可以将其置为 BURIED 状态,这样做有几个好处:
1. 可以便面这些异常的 job 直接被放回队列重试,影响正常的队列消费(这些失败一次的 job,很有可能再次失败)。如果没有这个 BURIED 状态,如果我们要单独隔离,一般我们会使用一个新的 tube 单独存放这些异常的 job,使用单独的 consumer 消费。这样就不会影响正常的新消息消费。特别是失败率比较高的时候,会占用很多的正常资源。
2. 便于人工排查,上面已经讲到,可以将异常的 job 置为 BURIED 状态,这样人工排查时重点关注这个状态就可以了。

beanstalkd 特性

持久化

通过 binlog 将 job 及其状态记录到本地文件,当 beanstalkd 重启时,可以通过读取 binlog 来恢复之前的 job 状态。

分布式

在 beanstalkd 的文档中,其实是支持分布式的,其设计思想和 Memcached 类似,beanstalkd 各个 server 之间并不知道彼此的存在,是通过 client 实现分布式以及根据 tube 名称去特定的 server 上获取 job。贴一篇专门讨论 beanstalkd 分布式的文章,Beanstalkd 的一种分布式方案

任务延时

天然支持延时任务,可以在创建 job 时指定延时时间,也可以当 job 被处理完后才能后,消费者使用 release with delay 将 job 再次放入队列延时执行。

任务优先级

producer 生成的 job 可以给他分配优先级,支持 0 到 2^32 的优先级,值越小,优先级越高,默认优先级为 1024。优先级高的 job 会被 consumer 优先执行。

超时机制

为了防止某个 consumer 长时间占用 job 但无法处理完成的情况,beanstalkd 的 reserve 操作支持设置 timeout 时间 (TTR)。如果 consumer 不能在 TTR 内发送 delete、release 或 bury 命令改变 job 状态,那么 beanstalkd 会认为任务处理失败,会将 job 重新置为 READY 状态供其他 consumer 消费。
如果消费者已经预知可能无法在 TTR 内完成该 job,则可以发送 touch 命令,使得 beanstalkd 重新计算 TTR。

任务预留

有一个 BURIED 状态可以作为缓冲,具体特点见上文生命周期中关于 BURIED 状态的介绍。

安装及配置

以下以 ubuntu 为例,安转 beanstalkd:

1
2
sudo apt-get update
sudo apt-get install beanstalkd

1
2
3
vi /etc/sysconfig/beanstalkd
# 添加如下内容
BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

可以通过 beanstalkd 命令来运行服务,并且可以添加多种参数。命令的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
beanstalkd [OPTIONS]

-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for "always fsync")
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help

如下我们启动一个 beanstalkd 服务,并开启 binlog:
1
nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ &

beanstalkd 管理工具

官方推荐的一些管理工具:Tools
笔者常用的管理工具:https://github.com/ptrofimov/beanstalk_console
如果只是简单的操作和查看 beanstalkd,可以使用 telnet 工具,然后执行 stats,use,put,watch 等:

1
2
$ telnet 127.0.0.1 11300
stats

实际应用

beansralkd 有很多语言版本的客户端实现,官方提供了一些客户端列表beanstalkd 客户端列表
如果现有的这些库不满足需求,也可以自行实现,参考 beanstalkd 协议

以下以 go 为例,简单演示下 beanstalkd 常用处理操作。

1
go get github.com/beanstalkd/go-beanstalk

生产者

向默认的 tube 中投入 job:

1
2
3
4
5
id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
panic(err)
}
fmt.Println("job", id)

向指定的 tube 中投入 job:
1
2
3
4
5
6
tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
panic(err)
}
fmt.Println("job", id)

消费者

消费默认的 tube 中的 job:

1
2
3
4
5
6
id, body, err := conn.Reserve(5 * time.Second)
if err != nil {
panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

消费指定的 tube (此处指定多个) 中的 job:
1
2
3
4
5
6
7
tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
id, body, err := tubeSet.Reserve(10 * time.Hour)
if err != nil {
panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

beanstalkd 使用小 tips

  • 可以通过指定 tube ,在 put 的时候将 job 放入指定的 tube 中,否则会放入 default 的 tube 中。
  • beanstalkd 支持持久化,在启动时使用 -b参数来开启 binlog,通过binog 可以将 job 及其状态记录到文件里。当重新使用 -b 参数重启 beanstalkd,将读取 binlog 来恢复之前的 job 及状态。

参考资料

laravel 源码分析 - 队列 Queue

作者 Max Fang
2022年1月7日 15:28

laravel 源码分析具体注释见 https://github.com/FX-Max/source-analysis-laravel

前言

队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。本文我们就来分析下队列创建和执行的源码。

本文笔者基于 laravel 5.8.* 版本

队列任务的创建

先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php。

1
2
3
> php artisan make:job DemoJob

> Job created successfully.

下面我们来分析一下 Job 类的具体生成过程。

执行 php artisan make:job DemoJob 后,会触发调用如下方法。

laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

1
2
3
4
5
6
7
8
9
10
11
/**
* Register the command.
* [A] make:job 时触发的方法
* @return void
*/
protected function registerJobMakeCommand()
{
$this->app->singleton('command.job.make', function ($app) {
return new JobMakeCommand($app['files']);
});
}

接着我们来看下 JobMakeCommand 这个类,这个类里面没有过多的处理逻辑,处理方法在其父类中。

1
class JobMakeCommand extends GeneratorCommand

我们直接看父类中的处理方法,GeneratorCommand->handle(),以下是该方法中的主要方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
public function handle()
{
// 获取类名
$name = $this->qualifyClass($this->getNameInput());
// 获取文件路径
$path = $this->getPath($name);
// 创建目录和文件
$this->makeDirectory($path);
// buildClass() 通过模板获取新类文件的内容
$this->files->put($path, $this->buildClass($name));
// $this->type 在子类中定义好了,例如 JobMakeCommand 中 type = 'Job'
$this->info($this->type.' created successfully.');
}

方法就是通过目录和文件,创建对应的类文件,至于新文件的内容,都是基于已经设置好的模板来创建的,具体的内容在 buildClass($name) 方法中。

1
2
3
4
5
6
7
protected function buildClass($name)
{
// 得到类文件模板,getStub() 在子类中有实现,具体看 JobMakeCommand
$stub = $this->files->get($this->getStub());
// 用实际的 name 来替换模板中的内容,都是关键词替换
return $this->replaceNamespace($stub, $name)->replaceClass($stub, $name);
}

获取模板文件

1
2
3
4
5
6
protected function getStub()
{
return $this->option('sync')
? __DIR__.'/stubs/job.stub'
: __DIR__.'/stubs/job-queued.stub';
}

job.stub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php
/**
* job 类的生成模板
*/
namespace DummyNamespace;

use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;

class DummyClass
{
use Dispatchable, Queueable;

/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
//
}

/**
* Execute the job.
*
* @return void
*/
public function handle()
{
//
}
}

job-queued.stub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
/**
* job 类的生成模板
*/
namespace DummyNamespace;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class DummyClass implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
//
}

/**
* Execute the job.
*
* @return void
*/
public function handle()
{
//
}
}

下面看一下前面我们创建的一个 Job 类,DemoJob.php,就是来源于模板 job-queued.stub。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
/**
* job 类的生成模板
*/
namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class DemoJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
//
}

/**
* Execute the job.
*
* @return void
*/
public function handle()
{
//
}
}

至此,我们已经大致明白了队列任务类是如何创建的了。下面我们来分析下其是如何生效运行的。

队列任务的分发

任务类创建后,我们就可以在需要的地方进行任务的分发,常见的方法如下:

1
2
DemoJob::dispatch(); // 任务分发
DemoJob::dispatchNow(); // 同步调度,队列任务不会排队,并立即在当前进程中进行

下面先以 dispatch() 为例分析下分发过程。

1
2
3
4
5
6
7
trait Dispatchable
{
public static function dispatch()
{
return new PendingDispatch(new static(...func_get_args()));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class PendingDispatch
{
protected $job;

public function __construct($job)
{ echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL;
$this->job = $job;
}

public function __destruct()
{ echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL;
app(Dispatcher::class)->dispatch($this->job);
}
}

重点是 app(Dispatcher::class)->dispatch($this->job) 这部分。

我们先来分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自带的 BusServiceProvider 中向 $app 中注入的。

1
2
3
4
5
6
7
8
9
10
11
class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
public function register()
{
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
});
}
}

看一下 Dispatcher 的构造方法,至此,我们已经知道前半部分 app(Dispatcher::class) 是如何来的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Dispatcher implements QueueingDispatcher
{
protected $container;
protected $pipeline;
protected $queueResolver;

public function __construct(Container $container, Closure $queueResolver = null)
{
$this->container = $container;
/**
* Illuminate/Bus/BusServiceProvider.php->register()中
* $queueResolver 传入的是一个闭包
* function ($connection = null) use ($app) {
* return $app[QueueFactoryContract::class]->connection($connection);
* }
*/
$this->queueResolver = $queueResolver;
$this->pipeline = new Pipeline($container);
}

public function dispatch($command)
{
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
// 将 $command 存入队列
return $this->dispatchToQueue($command);
}
return $this->dispatchNow($command);
}
}

BusServiceProvider 中注册了 Dispatcher::class ,然后 app(Dispatcher::class)->dispatch($this->job) 调用的即是 Dispatcher->dispatch()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public function dispatchToQueue($command)
{
// 获取任务所属的 connection
$connection = $command->connection ?? null;
/*
* 获取队列实例,根据 config/queue.php 中的配置
* 此处我们配置 QUEUE_CONNECTION=redis 为例,则获取的是 RedisQueue
* 至于如何通过 QUEUE_CONNECTION 的配置获取 queue ,此处先跳过,本文后面会具体分析。
*/
$queue = call_user_func($this->queueResolver, $connection);

if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}
// 我们创建的 DemoJob 无 queue 方法,则不会调用
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}
// 将 job 放入队列
return $this->pushCommandToQueue($queue, $command);
}

protected function pushCommandToQueue($queue, $command)
{
// 在指定了 queue 或者 delay 时会调用不同的方法,基本大同小异
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}

if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}

if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
// 此处我们先看最简单的无参数时的情况,调用 push()
return $queue->push($command);
}

笔者的配置是 QUEUE_CONNECTION=redis ,估以此来分析,其他类型的原理基本类似。

配置的是 redis 时, $queue 是 RedisQueue 实例,下面我们看下 RedisQueue->push() 的内容。

Illuminate/Queue/RedisQueue.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public function push($job, $data = '', $queue = null)
{
/**
* 获取队列名称
* var_dump($this->getQueue($queue));
* 创建统一的 payload,转成 json
* var_dump($this->createPayload($job, $this->getQueue($queue), $data));
*/
// 将任务和数据存入队列
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

public function pushRaw($payload, $queue = null, array $options = [])
{
// 写入 redis 中
$this->getConnection()->eval(
LuaScripts::push(), 2, $this->getQueue($queue),
$this->getQueue($queue).':notify', $payload
);
// 返回 id
return json_decode($payload, true)['id'] ?? null;
}

至此,我们已经分析完了任务是如何被加入到队列中的。

AWS 修改 RDS 时区

作者 Max Fang
2021年7月18日 23:05

查看 RDS 当前时区

默认情况下,AWS 的 RDS 采用的是 UTC 时间。而我们地区一般位于东八区,因此我们本地的时间是 UTC+8。

连接到 RDS 上,查询当前实例的时区。

1
show variables where variable_name like 'time_zone';

显示的结果如下,表示当前 RDS 时区的 UTC。

time_zone UTC

调整 RDS 时区

RDS 的时区调整是通过调整参数组来操作的。AWS 的 RDS 是不允许修改 default 参数组的。因此先要确认下当前 RDS 采用的参数组是不是 default 参数组。如果是 default 参数组,则需要新建一个参数组。然后在该参数组上调整 timezone 相关参数,然后变更 RDS 使用的参数组,使用新的参数组。

从左侧的参数组菜单进入,即可新建参数组。一般我们都会从把当前在使用的参数组作为模版来复制一份新的来调整。
选择当前在使用的参数组,Actions->Copy 即可。以笔者测试为例,当前在使用的参数组为 pg-mysql57-demo ,复制过来的新的参数组为 pg-mysql57-demo-new 。

1

2

接下来就可以修改新的参数组的参数了,点击改参数组进入详情页面,搜索关键词 time_zone,然后点击 Modify 即可对参数进行修改,从可选值中找到我们需要的值,此处我们选择 Asia/Shanghai,最后确认变更即可。

3

4

再进入参数组,搜索 time_zone ,发现值已经修改为 Asia/Shanghai,说明已经修改完毕。

5

参数组调增完毕了,接下来就是给对应实例应用该参数组了。
进入到需要调整的 RDS ,在参数组配置中,选择新的参数组。确认修改后,系统会提示是否立即应用修改。可以根据实际情况选择立即修改或者下一次维护窗口。修改 time_zone 需要重启数据库实例,这里我们选择下一次停机窗口重启。

6

选择合适的时机,重启 RDS 即可。

验证修改生效

在 RDS 重启完毕之后,再次执行上面的查询时区的语句,显示的结果如下(Asia/Shanghai),表示时区已修改成功。

1
show variables where variable_name like 'time_zone';

#time_zone Asia/Shanghai


Happy Coding.

❌
❌