普通视图

发现新文章,点击刷新页面。
昨天以前Luyu Huang's Tech Blog

程序员是什么阶级

作者 Luyu Huang
2026年1月28日 00:00

我最近在读《毛泽东选集》。毛主席教导我们说,要将理论与实践相结合。那么就我所从事的行业,根据我观察到的事实,结合我学到的理论知识,分析一下程序员的阶级性。

传统制造业需要厂房和机器这样的生产资料,这些生产资料一般的工人不可能拥有。工人要生产,就必须被企业雇佣。 而软件开发却不同,基本上只需要一台普普通通的电脑,几千块钱,几乎人人都能拥有。也就是说,大多数情况下,一个程序员独立拥有开发软件的能力。事实上,早年也存在不少由一个或几个程序员开发的独立软件。那么从这个角度看,程序员应该是类似于小手工业者的小资产阶级。

但是我们身处一个资本主义高度发达的世界。十几年前智能手机的普及、几十年前个人电脑的普及,造就了一个巨大的软件消费市场。面对这个潜力巨大、尚未开发的市场,金融资本虎视眈眈。他们绝不会让独立软件开发者从两三人的小团队开始,通过挣得的利润慢慢扩大规模、完成原始积累。他们会大量砸钱,帮助创业者孵化项目,给创业公司估值,给它们投资,帮助它们快速扩大规模,为的是获得它们的股权,从而获得这个行业的利润——也就是产业工人的剩余价值。若不这样做,这个新兴市场的利润就与他们无关了。这样一来,软件行业迅速地产业化、规模化,诞生一大批颇具规模的软件公司。

具有一定规模的软件公司能够凭借规模效应,快速抢占市场。这样一来,独立开发者开发的产品难以同企业中几十人甚至几百人的团队开发的产品竞争。 如果某个品类尚无产品存在,拥有庞大资金的企业一定会迅速组建团队,以最快的速度研发出产品,以便快速抢占市场。如果独立开发者的创新产品开创了一个新品类,在市场上取得了一定成绩,那么企业也会立刻组建团队跟进,拆解、模仿、改进独立开发者的产品,推出自己的产品与之竞争。在大多数情况下,企业的产品会打败独立开发者的产品;或者企业直接出资收购独立开发者的团队。越大的企业,在竞争中往往越有优势。于是在短短二十多年的时间内,市场已经被各大企业瓜分完毕,独立开发者基本没有生存空间。

对于软件企业来说,要开发一款有竞争力的产品,需要组建一支几十人甚至几百人的团队,开发两三年甚至四五年,然后才能发行并盈利。这期间需要承担这支团队所有人的工资;且软件发行需要一大笔宣发费用。而且软件发行后不一定能盈利,这需要企业有一定的风险承担能力。所以,软件企业通常需要庞大的资本,这其中主要是用于购买劳动力的可变资本,而不是电脑、服务器这样的固定资本。

普通程序员虽然拥有开发软件的生产资料,但是基本上没什么用,只能被企业雇佣。这就像在工业时代,小手工业者,如木匠、裁缝,基本上没有独立生产的可能。他们要谋生,只能进厂打工,成为工人阶级。

在过去十几年的移动互联网浪潮中,有一部分程序员创业成功,赚了很多钱,实现阶级跃升,成为资产阶级。他们当中一小部分人当了老板;但是更多的人,虽然创业成功,但没能一直当老板。因为这是一个资本主义高度发达的世界,如果你的创业团队真的有潜力,金融资本会想方设法收购你的公司,反正他们有的是钱。如果你不想被收购,反而有倒闭的风险——可能有接受了投资的竞争对手开始迅速扩大规模,挤压你的市场。事实上很多创业者本质上就是炒作概念,讲故事,提高估值,然后卖掉套现离场。这些人能赚到一大笔钱,然后把钱投入房地产和金融市场,希望由此实现“财富自由”。但房价会下跌,投资可能会亏钱。如果投资赚的钱不够花,还得打工贴补一部分,就只是小资产阶级。如果投资不能稳定赚钱,那么也随时可能从资产阶级跌落。

在市场开发之初,垄断尚未形成,行业还存在大量创业机会。一旦创业成功,就能赚大钱,很多人都想找机会去创业。我称之为“创业机会主义”。再加上行业的快速发展与技术人才供应不足的矛盾,优秀程序员有一定的稀缺性。为了留住人才,让他们不要去创业单干,企业愿意同他们分享一部分利润。因此成功项目的程序员,往往能分到可观的奖金。这样一来,他们便具有小资产阶级的性质,因为分享了一部分利润。

但是事情正在起变化。随着产业的发展,垄断正在逐渐形成,市场逐渐被各大企业瓜分,创业机会逐渐减少;再加上产业发展放缓,人才不再稀缺,程序员的奖金分红也会逐渐减少。这样一来,程序员的小资产阶级性质会逐渐消失,最终成为单纯出卖劳动力的无产阶级。

AI 的出现带来了更加不确定的变化。一方面来说,AI 会带来新的市场和新的创业机会,相关领域的从业人员可能因此暂时成为小资产阶级。但在另一方面,AI 正在威胁程序员的岗位。让 AI 参与编程工作,会极大提高开发效率,产业的劳动密集型的现状可能因此改变,就像流水线工厂代替旧的劳动密集型工坊。由于训练 AI 需要高性能 GPU,这可能是一般工人无法拥有的。因此在可预见的未来,高阶 AI 可能像流水线机器一样,成为资本集团才能拥有的生产资料。如果是这样的话,普通程序员绝无可能保住小资产阶级的地位。

总结一下。程序员本是类似于小手工业者的小资产阶级。但是随着产业的发展,独立开发者难以与企业竞争,只能被企业雇佣。但成为工人的程序员并非完全是无产阶级,因为市场开发之初的“创业机会主义”和由此带来的分红,很多程序员都有小资产阶级的性质。然而随着市场逐渐被垄断集团瓜分,程序员的小资产阶级性质会逐渐消失,最终成为无产阶级。创业成功、当上老板的程序员自然是资产阶级。被收购、套现离场的创业者的阶级性质取决于资产状况,但是他们的阶级地位并不稳固。AI 的出现带来新的变化:新的市场会带来新的机会;但高阶 AI 可能成为普通人无法拥有的生产资料,普通程序员可能因此沦为真正的无产阶级。

组了一台 NAS

作者 Luyu Huang
2026年1月8日 00:00

我最近组了一台 NAS,用于同步手机上的相册、自建云盘存储文件、同步 Obsidian 的笔记、存储和管理影音资源、搭建 Git 托管代码等。这篇文章记录我是怎么做的。

硬件

主板选用云星 CS246,一款专门面向 NAS 的 ITX 小主板。它有 LGA 1151 CPU 插槽、2 个 2.5G 网卡(可加钱升级成 4 个)、8 个 SATA 3.0 接口、两个 NVMe 插槽,很适合用来做 NAS。花费¥429。

CPU 买了个 i3 9100T 二手散片。4 核 4 线程,带核显,可以硬解码,做 NAS 基本够用;TDP 35w,比较省电。花费¥135。

内存条选用七彩虹 DDR4 2666 8GB。说实话 8G 有点小,但是考虑到我的腾讯云服务器内存才 1G,苦日子过惯了,8G 凑合着也能用。花费¥119。

固态硬盘也是七彩虹的,CN600 128GB。Linux 系统盘 64G 都绰绰有余了,不过现在能买到的 NVMe 硬盘最小也有 128G。花费¥115。

机箱选的是疾风知 N52 NAS 机箱。它有 5 个盘位,硬盘仓支持安装风扇给硬盘散热。选它是因为我觉得它比较好看,而且相对便宜。花费¥229。

电源选了个 TT TRM SFX 350W。花费¥159。

散热器选了个铜芯铝鳍片的下压式散热器,花费¥27。硬盘仓风扇买的是利民的 12CM 无光静音风扇¥17。

总共花费¥1230。算下来,似乎并不是很划算😂。但自组 NAS 图的就是 DIY 的乐趣,为的是自由定制的权力,便不计较这个了。

系统

系统安装的是 Ubuntu 24.04.3 LTS。我理解的 NAS 就是一台 Linux 服务器,需要什么服务就在上面装就好了,这样最能自由定制,因此没有选择飞牛之类的 NAS 专用系统。主板装好后,插入安装 U 盘试下能否点亮。成功点亮后,顺便也把系统装好了。

硬盘

我本想用两块硬盘组一个 RAID 1,但是现在机械硬盘涨价严重,就只买一块 4 TB 的西数红盘。反正一块硬盘一时半会儿坏不了,等后面硬盘降价了(希望),再组成 RAID 1。

为了后续扩展成 RAID 1,我把这块 4 TB 的硬盘组成一个单盘的 RAID 1。首先使用 parted 创建分区,执行 sudo parted /dev/sda 进入 parted 交互界面,然后依次执行

(parted) mklabel gpt                # 创建分区表
(parted) mkpart primary 0% 100% # 创建主分区
(parted) print # 打印查看下分区表
(parted) set 1 raid on # 标记为 raid 分区

然后使用 mdadm 创建 raid:

sudo mdadm --verbose --create /dev/md0 --level=1 --raid-devices=1 --force /dev/sda1
sudo mkfs.ext4 /dev/md0

然后挂载到 /data

sudo mkdir /data
sudo mount /dev/md0 /data

# 挂载持久化
sudo blkid | grep /dev/md0 # 查看一下 UUID
echo UUID=ae65b749-4e1e-4209-ae57-15ec83578c5c /data ext4 defaults 0 0 | sudo tee -a /etc/fstab # 配置写入 /etc/fstab

Mdadm 的配置最好也持久化一下:

sudo mdadm --detail --scan | sudo tee -a /etc/mdadm/mdadm.conf
sudo update-initramfs -u

以后如果要加硬盘,可以执行以下命令即可:

sudo mdadm --grow /dev/md0 --raid-devices=2 --add /dev/sdb1

网络环境

我对网络环境有几点要求。第一,不能只在内网访问,在外面也要能访问。第二,要用域名访问,用不同的域名区分不同的服务,而不是用 IP + 端口。第三,同一个服务在内网和外网的访问地址是一样的,这样回到家后不需要修改任何配置。

Tailscale 实现内网穿透

为了实现内网穿透,我的方案是使用 Tailscale。Tailscale 是一个基于 WireGuard 实现的 VPN,通过 UDP 打洞实现内网穿透。如果打洞成功,则可以直连通信,无需经由中继服务器;通信速度取决于带宽,非常方便。

Tailscale 非常容易使用。在机器上安装好 Tailscale 后,执行 tailscale up,访问链接登录账号,即可将当前主机加入这个账号下的 Tailscale 网络 (Tailnet)。

$ sudo tailscale up

To authenticate, visit:

https://login.tailscale.com/a/******

同时手机上也安装 Tailscale 客户端,登录账号后,也可以加入我的 tailnet。这样手机和 NAS 便可以随时通信。

NAT 打洞也不是总能成功。根据 Tailscale 官方文档,如果通信双方都在 Hard NAT 中,则不能实现直连。就需要通过中继服务器(称为 DERP),速度很慢。

Client A NAT Type Client B NAT Type Expected Connection
No NAT No NAT Direct
No NAT Easy NAT Direct
No NAT Hard NAT Direct
Easy NAT Easy NAT Direct
Easy NAT Hard NAT Relayed
Hard NAT Hard NAT Relayed

好在现在宽带基本上都提供 ipv6 公网地址。我在光猫上开启允许内网设备使用 ipv6 通信,让 NAS 拥有公网 ipv6 地址。这样,使用 4G/5G 网络能 100% 直连;在外面连的 WIFI 如果支持 ipv6,也能直连。

配置 DNS 服务器

我的目标是在家里连上家里的 WiFi,或者在外连上 VPN,都可以用同样的域名访问 NAS 上的服务。这首先要在 NAS 上部署一个 DNS 服务器。我使用的是 CoreDNS,它的配置非常简单:

.:53 {
bind 192.168.1.2
hosts {
192.168.1.2 nas.luyuhuang.tech
192.168.1.2 immich.luyuhuang.tech

ttl 60
reload 1m
fallthrough
}

forward . /etc/resolv.conf
cache 120
reload 6s
errors
}

这样的配置让 CoreDNS 监听 192.168.1.2:58,在 hosts 中配置域名解析规则。其他域名默认 forward 到系统 DNS 解析。在路由器中将 NAS 的 IP 地址固定为 192.168.1.2,并将首选 DNS 服务器设置为 192.168.1.2。这样任何通过我家路由器上网的设备都可以通过域名访问 NAS 上的服务。

为了让连上 VPN 的设备也能用同样的域名访问 NAS,我们在 Tailscale 的控制台设置一个 Split DNS,地址设置为 NAS 的虚拟 IP;并指定后缀为 luyuhuang.tech 的域名请求 NAS 上的 DNS 服务器。

