更多关于多进程

声明你的状态

a roasted turkey leg

上一章中展示的例子,对于演示目的来说都很好,但是你仅仅依靠这些工具,不会走得太远。并不是说这些例子不好,而是说,如果进程和演员只是带消息的函数,那么它们没有太大的优势。为了解决这个问题,我们必须能够在进程中保存状态。

让我们先在一个新的 kitchen.erl 模块中创建一个函数,让进程像冰箱一样工作。该进程将允许两种操作:将食物存入冰箱和从冰箱中取出食物。只有事先存放的食物才能取出。以下函数可以作为我们进程的基础

-module(kitchen).
-compile(export_all).

fridge1() ->
    receive
        {From, {store, _Food}} ->
            From ! {self(), ok},
            fridge1();
        {From, {take, _Food}} ->
            %% uh....
            From ! {self(), not_found},
            fridge1();
        terminate ->
            ok
    end.

它出了问题。当我们要求存储食物时,进程应该回复 ok,但实际上并没有存储食物;fridge1() 被调用,然后该函数从头开始,没有状态。你也可以看到,当我们调用进程从冰箱中取食物时,没有状态可供取出,所以唯一能回复的就是 not_found。为了存储和取出食物,我们需要在函数中添加状态。

借助递归,进程的状态可以完全保存在函数的参数中。就我们的冰箱进程而言,一种可能性是将所有食物存储为一个列表,然后当有人需要吃东西的时候再从列表中查找

fridge2(FoodList) ->
    receive
        {From, {store, Food}} ->
            From ! {self(), ok},
            fridge2([Food|FoodList]);
        {From, {take, Food}} ->
            case lists:member(Food, FoodList) of
                true ->
                    From ! {self(), {ok, Food}},
                    fridge2(lists:delete(Food, FoodList));
                false ->
                    From ! {self(), not_found},
                    fridge2(FoodList)
            end;
        terminate ->
            ok
    end.

首先要注意的是,fridge2/1 接受一个参数,FoodList。你可以看到,当我们发送一条与 {From, {store, Food}} 匹配的消息时,该函数将在继续执行之前将 Food 添加到 FoodList 中。完成递归调用后,就可以检索同一项了。事实上,我已经在那里实现了它。该函数使用 lists:member/2 来检查 Food 是否是 FoodList 的一部分。根据结果,该项会被发送回调用进程(并从 FoodList 中删除),或者发送回 not_found

1> c(kitchen).
{ok,kitchen}
2> Pid = spawn(kitchen, fridge2, [[baking_soda]]).
<0.51.0>
3> Pid ! {self(), {store, milk}}.
{<0.33.0>,{store,milk}}
4> flush().
Shell got {<0.51.0>,ok}
ok

在冰箱中存放物品似乎可以正常工作。我们会尝试更多的东西,然后尝试从冰箱中取出。

5> Pid ! {self(), {store, bacon}}.
{<0.33.0>,{store,bacon}}
6> Pid ! {self(), {take, bacon}}.
{<0.33.0>,{take,bacon}}
7> Pid ! {self(), {take, turkey}}.
{<0.33.0>,{take,turkey}}
8> flush().
Shell got {<0.51.0>,ok}
Shell got {<0.51.0>,{ok,bacon}}
Shell got {<0.51.0>,not_found}
ok

正如预期的那样,我们可以从冰箱中取出培根,因为我们首先放入了它(还有牛奶和小苏打),但当我们要求一些火鸡时,冰箱进程没有找到。这就是为什么我们得到最后一条 {<0.51.0>,not_found} 消息的原因。

我们喜欢消息,但我们对它们保密

在前面的例子中,一件令人讨厌的事情是,使用冰箱的程序员必须知道为该进程设计的协议。这是不必要的负担。一个好的解决办法是使用处理接收和发送消息的函数来抽象消息

store(Pid, Food) ->
    Pid ! {self(), {store, Food}},
    receive
        {Pid, Msg} -> Msg
    end.

take(Pid, Food) ->
    Pid ! {self(), {take, Food}},
    receive
        {Pid, Msg} -> Msg
    end.

现在与进程的交互更加简洁

9> c(kitchen).
{ok,kitchen}
10> f().
ok
11> Pid = spawn(kitchen, fridge2, [[baking_soda]]).
<0.73.0>
12> kitchen:store(Pid, water).
ok
13> kitchen:take(Pid, water).
{ok,water}
14> kitchen:take(Pid, juice).
not_found

我们不再需要关心消息是如何工作的,也不需要关心是否发送 self() 或者像 takestore 这样的精确原子:只需要一个 pid 以及知道要调用哪些函数。这隐藏了所有脏工作,并使冰箱进程更容易构建。

剩下的一件事是要隐藏掉需要生成进程的整个部分。我们处理了隐藏消息,但我们仍然期望用户处理进程的创建。我将添加以下 start/1 函数

start(FoodList) ->
    spawn(?MODULE, fridge2, [FoodList]).
Two tin cans with a string, where the tin cans somehow represent the abstraction layer between the vibrating string and the voice

