普通视图

发现新文章,点击刷新页面。
昨天以前hanjm's Blog

深入理解ActiveMQ消息队列协议STMOP AMQP MQTT

作者 hanjm
2019年2月7日 00:00

前言

AWS MQ是完全托管的 ActiveMQ 服务, 最近需要使用, 于是学习其文档, 实践其特性, 由于 ActiveMQ 支持非常丰富的协议, OpenWire amqp stomp mqtt, 所以也学习了各大协议的特性及其SDK.

安装

本地开发最方便的方式当然是docker了, rmohr/activemq 文档比较好的且有aws支持的5.15.6版本的tag.

需要注意的是, 首先要根据其docker hub镜像文档上的几步操作, 将镜像中的默认配置文件复制到自定义的本机conf目录下 /usr/local/activemq/conf, 然后就快速地启动了一个默认配置的 ActiveMQ server

# active mq
docker run -itd --name activemq \
-p 61616:61616 -p 8161:8161 -p 5672:5672 -p 61613:61613 -p 1883:1883 -p 61614:61614 \
-v /usr/local/activemq/conf:/opt/activemq/conf \
-v /usr/local/activemq/data:/opt/activemq/data \
rmohr/activemq:5.15.6

特性

Advisory

ActiveMQ可以将本身的一些事件投递到系统的消息队列, 如 queue/topic的创建, 没有消费者的queue/topic等. http://activemq.apache.org/advisory-message.html

这个特性对于监控MQ非常有用, 默认配置时关闭的, 需要在配置文件activemq.xml中打开.

Wildcards

通配符

. 用于分割名字中的多个单词
* 表示任一名字, 不包括点号(.)
> 表示任一名字, 包括点号(.), 用于表示前缀, >符号后面不会再跟其他限制条件.

通配符可以用在配置文件中表名作用范围, 也可以用于订阅时的destination名字, 这个功能很不错.

Virtual Topic

所谓virtual topic 就是将一个正常的topic, 变成了多个queue. 如TopicA启用了Virtual topic, 则consumer可以去消费 Consumer.xxx.TopicA 这样模式的queue的消息. (http://activemq.apache.org/virtual-destinations.html)

xxx对应类似NSQ中的Channel概念.

需要在activemq.xml中配置virtualDestinationInterceptor的范围 prefix及其他选项.

  • name=">" 表示所有的topic都启用virtualTopic功能.

  • prefix="Consumer.*." 表示可以订阅的virtualTopic的pattern是Consumer..

<destinationInterceptors> 
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="Consumer.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>

Delay & Schedule

ActiveMQ支持延时消息及定时消息, 在message header中带上如下字段即可, 其中AMQ_SCHEDULED_PERIOD的最大值是long的最大值, 所以可以设置延时很长时间.

Property nametypedescription
AMQ_SCHEDULED_DELAYlongThe time in milliseconds that a message will wait before being scheduled to be delivered by the broker
AMQ_SCHEDULED_PERIODlongThe time in milliseconds to wait after the start time to wait before scheduling the message again
AMQ_SCHEDULED_REPEATintThe number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRONStringUse a Cron entry to set the schedule

Dead Letter Queue

如果broker投递给消费者消息, 没有ACK或NACK, 则会触发重新投递, 投递超过一定次数则会进入死信队列, 默认只有一个公共的死信队列ActiveMQ.DLQ, 如果需要给topic分别设置死信队列, 则要在修改activemq.xml.

<broker>
   
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- Set the following policy on all queues using the '>' wildcard -->
        <policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              Use the prefix 'DLQ.' for the destination name, and make
              the DLQ a queue rather than a topic
            -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
</broker>

默认非持久化的topic不会进入到死信队列中, 如果需要, 则修改activemq.xml, 加入

<!-- 
Tell the dead letter strategy to also place non-persisted messages
onto the dead-letter queue if they can't be delivered.
-->
<deadLetterStrategy>
<... processNonPersistent="true" />
</deadLetterStrategy>

实践

STOMP

STOMP是Simple (or Streaming) Text Orientated Messaging Protocol 的缩写, 设计思路借鉴了HTTP, 有content-type, header, body, frame based, text based等类似HTTP的相关概念, 设计文档 < https://stomp.github.io/stomp-specification-1.2.html>, 非常得简洁, 一页就讲完了.

协议细节及特点:

  1. 对于重复的header key, 只有第一个有效.
  2. 服务端可以限制消息大小, header field数量, header长度.
  3. 一个client开多个subscriber时, 必须设置subscribe id.
  4. NACK command 表示 requeue.
  5. stomp有事务的概念, 消息从producer发出到broker确认收到算一个事务, broker投递到consumer ACK算一个事务, 事务具有原子性.
  6. 支持SSL.

ActiveMQ作为STOMP server

  1. 支持 v1.1版本的STMOP协议.

  2. 默认最大消息长度 maxDataLength104857600, maxFrameSizeMAX_LONG.

  3. 通过 destination 名字前缀是/queue/ 还是 /topic/ 来区分是 queue (生产消费模型)还是 topic(发布订阅模型). 真正的名字是去掉包括两个/符号的前缀后的.

  4. 发送默认不是持久化的, 需要在SEND时手动指定persistent:true的header以开启持久化.

    订阅默认不是持久化的, 需要在SUBSCRIBE时手动指定activemq.subscriptionName:订阅者名字的header来开启持久化订阅.

    很多特性都是靠STOMP header来处理的, ActiveMQ官方文档上有两节讲STOMP的header. http://activemq.apache.org/stomp.html#Stomp-StompExtensionsforJMSMessageSemantics

SDK

https://github.com/go-stomp/stomp 是目前star数最高的

  1. 提了个PR https://github.com/go-stomp/stomp/pull/58
  2. 解决了个issue https://github.com/go-stomp/stomp/issues/47

demo 代码

package main

import (
"context"
"github.com/go-stomp/stomp"
"github.com/hanjm/log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
)

func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()
publisher(ctx, "/topic/stomp")
}()

wg.Add(1)
go func() {
defer wg.Done()
Subscriber(ctx, "channel1", "Consumer.channel1.stomp")
}()
//
wg.Add(1)
go func() {
defer wg.Done()
Subscriber(ctx, "channel2", "Consumer.channel2.stomp")
}()

wg.Add(1)
go func() {
defer wg.Done()
Subscriber(ctx, "channel3", "/topic/stomp")
}()

defer func() {
cancel()
wg.Wait()
}()
SignalsListen()
}

func publisher(ctx context.Context, destination string) {
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err != nil {
log.Fatal(err)
return
}
defer conn.Disconnect()
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
err = conn.Send(
destination, // destination
"text/plain", // content-type
[]byte("Test message #"+strconv.Itoa(i)), stomp.SendOpt.Header("persistent", "true")) // body
if err != nil {
log.Error(err)
return
}
}
}
}

func Subscriber(ctx context.Context, clientID string, destination string) {
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err != nil {
log.Fatal(err)
return
}
defer conn.Disconnect()
sub, err := conn.Subscribe(destination, stomp.AckClientIndividual, stomp.SubscribeOpt.Id(clientID), stomp.SubscribeOpt.Header("persistent", "true"))
if err != nil {
log.Fatal(err)
return
}
go func() {
select {
case <-ctx.Done():
err := sub.Unsubscribe()
if err != nil {
log.Fatal(clientID, err)
return
}
return
}
}()
for m := range sub.C {
if m.Err != nil {
log.Fatal(err)
return
}
log.Infof("%s msg body:%s", clientID, m.Body)
//log.Infof("%s msg header:%s", clientID, *m.Header)
//log.Infof("%s msg content-type:%s", clientID, m.ContentType)
//log.Infof("%s msg destination:%s", clientID, m.Destination)
m.Conn.Ack(m)
}
log.Info("close sub")
}

func SignalsListen() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT,
syscall.SIGTERM,
syscall.SIGINT,
syscall.SIGUSR1,
syscall.SIGUSR2)

switch <-sigs {
case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
log.Info("service close")
}
return
}

MQTT

协议文档http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

翻译版文档https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

协议细节及特点:

  1. transport支持TCP, 也支持WebSocket, 所以定位于IOT.
  2. 不支持生产消费模型, 只支持发布订阅模型.
  3. 用QOS来表示消息队列中的投递语义, QOS=0 表示至多发送一次, QOS=1表示至少发送一次, QOS=2表示精确地只发送一次.

ActiveMQ作为MQTT server

  1. 通配符不同, MQTT的 / + # 分别对应 ActiveMQ的. * >.
  2. QOS=0对应的是非持久化的topic, QOS=1或者QOS=2对应持久化的topic.

AMQP

协议文档: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html

AMQP相比 stomp mqtt 就复杂得多, 毕竟名字就是高级消息队列(Advanced Message Queuing Protocol ).

协议细节及特点:

  1. AMQP有很多不同的概念, 如Link, Container, Node. 不看模型文档的话就直接使用SDK的话会比较费劲. ContainerID对应ActiveMQ client ID, LinkName对应ActiveMQ subscription name.