由于 NAS 的虚拟 IP 和物理局域网的 IP 不同,所以还要让 CoreDNS 监听虚拟 IP,解析结果也要返回虚拟 IP。

.:53 {
bind 100.64.0.3
hosts {
100.64.0.3 nas.luyuhuang.tech
100.64.0.3 immich.luyuhuang.tech

ttl 60
reload 1m
fallthrough
}

forward . /etc/resolv.conf
cache 120
reload 6s
errors
}

这样,例如同样是访问 nas.luyuhuang.tech,内网设备解析得到 192.168.1.2,连上 VPN 的设备则得到虚拟 IP 100.64.0.3。它们都可以用同样的域名访问 NAS。

配置 Nginx 和 HTTPS

我是用 Nginx 作为所有服务的反向代理,这样可以统一使用 80 或 443 端口,但使用不同的域名区分不同的服务。为了简单我直接使用 Docker 版的 Nginx,简单写个 docker-compose.yaml 即可。因为要反向代理到各种服务,网络模式使用 host 模式。

services:
nginx:
image: nginx
container_name: nginx
network_mode: host
volumes:
- ./config:/etc/nginx
- ./cert:/cert
restart: unless-stopped

我的域名 luyuhuang.tech 申请了 Let’s Encrypt 的证书,因此可以直接使用这个证书部署 HTTPS 服务。简单写个脚本同步证书到 NAS 上,这样也不用担心证书过期。

这样服务配置起来就很简单了。例如我的 Homepage 服务(下文会介绍)监听 http://127.0.0.1:4000,Nginx 这边就直接反代过去:

server {
listen 80;
server_name nas.luyuhuang.tech;

location / {
return 301 https://nas.luyuhuang.tech$request_uri;
}
}

server {
listen 443 ssl;
server_name nas.luyuhuang.tech;

include /etc/nginx/ssl.conf;

location / {
proxy_pass http://127.0.0.1:4000;
proxy_set_header Host nas.luyuhuang.tech;
}
}

安装各种服务

这里介绍几个我的 NAS 上安装的服务。这些服务都是用 Docker 安装的,安装步骤都很简单,这里就不介绍了。

Immich

Immich 是一个自建相册服务。它自带 Android 和 iOS 手机客户端,可以将手机的相册同步到 NAS 上。实际上我这次搞 NAS 的导火索就是我的小米云相册的容量满了。它支持 AI 人脸识别、以文搜图的功能(需要 GPU 支持);支持地图足迹功能。我用下来体验还是很不错的。

Cloudreve

Cloudreve 是自建网盘服务。它的网页端功能很不错,大多数格式的文件,无论是图片、音乐、视频,还是文档、压缩包,都可以直接在网页端打开。它支持 WebDAV,我用来作为笔记软件的同步服务;支持离线下载,我用来下载影音资源。它还支持各种云服务的对象存储,不过我暂时还没用到。缺点是没有 Android 客户端,iOS 客户端则需收费。

qBittorrent

qBittorrent 不用介绍了吧,著名的 Bittorrent 下载器。它可以与 Cloudreve 联动,作为 Cloudreve 的离线下载器。在 Cloudreve 中创建离线下载任务,调用 qBittorrent 下载,下载完成自动存入 Cloudreve 网盘。

Jellyfin

Jellyfin 是一个影视资源库。它将影视资源整理成影片和剧集,并从影视数据库中拉取海报、标题、介绍等元数据,方便观看。它能保存观看进度,并支持服务器解码(硬解码需要 GPU 支持),方便在各种设备中观看。除了网页端和 PC 客户端,它还支持手机客户端和 TV 客户端。

我让 Jellyfin 通过 WebDAV 挂载 Cloudreve,使用其中的影视资源。Jellyfin 本身不支持远程挂载,我使用 Rclone 的 Docker 插件,让 Docker 将 WebDAV 链接挂载成 Jellyfin 所在容器的 volume。

volumes:
cloudreve:
driver: rclone
driver_opts:
remote: 'cloudreve:'
read_only: 'true'
dir_cache_time: '1s'

Homepage

最后我还部署了 Homepage,将 NAS 上所有的服务都集中到一个主页,顺便还能显示机器的基本状态、各个服务的运行状态。

用 C 语言实现协程

作者 Luyu Huang
2024年6月14日 00:00

协程与线程(进程)[1]都是模拟多任务(routine)并发执行的软件实现。操作系统线程在一个 CPU 线程中模拟并发执行多个任务,而协程(coroutine)在一个操作系统线程中模拟并发执行多个任务。 因此协程也被称为“用户态线程”、“轻量级线程”。之所以说“模拟并发执行”,是因为任务实际并没有同时运行,而是通过来回切换实现的。

线程切换由操作系统负责,而协程切换通常由程序员直接控制。程序员通过 resume/yield 操作控制协程切换。resume 操作唤醒一个指定协程;yield 操作挂起当前协程,切换回唤醒它的协程。如果你用过 Lua 的协程,就会很熟悉这套流程。不过 Lua 是基于 Lua 虚拟机(LVM)的脚本语言,它只需要 LVM 中执行“虚拟的”上下文切换。本文介绍如何用 C 语言(和一点点汇编)实现一个 native 协程,执行真正的上下文切换。这个实现非常简单,总共不到 200 行代码。我参考了 libco 的实现。本文的完整代码见 toy-coroutine

上下文切换

所谓的上下文,就是一段程序之前做了什么,接下来要做什么,以及做事情过程的中间产物。例如我们有函数 ff 需要知道下一个指令是什么才能接着往下执行,便是“接下来要做什么”。f 函数还需要知道之前是谁调用了它,以便把结果返回给调用者,便是“之前做了什么”。在 f 函数执行过程中,局部变量要存好(不能被写坏),接下来的指令才能正确执行。这便是“过程的中间产物”。

在 x86-64 下,“之前做了什么” 存储在栈里。函数调用会执行 call 指令,把当前函数的下一个指令的地址压入栈顶,然后再跳转到被调用函数。被调用函数返回时执行 ret 指令,从栈顶取出调用者的返回点地址,然后跳转到返回点。因此栈上存有所有前序调用者的返回点地址。

函数的局部变量通常储存在 16 个通用寄存器中,如果寄存器不够用,就存在栈里(只要在函数返回前将它们弹出,让栈顶是返回点地址即可)。函数调用的参数也是局部变量,存在约定的 6 个通用寄存器里。如果不够用,也存在栈里。

至于“接下来要做什么”,其实也在栈里。上下文切换不过是调用一个函数,调用者在调用它之前已经把下一个指令的地址压栈了。当上下文切换函数返回,ret 指令自然会跳转到接下来要执行的指令。所以上下文就是 16 个通用寄存器 + 栈。

所有的协程共享同一个 CPU,也就共享同样的 16 个通用寄存器。如果我们要把 A 协程切换成 B 协程,就要把当前 16 个通用寄存器的值存在 A 协程的数据结构里;然后再从 B 协程的数据结构里取出 B 协程的寄存器的值,写回通用寄存器中。我们还要处理栈。不过栈与寄存器不同,x86-64 规定 %rsp 寄存器(也是通用寄存器之一)存的值便是栈顶的地址。不同的协程不必共享栈,它们可以分配各自的栈,上下文切换时将 %rsp 指向各自的栈顶即可。

实际上我们不必存储全部的 16 个通用寄存器,它们有些是暂存寄存器(Scratch Registers),是允许被写坏的。这些寄存器的值可能在执行一次函数调用后就变了(被被调用函数写坏的)。编译器也不会在暂存寄存器里存储函数调用后还要用的值。参考 libco 的实现,我们存储 13 个寄存器:

enum {
CO_R15 = 0,
CO_R14,
CO_R13,
CO_R12,
CO_R9,
CO_R8,
CO_RBP,
CO_RDI,
CO_RSI,
CO_RDX,
CO_RCX,
CO_RBX,
CO_RSP,
};

struct co_context {
void *regs[13];
};

有些寄存器有特殊的用途。这里我们只需要知道这三个:

  • %rsp: 栈寄存器,指向栈顶。
  • %rdi, %rsi: 第一参数寄存器和第二参数寄存器,调用函数前将第一个参数存在 %rdi 里,第二个存在 %rsi 里(剩下的四个依次是 %rdx, %rcx, %r8, %r9, 不过这里我们用不上),然后执行 call 指令。

接着我们定义一个函数做上下文切换,把当前通用寄存器的值保存在 curr 中,再把 next 中保存的寄存器的值写回各个通用寄存器。

extern void co_ctx_swap(struct co_context *curr, struct co_context *next);

Emm,这个函数没法用 C 语言实现,我们得用到一点点汇编了。其实非常简单,我们只需要用 movq 指令存取寄存器。代码如下:

.globl co_ctx_swap

co_ctx_swap:
movq %rsp, 96(%rdi)
movq %rbx, 88(%rdi)
movq %rcx, 80(%rdi)
movq %rdx, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)

movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 48(%rsi), %rbp
movq 56(%rsi), %rdi
movq 72(%rsi), %rdx
movq 80(%rsi), %rcx
movq 88(%rsi), %rbx
movq 96(%rsi), %rsp
movq 64(%rsi), %rsi

ret

不懂汇编没关系(其实我也不是很懂),只需要知道 movq 指令将第一个操作数的值复制到第二个操作数中。% 开头的标识符为寄存器。%rsp 这样不带括号的,表示存取寄存器的值。(%rdi) 这种带括号的,表示去内存里存取地址为 %rdi 的数据。如果括号前面有数字几,就表示这个地址要加几。movq 存取数据的长度为 8 字节,寄存器的长度也是 8 字节。

还记得前面说过,%rdi 是第一个参数,%rsi 是第二个参数吗?所以 %rdi 就是 struct co_context *curr96(%rdi) 就是 curr->regs[12]88(%rdi) 就是 curr->regs[11],……,(%rdi) 就是 curr->regs[0]。上半部分把 13 个通用寄存器的值全部存到了 curr 里。同理 %rsi 就是 struct co_context *next(%rsi) 就是 next->regs[0]8(%rsi) 就是 next->regs[1],依次类推。于是下半部分把 next 中保存的寄存器的值写回寄存器中。最后执行 ret 指令返回。

注意 29 行写入 %rsp 的值就是上次挂起时第 4 行保存的值,这个值我们原封未动,也没有做任何栈操作。因此最后 ret 返回时,栈顶就是 co_ctx_swap 的调用者设置的返回点地址。一个协程调用 co_ctx_swap 将自己挂起,便陷入沉睡。当 co_ctx_swap 返回之时,便是其它协程调用 co_ctx_swap 将它唤醒之时。此时寄存器被还原、栈被还原、也回到了返回点。它便知道自己之前做了什么、接下来要做什么、中间产物是怎样的。

协程的初始化

struct co_context 仅存储协程的上下文。我们还需要维护给协程分配的栈空间、记录入口函数地址等。我们定义 struct coroutine 表示协程对象。

typedef void (*start_coroutine)();

struct coroutine {
struct co_context ctx;
char *stack;
size_t stack_size;
start_coroutine start;
};

struct coroutine *co_new(start_coroutine start, size_t stack_size) {
struct coroutine *co = malloc(sizeof(struct coroutine));
memset(&co->ctx, 0, sizeof(co->ctx));
if (stack_size) {
co->stack = malloc(stack_size);
} else {
co->stack = NULL;
}
co->stack_size = stack_size;
co->start = start;

return co;
}

void co_free(struct coroutine *co) {
free(co->stack);
free(co);
}

co_new 创建一个新协程,接受两个参数:start 协程入口函数指针,和 stack_size 栈大小;这类似于 pthread_createco_new 分配协程的栈空间并设置好各个字段。

要把主线程切换到我们新创建的协程,这里有两个问题。一是主线程并不是一个协程,新协程跟谁交换上下文呢?二是新创建的协程的上下文是空的(19 行),切换过去肯定跑不起来。

第一个问题很简单:创建一个就行。因为主线程已经跑起来了,要切换到新协程,主线程只需要一个“容器”把它的上下文装进去。直接执行 main = co_new(NULL, 0) 创建主协程,调用 co_ctx_swap(&main->ctx, &new->ctx) 便可切换到新协程。此时主线(协)程的上下文保存在 main 中,当新协程反向调用 co_ctx_swap(&new->ctx, &main->ctx),便又切换回主协程了。

为了解决第二个问题,我们需要对新协程初始化。co_ctx_swap 将新协程的上下文复制到 CPU 后,执行 ret 返回栈顶记录的地址。因此我们要将栈顶置为协程入口函数的地址,这样在 co_ctx_swap 返回后便跳转到协程入口函数了。

void co_ctx_make(struct coroutine *co) {
char *sp = co->stack + co->stack_size - sizeof(void*);
sp = (char*)((intptr_t)sp & -16LL);
*(void**)sp = (void*)co->start;
co->ctx.regs[CO_RSP] = sp;
}

