普通视图

发现新文章,点击刷新页面。
昨天以前遥远的街市

状态机与函数式编程(二)

作者 henix
2023年12月26日 00:00

  读了我的上一篇文章的读者可能仍然会觉得状态机,或者说 Puhser 这个概念太抽象,不知道该怎么用。因此我在这篇文章中讲一个实际的例子。

  考虑一个常见的数据汇总问题:有一个文件,每行是一个数据,每行数据中包含一个日期和一些统计数字(比如销售额、用户活跃数、用户付费等),需要按月度汇总这些统计数字,并且输出到另一个文件。

  我们还有一些限制条件和设计要点:

  1. 文件已经按照日期从小到大排序
  2. 输出时也应按日期从小到大排序
  3. 每个时间分组中的数据行的数量并不确定,例如一个月可能有 31 天,也可能有 28 天
  4. 这个文件很大,为了控制内存的使用,不能把所有行读进一个大数组里处理
  5. 扩展性:如果以后要求不仅仅是汇总每月,还要按每周、每季度、每年汇总,能否很方便地支持?能否支持任意自定义的日期汇总方式?
  6. 扩展性2:汇总方法可能不仅仅是求和,还可能是求平均、求中位数、求标准差之类的,能否方便地支持自定义?
  7. 扩展性3:可能在其他部分的代码中也有这种“按时间统计汇总”的需求,能否抽象出一个公共函数 / 类?

  这个问题我们一般称为“resample”,在处理时间序列型数据时非常常见。著名的 Python 数据处理库 pandas 还有专门的教程页面

  回到函数式编程,哪种序列变换函数对这个问题最合适?可以一步步地分解思考这个问题:

  1. 我们需要一个将原始日期变换成汇总后日期的函数,例如将“2023-10-06”变换成“2023-10”,即提取出“月”的部分
  2. 上一步得到的值可以看作一个 key ,我们需要将序列中相邻的且 key 相同的元素放进一个组中
  3. 对每个组进行汇总,然后输出

  可见这里的重点是,如何将序列中相邻的且 key 相同的元素分组,有什么现成的函数吗?

  我首先想到的是,这个需求有点像 GroupBy ,但 GroupBy 跟这个需求有一些不同:

  1. GroupBy 的输出结果是一个 Map[Key, Item[]] ,而我们需要输出另一个序列
  2. GroupBy 是全局的,它会把所有 key 相同的汇总,而这个问题中,我们需要汇总的是 key 相同且相邻的元素

  然后我找到一个看上去很接近的 lo.PartitionBy ,但仔细观察后发现,这个函数会调整元素的顺序,依然不是我们需要的。

  所以只能自己实现了,我称之为“SeqGroupBy”:

type Pusher[T any] interface {
  Push(T)
  Flush()
}

type PusherSeqGroupBy[K comparable, A any] struct {
  next   Pusher[[]A]
  getKey func(A) K
  curKey K
  items  []A
}

func NewPusherSeqGroupBy[K comparable, A any](getKey func(A) K, next Pusher[[]A]) *PusherSeqGroupBy[K, A] {
  return &PusherSeqGroupBy[K, A]{next: next, getKey: getKey}
}

func (t *PusherSeqGroupBy[K, A]) Push(a A) {
  newKey := t.getKey(a)
  if len(t.items) == 0 {
    t.items = append(t.items, a)
    t.curKey = newKey
  } else {
    if newKey == t.curKey {
      t.items = append(t.items, a)
    } else {
      t.next.Push(t.items)
      t.curKey = newKey
      t.items = []A{a}
    }
  }
  return
}

func (t *PusherSeqGroupBy[K, A]) Flush() {
  if len(t.items) > 0 {
    t.next.Push(t.items)
    t.next.Flush()
  }
  t.items = nil
  return
}

  构造一个 PusherSeqGroupBy 需要传入两个参数:

  1. getKey: 从一行数据中提取出 key ,这个函数的实现是,先从一行数据中提取出日期,再把日期变成汇总后的日期,比如按月的话就是将“2023-10-06”变成“2023-10”
  2. next: 数据需要传给的下一个处理环节,类型是 Pusher[[]Row] 。接受一个已经分好组的数据组,汇总然后输出

  我们可以画出程序的数据流图(dataflow diagram)如下:

  按行读文件 → SeqGroupBy 分组 → 每个组放进汇总器 → 每条汇总结果输出

  每个右箭头“→”都对应了一个 Pusher.Push() 的调用。最终可以实现内存中最多持有一个汇总组的数据,做到了随用随销毁。

  程序的剩余部分已经很显然了,我想可以留给读者自己完成。从这个例子也可以看出为什么 Pusher 的接口定义中需要有 Flush 。

  这个例子我们还可以看出基于状态机的,或 push-style 函数式编程的一个特点:如果说基于迭代器的函数式编程是以数据的源(source)为基础,经过层层变换,最终输出到数据的汇(sink);那么基于状态机的函数式编程则是反过来将数据的汇(sink)层层包装,最后接入数据源。当然,实际上我们也可以同时使用这两种风格,既用迭代器把源变形,又用状态机把汇变形,然后在中间的某个地方拼接在一起,这样,我们可以自由地选择最合适的工具。

