套接字桶

A 'hello' enters a tin can and exits from a bucket

到目前为止,我们已经对 Erlang 本身进行了一些有趣的处理,几乎没有与外部世界进行通信,如果只是通过我们在这里和那里阅读的文本文件。虽然很多与自身的关系可能很有趣,但现在是走出我们的巢穴,开始与世界其他地方交谈的时候了。

本章将涵盖使用套接字的三个组成部分:IO 列表、UDP 套接字和 TCP 套接字。IO 列表作为一个主题并不复杂。它们只是一种有效地构建要通过套接字和其他 Erlang 驱动程序发送的字符串的巧妙方法。

IO 列表

我在本指南的前面提到过,对于文本,我们可以使用字符串(整数列表)或二进制文件(包含数据的二进制数据结构)。将“Hello World”等内容发送到网络上可以像这样完成,作为一个字符串:"Hello World",以及作为一个二进制文件:<<"Hello World">>。类似的表示法,类似的结果。

区别在于你如何组装事物。字符串有点像整数的链表:对于每个字符,你都必须存储字符本身以及指向列表其余部分的链接。此外,如果你想在列表中添加元素,无论是在中间还是在末尾,你都必须遍历整个列表直到要修改的位置,然后添加你的元素。然而,当你添加元素到列表开头时,情况并非如此。

A = [a]
B = [b|A] = [b,a]
C = [c|B] = [c,b,a]

在添加元素到列表开头的情况下,如上所述,无论AB还是C中包含什么,都不需要重写。C 的表示可以看作是 [c,b,a][c|B][c,|[b|[a]]],等等。在最后一种情况下,你可以看到A的形状在列表末尾与它被声明时相同。同样适用于B。以下是添加元素到列表末尾时的样子

A = [a]
B = A ++ [b] = [a] ++ [b] = [a|[b]]
C = B ++ [c] = [a|[b]] ++ [c] = [a|[b|[c]]]

你看到所有的重写了吗?当我们创建B时,我们必须重写A。当我们写C时,我们必须重写B(包括它包含的[a|...]部分)。如果我们要以类似的方式添加D,我们将需要重写C。在长字符串中,这变得过于低效,并且会产生大量垃圾,需要 Erlang VM 来清理。

使用二进制文件,情况并不完全相同

A = <<"a">>
B = <<A/binary, "b">> = <<"ab">>
C = <<B/binary, "c">> = <<"abc">>

在这种情况下,二进制文件知道它们自己的长度,数据可以在常数时间内连接。很好,比列表好多了。它们也更紧凑。出于这些原因,我们将来在使用文本时通常会尝试坚持使用二进制文件。

然而,也有一些缺点。二进制文件旨在以特定方式处理事物,修改二进制文件、拆分二进制文件等仍然需要付出一定的代价。此外,有时我们会使用使用字符串、二进制文件和单个字符可互换的代码。在类型之间不断转换将是一件很麻烦的事。

在这种情况下,IO 列表是我们的救星。IO 列表是一种奇怪的数据结构。它们是字节(从 0 到 255 的整数)、二进制文件或其他 IO 列表的列表。这意味着接受 IO 列表的函数可以接受诸如 [$H, $e, [$l, <<"lo">>, " "], [[["W","o"], <<"rl">>]] | [<<"d">>]] 之类的项。当发生这种情况时,Erlang VM 将在需要时扁平化列表,以获取字符序列 Hello World

哪些函数接受这些 IO 列表?大多数与输出数据相关的函数都接受。来自io 模块、file 模块、TCP 和 UDP 套接字的任何函数都能够处理它们。一些库函数,比如来自unicode 模块的某些函数以及来自re(用于正则表达式)模块的所有函数也将处理它们,仅举几例。

尝试使用io:format("~s~n", [IoList]) 在 shell 中运行前面的Hello World IO 列表,看看结果。它应该没有问题地运行。

A guido with an RJ-45 connection head

总的来说,它们是一种非常巧妙的构建字符串的方法,可以避免在动态构建要输出的内容时,不可变数据结构带来的问题。

TCP 和 UDP:协议

我们可以在 Erlang 中使用的第一种套接字基于UDP 协议。UDP 是构建在 IP 层之上的协议,它在 IP 层之上提供了一些抽象,例如端口号。UDP 被称为无状态协议。从 UDP 端口接收的数据被分成小块,没有标记,没有会话,并且不能保证你收到的片段与发送时的顺序相同。事实上,并不能保证如果有人发送了数据包,你是否能接收到它。出于这些原因,人们倾向于在数据包很小、偶尔丢失不会造成太大影响、没有太多复杂的交换发生或绝对需要低延迟时使用 UDP。

这与像TCP这样的有状态协议形成对比,在有状态协议中,协议负责处理丢失的数据包,重新排序它们,维护多个发送方和接收方之间的独立会话等等。TCP 允许可靠地交换信息,但设置起来可能会更慢、更繁重。UDP 会更快,但可靠性较低。根据你的需要仔细选择。

无论如何,在 Erlang 中使用 UDP 相对简单。我们在给定的端口上设置一个套接字,这个套接字既可以发送数据也可以接收数据