因为 x86 的栈是从高地址向低地址增长的,初始栈为空,所以栈顶应该指向 co->stack 的最末尾。又因为 x86 的栈必须 16 字节对齐,所以执行 (intptr_t)sp & -16LL(-16 低 4 位为 0,其它都为 1)得到栈顶地址。然后将栈顶置为 co->start,也就是入口函数的地址。最后我们把保存的 rsp 寄存器的值设置为栈顶地址,这个值会在 co_ctx_swap 被复制到寄存器 %rsp 中。

现在我们的协程已经可以跑起来了。写一个简单的例子试试:

struct coroutine *main_co, *new_co;

void foo() {
printf("here is the new coroutine\n");
co_ctx_swap(&new_co->ctx, &main_co->ctx);
printf("new coroutine resumed\n");
co_ctx_swap(&new_co->ctx, &main_co->ctx);
}

int main() {
main_co = co_new(NULL, 0);
new_co = co_new(foo, 1024 * 1024);
co_ctx_make(new_co);

co_ctx_swap(&main_co->ctx, &new_co->ctx);
printf("main coroutine here\n");
co_ctx_swap(&main_co->ctx, &new_co->ctx);
return 0;
}

把上面所有的 C 代码复制到文件 co.c,汇编代码存为 co.S,然后执行 gcc -o co co.c co.S 编译,运行试试!

协程的管理

现在的协程虽然可以跑,但是使用起来很不方便,需要手动交换上下文,也容易出错。我们需要实现 resume/yield 操作。resume 操作唤醒指定协程,也就是当前协程与指定协程交换。yield 挂起当前协程,将当前协程与上次唤醒它的协程交换。因此我们需要记录当前运行的协程;而对于每个协程,要保存唤醒它的协程的指针。

协程切换要遵循这几条规则:

  • 主协程不能执行 yield 操作。这是显而易见的,因为它没有唤醒者。
  • 不能 resume 一个正在运行的协程。
  • 如果一个协程通过 resume 操作进入挂起状态,则不能由 resume 操作唤醒。例如,上图所示的协程 B 在 resume 协程 C 后,只能被协程 C 的 yield 操作唤醒。如果允许其它协程通过 resume 操作唤醒它,则协程切换会陷入混乱。
  • 除主协程外的协程结束时需要执行 yield 操作,之后进入死亡状态。死亡状态的协程不能被 resume 操作唤醒。

基于此,我们给协程定义五个状态:

enum {
CO_STATUS_INIT, // 初始状态
CO_STATUS_PENDING, // 执行 yield 操作进入的挂起状态
CO_STATUS_NORMAL, // 执行 resume 操作进入的挂起状态
CO_STATUS_RUNNING, // 运行状态
CO_STATUS_DEAD, // 死亡状态
};

我们使用全局变量 g_curr_co 记录当前协程。每个协程还要记录当前状态和唤醒自己的协程。

struct coroutine {
struct co_context ctx;
char *stack;
size_t stack_size;
int status; // 协程状态
struct coroutine *prev; // 唤醒者
start_coroutine start;
};

struct coroutine *g_curr_co = NULL; // 当前协程

struct coroutine *co_new(start_coroutine start, size_t stack_size) {
struct coroutine *co = malloc(sizeof(struct coroutine));
...
co->status = CO_STATUS_INIT;
co->prev = NULL;
return co;
}

void check_init() {
if (!g_curr_co) { // 初始化主协程
g_curr_co = co_new(NULL, 0);
g_curr_co->status = CO_STATUS_RUNNING; // 主协程状态初始为 RUNNING
}
}

接着实现 resume 操作和 yield 操作。根据上面描述的思路,实现起来很容易。

int co_resume(struct coroutine *next) {
check_init();

switch (next->status) {
case CO_STATUS_INIT: // 初始状态,需要执行 co_ctx_make 初始化
co_ctx_make(next);
case CO_STATUS_PENDING: // 只有处于 INIT 和 PENDING 状态的协程可以被 resume 唤醒
break;
default:
return -1;
}

struct coroutine *curr = g_curr_co;
g_curr_co = next; // g_curr_co 指向新协程
next->prev = curr; // 设置新协程的唤醒者为当前协程
curr->status = CO_STATUS_NORMAL; // 当前协程进入 NORMAL 状态
next->status = CO_STATUS_RUNNING; // 新协程进入 RUNNING 状态
co_ctx_swap(&curr->ctx, &next->ctx);

return 0;
}

int co_yield() {
check_init();

struct coroutine *curr = g_curr_co;
struct coroutine *prev = curr->prev;

if (!prev) { // 没有唤醒者,不能执行 yield 操作
return -1;
}

g_curr_co = prev; // g_curr_co 指向当前协程的唤醒者
if (curr->status != CO_STATUS_DEAD) {
curr->status = CO_STATUS_PENDING; // 当前协程进入 PENDING 状态
}
prev->status = CO_STATUS_RUNNING; // 唤醒者进入 RUNNING 状态
co_ctx_swap(&curr->ctx, &prev->ctx);

return 0;
}

除主协程外的协程结束运行时一定要执行 yield 操作将自己切出,否则它不知道该返回到哪儿。为了不让使用者手动执行这个操作,我们将协程入口函数封装一层。

void co_entrance(struct coroutine *co) {
co->start(); // 执行入口函数
co->status = CO_STATUS_DEAD;
co_yield(); // 已经置为 DEAD 状态了,切出后不会有人唤醒它了。这里 co_yield 永远不会返回
// 不会走到这里来
}

void co_ctx_make(struct coroutine *co) {
char *sp = co->stack + co->stack_size - sizeof(void*);
sp = (char*)((intptr_t)sp & -16LL);
*(void**)sp = (void*)co_entrance; // 设置入口地址为 co_entrance
co->ctx.regs[CO_RSP] = sp;
co->ctx.regs[CO_RDI] = co; // rdi 为第一参数寄存器,将它的值置为 co,这样 co_entrance 就能拿到它的参数了
}

这样我们的协程用起来就更方便了:

void foo() {
printf("here is the new coroutine\n");
co_yield();
printf("new coroutine resumed\n");
}

int main() {
struct coroutine *co = co_new(foo, 1024 * 1024);
co_resume(co);
printf("main coroutine here\n");
co_resume(co);
return 0;
}

传递参数

resume/yield 可以用于传递参数。运行上面的例子,我们发现 co_yield 返回之时便是其它协程调用 co_resume 之时;而 co_resume 返回之时又是其它协程调用 co_yield 之时。因此 resume 操作接受参数,传递给 yield 返回;yield 操作接受参数,传递给 resume 返回。这样方便在协程之间传递数据。

我们在 struct coroutine 中新增一个 data 字段用于传递参数。协程切换时,如果要给目标协程传递参数,就对目标协程的 data 字段赋值。协程切换后,就能从 data 字段中取出上一个协程传递的参数。

struct coroutine {
...
void *data; // 参数
};

int co_resume(struct coroutine *next, void *param, void **result) {
...

next->data = param; // 切换到 next 协程,给 next 协程的参数
co_ctx_swap(&curr->ctx, &next->ctx);
if (result) {
*result = curr->data;
}
return 0;
}

int co_yield(void *result, void **param) {
...

prev->data = result; // 切回 prev 协程,给 prev 协程的结果
co_ctx_swap(&curr->ctx, &prev->ctx);
if (param) {
*param = curr->data; // 其它协程唤醒它时给它的参数
}
return 0;
}

我们重新定义协程入口函数,让它接受参数和返回值。第一次 resume 的参数传给入口函数;入口函数的返回值在最后一次 yield 时传出去。

typedef void *(*start_coroutine)(void *);

static void co_entrance(struct coroutine *co) {
void *result = co->start(co->data);
co->status = CO_STATUS_DEAD;
co_yield(result, NULL); // 协程的最后一次 yield 操作,将入口函数的返回值传出去
}

例子

现在,我们的协程库已经完全实现了。我们可以写一些例子测试一下。比如说我们可以创建一个源源不断生成以 n 开头的自然数的协程:

void *number(void *param) {
intptr_t i = (intptr_t)param;
co_yield(NULL, NULL); // 初始化后立刻 yield
while (1) {
co_yield((void*)i, NULL);
++i;
}
}

int main() {
struct coroutine *num = co_new(number, 1024 * 1024);
co_resume(num, (void*)0, NULL); // 初始化为以 0 开头的自然数流
for (int i = 0; i < 10; ++i) {
intptr_t n;
co_resume(num, NULL, (void**)&n);
printf("%ld ", n);
}
co_free(num);
return 0;
}

运行结果就是

0 1 2 3 4 5 6 7 8 9

这个协程就是一个无限流。我们还可以写一个将两个无限流逐项相加的协程:

void *add(void *param) {
struct coroutine **cov = param, *co0 = cov[0], *co1 = cov[1]; // cov 指向前序协程的栈,这里要立刻将其中的数据取出来
co_yield(NULL, NULL); // 同样,初始化后立刻 yield
while (1) {
intptr_t a, b;
co_resume(co0, NULL, (void**)&a);
co_resume(co1, NULL, (void**)&b);
co_yield((void*)(a + b), NULL);
}
}

然后将 0 开头的自然数流与 1 开头的自然数流逐项相加,得到奇数无限流(0 + 1 = 1, 1 + 2 = 3, 2 + 3 = 5, …)

int main() {
struct coroutine *num0 = co_new(number, 1024 * 1024);
struct coroutine *num1 = co_new(number, 1024 * 1024);
struct coroutine *co_add = co_new(add, 1024 * 1024);

co_resume(num0, (void*)0, NULL); // 以 0 开头的自然数流
co_resume(num1, (void*)1, NULL); // 以 1 开头的自然数流

struct coroutine *cov[] = {num0, num1};
co_resume(co_add, cov, NULL); // 初始化 add 协程

for (int i = 0; i < 10; ++i) {
intptr_t s;
co_resume(co_add, NULL, (void**)&s);
printf("%ld ", s);
}

co_free(num0), co_free(num1), co_free(co_add);
return 0;
}

运行结果就是

1 3 5 7 9 11 13 15 17 19

当然还有更好玩的。我们可以实现一个斐波那契数列生成器。斐波那契数列可以自我定义:令 f(i) 是以第 i 项开头的斐波那契数列,f(a) + f(b) 表示将两个数列逐项相加,那么如下所示,f(2) = f(0) + f(1)。

    0  1  1  2  3  5
+ 1 1 2 3 5 8
-----------------------
1 2 3 5 8 13

所以我们可以这样做

void *fib(void *param) {
co_yield((void*)0, NULL); // 第 0 项
co_yield((void*)1, NULL); // 第 1 项
struct coroutine *f0 = co_new(fib, 1024 * 1024);
struct coroutine *f1 = co_new(fib, 1024 * 1024);
co_resume(f1, NULL, NULL); // f1 先走一步,让它成为以第 1 项开头的斐波那契数列

struct coroutine *co_add = co_new(add, 1024 * 1024);
struct coroutine *cov[] = {f0, f1};
co_resume(co_add, cov, NULL); // 将 f0 与 f1 逐项相加
while (1) {
intptr_t s;
co_resume(co_add, NULL, (void**)&s);
co_yield((void*)s, NULL);
}
}

int main() {
struct coroutine *f = co_new(fib, 1024 * 1024);
for (int i = 0; i < 10; ++i) {
intptr_t s;
co_resume(f, NULL, (void**)&s);
printf("%ld ", s);
}
}

运行结果便是斐波那契数列:

0 1 1 2 3 5 8 13 21 34

不过这种写法会创建大量协程,性能很低。仅供演示(炫技)


  1. 对于 Linux 内核而言,线程和进程是同一个东西 ↩︎

2023 Annual Review

作者 Luyu Huang
2024年1月1日 00:00

In 2023 I spent most of my time on work, learning and dating. Compared with the last year, devoted less time on this blog and community. It might be a pretext but to be honest, writing a blog post always consumes a lot of my energy, especially when there was a lot of overtime this year. Anyway, I should write more posts in the new year, and I’ll try to write some non-technical posts (well, partially because they’re easy to write (and read)).

Let’s talk a little about professional skills over here. In the past, I focused most of my efforts on coding skill, rather than engineering skills, or let’s say, the business skills. This is because I love the hacker spirit, and am fascinated by intricate program structures and algorithms. But your boss just need one who solve problems, them don’t care about computer science. To solve a problem, you have to consider many things other than the computer - namely, the people around you. And that’s exactly what software engineering does - to use some engineering methods to prevent or eliminate mistakes made by humans. So when I focus on hacking, I must keep an open mind on other skills, especially those methodologies for problem-solving.

Goals

  • Keep up learning English
  • Read Understanding the Linux Kernel (I’m not sure if I can finish it)
  • Keep up daily LeetCode exercises
  • Keep up writing blogs, basically one post a month
  • Read some non-technical books
  • Keep up exercises

Learning

I finished learning Structure and Interpretation of Computer Programs (SICP), an amazing book. Its content comprised of functional programming, layering of program and data structure, OOP, infinite streams, the metacircular evaluator, lazy evaluation, compilation principle, etc. I have to say, SICP opened the gate of computer science for me. Before then, I don’t really comprehend the essential of computer science.

sicp

LeetCode

I kept on doing LeetCode like the past few years. The grid looks not bad.

leetcode

Now I’ve solved 1182 problems. Last year it’s 912, so I solved 270 problems in 2023.

Language

I kept learning English in 2023, as I did in the past few years. After continuously learning vocabulary on the APP baicizhan for 2024 days, I found it might not be a very effective way to memorize new words for me at the moment. Therefore, In November, I started learning Merriam-Webster’s Vocabulary Builder. This book organizes words by their roots, besides telling you how to use a word, its history, and related knowledge.

webster

I also read another amazing vocabulary builder, Word Power Made Easy. To me, this book like an introduction to the etymology and at the same time teaches you how to memorize over 3,000 words and continue building your vocabulary.

webster

In addition, I kept leaning Japanese on Duolingo as I did in 2022.

webster

Creating

I only wrote 6 posts on the blog.

I created a repo luyuhuang/nvim, but it’s just for personal configuration, should not be considered as a contribution. I submitted some pull requests and issues to the community and some of have been accepted.

Something Happy

Hiking and seeing the skyline of the city at the summit is quite happy, especially with the one you love.

webster

Finally

At the end of a year, I always feel time flies by quickly. But after I wrote the annual review, I found that the time of a year is quite long, because you can literally do many things in a year and grow quite a bit in certain aspects (say, I found my English writing skill is much better than the last year). So in the new year, keep learning and growing, and I believe we’ll get a better result. Happy New Year to everybody.

一种简单的事务实现

作者 Luyu Huang
2023年6月18日 00:00

在服务器编程中,事务往往是非常重要的,它的一个很重要的作用就是保证一系列操作的完整性。例如服务器处理某个请求要先后执行 a, b 两个修改操作,它们都有可能失败;如果 a 成功了但 b 失败了,事务会负责回滚 a 的修改。试想如果 a 操作是扣除余额,b 操作是发货,如果发货失败,钱就得退回去。如果服务器使用了支持事务的数据库系统,如 MySQL,事情就很好办。否则的话,实现类似的逻辑会比较棘手,也很容易犯错。

我希望有一种简单的事务系统,实现这样的效果:例如在下面的代码中,handler 函数处理业务逻辑。只要 handler 函数的任意位置抛出异常,那么 handler 中所有修改,无论是 _G.DB.last_update_timedata.order 还是 data.money,都将回滚。

function handler(data)
_G.DB.last_update_time = os.time()
data.order_id = get_order_id()
check_order(data)
data.money = data.money - 10
deliver(data)
end

因为我们的程序是单线程的,因此不用考虑事务隔离性之类的问题。所以这个所谓的“事务系统”只是一种自动回滚机制。

其实我在以前见过类似的事务实现。它的做法是将需要修改的数据(如上面的 data)存储两份,一份是正式数据,一份是暂存数据。业务代码修改暂存数据,如果没有抛出异常,则让暂存数据覆盖正式数据 (commit);否则让正式数据覆盖暂存数据 (rollback)。暂存数据只是正式数据的浅拷贝,即使是这样,内存开销仍然非常大。而且由于是浅拷贝,这种机制对引用类型(如 table)的字段无效。我认为这种做法并不够好。

最近我受到 SICP 4.3 节 Nondeterministic Computing 的启发,想到其实回滚数据很简单——再改回去就好了。我们在修改数据的时候记录下数据在修改之前的值,如果捕获到异常,就把对应的数据改回修改之前的值。我们从 pcall 开始动手:

local original_pcall = pcall
function pcall(f, ...)
begin()
local ok, res = original_pcall(f, ...)
if ok then
commit()
else
rollback()
end
return ok, res
end

由于 pcall 可以嵌套,i.e. pcall(function() pcall(function() end) end),我们使用栈保存事务的上下文,在 begin 中压栈,commitrollback 时弹出栈。因此栈顶就是当前事务的上下文。调用 set 执行修改操作,它会将数据的原始值保存在上下文中。

local stack = {}

local function begin()
table.insert(stack, {})
end

