在 Groovy™ 中使用 Gatherers

作者: Paul King

发布时间:2023-12-09 03:30PM (最后更新时间:2025-03-19 02:00PM)


JDK 24 的 Gatherer 增强功能支持为多种场景创建更灵活、更高效的流管道。让我们用 Groovy 探索一些这样的场景,并查看 Groovy 对应的集合功能,为 JDK 8+ 用户处理类似的场景!

最近 JDK 版本中一个有趣的功能是 Gatherers

本博客探讨了在 Groovy 中使用该功能。与许多 JDK 增强功能一样,Groovy 无需任何更改即可提供自动的 Gatherer 支持。

本博客中的示例使用 Groovy 4.0.26 进行测试。Gatherer 示例使用 JDK 24.0.0 版本,其他示例使用 JDK 8、17 和 24 进行测试。

理解 Gatherers

Java 开发人员现在对流非常熟悉。流是值的潜在无限序列,支持惰性计算。流处理通过流管道完成,该管道由三部分组成:元素源、零个或多个中间操作(如 filtermap)以及一个终端操作。

这个框架非常强大和高效,并通过可定制的终端操作提供了一些可扩展性。可用的中间操作是固定大小的,虽然内置操作非常有用,但一些复杂的任务无法轻易地表示为流管道。Gatherers 应运而生。Gatherers 提供了自定义中间操作的能力。

借助 gatherers,流 API 已更新以支持一个 gather 中间操作,该操作接受一个 gatherer 并返回一个转换后的流。让我们深入了解 gatherers 的更多细节。

一个 gatherer 由四个功能部分定义

  • 可选的 initializer 只是一个 Supplier,它返回一些(初始)状态。

  • integrator 通常是最重要的部分。它满足以下接口

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Downstream<? super R> downstream);
    }

    其中 state 是一些状态——我们将在接下来的几个示例中将列表用作状态,但它也可以是其他类或记录的实例;element 是当前流中要处理的下一个元素;downstream 是用于创建将在流管道下一阶段处理的元素的钩子。

  • 可选的 finisher 可以访问状态和下游管道钩子。它执行可能需要的任何最后步骤操作。

  • 可选的 combiner 用于在并行处理输入流时并行评估 gatherer。我们将在本博客文章中查看的示例本质上是有序的,因此无法并行化,因此我们在此不进一步讨论这方面。

除了 Gatherer API 之外,还有许多内置的 gatherer,如 windowFixedwindowSlidingfold 等。

在进入 gatherers 将变得至关重要的功能之前,我们首先来看看如何访问集合,其中集合和流 API 以及相关扩展方法都提供了很好的功能。

访问集合的某些部分

Groovy 提供了非常灵活的索引变体来从集合中选择特定元素

assert (1..8)[0..2] == [1, 2, 3]                   // index by closed range
assert (1..8)[3<..<6] == [5, 6]                    // index by open range
assert (1..8)[0..2,3..4,5] == [1, 2, 3, 4, 5, 6]   // index by multiple ranges
assert (1..8)[0..2,3..-1] == 1..8                  // ditto
assert (1..8)[0,2,4,6] == [1,3,5,7]                // select odd numbers
assert (1..8)[1,3,5,7] == [2,4,6,8]                // select even numbers

你还可以使用 takedrop 选取一个元素窗口

assert (1..8).take(3) == [1, 2, 3]                 // same as [0..2]
assert (1..8).drop(2).take(3) == [3, 4, 5]         // same as [2..4]

流用户可能会使用 skiplimit 做同样的事情

assert (1..8).stream().limit(3).toList() == [1, 2, 3]
assert (1..8).stream().skip(2).limit(3).toList() == [3, 4, 5]

我们在这里可以看到 droptake 有流等效项,但 Groovy 操纵集合的一些更复杂的机制呢?很高兴你问了。让我们看看 collatechop 的流等效项。

整理 (Collate)

整理列表 - 由 Dall-E 3 生成 Groovy 的 collate 方法将集合分成固定大小的块

assert (1..8).collate(3) == [[1, 2, 3], [4, 5, 6], [7, 8]]

