设计并发应用程序
一切都很顺利。你理解了这些概念,但话说回来,从本书开始到现在,我们只用过一些玩具示例:计算器、树、希思罗机场到伦敦等等。是时候做一些更有趣、更有教育意义的事情了。我们将用并发 Erlang 写一个小型应用程序。该应用程序将很小且基于行,但仍然有用且适度可扩展。
我是一个有点不善于整理的人。我总是为作业、公寓周围的事情、这本书、工作、会议、约会等等而感到迷茫。我最终会到处都有几十个清单,上面列着我仍然忘记做或忘记查看的任务。希望你仍然需要提醒自己该做什么(但你的思绪不像我那样漫无边际),因为我们将编写一个事件提醒应用程序,它会提示你做事并提醒你约会。
理解问题
第一步是弄清楚我们在做什么。你说:“一个提醒应用程序”。我说:“当然”。但还有更多。我们打算如何与软件交互?我们希望它为我们做什么?我们如何用进程来表示程序?我们如何知道要发送哪些消息?
正如谚语所说,“在水上行走和根据规格开发软件很容易,如果两者都冻结了。”所以让我们获得一份规范并坚持它。我们的这小段软件将允许我们执行以下操作
- 添加事件。事件包含截止日期(提醒时间)、事件名称和描述。
- 在时间到的时候显示警告。
- 按名称取消事件。
- 没有持久性磁盘存储。不需要显示我们将看到的架构概念。对于一个真正的应用程序来说,这很糟糕,但我只会简单地展示如果你想这样做,它可以被插入在哪里,并指明一些有用的函数。
- 鉴于我们没有持久性存储,我们必须能够在运行时更新代码。
- 与软件的交互将通过命令行完成,但应该能够稍后扩展它,以便可以使用其他方式(例如 GUI、网页访问、即时消息软件、电子邮件等)。
以下是程序的结构,我选择这样做
其中客户端、事件服务器和 x、y、z 都是进程。以下是每个进程可以执行的操作
事件服务器
- 接受来自客户端的订阅
- 将事件进程的通知转发给每个订阅者
- 接受添加事件的消息(并启动所需的 x、y、z 进程)
- 可以接受取消事件的消息,并随后终止事件进程
- 可以被客户端终止
- 可以通过 shell 重新加载其代码。
客户端
- 订阅事件服务器并接收消息通知。因此,设计一堆都订阅事件服务器的客户端应该很容易。这些客户端中的每一个都可能是通往上面提到的不同交互点的网关(GUI、网页、即时消息软件、电子邮件等)。
- 要求服务器添加事件及其所有详细信息
- 要求服务器取消事件
- 监控服务器(以了解它是否已关闭)
- 在需要时关闭事件服务器
x、y、z
- 代表待触发的通知(它们基本上只是链接到事件服务器的计时器)
- 在时间到时向事件服务器发送消息
- 接收取消消息并死亡
请注意,所有客户端(IM、邮件等,在本手册中未实现)都会收到有关所有事件的通知,取消不是要向客户端发出警告的内容。在这里,软件是为我和你编写的,并且假设只有一个用户运行它。
以下是包含所有可能消息的更复杂图表
这代表了我们将拥有的每个进程。通过在其中绘制所有箭头并说它们是消息,我们编写了一个高级协议,或者至少是它的骨架。
需要注意的是,在实际应用中,使用每个事件一个进程来提醒可能会过于复杂,难以扩展。但是,对于你将作为唯一用户使用的应用程序,这已经足够了。另一种方法可以使用诸如 timer:send_after/2-3 之类的函数来避免生成过多的进程。
定义协议
既然我们知道了每个组件必须做什么以及如何通信,那么一个好主意是列出将要发送的所有消息,并指定它们的外观。首先让我们从客户端和事件服务器之间的通信开始
这里我选择使用两个监视器,因为客户端和服务器之间没有明显的依赖关系。我的意思是,当然,客户端没有服务器就无法工作,但服务器可以在没有客户端的情况下生存。一个链接可以在这里完成工作,但因为我们希望我们的系统可以扩展到许多客户端,所以我们不能假设其他客户端在服务器死亡时都想要崩溃。我们也不能假设客户端真的可以被转化为系统进程并在服务器死亡时捕获退出。现在到下一个消息集
这将一个事件添加到事件服务器。除非出现错误(例如,TimeOut 格式错误),否则会以 ok
原子的形式发送回确认。相反的操作,删除事件,可以按如下方式完成
然后,事件服务器可以在事件到期时发送通知
然后,我们只需要以下两种特殊情况,用于我们想要关闭服务器或服务器崩溃时
当服务器死亡时不会发送直接确认,因为监视器已经会警告我们。这基本上是客户端和事件服务器之间发生的所有事情。现在,对于事件服务器和事件进程本身之间的消息。
在此之前,需要注意的是,将事件服务器链接到事件将非常有用。这样做的原因是,我们希望所有事件在服务器死亡时都死亡:没有服务器,它们就没有意义。
好的,回到事件。当事件服务器启动它们时,它会给每个事件一个特殊的标识符(事件的名称)。一旦这些事件之一的时间到了,它需要发送一条消息来说明这一点
另一方面,事件必须注意来自事件服务器的取消调用
应该就是这样。我们的协议还需要最后一条消息,用于升级服务器
不需要回复。当我们实际编程该功能时,我们将看到原因,你也会发现它是有道理的。
有了协议定义和进程层次结构的总体思路,我们就可以真正开始着手这个项目了。
奠定基础
首先,我们应该建立一个标准的 Erlang 目录结构,如下所示
ebin/ include/ priv/ src/
ebin/
目录是编译后的文件存放的地方。include/
目录用于存储要由其他应用程序包含的 .hrl
文件;私有 .hrl
文件通常保存在 src/
目录中。priv/
目录用于可能需要与 Erlang 交互的可执行文件,例如特定驱动程序等。我们不会在这个项目中实际使用该目录。然后是最后一个,src/
目录,所有 .erl
文件都保存在这里。
在标准的 Erlang 项目中,这个目录结构可能略有不同。可以添加 conf/
目录用于特定配置文件,doc/
用于文档,lib/
用于应用程序运行所需的第三方库。市场上的不同 Erlang 产品通常使用与这些不同的名称,但上面提到的四个通常保持不变,因为它们是 标准 OTP 实践 的一部分。
事件模块
进入 src/
目录并启动 event.erl 模块,它将实现前面图示中的 x、y、z 事件。我从这个模块开始,因为它依赖最少:我们可以在不需要实现事件服务器或客户端功能的情况下尝试运行它。
在真正编写代码之前,我必须提一下协议是不完整的。它有助于表示将从一个进程发送到另一个进程的数据,但不是它的复杂性:寻址方式如何工作、我们是否使用引用或名称等等。大多数消息将以 {Pid, Ref, Message}
的形式包装,其中 Pid 是发送者,Ref 是一个唯一的消息标识符,用于帮助了解哪个回复来自谁。如果我们在寻找回复之前发送了许多消息,我们将不知道哪个回复与哪个消息相匹配,除非有引用。
所以,我们开始吧。运行 event.erl
代码的进程的核心将是 loop/1
函数,如果你还记得协议,它看起来有点像以下骨架
loop(State) -> receive {Server, Ref, cancel} -> ... after Delay -> ... end.
这显示了我们必须支持的超时以宣布事件已结束,以及服务器如何调用取消事件的方式。你会注意到循环中有一个变量 State。State 变量必须包含数据,例如超时值(以秒为单位)和事件的名称(以便发送消息 {done, Id}
)。它还需要知道事件服务器的 pid,以便向其发送通知。
所有这些都适合保存在循环的状态中。因此,让我们在文件开头声明一个 state
记录
-module(event). -compile(export_all). -record(state, {server, name="", to_go=0}).
定义好这个状态后,就可以进一步细化循环了
loop(S = #state{server=Server}) -> receive {Server, Ref, cancel} -> Server ! {Ref, ok} after S#state.to_go*1000 -> Server ! {done, S#state.name} end.
在这里,乘以一千是将 to_go
值从秒转换为毫秒。
不要喝太多酷乐 aid
语言问题即将出现!我之所以在函数头中绑定变量 'Server',是因为它在 receive 部分的模式匹配中使用。记住,记录是黑客行为! 表达式 S#state.server
实际上会扩展为 element(2, S)
,这不是一个有效的模式匹配对象。
这在 after
部分之后仍然适用于 S#state.to_go
,因为该部分可以是稍后计算的表达式。
现在测试循环
6> c(event). {ok,event} 7> rr(event, state). [state] 8> spawn(event, loop, [#state{server=self(), name="test", to_go=5}]). <0.60.0> 9> flush(). ok 10> flush(). Shell got {done,"test"} ok 11> Pid = spawn(event, loop, [#state{server=self(), name="test", to_go=500}]). <0.64.0> 12> ReplyRef = make_ref(). #Ref<0.0.0.210> 13> Pid ! {self(), ReplyRef, cancel}. {<0.50.0>,#Ref<0.0.0.210>,cancel} 14> flush(). Shell got {#Ref<0.0.0.210>,ok} ok
这里有很多东西要看。首先,我们使用`rr(Mod)`从事件模块导入记录。然后,我们使用shell作为服务器( `self()` )生成事件循环。这个事件应该在5秒后触发。第9个表达式在3秒后运行,第10个表达式在6秒后运行。你可以看到我们在第二次尝试时收到了 `{done, "test"}` 消息。
紧随其后,我尝试了取消功能(有充足的500秒来输入它)。你可以看到我创建了引用,发送了消息并得到了一个带有相同引用的回复,所以我知道我收到的`ok` 来自这个进程,而不是系统上的任何其他进程。
取消消息用引用包装而`done` 消息没有的原因很简单:我们不希望它来自任何特定的地方(任何地方都可以,我们不会匹配接收),也不应该回复它。我之前想做另一个测试。明年会发生事件吗?
15> spawn(event, loop, [#state{server=self(), name="test", to_go=365*24*60*60}]). <0.69.0> 16> =ERROR REPORT==== DD-MM-YYYY::HH:mm:SS === Error in process <0.69.0> with exit value: {timeout_value,[{event,loop,1}]}
哎呀。看起来我们遇到了实现限制。事实证明,Erlang 的超时值限制在约 50 天的毫秒。这可能并不重要,但我出于以下三个原因显示此错误
- 在写模块和测试模块的时候,在本章的一半的时候,它就让我很头疼。
- Erlang 当然不是每个任务的完美选择,我们在这里看到的是在实现者没有预期的使用方式中使用计时器的后果。
- 这不是真正的问题;让我们找到解决方法。
我决定为此应用的修复是编写一个函数,如果超时值过长,则将其拆分为多个部分。这也需要 `loop/1` 函数的支持。所以,拆分时间的方法基本上是将其分成 49 天的等份(因为限制大约是 50 天),然后将余数与所有这些等份放在一起。秒列表的总和现在应该是原始时间。
%% Because Erlang is limited to about 49 days (49*24*60*60*1000) in %% milliseconds, the following function is used normalize(N) -> Limit = 49*24*60*60, [N rem Limit | lists:duplicate(N div Limit, Limit)].
函数 `lists:duplicate/2` 将以给定的表达式作为第二个参数,并根据第一个参数的值(`[a,a,a] = lists:duplicate(3, a)`)复制它。如果我们要向 `normalize/1` 发送值 `98*24*60*60+4`,它将返回 `[4,4233600,4233600]`。`loop/1` 函数现在应该像这样看起来,以适应新的格式
%% Loop uses a list for times in order to go around the ~49 days limit %% on timeouts. loop(S = #state{server=Server, to_go=[T|Next]}) -> receive {Server, Ref, cancel} -> Server ! {Ref, ok} after T*1000 -> if Next =:= [] -> Server ! {done, S#state.name}; Next =/= [] -> loop(S#state{to_go=Next}) end end.
你可以尝试一下,它应该像往常一样工作,但现在支持几年甚至几十年的超时。它的工作原理是获取 `to_go` 列表的第一个元素,并等待其整个持续时间。完成后,将验证超时列表的下一个元素。如果为空,则超时结束,并通知服务器。否则,循环将继续使用列表的其余部分,直到完成。
每次启动事件进程时都必须手动调用 `event:normalize(N)` 之类的东西,这会非常令人讨厌,尤其是因为我们的解决方法不应该让使用我们代码的程序员担心。标准方法是使用一个 `init` 函数来处理循环函数正常工作所需的所有数据的初始化。我们正在做这件事的时候,我们还会添加标准的 `start` 和 `start_link` 函数
start(EventName, Delay) -> spawn(?MODULE, init, [self(), EventName, Delay]). start_link(EventName, Delay) -> spawn_link(?MODULE, init, [self(), EventName, Delay]). %%% Event's innards init(Server, EventName, Delay) -> loop(#state{server=Server, name=EventName, to_go=normalize(Delay)}).
现在界面更加干净。在测试之前,最好让我们唯一可以发送的消息(取消)也有自己的接口函数
cancel(Pid) -> %% Monitor in case the process is already dead Ref = erlang:monitor(process, Pid), Pid ! {self(), Ref, cancel}, receive {Ref, ok} -> erlang:demonitor(Ref, [flush]), ok; {'DOWN', Ref, process, Pid, _Reason} -> ok end.
哦!一个新技巧!这里我使用了一个监控器来查看进程是否存在。如果进程已经死了,我会避免无用的等待时间,并根据协议返回 `ok`。如果进程回复了引用,那么我知道它很快就会死:我删除了引用以避免在不再关心它们时收到它们。请注意,我还提供了 `flush` 选项,它将在我们没有时间取消监控之前清除 `DOWN` 消息。
让我们测试一下这些。
17> c(event). {ok,event} 18> f(). ok 19> event:start("Event", 0). <0.103.0> 20> flush(). Shell got {done,"Event"} ok 21> Pid = event:start("Event", 500). <0.106.0> 22> event:cancel(Pid). ok
它能工作!事件模块中最后一个令人讨厌的事情是,我们必须输入剩余的秒数。如果我们可以使用标准格式(例如 Erlang 的日期时间(`{{Year, Month, Day}, {Hour, Minute, Second}}`))会好得多。只需添加以下函数,该函数将计算您计算机上的当前时间与您插入的延迟之间的差值
time_to_go(TimeOut={{_,_,_}, {_,_,_}}) -> Now = calendar:local_time(), ToGo = calendar:datetime_to_gregorian_seconds(TimeOut) - calendar:datetime_to_gregorian_seconds(Now), Secs = if ToGo > 0 -> ToGo; ToGo =< 0 -> 0 end, normalize(Secs).
哦,是的。`calendar 模块` 有非常奇怪的函数名。如上所述,这计算了现在到事件应该触发时的时间差。如果事件已经过去,我们返回 `0`,以便它尽快通知服务器。现在修复 `init` 函数以调用此函数而不是 `normalize/1`。如果希望名称更具描述性,还可以将 `Delay` 变量重命名为 `DateTime`
init(Server, EventName, DateTime) -> loop(#state{server=Server, name=EventName, to_go=time_to_go(DateTime)}).
现在已经完成了,我们可以休息一下了。启动一个新的事件,去喝一杯(半升)牛奶/啤酒,然后及时回来,看看事件消息的到来。
事件服务器
让我们处理 `事件服务器`。根据协议,它的骨架应该看起来像这样
-module(evserv). -compile(export_all). loop(State) -> receive {Pid, MsgRef, {subscribe, Client}} -> ... {Pid, MsgRef, {add, Name, Description, TimeOut}} -> ... {Pid, MsgRef, {cancel, Name}} -> ... {done, Name} -> ... shutdown -> ... {'DOWN', Ref, process, _Pid, _Reason} -> ... code_change -> ... Unknown -> io:format("Unknown message: ~p~n",[Unknown]), loop(State) end.
你会注意到,我已经将需要回复的调用包装在与之前相同的 `{Pid, Ref, Message}` 格式中。现在,服务器需要在其状态中保留两件事:订阅客户端的列表和它生成的事件进程的列表。如果你注意到了,协议中说,当一个事件完成时,事件服务器应该接收 `{done, Name}`,但发送 `{done, Name, Description}`。这里的想法是尽可能减少流量,并且只让事件进程关心绝对必要的内容。所以,是客户端列表和事件列表。
-record(state, {events, %% list of #event{} records clients}). %% list of Pids -record(event, {name="", description="", pid, timeout={{1970,1,1},{0,0,0}}}).
现在循环在它的头部有记录定义
loop(S = #state{}) -> receive ... end.
如果事件和客户端都是 orddicts,那就太好了。我们不太可能一次拥有数百个。如果你还记得关于 `数据结构` 的章节,orddicts 非常适合这种需求。我们将编写一个 `init` 函数来处理这个问题
init() -> %% Loading events from a static file could be done here. %% You would need to pass an argument to init telling where the %% resource to find the events is. Then load it from here. %% Another option is to just pass the events straight to the server %% through this function. loop(#state{events=orddict:new(), clients=orddict:new()}).
完成了骨架和初始化后,我将逐个实现每个消息。第一个消息是关于订阅的消息。我们想要保留所有订阅者的列表,因为当一个事件完成时,我们必须通知他们。此外,上面的协议提到我们应该监控他们。这是有道理的,因为我们不希望保留崩溃的客户端,并且毫无理由地发送无用的消息。无论如何,它应该看起来像这样
{Pid, MsgRef, {subscribe, Client}} -> Ref = erlang:monitor(process, Client), NewClients = orddict:store(Ref, Client, S#state.clients), Pid ! {MsgRef, ok}, loop(S#state{clients=NewClients});
所以 `loop/1` 的这一部分的作用是启动一个监控器,并将客户端信息存储在以 `Ref` 为键的 orddict 中。这样做的原因很简单:我们唯一需要获取客户端 ID 的其他时间是当我们收到监控器的 `EXIT` 消息时,该消息将包含引用(这将让我们摆脱 orddict 的条目)。
接下来要关心的消息是添加事件的消息。现在,有可能返回一个错误状态。我们将执行的唯一验证是检查我们接受的时间戳。虽然很容易订阅 `{{Year,Month,Day}, {Hour,Minute,seconds}}` 布局,但我们必须确保不会做一些事情,比如在不是闰年的时候接受 2 月 29 日的事件,或者任何其他不存在的日期。此外,我们不希望接受像“5 小时,减去 1 分钟和 75 秒”这样的不可能的日期值。一个函数可以处理所有这些验证。
我们将使用的第一个构建块是函数 `calendar:valid_date/1`。顾名思义,它会检查日期是否有效。可悲的是,calendar 模块的怪异之处不仅限于奇怪的名称:实际上没有函数可以确认 `{H,M,S}` 有效的值。我们必须按照奇怪的命名方案来实现它。
valid_datetime({Date,Time}) -> try calendar:valid_date(Date) andalso valid_time(Time) catch error:function_clause -> %% not in {{Y,M,D},{H,Min,S}} format false end; valid_datetime(_) -> false. valid_time({H,M,S}) -> valid_time(H,M,S). valid_time(H,M,S) when H >= 0, H < 24, M >= 0, M < 60, S >= 0, S < 60 -> true; valid_time(_,_,_) -> false.
`valid_datetime/1` 函数现在可以在我们尝试添加消息的部分中使用
{Pid, MsgRef, {add, Name, Description, TimeOut}} -> case valid_datetime(TimeOut) of true -> EventPid = event:start_link(Name, TimeOut), NewEvents = orddict:store(Name, #event{name=Name, description=Description, pid=EventPid, timeout=TimeOut}, S#state.events), Pid ! {MsgRef, ok}, loop(S#state{events=NewEvents}); false -> Pid ! {MsgRef, {error, bad_timeout}}, loop(S) end;
如果时间有效,我们生成一个新的事件进程,然后将它的数据存储在事件服务器的状态中,然后再向调用者发送确认。如果超时错误,我们会通知客户端,而不是让错误静默地传递或使服务器崩溃。可以添加其他检查来检查名称冲突或其他限制(只需记住更新协议文档!)。
协议中定义的下一条消息是我们取消事件的消息。取消事件在客户端侧永远不会失败,因此代码在那里更简单。只需检查事件是否在进程的状态记录中。如果是,请使用我们定义的 `event:cancel/1` 函数来杀死它并发送 `ok`。如果找不到,只需告诉用户一切都正常——事件没有运行,这就是用户想要的。
{Pid, MsgRef, {cancel, Name}} -> Events = case orddict:find(Name, S#state.events) of {ok, E} -> event:cancel(E#event.pid), orddict:erase(Name, S#state.events); error -> S#state.events end, Pid ! {MsgRef, ok}, loop(S#state{events=Events});
很好,很好。所以现在所有来自客户端到事件服务器的自愿交互都已涵盖。让我们处理服务器和事件本身之间发生的事情。有两个消息要处理:取消事件(已完成)和事件超时。该消息只是 `{done, Name}`
{done, Name} -> case orddict:find(Name, S#state.events) of {ok, E} -> send_to_clients({done, E#event.name, E#event.description}, S#state.clients), NewEvents = orddict:erase(Name, S#state.events), loop(S#state{events=NewEvents}); error -> %% This may happen if we cancel an event and %% it fires at the same time loop(S) end;
函数 `send_to_clients/2` 顾名思义,定义如下
send_to_clients(Msg, ClientDict) -> orddict:map(fun(_Ref, Pid) -> Pid ! Msg end, ClientDict).
这应该是大部分循环代码。剩下的就是设置不同的状态消息:客户端关闭、关闭、代码升级等。它们来了
shutdown -> exit(shutdown); {'DOWN', Ref, process, _Pid, _Reason} -> loop(S#state{clients=orddict:erase(Ref, S#state.clients)}); code_change -> ?MODULE:loop(S); Unknown -> io:format("Unknown message: ~p~n",[Unknown]), loop(S)
第一个情况(`shutdown`)非常明确。你会收到 kill 消息,让进程死亡。如果你想将状态保存到磁盘,这可能是做这件事的地方。如果你想要更安全的保存/退出语义,可以在每个 `add`、`cancel` 或 `done` 消息中完成此操作。然后可以在 `init` 函数中从磁盘加载事件,并根据它们生成。
`'DOWN'` 消息的动作也足够简单。这意味着一个客户端死了,所以我们将其从状态中的客户端列表中删除。
未知消息将仅使用 `io:format/2` 显示以进行调试,尽管实际生产应用程序可能会使用专门的日志记录模块
代码更改消息来了。它足够有趣,让我专门为它开辟一个部分。
热代码爱好
为了进行热代码加载,Erlang 有一个叫做“代码服务器”的东西。代码服务器基本上是一个负责 `ETS 表`(内存数据库表,是 VM 本地)的 VM 进程。代码服务器可以在内存中保存一个模块的两个版本,并且两个版本都可以同时运行。使用 `c(Module)` 编译、使用 `l(Module)` 加载或使用 `code 模块` 的许多函数之一加载模块的新版本时,将自动加载该模块的新版本。
需要理解的一个概念是,Erlang 既有本地调用,也有外部调用。本地调用是指对可能未导出函数的函数调用。它们只是 Atom(Args)
格式。另一方面,外部调用只能通过导出函数完成,并且具有 Module:Function(Args)
格式。
当 VM 中加载了两个版本的模块时,所有本地调用都通过进程中当前正在运行的版本完成。但是,外部调用始终在代码服务器中可用的最新版本代码上完成。然后,如果从外部调用中进行本地调用,则它们位于代码的新版本中。
鉴于 Erlang 中的每个进程/actor 都需要进行递归调用才能改变其状态,因此可以通过进行外部递归调用来加载 actor 的全新版本。
注意:如果您在进程仍在使用第一个版本的模块运行时加载了模块的第三个版本,那么该进程将被 VM 杀死,VM 假设该进程是一个没有主管或升级自己的方式的孤儿进程。如果没有人运行最旧的版本,它将被简单地删除,而保留最新的版本。
有一些方法可以将自己绑定到一个系统模块,该模块将在加载模块的新版本时发送消息。通过这样做,您可以在收到此类消息时触发模块重新加载,并且始终使用代码升级函数(例如 MyModule:Upgrade(CurrentState)
)来执行此操作,该函数将能够根据新版本的规范转换状态数据结构。这种“订阅”处理由 OTP 框架自动完成,我们很快就会开始学习它。对于提醒应用程序,我们将不使用代码服务器,而是使用来自 shell 的自定义 code_change
消息,进行非常基本的重新加载。这几乎是您需要了解的有关热代码加载的所有内容。不过,这里有一个更通用的示例
-module(hotload). -export([server/1, upgrade/1]). server(State) -> receive update -> NewState = ?MODULE:upgrade(State), ?MODULE:server(NewState); %% loop in the new version of the module SomeMessage -> %% do something here server(State) %% stay in the same version no matter what. end. upgrade(OldState) -> %% transform and return the state here.
如您所见,我们的 ?MODULE:loop(S)
符合此模式。
我说,隐藏您的消息
隐藏消息!如果您希望人们构建您的代码和进程,您必须在接口函数中隐藏消息。以下是我们用于 evserv
模块的内容
start() -> register(?MODULE, Pid=spawn(?MODULE, init, [])), Pid. start_link() -> register(?MODULE, Pid=spawn_link(?MODULE, init, [])), Pid. terminate() -> ?MODULE ! shutdown.
我决定注册服务器模块,因为目前我们应该只运行一个。如果您要扩展提醒功能以支持许多用户,最好将名称注册到 全局模块 或 gproc 库。为了这个示例应用程序的缘故,这将足够了。
我们编写的第一个消息是我们应该抽象出来的下一个消息:如何订阅。我在上面编写的小协议或规范要求使用监控器,因此它已在此处添加。在任何时候,如果订阅消息返回的引用位于 DOWN
消息中,客户端将知道服务器已关闭。
subscribe(Pid) -> Ref = erlang:monitor(process, whereis(?MODULE)), ?MODULE ! {self(), Ref, {subscribe, Pid}}, receive {Ref, ok} -> {ok, Ref}; {'DOWN', Ref, process, _Pid, Reason} -> {error, Reason} after 5000 -> {error, timeout} end.
下一个是事件添加
add_event(Name, Description, TimeOut) -> Ref = make_ref(), ?MODULE ! {self(), Ref, {add, Name, Description, TimeOut}}, receive {Ref, Msg} -> Msg after 5000 -> {error, timeout} end.
请注意,我选择将可能接收到的 {error, bad_timeout}
消息转发给客户端。我也可以决定通过引发 erlang:error(bad_timeout)
来使客户端崩溃。在社区中,关于是否应该使客户端崩溃或转发错误消息仍然存在争议。以下是替代的崩溃函数
add_event2(Name, Description, TimeOut) -> Ref = make_ref(), ?MODULE ! {self(), Ref, {add, Name, Description, TimeOut}}, receive {Ref, {error, Reason}} -> erlang:error(Reason); {Ref, Msg} -> Msg after 5000 -> {error, timeout} end.
然后是事件取消,它只接受一个名称
cancel(Name) -> Ref = make_ref(), ?MODULE ! {self(), Ref, {cancel, Name}}, receive {Ref, ok} -> ok after 5000 -> {error, timeout} end.
最后,为客户端提供了一个小的便利,一个用于在给定时间段内累积所有消息的函数。如果找到消息,它们将全部被获取,并且函数尽快返回
listen(Delay) -> receive M = {done, _Name, _Description} -> [M | listen(0)] after Delay*1000 -> [] end.
试驾
您现在应该能够编译应用程序并进行测试运行。为了简化操作,我们将编写一个特定的 Erlang Makefile 来构建项目。打开一个名为 Emakefile
的文件,并将其放在项目的基目录中。该文件包含 Erlang 项,并为 Erlang 编译器提供烘焙美味酥脆 .beam
文件的配方
{'src/*', [debug_info, {i, "src"}, {i, "include"}, {outdir, "ebin"}]}.
这告诉编译器将调试信息添加到文件中(这很少是您想放弃的选项),在 src/
和 include/
目录中查找文件,并将它们输出到 ebin/
中。
现在,通过进入命令行并运行 erl -make
,所有文件都应该被编译并放入 ebin/
目录中。通过执行 erl -pa ebin/
启动 Erlang shell。-pa <directory>
选项告诉 Erlang VM 将该路径添加到它可以在其中查找模块的位置。
另一种选择是像往常一样启动 shell 并调用 make:all([load])
。这将在当前目录中查找名为“Emakefile”的文件,重新编译它(如果它已更改),并加载新文件。
您现在应该能够跟踪数千个事件(只需将 DateTime 变量替换为您在编写文本时有意义的任何内容即可)
1> evserv:start(). <0.34.0> 2> evserv:subscribe(self()). {ok,#Ref<0.0.0.31>} 3> evserv:add_event("Hey there", "test", FutureDateTime). ok 4> evserv:listen(5). [] 5> evserv:cancel("Hey there"). ok 6> evserv:add_event("Hey there2", "test", NextMinuteDateTime). ok 7> evserv:listen(2000). [{done,"Hey there2","test"}]
非常好。鉴于我们创建的几个基本接口函数,编写任何客户端现在应该足够简单了。
添加监督
为了成为一个更稳定的应用程序,我们应该编写另一个“重启器”,就像我们在上一章中做的那样。打开一个名为 sup.erl 的文件,我们的主管将位于其中
-module(sup). -export([start/2, start_link/2, init/1, loop/1]). start(Mod,Args) -> spawn(?MODULE, init, [{Mod, Args}]). start_link(Mod,Args) -> spawn_link(?MODULE, init, [{Mod, Args}]). init({Mod,Args}) -> process_flag(trap_exit, true), loop({Mod,start_link,Args}). loop({M,F,A}) -> Pid = apply(M,F,A), receive {'EXIT', _From, shutdown} -> exit(shutdown); % will kill the child too {'EXIT', Pid, Reason} -> io:format("Process ~p exited for reason ~p~n",[Pid,Reason]), loop({M,F,A}) end.
这与“重启器”有些类似,虽然这个更通用一些。它可以接受任何模块,只要它具有 start_link
函数。它将无限期地重启它监视的进程,除非主管本身被终止信号终止。以下是在使用中的情况
1> c(evserv), c(sup). {ok,sup} 2> SupPid = sup:start(evserv, []). <0.43.0> 3> whereis(evserv). <0.44.0> 4> exit(whereis(evserv), die). true Process <0.44.0> exited for reason die 5> exit(whereis(evserv), die). Process <0.48.0> exited for reason die true 6> exit(SupPid, shutdown). true 7> whereis(evserv). undefined
如您所见,杀死主管也会杀死它的子进程。
注意:我们将在有关 OTP 主管的章节中看到更高级和更灵活的主管。当人们提到监督树时,他们所指的是那些主管。这里展示的主管只是最基本的形式,与真正的主管相比,它并不完全适合生产环境。
命名空间(或缺乏命名空间)
由于 Erlang 具有扁平的模块结构(没有层次结构),因此一些应用程序经常发生冲突。一个例子是常用的 user
模块,几乎每个项目都至少尝试定义一次。这与 Erlang 附带的 user
模块发生冲突。您可以使用函数 code:clash/0
测试任何冲突。
因此,常见的模式是用项目的名称为每个模块名添加前缀。在本例中,我们的提醒应用程序的模块应重命名为 reminder_evserv
、reminder_sup
和 reminder_event
。
然后,一些程序员决定添加一个以应用程序本身命名的模块,该模块包装了程序员在使用自己的应用程序时可以使用的一些通用调用。示例调用可以是诸如使用主管启动应用程序、订阅服务器、添加和取消事件等函数。
了解其他命名空间也很重要,例如不能冲突的注册名称、数据库表等等。
这几乎是关于非常基本的并发 Erlang 应用程序的全部内容。这个例子表明我们可以在不费力的情况下拥有大量并发进程:主管、客户端、服务器、用作计时器的进程(我们可以拥有数千个这样的进程)等等。无需同步它们,无需锁,无需真正的主循环。消息传递使我们的应用程序轻松地划分为几个具有独立关注点和任务的模块。
现在可以使用 evserv.erl 中的基本调用来构建客户端,这些客户端允许从 Erlang VM 外部的某个地方与事件服务器进行交互,并使程序真正有用。
但是,在此之前,我建议您阅读有关 OTP 框架的知识。接下来的几章将介绍它的一些构建块,这些构建块将允许创建更强大、更优雅的应用程序。Erlang 力量的很大一部分来自使用它。它是一个精心设计且经过精心设计的工具,任何自尊的 Erlang 程序员都必须了解它。