function set(tab, key, val)
local top = stack[#stack]
if top then
table.insert(top, {tab, key, tab[key]})
end
tab[key] = val
end

Commit 时,当前事务的赋值操作全部生效,当前事务造成的副作用亦是上层事务的副作用,需要将当前事务记录的数据原始值移动到上层事务(如果有的话)的上下文中。回滚时,从后往前依次取出每次 set 操作的原始值,将数据设置成修改前的值,完成回滚操作。

local function commit()
local top = table.remove(stack)
local pi = stack[#stack]
if pi then
for _, assign in ipairs(top) do
table.insert(pi, assign)
end
end
end

local function rollback()
local top = table.remove(stack)
for i = #top, 1, -1 do
local tab, key, val = table.unpack(top[i], 1, 3)
tab[key] = val
end
end

使用的时候不能直接赋值,需要调用 set。当然也可以做成元表,不过我不是很喜欢这样。

val = {}
function test()
local old = val
set(_G, 'val', 42)
set(old, 1, 1)
error()
end

pcall(test) -- false nil
next(val) -- nil

整个实现可以说是非常简单且行之有效,开销也并不大。代码是我随手写的,它还有优化空间:stack 中的旧数据存储可以使用更紧凑的数据结构;代码可以用 C 实现提高性能等。

Neovim 使用体验

作者 Luyu Huang
2023年3月21日 00:00

我很喜欢 Vim 这种纯键盘的交互方式, 但是我却一直不怎么会用 Vim. 虽然熟悉 Vim 的键位, 但是不懂 Vimscript, 不知道怎么装插件, 怎么定制它. 因此我一直使用 VSCode 作为主力开发工具, 搭配 VSCodeVim 这个插件, 能同时享受到 Vim 和 VSCode 的特性. 但是我总觉得需要会用真正的 Vim.

直到我接触到了 Neovim, 作为 Vim 的改进版, 它使用 Lua (我最熟悉的语言之一) 作为配置文件, 有着很现代化的特性, 解决了 Vim 的很多遗留问题. 于是花时间学习了解了 Neovim, 并且调教成我喜欢的样子. 现在我使用 Neovim 作为主力开发工具近三个月了, 这篇文章谈谈我的一些配置和使用体会. 我的 Neovim 配置在仓库 luyuhuang/nvim.

插件

我使用的插件有很多, 无法一一介绍. 这里介绍几个我认为比较重要的.

插件管理器: Lazy.nvim

Neovim 可以在没有插件管理器的情况下安装插件. 但是插件管理器可以带来很多好处: 可以根据配置的插件名自动安装插件, 可以自动更新插件, 以及最重要的: 可以实现延迟加载. 如果安装了太多插件, 会导致 Neovim 启动缓慢. 使用插件管理器就可以在真正需要使用插件的时候加载它.

我目前使用的插件管理器是 lazy.nvim. 刚开始我也使用 packer.nvim, 但是在插件之间存在依赖时, 它对延时加载的支持并不很好. lazy.nvim 则很好地解决了这个问题. 当一个插件被加载的时候, 它所依赖的插件会自动加载. 也就是我只需要配置插件的依赖项, 以及何时需要加载这个插件, 而不需要关心依赖项什么加载.

Lazy.nvim 提供了一个管理界面, 可以展示插件的加载状态, 安装和更新插件, 分析插件加载耗时等.

nvim

Profile 功能非常好用, 它展示各个插件时如何加载的, 耗时情况, 有效帮助我们优化性能. 虽然我总共安装了 31 个插件, 但是启动时仅加载 3 个插件; 其他插件都是延迟加载的. 启动时间仅 35 毫秒.

模糊搜索: Telescope

没人喜欢输入完整的文件名来打开编辑文件. 我比较习惯 vscode 和 sublime 的方式, Ctrl-P 模糊查找文件. 我使用的插件是 telescope.nvim. 它由 Lua 实现, 支持扩展, 功能十分强大. 它几乎支持一切可以列在列表中的东西, 例如:

  • 文件内容模糊搜索
  • Tag 符号搜索
  • LSP 定义/引用搜索
  • 诊断信息预览
  • Treesitter 符号搜索
  • Git 提交记录搜索
  • Git 文件变动预览
nvim

Telescope 需要手动设置按键映射. 我将 leader + / 设置为模糊搜索当前文件内容, Ctrl-P 设置为查找文件, Ctrl-O 设置为打开当前文件的符号; gs 在普通模式下设置为使用全词匹配搜索光标下的词 (通过参数 word_match = '-w'), visual 模式下设置为不使用全词匹配搜索选定内容.

local builtin = require('telescope.builtin')

vim.keymap.set('n', '<leader>/', builtin.current_buffer_fuzzy_find)
vim.keymap.set('n', '<C-p>', builtin.find_files)
vim.keymap.set('n', '<C-o>', builtin.current_buffer_tags)
vim.keymap.set('n', 'gs', function()
builtin.grep_string({word_match = '-w'})
end)
vim.keymap.set('v', 'gs', function()
vim.cmd.normal('"fy')
builtin.grep_string({search = vim.fn.getreg('"f')})
end)

Telescope 可定制性很强. 例如全局搜索, 有的时候我们需要全词匹配, 有时需要区分大小写, 有时需要开启正则表达式. 我的设置是利用 vim 的 count 机制: 执行命令前可以先按一串数字表示这个命令重复多少遍. 对于自定义映射, 我们自然可以获取到这串数字. 不过我的定义不是表示重复多少遍, 而是用于设置选项: 如果按过 1, 则表示开启正则匹配; 如果按过 2, 则表示开启全词匹配; 如果按过 3, 则表示区分大小写.

function live_grep_opts(opts)
local flags = tostring(vim.v.count)
local additional_args = {}
local prompt_title = 'Live Grep'
if flags:find('1') then
prompt_title = prompt_title .. ' [.*]'
else
table.insert(additional_args, '--fixed-strings')
end
if flags:find('2') then
prompt_title = prompt_title .. ' [w]'
table.insert(additional_args, '--word-regexp')
end
if flags:find('3') then
prompt_title = prompt_title .. ' [Aa]'
table.insert(additional_args, '--case-sensitive')
end

opts = opts or {}
opts.additional_args = function() return additional_args end
opts.prompt_title = prompt_title
return opts
end

vim.keymap.set('n', '<leader>s', function() builtin.live_grep(utils.live_grep_opts{}) end)

改变页签的逻辑: Bufferline.nvim

Vim / Neovim 有个缓冲区 (Buffer) 的概念. 每个打开的文件都是一个 buffer, buffer 可以绑定在窗口上, 即使窗口关闭, 缓冲区依然存在. Bufferline.nvim 这个插件可以将 buffer 作为页签列出来, 显示在页签栏, 取代原生的页签栏.

nvim

这种做法改变了原生页签栏的逻辑, 但是却有很大的好处. 实际上我觉得 vim 的 buffer 就近似等于诸如 vscode 和 sublime 之类编辑器的页签. 首先, 打开文件时, 如果目标 buffer 不存在, 则会创建它, 否则切换到已存在的 buffer. 跳转定义等操作时也是如此: 文件已打开则跳转, 否则在新页签中打开. 这一操作逻辑与 vscode 相同.

由于窗格可以自由绑定 buffer, 这让多窗格的操作逻辑也很顺畅.

Bufferline 需要一些简单的配置. 我的配置为 leader 加数字键切换页签, leader 键为空格. 此外 leader + j 和 leader + k 用于切换相邻页签, Ctrl-j 和 Ctrl-k 用于移动页签. ZZ 关闭当前页签 (实际上是删除当前 buffer)

for i = 1, 9 do
vim.keymap.set('n', '<leader>' .. i, function() bufferline.go_to_buffer(i, true) end)
end

vim.keymap.set('n', '<leader>j', '<Cmd>BufferLineCyclePrev<CR>')
vim.keymap.set('n', '<leader>k', '<Cmd>BufferLineCycleNext<CR>')
vim.keymap.set('n', 'gT', '<Cmd>BufferLineCyclePrev<CR>')
vim.keymap.set('n', 'gt', '<Cmd>BufferLineCycleNext<CR>')
vim.keymap.set('n', '<C-j>', '<Cmd>BufferLineMovePrev<CR>')
vim.keymap.set('n', '<C-k>', '<Cmd>BufferLineMoveNext<CR>')
vim.keymap.set('n', 'ZZ', function()
if vim.bo.modified then
vim.cmd.write()
end
local buf = vim.fn.bufnr()
bufferline.cycle(-1)
vim.cmd.bdelete(buf)
end)

像个 IDE 一样: LSP 和补全

LSP (Language Server Protocol) 是个很棒的发明. 过去我们需要 C++ 的 IDE, Java 的 IDE, C++ 的 IDE… 不同的 IDE 操作都不同, 而我们需要使用的功能基本上就是跳转定义, 查找引用, 错误检测, 自动补全等. LSP 则将这两部分分开了. 诸如跳转到定义, 自动补全之内的功能, 由一个称为 language server 的独立进程完成, 它与编辑器之间使用 LSP 通信. 这样各种语言只需要实现自己的 language server, 开发者可以使用自己喜欢的编辑器开发.

Neovim 原生支持 LSP, 只需要简单几行配置就可以接入 language server. 不过如果你像我一样连这几行代码都不愿写, 就可以使用 nvim-lspconfig 这个插件. 它集成了两百多种主流 language server 的配置, 只需要一行代码就能接入.

lspconfig.clangd.setup{on_attach = on_attach}

on_attach 用于指定当这个 language server 接入时的回调函数. 在这里可以指定一些按键映射. 例如我将 gd 映射成跳转到定义, gr 映射成查找引用, F2 重命名等.

local function on_attach(client, bufnr)
local bufopts = {noremap=true, silent=true, buffer=bufnr}
vim.keymap.set('n', '<C-o>', function() builtin.lsp_document_symbols{symbol_width = 0.8} end, bufopts) -- 打开当前文件的符号
vim.keymap.set('n', 'gd', function() builtin.lsp_definitions{fname_width = 0.4} end, bufopts) -- 跳转定义
vim.keymap.set('n', 'K', vim.lsp.buf.hover, bufopts) -- 模拟鼠标悬停
vim.keymap.set('n', 'gr', function() builtin.lsp_references{fname_width = 0.4} end, bufopts) -- 查找引用
vim.keymap.set('n', '<F2>', vim.lsp.buf.rename, bufopts) -- 重命名
end

此外还可以设置光标移动时变量名高亮, 输入时显示函数签名提示等.

local function on_attach(client, bufnr)
...

vim.api.nvim_create_autocmd({'CursorHold', 'CursorHoldI'}, {callback = vim.lsp.buf.document_highlight, buffer = bufnr}) -- 光标不动时高亮变量名
vim.api.nvim_create_autocmd({'CursorMoved', 'CursorMovedI'}, {callback = vim.lsp.buf.clear_references, buffer = bufnr}) -- 移动光标时清除高亮
vim.api.nvim_create_autocmd({'TextChangedI', 'TextChangedP'}, {callback = vim.lsp.buf.signature_help, buffer = bufnr}) -- 输入时显示函数签名提示
end

补全

Neovim 原生的补全也能用, 不过插件可以带来更好的体验. 我使用的是 nvim-cmp, 它支持多种补全, 不同的补全能力由插件提供. 例如:

  • 根据 language server 语义分析补全, 可以由插件 hrsh7th/cmp-nvim-lsp 提供.
  • 根据 buffer 中的单词补全, 可以由插件 hrsh7th/cmp-buffer 提供.
  • 根据 tags 中的符号补全, 可以由插件 quangnguyen30192/cmp-nvim-tags 提供.

nvim-cmp 需要配合 snippet 插件实现 LSP 的代码片段补全. 我用的是 saadparwaiz1/cmp_luasnip. 只需要做一些简单的配置

require('cmp').setup{
preselect = cmp.PreselectMode.None,
snippet = { -- snippet 插件设置
expand = function(args)
require('snippet').lsp_expand(args.body)
end,
},
mapping = cmp.mapping.preset.insert({ -- 快捷键
['<Tab>'] = cmp.mapping.select_next_item(),
['<S-Tab>'] = cmp.mapping.select_prev_item(),
['<CR>'] = cmp.mapping.confirm(),
}),
sources = { -- 补全优先级. LSP 优先级最高, 其次是 tags 和 buffer
{name = 'nvim_lsp'},
{name = 'tags'},
{name = 'buffer'},
},
}

LSP 和补全的效果可以参见本文开头的视频.

指哪打哪: Hop.nvim

最后再介绍一个我很喜欢的插件: hop.nvim. 使用它可以方便地跳转到屏幕中的任意位置:

视频中的例子是跳转单词的起始位置. 按下快捷键后, 屏幕中每个单词的起始位置都会出现若干个字母, 按下对应的字母即可跳转到对应的位置. Hop.nvim 支持多种跳转方式:

  • 跳转到全屏单词起始位置
  • 跳转到当前行单词起始位置
  • 跳转到每行起始位置
  • 跳转到全屏指定单字母的位置
  • 跳转到全屏指定双字母的位置
  • 跳转到当前列的任意位置

总之它的功能非常丰富. 对于我来说, 使用 “跳转全屏单词” 和 “跳转本行单词” 这两种跳转方式就已经非常够用了. 我将它们分别映射成 leader + h 和 leader + f.

实用配置

打通剪切板

Vim 有一套自己的剪切板系统, 复制的内容会放入 vim 的寄存器中. 但是编辑器的剪切板与系统的剪切板不互通给我们带来了很多麻烦. 好在 Neovim 对此有支持.

为了解耦, Neovim 并不直接提供剪切板支持, 而是依赖于独立的工具. 当数据写入 +* 寄存器时, Neovim 会寻找系统中的剪切板工具, 如果有就会把数据同步给它们. 类似地, 当读取 +* 的时候, Neovim 也会尝试从剪切板工具中读取数据. Neovim 原生支持 pbcopy, pbpaste, xclip win32yank tmux 等工具, 只要安装了这些工具就会自动启用.

在 Windows/WSL 中, 我们可以使用 win32yank. 它包含在 Windows 版的 Neovim 中, 只要确保它在 PATH 下即可. 也就是说, 基本上只要安装好 Windows 版的 Neovim, 剪切板就已经打通了. 我们只需要设置好按键映射, 让复制粘贴使用 +* 寄存器.

vim.keymap.set({'i', 'c'}, '<C-v>', '<C-r>+')
vim.keymap.set('v', '<C-v>', '"+p')
vim.keymap.set('t', '<C-v>', '<C-\\><C-n>"+pa')
vim.keymap.set('v', '<C-c>', '"+y')

我将 visual 模式的 Ctrl-C 映射成复制; 将插入模式, 命令模式, visual 模式, 终端模式的 Ctrl-V 映射成粘贴. 普通模式的 Ctrl-V 还是要保留原本的行为, 用于进入 blockwise visual 模式.

WSL 中可以直接执行 exe, 所以只需要让 win32yank 在 WSL 的 PATH 下即可打通剪切板.

对于 ssh 远程访问的 Neovim, 可以使用 tmux. tmux 也有一套自己的剪切板系统, 叫做 buffer. tmux load-buffer 用于从文件中加载内容到 buffer, 相当于设置剪切板内容; tmux save-buffer 用于将 buffer 中的内容保存到文件, 相当于读取剪切板内容. 最新版的 tmux (3.3a) 更是支持将 buffer 的内容发送到目标客户端的剪切板中, 只需要执行 load-buffer 时带上 -w 选项即可. 这样就打通了远程 Neovim 和本机的剪切板.

Neovim 原生支持 tmux 作为剪切板工具, 但是不支持 -w 参数. 不过 Neovim 支持用户自定义剪切板工具, 可以指定复制粘贴时访问寄存器的行为:

if vim.env.TMUX then
vim.g.clipboard = {
name = 'tmux-clipboard',
copy = {
['+'] = {'tmux', 'load-buffer', '-w', '-'},
},
paste = {
['+'] = {'tmux', 'save-buffer', '-'},
},
cache_enabled = true,
}
end

上面的配置表示当复制到 + 寄存器时, 会执行命令 tmux load-buffer -w - 将复制的内容以标准输入的形式传递给 tmux; 当从 + 寄存器粘贴内容时, 会执行命令 tmux save-buffer - 从标准输出读取要粘贴的内容. 命令末尾的 - 告诉 tmux 从标准输入/输出读写内容.

跳转到行开头

在 Vim 中按下 0 跳转到当前行开头, 而按下 ^ 则跳转到当前行第一个非空白字符处. 但是 ^ 键的位置正好处于双手中间, 又要按住 Shift 键, 按起来并不方便. 我觉得 vscode 的交互处理很好: 按下 Home 键时, 会回到该行第一个非空白字符处; 再次按 Home, 又会跳转到行首. 这样一个键就实现了两种操作, 很方便.

得益于 Neovim 的高度可定制性, 我们也可以写几行代码在 Neovim 中实现这个功能.

local function home()
local head = (vim.api.nvim_get_current_line():find('[^%s]') or 1) - 1
local cursor = vim.api.nvim_win_get_cursor(0)
cursor[2] = cursor[2] == head and 0 or head
vim.api.nvim_win_set_cursor(0, cursor)
end

vim.keymap.set({'i', 'n'}, '<Home>', home)
vim.keymap.set('n', '0', home)

实现起来很简单, 无非是判断下当前光标的位置, 然后再看情况跳转就行.

恢复上次会话

像 vscode 这样的编辑器可以在打开时恢复上次会话, 自动打开上次打开的文件, 光标也会跳转到上次编辑的位置. Neovim 也有这个功能, 不过它不会自动执行, 而是将执行的时机和方式交给用户, 以保证充分的定制性. 我们只需要写几行代码即可实现类似的效果.

Neovim 使用 mksession 命令将当前会话状态保存成一个 .vim 会话文件. 保存的内容由 sessionoptions 选项指定, 可以是缓冲区, 当前目录, 窗口大小, 页签, 当前会话设置的全局变量, 按键映射等. 默认保存路径为当前目录. 只需要 source 保存的 .vim 会话文件即可还原会话状态.

我的做法是设置一个自动命令, 当退出 neovim 的时候保存当前会话状态. 没有必要保存太多东西, 我设置 sessionoptions 只保存最必要的几项内容

vim.opt.sessionoptions = 'buffers,curdir,tabpages,winsize'

保存在当前目录可能对版本控制不太友好, 因此我选择统一保存在固定的位置. 我将当前路径的 sha1 值作为会话文件的文件名.

local path = vim.fn.expand(vim.fn.stdpath('state') .. '/sessions/')

vim.api.nvim_create_autocmd('VimLeavePre', {callback = function()
vim.fn.mkdir(path, 'p')
vim.cmd('mks! ' .. path .. vim.fn.sha256(vim.fn.getcwd()) .. '.vim')
end})

为了不拖慢启动速度, 并且也不是每次打开都想恢复上次会话, 我选择设置一个自定义命令手动恢复会话. 恢复时只需要用 source 命令加载会话文件即可.

vim.api.nvim_create_user_command('Resume', function()
local fname = path .. vim.fn.sha256(vim.fn.getcwd()) .. '.vim'
if vim.fn.filereadable(fname) ~= 0 then
vim.cmd.source(fname)
end
end, {})

文件的上次光标位置保存在 '" 标记中. 创建一个自动命令在打开文件的时候检查这个标记并跳转, 即可实现恢复光标的上次位置.

vim.api.nvim_create_autocmd('BufReadPost', {callback = function()
local line = vim.fn.line('\'"')
if line > 1 and line <= vim.fn.line('$') then
vim.cmd.normal('g\'"')
end
end})

总结和体验

我这几个月使用体验主要有这几点

  • 纯键盘交互, 使用方便. 使用 Neovim 之后, 我使用鼠标的频率再次大大降低了. 之前使用 vscode + vscode-vim, 时不时还是会用到鼠标的: 切换到终端, 切换不同的 vscode, 相互复制粘贴等. 而 Neovim 可以配合 tmux, 在编辑器和终端之间随心所欲地切换, 并且剪切板也是互通的. btw 我的键盘是 HHKB, 其键数少, 键位方便 (特别是Ctrl 键和 ESC 键), 很适合这种交互逻辑.

  • 可定制性极强. Neovim 的插件生态很丰富, 你总能找当各种插件满足你的需求. 然而这不是最吸引人的, 因为 vscode 的插件生态也非常丰富. 我认为 Vim/Neovim 最大的特点是它的配置文件不是 json, 不是 ini, 而是一个完备的编程语言, 你可以随时在配置文件中写代码, 以实现你的种种需求. 如果你要给 vscode 或者 sublime 添加功能, 就必须动手写插件; 而 Neovim 却只需要在配置文件中写几行代码. Neovim 的 API 极为丰富, 你甚至可以用它实现一个 web 服务器.

  • Neovim 是一款现代化的编辑器. 正在不断推陈出新的 Vim 尚未过时, 何况是 Neovim. 事实上如果你的终端模拟器太古老, 你可能甚至无法正常使用 Neovim, 因为需要特殊字体的渲染支持, 真彩色终端的支持等. Neovim 有很多现代化特性, 如 LSP, Treesitter, 异步任务, 多线程, 远程控制等等; 但是界面却是 TUI, 装作一个老古董的样子. 总之它的功能之强大, 令我刚接触时为之惊叹.

  • 客制化是灵魂. 要用好 Neovim, 不应该复制别人的配置, 或者找一些 “开箱即用” 的配置, 一定要自己理解配置. 我认为 Vim/Neovim 的定位就是高度可定制, 如果你追求开箱即用, 那应该直接去用 vscode. Neovim 有丰富的帮助文档, 善用 help 命令, 总能找到解答; 而一旦理解并上手了, 就会从中找到很多乐趣. 这就是 DIY 的乐趣, 类似于客制化键盘, 组装电脑, 业余无线电… 把刷抖音的时间用来做这个, 也未尝不可.

2022 Annual Summary

作者 Luyu Huang
2023年1月11日 00:00

2022 is a bit tough for many of us. The pandemic disturbed manypeople’s life. The Chinese internet industry is not that prosperous inthe year: many companies layoffs and many people are unemployed, manygame projects were aborted due to suspended issued publishing licenses,many companies’ share prices fell, and so on. I was infected withCOVID-19 at the end of 2022 and suffered from fever, body aches, andfatigue for a week.

Anyway, It’s worth celebrating at the beginning of 2023 as I’m stillalive, I still have a job, and I’ve done some meaningful work in theyear. We still hope that the world will be better and that all wishescome true.

Goals

I finished some of the new year’s resolutions for 2022.

  • Keep up learning English.
  • Finish reading C++Primer
  • Read Understand the LinuxKernel (4 chapters)
  • Keep up daily LeetCodeexercises
  • Keep up writing blogs, at least one post amonth (only 9 posts)
  • Keep up exercises (running, push-up)(did not run, just push-ups every day)

Learning

On weekends, I usually spend time reading tech books.

  • Finished reading C++ Primer. Last year I read half ofit.
  • Read 4 chapters of Understand the Linux Kernel.

LeetCode

This year I spent lots of time doing LeetCode, basically everyday.

leetcode

Now I’ve done 912 exercises, I think it might be a small achievementthat’s not very easy to achieve. Doing LeetCode has somewhat improved mycoding skill and logical mind. it’s a good habit that’s worth keepingup.

leetcode

Language

English learning is not easy. I’ve spent plenty of time learningEnglish, including memorizing words and practicing listening andspeaking. I insist on memorizing words every day this year and untilnow, I’ve used Baicizhan to insist on memorizing words for 1709days.

English

I also spent lots of time on Open Language to practice listening andspeaking. It helped a lot at the beginning, but now I feel it doesn’thelp much. I think I’ve hit the English learning plateau.

In addition to English learning, I’ve been learning Japanese onDuolingo. I didn’t spend much time on that, basically two units a day.It’s just for fun.

duolingo

Creating

  • (only) Wrote 9 post on my blog.

This year I didn’t create much work in my spare time. I intended touse C++ to write a VPN software, but I only completed the most basicfeature and have not released an initial version. I just had somesporadic commit records on Github.

github

Something Happy

The happiest thing is that I’ve had a good time with the people Ilove. Thanks for her company to make me no longer feel lonely.

flowers

Finally

Not long ago I revisited My Life as McDull (麦兜故事), afamous Hongkong film I watched when I was a kid. But I didn’t understandits profound central idea at that age. I love what it said at theending:

Roasted chicken is easy to cook. The material is a chicken; themethod is to take a chicken and roast it, and then it’s done. If youwant it to be delicious, the secret is to cook it better.

mcdull

Well, life might be a simple process from birth to death. It’ssimple, but it also can be complicated if you want. If you want it to bebeautiful, the secret is to be positive and try to make it better.

Although we might be going to face challenges in 2023, we stillbelieve that good things are about to happen. Happy New Year to us.

谈谈 C++ 中的内存顺序 (Memory Order)

作者 Luyu Huang
2022年6月25日 00:00

C++11 将多线程纳入了标准. 一旦涉及到多线程,就需要考虑并发, 数据竞争 (date race),线程同步等问题, 为此 C++ 提供了互斥锁std::mutex, 原子变量 std::atomic 等标准库.对于原子变量的操作, 有一个很重要的概念就是内存顺序 (memoryorder), 其中涉及到的概念很多, 理解起来可能会有些困难.本文我们来谈谈这个话题.

本文可能有些长, 涉及到的概念有些多. 其中 3.4 节和 3.5 节标注了星号,它们的实际应用较少, 不感兴趣的同学可以先跳过, 或者读完全文后再阅读.

1. 原子变量

我们不能在两个线程中同时访问修改一个变量, 这会导致数据竞争的问题.程序的结果是未定义的. 从实现上来说, 我们不能保证读写操作是原子的, 例如32 位机器上, 修改一个 64 位变量可能需要两条指令;或者变量有可能只是在寄存器里, 对其的修改要在稍后才会写入内存.解决数据竞争的方式除了使用 std::mutex 加锁,还可以使用原子变量.

1
2
3
4
5
6
7
8
9
std::atomic<int> a{0};

void thread1() {
a = 1;
}

void thread2() {
std::cout << a << std::endl;
}

上面的例子展示了原子变量最简单的用法. 使用原子变量不用担心数据竞争,对它的操作都是原子的. 除此之外, 原子变量的操作可以指定内存顺序,帮助我们实现线程同步, 这也是本文的重点. 上面的代码中, 线程 1将值写入原子变量 a, 线程 2 则读取 a 中的值.这便是原子变量最基础的两种操作.

1.1 原子变量的操作

对原子变量的操作可以分为三种

  1. store. 将一个值存到原子变量中.
  2. load. 读取原子变量中的值.
  3. read-modify-write (RMW). 原子地执行读取, 修改和写入. 如自增操作fetch_add, 交换操作 exchange(返回变量当前的值并写入指定值) 等.

每个原子操作都需要指定一个内存顺序 (memory order).不同的内存顺序有不同的语义, 会实现不同的顺序模型 (order model),性能也各不相同. C++ 中有六种内存顺序

1
2
3
4
5
6
7
8
enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst,
};

这六种内存顺序相互组合可以实现三种顺序模型 (ordering model)

  • Sequencial consistent ordering. 实现同步, 且保证全局顺序一致 (singletotal order) 的模型. 是一致性最强的模型, 也是默认的顺序模型.
  • Acquire-release ordering. 实现同步,但不保证保证全局顺序一致的模型.
  • Relaxed ordering. 不能实现同步, 只保证原子性的模型.

稍后我们会详细讨论这六种内存顺序. atomic::storeatomic::load 函数都有一个内存顺序的参数, 默认为memory_order_seq_cst. 它们的声明如下

1
2
void store(T desired, std::memory_order order = std::memory_order_seq_cst);
T load(std::memory_order order = std::memory_order_seq_cst) const;

此外 std::atomic 重载了运算符,我们可以像使用普通变量一样读写原子变量.例如上面代码中两个线程的读写操作分别调用的是std::atomic<int>::operator=(int)std::atomic<int>::operator int().此时会使用默认的内存顺序, 也就是 memory_order_seq_cst.

2. 基础概念

在开始讲这六种内存顺序之前, 有必要先了解一下几个最基础的概念.

2.1 修改顺序 (Modificationorders)

对一个原子变量的所有修改操作总是存在一定的先后顺序,且所有线程都认可这个顺序, 即使这些修改操作是在不同的线程中执行的.这个所有线程一致同意的顺序就称为修改顺序 (modificationorder). 这意味着

  • 两个修改操作不可能同时进行, 一定存在一个先后顺序. 这很容易理解,因为这是原子操作必须保证的, 否则就有数据竞争的问题.
  • 即使每次运行的修改顺序可能都不同,但所有线程看到的修改顺序总是一致的. 如果线程 a 看到原子变量 x 由 1 变成2, 那么线程 b 就不可能看到 x 由 2 变成 1.

无论使用哪种内存顺序, 原子变量的操作总能满足修改顺序一致性,即使是最松散的 memory_order_relaxed. 我们来看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::atomic<int> a{0};

void thread1() {
for (int i = 0; i < 10; i += 2)
a.store(i, std::memory_order_relaxed);
}

void thread2() {
for (int i = 1; i < 10; i += 2)
a.store(i, std::memory_order_relaxed);
}

void thread3(vector<int> *v) {
for (int i = 0; i < 10; ++i)
v->push_back(a.load(std::memory_order_relaxed));
}

void thread4(vector<int> *v) {
for (int i = 0; i < 10; ++i)
v->push_back(a.load(std::memory_order_relaxed));
}

int main() {
vector<int> v3, v4;
std::thread t1(thread1), t2(thread2), t3(thread3, &v3), t4(thread4, &v4);
t1.join(), t2.join(), t3.join(), t4.join();

for (int i : v3) cout << i << " ";
cout << endl;
for (int i : v4) cout << i << " ";
cout << endl;
return 0;
}

上面的代码创建了 4 个线程. thread1thread2 分别将偶数和奇数依次写入原子变量 a,thread3thread4 则读取它们. 最后输出thread3thread4 每次读取到的值.程序运行的结果可能是这样的

1
2
3
4
5
6
7
$ ./test-modification-order
1 8 7 7 7 9 9 9 9 9
0 2 8 8 8 7 9 9 9 9

$ ./test-modification-order
1 2 5 6 9 9 9 8 8 8
1 3 2 5 9 8 8 8 8 8

虽然每次运行的修改顺序不同, 各个线程也不太可能看到每次修改的结果,但是它们看到的修改顺序是一致的. 例如 thread3 看到 8 先于 9,thread4 也会看到 8 先于 9, 反之亦然.

2.2 Happens-before

Happens-before 是一个非常重要的概念. 如果操作 a“happens-before” 操作 b, 则操作 a 的结果对于操作 b 可见. happens-before的关系可以建立在用一个线程的两个操作之间,也可以建立在不同的线程的两个操作之间.

2.2.1 单线程的情况:sequenced-before

单线程的情况很容易理解. 函数的语句按顺序依次执行, 前面的语句先执行,后面的后执行. 正式地说, 前面的语句总是“sequenced-before” 后面的语句. 显然, 根据定义,sequenced-before 具有传递性:

  • 如果操作 a “sequenced-before” 操作 k, 且操作 k “sequenced-before”操作 b, 则操作 a “sequenced-before” 操作 b.

Sequenced-before 可以直接构成 happens-before 的关系. 如果操作 a“sequenced-before” 操作 b, 则操作 a “happens-before” 操作 b. 例如

1
2
a = 42; // (1)
cout << a << endl; // (2)

语句 (1) 在语句 (2) 的前面, 因此语句 (1) “sequenced-before” 语句 (2),也就是 (1) “happens-before” 语句 (2). 所以 (2) 可以打印出 (1)赋值的结果.

2.2.2多线程的情况: synchronizes-with 和 inter-thread happens-before

多线程的情况就稍微复杂些. 一般来说多线程都是并发执行的,如果没有正确的同步操作, 就无法保证两个操作之间有 happens-before 的关系.如果我们通过一些手段, 让不同线程的两个操作同步, 我们称这两个操作之间有synchronizes-with 的关系.稍后我们会详细讨论如何组合使用 6 种内存顺序, 让两个操作达成synchronizes-with 的关系.

如果线程 1 中的操作 a “synchronizes-with” 线程 2 中的操作 b, 则操作 a“inter-thread happens-before” 操作 b. 此外synchronizes-with 还可以 “后接” 一个 sequenced-before 关系组合成inter-thread happens-before 的关系:

  • 如果操作 a “synchronizes-with” 操作 k, 且操作 k “sequenced-before”操作 b, 则操作 a “inter-thread happens-before” 操作 b.

Inter-thread happens-before 关系则可以 “前接” 一个 sequenced-before关系以延伸它的范围; 而且 inter-thread happens-before 关系具有传递性:

  • 如果操作 a “sequenced-before” 操作 k, 且操作 k “inter-threadhappens-before” 操作 b, 则操作 a “inter-thread happens-before” 操作b.
  • 如果操作 a “inter-thread happens-before” 操作 k, 且操作 k“inter-thread happens-before” 操作 b, 则操作 a “inter-threadhappens-before” 操作 b.

正如它的名字暗示的, 如果操作 a “inter-thread happens-before” 操作 b,则操作 a “happens-before” 操作 b. 下图展示了这几个概念之间的关系:

happens-before

注意, 虽然 sequenced-before 和 inter-thread happens-before都有传递性, 但是 happens-before 没有传递性.后面我们会在 3.5 节中看到这个性质的重要性, 以及 C++为什么要定义这么多概念.

现在我们来看一个例子. 假设下面的代码中 unlock() 操作“synchronizes-with” lock() 操作.

1
2
3
4
5
6
7
8
9
void thread1() {
a += 1 // (1)
unlock(); // (2)
}

void thread2() {
lock(); // (3)
cout << a << endl; // (4)
}

假设直到 thread1 执行到 (2) 之前, thread2都会阻塞在 (3) 处的 lock() 中. 那么可以推导出:

  • 根据语句顺序, 有 (1) “sequenced-before” (2) 且 (3)“sequenced-before” (4);
  • 因为 (2) “synchronizes-with” (3) 且 (3) “sequenced-before” (4), 所以(2) “inter-thread happens-before” (4);
  • 因为 (1) “sequenced-before” (2) 且 (2) “inter-thread happens-before”(4), 所以 (1) “inter-thread happens-before” (4); 所以 (1)“happens-before” (4).

因此 (4) 可以读到 (1) 对变量 a 的修改.

2.3 Happens-before不代表指令实际的执行顺序

需要说明的是, happens-before 是 C++ 语义层面的概念, 它并不代表指令在CPU 中实际的执行顺序. 为了优化性能,编译器会在不破坏语义的前提下对指令重排. 例如

1
2
3
4
5
6
extern int a, b;
int add() {
a++;
b++;
return a + b;
}

虽然有 a++; “happens-before” b++;,但编译器实际生成的指令可能是先加载 a, b两个变量到寄存器, 接着分别执行 “加一” 操作, 然后再执行a + b, 最后才将自增的结果写入内存.

1
2
3
4
5
6
7
8
9
add():
movl a(%rip), %eax # 将变量 a 加载到寄存器
movl b(%rip), %ecx # 将变量 b 加载到寄存器
addl $1, %eax # a 的值加一
leal 1(%rcx), %edx # b 的值加一
movl %eax, a(%rip) # 将 a 加一的结果写入内存
addl %edx, %eax # a + b
movl %edx, b(%rip) # 将 b 加一的结果写入内存
ret

上面展示了 x86-64 下的一种可能的编译结果. 可以看到 C++的一条语句可能产生多条指令, 这些指令都是交错执行的.其实编译器甚至还有可能先自增 b 再自增 a.这样的重排并不会影响语义, 两个自增操作的结果仍然对return a + b; 可见.

3. 内存顺序

前面我们提到 C++ 的六种内存顺序相互组合可以实现三种顺序模型.现在我们来具体看看如何使用这六种内存顺序, 以及怎样的组合可以实现synchronizes-with 的关系.

3.1 memory_order_seq_cst

memory_order_seq_cst 可以用于 store, load 和read-modify-write 操作, 实现 sequencial consistent 的顺序模型.在这个模型下, 所有线程看到的所有操作都有一个一致的顺序,即使这些操作可能针对不同的变量, 运行在不同的线程. 2.1节中我们介绍了修改顺序 (modification order),即单一变量的修改顺序在所有线程看来都是一致的. Sequencial consistent则将这种一致性扩展到了所有变量. 例如

1
2
3
4
5
6
7
8
9
std::atomic<bool> x{false}, y{false};

void thread1() {
x.store(true, std::memory_order_seq_cst); // (1)
}

void thread2() {
y.store(true, std::memory_order_seq_cst); // (2)
}

thread1thread2 分别修改原子变量xy. 运行过程中, 有可能先执行 (1) 再执行(2), 也有可能先执行 (2) 后执行 (1). 但无论如何,所有线程中看到的顺序都是一致的. 因此如果我们这样测试这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::atomic<int> z{0};

void read_x_then_y() {
while (!x.load(std::memory_order_seq_cst)); // (3)
if (y.load(std::memory_order_seq_cst)) ++z; // (4)
}

void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)); // (5)
if (x.load(std::memory_order_seq_cst)) ++z; // (6)
}

