普通视图

发现新文章,点击刷新页面。
昨天以前nickChenyx

修改 Home Assistant 的网络配置

作者 nickChen
2023年9月9日 19:03

因为 Home Assistant 需要联网下载诸多插件,比如加载项(Add-On)里的 ESPHome、Node-RED 之类的,需要配置网络代理才能够下载。

因为安装 Home Assistant 方式有好几种,配置网络代理的方法也有差异。下列配置方式请对号入座。

HAOS 配置网络

如果是安装的是 HAOS(Home Assistant Operating System),可以通过两种方式配置网络代理。当然前提是你必须有一个路由服务可以进行网络代理,以下称之为代理路由器。

  1. 在 Home Assistant 控制面中配置
    在主页侧边栏的「配置」中,找到「系统」-「网络」选项,进入「网络」配置页面中可以看到「配置网络接口」的配置栏,在此处可以配置 IPV4 为 「静态地址」,配置如下:
  • 自定义一个不冲突的 IP 地址/子网掩码,比如 192.168.0.123/24
  • 配置网关地址为你的代理路由器地址,比如 192.168.0.111
  • 配置 DNS 服务器地址为你的代理路由器地址,比如 192.168.0.111
  1. 在 HAOS 的终端中通过命令配置
  • 输入 network info 查看网卡名称,格式如下:
    ```yaml

    interfaces:
  • interface: enp0xxx # 这里就是网卡名称

    ```
  • 输入 network update {网卡名称} --ipv4-address 192.160.0.123 --ipv4-gateway 192.168.0.111 --ipv4-nameserver 192.168.0.111 等待配置成功即可
  • 实测在 HAOS 10.5 版本,修改 --ipv4-nameserver 不会生效,这个还是去第一步的控制面 UI 页面修改吧 ……
  1. 在 HAOS 的终端中通过 nmcli 命令配置,这个步骤较为复杂,不如直接在第一步配置。而且这里改动后,在前端页面的网络配置页面也不会展示,个人感觉官方可能也不建议这么搞。
  • 在 HA 系统中输入 login,登录进系统
  • 输入 nmcli connection show 查看你的连接列表,选择你想要修改的连接
    • 这里新安装的话,我们通常只能看到一个 Supervisor 之类的连接,记住这个连接名称(Name)
  • 输入 nmcli con edit "{连接名称}" ,这是能进入到 nmcli> 命令中
  • 这时候修改网络地址即可,连续输入以下几个命令:
    nmcli> set ipv4.address 192.168.0.123/24nmcli> set ipv4.dns 192.168.0.111nmcli> set ipv4.gateway 192.168.0.111nmcli> savenmcli> quit
  • 完成以上动作后,输入 exit 退出刚才 login 的系统,回到 ha> 命令页面,输入 ho reboot 重启服务
  • 系统重启后,输入 ha newtork info 就可以看到网络配置改好了。

灯开关补充零线

作者 nickChen
2023年9月6日 20:11

前言

本篇文章主要是介绍如何给灯开关上添加零线,至于我为啥要加这个零线,就是非常痛苦的故事了,后面再说。

先假定你已经明确是需要给灯开关加零线,我直接开始介绍如何操作,最后再补充为什么需要零线的原因。

灯加零线过程(精简版)

  1. 确认此时灯盒中,只有 L 线(火线)和灯线(也叫负载线),缺少一根零线。

笔者注:此时我的盒子里只有 L线(红色)和灯线(白色),这里的用颜色描述希望可以方便读者快速类比到自己的情况。

灯上连着 L 线和灯线

  1. 找到灯位置的接线,可以看到灯上接入了两根线。

笔者注:此时我的灯上有两根线,一根红线,一根蓝线,按照设计,灯上的红线接火线,蓝线接零线。此时灯红线连着 L线(蓝色),灯蓝线连着零线(白色),灯旁边有一根地线(黄绿色)。

灯上连着 L 线和灯线

  1. 接下来要做的是把灯蓝线接零线(白线)处,分出一根零线,接入到灯盒中。这时候需要到一个、「电工穿线神器」、「电线」和「接线端子压线帽」。

笔者注:这些都是笔者自行淘宝购买的。「电工穿线神器」最好是带滑轮头的,穿墙效果好;「电线」买公牛 BVR 电线 1.5 平方毫米的就行,能满足灯的需求;「接线端子压线帽」是为了连接将灯蓝线、零线(白线)和新穿电线连接在一起,用这个好处是可以将电线压实,防止虚接。当然如果你的电线非常松,能够在灯盒处直接拉动灯线(白色),灯盒处的白线能够从灯处拉出,那就可以直接把新买的「电线」用绝缘胶带多捆几圈绑在灯线(白线)上。这样就不用「电工穿线神器」了,直接利用已有的线带新线。

笔者注:笔者家里的线是藏在管道盒子里的,所以需要打开盒子才能穿线

电线被收在盒子里了,穿线需要把盒子线打开

  1. 在灯盒处,将新「电线」绑在「电工穿线神器」上(这个可以看「电工穿线神器」的商品说明),然后用「电工穿线神器」从管道口伸入,一点点从灯处穿出。穿出后,将新「电线」和灯蓝线、零线(白色)用「接线端子压线帽」压起来,压好后抽动下几根线,确认压实不会被抽出来。

  2. 这时候灯盒处已经有了零线了。大功告成!

  3. 附上修改前后的电路图作为参考

改造前,灯盒处只有两根线

改造后,灯盒处有三根线,新接的就是零线

为什么灯盒需要零线(躺坑路)

笔者为了接入智能开关,直接买了零火版的智能开关。我在拆开灯开关盒的时候,发现有两根线,也用电笔测试了下,一个是有电,一个无电,我就以为这是火线和零线了,自然而然地买了零火版的智能开关。在要安装开关的时候,才发现零火版的智能开关,需要连接三根线:

  • L线——火线
  • N线——零线
  • L1线——灯线

这时候我发现我灯盒里只有两根线!L1线是什么线?这时候才了解到灯盒里原有的红线(火线)和白线(灯线,也就是零火版里的L1线),是没有零线的。这个只有火线和灯线的情况,只能用单火版的智能开关,但是我已经买了零火版的,那我必须要用上啊…… 所以就开始折腾如何在灯盒里接零线了。

curl 实战

作者 nickChen
2022年10月14日 22:19

耗时分析

$ man curl  # 查看 curl 的使用方式-w, --write-out <format> # -w 参数配置输出格式

-w 参数中有部分时间相关的参数如下:

  • time_namelookup:从请求开始到 DNS 域名解析完成的耗时
  • time_connect:从请求开始到 TCP 连接建立耗时(三次握手)
  • time_appconnect:从请求开始到 SSL/SSH 等建立连接耗时( ssl handshake 等)
  • time_pretransfer:从请求开始到响应开始传输的时间
  • time_redirect:从请求开始到,包含前面四者耗时,且所有重定向的时间相加,直到最终访问目标服务前的耗时
  • time_starttransfer:从请求开始到第一个字节将要传输的耗时,包含了 time_pretransfer 耗时
  • time_total:请求的全部耗时

如何发起耗时分析请求

  1. 直接在命令行中拼写输出格式
    curl -w '\ntime_namelookup=%{time_namelookup}\ntime_connect=%{time_connect}\ntime_appconnect=%{time_appconnect}\ntime_redirect=%{time_redirect}\ntime_pretransfer=%{time_pretransfer}\ntime_starttransfer=%{time_starttransfer}\ntime_total=%{time_total}\n\n' -o /dev/null -s -L 'http://voidchen.com'
  1. 利用文件描述格式
    创建一个格式文件 format.txt
     time_namelookup:  %{time_namelookup}\n    time_connect:  %{time_connect}\n time_appconnect:  %{time_appconnect}\n   time_redirect:  %{time_redirect}\ntime_pretransfer:  %{time_pretransfer}\ntime_starttransfer:  %{time_starttransfer}\n                 ----------\n      time_total:  %{time_total}\n
    使用格式文件发起访问
    curl -w '@format.txt' -o /dev/null -s -L 'http://voidchen.com'

耗时计算

  1. DNS耗时 = time_namelookup
  2. TCP建连耗时 = time_connect - time_namelookup
  3. SSL握手耗时 = time_appconnect - time_connect
  4. 服务器处理请求耗时 = time_starttransfer - time_pretransfer
  5. TTFB耗时 = time_starttransfer - time_appconnect
  6. 服务器传输耗时 = time_total - time_starttransfer
  7. 总耗时 = time_total

TODO 分析脚本

参考

2023 OKR

作者 nickChen
2023年1月29日 21:26

这是第一次做年度规划,也是第一次以 OKR 的形式定义目标。希望今年目标能顺利达成。

O1 技术创新

KR1 博客 12 篇

以博客沉淀内容

KR2 学习 Rust 及另外一门新语言,产出 LearnXInYMinutes

本次目标不仅是学习一门语言,同时也要定义一套通用的学习新语言的方法。先以 LearnXInYMinutes 作为初入门的切入点。

O2 开源

KR1 参与 2 个开源项目

以基础工具库优先,GO 优先。NeoVIM 可以参与一个项目,了解 NeoVIM 开发模式。

KR2 各平台同步输出内容

国外平台优先,包括 Twitter、Medium。

O3 生活

KR1 旅游目标

  • 三次三日以上的旅行
  • 十次爬山或徒步

KR2 摄影

  • 完成 1000 张图片的拍摄,做好分类

漫谈 Golang 空指针 panic 场景

作者 nickChen
2022年7月7日 22:23

本文主要用于分析几类 Golang 空指针引起的 panic 场景,这些场景遍布在日常开发代码中。但是出现 panic 时,真的能根据空指针信息准确定位出原因,找出故障代码吗?

来看看熟悉的 panic 信息

panic: runtime error: invalid memory address or nil pointer dereference

见识一下“熟悉”的 panic

空 map 赋值

source code

package mainfunc main() {    var m map[int]int m[1] = 1}// Output:// panic: assignment to entry in nil map

第一个需要避免的就是对未初始化的 map 进行赋值,当然,这个并不是本篇文章需要讨论的,但是笔者认为 map 作为常用的组件,还是需要在编码时避免此场景发生。

结构体指针未初始化

