普通视图

发现新文章,点击刷新页面。
昨天以前mghio

Dubbo 中的集群容错

作者 mghio
2025年4月4日 21:28

apache_dubbo.jpg

前言

在微服务架构中,服务间的依赖关系复杂且动态,任何一个服务的故障都可能引发连锁反应,导致系统雪崩。一个好的容错设计可以避免这些问题发生:

  • 服务雪崩效应:单个服务崩溃或响应延迟可能导致调用链上的所有服务被阻塞,最终拖垮整个系统。例如,若服务 A 依赖服务 B,而服务 B 因高负载无法响应,A 的线程池可能被占满,进而影响其他依赖A的服务;

  • 分布式系统的脆弱性:网络抖动、节点宕机、资源耗尽等问题在分布式环境中不可避免。容错机制通过冗余和快速失败策略,确保部分故障不会扩散到整个系统;

  • 服务的可用性低:微服务的目标是提升系统可用性,而容错设计(如故障转移、熔断)是保障服务持续可用的核心手段。例如,通过自动切换健康节点,避免单点故障。

Dubbo 的集群容错机制

在 Dubbo 中,多个 Provider 实例构成一个「集群」。消费者调用时,Dubbo 通过 Cluster 模块实现容错策略的封装和路由,Cluster 模块会根据配置(如 cluster=failover)装配不同的容错策略实现类,对 Directory 中的多个 Invoker 进行处理,返回一个可执行的 Invoker。Dubbo 当前已支持以下 6 种容错策略(在 org.apache.dubbo.rpc.cluster.support 包下):

策略简称实现类名特性使用场景
FailoverFailoverClusterInvoker失败自动重试,默认实现网络不稳定,民登操作
FailfastFailfastClusterInvoker快速失败,不重试响应时间敏感,非幂等
FailsafeFailsafeClusterInvoker失败忽略异常日志记录、监控等非主要场景
FailbackFailbackClusterInvoker失败后后台重试可容忍失败,后续补偿重试
ForkingForkingClusterInvoker并行调用多个节点,最快成功返回实时性要求高,资源充足
BroadcastBroadcastClusterInvoker广播方式调用所有服务提供着配置更新、通知类等操作

Failover Cluster(失败自动切换,默认策略)

实现原理:通过循环重试实现容错。
实现源码关键点:

  1. FailoverClusterInvoker 的 doInvoke 方法中,通过 for 循环控制重试次数(默认重试 2 次,共调用 3 次);
  2. 每次重试前调用 list(invocation) 重新获取最新的 Invoker 列表,确保动态感知节点变化。
1
2
3
4
5
6
7
8
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
for (int i = 0; i < len; i++) {
if (i > 0) {
copyInvokers = list(invocation); // 动态刷新 Invoker 列表
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 调用并处理异常...
}

Failfast Cluster(快速失败)

实现原理:仅发起一次调用,异常直接抛出。
实现源码关键点:

  1. FailfastClusterInvoker 直接调用目标 Invoker,不进行重试。
1
2
3
4
5
6
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailfastClusterInvoker#doInvoke
fpublic Result doInvoke(...) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation); // 仅一次调用
}

Failsafe Cluster(失败安全)

实现原理:异常被捕获后返回空结果,不中断流程。
实现源码关键点:

  1. ailsafeClusterInvoker通过try-catch捕获异常并记录日志。
1
2
3
4
5
6
7
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailsafeClusterInvoker
try {
// 调用逻辑...
} catch (Throwable e) {
logger.error("Failsafe ignore exception", e);
return new RpcResult(); // 返回空结果
}

Failback Cluster(失败自动恢复)

实现原理:失败请求存入队列,定时重试。
实现源码关键点:

  1. 捕获失败异常,使用 RetryTimerTask 存储失败请求,定时触发重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker#doInvoke
private void addFailed(
LoadBalance loadbalance,
Invocation invocation,
List<Invoker<T>> invokers,
Invoker<T> lastInvoker,
URL consumerUrl) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS,
32,
failbackTasks);
}
}
}
RetryTimerTask retryTimerTask = new RetryTimerTask(
loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error(
CLUSTER_TIMER_RETRY_FAILED,
"add newTimeout exception",
"",
"Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage(),
e);
}
}

Forking Cluster(并行调用)

实现原理:并发调用多个节点,首个成功结果即返回。
实现源码关键点:

  1. 使用线程池并发调用,结果通过 BlockingQueue 异步接收。
1
2
3
4
5
6
7
// 代码片段:org.apache.dubbo.rpc.cluster.support.ForkingClusterInvoker#doInvoke
for (Invoker<T> invoker : selected) {
executor.execute(() -> {
Result result = invoker.invoke(invocation);
ref.offer(result); // 结果存入队列
});
}

Broadcast Cluster(广播调用)

实现原理:逐个调用所有节点,任一失败则整体失败。
实现源码关键点:

  1. 遍历所有 Invoker 调用,异常累积后抛出。
1
2
3
4
5
6
7
8
9
// 代码片段:org.apache.dubbo.rpc.cluster.support.BroadcastClusterInvoker#doInvoke
for (Invoker<T> invoker : invokers) {
try {
invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
}
}
if (exception != null) throw exception;

如何自定义集群容错策略

如果以上提供的容错策略不满足需求,Dubbo 支持通过 SPI 自定义 Cluster 实现,步骤如下:

第一步:实现 Cluster 和 AbstractClusterInvoker
1
2
3
4
5
6
7
8
9
@SPI("custom")
public class MyCluster implements Cluster {

@Override
public <T> Invoker<T> join(Directory<T> directory) {
return new MyClusterInvoker<>(directory);
}

}
1
2
3
4
5
6
7
8
public class MyClusterInvoker<T> extends AbstractClusterInvoker<T> {

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) {
// 自定义逻辑,例如条件重试、动态路由等
}

}
第二步:添加 SPI 配置

META-INF/dubbo/org.apache.dubbo.rpc.cluster.Cluster 中添加配置:

1
mycluster=com.example.MyCluster
第三步:配置使用自定义容错策略
1
<dubbo:reference cluster="mycluster" />

总结

建议核心服务优先使用 Failover(失败自动切换) 策略保障可用性,非核心服务可降级为 Failsafe(失败安全)。同时结合 Hystrix(已停止更新)Sentinel 实现熔断与限流,增强容错能力。

通过灵活组合 Dubbo 的容错策略,可显著提升分布式系统的鲁棒性。实际应用配置时需要根据业务特性权衡延迟、资源开销与一致性要求,一切皆是 trade off ~

P.S. 不妨再深入思考一下:Dubbo 的集群容错实现中有哪些优秀设计值得我们学习?

《UNIX 传奇:历史与回忆》读后感

作者 mghio
2024年12月21日 14:19

UNIX-A-History-and-a-Memoir.jpg

《UNIX 传奇:历史与回忆》 是 bwk(Brian W. Kernighan)2019 年的新作,回忆了 UNIX 在大半个世纪的风雨历程,是一本引人入胜的书籍。通过对 UNIX 操作系统的历史和发展进行详细的叙述和回顾,让我对这个操作系统有了更深入的了解。读完这本书,我不仅对 UNIX 的技术细节有了更清晰的认识,也对 UNIX 的影响力和价值有了更深刻的体会。

书中首先回顾了 UNIX 的诞生和发展过程,从贝尔实验室的研究项目到成为世界上最重要的操作系统没有之一,UNIX 经历了漫长而曲折的发展历程。作者通过详细的叙述和丰富的历史资料,将 UNIX 的发展与当时的技术环境和社会背景相结合,深入分析了 UNIX 的成功原因和对计算机科学的影响。

在书里,作者还介绍了 UNIX 的设计原则和哲学思想,如小即是美一切皆文件等,这些原则不仅体现了 UNIX 的简洁和灵活性,也影响了后来的操作系统设计。通过对 UNIX 设计原则的解读,我对 UNIX 的设计理念有了更深入的理解,也对软件设计和开发有了新的思考。

ken-and-den.jpg

上图(来源)中站着的是 dmr(Dennis MacAlistair Ritchie)、坐着打字的是 Ken(Ken Thompson) 和几台 PDP-11。此外,本书还详细介绍了 UNIX 的核心组件和功能,如文件系统、进程管理、网络通信等等。通过对这些功能的解析,会 UNIX 的内部机制有了更深入的了解,也对操作系统的工作原理有了更全面的认识。同时,书中还介绍了 UNIX 的各种衍生版本和相关技术,如 Linux、BSD(加州大学伯克利分校维护的版本)等,这些衍生版本不仅丰富了 UNIX 的功能和应用领域,也推动了开源软件的发展。

书中除了对 UNIX 技术的介绍,还涉及了 UNIX 社区的发展和文化。UNIX 社区以其开放、自由的精神吸引了众多开发者和用户,形成了独特的文化氛围。通过对 UNIX 社区的描述和分析,我对 UNIX 社区的运作方式和价值观有了更深入的了解。UNIX 社区以其开放的开发模式和共享的文化,促进了知识和经验的交流,推动了技术的不断进步。在 UNIX 社区中,人们通过邮件列表、论坛和会议等形式进行交流和合作,共同解决问题、改进软件,形成了一种合作共赢的氛围。

此外,作者还介绍了 UNIX 在商业领域的应用和发展。UNIX 不仅在学术界和科研领域得到广泛应用,也在商业领域取得了巨大成功。通过对 UNIX 商业化的历史和案例的介绍,对 UNIX 在商业环境中的优势和挑战有了更深入的认识。UNIX 的开放性和灵活性使其成为企业 IT 系统的首选,而 UNIX 商业公司的崛起也推动了 UNIX 的发展和推广。

Linux-declaration.jpg

上图是 1991 年 8 月 林纳斯·托瓦兹 的 Linux宣告(图片来源)。在读《UNIX 传奇:历史与回忆》之后,对 UNIX 的重要性和影响力有了更深刻的认识。UNIX 不仅是一种操作系统,更是一种思想和理念的体现。UNIX 的设计原则和开放的开发模式影响了整个计算机科学领域,推动了软件工程的发展。UNIX 的成功不仅在于其技术实力,更在于其背后的开放和合作精神。

然后,本书还通过对 UNIX 历史的回顾和个人经历的叙述,让我感受到了 UNIX 社区的热情和活力。UNIX 社区的成员们对技术的热爱和追求,以及对自由和开放的坚持,让我深受启发。作为一名从事软件开发的人,我深深地感受到了 UNIX 所传递的价值观和精神,这将对我的工作和职业发展产生积极的影响。

读完《UNIX 传奇:历史与回忆》后,我深受感动和启发。这本书不仅让我了解了 UNIX 的历史和技术,也让我感受到了 UNIX 的精神和价值。UNIX 的开放性、灵活性和合作精神,都是我在工作和生活中需要学习和借鉴的地方。

UNIX 的设计哲学「小即是美」,让我明白了在解决问题时,简洁的解决方案往往是最好的。在软件开发中,我们应该尽量避免复杂性,追求简洁和高效。同时,UNIX 的「一切皆文件」原则,也让我明白了抽象和统一的重要性。通过把所有资源都视为文件,UNIX 简化了操作和管理的复杂性,提高了效率和可用性。

Unix_history.png

上图是自 1969 年以来 UNIX 和类 UNIX 系统的演变历史(图片来源)。UNIX 开放源代码和开发模式,也让我认识到了开放和共享的价值。在今天的互联网时代,开放和共享是推动技术和知识进步的重要力量。我们应该积极参与开源社区,共享我们的知识和经验,共同推动技术的发展。

此外,UNIX 社区的活力和热情,也让我深受感动。在 UNIX 社区中,人们无私地分享知识,热情地帮助他人,共同解决问题,这种精神是我需要学习和倡导的。

总的来说,《UNIX 传奇:历史与回忆》是一本非常值得一读的书。它不仅让我了解了 UNIX 的历史和技术,也让我感受到了 UNIX 的精神和价值。

这本书对我来说,既是一次知识的旅行,也是一次精神的洗礼。我相信,这本书对任何对计算机科学和软件开发感兴趣的人,都会有所启发和帮助~

Go 并发模型—Goroutines

作者 mghio
2023年7月9日 13:29

Concurrency_in_Go.png

前言

Goroutines 是 Go 语言主要的并发原语。它看起来非常像线程,但是相比于线程它的创建和管理成本很低。Go 在运行时将 goroutine 有效地调度到真实的线程上,以避免浪费资源,因此您可以轻松地创建大量的 goroutine(例如每个请求一个 goroutine),并且您可以编写简单的,命令式的阻塞代码。因此,Go 的网络代码往往比其它语言中的等效代码更直接,更容易理解(这点从下文中的示例代码可以看出)。

对我来说,goroutine 是将 Go 这门语言与其它语言区分开来的一个主要特征。这就是为什么大家更喜欢用 Go 来编写需要并发的代码。在下面讨论更多关于 goroutine 之前,我们先了解一些历史,这样你就能理解为什么你想要它们了。

基于 fork 和线程

fork_thread.jpeg

高性能服务器需要同时处理来自多个客户端的请求。有很多方法可以设计一个服务端架构来处理这个问题。最容易想到的就是让一个主进程在循环中调用 accept,然后调用 fork 来创建一个处理请求的子进程。这篇 Beej’s Guide to Network Programming 指南中提到了这种方式。

在网络编程中,fork 是一个很好的模式,因为你可以专注于网络而不是服务器架构。但是它很难按照这种模式编写出一个高效的服务器,现在应该没有人在实践中使用这种方式了。

fork 同时也存在很多问题,首先第一个是成本: Linux 上的 fork 调用看起来很快,但它会将你所有的内存标记为 copy-on-write。每次写入 copy-on-write 页面都会导致一个小的页面错误,这是一个很难测量的小延迟,进程之间的上下文切换也很昂贵。

