From 7827d1d1809e89c612b1299fb8069230fe6829f0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 5 Oct 2023 14:40:59 -0400 Subject: [PATCH] Fix Reshuffle implementation in Java SDK --- runners/flink/flink_runner.gradle | 2 + .../flink/job-server/flink_job_server.gradle | 2 + .../samza/src/test/resources/ExpectedDag.json | 68 +++------ .../apache/beam/sdk/transforms/Reshuffle.java | 89 +++++++++++ .../beam/sdk/transforms/ReshuffleTest.java | 139 ++++++++++++++++++ 5 files changed, 253 insertions(+), 47 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index c510b346d5d0..c5946c509a95 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -310,6 +310,8 @@ def createValidatesRunnerTask(Map m) { excludeTestsMatching test } + // Flink reshuffle override does not preserve all metadata + excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' // https://github.com/apache/beam/issues/20843 excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode' // https://github.com/apache/beam/issues/20845 diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 9643134d95b2..ab3f726e6ad9 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -196,6 +196,8 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' }, testFilter: { + // Flink reshuffle override does not preserve all metadata + excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata' // TODO(https://github.com/apache/beam/issues/20269) excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2' // TODO(https://github.com/apache/beam/issues/20843) diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index 3165fc84958c..c61b80134d8a 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -98,26 +98,26 @@ "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()" + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow" } ] }, { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)" + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)" } ] } @@ -138,38 +138,16 @@ ] }, { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)" - } - ] - } - ] - }, - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)" - } - ] + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)" } ] } @@ -287,14 +265,14 @@ }, { "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/Assign unique key/AddKeys/Map/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign" + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)" + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign", + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/GroupByKey" }, { @@ -303,14 +281,10 @@ }, { "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ExpandIterable/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)" - }, - { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)" + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous)" }, { @@ -357,7 +331,7 @@ }, { "transformName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous)", - "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output", + "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous).output", "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous).output" }, { @@ -378,7 +352,7 @@ { "transformName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/Assign unique key/AddKeys/Map/ParMultiDo(Anonymous).output", - "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output" + "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous).output" }, { "transformName": "Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 42f0f6accc7f..a879667ff96f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -17,8 +17,13 @@ */ package org.apache.beam.sdk.transforms; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -27,7 +32,9 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -67,6 +74,63 @@ public static ViaRandomKey viaRandomKey() { @Override public PCollection> expand(PCollection> input) { + String requestedVersionString = + input.getPipeline().getOptions().as(StreamingOptions.class).getUpdateCompatibilityVersion(); + + if (requestedVersionString != null) { + List requestedVersion = Arrays.asList(requestedVersionString.split("\\.")); + List targetVersion = Arrays.asList("2", "53", "0"); + + if (Comparators.lexicographical(Comparator.naturalOrder()) + .compare(requestedVersion, targetVersion) + <= 0) { + return expand_2_53_0(input); + } + } + + WindowingStrategy originalStrategy = input.getWindowingStrategy(); + // If the input has already had its windows merged, then the GBK that performed the merge + // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained + // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. + // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in + // time. + // Because this outputs as fast as possible, this should not hold the watermark. + Window> rewindow = + Window.>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())) + .triggering(new ReshuffleTrigger<>()) + .discardingFiredPanes() + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + + PCollection>> reified = + input + .apply("SetIdentityWindow", rewindow) + .apply("ReifyOriginalMetadata", Reify.windowsInValue()); + + PCollection>>> grouped = + reified.apply(GroupByKey.create()); + return grouped + .apply( + "ExpandIterable", + ParDo.of( + new DoFn>>, KV>>() { + @ProcessElement + public void processElement( + @Element KV>> element, + OutputReceiver>> r) { + K key = element.getKey(); + for (ValueInSingleWindow value : element.getValue()) { + r.output(KV.of(key, value)); + } + } + })) + .apply("RestoreMetadata", new RestoreMetadata<>()) + // Set the windowing strategy directly, so that it doesn't get counted as the user having + // set allowed lateness. + .setWindowingStrategyInternal(originalStrategy); + } + + private PCollection> expand_2_53_0(PCollection> input) { WindowingStrategy originalStrategy = input.getWindowingStrategy(); // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained @@ -105,6 +169,31 @@ public void processElement( .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues()); } + private static class RestoreMetadata + extends PTransform>>, PCollection>> { + @Override + public PCollection> expand(PCollection>> input) { + return input.apply( + ParDo.of( + new DoFn>, KV>() { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + + @ProcessElement + public void processElement( + @Element KV> kv, OutputReceiver> r) { + r.outputWindowedValue( + KV.of(kv.getKey(), kv.getValue().getValue()), + kv.getValue().getTimestamp(), + Collections.singleton(kv.getValue().getWindow()), + kv.getValue().getPane()); + } + })); + } + } + /** Implementation of {@link #viaRandomKey()}. */ public static class ViaRandomKey extends PTransform, PCollection> { private ViaRandomKey() {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 716bd8781831..e0e8a7feed6d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -24,13 +24,18 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -40,12 +45,15 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -152,6 +160,81 @@ public void testReshufflePreservesTimestamps() { pipeline.run(); } + /** + * Tests that window & pane info is preserved after applying a {@link Reshuffle} with the default + * {@link WindowingStrategy}. + */ + @Test + @Category(ValidatesRunner.class) + public void testReshufflePreservesMetadata() { + PCollection>> input = + pipeline + .apply( + Create.windowedValues( + WindowedValue.of( + "foo", + BoundedWindow.TIMESTAMP_MIN_VALUE, + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING), + WindowedValue.of( + "foo", + new Instant(0), + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING), + WindowedValue.of( + "bar", + new Instant(33), + GlobalWindow.INSTANCE, + PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)), + WindowedValue.of( + "bar", + GlobalWindow.INSTANCE.maxTimestamp(), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING)) + .withCoder(StringUtf8Coder.of()) + .withWindowCoder(GlobalWindow.Coder.INSTANCE)) + .apply(WithKeys.of(v -> v).withKeyType(TypeDescriptors.strings())) + .apply("ReifyOriginalMetadata", Reify.windowsInValue()); + + // The outer WindowedValue is the reified metadata post-reshuffle. The inner + // WindowedValue is the pre-reshuffle metadata. + PCollection>> output = + input + .apply(Reshuffle.of()) + .apply("ReifyReshuffledMetadata", Reify.windowsInValue()) + .apply(Values.create()); + + PAssert.that(output) + .satisfies( + input1 -> { + for (ValueInSingleWindow> elem : input1) { + Instant originalTimestamp = elem.getValue().getTimestamp(); + Instant afterReshuffleTimestamp = elem.getTimestamp(); + assertThat( + "Reshuffle did not preserve element timestamp for " + elem, + afterReshuffleTimestamp, + equalTo(originalTimestamp)); + + PaneInfo originalPaneInfo = elem.getValue().getPane(); + PaneInfo afterReshufflePaneInfo = elem.getPane(); + assertThat( + "Reshuffle did not preserve pane info for " + elem, + afterReshufflePaneInfo, + equalTo(originalPaneInfo)); + + BoundedWindow originalWindow = elem.getValue().getWindow(); + BoundedWindow afterReshuffleWindow = elem.getWindow(); + assertThat( + "Reshuffle did not preserve window for " + elem, + afterReshuffleWindow, + equalTo(originalWindow)); + } + return null; + }); + + pipeline.run(); + } + @Test @Category(ValidatesRunner.class) public void testReshuffleAfterSessionsAndGroupByKey() { @@ -301,4 +384,60 @@ public void testAssignShardFn() { pipeline.run(); } + + static class OldTransformSeeker extends Pipeline.PipelineVisitor.Defaults { + boolean isOldTransformFound = false; + + // A class that is only found in the old expansion + private Class restoreTimestampsClass = ReifyTimestamps.extractFromValues().getClass(); + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + if (restoreTimestampsClass.isInstance(node.getTransform())) { + this.isOldTransformFound = true; + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } else { + return CompositeBehavior.ENTER_TRANSFORM; + } + } + } + + @Test + public void testNoOldTransformByDefault() { + pipeline.apply(Create.of(KV.of("arbitrary", "kv"))).apply(Reshuffle.of()); + + OldTransformSeeker seeker = new OldTransformSeeker(); + pipeline.traverseTopologically(seeker); + assertFalse(seeker.isOldTransformFound); + } + + @Test + public void testRequestOldUpdateCompatibility() { + pipeline.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.53.0"); + pipeline.apply(Create.of(KV.of("arbitrary", "kv"))).apply(Reshuffle.of()); + + OldTransformSeeker seeker = new OldTransformSeeker(); + pipeline.traverseTopologically(seeker); + assertTrue(seeker.isOldTransformFound); + } + + @Test + public void testRequestVeryOldUpdateCompatibility() { + pipeline.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.46.0"); + pipeline.apply(Create.of(KV.of("arbitrary", "kv"))).apply(Reshuffle.of()); + + OldTransformSeeker seeker = new OldTransformSeeker(); + pipeline.traverseTopologically(seeker); + assertTrue(seeker.isOldTransformFound); + } + + @Test + public void testNoOldTransformInRecentVersion() { + pipeline.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0"); + pipeline.apply(Create.of(KV.of("arbitrary", "kv"))).apply(Reshuffle.of()); + + OldTransformSeeker seeker = new OldTransformSeeker(); + pipeline.traverseTopologically(seeker); + assertFalse(seeker.isOldTransformFound); + } }