可以看出 t.n 结构体会跟随者 T 的初始化而完成初始化,此时 t.n.Log0()t.n.Log() 调用都成功了。
但是 t.np 结构体指针并不会随着 T 的初始化而完成初始化,此时 t.np 实际类型是 (*N)nil ,它是有类型的,类型为 *N,但是 t.np 值为 nil。因此 t.np.Log() 能正常调用,但是 t.np.Log0() 则会 panic

source code

package mainimport (    "fmt")type T struct {    n  N    np *N}type N struct {    n int}func (n N) Log0() {    fmt.Println("n log0")}func (n *N) Log() {    fmt.Println("n log")}func main() {    t := T{}    t.n.Log()    t.n.Log0()    t.np.Log()    t.np.Log0()}// Output:// n log// n log0// n log// panic: runtime error: invalid memory address or nil pointer dereference

interface{} 未初始化

相较于结构体指针,当成员变量是一个接口(interface{})时,结果又会变得不一样。

此时 t.m 是一个接口 M,当初始化创建 T 时,如果不主动初始化 M ,那么此时 M 是一个 nil。调用 t.m.Log() 会造成 panic

source code 1

package mainimport (    "fmt")type T struct {    m M}type M interface {    Log()}type M1 struct {}func (m *M1) Log() {    fmt.Println("m1 log")}func main() {    t := T{        m: &M1{},    }    t.m.Log()    t = T{}    t.m.Log()}// Output:// m1 log// panic: runtime error: invalid memory address or nil pointer dereference

source code 2

package maintype M interface {    Log()}func main() {    var m M    m.Log()}// Output:// panic: runtime error: invalid memory address or nil pointer dereference

interface{} assign nil

这个 case 在代码中常有发生,根因还是在于 Golang 的 nil 机制有个特殊的地方,可以看source code 1

source code 1

package mainimport (    "fmt")func main() {    var i interface{}    var n *int32    if n == nil {        fmt.Println("n is nil")    }    if i == nil {        fmt.Println("i is nil")    }    i = n    if i != nil {        fmt.Println("after assign, i is not nil")    }}// Output:// n is nil// i is nil// after assign, i is not nil

基于这个规则,常见的业务中出现 panic 的场景可以看如下代码。在这个场景下,panic 的原因同结构体指针未初始化的场景一致,根因是对于变量是否为 nil 的判断上出了问题。

source code 2

