使用 Apache Pekko 演员和 GPars 演员与 Groovy

作者: Paul King
发布: 2023-07-17 11:24PM (最后更新: 2024-06-25 11:59AM)


pekko logo Apache Pekko 是 Akka 项目的 Apache 许可分支(基于 Akka 版本 2.6.x),提供了一个框架,用于构建并发、分布式、弹性和容错的应用程序。Pekko 提供基于演员的高级并发抽象,以及用于持久性、流、HTTP 等的附加库。它提供 Scala 和 Java API/DSL 用于编写您的应用程序。我们将使用后者。我们将只看使用 Pekko 演员的一个示例。

gpars 为了比较,我们还将看看 GPars,这是一个用于 Java 和 Groovy 的并发库,支持演员、代理、并发和并行映射/规约、fork/join、异步闭包、数据流等等。一个 之前的博客文章 讨论了 GPars 的其他功能以及如何将其与虚拟线程一起使用。在这里,我们将只看看与 Pekko 示例中可比的演员功能。

示例

涉及演员的常见第一个示例包括创建两个演员,其中一个演员向第二个演员发送消息,第二个演员将回复发送回第一个演员。我们当然可以这样做,但我们将使用一个更有趣的涉及三个演员的示例。该示例来自 Pekko 文档,并在下图(来自 Pekko 文档)中说明。

actors in our system - from pekko documentation

该系统包含以下演员

  • HelloWorldMain 演员创建另外两个演员,并发送一条初始消息来启动我们的一个小系统。初始消息发送到 HelloWorld 演员,并将 HelloWorldBot 作为回复地址。

  • HelloWorld 演员正在监听 Greet 消息。当它收到一条消息时,它会向回复地址发送一个 Greeted 确认。

  • HelloWorldBot 就像一个回声室。它会返回它收到的任何消息。这可能是一个无限循环,但是,该演员有一个参数来告诉它在停止之前回声消息的最大次数。

Groovy 中的 Pekko 实现

此示例使用 Groovy 4.0.21 和 Pekko 1.0.3。它已在 JDK 11 和 17 上测试过。

Pekko 文档提供了 Java 和 Scala 实现。您应该注意到,Groovy 实现与 Java 实现类似,但只是稍微短一些。Groovy 代码比等效的 Scala 代码稍微复杂一些。我们当然可以使用 Groovy 元编程以多种方式简化 Groovy 代码,但这将是另一个话题。

以下是 HelloWorld 的代码

class HelloWorld extends AbstractBehavior<Greet> {

    static record Greet(String whom, ActorRef<Greeted> replyTo) {}
    static record Greeted(String whom, ActorRef<Greet> from) {}

    static Behavior<Greet> create() {
        Behaviors.setup(HelloWorld::new)
    }

    private HelloWorld(ActorContext<Greet> context) {
        super(context)
    }

    @Override
    Receive<Greet> createReceive() {
        newReceiveBuilder().onMessage(Greet.class, this::onGreet).build()
    }

    private Behavior<Greet> onGreet(Greet command) {
        context.log.info "Hello $command.whom!"
        command.replyTo.tell(new Greeted(command.whom, context.self))
        this
    }
}

首先,我们定义 GreetGreeter 记录,以便为系统中的消息提供强类型。然后我们定义演员的详细信息。其中相当一部分是样板代码。有趣的部分是在 onGreet 方法中。我们记录消息详细信息,然后发送回 Greeted 确认。

HelloWorldBot 类似。您应该注意到一些状态变量,它们保存一个调用计数器和一个在终止之前调用的最大次数

class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {

    static Behavior<HelloWorld.Greeted> create(int max) {
        Behaviors.setup(context -> new HelloWorldBot(context, max))
    }

    private final int max
    private int greetingCounter

    private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
        super(context)
        this.max = max
    }

    @Override
    Receive<HelloWorld.Greeted> createReceive() {
        newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build()
    }

    private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
        greetingCounter++
        context.log.info "Greeting $greetingCounter for $message.whom"
        if (greetingCounter == max) {
            return Behaviors.stopped()
        } else {
            message.from.tell(new HelloWorld.Greet(message.whom, context.self))
            return this
        }
    }
}

有趣的逻辑在 onGreeted 方法中。我们增加计数器,如果达到最大计数阈值,则停止,否则将消息内容回声给发送方。

让我们看看最后一个演员

class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {

    static record SayHello(String name) { }

    static Behavior<SayHello> create() {
        Behaviors.setup(HelloWorldMain::new)
    }

    private final ActorRef<HelloWorld.Greet> greeter

    private HelloWorldMain(ActorContext<SayHello> context) {
        super(context)
        greeter = context.spawn(HelloWorld.create(), 'greeter')
    }

    @Override
    Receive<SayHello> createReceive() {
        newReceiveBuilder().onMessage(SayHello.class, this::onStart).build()
    }