ActiveMQ作为AMQP server

  1. 使用1.0协议, 所以使用了0.9.1的2k star的sdk不能用.(https://github.com/streadway/amqp), 而且官方也认为没必要支持旧版本的协议.
  2. 默认最大消息长度 maxDataLength104857600(100MB), maxFrameSizeMAX_LONG, consumer持有的未确认最大消息数量prefetch为1000, producerCredit为10000. 可通过连接的URI设定.
  3. 支持SSL.
  4. 通过 destination 名字前缀是queue:// 还是 topic:// 来区分是 queue (生产消费模型)还是 topic(发布订阅模型). 真正的名字是去掉包括两个/符号的前缀后的.

性能

分别使用

github.com/vcabbage/amqp 76star 13issue 5contributors
github.com/go-stomp/stomp 132star 3issue 14contributors
github.com/eclipse/paho.mqtt.golang 650star 20issue 34contributors

作为SDK, 分别测试了下pub sub 1KB大小的消息普通场景.

publish性能上, amqp=stomp>mqtt, amqp和stomp差不多, 是mqtt的两倍多.
subscribe性能上, amqp比stomp快一点, mqtt则慢很多.

benchmark代码

package all_bench

import (
"bytes"
"context"
"github.com/eclipse/paho.mqtt.golang"
"github.com/go-stomp/stomp"
"github.com/hanjm/log"
"pack.ag/amqp"
"sync/atomic"
"testing"
"time"
)

var msgData = bytes.Repeat([]byte("1"), 1024)

var (
stompDestination = "bench-stomp"
amqpDestination = "bench-amqp"
mqttDestination = "bench-mqtt"
pubMsgCount = 20000
subMsgCount = 100
)

func TestMain(m *testing.M) {
m.Run()
}

// go test -bench Publish -benchmem
// go test -bench Sub -benchmem
func BenchmarkStompPublish(b *testing.B) {
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err != nil {
log.Fatal(err)
return
}
defer conn.Disconnect()

b.N = pubMsgCount
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
err = conn.Send(
stompDestination, // destination
"text/plain", // content-type
msgData) // body
if err != nil {
log.Error(err)
return
}
}
}

func BenchmarkAmqpPublish(b *testing.B) {
// Create client
client, err := amqp.Dial("amqp://127.0.0.1",
amqp.ConnSASLPlain("system", "manager"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()

// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = session.Close(ctx)
if err != nil {
log.Errorf("failed to close session:%s", err)
return
}
//log.Info("session close")
}()

// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress(amqpDestination),
amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
)
if err != nil {
log.Fatal("Creating sender link:", err)
}

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := sender.Close(ctx)
if err != nil {
log.Errorf("failed to close sender:%s", err)
return
}
//log.Infof("sender close")
}()

ctx := context.Background()
msg := amqp.NewMessage(msgData)

b.N = pubMsgCount
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
// Send message
err = sender.Send(ctx, msg)
if err != nil {
log.Fatal("Sending message:", err)
}
if err != nil {
log.Fatal(err)
return
}
}
}

func BenchmarkMqttPublish(b *testing.B) {
opt := mqtt.NewClientOptions().SetClientID("pubClient").SetCleanSession(false)
opt.AddBroker("tcp://127.0.0.1:1883")
client := mqtt.NewClient(opt)
t := client.Connect()
err := t.Error()
if err != nil {
log.Fatal(err)
return
}
if t.Wait() {
err := t.Error()
if err != nil {
log.Fatal(err)
return
}
}
defer func() {
client.Disconnect(10000)
}()

b.N = pubMsgCount
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
token := client.Publish(mqttDestination, 2, true, msgData)
err := token.Error()
if err != nil {
log.Fatal(err)
return
}
}
}

func BenchmarkStompSubscriber(b *testing.B) {
conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
if err != nil {
log.Fatal(err)
return
}
clientID := "1"
//defer conn.Disconnect()
sub, err := conn.Subscribe(stompDestination, stomp.AckClientIndividual, stomp.SubscribeOpt.Id(clientID))
if err != nil {
log.Fatal(err)
return
}
//defer func() {
//err := sub.Unsubscribe()
//if err != nil {
//log.Fatal(clientID, err)
//return
//}
//return
//}()
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

b.N = subMsgCount
b.ReportAllocs()
b.ResetTimer()
defer b.StopTimer()
var i int64 = 0

go func() {
for range time.Tick(time.Second) {
if atomic.LoadInt64(&i) >= int64(b.N) {
cancel()
}
}
}()

defer func() {
//log.Info("close")
}()
for {
select {
case m := <-sub.C:
if m.Err != nil {
log.Fatal(m.Err)
return
}
m.Conn.Ack(m)
i++
if atomic.LoadInt64(&i) > int64(b.N) {
return
}
case <-ctx.Done():
return
}
}
}

func BenchmarkAmqpSubscriber(b *testing.B) {
// Create client
client, err := amqp.Dial("amqp://127.0.0.1",
amqp.ConnSASLPlain("system", "manager"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
//defer client.Close()

// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}

clientID := "1"
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := session.Close(ctx)
if err != nil {
log.Errorf("%s failed to close session:%s", clientID, err)
return
}
//log.Errorf("%s session close", clientID)
}()

// Continuously read messages
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(amqpDestination),
amqp.LinkCredit(10),
)
if err != nil {
log.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := receiver.Close(ctx)
if err != nil {
log.Errorf("%s failed to close receiver:%s", clientID, err)
return
}
//log.Errorf("%s receiver close", clientID)
}()
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

b.N = subMsgCount
b.ReportAllocs()
b.ResetTimer()
defer b.StopTimer()
var i int64 = 0

go func() {
for range time.Tick(time.Second) {
if atomic.LoadInt64(&i) >= int64(b.N) {
cancel()
}
}
}()
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
if err == context.Canceled {
log.Infof("Reading message from AMQP:%s", err)
break
}
log.Errorf("Reading message from AMQP:%s", err)
break
}
//log.Infof("%s msg body:%s value:%T %s", clientID, msg.GetData(), msg.Value, msg.Value)
// Accept message
msg.Accept()
atomic.AddInt64(&i, 1)
if atomic.LoadInt64(&i) > int64(b.N) {
//log.Info("return")
return
}
}
}

func BenchmarkMqttSubscriber(b *testing.B) {
opt := mqtt.NewClientOptions().SetClientID("subClient").SetCleanSession(false)
opt.AddBroker("tcp://127.0.0.1:1883")
client := mqtt.NewClient(opt)
t := client.Connect()
if t.Wait() {
err := t.Error()
if err != nil {
log.Fatal(err)
return
}
}
defer func() {
client.Disconnect(1000)
}()

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

b.N = subMsgCount
b.ReportAllocs()
b.ResetTimer()
defer b.StopTimer()
var i int64 = 0

go func() {
for range time.Tick(time.Second) {
if atomic.LoadInt64(&i) >= int64(b.N) {
cancel()
}
}
}()
client.Subscribe(mqttDestination, 2, func(c mqtt.Client, m mqtt.Message) {
//log.Infof("%s msg body:%s", "1", m.Payload())
m.Ack()
atomic.AddInt64(&i, 1)
if atomic.LoadInt64(&i) > int64(b.N) {
//log.Info("return")
return
}
})
select {
case <-ctx.Done():
break
}
log.Info("close sub")
}

一些细节行为

官方的FAQ里面写了一些实现的细节

  1. 如果producer比较快而consumer比较慢的话, ActiveMQ的流量控制功能使得producer阻塞. http://activemq.apache.org/what-happens-with-a-fast-producer-and-slow-consumer.html
  2. 不支持消费者拿到消息之后Requeue, 即不支持像NSQ那样的消费者出现业务逻辑错误后重试.http://activemq.apache.org/how-do-i-unack-the-message-with-stomp.html. 但是可以利用延时消息实现类似的功能.

性能调优

  1. 如果使用了virtualTopic, 那么默认配置下, virtualTopic对应的Queue越多, 发送越慢, 因为默认virtualTopic转发到queue是串行的, 需要调整concurrentSend=true启用并发发送到queue.

    https://activemq.apache.org/virtual-destinations
    https://issues.jboss.org/browse/ENTMQ-1093
    https://github.com/apache/activemq/blob/9abe2c6f97c92fc99c5a2ef02846f62002a671cf/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java#L87

  2. concurrentStoreAndDispatchQueues设置为false. 默认配置下, 这个值是true, 根据文档所说在快速消费者情况下, 此值设置为true可以加快持久化消息的性能, 因为被快速消费了消息可以不用落盘, 但实测发现此值为true则10个producer并发发送和1个producer并发发送的性能是一样的没有提高. 设置为false之后提高producer并发则可获得性能倍速提高, 并且单个producer的发送性能并没有下降.

  3. 启用mKahaDB, ActiveMQ为了减少打开的文件描述符数量, 默认是用一个KahaDB实例来持久化消息, 但是在磁盘性能比较好的情况下, 一个kahaDB实例发挥不出磁盘的潜力, 启用多个kahaDB后性能可以获得倍速增长. 可以按queue名字的pattern来设置多个kahaDB实例, 也可以使用perDestination="true"设置每个queue一个kahaDB实例, 但这个参数也有坑, 如果destination名字超过了42个字符串, 则会被截断, 发送会报不可恢复的错. 可解决的办法是手动分好destination使用的kahadb, 但是这个配置后续不能动态改了, 只能新开Broker然后迁移. 否则会重启后如果分配规则改变导致分配到了不同的kahadb, 则之前的数据不会被消费.

    http://sigreen.github.io/2016/02/10/amq-tuning.html

    https://activemq.apache.org/kahadb#multim-kahadb-persistence-adapter

