使用 Gatherers 与 Groovy

作者:Paul King
发布时间:2023-12-09 03:30PM(最后更新时间:2024-01-18 10:00PM)


JDK22 中正在预览的一项有趣功能是GatherersJEP 461)。本博客探讨了使用 Groovy 中的这项功能。本博客中的示例使用 Groovy 4.0.16 和 JDK 版本 22-ea+27-2262 进行测试。由于我们使用的 JDK 版本仍处于早期访问状态,因此您应阅读免责声明以了解此 JDK 功能在最终发布之前可能会发生变化。如果此功能最终成为最终版本,Groovy 似乎将自动支持它,无需任何额外的调整。

理解 Gatherers

Java 开发人员现在已经非常熟悉流。流是支持延迟计算的潜在无限值序列。流的处理通过流管道完成,流管道包含三个部分:元素源、零个或多个中间操作(如filtermap)以及终止操作。

此框架非常强大且高效,并通过可定制的终止操作提供了一些可扩展性。可用中间操作的大小是固定的,虽然内置操作非常有用,但一些复杂的任务不能轻松地表示为流管道。这就是Gatherers的用武之地。Gatherers 提供了定制中间操作的能力。

使用 Gatherers,流 API 更新为支持gather 中间操作,该操作接收一个 Gatherer 并返回一个经过转换的流。让我们深入了解 Gatherers 的更多细节。

Gatherer 由四个功能部分定义

  • 可选的初始化器只是一个Supplier,它返回一些(初始)状态。

  • 集成器通常是最重要的部分。它满足以下接口

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Downstream<? super R> downstream);
    }

    其中state 是某种状态,我们将在接下来的几个示例中使用列表作为状态,但它也可以是其他类或记录的实例,element 是当前流中要处理的下一个元素,downstream 是用于创建将在流管道下一阶段中处理的元素的钩子。

  • 可选的finisher 可以访问状态和下游管道钩子。它执行可能需要的任何最后一步操作。

  • 可选的组合器用于在并行处理输入流时并行评估 Gatherer。我们将在本博客文章中介绍的示例本质上是有序的,因此无法并行化,所以我们不会在此处进一步讨论这方面。

除了 Gatherer API 之外,还有许多内置 Gatherers,例如windowFixedwindowSlidingfold 等。

在深入了解 Gatherers 变得必不可少的功能之前,让我们先从查看访问集合开始,集合和流 API 以及相关扩展方法中已经很好地提供了相关功能。

访问集合的各个部分

Groovy 提供了非常灵活的索引变体,用于从集合中选择特定元素

assert (1..8)[0..2] == [1, 2, 3]                   // index by closed range
assert (1..8)[3<..<6] == [5, 6]                    // index by open range
assert (1..8)[0..2,3..4,5] == [1, 2, 3, 4, 5, 6]   // index by multiple ranges
assert (1..8)[0..2,3..-1] == 1..8                  // ditto
assert (1..8)[0,2,4,6] == [1,3,5,7]                // select odd numbers
assert (1..8)[1,3,5,7] == [2,4,6,8]                // select even numbers

您还可以使用takedrop 选择出一组元素

assert (1..8).take(3) == [1, 2, 3]                 // same as [0..2]
assert (1..8).drop(2).take(3) == [3, 4, 5]         // same as [2..4]

流用户可以使用skiplimit 做同样的事情

assert (1..8).stream().limit(3).toList() == [1, 2, 3]
assert (1..8).stream().skip(2).limit(3).toList() == [3, 4, 5]

我们可以看到这里有droptake 的流等效项,但是 Groovy 中更复杂的用于操作集合的机制呢?我很高兴您问到。让我们看看collatechop 的流等效项。

整理

整理一个列表 - 由 Dall-E 3 生成 Groovy 的collate 方法将集合拆分为固定大小的块

assert (1..8).collate(3) == [[1, 2, 3], [4, 5, 6], [7, 8]]

此示例中的最后一个块小于块大小。它包含在创建所有完整大小的块后剩余的剩余元素。如果我们不想要剩余的块,我们可以使用一个可选的布尔参数来要求排除它