Diagram showing a Host A that has ports A, B and C, which can all send and receive packets to other hosts

打个不太恰当的比方,这就像在你家里有一堆邮箱(每个邮箱就是一个端口),每个邮箱里都收到了一些小纸条,上面写着一些小信息。它们可以包含任何内容,从“我喜欢你穿这条裤子的样子”到“纸条是从房子里出来的!”。当一些信息对于纸条来说太大时,它们中的许多就会被丢进邮箱。你需要负责将它们重新组装成有意义的方式,然后开车到某个房子,然后把纸条扔到里面作为回复。如果信息纯粹是通知性的(“嘿,你的门没锁”)或非常小(“你穿什么了?- Ron”),那就没问题,你可以使用一个邮箱来进行所有查询。但是,如果信息很复杂,我们可能希望每个会话使用一个端口,对吧?哎呀,不!使用 TCP!

对于 TCP,协议被称为有状态的、连接型的。在能够发送信息之前,你必须进行握手。这意味着有人要拿一个邮箱(类似于我们在 UDP 类比中的做法),然后发送一条信息说“嘿,伙计,这是 IP 94.25.12.37 正在呼叫。想聊天吗?”,你会以类似“当然。用数字 N 标记你的信息,然后在每个信息后面加上一个递增的数字”的方式回复。从那时起,当你或 IP 92.25.12.37 想互相通信时,就能对纸条进行排序,请求缺少的纸条,回复纸条等等,以有意义的方式进行。

这样,我们可以使用一个邮箱(或端口)来进行所有通信。这就是 TCP 的妙处。它增加了一些开销,但确保一切都按顺序排列、正确交付等等。

如果你不喜欢这些类比,不要绝望,因为我们马上就要看到如何使用 Erlang 的 TCP 和 UDP 套接字。这应该更简单。

UDP 套接字

使用 UDP 只有几个基本操作:设置套接字、发送信息、接收信息和关闭连接。可能性有点像这样

A graph showing that Opening a socket can lead to 3 options: sending data, receiving data, or closing a socket. Sending can lead to receiving data or closing a socket, receiving data can lead to sending data or closing a socket. Finally, closing a socket does nothing

无论如何,第一个操作是打开一个套接字。这可以通过调用gen_udp:open/1-2 来完成。最简单的形式是通过调用{ok, Socket} = gen_udp:open(PortNumber) 来完成。

端口号将是 1 到 65535 之间的任何整数。从 0 到 1023,这些端口被称为系统端口。大多数情况下,除非你有管理员权限,否则你的操作系统会阻止你监听系统端口。从 1024 到 49151 的端口是注册端口。它们通常不需要任何权限,可以随意使用,尽管其中一些端口已注册到众所周知的服务。然后,其余的端口被称为动态私有端口。它们经常用于临时端口。为了进行测试,我们将使用一些相对安全的端口号,例如8789,不太可能被占用。

但在那之前,gen_udp:open/2 呢?第二个参数可以是一个选项列表,指定我们想以什么类型接收数据(listbinary),以及我们想如何接收数据:作为消息({active, true})还是作为函数调用的结果({active, false})。还有更多选项,例如套接字是否应该设置为 IPv4(inet4)或 IPv6(inet6),UDP 套接字是否可以用于广播信息({broadcast, true | false}),缓冲区的大小等等。还有更多可用的选项,但我们现在只关注简单的东西,因为理解其他东西就需要你自己去学习了。不幸的是,本指南是关于 Erlang 的,而不是关于 TCP 和 UDP 的。

所以让我们打开一个套接字。首先启动一个给定的 Erlang shell