Macos Docker container连接宿主机172.17.0.1的办法

作者 hanjm
2018年12月16日 00:00

在Linux docker container里面, 如果想访问宿主机上的服务, 用 172.17.0.1 这个host即可.

今天在Mac上的 dockercontainer里面启动一个服务, 这个服务需要连我主机上的MySQL, 用 172.17.0.1 是访问不了的, Connection refused.

root@d99939cc53fc:/tmp# curl 172.17.0.1:3306
curl: (7) Failed to connect to 172.17.0.1 port 3306: Connection refused

但是看网络结构, 和Linux的一样, 也是在172.17段下的.

root@d99939cc53fc:/tmp# ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
2: tunl0@NONE: <NOARP> mtu 1480 qdisc noop state DOWN group default qlen 1
link/ipip 0.0.0.0 brd 0.0.0.0
3: ip6tnl0@NONE: <NOARP> mtu 1452 qdisc noop state DOWN group default qlen 1
link/tunnel6 :: brd ::
6: eth0@if7: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0
valid_lft forever preferred_lft forever

不得其解, Google之, 发现有个隐藏奥秘, https://stackoverflow.com/questions/38504890/docker-for-mac-1-12-0-how-to-connect-to-host-from-container 问题下有人在 Docker Community Edition 17.06.0-ce-mac18, 2017-06-28release notes中发现有

Add an experimental DNS name for the host: docker.for.mac.localhost

这样一条更新日志.

页面搜索docker.for.mac.localhost, 发现在 Docker Community Edition 17.12.0-ce-mac46 2018-01-09 的 release notes中发现有一条相关的更新日志

所以, 结论就是在 container 中应该用 docker.for.mac.host.internal 来访问宿主机.

于是用curl看一下端口通不通, 果然通.

root@d99939cc53fc:/tmp# curl docker.for.mac.host.internal:3306
5.7.21Bf

Nginx With gRPC编译安装

作者 hanjm
2018年12月15日 00:00

之前写过nginx HTTP2编译安装的文章, 最近想探索下nginx with gRPC support, 所以更新一下.

yum apt等包管理系统安装的软件有时候比较旧, 导致一些莫名其妙的问题. 最近在给Nginx加HTTP/2模块中, 编译时加上了--with-http_v2_module参数, 但Chrome请求发现还是不是http2, 后面发现是OpenSSL版本太低. 踩过这一坑后, 感觉Linux下部分软件最好还是自己编译安装比较妥, 如果编译过程出错, 搜下错误信息, 一般是基础依赖没有安装, 很好解决.

官方的源码编译指南
https://nginx.org/en/docs/configure.html
https://nginx.org/en/docs/http/ngx_http_v2_module.html (这里写了需要OpenSSL1.0.2以上版本), 很多选项都有合适的默认值, 比如–prefix=/usr/local/nginx, 所以只需要指定自己需要的字段

--user=www-data // 习惯将web相关的服务以www-data用户运行, 如没有此用户可以创建一个也可不加此项按默认nobody用户
--group=www-data
--with-http_v2_module // 默认选项不带http2
--with-http_ssl_module // 默认选项不带ssl, 上http2必须要上ssl的
--with-stream // https://nginx.org/en/docs/stream/ngx_stream_core_module.html
--with-openssl // 指定OpenSSL
--with-pcre=./pcre-8.40 // 需要(version 4.4 — 8.40)的pcre,注意Nginx不支持pcre2
--with-pcre-jit // 打开pcre JIT支持
--with-zlib=./zlib-1.2.11 // 需要(version 1.1.3 — 1.2.11)的zlib以支持gzip

1.官网下载Nginx包

cd /usr/local
wget https://nginx.org/download/nginx-1.14.2.tar.gz
tar -zxf nginx-1.14.2.tar.gz
cd nginx-1.14.2

2.[官网下载OpenSSL 1.0.2以上版本].https://github.com/openssl/openssl/releases

cd nginx-1.14.2
wget https://github.com/openssl/openssl/archive/OpenSSL_1_1_0e.tar.gz
tar -zxf OpenSSL_1_1_0e.tar.gz

2.官网下载pcre

注意Nginx不支持pcre2,下载pcre最新版即可. 解压到Nginx解压的目录

cd nginx-1.14.2
wget https://ftp.pcre.org/pub/pcre/pcre-8.40.tar.gz
tar -zxf pcre-8.40.tar.gz

4.官网下载zlib(version 1.1.3 — 1.2.11)

cd nginx-1.14.2
wget http://zlib.net/zlib-1.2.11.tar.gz
tar -zxf zlib-1.2.11.tar.gz

5.编译并安装

./configure \
--user=nginx \
--group=nginx \
--conf-path=/etc/nginx/nginx.conf \
--with-http_v2_module \
--with-http_ssl_module \
--with-stream \
--with-openssl=./openssl-OpenSSL_1_1_0e \
--with-pcre=./pcre-8.40 --with-pcre-jit \
--with-zlib=./zlib-1.2.11
make && make install

6.为了方便操作,软链/usr/local/nginx/sbin/nginx到/usr/local/bin

ln -sf /usr/local/nginx/sbin/nginx /usr/local/bin

Go如何优雅地错误处理(Error Handling and Go 1)

作者 hanjm
2018年7月8日 00:00

Go的错误处理一直被吐槽太繁琐, 作为主要用GO的攻城狮, 经常写 if err!=nil, 但是如果想偷懒, 少带了上下文信息, 直接写 if err!=nil { return err} 或者 fmt.Errorf 携带的上下文信息太少了的话, 看到错误日志也会一脸懵逼, 难以定位问题.
官方在 2011 年就发过一篇博客教大家如何在Go中处理error https://blog.golang.org/error-handling-and-go , error 是一个内建的 interface, 鼓励大家用好自定义错误类型, 常用的范式有三种:

  • 一是用 errors.New(str string) 定义错误常量, 让调用方去判断返回的 err 是否等于这个常量, 来进行区分处理;
  • 二是用 fmt.Errorf(fmt string, args... interface{}) 增加一些上下文信息, 用文字的方式告诉调用方哪里出错了, 让调用方打错误日志出来;
  • 三是自定义 struct type , 实现 error 接口, 调用方用类型断言转成特定的 struct type , 拿到更结构化的错误信息.

我最开始最常用的做法是, fmt.Errorf 时写上 此函数函数名、调用出错的函数名、参数是什么、err , 代码十分啰嗦, 而且通常打日志是在上层函数打的, 看到错误日志还需要用函数名去代码中搜索看看在哪里出错. 业务代码调用层级一多,非常麻烦. 很多情况下我既想带上下文信息, 又想在上层调用方取得最里层出错的函数返回的error常量或自定义的 struct type, 最好还能自动带上行号函数名信息, 减少每次写 fmt.Errof 的手动写上函数名的痛苦. 于是开始在 github 找包, star 数最高的是 pkg/errorsjuju/errors.

  • pkg/errors 解决了一些问题, 核心函数是 Wrapf 和 Cause: Wrapf包装错误附加上下文信息并带上调用栈, 但是每次去包装错误的时候都去取一次调用栈, 完全没有必要啊, 因为最早出错的函数里就能拿到完整的调用栈的, 并且调用栈打出来的信息也不好看, 而且通常HTTP服务会用框架, 用了框架的话调用栈就会肿起来, 这些框架的固定调用栈信息打印出来毫无帮助. Cause 去递归拿到最里层的 error, 用于和error常量比较或类型断言成自定义 struct type.
// Wrapf returns an error annotating err with a stack trace
// at the point Wrapf is call, and the format specifier.
// If err is nil, Wrapf returns nil.
func Wrapf(err error, format string, args ...interface{}) error {
if err == nil {
return nil
}
err = &withMessage{
cause: err,
msg: fmt.Sprintf(format, args...),
}
return &withStack{
err,
callers(),
}
}

// Cause returns the underlying cause of the error, if possible.
// An error value has a cause if it implements the following
// interface:
//
// type causer interface {
// Cause() error
// }
//
// If the error does not implement Cause, the original error will
// be returned. If the error is nil, nil will be returned without further
// investigation.
func Cause(err error) error {
type causer interface {
Cause() error
}

for err != nil {
cause, ok := err.(causer)
if !ok {
break
}
err = cause.Cause()
}
return err
}
  • juju/errors API非常复杂, 包装的error的函数就有三个 func Annotatef(other error, format string, args ...interface{}) errorfunc Maskf(other error, format string, args ...interface{}) errorfunc Wrapf(other, newDescriptive error, format string, args ...interface{}) error … , 每次包装时都会SetLocation, 消耗更大, 即时有时不需要打印error string 只需要判断, 它也去用runtime.Caller去拿文件名, 行号; 调用栈打出来的信息也不好看.
// SetLocation records the source location of the error at callDepth stack
// frames above the call.
func (e *Err) SetLocation(callDepth int) {
_, file, line, _ := runtime.Caller(callDepth + 1)
e.file = trimGoPath(file)
e.line = line
}

