how to pass schema version as kafka property #15
Comments
Any suggestions, please?
Any suggestions on how schema evolution works? I created a schema group with Forward compatibility and uploaded a schema with initial version 1 in the Azure portal. Now both producer and consumer applications are working fine. (Version 1 and Version 2 schemas as posted in full below.)
The TL;DR answer to your question is that it is typically not recommended to pass the schema version as a Kafka property. Instead, you should follow the typical Avro model: generate a versioned Java class from your Avro schema, and let the internal mechanics of the schema registry register the schema for you.
Since you are in the Kafka space, the integration with the schema registry is typically done through code generation rather than through the portal experience (the portal gives you a good UI tool for looking at a schema after it is written). I would suggest you run through our sample at https://github.com/Azure/azure-schema-registry-for-kafka/tree/master/java/avro/samples to get a better understanding of how the integration is typically done.
In particular, in Kafka, producing/fetching with a schema is more or less about creating Avro classes for the given schema, rather than pointing a config at a schema endpoint. Your typical flow should be something like the following (a code sketch follows this comment):
1. Define your schema (say AvroUser.avsc).
2. Generate the Java classes from the schema (most people use avro-maven-plugin, https://mvnrepository.com/artifact/org.apache.avro/avro-maven-plugin, to generate the Java classes in their project).
3. After step 2 you should have a Java class that encapsulates your schema (e.g. AvroUser.java), which you can use in both your producer and consumer code to write data using the schema; again, refer to the sample above for details.
4. Note: specific to Event Hubs Schema Registry, you should always pre-create, in the portal, the schema group that you specify in the schema.group config before running your code/sample.
As for schema evolution, most people typically go through this kind of life cycle:
1. Say you start with AvroUser.avsc. The producer and consumer use that schema (via the generated class) to produce records; since you configured our schema registry, the producer and consumer register that schema in the registry under the namespace and name specified in your avsc.
2. At some point you want to create version 2 of AvroUser.avsc: you modify the schema but keep the same namespace and name, and say you now have a file called AvroUserV2.avsc.
3. When you compile your code, avro-maven-plugin generates AvroUserV2.java, which you now use in your producer/consumer code. When you run it, just as in step 1, the library automatically registers the new schema with the registry; because the namespace and name are the same, the registry knows this is a new version.
Note that every record you produce or fetch is embedded with its schema id, so data produced/fetched in step (1) will have a different schema id than data produced/fetched in step (3). This is how the data can still be de/serialized even though the schema has evolved. Also note that your generated Java files (AvroUser.java and AvroUserV2.java) are basically the code representation of your schema evolution.
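To make that flow concrete, here is a minimal, hedged sketch (not from this thread): AvroUser, the endpoint, group, and topic names are illustrative placeholders, and the serializer-side config constants are assumed analogs of the deserializer constants shown later in this thread.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;

public class AvroUserProducerSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<namespace>.servicebus.windows.net:9093");
        // SASL and schema registry credential settings omitted for brevity.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://<namespace>.servicebus.windows.net");
        props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "<schema-group>");

        try (KafkaProducer<String, AvroUser> producer = new KafkaProducer<>(props)) {
            // AvroUser is the class avro-maven-plugin would generate from AvroUser.avsc.
            AvroUser user = AvroUser.newBuilder().setName("alice").build();
            // No version appears anywhere in the config: the serializer registers the
            // schema carried by the generated class and embeds its id in each record.
            producer.send(new ProducerRecord<>("<topic>", user));
        }
    }
}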
Hi Eric,
I tried the below process and am getting an error. Please let us know if you are available for 30 minutes to walk through our applications.
Steps:
1. Created a schema group with Forward compatibility in the portal.
2. Defined schema v1 (with only 2 fields) in both producer and consumer applications and generated classes using avro-maven-plugin.
3. Produced a message; the schema was auto-registered in the schema registry.
4. The consumer received the message successfully.
Then:
1. Added a new field called middleName to the schema (v2) and regenerated the classes in the producer application only.
2. Made no changes to the consumer application, so the consumer still has schema v1 and its corresponding generated classes.
3. Produced a message with schema v2, and schema v2 was auto-registered in the schema registry.
4. The consumer application now throws an error because the newly added field is not found in its corresponding Avro class. The Avro classes are of SpecificRecord type, and SpecificRecord looks fields up by order/position.
2021.06.09 04:12:39,600 org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ERROR com.azure.core.util.logging.ClientLogger.performLogging(ClientLogger.java:350) - Error deserializing Avro message.
java.lang.IllegalStateException: Error deserializing Avro message.
at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:134) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.lambda$deserializeAsync$1(SchemaRegistryAvroSerializer.java:101) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3972) [reactor-core-3.4.0.jar!/:3.4.0]
at reactor.core.publisher.Mono.block(Mono.java:1678) [reactor-core-3.4.0.jar!/:3.4.0]
at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.deserialize(SchemaRegistryAvroSerializer.java:50) [azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:66) [azure-schemaregistry-kafka-avro-1.0.0-beta.4.jar!/:?]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [kafka-clients-2.6.0.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar!/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1238) [spring-kafka-2.6.3.jar!/:2.6.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133) [spring-kafka-2.6.3.jar!/:2.6.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1054) [spring-kafka-2.6.3.jar!/:2.6.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 2
at com.aa.opshub.test.Employee.put(Employee.java:111) ~[classes!/:0.0.1-SNAPSHOT]
at org.apache.avro.generic.GenericData.setField(GenericData.java:816) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.9.2.jar!/:1.9.2]
at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:131) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
... 27 more
Consumer Config:
public Map<String, Object> configs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
properties.put("security.protocol", securityProtocol);
properties.put("sasl.mechanism", saslMechanism);
properties.put("sasl.jaas.config", saslJaasConfig);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "registryurl");
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
properties.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_READER_CONFIG, true);
return properties;
}
Producer: Schema v2:
{
"type": "record",
"namespace": "com.aa.opshub.test",
"name": "Employee",
"fields": [
{
"name": "firstName",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "middleName",
"type": "string"
}
]
}
Consumer: Schema v1
{
"type" : "record",
"namespace" : "com.aa.opshub.test",
"name" : "Employee",
"fields" : [
{ "name" : "firstName" , "type" : "string" },
{ "name" : "age", "type" : "int" }
]
}
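(Side note, not from the thread: giving the added field a default, as sketched below, also lets the schemas resolve in the other direction, i.e. a v2 reader can read v1 data, which is the usual Avro evolution advice.)
{ "name": "middleName", "type": "string", "default": "" }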
The method throwing the error in Employee.java (the generated Avro class):
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: firstName = (java.lang.CharSequence)value$; break;
case 1: age = (java.lang.Integer)value$; break;
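      // Schema v1 defines only indices 0 and 1, so index 2 (middleName from
      // schema v2) falls through to the default branch below - this is the
      // IndexOutOfBoundsException in the stack trace above.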
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
}
}
Thanks
Krupal
When I look at the below link, I see the DatumReader is created based on the writer schema only. I believe that to deserialize the payload/message, the DatumReader should also consider the reader schema. Method name:
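For illustration, a hedged sketch with plain Avro APIs (not the SDK's actual code; Employee is the generated v1 class from above, and the variable names are placeholders): when the DatumReader is constructed with both the writer and the reader schema, Avro's schema resolution skips the extra middleName field instead of pushing it into the v1 class by index.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

class ReaderSchemaSketch {
    // writerV2Json / readerV1Json: the Employee schemas posted above, as JSON strings.
    static Employee decode(byte[] payload, String writerV2Json, String readerV1Json) throws IOException {
        Schema writerV2 = new Schema.Parser().parse(writerV2Json);
        Schema readerV1 = new Schema.Parser().parse(readerV1Json);
        // With both schemas supplied, Avro resolves v2 data against the v1 reader class.
        SpecificDatumReader<Employee> reader = new SpecificDatumReader<>(writerV2, readerV1);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder); // middleName is skipped, not put(2, ...)
    }
}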
Your repro looks correct to me as a forward compatibility sample. I would suggest filing a GitHub issue over at the azure-sdk-for-java repo with your forward-compatibility repro steps, so that the SDK team can track this and fix it as needed.
@hmlam any updates on this Azure Schema Registry for Kafka issue with Forward compatibility?
If a schema has multiple versions, how does a producer/consumer pass or refer to a particular version via Kafka properties or Kafka configuration? Can I use something like the below?
consumer:
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://myeventhubNamespace:443/$schemagroups/mygroupName/schemas/mySchemaName/versions/2");
producer:
full method:
public KafkaTemplate<Object, Object> getKafkaTemplate() {
final Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
properties.put("security.protocol", securityProtocol);
properties.put("sasl.mechanism", saslMechanism);
properties.put("sasl.jaas.config", saslJaasConfig);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
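        // (Assumed completion - the original snippet stops here; these serializer-side
        // keys mirror the consumer config above and are not the poster's code.)
        properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "registryurl");
        properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(properties));
}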
Is this recommended? And will port 443 change if the Event Hubs namespace is deleted and re-created with the same name?