archer-rpc: 一个基于 Scala 宏的异步 RPC 服务器

最后更新日期:2014-12-16

  archer-rpc 是一个简单的 RPC 服务器。架构如下:

  由于协议是自定义的,别人恐怕很难拿去直接用。所以我这里开源其实主要是为了学习交流。

  编写这个 RPC 的主要动机是:更容易地实现跨语言 RPC ,特别是动态和静态语言之间的。先说一下我们公司使用跨语言 RPC 的历史吧:

HTTP + JSON

  相关的库最成熟,容易编写。但问题是:http 的很多东西我们是用不到的。比如各种状态码、Cookie、Cache、各种 headers 。所有数据在处理的时候都用过一次 HTTP 协议栈,效率较低。

  archer-rpc 中规定的返回值类型只有如下几种:

Thrift

  选择这个库的基本理由是原生就支持 Java 和 php ——两种我们主要使用的语言,而且性能看起来也不错。但问题是:太复杂。

  1. 使用一套自己的数据结构描述语言,双方都要适配到上面。对比前面的 json ,不够灵活
  2. Java 还需要生成代码,增加构建环节

archer-rpc

  最终我选择了自己实现,因为我发现用 Scala 的宏可以实现 Thrift-Java 中必须用工具生成代码的部分,也就是静态转换成动态的部分。

  下面就来看一下如何用 archer-rpc 编写编写 rpc 服务器吧,例子是加法函数(一个同步和一个异步版本):

  MyMath 模块,通过 @RpcExport 标记要 export 的函数:

import henix.archer.RpcExport

import scala.concurrent.Future

object MyMath {

    @RpcExport
    def add(a: Int, b: Int): Int = a + b

    @RpcExport
    def addAsync(a: Int, b: Int): Future[Int] = Future { a + b }
}

  Main.scala 启动 rpc 服务器,按 Ctrl+C 退出:

import henix.archer.{RpcServer, Rpc}
import spray.json._
import spray.json.DefaultJsonProtocol._
import sun.misc.{SignalHandler, Signal}

object Main extends App {

  val mods = Map(
    "MyMath" -> Rpc.genModule[MyMath.type]
  )

  val rpcServer = new RpcServer(mods, "localhost", 4600)

  // unix only
  val exitHandler = new SignalHandler {
    override def handle(sig: Signal): Unit = {
      rpcServer.stop()
    }
  }
  Signal.handle(new Signal("INT"), exitHandler)
  Signal.handle(new Signal("TERM"), exitHandler)

  rpcServer.join()
}

  这里的 Rpc.genModule[T] 是一个宏,它会在 编译 的时候找出 T(现在只能用 object)的所有带有 @RpcExport 注解的方法,然后为这些方法生成 json -> 方法参数的代码。参见 https://github.com/henix/archer-rpc/blob/master/macros/src/main/scala/RpcMacros.scala

  如何测试:因为 archer-rpc 是直接使用纯文本流的,这意味着可以使用普通的 tcp 工具与其交互(就像可以用 Firefox 跟 HTTP API 交互),比如 netcat 或 socat(–> 表示发送给服务器的内容,<– 表示服务器返回的内容):

nc localhost 4700
--> {"mod":"MyMath","func":"add","params":{"a":1,"b":2}}
<-- {"type":"Success","value":3}

  nc 只支持按 Ctrl+C 结束。socat 则支持更强大的 readline 行编辑,可以 Ctrl+D 结束:

socat readline tcp4:localhost:4600

  socat 也可以写脚本执行:

echo '{"mod":"MyMath","func":"add","params":{"a":1,"b":2}}' | socat -t 15 - tcp4:localhost:4600

  如何为所有调用加上日志和统计功能?因为 Rpc.genModule 返回的是一个 Map[String, RpcMethodCall => Future[JsValue]] ,我们只需要包装一下这个 RpcMethodCall => Future[JsValue] 即可。这里用 Metrics 来统计成功/失败计数、测量函数运行时间:

import com.codahale.metrics.MetricRegistry
import com.typesafe.scalalogging.LazyLogging
import henix.archer.{UpstreamException, InputException, RpcMethodCall}
import Global.execctx
import spray.json.JsValue

import scala.concurrent.Future
import scala.concurrent.duration._

object RpcUtils extends LazyLogging {

  private val metrics: MetricRegistry = Global.metrics

  def withLogAndMetrics(func: RpcMethodCall => Future[JsValue])(methodCall: RpcMethodCall): Future[JsValue] = {
    val metricName = methodCall.mod + "." + methodCall.func

    val successEvent = metrics.meter(metricName + ".Success")
    val inputErrorEvent = metrics.meter(metricName + ".InputError")
    val upstreamErrorEvent = metrics.meter(metricName + ".UpstreamError")
    val methodCallErrorEvent = metrics.meter(metricName + ".MethodCallError")
    val timer = metrics.timer(metricName)
    val activeRequests = metrics.counter(metricName + ".activeRequests")

    val timerContext = timer.time()
    activeRequests.inc()

    val f = Future(methodCall).flatMap(func)

    f.onSuccess { case _ => successEvent.mark() }
    f.onFailure {
      case e: InputException =>
        inputErrorEvent.mark()
        logger.warn("input.error: {} => {}", methodCall, e.getMessage)
      case e: UpstreamException =>
        upstreamErrorEvent.mark()
        logger.warn("upstream.error: {} => {}", methodCall, e.getMessage)
      case e: Exception =>
        methodCallErrorEvent.mark()
        logger.error(methodCall.toString, e)
    }
    f.onComplete { _ =>
      val elapsed = timerContext.stop()
      activeRequests.dec()
      logger.info("{} complete in {}ms", methodCall, elapsed.nanoseconds.toMillis.toString)
    }

    f
  }
}

  然后在 Main 里用 val mods = Map(...).mapValues(_.mapValues(RpcUtils.withLogAndMetrics))

JSON-RPC

  后来我在网上闲逛的时候发现原来还有 JSON-RPC 这种协议,真是于我心有戚戚焉!如果让我再做一次设计,我可能会直接用它。

  不过仔细思考了 JSON-RPC 跟上述我们的自定义协议的不同之后,我认为:

  1. JSON-RPC 跟 archer-rpc 的自定义协议的最大不同是:它不是面向行的!

    archer-rpc 的每个请求和响应都是按照换行符分割的:每个请求中间必须没有换行符,最后必须有一个换行符。每个响应也一样。这是为了方便解析。而 JSON-RPC 允许在中间随意插换行符,这导致无法简单地实现。

  2. JSON-RPC 实现 batch query 要用特殊语法。而我的自定义协议只需要一次发送多行就可以了。

P.S. 为何要用这个名字呢?随便取的,出自 Fate 系列的七阶职之一。