1> {ok, Socket} = gen_udp:open(8789, [binary, {active,true}]). 
{ok,#Port<0.676>}
2> gen_udp:open(8789, [binary, {active,true}]).
{error,eaddrinuse}

在第一个命令中,我打开了套接字,命令它返回二进制数据,并且我想要它处于活动状态。你可以看到一个新的数据结构被返回:#Port<0.676>。这是我们刚刚打开的套接字的表示形式。它们的使用方式与 Pids 很相似:你甚至可以为它们设置链接,以便在发生崩溃时,错误会传播到套接字!第二个函数调用尝试再次打开同一个套接字,这是不可能的。这就是为什么返回{error, eaddrinuse}。幸运的是,第一个Socket 套接字仍然是打开的。

无论如何,我们将启动第二个 Erlang shell。在第二个 shell 中,我们将打开第二个 UDP 套接字,使用不同的端口号

1> {ok, Socket} = gen_udp:open(8790).
{ok,#Port<0.587>}
2> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

啊,一个新的函数!在第二个调用中,使用gen_udp:send/4 来发送消息(真是一个非常描述性的名字)。参数的顺序如下:gen_udp:send(OwnSocket, RemoteAddress, RemotePort, Message)RemoteAddress 可以是一个字符串或一个包含域名(“example.org”)的原子,一个描述 IPv4 地址的 4 元组或一个描述 IPv6 地址的 8 元组。然后我们指定接收方的端口号(我们将把纸条放到哪个邮箱中?),然后是信息,它可以是一个字符串、一个二进制文件或一个 IO 列表。

信息是否真的被发送了?回到你的第一个 shell,尝试刷新数据

3> flush().
Shell got {udp,#Port<0.676>,{127,0,0,1},8790,<<"hey there!">>}
ok

太棒了。打开套接字的进程将接收以下格式的信息:{udp, Socket, FromIp, FromPort, Message}。使用这些字段,我们就能知道信息来自哪里,它通过哪个套接字,以及它的内容是什么。所以我们已经涵盖了打开套接字、发送数据和以活动模式接收数据。被动模式呢?为此,我们需要从第一个 shell 关闭套接字并打开一个新的套接字

4> gen_udp:close(Socket).
ok
5> f(Socket).
ok
6> {ok, Socket} = gen_udp:open(8789, [binary, {active,false}]).
{ok,#Port<0.683>}

因此,在这里,我们关闭了套接字,取消绑定Socket 变量,然后在我们再次打开套接字时绑定它,这次是处于被动模式。在发送回复之前,尝试以下操作

7> gen_udp:recv(Socket, 0).

你的 shell 应该卡住了。这里使用的函数是 recv/2。这个函数用于轮询被动套接字以获取消息。这里 0 是我们想要的消息长度。有趣的是,对于 gen_udp,长度完全被忽略了。gen_tcp 有一个类似的函数,在这种情况下,它确实有影响。无论如何,如果我们从未发送消息,recv/2 永远不会返回。回到第二个 shell 并发送一条新消息

3> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

然后第一个 shell 应该打印 {ok,{{127,0,0,1},8790,<<"hey there!">>}} 作为返回值。如果你不想永远等待怎么办?只需添加一个超时值

8> gen_udp:recv(Socket, 0, 2000).
{error,timeout}

这就是 UDP 的大部分内容。不,真的!

TCP 套接字

虽然 TCP 套接字与 UDP 套接字共享大部分接口,但它们的工作方式有一些重要的区别。最大的区别是,客户端和服务器是完全不同的东西。客户端将以以下操作执行

A diagram similar to the UDP one: connection leads to send and receive, which both send to each other. More over, all states can then lead to the closed state

而服务器将遵循以下模式

Diagram similar to the UDP one, although a listen state is added before the whole thing. That state can either move on to the 'accept' state (similar to 'open socket' for the possible branches) or to a close state.

看起来很奇怪,对吧?客户端的行为有点像我们在 gen_udp 中做的事情:你连接到一个端口,发送和接收,然后停止。然而,在服务时,我们有一种新的模式:监听。这是因为 TCP 如何设置会话。

首先,我们打开一个新的 shell 并使用 gen_tcp:listen(Port, Options) 启动一个称为监听套接字的东西

1> {ok, ListenSocket} = gen_tcp:listen(8091, [{active,true}, binary]).
{ok,#Port<0.661>}

监听套接字只负责等待连接请求。你可以看到我使用了与 gen_udp 相似的选项。这是因为大多数选项对于所有 IP 套接字都将是类似的。TCP 选项确实有一些更具体的选项,包括连接积压 ({backlog, N})、保持活动套接字 ({keepalive, true | false})、数据包打包 ({packet, N},其中 N 是要剥离和解析的每个数据包的标头长度) 等等。

一旦监听套接字打开,任何进程(多个进程)都可以获取监听套接字并进入“接受”状态,一直锁定直到某个客户端请求与其通信

2> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket, 2000).
** exception error: no match of right hand side value {error,timeout}
3> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).
** exception error: no match of right hand side value {error,closed}

该死。我们超时了,然后崩溃了。当与之关联的 shell 进程消失时,监听套接字被关闭了。让我们重新开始,这次不使用 2 秒(2000 毫秒)超时

4> f().
ok
5> {ok, ListenSocket} = gen_tcp:listen(8091, [{active, true}, binary]).
{ok,#Port<0.728>}
6> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).

然后该进程被锁定。太好了!让我们打开第二个 shell

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8091, [binary, {active,true}]). 
{ok,#Port<0.596>}

这个仍然使用与往常相同的选项,如果你不想永远等待,可以在最后一个位置添加 Timeout 参数。如果你回顾第一个 shell,它应该返回 {ok, SocketNumber}。从那时起,接受套接字和客户端套接字就可以一对一地通信,类似于 gen_udp。使用第二个 shell 向第一个 shell 发送消息

3> gen_tcp:send(Socket, "Hey there first shell!").
ok

从第一个 shell

7> flush().
Shell got {tcp,#Port<0.729>,<<"Hey there first shell!">>}
ok

两个套接字都可以以相同的方式发送消息,然后可以使用 gen_tcp:close(Socket) 关闭它们。请注意,关闭接受套接字将仅关闭该套接字,而关闭监听套接字将不会关闭任何相关的已建立接受套接字,但会通过返回 {error, closed} 来中断当前正在运行的接受调用。

这就是 Erlang 中 TCP 套接字的大部分内容!但这真的是吗?

啊,是的,当然,还有更多事情可以做。如果你自己尝试过一些套接字,你可能已经注意到套接字有一种所有权。

我的意思是,UDP 套接字、TCP 客户端套接字和 TCP 接受套接字都可以通过任何现有的进程发送消息,但接收到的消息只能由启动套接字的进程读取

A diagram that shows that all processes can send to a socket, but only the owner can receive messages

现在这很不实用,不是吗?这意味着我们必须始终保持所有者进程处于活动状态以中继消息,即使它与我们的需求无关。能够做这样的事情不是很好吗?

    1.  Process A starts a socket
    2.  Process A sends a request
    3.  Process A spawns process B
        with a socket
    4a. Gives ownership of the      4b. Process B handles the request
        socket to Process B
    5a. Process A sends a request   5b. Process B Keeps handling
                                        the request
    6a. Process A spawns process C  6b. ...
        with a socket
        ...

在这里,A 将负责运行一堆查询,但每个新进程将负责等待回复、处理回复等等。因此,对于 A 来说,委托一个新进程来运行该任务将很聪明。这里棘手的部分是放弃套接字的所有权。

这就是诀窍。gen_tcpgen_udp 都包含一个名为 controlling_process(Socket, Pid) 的函数。此函数必须由当前套接字所有者调用。然后,该进程告诉 Erlang“你知道吗?让这个 Pid 小伙子接管我的套接字。我放弃了”。从现在起,函数中的 Pid 就是可以从套接字读取和接收消息的人。就是这样。

使用 Inet 进行更多控制

所以现在我们理解了如何打开套接字、通过套接字发送消息、更改所有权等等。我们也知道如何在被动模式和主动模式下监听消息。在 UDP 示例中,当我想从主动模式切换到被动模式时,我重新启动了套接字,刷新了变量,然后继续。这很不实用,特别是当我们希望在使用 TCP 时执行相同的操作时,因为我们必须中断活动会话。

幸运的是,有一个名为 inet 的模块负责处理对 gen_tcpgen_udp 套接字都可能通用的所有操作。对于我们手头的这个问题,即在主动模式和被动模式之间切换,有一个名为 inet:setopts(Socket, Options) 的函数。选项列表可以包含在套接字设置时使用的任何项。

注意:小心!存在一个名为 inet 的模块和一个名为 inets 的模块。inet 是我们这里需要的模块。inets 是一个 OTP 应用程序,它包含许多预先编写的服务和服务器(包括 FTP、简单 FTP (TFTP)、HTTP 等等)。

区分它们的一个简单技巧是 inets 是关于构建在 inet 之上的services 的,或者如果你愿意,是 inet + s(ervices)。

启动一个 shell 作为 TCP 服务器

1> {ok, Listen} = gen_tcp:listen(8088, [{active,false}]).
{ok,#Port<0.597>}
2> {ok, Accept} = gen_tcp:accept(Listen).

在第二个 shell 中

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8088, []).
{ok,#Port<0.596>}
2> gen_tcp:send(Socket, "hey there").
ok

然后回到第一个 shell,套接字应该已被接受。我们刷新一下看看是否获得了任何东西

3> flush().
ok

当然没有,我们处于被动模式。让我们修复它

4> inet:setopts(Accept, [{active, true}]).
ok
5> flush().
Shell got {tcp,#Port<0.598>,"hey there"}
ok

是的!通过完全控制主动和被动套接字,我们拥有了力量。我们如何选择主动模式和被动模式?

A stop sign

嗯,有很多点。总的来说,如果你要立即等待消息,被动模式会快得多。Erlang 不必玩弄你的进程的邮箱来处理事情,你也不必扫描邮箱,获取消息等等。使用 recv 会更高效。但是,recv 会将你的进程从事件驱动型改为主动轮询型——如果你必须在套接字和一些其他 Erlang 代码之间充当中间人,这可能会让事情变得有点复杂。

在这种情况下,切换到主动模式将是一个好主意。如果数据包以消息形式发送,你只需在接收(或 gen_serverhandle_info 函数)中等待,然后玩弄消息。除了速度之外,这种方法的缺点与速率限制有关。

这个想法是,如果来自外部世界的所有数据包都被 Erlang 无条件地接受并转换为消息,那么 VM 外部的某个人很容易将其淹没并杀死它。被动模式的优点在于限制了消息何时以及如何放入 Erlang VM,并将阻塞、排队和丢弃消息的任务委托给更底层的实现。

那么,如果我们需要主动模式来实现语义,但需要被动模式来确保安全呢?我们可以尝试使用 inet:setopts/2 在被动模式和主动模式之间快速切换,但这对于竞争条件来说风险太大。相反,有一种称为“主动一次”的模式,使用 {active, once} 选项。让我们试一试,看看它是如何工作的。

保留之前带有服务器的 shell

6> inet:setopts(Accept, [{active, once}]).
ok

现在进入客户端 shell,运行另外两个 send/2 调用

3> gen_tcp:send(Socket, "one").
ok
4> gen_tcp:send(Socket, "two").
ok

回到服务器 shell

7> flush().
Shell got {tcp,#Port<0.598>,"one"}
ok
8> flush().
ok
9> inet:setopts(Accept, [{active, once}]).
ok
10> flush().
Shell got {tcp,#Port<0.598>,"two"}
ok

看到了吗?直到我们第二次请求 {active, once} 之前,消息 "two" 还没有被转换为消息,这意味着套接字已恢复到被动模式。因此,主动一次模式允许我们以安全的方式在主动模式和被动模式之间进行这种来回切换。很好的语义,加上安全性。

inet 中还有其他一些不错的函数。一些用于读取统计信息、获取当前主机信息、检查套接字等等的函数。

好吧,这就是套接字的大部分内容。现在是时候将其付诸实践了。

注意:在互联网的荒野中,你可以使用库来使用大量协议来执行此操作:HTTP、0mq、原始 Unix 套接字等等。它们都可用。然而,标准的 Erlang 分发版附带两个主要选项,TCP 和 UDP 套接字。它还附带一些 HTTP 服务器和解析代码,但它并不是最有效的东西。

更新
从 17.0 版开始,现在可以告诉一个端口在 N 个数据包期间保持活动状态。{active, N} 选项已添加到 TCP 和 UDP 端口,其中 N 可以是 0 到 32767 之间的任何值。一旦剩余消息计数器达到 0 或通过 inet:setopts/2 明确设置为 0,套接字将过渡到被动 ({active, false}) 模式。此时,将向套接字的控制进程发送一条消息以通知它过渡。消息将是 {tcp_passive, Socket},对于 UDP 则是 {udp_passive, Socket}

当多次调用该函数时,每个新值都会加到总计数器中。使用 {active, 3} 调用该函数三次将使它最多向控制进程发送 9 条消息。N 值也可以为负数以强制递减计数器。如果最终值将低于 0,Erlang 将将其静默设置为 0 并过渡到被动模式。

A cup of coffee with cookies and a spoon. Text says 'take a break'

Sockserv,再探

在本章中,我不会介绍太多新代码。相反,我们将回顾 Process Quest 中的 sockserv 服务器,在上一章中。它是一个完全可行的服务器,我们将看到如何在 OTP 监督树中,在一个 gen_server 中处理服务 TCP 连接。

TCP 服务器的朴素实现可能看起来像这样

-module(naive_tcp).
-compile(export_all).

start_server(Port) ->
    Pid = spawn_link(fun() ->
        {ok, Listen} = gen_tcp:listen(Port, [binary, {active, false}]),
        spawn(fun() -> acceptor(Listen) end),
        timer:sleep(infinity)
    end),
    {ok, Pid}.

acceptor(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    spawn(fun() -> acceptor(ListenSocket) end),
    handle(Socket).

%% Echoing back whatever was obtained
handle(Socket) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"quit", _/binary>>} ->
            gen_tcp:close(Socket);
        {tcp, Socket, Msg} ->
            gen_tcp:send(Socket, Msg),
            handle(Socket)
    end.

为了理解它是如何工作的,一个小的图形表示可能会有所帮助

A diagram showing the first process (P1) spawning a listen socket and a first acceptor process (P2). The first acceptor can accept request, handle messages, and then spawn a new acceptor process (P3) that does the same as P2

因此,start_server 函数打开一个监听套接字,生成一个接受器,然后一直空闲。空闲是必要的,因为监听套接字绑定到打开它的进程,因此只要我们想要处理连接,该进程就需要保持活动状态。每个接受器进程都会等待连接以接受。一旦有一个连接进来,接受器进程就会启动一个新的类似进程,并将监听套接字共享给它。然后,它可以继续执行一些处理,而新家伙正在工作。每个处理程序都会重复它接收到的所有消息,直到其中一个消息以 "quit" 开头——然后连接被关闭。

注意:模式 <<"quit", _/binary>> 表示我们首先要匹配包含字符 quit 的二进制字符串,加上一些我们不关心的二进制数据 (_)。

在 Erlang shell 中通过执行 naive_tcp:start_server(8091). 来启动服务器。然后打开一个 telnet 客户端(请记住,telnet 客户端从技术上讲不适合原始 TCP,但可以作为良好的客户端来测试服务器,而无需编写一个客户端)到 localhost,你就可以看到以下事件发生

$ telnet localhost 8091
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hey there
hey there
that's what I asked
that's what I asked
stop repeating >:(
stop repeating >:(
quit doing that!
Connection closed by foreign host.

好极了。是时候开始一家名为Poople Inc. 的新公司,并使用这样的服务器发布一些社交网络了。除了模块的名称提到它是朴素的实现之外。代码很简单,但没有考虑到并行性。如果所有请求都是一个接一个地发来的,那么朴素服务器工作良好。但是,如果我们有 15 个人同时想要连接到服务器的队列怎么办?

一次只能回复一个查询,这与每个进程首先等待连接、建立连接,然后生成新的接受者有关。队列中的第 15 个请求必须等待 14 个其他连接建立才能有机会请求与我们的服务器对话的权利。如果您使用的是生产服务器,这可能更接近每秒 500 到 1000 个查询。这是不切实际的。

我们需要做的是改变我们现有的顺序工作流程

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming one after the other in a chain

变成更加并行的流程

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming under the listen operation

通过让多个接受者处于待机状态,我们将减少回答新查询的延迟。现在,我们不会再进行另一个演示实现,而是研究上一章中的 sockserv-1.0.1。探索基于真实 OTP 组件和真实世界实践的东西会更好。事实上,sockserv 的通用模式与 cowboy(尽管 cowboy 无疑比 sockserv 更可靠)和 etorrent 种子客户端等服务器中使用的模式相同。

为了构建这个 Process Quest 的 sockserv,我们将自上而下进行。我们需要的设计将是一个包含多个工作者的监督者。如果我们看一下上面的并行图,监督者应该持有监听套接字并将其共享给所有工作者,工作者负责接收信息。

我们如何编写一个可以跨所有工作者共享信息的监督者?使用常规监督无法做到这一点:所有子进程都是完全独立的,无论您使用 one_for_oneone_for_all 还是 rest_for_one 监督。一个自然的反应可能是转向全局状态:一个已注册的进程,它只持有监听套接字并将它传递给处理程序。您必须克服这种本能,要聪明一点。运用原力(以及阅读 监督者章节 的能力)。您有两分钟时间思考解决方案(两分钟的计时基于荣誉制度。自己计时)。

秘密在于使用 simple_one_for_one 监督者。因为 simple_one_for_one 监督者与其所有子进程共享子进程规范,所以我们只需要将监听套接字塞入其中,让所有子进程都能访问它!

所以这是监督者在所有荣耀中的样子

%%% The supervisor in charge of all the socket acceptors.
-module(sockserv_sup).
-behaviour(supervisor).

-export([start_link/0, start_socket/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, Port} = application:get_env(port),
    %% Set the socket into {active_once} mode.
    %% See sockserv_serv comments for more details
    {ok, ListenSocket} = gen_tcp:listen(Port, [{active,once}, {packet,line}]),
    spawn_link(fun empty_listeners/0),
    {ok, {{simple_one_for_one, 60, 3600},
         [{socket,
          {sockserv_serv, start_link, [ListenSocket]}, % pass the socket!
          temporary, 1000, worker, [sockserv_serv]}
         ]}}.

start_socket() ->
    supervisor:start_child(?MODULE, []).

%% Start with 20 listeners so that many multiple connections can
%% be started at once, without serialization. In best circumstances,
%% a process would keep the count active at all times to insure nothing
%% bad happens over time when processes get killed too much.
empty_listeners() ->
    [start_socket() || _ <- lists:seq(1,20)],
    ok.

这里到底发生了什么。标准的 start_link/0init/1 函数都在这里。您可以看到 sockserv 获取了 simple_one_for_one 重启策略,以及包含传递给所有子进程的 ListenSocket 的子进程规范。每个使用 start_socket/0 启动的子进程默认都会将其作为参数。神奇!

仅仅拥有它还不够。我们希望应用程序能够尽快地服务查询。这就是为什么我添加了对 spawn_link(fun empty_listeners/0) 的调用。empty_listeners/0 函数将启动 20 个处理程序,以便锁定并等待传入连接。我将其放在 spawn_link/1 调用中,原因很简单:监督者进程处于其 init/1 阶段,无法回答任何消息。如果我们在 init 函数内部调用自身,进程将死锁并永远不会完成运行。出于这个原因,需要一个外部进程。

注意: 在上面的代码片段中,您会注意到我对 gen_tcp 传递了选项 {packet, line}。此选项将使所有接收到的数据包被拆分为单独的行,并根据该行进行排队(行尾仍然是接收到的字符串的一部分)。这将有助于确保在我们的情况下,与 telnet 客户端更好地协同工作。但是请注意,比接收缓冲区更长的行可能会被拆分为多个数据包,因此两个数据包可能代表一行。验证接收到的内容是否以换行符结尾将让您知道该行是否结束。

所以,这就是整个棘手部分。现在我们可以专注于编写工作者本身。

如果您还记得上一章的 Process Quest 会话,事情是这样的

  1. 用户连接到服务器
  2. 服务器询问角色名称
  3. 用户发送角色名称
  4. 服务器建议属性
    1. 用户拒绝,返回到第 4 步
    2. 用户接受,转到第 6 步
  5. 游戏向玩家发送事件,直到
  6. 用户向服务器发送 quit 或套接字被强制关闭

这意味着我们的服务器进程将有两类输入:来自 Process Quest 应用程序的输入和来自用户的输入。来自用户的数据将通过套接字进行,因此将在 gen_server 的 handle_info/2 函数中处理。来自 Process Quest 的数据可以通过我们控制的方式发送,因此由 handle_cast 处理的投递在这种情况下的意义重大。首先,我们必须启动服务器

-module(sockserv_serv).
-behaviour(gen_server).

-record(state, {name, % player's name
                next, % next step, used when initializing
                socket}). % the current socket

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

首先是一个相当标准的 gen_server 回调模块。这里唯一特殊的是状态包含角色名称、套接字和一个名为 next 的字段。next 部分是一个有点像万金油的字段,用于存储与服务器状态相关的临时信息。可能不需要过多地麻烦,就可以使用 gen_fsm 来完成这个操作。

对于实际的服务器启动

-define(TIME, 800).
-define(EXP, 50).

start_link(Socket) ->
    gen_server:start_link(?MODULE, Socket, []).

init(Socket) ->
    %% properly seeding the process
    <<A:32, B:32, C:32>> = crypto:rand_bytes(12),
    random:seed({A,B,C}),
    %% Because accepting a connection is a blocking function call,
    %% we can not do it in here. Forward to the server loop!
    gen_server:cast(self(), accept),
    {ok, #state{socket=Socket}}.

%% We never need you, handle_call!
handle_call(_E, _From, State) ->
    {noreply, State}.

上面定义的两个宏(?TIME?EXP)是特殊参数,它们使得能够设置动作之间的基本延迟(800 毫秒)和达到第二级的所需经验值(50,每级翻倍)。

您会注意到 start_link/1 函数接收一个套接字。这就是从 sockserv_sup 传递的监听套接字。

关于随机种子的第一部分是为了确保进程被正确地播种,以便在以后生成角色统计信息。否则,许多进程将使用某个默认值,我们不希望这样。我们为什么在 init/1 函数中初始化而不是在使用随机数的任何库中初始化,是因为种子存储在进程级别(该死的!可变状态!),我们不希望在每次库调用时都设置新的种子。

无论如何,那里真正重要的是我们向自己投递了一条消息。这样做的原因是 gen_tcp:accept/1-2 是一个阻塞操作,再加上所有 init 函数都是同步的。如果我们等待 30 秒来接受连接,启动该进程的监督者也将被锁定 30 秒。所以,我们向自己投递了一条消息,然后将监听套接字添加到状态的 socket 字段中。

不要喝太多酷爱
如果您阅读其他人的代码,您经常会看到人们使用 now() 的结果调用 random:seed/1now() 是一个不错的函数,因为它返回单调时间(始终递增,绝不会两次相同)。但是,对于 Erlang 中使用的随机算法来说,它是一个糟糕的种子值。出于这个原因,最好使用 crypto:rand_bytes(12) 来生成 12 个加密安全的随机字节(如果您使用的是 R14B03+,请使用 crypto:strong_rand_bytes(12))。通过执行 <<A:32, B:32, C:32>>,我们将 12 个字节转换为 3 个整数,以传递给函数。

更新
从 18.0 版本开始,引入了 rand 模块,其中包含比 random 模块更好的伪随机算法,并且不需要播种。

我们需要接受那个连接。别再胡闹了

handle_cast(accept, S = #state{socket=ListenSocket}) ->
    {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
    %% Remember that thou art dust, and to dust thou shalt return.
    %% We want to always keep a given number of children in this app.
    sockserv_sup:start_socket(), % a new acceptor is born, praise the lord
    send(AcceptSocket, "What's your character's name?", []),
    {noreply, S#state{socket=AcceptSocket, next=name}};

我们接受连接,启动一个替换接受者(这样我们始终有大约 20 个接受者准备处理新连接),然后将接受套接字存储为 ListenSocket 的替换,并注意我们通过套接字接收的下一条消息是关于名称的,其 next 字段包含该信息。

但在继续之前,我们通过以下定义的 send 函数向客户端发送一个问题

send(Socket, Str, Args) ->
    ok = gen_tcp:send(Socket, io_lib:format(Str++"~n", Args)),
    ok = inet:setopts(Socket, [{active, once}]),
    ok.

狡猾!因为我期望我们在接收到消息后几乎总是必须回复,所以我将该函数中的 主动一次 例程,并在其中添加换行符。这仅仅是懒惰,隐藏在一个函数中。

我们已经完成了步骤 1 和步骤 2,现在我们必须等待来自套接字的用户的输入

handle_info({tcp, _Socket, Str}, S = #state{next=name}) ->
    Name = line(Str),
    gen_server:cast(self(), roll_stats),
    {noreply, S#state{name=Name, next=stats}};

我们不知道 Str 字符串中将包含什么,但这没关系,因为状态的 next 字段让我们知道我们接收到的任何内容都是一个名称。因为我期望用户使用 telnet 作为演示应用程序,所以我们接收到的所有文本片段都将包含行结束符。以下定义的 line/1 函数将它们删除

%% Let's get rid of the white space and ignore whatever's after.
%% makes it simpler to deal with telnet.
line(Str) ->
    hd(string:tokens(Str, "\r\n ")).

收到名称后,我们将存储它,然后向自己投递一条消息 (roll_stats),以生成玩家的属性,这是下一步。

注意: 如果你查看文件,你会发现我没有匹配整个消息,而是使用了更短的 ?SOCK(Var) 宏。宏定义为 -define(SOCK(Msg), {tcp, _Port, Msg}).,这仅仅是像我这样懒惰的人以更少的输入来匹配字符串的一种快捷方式。

属性滚动会返回到 handle_cast 子句中

handle_cast(roll_stats, S = #state{socket=Socket}) ->
    Roll = pq_stats:initial_roll(),
    send(Socket,
         "Stats for your character:~n"
         "  Charisma: ~B~n"
         "  Constitution: ~B~n"
         "  Dexterity: ~B~n"
         "  Intelligence: ~B~n"
         "  Strength: ~B~n"
         "  Wisdom: ~B~n~n"
         "Do you agree to these? y/n~n",
         [Points || {_Name, Points} <- lists:sort(Roll)]),
    {noreply, S#state{next={stats, Roll}}};
two dice, with a 5 rolled on each

pq_stats 模块包含用于滚动属性的函数,整个子句仅用于在其中输出属性。~B 格式参数意味着我们希望输出一个整数。状态的 next 部分在这里有点超载。因为我们询问用户是否同意,所以我们必须等待他们告诉我们,然后要么丢弃属性并生成新的属性,要么将它们传递给我们即将启动的 Process Quest 角色。

让我们收听用户的输入,这次在 handle_info 函数中

handle_info({tcp, Socket, Str}, S = #state{socket=Socket, next={stats, _}}) ->
    case line(Str) of
        "y" ->
            gen_server:cast(self(), stats_accepted);
        "n" ->
            gen_server:cast(self(), roll_stats);
        _ -> % ask again because we didn't get what we wanted
            send(Socket, "Answer with y (yes) or n (no)", [])
    end,
    {noreply, S};

在该直接函数子句中启动角色会很诱人,但我决定不这样做:handle_info 用于处理用户的输入,handle_cast 用于 Process Quest 的事情。关注点分离!如果用户拒绝属性,我们只需再次调用 roll_stats。没什么新鲜事。当用户接受时,我们可以启动 Process Quest 角色并开始等待来自那里的事件

%% The player has accepted the stats! Start the game!
handle_cast(stats_accepted, S = #state{name=Name, next={stats, Stats}}) ->
    processquest:start_player(Name, [{stats,Stats},{time,?TIME},
                                     {lvlexp, ?EXP}]),
    processquest:subscribe(Name, sockserv_pq_events, self()),
    {noreply, S#state{next=playing}};

这些是我为游戏定义的常规调用。您启动一个玩家,并使用 sockserv_pq_events 事件处理程序订阅事件。下一个状态是 playing,这意味着所有接收到的消息很可能是来自游戏的

%% Events coming in from process quest
%% We know this because all these events' tuples start with the
%% name of the player as part of the internal protocol defined for us
handle_cast(Event, S = #state{name=N, socket=Sock}) when element(1, Event) =:= N ->
    [case E of
       {wait, Time} -> timer:sleep(Time);
       IoList -> send(Sock, IoList, [])
     end || E <- sockserv_trans:to_str(Event)], % translate to a string
    {noreply, S}.

我不会深入探讨它的工作原理。只需要知道 sockserv_trans:to_str(Event) 将某些游戏事件转换为 IO 列表列表或 {wait, Time} 元组,这些元组表示事件部分之间的延迟(我们在显示敌人掉落的物品之前,会稍微提前打印 正在执行 ... 消息)。

如果您还记得要遵循的步骤列表,我们已经涵盖了所有步骤,除了一个。当用户告诉我们他们要退出时退出。将以下子句作为 handle_info 中的首个子句

handle_info({tcp, _Socket, "quit"++_}, S) ->
    processquest:stop_player(S#state.name),
    gen_tcp:close(S#state.socket),
    {stop, normal, S};

停止角色,关闭套接字,终止进程。太棒了。其他退出原因包括客户端关闭 TCP 套接字

handle_info({tcp_closed, _Socket}, S) ->
    {stop, normal, S};
handle_info({tcp_error, _Socket, _}, S) ->
    {stop, normal, S};
handle_info(E, S) ->
    io:format("unexpected: ~p~n", [E]),
    {noreply, S}.

我还添加了一个额外的子句来处理未知消息。如果用户输入了我们没有预料到的内容,我们不想崩溃。只剩下 terminate/2code_change/3 函数需要做

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(normal, _State) ->
    ok;
terminate(_Reason, _State) ->
    io:format("terminate reason: ~p~n", [_Reason]).

如果你完整地完成了所有步骤,可以尝试编译此文件,并将它替换为我们发布版本中相应的 beam 文件,看看它是否可以正常运行。如果你复制正确(我也复制正确的话),它应该可以运行。

接下来做什么?

如果你接受这个任务,你的下一个任务是为客户端添加一些你选择的命令:为什么不添加诸如“暂停”之类的命令,这些命令会在一段时间内将操作排队,然后在你恢复服务器时一次性输出它们呢?或者,如果你足够牛逼,可以记录你在 sockserv_serv 模块中到目前为止的级别和统计数据,并添加从客户端获取它们的命令。我总是讨厌留给读者练习的习题,但有时将它放在这里和那里实在太诱人了,所以尽情享受吧!

否则,阅读现有服务器实现的源代码,自己编写一些代码等等,都是很好的练习。很少有语言能像编写 Web 服务器那样,成为业余爱好者的练习,但 Erlang 就是其中之一。多加练习,它就会成为你的第二天性。Erlang 与外部世界的通信只是我们朝着编写有用软件迈出的众多步骤之一。