int main() {
std::thread a(thread1), b(thread2), c(read_x_then_y), d(read_y_then_x);
a.join(), b.join(), c.join(), d.join();
assert(z.load() != 0); // (7)
}

(7) 处的断言永远不会失败. 因为 xy的修改顺序是全局一致的, 如果先执行 (1) 后执行 (2), 则read_y_then_x 中循环 (5) 退出时, 能保证 ytrue, 此时 x 也必然为 true, 因此(6) 会被执行; 同理, 如果先执行 (2) 后执行 (1), 则循环 (3) 退出时y 也必然为 true, 因此 (4) 会被执行. 无论如何,z 最终都不会等于 0.

Sequencial consistent 可以实现 synchronizes-with 的关系. 如果一个memory_order_seq_cst 的 load 操作在某个原子变量上读到了一个memory_order_seq_cst 的 store 操作在这个原子变量中写入的值,则 store 操作 “synchronizes-with” load 操作. 在上面的例子中, 有 (1)“synchronizes-with” (3) 和 (2) “synchronizes-with” (5).

实现 sequencial consistent 模型有一定的开销. 现代 CPU 通常有多核,每个核心还有自己的缓存. 为了做到全局顺序一致,每次写入操作都必须同步给其他核心. 为了减少性能开销,如果不需要全局顺序一致, 我们应该考虑使用更加宽松的顺序模型.

