[BUG] - Issue while deserializing the message by using Schema Registry with Forward compatibility - Schema Registry #22278

Closed
KrupalVanukuri opened this issue Jun 14, 2021 · 16 comments · Fixed by #26592
Labels: Azure.Core, bug, Client, customer-reported, Schema Registry

Comments


KrupalVanukuri commented Jun 14, 2021

Describe the bug

I am working with Event Hubs Schema Registry and Forward compatibility. When the producer and consumer applications both use the same Avro schema (say V1), everything works fine. But whenever I evolve the schema by adding new field(s) on the producer side (say V2) while the consumer still uses the old schema (V1), the consumer application throws an exception, because the newly added field has no slot in the consumer's generated Avro classes to map the field to. I believe this happens because the DatumReader is created from the writerSchema, not from the readerSchema.

Note: The Avro classes are of SpecificRecord type, and SpecificRecord maps fields by order/position, as the sketch below illustrates.
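
For context, here is a hypothetical excerpt of the put method that avro-maven-plugin generates for a two-field V1 Employee record (the real generated code differs in detail). Because fields are addressed by numeric index, a writer field at index 2 falls through to the exception seen in the stack trace below:

    // Hypothetical shape of the V1 generated SpecificRecord's put method.
    public void put(int field$, Object value$) {
        switch (field$) {
            case 0: this.firstName = value$ != null ? value$.toString() : null; break;
            case 1: this.lastName = value$ != null ? value$.toString() : null; break;
            // V2's new field arrives as index 2, which the V1 class does not have:
            default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
        }
    }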

Exception or Stack Trace

2021.06.09 04:12:39,600 org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ERROR com.azure.core.util.logging.ClientLogger.performLogging(ClientLogger.java:350) - Error deserializing Avro message.
java.lang.IllegalStateException: Error deserializing Avro message.
      at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:134) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
      at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.lambda$deserializeAsync$1(SchemaRegistryAvroSerializer.java:101) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
      at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:169) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.Mono.subscribe(Mono.java:3972) [reactor-core-3.4.0.jar!/:3.4.0]
      at reactor.core.publisher.Mono.block(Mono.java:1678) [reactor-core-3.4.0.jar!/:3.4.0]
      at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.deserialize(SchemaRegistryAvroSerializer.java:50) [azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
      at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:66) [azure-schemaregistry-kafka-avro-1.0.0-beta.4.jar!/:?]
      at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [kafka-clients-2.6.0.jar!/:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [kafka-clients-2.6.0.jar!/:?]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1238) [spring-kafka-2.6.3.jar!/:2.6.3]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133) [spring-kafka-2.6.3.jar!/:2.6.3]
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1054) [spring-kafka-2.6.3.jar!/:2.6.3]
      at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
      at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
      at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 2
     at com.aa.opshub.test.Employee.put(Employee.java:111) ~[classes!/:0.0.1-SNAPSHOT]
      at org.apache.avro.generic.GenericData.setField(GenericData.java:816) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar!/:1.9.2]
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.9.2.jar!/:1.9.2]
      at com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils.decode(AvroSchemaRegistryUtils.java:131) ~[azure-data-schemaregistry-avro-1.0.0-beta.4.jar!/:?]
      ... 27 more

To Reproduce

Steps:

  1. Created a schema group with Forward compatibility in the portal.

  2. Defined schema V1 in both the producer and consumer applications and generated classes using avro-maven-plugin.

  3. Produced a message; the schema was auto-registered in the Schema Registry.

  4. The consumer received the message successfully.

  5. Added a new field called middleName to the schema (V2) and regenerated the classes in the producer application only.

  6. Made no changes in the consumer application, so the consumer still has schema V1 and its corresponding generated classes.

  7. Produced a message with schema V2; schema V2 was auto-registered in the Schema Registry.

  8. The consumer application now throws an error because the newly added field has no corresponding slot in its generated Avro class to map the field to. The Avro classes are of SpecificRecord type, and SpecificRecord maps fields by order/position (a sketch of the two schema versions follows this list).
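
To make steps 5 and 6 concrete, the two schema versions could look like the following. This is a hypothetical sketch built with Avro's SchemaBuilder; only the record name, namespace, and middleName come from this report, and the other field names are illustrative.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    // V1: the schema the consumer's classes were generated from.
    Schema v1 = SchemaBuilder.record("Employee").namespace("com.aa.opshub.test")
        .fields()
        .requiredString("firstName")
        .requiredString("lastName")
        .endRecord();

    // V2: the producer's evolved schema with middleName appended (field index 2).
    Schema v2 = SchemaBuilder.record("Employee").namespace("com.aa.opshub.test")
        .fields()
        .requiredString("firstName")
        .requiredString("lastName")
        .requiredString("middleName")
        .endRecord();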

Code Snippet

https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/AvroSchemaRegistryUtils.java

Method:

private <T> DatumReader<T> getDatumReader(Schema writerSchema) {
        boolean writerSchemaIsPrimitive = AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
        // do not use SpecificDatumReader if writerSchema is a primitive
        if (avroSpecificReader && !writerSchemaIsPrimitive) {
            return new SpecificDatumReader<>(writerSchema);
        } else {
            return new GenericDatumReader<>(writerSchema);
        }
    }

In the above method, the DatumReader is created from the writer schema only, so the consumer application decodes the payload with the producer's (V2) field layout even though its generated classes only know V1.
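
For reference, Avro already handles this resolution when the DatumReader is constructed with both schemas. A minimal sketch, assuming the hypothetical v1/v2 schemas above, a V2-encoded byte[] payload, and the consumer's V1-generated Employee class:

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    // writer schema = V2 (what the registry returns), reader schema = V1 (compiled into Employee)
    SpecificDatumReader<Employee> reader = new SpecificDatumReader<>(v2, v1);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    Employee employee = reader.read(null, decoder); // middleName is skipped during schema resolution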

Expected behavior

  1. To deserialize the payload/message, I believe the DatumReader needs to be created with the reader schema as well. If the reader schema is null, the Avro SpecificRecord class name can be resolved from the writer schema, and the reader schema obtained from that class.

Sample (not a full solution):

    private <T> DatumReader<T> getDatumReader(Schema writerSchema, Schema readerSchema) {
        boolean writerSchemaIsPrimitive = AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
        // do not use SpecificDatumReader if writerSchema is a primitive
        if (avroSpecificReader && !writerSchemaIsPrimitive) {
            if (readerSchema == null) {
                try {
                    // Resolve the generated SpecificRecord class from the writer schema
                    // and use the schema compiled into it as the reader schema.
                    Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);
                    readerSchema = readerClass.getDeclaredConstructor().newInstance().getSchema();
                } catch (Exception e) {
                    // no generated class on the classpath; fall back to the writer schema
                    readerSchema = writerSchema;
                }
            }
            return new SpecificDatumReader<>(writerSchema, readerSchema);
        } else {
            return new GenericDatumReader<>(writerSchema);
        }
    }
  2. Alternatively, can we have an option for the consumer to pass its own schema?

Setup (please complete the following information):

  • OS: deployed as an Azure App Service, developed with Spring Boot and Kafka
  • IDE: STS
  • Library/Libraries:
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-core-serializer-avro-apache</artifactId>
      <version>1.0.0-beta.3</version>
    </dependency>

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-schemaregistry-kafka-avro</artifactId>
      <version>1.0.0-beta.4</version>
    </dependency>

Additional context

Reference link I used:
https://github.com/Azure/azure-schema-registry-for-kafka/tree/master/java/avro/samples

Please refer to this link for the sample config and schemas I used:
Azure/azure-schema-registry-for-kafka#15

Information Checklist

Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost added the needs-triage, customer-reported, and question labels on Jun 14, 2021
@alzimmermsft added the Azure.Core, Client, and Schema Registry labels on Jun 14, 2021
@ghost removed the needs-triage label on Jun 14, 2021
KrupalVanukuri (Author) commented:

@srnagar Any priority and timeline on this issue?

KrupalVanukuri (Author) commented:

Any updates on this issue?

KrupalVanukuri (Author) commented:

Any updates on this issue?

srnagar (Member) commented Nov 22, 2021

@conniey Could you please take a look at this issue?

KrupalVanukuri (Author) commented:

@conniey Could you please share any updates on this issue? We are looking to implement the Schema Registry feature in our applications.

conniey (Member) commented Dec 14, 2021

Hey. Thanks for reporting this. I'm looking into it.

@ramya-rao-a added this to the [2022] February milestone on Jan 5, 2022
@conniey added the bug label and removed the question label on Jan 10, 2022
KrupalVanukuri (Author) commented:

When can I expect azure-data-schemaregistry-apacheavro 1.0.0-beta.9 to be released as a Maven dependency? It looks like the above bug fix is in that release.

conniey (Member) commented Feb 15, 2022

@KrupalVanukuri It's been published to Maven Central

KrupalVanukuri (Author) commented Feb 16, 2022

Thanks @conniey. But now I am confused by the MessageWithMetadata model class. I am using Spring Kafka to push data to Event Hubs, and previously I sent byte[]. Do I now need to send a MessageWithMetadata object to Event Hubs?

Is this config correct?

    final Map<String, Object> properties = new HashMap<String, Object>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    properties.put("security.protocol", securityProtocol);
    properties.put("sasl.mechanism", saslMechanism);
    properties.put("sasl.jaas.config", saslJaasConfig);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    KafkaProducer<String, MessageWithMetadata> producer = new KafkaProducer<String, MessageWithMetadata>(properties);
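
One note on the snippet above, based only on the Kafka client's typing rather than the Azure SDK: with ByteArraySerializer configured for values, the producer's value type parameter has to be byte[], so the payload is serialized before the record is handed to Kafka. A minimal sketch:

    // values are serialized to byte[] up front, so the producer is typed accordingly
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);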

KrupalVanukuri (Author) commented:

I think if I want to use azure-schema-registry-for-kafka, then this fix needs to be implemented in azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/AvroSchemaRegistryUtils.java.
(OR) Do you have any other library that works with Kafka where the reader schema can differ from the writer schema?

KrupalVanukuri (Author) commented:

Any updates on this? When can the fix be applied to azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/AvroSchemaRegistryUtils.java?

conniey (Member) commented Feb 28, 2022

Could you explain a bit why that class needs to be updated? It is intended as a utility class that isn't exposed for public consumption (the class no longer exists in our repository). The forward compatibility issue was resolved here: #26592

KrupalVanukuri (Author) commented:

I am using the Kafka libraries to publish and consume data, so I want to use a value serializer class in the Kafka configuration. When I started my POC I used the https://github.com/Azure/azure-schema-registry-for-kafka library, which in turn uses azure-data-schemaregistry-avro as a dependency. That is where I found the issue, in that dependency's src/main/java/com/azure/data/schemaregistry/avro/AvroSchemaRegistryUtils.java class.

Now the fix is included in azure-data-schemaregistry-apacheavro, but that library does not have any serializer class I can use in the Kafka configuration; all I see is an Encoder class, which is where I got confused. Do you have any samples I can refer to for Schema Registry with the Kafka libraries (including the forward compatibility fix)?

conniey (Member) commented Mar 1, 2022

I'll take a look at that repo and get back to you.

KrupalVanukuri (Author) commented:

@conniey Do you have any updates on this, please?

KrupalVanukuri (Author) commented:

@conniey Any updates on this, please? Is the fix integrated with azure-schema-registry-for-kafka?

github-actions bot locked and limited conversation to collaborators on Apr 12, 2023