另一个问题是规模: 很难在大量子进程中协调共享资源(如 CPU、内存、数据库连接等)的使用。如果流量激增,并且创建了太多进程,那么它们将相互争夺 CPU。但是如果限制创建的进程数量,那么在 CPU 空闲时,大量缓慢的客户端可能会阻塞每个人的正常使用,这时使用超时机制会有所帮助(无论服务器架构如何,超时设置都是很必要的)。

通过使用线程而不是进程,上面这些问题在一定程度上能得到缓解。创建线程比创建进程更“便宜”,因为它共享内存和大多数其它资源。在共享地址空间中,线程之间的通信也相对容易,使用信号量和其它结构来管理共享资源,然而,线程仍然有很大的成本,如果你为每个连接创建一个新线程,你会遇到扩展问题。与进程一样,你此时需要限制正在运行的线程的数量,以避免严重的 CPU 争用,并且需要使慢速请求超时。创建一个新线程仍然需要时间,尽管可以通过使用线程池在请求之间回收线程来缓解这一问题。

无论你是使用进程还是线程,你仍然有一个难以回答的问题: 你应该创建多少个线程?如果您允许无限数量的线程,客户端可能会用完所有的内存和 CPU,而流量会出现小幅激增。如果你限制服务器的最大线程数,那么一堆缓慢的客户端就会阻塞你的服务器。虽然超时是有帮助的,但它仍然很难有效地使用你的硬件资源。

基于事件驱动

event-driven.png

那么既然无法轻易预测出需要多少线程,当如果尝试将请求与线程解耦时会发生什么呢?如果我们只有一个线程专门用于应用程序逻辑(或者可能是一个小的、固定数量的线程),然后在后台使用异步系统调用处理所有的网络流量,会怎么样?这就是一种 事件驱动 的服务端架构。

事件驱动架构模式是围绕 select 系统调用设计的。后来像 poll 这样的机制已经取代了 select,但是 select 是广为人知的,它们在这里都服务于相同的概念和目的。select 接受一个文件描述符列表(通常是套接字),并返回哪些是准备好读写的。如果所有文件描述符都没有准备好,则选择阻塞,直到至少有一个准备好

1
2
3
4
5
6
7
8
9
10
11
12
#include <sys/select.h>
#include <poll.h>

int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict exceptfds,
struct timeval *restrict timeout);

int poll(struct pollfd *fds,
nfds_t nfds,
int timeout);

为了实现一个事件驱动的服务器,你需要跟踪一个 socket 和网络上被阻塞的每个请求的一些状态。在服务器上有一个单一的主事件循环,它调用 select 来处理所有被阻塞的套接字。当 select 返回时,服务器知道哪些请求可以进行了,因此对于每个请求,它调用应用程序逻辑中的存储状态。当应用程序需要再次使用网络时,它会将套接字连同新状态一起添加回“阻塞”池中。这里的状态可以是应用程序恢复它正在做的事情所需的任何东西: 一个要回调的 closure,或者一个 Promise。

从技术上讲,这些其实都可以用一个线程实现。这里不能谈论任何特定实现的细节,但是像 JavaScript
这样缺乏线程的语言也很好的遵循了这个模型。Node.js 更是将自己描述为“an event-driven JavaScript runtime, designed to build scalable network applications.”

事件驱动的服务器通常比纯粹基于 fork 或线程的服务器更好地利用 CPU 和内存。你可以为每个核心生成一个应用程序线程来并行处理请求。线程不会相互争夺 CPU,因为线程的数量等于内核的数量。当有请求可以进行时,线程永远不会空闲,非常高效。效率如此之高,以至于现在大家都使用这种方式来编写服务端代码。

从理论上讲,这听起来不错,但是如果你编写这样的应用程序代码,就会发现这是一场噩梦。。。具体是什么样的噩梦,取决于你所使用的语言和框架。在 JavaScript 中,异步函数通常返回一个 Promise,你给它附加回调。在 Java gRPC 中,你要处理的是 StreamObserver。如果你不小心,你最终会得到很多深度嵌套的“箭头代码”函数。如果你很小心,你就把函数和类分开了,混淆了你的控制流。不管怎样,你都是在 callback hell 里。

下面是一个 Java gRPC 官方教程 中的一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}

@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}

@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});

try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();

// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}

上面代码官方的初学者教程,它不是一个完整的例子,发送代码是同步的,而接收代码是异步的。在 Java 中,你可能会为你的 HTTP 服务器、gRPC、数据库和其它任何东西处理不同的异步类型,你需要在所有这些服务器之间使用适配器,这很快就会变得一团糟。

同时这里如果使用锁也很危险,你需要小心跨网络调用持有锁。锁和回调也很容易犯错误。例如,如果一个同步方法调用一个返回 ListenableFuture 的函数,然后附加一个内联回调,那么这个回调也需要一个同步块,即使它嵌套在父方法内部。

Goroutines

goroutine.jpg

终于到了我们的主角——goroutines。它是 Go 语言版本的线程。像它语言(比如:Java)中的线程一样,每个 gooutine 都有自己的堆栈。goroutine 可以与其它 goroutine 并行执行。与线程不同,goroutine 的创建成本非常低:它不绑定到 OS 线程上,它的堆栈开始非常小(初始只有 2 K),但可以根据需要增长。当你创建一个 goroutine 时,你实际上是在分配一个 closure,并在运行时将其添加到队列中。

在内部实现中,Go 的运行时有一组执行程序的 OS 线程(通常每个内核一个线程)。当一个线程可用并且一个 goroutine 准备运行时,运行时将这个 goroutine 调度到线程上,执行应用程序逻辑。如果一个运行例程阻塞了像 mutex 或 channel 这样的东西时,运行时将它添加到阻塞的运行 goroutine 集合中,然后将下一个就绪的运行例程调度到同一个 OS 线程上。

这也适用于网络:当一个线程程序在未准备好的套接字上发送或接收数据时,它将其 OS 线程交给调度器。这听起来是不是很熟悉?Go 的调度器很像事件驱动服务器中的主循环。除了仅仅依赖于 select 和专注于文件描述符之外,调度器处理语言中可能阻塞的所有内容。

你不再需要避免阻塞调用,因为调度程序可以有效地利用 CPU。可以自由地生成许多 goroutine(可以每个请求一个!),因为创建它们的成本很低,而且不会争夺 CPU,你不需要担心线程池和执行器服务,因为运行时实际上有一个大的线程池。

简而言之,你可以用干净的命令式风格编写简单的阻塞应用程序代码,就像在编写一个基于线程的服务器一样,但你保留了事件驱动服务器的所有效率优势,两全其美。这类代码可以很好地跨框架组合。你不需要 streamobserver 和 ListenableFutures 之间的这类适配器。

下面让我们看一下来自 Go gRPC 官方教程 的相同示例。可以发现这里的控制流比 Java 示例中的更容易理
解,因为发送和接收代码都是同步的。在这两个 goroutines 中,我们都可以在一个 for 循环中调用 stream.Recv 和stream.Send。不再需要回调、子类或执行器这些东西了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc

虚拟线程

virtual_threads.png

如何你使用 Java 这门语言,到目前为止,你要么必须生成数量不合理的线程,要么必须处理 Java 特有的回调地狱。令人高兴的是,JEP 444 中增加了 virtual threads,这看起来很像 Go 语言中的 goroutine。

创建虚拟线程的成本很低。JVM 将它们调度到平台线程(platform threads,内核中的真实线程)上。平台线程的数量是固定的,一般每个内核一个平台线程。当一个虚拟线程执行阻塞操作时,它会释放它的平台线程,JVM
可能会将另一个虚拟线程调度到它上面。与 gooutine 不同,虚拟线程调度是协作的: 虚拟线程在执行阻塞操作之前不会服从于调度程序。这意味着紧循环可以无限期地保持线程。目前不清楚这是实现限制还是有更深层次的问题。Go 以前也有这个问题,直到 1.14 才实现了完全抢占式调度(可见 GopherCon 2021)。

Java 的虚拟线程现在可以预览,预计在 JDK 21 中成为 stable(官方消息是预计 2023 年 9 月发布)状态。哈哈,很期待到时候能删除大量的 ListenableFutures。每当引入一种新的语言或运行时特性时,都会有一个漫长的迁移过渡期,个人认为 Java 生态系统在这方面还是过于保守了。

抓包分析 TCP 握手和挥手

作者 mghio
2022年11月6日 12:51

cover.jpg

前言

首先需要明确的是 TCP 是一个可靠传输协议,它的所有特点最终都是为了这个可靠传输服务。在网上看到过很多文章讲 TCP 连接的三次握手和断开连接的四次挥手,但是都太过于理论,看完感觉总是似懂非懂。反复思考过后,觉得我自己还是偏工程型的人,要学习这些理论性的知识,最好的方式还是要通过实际案例来理解,这样才会具象深刻。本文通过 Wireshark 抓包来分析 TCP 三次握手四次挥手,如果你也对这些理论感觉似懂非懂,那么强烈建议你也结合抓包实践来强化理解这些理论性的知识。

三次握手

TCP 建立连接的三次握手是连接的双方协商确认一些信息(Sequence number、Maximum Segment Size、Window Size 等),Sequence number 有两个作用:一个是 SYN 标识位为 1 时作为初始序列号(ISN),则实际第一个数据字节的序列号和相应 ACK 中的确认号就是这个序列号加 1;另一个是 SYN 标识位为 0 时,则是当前会话的 segment(传输层叫 segment,网络层叫 packet,数据链路层叫 frame)的第一个数据字节的累积序列号。Maximum Segment Size 简称 MSS,表示最大一个 segment 中能传输的信息(不含 TCP、IP 头部)。Window Size 表示发送方接收窗口的大小。下面看看我在本地访问博客 mghio 的三次握手过程:

three-way-hand-shake.jpg

图中三个小红框表示与服务器建立连接的三次握手。

  1. 第一步,client 端(这个示例也就是浏览器)发送 SYN 到 server 端;
  2. 第二步,server 端收到 SYN 消息后,回复 SYN + ACK 到client 端,ACK 表示已经收到了 client 的 SYN 消息;
  3. 第三步,client 端收到回复 SYN + ACK 后,也回复一个 ACK 表示收到了 server 端的 SYN + ACK 了,其实

到这一步,client 端的 60469 端口已经是 ESTABLISHED 状态了。
可以看到,其实三次握手的核心目的就是双方互相告知对象自己的 Sequence number,蓝框是 client 端的初始 Sequence number 和 client 端回复的 ACK,绿框是 server 端的初始 Sequence number 和 client 端回复的 ACK。这样协商好初始 Sequence number 后,发送数据包时发送端就可以判断丢包和进行丢包重传了。

三次握手还有一个目的是协商一些信息(上图中黄色方框是 Maximum Segment Size,粉色方框是 Window Size)。

three-way-hand-shake-dg.jpg

到这里,就可以知道平常所说的建立TCP连接本质是为了实现 TCP 可靠传输做的前置准备工作,实际上物理层并没有这个连接在那里。TCP 建立连接之后时拥有和维护一些状态信息,这个状态信息就包含了 Sequence number、MSS、Window Size 等,TCP 握手就是协商出来这些初始值。而这些状态才是我们平时所说的 TCP 连接的本质。因为这个太重要了,我还要再次强调一下,TCP 是一个可靠传输协议,它的所有特点最终都是为了这个可靠传输服务

四次挥手

下面再来看看,当关闭浏览器页面是发生断开连接的四次挥手过程:

tcp-close-sequence.jpg

相信你已经发现了,上图抓包抓到的不是四次挥手,而是三次挥手,这是为何呢?

这是由于 TCP 的时延机制(因为系统内核并不知道应用能不能立即关闭),当被挥手端(这里是 server 的 443 端口)第一次收到挥手端(这里是 client 的 63612 端口)的 FIN 请求时,并不会立即发送 ACK,而是会经过一段延迟时间后再发送,但是此时被挥手端也没有数据发送,就会向挥手端发送 FIN 请求,这里就可能造成被挥手端发送的 FIN 与 ACK 一起被挥手端收到,导致出现第二、三次挥手合并为一次的现象,也就最终呈现出“三次挥手”的情况。

断开连接四次挥手分为如下四步(假设没有出现挥手合并的情况):

  1. 第一步,client 端主动发送 FIN 包给 server 端;
  2. 第二步,server 端回复 ACK(对应第一步 FIN 包的 ACK)给 client,表示 server 知道 client 端要断开了;
  3. 第三步,server 端发送 FIN 包给 client 端,表示 server 端也没有数据要发送了,可以断开了;
  4. 第四步,client 端回复 ACK 包给 server 端,表示既然双发都已发送 FIN 包表示可以断开,那么就真的断开了啊。

下面是 TCP 连接流转状态图(其中 CLOSED 状态是虚拟的,实际上并不存在),这个图很重要,记住这个图后基本上所有的 TCP 网络问题就可以解决。

tcp_state_diagram.png

其中比较难以理解的是 TIME_WAIT 状态,主动关闭的那一端会经历这个状态。这一端停留在这个状态的最长时间是 Maximum segment lifetime(MSL)的 2 倍,大部分时候被简称之为 2MSL。存在 TIME_WAIT 状态有如下两个原因:

  1. 要可靠的实现 TCP 全双工连接终止;
  2. 让老的重复 segment 在网络中消失(一个 sement 在网络中存活的最长时间为 1 个 MSL,一来一回就是 2 MSL);

为什么握手是三次,而挥手是四次?

嘿嘿,这是个经典的面试题,其实大部分人都背过挥手是四次的原因:因为 TCP 是全双工(双向)的,所以回收需要四次……。但是再反问下:握手也是双向的,但是为什么是只要三次呢?

网上流传的资料都说 TCP 是双向的,所以回收需要四次,但是握手也是双向(握手双方都在告知对方自己的初始 Sequence number),那么为什么就不用四次握手呢?所以凡事需要多问几个为什么,要有探索和怀疑精神。

