
Use the Python SDK to send and receive events with Schema Registry in Azure Event Hubs


This post complements another post about the Azure Event Hubs Schema Registry. As we know, a Confluent (Kafka) schema registry cannot be migrated directly into the Azure schema registry, so schemas have to be created and managed in Azure Event Hubs separately. The good news is that Azure Event Hubs provides several client SDKs that serialize and deserialize payloads containing schema registry identifiers and Avro-encoded data. This post mainly shows how to use the Python SDK to send and receive events with Schema Registry in Azure Event Hubs.

 

Prerequisites:

1. Create a schema registry group in the Event Hubs portal.

You can refer to this official guidance to create a schema registry group.

 

2. Install the required Python packages with pip

a. pip install azure-schemaregistry-avroencoder

This is the main package used below.

 

b. pip install azure-identity

Authentication is required to access the schema registry group and to register schemas, so we need an AAD credential that implements the TokenCredential protocol.

 

c. pip install aiohttp

An async transport such as aiohttp must be installed to use the async API.

 

3. To implement the TokenCredential authentication flow mentioned above, we can use DefaultAzureCredential, which tries the following credential types in order, if enabled:

  • EnvironmentCredential
  • ManagedIdentityCredential
  • SharedTokenCacheCredential
  • VisualStudioCredential
  • VisualStudioCodeCredential
  • AzureCliCredential
  • AzurePowerShellCredential
  • InteractiveBrowserCredential

The test demo below relies on EnvironmentCredential, so we need to register an AAD application to obtain its tenant ID, client ID and client secret.

 

4. To pass EventData as the message type while encoding, we also need azure-eventhub>=5.9.0 installed so that the azure.eventhub.EventData class is available.
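
The EnvironmentCredential setup from prerequisite 3 can be sanity-checked before any client is created. The sketch below is only an illustration: the helper name missing_credential_vars is hypothetical and not part of any Azure package, and it assumes the service principal is configured with a client secret through the three variables used in the demo.

```python
import os

# Variables the demo sets for a service principal that uses a client secret.
REQUIRED_VARS = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")


def missing_credential_vars(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_credential_vars()
    if missing:
        print("Missing settings:", ", ".join(missing))
    else:
        print("All service principal settings are present.")
```

Running this check first makes a configuration mistake show up as a clear message rather than as an authentication failure later in the credential chain.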

 

Test Demo:

1. Client initialization

import os

from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# EnvironmentCredential reads the AAD application settings from these variables.
os.environ["AZURE_TENANT_ID"] = "***"
os.environ["AZURE_CLIENT_ID"] = "***"
os.environ["AZURE_CLIENT_SECRET"] = "***"

token_credential = DefaultAzureCredential()

fully_qualified_namespace = '<event hub namespace name>.servicebus.windows.net'
group_name = '<schema registry group name>'
eventhub_connection_str = '<Connection String>'
eventhub_name = '<the event hub entity name we need to access>'

 

2. Send event data with encoded content

from azure.eventhub import EventData, EventHubProducerClient

# Avro schema that describes the event payload.
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "John", "favorite_number": 20, "favorite_color": "blue"}
    # message_type=EventData makes encode() return an EventData that the batch accepts.
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

 

If we set the auto_register parameter to True, the encoder registers any new schema passed to encode. We can check the registered schema in the Event Hubs portal. The schema name combines the values of the namespace and name properties in the definition we set, here example.avro.User.

[Screenshot: the registered schema in the Event Hubs portal]
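
To make the naming rule concrete, the following sketch derives the schema name from the same definition using only the standard library. The helper schema_full_name is a hypothetical name introduced for illustration. It mirrors how an Avro full name is built from namespace and name, and it is not a function of the SDK.

```python
import json

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""


def schema_full_name(schema_json):
    """Join the namespace and name of an Avro record definition with a dot."""
    schema = json.loads(schema_json)
    namespace = schema.get("namespace")
    name = schema["name"]
    return f"{namespace}.{name}" if namespace else name


print(schema_full_name(definition))  # example.avro.User
```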

 

If we inspect the created event data, the event content is encoded as binary. In fact, the avro_encoder.encode method uses the underlying BinaryEncoder to encode the message data.

[Screenshot: the event content shown as binary data]
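
To see why the content is not readable text, the sketch below applies the Avro binary encoding rules by hand to the sample record: integers and lengths are written as zigzag varints, strings as a length followed by UTF-8 bytes, and each nullable field as a union branch index followed by the value. All function names here are hypothetical, the code only handles this one schema, and it is not the encoder's own implementation.

```python
def zigzag_varint(n):
    """Encode an integer as Avro does: zigzag mapping, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def avro_string(s):
    """Encode a string as its byte length followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def nullable(value, encode_value):
    """Encode a [type, "null"] union: branch 0 carries a value, branch 1 is null."""
    if value is None:
        return zigzag_varint(1)
    return zigzag_varint(0) + encode_value(value)


def encode_user(user):
    """Concatenate the encoded fields in schema order."""
    return (
        avro_string(user["name"])
        + nullable(user["favorite_number"], zigzag_varint)
        + nullable(user["favorite_color"], avro_string)
    )


payload = encode_user({"name": "John", "favorite_number": 20, "favorite_color": "blue"})
print(payload.hex(" "))  # 08 4a 6f 68 6e 00 28 00 08 62 6c 75 65
```

The field names never appear in the payload, which is why a consumer needs the schema from the registry to turn the bytes back into a dictionary.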

 

Once the schema is registered, we cannot add an extra property that is not part of the schema definition.

For example, if we set the dictionary content as:

dict_content = {"name": "Bob1", "favorite_number": 7, "favorite_color": "red","jayName":"jay"}

 

we will encounter the following exception:

[Screenshot: exception raised for the extra property]

 

If a required property is missing, encoding fails as well. For example, if we set the dictionary content as:

dict_content = {"name": "Bob", "favorite_number": 7}

we will encounter the following exception:

[Screenshot: exception raised for the missing property]
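
Both failure cases can be caught before calling encode with a simple pre-check. The sketch below is a deliberately simplified illustration: check_fields is a hypothetical helper that compares field names only and ignores types, so it is not a substitute for the encoder's own validation.

```python
# Field names taken from the User schema definition above.
FIELDS = ("name", "favorite_number", "favorite_color")


def check_fields(content, fields=FIELDS):
    """Return (extra, missing) field names for a candidate payload."""
    extra = sorted(set(content) - set(fields))
    missing = [field for field in fields if field not in content]
    return extra, missing


with_extra = {"name": "Bob1", "favorite_number": 7, "favorite_color": "red", "jayName": "jay"}
with_missing = {"name": "Bob", "favorite_number": 7}

print(check_fields(with_extra))    # (['jayName'], [])
print(check_fields(with_missing))  # ([], ['favorite_color'])
```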

 

3. Receive event data with decoded content

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    # Decode the Avro content of the received event back into a dictionary.
    decoded_content = avro_encoder.decode(event)
    print(decoded_content)

with eventhub_consumer, avro_encoder:
    # starting_position="-1" reads each partition from the beginning.
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

 

[Screenshot: decoded event content printed by the consumer]

 

Sync and async sample code snippets are also available from this link.
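
For reference, an async variant of the receive step might look like the sketch below. It assumes the aio submodules of azure-eventhub, azure-identity, azure-schemaregistry and azure-schemaregistry-avroencoder are installed together with aiohttp. The function name receive_decoded and its parameters are hypothetical and simply carry the same placeholder values used in the client initialization step.

```python
async def receive_decoded(fully_qualified_namespace, group_name,
                          eventhub_connection_str, eventhub_name):
    """Receive events and print their decoded Avro content (async sketch)."""
    # Imported here so the module can be loaded without the Azure packages present.
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.identity.aio import DefaultAzureCredential
    from azure.schemaregistry.aio import SchemaRegistryClient
    from azure.schemaregistry.encoder.avroencoder.aio import AvroEncoder

    credential = DefaultAzureCredential()
    schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
    avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
    eventhub_consumer = EventHubConsumerClient.from_connection_string(
        conn_str=eventhub_connection_str,
        consumer_group="$Default",
        eventhub_name=eventhub_name,
    )

    async def on_event(partition_context, event):
        # The async encoder's decode is a coroutine, so it must be awaited.
        decoded_content = await avro_encoder.decode(event)
        print(decoded_content)

    # Close the credential, consumer and encoder when receiving stops.
    async with credential, eventhub_consumer, avro_encoder:
        await eventhub_consumer.receive(on_event=on_event, starting_position="-1")
```

It can be started with asyncio.run(receive_decoded(...)), passing the namespace, group name, connection string and event hub name. As in the sync version, receive keeps running until it is interrupted.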
