GPars 遇见虚拟线程

作者: Paul King

发布时间:2022-06-15 11:28AM(最后更新时间:2023-04-14 06:23PM)


gpars JDK21 中即将推出的一个令人兴奋的特性是虚拟线程(JEP 444)。它已在 JDK19(JEP 425)和 JDK20(JEP 436)中进行了预览,但 JDK21 将是该特性无需使用 Java 预览开关即可使用的第一个版本。本博客中的示例使用 Groovy 4.0.11 和 JDK 版本 21-ea+18-1480(并使用启用了预览功能的 JDK19)进行了测试。

虚拟线程与我最喜欢的 Groovy 并行和并发库 GPars 配合得很好。GPars 已经存在一段时间了(从 Java 5 和 Groovy 1.8 时代开始),但仍然有许多有用的功能。让我们看几个例子。

如果您想尝试这些功能,请使用最新版本的 JDK21ea,它默认支持虚拟线程。或者使用最近的 JDK19-20 版本,并使用 Groovy 工具启用预览功能。

并行集合

首先回顾一下,我们先来看看如何使用 GPars 并行集合功能与普通线程。让我们从一个数字列表开始

var nums = [1, 2, 3]

为了使用普通线程并行计算原始数字的平方列表,我们使用 GParsPool.withPool 方法,如下所示

withPool {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

对于任何 Java 读者,不要混淆 collectParallel 方法名。Groovy 的 collect 方法(命名灵感来自 Smalltalk)相当于 Java 的 map 方法。因此,使用 Java streams API 的等效 Groovy 代码将类似于

assert nums.parallelStream().map(n -> n ** 2).toList() == [1, 4, 9]

现在,让我们将虚拟线程引入进来。幸运的是,GPars 并行集合功能提供了一个使用现有自定义执行器服务(GParsExecutorsPool.withExistingPool)的钩子。这使得使用虚拟线程来处理此类代码变得容易。首先我们创建我们的池(由虚拟线程支持)

var vtPool = Executors.newVirtualThreadPerTaskExecutor()

现在,我们可以像这样使用它

withExistingPool(vtPool) {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

或者我们可以使用许多其他 '*Parallel' 方法之一,在这种情况下是 findAllParallel

var isEven = n -> n % 2 == 0
withExistingPool(vtPool) {
    assert (1..9).findAllParallel(isEven) == (2..8).step(2)
}

太棒了!使用虚拟线程非常简单!

让我们看一个例子,FizzBuzz 例子

var result = withExistingPool(vtPool) {
    (1..15).collectParallel {
        switch(it) {
            case { it % 15 == 0 } -> 'FizzBuzz'
            case { it % 5 == 0 } -> 'Buzz'
            case { it % 3 == 0 } -> 'Fizz'
            default -> it
        }
    }.join(',')
}
assert result == '1,2,Fizz,4,Buzz,Fizz,7,8,Fizz,Buzz,11,Fizz,13,14,FizzBuzz'

现在,让我们继续看一些 Java 开发人员可能不太熟悉的例子。

GPars 具有提供自定义线程池的附加功能,其余示例都依赖于这些功能。当前版本的 GPars 没有接受普通执行器服务的 DefaultPool 构造函数;因此,我们将编写自己的类

@AutoImplement
class VirtualPool implements Pool {
    private final ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()
    int getPoolSize() { pool.poolSize }
    void execute(Runnable task) { pool.execute(task) }
    ExecutorService getExecutorService() { pool }
}

它本质上是 GPars Pool 接口到虚拟线程执行器服务的委托。

我们将在其余示例中使用它。

代理

代理提供了一个线程安全的非阻塞包装器,用于包装一个可能可变的共享状态对象。它们的灵感来自 Clojure 中的代理。

在我们的例子中,我们将使用代理来“保护”一个普通的 ArrayList。对于这个简单的例子,我们可以使用一些同步列表,但通常情况下,代理消除了寻找线程安全实现类或根本不需要关心底层包装对象的线程安全性的需求。

var mutableState = []     // a non-synchronized mutable list
var agent = new Agent(mutableState)

agent.attachToThreadPool(new VirtualPool()) // omit line for normal threads

agent { it << 'Dave' }    // one thread updates list
agent { it << 'Joe' }     // another thread also updating
assert agent.val.size() == 2

Actor

Actor 允许基于消息传递的并发模型。Actor 模型确保在任何给定时间最多只有一个线程处理 actor 的主体。GPars API 和 Actor 的 DSL 非常丰富,支持许多功能。我们在这里看一个简单的例子。

GPars 在组中管理 Actor 线程池。让我们创建一个由虚拟线程支持的线程池

var vgroup = new DefaultPGroup(new VirtualPool())

现在我们可以编写一个加密和解密 Actor 对,如下所示

var decryptor = vgroup.actor {
    loop {
        react { String message ->
            reply message.reverse()
        }
    }
}

var console = vgroup.actor {
    decryptor << 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join() // output: Decrypted message: Groovy is parallel

数据流

数据流提供了一种本质上安全可靠的声明式并发模型。数据流也通过线程组进行管理,因此我们将使用我们之前创建的 vgroup

为了举例说明,我们将创建一个场景,其中两个任务产生一些结果,第三个任务添加其他任务的结果。

gpars dataflow

我们有三个逻辑任务可以并行运行并执行它们的工作。这些任务需要交换数据,它们使用数据流变量来做到这一点。将数据流变量视为一次性通道,安全可靠地将数据从生产者传输到其消费者。

var df = new Dataflows()

vgroup.with {
    task {
        df.z = df.x + df.y
    }

    task {
        df.x = 10
    }

    task {
        df.y = 5
    }

    assert df.z == 15
}

此代码采用声明式风格。我们可以以任何顺序指定这三个任务。我们没有指示哪些任务应该首先发生。数据流框架会计算如何调度各个任务,并确保在需要时任务的输入变量已准备就绪。

结论

我们已经快速了解了如何将虚拟线程与 Groovy 和 GPars 结合使用。虚拟线程仍处于早期阶段,因此随着 JDK21 变得越来越主流,预计还会出现更多内容。