Skip to content

Commit

Permalink
Merge pull request #32567: Fixes a transform upgrade compatibility is…
Browse files Browse the repository at this point in the history
…sue related to BigqueryIO
  • Loading branch information
chamikaramj authored Oct 1, 2024
2 parents dcc0bd2 + 4d254a0 commit 5aed585
Showing 1 changed file with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -751,18 +752,27 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (numStorageWriteApiStreams != null) {
builder = builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
}
Boolean propagateSuccessfulStorageApiWrites =
configRow.getBoolean("propagate_successful_storage_api_writes");
if (propagateSuccessfulStorageApiWrites != null) {
builder =
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
}
byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate");
if (predicate != null) {
builder =
builder.setPropagateSuccessfulStorageApiWritesPredicate(
(Predicate<String>) fromByteArray(predicate));

if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") >= 0) {
Boolean propagateSuccessfulStorageApiWrites =
configRow.getBoolean("propagate_successful_storage_api_writes");
if (propagateSuccessfulStorageApiWrites != null) {
builder =
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
}

byte[] predicate =
configRow.getBytes("propagate_successful_storage_api_writes_predicate");
if (predicate != null) {
builder =
builder.setPropagateSuccessfulStorageApiWritesPredicate(
(Predicate<String>) fromByteArray(predicate));
}
} else {
builder.setPropagateSuccessfulStorageApiWrites(false);
builder.setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue());
}

Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition");
if (maxFilesPerPartition != null) {
builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
Expand Down

0 comments on commit 5aed585

Please sign in to comment.