<broker> <destinationPolicy> <policyMap> <policyEntries> <!-- Set the following policy on all queues using the '>' wildcard --> <policyEntryqueue=">"> <deadLetterStrategy> <!-- Use the prefix 'DLQ.' for the destination name, and make the DLQ a queue rather than a topic --> <individualDeadLetterStrategyqueuePrefix="DLQ."useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> </broker>
默认非持久化的topic不会进入到死信队列中, 如果需要, 则修改activemq.xml, 加入
<!-- Tell the dead letter strategy to also place non-persisted messages onto the dead-letter queue if they can't be delivered. --> <deadLetterStrategy> <...processNonPersistent="true" /> </deadLetterStrategy>
// go test -bench Publish -benchmem // go test -bench Sub -benchmem funcBenchmarkStompPublish(b *testing.B) { conn, err := stomp.Dial("tcp", "127.0.0.1:61613") if err != nil { log.Fatal(err) return } defer conn.Disconnect()
b.N = pubMsgCount b.ReportAllocs() b.ResetTimer()
for i := 0; i < b.N; i++ { err = conn.Send( stompDestination, // destination "text/plain", // content-type msgData) // body if err != nil { log.Error(err) return } } }
Add an experimental DNS name for the host: docker.for.mac.localhost
这样一条更新日志.
页面搜索docker.for.mac.localhost, 发现在 Docker Community Edition 17.12.0-ce-mac46 2018-01-09 的 release notes中发现有一条相关的更新日志
DNS name docker.for.mac.host.internal should be used instead of docker.for.mac.localhost (still valid) for host resolution from containers, since since there is an RFC banning the use of subdomains of localhost. See https://tools.ietf.org/html/draft-west-let-localhost-be-localhost-06.
// Wrapf returns an error annotating err with a stack trace // at the point Wrapf is call, and the format specifier. // If err is nil, Wrapf returns nil. funcWrapf(err error, format string, args ...interface{})error { if err == nil { returnnil } err = &withMessage{ cause: err, msg: fmt.Sprintf(format, args...), } return &withStack{ err, callers(), } }
// Cause returns the underlying cause of the error, if possible. // An error value has a cause if it implements the following // interface: // // type causer interface { // Cause() error // } // // If the error does not implement Cause, the original error will // be returned. If the error is nil, nil will be returned without further // investigation. funcCause(err error)error { type causer interface { Cause() error }
for err != nil { cause, ok := err.(causer) if !ok { break } err = cause.Cause() } return err }
// SetLocation records the source location of the error at callDepth stack // frames above the call. func (e *Err) SetLocation(callDepth int) { _, file, line, _ := runtime.Caller(callDepth + 1) e.file = trimGoPath(file) e.line = line }
This client ID links a given connection to its published messages, subscriptions, especially durable subscriptions. Indeed, durable subscriptions are stored as a combination of the client ID and durable name. If an application wishes to resume message consumption from where it previously stopped, it needs to create a durable subscription. It does so by providing a durable name, which is combined with the client ID provided when the client created its connection. The server then maintain the state for this subscription even after the client connection is closed. Note: The starting position given by the client when restarting a durable subscription is ignored. When the application wants to stop receiving messages on a durable subscription, it should close - but not unsubscribe- this subscription. If a given client library does not have the option to close a subscription, the application should close the connection instead. When the application wants to delete the subscription, it must unsubscribe it. Once unsubscribed, the state is removed and it is then possible to re-use the durable name, but it will be considered a brand new durable subscription, with the start position being the one given by the client when creating the durable subscription.
// Options that can be passed to Connect. // Name is an Option to set the client name. func Name(name string) Option { return func(o *Options) error { o.Name = name return nil } }
type ConnInfo struct { Cid uint64 `json:"cid"` IP string `json:"ip"` Port int `json:"port"` Start time.Time `json:"start"` LastActivity time.Time `json:"last_activity"` Uptime string `json:"uptime"` Idle string `json:"idle"` Pending int `json:"pending_bytes"` InMsgs int64 `json:"in_msgs"` OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` NumSubs uint32 `json:"subscriptions"` Name string `json:"name,omitempty"` Lang string `json:"lang,omitempty"` Version string `json:"version,omitempty"` TLSVersion string `json:"tls_version,omitempty"` TLSCipher string `json:"tls_cipher_suite,omitempty"` AuthorizedUser string `json:"authorized_user,omitempty"` Subs []string `json:"subscriptions_list,omitempty"` }
If you provide a cluster ID not used by any of the servers in the network, no server will respond to the client, hence the timeout error message from the client library. If anything, this is an error message that needs to be updated in the client libraries, not in the server.
func main() { n, err := nats.Connect("nats://127.0.0.1:7222", nats.Name("test_client"), nats.UserInfo("", "")) if err != nil { panic(err) } subject := "test" msgCh := make(chan *nats.Msg, nats.DefaultMaxChanLen) _, err = n.ChanSubscribe(subject, msgCh) if err != nil { panic(err) } wg := sync.WaitGroup{} for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() // msg handler for msg := range msgCh { fmt.Printf("%s\n", msg.Data) } }() } quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2) select { case <-quit: defer wg.Wait() // close msgCh and wait process ok close(msgCh) n.Flush() n.Close() } }
NATS代码中的技巧
很有用的Go风格的可选参数设计模式, 很多地方见过.
// Option is a function on the options for a connection. type Option func(*Options) error
// Options can be used to create a customized connection. type Options struct { Url string ... User string Password string } var DefaultOptions = Options{ AllowReconnect: true, MaxReconnect: DefaultMaxReconnect, ReconnectWait: DefaultReconnectWait, Timeout: DefaultTimeout, PingInterval: DefaultPingInterval, MaxPingsOut: DefaultMaxPingOut, SubChanLen: DefaultMaxChanLen, ReconnectBufSize: DefaultReconnectBufSize, Dialer: &net.Dialer{ Timeout: DefaultTimeout, }, }
// Connect will attempt to connect to the NATS system. // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 // Comma separated arrays are also supported, e.g. urlA, urlB. // Options start with the defaults but can be overridden. func Connect(url string, options ...Option) (*Conn, error) { opts := DefaultOptions opts.Servers = processUrlString(url) for _, opt := range options { if err := opt(&opts); err != nil { return nil, err } } return opts.Connect() }
// Options that can be passed to Connect. // Name is an Option to set the client name. func Name(name string) Option { return func(o *Options) error { o.Name = name return nil } }
使用ringBuffer限制消息数量
You can view a message log as a ring buffer. Messages are appended to the end of the log. If a limit is set globally for all channels, or specifically for this channel, when the limit is reached, older messages are removed to make room for the new ones.
用reflect来绑定任意类型的chan
chVal := reflect.ValueOf(channel) if chVal.Kind() != reflect.Chan { return ErrChanArg } val, ok := chVal.Recv() if !ok { // Channel has most likely been closed. return }
// Decimal represents a fixed-point decimal. It is immutable. // number = value * 10 ^ exp type Decimal struct { value *big.Int
// NOTE(vadim): this must be an int32, because we cast it to float64 during // calculations. If exp is 64 bit, we might lose precision. // If we cared about being able to represent every possible decimal, we // could make exp a *big.Int but it would hurt performance and numbers // like that are unrealistic. exp int32 }
warning: Error disabling address space randomization: Operation not permitted Cannot create process: Operation not permitted During startup program exited with code 127. (gdb)
>--cap-add: Add Linux capabilities >--cap-drop: Drop Linux capabilities >--privileged=false: Give extended privileges to this container >--device=[]: Allows you to run devices inside the container without the --privileged flag. >
>By default, Docker containers are “unprivileged” and cannot, for example, run a Docker daemon inside a Docker container. This is because by default a container is not allowed to access any devices, but a “privileged” container is given access to all devices (see the documentation on cgroups devices).
>When the operator executes docker run --privileged, Docker will enable access to all devices on the host as well as set some configuration in AppArmor or SELinux to allow the container nearly all the same access to the host as processes running outside containers on the host. Additional information about running with --privileged is available on the Docker Blog.
>If you want to limit access to a specific device or devices you can use the --device flag. It allows you to specify one or more devices that will be accessible within the container.
type Basic struct { Id, N1, N2, N3, N4, N5 int Name string }
funcBenchmarkStructSliceWithoutPool(b *testing.B) { for i := 0; i < b.N; i++ { var list []Basic for j := 0; j < 101; j++ { var data = Basic{Id: j, Name: "Name"} list = append(list, data) } } }
funcBenchmarkStructPointerSliceWithoutPool(b *testing.B) { for i := 0; i < b.N; i++ { var list []*Basic for j := 0; j < 101; j++ { var data = Basic{Id: j, Name: "Name"} list = append(list, &data) } } }
funcBenchmarkStructSliceWithPool(b *testing.B) { for i := 0; i < b.N; i++ { list := structSlicePool.Get().([]Basic) initLen := len(list) for j := 0; j < 101; j++ { var data = Basic{Id: j, Name: "Name"} if j < initLen { list[j] = data } else { list = append(list, data) } } structSlicePool.Put(list) } }
funcBenchmarkStructPointerSliceWithPool(b *testing.B) { for i := 0; i < b.N; i++ { list := structPointerSlicePool.Get().([]*Basic) initLen := len(list) for j := 0; j < 101; j++ { var data = Basic{Id: j, Name: "Name"} if j < initLen { list[j] = &data } else { list = append(list, &data) } } structPointerSlicePool.Put(list) }
watch your allocations (string() is costly, re-user buffers) go里面 []byte和string互转是会发生复制的, 开销明显, 如果代码里频繁互转, 考虑使用bytes.buffer 和 sync.Pool
use anonymous structs for arbitrary JSON 在写http api时, parse body这种事情, 如果只是纯粹取body里的json数据, 没必要单独定义结构体, 在函数里定义一个匿名结构体就好. var s struct { A int}
no built-in per-request HTTP timeouts 这是说要注意默认的httpClient没有超时
synchronizing goroutine exit is hard - log each cleanup step in long-running goroutines 同步化的goroutine一不小心就没有退出, 如果你写一个长期运行的服务, 用logger记录每一个goroutine的清理退出, 防止goroutine泄露
// CallerEncoder will add caller to log. format is "filename:lineNum:funcName", e.g:"zaplog/zaplog_test.go:15:zaplog.TestNewLogger" funcCallerEncoder(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) { enc.AppendString(strings.Join([]string{caller.TrimmedPath(), runtime.FuncForPC(caller.PC).Name()}, ":")) }
// NewCustomLoggers is a shortcut to get normal logger, noCallerLogger. funcNewCustomLoggers(debugLevel bool)(logger, noCallerLogger *zap.Logger) { loggerConfig := newLoggerConfig(debugLevel) logger, err := loggerConfig.Build() if err != nil { panic(err) } loggerConfig.DisableCaller = true noCallerLogger, err = loggerConfig.Build() if err != nil { panic(err) } return }
// NewLogger return a normal logger funcNewLogger(debugLevel bool)(logger *zap.Logger) { loggerConfig := newLoggerConfig(debugLevel) logger, err := loggerConfig.Build() if err != nil { panic(err) } return }
// NewNoCallerLogger return a no caller key value, will be faster funcNewNoCallerLogger(debugLevel bool)(noCallerLogger *zap.Logger) { loggerConfig := newLoggerConfig(debugLevel) loggerConfig.DisableCaller = true noCallerLogger, err := loggerConfig.Build() if err != nil { panic(err) } return }
// CompatibleLogger is a logger which compatible to logrus/std log/prometheus. // it implements Print() Println() Printf() Dbug() Debugln() Debugf() Info() Infoln() Infof() Warn() Warnln() Warnf() // Error() Errorln() Errorf() Fatal() Fataln() Fatalf() Panic() Panicln() Panicf() With() WithField() WithFields() type CompatibleLogger struct { _log *zap.Logger }
// NewCompatibleLogger return CompatibleLogger with caller field funcNewCompatibleLogger(debugLevel bool) *CompatibleLogger { return &CompatibleLogger{NewLogger(debugLevel).WithOptions(zap.AddCallerSkip(1))} }
// Print logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Print(args ...interface{}) { l._log.Info(fmt.Sprint(args...)) }
// Println logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Println(args ...interface{}) { l._log.Info(fmt.Sprint(args...)) }
// Printf logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Printf(format string, args ...interface{}) { l._log.Info(fmt.Sprintf(format, args...)) }
// Debug logs a message at level Debug on the compatibleLogger. func(l CompatibleLogger)Debug(args ...interface{}) { l._log.Debug(fmt.Sprint(args...)) }
// Debugln logs a message at level Debug on the compatibleLogger. func(l CompatibleLogger)Debugln(args ...interface{}) { l._log.Debug(fmt.Sprint(args...)) }
// Debugf logs a message at level Debug on the compatibleLogger. func(l CompatibleLogger)Debugf(format string, args ...interface{}) { l._log.Debug(fmt.Sprintf(format, args...)) }
// Info logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Info(args ...interface{}) { l._log.Info(fmt.Sprint(args...)) }
// Infoln logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Infoln(args ...interface{}) { l._log.Info(fmt.Sprint(args...)) }
// Infof logs a message at level Info on the compatibleLogger. func(l CompatibleLogger)Infof(format string, args ...interface{}) { l._log.Info(fmt.Sprintf(format, args...)) }
// Warn logs a message at level Warn on the compatibleLogger. func(l CompatibleLogger)Warn(args ...interface{}) { l._log.Warn(fmt.Sprint(args...)) }
// Warnln logs a message at level Warn on the compatibleLogger. func(l CompatibleLogger)Warnln(args ...interface{}) { l._log.Warn(fmt.Sprint(args...)) }
// Warnf logs a message at level Warn on the compatibleLogger. func(l CompatibleLogger)Warnf(format string, args ...interface{}) { l._log.Warn(fmt.Sprintf(format, args...)) }
// Error logs a message at level Error on the compatibleLogger. func(l CompatibleLogger)Error(args ...interface{}) { l._log.Error(fmt.Sprint(args...)) }
// Errorln logs a message at level Error on the compatibleLogger. func(l CompatibleLogger)Errorln(args ...interface{}) { l._log.Error(fmt.Sprint(args...)) }
// Errorf logs a message at level Error on the compatibleLogger. func(l CompatibleLogger)Errorf(format string, args ...interface{}) { l._log.Error(fmt.Sprintf(format, args...)) }
// Fatal logs a message at level Fatal on the compatibleLogger. func(l CompatibleLogger)Fatal(args ...interface{}) { l._log.Fatal(fmt.Sprint(args...)) }
// Fatalln logs a message at level Fatal on the compatibleLogger. func(l CompatibleLogger)Fatalln(args ...interface{}) { l._log.Fatal(fmt.Sprint(args...)) }
// Fatalf logs a message at level Fatal on the compatibleLogger. func(l CompatibleLogger)Fatalf(format string, args ...interface{}) { l._log.Fatal(fmt.Sprintf(format, args...)) }
// Panic logs a message at level Painc on the compatibleLogger. func(l CompatibleLogger)Panic(args ...interface{}) { l._log.Panic(fmt.Sprint(args...)) }
// Panicln logs a message at level Painc on the compatibleLogger. func(l CompatibleLogger)Panicln(args ...interface{}) { l._log.Panic(fmt.Sprint(args...)) }
// Panicf logs a message at level Painc on the compatibleLogger. func(l CompatibleLogger)Panicf(format string, args ...interface{}) { l._log.Panic(fmt.Sprintf(format, args...)) }
// With return a logger with an extra field. func(l *CompatibleLogger)With(key string, value interface{}) *CompatibleLogger { return &CompatibleLogger{l._log.With(zap.Any(key, value))} }
// WithField return a logger with an extra field. func(l *CompatibleLogger)WithField(key string, value interface{}) *CompatibleLogger { return &CompatibleLogger{l._log.With(zap.Any(key, value))} }
// WithFields return a logger with extra fields. func(l *CompatibleLogger)WithFields(fields map[string]interface{}) *CompatibleLogger { i := 0 var clog *CompatibleLogger for k, v := range fields { if i == 0 { clog = l.WithField(k, v) } else { clog = clog.WithField(k, v) } i++ } return clog }
// FormatStdLog set the output of stand package log to zaplog funcFormatStdLog() { log.SetFlags(log.Llongfile) log.SetOutput(&logWriter{NewNoCallerLogger(false)}) }
type logWriter struct { logger *zap.Logger }
// Write implement io.Writer, as std log's output func(w logWriter)Write(p []byte)(n int, err error) { i := bytes.Index(p, []byte(":")) + 1 j := bytes.Index(p[i:], []byte(":")) + 1 + i caller := bytes.TrimRight(p[:j], ":") // find last index of / i = bytes.LastIndex(caller, []byte("/")) // find penultimate index of / i = bytes.LastIndex(caller[:i], []byte("/")) w.logger.Info("stdLog", zap.ByteString("caller", caller[i+1:]), zap.ByteString("log", bytes.TrimSpace(p[j:]))) returnlen(p), nil }
相关说明如下: The number of seconds the server waits for activity on a noninteractive connection before closing it. On thread startup, the session wait_timeout value is initialized from the global wait_timeout value or from the global interactive_timeout value, depending on the type of client (as defined by the CLIENT_INTERACTIVE connect option to mysql_real_connect()). See also interactive_timeout.
默认是28800s, 8小时.
mysql> show variables like '%wait_timeout%'; +--------------------------+----------+ | Variable_name | Value | +--------------------------+----------+ | innodb_lock_wait_timeout | 50 | | lock_wait_timeout | 31536000 | | wait_timeout | 28800 | +--------------------------+----------+ 3 rows in set (0.00 sec)