3.2 memory_order_relaxed

memory_order_relaxed 可以用于 store, load 和read-modify-write 操作, 实现 relaxed 的顺序模型. 这种模型下,只能保证操作的原子性和修改顺序 (modification order) 一致性, 无法实现synchronizes-with 的关系. 例如

1
2
3
4
5
6
std::atomic<bool> x{false}, y{false};

void thread1() {
x.store(true, std::memory_order_relaxed); // (1)
y.store(true, std::memory_order_relaxed); // (2)
}

thread1 对不同的变量执行 store 操作. 那么在某些线程看来,有可能是 x 先变为 true, y 后变为true; 另一些线程看来, 又有可能是 y 先变为true, x 后变为 true.如果这样测试这段代码:

1
2
3
4
void thread2() {
while (!y.load(std::memory_order_relaxed)); // (3)
assert(x.load()); // (4)
}

(4) 处的断言就有可能失败. 因为 (2) 与 (3) 之间没有 synchronizes-with的关系, 所以就不能保证 (1) “happens-before” (4). 因此 (4) 就有可能读到false. 至于 relaxed 顺序模型能保证的修改顺序一致性的例子,2.1 节中已经讨论过了, 这里就不多赘述了.

Relaxed 顺序模型的开销很小. 在 x86 架构下,memory_order_relaxed 的操作不会产生任何其他的指令,只会影响编译器优化, 确保操作是原子的. Relaxed模型可以用在一些不需要线程同步的场景, 但是使用时要小心. 例如std::shared_ptr 增加引用计数时用的就是memory_order_relaxed, 因为不需要同步;但是减小应用计数不能用它, 因为需要与析构操作同步.

3.3 Acquire-release

在 acquire-release 模型中, 会使用 memory_order_acquire,memory_order_releasememory_order_acq_rel这三种内存顺序. 它们的用法具体是这样的:

  • 对原子变量的 load 可以使用 memory_order_acquire内存顺序. 这称为 acquire 操作.
  • 对原子变量的 store 可以使用 memory_order_release内存顺序. 这称为 release 操作.
  • read-modify-write 操作即读 (load) 又写 (store), 它可以使用memory_order_acquire, memory_order_releasememory_order_acq_rel:
    • 如果使用 memory_order_acquire, 则作为 acquire操作;
    • 如果使用 memory_order_release, 则作为 release操作;
    • 如果使用 memory_order_acq_rel, 则同时为两者.

Acquire-release 可以实现 synchronizes-with 的关系. 如果一个 acquire操作在同一个原子变量上读取到了一个 release 操作写入的值, 则这个 release操作 “synchronizes-with” 这个 acquire 操作. 我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
std::atomic<bool> x{false}, y{false};

void thread1() {
x.store(true, std::memory_order_relaxed); // (1)
y.store(true, std::memory_order_release); // (2)
}

void thread2() {
while (!y.load(std::memory_order_acquire)); // (3)
assert(x.load(std::memory_order_relaxed)); // (4)
}

在上面的例子中, 语句 (2) 使用 memory_order_releasey 中写入 true, 语句 (3) 中使用memory_order_acquirey 中读取值. 循环 (3)退出时, 它已经读取到了 y 的值为 true,也就是读取到了操作 (2) 中写入的值. 因此有 (2) “synchronizes-with” (3).根据 2.2 节介绍的规则我们可以推导出:

  • 因为 (2) “synchronizes-with” (3) 且 (3) “sequenced-before” (4), 所以(2) “inter-thread happens-before” (4);
  • 因为 (1) “sequenced-before” (2) 且 (2) “inter-thread happens-before”(4), 所以 (1) “inter-thread happens-before” (4);

所以 (1) “happens-before” (4). 因此 (4) 能读取到 (1) 中写入的值,断言永远不会失败. 即使 (1) 和 (4) 用的是memory_order_relaxed.

3.1 节我们提到 sequencial consistent 模型可以实现 synchronizes-with关系. 事实上, 内存顺序为 memory_order_seq_cst 的 load操作和 store 操作可以分别视为 acquire 操作和 release 操作.因此对于两个指定了 memory_order_seq_cst 的 store 操作和load 操作, 如果后者读到了前者写入的值, 则前者 “synchronizes-with”后者.

为了实现 synchronizes-with 关系, acquire 操作和 release操作应该成对出现. 如果 memory_order_acquire 的 load 读到了memory_order_relaxed 的 store 写入的值, 或者memory_order_relaxed 的 load 读到了memory_order_release 的 store 写入的值, 都不能实现synchronizes-with 的关系.

虽然 sequencial consistent 模型能够像 acquire-release 一样实现同步,但是反过来 acquire-release 模型不能像 sequencial consistent一样提供全局顺序一致性. 如果将 3.1 节的例子中的memory_order_seq_cst 换成 memory_order_acquirememory_order_release

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void thread1() {
x.store(true, std::memory_order_release); // (1)
}

void thread2() {
y.store(true, std::memory_order_release); // (2)
}

void read_x_then_y() {
while (!x.load(std::memory_order_acquire)); // (3)
if (y.load(std::memory_order_acquire)) ++z; // (4)
}

void read_y_then_x() {
while (!y.load(std::memory_order_acquire)); // (5)
if (x.load(std::memory_order_acquire)) ++z; // (6)
}

则最终不能保证 z 不为 0. 在同一次运行中,read_x_then_y 有可能看到先 (1) 后 (2), 而read_y_then_x 有可能看到先 (2) 后 (1). 这样有可能 (4) 和(6) 的 load 的结果都为 false, 导致最后 z仍然为 0.

Acquire-release 的开销比 sequencial consistent 小. 在 x86 架构下,memory_order_acquirememory_order_release的操作不会产生任何其他的指令, 只会影响编译器的优化: 任何指令都不能重排到acquire 操作的前面, 且不能重排到 release 操作的后面; 否则会违反acquire-release 的语义. 因此很多需要实现 synchronizes-with关系的场景都会使用 acquire-release.

3.4* Release sequences

到目前为止我们看到的, 无论是 sequencial consistent 还是acquire-release, 要想实现 synchronizes-with 的关系, acquire操作必须在同一个原子变量上读到 release 操作的写入的值. 如果 acquire操作没有读到 release 操作写入的值, 那么它俩之间通常没有synchronizes-with 的关系. 例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::atomic<int> x{0}, y{0};

void thread1() {
x.store(1, std::memory_order_relaxed); // (1)
y.store(1, std::memory_order_release); // (2)
}

void thread2() {
y.store(2, std::memory_order_release); // (3)
}

void thread3() {
while (!y.load(std::memory_order_acquire)); // (4)
assert(x.load(std::memory_order_relaxed) == 1); // (5)
}

上面的例子中, 只要 y 的值非 0 循环 (4) 就会退出.当它退出时, 有可能读到 (2) 写入的值, 也有可能读到 (3) 写入的值.如果是后者, 则只能保证 (3) “synchronizes-with” (4), 不能保证与 (2) 与(4) 之间有同步关系. 因此 (5) 处的断言就有可能失败.

但并不是只有在 acquire 操作读取到 release 操作写入的值时才能构成synchronizes-with 关系. 为了说这种情况, 我们需要引入 releasesequence 这个概念.

针对一个原子变量 M 的 release 操作 A 完成后, 接下来 M上可能还会有一连串的其他操作. 如果这一连串操作是由

  • 同一线程上的写操作, 或者
  • 任意线程上的 read-modify-write 操作

这两种构成的, 则称这一连串的操作为以 release 操作 A 为首的release sequence. 这里的写操作和 read-modify-write操作可以使用任意内存顺序.

如果一个 acquire 操作在同一个原子变量上读到了一个 release操作写入的值, 或者读到了以这个 release 操作为首的 release sequence写入的值, 那么这个 release 操作 “synchronizes-with” 这个 acquire 操作.我们来看个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::vector<int> data;
std::atomic<int> flag{0};

void thread1() {
data.push_back(42); // (1)
flag.store(1, std::memory_order_release); // (2)
}

void thread2() {
int expected = 1;
while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) // (3)
expected = 1;
}

void thread3() {
while (flag.load(std::memory_order_acquire) < 2); // (4)
assert(data.at(0) == 42); // (5)
}

上面的例子中, (3) 处的 compare_exchange_strong 是一种read-modify-write 操作, 它判断原子变量的值是否与期望的值 (第一个参数)相等, 如果相等则将原子变量设置成目标值 (第二个参数) 并返回true, 否则将第一个参数 (引用传递)设置成原子变量当前值并返回 false. 操作 (3) 会一直循环检查,当 flag 当值为 1 时, 将其替换成 2. 所以 (3) 属于 (2) 的release sequence. 而循环 (4) 退出时, 它已经读到了 (3) 写入的值, 也就是release 操作 (2) 为首的 release sequence 写入的值. 所以有 (2)“synchronizes-with” (4). 因此 (1) “happens-before” (5), (5)处的断言不会失败.

