大家好,欢迎来到IT知识分享网。
本章涵盖
- 开发用于并行计算的功能性 API
- 代数方式接近 API
- 定义泛型组合器
由于现代计算机的每个 CPU 有多个内核,并且通常有多个 CPU,因此以可以利用这种并行处理能力的方式设计程序比以往任何时候都更加重要。但是以并行方式运行的程序的交互是复杂的,而且众所周知,执行线程之间的传统通信机制 – 共享可变内存 – 很难推理。这很容易导致程序出现争用条件和死锁,不容易测试,并且不能很好地扩展。
在本章中,我们将构建一个纯函数库,用于创建并行和异步计算。我们将通过仅使用纯函数来描述并行程序固有的复杂性。这将使我们能够使用替换模型来简化我们的推理,并希望使并发计算既简单又愉快。
读完本章后,您不仅应该了解如何为纯函数并行编写库,还应该了解如何解决设计纯函数库的问题。我们的主要关注点是使我们的库高度可组合和模块化。为此,我们将保持我们的主题,将描述计算的关注点与实际运行计算分开。我们希望允许我们库的用户在非常高的级别编写程序,使他们与程序如何执行的细节隔离开来。例如,在本章末尾,我们将开发一个操作 parMap ,它将使我们能够轻松地将函数 f 同时应用于集合中的每个元素:
val outputList = parMap(inputList)(f)
为了实现这一目标,我们将迭代工作。我们将从我们希望我们的库处理的简单用例开始,然后开发一个促进此用例的接口;只有这样,我们才会考虑这个接口的实现应该是什么。随着我们不断完善我们的设计,我们将在接口和实现之间摇摆不定,同时通过越来越复杂的用例更好地了解领域和设计空间。我们将强调代数推理,并介绍 API 可以用遵守特定定律的代数来描述的想法。
为什么要设计我们自己的库?为什么不直接在scala.并发包中使用Scala标准库附带的并发原语呢?这部分是出于教学目的 — 我们想向您展示设计自己的实用库是多么容易。但还有另一个原因:我们希望鼓励这样一种观点,即没有现有的图书馆是权威的或无法重新审查的,即使由专家设计并标记为标准。做其他人做的事情有一定的安全性,但传统的东西不一定是最实用的。大多数库都包含许多任意的设计选择,其中许多是无意中做出的。当你从头开始时,你可以重新审视设计库的所有基本假设,走一条不同的道路,并发现其他人可能甚至没有考虑过的问题空间。因此,您可能会得出更适合您目的的自己的设计。在这种情况下,我们的基本假设是我们的库绝对不允许副作用。
我们将在本章中编写大量代码,主要作为读者的练习。与往常一样,您可以在本书随附的可下载内容中找到答案。
7.1 选择数据类型和函数
当你开始设计一个函数库时,你通常会对你希望能够做什么有一些一般的想法,而设计过程中的困难在于提炼这些想法并找到一个支持你想要的功能的数据类型。在我们的例子中,我们希望能够创建并行计算,但这到底意味着什么?让我们尝试将其细化为可以通过检查一个简单的、可并行化的计算来实现的东西:对整数列表求和。通常的左折如下:
def sum(ints: Seq[Int]): Int =
ints.foldLeft(0)((a, b) => a + b)
这里 Seq 是标准库中列表和其他序列的超类。重要的是,它有一个foldLeft方法。我们可以使用分而治之算法,而不是按顺序折叠(请参阅以下列表)。
示例 7.1 使用分而治之算法对列表求和
def sum(ints: IndexedSeq[Int]): Int = ①
if ints.size <= 1 then
ints.headOption.getOrElse(0) ②
else
val (l, r) = ints.splitAt(ints.size / 2) ③
sum(l) + sum(r) ④
(1) IndexedSeq 是标准库中随机存取序列的超类,如 Vector。与列表不同,这些序列提供了一种有效的 splitAt 方法,用于在特定索引处将它们分成两部分。
(4) headOption 是在 Scala 中的所有集合上定义的方法。我们在第<>章中看到了这个函数。
(3) 使用 splitAt 函数将序列分成两半
(4) 递归地将两半相加,并将结果相加
我们使用 splitAt 函数将序列分成两半,递归地对两半求和,然后合并它们的结果。与基于 foldLeft 的实现不同,此实现可以并行化;两半可以并行相加。
简单示例的重要性
实际上,对整数求和可能非常快,以至于并行化带来的开销超过了节省的开销。但是像这样的简单示例正是在设计函数库时最有帮助的那种。复杂的例子包括各种偶然的结构和无关的细节,这些细节可能会混淆最初的设计过程。我们试图解释问题域的本质,一个好方法是从琐碎的例子开始,在这些例子中排除共同关注的问题,并逐渐增加复杂性。在函数设计中,我们的目标是实现表现力,而不是通过堆积如山的特殊情况,而是通过构建一组简单且可组合的核心数据类型和函数来实现。
当我们考虑什么样的数据类型和函数可以实现并行化这种计算时,我们可以改变我们的观点。与其关注这种并行性最终将如何实现并强迫我们自己直接使用实现 API(可能与 java.lang.Thread 和 java.util.concurrent 库有关),不如设计我们自己理想的 API,如我们的示例所示,并从那里向后工作到实现。
7.1.1 并行计算的数据类型
查看行 sum(l) + sum(r),它递归地调用两半的 sum。仅通过查看这一行,我们可以看到,我们可能选择表示并行计算的任何数据类型都需要能够包含结果。该结果将具有一些有意义的类型(在本例中为 Int ),我们需要某种方法来提取此结果。让我们将这些新发现的知识应用到我们的设计中。现在,我们可以为我们的结果发明一个容器类型,Par[A](用于并行),并立法规定我们需要的函数的存在:
- def unit[A](a: => A): Par[A] — 获取未计算的 A,返回可能在单独的线程中评估它的计算。我们称它为单元,因为从某种意义上说,它创建了一个仅包装单个值的并行单元。
- def get[A](a: Par[A]): A —从并行计算中提取结果值。
我们真的能做到吗?是的,当然!现在,我们不需要担心我们需要什么其他函数,Par的内部表示可能是什么,或者这些功能是如何实现的。我们只是通过检查我们的简单示例来读取必要的数据类型和函数。现在让我们更新此示例。
示例 7.2 使用我们的自定义数据类型更新总和
def sum(ints: IndexedSeq[Int]): Int =
if ints.size <= 1 then
ints.headOption.getOrElse(0)
else
val (l, r) = ints.splitAt(ints.size / 2)
val sumL: Par[Int] = Par.unit(sum(l)) ①
val sumR: Par[Int] = Par.unit(sum(r)) ②
Par.get(sumL) + Par.get(sumR) ③
(1) 并行计算左半部分
(2) 并行计算右半部分
(3) 提取结果并求和
我们已经将两个递归调用包装为对单位的调用求和,并且我们调用get以从两个子计算中提取两个结果。
直接使用并发原语的问题
java.lang.Thread 和 Runnable 呢?让我们来看看这些类。以下是他们的API的部分摘录,转录到Scala中:
trait Runnable:
def run: Unit
class Thread(r: Runnable):
def start: Unit ①
def join: Unit ②
(1) 开始在单独的线程中运行 r
(2) 阻塞调用线程,直到 r 完成运行
我们已经可以看到这两种类型的一个问题:没有一个方法返回有意义的值。因此,如果我们想从 Runnable 中获取任何信息,它必须有一些副作用,比如改变我们可以检查的某些状态。
这对组合性不利;我们不能一般地操纵 Runnable 对象,因为我们总是需要了解它们的内部行为。线程还有一个缺点,即它直接映射到操作系统线程上,而操作系统线程是一种稀缺资源。最好创建尽可能多的逻辑线程,然后再将这些线程映射到实际的操作系统线程上。
这种事情可以通过java.util.并发之类的东西来处理。未来、执行服务或类似。我们为什么不直接使用它们?以下是他们 API 的一部分:
class ExecutorService:
def submit[A](a: Callable[A]): Future[A]
trait Future[A]:
def get: A
虽然它们在物理线程上抽象方面提供了巨大的帮助,但这些原语的抽象级别仍然比我们在本章中要创建的库低得多。例如,对 Future.get 的调用会阻塞调用线程,直到 ExecutorService 完成执行它,并且其 API 不提供编写期货的方法。当然,我们可以在这些工具之上构建库的实现(事实上,这是我们在本章后面最终要做的事情),但它们并没有提供我们想要直接从函数式程序中使用的模块化和组合 API。
我们现在可以选择单位的含义并得到;Unit 可以立即在单独的(逻辑)线程中开始计算其参数,1 或者它可以简单地保留其参数,直到调用 get 并开始计算。但请注意,在这个例子中,如果我们想获得任何程度的并行性,我们要求 unit 开始并发计算其参数并立即返回。你能明白为什么吗?阿拉伯数字
但是,如果单位开始同时评估其参数,那么调用get可以说会破坏引用透明度。我们可以通过用它们的定义替换 sumL 和 sumR 来看到这一点;如果我们这样做,我们仍然得到相同的结果,但我们的程序不再是并行的:
Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))
如果单元立即开始评估其参数,get 将等待该评估完成。这意味着如果我们简单地内联 sumL 和 sumR 变量,+ 号的两侧将不会并行运行。我们可以看到该单位具有明确的副作用,但仅限于获得 .也就是说,在这种情况下,unit 只是返回一个 Par[Int],表示异步计算。但是一旦我们通过了那个 Par 来获得 ,我们明确地等待它,暴露了副作用。因此,我们似乎希望避免调用get,或者至少将调用它推迟到最后。我们希望能够组合异步计算,而无需等待它们完成。
在我们继续之前,请花点时间检查一下我们所做的工作。首先,我们想出了一个简单、几乎微不足道的例子。接下来,我们稍微探讨了这个例子,以揭示设计选择。然后,通过一些实验,我们发现了一个选项的有趣结果,并在此过程中了解了我们问题域的性质!整个设计过程是一系列这些小冒险。你不需要任何特殊的许可证来进行这种探索,也不需要成为函数式编程方面的专家。只要潜入,看看你发现了什么。
7.1.2 组合并行计算
让我们看看是否可以避免上述组合单元并获得的陷阱。如果我们不调用 get,这意味着我们的 sum 函数必须返回一个 Par[Int]。这种变化揭示了什么后果?同样,让我们发明具有所需签名的函数:
def sum(ints: IndexedSeq[Int]): Par[Int] =
if ints.size <= 1 then
Par.unit(ints.headOption.getOrElse(0))
else
val (l, r) = ints.splitAt(ints.size / 2)
Par.map2(sum(l), sum(r))(_ + _)
习题 7.1 |
—————————————————————————————— |
Par.map2 是一个新的高阶函数,用于组合两个并行计算的结果。它的签名是什么?尽可能提供最通用的签名(不假设它仅适用于 Int )。
请注意,我们不再在递归情况下调用 unit,并且不清楚 unit 是否应该懒惰地接受它的参数。在这个例子中,懒惰地接受论点似乎不会提供任何好处,但也许情况并非总是如此。让我们稍后再回到这个问题。
那么 map2 呢——它应该懒惰地接受它的论点吗?对于 map2 来说,并行运行计算的两端是有意义的,给每一端平等的运行机会(map2 参数的顺序很重要似乎是任意的,因为我们只是希望 map2 表明正在组合的两个计算是独立的,可以并行运行)。什么选择让我们实现这个意义?作为一个简单的测试用例,考虑如果 map2 在两个参数中都很严格并且我们正在评估 sum(IndexedSeq(1,2,3,4)) 会发生什么。花点时间完成并理解以下(有点风格化的)程序跟踪。
清单 7.3 程序跟踪求和
sum(IndexedSeq(1,2,3,4))
map2(
sum(IndexedSeq(1,2)),
sum(IndexedSeq(3,4)))(_ + _)
map2(
map2(
sum(IndexedSeq(1)),
sum(IndexedSeq(2)))(_ + _),
sum(IndexedSeq(3,4)))(_ + _)
map2(
map2(
unit(1),
unit(2))(_ + _),
sum(IndexedSeq(3,4)))(_ + _)
map2(
map2(
unit(1),
unit(2))(_ + _),
map2(
sum(IndexedSeq(3)),
sum(IndexedSeq(4)))(_ + _))(_ + _)
...
在这个跟踪中,为了计算 sum(x),我们将 x 代入 sum 的定义中,就像我们在前面的章节中所做的那样。因为 map2 是严格的,并且 Scala 从左到右计算参数,所以每当我们遇到 map2(sum(x), sum(y))(_ + _) 时,我们必须递归地计算 sum(x) 等等。这有一个相当不幸的后果,要求我们先严格构造求和树的整个左半部分,然后再(严格)构造右半部分。这里 sum(IndexedSeq(1,2)) 在我们考虑 sum(IndexedSeq(3,4)) 之前完全展开。如果 map2 并行评估其参数(使用用于实现并行性的任何资源,如线程池),这意味着我们计算的左半部分将在我们开始构建计算的右半部分之前开始执行。
如果我们保持 map2 严格但不让它立即开始执行怎么办?这有帮助吗?如果 map2 没有立即开始评估,这意味着 Par 值只是构建了需要并行计算的内容的描述。在我们评估此描述之前,实际上什么都不会发生,也许使用类似 get 的函数。问题是,如果我们严格地构建我们的描述,它们将是相当重量级的对象。回顾我们的跟踪,我们的描述必须包含要执行的操作的完整树:
map2(
map2(
unit(1),
unit(2))(_ + _),
map2(
unit(3),
unit(4))(_ + _))(_ + _)
无论我们使用什么数据结构来存储此描述,它都可能比原始列表本身占用更多的空间。如果我们的描述更轻量级,那就太好了。
似乎我们应该让 map2 变得懒惰,并让它立即开始并行执行双方。这也解决了任何一方都不优先于另一方的问题。
7.1.3 显式分叉
我们的最新选择仍然有些不对劲。我们是否总是想并行评估 map2 的两个参数?应该不会。考虑这个简单的假设示例:
Par.map2(Par.unit(1), Par.unit(1))(_ + _)
在这种情况下,我们碰巧知道我们正在组合的两个计算将执行得如此之快,以至于生成一个单独的逻辑线程来评估它们没有多大意义,但是我们的 API 没有为我们提供任何提供此类信息的方法。也就是说,我们当前的 API 对于计算何时从主线程分叉非常明确;程序员无法指定这种分叉应该发生在哪里。如果我们使分叉更明确怎么办?我们可以通过发明另一个函数来做到这一点,def fork[A](a: => Par[A]):Par[A],我们可以认为这意味着给定的Par应该在单独的逻辑线程中运行:
def sum(ints: IndexedSeq[Int]): Par[Int] =
if ints.size <= 1 then
Par.unit(ints.headOption.getOrElse(0))
else
val (l, r) = ints.splitAt(ints.size / 2)
Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
使用 fork,我们现在可以使 map2 变得严格,如果程序员愿意,可以由他们来包装参数。像fork这样的函数解决了过于严格地实例化并行计算的问题,但更根本的是,它将并行性明确地置于程序员的控制之下。我们在这里解决两个问题:首先,我们需要某种方法来指示两个并行任务的结果应该合并。除此之外,我们可以选择是否应异步执行特定任务。通过将这些问题分开,我们避免了将任何类型的并行性全局策略附加到 map2 和我们编写的其他操作上,这意味着对哪种全局策略最好做出艰难(最终是武断的)选择。
现在让我们回到单位应该严格还是懒惰的问题。使用fork,我们现在可以使单位严格,而不会损失任何表现力。它的非严格版本——我们称之为 lazyUnit——可以使用 unit 和 fork 来实现:
def unit[A](a: A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
函数 lazyUnit 是派生组合子的简单示例,与原始组合器(如 unit)相反。我们在这里非正式地使用术语组合器来指代在设计时考虑组合的函数和值,允许从单一用途部件逐步构建有趣的功能。我们能够根据其他操作来定义 lazyUnit;稍后,当我们为 Par 选择一个表示时,lazyUnit 不需要知道有关该表示的任何信息——它对 Par 的唯一了解将来自在 Par 上定义的操作分支和单元。3
我们知道我们希望 fork 发出信号,表明它的参数在单独的逻辑线程中得到评估,但我们仍然有一个问题,即它是否应该在被调用时立即开始这样做,或者保留它的参数,以便在稍后强制计算时在逻辑线程中进行评估,使用 get 之类的东西。换句话说,评估应该是分叉还是get的责任?评估应该是急切还是懒惰?当您不确定要分配给 API 中某些函数的含义时,您可以随时继续设计过程;在以后的某个时候,不同含义选择的权衡可能会变得清晰。在这里,我们利用了一个有用的技巧:我们将考虑实现fork和获取各种含义需要什么样的信息。
如果 fork 立即开始并行评估其参数,则实现必须清楚地直接或间接地知道如何创建线程或将任务提交到某种线程池。此外,这意味着线程池(或我们用于实现并行性的任何资源)必须(全局)可访问并正确初始化,无论我们想要调用 fork 。4 这意味着我们失去了控制用于程序不同部分的并行策略的能力。虽然拥有用于执行并行任务的全局资源本身并没有错,但我们可以想象,对在哪里使用哪些实现进行更精细的控制会很有用(例如,我们可能希望大型应用程序的每个子系统都拥有自己的具有不同参数的线程池)。赋予 get 创建线程和提交执行任务的责任似乎更合适。
请注意,得出这些结论并不需要确切地知道fork and get将如何实现,甚至不需要知道Par的表示形式是什么。我们只是非正式地推理了实际生成并行任务所需的信息类型,并检查了让 Par 值知道此信息的后果。
相反,如果 fork 只是保留其未计算的参数直到以后,它不需要访问实现并行性的机制;它只是获取一个未评估的 Par 并将其标记为并发评估。现在让我们假设这个意思是叉子。有了这个模型,Par 本身不需要知道如何实际实现并行性。它更像是对并行计算的描述,稍后由 get 函数之类的东西解释。这与以前相比发生了转变,当时我们认为 Par 是一个值的容器,我们可以在它可用时简单地获得它。现在,它更像是我们可以运行的一流程序。因此,让我们重命名我们的 get 函数以运行,并指示这是实际实现并行性的地方:
extension [A](pa: Par[A]) def run: A
因为 Par 现在只是一个纯数据结构,所以 run 需要有一些实现并行性的方法,无论是生成新线程、将任务委托给线程池,还是使用其他机制。
7.2 选取表示
通过探索这个简单的例子并思考不同选择的后果,我们已经勾勒出以下 API。
清单 7.4 Par API 的基本草图
def unit[A](a: A): Par[A] ①
extension [A](pa: Par[A])
def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C] ②
def fork[A](a: => Par[A]): Par[A] ③
def lazyUnit[A](a: => A): Par[A] = fork(unit(a)) ④
extension [A](pa: Par[A]) def run: A ⑤
(1) 创建一个计算,该计算立即生成值 a
(2) 将两个并行计算的结果与二进制函数相结合
(3) 标记运行并发评估的计算
(4) 将表达式 a 包装起来,以便通过运行进行并发计算
(5) 完全评估给定的 Par,根据 fork 的请求生成并行计算并提取结果值
我们还松散地为这些不同的函数分配了含义:
- unit —将常量值提升为并行计算
- map2 —将两个并行计算的结果与二进制函数相结合
- fork — 标记并发评估的计算 – 在运行强制之前不会进行评估
- lazyUnit —将其未计算的参数包装在 Par 中,并将其标记为并发计算
- run —通过执行计算从 Par 中提取值
在勾勒出 API 的任何时候,您都可以开始考虑出现的抽象类型的可能表示形式。
习题 7.2 |
—————————————————————————————— |
在继续之前,请尝试为 Par 提出可以实现我们 API 功能的表示。
让我们看看我们是否可以想出一个表示。我们知道运行需要以某种方式执行异步任务。我们可以编写自己的低级API,但是在Java标准库中已经有一个类可以使用:java.util.concurrent.ExecutorService。以下是它的API,摘录并转录为Scala:
class ExecutorService:
def submit[A](a: Callable[A]): Future[A]
trait Callable[A]:
def call: A ①
trait Future[A]:
def get: A
def get(timeout: Long, unit: TimeUnit): A
def cancel(evenIfRunning: Boolean): Boolean
def isDone: Boolean
def isCancelled: Boolean
(1)本质上只是一个懒惰的A
所以 ExecutorService 让我们提交一个可调用的值(在 Scala 中,我们可能只是使用一个惰性参数来提交)并返回一个相应的 future,它是可能在单独线程中运行的计算的句柄。我们可以使用其 get 方法从 Future 中获取一个值(该方法阻止当前线程直到该值可用),并且它具有一些额外的取消功能(在阻塞一定时间后抛出异常等等)。
让我们尝试假设我们的 run 函数可以访问 ExecutorService,看看这是否暗示了 Par 的表示:
extension [A](pa: Par[A]) def run(s: ExecutorService): A
Par[A] 最简单的模型可能是 ExecutorService => A 。这将使运行变得微不足道。但是,将等待计算多长时间或是否取消计算的决定推迟到run的调用者可能会很好。所以 Par[A] 变成了 ExecutorService => Future[A],并且 run 只是返回 Future :
opaque type Par[A] = ExecutorService => Future[A] ①
extension [A](pa: Par[A]) def run(s: ExecutorService): Future[A] = pa(s)
(1) Par 被定义为不透明类型,尽管我们可以使用常规类型别名或包装函数的 case 类,就像上一章中的 State 一样。使用不透明类型可提供内部表示形式的封装,同时避免不必要的分配。
请注意,由于 Par 由需要 ExecutorService 的函数表示,因此在提供此 ExecutorService 之前,Future 的创建实际上不会发生。真的有那么简单吗?让我们假设它是现在的,如果我们发现它不允许我们想要的某些功能,请修改我们的模型。
7.2.1 优化接口
到目前为止,我们的工作方式有点人为。在实践中,设计 API 和选择表示形式之间没有如此明确的界限,而且一个不一定先于另一个。表示的想法可以为 API 提供信息,API 可以为表示的选择提供信息,并且很自然地在这两个视角之间流畅地切换,在出现问题时运行实验,构建原型等等。
我们将在本节中专门探讨我们的 API。虽然我们在考虑一个简单的例子中得到了很多好处,但在我们添加任何新的基元操作之前,让我们更多地了解使用我们已经拥有的基元操作可以表达的内容。有了我们的原语和对它们的意义的选择,我们为自己开辟了一个小宇宙。我们现在开始发现在这个宇宙中可以表达哪些想法。这可以而且应该是一个流动的过程;我们可以随时改变宇宙的规则,从根本上改变我们的表示或引入新的原始人,并探索我们的创造物的行为方式。
让我们从实现到目前为止开发的 API 的功能开始。现在我们有了 Par 的表示,它的第一个激活成功教程应该很简单。下面是使用我们选择的 Par 表示的简单实现。
清单 7.5 Par 的基本实现
object Par:
def unit[A](a: A): Par[A] = es => UnitFuture(a) ①
private case class UnitFuture[A](get: A) extends Future[A]:
def isDone = true
def get(timeout: Long, units: TimeUnit) = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
extension [A](pa: Par[A])
def map2[B, C](pb: Par[B])(f: (A,B) => C): Par[C] = ②
(es: ExecutorService) => ③
val futureA = a(es)
val futureB = b(es)
UnitFuture(f(futureA.get, futureB.get)) ④
def fork[A](a: => Par[A]): Par[A] = ⑤
es => es.submit(new Callable[A] {
def call = a(es).get
})
(1) unit 表示为返回 UnitFuture 的函数,它是 Future 的简单实现,只包装一个常量值。它根本不使用执行器服务;它总是完成的,不能取消。它的 get 方法只是返回我们给它的值。
(2) map2 不会在单独的逻辑线程中评估对 f 的调用,这符合我们的设计选择,即让 fork 成为 API 中用于控制并行性的唯一函数。如果我们希望f的评估发生在单独的线程中,我们总是可以做fork(pa.map<>(pb)(f))。
(3)我们从ExecutorService返回一个函数到Future[C]。es 上的类型归属在这里是可选的;Scala可以从Par的定义中推断出来。
(2) map<> 的这种实现不遵守超时。它只是将执行器服务传递给两个面值,等待期货 af 和 bf 的结果,对它们应用 f,并将它们包装在 UnitFuture 中。为了尊重超时,我们需要一个新的 Future 实现来记录评估 af 所花费的时间,然后从分配给评估 bf 的可用时间中减去该时间。
(5)这是fork最简单,最自然的实现,但它存在一些问题;首先,外部 Callable 将阻止等待内部任务完成。由于此阻塞占用了线程池中的线程,或者支持ExecutorService的任何资源,这意味着我们正在失去一些潜在的并行性。从本质上讲,当一个线程就足够了时,我们使用两个线程。这是实现中一个更严重问题的症状,我们将在本章后面讨论这个问题。
我们应该注意,Future 没有纯粹的功能界面。这是我们不希望我们库的用户直接与 Future 打交道的部分原因。但重要的是,即使 Future 上的方法依赖于副作用,我们的整个 Par API 仍然是纯粹的。只有在用户调用运行并且实现收到执行器服务之后,我们才会公开 Future 机器。因此,我们的用户编程到一个纯粹的接口,其实现仍然依赖于一天结束时的效果。但由于我们的 API 保持纯净,因此这些影响不是副作用。在第 4 部分中,我们将详细讨论这种区别。
习题 7.3 |
—————————————————————————————— |
困难:修复 map2 的实现,使其遵守 Future 上的超时协定。
习题 7.4 |
—————————————————————————————— |
此 API 已启用一组丰富的操作。下面是一个简单的示例。使用 lazyUnit 编写一个函数,将任何函数 A => B 转换为异步计算其结果的函数:
def asyncF[A, B](f: A => B): A => Par[B]
我们现有的组合器还能表达什么?让我们看一个更具体的例子。
假设我们有一个 Par[List[Int]] 表示生成 List[Int] 的并行计算,并且我们想将其转换为结果排序的 Par[List[Int]]:
def sortPar(parList: Par[List[Int]]): Par[List[Int]]
当然,我们可以运行 Par,对结果列表进行排序,并将其重新打包到带有单位的 Par 中 — 但我们希望避免调用 run 。我们拥有的唯一另一个允许我们以任何方式操纵 Par 值的组合器是 map2 .因此,如果我们将 parList 传递给 map2 的一侧,我们将能够访问内部的列表并对其进行排序,我们可以将我们想要的任何内容传递给 map2 的另一侧,所以让我们只传递一个 no-op:
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
parList.map2(unit(()))((a, _) => a.sorted)
我们现在可以告诉 Par[List[Int]] 我们希望对该列表进行排序,但我们不妨进一步概括这一点。我们可以提升任何类型 A => B 的函数,成为取 Par[A] 并返回 Par[B] 的函数,我们可以将任何函数映射到 Par 上:
extension [A](pa: Par[A]) def map[B](f: A => B): Par[B] =
pa.map2(unit(()))((a, _) => f(a))
例如,sortPar 现在只是这样:
def sortPar(parList: Par[List[Int]]) =
parList.map(_.sorted)
这句话言简意赅。我们只是组合了操作以使类型对齐。然而,如果你看一下 map2 和 unit 的实现,应该很清楚,map的这种实现意味着一些明智的东西。
将一个虚假的值 unit(()) 作为参数传递给 map2 只是为了忽略它的值,这是作弊吗?一点也不!事实上,我们可以根据 map2 实现 map,但不能反过来,只是表明 map2 严格比 map 更强大。当我们设计库时,这种事情经常发生;一个看似原始的函数通常可以使用一些更强大的原始函数来表达。
我们还可以使用我们的 API 实现什么?我们可以并行映射列表吗?与 map2 不同,它结合了两个并行计算,parmap(我们称之为它)需要组合 N 个并行计算。似乎这应该以某种方式表达:
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]]
我们总是可以将parMap写成一个新的基元。请记住,Par[A] 只是 ExecutorService => Future[A] 的别名。
将操作实现为新的基元并没有错。在某些情况下,我们甚至可以通过假设我们正在处理的数据类型的基础表示来更有效地实现操作。但是现在,我们有兴趣探索使用我们现有的API可以表达哪些操作,并掌握我们定义的各种操作之间的关系。在第 3 部分中,当我们展示如何跨库抽象常见模式时,了解哪些组合子是真正的基元将变得更加重要。5
让我们看看在现有组合器方面我们可以在多大程度上实现 parMap:
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] =
val fbs: List[Par[B]] = ps.map(asyncF(f))
...
请记住,asyncF 通过分叉并行计算来生成结果,将 A => B 转换为 A => Par[B]。因此,我们可以很容易地分叉我们的N个并行计算,但我们需要一些方法来收集它们的结果。我们被困住了吗?好吧,仅通过检查类型,我们可以看到我们需要某种方式将我们的 List[Par[B]] 转换为 parMap 返回类型所需的 Par[List[B]]。
习题 7.5 |
—————————————————————————————— |
写这个函数,叫做序列。不需要额外的基元;不要调用运行:
def sequence[A](ps: List[Par[A]]): Par[List[A]]
一旦我们有了序列,我们就可以完成 parMap 的实现:
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] =
fork:
val fbs: List[Par[B]] = ps.map(asyncF(f))
sequence(fbs)
请注意,我们已将我们的实现包装在对 fork 的调用中。通过此实现,parMap 将立即返回,即使对于庞大的输入列表也是如此。当我们稍后调用 run 时,它将分叉单个异步计算,该计算本身生成 N 个并行计算,然后等待这些计算完成,将它们的结果收集到一个列表中。相反,如果我们省略了对fork的调用,调用parMap将在调用序列之前首先创建fbs列表,从而在调用线程上执行一些计算。
习题 7.6 |
—————————————————————————————— |
实现 parFilter ,它并行过滤列表的元素:
def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]]
你能想到其他有用的函数来写吗?尝试编写一些自己的并行计算,看看哪些计算可以在没有其他基元的情况下表达。以下是一些可以尝试的想法:
- 我们在本章开头写的并行求和函数是否有更通用的版本?尝试使用它来并行查找 IndexedSeq 的最大值。
- 编写一个函数,该函数采用段落列表(List[String])并并行返回所有段落中的单词总数。寻找推广此函数的方法。
- 根据 map3 实现 map4、map5 和 map2。
7.3 API 的代数
如上一节所示,我们通常只需写下所需操作的类型签名,然后将类型跟踪到实现中,就可以走得更远。当以这种方式工作时,我们几乎可以忘记具体的领域(例如,当我们根据map2和unit实现map时),而只专注于排列类型。这不是作弊;这是一种自然的推理风格,类似于简化代数方程时的推理。我们将 API 视为代数 6 或一组抽象的操作,以及一组我们假设为真的定律或属性,并且只是按照此代数指定的规则进行形式符号操作。
到目前为止,我们一直在非正式地推理我们的 API。这并没有错,但退后一步并正式确定您希望为您的 API 保留(或想要保留)哪些法律可能会有所帮助。7 在不知不觉中,你可能已经在心理上建立了一个你期望的属性或规律的模型。实际上,将这些内容写下来并使它们精确,可以突出设计选择,这些选择在非正式推理时不会很明显。
7.3.1 映射定律
像任何设计选择一样,选择法律会产生后果;它对操作的含义施加约束,确定可能的实现选择,并影响哪些其他属性可以为真。让我们看一个例子,在这个例子中,我们将编造一个看似合理的可能定律。如果我们为库编写测试,这可以用作测试用例:
unit(1).map(_ + 1) == unit(2)
我们的意思是,使用 _ + 1 函数映射在单位 (1) 上在某种意义上等同于 unit(2) 。(法律通常是这样开始的——作为我们期望持有的身份的具体例子8。它们在什么意义上是等价的?这是一个有趣的问题。现在,假设两个 Par 对象是等效的,如果对于任何有效的 ExecutorService 参数,它们的 Future 结果具有相同的值。
我们可以使用如下函数检查这是否适用于特定的执行器服务:
def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
p(e).get == p2(e).get
法律和功能有很多共同点。正如我们可以概括函数一样,我们可以概括规律。例如,上述内容可以这样概括:
unit(x).map(f) == unit(f(x))
这里我们说这种等式应该适用于 x 和 f 的任何选择,而不仅仅是 1 和 _ + 1 函数。这种平等对我们的实施施加了一些限制;例如,我们的 Unit 实现不能检查它收到的值,并决定在输入为 42 时返回结果为 1 的并行计算——它只能传递它收到的任何内容。同样,对于我们的 执行器服务 ,当我们向它提交可调用对象执行时,它不能根据它收到的值做出任何假设或改变行为。更具体地说,该定律不允许在映射和单元的实现中进行向下转换或isInstanceOf 检查(通常分组在术语类型化下)。
就像我们努力用更简单的函数来定义函数一样,每个函数只做一件事,我们可以用更简单的定律来定义定律,每个定律只说一件事。让我们看看是否可以进一步简化这条法律。我们说我们希望这条定律适用于 x 和 f 的任何选择。如果我们用恒等函数代替 f ,就会发生一些有趣的事情。9 我们可以简化等式的两边,得到一个相当简单的新定律:10
unit(x).map(f) == unit(f(x)) ①
unit(x).map(id) == unit(id(x)) ②
unit(x).map(id) == unit(x) ③
y.map(id) == y ④
(1) 初始定律
(2)用恒等函数代替f。
(3)简化。
(4) 将两边的单位(x)替换为y。
迷人!我们新的,更简单的法律只谈论地图;显然,提到单位是一个无关紧要的细节。为了深入了解这项新法律的内容,让我们考虑一下地图不能做什么。比如说,它不能在将函数应用于结果之前抛出异常并使计算崩溃(你能明白为什么这违反了法律吗?它所能做的就是将函数f应用于y的结果,当然,当该函数为id时,y不受影响。11 更有趣的是,给定 y.map(id) == y,我们可以在另一个方向上执行替换,以恢复我们原来的、更复杂的定律。(试试吧!从逻辑上讲,我们可以自由地这样做,因为 map 对于它接收的不同函数类型,它的行为不可能有所不同。因此,给定 y.map(id) == y,则 unit(x).map(f) == unit(f(x)) 一定为真。由于我们免费获得这个第二定律或定理,仅仅因为映射的参数性,它有时被称为自由定理。12
习题 7.7 |
—————————————————————————————— |
困难:给定 y.map(id) == y , y.map(g).map(f) == y.map(f compose g) 是一个自由定理。(这有时被称为映射融合,它可以用作优化;我们可以将其折叠到第一个映射中,而不是生成单独的并行计算来计算第二个映射。13 你能证明吗?
7.3.2 分叉定律
尽管这一切很有趣,但这条特殊的法律并没有多大限制我们的实施。你可能一直在假设这些属性,甚至没有意识到它(在map,unit或ExecutorService.submit的实现中有任何特殊情况或让map随机抛出异常会很奇怪)。让我们考虑一个更强的属性——fork 不应该影响并行计算的结果:
fork(x) == x,
这似乎显然应该适用于我们的实现,并且它显然是一个理想的属性,符合我们对 fork 应该如何工作的期望。fork(x) 应该做与 x 相同的事情,但异步地——在与主线程分开的逻辑线程中。如果这条定律并不总是成立,那么我们必须以某种方式知道何时在不改变含义的情况下进行调用是安全的,而无需类型系统的任何帮助。
令人惊讶的是,这个简单的属性对我们 fork 的实现施加了强烈的约束。在你写下这样的法律之后,摘下你的实施者帽子,戴上你的调试器帽子,并尝试违反你的法律。仔细考虑任何可能的极端情况,尝试提出反例,甚至构建一个非正式的证据,证明法律成立——至少足够彻底地说服一个持怀疑态度的程序员同行。
7.3.3 违法:一个微妙的错误
让我们试试这种思维模式。我们期待 fork(x) == x 用于 x 的所有选择和任何 ExecutorService 的选择。我们对x可能是什么有很好的了解;它是一些使用叉、单位和map2(以及从这些派生的其他组合子)的表达式。执行人服务怎么样?它有哪些可能的实现方式?在类java.util.concurrent 中有一个很好的不同实现列表。执行程序(有关更多信息,请参阅 API:http://mng.bz/urQd)。
习题 7.8 |
—————————————————————————————— |
困难:查看 Executors 中的各种静态方法,以了解存在的 ExecutorService 的不同实现。然后,在继续之前,请返回并重新审视您的 fork 实现,并尝试找到一个反例或说服自己法律适用于您的实现。
为什么关于代码和证明的法律很重要
陈述和证明有关 API 的属性似乎不寻常。这当然不是普通编程中通常做的事情。为什么它在FP中很重要?
在函数式编程中,很容易并且期望将常见功能分解为可以组合的通用、可重用的组件。副作用会损害组合性,但更一般地说,任何隐藏的或带外的假设或行为阻止我们将组件(无论是功能还是其他任何东西)视为黑匣子,都会使组合变得困难或不可能。
在我们的fork定律的例子中,我们可以看到,如果我们假设的定律不成立,我们的许多通用组合器,如parMap,将不再合理(它们的使用可能是危险的,因为它们可能会,取决于它们使用的更广泛的并行计算,导致死锁)。
为我们的 API 提供具有有意义和辅助推理的定律的代数,使 API 对客户端更有用,但也意味着我们可以将 API 的对象视为黑盒。正如我们将在第 3 部分看到的,这对于我们在编写的不同库中分解常见模式的能力至关重要。
实际上,在大多数 fork 实现中都会出现一个相当微妙的问题。当使用由有界大小的线程池支持的 ExecutorService 时(请参阅 Executors.newFixedThreadPool ),很容易遇到死锁。14 假设我们有一个由线程池支持的 ExecutorService,其中最大线程数为 1。尝试使用我们当前的实现运行以下示例:
val a = lazyUnit(42 + 1)
val es = Executors.newFixedThreadPool(1)
println(Par.equal(es)(a, fork(a)))
大多数 fork 实现都会导致此代码死锁。你能明白为什么吗?让我们再看看我们的 fork 实现:
def fork[A](a: => Par[A]): Par[A] =
es => es.submit(new Callable[A] { def call = a(es).get }) ①
(1) 等待一个可赎回在另一个可赎回中的结果
请注意,我们首先提交可调用对象,在该可调用对象中,我们将向执行者服务提交另一个可调用对象并阻止其结果(回想一下,a(es)将向执行者服务提交可调用对象并取回未来)。如果我们的线程池大小为 1 ,这是一个问题。外部可调用对象由唯一线程提交和拾取;在该线程中,在它完成之前,我们提交并阻止等待另一个可调用对象的结果,但没有线程可用于运行此可调用对象。它们在互相等待,因此我们的代码陷入僵局。
习题 7.9 |
—————————————————————————————— |
困难:表明在这种 fork 实现的情况下,任何固定大小的线程池都可以死锁。
当你找到这样的反例时,你有两个选择:你可以尝试修复你的实现,使法律成立,或者你可以稍微完善你的定律,更明确地说明它成立的条件(你可以简单地规定你需要可以无限增长的线程池)。即使这是一个很好的练习;它强制您记录以前隐式的不变量或假设。
我们可以修复 fork 以在固定大小的线程池上工作吗?让我们看一个不同的实现:
def fork[A](fa: => Par[A]): Par[A] =
es => fa(es)
这当然避免了僵局。唯一的问题是我们实际上并没有分叉一个单独的逻辑线程来评估 fa.所以对于一些 ExecutorService 的 fork(hugeComputing)(es) 会在主线程中运行 hugeComputing,这正是我们想要通过调用 fork 来避免的。不过,这仍然是一个有用的组合器,因为它允许我们延迟计算的实例化,直到实际需要它。让我们给它起个名字 延迟 :
def delay[A](fa: => Par[A]): Par[A] =
es => fa(es)
但我们真的希望能够在固定大小的线程池上运行任意计算。为此,我们需要选择Par的不同表示形式。
7.3.4 使用Actos的完全非阻塞Par实现
在本节中,我们将开发一个完全无阻塞的 Par 实现,适用于固定大小的线程池。由于这对于我们讨论功能设计各个方面的总体目标并不重要,因此如果您愿意,可以跳到下一节。否则,请继续阅读。
当前表示的本质问题是,如果没有当前线程阻塞其 get 方法,我们就无法从 Future 中获取值。不以这种方式泄漏资源的 Par 表示必须是非阻塞的,因为 fork 和 map2 的实现绝不能像 Future.get 那样调用阻塞当前线程的方法。正确编写这样的实现可能具有挑战性。幸运的是,我们有我们的法律来测试我们的实施,我们只需要把它做好一次。之后,我们库的用户可以享受一个可组合和抽象的API,每次都做正确的事情。
在下面的代码中,您不需要确切了解它的每个部分发生了什么。我们只是想用真实的代码向您展示尊重法律的Par的正确表示可能是什么样子的。
基本思想
我们如何实现 Par 的非阻塞表示?这个想法很简单。与其将 Par 转换为 java.util.concurrent.future,我们可以从中获取一个值(这需要阻塞),我们将引入我们自己的 Future 版本,通过它我们可以注册一个回调,当结果准备就绪时将被调用。这是视角的轻微转变:
opaque type Future[+A] = (A => Unit) => Unit ①
opaque type Par[+A] = ExecutorService => Future[A] ②
(1) 将类型 A => Unit 的函数作为参数并返回 Unit 的函数
(2) Par 看起来是一样的,但我们使用的是新的非阻塞 Future 而不是 java.util.conconcurrent 中的 Future。
我们的 Par 类型看起来相同,除了我们现在使用的是新版本的 Future ,它的 API 与 java.util.并发 中的 API 不同。我们的未来不是调用get来从我们的未来获得结果,而是封装一个接收另一个函数的函数 – 一个期望A并返回一个单位的函数。A => Unit 函数有时称为延续或回调。
通过这种编码,当我们将 ExecutorService 应用于表示 Par[A] 的函数时,我们得到一个新函数:(A => 单位) => 单位。然后,我们可以通过传递处理生成的 A 值的回调来调用它。每当计算 A 时,都会调用我们的回调,而不是立即调用。
对纯 API 使用本地副作用
我们在这里定义的 Future 类型是相当必要的 — A => 单位 。这样的函数只能用于使用给定的 A 执行一些副作用,因为我们当然没有使用返回的结果。当使用像 Future 这样的类型时,我们是否仍在进行函数式编程?是的,但我们正在利用使用副作用作为纯函数式 API 的实现细节的常用技术。我们可以侥幸逃脱,因为我们使用的副作用对于使用 Par 的代码是无法观察到的。请注意,Future 的函数表示是不透明的,不能由外部代码调用。
当我们完成非阻塞 Par 的其余实现时,您可能希望说服自己,外部代码无法观察到所使用的副作用。第14章更详细地讨论了局部效应的概念,可观察性以及我们对纯度和参考透明度定义的微妙之处,但就目前而言,非正式的理解是可以的。
有了 Par 的这种表示,让我们看看如何首先实现 run 函数,我们将它更改为只返回一个 A。由于它从Par[A]到A,它必须构造一个延续并将其传递给未来。
清单 7.6 实现 Par 运行
extension [A](pa: Par[A]) def run(es: ExecutorService): A =
val ref = new AtomicReference[A] ①
val latch = new CountDownLatch(1) ②
pa(es) { a => ref.set(a); latch.countDown } ③
latch.await ④
ref.get ⑤
(1) 用于存储结果的可变线程安全引用(有关这些类的更多信息,请参阅 java.util.concurrent.atomic 包)
(2) java.util.concurrent.CountDownLatch 允许线程等待,直到其 countDown 方法被调用一定次数。在这里,当我们从 p 收到类型 A 的值时,将调用一次 countDown 方法,我们希望运行实现阻止,直到发生这种情况。
(3) 当我们收到值时,它会设置结果并释放锁存器。
(4) 等待结果可用且闩锁松开
(5) 一旦我们传递了闩锁,我们就知道 ref 已经设置好了,我们返回它的值
应该注意的是,运行会在等待闩锁时阻塞调用线程。不可能编写不阻塞的运行实现。由于它需要返回一个类型为A的值,它需要等待该值可用才能返回。出于这个原因,我们希望 API 的用户在想要等待结果之前避免调用 run。我们甚至可以从我们的 API 中完全删除 run,并在 Par 上公开 apply 方法,以便用户可以注册异步回调。这当然是一个有效的设计选择,但我们现在将保留我们的 API。
让我们看一个创建 Par 的示例。最简单的一个是单位:
def unit[A](a: A): Par[A] =
es => cb => cb(a) ①
(1) 它只是将值传递给延续。请注意,不需要执行器服务。
由于单位已经有一个可用的 A 类型的值,它需要做的就是调用延续 cb ,传递它这个值。例如,如果该延续是来自我们运行实现的延续,这将释放闩锁并立即提供结果。
叉子呢?这就是我们介绍实际并行性的地方:
def fork[A](a: => Par[A]): Par[A] =
es => cb => eval(es)(a(es)(cb)) ①
def eval(es: ExecutorService)(r: => Unit): Unit = ②
es.submit(new Callable[Unit] { def call = r })
(1)评估分叉A的评估并立即返回。回调将在另一个线程上异步调用。
(2) 一个帮助程序函数,用于使用某些执行器服务异步评估操作
当 fork 返回的未来收到它的延续 cb 时,它会分叉一个任务来计算按名称参数 a。一旦参数被评估并调用以生成 Future[A],我们就注册 cb 以在该 Future 具有其结果 A 时被调用。
地图2呢?召回签名:
extension [A](pa: Par[A]) def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C]
在这里,非阻塞实现要棘手得多。从概念上讲,我们希望 map2 并行运行两个 Par 参数。当两个结果都到达时,我们要调用 f,然后将结果 C 传递给延续。但是这里有几个竞争条件需要担心,并且仅使用java.util.并发的低级原语很难实现正确的非阻塞实现。
演员简介
为了实现 map2,我们将使用一个非阻塞并发原语,称为 actor。Actor 本质上是一个并发进程,不会经常占用线程。相反,它仅在收到消息时占用线程。重要的是,尽管多个线程可能同时向一个参与者发送消息,但该参与者一次只处理一条消息,将其他消息排队等待后续处理。这使得它们在编写必须由多个线程访问的棘手代码时作为并发原语很有用,否则这些代码很容易出现争用条件或死锁。
最好用一个例子来说明这一点。许多 actor 的实现都非常适合我们的目的,包括流行的 Akka 库中的那个(见 https://github.com/akka/akka),但为了简单起见,我们将使用 Actor.scala 文件中章节代码中包含的我们自己的最小 actor 实现:
scala> import fpinscala.parallelism.*
scala> val s = Executors.newFixedThreadPool(4) ①
s: java.util.concurrent.ExecutorService = ...
scala> val echoer = Actor[String](s):
| msg => println(s"Got message: '$msg'") ②
echoer: fpinscala.parallelism.Actor[String] = ...
(1) 参与者使用执行器服务来处理消息到达时,因此我们在这里创建一个。
(2) 这是一个非常简单的 actor,它只是回显它收到的字符串消息。请注意,我们提供 s,一个执行器服务,用于处理消息。
我们来试试这个Actor:
scala> echoer ! "hello" ①
Got message: 'hello'
scala> ②
scala> echoer ! "goodbye" ③
Got message: 'goodbye'
scala> echoer ! "You're just repeating everything I say, aren't you?"
Got message: 'You're just repeating everything I say, aren't you?'
(1)它向演员发送“你好”消息。
(2) 请注意,echoer 此时不占用线程,因为它没有进一步的消息要处理。
(3)它向演员发送“再见”信息。参与者的反应是将任务提交到其执行器服务来处理该消息。
了解Actor实现根本不重要。正确、高效的实现是相当微妙的,但如果你好奇,请参阅章节代码中的 Actor.scala 文件。实现不到 100 行普通 Scala 代码。15
通过参与者实现 map2
现在,我们可以使用 Actor 实现 map2,以从两个参数中收集结果。代码很简单,无需担心竞争条件,因为我们知道 Actor 一次只会处理一条消息。
7.7 使用 Actor 实现 map2
extension [A](p: Par[A]) def map2[B, C](p2: Par[B])(f: (A, B) => C): Par[C] =
es => cb =>
var ar: Option[A] = None ①
var br: Option[B] = None
// this implementation is a little too liberal in forking of threads -
// it forks a new logical thread for the actor and for stack-safety,
// forks evaluation of the callback `cb`
val combiner = Actor[Either[A,B]](es): ②
case Left(a) =>
if br.isDefined then eval(es)(cb(f(a, br.get))) ③
else ar = Some(a)
case Right(b) =>
if ar.isDefined then eval(es)(cb(f(ar.get, b))) ④
else br = Some(b)
p(es)(a => combiner ! Left(a)) ⑤
p2(es)(b => combiner ! Right(b))
(1) 两个可变变量用于存储两个结果。
(2)等待两个结果的参与者,将它们与f组合,并将结果传递给cb
(3) 如果 A 结果先出现,则将其存储在 ar 中并等待 B。如果 A 结果排在最后,并且我们已经有了 B,它会调用 f 和两个结果,并将结果 C 传递给回调 cb。
(4) 类似地,如果 B 结果首先出现,它会将其存储在 br 中并等待 A。如果 B 结果排在最后并且我们已经有了 A,它会调用 f 和两个结果,并将结果 C 传递给回调 cb。
(5)它将演员作为延续传递给双方。在 A 侧,我们将结果包装在 Left 中,在 B 侧,我们将其包装在 Right 中。这些是 Both(双)数据类型的构造函数,它们用于向参与者指示结果的来源。
鉴于这些实现,我们现在应该能够运行任意复杂度的 Par 值,而不必担心线程耗尽,即使参与者只能访问单个 JVM 线程。
让我们在 REPL 中尝试一下:
scala> import java.util.concurrent.Executors, fpinscala
➥ .parallelism.Nonblocking.Par
scala> val p = Par.parMap(List.range(1, 100000))(math.sqrt(_))
p: fpinscala.parallelism.Nonblocking.Par[List[Double]] = < function >
scala> val x = p.run(Executors.newFixedThreadPool(2))
x: List[Double] = List(1.0, 1.4142135623730951, 1.7320508075688772,
2.0, 2.23606797749979, 2.449489742783178, 2.6457513110645907, 2.828
4271247461903, 3.0, 3.1622776601683795, 3.3166247903554, 3.46410...
这将调用fork大约100,000次,启动大约100,000个演员,一次将结果组合两个。由于我们的非阻塞Actor实现,我们不需要100,000个JVM线程。
匪夷所思。我们的分叉定律现在适用于固定大小的线程池。
习题 7.10 |
—————————————————————————————— |
困难:我们的非阻塞表示目前根本不处理错误。如果我们的计算在任何时候抛出异常,则运行实现的锁存器永远不会倒计时,并且异常会被简单地吞噬。你能解决这个问题吗?
退一步说,本节的目的不一定是找出 fork 的最佳非阻塞实现,而是表明法律很重要。它们为我们在考虑图书馆设计时提供了另一个角度。如果我们没有尝试编写 API 的一些定律,我们可能直到很久以后才在第一次实现中发现线程资源泄漏。
通常,在为 API 选择法律时,可以考虑多种方法。你可以从那里思考你的概念模型和推理,以假设应该成立的规律。你也可以发明你认为可能有用或有启发性的定律(就像我们对叉子定律所做的那样),看看是否有可能和明智地确保它们适用于你的模型。最后,您可以查看您的实施情况,并在此基础上提出您期望持有的法律。16
7.4 将组合器提炼到最一般的形式
功能设计是一个迭代过程。写下 API 并至少有一个原型实现后,请尝试将其用于逐渐更复杂或更现实的方案;有时你会发现这些场景需要新的组合器。在直接进入实现之前,最好先看看是否可以将所需的组合器细化为最通用的形式。可能你只需要一些更通用的组合器的特定情况。
关于本节中的练习
本节中的练习和答案使用我们原始的Par[A]的更简单(阻塞)表示。如果您想使用我们在上一节中开发的非阻塞实现来完成练习和答案,请参阅练习和答案项目中的 Nonblocking.scala 文件。
让我们看一个例子。假设我们希望一个函数根据初始计算的结果在两个分叉计算之间进行选择:
def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A]
这将构造一个计算,如果 cond 结果为真,则从 t 开始,如果 cond 结果为假,则使用 f。我们当然可以通过阻塞 cond 的结果,然后使用此结果来确定是运行 t 还是 f 来实现这一点。下面是一个简单的阻塞实现:17
def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
es =>
if cond.run(es).get then t(es) ①
else f(es)
(1) 请注意,我们正在阻止 cond 的结果。
但在我们继续之前,让我们考虑一下这个组合器。它在做什么?它正在运行 cond ,然后当结果可用时,它运行 t 或 f 。这似乎是合理的,但让我们看看我们是否可以想到一些变化来理解这个组合器的本质。事实上,我们使用布尔值并且只在两种可能的并行计算中选择,t和f,这里有一些相当武断的事实。为什么只有两个?如果能够根据第一个计算的结果在两个并行计算之间进行选择很有用,那么在 N 个计算之间进行选择应该很有用:
def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A]
假设 choiceN 运行 n,然后使用它从 options 中选择并行计算。这比选择更笼统一些。
习题 7.11 |
—————————————————————————————— |
实现选择N,然后根据选择N进行选择。
请注意我们到目前为止所做的工作;我们已经将我们原来的组合器Choice改进为ChoiceN,结果证明它更通用,能够表达Choice以及Choice不支持的其他用例。但是,让我们继续看看我们是否可以将选择细化为更通用的组合器。
习题 7.12 |
—————————————————————————————— |
关于选择N仍然有一些相当武断的东西:List的选择似乎过于具体。为什么我们拥有什么样的容器很重要?例如,如果我们有一个计算列表而不是计算列表,该怎么办?18
def choiceMap[K, V](key: Par[K])(choices: Map[K, Par[V]]): Par[V]
一组可能选项的映射编码感觉过于具体,就像 List 一样。如果我们看一下我们的 choiceMap 实现,我们可以看到我们并没有真正使用 Map 的 API。实际上,Map[A,Par[B]] 用于提供一个函数:A => Par[B]。现在我们已经发现,回顾 choice 和 choiceN ,我们可以看到,对于选择,这对参数被用作布尔 => Par[A] 类型的函数(布尔值选择两个 Par[A] 参数之一),对于 choiceN,该列表被用作 Int => Par[A] 类型的函数!让我们做一个更通用的签名来统一它们:
extension [A](pa: Par[A]) def chooser[B](choices: A => Par[B]): Par[B]
习题 7.13 |
—————————————————————————————— |
实现这个新的基元选择器,然后用它来实现选择和选择N。
每当你泛化这样的函数时,当你完成时,请批判性地看看你的泛化函数。尽管该功能可能是由某些特定用例激发的,但签名和实现可能具有更一般的含义。在这种情况下,选择器可能不再是此操作最合适的名称,它实际上非常通用 – 它是一个并行计算,运行时将运行初始计算,其结果用于确定第二个计算。在第一次计算的结果可用之前,甚至不需要存在第二个计算。它不需要存储在容器中,例如 列表 or 地图 .也许它是使用第一次计算的结果从整块布生成的。这个函数经常出现在函数库中,通常称为绑定或flatMap:
extension [A](pa: Par[A]) def flatMap[B](f: A => Par[B]): Par[B]
flatMap 真的是最原始的函数吗,或者我们可以进一步泛化吗?让我们再玩一玩。flatMap 这个名字暗示了这样一个事实,即此操作可以分解为两个步骤:将 f: A => Par[B] 映射到我们的 Par[A] 上,这会生成一个 Par[Par[B]],然后将这个嵌套的 Par[Par[B]] 展平化为 Par[B]。但这很有趣;它表明我们需要做的就是添加一个更简单的组合器——我们称之为连接——用于将 Par[Par[X]] 转换为 Par[X],以便选择任何 X:
def join[A](ppa: Par[Par[A]]): Par[A]
同样,我们只是遵循类型。我们有一个示例,它需要一个具有给定签名的函数,因此我们将其变为存在。现在它存在,我们可以考虑签名的含义。我们称之为 join ,因为从概念上讲,它是一个并行计算,运行时将执行内部计算,等待它完成(很像 Thread.join ),然后返回其结果。
习题 7.14 |
—————————————————————————————— |
实现联接 。你能看到如何使用连接实现平面地图吗?你能使用 flatMap 实现连接吗?
我们将到此为止,但鼓励您进一步探索这个代数。尝试更复杂的示例,发现新的组合器,看看你发现了什么!以下是需要考虑的一些问题:
- 你能实现一个与 map2 具有相同签名但使用 flatMap 和单位的函数吗?它的含义与map2有何不同?
- 你能想到与代数的其他原语相关的定律吗?
- 是否有不能用这个代数表达的并行计算?你能想到任何甚至不能通过向代数添加新原语来表达的计算吗?
认识代数的表现力和局限性
当你练习更多的函数式编程时,你将培养的技能之一是能够识别哪些函数可以从代数中表达,以及该代数的局限性是什么。例如,在前面的例子中,一开始可能并不明显,像选择这样的函数不能纯粹用map、map2和unit来表示,选择可能并不明显,选择只是flatMap的一个特例。随着时间的推移,像这样的观察会很快到来,你也会更好地发现如何修改你的代数,使一些需要的组合子可表达。这些技能将对您的所有 API 设计工作有所帮助。
作为一个实际的考虑,能够将 API 简化为一组最小的基元函数非常有用。正如我们之前在现有组合器中实现parMap时所指出的,原始组合器经常封装一些相当棘手的逻辑,重用它们意味着我们不必复制这个逻辑。
7.5 结论
我们现在已经完成了一个库的设计,用于以纯函数方式定义并行和异步计算。虽然这个领域很有趣,但本章的主要目标是为你提供一个了解功能设计过程的窗口,了解你可能遇到的问题类型,以及处理这些问题的想法。
第4-6章有一个强烈的关注点分离主题,特别是与将计算的描述与然后运行它的解释器分离的想法有关。在本章中,我们看到了该原则在库的设计中的实际应用,该库将并行计算描述为数据类型 Par 的值,并使用单独的解释器运行来实际生成线程以执行它们。
在下一章中,我们将研究一个完全不同的领域,为该领域提供另一个蜿蜒的API之旅,并进一步了解功能设计。
总结
- 现有的图书馆都无法重新审查。大多数库都包含任意设计选择。尝试构建替代库可能会导致发现有关问题空间的新事物。
- 简单的例子让我们专注于问题域的本质,而不是迷失在偶然的细节中。
- 并行和异步计算可以以纯函数方式建模。
- 构建计算的描述以及运行计算的单独解释器允许将计算视为值,然后可以与其他计算组合。
- API 设计的一种有效技术是调用类型和实现,尝试实现这些类型和实现,进行调整和迭代。
- Par[A] 类型描述了一种计算,该计算可以在多个线程上评估部分或全部计算。
- Par 值可以变换并与许多熟悉的操作相结合,例如 map、flatMap 和 序列。
- 将 API 视为代数并定义约束实现的定律既是有价值的设计工具,也是有效的测试技术。
- 将 API 划分为一组最小的基元函数和一组组合器函数可促进重用和理解。
- 参与者是基于消息传递的非阻塞并发原语。Actor不是纯粹的功能,但可用于实现纯功能API,如非阻塞Par的map2实现所示。
7.9 练习答案
答案 7.1 |
—————————————————————————————— |
让我们为每个并行计算引入一个类型参数,以及组合每个并行计算结果的函数的输出:
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C]
或者,我们可以将 map2 定义为 Par[A] 上的扩展方法:
extension [A](pa: Par[A]) def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C]
答案 7.2 |
—————————————————————————————— |
答案将在练习 7.2 之后立即讨论。
答案 7.3 |
—————————————————————————————— |
我们最初的 map2 实现等待两个期货完成,然后返回一个包含最终结果的 UnitFuture。在调用者看到未来之前,我们已经等待了两个组成计算完成!
而不是使用UnitFuture,我们需要开始组成计算并立即返回引用它们的复合未来。然后,调用方可以在 Future 上使用超时的 get 重载或任何其他方法。
为了实现 get 的超时变体,我们需要在我们开始的每个期货上调用它。我们可以使用提供的超时等待第一个结果,并测量完成所需的时间。然后,我们可以等待第二个结果,将超时减少等待第一个结果的时间:
extension [A](pa: Par[A])
def map2Timeouts[B, C](pb: Par[B])(f: (A, B) => C): Par[C] =
es => new Future[C]:
private val futureA = pa(es) ①
private val futureB = pb(es)
@volatile private var cache: Option[C] = None
def isDone = cache.isDefined
def get() = get(Long.MaxValue, TimeUnit.NANOSECONDS)
def get(timeout: Long, units: TimeUnit) =
val timeoutNs = TimeUnit.NANOSECONDS.convert(timeout, units)
val started = System.nanoTime
val a = futureA.get(timeoutNs, TimeUnit.NANOSECONDS) ②
val elapsed = System.nanoTime - started
val b = futureB.get(timeoutNs - elapsed,
➥ TimeUnit.NANOSECONDS) ③
val c = f(a, b)
cache = Some(c)
c
def isCancelled = futureA.isCancelled || futureB.isCancelled
def cancel(evenIfRunning: Boolean) =
futureA.cancel(evenIfRunning) || futureB.cancel(evenIfRunning)
(1)当未来被创造出来时,我们立即开始组成计算。
(2) 等待 futureA 完成,如果未在请求的时间内完成,则超时。
(3) 等待未来 B 完成,但只等待请求的超时与等待未来 A 完成的已用时间之间的差异。
答案 7.4 |
—————————————————————————————— |
我们返回一个匿名函数,在收到 A 类型的值时,立即调用 lazyUnit,传递 f(a):
def asyncF[A, B](f: A => B): A => Par[B] =
a => lazyUnit(f(a))
由于 lazyUnit 采用按名称参数,因此尚未计算 f(a)。要了解为什么 lazyUnit(f(a)) 实现了所需的功能,让我们逐步替换定义:
a => lazyUnit(f(a))
a => fork(unit(f(a))) ①
a => es => es.submit(
new Callbable[B] { def run = unit(f(a))(es).get }) ②
a => es => es.submit(
new Callbable[B] { def run = (es => UnitFuture(f(a)))(es).get }) ③
a => es => es.submit(
new Callbable[B] { def run = UnitFuture(f(a)).get }) ④
a => es => es.submit(
new Callbable[B] { def run = f(a) }) ⑤
(1)替换懒惰单位。
(2)替代叉。
(3)替代单位。
(4) 将 es 应用于 es => UnitFuture(f(a))。
(5)替换单位未来#获取。
我们剩下一个函数,它接收 A 并返回 Par,该函数将作业提交给在操作系统线程上运行时计算 f(a) 的执行器。
答案 7.5 |
—————————————————————————————— |
到目前为止,我们已经实现了很多次序列 – 一个使用 map2 的 foldRight。因此,让我们稍微复制、粘贴和调整类型:
def sequence[A](pas: List[Par[A]]): Par[List[A]] =
pas.foldRight(unit(List.empty[A]))((pa, acc) => pa.map2(acc)(_ :: _))
但是,在这种情况下,我们可以做得更好,使用类似于我们使用的技术 总和 .让我们将计算分为两半,并并行计算每半。由于我们再次需要对集合中的元素进行有效的随机访问,因此我们将首先编写一个与 IndexedSeq[Par[A]] 一起使用的版本:
def sequenceBalanced[A](pas: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
if pas.isEmpty then unit(IndexedSeq.empty)
else if pas.size == 1 then pas.head.map(a => IndexedSeq(a))
else
val (l, r) = pas.splitAt(pas.size / 2)
sequenceBalanced(l).map2(sequenceBalanced(r))(_ ++ _)
然后我们可以根据序列平衡来实现序列:
def sequence[A](pas: List[Par[A]]): Par[List[A]] =
sequenceBalanced(pas.toIndexedSeq).map(_.toList)
答案 7.6 |
—————————————————————————————— |
我们可以首先使用提供的谓词过滤列表,并将结果列表提升到 Par 中:
def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] =
unit(as.filter(f))
但是,此解决方案没有任何并行性。我们可以尝试通过使用 lazyUnit 而不是 unit 来解决这个问题:
def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] =
lazyUnit(as.filter(f))
这要好一些,因为过滤是在不同的逻辑线程上完成的,但所有过滤都是在同一逻辑线程上完成的。我们真的希望谓词的每个调用都有它自己的逻辑线程。就像在 parMap 中一样,我们可以使用 asyncF ,但这一次,我们不是直接传递 f,而是传递一个匿名函数,该函数将 A 转换为 List[A] — 如果谓词通过,则为单个元素列表,否则为空列表:
def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] =
fork: ①
val pars: List[Par[List[A]]] =
as.map(asyncF(a =>
if f(a) then List(a) else Nil)) ②
sequence(pars).map(_.flatten) ③
(1) 就像在 parMap 中一样,我们立即分叉,因此对原始列表的映射是在单独的逻辑线程上完成的,而不是在调用者的线程上完成的。
(2) 我们使用 asyncF 将 A => List[A] 函数转换为 A => Par[List[A]] 函数。
(3) sequence(pars) 返回一个 Par[List[List[A]]],所以我们映射它并展平内部嵌套列表。
答案 7.7 |
—————————————————————————————— |
我们将从我们试图证明的相等开始,然后进行代数替换,直到我们证明等式的两边简化为相同的值:
y.map(g).map(f) == y.map(f compose g) ①
y.map(id).map(f compose g) == y.map((f compose g)compose id) ②
y.map(id).map(f compose g) == y.map(f compose g) ③
y.map(f compose g) == y.map(f compose g) ④
(1) 初始定律
(2)将恒等函数替换为g,f在两侧将g组成f。
(3) 将右侧的 (f 组成 g) 撰写 id 简化为 f 组成 g。
(4) 将左侧的 y.map(id) 简化为 y。
答案 7.8 |
—————————————————————————————— |
考虑使用只有一个线程的固定线程池时会发生什么情况。下一节将更详细地探讨这一点。
答案 7.9 |
—————————————————————————————— |
任何固定大小的线程池都可以通过运行 fork(fork(fork(x))) 形式的表达式来死锁,其中至少比池中的线程多一个分支。池中的每个线程都会阻塞对 .get 的调用,从而导致所有线程都被阻塞,而另一个逻辑线程正在等待运行,从而解决所有等待。
答案 7.10 |
—————————————————————————————— |
我们将在第13章回到这个问题。
答案 7.11 |
—————————————————————————————— |
我们通过运行 n 并等待结果来计算选择的索引。然后,我们在该索引的选项中查找 Par 并运行它:
def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
es =>
val index = n.run(es).get % choices.size
choices(index).run(es)
根据 choiceN 实现选择涉及将条件转换为索引,我们可以通过 map .在这里,我们选择将真实案例索引设为 0,将错误案例索引设为 1:
def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
choiceN(cond.map(b => if b then 0 else 1))(List(t, f))
答案 7.12 |
—————————————————————————————— |
选择映射的实现与选择N的实现几乎相同;我们不是使用索引在列表中查找 Par,而是使用键在 Map 中进行查找:
def choiceMap[K, V](key: Par[K])(choices: Map[K, Par[V]]): Par[V] =
es =>
val k = key.run(es).get
choices(k).run
如果你愿意,停止阅读这里,看看你是否可以想出一个新的、更通用的组合器,你可以实现 choice、choiceN 和 choiceMap 。
答案 7.13 |
—————————————————————————————— |
实现几乎与 choiceN 和 choiceMap 相同,唯一的区别是选择查找是如何完成的:
extension [A](pa: Par[A]) def chooser[B](choices: A => Par[B]): Par[B] =
es =>
val a = pa.run(es).get
choices(a).run
现在可以通过选择器实现选择,方法是传递一个函数,该函数在 COND 返回 true 时选择 t,否则选择 f:
def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
cond.chooser(b => if b then t else f)
同样,可以通过传递一个在选项列表中执行查找的函数来实现 choiceN。
def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
n.chooser(i => choices(i % choices.size))
答案 7.14 |
—————————————————————————————— |
我们首先运行外部 Par 并等待它完成。然后我们运行生成的内部 Par:
def join[A](ppa: Par[Par[A]]): Par[A] =
es => ppa.run(es).get.run(es)
为了通过连接实现flatMap,我们首先在初始pa上映射f,给我们一个Par[Par[B]]。然后我们加入:
extension [A](pa: Par[A]) def flatMap[B](f: A => Par[B]): Par[B] =
join(pa.map(f))
在flatMap中实现join有点棘手。flatMap 允许我们将 Par[X] 转换为 Par[Y],给定函数 X => Par[Y]。诀窍是取 X = Par[A] 和 Y = A .因此,我们需要一个函数 Par[A] => Par[A],它是恒等函数:
def join[A](ppa: Par[Par[A]]): Par[A] =
ppa.flatMap(identity)
1 在本章中,我们将非正式地使用术语逻辑线程来表示与程序的主执行线程同时运行的计算。逻辑线程和操作系统线程之间不需要一对一的对应关系;例如,我们可能通过线程池将大量逻辑线程映射到较少数量的操作系统线程上。
2 Scala 中的函数参数从左到右严格计算,所以如果单元延迟执行直到调用 get,那么我们将生成并行计算并等待它完成,然后再生成第二个并行计算。这意味着计算实际上是顺序的!
3 这种对表示的漠不关心暗示了这些操作实际上更通用,可以抽象为适用于 Par 以外的类型。我们将在第 3 部分中详细探讨此主题。
4 这很像第 1 章中 Cafe 示例中的 buyCoffee 方法访问信用卡处理系统的方式。
5 在这种情况下,还有另一个很好的理由不将 parMap 实现为新的原语:正确执行此操作具有挑战性,特别是如果我们想正确尊重超时。通常情况下,原始组合器封装了一些相当棘手的逻辑,重用它们意味着我们不必复制这个逻辑。
6 我们的意思是一个或多个集合的数学意义上的代数,以及对这些集合的对象进行操作的函数集合,以及一组公理。公理是假定为真的陈述,我们可以从中推导出其他也必须为真的定理。在我们的例子中,集合是特定的类型,如Par[A]和List[Par[A]],函数是像map2、单位和序列这样的运算。
7 关于这一点,我们将在本书的其余部分有更多的话要说。在下一章中,我们将设计一个声明式测试库,让我们定义我们期望函数满足的属性,并自动生成测试用例来检查这些属性。在第 3 部分中,我们将介绍仅由定律集指定的抽象接口。
8 这里我们指的是数学意义上的恒等式,即两个表达式相同或等价的陈述。
9 恒等函数定义为 def id[A](a: A): A = a 。
10 这与求解代数方程时可能做的替换和简化相同。
11 我们说map需要保持结构,因为它不会改变并行计算的结构,只会改变计算中的值。
12 自由定理的概念是由菲利普·瓦德勒(Philip Wadler)在他的经典论文“免费定理!(http://mng.bz/Z9f1)。
13 我们对 Par 的表示无法实现此优化,因为它是一个不透明的函数。例如,y.map(g) 返回一个新的 Par,它是一个黑盒——当我们对该结果调用 .map(f) 时,我们已经失去了用于构造 y.map(g) 的部分的知识:即 y、map 和 g。我们所看到的只是不透明的函数,因此无法提取出g来与f组成。如果将 Par 化为数据类型(例如,各种操作的枚举),那么我们可以模式匹配并发现应用此规则的机会。您可能想尝试自己尝试这个想法。
14 在下一章中,我们将编写一个用于测试的组合器库,它可以帮助自动发现此类问题。
15 actor 实现中的主要棘手之处与多个线程可能同时向 actor 发送消息这一事实有关。实现需要确保一次处理一个消息,以及最终处理发送给参与者的所有消息,而不是无限期地排队。即便如此,代码最终还是很短。
16 最后一种产生规律的方法可能是最薄弱的,因为即使实施有缺陷或需要各种不寻常的附带条件,使组合变得困难,也很容易让法律反映实施。
17 请参阅非阻塞实现章节代码中的 Nonblocking.scala。
18 Map[K,V](参见API:http://mng.bz/eZ4l)是Scala标准库中的纯函数式数据结构。它将 K 类型的键与 V 类型的值以一对一的关系相关联,并允许我们通过关联的键查找值。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/63230.html