GPars 遇见虚拟线程

作者:Paul King
发布:2022-06-15 11:28AM (上次更新:2023-04-14 06:23PM)


gpars JDK21 中即将出现的一项令人兴奋的功能是虚拟线程 (JEP 444)。它已经在 JDK19 (JEP 425) 和 JDK20 (JEP 436) 中进行了预览,但 JDK21 将是第一个无需使用 Java 的预览开关即可使用该功能的版本。本博文中的示例使用 Groovy 4.0.11 和 JDK 版本 21-ea+18-1480 (以及启用预览功能的 JDK19) 进行测试。

虚拟线程与我最喜欢的 Groovy 并行和并发库 GPars 协同良好。GPars 已经存在一段时间 (从 Java 5 和 Groovy 1.8 的时代开始),但仍然拥有许多有用的功能。让我们来看几个例子。

如果你想尝试一下,请使用最新的 JDK21ea 版本,它默认支持虚拟线程。或者使用最新的 JDK19-20 版本,并使用你的 Groovy 工具启用 *预览* 功能。

并行集合

首先,回顾一下,我们将首先看看如何使用 GPars 并行集合功能与普通线程一起使用。让我们从一个数字列表开始

var nums = [1, 2, 3]

为了使用普通线程并行计算原始数字的平方列表,我们使用 GParsPool.withPool 方法,如下所示

withPool {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

对于任何 Java 阅读者,不要与 collectParallel 方法名称混淆。Groovy 的 collect 方法 (命名灵感来自 Smalltalk) 等同于 Java 的 map 方法。因此,使用 Java 流 API 的等效 Groovy 代码将类似于

assert nums.parallelStream().map(n -> n ** 2).toList() == [1, 4, 9]

现在,让我们将虚拟线程引入其中。幸运的是,GPars 并行集合设施提供了用于使用 *现有* 自定义执行器服务的钩子 (GParsExecutorsPool.withExistingPool)。这使得在这样的代码中使用虚拟线程变得很容易。首先,我们创建自己的池 (由虚拟线程支持)

var vtPool = Executors.newVirtualThreadPerTaskExecutor()

现在,我们可以像这样使用它

withExistingPool(vtPool) {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

或者我们可以使用其他许多 '*Parallel' 方法,在本例中为 findAllParallel

var isEven = n -> n % 2 == 0
withExistingPool(vtPool) {
    assert (1..9).findAllParallel(isEven) == (2..8).step(2)
}

不错!使用虚拟线程非常简单!

让我们看另一个例子,FizzBuzz 示例

var result = withExistingPool(vtPool) {
    (1..15).collectParallel {
        switch(it) {
            case { it % 15 == 0 } -> 'FizzBuzz'
            case { it % 5 == 0 } -> 'Buzz'
            case { it % 3 == 0 } -> 'Fizz'
            default -> it
        }
    }.join(',')
}
assert result == '1,2,Fizz,4,Buzz,Fizz,7,8,Fizz,Buzz,11,Fizz,13,14,FizzBuzz'

现在,让我们来看一些 Java 开发人员可能不太熟悉的例子。

GPars 具有提供自定义线程池的附加功能,其余的示例依赖于这些功能。当前版本的 GPars 没有接受普通执行器服务的 DefaultPool 构造函数;因此,我们将编写自己的类

@AutoImplement
class VirtualPool implements Pool {
    private final ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()
    int getPoolSize() { pool.poolSize }
    void execute(Runnable task) { pool.execute(task) }
    ExecutorService getExecutorService() { pool }
}

它本质上是从 GPars Pool 接口到虚拟线程执行器服务的委托。

我们将在接下来的示例中使用它。

代理

代理围绕一个可能可变的共享状态对象提供了一个线程安全的非阻塞包装器。它们受 Clojure 中的代理启发。

在本例中,我们将使用代理来“保护”一个普通的 ArrayList。对于这种简单的情况,我们可以使用一些同步列表,但总的来说,代理消除了寻找线程安全的实现类或真正关心底层包装对象的线程安全的必要性。

var mutableState = []     // a non-synchronized mutable list
var agent = new Agent(mutableState)

agent.attachToThreadPool(new VirtualPool()) // omit line for normal threads

agent { it << 'Dave' }    // one thread updates list
agent { it << 'Joe' }     // another thread also updating
assert agent.val.size() == 2

演员

演员允许使用基于消息传递的并发模型。actor 模型确保最多只有一个线程在任何时间处理 actor 的主体。GPars 的演员 API 和 DSL 非常丰富,支持许多功能。我们将在这里看看一个简单的例子。

GPars 在组中管理 actor 线程池。让我们创建一个由虚拟线程支持的线程池

var vgroup = new DefaultPGroup(new VirtualPool())

现在我们可以编写一个加密和解密演员对,如下所示

var decryptor = vgroup.actor {
    loop {
        react { String message ->
            reply message.reverse()
        }
    }
}

var console = vgroup.actor {
    decryptor << 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join() // output: Decrypted message: Groovy is parallel

数据流

数据流提供了一种本质上安全且健壮的声明式并发模型。数据流也通过线程组进行管理,因此我们将使用之前创建的 vgroup

为了举个例子,我们将创建一个场景,其中两个任务正在生成一些结果,而第三个任务正在添加其他任务的结果。

gpars dataflow

我们有三个可以并行运行并执行其工作的逻辑任务。这些任务需要交换数据,它们通过 *数据流变量* 来完成。将数据流变量视为一次性通道,安全可靠地将数据从生产者传输到消费者。

var df = new Dataflows()

vgroup.with {
    task {
        df.z = df.x + df.y
    }

    task {
        df.x = 10
    }

    task {
        df.y = 5
    }

    assert df.z == 15
}

这段代码的风格是声明式的。我们可以按任何顺序指定三个任务。我们没有给出任何关于哪些任务应该先发生的指示。数据流框架会计算出如何调度各个任务,并确保当需要时,任务的输入变量已准备就绪。

结论

我们已经快速浏览了如何在 Groovy 和 GPars 中使用虚拟线程。虚拟线程仍处于起步阶段,因此随着 JDK21 变得更加主流,预计将出现更多功能。