以上包不满足要求, 只能造轮子了. 两个思想. API要设计的简单, 调用栈要好看 https://github.com/hanjm/errors

  • API简单: 定义error常量只有 errors.New 函数, 兼容标准库的函数, 兼容很重要; 包装error的只有 errors.Errorf 函数, 只在最早出错的时候取调用栈, 调用方再包装时无需取调用栈, 此时只需要pc, 不需要这时就把文件名行号取出来; 取最里层的 error 只有 errors.GetInnerMost, 用于和 error 常量比较或类型断言成自定义 struct type分类处理.
  • 调用栈好看: 去掉标准包的调用栈, 去掉框架固定的调用栈信息(通常是github.com的包), 只保留业务逻辑的调用栈. 按[ 文件名:行号 函数名:message]分行格式化输出, 把调用栈和附加的message对应起来. (第一版格式是[文件名:行号 函数名:message], 没有空格, 后面有个同事说在Goland IDE里看panic信息时可以点击定位到源码, 你的包能不能加这个功能, 所以去研究了下, 写了几个print的demo试了下发现如果输出中的文件名前后带空格的话, intellij IDE会自动识别输出中的文件名变成超链接, 所以给 “文件名:行号” 前后加了空格, 就能在IDE中直接点击定位到源码对应的行, 非常地方便, 感谢这位同事)

在IDE中加个live template, 写errf回车就补全到

if err!=nil {
err = errors.Errorf(err,"{{光标}}")
return
}

然后补充必要的注释和参数就行了, 在本地环境调试时看到错误日志点击就可以定位到源码, 在非本地环境跑看到错误日志相比之前也能更好地知道发生了什么, 复制文件名:行号到IDE中就能定位到源码, 大大减轻了错误处理的繁琐.

深入理解NATS & NATS Streaming (踩坑记)

作者 hanjm
2018年2月17日 00:00

简介

NATS Server是一个高性能的, cloud native的, 基于发布订阅机制的消息系统, 没有消息持久化功能.
NATS Streaming Server是基于NATS Server的, 增加消息持久化功能的消息系统.

NATS Streaming 持久化特性踩坑记

官网的文档并不详细, 很多重要的技术细节没说, 看了官网的文档之后发现用法很简单, 然后直接去写代码, 写publisher代码没什么问题, 写subscriber代码也能正常工作. 但是subscriber一重启, 重启后重启期间publisher发的消息不会继续收到, 说好的持久化呢? 我把官网的文档翻了遍也没找到答案. 最后在项目的readme.md中找到了答案: 要让subscriber重启后能继续收到重启期间发过来的消息且不重复消息, 必须在调用Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)订阅时设置一样的durableName, 且重启后连接时Connect(stanClusterID, clientID string, options ...Option) (Conn, error)ClusterID、clientID不能变.

要想理解NATS和NATS Streaming的特性, server和client的readme文档都需要仔细阅读, 特别是nats-streaming服务端的readme. 代码也值得阅读研究.

重要特性说明

  1. 当subject没有被订阅时, 消息会被直接丢弃, 所以重启订阅者会丢消息, 解决办法: 要么开2个以上客户端实例, 组成队列订阅QueueSubscribe, 要么换NATS Streaming.
  2. clientID和durableName对于NATS Streaming非常重要. 要让subscriber重启后能继续收到重启期间发过来的消息且不重复消息, 必须在调用Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error)订阅时设置一样的durableName, 调用Connect(stanClusterID, clientID string, options ...Option) (Conn, error)连接时ClusterID、clientID不能变. 程序关闭时应该使用Close而不是Unsubscribe, Unsubscribe()会删除在server端删除该持久化订阅.

    This client ID links a given connection to its published messages, subscriptions, especially durable subscriptions. Indeed, durable subscriptions are stored as a combination of the client ID and durable name.
    If an application wishes to resume message consumption from where it previously stopped, it needs to create a durable subscription. It does so by providing a durable name, which is combined with the client ID provided when the client created its connection. The server then maintain the state for this subscription even after the client connection is closed.
    Note: The starting position given by the client when restarting a durable subscription is ignored.
    When the application wants to stop receiving messages on a durable subscription, it should close - but not unsubscribe- this subscription. If a given client library does not have the option to close a subscription, the application should close the connection instead.
    When the application wants to delete the subscription, it must unsubscribe it. Once unsubscribed, the state is removed and it is then possible to re-use the durable name, but it will be considered a brand new durable subscription, with the start position being the one given by the client when creating the durable subscription.

  1. NATS连接时可以设置客户端的名字, 这样在monitor界面中的/connz就能方便地看到各个客户端的统计数据.

    // Options that can be passed to Connect.   // Name is an Option to set the client name. func Name(name string) Option {
    return func(o *Options) error {
    o.Name = name
    return nil
    }
    }

    type ConnInfo struct {
    Cid uint64 `json:"cid"`
    IP string `json:"ip"`
    Port int `json:"port"`
    Start time.Time `json:"start"`
    LastActivity time.Time `json:"last_activity"`
    Uptime string `json:"uptime"`
    Idle string `json:"idle"`
    Pending int `json:"pending_bytes"`
    InMsgs int64 `json:"in_msgs"`
    OutMsgs int64 `json:"out_msgs"`
    InBytes int64 `json:"in_bytes"`
    OutBytes int64 `json:"out_bytes"`
    NumSubs uint32 `json:"subscriptions"`
    Name string `json:"name,omitempty"`
    Lang string `json:"lang,omitempty"`
    Version string `json:"version,omitempty"`
    TLSVersion string `json:"tls_version,omitempty"`
    TLSCipher string `json:"tls_cipher_suite,omitempty"`
    AuthorizedUser string `json:"authorized_user,omitempty"`
    Subs []string `json:"subscriptions_list,omitempty"`
    }
  2. 使用.来分隔subject的级别. NATS允许subject包含斜杠/符号, 但NATS Streaming不允许, 因为NATS Streaming持久化时会使用subject名字来作为文件夹名,

    • NATS的subject可以为任意不为空的字符串, 具体的subject不能包含通配符’*’和’>’.
    • NATS Streaming的subject不能为空, 首尾不能为点’.’, 不能包含两个连续的点’.’, 由于暂时不支持通配符订阅功能, 所以不能包含’*’和’>’.
  3. NATS Streaming Server实际上是内嵌了一个NATS Server, 自己作为NATS的客户端. NATS Streaming的客户端实际上没有和NATS Streaming Server直接连接, 而是连接内嵌的NATS Server, NATS Streaming Server通过订阅客户端的心跳来知道NATS Streaming客户端连接有没有断开. 所以它强烈建议客户端退出程序时主动Close.
  4. NATS可以热重新加载配置, 发送SIGHUP信号或gnatsd -sl reload即可.
  5. 开发环境可以加-V参数了解NATS, 生产环境就没必要了, 否则会把发过来的消息全打在日志里.
  6. 你甚至可以用NATS的client包publish消息到NATS Streaming, NATS的client可以subscribe, 但NATS Streaming的client无法subscribe, 因为内部的subject变了. 最好不用混用, 容易出问题.
  7. NATS Streaming客户端连接时提供的ClusterID和服务端启动配置的ClusterID不一致时会报, 有人表示费解吐槽过, https://github.com/nats-io/nats-streaming-server/issues/309, 但官方解释说没有问题, Timeout也说的通.

    If you provide a cluster ID not used by any of the servers in the network, no server will respond to the client, hence the timeout error message from the client library. If anything, this is an error message that needs to be updated in the client libraries, not in the server.

  8. ChanSubscribe方式的客户端优雅关闭, 等待消息处理完成.

    package main

    import (
    "fmt"
    "os" "syscall" "os/signal" "github.com/nats-io/go-nats" "sync" )

    func main() {
    n, err := nats.Connect("nats://127.0.0.1:7222",
    nats.Name("test_client"),
    nats.UserInfo("", ""))
    if err != nil {
    panic(err)
    }
    subject := "test"
    msgCh := make(chan *nats.Msg, nats.DefaultMaxChanLen)
    _, err = n.ChanSubscribe(subject, msgCh)
    if err != nil {
    panic(err)
    }
    wg := sync.WaitGroup{}
    for i := 0; i < 2; i++ {
    wg.Add(1)
    go func() {
    defer wg.Done()
    // msg handler
    for msg := range msgCh {
    fmt.Printf("%s\n", msg.Data)
    }
    }()
    }
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGQUIT,
    syscall.SIGTERM,
    syscall.SIGINT,
    syscall.SIGUSR1,
    syscall.SIGUSR2)
    select {
    case <-quit:
    defer wg.Wait()
    // close msgCh and wait process ok
    close(msgCh)
    n.Flush()
    n.Close()
    }
    }

