Skip to content

Commit

Permalink
Add nested field support for Avro [HZ-2509, HZ-3062] (hazelcast#25269)
Browse files Browse the repository at this point in the history
This PR
1. refactors usages of CREATE TYPE and INSERT INTO statements,
2. refactors usages of factoryId/classId/version triplets,
3. decouples serialization format from user-defined types,
4. adds nested field support for Avro,
5. fixes HazelcastObjectType#digest and deletes HazelcastObjectTypeReference.

Notes:
- Type options always override mapping options. Specifically, if the mapping has only "__key" or "this" fields, and both the mapping and type define a schema ID, the one defined by the type is used. This also means that, in such situations, as long as the type defines a schema ID, the mapping does not need to.

Breaking changes:
- Users must drop all user-defined types (UDTs) and mappings with UDTs before the rolling upgrade, and recreate them with the new semantics after upgrading.

Closes HZ-2509
Closes HZ-3062
  • Loading branch information
burakgok authored Nov 3, 2023
1 parent c6bfa65 commit 96fd958
Show file tree
Hide file tree
Showing 86 changed files with 3,134 additions and 3,490 deletions.
6 changes: 4 additions & 2 deletions hazelcast-sql/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ SqlCreate SqlCreateType(Span span, boolean replace) :
name = CompoundIdentifier()
columns = TypeColumns()

<OPTIONS>
sqlOptions = SqlOptions()
[
<OPTIONS>
sqlOptions = SqlOptions()
]
{
return new SqlCreateType(
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.AlterJobPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateIndexPlan;
Expand All @@ -60,12 +59,9 @@
import com.hazelcast.jet.sql.impl.SqlPlanImpl.IMapUpdatePlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.SelectPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.ShowStatementPlan;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.parse.SqlShowStatement.ShowStatementTarget;
import com.hazelcast.jet.sql.impl.schema.DataConnectionResolver;
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
import com.hazelcast.jet.sql.impl.schema.TypeDefinitionColumn;
import com.hazelcast.jet.sql.impl.schema.TypesUtils;
import com.hazelcast.jet.sql.impl.validate.UpdateDataConnectionOperation;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
Expand All @@ -74,7 +70,6 @@
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.serialization.ClassDefinition;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.partition.strategy.AttributePartitioningStrategy;
import com.hazelcast.partition.strategy.DefaultPartitioningStrategy;
Expand All @@ -97,7 +92,6 @@
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.schema.dataconnection.DataConnectionCatalogEntry;
import com.hazelcast.sql.impl.schema.type.Type;
import com.hazelcast.sql.impl.schema.type.TypeKind;
import com.hazelcast.sql.impl.schema.view.View;
import com.hazelcast.sql.impl.security.SqlSecurityContext;
import com.hazelcast.sql.impl.state.QueryResultRegistry;
Expand All @@ -116,7 +110,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand All @@ -135,7 +128,6 @@
import static com.hazelcast.jet.impl.JetServiceBackend.SQL_ARGUMENTS_KEY_NAME;
import static com.hazelcast.jet.impl.util.ExceptionUtil.isTopologyException;
import static com.hazelcast.jet.impl.util.Util.getNodeEngine;
import static com.hazelcast.jet.impl.util.Util.getSerializationService;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateDataConnectionPlan;
import static com.hazelcast.jet.sql.impl.parse.SqlCreateIndex.UNIQUE_KEY;
import static com.hazelcast.jet.sql.impl.parse.SqlCreateIndex.UNIQUE_KEY_TRANSFORMATION;
Expand Down Expand Up @@ -605,7 +597,7 @@ SqlResult execute(IMapSelectPlan plan,
long timeout,
@Nonnull SqlSecurityContext ssc) {
List<Object> args = prepareArguments(plan.parameterMetadata(), arguments);
InternalSerializationService serializationService = getSerializationService(hazelcastInstance);
InternalSerializationService serializationService = Util.getSerializationService(hazelcastInstance);
ExpressionEvalContext evalContext = ExpressionEvalContext.createContext(
args,
hazelcastInstance,
Expand Down Expand Up @@ -723,81 +715,8 @@ SqlResult execute(CreateTypePlan plan) {
throw QueryException.error("Experimental feature of creating custom types isn't enabled. To enable, set "
+ SQL_CUSTOM_TYPES_ENABLED + " to true");
}
final String format = plan.options().get(SqlConnector.OPTION_FORMAT);
final Type type;

if (SqlConnector.PORTABLE_FORMAT.equals(format)) {
final Integer factoryId = Optional.ofNullable(plan.option(SqlConnector.OPTION_TYPE_PORTABLE_FACTORY_ID))
.map(Integer::parseInt)
.orElse(null);
final Integer classId = Optional.ofNullable(plan.option(SqlConnector.OPTION_TYPE_PORTABLE_CLASS_ID))
.map(Integer::parseInt)
.orElse(null);
final Integer version = Optional.ofNullable(plan.option(SqlConnector.OPTION_TYPE_PORTABLE_CLASS_VERSION))
.map(Integer::parseInt)
.orElse(0);

if (factoryId == null || classId == null) {
throw QueryException.error("FactoryID and ClassID are required for Portable Types");
}

final ClassDefinition existingClassDef = getSerializationService(hazelcastInstance).getPortableContext()
.lookupClassDefinition(factoryId, classId, version);

if (existingClassDef != null) {
type = TypesUtils.convertPortableClassToType(plan.name(), existingClassDef, catalog);
} else {
if (plan.columns().isEmpty()) {
throw QueryException.error("The given FactoryID/ClassID/Version combination not known to the member. " +
"You need to provide column list for this type");
}

type = new Type();
type.setName(plan.name());
type.setKind(TypeKind.PORTABLE);
type.setPortableFactoryId(factoryId);
type.setPortableClassId(classId);
type.setPortableVersion(version);
type.setFields(new ArrayList<>());

for (int i = 0; i < plan.columns().size(); i++) {
final TypeDefinitionColumn planColumn = plan.columns().get(i);
type.getFields().add(new Type.TypeField(planColumn.name(), planColumn.dataType()));
}
}
} else if (SqlConnector.COMPACT_FORMAT.equals(format)) {
if (plan.columns().isEmpty()) {
throw QueryException.error("Column list is required to create Compact-based Types");
}
type = new Type();
type.setKind(TypeKind.COMPACT);
type.setName(plan.name());
final List<Type.TypeField> typeFields = plan.columns().stream()
.map(typeColumn -> new Type.TypeField(typeColumn.name(), typeColumn.dataType()))
.collect(Collectors.toList());
type.setFields(typeFields);

final String compactTypeName = plan.option(SqlConnector.OPTION_TYPE_COMPACT_TYPE_NAME);
if (compactTypeName == null || compactTypeName.isEmpty()) {
throw QueryException.error("Compact Type Name must not be empty for Compact-based Types.");
}
type.setCompactTypeName(compactTypeName);
} else if (SqlConnector.JAVA_FORMAT.equals(format)) {
final Class<?> typeClass;
try {
typeClass = ReflectionUtils.loadClass(plan.options().get(SqlConnector.OPTION_TYPE_JAVA_CLASS));
} catch (Exception e) {
throw QueryException.error("Unable to load class: '"
+ plan.options().get(SqlConnector.OPTION_TYPE_JAVA_CLASS) + "'", e);
}

type = TypesUtils.convertJavaClassToType(plan.name(), plan.columns(), typeClass);
} else {
throw QueryException.error("Unsupported type format: " + format);
}

final Type type = new Type(plan.name(), plan.columns(), plan.options());
catalog.createType(type, plan.replace(), plan.ifNotExists());

return UpdateSqlResultImpl.createUpdateCountResult(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public interface SqlConnector {

String OPTION_TYPE_PORTABLE_CLASS_VERSION = "portableClassVersion";

String OPTION_TYPE_AVRO_SCHEMA = "avroSchema";

/**
* Value for {@value #OPTION_KEY_FORMAT} and {@value #OPTION_VALUE_FORMAT}
* for Java serialization.
Expand Down Expand Up @@ -558,6 +560,7 @@ default Set<String> nonSensitiveConnectorOptions() {
options.add(OPTION_TYPE_PORTABLE_FACTORY_ID);
options.add(OPTION_TYPE_PORTABLE_CLASS_ID);
options.add(OPTION_TYPE_PORTABLE_CLASS_VERSION);
options.add(OPTION_TYPE_AVRO_SCHEMA);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ protected List<Object[]> rows() {
i + 1,
null, // attribute_default
"YES", // is_nullable
toSqlDataTypeString(field.getQueryDataType()), // data type
field.getQueryDataType().getTypeFamily().equals(QueryDataTypeFamily.VARCHAR)
toSqlDataTypeString(field.getType()), // data type
field.getType().getTypeFamily() == QueryDataTypeFamily.VARCHAR
? Integer.MAX_VALUE
: null, // character_maximum_length
field.getQueryDataType().getTypeFamily().equals(QueryDataTypeFamily.VARCHAR)
field.getType().getTypeFamily() == QueryDataTypeFamily.VARCHAR
? Integer.MAX_VALUE
: null, // character_octet_length
null, // character_set_catalog
Expand All @@ -108,28 +108,28 @@ protected List<Object[]> rows() {
null, // collation_catalog
null, // collation_schema
null, // collation_name
field.getQueryDataType().getTypeFamily().isNumeric()
? getNumericTypePrecision(field.getQueryDataType())
field.getType().getTypeFamily().isNumeric()
? getNumericTypePrecision(field.getType())
: null, // numeric_precision
field.getQueryDataType().getTypeFamily().isNumeric()
field.getType().getTypeFamily().isNumeric()
? 2
: null, // numeric_precision_radix
field.getQueryDataType().getTypeFamily().isNumericInteger()
field.getType().getTypeFamily().isNumericInteger()
? 0
: null, // numeric_scale
field.getQueryDataType().getTypeFamily().isTemporal()
? getTemporalTypePrecision(field.getQueryDataType())
field.getType().getTypeFamily().isTemporal()
? getTemporalTypePrecision(field.getType())
: null, // datetime_precision
null, // interval_type
null, // interval_precision
field.getQueryDataType().isCustomType()
field.getType().isCustomType()
? catalog()
: null, // attribute_udt_catalog
field.getQueryDataType().isCustomType()
field.getType().isCustomType()
? schema
: null, // attribute_udt_schema
field.getQueryDataType().isCustomType()
? field.getQueryDataType().getObjectTypeName()
field.getType().isCustomType()
? field.getType().getObjectTypeName()
: null, // attribute_udt_name
null, // scope_catalog
null, // scope_schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ String dataConnectionName() {
}

Properties kafkaConsumerProperties() {
return PropertiesResolver.resolveConsumerProperties(options);
return PropertiesResolver.resolveConsumerProperties(options,
keyUpsertDescriptor.getSchema(), valueUpsertDescriptor.getSchema());
}

Properties kafkaProducerProperties() {
return PropertiesResolver.resolveProducerProperties(options);
return PropertiesResolver.resolveProducerProperties(options,
keyUpsertDescriptor.getSchema(), valueUpsertDescriptor.getSchema());
}

QueryTargetDescriptor keyQueryDescriptor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.hazelcast.jet.kafka.HazelcastKafkaAvroSerializer;
import com.hazelcast.jet.kafka.impl.HazelcastJsonValueDeserializer;
import com.hazelcast.jet.kafka.impl.HazelcastJsonValueSerializer;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.keyvalue.JavaClassNameResolver;

import java.util.Map;
Expand All @@ -32,8 +31,10 @@
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.AVRO_FORMAT;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.JAVA_FORMAT;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.JSON_FLAT_FORMAT;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_KEY_AVRO_SCHEMA;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_KEY_CLASS;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_KEY_FORMAT;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_AVRO_SCHEMA;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_CLASS;
import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_FORMAT;

Expand Down Expand Up @@ -84,20 +85,20 @@ final class PropertiesResolver {

private PropertiesResolver() { }

static Properties resolveConsumerProperties(Map<String, String> options) {
static Properties resolveConsumerProperties(Map<String, String> options, Object keySchema, Object valueSchema) {
Properties properties = from(options);

withSerdeConsumerProperties(true, options, properties);
withSerdeConsumerProperties(false, options, properties);
withSerdeConsumerProperties(true, options, keySchema, properties);
withSerdeConsumerProperties(false, options, valueSchema, properties);

return properties;
}

static Properties resolveProducerProperties(Map<String, String> options) {
static Properties resolveProducerProperties(Map<String, String> options, Object keySchema, Object valueSchema) {
Properties properties = from(options);

withSerdeProducerProperties(true, options, properties);
withSerdeProducerProperties(false, options, properties);
withSerdeProducerProperties(true, options, keySchema, properties);
withSerdeProducerProperties(false, options, valueSchema, properties);

return properties;
}
Expand All @@ -118,6 +119,7 @@ private static Properties from(Map<String, String> options) {
private static void withSerdeConsumerProperties(
boolean isKey,
Map<String, String> options,
Object schema,
Properties properties
) {
String deserializer = isKey ? KEY_DESERIALIZER : VALUE_DESERIALIZER;
Expand All @@ -126,12 +128,16 @@ private static void withSerdeConsumerProperties(
if (format == null && isKey) {
properties.putIfAbsent(deserializer, BYTE_ARRAY_DESERIALIZER);
} else if (AVRO_FORMAT.equals(format)) {
properties.putIfAbsent(deserializer, options.containsKey("schema.registry.url")
? CONFLUENT_AVRO_DESERIALIZER : HAZELCAST_AVRO_DESERIALIZER);
if (options.containsKey("schema.registry.url")) {
properties.putIfAbsent(deserializer, CONFLUENT_AVRO_DESERIALIZER);
} else {
properties.putIfAbsent(deserializer, HAZELCAST_AVRO_DESERIALIZER);
properties.put(isKey ? OPTION_KEY_AVRO_SCHEMA : OPTION_VALUE_AVRO_SCHEMA, schema);
}
} else if (JSON_FLAT_FORMAT.equals(format)) {
properties.putIfAbsent(deserializer, BYTE_ARRAY_DESERIALIZER);
} else if (JAVA_FORMAT.equals(format)) {
String clazz = options.get(isKey ? SqlConnector.OPTION_KEY_CLASS : SqlConnector.OPTION_VALUE_CLASS);
String clazz = options.get(isKey ? OPTION_KEY_CLASS : OPTION_VALUE_CLASS);
String deserializerClass = resolveDeserializer(clazz);
if (deserializerClass != null) {
properties.putIfAbsent(deserializer, deserializerClass);
Expand Down Expand Up @@ -170,6 +176,7 @@ private static String resolveDeserializer(String clazz) {
private static void withSerdeProducerProperties(
boolean isKey,
Map<String, String> options,
Object schema,
Properties properties
) {
String serializer = isKey ? KEY_SERIALIZER : VALUE_SERIALIZER;
Expand All @@ -178,12 +185,16 @@ private static void withSerdeProducerProperties(
if (format == null && isKey) {
properties.putIfAbsent(serializer, BYTE_ARRAY_SERIALIZER);
} else if (AVRO_FORMAT.equals(format)) {
properties.putIfAbsent(serializer, options.containsKey("schema.registry.url")
? CONFLUENT_AVRO_SERIALIZER : HAZELCAST_AVRO_SERIALIZER);
if (options.containsKey("schema.registry.url")) {
properties.putIfAbsent(serializer, CONFLUENT_AVRO_SERIALIZER);
} else {
properties.putIfAbsent(serializer, HAZELCAST_AVRO_SERIALIZER);
properties.put(isKey ? OPTION_KEY_AVRO_SCHEMA : OPTION_VALUE_AVRO_SCHEMA, schema);
}
} else if (JSON_FLAT_FORMAT.equals(format)) {
properties.putIfAbsent(serializer, BYTE_ARRAY_SERIALIZER);
} else if (JAVA_FORMAT.equals(format)) {
String clazz = options.get(isKey ? SqlConnector.OPTION_KEY_CLASS : SqlConnector.OPTION_VALUE_CLASS);
String clazz = options.get(isKey ? OPTION_KEY_CLASS : OPTION_VALUE_CLASS);
String serializerClass = resolveSerializer(clazz);
if (serializerClass != null) {
properties.putIfAbsent(serializer, serializerClass);
Expand Down
Loading

0 comments on commit 96fd958

Please sign in to comment.