在这里,?MODULE 是一个返回当前模块名称的宏。看起来好像编写这样的函数没有任何优势,但实际上确实有一些。它最主要的部分是与调用 take/2store/2 保持一致:关于冰箱进程的一切现在都由 kitchen 模块处理。如果你要在冰箱进程启动时添加日志记录,或者启动第二个进程(例如冷冻柜),那么在我们的 start/1 函数中很容易实现。但是,如果让用户通过 spawn/3 来完成生成,那么每个启动冰箱的地方都需要添加新的调用。这容易出错,而且错误很糟糕。

让我们看看这个函数是如何使用的

15> f().
ok
16> c(kitchen).
{ok,kitchen}
17> Pid = kitchen:start([rhubarb, dog, hotdog]).
<0.84.0>
18> kitchen:take(Pid, dog).
{ok,dog}
19> kitchen:take(Pid, dog).
not_found

耶!狗狗从冰箱里跑出来了,我们的抽象也完成了!

超时

让我们尝试使用 pid(A,B,C) 命令做一些事情,它让我们可以将 3 个整数 ABC 转换为一个 pid。在这里,我们将故意给 kitchen:take/2 提供一个假的 pid

20> kitchen:take(pid(0,250,0), dog).

哎呀。shell 冻结了。这是因为 take/2 的实现方式。为了理解发生了什么,让我们先回顾一下正常情况下会发生什么

  1. 一条取食物的消息从你(shell)发送到冰箱进程;
  2. 你的进程切换到接收模式并等待新消息;
  3. 冰箱取出物品并发送到你的进程;
  4. 你的进程接收它,并继续执行它的任务。
Hourglass

以下是 shell 冻结时发生的事情

  1. 一条取食物的消息从你(shell)发送到一个未知进程;
  2. 你的进程切换到接收模式并等待新消息;
  3. 该未知进程要么不存在,要么不希望收到这样的消息,因此不会对它进行任何处理;
  4. 你的 shell 进程卡在接收模式中。

这很烦人,尤其是在这里无法进行错误处理的情况下。没有任何非法操作发生,程序只是在等待。一般来说,任何与异步操作有关的东西(Erlang 中的消息传递就是异步操作)都需要一种方法,在一定时间内没有收到数据迹象后就放弃。当页面或图片加载时间过长时,网络浏览器会这样做,当你打电话时有人迟迟不接,或者在会议中迟到时,你也会这样做。Erlang 当然也为此提供了一种合适的机制,它就是 receive 结构的一部分

receive
    Match -> Expression1
after Delay ->
    Expression2
end.

receiveafter 之间的部分与我们已知的完全相同。如果在没有接收到与 Match 模式匹配的消息的情况下,已经花费了与 Delay(表示毫秒的整数)一样多的时间,则 after 部分将被触发。发生这种情况时,将执行 Expression2

我们将编写两个新的接口函数,store2/2take2/2,它们的行为与 store/2take/2 完全相同,只是它们会在 3 秒后停止等待

store2(Pid, Food) ->
    Pid ! {self(), {store, Food}},
    receive
        {Pid, Msg} -> Msg
    after 3000 ->
        timeout
    end.

take2(Pid, Food) ->
    Pid ! {self(), {take, Food}},
    receive
        {Pid, Msg} -> Msg
    after 3000 ->
        timeout
    end.

现在,你可以使用 ^G 解冻 shell,并尝试新的接口函数

User switch command
 --> k 
 --> s
 --> c
Eshell V5.7.5  (abort with ^G)
1> c(kitchen).
{ok,kitchen}
2> kitchen:take2(pid(0,250,0), dog).
timeout

现在它可以正常工作了。

注意:我说 after 只接受毫秒作为值,但实际上它可以接受原子 infinity。虽然这在很多情况下都没有用(你可能干脆删除 after 子句),但在某些情况下,当程序员可以将等待时间提交给一个需要接收结果的函数时,它还是有用的。这样,如果程序员真的想永远等待,他就可以。

除了在时间过长后放弃之外,这种计时器还有其他用途。一个非常简单的例子就是我们之前使用过的 timer:sleep/1 函数是如何工作的。以下是它的实现方式(让我们把它放到一个新的 multiproc.erl 模块中)

sleep(T) ->
    receive
    after T -> ok
    end.

在这个特定情况下,由于没有模式,因此永远不会在 receive 结构的 receive 部分匹配到任何消息。相反,当延迟 T 过去后,after 部分的结构将被调用。

另一种特殊情况是超时为 0

flush() ->
    receive
        _ -> flush()
    after 0 ->
        ok
    end.

当这种情况发生时,Erlang VM 将尝试查找一个与可用模式之一匹配的消息。在上面的例子中,任何东西都匹配。只要有消息,flush/0 函数就会递归调用自身,直到邮箱为空。完成后,将执行代码的 after 0 -> ok 部分,函数返回。

选择性接收

这种“清空”概念使实现选择性接收成为可能,该接收可以通过嵌套调用来优先处理你收到的消息

important() ->
    receive
        {Priority, Message} when Priority > 10 ->
            [Message | important()]
    after 0 ->
        normal()
    end.

