
Extracting relational schema from streaming data containing complex JSON documents


Author: @devangshah is a Principal Program Manager for Data Explorer in the Synapse Customer Success Engineering (CSE) team. 

 

In the world of IoT devices, industrial historians, infrastructure and application logs, and other machine-generated or software-generated telemetry, upstream data producers often emit data in non-standard schemas, formats, and structures that make the data difficult to analyze at scale. Azure Data Explorer (ADX) provides features for running fast, interactive analytics on such heterogeneous data structures and formats.

 

In this blog, we walk through an example of a complex JSON file, shown in the screenshot below. You can access the JSON file from this GitHub page to try the steps yourself.

 

devangshah_0-1663323784929.png

 

In this JSON document, the keys and the values live in two different arrays. To convert the document into the relational schema shown below, we will extract the 'structure' object into one table and the 'kpi_data' object into another, and then join the two tables on the GUID.

 

devangshah_0-1662295867422.png

 

Step 1: Use the ADX Ingestion Hub (also called One-Click Ingestion) to upload the sample data and let ADX infer the schema of the JSON document. With multi-level JSON, you can extract multiple objects within the JSON document. In this example, however, that is not sufficient, so we will write KQL starting in Step 2.

 

devangshah_1-1662296167625.png

 

Step 2: Since there are two nested JSON arrays, we will use the mv-expand operator to expand these dynamic arrays. We first use the 'project' operator to select the columns of interest and then apply mv-expand to the column containing the nested array.

 

SampleTable1
| project Timestamp = from, Level1_id = structure.id, Level1_Name = structure.name, Level1_kpi_type = structure.kpi_type, kpi_structure = structure.kpi_structure
| mv-expand kpi_structure
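To see what mv-expand does in isolation, the following is a minimal sketch that runs against an inline datatable rather than SampleTable1. The data and the column names device and readings are made up for illustration and are not part of the sample file.

```kusto
// Hypothetical inline data: one row whose 'readings' column holds a dynamic array.
datatable(device: string, readings: dynamic)
[
    "pump-1", dynamic([{"name": "temp", "unit": "C"}, {"name": "level", "unit": "%"}])
]
// mv-expand emits one output row per array element; 'device' is repeated on each row.
| mv-expand readings
// Promote properties of each expanded element to their own columns.
| extend reading_name = tostring(readings.name), reading_unit = tostring(readings.unit)
| project-away readings
```

The single input row becomes two rows, one per array element, which is the same effect Step 2 has on the 'kpi_structure' array.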

 

 

Upon executing these KQL statements, we see the following output:

 

devangshah_2-1662296636057.png

 

Step 3: We will apply the mv-expand operator again to expand the next array. We're also using the 'extend' operator to extract columns from the expanded JSON arrays. 

 

SampleTable1
| project Timestamp = from, Level1_id = structure.id, Level1_Name = structure.name, Level1_kpi_type = structure.kpi_type, kpi_structure = structure.kpi_structure
| mv-expand kpi_structure
| extend Level2_data_type = kpi_structure.data_type, Level2_Id = kpi_structure.id, Level2_kpi_type = kpi_structure.kpi_type, Level2_name = kpi_structure.name, new_kpi_structure = kpi_structure.kpi_structure
| mv-expand new_kpi_structure
| extend Level3_data_type = new_kpi_structure.data_type, Level3_Id = tostring(new_kpi_structure.id), Level3_name = new_kpi_structure.name, Level3_Unit = new_kpi_structure.unit
| project-away kpi_structure, new_kpi_structure

 

 

 

The output of executing these statements is:

devangshah_3-1662297841112.png

 

Step 4: Use the mv-expand operator again, this time to expand the array inside the 'kpi_data' JSON object.

 

 

SampleTable1
| mv-expand kpi_data
| project kpi_values = kpi_data.values

 

 

The output of executing these statements is:

devangshah_4-1662298489184.png

 

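Step 5: In this JSON document, the value (in this case, 18806s) is referenced using a GUID key. Since the GUID key can be different for each value, we cannot hard-code it. Instead, we will use the bag_keys() function to read the key name from the object and then use that key to look up the value, which turns this JSON structure into a column of keys and a column of values.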

 

SampleTable1
| mv-expand kpi_data
| project kpi_values = kpi_data.values
| extend Level3_Id = tostring(bag_keys(kpi_values)[0])
| extend key_value = kpi_values[Level3_Id]
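The key lookup can be tried on its own with a literal property bag. This is a small sketch only: the key "key-123" is a made-up stand-in for a GUID, and the shape of the inner object is an assumption based on the value mentioned above.

```kusto
// Hypothetical property bag whose key is not known in advance.
print kpi_values = dynamic({"key-123": {"duration_value": "18806s"}})
// bag_keys() returns the bag's top-level keys as a dynamic array; take the first one.
| extend Level3_Id = tostring(bag_keys(kpi_values)[0])
// Index back into the bag with the discovered key to fetch its value.
| extend key_value = kpi_values[Level3_Id]
```

Note that indexing with [0] reads only the first key, so this form suits bags that hold a single entry.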

 

 

 

 

The output of executing these statements is:

devangshah_5-1662298811990.png

 

Step 6: We will use the mv-apply operator to run the key and value extraction from Step 5 for each entry that can be present in the 'kpi_data' JSON object.

 

SampleTable1
| mv-expand kpi_data
| project kpi_values = kpi_data.values
| mv-apply kpi_values on (
    extend Level3_Id = tostring(bag_keys(kpi_values)[0])
    | project Level3_Id, key_value = kpi_values[Level3_Id]
)
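The difference from Step 5 is easier to see with a bag that holds more than one key. The sketch below uses made-up keys and values, and it assumes that mv-apply expands a property bag the same way mv-expand does, that is, into one single-entry bag per property.

```kusto
// Hypothetical bag holding two keyed values in a single row.
print kpi_values = dynamic({"key-a": {"integer_value": 7}, "key-b": {"string_value": "ok"}})
// mv-apply expands the bag and runs the subquery once per expanded entry,
// so bag_keys(...)[0] sees a single-entry bag each time.
| mv-apply kpi_values on (
    extend Level3_Id = tostring(bag_keys(kpi_values)[0])
    | project Level3_Id, key_value = kpi_values[Level3_Id]
)
```

Under that assumption, the query should return one row per key instead of only the first key.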

 

 

 

In the case of a single row, these statements generate the same output as Step 5. However, in the case of multiple rows, you will get the desired output for each row as shown below.

devangshah_6-1662299573780.png

 

Step 7: Join the two tables created in Step 3 (Keys) and Step 6 (Values) to retrieve a complete table of key-value pairs that can be easily queried. After the join, we split the value column into four typed columns: integer, string, Boolean, and duration (in seconds). This allows data analysts and data scientists to run calculations, aggregations, and other queries without having to worry about data type conversion at each stage.

 

let Keys = SampleTable1
| project Timestamp = from, Level1_id = structure.id, Level1_Name = structure.name, Level1_kpi_type = structure.kpi_type, kpi_structure = structure.kpi_structure
| mv-expand kpi_structure
| extend Level2_data_type = kpi_structure.data_type, Level2_Id = kpi_structure.id, Level2_kpi_type = kpi_structure.kpi_type, Level2_name = kpi_structure.name, new_kpi_structure = kpi_structure.kpi_structure
| mv-expand new_kpi_structure
| extend Level3_data_type = new_kpi_structure.data_type, Level3_Id = tostring(new_kpi_structure.id), Level3_name = new_kpi_structure.name, Level3_Unit = new_kpi_structure.unit;
let Values = SampleTable1
| mv-expand kpi_data
| project kpi_values = kpi_data.values
| mv-apply kpi_values on (
    extend Level3_Id = tostring(bag_keys(kpi_values)[0])
    | project Level3_Id, key_value = kpi_values[Level3_Id]
);
Keys
| join kind = leftouter Values on $left.Level3_Id == $right.Level3_Id
| extend int_val = iff(tostring(Level3_data_type) in ("INTEGER", "FILL_LEVEL", "COUNT"), key_value.integer_value, 0),
    str_val = iff(tostring(Level3_data_type) == "STRING", key_value.string_value, ""),
    bool_val = iff(tostring(Level3_data_type) == "BOOLEAN", iff(isempty(tobool(key_value.boolean_value)), false, tobool(key_value.boolean_value)), false),
    duration = iff(tostring(Level3_data_type) == "DURATION", tolong(trim("s", tostring(key_value.duration_value))), 0)
| project Timestamp, Level1_Name, Level1_kpi_type, Level2_name, Level3_name, Level3_data_type, Level3_Unit, int_val, str_val, bool_val, duration
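The type-splitting part of this query can be tested separately from the join. The sketch below uses a made-up inline table in place of the joined result and casts each branch explicitly so that both outcomes of iff() have the same type. The column name duration_sec is a hypothetical rename chosen to make the unit explicit.

```kusto
// Hypothetical joined rows: a declared data type plus the raw dynamic value.
datatable(Level3_data_type: string, key_value: dynamic)
[
    "INTEGER",  dynamic({"integer_value": 42}),
    "STRING",   dynamic({"string_value": "ok"}),
    "BOOLEAN",  dynamic({"boolean_value": true}),
    "DURATION", dynamic({"duration_value": "18806s"})
]
// Each row fills only the column that matches its data type; the others get a default.
| extend
    int_val = iff(Level3_data_type == "INTEGER", tolong(key_value.integer_value), long(0)),
    str_val = iff(Level3_data_type == "STRING", tostring(key_value.string_value), ""),
    bool_val = iff(Level3_data_type == "BOOLEAN", tobool(key_value.boolean_value), false),
    // Strip the trailing "s" and keep the number of seconds as a long.
    duration_sec = iff(Level3_data_type == "DURATION", tolong(trim("s", tostring(key_value.duration_value))), long(0))
| project-away key_value
```

Using defaults such as 0, "" and false mirrors the query above. If downstream aggregations need to distinguish a missing value from a real zero, returning null for the non-matching branch may be a better fit.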

 

 

 

 

devangshah_7-1662299606317.png

 

Possible next steps:

  • You can store these KQL statements as a User-defined Function that can then be reused by others in your organization.
  • Once saved as a User-defined Function, the query can also be referenced in an Update Policy, which will then execute the function on every incoming JSON document in your source table. Update Policy is a lightweight Extract-Load-Transform capability of Azure Data Explorer that can help you transform raw messages into curated data.
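As a rough sketch of these two next steps, the management commands below wrap the Step 6 query in a stored function and attach it to a target table through an update policy. The names KpiValues and ExtractKpiValues are hypothetical, the target schema is assumed from the Step 6 output, and each command needs to be run separately.

```kusto
// 1. Hypothetical target table whose schema matches the function's output.
.create table KpiValues (Level3_Id: string, key_value: dynamic)

// 2. Stored function (hypothetical name) wrapping the Step 6 query.
.create-or-alter function with (docstring = "Extracts key-value pairs from kpi_data")
ExtractKpiValues() {
    SampleTable1
    | mv-expand kpi_data
    | project kpi_values = kpi_data.values
    | mv-apply kpi_values on (
        extend Level3_Id = tostring(bag_keys(kpi_values)[0])
        | project Level3_Id, key_value = kpi_values[Level3_Id]
    )
}

// 3. Update policy: run the function on data newly ingested into SampleTable1
//    and write the result into KpiValues.
.alter table KpiValues policy update
@'[{"IsEnabled": true, "Source": "SampleTable1", "Query": "ExtractKpiValues()", "IsTransactional": false}]'
```

When the function runs as part of an update policy, its reference to the source table is scoped to the newly ingested records, so the function body can stay identical to the interactive query.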

 

In summary, with Kusto Query Language, you can extract data from a complex JSON document containing nested arrays and objects using only a few lines of code.

 

Our team publishes blogs regularly, and you can find all of them here: https://aka.ms/synapsecseblog

For a deeper level of understanding of Synapse implementation best practices, please refer to our Success by Design (SBD) site: https://aka.ms/Synapse-Success-By-Design