你再仔细回看上面三次握手的第二步(SYN + ACK),其实是可以拆分为两步的:第一步回复 ACK,第二步再发 SYN 也是完全可以的,只是效率会比较低,这样的话三次握手不也变成四次握手了。

看起来四次挥手主要是收到第一个 FIN 包后单独回复了一个 ACK 包这里多了一次,如果能像握手那样也回复 FIN + ACK 那么四次挥手也就变成三次了。这里再贴一下上面这个挥手的抓包图:

tcp-close-sequence.jpg

这个图中第二个红框就是 server 端回复的 FIN + ACK 包,这样四次挥手变成三次了(如果一个包算一次的话)。这里使用四次挥手原因主要是:被动关闭端在收到 FIN 后,知道主动关闭端要关闭了,然后系统内核层会通知应用层要关闭,此时应用层可能还需要做些关闭前的准备工作,可能还有数据没发送完,所以系统内核先回复一个 ACK 包,然后等应用层准备好了主动调 close 关闭时再发 FIN 包。

而握手过程中就没有这个准备过程了,所以可以立即发送 SYN + ACK(在这里的两步合成一步了,提高效率)。挥手过程中系统内核在收到对方的 FIN 后,只能 ACK,不能主动替应用来 FIN,因为系统内核并不知道应用能不能立即关闭。

总结

TCP 是一个很复杂的协议,为了实现可靠传输以及处理各种网络传输中的 N 多问题,有一些很经典的解决方案,比如其中的网络拥塞控制算法、滑动窗口、数据重传等。强烈建议你去读一下 rfc793TCP/IP 详解 卷1:协议 这本书。

如果你是那些纯看理论就能掌握好一门技能,然后还能举三反一的人,那我很佩服你;如果不是,那么学习理论知识注意要结合实践来强化理解理论,要经过反反复复才能比较好地掌握一个知识,讲究技巧,必要时要学会通过工具来达到目的。

最后 TCP 所有特性基本上核心都是为了实现可靠传输这个目标来服务的,然后有一些是出于优化性能的目的。

Spring 中 @EnableXXX 注解的套路

作者 mghio
2022年6月5日 16:29

cover.jpg

前言

在 Spring 框架中有很多实用的功能,不需要写大量的配置代码,只需添加几个注解即可开启。 其中一个重要原因是那些 @EnableXXX 注解,它可以让你通过在配置类加上简单的注解来快速地开启诸如事务管理(@EnableTransactionManagement)、Spring MVC(@EnableWebMvc)或定时任务(@EnableScheduling)等功能。这些看起来简单的注解语句提供了很多功能,但它们的内部机制从表面上看却不太明显。 一方面,对于使用者来说用这么少的代码获得这么多实用的功能是很好的,但另一方面,如果你不了解某个东西的内部是如何工作的,就会使调试和解决问题更加困难。

设计目标

Spring 框架中那些 @EnableXXX 注解的设计目标是允许用户用最少的代码来开启复杂使用的功能。 此外,用户必须能够使用简单的默认值,或者允许手动配置该代码。最后,代码的复杂性要向框架使用者隐藏掉。 简而言之,让使用者设置大量的 Bean,并选择性地配置它们,而不必知道这些 Bean 的细节(或真正被设置的内容)。下面来看看具体的几个例子:

@EnableScheduling (导入一个 @Configuration 类)

首先要知道的是,@EnableXXX 注解并不神奇。实际上在 BeanFactory 中并不知道这些注解的具体内容,而且在 BeanFactory 类中,核心功能和特定注解(如 @EnableWebMvc)或它们所存放的 jar 包(如 spring-web)之间没有任何依赖关系。 让我们看一下 @EnableScheduling,下面看看它是如何工作的。 定义一个 SchedulingConfig 配置类,如下所示:

1
2
3
4
5
@Configuration
@EnableScheduling
public class SchedulingConfig {
// some bean in here
}

上面的内容没有什么特别之处。只是一个用 @EnableScheduling 注释的标准 Java 配置。@EnableScheduling 让你以设定的频率执行某些方法。例如,你可以每 10 分钟运行 BankService.transferMoneyToMghio()。 @EnableScheduling 注解源码如下:

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

上面的 EnableScheduling 注解,我们可以看到它只是一个标准的类级注解(@Target/@Retention),应该包含在 JavaDocs 中(@Documented),但是它有一个 Spring 特有的注解(@Import)。 @Import 是将一切联系起来的关键。 在这种情况下,由于我们的 SchedulingConfig 被注解为 @EnableScheduling,当 BeanFactory 解析文件时(内部是ConfigurationClassPostProcessor 在解析它),它也会发现 @Import(SchedulingConfiguration.class) 注解,它将导入该值中定义的类。 在这个注解中,就是 SchedulingConfiguration。

这里导入是什么意思呢?在这种情况下,它只是被当作另一个 Spring Bean。 SchedulingConfiguration 实际上被注解为@Configuration,所以 BeanFactory 会把它看作是另一个配置类,所有在该类中定义的 Bean 都会被拉入你的应用上下文,就像你自己定义了另一个 @Configuration 类一样。 如果我们检查 SchedulingConfiguration,我们可以看到它只定义了一个Bean(一个Post Processor),它负责我们上面描述的调度工作,源码如下:

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}

}

也许你会问,如果想配置 SchedulingConfiguration 中定义的 bean 呢? 这里也只是在处理普通的Bean。 所以你对其它 Bean 所使用的机制也适用于此。 在这种情况下,ScheduledAnnotationBeanPostProcessor 使用一个标准的 Spring Bean 生命周期(postProcessAfterInitialization)来发现应用程序上下文何时被刷新。 当符合条件时,它会检查是否有任何 Bean 实现了 SchedulingConfigurer,如果有,就使用这些 Bean 来配置自己。 其实这一点并不明细(在 IDE 中也不太容易找到),但它与 BeanFactory 是完全分离的,而且是一个相当常见的模式,一个 Bean 被用来配置另一个 Bean。 而现在我们可以把所有的点连接起来,它(在某种程度上)很容易找到(你可以 Google 一下文档或阅读一下 JavaDocs)。

@EnableTransactionManagement(导入一个 ImportSelector)

在上一个示例中,我们讨论了像 @EnableScheduling 这样的注解如何使用 @Import 来导入另一个 @Configuration 类并使其所有的 Bean 对你的应用程序可用(和可配置)。但是如果你想根据某些配置加载不同的 Bean 集,会发生什么呢? @EnableTransactionManagement 就是一个很好的例子。TransactioConfig 定义如下:

1
2
3
4
5
@Configuration
@EnableTransactionManagement(mode = AdviceMode.ASPECTJ)
public class TransactioConfig {
// some bean in here
}

再一次,上面没有什么特别之处。只是一个用@EnableTransactionManagement注释的标准Java配置。唯一与之前的例子有些不同的是,用户为注释指定了一个参数(mode=AdviceMode.ASPECTJ)。 @EnableTransactionManagement注解本身看起来像这样。

1
2
3
4
5
6
7
8
9
10
11
12
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

boolean proxyTargetClass() default false;

AdviceMode mode() default AdviceMode.PROXY;

int order() default Ordered.LOWEST_PRECEDENCE;
}

和前面一样,一个相当标准的注解,尽管这次它有一些参数。 然而,正如前文提到,@Import 注解是将一切联系在一起的关键,这一点再次得到证实。 但区别在于,这次我们导入的是 TransactionManagementConfigurationSelector 这个类,通过源码可以发现,其实它不是一个被 @Configuration 注解的类。 TransactionManagementConfigurationSelector 是一个实现ImportSelector 的类。 ImportSelector 的目的是让你的代码选择在运行时加载哪些配置类。 它有一个方法,接收关于注解的一些元数据,并返回一个类名数组。 在这种情况下,TransactionManagementConfigurationSelector 会查看模式并根据模式返回一些类。其中的 selectImports 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}

这些类中的大多数是 @Configuration(例如 ProxyTransactionManagementConfiguration),通过前文介绍我们知道它们会像前面一样工作。 对于 @Configuration 类,它们被加载和配置的方式与我们之前看到的完全一样。 所以简而言之,我们可以使用 @Import 和 @Configuration 类来加载一套标准的 Bean,或者使用 @Import 和 ImportSelector 来加载一套在运行时决定的 Bean。

@EnableAspectJAutoProxy (在 Bean 定义层导入)

@Import 支持的最后一种情况,即当你想直接处理 BeanRegistry(工厂)时。如果你需要操作Bean Factory或者在Bean定义层处理Bean,那么这种情况就适合你,它与上面的情况非常相似。 你的 AspectJProxyConfig 可能看起来像。

1
2
3
4
5
@Configuration
@EnableAspectJAutoProxy
public class AspectJProxyConfig {
// some bean in here
}

再一次,上面定义没有什么特别的东西。只是一个用 @EnableAspectJAutoProxy 注释的标准 Java 配置。 下面是@EnableAspectJAutoProxy 的源代码。

1
2
3
4
5
6
7
8
9
10
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AspectJAutoProxyRegistrar.class)
public @interface EnableAspectJAutoProxy {

boolean proxyTargetClass() default false;

boolean exposeProxy() default false;
}

和前面一样,@Import 是关键,但这次它指向 AspectJAutoProxyRegistrar,它既没有 @Configuration 注解,也没有实现 ImportSelector 接口。 这次使用的是实现了 ImportBeanDefinitionRegistrar。 这个接口提供了对 Bean 注册中心(Bean Registry)和注解元数据的访问,因此我们可以在运行时根据注解中的参数来操作 Bean 注册表。 如果你仔细看过前面的示例,你可以看到我们忽略的类也是 ImportBeanDefinitionRegistrar。 在 @Configuration 类不够用的时候,这些类会直接操作 BeanFactory。

所以现在我们已经涵盖了 @EnableXXX 注解使用 @Import 将各种 Bean 引入你的应用上下文的所有不同方式。 它们要么直接引入一组 @Configuration 类,这些类中的所有 Bean 都被导入到你的应用上下文中。 或者它们引入一个 ImportSelector 接口实现类,在运行时选择一组 @Configuration 类并将这些 Bean 导入到你的应用上下文中。 最后,他们引入一个ImportBeanDefinitionRegistrars,可以直接与 BeanFactory 在 BeanDefinition 级别上合作。

结论

总的来说,个人认为这种将 Bean 导入应用上下文的方法很好,因为它使框架使用者的使用某个功能非常容易。不幸的是,它模糊了如何找到可用的选项以及如何配置它们。 此外,它没有直接利用 IDE 的优势,所以很难知道哪些 Bean 正在被创建(以及为什么)。 然而,现在我们知道了 @Import 注解,我们可以使用 IDE 来挖掘一下每个注解及其相关的配置类,并了解哪些 Bean 正在被创建,它们如何被添加到你的应用上下文中,以及如何配置它们。 希望对你有帮助~

Java 内存模型

作者 mghio
2022年4月15日 16:24

cover.jpg

前言

在并发编程中,当多个线程同时访问同一个共享的可变变量时,会产生不确定的结果,所以要编写线程安全的代码,其本质上是对这些可变的共享变量的访问操作进行管理。导致这种不确定结果的原因就是可见性有序性原子性问题,Java 为解决可见性和有序性问题引入了 Java 内存模型,使用互斥方案(其核心实现技术是)来解决原子性问题。这篇先来看看解决可见性、有序性问题的 Java 内存模型(JMM)。

什么是 Java 内存模型

Java 内存模型在维基百科上的定义如下:

The Java memory model describes how threads in the Java programming language interact through memory. Together with the description of single-threaded execution of code, the memory model provides the semantics of the Java programming language.

内存模型限制的是共享变量,也就是存储在堆内存中的变量,在 Java 语言中,所有的实例变量、静态变量和数组元素都存储在堆内存之中。而方法参数、异常处理参数这些局部变量存储在方法栈帧之中,因此不会在线程之间共享,不会受到内存模型影响,也不存在内存可见性问题。

通常,在线程之间的通讯方式有共享内存和消息传递两种,很明显,Java 采用的是第一种即共享的内存模型,在共享的内存模型里,多线程之间共享程序的公共状态,通过读-写内存的方式来进行隐式通讯。

从抽象的角度来看,JMM 其实是定义了线程和主内存之间的关系,首先,多个线程之间的共享变量存储在主内存之中,同时每个线程都有一个自己私有的本地内存,本地内存中存储着该线程读或写共享变量的副本(注意:本地内存是 JMM 定义的抽象概念,实际上并不存在)。抽象模型如下图所示:

1.png

在这个抽象的内存模型中,在两个线程之间的通信(共享变量状态变更)时,会进行如下两个步骤:

  1. 线程 A 把在本地内存更新后的共享变量副本的值,刷新到主内存中。
  2. 线程 B 在使用到该共享变量时,到主内存中去读取线程 A 更新后的共享变量的值,并更新线程 B 本地内存的值。

JMM 本质上是在硬件(处理器)内存模型之上又做了一层抽象,使得应用开发人员只需要了解 JMM 就可以编写出正确的并发代码,而无需过多了解硬件层面的内存模型。

为什么需要 Java 内存模型

在日常的程序开发中,为一些共享变量赋值的场景会经常碰到,假设一个线程为整型共享变量 count 做赋值操作(count = 9527;),此时就会有一个问题,其它读取该共享变量的线程在什么情况下获取到的变量值为 9527 呢?如果缺少同步的话,会有很多因素导致其它读取该变量的线程无法立即甚至是永远都无法看到该变量的最新值。

比如缓存就可能会改变写入共享变量副本提交到主内存的次序,保存在本地缓存的值,对于其它线程是不可见的;编译器为了优化性能,有时候会改变程序中语句执行的先后顺序,这些因素都有可能会导致其它线程无法看到共享变量的最新值。