此示例中的最后一个块小于块大小。它包含在创建所有完整大小的块后剩余的元素。如果我们不想要剩余的块,我们可以使用可选的布尔参数要求将其排除

assert (1..8).collate(3, false) == [[1, 2, 3], [4, 5, 6]]

除非你想多次处理流,或者你将所有逻辑都塞进收集器中,否则这种功能在流中是不可能实现的,但那样你就会放弃流的一些关键优势。幸运的是,有了 gatherers,我们现在可以获得这种功能。

第一个案例非常常见,有一个内置的 gatherer (Gatherers#windowFixed) 用于它

assert (1..8).stream().gather(windowFixed(3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

没有完全等效的方法来处理丢弃剩余元素的较不常见情况,但编写我们自己的 gatherer 足够容易

<T> Gatherer<T, ?, List<T>> windowFixedTruncating(int windowSize) {
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}

我们有一个初始化器,它只是返回一个空列表作为我们的初始状态。集成器不断将元素添加到状态(我们的列表或窗口)。一旦列表填充到窗口大小,我们就会将其输出到下游,然后清除列表,为下一个窗口做好准备。

这里的代码本质上是 windowFixed 的简化版本,我们可以直接省略 windowFixed 可能需要输出末尾部分填充窗口的 finisher。

一些细节

  • 我们的操作是顺序的,因为它本质上是有序的,因此我们使用 ofSequential 来标记它。

  • 我们也将始终处理所有元素,因此我们使用 ofGreedy 创建了一个贪婪的 gatherer。虽然并非严格必要,但这允许对管道进行优化。

  • 我们在此示例中特意省略了一些验证逻辑,以专注于新的 gatherer 功能。查看 windowFixed 如何对小于 1 的窗口大小抛出 IllegalArgumentException,以了解如果你在生产中使用此功能,实际上还应该添加什么。

我们会这样使用 windowFixedTruncating

assert (1..8).stream().gather(windowFixedTruncating(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]

使用 collate 时的默认设置是从上一个块/窗口之后的元素开始下一个块/窗口,但也有接受步长的重载。这用于计算第二个(及后续)窗口将开始的索引。还有一个可选的 keepRemaining 布尔值来处理剩余情况。如果我们想按 1 滑动并丢弃剩余元素,我们会使用

assert (1..5).collate(3, 1, false) == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

这与内置的 windowSliding gatherer 相符

assert (1..5).stream().gather(windowSliding(3)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

如果我们希望步长不是 1,或者我们想控制剩余元素,则没有内置的 gatherer 选项,但我们可以再次自己编写一个。让我们考虑一些例子。我们很快会查看 gatherer 实现,但首先是 Groovy 的集合变体

assert (1..5).collate(3, 1) == [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).collate(3, 2) == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).collate(3, 2, false) == [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).collate(3, 4, false) == [[1, 2, 3], [5, 6, 7]]
assert (1..8).collate(3, 3) == [[1, 2, 3], [4, 5, 6], [7, 8]]  // same as collate(3)

现在我们来编写 gatherer

<T> Gatherer<T, ?, List<T>> windowSlidingByStep(int windowSize, int stepSize, boolean keepRemaining = true) {
    int skip = 0
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            if (skip) {
                skip--
                return true
            }
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            skip = stepSize > windowSize ? stepSize - windowSize : 0
            [stepSize, windowSize].min().times { window.removeFirst() }
            downstream.push(result)
        },
        (window, downstream) -> {                                      // finisher
            if (keepRemaining) {
                while(window.size() > stepSize) {
                    downstream.push(List.copyOf(window))
                    stepSize.times{ window.removeFirst() }
                }
                downstream.push(List.copyOf(window))
            }
        }
    )
}

几点说明

  • 我们的 gatherer 仍然是顺序的,原因与之前相同。我们仍然在处理每个元素,所以我们再次创建了一个贪婪的 gatherer。

  • 我们的代码中内置了一些优化。如果我们的步长大于窗口大小,我们无法在 gatherer 中对窗口之间的元素进行进一步处理。我们可以简化代码并存储这些元素,然后稍后丢弃它们,但尽可能提高算法效率并不需要太多精力。

  • 我们还需要一个 finisher 来处理(如果需要的话)剩余的块。

  • 与前面的示例一样,我们选择省略了一些参数验证逻辑。

我们将这样使用它

assert (1..5).stream().gather(windowSlidingByStep(3, 1)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2, false)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 4, false)).toList() ==
    [[1, 2, 3], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

在离开本节之前,让我们看几个使用 Groovy 语言集成查询功能的示例,作为操作集合的另一种方式。

首先,我们看到的 take / limit 的等效操作

assert GQL {
    from n in 1..8
    limit 3
    select n
} == [1, 2, 3]

然后,如果我们添加 drop / skip 的等效操作

assert GQL {
    from n in 1..8
    limit 2, 3
    select n
} == [3, 4, 5]

最后,一个滑动窗口的等效操作

assert GQL {
    from ns in (
        from n in 1..8
        select n, (lead(n) over(orderby n)), (lead(n, 2) over(orderby n))
    )
    limit 3
    select ns
}*.toList() == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

切分 (Chop)

切分列表 - 由 Dall-E 3 生成 Groovy 中一个相关的集合扩展方法是 chop。对于这个方法,我们也从原始集合中创建块,但不是指定适用于所有块的固定大小,而是指定我们希望每个块的大小。我们提供一个大小列表,每个大小只使用一次。特殊大小 -1 表示我们希望将集合的其余部分作为最后一个块。

assert (1..8).chop(3) == [[1, 2, 3]]
assert (1..8).chop(3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert (1..8).chop(3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]

没有原始流或预构建的 gatherer 用于此功能。我们将自己编写

<T> Gatherer<T, ?, List<T>> windowMultiple(int... steps) {
    var remaining = steps.toList()
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.of { window, element, downstream ->
            if (!remaining) {
                return false
            }
            window << element
            if (remaining[0] != -1) remaining[0]--
            if (remaining[0]) return true
            remaining.removeFirst()
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        },
        (window, downstream) -> {
            if (window) {
                var result = List.copyOf(window)
                downstream.push(result)
            }
        }
    )
}

几点说明

  • 这同样是一个有序算法,所以我们再次使用 ofSequential

  • 这类似于我们用于整理的功能,但在处理元素时,我们为每个块大小设置了不同大小的窗口。

  • 一旦我们到达最后一个块,我们就不想处理更多的元素,除非我们看到特殊的 -1 标记,所以我们不会创建一个贪婪的 gatherer。

  • 我们确实需要一个 finisher 来潜在地输出已存储但尚未推送到下游的元素。

我们会这样使用 windowMultiple

assert (1..8).stream().gather(windowMultiple(3)).toList() ==
    [[1, 2, 3]]
assert (1..8).stream().gather(windowMultiple(3, 2, 1)).toList() ==
    [[1, 2, 3], [4, 5], [6]]
assert (1..8).stream().gather(windowMultiple(3, -1)).toList() ==
    [[1, 2, 3], [4, 5, 6, 7, 8]]

注入 (Inject)、折叠 (fold) 和扫描 (scan)

Groovy 的 inject 与流 API 的 reduce 中间操作略有不同。后者需要一个二元运算符,它限制了被消费和生产的元素的类型。

inject 方法的参数可以有不同的类型,如下所示

assert (1..5).inject(''){ string, number -> string + number } == '12345'

内置的 fold gatherer 允许我们为流处理编写等效功能,如下所示

assert (1..5).stream()
             .gather(fold(() -> '', (string, number) -> string + number))
             .findFirst()
             .get() == '12345'

让我们看另一个 inject 示例。这次是累积和。如果我们有一个数字序列,累积和是另一个序列,其在任何索引处的值是通过累积原始序列中直到并包括该索引的所有数字来确定的,例如,[1, 2, 3, 4] 的累积和是 [1, 3, 6, 10]

这再次非常适合 Groovy 的 inject

assert (1..5).inject([]) { acc, next ->
    acc + [acc ? acc.last() + next : next]
} == [1, 3, 6, 10, 15]

Groovy 有多种替代方法来实现此功能。这是使用 inits 的一种方法

assert (1..5).inits().grep().reverse()*.sum() == [1, 3, 6, 10, 15]

inits 是一个列表处理函数,我们将在下一节中详细介绍。

在检查 gatherer 等效项之前,我们应该注意,此特定操作被认为足够有用,以至于 Java 实际上为数组提供了内置库函数

Integer[] nums = 1..5
Arrays.parallelPrefix(nums, Integer::sum)
assert nums == [1, 3, 6, 10, 15]

累积和不适用于传统流,但现在有了 gatherers,我们可以使用内置的 scan gatherer 在处理流时拥有类似的功能

assert (1..5).stream()
             .gather(scan(() -> 0, Integer::sum))
             .toList() == [1, 3, 6, 10, 15]

测试子序列(initstails 的乐趣)

作为最后一个例子,让我们看看如何测试一个列表是否是另一个列表的子集。

我们将从一个单词列表和一个包含有序搜索词的列表开始

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

事实证明,JDK 中已经为集合解决了这个问题

assert Collections.indexOfSubList(words, search) != -1

让我们看看一些可能的流实现。但首先岔开一下。对于任何可能涉猎过 Haskell 的函数式程序员,你可能见过 Learn You a Haskell for Great Good! 这本书。它提出了一个有趣的练习,使用 initstails 寻找“大海捞针”。那么 initstails 是什么?它们是 Haskell 和 Groovy 中的内置函数

assert (1..6).inits() == [[1, 2, 3, 4, 5, 6],
                          [1, 2, 3, 4, 5],
                          [1, 2, 3, 4],
                          [1, 2, 3],
                          [1, 2],
                          [1],
                          []]

assert (1..6).tails() == [[1, 2, 3, 4, 5, 6],
                             [2, 3, 4, 5, 6],
                                [3, 4, 5, 6],
                                   [4, 5, 6],
                                      [5, 6],
                                         [6],
                                          []]

一旦我们了解了这些方法,我们就可以将 Groovy 中集合的“大海捞针”解决方案转述如下

var found = words.tails().any{ subseq -> subseq.inits().contains(search) }
assert found

它可能不是此功能最有效的实现,但它具有很好的对称性。现在让我们探索一些基于流的解决方案。

我们可以从 tails gatherer 开始

<T> Gatherer<T, ?, List<T>> tails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails().each(downstream::push)
        }
    )
}