normal() ->
    receive
        {_, Message} ->
            [Message | normal()]
    after 0 ->
        []
    end.

这个函数将构建一个包含所有消息的列表,优先级高于 10 的消息排在前面

1> c(multiproc).
{ok,multiproc}
2> self() ! {15, high}, self() ! {7, low}, self() ! {1, low}, self() ! {17, high}.       
{17,high}
3> multiproc:important().
[high,high,low,low]

因为我使用了 after 0 部分,所以每个消息都会被获取,直到没有消息为止,但该进程会尝试在考虑其他消息之前,获取所有优先级高于 10 的消息,这些消息在 normal/0 调用中被累积。

如果你对这种做法感兴趣,请注意,由于 Erlang 中选择性接收的工作方式,它有时是不安全的。

当消息被发送到进程时,它们会被存储在邮箱中,直到进程读取它们并与那里的模式匹配。正如在 上一章 中所说,这些消息是按照接收顺序存储的。这意味着每次你匹配一个消息时,它都会从最旧的消息开始。

然后,最旧的消息会与 receive 中的每个模式进行比较,直到其中一个模式匹配。当它匹配时,该消息将从邮箱中删除,并且进程的代码将正常执行,直到下一个 receive。当下一个 receive 被评估时,VM 会寻找目前在邮箱中最旧的消息(我们删除的消息之后的那个消息),以此类推。

Visual explanation of how message matching is done when a message from the mailbox does match

当没有办法匹配给定的消息时,它会被放到一个保存队列中,然后尝试下一条消息。如果第二条消息匹配,则第一条消息将被放回邮箱顶部,以便稍后重新尝试。

Visual explanation of how messages that won't match are moved back and forth from the mailbox to a save queue

这让你只需要关心有用的消息。忽略一些消息,以便稍后以上面描述的方式处理它们,这就是选择性接收的本质。虽然它们很有用,但问题是,如果你的进程有很多你并不关心的消息,那么读取有用消息实际上会越来越慢(而且进程的大小也会越来越大)。

在上图中,想象一下我们想要第 367 条消息,但前 366 条消息都是我们的代码忽略的垃圾信息。为了得到第 367 条消息,进程需要先尝试匹配前 366 条消息。完成匹配并将其全部放入队列后,将取出第 367 条消息,并将前 366 条消息放回邮箱顶部。下一条有用消息可能埋藏得更深,需要更长的时间才能找到。

这种类型的接收是 Erlang 中性能问题的一个常见原因。如果你的应用程序运行缓慢,并且你意识到有很多消息在四处传递,那么这可能是原因所在。

如果这种选择性接收确实导致你的代码大幅减速,那么首先要问问自己,为什么你收到了你不需要的消息。这些消息是否发送到正确的进程?模式是否正确?消息格式是否不正确?你是否在一个应该有多个进程的地方使用了一个进程?回答一个或多个这些问题可以解决你的问题。

由于存在无用消息污染进程邮箱的风险,Erlang 程序员有时会采取防御措施来防止此类事件发生。一种标准方法可能如下所示

receive
    Pattern1 -> Expression1;
    Pattern2 -> Expression2;
    Pattern3 -> Expression3;
    ...
    PatternN -> ExpressionN;
    Unexpected ->
        io:format("unexpected message ~p~n", [Unexpected])
end.

这样做可以确保任何消息至少匹配一个子句。`Unexpected` 变量将匹配任何内容,将意外消息从邮箱中取出并显示警告。根据你的应用程序,你可能想要将消息存储到某种日志记录设施中,以便以后查找有关它的信息:如果消息发送到错误的进程,那么永久丢失它们并难以查找为什么另一个进程没有收到它应该收到的内容将是一件很糟糕的事情。

如果你确实需要在消息中使用优先级,并且不能使用这种万能子句,那么更明智的做法是实现一个 最小堆 或使用 gb_trees 模块并将每个接收到的消息都放入其中(确保将优先级号放在键的开头,以便它用于对消息进行排序)。然后你就可以根据需要搜索数据结构中 最小最大 元素。

在大多数情况下,这种技术应该让你比选择性接收更有效地接收优先级更高的消息。然而,如果接收到的消息大部分具有最高的优先级,那么它可能会减慢你的速度。和往常一样,诀窍是在优化之前进行分析和测量。

注意: 从 R14A 开始,Erlang 编译器中添加了一个新的优化。它简化了进程之间来回通信的非常具体的用例中的选择性接收。例如 multiproc.erl 中的 optimized/1 函数。

为了使其工作,必须在函数中创建一个引用 (make_ref()),然后将其发送到消息中。在同一个函数中,然后进行选择性接收。如果没有消息可以匹配,除非它包含相同的引用,编译器会自动确保 VM 将跳过在创建该引用之前接收到的消息。

请注意,你不应该尝试将代码强迫适应这种优化。Erlang 开发人员只寻找经常使用的模式,然后使它们更快。如果你编写了惯用的代码,优化应该会自动出现。而不是相反。

理解了这些概念后,下一步将是对多个进程进行错误处理。