Skip to content

Commit

Permalink
Core: Transform parquet bloom filter props when updating schema (#5426)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang authored Aug 26, 2022
1 parent af1d405 commit 1526c1f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 36 deletions.
33 changes: 0 additions & 33 deletions core/src/main/java/org/apache/iceberg/MetricsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.Immutable;
Expand Down Expand Up @@ -64,38 +63,6 @@ public static MetricsConfig getDefault() {
return DEFAULT;
}

/**
 * Rewrites per-column metrics-mode properties after columns were renamed or dropped.
 *
 * <p>Every key that starts with {@code METRICS_MODE_COLUMN_CONF_PREFIX} is read as
 * {@code prefix + columnName}. If the column was renamed, the value is stored under the new
 * column name; if the column was deleted, the entry is dropped; otherwise it is copied as is.
 * Keys without the prefix are copied unchanged.
 *
 * @param props table properties to transform; this map is only read, never modified
 * @param deletedColumns names of the columns that were dropped
 * @param renamedColumns mapping from old column name to new column name
 * @return {@code props} itself when it has no key with the metrics column prefix, otherwise a
 *     new map holding the transformed properties
 */
static Map<String, String> updateProperties(
    Map<String, String> props, List<String> deletedColumns, Map<String, String> renamedColumns) {
  if (props.keySet().stream().noneMatch(key -> key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX))) {
    return props;
  }

  Map<String, String> updatedProperties = Maps.newHashMap();
  for (Map.Entry<String, String> entry : props.entrySet()) {
    String key = entry.getKey();
    String value = entry.getValue();

    if (!key.startsWith(METRICS_MODE_COLUMN_CONF_PREFIX)) {
      // Not a metric property
      updatedProperties.put(key, value);
      continue;
    }

    // Strip the prefix by length: String.replaceFirst would interpret the prefix as a regular
    // expression (its dots would match any character) instead of as literal text.
    String columnAlias = key.substring(METRICS_MODE_COLUMN_CONF_PREFIX.length());
    String newName = renamedColumns.get(columnAlias);
    if (newName != null) {
      // The name has changed.
      updatedProperties.put(METRICS_MODE_COLUMN_CONF_PREFIX + newName, value);
    } else if (!deletedColumns.contains(columnAlias)) {
      // Copy over the original
      updatedProperties.put(key, value);
    }
    // Implicit drop if deleted
  }

  return updatedProperties;
}

/**
* Creates a metrics config from table configuration.
*
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -39,6 +40,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -478,9 +480,16 @@ private TableMetadata applyChangesToMetadata(TableMetadata metadata) {
updates.keySet().stream()
.filter(id -> !schema.findColumnName(id).equals(newSchema.findColumnName(id)))
.collect(Collectors.toMap(schema::findColumnName, newSchema::findColumnName));
Map<String, String> updatedProperties =
MetricsConfig.updateProperties(newMetadata.properties(), deletedColumns, renamedColumns);
newMetadata = newMetadata.replaceProperties(updatedProperties);
if (!deletedColumns.isEmpty() || !renamedColumns.isEmpty()) {
Set<String> columnProperties =
ImmutableSet.of(
TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX,
TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
Map<String, String> updatedProperties =
PropertyUtil.applySchemaChanges(
newMetadata.properties(), deletedColumns, renamedColumns, columnProperties);
newMetadata = newMetadata.replaceProperties(updatedProperties);
}
}

return newMetadata;
Expand Down
40 changes: 40 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.iceberg.util;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class PropertyUtil {

Expand Down Expand Up @@ -92,4 +95,41 @@ public static Map<String, String> propertiesWithPrefix(
.filter(e -> e.getKey().startsWith(prefix))
.collect(Collectors.toMap(e -> e.getKey().replaceFirst(prefix, ""), Map.Entry::getValue));
}

/**
 * Rewrites column-scoped properties after columns were renamed or dropped.
 *
 * <p>Each element of {@code columnProperties} is a key prefix; a property whose key starts with
 * one of them is read as {@code prefix + columnName}. If the column was renamed, the value is
 * stored under {@code prefix + newName}; if the column was deleted, the entry is dropped;
 * otherwise it is copied as is. Properties matching no prefix are copied unchanged.
 *
 * @param properties properties to transform; this map is only read, never modified
 * @param deletedColumns names of the columns that were dropped
 * @param renamedColumns mapping from old column name to new column name
 * @param columnProperties key prefixes that identify column-scoped properties
 * @return {@code properties} itself when no key starts with any of the prefixes, otherwise a new
 *     map holding the transformed properties
 */
public static Map<String, String> applySchemaChanges(
    Map<String, String> properties,
    List<String> deletedColumns,
    Map<String, String> renamedColumns,
    Set<String> columnProperties) {
  if (properties.keySet().stream()
      .noneMatch(key -> columnProperties.stream().anyMatch(key::startsWith))) {
    return properties;
  }

  Map<String, String> updatedProperties = Maps.newHashMap();
  for (Map.Entry<String, String> entry : properties.entrySet()) {
    String key = entry.getKey();
    String value = entry.getValue();
    String prefix = columnProperties.stream().filter(key::startsWith).findFirst().orElse(null);

    if (prefix == null) {
      // Not a column-scoped property.
      updatedProperties.put(key, value);
      continue;
    }

    // Strip the prefix by length. String.replaceFirst would compile the caller-supplied prefix
    // as a regular expression, so a prefix containing metacharacters could fail to match (or
    // throw), leaving renamed and deleted columns unhandled.
    String columnAlias = key.substring(prefix.length());
    String newName = renamedColumns.get(columnAlias);
    if (newName != null) {
      // The name has changed.
      updatedProperties.put(prefix + newName, value);
    } else if (!deletedColumns.contains(columnAlias)) {
      // Copy over the original.
      updatedProperties.put(key, value);
    }
    // Implicit drop if deleted.
  }

  return updatedProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;

import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -199,6 +201,27 @@ public void testModificationWithMetricsMetrics() {
table.properties().get(TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "bloop"));
}

/**
 * Verifies that a per-column Parquet bloom filter property follows its column through a rename
 * and is removed once the column is deleted.
 */
@Test
public void testModificationWithParquetBloomConfig() {
  table
      .updateProperties()
      .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
      .commit();

  // Renaming must move the property to the new column name and keep its value.
  table.updateSchema().renameColumn("id", "ID").commit();
  Assert.assertEquals(
      "Parquet bloom config for new column name ID should exist",
      "true",
      table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "ID"));
  Assert.assertNull(
      "Parquet bloom config for old column name id should not exist",
      table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id"));

  // Deleting the column must drop its property as well.
  table.updateSchema().deleteColumn("ID").commit();
  Assert.assertNull(
      "Parquet bloom config for dropped column name ID should not exist",
      table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "ID"));
}

@Test
public void testDeleteAndAddColumnReassign() {
NameMapping mapping = MappingUtil.create(table.schema());
Expand Down

0 comments on commit 1526c1f

Please sign in to comment.