在集成器中,我们只存储所有元素,而在 finisher 中我们完成所有工作。这可行,但并未真正充分利用流管道的特性。

我们可以检查它是否这样工作

assert search.stream().gather(tails()).toList() ==
    [['brown', 'fox'], ['fox'], []]

我们可以继续这种方法来创建 initsOfTails gatherer

<T> Gatherer<T, ?, List<T>> initsOfTails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails()*.inits().sum().each(downstream::push)
        }
    )
}

同样,所有的工作都在 finisher 中完成,我们并没有真正利用流管道的力量。

当然它仍然有效

assert words.stream().gather(initsOfTails()).anyMatch { it == search }

但更有效的方法可能是将流收集为列表,并在此列表上使用 Groovy 的内置 initstails

但并非一无是处。如果我们愿意稍微调整一下算法,我们可以更好地利用流管道。例如,如果我们不介意以相反的顺序获取 inits 结果,我们可以为 inits 定义以下 gatherer

<T> Gatherer<T, ?, List<T>> inits() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            downstream.push(List.copyOf(state))
            state << element
            return true
        },
        (state, downstream) -> {
            downstream.push(state)
        }
    )
}

我们会这样使用它

assert search.stream().gather(inits()).toList() ==
    [[], ['brown'], ['brown', 'fox']]

结论

Groovy 拥有丰富的与集合协作的方法,这非常棒。其中一些方法有流等效项,现在我们看到,通过在 Groovy 中使用 gatherers,我们可以模拟更多的方法。并非所有算法都需要或受益于使用流,但很高兴知道有了 gatherers,我们很可能选择任何有意义的风格。

更新历史

2023 年 12 月 9 日:初始版本
2024 年 1 月 18 日:更新了扫描/累积和示例。
2025 年 3 月 19 日:针对 JDK24 进行了更新。