状态机与函数式编程

最后更新日期:2023-12-22

  在编程中,我们常用的迭代器(iterator)与我今天要说的状态机(state machine)都是对“流”(stream)的抽象,只不过一个是拉(pull),一个是推(push)。

  迭代器的定义一般是这样的:

class Iterator[T] {
  T next();
}

  即调用一个函数,它不断返回流中的下一个元素。通常还需要其他机制来指示迭代结束,比如添加一个 bool hasNext() 方法(Java),返回两个值,第二个 bool 值表示迭代是否结束(js),抛一个特定异常(Python)等。

  任何数据流都有生产者(Producer)和消费者(Consumer),迭代器是生产者的一种抽象,那如果我们把消费者抽象出来,会是什么样的?

  我这篇文章还是使用计算机科学的术语,称其为状态机。我理解的状态机的接口定义是这样的:

class Pusher[T] {
  void Push(T); // 将一个元素推给状态机
  void Flush(); // 标记一个阶段结束
}

  如果跟 Go 语言中的 io.Writer 接口对比,是不是非常相似?事实上,我认为 Go 的 io.Reader 可以类比为迭代器,io.Writer 可以类比为状态机。

  下面就是有意思的部分了:如果我们可以在迭代器的接口上实现各种函数式编程的操作(如 filter, map ),那我们能否在状态机的接口上实现同样的操作呢(只不过是以 push 的视角)?答案是肯定的。

  当然,Rx 框架也可以看作一种实现,只不过我认为它太复杂了,还不如我自己实现。

  下面是我用 Go 语言实现的简易 Pusher 及其 filter 定义:

type Pusher[T any] interface {
  Push(T)
  Flush()
}

type PusherFilter[A any] struct {
  next   Pusher[A]
  filter func(A) bool
}

func NewPusherFilter[A any](f func(A) bool, next Pusher[A]) *PusherFilter[A] {
  return &PusherFilter[A]{next, f}
}

func (t *PusherFilter[A]) Push(a A) {
  if t.filter(a) {
    t.next.Push(a)
  }
}

func (t *PusherFilter[A]) Flush() {
  t.next.Flush()
}

  使用:

pusher := NewPusherFilter(func(item *Item) bool { return productSet[item.Product] }, files)

  这种东西有什么用?我们通常更习惯使用迭代器和基于迭代器的函数式操作,但很多时候,设计一个基于迭代器的流式接口可能是很麻烦的。考虑一个例子:遍历文件目录树,流式地返回每个文件的信息。如果一定要做成迭代器,调用一个函数才返回下一个文件信息,那我们不得不手动构造一个栈(stack)来保存中间状态,程序写起来并不直观。

  但如果我们用状态机来设计这个东西的接口又如何?我们可能会得到如下接口:

void walk_dir(string dirname, Pusher[FileInfo] pusher);

  这样做的好处是让 walk_dir 可以用递归来实现。这里的 pusher 就是一个状态机,你也可以把它近似地看成一个回调函数,但跟回调函数的不同之处在于,Pusher 是可组合(composable)的!我们可以通过 map 、filter 等组合子(combinators)构造出新的匿名 Pusher 。这种可组合性(composability)跟迭代器如出一辙。当然也有不好的一面:失去了迭代器的可以在中间任意位置停下来的能力。

  回到 Go 语言本身,由于 Go 语言的迭代器接口还遥遥无期,我这里提出的 Pusher 是一种围魏救赵的办法。关于 pull 和 push 的对偶性(duality),也可以参考 Go 语言核心开发者 rsc 的讨论:user-defined iteration using range over func values · golang/go · Discussion #56413

相关文章: