状态机与函数式编程(二)

最后更新日期:2023-12-26

  读了我的上一篇文章的读者可能仍然会觉得状态机,或者说 Puhser 这个概念太抽象,不知道该怎么用。因此我在这篇文章中讲一个实际的例子。

  考虑一个常见的数据汇总问题:有一个文件,每行是一个数据,每行数据中包含一个日期和一些统计数字(比如销售额、用户活跃数、用户付费等),需要按月度汇总这些统计数字,并且输出到另一个文件。

  我们还有一些限制条件和设计要点:

  1. 文件已经按照日期从小到大排序
  2. 输出时也应按日期从小到大排序
  3. 每个时间分组中的数据行的数量并不确定,例如一个月可能有 31 天,也可能有 28 天
  4. 这个文件很大,为了控制内存的使用,不能把所有行读进一个大数组里处理
  5. 扩展性:如果以后要求不仅仅是汇总每月,还要按每周、每季度、每年汇总,能否很方便地支持?能否支持任意自定义的日期汇总方式?
  6. 扩展性2:汇总方法可能不仅仅是求和,还可能是求平均、求中位数、求标准差之类的,能否方便地支持自定义?
  7. 扩展性3:可能在其他部分的代码中也有这种“按时间统计汇总”的需求,能否抽象出一个公共函数 / 类?

  这个问题我们一般称为“resample”,在处理时间序列型数据时非常常见。著名的 Python 数据处理库 pandas 还有专门的教程页面

  回到函数式编程,哪种序列变换函数对这个问题最合适?可以一步步地分解思考这个问题:

  1. 我们需要一个将原始日期变换成汇总后日期的函数,例如将“2023-10-06”变换成“2023-10”,即提取出“月”的部分
  2. 上一步得到的值可以看作一个 key ,我们需要将序列中相邻的且 key 相同的元素放进一个组中
  3. 对每个组进行汇总,然后输出

  可见这里的重点是,如何将序列中相邻的且 key 相同的元素分组,有什么现成的函数吗?

  我首先想到的是,这个需求有点像 GroupBy ,但 GroupBy 跟这个需求有一些不同:

  1. GroupBy 的输出结果是一个 Map[Key, Item[]] ,而我们需要输出另一个序列
  2. GroupBy 是全局的,它会把所有 key 相同的汇总,而这个问题中,我们需要汇总的是 key 相同且相邻的元素

  然后我找到一个看上去很接近的 lo.PartitionBy ,但仔细观察后发现,这个函数会调整元素的顺序,依然不是我们需要的。

  所以只能自己实现了,我称之为“SeqGroupBy”:

type Pusher[T any] interface {
  Push(T)
  Flush()
}

type PusherSeqGroupBy[K comparable, A any] struct {
  next   Pusher[[]A]
  getKey func(A) K
  curKey K
  items  []A
}

func NewPusherSeqGroupBy[K comparable, A any](getKey func(A) K, next Pusher[[]A]) *PusherSeqGroupBy[K, A] {
  return &PusherSeqGroupBy[K, A]{next: next, getKey: getKey}
}

func (t *PusherSeqGroupBy[K, A]) Push(a A) {
  newKey := t.getKey(a)
  if len(t.items) == 0 {
    t.items = append(t.items, a)
    t.curKey = newKey
  } else {
    if newKey == t.curKey {
      t.items = append(t.items, a)
    } else {
      t.next.Push(t.items)
      t.curKey = newKey
      t.items = []A{a}
    }
  }
  return
}

func (t *PusherSeqGroupBy[K, A]) Flush() {
  if len(t.items) > 0 {
    t.next.Push(t.items)
    t.next.Flush()
  }
  t.items = nil
  return
}

  构造一个 PusherSeqGroupBy 需要传入两个参数:

  1. getKey: 从一行数据中提取出 key ,这个函数的实现是,先从一行数据中提取出日期,再把日期变成汇总后的日期,比如按月的话就是将“2023-10-06”变成“2023-10”
  2. next: 数据需要传给的下一个处理环节,类型是 Pusher[[]Row] 。接受一个已经分好组的数据组,汇总然后输出

  我们可以画出程序的数据流图(dataflow diagram)如下:

  按行读文件 → SeqGroupBy 分组 → 每个组放进汇总器 → 每条汇总结果输出

  每个右箭头“→”都对应了一个 Pusher.Push() 的调用。最终可以实现内存中最多持有一个汇总组的数据,做到了随用随销毁。

  程序的剩余部分已经很显然了,我想可以留给读者自己完成。从这个例子也可以看出为什么 Pusher 的接口定义中需要有 Flush 。

  这个例子我们还可以看出基于状态机的,或 push-style 函数式编程的一个特点:如果说基于迭代器的函数式编程是以数据的源(source)为基础,经过层层变换,最终输出到数据的汇(sink);那么基于状态机的函数式编程则是反过来将数据的汇(sink)层层包装,最后接入数据源。当然,实际上我们也可以同时使用这两种风格,既用迭代器把源变形,又用状态机把汇变形,然后在中间的某个地方拼接在一起,这样,我们可以自由地选择最合适的工具。