assert (1..8).collate(3, false) == [[1, 2, 3], [4, 5, 6]]

除非您想要多次处理流,或者将所有逻辑都塞到收集器中,否则这种功能在流中并不真正可行,但那样您就会失去流的一些关键优势。幸运的是,有了 Gatherers,我们现在可以获得此功能。

第一种情况非常常见,有一个内置的 Gatherer(Gatherers#windowFixed)用于处理这种情况

assert (1..8).stream().gather(windowFixed(3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

没有完全等效的处理更不常见的情况(即丢弃剩余元素),但我们可以很容易地编写自己的 Gatherer

<T> Gatherer<T, ?, List<T>> windowFixedTruncating(int windowSize) {
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}

我们有一个初始化器,它只是返回一个空列表作为我们的初始状态。集成器不断将元素添加到状态(我们的列表或窗口)。一旦列表填充满窗口大小,我们就会将其输出到下游,然后清除列表,准备下一个窗口。

这里的代码本质上是windowFixed 的简化版本,我们只需要省略windowFixed 在末尾输出部分填充的窗口所需的完成器即可。

一些细节

  • 我们的操作是顺序的,因为它本质上是有序的,因此我们使用ofSequential 来标记它。

  • 我们也会始终处理所有元素,所以我们使用ofGreedy 创建一个贪婪的 Gatherer。虽然并非严格必要,但这可以优化管道。

  • 在示例中,我们特意省略了一些验证逻辑,以便专注于新的 Gatherer 功能。查看windowFixed 如何为小于 1 的窗口大小抛出IllegalArgumentException,了解在生产环境中使用此功能时应该添加什么。

我们使用windowFixedTruncating 如下

assert (1..8).stream().gather(windowFixedTruncating(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]

使用collate 时的默认行为是在直接位于前一个元素之后的元素处开始下一个块/窗口,但是有一些重载也接受步长。这用于计算第二个(和后续)窗口的起始索引。还有一个可选的keepRemaining 布尔值来处理剩余的情况。如果我们想以 1 为步长进行滑动并丢弃剩余元素,我们将使用

assert (1..5).collate(3, 1, false) == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

这与内置的windowSliding Gatherer 一致

assert (1..5).stream().gather(windowSliding(3)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

如果我们希望步长不为 1,或者我们希望控制剩余元素,则没有内置的 Gatherer 选项,但我们可以再次自己编写一个。让我们考虑一些示例。我们将在稍后查看 Gatherer 实现,但首先是 Groovy 的集合变体

assert (1..5).collate(3, 1) == [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).collate(3, 2) == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).collate(3, 2, false) == [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).collate(3, 4, false) == [[1, 2, 3], [5, 6, 7]]
assert (1..8).collate(3, 3) == [[1, 2, 3], [4, 5, 6], [7, 8]]  // same as collate(3)

现在让我们编写我们的 Gatherer

<T> Gatherer<T, ?, List<T>> windowSlidingByStep(int windowSize, int stepSize, boolean keepRemaining = true) {
    int skip = 0
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            if (skip) {
                skip--
                return true
            }
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            skip = stepSize > windowSize ? stepSize - windowSize : 0
            [stepSize, windowSize].min().times { window.removeFirst() }
            downstream.push(result)
        },
        (window, downstream) -> {                                      // finisher
            if (keepRemaining) {
                while(window.size() > stepSize) {
                    downstream.push(List.copyOf(window))
                    stepSize.times{ window.removeFirst() }
                }
                downstream.push(List.copyOf(window))
            }
        }
    )
}

一些要点

  • 我们的 Gatherer 仍然是顺序的,原因与之前相同。我们仍然在处理每个元素,因此我们再次创建了一个贪婪的 Gatherer。

  • 我们在代码中加入了一些优化。如果我们的步长大于窗口大小,我们可以对窗口之间的元素停止在 Gatherer 中进行进一步处理。我们可以简化代码,仅存储这些元素,然后将其丢弃,但使算法尽可能高效并不需要太多努力。

  • 我们还需要一个完成器,在需要时处理剩余的块。

  • 与前面的示例一样,我们选择省略一些参数验证逻辑。

我们将像这样使用它

assert (1..5).stream().gather(windowSlidingByStep(3, 1)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2, false)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 4, false)).toList() ==
    [[1, 2, 3], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

离开本节之前,让我们看几个使用 Groovy 的语言集成查询功能作为操作集合的替代方法的示例。

首先,是我们使用take / limit 所看到的等效项

assert GQL {
    from n in 1..8
    limit 3
    select n
} == [1, 2, 3]

然后,是添加了drop / skip 的等效项

assert GQL {
    from n in 1..8
    limit 2, 3
    select n
} == [3, 4, 5]

最后,是滑动窗口的等效项

assert GQL {
    from ns in (
        from n in 1..8
        select n, (lead(n) over(orderby n)), (lead(n, 2) over(orderby n))
    )
    limit 3
    select ns
}*.toList() == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

切片

切片一个列表 - 由 Dall-E 3 生成 Groovy 中一个相关的集合扩展方法是chop。对于此方法,我们也从原始集合中创建块,但不是指定适用于所有块的固定大小,而是指定每个块想要的大小。我们提供一个大小列表,每个大小只使用一次。特殊大小-1 表示我们希望将集合的其余部分作为最后一个块。

assert (1..8).chop(3) == [[1, 2, 3]]
assert (1..8).chop(3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert (1..8).chop(3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]

没有用于此功能的原始流或预构建的 Gatherer。我们将自己编写一个

<T> Gatherer<T, ?, List<T>> windowMultiple(int... steps) {
    var remaining = steps.toList()
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.of { window, element, downstream ->
            if (!remaining) {
                return false
            }
            window << element
            if (remaining[0] != -1) remaining[0]--
            if (remaining[0]) return true
            remaining.removeFirst()
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        },
        (window, downstream) -> {
            if (window) {
                var result = List.copyOf(window)
                downstream.push(result)
            }
        }
    )
}

一些要点

  • 这也是一个有序算法,所以我们再次使用ofSequential

  • 这与我们用于整理的算法类似,但我们在处理元素时每个块的大小都有所不同。

  • 一旦我们到达最后一个块,我们就不希望进一步处理元素,除非我们看到特殊的 -1 标记,因此我们不会创建一个贪婪的 Gatherer。

  • 我们确实需要一个完成器来输出可能已存储但尚未推送到下游的元素。

我们将像这样使用windowMultiple

assert (1..8).stream().gather(windowMultiple(3)).toList() ==
    [[1, 2, 3]]
assert (1..8).stream().gather(windowMultiple(3, 2, 1)).toList() ==
    [[1, 2, 3], [4, 5], [6]]
assert (1..8).stream().gather(windowMultiple(3, -1)).toList() ==
    [[1, 2, 3], [4, 5, 6, 7, 8]]

注入、折叠和扫描

Groovy 的inject 与流 API 的reduce 中间操作略有不同。后者期望一个二元运算符,该运算符限制了被使用和产生的元素的类型。

inject 方法可以像这里显示的那样具有不同的参数类型

assert (1..5).inject(''){ string, number -> string + number } == '12345'

fold 内置 Gatherer 使我们能够为流处理编写等效功能,如这里所示

assert (1..5).stream()
             .gather(fold(() -> '', (string, number) -> string + number))
             .findFirst()
             .get() == '12345'

让我们看另一个inject 示例。这次是累积和。如果我们有一系列数字,累积和是另一个序列,其在任何索引处的数值由原始序列中从开始到包括该索引的所有数字累加得出,例如[1, 2, 3, 4] 的累积和为[1, 3, 6, 10]

这再次非常适合 Groovy 的inject

assert (1..5).inject([]) { acc, next ->
    acc + [acc ? acc.last() + next : next]
} == [1, 3, 6, 10, 15]

Groovy 有多种方法可以实现此功能。以下是一个使用inits 的示例

assert (1..5).inits().grep().reverse()*.sum() == [1, 3, 6, 10, 15]

inits 是一个列表处理函数,我们将在下一节中更详细地介绍。

在检查 Gatherer 等效项之前,我们应该注意,此特定操作被认为足够有用,以至于 Java 实际上为数组提供了内置库函数

Integer[] nums = 1..5
Arrays.parallelPrefix(nums, Integer::sum)
assert nums == [1, 3, 6, 10, 15]

累积和不太适合传统的流,但现在有了 Gatherers,我们可以使用scan 内置 Gatherer 在处理流时获得类似的功能

assert (1..5).stream()
             .gather(scan(() -> 0, Integer::sum))
             .toList() == [1, 3, 6, 10, 15]

测试子序列(initstails 的乐趣)

作为最后一个示例,让我们看看如何测试一个列表是否是另一个列表的子集。

我们将从一个词列表和一个包含有序搜索词的列表开始

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

事实证明,这已经在 JDK 中针对集合解决了

assert Collections.indexOfSubList(words, search) != -1

让我们看看一些可能的流实现。但首先要进行一个改道。对于可能已经尝试过 Haskell 的任何函数式编程人员,您可能已经看过这本书Learn You a Haskell for Great Good!。它为使用initstails 找到“大海捞针”设置了一项有趣的练习。那么什么是initstails 呢?它们是 Haskell 和 Groovy 中的内置函数

assert (1..6).inits() == [[1, 2, 3, 4, 5, 6],
                          [1, 2, 3, 4, 5],
                          [1, 2, 3, 4],
                          [1, 2, 3],
                          [1, 2],
                          [1],
                          []]

assert (1..6).tails() == [[1, 2, 3, 4, 5, 6],
                             [2, 3, 4, 5, 6],
                                [3, 4, 5, 6],
                                   [4, 5, 6],
                                      [5, 6],
                                         [6],
                                          []]

一旦我们了解了这些方法,我们就可以用 Groovy 将“大海捞针”的解决方案解释如下

var found = words.tails().any{ subseq -> subseq.inits().contains(search) }
assert found

这可能不是实现此功能最有效的方案,但它具有很好的对称性。现在让我们探索一些基于流的解决方案。

我们可以从一个tails收集器开始。

<T> Gatherer<T, ?, List<T>> tails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails().each(downstream::push)
        }
    )
}