我的金融投资书单

作者 henix
2023年5月3日 00:00

  在我上大学的时候,我对金融投资漠不关心,后来我慢慢意识到,这些东西就像政治——你不关心政治,政治关心你。

  读了一些历史书,我也知道金融是怎么回事了。自从 1970 年美元与黄金脱钩,世界就进入了主权货币时代。我们每个人,只要使用货币,实际上都会被收一种隐形的铸币税——通货膨胀。历史告诉我们,通胀和通缩都是痛苦的,但通缩更痛苦。只要今天、我这一届政府你好我好,谁会管明天、下一届如何?所以各国政府,实际上总是倾向于货币宽松的,也就是通胀。

  通货膨胀会导致你持有的货币的购买力下降。假设每年货币购买力下降 6% ,看似不多,但累计 30 年之后呢?

  所以在我看来,金融投资是每个人都应该了解的。但投资是一件专业的事,进入市场之前,你应该先学习。我从 2015 年开始投资,根据我最近几年自己投资的经验,这次就推荐一些我认为不错的金融投资类书籍吧。

其他博主

  这里推荐我的入门老师:ETF 拯救世界。个人认为指数投资是最适合普通人的投资方式。

  他的粉丝有时将他尊称为“E大”。我推荐看看:

  1. E 大雪球置顶的干货合集
  2. 微信公众号“精华回顾”栏目,里面有 E 大的投资书籍推荐,还有公众号的精华文章

入门

  • 货币的教训 - 周其仁教授力作,可以进一步了解通货膨胀及其对普通人的影响
  • 股市长线法宝 - E 大推荐,这本书可以告诉你我们为什么应该投资股票以及为什么应该投资指数
  • 机构投资的创新之路 - 建立资产配置的概念,对 E 大的体系是一个很好的补充
  • 投资心理学 - E 大推荐,最好有一定投资经验再看,你会发现里面的各种心理陷阱你也经常犯

进阶

C++ 中如何多线程写日志

作者 henix
2022年7月3日 00:00

  日志是几乎每个程序都需要的核心组件,用过 C++ 的 std::cout 的朋友都知道,它不是线程安全的!所以多个线程同时用 std::cout 输出的话,输出可能会错乱。有没有简单的解决方案?

  如果你在网上搜索,或者去看一些专门的日志库(如 glog)是如何解决这个问题的,不外乎这几种方法:

  1. 加一个全局锁
  2. 加一个多生产者-单消费者队列,其他线程写日志的时候先放入队列,单独开一个线程把队列里的日志一条一条写出来。

  第一种加锁的方案,增加了一个全局状态,我觉得不是很漂亮。方案二就更不用说了……我就是写个日志而已,有必要引入一个队列再加一个线程吗?有没有更简单的方案?

  我写 C++ 程序一般是不用 std::cout 来格式化输出的,而是用更底层的系统调用,也就是 write(2) 或 WriteFile / WriteConsole ,所以我们可以跳过 stdio / iostream 内部状态的同步问题,直接从操作系统层面考虑以下问题:

  如果有多个线程 / 进程同时调用 write(2) 或 WriteFile 写一个文件描述符或内核句柄,它们写入的内容会互相覆盖吗?

  用常识来想,操作系统层面难道没有任何机制来保证写操作的原子性吗?

  于是我们很自然地会问出以下问题:

  上述讨论的结论是:

  1. 对 Linux ,POSIX 规范保证用 O_APPEND 模式打开的文件,如果一次写入的内容不超过 PIPE_BUF(一般为 4096)字节,那么就是原子的1
  2. 对 Win32 的 WriteFile ,如果打开文件时添加了 FILE_APPEND_DATA 参数那么也可以保证追加操作是原子的

  所以我对于多线程写日志的解决方案是:每一行单独写入一个 buffer ,然后一次性调用 write(2) 写入,程序日志绝大多数情况下不会超过 PIPE_BUF 。

  如果你还在用 C/C++ 自带的 IO 函数,它们的内部存在我们无法控制的缓冲区(stdio buffering),这种方法不一定奏效。所以,直接用系统调用保平安。如果你还是想用 C/C++ 自带的格式化函数,一个简单的方法是先用 snprintf / sstream 把要输出的内容格式化到一个 buffer ,再用系统调用输出。

  一个极简的 C++ 11 线程安全日志库(POSIX only):

