什么是 OTP?
它是开放电信平台!
OTP 代表 *开放电信平台*,尽管它不再与电信关系密切(它更多地与具有电信应用程序属性的软件相关,但确实如此)。如果 Erlang 的强大功能一半来自它的并发性和分布式,另一半来自它的错误处理能力,那么 OTP 框架就是它的第三部分。
在之前的章节中,我们已经看到了几个关于如何使用语言内置设施编写并发应用程序的常见做法示例:链接、监控、服务器、超时、捕获退出等。在事物执行顺序、如何避免竞争条件或始终记住进程可能随时死亡方面,有一些 “注意事项”。此外还有热代码加载、命名进程和添加监督者等等。
手动完成所有这些工作既费时又容易出错。有些边缘情况会被遗漏,也可能掉进陷阱。OTP 框架通过将这些基本实践分组到经过多年精心设计和实战检验的库中来解决这个问题。每个 Erlang 程序员都应该使用它们。
OTP 框架也是一组模块和标准,旨在帮助你构建应用程序。鉴于大多数 Erlang 程序员最终都会使用 OTP,你在野外遇到的大多数 Erlang 应用程序往往会遵循这些标准。
抽象的通用进程
我们在之前进程示例中多次做的事情之一是按照非常具体的任务进行划分。在大多数进程中,我们有一个负责生成新进程的函数,一个负责赋予它初始值的函数,一个主循环等等。
事实证明,这些部分通常存在于你编写的每个并发程序中,无论该进程用于什么目的。
OTP 框架背后的工程师和计算机科学家发现了这些模式,并将它们包含在一组通用库中。这些库使用等效于我们使用的大多数抽象的代码构建(例如使用引用来标记消息),并且具有在该领域使用多年的优势,并且比我们使用自己的实现更加谨慎。它们包含用于安全地生成和初始化进程、以容错方式向它们发送消息以及许多其他功能的函数。有趣的是,你很少需要自己使用这些库。它们包含的抽象非常基本和通用,以至于在它们之上构建了许多更有趣的东西。我们将使用那些库。
在接下来的章节中,我们将看到几个关于进程的常见用法,然后是如何抽象它们,再是如何使其泛化。对于其中的每一个,我们还将看到使用 OTP 框架的行为的对应实现,以及如何使用它们。
基本服务器
我将要描述的第一个常见模式是我们已经使用过的。在编写 事件服务器 时,我们有可以称为 *客户端-服务器模型* 的东西。事件服务器会接收来自客户端的调用,对其进行处理,然后根据协议决定是否回复客户端。
在本章中,我们将使用一个非常简单的服务器,让我们能够专注于它的基本属性。这是 小猫服务器
%%%%% Naive version -module(kitty_server). -export([start_link/0, order_cat/4, return_cat/2, close_shop/1]). -record(cat, {name, color=green, description}). %%% Client API start_link() -> spawn_link(fun init/0). %% Synchronous call order_cat(Pid, Name, Color, Description) -> Ref = erlang:monitor(process, Pid), Pid ! {self(), Ref, {order, Name, Color, Description}}, receive {Ref, Cat} -> erlang:demonitor(Ref, [flush]), Cat; {'DOWN', Ref, process, Pid, Reason} -> erlang:error(Reason) after 5000 -> erlang:error(timeout) end. %% This call is asynchronous return_cat(Pid, Cat = #cat{}) -> Pid ! {return, Cat}, ok. %% Synchronous call close_shop(Pid) -> Ref = erlang:monitor(process, Pid), Pid ! {self(), Ref, terminate}, receive {Ref, ok} -> erlang:demonitor(Ref, [flush]), ok; {'DOWN', Ref, process, Pid, Reason} -> erlang:error(Reason) after 5000 -> erlang:error(timeout) end. %%% Server functions init() -> loop([]). loop(Cats) -> receive {Pid, Ref, {order, Name, Color, Description}} -> if Cats =:= [] -> Pid ! {Ref, make_cat(Name, Color, Description)}, loop(Cats); Cats =/= [] -> % got to empty the stock Pid ! {Ref, hd(Cats)}, loop(tl(Cats)) end; {return, Cat = #cat{}} -> loop([Cat|Cats]); {Pid, Ref, terminate} -> Pid ! {Ref, ok}, terminate(Cats); Unknown -> %% do some logging here too io:format("Unknown message: ~p~n", [Unknown]), loop(Cats) end. %%% Private functions make_cat(Name, Col, Desc) -> #cat{name=Name, color=Col, description=Desc}. terminate(Cats) -> [io:format("~p was set free.~n",[C#cat.name]) || C <- Cats], ok.
所以这是一个小猫服务器/商店。它的行为极其简单:你描述一只猫,你就会得到那只猫。如果有人退货一只猫,它会被添加到列表中,然后自动作为下一份订单发送,而不是客户端实际要求的订单(我们在这个小猫商店是为了赚钱,而不是为了笑容)。
1> c(kitty_server). {ok,kitty_server} 2> rr(kitty_server). [cat] 3> Pid = kitty_server:start_link(). <0.57.0> 4> Cat1 = kitty_server:order_cat(Pid, carl, brown, "loves to burn bridges"). #cat{name = carl,color = brown, description = "loves to burn bridges"} 5> kitty_server:return_cat(Pid, Cat1). ok 6> kitty_server:order_cat(Pid, jimmy, orange, "cuddly"). #cat{name = carl,color = brown, description = "loves to burn bridges"} 7> kitty_server:order_cat(Pid, jimmy, orange, "cuddly"). #cat{name = jimmy,color = orange,description = "cuddly"} 8> kitty_server:return_cat(Pid, Cat1). ok 9> kitty_server:close_shop(Pid). carl was set free. ok 10> kitty_server:close_shop(Pid). ** exception error: no such process or port in function kitty_server:close_shop/1
回顾该模块的源代码,我们可以看到以前应用过的模式。设置和取消监控、应用计时器、接收数据、使用主循环、处理 init 函数等部分应该都很熟悉。应该可以将这些我们一直重复做的事情抽象出来。
让我们首先看一下客户端 API。我们可以注意到的第一件事是,同步调用非常相似。这些调用很可能被抽象到上一节中提到的抽象库中。目前,我们只将它们抽象为一个单一函数,放在一个 新模块 中,该模块将包含小猫服务器的所有泛化部分
-module(my_server). -compile(export_all). call(Pid, Msg) -> Ref = erlang:monitor(process, Pid), Pid ! {self(), Ref, Msg}, receive {Ref, Reply} -> erlang:demonitor(Ref, [flush]), Reply; {'DOWN', Ref, process, Pid, Reason} -> erlang:error(Reason) after 5000 -> erlang:error(timeout) end.
这将消息和 PID 放入函数中,然后以安全的方式转发消息。从现在开始,我们可以用调用这个函数来代替我们进行的消息发送。因此,如果我们要重写一个新的与抽象的 my_server
配对的小猫服务器,它可以这样开始
-module(kitty_server2). -export([start_link/0, order_cat/4, return_cat/2, close_shop/1]). -record(cat, {name, color=green, description}). %%% Client API start_link() -> spawn_link(fun init/0). %% Synchronous call order_cat(Pid, Name, Color, Description) -> my_server:call(Pid, {order, Name, Color, Description}). %% This call is asynchronous return_cat(Pid, Cat = #cat{}) -> Pid ! {return, Cat}, ok. %% Synchronous call close_shop(Pid) -> my_server:call(Pid, terminate).
我们拥有的下一个大型泛化代码块不像 call/2
函数那么明显。请注意,到目前为止,我们编写的每个进程都有一个循环,其中所有消息都被模式匹配。这是一个有点棘手的部分,但在这里我们必须将模式匹配与循环本身分开。一个快速的方法是添加
loop(Module, State) -> receive Message -> Module:handle(Message, State) end.
然后特定模块可以像这样
handle(Message1, State) -> NewState1; handle(Message2, State) -> NewState2; ... handle(MessageN, State) -> NewStateN.
这样更好。还有办法让它更干净。如果你在阅读 kitty_server
模块时注意到了(我希望你注意到了!),你会注意到我们有一种特定的方式来进行同步调用,另一种方式来进行异步调用。如果我们的通用服务器实现能够提供一种明确的方法来识别哪种调用是哪种,那就非常有帮助。
为了做到这一点,我们需要在 my_server:loop/2
中匹配不同类型的消息。这意味着我们需要稍微改变一下 call/2
函数,以便通过在函数第二行中的消息添加原子 sync
来使同步调用变得明显
call(Pid, Msg) -> Ref = erlang:monitor(process, Pid), Pid ! {sync, self(), Ref, Msg}, receive {Ref, Reply} -> erlang:demonitor(Ref, [flush]), Reply; {'DOWN', Ref, process, Pid, Reason} -> erlang:error(Reason) after 5000 -> erlang:error(timeout) end.
现在我们可以为异步调用提供一个新函数。函数 cast/2
将处理它
cast(Pid, Msg) -> Pid ! {async, Msg}, ok.
这样一来,循环就可以像这样
loop(Module, State) -> receive {async, Msg} -> loop(Module, Module:handle_cast(Msg, State)); {sync, Pid, Ref, Msg} -> loop(Module, Module:handle_call(Msg, Pid, Ref, State)) end.
然后你也可以添加特定的槽位来处理不符合同步/异步概念的消息(可能是意外发送的)或将调试函数和其他东西(例如热代码加载)放在那里。
上面循环中令人失望的一点是抽象出现了泄漏。使用 my_server
的程序员仍然需要了解引用才能发送同步消息并回复它们。这使得抽象毫无用处。要使用它,你仍然需要了解所有无聊的细节。这里有一个快速修复
loop(Module, State) -> receive {async, Msg} -> loop(Module, Module:handle_cast(Msg, State)); {sync, Pid, Ref, Msg} -> loop(Module, Module:handle_call(Msg, {Pid, Ref}, State)) end.
通过将 Pid 和 Ref 两个变量放入一个元组中,可以将它们作为一个参数传递给另一个函数,该参数可以使用像 From 这样的名称。这样,用户就不必了解变量的内部结构。相反,我们将提供一个函数来发送回复,该函数应该了解 From 包含的内容
reply({Pid, Ref}, Reply) -> Pid ! {Ref, Reply}.
剩下要做的是指定启动函数 (start
、start_link
和 init
),它们会传递模块名称等。一旦它们被添加,该模块应该如下所示
-module(my_server). -export([start/2, start_link/2, call/2, cast/2, reply/2]). %%% Public API start(Module, InitialState) -> spawn(fun() -> init(Module, InitialState) end). start_link(Module, InitialState) -> spawn_link(fun() -> init(Module, InitialState) end). call(Pid, Msg) -> Ref = erlang:monitor(process, Pid), Pid ! {sync, self(), Ref, Msg}, receive {Ref, Reply} -> erlang:demonitor(Ref, [flush]), Reply; {'DOWN', Ref, process, Pid, Reason} -> erlang:error(Reason) after 5000 -> erlang:error(timeout) end. cast(Pid, Msg) -> Pid ! {async, Msg}, ok. reply({Pid, Ref}, Reply) -> Pid ! {Ref, Reply}. %%% Private stuff init(Module, InitialState) -> loop(Module, Module:init(InitialState)). loop(Module, State) -> receive {async, Msg} -> loop(Module, Module:handle_cast(Msg, State)); {sync, Pid, Ref, Msg} -> loop(Module, Module:handle_call(Msg, {Pid, Ref}, State)) end.
接下来要做的是重新实现小猫服务器,现在是 kitty_server2
,作为一个回调模块,它将遵守我们为 my_server
定义的接口。我们将保留与之前实现相同的接口,只是所有调用现在都被重定向到通过 my_server
进行
-module(kitty_server2). -export([start_link/0, order_cat/4, return_cat/2, close_shop/1]). -export([init/1, handle_call/3, handle_cast/2]). -record(cat, {name, color=green, description}). %%% Client API start_link() -> my_server:start_link(?MODULE, []). %% Synchronous call order_cat(Pid, Name, Color, Description) -> my_server:call(Pid, {order, Name, Color, Description}). %% This call is asynchronous return_cat(Pid, Cat = #cat{}) -> my_server:cast(Pid, {return, Cat}). %% Synchronous call close_shop(Pid) -> my_server:call(Pid, terminate).
请注意,我在模块顶部添加了第二个 -export()
。这些是 my_server
需要调用的函数,以使一切正常工作
%%% Server functions init([]) -> []. %% no treatment of info here! handle_call({order, Name, Color, Description}, From, Cats) -> if Cats =:= [] -> my_server:reply(From, make_cat(Name, Color, Description)), Cats; Cats =/= [] -> my_server:reply(From, hd(Cats)), tl(Cats) end; handle_call(terminate, From, Cats) -> my_server:reply(From, ok), terminate(Cats). handle_cast({return, Cat = #cat{}}, Cats) -> [Cat|Cats].
然后需要做的就是重新添加私有函数
%%% Private functions make_cat(Name, Col, Desc) -> #cat{name=Name, color=Col, description=Desc}. terminate(Cats) -> [io:format("~p was set free.~n",[C#cat.name]) || C <- Cats], exit(normal).
只需确保在 terminate/1
中用 exit(normal)
替换之前的 ok
,否则服务器将继续运行。
代码应该可以编译和测试,并且以与以前完全相同的方式运行。代码非常相似,但让我们看看发生了哪些变化。
特定与泛化
我们刚刚做的是从概念上理解 OTP 的核心。这就是 OTP 真正的意义所在:将所有泛化组件提取到库中,确保它们正常工作,然后尽可能地重复使用这些代码。然后,剩下的就是专注于特定内容,这些内容始终会在不同的应用程序之间发生变化。
显然,仅仅使用小猫服务器进行这种方式的操作并没有多少节省。它看起来有点像为了抽象而抽象。如果我们要交付给客户的应用程序仅仅是小猫服务器,那么第一个版本可能就可以了。如果你要开发更大的应用程序,那么将代码的泛化部分与特定部分分开可能值得一试。
让我们想象一下,我们有一些 Erlang 软件运行在服务器上。我们的软件运行着几个小猫服务器,一个兽医进程(你将损坏的小猫发送过去,它会将它们修复后返回),一个小猫美容院,一个宠物食品供应服务器等等。大多数这些都可以使用客户端-服务器模式实现。随着时间的推移,你的复杂系统变得充满了各种各样的服务器。
添加服务器会增加代码复杂性,也会增加测试、维护和理解的复杂性。每个实现可能都不同,由不同的人用不同的风格编写,等等。但是,如果所有这些服务器都共享相同的通用 my_server
抽象,那么你就可以大大降低这种复杂性。你能够立即理解该模块的基本概念(“哦,这是一个服务器!”),只有一个通用的实现需要测试、文档化等等。其他的工作可以放在每个特定实现上。
这意味着你减少了很多跟踪和解决错误的时间(只需针对所有服务器在一个地方完成即可)。这也意味着你减少了引入的错误数量。如果你要不断地重写 my_server:call/3
或进程的主循环,不仅会更耗时,而且忘记某一步或其他操作的可能性会急剧上升,错误也会随之增加。更少的错误意味着晚上接到电话去修复问题的次数更少,这绝对对我们所有人都有好处。你的情况可能有所不同,但我敢打赌你也不喜欢在休假的时候去办公室修复错误。
当我们将泛化与特定分离时,另一个有趣的事情是,我们立即使单独测试模块变得更加容易。如果你想对旧的小猫服务器实现进行单元测试,你需要为每个测试生成一个进程,赋予它正确状态,发送你的消息并期望得到你预期的回复。另一方面,我们的第二个小猫服务器只需要我们通过 'handle_call/3' 和 'handle_cast/2' 函数运行函数调用,并查看它们作为新状态输出什么。无需设置服务器,无需操纵状态。只需将其作为函数参数传递进去。请注意,这也意味着服务器的泛化方面更容易测试,因为你可以只实现非常简单的函数,这些函数除了让你专注于你想要观察的行为之外什么也不做,而无需其他任何操作。
使用这种方式的通用抽象带来的一个“隐藏”优势是,如果每个人都使用相同的后台来处理他们的流程,那么当有人优化这个单一的后台使其速度更快时,所有使用它的流程也会更快地运行。为了使这一原则在实践中发挥作用,通常需要很多人使用相同的抽象并对其进行努力。幸运的是,对于 Erlang 社区来说,这就是 OTP 框架的情况。
回到我们的模块。我们还没有解决很多问题:命名进程、配置超时、添加调试信息、如何处理意外消息、如何整合热代码加载、处理特定错误、抽象掉大多数回复的编写、处理大多数关闭服务器的方式、确保服务器与主管进程配合良好,等等。详细讨论所有这些对于本文来说是多余的,但在需要发布的实际产品中则是必要的。再次强调,你可能会明白为什么自己做所有这些事情是一个风险很大的任务。幸运的是,对于你(以及支持你的应用程序的人来说),Erlang/OTP 团队已经帮你处理了所有这些问题,并提供了 gen_server 行为。gen_server
就像打了兴奋剂的 my_server
一样,除了它拥有多年测试和生产使用经验。