在集成器中,我们只存储所有元素,在终结器中进行所有工作。这可以工作,但实际上并没有真正利用流管道特性。

我们可以通过以下方式检查它是否有效。

assert search.stream().gather(tails()).toList() ==
    [['brown', 'fox'], ['fox'], []]

我们可以继续使用这种方法来创建一个initsOfTails收集器。

<T> Gatherer<T, ?, List<T>> initsOfTails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails()*.inits().sum().each(downstream::push)
        }
    )
}

同样,所有工作都在终结器中完成,我们并没有真正利用流管道的强大功能。

当然,它仍然有效。

assert words.stream().gather(initsOfTails()).anyMatch { it == search }

但是,将流作为列表收集并使用 Groovy 的内置 initstails 可能更有效。

但并非所有希望都落空。如果我们愿意稍微调整一下算法,我们可以更好地利用流管道。例如,如果我们不介意以相反的顺序获取 inits 结果,我们可以定义以下用于 inits 的收集器。

<T> Gatherer<T, ?, List<T>> inits() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            downstream.push(List.copyOf(state))
            state << element
            return true
        },
        (state, downstream) -> {
            downstream.push(state)
        }
    )
}

我们将像这样使用它。

assert search.stream().gather(inits()).toList() ==
    [[], ['brown'], ['brown', 'fox']]

结论

Groovy 拥有一套丰富的与集合一起使用的方法,这真是太棒了。其中一些方法具有流等效项,现在我们看到,使用收集器与 Groovy,我们可以模拟更多方法。并非所有算法都需要或从使用流中受益,但很高兴知道,使用收集器,我们可以选择任何有意义的风格。

收集器仍然处于早期阶段,因此随着此功能变得更加主流,预计将出现更多内容。我们期待它超越预览状态。

更新历史

2024 年 1 月 18 日:更新了扫描/累积求和示例。