在文章开头,提到了 JMM 主要是为了解决可见性有序性问题,那么首先就要先搞清楚,导致可见性有序性问题发生的本质原因是什么?现在的服务绝大部分都是运行在多核 CPU 的服务器上,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据就会有一致性问题了,当一个线程对共享变量的修改,另外一个线程无法立刻看到。导致可见性问题的本质原因是缓存

2.png

有序性是指代码实际的执行顺序和代码定义的顺序一致,编译器为了优化性能,虽然会遵守 as-if-serial 语义(不管怎么重排序,在单线程下的执行结果不能改变),不过有时候编译器及解释器的优化也可能引发一些问题。比如:双重检查来创建单实例对象。下面是使用双重检查来实现延迟创建单例对象的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author mghio
* @since 2021-08-22
*/
public class DoubleCheckedInstance {

private static DoubleCheckedInstance instance;

public static DoubleCheckedInstance getInstance() {
if (instance == null) {
synchronized (DoubleCheckedInstance.class) {
if (instance == null) {
instance = new DoubleCheckedInstance();
}
}
}

return instance;
}

}

这里的 instance = new DoubleCheckedInstance();,看起来 Java 代码只有一行,应该是无法就行重排序的,实际上其编译后的实际指令是如下三步:

  1. 分配对象的内存空间
  2. 初始化对象
  3. 设置 instance 指向刚刚已经分配的内存地址

上面的第 2 步和第 3 步如果改变执行顺序也不会改变单线程的执行结果,也就是说可能会发生重排序,下图是一种多线程并发执行的场景:

3.png

此时线程 B 获取到的 instance 是没有初始化过的,如果此来访问 instance 的成员变量就可能触发空指针异常。导致有序性问题的本质原因是编译器优化。那你可能会想既然缓存和编译器优化是导致可见性问题和有序性问题的原因,那直接禁用掉不就可以彻底解决这些问题了吗,但是如果这么做了的话,程序的性能可能就会受到比较大的影响了。

其实可以换一种思路,能不能把这些禁用缓存和编译器优化的权利交给编码的工程师来处理,他们肯定最清楚什么时候需要禁用,这样就只需要提供按需禁用缓存和编译优化的方法即可,使用比较灵活。因此Java 内存模型就诞生了,它规范了 JVM 如何提供按需禁用缓存和编译优化的方法,规定了 JVM 必须遵守一组最小的保证,这个最小保证规定了线程对共享变量的写入操作何时对其它线程可见。

顺序一致性内存模型

顺序一致性模型是一个理想化后的理论参考模型,处理器和编程语言的内存模型的设计都是参考的顺序一致性模型理论。其有如下两大特性:

  1. 一个线程中的所有操作必须按照程序的顺序来执行
  2. 所有的线程都只能看到一个单一的执行操作顺序,不管程序是否同步

在工程师视角下的顺序一致性模型如下:

4.png

顺序一致性模型有一个单一的全局内存,这个全局内存可以通过左右摇摆的开关可以连接到任意一个线程,每个线程都必须按照程序的顺序来执行内存的读和写操作。该理想模型下,任务时刻都只能有一个线程可以连接到内存,当多个线程并发执行时,就可以通过开关就可以把多个线程的读和写操作串行化

顺序一致性模型中,所有操操作完全按照顺序串行执行,但是在 JMM 中就没有这个保证了,未同步的程序在 JMM 中不仅程序的执行顺序是无序的,而且由于本地内存的存在,所有线程看到的操作顺序也可能会不一致,比如一个线程把写共享变量保存在本地内存中,在还没有刷新到主内存前,其它线程是不可见的,只有更新到主内存后,其它线程才有可能看到。

JMM 对在正确同步的程序做了顺序一致性的保证,也就是程序的执行结果和该程序在顺序一致性内存模型中的执行结果相同。

Happens-Before 规则

Happens-Before 规则是 JMM 中的核心概念,Happens-Before 概念最开始在 这篇论文 提出,其在论文中使用 Happens-Before 来定义分布式系统之间的偏序关系。在 JSR-133 中使用 Happens-Before 来指定两个操作之间的执行顺序。

JMM 正是通过这个规则来保证跨线程的内存可见性,Happens-Before 的含义是前面一个对共享变量的操作结果对该变量的后续操作是可见的,约束了编译器的优化行为,虽然允许编译器优化,但是优化后的代码必须要满足 Happens-Before 规则,这个规则给工程师做了这个保证:同步的多线程程序是按照 Happens-Before 指定的顺序来执行的。目的就是为了在不改变程序(单线程或者正确同步的多线程程序)执行结果的前提下,尽最大可能的提高程序执行的效率

5.png

JSR-133 规范中定了如下 6 项 Happens-Before 规则:

  1. 程序顺序规则:一个线程中的每个操作,Happens-Before 该线程中的任意后续操作
  2. 监视器锁规则:对一个锁的解锁操作,Happens-Before 于后面对这个锁的加锁操作
  3. volatile 规则对一个 volatile 类型的变量的写操作,Happens-Before 与任意后面对这个 volatile 变量的读操作
  4. 传递性规则:如果操作 A Happens-Before 于操作 B,并且操作 B Happens-Before 于操作 C,则操作 A Happens-Before 于操作 C
  5. start() 规则:如果一个线程 A 执行操作 threadB.start() 启动线程 B,那么线程 A 的 start() 操作 Happens-Before 于线程 B 的任意操作
  6. join() 规则:如果线程 A 执行操作 threadB.join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于线程 A 从 threadB.join() 操作成功返回

JMM 的一个基本原则是:只要不改变单线程和正确同步的多线程的执行结果,编译器和处理器随便怎么优化都可以,实际上对于应用开发人员对于两个操作是否真的被重排序并不关心,真正关心的是执行结果不能被修改。因此 Happens-Before 本质上和 sa-if-serial 的语义是一致的,只是 sa-if-serial 只是保证在单线程下的执行结果不被改变。

总结

本文主要介绍了内存模型的相关基础知识和相关概念,JMM 屏蔽了不同处理器内存模型之间的差异,在不同的处理器平台上给应用开发人员抽象出了统一的 Java 内存模型(JMM)。常见的处理器内存模型比 JMM 的要弱,因此 JVM 会在生成字节码指令时在适当的位置插入内存屏障(内存屏障的类型会因处理器平台而有所不同)来限制部分重排序。

Java 异步编程的几种方式

作者 mghio
2022年2月15日 16:21

cover.jpg

前言

异步编程是让程序并发运行的一种手段。它允许多个事情同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行,当方法执行完成时通知给主线程根据需要获取其执行结果或者失败异常的原因。使用异步编程可以大大提高我们程序的吞吐量,可以更好的面对更高的并发场景并更好的利用现有的系统资源,同时也会一定程度上减少用户的等待时间等。本文我们一起来看看在 Java 语言中使用异步编程有哪些方式。

Thread 方式

Java 语言中最简单使用异步编程的方式就是创建一个 Thread 来实现,如果你使用的 JDK 版本是 8 以上的话,可以使用 Lambda 表达式 会更加简洁。为了能更好的体现出异步的高效性,下面提供同步版本和异步版本的示例作为对照:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* @author mghio
* @since 2021-08-01
*/
public class SyncWithAsyncDemo {

public static void doOneThing() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("doOneThing ---->>> success");
}

public static void doOtherThing() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("doOtherThing ---->>> success");
}

public synchronized static void main(String[] args) throws InterruptedException {
StopWatch stopWatch = new StopWatch("SyncWithAsyncDemo");
stopWatch.start();

// 同步调用版本
// testSynchronize();

// 异步调用版本
testAsynchronize();

stopWatch.stop();
System.out.println(stopWatch);
}

private static void testAsynchronize() throws InterruptedException {
System.out.println("-------------------- testAsynchronize --------------------");

// 创建一个线程执行 doOneThing
Thread doOneThingThread = new Thread(SyncWithAsyncDemo::doOneThing, "doOneThing-Thread");
doOneThingThread.start();

doOtherThing();
// 等待 doOneThing 线程执行完成
doOneThingThread.join();
}

private static void testSynchronize() {
System.out.println("-------------------- testSynchronize --------------------");

doOneThing();
doOtherThing();
}

}

同步执行的运行如下:

1.png

注释掉同步调用版本的代码,得到异步执行的结果如下:

2.png

从两次的运行结果可以看出,同步版本耗时 4002 ms,异步版本执行耗时 2064 ms,异步执行耗时减少将近一半,可以看出使用异步编程后可以大大缩短程序运行时间。

上面的示例的异步线程代码在 main 方法内开启了一个线程 doOneThing-Thread 用来异步执行 doOneThing 任务,在这时该线程与 main 主线程并发运行,也就是任务 doOneThing 与任务 doOtherThing 并发运行,则等主线程运行完 doOtherThing 任务后同步等待线程 doOneThing 运行完毕,整体还是比较简单的。

但是这个示例只能作为示例使用,如果用到了生产环境发生事故后果自负,使用上面这种 Thread 方式异步编程存在两个明显的问题。

  1. 创建线程没有复用。我们知道频繁的线程创建与销毁是需要一部分开销的,而且示例里也没有限制线程的个数,如果使用不当可能会把系统线程用尽,从而引发事故,这个问题使用线程池可以解决。
  2. 异步任务无法获取最终的执行结果。示例中的这种方式是满足不了的,这时候就需要使用下面介绍的第二种 FutureTask 的方式了。

FutureTask 方式

JDK 1.5 开始,引入了 Future 接口和实现 Future 接口的 FutureTask 类来表示异步计算结果。这个 FutureTask 类不仅实现了 Future 接口还实现了 Runnable 接口,表示一种可生成结果的 Runnable。其可以处于这三种状态:

  • 未启动 当创建一个 FutureTask 没有执行 FutureTask.run() 方法之前
  • 已启动FutureTask.run() 方法执行的过程中
  • 已完成FutureTask.run() 方法正常执行结果或者调用了 FutureTask.cancel(boolean mayInterruptIfRunning) 方法以及在调用 FutureTask.run() 方法的过程中发生异常结束后

FutureTask 类实现了 Future 接口的开启和取消任务、查询任务是否完成、获取计算结果方法。要获取 FutureTask 任务的结果,我们只能通过调用 getXXX() 系列方法才能获取,当结果还没出来时候这些方法会被阻塞,同时这了任务可以是 Callable 类型(有返回结果),也可以是 Runnable 类型(无返回结果)。我们修改上面的示例把两个任务方法修改为返回 String 类型,使用 FutureTask 的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void testFutureTask() throws ExecutionException, InterruptedException {
System.out.println("-------------------- testFutureTask --------------------");

// 创建一个 FutureTask(doOneThing 任务)
FutureTask<String> futureTask = new FutureTask<>(FutureTaskDemo::doOneThing);
// 使用线程池执行 doOneThing 任务
ForkJoinPool.commonPool().execute(futureTask);

// 执行 doOtherThing 任务
String doOtherThingResult = doOtherThing();

// 同步等待线程执行 doOneThing 任务结束
String doOneThingResult = futureTask.get();

// 任务执行结果输出
System.out.println("doOneThingResult ---->>> " + doOneThingResult);
System.out.println("doOtherThingResult ---->>> " + doOtherThingResult);
}

使用 FutureTask 异步编程方式的耗时和上面的 Thread 方式是差不多的,其本质都是另起一个线程去做 doOneThing 任务然后等待返回,运行结果如下:

3.png

这个示例中,doOneThingdoOtherThing 都是有返回值的任务(都返回 String 类型结果),我们在主线程 main 中创建一个异步任务 FutureTask 来执行 doOneThing,然后使用 ForkJoinPool.commonPool() 创建线程池(有关 ForkJoinPool 的介绍见 这里),然后调用了线程池的 execute 方法把 futureTask 提交到线程池来执行。

通过示例可以看到,虽然 FutureTask 提供了一些方法让我们获取任务的执行结果、任务是否完成等,但是使用还是比较复杂,在一些较为复杂的场景(比如多个 FutureTask 之间的关系表示)的编码还是比较繁琐,还是当我们调用 getXXX() 系列方法时还是会在任务执行完毕前阻塞调用线程,达不到异步编程的效果,基于这些问题,在 JDK 8 中引入了 CompletableFuture 类,下面来看看如何使用 CompletableFuture 来实现异步编程。

CompletableFuture 方式

JDK 8 中引入了 CompletableFuture 类,实现了 FutureCompletionStage 接口,为异步编程提供了一些列方法,如 supplyAsyncrunAsyncthenApplyAsync 等,除此之外 CompletableFuture 还有一个重要的功能就是可以让两个或者多个 CompletableFuture 进行运算来产生结果。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* @author mghio
* @since 2021-08-01
*/
public class CompletableFutureDemo {

public static CompletableFuture<String> doOneThing() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "doOneThing";
});
}

public static CompletableFuture<String> doOtherThing(String parameter) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return parameter + " " + "doOtherThing";
});
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
StopWatch stopWatch = new StopWatch("CompletableFutureDemo");
stopWatch.start();

// 异步执行版本
testCompletableFuture();

stopWatch.stop();
System.out.println(stopWatch);
}

private static void testCompletableFuture() throws InterruptedException, ExecutionException {
// 先执行 doOneThing 任务,后执行 doOtherThing 任务
CompletableFuture<String> resultFuture = doOneThing().thenCompose(CompletableFutureDemo::doOtherThing);

// 获取任务结果
String doOneThingResult = resultFuture.get();

// 获取执行结果
System.out.println("DoOneThing and DoOtherThing execute finished. result = " + doOneThingResult);
}

}

执行结果如下:

4.png

在主线程 main 中首先调用了方法 doOneThing() 方法开启了一个异步任务,并返回了对应的 CompletableFuture 对象,我们取名为 doOneThingFuture,然后在 doOneThingFuture 的基础上使用 CompletableFuturethenCompose() 方法,让 doOneThingFuture 方法执行完成后,使用其执行结果作为 doOtherThing(String parameter) 方法的参数创建的异步任务返回。

