在 Groovy™ 中使用 Apache Pekko actor 和 GPars actor

作者: Paul King

发布时间:2023-07-17 11:24PM (最后更新时间:2024-09-28 04:59AM)


pekko logo Apache Pekko 是 Akka 项目的一个 Apache 许可的派生版本(基于 Akka 2.6.x),它提供了一个用于构建并发、分布式、弹性和弹性应用程序的框架。Pekko 提供基于 actor 的高级并发抽象,以及用于持久化、流、HTTP 等的额外库。它提供了 Scala 和 Java API/DSL 用于编写您的应用程序。我们将使用后者。我们只看一个使用 Pekko actor 的例子。

gpars 作为比较,我们还将关注 GPars,这是一个用于 Java 和 Groovy 的并发库,支持 actor、代理、并发和并行 map/reduce、fork/join、异步闭包、数据流等等。之前的博客文章介绍了 GPars 的其他功能以及如何将其与虚拟线程一起使用。在这里,我们只关注我们的 Pekko 示例中可比较的 actor 功能。

示例

涉及 actor 的常见第一个示例是创建两个 actor,其中一个 actor 向第二个 actor 发送消息,第二个 actor 再向第一个 actor 发送回复。我们当然可以这样做,但我们将使用一个稍微更有趣的涉及三个 actor 的示例。该示例来自 Pekko 文档,并在以下图中(来自 Pekko 文档)进行说明

actors in our system - from pekko documentation

该系统由以下 actor 组成

  • HelloWorldMain actor 创建其他两个 actor 并发送初始消息以启动我们的小系统。初始消息发送给 HelloWorld actor,并将 HelloWorldBot 作为回复地址。

  • HelloWorld actor 正在监听 Greet 消息。当它收到一条消息时,它会向回复地址发送一个 Greeted 确认。

  • HelloWorldBot 就像一个回音室。它返回它收到的任何消息。这可能会导致无限循环,但是,该 actor 有一个参数来告诉它在停止之前回显消息的最大次数。

Groovy 中的 Pekko 实现

此示例使用 Groovy 4.0.23 和 Pekko 1.1.1。它已通过 JDK 11、17、21 和 23 的测试。

Pekko 文档提供了 Java 和 Scala 的实现。您应该注意到 Groovy 实现与 Java 实现相似,但更短一些。Groovy 代码比等效的 Scala 代码稍微复杂一些。我们当然可以使用 Groovy 元编程以多种方式简化 Groovy 代码,但这将在以后讨论。

这是 HelloWorld 的代码

class HelloWorld extends AbstractBehavior<Greet> {

    static record Greet(String whom, ActorRef<Greeted> replyTo) {}
    static record Greeted(String whom, ActorRef<Greet> from) {}

    static Behavior<Greet> create() {
        Behaviors.setup(HelloWorld::new)
    }

    private HelloWorld(ActorContext<Greet> context) {
        super(context)
    }

    @Override
    Receive<Greet> createReceive() {
        newReceiveBuilder().onMessage(Greet.class, this::onGreet).build()
    }

    private Behavior<Greet> onGreet(Greet command) {
        context.log.info "Hello $command.whom!"
        command.replyTo.tell(new Greeted(command.whom, context.self))
        this
    }
}

首先,我们定义 GreetGreeter 记录,以便在我们的系统中对消息进行强类型化。然后我们定义 actor 的详细信息。其中很大一部分是样板代码。有趣的部分在 onGreet 方法内部。我们先记录消息详细信息,然后发送 Greeted 确认。

HelloWorldBot 类似。您应该注意到一些状态变量,它们维护一个调用计数器和终止前的最大调用次数

class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {

    static Behavior<HelloWorld.Greeted> create(int max) {
        Behaviors.setup(context -> new HelloWorldBot(context, max))
    }

    private final int max
    private int greetingCounter

    private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
        super(context)
        this.max = max
    }

    @Override
    Receive<HelloWorld.Greeted> createReceive() {
        newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build()
    }

    private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
        greetingCounter++
        context.log.info "Greeting $greetingCounter for $message.whom"
        if (greetingCounter == max) {
            return Behaviors.stopped()
        } else {
            message.from.tell(new HelloWorld.Greet(message.whom, context.self))
            return this
        }
    }
}

有趣的逻辑在 onGreeted 方法中。我们增加计数器,如果达到最大计数阈值则停止,否则将消息内容回显给发送者。

让我们看看最后一个 actor

class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {

    static record SayHello(String name) { }

    static Behavior<SayHello> create() {
        Behaviors.setup(HelloWorldMain::new)
    }

    private final ActorRef<HelloWorld.Greet> greeter

    private HelloWorldMain(ActorContext<SayHello> context) {
        super(context)
        greeter = context.spawn(HelloWorld.create(), 'greeter')
    }

    @Override
    Receive<SayHello> createReceive() {
        newReceiveBuilder().onMessage(SayHello.class, this::onStart).build()
    }