NATS代码中的技巧

  1. 很有用的Go风格的可选参数设计模式, 很多地方见过.

    // Option is a function on the options for a connection. 
    type Option func(*Options) error

    // Options can be used to create a customized connection.
    type Options struct {
    Url string
    ...
    User string
    Password string
    }
    var DefaultOptions = Options{
    AllowReconnect: true,
    MaxReconnect: DefaultMaxReconnect,
    ReconnectWait: DefaultReconnectWait,
    Timeout: DefaultTimeout,
    PingInterval: DefaultPingInterval,
    MaxPingsOut: DefaultMaxPingOut,
    SubChanLen: DefaultMaxChanLen,
    ReconnectBufSize: DefaultReconnectBufSize,
    Dialer: &net.Dialer{
    Timeout: DefaultTimeout,
    }, }

    // Connect will attempt to connect to the NATS system.
    // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
    // Comma separated arrays are also supported, e.g. urlA, urlB.
    // Options start with the defaults but can be overridden.
    func Connect(url string, options ...Option) (*Conn, error) {
    opts := DefaultOptions
    opts.Servers = processUrlString(url)
    for _, opt := range options {
    if err := opt(&opts); err != nil {
    return nil, err
    }
    }
    return opts.Connect()
    }

    // Options that can be passed to Connect. // Name is an Option to set the client name. func Name(name string) Option {
    return func(o *Options) error {
    o.Name = name
    return nil
    }
    }
  2. 使用ringBuffer限制消息数量

    You can view a message log as a ring buffer. Messages are appended to the end of the log. If a limit is set globally for all channels, or specifically for this channel, when the limit is reached, older messages are removed to make room for the new ones.
  3. 用reflect来绑定任意类型的chan

    chVal := reflect.ValueOf(channel)
    if chVal.Kind() != reflect.Chan {
    return ErrChanArg
    }
    val, ok := chVal.Recv()
    if !ok {
    // Channel has most likely been closed.
    return
    }

Go如何精确计算小数-Decimal研究-Tidb MyDecimal问题

作者 hanjm
2017年8月27日 00:00

1 浮点数为什么不精确

先看两个case

// case1: 135.90*100 ====
// float32
var f1 float32 = 135.90
fmt.Println(f1 * 100) // output:13589.999
// float64
var f2 float64 = 135.90
fmt.Println(f2 * 100) // output:13590

浮点数在单精度下, 135.9*100即出现了偏差, 双精度下结果正确.

// case2: 0.1 add 10 times ===
// float32
var f3 float32 = 0
for i := 0; i < 10; i++ {
f3 += 0.1
}
fmt.Println(f3) //output:1.0000001

// float64
var f4 float64 = 0
for i := 0; i < 10; i++ {
f4 += 0.1
}
fmt.Println(f4) //output:0.9999999999999999

0.1加10次, 这下无论是float32和float64都出现了偏差.

为什么呢, Go和大多数语言一样, 使用标准的IEEE754表示浮点数, 0.1使用二进制表示结果是一个无限循环数, 只能舍入后表示, 累加10次之后就会出现偏差.

此外, 还有几个隐藏的坑https://play.golang.org/p/bQPbirROmN

  1. float32和float64直接互转会精度丢失, 四舍五入后错误.
  2. int64转float64在数值很大的时候出现偏差.
  3. 合理但须注意: 两位小数乘100强转int, 比期望值少了1.
package main

import (
"fmt"
)

func main() {
// case: float32==>float64
// 从数据库中取出80.45, 历史代码用float32接收
var a float32 = 80.45
var b float64
// 有些函数只能接收float64, 只能强转
b = float64(a)
// 打印出值, 强转后出现偏差
fmt.Println(a) //output:80.45
fmt.Println(b) //output:80.44999694824219
// ... 四舍五入保留小数点后1位, 期望80.5, 结果是80.4

// case: int64==>float64
var c int64 = 987654321098765432
fmt.Printf("%.f\n", float64(c)) //output:987654321098765440

// case: int(float64(xx.xx*100))
var d float64 = 1129.6
var e int64 = int64(d * 100)
fmt.Println(e) //output:112959
}

##2 数据库是怎么做的
MySQL提供了decimal(p,d)/numberlic(p,d)类型的定点数表示法, 由p位数字(不包括符号、小数点)组成, 小数点后面有d位数字, 占p+2个字节, 计算性能会比double/float类型弱一些.

##3 Go代码如何实现Decimal
Java有成熟的标准库java.lang.BigDecimal,Python有标准库Decimal, 可惜GO没有. 在GitHub搜decimal, star数量比较多的是TiDB里的MyDecimal和ithub.com/shopspring/decimal的实现.

  • shopspring的Decimal实现比较简单, 思路是使用十进制定点数表示法, 有多少位小数就小数点后移多少位, value保存移之后的整数, exp保存小数点后的数位个数, number=value*10^exp, 因为移小数点后的整数可能很大, 所以这里借用标准包里的math/big表示这个大整数. exp使用了int32, 所以这个包最多能表示小数点后有32个十进制数位的情况.

    Decimal结构体的定义如下

    // Decimal represents a fixed-point decimal. It is immutable.
    // number = value * 10 ^ exp
    type Decimal struct {
    value *big.Int

    // NOTE(vadim): this must be an int32, because we cast it to float64 during
    // calculations. If exp is 64 bit, we might lose precision.
    // If we cared about being able to represent every possible decimal, we
    // could make exp a *big.Int but it would hurt performance and numbers
    // like that are unrealistic.
    exp int32
    }
  • TiDB里的MyDecimal定义位于github.com/pingcap/tidb/util/types/mydecimal.go, 实现比shopspring的Decimal复杂多了, 也更底层(不依赖math/big), 性能也更好(见下面的benchmark). 其思路是:
    digitsInt保存数字的整数部分数字个数, digitsFrac保存数字的小数部分数字个数, resultFrac保存计算及序列化时保留至小数点后几位, negative标明数字是否为负数, wordBuf是一个定长的int32数组(长度为9), 数字去掉小数点的主体保存在这里, 一个int32有32个bit, 最大值为(2**31-1)2147483647(10个十进制数), 所以一个int32最多能表示9个十进制数位, 因此wordBuf 最多能容纳9*9个十进制数位.

    // MyDecimal represents a decimal value.
    type MyDecimal struct {
    digitsInt int8 // the number of *decimal* digits before the point.

    digitsFrac int8 // the number of decimal digits after the point.

    resultFrac int8 // result fraction digits.

    negative bool

    // wordBuf is an array of int32 words.
    // A word is an int32 value can hold 9 digits.(0 <= word < wordBase)
    wordBuf [maxWordBufLen]int32
    }

看看这两种decimal类型在文首的两个case下的结果, 同时跑个分.

main_test.go

package main

import (
"testing"
"github.com/shopspring/decimal"
"github.com/pingcap/tidb/util/types"
"log"
)

var case1String = "135.90"
var case1Bytes = []byte(case1String)
var case2String = "0"
var case2Bytes = []byte("0")

func ShopspringDecimalCase1() decimal.Decimal {
dec1, err := decimal.NewFromString(case1String)
if err != nil {
log.Fatal(err)
}
dec2 := decimal.NewFromFloat(100)
dec3 := dec1.Mul(dec2)
return dec3
}

func TidbDecimalCase1() *types.MyDecimal {
dec1 := new(types.MyDecimal)
err := dec1.FromString(case1Bytes)
if err != nil {
log.Fatal(err)
}
dec2 := new(types.MyDecimal).FromInt(100)
dec3 := new(types.MyDecimal)
err = types.DecimalMul(dec1, dec2, dec3)
if err != nil {
log.Fatal(err)
}
return dec3
}

func ShopspringDecimalCase2() decimal.Decimal {
dec1, err := decimal.NewFromString(case2String)
if err != nil {
log.Fatal(err)
}
dec2 := decimal.NewFromFloat(0.1)
for i := 0; i < 10; i++ {
dec1 = dec1.Add(dec2)
}
return dec1
}

func TidbDecimalCase2() *types.MyDecimal {
dec1 := new(types.MyDecimal)
dec1.FromString(case2Bytes)
dec2 := new(types.MyDecimal)
dec2.FromFloat64(0.1)
for i := 0; i < 10; i++ {
types.DecimalAdd(dec1, dec2, dec1)
}
return dec1

}

// case1: 135.90*100 ====
func BenchmarkShopspringDecimalCase1(b *testing.B) {
for i := 0; i < b.N; i++ {
ShopspringDecimalCase1()
}
b.Log(ShopspringDecimalCase1()) // output: 13590
}

func BenchmarkTidbDecimalCase1(b *testing.B) {
for i := 0; i < b.N; i++ {
TidbDecimalCase1()
}
b.Log(TidbDecimalCase1()) // output: 13590.00
}

// case2: 0.1 add 10 times ===
func BenchmarkShopspringDecimalCase2(b *testing.B) {
for i := 0; i < b.N; i++ {
ShopspringDecimalCase2()
}
b.Log(ShopspringDecimalCase2()) // output: 1
}

func BenchmarkTidbDecimalCase2(b *testing.B) {
for i := 0; i < b.N; i++ {
TidbDecimalCase2()
}
b.Log(TidbDecimalCase2()) // output: 1.0
}
BenchmarkShopspringDecimalCase1-8        2000000               664 ns/op             340 B/op         10 allocs/op

BenchmarkTidbDecimalCase1-8 20000000 99.2 ns/op 48 B/op 1 allocs/op

BenchmarkShopspringDecimalCase2-8 300000 5210 ns/op 4294 B/op 111 allocs/op

BenchmarkTidbDecimalCase2-8 3000000 517 ns/op 83 B/op 3 allocs/op

可见两种实现在上面两个case下表示准确, TiDB的decimal实现的性能高于shopspring的实现, 堆内存分配次数也更少.

##4. MyDecimal的已知问题

