在 Groovy™ 中使用 Apache Pekko actor 和 GPars actor
发布时间:2023-07-17 11:24PM (最后更新时间:2024-09-28 04:59AM)
Apache Pekko 是 Akka 项目的一个 Apache 许可的派生版本(基于 Akka 2.6.x),它提供了一个用于构建并发、分布式、弹性和弹性应用程序的框架。Pekko 提供基于 actor 的高级并发抽象,以及用于持久化、流、HTTP 等的额外库。它提供了 Scala 和 Java API/DSL 用于编写您的应用程序。我们将使用后者。我们只看一个使用 Pekko actor 的例子。
示例
涉及 actor 的常见第一个示例是创建两个 actor,其中一个 actor 向第二个 actor 发送消息,第二个 actor 再向第一个 actor 发送回复。我们当然可以这样做,但我们将使用一个稍微更有趣的涉及三个 actor 的示例。该示例来自 Pekko 文档,并在以下图中(来自 Pekko 文档)进行说明
该系统由以下 actor 组成
-
HelloWorldMain
actor 创建其他两个 actor 并发送初始消息以启动我们的小系统。初始消息发送给HelloWorld
actor,并将HelloWorldBot
作为回复地址。 -
HelloWorld
actor 正在监听Greet
消息。当它收到一条消息时,它会向回复地址发送一个Greeted
确认。 -
HelloWorldBot
就像一个回音室。它返回它收到的任何消息。这可能会导致无限循环,但是,该 actor 有一个参数来告诉它在停止之前回显消息的最大次数。
Groovy 中的 Pekko 实现
此示例使用 Groovy 4.0.23 和 Pekko 1.1.1。它已通过 JDK 11、17、21 和 23 的测试。
Pekko 文档提供了 Java 和 Scala 的实现。您应该注意到 Groovy 实现与 Java 实现相似,但更短一些。Groovy 代码比等效的 Scala 代码稍微复杂一些。我们当然可以使用 Groovy 元编程以多种方式简化 Groovy 代码,但这将在以后讨论。
这是 HelloWorld
的代码
class HelloWorld extends AbstractBehavior<Greet> {
static record Greet(String whom, ActorRef<Greeted> replyTo) {}
static record Greeted(String whom, ActorRef<Greet> from) {}
static Behavior<Greet> create() {
Behaviors.setup(HelloWorld::new)
}
private HelloWorld(ActorContext<Greet> context) {
super(context)
}
@Override
Receive<Greet> createReceive() {
newReceiveBuilder().onMessage(Greet.class, this::onGreet).build()
}
private Behavior<Greet> onGreet(Greet command) {
context.log.info "Hello $command.whom!"
command.replyTo.tell(new Greeted(command.whom, context.self))
this
}
}
首先,我们定义 Greet
和 Greeter
记录,以便在我们的系统中对消息进行强类型化。然后我们定义 actor 的详细信息。其中很大一部分是样板代码。有趣的部分在 onGreet
方法内部。我们先记录消息详细信息,然后发送 Greeted
确认。
HelloWorldBot
类似。您应该注意到一些状态变量,它们维护一个调用计数器和终止前的最大调用次数
class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {
static Behavior<HelloWorld.Greeted> create(int max) {
Behaviors.setup(context -> new HelloWorldBot(context, max))
}
private final int max
private int greetingCounter
private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
super(context)
this.max = max
}
@Override
Receive<HelloWorld.Greeted> createReceive() {
newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build()
}
private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
greetingCounter++
context.log.info "Greeting $greetingCounter for $message.whom"
if (greetingCounter == max) {
return Behaviors.stopped()
} else {
message.from.tell(new HelloWorld.Greet(message.whom, context.self))
return this
}
}
}
有趣的逻辑在 onGreeted
方法中。我们增加计数器,如果达到最大计数阈值则停止,否则将消息内容回显给发送者。
让我们看看最后一个 actor
class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {
static record SayHello(String name) { }
static Behavior<SayHello> create() {
Behaviors.setup(HelloWorldMain::new)
}
private final ActorRef<HelloWorld.Greet> greeter
private HelloWorldMain(ActorContext<SayHello> context) {
super(context)
greeter = context.spawn(HelloWorld.create(), 'greeter')
}
@Override
Receive<SayHello> createReceive() {
newReceiveBuilder().onMessage(SayHello.class, this::onStart).build()
}
private Behavior<SayHello> onStart(SayHello command) {
var replyTo = context.spawn(HelloWorldBot.create(3), command.name)
greeter.tell(new HelloWorld.Greet(command.name, replyTo))
this
}
}
有一个 SayHello
记录,作为强类型的传入消息。HelloWorldMain
actor 创建其他 actor。它创建一个 HelloWorld
actor,作为后续消息的“打招呼者”目标。对于每条传入的 SayHello
消息,它创建一个“机器人”,然后向“打招呼者”发送一条消息,其中包含 SayHello
有效载荷并告诉它回复给“机器人”。
最后,我们需要启动我们的系统。我们创建 HelloWorldMain
actor 并向其发送两条消息
var system = ActorSystem.create(HelloWorldMain.create(), 'hello')
system.tell(new HelloWorldMain.SayHello('World'))
system.tell(new HelloWorldMain.SayHello('Pekko'))
运行脚本的日志输出将类似于此
[hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 1 for Pekko [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 1 for World [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-6] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 2 for Pekko [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 2 for World [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello Pekko! [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorld - Hello World! [hello-pekko.actor.default-dispatcher-3] INFO pekko.HelloWorldBot - Greeting 3 for Pekko [hello-pekko.actor.default-dispatcher-5] INFO pekko.HelloWorldBot - Greeting 3 for World [hello-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
Groovy 中的 GPars 实现
此示例使用 Groovy 4.0.23 和 GPars 1.2.1。它已通过 JDK 8、11、17、21 和 23 的测试。
在我们的 GPars 示例中,我们将遵循相同的强类型消息约定。以下是我们的三个消息容器
record Greet(String whom, Actor replyTo) { }
record Greeted(String whom, Actor from) {}
record SayHello(String name) { }
现在我们将定义我们的 helloWorld
actor
greeter = actor {
loop {
react { Greet command ->
println "Hello $command.whom!"
command.replyTo << new Greeted(command.whom, greeter)
}
}
}
在这里,我们使用 GPars Groovy 延续式 DSL 定义 actor。loop
表示 actor 将持续循环。当我们收到 Greet
消息时,我们将详细信息记录到标准输出并发送确认。
如果我们不想使用 DSL 语法,我们可以直接使用相关的类。在这里,我们将使用这种稍微更冗长的样式定义一个 HelloWorldBot
。它展示了添加我们跟踪调用计数所需的状态变量
class HelloWorldBot extends DefaultActor {
int max
private int greetingCounter = 0
@Override
protected void act() {
loop {
react { Greeted message ->
greetingCounter++
println "Greeting $greetingCounter for $message.whom"
if (greetingCounter < max) message.from << new Greet(message.whom, this)
else terminate()
}
}
}
}
我们的主 actor 非常简单。它正在等待 SayHello
消息,当它收到一条消息时,它会将负载发送给 helloWorld greeter,告诉它回复给一个新创建的“机器人”。
var main = actor {
loop {
react { SayHello command ->
greeter << new Greet(command.name, new HelloWorldBot(max: 3).start())
}
}
}
最后,我们通过发送一些初始消息来启动系统
main << new SayHello('World')
main << new SayHello('GPars')
输出如下
Hello World! Hello GPars! Greeting 1 for World Greeting 1 for GPars Hello World! Hello GPars! Greeting 2 for World Hello World! Greeting 2 for GPars Hello GPars! Greeting 3 for World Greeting 3 for GPars
讨论
与 Pekko 实现相比,GPars 实现更简洁,但 Pekko 以提供 actor 消息的额外类型安全性而闻名,这也是我们所看到的一部分。
GPars 支持多种样式,其中一些以运行时捕获某些错误而非编译时捕获为代价,提供了更简洁的语法。当希望使用 Groovy 的动态特性编写非常简洁的代码时,此类代码会很有用。当使用 Groovy 的静态特性或 Java 时,您可以考虑使用 GPars API 的选定部分。
例如,我们可以为 HelloWorldBot
提供如下替代定义
class HelloWorldBot extends StaticDispatchActor<Greeted> {
int max
private int greetingCounter = 0
@Override
void onMessage(Greeted message) {
greetingCounter++
println "Greeting $greetingCounter for $message.whom"
if (greetingCounter < max) message.from << new Greet(message.whom, this)
else terminate()
}
}
StaticDispatchActor
仅根据编译时信息分派消息。当不需要基于消息运行时类型分派时,这可以更高效。
我们还可以为 Greet
提供如下替代定义
record Greet(String whom, StaticDispatchActor<Greeted> replyTo) { }
通过进行这些更改,在使用 Groovy 的静态特性时,我们可以编写一个具有额外消息类型安全性的解决方案。
结论
我们快速了解了如何在 Apache Pekko 和 GPars 中使用 actor。
示例代码可以在这里找到