我们不需要显式使用 ExecutorService,在 CompletableFuture 内部使用的是 Fork/Join 框架异步处理任务,因此,它使我们编写的异步代码更加简洁。此外,CompletableFuture 类功能很强大其提供了和很多方便的方法,更多关于 CompletableFuture 的使用请见 这篇

总结

本文介绍了在 Java 中的 JDK 使用异步编程的三种方式,这些是我们最基础的实现异步编程的工具,在其之上的还有 Guava 库提供的 ListenableFutureFutures 类以及 Spring 框架提供的异步执行能力,使用 @Async 等注解实现异步处理,感兴趣的话可以自行学习了解。

Java 并发之 Fork/Join 框架

作者 mghio
2022年1月5日 16:19

cover.jpg

什么是 Fork/Join 框架

Fork/Join 框架是一种在 JDK 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务。通过其命名也很容易看出框架主要分为 ForkJoin 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果。

这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个更小的小任务分别计算,这个过程可以反复的拆分成一系列小任务。Fork/Join 框架是一种基于 分治 的算法,通过拆分大任务成多个独立的小任务,然后并行执行这些小任务,最后合并小任务的结果得到大任务的最终结果,通过并行计算以提高效率。

Fork/Join 框架使用示例

下面通过一个计算列表中所有元素的总和的示例来看看 Fork/Join 框架是如何使用的,总的思路是:将这个列表分成许多子列表,然后对每个子列表的元素进行求和,然后,我们再计算所有这些值的总和就得到原始列表的和了。Fork/Join 框架中定义了 ForkJoinTask 来表示一个 Fork/Join 任务,其提供了 fork()join() 等操作,通常情况下,我们并不需要直接继承这个 ForkJoinTask 类,而是使用框架提供的两个 ForkJoinTask 的子类:

  • RecursiveAction 用于表示没有返回结果Fork/Join 任务。
  • RecursiveTask 用于表示有返回结果Fork/Join 任务。

很显然,在这个示例中是需要返回结果的,可以定义 SumAction 类继承自 RecursiveTask,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* @author mghio
* @since 2021-07-25
*/
public class SumTask extends RecursiveTask<Long> {

private static final int SEQUENTIAL_THRESHOLD = 50;

private final List<Long> data;

public SumTask(List<Long> data) {
this.data = data;
}

@Override
protected Long compute() {
if (data.size() <= SEQUENTIAL_THRESHOLD) {
long sum = computeSumDirectly();
System.out.format("Sum of %s: %d\n", data.toString(), sum);
return sum;
} else {
int mid = data.size() / 2;
SumTask firstSubtask = new SumTask(data.subList(0, mid));
SumTask secondSubtask = new SumTask(data.subList(mid, data.size()));
// 执行子任务
firstSubtask.fork();
secondSubtask.fork();
// 等待子任务执行完成,并获取结果
long firstSubTaskResult = firstSubtask.join();
long secondSubTaskResult = secondSubtask.join();
return firstSubTaskResult + secondSubTaskResult;
}
}

private long computeSumDirectly() {
long sum = 0;
for (Long l : data) {
sum += l;
}
return sum;
}

public static void main(String[] args) {
Random random = new Random();

List<Long> data = random
.longs(1_000, 1, 100)
.boxed()
.collect(Collectors.toList());

ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(data);
pool.invoke(task);

System.out.println("Sum: " + pool.invoke(task));
}
}

这里当列表大小小于 SEQUENTIAL_THRESHOLD 变量的值(阈值)时视为小任务,直接计算求和列表元素结果,否则再次拆分为小任务,运行结果如下:

1.png

通过这个示例代码可以发现,Fork/Join 框架 中 ForkJoinTask 任务与平常的一般任务的主要不同点在于:ForkJoinTask 需要实现抽象方法 compute() 来定义计算逻辑,在这个方法里一般通用的实现模板是,首先先判断当前任务是否是小任务,如果是,就执行执行任务,如果不是小任务,则再次拆分为两个子任务,然后当每个子任务调用 fork() 方法时,会再次进入到 compute() 方法中,检查当前任务是否需要再拆分为子任务,如果已经是小任务,则执行当前任务并返回结果,否则继续分割,最后调用 join() 方法等待所有子任务执行完成并获得执行结果。伪代码如下:

1
2
3
4
5
6
7
8
if (problem is small) {
directly solve problem.
} else {
Step 1. split problem into independent parts.
Step 2. fork new subtasks to solve each part.
Step 3. join all subtasks.
Step 4. compose result from subresults.
}

Fork/Join 框架设计

Fork/Join 框架核心思想是把一个大任务拆分成若干个小任务,然后汇总每个小任务的结果最终得到大任务的结果,如果让你设计一个这样的框架,你会如何实现呢?(建议思考一下),Fork/Join 框架的整个流程正如其名所示,分为两个步骤:

  1. 大任务分割 需要有这么一个的类,用来将大任务拆分为子任务,可能一次拆分后的子任务还是比较大,需要多次拆分,直到拆分出来的子任务符合我们定义的小任务才结束。
  2. 执行任务并合并任务结果 第一步拆分出来的子任务分别存放在一个个 双端队列 里面(P.S. 这里为什么要使用双端队列请看下文),然后每个队列启动一个线程从队列中获取任务执行。这些子任务的执行结果都会放到一个统一的队列中,然后再启动一个线程从这个队列中拿数据,最后合并这些数据返回。

Fork/Join 框架使用了如下两个类来完成以上两个步骤:

  • ForkJoinTask 类 在上文的实例中也有提到,表示 ForkJoin 任务,在使用框架时首先必须先定义任务,通常只需要继承自 ForkJoinTask 类的子类 RecursiveAction(无返回结果) 或者 RecursiveTask(有返回结果)即可。
  • ForkJoinPool 从名字也可以猜到一二了,就是用来执行 ForkJoinTask 的线程池。大任务拆分出的子任务会添加到当前线程的双端队列的头部。

喜欢思考的你,心中想必会想到这么一种场景,当我们需要完成一个大任务时,会先把这个大任务拆分为多个独立的子任务,这些子任务会放到独立的队列中,并为每个队列都创建一个单独的线程去执行队列里的任务,即这里线程和队列时一对一的关系,那么当有的线程可能会先把自己队列的任务执行完成了,而有的线程则没有执行完成,这就导致一些先执行完任务的线程干等了,这是个好问题。

既然是做并发的,肯定要最大程度压榨计算机的性能,对于这种场景并发大师 Doug Lea 使用了工作窃取算法处理,使用工作窃取算法后,先完成自己队列中任务的线程会去其它线程的队列中”窃取“一个任务来执行,哈哈,一方有难,八方支援。但是此时这个线程和队列的持有线程会同时访问同一个队列,所以为了减少窃取任务的线程和被窃取任务的线程之间的竞争ForkJoin 选择了双端队列这种数据结构,这样就可以按照这种规则执行任务了:被窃取任务的线程始终从队列头部获取任务并执行,窃取任务的线程使用从队列尾部获取任务执行。这个算法在绝大部分情况下都可以充分利用多线程进行并行计算,但是在双端队列里只有一个任务等极端情况下还是会存在一定程度的竞争。

2.png

Fork/Join 框架实现原理

Fork/Join 框架的实现核心是 ForkJoinPool 类,该类的重要组成部分为 ForkJoinTask 数组和 ForkJoinWorkerThread 数组,其中 ForkJoinTask 数组用来存放框架使用者给提交给 ForkJoinPool 的任务,ForkJoinWorkerThread 数组则负责执行这些任务。任务有如下四种状态:

  • NORMAL 已完成
  • CANCELLED 被取消
  • SIGNAL 信号
  • EXCEPTIONAL 发生异常

下面来看看这两个类的核心方法实现原理,首先来看 ForkJoinTaskfork() 方法,源码如下:

6.png

方法对于 ForkJoinWorkerThread 类型的线程,首先会调用 ForkJoinWorkerThreadworkQueuepush() 方法异步的去执行这个任务,然后马上返回结果。继续跟进 ForkJoinPoolpush() 方法,源码如下:

8.png

方法将当前任务添加到 ForkJoinTask 任务队列数组中,然后再调用 ForkJoinPoolsignalWork 方法创建或者唤醒一个工作线程来执行该任务。然后再来看看 ForkJoinTaskjoin() 方法,方法源码如下:

3.png

4.png

方法首先调用了 doJoin() 方法,该方法返回当前任务的状态,根据返回的任务状态做不同的处理:

  1. 已完成状态则直接返回结果
  2. 被取消状态则直接抛出异常(CancellationException
  3. 发生异常状态则直接抛出对应的异常

继续跟进 doJoin() 方法,方法源码如下:

5.png

方法首先判断当前任务状态是否已经执行完成,如果执行完成则直接返回任务状态。如果没有执行完成,则从任务数组中(workQueue)取出任务并执行,任务执行完成则设置任务状态为 NORMAL,如果出现异常则记录异常并设置任务状态为 EXCEPTIONAL(在 doExec() 方法中)。

总结

本文主要介绍了 Java 并发框架中的 Fork/Join 框架的基本原理和其使用的工作窃取算法(work-stealing)、设计方式和部分实现源码。Fork/Join 框架在 JDK 的官方标准库中也有应用。比如 JDK 1.8+ 标准库提供的 Arrays.parallelSort(array) 可以进行并行排序,它的原理就是内部通过 Fork/Join 框架对大数组分拆进行并行排序,可以提高排序的速度,还有集合中的 Collection.parallelStream() 方法底层也是基于 Fork/Join 框架实现的,最后就是定义小任务的阈值往往是需要通过测试验证才能合理给出,并且保证程序可以达到最好的性能。

Spring 整合 Feign 的原理

作者 mghio
2021年11月5日 16:13

cover.jpg

前言

上篇 介绍了 Feign 的核心实现原理,在文末也提到了会再介绍其和 Spring Cloud 的整合原理,Spring 具有很强的扩展性,会把一些常用的解决方案通过 starter 的方式开放给开发者使用,在引入官方提供的 starter 后通常只需要添加一些注解即可使用相关功能(通常是 @EnableXXX)。下面就一起来看看 Spring Cloud 到底是如何整合 Feign 的。

整合原理浅析

在 Spring 中一切都是围绕 Bean 来展开的工作,而所有的 Bean 都是基于 BeanDefinition 来生成的,可以说 BeanDefinition 是整个 Spring 帝国的基石,这个整合的关键也就是要如何生成 Feign 对应的 BeanDefinition。

要分析其整合原理,我们首先要从哪里入手呢?如果你看过 上篇 的话,在介绍结合 Spring Cloud 使用方式的例子时,第二步就是要在项目的 XXXApplication 上加添加 @EnableFeignClients 注解,我们可以从这里作为切入点,一步步深入分析其实现原理(通常相当一部分的 starter 一般都是在启动类中添加了开启相关功能的注解)。

feign-1.png

进入 @EnableFeignClients 注解中,其源码如下:

feign-2.png

从注解的源码可以发现,该注解除了定义几个参数(basePackages、defaultConfiguration、clients 等)外,还通过 @Import 引入了 FeignClientsRegistrar 类,一般 @Import 注解有如下功能(具体功能可见 官方 Java Doc):

  • 声明一个 Bean
  • 导入 @Configuration 注解的配置类
  • 导入 ImportSelector 的实现类
  • 导入 ImportBeanDefinitionRegistrar 的实现类(这里使用这个功能

到这里不难看出,整合实现的主要流程就在 FeignClientsRegistrar 类中了,让我们继续深入到类 FeignClientsRegistrar 的源码,

feign-3.png

通过源码可知 FeignClientsRegistrar 实现 ImportBeanDefinitionRegistrar 接口,该接口从名字也不难看出其主要功能就是将所需要初始化的 BeanDefinition 注入到容器中,接口定义两个方法功能都是用来注入给定的 BeanDefinition 的,一个可自定义 beanName(通过实现 BeanNameGenerator 接口自定义生成 beanName 的逻辑),另一个使用默认的规则生成 beanName(类名首字母小写格式)。接口源码如下所示:

feign-4.png

对 Spring 有一些了解的朋友们都知道,Spring 会在容器启动的过程中根据 BeanDefinition 的属性信息完成对类的初始化,并注入到容器中。所以这里 FeignClientsRegistrar 的终极目标就是将生成的代理类注入到 Spring 容器中。
虽然 FeignClientsRegistrar 这个类的源码看起来比较多,但是从其终结目标来看,我们主要是看如何生成 BeanDefinition 的,通过源码可以发现其实现了 ImportBeanDefinitionRegistrar 接口,并且重写了 registerBeanDefinitions(AnnotationMetadata, BeanDefinitionRegistry) 方法,在这个方法里完成了一些 BeanDefinition 的生成和注册工作。源码如下:

feign-5.png

整个过程主要分为如下两个步骤:

  1. 给 @EnableFeignClients 的全局默认配置(注解的 defaultConfiguration 属性)创建 BeanDefinition 对象并注入到容器中(对应上图中的第 ① 步)
  2. 给标有了 @FeignClient 的类创建 BeanDefinition 对象并注入到容器中(对应上图中的第 ② 步)

下面分别深入方法源码实现来看其具体实现原理,首先来看看第一步的方法 registerDefaultConfiguration(AnnotationMetadata, BeanDefinitionRegistry),源码如下:

feign-6.png

可以看到这里只是获取一下注解 @EnableFeignClients 的默认配置属性 defaultConfiguration 的值,最终的功能实现交给了 registerClientConfiguration(BeanDefinitionRegistry, Object, Object) 方法来完成,继续跟进深入该方法,其源码如下:

feign-7.png

可以看到,全局默认配置的 BeanClazz 都是 FeignClientSpecification,然后这里将全局默认配置 configuration 设置为 BeanDefinition 构造器的输入参数,然后当调用构造器实例化时将这个参数传进去。到这里就已经把 @EnableFeignClients 的全局默认配置(注解的 defaultConfiguration 属性)创建出 BeanDefinition 对象并注入到容器中了,第一步到此完成,整体还是比较简单的。

下面再来看看第二步 给标有了 @FeignClient 的类创建 BeanDefinition 对象并注入到容器中 是如何实现的。深入第二步的方法 registerFeignClients(AnnotationMetadata, BeanDefinitionRegistry) 实现中,由于方法实现代码较多,使用截图会比较分散,所以用贴出源代码并在相关位置添加必要注释的方式进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
// 最终获取到有 @FeignClient 注解类的集合
LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
// 获取 @EnableFeignClients 注解的属性 map
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
// 获取 @EnableFeignClients 注解的 clients 属性
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
if (clients == null || clients.length == 0) {
// 如果 @EnableFeignClients 注解未指定 clients 属性则扫描添加(扫描过滤条件为:标注有 @FeignClient 的类)
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
}
else {
// 如果 @EnableFeignClients 注解已指定 clients 属性,则直接添加,不再扫描(从这里可以看出,为了加快容器启动速度,建议都指定 clients 属性)
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}

// 遍历最终获取到的 @FeignClient 注解类的集合
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
// 验证带注释的类必须是接口,不是接口则直接抛出异常(大家可以想一想为什么只能是接口?)
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
// 获取 @FeignClient 注解的属性值
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
// 获取 clientName 的值,也就是在构造器的参数值(具体获取逻辑可以参见 getClientName(Map<String, Object>) 方法
String name = getClientName(attributes);
// 同上文第一步最后调用的方法,注入 @FeignClient 注解的配置对象到容器中
registerClientConfiguration(registry, name, attributes.get("configuration"));
// 注入 @FeignClient 对象,该对象可以在其它类中通过 @Autowired 直接引入(e.g. XXXService)
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}

通过源码可以看到最后是通过方法 registerFeignClient(BeanDefinitionRegistry, AnnotationMetadata, Map<String, Object>) 注入的 @FeignClient 对象,继续深入该方法,源码如下:

feign-8.png

方法实现比较长,最终目标是构造出 BeanDefinition 对象,然后通过 BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionHolder, BeanDefinitionRegistry) 注入到容器中。