用了一段时间后, tidb.MyDecimal也有一些问题

  1. 原版除法有bug, 可以通过除数和被除数同时放大一定倍数临时修复, 更好的解决方法需要官方人员解决, 已提issue, 这个bug真是匪夷所思. https://github.com/pingcap/tidb/issues/4873, 2017.11.3官方修复decimal除法问题:https://github.com/pingcap/tidb/pull/4995/files.
  2. 原版乘法有小问题, 行为不一致, 原版的from1和to不能为同一个指针, 但 Add Sub Div却可以. 可以通过copy参数修复.
  3. 移位小坑, 右移属于扩大数值, 没有问题. 左移有问题, 注意1左移两位不会变成0.01, 所以shift不要传负数.
  4. round, 目前这个库的Round模式ModeHalfEven实际上是ModeHalfUp, 正常的四舍五入, 不是float的ModeHalfEven. 3.5=>4, 4.5=>5, 5.5=>6, 注意后期是否有变更.

DockerContainer下gdb无法正常工作的解决办法

作者 hanjm
2017年8月20日 00:00

昨天想在Mac上使用gdb调试一个Linux下编译的动态链接库, 以正常选项启动一个docker container, 运行gdb却发现如下错误提示.

warning: Error disabling address space randomization: Operation not permitted
Cannot create process: Operation not permitted
During startup program exited with code 127.
(gdb)

在google搜索结果里第6个才找到正确答案, https://www.google.com/search?safe=off&q=docker+gdb+warning%3A+Error+disabling+address+space+randomization%3A+Operation+not+permitted+Cannot+create+process%3A+Operation+not+permitted+During+startup+program+exited+with+code+127&oq=docker+gdb+warning%3A+Error+disabling+address+space+randomization%3A+Operation+not+permitted+Cannot+create+process%3A+Operation+not+permitted+During+startup+program+exited+with+code+127, 原来是docker run中的一个不太常用的选项, docker run –privileged, 加上即可.

于是找官方文档查看此选项的解释, 了解到: 默认docker是以受限模式下运行container, 如不能在container中运行再运行一个docker, 不能访问宿主机上的真实设备, /dev/, gdb无法访问真实的内存设备.

Runtime privilege and Linux capabilities

>--cap-add: Add Linux capabilities
>--cap-drop: Drop Linux capabilities
>--privileged=false: Give extended privileges to this container
>--device=[]: Allows you to run devices inside the container without the --privileged flag.
>
>By default, Docker containers are “unprivileged” and cannot, for example, run a Docker daemon inside a Docker container. This is because by default a container is not allowed to access any devices, but a “privileged” container is given access to all devices (see the documentation on cgroups devices).

>When the operator executes docker run --privileged, Docker will enable access to all devices on the host as well as set some configuration in AppArmor or SELinux to allow the container nearly all the same access to the host as processes running outside containers on the host. Additional information about running with --privileged is available on the Docker Blog.

>If you want to limit access to a specific device or devices you can use the --device flag. It allows you to specify one or more devices that will be accessible within the container.

>

$ docker run –device=/dev/snd:/dev/snd …



Go sync.Pool Slice Benchmark

作者 hanjm
2017年7月2日 00:00

纠结于[]struct还是[]*struct
直接make([]struct,0) 后append 还是 用sync.Pool make([]struct,100)

写段代码跑个分, 结论是

  1. []*struct的要比[]struct多n次取指针的内存分配, 所有更慢, 如果不用修改结构体元素内的值, 没有必要用指针切片
  2. append[]*struct要比[]struct
  3. sync.Pool效果明显

benchmark结果

BenchmarkStructSliceWithoutPool-8                 200000              5458 ns/op           16320 B/op          8 allocs/op
BenchmarkStructPointerSliceWithoutPool-8 200000 6045 ns/op 8504 B/op 109 allocs/op
BenchmarkStructSliceWithPool-8 1000000 1287 ns/op 32 B/op 1 allocs/op
BenchmarkStructPointerSliceWithPool-8 300000 4910 ns/op 6498 B/op 102 allocs/op

benchmark代码

package main

import (
"sync"
"testing"
)

var structSlicePool = sync.Pool{
New: func() interface{} {
return make([]Basic, 100)
},
}

var structPointerSlicePool = sync.Pool{
New: func() interface{} {
return make([]*Basic, 100)
},
}

type Basic struct {
Id, N1, N2, N3, N4, N5 int
Name string
}

func BenchmarkStructSliceWithoutPool(b *testing.B) {
for i := 0; i < b.N; i++ {
var list []Basic
for j := 0; j < 101; j++ {
var data = Basic{Id: j, Name: "Name"}
list = append(list, data)
}
}
}

func BenchmarkStructPointerSliceWithoutPool(b *testing.B) {
for i := 0; i < b.N; i++ {
var list []*Basic
for j := 0; j < 101; j++ {
var data = Basic{Id: j, Name: "Name"}
list = append(list, &data)
}
}
}

func BenchmarkStructSliceWithPool(b *testing.B) {
for i := 0; i < b.N; i++ {
list := structSlicePool.Get().([]Basic)
initLen := len(list)
for j := 0; j < 101; j++ {
var data = Basic{Id: j, Name: "Name"}
if j < initLen {
list[j] = data
} else {
list = append(list, data)
}
}
structSlicePool.Put(list)
}
}

func BenchmarkStructPointerSliceWithPool(b *testing.B) {
for i := 0; i < b.N; i++ {
list := structPointerSlicePool.Get().([]*Basic)
initLen := len(list)
for j := 0; j < 101; j++ {
var data = Basic{Id: j, Name: "Name"}
if j < initLen {
list[j] = &data
} else {
list = append(list, &data)
}
}
structPointerSlicePool.Put(list)
}

}

Go最佳实践

作者 hanjm
2017年6月24日 00:00

来自NSQ

nsq的官方文档的Dsign中提到一个PPThttps://speakerdeck.com/snakes/nsq-nyc-golang-meetup, 里面有这样一段话

总结一下.

  1. don’t be afraid of sync package
    sync包里有
  • sync.Mutex(互斥锁,一读一写)
  • sync.RWMutex(读写锁,可以多读一写)
  • sync.Pool(对象池, 合理利用可以减少内存分配, 降低GC压力, 稍后写一篇博客说说)
  • sync.Once(并发控制. 适用于开几个goroutines去执行一个只执行一次的任务, 比如单例模式)
  • sync.Cond(并发控制, cond.Wait()阻塞至其他goroutie运行到cond.Signal())
  • sync.WaitGroup(并发控制. 通常用法 wg.Add增加任务数量 goroutie完成任务后执行wg.Done,任务数量减1 wg.Wait等待wg任务数量为0)
  1. goroutines are cheap not free
    这句话在其他地方也看过, go func()简单好用, 创建开销也很小, 但也是有开销的. 很多情况下开固定数量worker, 用channel传递数据, 效果会更好.
    go-apns2中的example是个非常好的例子.https://github.com/sideshow/apns2/blob/master/_example/channel/main.go

注意一个问题, go里面一个goroutine panic了, 会导致进程退出, 所以go func()时第一行带上

go func(){
defer func(){
if err:=recover(); err!=nil{
}
}()
}()

是安全的做法, worker channel法时类似

package main

import (
"fmt"
"log"
"time"
)

func main() {
ch := make(chan int, 10)

for i := 0; i < 2; i++ {
go worker(ch, i)
}

for i := 0; i < 3; i++ {
ch <- i
ch <- -1
}

time.Sleep(time.Second * 5)
}

func worker(ch <-chan int, goId int) {

log.Printf("worker%d running", goId)
for data := range ch {
func() {
defer func() {
if err := recover(); err != nil {
log.Printf("worker%d recover error:%s", goId, err)
}
}()
log.Printf("worker%d received data:%d", goId, data)
if data == -1 {
panic(fmt.Errorf("worker%d panic", goId))
}
}()
}
}

fasthttp之所以快, 其中一个原因就是net/http是来一个连接就创建一个goroutie, 而fasthttp用了池复用了goroutines.

  1. watch your allocations (string() is costly, re-user buffers)
    go里面 []byte和string互转是会发生复制的, 开销明显, 如果代码里频繁互转, 考虑使用bytes.buffer 和 sync.Pool

  2. use anonymous structs for arbitrary JSON
    在写http api时, parse body这种事情, 如果只是纯粹取body里的json数据, 没必要单独定义结构体, 在函数里定义一个匿名结构体就好. var s struct { A int}

  3. no built-in per-request HTTP timeouts
    这是说要注意默认的httpClient没有超时

  4. synchronizing goroutine exit is hard - log each cleanup step in long-running goroutines
    同步化的goroutine一不小心就没有退出, 如果你写一个长期运行的服务, 用logger记录每一个goroutine的清理退出, 防止goroutine泄露

  5. select skips nil channels

select语句是会跳过nil的channels的. 因为在Go里往已经close掉的channel里发送数据是会panic的, 可以利用select语句.
附: channel操作导致panic的情况有: 关闭一个nil的channel, 关闭一个已经关闭的channel( j,ok:= <- ch, ok为false时代表ch已经关闭了), 往一个已经关闭的channel里发送数据(从已经关闭的channel里读数据是OK的, 如果这个channel是带缓冲的, 那么可以读到所有数据)

来自GO箴言

