最后更新日期:2014-12-16
archer-rpc 是一个简单的 RPC 服务器。架构如下:
- 协议:面向行的 TCP + json ,用换行符分隔请求和响应。json 库采用 spray-json
- 异步:基于 netty 和 Scala 的 Future
- boilerplate-free:使用 Scala 宏自动生成代码
由于协议是自定义的,别人恐怕很难拿去直接用。所以我这里开源其实主要是为了学习交流。
编写这个 RPC 的主要动机是:更容易地实现跨语言 RPC ,特别是动态和静态语言之间的。先说一下我们公司使用跨语言 RPC 的历史吧:
HTTP + JSON
相关的库最成熟,容易编写。但问题是:http 的很多东西我们是用不到的。比如各种状态码、Cookie、Cache、各种 headers 。所有数据在处理的时候都用过一次 HTTP 协议栈,效率较低。
archer-rpc 中规定的返回值类型只有如下几种:
- Success - 成功
- InputError - 输入错误(相当于 4xx)
- UpstreamError - 上游错误(相当于 502 504)
- MethodCallError - 内部错误(相当于 500)
Thrift
选择这个库的基本理由是原生就支持 Java 和 php ——两种我们主要使用的语言,而且性能看起来也不错。但问题是:太复杂。
- 使用一套自己的数据结构描述语言,双方都要适配到上面。对比前面的 json ,不够灵活
- Java 还需要生成代码,增加构建环节
archer-rpc
最终我选择了自己实现,因为我发现用 Scala 的宏可以实现 Thrift-Java 中必须用工具生成代码的部分,也就是静态转换成动态的部分。
下面就来看一下如何用 archer-rpc 编写编写 rpc 服务器吧,例子是加法函数(一个同步和一个异步版本):
MyMath 模块,通过 @RpcExport
标记要 export 的函数:
import henix.archer.RpcExport
import scala.concurrent.Future
object MyMath {
@RpcExportdef add(a: Int, b: Int): Int = a + b
@RpcExportdef addAsync(a: Int, b: Int): Future[Int] = Future { a + b }
}
Main.scala 启动 rpc 服务器,按 Ctrl+C 退出:
import henix.archer.{RpcServer, Rpc}
import spray.json._
import spray.json.DefaultJsonProtocol._
import sun.misc.{SignalHandler, Signal}
object Main extends App {
val mods = Map(
"MyMath" -> Rpc.genModule[MyMath.type]
)
val rpcServer = new RpcServer(mods, "localhost", 4600)
// unix only
val exitHandler = new SignalHandler {
override def handle(sig: Signal): Unit = {
.stop()
rpcServer}
}
.handle(new Signal("INT"), exitHandler)
Signal.handle(new Signal("TERM"), exitHandler)
Signal
.join()
rpcServer}
这里的 Rpc.genModule[T] 是一个宏,它会在 编译 的时候找出 T(现在只能用 object)的所有带有 @RpcExport 注解的方法,然后为这些方法生成 json -> 方法参数的代码。参见 https://github.com/henix/archer-rpc/blob/master/macros/src/main/scala/RpcMacros.scala 。
如何测试:因为 archer-rpc 是直接使用纯文本流的,这意味着可以使用普通的 tcp 工具与其交互(就像可以用 Firefox 跟 HTTP API 交互),比如 netcat 或 socat(–> 表示发送给服务器的内容,<– 表示服务器返回的内容):
nc localhost 4700
--> {"mod":"MyMath","func":"add","params":{"a":1,"b":2}}
<-- {"type":"Success","value":3}
nc 只支持按 Ctrl+C 结束。socat 则支持更强大的 readline 行编辑,可以 Ctrl+D 结束:
socat readline tcp4:localhost:4600
socat 也可以写脚本执行:
echo '{"mod":"MyMath","func":"add","params":{"a":1,"b":2}}' | socat -t 15 - tcp4:localhost:4600
如何为所有调用加上日志和统计功能?因为 Rpc.genModule 返回的是一个 Map[String, RpcMethodCall => Future[JsValue]]
,我们只需要包装一下这个 RpcMethodCall => Future[JsValue]
即可。这里用 Metrics 来统计成功/失败计数、测量函数运行时间:
import com.codahale.metrics.MetricRegistry
import com.typesafe.scalalogging.LazyLogging
import henix.archer.{UpstreamException, InputException, RpcMethodCall}
import Global.execctx
import spray.json.JsValue
import scala.concurrent.Future
import scala.concurrent.duration._
object RpcUtils extends LazyLogging {
private val metrics: MetricRegistry = Global.metrics
def withLogAndMetrics(func: RpcMethodCall => Future[JsValue])(methodCall: RpcMethodCall): Future[JsValue] = {
val metricName = methodCall.mod + "." + methodCall.func
val successEvent = metrics.meter(metricName + ".Success")
val inputErrorEvent = metrics.meter(metricName + ".InputError")
val upstreamErrorEvent = metrics.meter(metricName + ".UpstreamError")
val methodCallErrorEvent = metrics.meter(metricName + ".MethodCallError")
val timer = metrics.timer(metricName)
val activeRequests = metrics.counter(metricName + ".activeRequests")
val timerContext = timer.time()
.inc()
activeRequests
val f = Future(methodCall).flatMap(func)
.onSuccess { case _ => successEvent.mark() }
f.onFailure {
fcase e: InputException =>
.mark()
inputErrorEvent.warn("input.error: {} => {}", methodCall, e.getMessage)
loggercase e: UpstreamException =>
.mark()
upstreamErrorEvent.warn("upstream.error: {} => {}", methodCall, e.getMessage)
loggercase e: Exception =>
.mark()
methodCallErrorEvent.error(methodCall.toString, e)
logger}
.onComplete { _ =>
fval elapsed = timerContext.stop()
.dec()
activeRequests.info("{} complete in {}ms", methodCall, elapsed.nanoseconds.toMillis.toString)
logger}
f}
}
然后在 Main 里用 val mods = Map(...).mapValues(_.mapValues(RpcUtils.withLogAndMetrics))
。
JSON-RPC
后来我在网上闲逛的时候发现原来还有 JSON-RPC 这种协议,真是于我心有戚戚焉!如果让我再做一次设计,我可能会直接用它。
不过仔细思考了 JSON-RPC 跟上述我们的自定义协议的不同之后,我认为:
JSON-RPC 跟 archer-rpc 的自定义协议的最大不同是:它不是面向行的!
archer-rpc 的每个请求和响应都是按照换行符分割的:每个请求中间必须没有换行符,最后必须有一个换行符。每个响应也一样。这是为了方便解析。而 JSON-RPC 允许在中间随意插换行符,这导致无法简单地实现。
JSON-RPC 实现 batch query 要用特殊语法。而我的自定义协议只需要一次发送多行就可以了。
P.S. 为何要用这个名字呢?随便取的,出自 Fate 系列的七阶职之一。