其中关键的一步是从 @FeignClient 注解中获取信息并设置到 BeanDefinitionBuilder 中,BeanDefinitionBuilder 中注册的类是 FeignClientFactoryBean,这个类的功能正如它的名字一样是用来创建出 FeignClient 的 Bean 的,然后 Spring 会根据 FeignClientFactoryBean 生成对象并注入到容器中。

需要明确的一点是,实际上这里最终注入到容器当中的是 FeignClientFactoryBean 这个类,Spring 会在类初始化的时候会根据这个类来生成实例对象,就是调用 FeignClientFactoryBean.getObject() 方法,这个生成的对象就是我们实际使用的代理对象。下面再进入到类 FeignClientFactoryBean 的 getObject() 这个⽅法,源码如下:

feign-9.png

可以看到这个方法是直接调用的类中的另一个方法 getTarget() 的,在继续跟进该方法,由于该方法实现代码较多,使用截图会比较分散,所以用贴出源代码并在相关位置添加必要注释的方式进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* @param <T> the target type of the Feign client
* @return a {@link Feign} client created with the specified data and the context
* information
*/
<T> T getTarget() {
// 从 Spring 容器中获取 FeignContext Bean
FeignContext context = beanFactory != null ? beanFactory.getBean(FeignContext.class)
: applicationContext.getBean(FeignContext.class);
// 根据获取到的 FeignContext 构建出 Feign.Builder
Feign.Builder builder = feign(context);

// 注解 @FeignClient 未指定 url 属性
if (!StringUtils.hasText(url)) {
// url 属性是固定访问某一个实例地址,如果未指定协议则拼接 http 请求协议
if (!name.startsWith("http")) {
url = "http://" + name;
}
else {
url = name;
}
// 格式化 url
url += cleanPath();
// 生成代理和我们之前的代理一样,注解 @FeignClient 未指定 url 属性则返回一个带有负载均衡功能的客户端对象
return (T) loadBalance(builder, context, new HardCodedTarget<>(type, name, url));
}
// 注解 @FeignClient 已指定 url 属性
if (StringUtils.hasText(url) && !url.startsWith("http")) {
url = "http://" + url;
}
String url = this.url + cleanPath();
// 获取一个 client
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof FeignBlockingLoadBalancerClient) {
// not load balancing because we have a url,
// but Spring Cloud LoadBalancer is on the classpath, so unwrap
// 这里没有负载是因为我们有指定了 url
client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
}
builder.client(client);
}
// 生成代理和我们之前的代理一样,最后被注入到 Spring 容器中
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(type, name, url));
}

通过源码得知 FeignClientFactoryBean 继承了 FactoryBean,其方法 FactoryBean.getObject 返回的就是 Feign 的代理对象,最后这个代理对象被注入到 Spring 容器中,我们就通过 @Autowired 可以直接注入使用了。同时还可以发现上面的代码分支最终都会走到如下代码:

1
2
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);

点进去深入 targeter.target 的源码,可以看到实际上这里创建的就是一个代理对象,也就是说在容器启动的时候,会为每个 @FeignClient 创建了一个代理对象。至此,Spring Cloud 和 Feign 整合原理的核心实现介绍完毕。

总结

本文主要介绍了 Spring Cloud 整合 Feign 的原理。通过上文介绍,你已经知道 Srpring 会我们的标注的 @FeignClient 的接口创建了一个代理对象,那么有了这个代理对象我们就可以做增强处理(e.g. 前置增强、后置增强),那么你知道是如何实现的吗?感兴趣的朋友可以再翻翻源码寻找答案(温馨提示:增强逻辑在 InvocationHandler 中)。还有 Feign 与 Ribbon 和 Hystrix 等组件的协作,感兴趣的朋友可以自行下载源码学习了解。

信息爆炸时代,该如何获取优质信息?

作者 mghio
2021年9月5日 16:07

cover.jpg

前言

我们现在所处的信息爆炸时代,如何强调快速获取信息都不为过,信息多种多样,有些能找到源头,有些则不能,有些能找到规律,有些则不一定能找到,信息的源头和获取渠道很重要。然而事实上,能够真正有效获取到优质信息并加以消化利用的人并不多。

在信息的获取的过程中,应该要具备筛选信息的能力,什么是官方信息,你要核实,什么是虚假信息,你要甄别。看到网上有些陷入杀猪盘的,负载累累。仔细思考一下,其实甄别筛选信息的能力真的是最大的问题。

当然一个人将信息并内化利用是一个很复杂的过程,每个人都有自己独到的方法。今天来聊聊应该如何去获取「优质信息」以及如何去过滤无用信息。下面分享几个获取信息的原则:

尽自己最大努力去获取“一手信息”

这个原则的关键是,这里的“一手信息”是如何定义呢? 对于那些权威机构或者国家机构,或者专家大咖,或者作者本人所发布的信息绝大部分情况下都可以看作为一手信息,第一手信息,不是被别人理解过、消化过的二手信息。

尤其对于知识性的东西来说,更应该是这样。应该是原汁原味的,不应该是被添油加醋的。对于一手信息的价值为什么大于二手信息甚至多手信息呢?很简单,这个效应在股票或者投资市场会被放大的很明显,能够在第一时间获取到第一手信息,是能否准确快速判断出市场行情走向的关键因素之一。

收费的信息优于免费的信息

对于这个原则,可能不是绝对,但至少在绝大部分情况下是正确的,对于现在很多“白嫖党”来说,可能确实要改一改自己的陋习了,要知道,其实免费反而最贵,因为它给你带来的负面作用或者时间成本,甚至可能会“毒害”你对于信息和知识的热情,当然也会有少部分人会把好的东西给开源或者免费掉。

国外大部分情况优于国内

和上面的第二点一样,需要你带着审视和批判思维来看待了,国内整体的创作环境个人角色还是相关比较浮躁和恶劣的,尽管这些年有所改观,但当前的自媒体,包括一些所谓的大 V,也会有很多滥竽充数的文章,视频等内容,包括当前都说信息过载,其实准确来说,是垃圾信息过载,那些优质的内容与知识,毕竟少数。

有选择的相信专家,并关注他们的日常分享,但不要迷恋迷信专家,很多人会无脑喷当前所谓的砖家伪公知 ,但在大部分情况,专家是在某些领域沉淀研究了很多年,你可以看看别人的一些思路,观点,与框架性东西,在某些情况下,可能真会对自己有所启发。

通过信息的冗余和比对

举一个简单的例子,如果今天巴菲特发表了一番言论,当然媒体会对此有记录和报道。但是,各种媒体可能记录有误差,而且可能还有意无意加入自己的看法,把不是巴菲特发表的言论加到他头上,这样就主观或客观地引入了错误信息。

此时如果你只从一个信息源了解信息,其实是很难判断所获得的是准确信息还是夹杂着一定的错误信息的。但是如果你能从多个信息源了解信息,虽然它们各自都有部分个人主观因素,但是由于各自角度的不同,很多噪音彼此可以抵消掉,获得的则是相对比较准确的信息。

要时刻警惕回音室响应,避免把自己关进一个封闭的信息圈子,这样慢慢的外部的信息就没法进来了,总之在获取信息的时候一定要尝试从不同维度,不同角度去摄取。

将信息分解到不同的维度过滤

在中国,人们常常都会很纠结一个问题,「就是老婆和妈妈掉到水里后先救谁?」,如果仔细思考一下,这个两难问题的重要原因在于,我们要考虑的因素太多,以至于大家越想越糊涂。其实只要你细想就会发现,这个问题的关键是分清楚什么是我们该考虑的信息,什么是不用考虑的信息。

比如,如果你觉得孝是第一位的,或者觉得以后谁和我生活更长时间是第一位的,作出选择就没有什么难的。这时,你其实是将这个有很多干扰信息的问题,分解到了某些你能够区分的维度,比如孝的维度,或者和一起生活的时间的维度。

要主动去 pull 信息,不要总是等 push 信息

这是最后一点,也是最重要的一点,这里的 pull 和 push 可能说得有点偏技术化,解释一下就是 pull 是说要目的地在网络上主动查询一些信息,而不是等各种 APP 给你推送信息,虽然主动查询信息可能会让你感到比较难受,但这是获取优质信息的第一步。

我个人是目前做技术相关的工作,对这点比较有感触,一个人的学习能力强不强,其实就像生存能力一样,一个重要判断点就是看这个人是能自己找食吃,还是要等别人“喂着吃”。

别人投喂给你的信息未必都是错的,都是坏的,比如某些自媒体的信息,但如果要获取到更为准确的信息,我建议你还是要去主动搜索核对一下,而不是单凭别人的一面之词。也就是说,别人是不会为你的后果负责的,而你要为自己的后果负责。


当然,以上是个人在选择信息源的一些原则和思考,希望可以对需要进行信息获取和筛选的朋友有所帮助。

分享几个好用的 Google 搜索技巧

作者 mghio
2021年8月15日 16:03

cover.jpg

搜索能力是被绝大多数人低估一项基本素质,绝大部分做编程技术相关的朋友应该都知道如何使用 Google,但是并不知道如何利用它的潜力。其实不管是 Google 还是 百度,会搜索的人一样都可以查找到需要的东西,不会搜索的人用什么都不好使。下面介绍一些 Google 常用的搜索技巧以及搜索快捷方式,可以帮助你更快,更准确地找到结果。Google 是世界上功能最强大的搜索引擎,它已经改变了我们查找信息的方式。

0. 使用准确的词组

将您要搜索的关键字用引号引起来,Google 会进行精确的词组搜索。

语法:”[searchkey 1] [searchkey 2]” [searchkey 3]

Google-Tips-1.jpg

1. 多个互斥的搜索条件使用 OR

默认情况下,除非指定,否则 Google 会包含你搜索条件中的所有搜索关键字。通过在您的关键词之字输入ORGoogle 会知道它可以查找一组或另一组。大写 OR,否则 Google 会认为它只是你的关键字的一部分。

语法:[searchkey 1] OR [searchkey 2]

Google-Tips-2.jpg

2. 排除指定关键词

通过在单词的前面添加减号,将单词从 Google 搜索中排除。

语法:-[searchkey to exclude] [searchkey to include]

Google-Tips-3.jpg

3. 查找文本块中的所有单词

使用 Googleallintext: 语法仅搜索网站的正文,而忽略链接,URL 和标题。

语法:allintext:[searchkeys]

Google-Tips-4.jpg

4. 在文本 + 标题 + URL 等查找单词

查找搜索词在不同位置的网页。即-在页面正文中,页面标题,URL 等中。为此,在您的关键字之前使用 intext:

语法:intext:[searchkeys]

Google-Tips-5.jpg

5. 标题搜索(单个关键字)

在网页标题内搜索一个单词,然后在网页上的其他位置搜索另一个单词。为此,您需要将 intitle: 混合到您的搜索查询中。

语法:[searchkeys 1] intitle:[searchkeys 2]

Google-Tips-6.jpg

6. 标题搜索(多个关键字)

在网页标题中搜索查询中的所有关键字,在我们的搜索词之前使用 allintitle:

语法:allintitle:[searchkey1 searchkey2]

Google-Tips-7.jpg

7. 在 URL 中搜索

使用 allinURL 可以很容易地在 URL 中搜索关键字 。

语法:allinURL:[searchkeys]

Google-Tips-8.jpg

8. 在指定网站内搜索

在网站内搜索单词-使用网站 URL 前面的 site: 语法,后跟您的搜索词。这会将搜索结果仅限制在该网站上。

语法:site:[website URL] [searchkeys]

Google-Tips-9.jpg

9. Google 搜索定义

通过在单词之前使用 define: 轻松地找到单词的定义,而无需访问词典网站。Google 将提供定义,并提供一个音频播放器来提供该单词的语音发音。