    private Behavior<SayHello> onStart(SayHello command) {
        var replyTo = context.spawn(HelloWorldBot.create(3), command.name)
        greeter.tell(new HelloWorld.Greet(command.name, replyTo))
        this
    }
}

有一个 SayHello 记录,作为强类型的传入消息。HelloWorldMain actor 创建其他 actor。它创建一个 HelloWorld actor,作为后续消息的“打招呼者”目标。对于每条传入的 SayHello 消息,它创建一个“机器人”,然后向“打招呼者”发送一条消息,其中包含 SayHello 有效载荷并告诉它回复给“机器人”。

最后,我们需要启动我们的系统。我们创建 HelloWorldMain actor 并向其发送两条消息

var system = ActorSystem.create(HelloWorldMain.create(), 'hello')

system.tell(new HelloWorldMain.SayHello('World'))
system.tell(new HelloWorldMain.SayHello('Pekko'))

运行脚本的日志输出将类似于此

[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 1 for Pekko
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 1 for World
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 2 for Pekko
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 2 for World
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 3 for Pekko
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 3 for World
[hello-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

Groovy 中的 GPars 实现

此示例使用 Groovy 4.0.23 和 GPars 1.2.1。它已通过 JDK 8、11、17、21 和 23 的测试。

在我们的 GPars 示例中,我们将遵循相同的强类型消息约定。以下是我们的三个消息容器

record Greet(String whom, Actor replyTo) { }

record Greeted(String whom, Actor from) {}

record SayHello(String name) { }

现在我们将定义我们的 helloWorld actor

greeter = actor {
    loop {
        react { Greet command ->
            println "Hello $command.whom!"
            command.replyTo << new Greeted(command.whom, greeter)
        }
    }
}

在这里,我们使用 GPars Groovy 延续式 DSL 定义 actor。loop 表示 actor 将持续循环。当我们收到 Greet 消息时,我们将详细信息记录到标准输出并发送确认。

如果我们不想使用 DSL 语法,我们可以直接使用相关的类。在这里,我们将使用这种稍微更冗长的样式定义一个 HelloWorldBot。它展示了添加我们跟踪调用计数所需的状态变量

class HelloWorldBot extends DefaultActor {
    int max
    private int greetingCounter = 0

    @Override
    protected void act() {
        loop {
            react { Greeted message ->
                greetingCounter++
                println "Greeting $greetingCounter for $message.whom"
                if (greetingCounter < max) message.from << new Greet(message.whom, this)
                else terminate()
            }
        }
    }
}

我们的主 actor 非常简单。它正在等待 SayHello 消息,当它收到一条消息时,它会将负载发送给 helloWorld greeter,告诉它回复给一个新创建的“机器人”。

var main = actor {
    loop {
        react { SayHello command ->
            greeter << new Greet(command.name, new HelloWorldBot(max: 3).start())
        }
    }
}

最后,我们通过发送一些初始消息来启动系统

main << new SayHello('World')
main << new SayHello('GPars')

输出如下

Hello World!
Hello GPars!
Greeting 1 for World
Greeting 1 for GPars
Hello World!
Hello GPars!
Greeting 2 for World
Hello World!
Greeting 2 for GPars
Hello GPars!
Greeting 3 for World
Greeting 3 for GPars

讨论

与 Pekko 实现相比,GPars 实现更简洁,但 Pekko 以提供 actor 消息的额外类型安全性而闻名,这也是我们所看到的一部分。

GPars 支持多种样式,其中一些以运行时捕获某些错误而非编译时捕获为代价,提供了更简洁的语法。当希望使用 Groovy 的动态特性编写非常简洁的代码时,此类代码会很有用。当使用 Groovy 的静态特性或 Java 时,您可以考虑使用 GPars API 的选定部分。

例如,我们可以为 HelloWorldBot 提供如下替代定义

class HelloWorldBot extends StaticDispatchActor<Greeted> {
    int max
    private int greetingCounter = 0

    @Override
    void onMessage(Greeted message) {
        greetingCounter++
        println "Greeting $greetingCounter for $message.whom"
        if (greetingCounter < max) message.from << new Greet(message.whom, this)
        else terminate()
    }
}

StaticDispatchActor 仅根据编译时信息分派消息。当不需要基于消息运行时类型分派时,这可以更高效。

我们还可以为 Greet 提供如下替代定义

record Greet(String whom, StaticDispatchActor<Greeted> replyTo) { }

通过进行这些更改,在使用 Groovy 的静态特性时,我们可以编写一个具有额外消息类型安全性的解决方案。

结论

我们快速了解了如何在 Apache Pekko 和 GPars 中使用 actor。

示例代码可以在这里找到

更新历史

2023年7月17日:初始版本。
2023年7月18日:添加关于类型安全消息的讨论。
2023年7月26日:更新到 Pekko 1.0.1。
2024年6月25日:更新到 Pekko 1.0.3。
2024年9月28日:更新到 Pekko 1.1.1。