Python有import this的zen of Python, 想不到Go也有箴言
https://speakerdeck.com/ajstarks/go-proverbs

  1. 在go里, goroutines之间通信不要用共享内存的方式实现, 应该用channel来实现
  2. 并发不是并行
  3. channel是编排, mutexs是串行
  4. interface定义越多的方法, 抽象程度越低. Go提倡用接口组合的方式实现更大的接口
  5. 零值, 猜测这里说的是struct{}吧, struct{}是一个不占内存的空结构体, 在用map实现set, channel发送无额外意义的signal时能降低内存分配
  6. 提倡gofmt
  7. 一点点复制比一点点依赖好. 官方包里有时能见到一些复制的代码, 这是为了不互相依赖
  8. syscall每个平台实现不一样, 要加build tags
  9. cgo每个平台的lib不一样, 要加build tags
  10. Cgo不是go
  11. unsafe包不提供保障
  12. 简洁胜过高效
  13. error是值 可以用值的方式去处理错误: 传递, 比较
  14. 不用仅检查错误, 要优雅地处理
  15. 多花精力设计架构, 模块命名, 写详细的文档
  16. 写良好的文档给用户
  17. 对于普通错误, 应该用多值返回错误, 而不是手动panic

实践

  1. 写可重复使用的函数, 接收接口类型, 返回具体类型
  2. 写可扩展便于二次包装的函数, 接收接口类型, 返回接口类型. 如标准库的database/sql包

传值还是传指针

  1. 指针仅用于要修改值的场景和反射, 其他场景尽可能地用值传递, 能让变量尽可能分配在栈上, 减少GC压力, 提高性能. 还能减少nil 参考 https://segment.com/blog/allocation-efficiency-in-high-performance-go-services/

Golang Github

https://github.com/golang/go/wiki/CodeReviewComments

GO Logger 日志实践

作者 hanjm
2017年5月19日 00:00

分析一下用过的打印日志的log包

  1. Go标准库自带log, 这个log的func比较少, 没有区分level, 但足够简单, 有prefix功能, 可以设置flag来控制时间格式, caller的文件名和行数, 其它的标准包如 net/http database/sql 等也用了此包.
  2. 对自带的log进行包装, 加入level, 颜色. 如ngaut/log, 这个log star数并不多, 还是从最近很火的一个项目pingcap/tidb里看到的, 有点小清新的感觉, 但这个log可能只是为tidb使用的, 缺少自带log的一些方法, 导致没法使用在一些可以定制logger的第三方库如gorm中. 于是我fork了一下https://github.com/hanjm/log, 增加了一些方法, 以便可以给gorm用.
  3. 完全自己实现的log, 结构化输出, 通常是key=value或json, 有名的有logrus, zap等. 第一次看到logrus感觉美极了, 于是大量使用, 直到在关注tidb的时候收到带日志的issues邮件, 里面的日志带了caller, 感觉很有用, 于是去搜logrus的issue看有没有这个功能, 搜到了一个issuehttps://github.com/sirupsen/logrus/issues/63, 讨论了三年这个功能还没加上, 只好放弃美丽的logrus, 找到了替代品zap, zap的设计非常好, 定制性强. log是经常调用的代码, 每次调用不可避免地要进行内存分配, 分配次数和每次分配的内存大小将影响性能. 对log内容的处理也是一个涉及到性能的点, 像log.Printf参数是interface{}, logrus的field是map[stirng]interface{}, 打印interface{}只能靠reflect, Go是静态强类型语言, 用反射的开销相对比较大, 所以zap使用了手动指定类型的方式, 从zap提供的benchmark上开看, 性能提升还是蛮大的, 虽然相比logrus使用起来更麻烦, 但为了性能, 还是值得的.

总结

  1. 为了方便进行日志分析, 统一用json行日志, 这样用elk时可以免去定制正则表达式存储到elasticSearch的field中.
  2. net/http database/sql 及一些第三方包可能直接使用了标注库的log, 有个trick可以改变所有使用标准包log的行为, 通过log.SetOutput(w io.Writer)来改变位置, w是一个实现了Write(p []byte) (n int, err error)方法的io.Writer即可.
  3. runtime.Caller可以得到调用者的pc, 文件名, 文件行数, runtime.FuncForPC(pc).Name()可以得到pc所在的函数名, 对于debug非常有帮助. 但有一定性能开销, 所以方案是: 对于http server的access log, 没有必要使用带caller的日志, 而对于http api具体实现的函数内的log, 有必要记录caller, 而且光有文件名和行数还不够, 毕竟改了代码行数就变了, 而函数名一般不会变, 带上函数名会更直观.
  4. GitHub搜了一圈, 好多公司都会定制自己的log, 如tidb的ngaut/log, 七牛的qiniu/log, 饿了么的eleme/log, mailgun的mailgun/log, 是的, 我也造一个小轮子zaplog.zaplog是包装了zap, 带caller func name, 兼容logrus stdlog 的日志输出工具.
package zaplog

import (
"bytes"
"fmt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"runtime"
"strings"
)

// CallerEncoder will add caller to log. format is "filename:lineNum:funcName", e.g:"zaplog/zaplog_test.go:15:zaplog.TestNewLogger"
func CallerEncoder(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(strings.Join([]string{caller.TrimmedPath(), runtime.FuncForPC(caller.PC).Name()}, ":"))
}

func newLoggerConfig(debugLevel bool) (loggerConfig zap.Config) {
loggerConfig = zap.NewProductionConfig()
loggerConfig.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
loggerConfig.EncoderConfig.EncodeCaller = CallerEncoder
if debugLevel {
loggerConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
}
return
}

// NewCustomLoggers is a shortcut to get normal logger, noCallerLogger.
func NewCustomLoggers(debugLevel bool) (logger, noCallerLogger *zap.Logger) {
loggerConfig := newLoggerConfig(debugLevel)
logger, err := loggerConfig.Build()
if err != nil {
panic(err)
}
loggerConfig.DisableCaller = true
noCallerLogger, err = loggerConfig.Build()
if err != nil {
panic(err)
}
return
}

// NewLogger return a normal logger
func NewLogger(debugLevel bool) (logger *zap.Logger) {
loggerConfig := newLoggerConfig(debugLevel)
logger, err := loggerConfig.Build()
if err != nil {
panic(err)
}
return
}

// NewNoCallerLogger return a no caller key value, will be faster
func NewNoCallerLogger(debugLevel bool) (noCallerLogger *zap.Logger) {
loggerConfig := newLoggerConfig(debugLevel)
loggerConfig.DisableCaller = true
noCallerLogger, err := loggerConfig.Build()
if err != nil {
panic(err)
}
return
}

// CompatibleLogger is a logger which compatible to logrus/std log/prometheus.
// it implements Print() Println() Printf() Dbug() Debugln() Debugf() Info() Infoln() Infof() Warn() Warnln() Warnf()
// Error() Errorln() Errorf() Fatal() Fataln() Fatalf() Panic() Panicln() Panicf() With() WithField() WithFields()
type CompatibleLogger struct {
_log *zap.Logger
}

// NewCompatibleLogger return CompatibleLogger with caller field
func NewCompatibleLogger(debugLevel bool) *CompatibleLogger {
return &CompatibleLogger{NewLogger(debugLevel).WithOptions(zap.AddCallerSkip(1))}
}

// Print logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Print(args ...interface{}) {
l._log.Info(fmt.Sprint(args...))
}

// Println logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Println(args ...interface{}) {
l._log.Info(fmt.Sprint(args...))
}

// Printf logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Printf(format string, args ...interface{}) {
l._log.Info(fmt.Sprintf(format, args...))
}

// Debug logs a message at level Debug on the compatibleLogger.
func (l CompatibleLogger) Debug(args ...interface{}) {
l._log.Debug(fmt.Sprint(args...))
}

// Debugln logs a message at level Debug on the compatibleLogger.
func (l CompatibleLogger) Debugln(args ...interface{}) {
l._log.Debug(fmt.Sprint(args...))
}

// Debugf logs a message at level Debug on the compatibleLogger.
func (l CompatibleLogger) Debugf(format string, args ...interface{}) {
l._log.Debug(fmt.Sprintf(format, args...))
}

// Info logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Info(args ...interface{}) {
l._log.Info(fmt.Sprint(args...))
}

// Infoln logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Infoln(args ...interface{}) {
l._log.Info(fmt.Sprint(args...))
}

// Infof logs a message at level Info on the compatibleLogger.
func (l CompatibleLogger) Infof(format string, args ...interface{}) {
l._log.Info(fmt.Sprintf(format, args...))
}

// Warn logs a message at level Warn on the compatibleLogger.
func (l CompatibleLogger) Warn(args ...interface{}) {
l._log.Warn(fmt.Sprint(args...))
}

// Warnln logs a message at level Warn on the compatibleLogger.
func (l CompatibleLogger) Warnln(args ...interface{}) {
l._log.Warn(fmt.Sprint(args...))
}

// Warnf logs a message at level Warn on the compatibleLogger.
func (l CompatibleLogger) Warnf(format string, args ...interface{}) {
l._log.Warn(fmt.Sprintf(format, args...))
}

// Error logs a message at level Error on the compatibleLogger.
func (l CompatibleLogger) Error(args ...interface{}) {
l._log.Error(fmt.Sprint(args...))
}

// Errorln logs a message at level Error on the compatibleLogger.
func (l CompatibleLogger) Errorln(args ...interface{}) {
l._log.Error(fmt.Sprint(args...))
}