#include <sstream>
#include <iomanip>

#include <unistd.h>
#include <time.h>

/**
 * 在 out 后面追加时间戳,格式 YYYY-MM-DD HH:MM:SS.mmm
 */
void appendTimestampMs(std::ostream& out) {
    timespec t {};
    clock_gettime(CLOCK_REALTIME, &t); // 忽略错误
    {
        struct tm tm;
        localtime_r(&t.tv_sec, &tm);
        out << std::put_time(&tm, "%F %T");
    }
    char fill = out.fill();
    out << '.' << std::setfill('0') << std::setw(3) << t.tv_nsec / 1000000 << std::setfill(fill);
}

template<class... Args>
void plog(Args&&... args) {
    using _expander = int[];
    std::stringstream buf;
    // 先输出时间,精确到毫秒
    appendTimestampMs(buf);
    buf << ' ';
    (void)_expander{ (void(buf << std::forward<Args>(args)), 0)... };
    buf << '\n';
    std::string str = buf.str();
    write(STDOUT_FILENO, str.data(), str.size());
}

  使用:

plog("[INFO] test: ", 10);

  输出:

2022-05-06 20:47:43.725 [INFO] test: 10

脚注:


  1. 引用自 write(2) 手册页:“If the file was open(2)ed with O_APPEND, the file offset is first set to the end of the file before writing. The adjustment of the file offset and the write operation are performed as an atomic step.”↩︎

C++ 中的 Continuation-Passing Style

作者 henix
2021年4月5日 00:00

  先说一下何为 Continuation-Passing Style 。在我的理解中,Continuation-Passing Style 是指,一个函数想要返回一个结果,但并不是通过 return 返回,而是将结果传给一个回调函数。

  考虑一个基本问题:将一个整数序列化成 ASCII 编码的字符串,或者更广义的如何序列化对象的问题。一个简单的想法是返回一个 std::string ,但如果要序列化的对象比较大,占用内存较多,同时如果这个对象序列化后是写入 stdout 或文件 / 网络,那为什么不直接 write 呢,这样还可以省去内存分配。

  所以我们的问题变成了:如何编写一个通用的序列化函数,可以同时用于序列化到内存缓冲区和用户终端(以及其他任意对象 / 网络 / 文件)?。

  下面我给出一种方案:

#include <string>

/**
 * @param padTo 结果前面用 0 补齐的位数
 */
template<class Yield>
void formatInt(unsigned int n, uint8_t padTo, Yield&& yield) {
    if (n == 0) {
        if (padTo == 0) {
            yield('0');
        } else {
            for (; padTo > 0; padTo--) {
                yield('0');
            }
        }
        return;
    }
    static constexpr int BUF_SIZE = sizeof(unsigned int) * 8;
    char buf[BUF_SIZE];
    int pos = BUF_SIZE;
    while (n > 0) {
        int r = n % 10;
        pos--;
        assert(pos >= 0);
        buf[pos] = '0' + r;
        n /= 10;
    }
    const int len = BUF_SIZE - pos;
    for (; padTo > len; padTo--) {
        yield('0');
    }
    yield(std::string_view(buf + pos, BUF_SIZE - pos));
}

  这里的重点不是具体的序列化算法,而是其中叫“yield”的模板参数。相信你已经看出来了,这里的 yield 就是很多现代编程语言都有的 coroutine / generator 中的 yield ,它表示:从协程中返回一个值。但我们的程序只是借鉴了这个概念,并未真正用到协程。

  下面给出输出到 std::cout 的代码,很简单:

formatInt(42, 4, [](auto&& a) { std::cout << a; }); // => 0042