注意 (3) 处的 compare_exchange_strong 的内存顺序是memory_order_relaxed, 所以 (2) 与 (3) 并不构成synchronizes-with 的关系. 也就是说, 当循环 (3) 退出时, 并不能保证thread2 能读到 data.at(0) 为 42. 但是 (3) 属于(2) 的 release sequence, 当 (4) 以 memory_order_acquire的内存顺序读到 (2) 的 release sequence 写入的值时, 可以与 (2) 构成synchronizes-with 的关系.

3.5* memory_order_consume

memory_order_consume 其实是 acquire-release模型的一部分, 但是它比较特殊, 它涉及到数据间相互依赖的关系.为此我们又要提出两个新概念: carries dependencydependency-ordered before.

如果操作 a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries adependency into” b. 一般来说, 如果 a 的值用作 b 的一个操作数, 或者 b读取到了 a 写入的值, 都可以称为 b 依赖于 a. 例如

1
2
3
p++;  // (1)
i++; // (2)
p[i]; // (3)

有 (1) “sequenced-before” (2) “sequenced-before” (3); (1) 和 (2)的值作为 (3) 的下标运算符 [] 的操作数, 所以有 (1) “carriesa dependency into” (3) 和 (2) “carries a dependency into” (3). 但是 (1)和 (2) 并没有相互依赖, 它们之间没有 carries dependency 的关系. 类似于sequenced-before, carries dependency 关系具有传递性.

memory_order_consume 可以用于 load 操作. 使用memory_order_consume 的 load 称为 consume 操作. 如果一个consume 操作在同一个原子变量上读到了一个 release 操作写入的值,或以其为首的 release sequence 写入的值, 则这个 release 操作“dependency-ordered before” 这个 consume 操作.

Dependency-ordered before 可以 “后接” 一个 carries dependency的关系以延伸它的范围: 如果 a “dependency-ordered before” k 且 k “carriesa dependency into” b, 则 a “dependency-ordered before” b.Dependency-ordered before 可以直接构成 inter-thread happens-before的关系: 如果 a “dependency-ordered before” b 则 a “inter-threadhappens-before” b.

概念很复杂, 但是基本思路是:

  • release 操作和 acquire 操作构成的 synchronizes-with 可以后接sequenced-before 构成 inter-thread happens-before 的关系;
  • release 操作和 consume 操作构成的 dependency-ordered before则只能后接 carries dependency 构成 inter-thread happens-before的关系.
  • 无论 inter-thread happens-before 是怎么构成的, 都可以前接sequenced-before 以延伸其范围.
dependency-ordered before

我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::atomic<std::string*> ptr;
int data;

void thread1() {
std::string* p = new std::string("Hello"); // (1)
data = 42; // (2)
ptr.store(p, std::memory_order_release); // (3)
}

void thread2() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // (4)
assert(*p2 == "Hello"); // (5)
assert(data == 42); // (6)
}

(4) 处的循环退出时, consume 操作 (4) 读取到 release 操作 (3)写入的值, 因此 (3) “dependency-ordered before” (4). 由此可以推导出:

  • p2 的值作为 (5) 的操作数, 因此 (4) “carries adependency into” (5);
  • 因为 (3) “dependency-ordered before” (4) 且 (4) “carries adependency into” (5), 所以 (3) “inter-thread happens-before” (5);
  • 因为 (1) “sequenced-before” (3) 且 (3) “inter-thread happens-before”(5), 所以 (1) “inter-thread happens-before” (5);

所以 (1) “happens-before” (5). 因此 (5) 可以读到 (1) 写入的值, 断言(5) 不会失败. 但是操作 (6) 并不依赖于 (4), 所以 (3) 和 (6) 之间没有inter-thread happens-before 的关系, 因此断言 (6) 就有可能失败. 回想 2.2节强调过的, happens-before 没有传递性. 所以不能说因为 (3)“happens-before” (4) 且 (4) “happens-before” (6) 所以 (2)“happens-before” (6).

与 acquire-release 类似, 在 x86 下使用memory_order_consume 的操作不会产生任何其他的指令,只会影响编译器优化. 与 consume 操作有依赖关系的指令都不会重排到 consume操作前面. 它对重排的限制比 acquire 宽松些, acquire要求所有的指令都不能重排到它的前面, 而 consume只要求有依赖关系的指令不能重排到它的前面. 因此在某些情况下, consume的性能可能会高一些.

4. 一些例子

前面讲了很多概念和理论, 现在我们来看两个实际的例子来加深理解.

4.1 自旋锁

在一些场景下, 如果锁被占用的时间很短, 我们会选择自旋锁,以减少上下文切换的开销. 锁一般用来保护临界数据的读写,我们希望同一时间只有一个线程能获取到锁, 且获取到锁后,被锁保护的数据总是最新的. 前者通过原子操作即可保证,而后者就需要考虑内存顺序了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::deque<int> queue;
spinlock mu;

void thread1() {
int val;
while ((val = read_from_remote())) {
mu.lock(); // (1)
queue.push_back(val); // (2)
mu.unlock(); // (3)
}
}

void thread2() {
while (true) {
mu.lock(); // (4)
cout << queue.front() << endl;
queue.pop_front(); // (5)
mu.unlock(); // (6)
}
}

两个线程并发运行, thread1 往队列里写入数据,thread2 从队列里读出数据. 入队操作 (2) 可能需要复制数据,移动指针, 甚至 resize 队列, 因此我们要保证获取到锁时,这些操作的结果完全可见. 出队操作也是同理. 所以自旋锁要保证 unlock 操作“synchronizes-with” lock 操作, 保证锁保护的数据是完整的.

我们可以用 acquire-release 模型实现自旋锁. 下面是一个简单的实现:

1
2
3
4
5
6
7
8
9
10
class spinlock {
std::atomic<bool> flag{false};
public:
void lock() {
while (flag.exchange(true, std::memory_order_acquire)); // (1)
}
void unlock() {
flag.store(false, std::memory_order_release); // (2)
}
};

上面的实现中, (1) 处加锁用到的 exchange 是一种read-modify-write 操作, 它将目标值 (第一个参数) 写入原子变量,并返回写入前的值. 在这个实现中, 锁被占用时 flagtrue. 如果锁被占用, (1) 处的 exchange 操作会一直返回true, 线程阻塞在循环中; 直到锁被释放, flagfalse, exchange 操作将 flag 重新置为true 以抢占锁, 并且返回其原来的值 false,循环退出, 加锁成功. 解锁则很简单, 将 flag 置为false 即可.

由于解锁操作使用 memory_order_release 且加锁操作使用memory_order_acquire,所以能保证加锁成功时与上一次解锁操作构成 “synchronizes-with” 的关系,也就是 unlock 操作 “synchronizes-with” lock 操作.

加锁时的 exchange 操作是一个 read-modify-write 操作, 它既读又写.当它使用 memory_order_acquire 时, 只能保证它读的部分是一个acquire 操作. 如果有两个线程抢占同一个锁

1
2
3
4
5
6
7
8
9
10
spinlock mu;

void thread1() {
// some operations
mu.lock(); // (1)
}

void thread2() {
mu.lock(); // (2)
}

(1) 和 (2) 之间没有任何同步关系, 假设先执行操作 (1) 后执行操作 (2),那么 thread1 中 (1) 之前的操作结果不一定对thread2 可见. 但能确定的是, 只会有一个线程得到锁,这是由原子变量的修改顺序 (modification order) 所保证的. 要么thread1 先将 flag 置为 true, 要么thread2 先将 flag 置为 true,这个顺序是全局一致的.

4.2 线程安全的单例模式

单例模式是一种很常用的设计模式.我们通常用一个静态成员指针存储这个类的唯一实例,然后用一个静态成员函数获取它, 如果指针为空则创建.

1
2
3
4
5
6
7
8
9
class App {
static App *instance;
public:
static App *get_instance() {
if (!instance)
instance = new App;
return instance;
}
};

但是这种做法在多线程下有并发的问题. 解决这个问题最简单的办法就是加锁.但是给整个函数加锁是没有必要的,因为只有在初始创建对象时才会有并发的问题, 后续则只需要返回指针,此时的锁会造成不必要的性能负担. 更好的做法是仅在要创建对象的时候加锁,我们可以这样实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
class App {
static App *instance;
static std::mutex mu;
public:
static App *get_instance() {
if (!instance) { // (1)
std::lock_guard<std::mutex> guard(mu);
if (!instance)
instance = new App; // (2)
}
return instance;
}
};

在上面的实现中, 如果发现 instance 指针为空,则加锁并创建对象. 获取到锁后, 还需要再判断一下 instance是否为空, 以免在判断 (1) 之后, 锁获取到之前, 有其他线程创建了对象.但是这种做法是有问题的: (1) 并没有在锁的保护下, 它有可能与 (2) 并发,导致数据竞争.

我们可以使用原子变量解决这个问题, 将 instance指针改成原子类型 std::atomic<App*>. 那么在读取和写入instance 时, 应该使用什么内存顺序呢,memory_order_relaxed 可以吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class App {
static std::atomic<App*> instance;
static std::mutex mu;
public:
static App *get_instance() {
auto *p = instance.load(std::memory_order_relaxed); // (1)
if (!p) {
std::lock_guard<std::mutex> guard(mu);
if (!(p = instance.load(std::memory_order_relaxed))) { // (2)
p = new App;
instance.store(p, std::memory_order_relaxed); // (3)
}
}
return p;
}
};

假设线程 1 调用了 get_instance, 发现对象没有创建,然后成功获取到锁并且创建对象, 接着执行 (3) 将新对象的指针写入instance 中; 随后线程 2 也调用 get_instance执行 (1) 读取到线程 1 在操作 (3) 中写入的值, 此时能保证得到的指针p 是有效的吗?

注意线程 1 执行 p = new App 时需要调用构造函数初始化App 中的成员. 因为 (1) 和 (3) 都是memory_order_relaxed 的内存顺序, 它们之间没有任何synchronizes-with 的关系. 所以当线程 2 在操作 (1) 读取到线程 1 在操作(3) 写入的指针时, 不能保证 App 成员的初始化结果对线程 2可见. 这会导致线程 2 得到的对象数据不完整, 造成非常意外的结果.

正确的做法是使用 acquire-release 模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class App {
static std::atomic<App*> instance;
static std::mutex mu;
public:
static App *get_instance() {
auto *p = instance.load(std::memory_order_acquire); // (1)
if (!p) {
std::lock_guard<std::mutex> guard(mu);
if (!(p = instance.load(std::memory_order_relaxed))) { // (2)
p = new App;
instance.store(p, std::memory_order_release); // (3)
}
}
return p;
}
};

这样当线程 2 在操作 (1) 读取到线程 1 在操作 (3) 写入的指针时, 有 (3)“synchronizes-with” (1). 因此线程 1 初始化 App成员的结果对线程 2 可见, 此时线程 2 返回 p不会有任何问题.

操作 (2) 仍然使用 memory_order_relaxed,因为它在锁的保护下, 锁可以保证线程同步, 因此没有问题.

5. 总结

总结一下这几种内存顺序模型:

  • memory_order_relaxed: 最宽松的内存顺序,只保证操作的原子性修改顺序 (modificationorder).
  • memory_order_acquire, memory_order_releasememory_order_acq_rel: 实现 acquire操作release 操作, 如果 acquire 操作读到了release 操作写入的值, 或其 release sequence 写入的值, 则构成synchronizes-with 关系, 进而可以推导出happens-before 的关系.
  • memory_order_consume: 实现 consume操作, 能实现数据依赖相关的同步关系. 如果 consume 操作读到了release 操作写入的值, 或其 release sequence 写入的值, 则构成dependency-ordered before 的关系,对于有数据依赖的操作可以进而推导出 happens-before的关系.
  • memory_order_seq_cst: 加强版的 acquire-release 模型,除了可以实现 synchronizes-with 关系,还保证全局顺序一致.

6. 扩展阅读

对于有些概念, 如 sequenced-before 和 carries dependency,本文只描述了其中几种简单的情况, 并没有给出严谨的定义.在实际应用中我们一般不用考虑它们的严格定义,但如果你需要了解或对其感兴趣, 可以参考 cppreference.com.

关于内存顺序, C++ 还有一些本文没提到的功能, 如内存栅栏 (memoryfences). C++ 的原子变量提供的操作也很多, 本文只提到了其中一部分.对其感兴趣或需要了解的同学可以参考 C++ Concurrency in Action 和cppreference.com. 限于篇幅, 本文的有些内容可能解释地不够详尽,或者定义不够严谨. 如果想了解概念的严格定义, 可以参考 cppreference.com;如果需要详细的讲解, 可以参考 C++ Concurrency in Action.


参考资料:

  • C++ Concurrency in Action: Practical Multithreading, AnthonyWilliams.
  • std::memory_order- cppreference.com
❌
❌