// Errorf logs a message at level Error on the compatibleLogger.
func (l CompatibleLogger) Errorf(format string, args ...interface{}) {
l._log.Error(fmt.Sprintf(format, args...))
}

// Fatal logs a message at level Fatal on the compatibleLogger.
func (l CompatibleLogger) Fatal(args ...interface{}) {
l._log.Fatal(fmt.Sprint(args...))
}

// Fatalln logs a message at level Fatal on the compatibleLogger.
func (l CompatibleLogger) Fatalln(args ...interface{}) {
l._log.Fatal(fmt.Sprint(args...))
}

// Fatalf logs a message at level Fatal on the compatibleLogger.
func (l CompatibleLogger) Fatalf(format string, args ...interface{}) {
l._log.Fatal(fmt.Sprintf(format, args...))
}

// Panic logs a message at level Painc on the compatibleLogger.
func (l CompatibleLogger) Panic(args ...interface{}) {
l._log.Panic(fmt.Sprint(args...))
}

// Panicln logs a message at level Painc on the compatibleLogger.
func (l CompatibleLogger) Panicln(args ...interface{}) {
l._log.Panic(fmt.Sprint(args...))
}

// Panicf logs a message at level Painc on the compatibleLogger.
func (l CompatibleLogger) Panicf(format string, args ...interface{}) {
l._log.Panic(fmt.Sprintf(format, args...))
}

// With return a logger with an extra field.
func (l *CompatibleLogger) With(key string, value interface{}) *CompatibleLogger {
return &CompatibleLogger{l._log.With(zap.Any(key, value))}
}

// WithField return a logger with an extra field.
func (l *CompatibleLogger) WithField(key string, value interface{}) *CompatibleLogger {
return &CompatibleLogger{l._log.With(zap.Any(key, value))}
}

// WithFields return a logger with extra fields.
func (l *CompatibleLogger) WithFields(fields map[string]interface{}) *CompatibleLogger {
i := 0
var clog *CompatibleLogger
for k, v := range fields {
if i == 0 {
clog = l.WithField(k, v)
} else {
clog = clog.WithField(k, v)
}
i++
}
return clog
}

// FormatStdLog set the output of stand package log to zaplog
func FormatStdLog() {
log.SetFlags(log.Llongfile)
log.SetOutput(&logWriter{NewNoCallerLogger(false)})
}

type logWriter struct {
logger *zap.Logger
}

// Write implement io.Writer, as std log's output
func (w logWriter) Write(p []byte) (n int, err error) {
i := bytes.Index(p, []byte(":")) + 1
j := bytes.Index(p[i:], []byte(":")) + 1 + i
caller := bytes.TrimRight(p[:j], ":")
// find last index of /
i = bytes.LastIndex(caller, []byte("/"))
// find penultimate index of /
i = bytes.LastIndex(caller[:i], []byte("/"))
w.logger.Info("stdLog", zap.ByteString("caller", caller[i+1:]), zap.ByteString("log", bytes.TrimSpace(p[j:])))
return len(p), nil
}

MacOS下酷工具收集(持续)

作者 hanjm
2017年5月6日 00:00
  1. LaunchRocket. 系统偏好设置中的用户与群组可以添加登录项脚本, 但对于需要root权限启动的应用无解, 脚本会卡在输入密码. LaunchRocket能优雅解决.(当然, 还有一种魔法方法: 用docker加--restart always的container或docker swarm service 跑, docker启动时会自动启动这些container/service) GitHub地址https://github.com/jimbojsb/launchrocket LaunchRocket
  2. Snip. Windows下有Winsnap、FastStone、Snipaste等此等优秀的截图软件, 相比之下, Mac上的截图软件要逊色不少. Winsnap有个很方便的特性是截图可以默认复制到剪切板, 很容易粘贴到其他软件去, Mac上snip也有这个特性, snip是一款腾讯开发的截图软件, 官网http://snip.qq.com/
  3. jietu. 还是腾讯开发的截图软件, 比snip更好用, 也是默认复制到剪切板, 而且提供编辑功能. 官网http://jietu.qq.com
  4. Macdown. 好用优雅免费的Markdown书写工具. GitHub地址https://github.com/MacDownApp/macdown 2018.12.15发现了更棒的Markdown软件Typora, 胜在颜值和文件夹模式.
  5. Homebrew. yum/dnf之于centOS, apt之于Ubuntu, pacman之于ArchLinux, brew之于macOS, brew install简直不能太爽. 官网https://brew.sh/
  6. Homebrew-Cask. brew install不能安装chrome Macdown这样的GUI app, Homebrew-Cask扩展了brew, GitHub地址https://github.com/caskroom/homebrew-cask Homebrew-Cask
  7. privoxy. brew pip npm install、docker pull总是慢如蜗牛? privoxy能将shadowsocks的socks代理(127.0.0.1:1080)转换为http/https代理, 有个奇特的地方是把它把文档写在配置文件的注释里, config文件有2271行, 初让人以为配置起来会巨复杂, 实际上基本的功能两行配置即可. listen-address配置为0.0.0.0:8118, 局域网内其他设备也可以走此代理:8118. 官网https://www.privoxy.org/

    listen-address  0.0.0.0:8118
    forward-socks5 / localhost:1080 .

    然后在.zshrc或.bashrc中加入一下命令就可以通过proxy dproxy来切换是否在本终端下使用代理了.

    function proxy(){
    export http_proxy=http://127.0.0.1:8118
    export https_proxy=https://127.0.0.1:8118
    export HTTP_PROXY=http://127.0.0.1:8118
    export HTTPS_PROXY=https://127.0.0.1:8118
    }
    function dproxy(){
    unset http_proxy
    unset https_proxy
    unset HTTP_PROXY
    unset HTTPS_PROXY
    }
  8. proxier 可以按进程名指定哪些进程走ss代理, 比如ssh

  9. kcptun. bandwagon上的shadowsocks越来越慢, 不用kcptun加速没法正常使用, 只能不太厚道地超量发包了. GitHub地址https://github.com/xtaci/kcptun
  10. oh-my-zsh的z命令. 手动输入一个很长的路径名不停地tab很麻烦, 配置了oh-my-zsh的话可以启用z命令(edit: ~/.zshrc line: plugins=(git python z tmux)), z 文件夹名 就可以跳转到常用目录中最符合输入文件夹名的文件夹中, 非常方便, GitHub地址https://github.com/robbyrussell/oh-my-zsh
  11. aria2. 非常厉害的下载器, brew install aria2即可. GitHub地址:https://github.com/aria2/aria2 GUI Client:https://github.com/yangshun1029/aria2gui
  12. BaiduExporter. 百度网盘文件导出到aria2下载, GitHub地址https://github.com/acgotaku/BaiduExporter
  13. Chrome插件加白描述文件. 有时需要安装一些在商店下架了的扩展, 比如BaiduExporter, 在扩展页面安装后重启chrome会提示此扩展程序并非来自chrome商店,启用开关灰色无法启用, 可以下载此描述文件将特定扩展id加入白名单. https://hencolle.com/2016/10/16/baidu_exporter/
  14. 坚果云.同步配置文件,pdf书非常方便.
  15. Monaco字体. 非常舒服的等宽代码字体 https://github.com/hanjm/codeFont
  16. Scroll Reverser. 外接鼠标神器, 让触摸板是自然方向, 鼠标是习惯的Windows滚动方向.
  17. Karabiner-Elements. 外接键盘神器, 我就想禁用capsLock键, 让右边的option键变成control键, 能把外接键盘的Windows键映射为command键.
  18. ssh tunnel. ssh隧道管理器, 本地连接远程服务器的kibana nsqAdmin mysql简直不要太爽

Mysql 连接池问题

作者 hanjm
2017年4月26日 00:00

最近应用日志里发现了mysql偶尔会出现问题

[mysql] 2017/04/26 10:01:05 packets.go:130: write tcp 127.0.0.1:56346->127.0.0.1:3306: write: broken pipe
[mysql] 2017/04/26 10:01:05 packets.go:130: write tcp 127.0.0.1:56346->127.0.0.1:3306: write: broken pipe
[mysql] 2017/04/26 10:01:05 packets.go:130: write tcp 127.0.0.1:56350->127.0.0.1:3306: write: broken pipe
[mysql] 2017/04/26 10:01:05 packets.go:130: write tcp 127.0.0.1:56350->127.0.0.1:3306: write: broken pipe

找GitHub issues, 提到了和mysql的wait_timeout变量有关系, https://github.com/go-sql-driver/mysql/issues/446, 于是找MySQL文档https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#idm140549060476496.

相关说明如下:
The number of seconds the server waits for activity on a noninteractive connection before closing it.
On thread startup, the session wait_timeout value is initialized from the global wait_timeout value or from the global interactive_timeout value, depending on the type of client (as defined by the CLIENT_INTERACTIVE connect option to mysql_real_connect()). See also interactive_timeout.

默认是28800s, 8小时.

mysql> show variables like '%wait_timeout%';                                                                                                                                                                                                                      
+--------------------------+----------+
| Variable_name | Value |
+--------------------------+----------+
| innodb_lock_wait_timeout | 50 |
| lock_wait_timeout | 31536000 |
| wait_timeout | 28800 |
+--------------------------+----------+
3 rows in set (0.00 sec)

解决办法:

db.SetConnMaxLifetime(time.Hour*7)

❌
❌