输出到 std::string 要稍微复杂一些,需要一个辅助类 StringPusher:

class StringPusher {
    std::string& str;
public:
    StringPusher(std::string& str): str(str) {}
    void operator()(char c) {
        str.push_back(c);
    }
    void operator()(std::string_view buf) {
        str.append(buf.data(), buf.size());
    }
};

std::string s;
StringPusher push(s);

formatInt(42, 4, push); // => 0042

  总结一下就是:这个序列化函数并不是返回 string ,而是返回 Generator[byte] ,即一个字节序列的生成器。而 std::cout 或 StringPusher 则起到字节序列的接收器的作用。套用生产者-消费者模型来说,formatInt 是生产者,std::cout 或 StringPusher 是消费者。

  考虑另一个常见问题:将一个字符串按指定的分隔符分割。应该如何设计这个函数的接口?

  常规的做法是返回一个字符串数组,如 js 的 String.prototype.split 和 Java 的 String.split ,而 boost::algorithm::split 要求传入一个容器的左值引用作为输出参数,本质上跟返回数组没什么区别。

  在 C++ 中返回数组并非最优设计,因为用户可能只需要一边生成一边使用,用完就销毁,并不需要把所有字符串存起来。所以我们可以不用返回字符串数组,而是返回字符串生成器,即 Generator[string] 。但这里我们仍旧只是借用协程的思想,并不实际使用协程,因为真正的协程代价还是比较高。generator 翻译成 C++ 标准库的语言则是 OutputIterator ,用 CPS 风格来表述则是回调函数。

  如下是一个使用回调函数作为输出的 split 的简单实现:

/**
 * 会保留最后一个空串,例:
 * ",".split(',') -> ["", ""]
 */
class Split {
    char delim;
public:
    Split(char delim): delim(delim) {}
    template<class Yield>
    const char* operator()(const char* first, const char* last, Yield&& yield) {
        const char* start = first;
        const char* i = std::find(start, last, delim);
        while (i != last) {
            yield(std::string_view(start, i - start));
            start = i + 1;
            i = std::find(start, last, delim);
        }
        yield(std::string_view(start, last - start));
        return last;
    }
};

  使用:

std::string_view s("a,b");

int i = 0;
Split(',')(s.data(), s.data() + s.size(), [&](std::string_view part) {
    std::cout << "item " << i << ": " << part << std::endl;
    i++;
});

  每个 part 都是随用随销毁,没有多余的内存分配。体现出了 generator / 回调函数风格的高效。

  不过有个问题在回调函数风格中可能会比普通风格中麻烦一点:如果处理到中间的某个地方,想要直接退出,怎么办?

  一个简单粗暴的解决方案是使用异常。我们可以约定一个异常,专门用来表示退出 coroutine ,比如名字叫 ExitCo 。这就要求相关代码是异常安全的,如果使用了外部资源,应该用 RAII 包装。

struct ExitCo {};

  使用:

try {
    int i = 0;
    Split(',')(s.data(), s.data() + s.size(), [&](std::string_view part) {
        // 最多处理前 5 个
        if (i >= 5) {
            throw ExitCo();
        }
        std::cout << "item " << i << ": " << part << std::endl;
        i++;
    });
} catch (const ExitCo&) {}

  总结:个人认为 C++ 中凡是需要返回数组的时候,都可以考虑使用回调函数来避免多余的内存分配。

  如果你更喜欢传统的 C++ ,也可以使用 Output Iterator(参考 std::transform)。但 Output Iterator 的缺点在于必须采用运算符重载的方式实现,会多一层 proxy object 。而用 Yield 模板参数的好处是,如果你想用真正的 coroutine ,可以直接传入 boost::coroutines2::coroutine::push_type ,代码不用做任何改动。

  相关概念的对比表:

名称 boost coroutines2 CPS 风格 C++ 标准库 GC 语言(js / Python / Java / Go) 多线程(生产者-消费者模式)
接收器 push-coroutine 回调函数 Output Iterator 状态机(state machine) / reducer / Iteratee 消费者
生成器 pull-coroutine 自己实现类似 java.util.Iterator 的一个类 两个 Input Iterator: first, last generator, java.util.Iterator 生产者
缓冲区 返回容器 用容器保存中间结果 返回数组 Disruptor / concurrent queue / go channel / actor mailbox

  回调函数相当于一个大小为 0 的 go channel ,而大小为 0 的 go channel 相当于一个 condition variable

❌
❌