语法:define:[searchkey]

Google-Tips-10.jpg

10. Google 搜索通配符(遗漏或未知词)

没想到所有的话吗?加上 * 告诉 Google 为您填写空白,这对于歌曲歌词或书名搜索非常有效。

语法:[searchkeys 1] * [searchkeys 2]

Google-Tips-11.jpg

11. Google 搜索文件类型

搜索文件类型(例如 PowerPointPDF 等)时,请在搜索词中使用 filetype: 命令。

语法:[searchkeyword] filetype:[file type extension]

Google-Tips-12.jpg

12. 转换计算

使用 Google 可以进行任何度量转换。

语法:convert [data value + unit of measure] to [like unit of measure]

Google-Tips-13.jpg

13. Google 搜索计算器

在搜索栏中输入您的计算结果,将 Google 用作计算器。数值运算符: * 表示乘,+ 表示加,- 表示减,/ 表示除。

语法:[number] [operator] [number]

Google-Tips-14.jpg

14. Google 图片搜索

查找图像的名称,描述和类型。

语法:[searchkeyw] image type

Google-Tips-15.jpg

如何实现一个简易版的 Spring - 如何实现 AOP(终结篇)

作者 mghio
2021年6月13日 15:57

cover.jpg

前言

上篇 实现了 判断一个类的方式是符合配置的 pointcut 表达式、根据一个 Bean 的名称和方法名,获取 Method 对象、实现了 BeforeAdvice、AfterReturningAdvice 以及 AfterThrowingAdvice并按照指定次序调用 等功能,这篇再来看看剩下的 代理对象如何生成根据 XML 配置文件生成 BeanDefintion以及如何将生成的代理对象放入到容器中 等功能,话不多说,下面进入主题。

代理对象生成

代理对象的生成策略和 Spring 框架一致,当被代理类实现了接口时采用 JDK 动态代理的方式生成代理对象,被代理对象未实现接口时使用 CGLIB 来生成代理对象,为了简单起见这里不支持手动指定生成代理对象的策略,JDK 动态代理的实现这里不在介绍,感兴趣可以自己实现一下,这里主要讨论 CGLIB 的生成方式。

在这里插入图片描述

基于面向接口编程的思想,这里的生成代理对象需要定义一个统一的接口,不管是 CGLIB 生成方式还是JDK 动态代理生成方式都要实现该接口。生成代理对象是根据一些配置去生成的,同样,这里生成代理的配置也可以抽取一个统一的接口,在实现类中定义拦截器(也就是 Advice)以及实现的接口等,CGLIB 的基本使用可以到官网自行查找。代理对象生成的整体的类图如下:

在这里插入图片描述

其中代理创建的工厂接口 AopProxyFactory 如下,提供了不指定 ClassLoader(使用默认的 ClassLoader)和指定 ClassLoader 两种方式创建代理对象,源码如下:

1
2
3
4
5
6
7
8
9
10
11
/**
* @author mghio
* @since 2021-06-13
*/
public interface AopProxyFactory {

Object getProxy();

Object getProxy(ClassLoader classLoader);

}

使用 CGLIB 创建代理的工厂接口实现类如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* @author mghio
* @since 2021-06-13
*/
public class CglibProxyFactory implements AopProxyFactory {

/*
* Constants for CGLIB callback array indices
*/
private static final int AOP_PROXY = 0;

protected final Advised advised;

public CglibProxyFactory(Advised config) {
Assert.notNull(config, "AdvisedSupport must not be null");
if (config.getAdvices().size() == 0) {
throw new AopConfigException("No advisors and no TargetSource specified");
}

this.advised = config;
}

@Override
public Object getProxy() {
return getProxy(null);
}

@Override
public Object getProxy(ClassLoader classLoader) {
if (logger.isDebugEnabled()) {
logger.debug("Creating CGLIB proxy: target class is " + this.advised.getTargetClass());
}

try {
Class<?> rootClass = this.advised.getTargetClass();

// Configure CGLIB Enhancer...
Enhancer enhancer = new Enhancer();
if (classLoader != null) {
enhancer.setClassLoader(classLoader);
}
enhancer.setSuperclass(rootClass);
enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE); // BySpringCGLIB
enhancer.setInterceptDuringConstruction(false);

Callback[] callbacks = getCallbacks(rootClass);
Class<?>[] types = new Class<?>[callbacks.length];
for (int i = 0; i < types.length; i++) {
types[i] = callbacks[i].getClass();
}
enhancer.setCallbackFilter(new ProxyCallbackFilter(this.advised));
enhancer.setCallbackTypes(types);
enhancer.setCallbacks(callbacks);

// Generate the proxy class and create a proxy instance.
return enhancer.create();
}
catch (CodeGenerationException | IllegalArgumentException ex) {
throw new AopConfigException("Could not generate CGLIB subclass of class [" +
this.advised.getTargetClass() + "]: " +
"Common causes of this problem include using a final class or a non-visible class",
ex);
} catch (Exception ex) {
// TargetSource.getTarget() failed
throw new AopConfigException("Unexpected AOP exception", ex);
}
}

// omit other methods ...

}

整体来看还是比较简单的,主要是 CGLIB 第三方字节码生成库的基本用法,当然,前提是你已经了解了 CGLIB 的基本使用。AOP 的相关配置接口 Advised 相对来说就比较简单了,主要是一些相关属性的增、删、改等操作,主要部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author mghio
* @since 2021-06-13
*/
public interface Advised {

Class<?> getTargetClass();

boolean isInterfaceProxied(Class<?> intf);

List<Advice> getAdvices();

void addAdvice(Advice advice);

List<Advice> getAdvices(Method method);

void addInterface(Class<?> clazz);

// omit other methods ...

}

实现类也比较简单,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* @author mghio
* @since 2021-06-13
*/
public class AdvisedSupport implements Advised {

private boolean proxyTargetClass = false;
private Object targetObject = null;
private final List<Advice> advices = new ArrayList<>();
private final List<Class<?>> interfaces = new ArrayList<>();

public AdvisedSupport() {
}

@Override
public Class<?> getTargetClass() {
return this.targetObject.getClass();
}

@Override
public boolean isInterfaceProxied(Class<?> intf) {
return interfaces.contains(intf);
}

@Override
public List<Advice> getAdvices() {
return this.advices;
}

@Override
public void addAdvice(Advice advice) {
this.advices.add(advice);
}

@Override
public List<Advice> getAdvices(Method method) {
List<Advice> result = new ArrayList<>();
for (Advice advice : this.getAdvices()) {
Pointcut pc = advice.getPointcut();
if (pc.getMethodMatcher().matches(method)) {
result.add(advice);
}
}
return result;
}

@Override
public void addInterface(Class<?> clazz) {
this.interfaces.add(clazz);
}

// omit other methods ...

}

到这里,代理对象使用 CGLIB 生成的方式就已经实现了,核心代码其实比较简单,主要是需要多考虑考虑代码后期的扩展性。

创建 BeanDefinition

我们先来看看一般 AOP 在 XML 配置文件中是如何定义的,一个包含 BeforeAdvice、AfterReturningAdvice以及AfterThrowingAdvice 的 XML 配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.e3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/beans/spring-context.xsd">

<context:scann-package base-package="cn.mghio.service.version5,cn.mghio.dao.version5" />

<bean id="tx" class="cn.mghio.tx.TransactionManager"/>

<aop:config>
<aop:aspect ref="tx">
<aop:pointcut id="placeOrder" expression="execution(* cn.mghio.service.version5.*.placeOrder(..))"/>
<aop:before pointcut-ref="placeOrder" method="start"/>
<aop:after-returning pointcut-ref="placeOrder" method="commit"/>
<aop:after-throwing pointcut-ref="placeOrder" method="rollback"/>
</aop:aspect>
</aop:config>
</beans>

有了之前解析 XML 的 Bean 定义的经验后,很显然这里我们需要一个数据结构去表示这个 AOP 配置,如果你阅读过 上篇 的话,类 AspectJExpressionPointcut 表示的是 <aop:pointcut id=”placeOrder” expression=”execution(* cn.mghio.service.version5.*.placeOrder(..))”/>,另外几个 Advice 配置分别对应 AspectJBeforeAdvice、AspectJAfterReturningAdvice以及 AspectJAfterThrowingAdvice 等几个类。
这里只要解析 XML 配置文件,然后使用对应的 Advice 的构造器创建对应的对象即可,解析 XML 使用的是 dom4j,主要部分代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* @author mghio
* @since 2021-06-13
*/
public class ConfigBeanDefinitionParser {

private static final String ASPECT = "aspect";
private static final String EXPRESSION = "expression";
private static final String ID = "id";
private static final String REF = "ref";
private static final String BEFORE = "before";
private static final String AFTER = "after";
private static final String AFTER_RETURNING_ELEMENT = "after-returning";
private static final String AFTER_THROWING_ELEMENT = "after-throwing";
private static final String AROUND = "around";
private static final String POINTCUT = "pointcut";
private static final String POINTCUT_REF = "pointcut-ref";
private static final String ASPECT_NAME_PROPERTY = "aspectName";

public void parse(Element element, BeanDefinitionRegistry registry) {
List<Element> childElements = element.elements();
for (Element el : childElements) {
String localName = el.getName();
if (ASPECT.equals(localName)) {
parseAspect(el, registry);
}
}
}

private void parseAspect(Element aspectElement, BeanDefinitionRegistry registry) {
String aspectName = aspectElement.attributeValue(REF);

List<BeanDefinition> beanDefinitions = new ArrayList<>();
List<RuntimeBeanReference> beanReferences = new ArrayList<>();

// parse advice
List<Element> elements = aspectElement.elements();
boolean adviceFoundAlready = false;
for (Element element : elements) {
if (isAdviceNode(element)) {
if (!adviceFoundAlready) {
adviceFoundAlready = true;
if (!StringUtils.hasText(aspectName)) {
return;
}
beanReferences.add(new RuntimeBeanReference(aspectName));
}
GenericBeanDefinition advisorDefinition = parseAdvice(aspectName, element, registry,
beanDefinitions, beanReferences);
beanDefinitions.add(advisorDefinition);
}
}

// parse pointcut
List<Element> pointcuts = aspectElement.elements(POINTCUT);
for (Element pointcut : pointcuts) {
parsePointcut(pointcut, registry);
}
}

private void parsePointcut(Element pointcutElement, BeanDefinitionRegistry registry) {
String id = pointcutElement.attributeValue(ID);
String expression = pointcutElement.attributeValue(EXPRESSION);

GenericBeanDefinition pointcutDefinition = createPointcutDefinition(expression);
if (StringUtils.hasText(id)) {
registry.registerBeanDefinition(id, pointcutDefinition);
} else {
BeanDefinitionReaderUtils.registerWithGeneratedName(pointcutDefinition, registry);
}
}

private GenericBeanDefinition parseAdvice(String aspectName, Element adviceElement,
BeanDefinitionRegistry registry, List<BeanDefinition> beanDefinitions,
List<RuntimeBeanReference> beanReferences) {

GenericBeanDefinition methodDefinition = new GenericBeanDefinition(MethodLocatingFactory.class);
methodDefinition.getPropertyValues().add(new PropertyValue("targetBeanName", aspectName));
methodDefinition.getPropertyValues().add(new PropertyValue("methodName",
adviceElement.attributeValue("method")));
methodDefinition.setSynthetic(true);

// create instance definition factory
GenericBeanDefinition aspectFactoryDef = new GenericBeanDefinition(AopInstanceFactory.class);
aspectFactoryDef.getPropertyValues().add(new PropertyValue("aspectBeanName", aspectName));
aspectFactoryDef.setSynthetic(true);

// register the pointcut
GenericBeanDefinition adviceDef = createAdviceDefinition(adviceElement, aspectName,
methodDefinition, aspectFactoryDef, beanDefinitions, beanReferences);
adviceDef.setSynthetic(true);

// register the final advisor
BeanDefinitionReaderUtils.registerWithGeneratedName(adviceDef, registry);

return adviceDef;
}

// omit other methods ...

}

创建 BeanDefinition 已经完成了,现在可根据 XML 配置文件解析出对应的 BeanDefintion 了,下面只需要在合适的时机将这些 BeanDefinition 放到容器中就完成了全部流程了。

如何放到容器中

该如何把解析出来的 BeanDefintion 放到容器当中去呢?我们知道在 Spring 框架当中提供了很多的“钩子函数”,可以从这里入手,Bean 的生命周期如下:

在这里插入图片描述

选择在 Bean 实例化完成之后 BeanPostProcessor 的 postProcessAfterInitialization() 方法创建代理对象,AOP 使用的是 AspectJ,将创建代理对象的类命名为 AspectJAutoProxyCreator,实现 BeanPostProcessor 接口,处理代理对象的创建,AspectJAutoProxyCreator 类的核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* @author mghio
* @since 2021-06-13
*/
public class AspectJAutoProxyCreator implements BeanPostProcessor {

private ConfigurableBeanFactory beanFactory;

@Override
public Object beforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

@Override
public Object afterInitialization(Object bean, String beanName) throws BeansException {
// 如果这个 bean 本身就是 Advice 及其子类,则不生成动态代理
if (isInfrastructureClass(bean.getClass())) {
return bean;
}

List<Advice> advices = getCandidateAdvices(bean);
if (advices.isEmpty()) {
return bean;
}

return createProxy(advices, bean);
}

protected Object createProxy(List<Advice> advices, Object bean) {
Advised config = new AdvisedSupport();
for (Advice advice : advices) {
config.addAdvice(advice);
}

Set<Class> targetInterfaces = ClassUtils.getAllInterfacesForClassAsSet(bean.getClass());
for (Class targetInterface : targetInterfaces) {
config.addInterface(targetInterface);
}
config.setTargetObject(bean);

AopProxyFactory proxyFactory = null;
if (config.getProxiedInterfaces().length == 0) {
// CGLIB 代理
proxyFactory = new CglibProxyFactory(config);
} else {
// TODO(mghio): JDK dynamic proxy ...

}

return proxyFactory.getProxy();
}

public void setBeanFactory(ConfigurableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
}

private List<Advice> getCandidateAdvices(Object bean) {
List<Object> advices = this.beanFactory.getBeansByType(Advice.class);
List<Advice> result = new ArrayList<>();
for (Object advice : advices) {
Pointcut pointcut = ((Advice) advice).getPointcut();
if (canApply(pointcut, bean.getClass())) {
result.add((Advice) advice);
}
}
return result;
}

private boolean canApply(Pointcut pointcut, Class<?> targetClass) {
MethodMatcher methodMatcher = pointcut.getMethodMatcher();
Set<Class> classes = new LinkedHashSet<>(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
classes.add(targetClass);
for (Class<?> clazz : classes) {
Method[] methods = clazz.getDeclaredMethods();
for (Method m : methods) {
if (methodMatcher.matches(m)) {
return true;
}
}
}
return false;
}

private boolean isInfrastructureClass(Class<?> beanClass) {
return Advice.class.isAssignableFrom(beanClass);
}
}

最后别忘了,这里的 BeanPostProcessor 接口是我们新加的,需要到之前定义的 DefaultFactoryBean 中加上对 BeanPostProcessor 的处理逻辑,主要修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class DefaultBeanFactory extends AbstractBeanFactory implements BeanDefinitionRegistry {

@Override
public Object createBean(BeanDefinition bd) throws BeanCreationException {
// 1. instantiate bean
Object bean = instantiateBean(bd);
// 2. populate bean
populateBean(bd, bean);
// 3. initialize bean
bean = initializeBean(bd, bean);
return bean;
}

protected Object initializeBean(BeanDefinition bd, Object bean) {

...

// 非合成类型则创建代理
if (!bd.isSynthetic()) {
return applyBeanPostProcessorAfterInitialization(bean, bd.getId());
}
return bean;
}

private Object applyBeanPostProcessorAfterInitialization(Object existingBean, String beanName) {
Object result = existingBean;
for (BeanPostProcessor postProcessor : getBeanPostProcessors()) {
result = postProcessor.afterInitialization(result, beanName);
if (result == null) {
return null;
}
}
return result;
}

// omit other field and methods ...

}

最后运行事先测试用例,正常通过符合预期。

在这里插入图片描述

总结

本文主要介绍了 AOP 代理对象生成、解析 XML 配置文件并创建对应的 BeanDefinition 以及最后注入到容器中,只是介绍了大体实现思路,具体代码实现已上传 mghio-spring,感兴趣的朋友可以参考,到这里,AOP 实现部分已经全部介绍完毕。

如何实现一个简易版的 Spring - 如何实现 @Autowired 注解

作者 mghio
2021年3月7日 15:44

cover.jpg

前言

本文是 如何实现一个简易版的 Spring 系列第四篇,在 上篇 介绍了 @Component 注解的实现,这篇再来看看在使用 Spring 框架开发中常用的 @Autowired 注入要如何实现,大家用过 Spring 都知道,该注解可以用在字段构造函数以及setter 方法上,限于篇幅原因我们主要讨论用在字段的方式实现,其它的使用方式大体思路是相同的,不同的只是解析和注入方式有所区别,话不多说,下面进入我们今天的正题—如何实现一个简易版的 Spring - 如何实现 @Autowired 注解

实现步骤拆分

实现步骤总的来说分为三大步:

  1. 分析总结要做的事情,抽象出数据结构
  2. 利用这些数据结构来做一些事情
  3. 在某个时机注入到 Spring 容器中

细心的朋友可以发现,其实前面几篇文章的实现也是套路,其中最为关键也是比较困难的点就是如何抽象出数据结构。这里我们要做的是当某个 Bean 上的字段有 @Autowired 注解时,从容器中获取该类型的 Bean 然后调用该字段对应的 setter 方法设置到对象的属性中。下面就跟着这个思路去实现 @Autowired 注解。

数据结构抽象

要想根据字段的类型注入在容器中对应的实例,首先需要提供这个从一个类型获取对应 Bean 实例的能力,这需要 BeanFactory 接口提供一个这样的能力,等等,像这样容器内部使用的接口直接定义在 BeanFactory 好吗?像这种内部的操作应该尽量做到对使用者透明,所以这里新加一个接口 AutowireCapableBeanFactory 继承自 BeanFactory,这样在内部就可以直接使用新接口接口。需要注意的是新接口的方法参数并不能直接使用 Class 类型去容器中查找对应的 Bean,为了后期的灵活扩展(比如:是否必须依赖等),需要使用一个类来描述这种依赖,命名为 DependencyDescriptor,其部分源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author mghio
* @since 2021-03-07
*/
public class DependencyDescriptor {
private Field field;
private boolean required;

public DependencyDescriptor(Field field, boolean required) {
Assert.notNull(field, "Field must not be null");
this.field = field;
this.required = required;
}

public Class<?> getDependencyType() {
if (this.field != null) {
return field.getType();
}
throw new RuntimeException("only support field dependency");
}

public boolean isRequired() {
return this.required;
}
}

接口 AutowireCapableBeanFactory 声明如下:

1
2
3
4
5
6
7
/**
* @author mghio
* @since 2021-03-07
*/
public interface AutowireCapableBeanFactory extends BeanFactory {
Object resolveDependency(DependencyDescriptor descriptor);
}

查找解析依赖的功能我们抽象完成了,下面来看看核心步骤如何抽象封装注入的过程,抽象总结后不难发现,注入可以分为两大部分:注入的目标对象需要被注入的元素列表,这些对于注入来说是一些元数据,命名为 InjectionMetadata,其包含两个字段,一个是注入的目标对象,另一个是被注入的元素列表,还有一个重要的方法将元素列表注入到方法参数传入的目标对象中去。

每个注入元素都要提供一个注入到指定目标对象的能力,所以抽取出公共抽象父类 InjectionElement,使用上文的 AutowireCapableBeanFactory 接口解析出当前字段类型对应 Bean,然后注入到指定的目标对象中。抽象父类 InjectinElement 的主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author mghio
* @since 2021-03-07
*/
public abstract class InjectionElement {
protected Member member;
protected AutowireCapableBeanFactory factory;

public InjectionElement(Member member, AutowireCapableBeanFactory factory) {
this.member = member;
this.factory = factory;
}

abstract void inject(Object target);
}

注入元数据类 InjectionMetadata 的主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author mghio
* @since 2021-03-07
*/
public class InjectionMetadata {
private final Class<?> targetClass;
private List<InjectionElement> injectionElements;

public InjectionMetadata(Class<?> targetClass, List<InjectedElement> injectionElements) {
this.targetClass = targetClass;
this.injectionElements = injectionElements;
}

public void inject(Object target) {
if (injectionElements == null || injectionElements.isEmpty()) {
return;
}
for (InjectionElement element : injectionElements) {
element.inject(target);
}
}

...

}

把一个 Class 转换为 InjectionMetadata 的部分实现我们留到下文实现部分介绍,抽象后总的流程就是把一个 Class 转换为 InjectionMedata ,然后调用 InjectionMedata 提供的 inject(Object) 方法来完成注入(依赖 AutowireCapableBeanFactory 接口提供的 resolveDependency(DependencyDescriptor) 能力),下面是抽象后的字段注入部分的相关类图关系如下:

autowried-arc.png

解析构造出定义的数据结构

在上文我们还没实现将一个类转换为 InjectionMetadata 的操作,也就是需要实现这样的一个方法 InjectionMetadata buildAutowiringMetadata(Class<?> clz),实现过程也比较简单,扫描类中声明的属性找到有 @Autowried 注解解析构造出 InjectinMetadata 实例,核心实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* @author mghio
* @since 2021-03-07
*/
public class AutowiredAnnotationProcessor {
private final String requiredParameterName = "required";
private boolean requiredParameterValue = true;
private final Set<Class<? extends Annotation>> autowiredAnnotationTypes = new LinkedHashSet<>();

public AutowiredAnnotationProcessor() {
this.autowiredAnnotationTypes.add(Autowired.class);
}

public InjectionMetadata buildAutowiringMetadata(Class<?> clz) {
LinkedList<InjectionElement> elements = new LinkedList<>();
Class<?> targetClass = clz;
do {
LinkedList<InjectionElement> currElements = new LinkedList<>();
for (Field field : targetClass.getDeclaredFields()) {
Annotation ann = findAutowiredAnnotation(field);
if (ann != null) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
boolean required = determineRequiredStatus(ann);
elements.add(new AutowiredFieldElement(field, required, beanFactory));
}
}
elements.addAll(0, currElements);
targetClass = targetClass.getSuperclass();
} while (targetClass != null && targetClass != Object.class);
return new InjectionMetadata(clz, elements);
}

protected boolean determineRequiredStatus(Annotation ann) {
try {
Method method = ReflectionUtils.findMethod(ann.annotationType(), this.requiredParameterName);
if (method == null) {
return true;
}
return (this.requiredParameterValue == (Boolean) ReflectionUtils.invokeMethod(method, ann));
} catch (Exception e) {
return true;
}
}

private Annotation findAutowiredAnnotation(AccessibleObject ao) {
for (Class<? extends Annotation> annotationType : this.autowiredAnnotationTypes) {
Annotation ann = AnnotationUtils.getAnnotation(ao, annotationType);
if (ann != null) {
return ann;
}
}
return null;
}

...

}

上面在做数据结构抽象时定义好了注入元素的抽象父类 InjectionElement,这里需要定义一个子类表示字段注入类型,命名为 AutowiredFieldElement,依赖 AutowireCapableBeanFactory 接口的能力解析出字段所属类型的 Bean,然后调用属性的 setter 方法完成注入,在基于我们上面定义好的数据结构后实现比较简单,主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author mghio
* @since 2021-03-07
*/
public class AutowiredFieldElement extends InjectionElement {
private final boolean required;

public AutowiredFieldElement(Field field, boolean required, AutowireCapableBeanFactory factory) {
super(field, factory);
this.required = required;
}

public Field getField() {
return (Field) this.member;
}

@Override
void inject(Object target) {
Field field = this.getField();
try {
DependencyDescriptor descriptor = new DependencyDescriptor(field, this.required);
Object value = factory.resolveDependency(descriptor);
if (value != null) {
ReflectionUtils.makeAccessible(field);
field.set(target, value);
}
} catch (Throwable e) {
throw new BeanCreationException("Could not autowire field:" + field, e);
}
}
}

注入到 Spring 中

接下来面临的问题是:要在什么时候调用上面这些类和方法呢?在这里我们回顾一下 SpringBean 的生命周期,其中几个钩子入口如下图所示:

bean-lifecycle.png

通过生命周期开放的钩子方法可以看出我们需要在 InstantiationAwareBeanPostProcessor 接口的 postProcessPropertyValues 方法中实现 Autowired 注入,将前面的 AutowiredAnnotationProcessor 类实现该接口然后在 postProcessPropertyValues 方法处理注入即可。这部分的整体类图如下所示:

AutowriedAnnotationProcessor.png

AutowiredAnnotationProcessor 处理器实现的 postProcessPropertyValues() 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author mghio
* @since 2021-03-07
*/
public class AutowiredAnnotationProcessor implements InstantiationAwareBeanProcessor {

...

@Override
public void postProcessPropertyValues(Object bean, String beanName) throws BeansException {
InjectionMetadata metadata = this.buildAutowiringMetadata(bean.getClass());
try {
metadata.inject(bean);
} catch (Throwable e) {
throw new BeanCreationException(beanName, "Injection of autowired dependencies failed");
}
}
}

然后只需要在抽象父类 AbstractApplicationContext 构造函数注册那些我们定义的 processor,然后在 Bean 注入的时候(DefaultBeanFactory.populateBean())调用 processorpostProcessPropertyValues 方法完成属性注入,抽象类 AbstractApplicationContext 改动部分的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author mghio
* @since 2021-03-07
*/
public abstract class AbstractApplicationContext implements ApplicationContext {

...

public AbstractApplicationContext(String configFilePath) {
...
registerBeanPostProcessor(beanFactory);
}

protected void registerBeanPostProcessor(ConfigurableBeanFactory beanFactory) {
AutowiredAnnotationProcessor postProcessor = new AutowiredAnnotationProcessor();
postProcessor.setBeanFactory(beanFactory);
beanFactory.addBeanPostProcessor(postProcessor);
}

...

}

BeanFactory 接口的默认实现类 DefaultBeanFactory 注入 Bean 属性的方法 populateBean(BeanDefinition, Object) 改动如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author mghio
* @since 2021-03-07
*/
public class DefaultBeanFactory extends DefaultSingletonBeanRegistry implements ConfigurableBeanFactory,
BeanDefinitionRegistry {

...

private final List<BeanPostProcessor> beanPostProcessors = new ArrayList<>();

private void populateBean(BeanDefinition bd, Object bean) {
for (BeanPostProcessor postProcessor : this.getBeanPostProcessors()) {
if (postProcessor instanceof InstantiationAwareBeanProcessor) {
((InstantiationAwareBeanProcessor) postProcessor).postProcessPropertyValues(bean, bd.getId());
}
}

...

}

...

}

总的来说整个使用 processor 的过程分为两步,首先在 AbstractApplicationContext 构造方法中注册我们自定义的 processor,然后再 DefaultBeanFactory 中调用其 postProcessPropertyValues 方法进行注入,至此使用在类字段上的 @Autowired 注解实现完成。

总结

本文简要介绍了实现 Spring@Autowired 注解(使用在类字段上的方式),其中比较麻烦的步骤是数据结构抽象部分,需要考虑到后期的扩展性和内部操作对使用者尽量透明,限于篇幅,只列出了部分核心实现代码,完整代码已上传至 GitHub ,感兴趣的朋友可以查看完整代码。

❌
❌