    private Behavior<SayHello> onStart(SayHello command) {
        var replyTo = context.spawn(HelloWorldBot.create(3), command.name)
        greeter.tell(new HelloWorld.Greet(command.name, replyTo))
        this
    }
}

有一个 SayHello 记录,充当强类型传入消息。HelloWorldMain 演员创建其他演员。它创建了一个 HelloWorld 演员,它是后续消息的问候目标。对于每个传入的 SayHello 消息,它都会创建一个机器人,然后向问候者发送一条消息,其中包含 SayHello 负载,并告诉它回复机器人

最后,我们需要启动我们的系统。我们创建 HelloWorldMain 演员并向它发送两条消息

var system = ActorSystem.create(HelloWorldMain.create(), 'hello')

system.tell(new HelloWorldMain.SayHello('World'))
system.tell(new HelloWorldMain.SayHello('Pekko'))

运行脚本的日志输出将类似于以下内容

[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 1 for Pekko
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 1 for World
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 2 for Pekko
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 2 for World
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello Pekko!
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello World!
[hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 3 for Pekko
[hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 3 for World
[hello-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

Groovy 中的 GPars 实现

此示例使用 Groovy 4.0.21 和 GPars 1.2.1。它已在 JDK 8、11、17 和 21 上测试过。

我们将在 GPars 示例中遵循相同的消息强类型约定。以下是我们的三个消息容器

record Greet(String whom, Actor replyTo) { }

record Greeted(String whom, Actor from) {}

record SayHello(String name) { }

现在我们将定义我们的 helloWorld 演员

greeter = actor {
    loop {
        react { Greet command ->
            println "Hello $command.whom!"
            command.replyTo << new Greeted(command.whom, greeter)
        }
    }
}

在这里,我们使用 GPars Groovy 继续风格的 DSL 来定义演员。loop 指示演员将持续循环。当我们收到 Greet 消息时,我们将详细信息记录到标准输出并发送确认。

如果我们不想使用 DSL 语法,我们可以直接使用相关类。在这里,我们将使用这种稍微更冗长的风格定义一个 HelloWorldBot。它展示了如何添加我们需要用于跟踪调用计数的状态变量

class HelloWorldBot extends DefaultActor {
    int max
    private int greetingCounter = 0

    @Override
    protected void act() {
        loop {
            react { Greeted message ->
                greetingCounter++
                println "Greeting $greetingCounter for $message.whom"
                if (greetingCounter < max) message.from << new Greet(message.whom, this)
                else terminate()
            }
        }
    }
}

我们的主要演员非常简单。它正在等待 SayHello 消息,当它收到一条消息时,它将负载发送到 helloWorld 问候者,并告诉它回复一个新创建的机器人

var main = actor {
    loop {
        react { SayHello command ->
            greeter << new Greet(command.name, new HelloWorldBot(max: 3).start())
        }
    }
}

最后,我们通过发送一些初始消息来启动系统

main << new SayHello('World')
main << new SayHello('GPars')

输出如下所示

Hello World!
Hello GPars!
Greeting 1 for World
Greeting 1 for GPars
Hello World!
Hello GPars!
Greeting 2 for World
Hello World!
Greeting 2 for GPars
Hello GPars!
Greeting 3 for World
Greeting 3 for GPars

讨论

与 Pekko 实现相比,GPars 实现的冗长性较低,但 Pekko 以提供演员消息的额外类型安全性而闻名,而这正是我们所看到的。

GPars 支持多种风格,其中一些风格在减少冗长性的同时,也牺牲了在运行时而不是编译时捕获某些错误的能力。当需要使用 Groovy 的动态特性编写非常简洁的代码时,此类代码很有用。当使用 Groovy 的静态特性或 Java 时,您可能要考虑使用 GPars API 的一部分。

例如,我们可以为 HelloWorldBot 提供一个替代定义,如下所示

class HelloWorldBot extends StaticDispatchActor<Greeted> {
    int max
    private int greetingCounter = 0

    @Override
    void onMessage(Greeted message) {
        greetingCounter++
        println "Greeting $greetingCounter for $message.whom"
        if (greetingCounter < max) message.from << new Greet(message.whom, this)
        else terminate()
    }
}

StaticDispatchActor 仅根据编译时信息调度消息。当不需要基于消息运行时类型进行调度时,这可能更有效。

我们也可以为 Greet 提供一个替代定义,如下所示

record Greet(String whom, StaticDispatchActor<Greeted> replyTo) { }

通过进行这些更改,我们可以在使用 Groovy 的静态特性时编写具有额外消息类型安全性的解决方案。

结论

我们快速地了解了如何使用 Apache Pekko 和 GPars 与演员一起使用。

示例代码可以在这里找到

更新历史

2023 年 7 月 17 日: 初始版本。
2023 年 7 月 18 日: 添加关于类型安全消息的讨论。
2023 年 7 月 26 日: 更新到 Pekko 1.0.1。
2024 年 6 月 25 日: 更新到 Pekko 1.0.3。