使用 Apache Pekko 演员和 GPars 演员与 Groovy
作者: Paul King
发布: 2023-07-17 11:24PM (最后更新: 2024-06-25 11:59AM)
Apache Pekko 是 Akka 项目的 Apache 许可分支(基于 Akka 版本 2.6.x),提供了一个框架,用于构建并发、分布式、弹性和容错的应用程序。Pekko 提供基于演员的高级并发抽象,以及用于持久性、流、HTTP 等的附加库。它提供 Scala 和 Java API/DSL 用于编写您的应用程序。我们将使用后者。我们将只看使用 Pekko 演员的一个示例。
示例
涉及演员的常见第一个示例包括创建两个演员,其中一个演员向第二个演员发送消息,第二个演员将回复发送回第一个演员。我们当然可以这样做,但我们将使用一个更有趣的涉及三个演员的示例。该示例来自 Pekko 文档,并在下图(来自 Pekko 文档)中说明。
该系统包含以下演员
-
HelloWorldMain
演员创建另外两个演员,并发送一条初始消息来启动我们的一个小系统。初始消息发送到HelloWorld
演员,并将HelloWorldBot
作为回复地址。 -
HelloWorld
演员正在监听Greet
消息。当它收到一条消息时,它会向回复地址发送一个Greeted
确认。 -
HelloWorldBot
就像一个回声室。它会返回它收到的任何消息。这可能是一个无限循环,但是,该演员有一个参数来告诉它在停止之前回声消息的最大次数。
Groovy 中的 Pekko 实现
此示例使用 Groovy 4.0.21 和 Pekko 1.0.3。它已在 JDK 11 和 17 上测试过。
Pekko 文档提供了 Java 和 Scala 实现。您应该注意到,Groovy 实现与 Java 实现类似,但只是稍微短一些。Groovy 代码比等效的 Scala 代码稍微复杂一些。我们当然可以使用 Groovy 元编程以多种方式简化 Groovy 代码,但这将是另一个话题。
以下是 HelloWorld
的代码
class HelloWorld extends AbstractBehavior<Greet> {
static record Greet(String whom, ActorRef<Greeted> replyTo) {}
static record Greeted(String whom, ActorRef<Greet> from) {}
static Behavior<Greet> create() {
Behaviors.setup(HelloWorld::new)
}
private HelloWorld(ActorContext<Greet> context) {
super(context)
}
@Override
Receive<Greet> createReceive() {
newReceiveBuilder().onMessage(Greet.class, this::onGreet).build()
}
private Behavior<Greet> onGreet(Greet command) {
context.log.info "Hello $command.whom!"
command.replyTo.tell(new Greeted(command.whom, context.self))
this
}
}
首先,我们定义 Greet
和 Greeter
记录,以便为系统中的消息提供强类型。然后我们定义演员的详细信息。其中相当一部分是样板代码。有趣的部分是在 onGreet
方法中。我们记录消息详细信息,然后发送回 Greeted
确认。
HelloWorldBot
类似。您应该注意到一些状态变量,它们保存一个调用计数器和一个在终止之前调用的最大次数
class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {
static Behavior<HelloWorld.Greeted> create(int max) {
Behaviors.setup(context -> new HelloWorldBot(context, max))
}
private final int max
private int greetingCounter
private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
super(context)
this.max = max
}
@Override
Receive<HelloWorld.Greeted> createReceive() {
newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build()
}
private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
greetingCounter++
context.log.info "Greeting $greetingCounter for $message.whom"
if (greetingCounter == max) {
return Behaviors.stopped()
} else {
message.from.tell(new HelloWorld.Greet(message.whom, context.self))
return this
}
}
}
有趣的逻辑在 onGreeted
方法中。我们增加计数器,如果达到最大计数阈值,则停止,否则将消息内容回声给发送方。
让我们看看最后一个演员
class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
static record SayHello(String name) { }
static Behavior<SayHello> create() {
Behaviors.setup(HelloWorldMain::new)
}
private final ActorRef<HelloWorld.Greet> greeter
private HelloWorldMain(ActorContext<SayHello> context) {
super(context)
greeter = context.spawn(HelloWorld.create(), 'greeter')
}
@Override
Receive<SayHello> createReceive() {
newReceiveBuilder().onMessage(SayHello.class, this::onStart).build()
}
private Behavior<SayHello> onStart(SayHello command) {
var replyTo = context.spawn(HelloWorldBot.create(3), command.name)
greeter.tell(new HelloWorld.Greet(command.name, replyTo))
this
}
}
有一个 SayHello
记录,充当强类型传入消息。HelloWorldMain
演员创建其他演员。它创建了一个 HelloWorld
演员,它是后续消息的问候目标。对于每个传入的 SayHello
消息,它都会创建一个机器人,然后向问候者发送一条消息,其中包含 SayHello
负载,并告诉它回复机器人。
最后,我们需要启动我们的系统。我们创建 HelloWorldMain
演员并向它发送两条消息
var system = ActorSystem.create(HelloWorldMain.create(), 'hello')
system.tell(new HelloWorldMain.SayHello('World'))
system.tell(new HelloWorldMain.SayHello('Pekko'))
运行脚本的日志输出将类似于以下内容
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 1 for Pekko [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 1 for World [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 2 for Pekko [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 2 for World [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 3 for Pekko [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 3 for World [hello-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
Groovy 中的 GPars 实现
此示例使用 Groovy 4.0.21 和 GPars 1.2.1。它已在 JDK 8、11、17 和 21 上测试过。
我们将在 GPars 示例中遵循相同的消息强类型约定。以下是我们的三个消息容器
record Greet(String whom, Actor replyTo) { }
record Greeted(String whom, Actor from) {}
record SayHello(String name) { }
现在我们将定义我们的 helloWorld
演员
greeter = actor {
loop {
react { Greet command ->
println "Hello $command.whom!"
command.replyTo << new Greeted(command.whom, greeter)
}
}
}
在这里,我们使用 GPars Groovy 继续风格的 DSL 来定义演员。loop
指示演员将持续循环。当我们收到 Greet
消息时,我们将详细信息记录到标准输出并发送确认。
如果我们不想使用 DSL 语法,我们可以直接使用相关类。在这里,我们将使用这种稍微更冗长的风格定义一个 HelloWorldBot
。它展示了如何添加我们需要用于跟踪调用计数的状态变量
class HelloWorldBot extends DefaultActor {
int max
private int greetingCounter = 0
@Override
protected void act() {
loop {
react { Greeted message ->
greetingCounter++
println "Greeting $greetingCounter for $message.whom"
if (greetingCounter < max) message.from << new Greet(message.whom, this)
else terminate()
}
}
}
}
我们的主要演员非常简单。它正在等待 SayHello
消息,当它收到一条消息时,它将负载发送到 helloWorld 问候者,并告诉它回复一个新创建的机器人。
var main = actor {
loop {
react { SayHello command ->
greeter << new Greet(command.name, new HelloWorldBot(max: 3).start())
}
}
}
最后,我们通过发送一些初始消息来启动系统
main << new SayHello('World')
main << new SayHello('GPars')
输出如下所示
Hello World! Hello GPars! Greeting 1 for World Greeting 1 for GPars Hello World! Hello GPars! Greeting 2 for World Hello World! Greeting 2 for GPars Hello GPars! Greeting 3 for World Greeting 3 for GPars
讨论
与 Pekko 实现相比,GPars 实现的冗长性较低,但 Pekko 以提供演员消息的额外类型安全性而闻名,而这正是我们所看到的。
GPars 支持多种风格,其中一些风格在减少冗长性的同时,也牺牲了在运行时而不是编译时捕获某些错误的能力。当需要使用 Groovy 的动态特性编写非常简洁的代码时,此类代码很有用。当使用 Groovy 的静态特性或 Java 时,您可能要考虑使用 GPars API 的一部分。
例如,我们可以为 HelloWorldBot
提供一个替代定义,如下所示
class HelloWorldBot extends StaticDispatchActor<Greeted> {
int max
private int greetingCounter = 0
@Override
void onMessage(Greeted message) {
greetingCounter++
println "Greeting $greetingCounter for $message.whom"
if (greetingCounter < max) message.from << new Greet(message.whom, this)
else terminate()
}
}
StaticDispatchActor
仅根据编译时信息调度消息。当不需要基于消息运行时类型进行调度时,这可能更有效。
我们也可以为 Greet
提供一个替代定义,如下所示
record Greet(String whom, StaticDispatchActor<Greeted> replyTo) { }
通过进行这些更改,我们可以在使用 Groovy 的静态特性时编写具有额外消息类型安全性的解决方案。
结论
我们快速地了解了如何使用 Apache Pekko 和 GPars 与演员一起使用。
示例代码可以在这里找到