package mainimport (    "fmt")type M interface {    Log()}type M1 struct {}func (m M1) Log0() {    fmt.Println("log0")}func (m *M1) Log() {    fmt.Println("log")}func main() {    var i interface{}    var m M    var s *M1    if i == nil {        fmt.Println("i is nil")    }    i = m    if i == nil {        fmt.Println("after assign m, i is nil")    }    i = s    if i != nil {        fmt.Println("after assign s, i is not nil")    }    // 业务中常用 == nil 判断,然后快速失败 return    if i == nil {        return    }    // 业务中会认为此时的 i 就是非 nil 的了,然后开始进行方法调用    // 和结构体指针未初始化的场景一样,这里调用 *M 的方法是可以进行的    i.(*M1).Log()    // 但是如果调用的是 M 的方法,那就出问题了,panic 随之而来    i.(*M1).Log0()}// Output:// i is nil// after assign m, i is nil// after assign s, i is not nil// log// panic: runtime error: invalid memory address or nil pointer dereference

要改变是否为 nil 的判断,确保这种 case 不会发生。可以使用:

func IsNil(x interface{}) {    return x == nil || (reflect.ValueOf(x).Kind() == reflect.Ptr && reflect.ValueOf(x).IsNil())}

Golang 开源之 try 使用指南

作者 nickChen
2022年5月10日 20:20

Try(https://github.com/dsnet/try) go1.18 后一种新的 panic 处理方式

错误处理 —— Error Handling

Try 在 Golang 中实现了一种极简的错误处理方式。
不过这里需要明确一点使用前提,必须是 Go1.18 以上的泛型版本才可以使用此库。

入门

回顾 Golang 标准错误处理流程
作为比较,先放一下 Golang 中标准的错误处理流程。

type Duck struct {    Age int}func main() {    duck := Duck{Age:1}    bs, err := json.Marshal(&duck)    if err != nil {        log.Fatal("marshal duck fail")        return    }    var dummy Duck    err = json.Unmarshal(bs, &dummy)    if err != nil {        log.Fatal("unmarshal duck fail")        return    }    fmt.Printf("dummy: %+v", dummy)}

标准的 Goland Error Handling 实在是有点繁琐,每次 err 都需要单独判断,频繁的 if 逻辑穿插在业务处理流程当中。

如何能够像类似 Java Try-Catch 一样的错误处理方式呢?

使用 Try 进行错误处理

现在我们尝试使用 Try 来编写以上代码:

type Duck struct {    Age int}func main() {    defer try.F(log.Fatal)    duck := Duck{Age:1}    bs := try.E1(json.Marshal(&duck))    var dummy Duck    try.E(json.Unmarshal(bs, &dummy))    fmt.Printf("dummy: %+v", dummy)}

太干净了!可以看到主流程都非常紧密的连接在一起了,错误处理已经被 Try 给包裹住,不需要再使用烦人的 if err!= nil 语句了。

Try API 解析

开发文档地址 https://pkg.go.dev/github.com/dsnet/try

全量 API 列表

func E(err error)func E1(a A, err error) Afunc E2(a A, b B, err error) (A, B)func E3(a A, b B, c C, err error) (A, B, C)func E4(a A, b B, c C, d D, err error) (A, B, C, D)func F(fn func(...any))func Handle(errptr *error)func HandleF(errptr *error, fn func())func Recover(fn func(err error, frame runtime.Frame))

通过 API 可以发现 Try 的设计:

  1. 函数 E() 是核心,主要用来包裹返回参数 error。多个函数 E() 是因为需要匹配不同数量的返回值。
    • 使用了泛型去匹配非 error 类型的变量,所以必须要 Go1.18 以上才可使用
    • error 都是在末位,所以函数 E() 只能支持 error 在最后一位的函数返回值作为入参
    • 如果需要有更长数量返回值的函数,或 error 位置不在最后一位的函数,可以新增函数 E() 来适配
  2. 函数 F(fn func(...any)) 实际上接收的是一个函数,函数的入参是可变长的 any 类型,“入门”一节中使用的 log.Fatal 就是这类函数
  3. 函数 Handle(errptr *error) 接收了一个 error 指针,这里的使用方式是,将拦截到的 error 信息可以赋值到 errptr 指针处,后面介绍一个用法
  4. 函数 HandleF(errptr *error, fn func()) 接受了一个 error 指针的同时,也提供了一个处理函数,这个处理函数会在 error 赋值到 errptr 指针后进行,后面补充用法
  5. 函数 Recover(...) 可以看到接受了一个函数,可以同时处理 error 和栈帧信息

函数 Handler 使用

func f() (err error) {    defer try.Hander(&err)    try.E(...)    ...}

可以看到此处函数 Handler 接收了 函数 f 的返回参数中的 err 变量地址作为入参,此时如果 try.E 中拦截到了 error,会将这个 error 信息赋值到返回参数的 err 变量中,实现了错误的传递

函数 HandlerF 使用

func f() (err error) {    defer try.HandlerF(&err, func() {        err = fmt.Errorf("f() call err: %w", err)    })    try.E(...)    ...}

函数 HandlerF 的一个功能是将 error 传递到函数 f 的返回参数 err 中;另一个能力是接收一个自定义函数。假定的一个场景是:在这个自定义函数中,对传递的 error 进行一次 wrap,增加上一些额外信息 f() call err:,这样可以标识错误传递来源是函数f

原理

panic and recover

func f() {    defer recover()    panic("^_^")}

Try 利用了 Golang 提供的 panic & recover 的能力,巧妙的将业务逻辑中的 error 信息包装成 panic,随后被 defer 中的 recover 捕获,从而完成中断业务处理,返回错误信息的能力。

以函数 Handler(errptr *error) 举例,代码中对 panic 操作进行了 recover,并且只有 panic 的信息会 error 对象时,才捕获并转移到 errptr 中。

func Handle(errptr *error) {    r := recover()    switch r.(type) {        case nil:        case error:            *errptr = r.(error)        default:            panic(r)    }}

以上代码存在的问题是,recover 出来的 error 可能并不是 try.E() 函数 panic 抛出的,所以上文定义的 Handler() 执行下来会存在将其他代码片段出现的 panic error 一并处理了。这超出了 Try 职能的范围。所以在 Try 的实现中,实际上是自定义了一个 error 类型—— wrapError ,这样在 switch type 判断中,明确判断是否是 wrapError 即可准确的处理 Try 函数中抛出的 error。

type wrapError struct {    error    pc [1]uintptr}

dsq 和 jq 操作记录

作者 nickChen
2022年3月11日 20:12

本地文件处理工作流整理,待归档到一个合适的目录

通过 dsq join 文件条件查找

安装工具

// 文件 a.json[    {"type": "tool", "id": "1"},    {"type": "tool", "id": "2"},    {"type": "tool", "id": "3"},    {"type": "book", "id": "1"},    {"type": "none", "id": "1"}]// 文件 b.json[    {"author": "nickchen", "type":"tool", "tag":[1,2,3]},    {"author": "whoru", "type":"none"},    {"author": "teacher", "type":"book"}]

需求打印出 author、type、id ,且 id=1 的组合

  1. 因为 dsq 没法解析内嵌列表,所有 b.json 中的 tag 字段必须要被删除
  2. 需要 dsq join a.json 和 b.json,且判断 id = 1
  3. 打印结果确认是否符合要求
  4. 输出一个格式化的 json array

开始执行:

  1. 清理 b.json 中的 tag 字段
    • jq 的最外侧 [] 是列表构造器,会生成一份列表
    • jq 的 .[] 代表迭代器,迭代当前这个 json array
    • jq 的 | 是 pipe 管道,将数据流转到下一个程序
    • jq 的 del() 是内建函数,可以删除某个字段
➜  cat b.json | jq '[.[]|del(.tag)]' > c.jsonor➜  cat b.json | jq '[.[]|{author:.author,type:.type}]' > c.json
  1. dsq join 查询 & 打印结果
    • dsq 的 --pretty 可以输出表格结果
    • dsq 输入多个文件,可以按照进行 join 操作,表名即文件顺序
➜  dsq --pretty a.json c.json "select {0}.id, {1}.type, {1}.author from {0} join {1} on {0}.type = {1}.type where {0}.id = 1"+----------+----+------+|  author  | id | type |+----------+----+------+| nickchen |  1 | tool || teacher  |  1 | book || whoru    |  1 | none |+----------+----+------+
  1. 输出一个格式化的 json array
➜  dsq a.json c.json "select {0}.id, {1}.type, {1}.author from {0} join {1} on {0}.type = {1}.type where {0}.id = 1" | jq .[  {    "id": "1",    "type": "tool",    "author": "nickchen"  },  {    "type": "book",    "author": "teacher",    "id": "1"  },  {    "id": "1",    "type": "none",    "author": "whoru"  }]

记一次端口扫描工具实现

作者 nickChen
2020年11月17日 14:33

端口扫描工具

工具如其名,主要功能是扫描服务器端口是否开放。常用的方式是扫描服务器(S)在某一段端口(P)范围内,有哪些端口正在被监听。

如:扫描服务器 127.0.0.1,端口 [80, 2000)段内,查看被监听的端口。

实现方案

  • 和对端服务器端口尝试建立 socket 链接,如果链接成功建立,则表明该端口正在被监听。
  • 使用多线程方案,加快端口扫描速度
  • 使用协程方案,降低资源使用,加快扫描速度

以上就是大致的是实现步骤。本次实现了

  • Java 单线程扫描
  • Java 多线程扫描
  • Java 协程扫描
  • Rust 协程扫描

没错,>..< 实际上这是一次学习 Java Quasar Fiber 和 Rust 过程中的小练习。

实现代码

实现已放在 github 仓库,感兴趣的可以下载自行运行。

遇到的问题

Java Fiber 和 Rust 的实现过程中都遇到了关于协程使用上的问题,在此记录一下。

Java Fiber

首先需要在函数上标明 @Suspendable,协程内的阻塞函数需要标记此注解才能让协程正常服务。

其次协程中使用的阻塞函数,需要有自己的封装实现。线程方式中使用 java.net.Socket#connect 函数是调用的 JDK 原生实现 java.net.PlainSocketImpl#socketConnect
这个实现是阻塞的,此阻塞的实现,会影响阻塞住协程,使得协程的性能无法发挥。
Quasar 为此单独提供了 FiberSocketChannel 实现,此实现调用了 co.paralleluniverse.fibers.io.AsyncFiberSocketChannel#connect
底层是异步 IO 实现,使用 Fiber.park 接替了原有的阻塞实现。

使用过程中可以感受到,使用 Fiber 接替现有的代码,还是有需要改造的部分,无法零成本接入。(此处如果我有使用不合理的地方,请邮件我改进,谢谢

Rust

Rust 版本实现中,使用了三方依赖 may 作为 coroutine 的实现依赖。出现了同 Java 版本类似的问题。
原生的 std::net::TcpStream 依然会阻塞住协程,还是需要使用 may 提供的异步 IO 实现 may::net::TcpStream 才能匹配协程提升性能。

另外就是 Rust 的 TcpStream::connect_timeout 使用方式不能按照官方提供的文档使用。

use std::net::TcpStream;if let Ok(stream) = TcpStream::connect("127.0.0.1:8080") {    println!("Connected to the server!");} else {    println!("Couldn't connect to server...");}

按照此方法使用 TcpStream::connect_timeout,会导致 if 判断提前进入 else 逻辑,造成逻辑异常。
必须将链接建立的语句提前,才能逻辑正常。

// 正确实现let stream = TcpStream::connect_timeout(&addrs[0], timeout);if stream.is_ok() {    stream.unwrap().shutdown(Shutdown::Both).expect("shutdown tcp stream fail");    return true;} else {    return false;}

诡异的问题

实现 Rust 版本的测试结果中,出现了诡异的问题。问题描述如下:

使用 rdial --start-port 80 --end-port 9000 --hostname 127.0.0.1 --timeout 200 执行时,结果返回了 [1080, 4198] 两个接口,实际上 6394 接口也是在被监听,但没有被扫描出来。修改使用 rdial --start-port 6000 --end-port 9000 --hostname 127.0.0.1 --timeout 200 执行时,此时结果返回了 6394 接口。

经过 debug 确认了所有端口都有的的确确被扫描到,并没有漏掉 6394 端口。

继续尝试增加 timeout 超时时间,发现还是无法扫描到 6394 端口,但是有个特殊现象——6394 端口在建立链接语句执行后,立马返回了“close”日志,并没有经过设置好的等待时间。

最后发现问题是在操作系统限制的 fd 上,使用 ulimit -n 发现此时设置为 4864,可以看到我们的扫描程序第一次执行的确扫描到了 4198 这个接口监听,而忽略了 6394 这个接口。

答案呼之欲出,执行 ulimit -n 10000 将 fd 限制提升到 10000 之后再次执行,此时返回结果 [1080, 4198, 6394],问题解决。

Vim 学习笔记

作者 nickChen
2020年11月1日 17:01

[TOC]

删除

  • dw = daw

  • daw 删除单词及旁边空格

  • diw 删除单词

  • dt( 删除直到左括号

  • dt” 如上

  • d0 删除到行首

  • d$ 删除到行尾

  • ndd 删除 n 行

  • nx 删除 n 个字符

替换

  • r replace

  • c change

  • s substitute 删除当前字符并进入插入模式

  • R 不断替换后续字符,覆盖写

  • S 整行删除进入插入模式

  • ns 删除n个字符并进入插入模式

  • caw 删除当前单词并进入插入模式

  • ct” 删除直到”并进入插入模式

查询

  • / 前向
  • ? 反向
  • n/N 上下移动
  • */# 对当前单词进行前向反向匹配

buffer 切换

  • :bprevious

  • :bnext

  • :bfirst

  • :blast

  • :b n 跳转 buffer

  • :b file_name 跳转 buffer

  • :ls 展示 buffer 列表

  • :e 打开编辑文件

window

  • :sp [file_name] 水平分隔

  • :vs [file_name] 垂直分隔

  • H/L 窗口左右替换

  • = 所有窗口等宽高

Tabpage

  • :tabnew [file_name] 新建一个标签页
  • gt 切换下一个 tab
  • gT 切换上一个 tab

宏录制

  1. 在 normal 模式下,按 q{register} 设置宏存放的寄存器位置,例如 qa,将宏存放在寄存器 a
  2. 开始进行 vim 操作
  3. 回到 normal 模式下,再按 q 结束录制
  4. 在 normal 模式下,按 @a 执行寄存器 a 中录制好的宏,可以利用100@a 执行一百次

复制黏贴

  • yy 默认复制一行到无名寄存器
  • p 默认粘贴无名寄存器的字符
  • 寄存器 0 为复制寄存器,使用 y 复制文本会将内容同步保存到寄存器 0
  • :reg a 查看寄存器 a 中信息
  • :reg {register} 同上
  • “{register} 使用某寄存器(register a-z 都可以使用)
  • “{register}yy 复制一行并将结果存放到寄存器 {register}
  • “{register}p 粘贴寄存器 {register} 中的字符
  • “+ 使用系统剪贴板
  • 设置 set clipboard=unnamed 默认使用系统剪贴板作为无名寄存器(Mac 需要使用 vim –version 检查是否支持 +clipboard 才有效)
  • 在 insert 模式下,使用 {register} 可以粘贴指定寄存器的内容

补全

  • 根据 Ctrl-n、Ctrl-p 补全单词
  • 根据 Ctrl-x Ctrl-f 补全文件名
  • 根据 Ctrl-x Ctrl-o 补全代码,需要开启文件类型检查

更换配色主题

  • :colorscheme 显示当前主题
  • :colorscheme 查看可选主题
  • :colorscheme <主题名> 更改主题

Linux 服务器负载(load)过高排查

作者 nickChen
2020年11月1日 14:10

[TOC]

load avg

服务器负载 load Wiki 解释

服务器负载实际上指的就是当前操作系统 Running(执行中) 及 Runnable(就绪等待执行) 的任务队列数量(实际情况可能会包含其他,参看 wiki),实际展示了当前系统执行的压力——如果就绪执行的队列过长,那么任务执行的周期就越长,可以理解为服务器已经无法承担如此大的压力,只能排队执行了。
类比于一个面包店,正常情况下,一个服务员可以解决店内某一时刻一人结账,如果此时有两人结账,那么其中一人只能等待。此时结账还可控,只是第二位客人结账时间会延长,但如果有十人同时结账,此时只能一个接一个的排队结账了,最后一位客人的结账周期就会非常漫长了。

linux load avg 可以通过 uptime \ top 等指令查看,也可以通过读取文件 /proc/loadavg 获取相关数值。

$ uptime 14:34:03 up 10:43,  4 users,  load average: 0.06, 0.11, 0.09

load avg 后跟有三个数值,分别以逗号分隔开,这三个数字分别代表了系统近 1 分钟、5 分钟、15 分钟的平均负载情况。

实时的系统运行情况可以通过 vmstat 查询,获取到更细粒度的系统运行状态。

# 以每 1 秒为间隔,连输打印五次$vmstat 1 5procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu----- r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st 4  1      0 2311684 178948 18389776    0    0     3   189   17    9 13  8 79  0  0 1  0      0 2311404 178952 18390256    0    0     0   384 18457 32160  3  3 94  0  0 2  1      0 2312008 178952 18390368    0    0     0   608 18300 31971  5  3 92  1  0 1  0      0 2311052 178952 18390636    0    0     0   636 18108 31634  3  3 94  1  0 3  1      0 2311168 178952 18390784    0    0     0   576 19212 32975  5  3 92  0  0

问题排查

通过 CPU 使用率定位

通过 CPU 的使用率来定位某个进程/线程的运行问题。

执行方案:

1. 先定位出 CPU 使用率较高的进程,找到对应的进程 id

$top# 进入交互界面后,按 P 使进程按照 CPU 使用率排序

2. 找到对应进程的使用 CPU 较高的子进程

$top -Hp <pid># <pid> 就是第一步查出来的进程 id

此时已经能定位到某些 CPU 占用较高的进程

如果是 JVM 项目,那么可以继续向下定位到 JVM 的执行堆栈。

JVM 项目

3. 将第二步得到的进程 id 转换为十六进制

$printf "0x%x\n" <pid># <pid> 是第二步查出的进程 id

4. 将第三步得到的十六进制和 jstack 比较找到堆栈

$jstack <pid> > jstack.tmp# 打印 JVM 进程堆栈到 jstack.tmp 文件# 后续只需要在文件中搜索第三步的十六进制字符串,即可找到其对应的堆栈

以上步骤已有脚本实现,可以参考使用。 [^1] 不过在多用户的 linux 中可能会出现使用上的问题,所以基本的排查原理还是要熟知。

TODO

  • sar 工具
  • iostat 工具

[^1]: 查找 CPU 使用最高的 Java 线程

Linux 开发运维操作记录

作者 nickChen
2020年8月17日 13:22

[TOC]

此篇文章碎片式积累了 linux 上的相关操作,使用时可以搜索当前页面

文件

压缩日志并删除原始文件

#!/bin/bashyesterday=`date -d '1days ago' +%Y_%m_%d`cd $1find . -name "*$yesterday*.log" -type f | xargs -I {} tar -zcvf {}.tar.gz {} --remove-files

说明加上参数--remove-filestar 命令可以压缩并删除的源文件

这样只能删除文件,如果删除源文件夹,可以使用以下方法

tar -zcvf aaa/ aaa.tar.gz && rm -rf aaa

进程

查看进程启动时间

# 启动时间ps -eo lstart# 运行多长时间ps -eo etime# 直接查看进程的启动时间ps -eo pid,lstart,etime | grep <pid>

查看进程系统调用

# 通常 -p <pid> 即可# -f 可以跟踪其子进程的系统调用,也就可以跟踪一个多线程服务的所有系统调用了strace -f -p <pid>

查看进程的内存占用

# 查看进程的内存占用情况pmap -x <pid>

内存

查看剩余可用内存

free -hfree -m

Redis 线上运维操作

作者 nickChen
2020年5月8日 16:08

[TOC]

线上 KEY 批量删除

Redis 删除特定前缀的 key,需要注意性能影响,比较直观的模糊删除的方式是:

# 错误示范,生产环境不可用!redis-cli --raw keys "service:order:*" | xargs redis-cli del

使用 redis-cli 执行 KEYS 指令模糊匹配对应的 key,再利用管道传递给 redis-cli 执行 DEL 指令。这里的 KEYS 指令是个性能隐患。

实际上阿里云 4G 的 redis 实例上,只有 500 MB 左右的内存占用时,KEYS 指令的 rt 已经达到了 500ms+ 的程度,严重影响了 Redis 的性能。

此处 redis-cli 命令最简化了,实际上线上机器应当还有 host/password 等相关信息需要配置在 redis-cli 的参数上

线上不能使用 KEYS 指令应该成为编码过程中必须注意的点,实际也可以禁用 Redis 的 KEYS 指令,避免开发人员误操作。

修改 redis.conf 文件,添加 rename-command KEYS "" 即可,可以理解为重命名某个指令,重命名为空字符串即为禁用

使用 SCAN 指令进行模糊匹配的操作(Redis 2.8 开始支持此指令)

SCAN 指令的具体用法不在此处描述,此处实际要利用的是 SCAN 指令的特性进行模糊匹配要删除的 key。

redis-cli --scan --pattern "service:order:*" | xargs redis-cli del

--scan 使用的就是 SCAN 指令的特性进行了 key 的模糊匹配,对比 KEYS 指令这个操作不会阻塞 Redis 操作,对线上业务没有影响。

需要注意上述操作模糊删除 string 类型的 Redis 键时时间复杂度为 O(1),但是对于其他数据结构时间可能不一样了。比如 set 数据结构,如果直接使用 DEL 指令删除这个 set 数据结构的 key,他的时间复杂度并不是 O(1),而是 O(M),M 为数据结构中元素的数量。也就是意味着如果这个数据集合过大,这个 DEL 指令的执行实际上也有性能风险(此处可看官方文档中的 DEL 时间复杂度描述)。

面对除了 string 类型的其他数据结构,Redis 有对应的 HSCANSSCANZSCAN 指令可以遍历其元素,利用这一特性可以整理出以下的批量删除脚本。

批量删除 set 数据结构中的数据

import redisdef del_big_set_key(key_name):    r = redis.StrictRedis(host='localhost', port=6379)    # count表示每次删除的元素数量,这里每次删除300元素    for key in r.sscan_iter(name=key_name, count=300):        r.srem(key_name, key)del_big_set_key('ops-coffee')

批量删除 hash 数据结构中的数据

import redisdef del_big_hash_key(key_name):    r = redis.StrictRedis(host='localhost', port=6379)    # hscan_iter获取出来的结果是个元祖,下边hdel删除用key[0]取到key    for key in r.hscan_iter(name=key_name, count=300):        r.hdel(key_name, key[0])del_big_hash_key('ops-coffee')

批量删除 zset 数据结构中的数据

import redisdef del_big_sort_key(key_name):    r = redis.StrictRedis(host='localhost', port=6379)    while r.zcard(key_name) > 0:        # 判断集合中是否有元素,如有有则删除排行0-99的元素        r.zremrangebyrank(key_name, 0, 99)del_big_sort_key('ops-coffee')

参考资料

MySQL 总集

作者 nickChen
2018年4月18日 11:19

show statusshow session status

⬆查看当前MySQL服务器连接的会话状态变量信息;

show global status

⬆查看全局状态变量;

flush status

⬆初始化当前会话状态变量

show variables

⬆查看全局系统变量、会话系统变量和静态变量等;


MySQL 缓存:

按缓存读写功能不同划分

  • Cache 缓存 (加速读)
  • Buffer 缓存 (缓冲写)

按生存周期长短划分

  • 全局缓存 例如二进制日志 binlog_cache_size
  • 会话缓存 例如结果集缓存 net_buffer_size
  • 临时缓存 例如select语句中包含的派生表生成的内存临时表

按存储引擎实现划分

  • MySQL 缓存
  • MyISAM 缓存
  • InnoDB 缓存

超时

show variables like '%timeout%'

⬆查看超时相关变量配置

连接超时

  • connect_timeout 建立连接超时
  • wait_timeout 保持睡眠状态太长,超时
  • interactive_timeout 交互模式下(cmd)保持睡眠状态太长,超时
  • net_write_timeout 默认60秒 写超时
  • net_read_timeout 默认30秒 读超时

InnoDB 锁等待超时

  • innodb_lock_wait_timeout 默认50秒 设置行级锁锁等待时间,超时触发导致行级锁锁等待的SQL语句回滚(若希望整个事务回滚,启动MySQL时开启 innodb_rollback_on_timeout 参数)
  • innodb_rollback_on_timeout 默认OFF 回滚上一条导致行级锁锁等待的SQL语句, 设置为ON则回滚整个事务

元数据锁超时 metadata locks

  • lock_wait_timeout 默认值1年 31536000 取值范围[1, 31536000]

复制连接超时

  • slave_net_timeout 默认3600秒 MySQL主从复制时,从拉取主二进制日志失败后,等待该设置的时间后,再重连主获取数据。 设置为30秒,减少网络问题导致的数据同步延迟。

MyISAM 表的延迟插入超时

  • delayed_insert_timeout

MySQL 连接的优化

连接参数

show variables like '%connect%'

⬆查询MySQL服务的连接参数信息

  • max_connections 设置最大的并发连接数,拥有SUPER权限的用户可以在连接数达到最大时依然能建立链接。
  • max_user_connections 设置指定的MySQL账号的最大并发连接数,设置为0表示不限制
  • max_connect_errors 某主机连接到MySQL服务器失败次数过多,超过该值,服务器会拒绝该主机的连接,除非执行 flush hosts
  • init_connect 客户机连接服务器时,会先执行 init_connect 参数内设置的SQL语句。SUPER权限的用户连接不会执行这些SQL语句

连接状态

show status like '%connections%'

⬆查看当前实例连接MySQL服务的状态信息

  • Connections Mysq服务从启动到现在尝试连接的请求数(包括不能成功建立的连接请求)
  • max_used_connections 表示MySQL服务从启动到现在,同一时刻并行连接的最大值。如果 max_used_connections 和 max_connections 相同, 则说明 max_connections 设置过低或者服务器负载上限。
  • connection_errors_max_connections 由于MySQL服务器已经达到 max_connections 的上限,连接被拒绝的次数。如果该值过大,则说明 max_connections 设置过低或者服务器负载上限。

连接线程参数

show variables like 'thread%'

⬆查看MySQL连接线程参数信息

  • thread_cache_size 表示当前可用的MySQL连接池大小
  • thread_concurrency 针对Solaris系统设置为CPU核心数的2倍
  • thread_handling 默认为 one-thread-per-connection,值为 no-threads 只能提供一个连接线程
  • thread_stack 默认 192KB, 配置连接线程分配的内存大小用于保存每个连接线程的信息

连接状态信息

show status like 'Thread%'

⬆查看连接线程的状态信息

  • Threads_cached 当前线程池的线程数
  • Threads_connected 当前连接数
  • Threads_created 连接线程创建数,该值过大会扩充连接池大小
  • Threads_running 不在睡眠状态的连接线程数量

连接池的连接命中率 = (Connections - Threads_created)/ connections * 100%

该值较低时,需要增加 thread_cache_size。

连接请求堆栈

show variables like 'back_log'

⬆查询堆栈中的连接请求(因连接数过大而被塞入)

连接异常

show status like 'Aborted%'

⬆查看连接异常的状态信息

  • Aborted_clients MySQL客户机被异常关闭的次数。例如发送的SQL语句过长或者select语句执行结果太大,超过 max_allowed_packet 参数值,或者 wait_timeout、 interactive_timeout ( max_allowed_packet 默认 1M)
  • Aborted_connects 试图连接到MySQL服务器而失败的连接次数,该次数过大可能有网络问题。错误的账户名密码或者无效的数据库都会使得该值递增。

其他

show status like 'Slow%'

⬆查看其他链接状态

  • Slow_launch_threads 记录创建时间超过 slow_launch_time 的线程数,如果该值过大,可能是服务器过载。 (默认情况下, slow_launch_time 为 2秒)

show status like 'Connection_error%'

⬆查看连接错误的状态统计信息


缓存的优化

show variables like 'host_cache_size'

⬆查询主机名缓存大小

show variables like 'stored_program_cache'

⬆查看MySQL为每个会话提供的存储程序缓存个数上限

show variables like 'innodb_ft_cache_size'

⬆查询InnoDB 全文索引缓存的大小

查询缓存 Query Cache

show variables like '%query_cache%'

⬆查询有关查询缓存的参数设置

  • have_query_cache 是否支持查询缓存 YES NO
  • query_cache_type 0(OFF) 关闭,1(ON)先到查询缓存中查找,除非
  • select 语句中包含 sql_no_cache, 2(DEMOND)不使用查询缓存,除非 select 语句中包含 sql_cache
  • query_cache_size 查询缓存的大小
  • query_cache_limit 如果 select 语句的结果集大小超过了该值,将不会被添加进查询缓存
  • query_cache_min_res_unit 查询缓存是以块为单位分配内存空间,结果集大于该值就会多申请一块,如此反复。合适的值不仅可以减少内存分配操作的次数,还可以减少内存碎片
  • query_cache_wlock_invalidate 用于设置行级排他锁与查询缓存之间的关系,默认 0 (false),表示施加行级排他锁时,该表的所有查询缓存依然有效。如果设置为1(true),表示施加行级排他锁时,该表的所有查询缓存将失效。

查询缓存的命中率

set global query_cache_size = 102760448

⬆开启缓存查询,将其内存大小设置为98M

show status like 'Qcache%'

⬆获取当前实例的查询缓存状态,从而可以计算出当前缓存查询的命中率,继而确定 query_cache_size 的设置是否合理

  • Qcache_free_memory 当前可用内存
  • Qcache_lowmen_prunes 因查询缓存已满而溢出、删除的查询结果个数。该值过大表示需要增加查询缓存大小
  • Qcache_hits 使用查询缓存的次数,若该值过小,则考虑是否应该开启查询缓存
  • Qcache_total_blocks 查询缓存的总块数
  • Qcache_free_blocks 处于空闲的块数(碎片数量)如果该值较大,意味着查询缓存中碎片较多,表明查询结果集比较小,此时可以减少 query_cache_min_res_unit。使用 flush query cache 对碎片进行整理。(reset query cache 会移除查询缓存中的结果集)
  • Qcache_inserts 表示此前总共缓存过多少条 select语句的结果集
  • Qcache_not_cached 表示没有进入查询缓存的 select语句的个数
  • Qcache_queries_in_cache 表示查询缓存中缓存中多少条 select 语句的结果集

结果集缓存

select 语句的结果集会暂存在结果集缓存中,结果集缓存的初始大小由 net_buffer_size 定义(默认16KB),如果 select语句的结果集大小超过初始大小,则会自动扩容,但不会超过 max_allowed_packet 的参数值。select 语句执行成功后,结果集缓存空间会“瘦身”到初始大小。

优化表结构

  • 尽量将字段定义为 NOT NULL
  • 考虑使用 enum、 set等复合数据类型
  • 尽量不存文件、视频等二进制数据
  • 数值型字段的比较比字符串效率高很多

SQL语句优化

了解 SQL 的执行频率

show status like 'queries'

⬆执行的 SQL 语句的数量,不统计 com_ping、com_statistics

show global status like 'Com_%'

⬆查看MySQL服务执行各种SQL语句的数量

  • com_select
  • com_insert 批量插入只记一次
  • com_update
  • com_delete

可以通过上面的信息了解当前应用偏向于 OLTP 还是 OLAP。

  • com_commit
  • com_rollback

可以通过上面信息,了解到rollback从而推断程序中存在某些问题。

数据处理状态信息

show global status like 'handler%'

⬆执行次数查询

JDK1.7 ConcurrentHashMap 解析

作者 nickChen
2018年4月17日 16:04

力求简单而明确地直指 JDK1.7 ConcurrentHashMap 的设计要点。

线程安全之 ConcurrentHashMap

在 JDK 中,有 java.util.concurrent包,里面存有这一些线程安全的集合类。其中应用最多的就是 ConcurrentHashMap,这是一个线程安全的 HashMap,不同的 JDK 版本可能使用了不同的技巧来保证这个线程安全的特性。这里我讨论的是 JDK1.7 版本的线程安全设计。

JDK7 采用了分段锁的方式来保证 ConcurrentHashMap 的线程安全,分段锁的工作原理是:

HashMap 下需要完成线程安全可以使用 Collections.synchronizedMap 构造一个线程安全的 HashMap,他的原理就是利用 synchronized 关键字对基本上所有的方法进行上锁,是一种非常粗暴的解决方案。可想而知的是低性能换来的线程安全。

ConcurrentHashMap 下有多个 Segment(默认 16 个), Segment 下是一个 HashTable 用来存 key-value。 Segment 是继承 ReentrantLock,可以直接享有相关的锁方法。一个数据存入 ConcurrentHashMap 需要先进行一次 hash 来确定他是在哪个 Segment 下,再对该 Segment 上锁写数据。可以看到这一点已经比前者的实现高明了。

ConcurrentHashMap 的性能优化

上一节中提到了 put 操作会对 Segment 上锁,事实上这里 JDK7 还有所优化,调用 put 方法会先尝试获取锁,tryLock() 这个是 ReentrantLock 的方法,获取不到会继续去尝试 scanAndLockForPut(),这个方法里会不停的 tryLock(),这里会涉及到 MAX_SCAN_RETRIES 次尝试 tryLock(),超过次数了会直接 lock() 获取锁。在操作最后会在 try...finallyunlock() 释放锁。

static final int MAX_SCAN_RETRIES =            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

还有 size() 方法获取 ConcurrentHashMap 的大小时,会有一系列的操作来保证得到准确的 size。

获取 size 大小是一个无限循环,每次循环会先自增一次 retries,只有 retries == RETRIES_BEFORE_LOCK(=2) 时才会将所有的 Segment 都上锁再计算。这里有个 trick,会记录上一次的 Segment 的 modCount 总值,在下一次计算时会比较,如果不相等,那就是再来一次了。这里利用了 map 中 modCount 机制。

for (;;) {    // 先尝试直接获取 size 大小,这里有计算一个 modCount的数值,会和上次(last)作比较,如果一样的话,说明map没有做增删操作啥的,就是正确的了    // 如果尝试的次数超过了 RETRIES_BEFORE_LOCK, 就直接去锁 segment 再计数    if (retries++ == RETRIES_BEFORE_LOCK) {        for (int j = 0; j < segments.length; ++j)            ensureSegment(j).lock(); // force creation    }    sum = 0L;    size = 0;    overflow = false;    for (int j = 0; j < segments.length; ++j) {        Segment<K,V> seg = segmentAt(segments, j);        if (seg != null) {            sum += seg.modCount;            int c = seg.count;            if (c < 0 || (size += c) < 0)                overflow = true;        }    }    if (sum == last)        break;    last = sum;}

至于其他的方法,涉及到的锁步骤无非也就上面的重复了,就不多谈了。在此留笔,方便日后再记。

JSONP 详解

作者 nickChen
2018年4月17日 16:04

简单直白的解释 JSONP 的由来及实现原理

JSONP 详解,不信你听不懂

对于一样技术,先了解其发展的背景十分重要。因为了解了背景,才能明白他为何要如此设计,能解决什么问题。

原理有点枯燥,我就挑口语化的来说,你能听懂就行。对术语有兴趣的可以继续谷歌下去。

JavaScript 是一种在 Web 开发中经常使用的前端动态脚本技术。在 JavaScript 中,有一个很重要的安全性限制,被称为 Same-Origin Policy(同源策略)

由于同源策略的限制,XmlHttpRequest (which 是 Ajax 实现的方式) 只允许请求当前源(域名、协议、端口)的资源。为了实现跨域请求,可以通过 script 标签实现跨域请求,然后在服务端输出 JSON 数据并执行回调函数,从而解决了跨域的数据请求。
利用在页面中创建 <script> 节点的方法向不同域提交 HTTP 请求的方法称为 JSONP,这项技术可以解决跨域提交 Ajax 请求的问题。

也就是出现 JSONP 是为了在同源策略所保证的安全的前提下,绕个弯突破限制获取资源。

上文说到是利用在页面中创建 <script> 节点,修改 script 节点的 src 属性到目标地址来获取资源,这里就有了 script 标签的局限性 —— script 标签仅支持 GET 请求方式。

这就是 JSONP 只能是 GET 请求的前因后果。

JSONP 实现细节

使用 script 标签,改变 src 属性为目标地址,服务器返回的就是一个文本 (文本内是一个函数),这个函数作为 js 被执行。

伪代码

生成的 script:

// 这是一个由 JSONP 生成的 script 标签// 他请求了一个 user 对象// 注意这里的 *callback=JSONP_CB* 这里定义了一个回调函数的名称,通常在使用 jQ 的时候,可以自定义回调函数名或者不定义由 jQ 自动生成。// 这个函数名将会被后端使用,包裹着数据传回。<script src="http://localhost:1111/user/1?callback=JSONP_CB"></script>

后端代码:

// 返回 user 对象, 通常情况下我们只需要返回 json体// 而 JSONP 则会使用回调函数名包裹着数据返回,这也就是一个 js 语法, 等于是调用了一次 JSONP_CB 函数。return "JSONP_CB({'id': 1, 'name':'nickChen'})"

这里就是回调函数使用的关键了:

// 服务器返回的 JSONP_CB({'id': 1, 'name':'nickChen'}) 因为在 script 标签下,作为一个 js 代码运行了// 然后就可以看到巧妙地利用了回调函数取回了数据并执行你的方法来处理数据、// 将 JSONP_CB 注册到成为一个函数window[callbackName] = function(data){ // 关键了,调用你的方法来处理数据 callback(data); // 清场 window.document.body.removeChild(scriptElem);};

后续测试

script, img, iframe, link 四个标签都可以请求到跨域资源,其中 link 也可以完成 script标签实现的 JSONP 功能,即可以调起回调函数。 img 不能处理文本内容,但是可以监听是否响应。iframe 就是把请求的资源当做 HTML渲染咯(即回调函数会作为文本显示)。

总结

至此,也看到了 JSONP 的实现方式,可以看到 JSONP 的设计还是十分巧妙地,而实现上又非常简单。

当然了,现在也可以不使用 JSONP 的方式来处理跨域了,可以使用 CORS 来解决跨域。

SPI 机制详解

作者 nickChen
2018年4月17日 16:04

SPI, Service Provider Interface.

There are three essential components of a service provider framework:
a service interface, which providers implement;
a provider registration API, which the system uses to register implementations, giving clients access to them;
and a service access API, which clients use to obtain an instance of the service.

service provider framework 有三个重要的组件,

service interface, 提供实现
供应者的注册接口,可以用来注册接口实现,这样就可以访问到实现类。
获取 service 的api,可以用来获取 service 的实例。

SPI 的作用

SPI 主要是被框架的开发人员使用,比如 java.sql.Driver 接口,其他不同厂商可以针对同一接口做出不同的实现,mysql 和 postgresql 都有不同的实现提供给用户。Java 的 SPI 机制可以为某个接口注册服务实现。

Java 的 SPI 实现是由 java.util.ServiceLoader 类实现。当服务的提供者提供了一种接口的实现之后,需要在 classpath 下的 META-INF/services/ 目录里创建一个以服务接口命名的文件,这个文件里的内容就是这个接口的具体的实现类。当其他的程序需要这个服务的时候,就可以通过查找这个jar包(一般都是以jar包做依赖)的 META-INF/services/ 中的配置文件,配置文件中有接口的具体实现类名,可以根据这个类名进行加载实例化,就可以使用该服务了。

这是一个 service provider framework 大致结构

// Service provider framework sketch// Service 接口,我们要使用的功能在这里// Service interfacepublic interface Service {    ... // Service-specific methods go here}// Provider 用来获取 Service 实例// Service provider interfacepublic interface Provider {    Service newService();}// Noninstantiable class for service registration and accesspublic class Services {    private Services() { }  // Prevents instantiation (Item 4)    // 用来维持 Provider 实例的 Map    // Maps service names to services    private static final Map<String, Provider> providers =        new ConcurrentHashMap<String, Provider>();    public static final String DEFAULT_PROVIDER_NAME = "<def>";    // 注册 Provider 到 Map    // Provider registration API    public static void registerDefaultProvider(Provider p) {        registerProvider(DEFAULT_PROVIDER_NAME, p);    }    public static void registerProvider(String name, Provider p){        providers.put(name, p);    }    // Service 获取接口    // Service access API    public static Service newInstance() {        return newInstance(DEFAULT_PROVIDER_NAME);    }    // 生成 Service 实例    public static Service newInstance(String name) {        Provider p = providers.get(name);        if (p == null)            throw new IllegalArgumentException(                "No provider registered with name: " + name);        return p.newService();    }}

上面的一段代码复现了上一节中提到的三个重要的组件。理解这段代码,对 SPI 机制原理的理解非常重要。

基于 Interface 的 SPI 实现

至于为什么要基于 Interface 去做 SPI,那是因为读入的类你是不知道他的具体类的,也并不知道他有哪些方法,因为这是运行时加载的,编译期都没法获取。具体见下面伪代码:

// 如果不使用 interface// 比如现在要导入一个 Student 类class Student {    public void say () { println("hello, i'm a student"); }}// 平常使用main () {    // 如果是 基于 SPI 运行时注册的类,你是不知道他的类名的,就像你并不知道他叫 Student    // 即使你知道他类名叫 Student,也没有用,因为这里的 Student 类需要 import 进来,而编译时并没有这个类,就没办法 import    Student stu = new Student();    stu.say();}// 基于 SPIinterface Person {    public void say();}class Student implements Person {    public void say () { println("hello, i'm a student"); }}main () {    // 动态加载类,使用 cast 就可以得到一个 Person接口的实例,就可以调用 say() 方法了    // 这里是直接获取 Service,可以看到能得到的 Service 是受限的,所以可以使用一个 Provider 并提供一个 API 来获取 Service,以期更完美的实现。(参看上面的三个重要组件。)    Person stu = (Person) Class.forName("com.xx.Student").newInstance();    stu.say();}

JDK 中的 SPI 实现

JDK 的 SPI 实现是由 java.util.ServiceLoader 类实现。

以下是 ServiceLoader 类的成员变量:

public final class ServiceLoader<S>    implements Iterable<S>{   // 可以看到默认的寻找配置的地址是 META-INF/services/    private static final String PREFIX = "META-INF/services/";    // 使用 Class.forName 加载到配置文件中的对象,使用 service.cast(newInstance) 强转类型    // The class or interface representing the service being loaded    private Class<S> service;    // 类加载器,如果为null,默认使用 systemClassLoader    // The class loader used to locate, load, and instantiate providers    private ClassLoader loader;    // 存储 provider 的集合,存的是 service 的实例[service.cast(newInstance)]    // Cached providers, in instantiation order    private LinkedHashMap<String,S> providers = new LinkedHashMap<>();    // 迭代器,迭代过程中实例化 service并存到 map中    // The current lazy-lookup iterator    private LazyIterator lookupIterator;    ...}

其实原理也很简单,用目标对象 Interface 作为泛型,这样就能利用 Interface 的全限定名查找 META-INF/services/ 下的文件,然后一行一行读取文件,加载 providers 到 map 中。所以 ServiceLoader 类是加载配置文件中全部的类的实例的,而且是一次性加载完成。

由上可知,整个类的其他部分就是在实现查找文件 => 获取类加载器 => 加载 class 对象 => cast 到指定 Interface 并存入 map,而整个过程都是在迭代器中完成的(iterator.next() 方法)。

哦,对了,重新加载直接调用 reload() 方法就好了,方法实现就是新建一个 LazyIterator,然后重复上面的动作。

Scala 学习总结

作者 nickChen
2018年4月17日 16:04

这篇文章用来总结 Scala 学习中需要记录的知识,也会是以后 Scala 相关知识的索引。

Assignment

Scala 的赋值语句的返回值是 Unit, 因此不能使用 x=y=1 类似的赋值语法。
可以使用 `@`` 的小技巧来完成一个连续赋值的语法糖。

y = 1 // Unit ()// not workx = y = 1// trickvar x@y = 1 // x = 1, y = 1

Input

读取 Console 数据。

import scala.io._val name = StdIn.readLine("your name:")print("your age:")val age = StdIn.readInt()

Output

格式化输出,建议使用 f插值表达式,类型会在编译期得到检查。

printf("%d year %f seconds", year, sec)// recommend, type-safeprint(f"$year ${sec}%7.2f seconds")// raw textprint(raw"\n 123")

Loops

Scala 没有 break, continue 关键字,只能通过其他方式来实现。

// 1. use Booleanvar flag = truefor (i <- 1 to 9) {    if (flag) {        print(i)        flag = false    } else flag = true}// 2. use `return`def loop(): Int = {  for (i <- 1 to 10) {    if (i == 2) return -1    else ()  }  0}// 3. use `break` method in the `Breaks` object// not recommend//   the control transfer is done by throwing and catching an exception,//   so you should avoid this mechanism when time is of essence.import scala.util.control.Breaks._def loop1(): Unit = {  breakable {    for (i <- 1 to 10) {      if (i == 3) break      else println(i)    }  }}

Scala 的循环语句中,本地的变量名可以覆盖使用。

val n = 10// local n will be overlappingfor (n <- 1 to 9) {    print(n) // print 1-9}

Advanced for Loops

Scala 有更加便利的循环操作,可以完成多级循环以及列表推导。

非常简单的语法糖完成多级的 for 循环

// multiple generatorsfor (i <- 1 to 3; j <- 1 to 3) println(f"${i*10 + j}%3d")// guardfor (i <- 1 to 3; j <- 1 to 3 if i!=j) println(f"${i*10 + j}%3d")// definitions, any number.for (i <- 1 to 3; from = 4-i; j <- from to 3) println(f"${i*10 + j}%3d")

列表推导

列表推导生成的结果总是兼容第一个生成器的格式,可以看2、3例,第一个生成器是 String, 生成的就是 String格式。

for (i <- 1 to 9) yield i%5  // Yields Vector(1, 2, 3, 4, 0, 1, 2, 3, 4)// The generated collection is compatible with the first generator.for (c <- "Hello"; i <- 0 to 1) yield (c + i).toChar  // Yields "HIeflmlmop"for (i <- 0 to 1; c <- "Hello") yield (c + i).toChar  // Yields Vector('H', 'e', 'l', 'l', 'o', 'I', 'f', 'm', 'm', 'p')

如果不想使用分号的风格 ,可以使用 {} 加换行 替代

for { i <- 1 to 3  from = 4 - i  j <- from to 3 }

Variable Arguments

这是普通的可变长参数函数的实现,这里主要是指出一下 Scala 特有的语法。

能够使一个列表转变成可变参数的形式传递到方法内。

def sum(args: Int *): Int = {  if (args.isEmpty) 0  else args.head + sum(args.tail :_*)}sum(1 to 5 :_*)

一道 String Interpolator 的题目

快捷的定义一个 java.time.LocalDate,使用到了 implicit 关键字。

import java.time.LocalDateimplicit class DateInterpolator(val sc: StringContext) extends AnyVal {  def date(args: Any*): LocalDate = {    if (args.length != 3) throw new IllegalArgumentException("arguments should contain year, month, day.")    for (x <- sc.parts) if (x.length > 0 && !x.equals("-")) throw new IllegalArgumentException("year-month-day format required")    LocalDate.of(args(0).toString.toInt, args(1).toString.toInt, args(2).toString.toInt)  }}val year = 2017val month = 1val day = 5date"$year-$month-$day" // java.time.LocalDate = 2017-01-05

Array

Scala 中的数组操作, Array 对应的是定长数组,ArrayBuffer 对应的是 Java的 ArrayList

// Traverse indicesfor (i <- 0 until a.length) { }// orfor (i <- a.indices) { }// To visit every second elementfor (i <- 0 until a.length by 2) { }// To visit the elements starting from the end of the arrayfor (i <- 0 until a.length by -1) { }// orfor (i <- a.indices.reverse) { }// Traverse all values of the listfor (i <- a) { }

Class

Scala 实现 class的方式不同于 Java。Scala 对所有的 varval都会选择性地生成对应的 Setter & Getter

generatevarval
setter×
getter

如果声明是 private 的话,那么生成的 Setter & Getter 也是 private 的。
如果不想要生成 Setter & Getter,可以使用 private[this] 来修饰字段。
这里还有一个特殊点:字段声明是 private 的,只有该类的对象才能访问,这点和 Java的表现不同(Java 是只能在类部才能使用)。
下面代码中的 other也是一个 Counter类型,他也能访问 private var value。如果使用了 private[this],表现就和 Java中一样了。

class Counter {  private var value = 0  def increment() { value += 1 }  def isLess(other : Counter) = value < other.value    // Can access private field of other object}

Extractors with No Arguments

Extractors 可以用无参形式调用,这种情况下,它的返回值应该是一个 Boolean
下面是一个样例,可以看到无参形式的 Extractors在模式匹配的时候使用。

object Name {  def unapply(input: String) = {    val pos = input.indexOf(" ")    if (pos == -1) None    else Some((input.substring(0, pos), input.substring(pos + 1)))  }}object IsCompound {  def unapply(input: String) = input.contains(" ")}val author = "king kong W" // "king kongW"author match {  case Name(first, IsCompound()) => print(first + " mix " )    // 当 IsCompound() 的返回值为 True时执行  case Name(first, last) => print(first + " : " + last)}

Functions as Values

Scala 中函数(Function)也是第一等公民,可以作为值来传递。但是方法(Method)并不是函数,无法作为值传递。
下面展示一下方法如何转化为一个函数。
PS: 任何时候使用 def 关键词定义的都是方法,不是函数。

import scala.math._// -- method from package object --val fun = ceil _    //  the _ turns the ceil method into a function.val func:(Double) => Double = ceil    // The _ suffix is not necessary when you use a method name in a context where    // a function is expected.// -- method from a class --val f = (_: String).charAt(_:Int)val fc: (String, Int) => Char = _.charAt(_)

Control Abstractions

Scala 中有两种调用形式的参数, call-by-namecall-by-value,大多数情况下只使用后者,现在有一种使用前者的情况。

                // call-by-valuedef runInThread(block: () => Unit) {  // 这是对一个参数的类型定义  new Thread {    override def run() { block() } // 这里是调用函数  }.start()}runInThread { () => println("Hi"); Thread.sleep(10000); println("Bye") }    // 这里调用的时候 必须是 `() =>`带这个开头,就显得很多余
                // call-by-namedef runInThread(block: => Unit) {  new Thread {    override def run() { block }  }.start()}runInThread { println("Hi"); Thread.sleep(10000); println("Bye") }    // 这里就可以省略掉 `() =>`这个开头了,匿名函数写起来就很简洁

可以看到 call-by-name 的参数调用使得方法在调用的时候非常方便,这里利用这一点实现类似 while 的语法。

// definition          // call-by-name        // call-by-namedef until(condition: => Boolean)(block: => Unit) {  if (!condition) {    block    until(condition)(block)  }}// -- sample --var x = 10until (x == 0) { // without `()=>`, pretty concise  x -= 1  println(x)}Unlike a regular (or call-by-value) parameter, the parameter expression is not evaluated when the function is called.After all, we don’t want x == 0 to evaluate to false in the call to until.

这里说的非常重要,正是因为 call-by-name 的这个特性,才使得 until 方法可以对在运行时求值,而不是调用方法时 x==0 就已经作为值 false 传入了。

Patterns in Variable Declarations

Scala 支持在变量声明时的解构操作,如下操作:

val (x, y) = (1, 2)

对于表达式 val p(x1, ..., xn) = e, 定义上等同与

val $result = e match { case p(x1, ..., xn) => (x1, ..., xn) }val x1 = $result._1...val xn = $result._n

其中 x1~xn 是 free variables,可以是任意的值,如下表达式,在 Scala中是合理的:

val 2 = x

等同于:

var $result = x match { case 2 => () }// No assignments.

并没有赋值语句。 这等同于:

if (!(2 == x)) throw new MatchError

Partial Functions

Scala 又一迷之特性,这个语法糖不知道又会有多少玩法了。 偏函数,它的定义是这样的:

a function which may not be defined for all inputs. PartialFunction[A, B]. (A is the parameter type, B the return type.)

实际上如果一个偏函数穷举了所有可能性,那他就变成了一个 Function1。一个神奇的方法… Scala 设置了 Function1 到 Function22 总共可以允许 22 个参数。

然后就是神奇的语法糖了,甜不甜…

A Seq[A] is a PartialFunction[Int, A], and a Map[K, V] is a PartialFunction[K, V].

基于这个可以带来的操作:

val names = Array("Alice", "Bob", "Carmen")val scores = Map("Alice" -> 10, "Carmen" -> 7)names.collect(scores) // Yields Array(10, 7)

偏函数有 lift 函数,可以将偏函数转变为一个正常的函数,返回值是 Option[T]。反之也可以将一个有 Option[T] 返回值的函数,通过 unlift 转变为一个偏函数。

try 语句的 catch 子句就是一个偏函数,可以将这个字句赋值给一个变量。

                // call by namedef tryCatch[T](b: => T, catcher: PartialFunction[Throwable, T]) =  try { b } catch catcherval result = tryCatch(str.toInt,  { case _: NumberFormatException => -1 })

可以看到 catch 子句就是一个偏函数, 通过 catcher 这个变量可以动态的切换偏函数。

不得不感叹一声,这个设计思维啊。

实现Leader选举通过 Curator

作者 nickChen
2018年4月17日 14:42

Curator Framework 深入了解

本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。

选举功能实现 (Leader Election)

Curator 提供了 Leader 选举的功能,用于在分布式计算中选举出一个节点作为一组节点的 Leader。Curator 提供了两种 Leader Election 的 Recipe:

LeaderLatch

构造方法:

// LeaderLatch.classpublic LeaderLatch(CuratorFramework client, String latchPath)public LeaderLatch(CuratorFramework client, String latchPath, String id/*zk的 path:value 中的 value*/)

同之前几章的使用风格,需要 start() 方法调用了才会开启选举。 start() 方法之后会调用真正的工作开始方法:

// LeaderLatch.classprivate synchronized void internalStart() {        if ( state.get() == State.STARTED ) { // 状态标记为开始 start()会完成            // 很重要的一条实践,客户端需要注册一个 lisenter 用来监听和 zk 连接的状态,比如中断、重连等            client.getConnectionStateListenable().addListener(listener);            //...            // 开始选举相关的工作            reset();            //...        }}

reset() 是一个会重复执行的方法,用来争抢当前的 leader:

// LeaderLatch.classvoid reset() throws Exception {        setLeadership(false); // 当前不是leader,先置为 false;如果是leader不会进行这个操作        setNode(null); // 成为leader后会创建他的节点,存储起来方便下次删除旧节点        // Curator 方法非常通用的一种设计,专门用来做回调        BackgroundCallback callback = new BackgroundCallback() {            @Override            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {                // 不知道这个 debugResetWaitLatch 这个什么用... 一开始就被赋值 null,没有修改过。看起来开发开发另一个新特性的 hook。                // volatile CountDownLatch debugResetWaitLatch = null;                if ( debugResetWaitLatch != null ) {                    debugResetWaitLatch.await();                    debugResetWaitLatch = null;                }                // 节点创建成功                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {                    setNode(event.getName()); // 将当前 path 的名称记录下来,方便后续删除                    if ( state.get() == State.CLOSED ) {                        setNode(null); // 这应该是一个安全检测,如果这时候leaderLatch被 close() 了,这里的 node 也就不存了。下面创建的也是临时节点。                    } else {                        getChildren(); // 获取latchPath(构造方法中传入的)下所有的节点,用来关键的判断谁拿到了 leader 权限                    }                } else {                    log.error("getChildren() failed. rc = " + event.getResultCode());                }            }        };        //这里可以看到创建的是一个临时节点,value的值就是 id    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));    }

checkLeadership() 是关键的终结方法了,他用来判断是谁拿到了 leader 权限:

// LeaderLatch.classprivate void getChildren() throws Exception {        BackgroundCallback callback = new BackgroundCallback() {            public void processResult(... )throws Exception {                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {                    // 终结方法,找到对应的 leader                    checkLeadership(event.getChildren());                }            }        };    // 获取 latchPath 所有的节点client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));    }

篇幅有限,checkLeadership() 只介绍获得 leader 身份的情况了:

// LeaderLatch.classprivate void checkLeadership(List<String> children) throws Exception {        final String localOurPath = ourPath.get(); // 当前 LeaderLacth 获取的节点        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); // 排序 latchPath 下所有的节点        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; // 很明白的代码,查询当前 LeaderLacth 类的节点是否出现在排序数组中        if ( ourIndex < 0 ) {// 没有出现,就 reset() 方法重新来            log.error("Can't find our node. Resetting. Index: " + ourIndex);            reset();        }        else if ( ourIndex == 0 ) {// 这里就是关键了, == 0,排在第一位,获得 leader 权限            setLeadership(true);        } else { /*...*/}}

至此一个 Leader 选举的过程就完成了,Curator 利用了 ZooKeeper 的各种特性可谓是玩出了花儿…

这里还介绍一个阻塞的方法等待当前对象获取到 Leader 身份:

// LeaderLatch.classpublic void await() throws InterruptedException, EOFException {        synchronized(this) { // 锁住当前对象            while ((state.get() == State.STARTED) && !hasLeadership.get()){                wait(); // 等待成为 Leader,这里 setLeadership(true) 的方法里会 notifyAll()来唤醒的            }        }        if ( state.get() != State.STARTED ) {            throw new EOFException();        }}// 超时版本public boolean await(long timeout, TimeUnit unit) throws InterruptedException

LeaderSelector

Curator还提供了另外一种选举方法,注意涉及以下四个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException
// LeaderSelector.class// 构造函数public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)

需要分析 LeaderSelector 依旧需要从 start() 方法开始,但在开始之前还有一个重要的方法 autoRequeue() 。如果需要该实例不停的去尝试获取 leader 身份,就需要调用此方法一次,在构造好该对象之后先调用 autoRequeue()start()

start() 的逻辑是 :

start() -> requeue() -> internalRequeue() ----                               ↑             ↓ autoRequeue == true                               --------------

internalRequeue() 中配置了一个 Future 任务执行 doWorkLoop() 方法,每次调用 internalRequeue() 是同步的,并且 Future 任务执行也是同步的,也就是必须一次一次同步的去尝试获取 leader 身份。

// LeaderSelector.classvoid doWork() throws Exception {        hasLeadership = false;        try {            // 这里就是关键了,这是一个分布式锁            // InterProcessMutex mutex            // 一旦这个拿到了就是持有锁了            // 下面只需要 takeLeadership 方法阻塞住方法,不让这边执行到 finally 代码块就好了            mutex.acquire();            hasLeadership = true;            try {/*...*/}            catch(/**/){/**/}            finally {                clearIsQueued();            }        }        catch ( InterruptedException e ) {            Thread.currentThread().interrupt();            throw e;        }        finally {            if ( hasLeadership ) {                hasLeadership = false;                try {                    mutex.release(); // 释放了锁,其他的可以去竞争 leader 了                }                catch ( Exception e )                {                    ThreadUtils.checkInterrupted(e);                    log.error("The leader threw an exception", e);                    // ignore errors - this is just a safety                }            }        }    }

异常处理
LeaderSelectorListener 类继承了 ConnectionStateListenerLeaderSelector 必须小心连接状态的改变。如果实例成为 leader, 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是 leader了。 如果 LOST 状态出现, 实例不再是 leader, takeLeadership() 方法返回。

重要:推荐处理方式是当收到 SUSPENDEDLOST 时抛出 CancelLeadershipException 异常。 这会导致 LeaderSelector 实例中断并取消执行 takeLeadership()方法的异常。Curator 提供了 LeaderSelectorListenerAdapter 以供继承,此 Adapter 提供了推荐的处理逻辑。

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {    @Override    public void stateChanged(CuratorFramework client, ConnectionState newState ){        if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ){            throw new CancelLeadershipException();        }    }}

这里跑出异常以中断 takeLeadership()方法只能抛出CancelLeadershipException 异常:

// LeaderSelector.WrappedListener.classpublic void stateChanged(CuratorFramework client, ConnectionState newState) {       try{                listener.stateChanged(client, newState);       } catch ( CancelLeadershipException dummy ) {                // 中断逻辑                 leaderSelector.interruptLeadership();       }}

LeaderLatch 相比, 通过 LeaderSelectorListener 可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。

实现分布式队列通过 Curator

作者 nickChen
2018年4月12日 11:33

Curator Framework 深入了解

本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。

分布式队列实现(DistributedQueue 实现)

DistributedQueue 是最普通的一种队列。 它设计以下四个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

创建队列使用 QueueBuilder,它也是其它队列的创建类,看看他的 builder 方法:

    public static <T> QueueBuilder<T> builder(CuratorFramework client, QueueConsumer<T> consumer, QueueSerializer<T> serializer, String queuePath) {            return new QueueBuilder(client, consumer, serializer, queuePath);        }

这里有四个入参,分别对应着客户端连接对象,consumer 对象,serializer 对象,path 节点对象。队列的消费就是通过 consumer 对象来实现的;serializer 对象负责存入 queue 的数据序列化和消费时的反序列化。

    // 创建了一个没有 consumer 的 builder    QueueBuilder<String> builder = QueueBuilder.builder(client, null, createQueueSerializer(), PATH);     // 创建了一个 queue 对象    DistributedQueue<String> queue = builder.buildQueue();    // 启动 queue    queue.start();    // 这样操作就可以往 queue 里塞入消息了    queue.put("Test Message.");

注意此时的 queue 是没有消费者的,如果需要消费者可以新建一个 queue_2 对象来消费对应 queuePath 的消息队列。当然也可以在创建 queue 对象的时候配置好 consumer 就可以即刻消费了。

    // 注意这里的第二个入参,配置了 consumer,此时的 queue_2 如果 start 会直接开始消费队列中的消息    DistributedQueue<String> queue_2 = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH).buildQueue();

具体的逻辑可以看 queue.start() 时做了什么操作:

    // DistributedQueue.class    private final boolean isProducerOnly;    ...    // 构造函数中做了如下判断    this.isProducerOnly = consumer == null;    // 下面就是配置了 consumer 时会进行的操作,通过 runLoop 方法去不停的消费队列    // public void start() throws Exception    if (!this.isProducerOnly) {        this.service.submit(new Callable<Object>() {             public Object call() {                 DistributedQueue.this.runLoop();                 return null;             }        });    }    // private DistributedQueue.ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception     // 伪代码 可以看到这个 processMessageBytes 方法是真正消费的地方,先把消息反序列化之后再使用 consumer 对象的 consumeMessage() 方法    this.consumer.consumeMessage(ItemSerializer.deserialize(bytes, this.serializer));

上面的代码中还有个缺点,通过源码可知,消费队列是先将消息从队列中移除,再由 consumer 消费。 这两个步骤不是原子的,QueueBuilder 提供了 lockPath(String path) 方法以保证消费安全。当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。 最好还是单消费者模式使用队列。

    // DistributedQueue.class    // private void processChildren(List<String> children, long currentVersion) throws Exception    // 这里就可以看到加锁和不加锁采用的是不同的策略    if (isUsingLockSafety) {        DistributedQueue.this.processWithLockSafety(itemNode, DistributedQueue.ProcessType.NORMAL);    } else {        DistributedQueue.this.processNormally(itemNode, DistributedQueue.ProcessType.NORMAL);    }

分布式含ID队列实现(DistributedIdQueue 实现)

DistributedIdQueue 和上面的队列类似, 但是可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中任意的元素移除。

通过下面方法创建:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

看下他是如何实现 id 这个属性的:

    // DistributedIdQueue.class    private String makeIdPath(String itemId) {       return this.queue.makeItemPath() + '|' + fixId(itemId) + '|';    }    // DistributedQueue.class    String makeItemPath() {       return ZKPaths.makePath(this.queuePath, "queue-");    }

可以看到他是直接通过 id 的值加入 path 生成了一个指定的节点存储数据,这样也可以逆向操作得到该节点的 path 从而删除元素。

添加元素调用的都是 DistributedQueue 中的 internalPut() 方法:

    boolean internalPut(T item, MultiItem<T> multiItem, String path, int maxWait, TimeUnit unit) throws Exception

DistributedIdQueue 和 DistributedQueue 添加元素的 put 方法实际上都是调用到这个方法。DistributedIdQueue 是自己构建了 path,而 DistributedQueue 是自动生成如下的节点 path。

queue-0000000009queue-0000000008queue-0000000007queue-0000000006queue-0000000005queue-0000000004queue-0000000003queue-0000000002

分布式优先级队列实现(DistributedPriorityQueue 实现)

优先级队列对队列中的消息按照优先级进行排序。 Priority 越小越靠前, 优先被消费。

创建一个 DistributedPriorityQueue 的方式如下:

    DistributedPriorityQueue<String> queue = builder.buildPriorityQueue(0/*minItemsBeforeRefresh*/);

可以看到只需要配置一个 minItemsBeforeRefresh 参数,这个参数用来对比当前是否需要进行重排序;需要强制重排序还需要配合 refreshOnWatch 参数,不过在 builder 创建 DistributedPriorityQueue 的时候就在 DistributedQueue 的构造参数上设置该值为 true 了。

    // DistributedPriorityQueue 的构造参数    DistributedPriorityQueue(.../*很多入参*/) {            Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");            this.queue = new DistributedQueue(client, consumer, serializer, queuePath, threadFactory, executor, minItemsBeforeRefresh, true/*refreshOnWatch 直接设置为 true 了。*/, lockPath, maxItems, putInBackground, finalFlushMs);        }

强制重排序的逻辑如下:

    // DistributedQueue.class    //private void processChildren(List<String> children, long currentVersion) throws Exception     int min = this.minItemsBeforeRefresh;    ...    while(..){        // min 就是强制刷新所需的最小的元素数量,或称之你的程序可以容忍的不排序的最小值。        // 从源码可以看出 minItemsBeforeRefresh 被设置为 1 或者 0 都是可以直接触发重排序的一个决定因素        if (min-- <= 0 && this.refreshOnWatch && currentVersion != this.childrenCache.getData().version) {            // 这里的 processedLatch 是一个 Semaphore 对象             // final Semaphore processedLatch = new Semaphore(0);            // 可以看到代码段的最下方的 acquire 代码,线程池消费完所有的代码之后才会 release 所有的信号量            // 这里直接释放了,这样处理逻辑的代码可以直接退出            // 然后在 runLoop 下一次循环的时候会进行 collection 的 sort            processedLatch.release(children.size());            break;        }        ...        消费消息的代码        ...    }    processedLatch.acquire(children.size());  

分布式Delay队列实现(DistributedDelayQueue 实现)

DistributedDelayQueue 中新增的元素有个delay值, 消费者隔一段时间才能收到元素。同样的可以通过 QueueBuilder 来创建该对象:

    DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时可以指定 delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意 delayUntilEpoch 不是离现在的一个时间间隔, 比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10 秒。 如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。

延时队列的实现同样基于 DistributedQueue,在 runLoop 方法的逻辑中,会获取元素的 delay 值,默认直接返回 0,DistributedDelayQueue 重写了获取 delay 时间的方法:

    // DistributedQueue.class    // private void runLoop()     maxWaitMs = this.getDelay((String)children.get(0));    if (maxWaitMs <= 0L) {        this.processChildren(children, currentVersion);    }
    // DistributedDelayQueue.class    // 构造函数    DistributedDelayQueue(.../*很多入参*/) {            Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");            this.queue = new DistributedQueue<T>(.../*很多入参*/) {                // override 了原有的方法,DistributedQueue 中的 getDelay 方法直接返回 0L                protected long getDelay(String itemNode) {                    return this.getDelay(itemNode, System.currentTimeMillis());                }                private long getDelay(String itemNode, long sortTime) {                    long epoch = DistributedDelayQueue.getEpoch(itemNode);                    return epoch - sortTime;                }                // 重写了排序的方法,根据 delay 的时间来排序了                protected void sortChildren(List<String> children) {                    final long sortTime = System.currentTimeMillis();                    Collections.sort(children, new Comparator<String>() {                        public int compare(String o1, String o2) {                            long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);                            return diff < 0L ? -1 : (diff > 0L ? 1 : 0);                        }                    });                }            };        }

JDK Queue风格接口的分布式队列实现(SimpleDistributedQueue 实现)

前面虽然实现了各种队列,但并没有 JDK 中的队列接口风格实现。 SimpleDistributedQueue 提供了和JDK一致性的接口(但是没有实现Queue接口)。 创建很简单:

public SimpleDistributedQueue(CuratorFramework client, String path)

增加元素:

    public boolean offer(byte[] data) throws Exception    // 即往一个既定的 path 下有以 qn- 开头的子路径,如 /path/qn-0000001 

删除元素:

    public byte[] take() throws Exception    // 获取队列最前的元素,同时zk剔除该路径    // 使用 CountDownLatch 来达到超时的设置,虽然 take 是没有设置超时的... 也就是要一致等待 zk 回应

另外还提供了其它方法:

    // 获取元素 同 element() 返回队列最前的元素    public byte[] peek() throws Exception    // 和 take() 实际上是一样的方法,但是这里会有超时配置,见上关于 take()的解释    public byte[] poll(long timeout, TimeUnit unit) throws Exception    // 和 remove() 操作一样    public byte[] poll() throws Exception    // 直接删掉队列最前的元素    public byte[] remove() throws Exception    public byte[] element() throws Exception
❌
❌