最后更新日期:2023-12-26
读了我的上一篇文章的读者可能仍然会觉得状态机,或者说 Puhser 这个概念太抽象,不知道该怎么用。因此我在这篇文章中讲一个实际的例子。
考虑一个常见的数据汇总问题:有一个文件,每行是一个数据,每行数据中包含一个日期和一些统计数字(比如销售额、用户活跃数、用户付费等),需要按月度汇总这些统计数字,并且输出到另一个文件。
我们还有一些限制条件和设计要点:
- 文件已经按照日期从小到大排序
- 输出时也应按日期从小到大排序
- 每个时间分组中的数据行的数量并不确定,例如一个月可能有 31 天,也可能有 28 天
- 这个文件很大,为了控制内存的使用,不能把所有行读进一个大数组里处理
- 扩展性:如果以后要求不仅仅是汇总每月,还要按每周、每季度、每年汇总,能否很方便地支持?能否支持任意自定义的日期汇总方式?
- 扩展性2:汇总方法可能不仅仅是求和,还可能是求平均、求中位数、求标准差之类的,能否方便地支持自定义?
- 扩展性3:可能在其他部分的代码中也有这种“按时间统计汇总”的需求,能否抽象出一个公共函数 / 类?
这个问题我们一般称为“resample”,在处理时间序列型数据时非常常见。著名的 Python 数据处理库 pandas 还有专门的教程页面。
回到函数式编程,哪种序列变换函数对这个问题最合适?可以一步步地分解思考这个问题:
- 我们需要一个将原始日期变换成汇总后日期的函数,例如将“2023-10-06”变换成“2023-10”,即提取出“月”的部分
- 上一步得到的值可以看作一个 key ,我们需要将序列中相邻的且 key 相同的元素放进一个组中
- 对每个组进行汇总,然后输出
可见这里的重点是,如何将序列中相邻的且 key 相同的元素分组,有什么现成的函数吗?
我首先想到的是,这个需求有点像 GroupBy ,但 GroupBy 跟这个需求有一些不同:
- GroupBy 的输出结果是一个
Map[Key, Item[]]
,而我们需要输出另一个序列 - GroupBy 是全局的,它会把所有 key 相同的汇总,而这个问题中,我们需要汇总的是 key 相同且相邻的元素
然后我找到一个看上去很接近的 lo.PartitionBy ,但仔细观察后发现,这个函数会调整元素的顺序,依然不是我们需要的。
所以只能自己实现了,我称之为“SeqGroupBy”:
type Pusher[T any] interface {
(T)
Push()
Flush}
type PusherSeqGroupBy[K comparable, A any] struct {
[[]A]
next Pusherfunc(A) K
getKey
curKey K[]A
items }
func NewPusherSeqGroupBy[K comparable, A any](getKey func(A) K, next Pusher[[]A]) *PusherSeqGroupBy[K, A] {
return &PusherSeqGroupBy[K, A]{next: next, getKey: getKey}
}
func (t *PusherSeqGroupBy[K, A]) Push(a A) {
:= t.getKey(a)
newKey if len(t.items) == 0 {
.items = append(t.items, a)
t.curKey = newKey
t} else {
if newKey == t.curKey {
.items = append(t.items, a)
t} else {
.next.Push(t.items)
t.curKey = newKey
t.items = []A{a}
t}
}
return
}
func (t *PusherSeqGroupBy[K, A]) Flush() {
if len(t.items) > 0 {
.next.Push(t.items)
t.next.Flush()
t}
.items = nil
treturn
}
构造一个 PusherSeqGroupBy 需要传入两个参数:
- getKey: 从一行数据中提取出 key ,这个函数的实现是,先从一行数据中提取出日期,再把日期变成汇总后的日期,比如按月的话就是将“2023-10-06”变成“2023-10”
- next: 数据需要传给的下一个处理环节,类型是 Pusher[[]Row] 。接受一个已经分好组的数据组,汇总然后输出
我们可以画出程序的数据流图(dataflow diagram)如下:
按行读文件 → SeqGroupBy 分组 → 每个组放进汇总器 → 每条汇总结果输出
每个右箭头“→”都对应了一个 Pusher.Push() 的调用。最终可以实现内存中最多持有一个汇总组的数据,做到了随用随销毁。
程序的剩余部分已经很显然了,我想可以留给读者自己完成。从这个例子也可以看出为什么 Pusher 的接口定义中需要有 Flush 。
这个例子我们还可以看出基于状态机的,或 push-style 函数式编程的一个特点:如果说基于迭代器的函数式编程是以数据的源(source)为基础,经过层层变换,最终输出到数据的汇(sink);那么基于状态机的函数式编程则是反过来将数据的汇(sink)层层包装,最后接入数据源。当然,实际上我们也可以同时使用这两种风格,既用迭代器把源变形,又用状态机把汇变形,然后在中间的某个地方拼接在一起,这样,我们可以自由地选择最合适的工具。