Loading...

Reading Delta Lake in Dedicated SQL Pool

Reading Delta Lake in Dedicated SQL Pool

In June, Databricks announced that they are open sourcing Delta Lake 2.0Delta Lake is quickly becoming the format of choice in data science and data engineering.

 

To import Delta Lake into a Synapse dedicated SQL Pool you would need Azure Data Factory/Synapse Pipelines or Spark to handle the Delta Lake files.


This is not ideal because it adds extra overheads of complexity, time, and costs.

 

As an intellectual challenge, I wondered if it's possible to import Delta Lake files directly into the dedicated SQL Pool and support features like time-travel. It turned out to be a great little project and a great way of learning about Delta Lake.

 

I have uploaded the script to the Azure Synapse Toolbox Github siteThe Azure Synapse Toolbox is an open-source library of useful tools and scripts to use with Azure Synapse Analytics. 

 

Here is a breakdown of how the script works.

 

  1. Read the _last_checkpoint file, this contains the highest checkpoint version number.
  2. Read all the checkpoint files, these contain the 'Add' and 'Remove' files that need to be included and their timestamps.
  3. Read the latest JSON transaction files (since the last checkpoint), these contain the most recent 'Add' and 'Remove' files that need to be included and their timestamps.
  4. Filter out the 'Add' and 'Remove' files based on the 'modificationTime' and 'deletionTimestamp'
  5. Return the data from all the 'Add' files excluding the 'Remove' files. (Batching up the COPY INTO statements)

 

 

 

/* IF OBJECT_ID('mpmtest') IS NOT NULL BEGIN; DROP TABLE mpmtest END declare @path varchar(400), @dt datetime2, @credential varchar(500),@outputable varchar(500),@display int, @debug int; set @path = 'https://storageaccount.blob.core.windows.net/container/delta/demo/' --set @dt = convert(datetime2,'2022/07/06 18:37:00'); --getdate(); -- for time travel -- set @dt = getdate(); -- for time travel -- set @credential = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____'''; set @outputable = 'mpmtest' -- leave empty for temp table set @display = 1; -- if 1 display results set @debug = 1; exec delta @path,@dt,@credential,@outputable,@display, @debug --select count(*) from mpmtest; */ create PROC [dbo].[delta] @folder [varchar](4000), @dt [datetime2], @credential [varchar](4000), @dest_table [varchar](4000), @display int, -- 0 dont display, 1 display @debug int -- 0 dont debug , 1 debug mode AS begin -- version info -- 0.1 - Very basic, but sort of worked. -- 0.2 - Added support for remove files. -- 0.3 - Fixed bug , in authenication -- 0.4 - Improved performance in handling add/remove files -- 0.5 - Added support for checkpoint files -- 0.6 - Changed the json handling, added latest checkpoint handing DECLARE bigint,@tsmil bigint, @json varchar(8000), @tsql varchar(8000); DECLARE @json_remove varchar(8000), @tsql_checkpoint varchar(8000); DECLARE @td_checkpoint1 datetime2, @td_checkpoint2 datetime2; DECLARE @last_checkpoint varchar(25), @jsonversionchar varchar(25),@checkpointversion varchar(25) ; DECLARE @output_sql varchar(max); DECLARE _file_count bigint; -- default: AAD authenication IF (@credential IS NULL) SET @credential = '' IF (@credential <> '') SET @credential = ' CREDENTIAL = (' + @credential + ')' -- default : display results IF @display IS NULL set @display = 1 -- default : debug is off IF @debug IS NULL SET @debug = 0 -- if the datetime is null, get the latest version IF (@dt IS NULL) set @dt = getdate() -- number of milliseconds since 1970 -- Datediff doesn't return a bigint - so I need todo this into 2 parts set = datediff( s, '1970/1/1', convert(date,@dt) ) set = * 1000; set @tsmil = datediff(ms, convert(date,@dt) , @dt) set = + @tsmil; if (@dest_table IS NULL) set @dest_table = '#tmp_output' ---- Holds the raw json information ----------------------------- IF OBJECT_ID('tempdb..#last_checkpoint') IS NOT NULL DROP TABLE #last_checkpoint create table #last_checkpoint ( info varchar(max) ) with (distribution=round_robin, heap) ------ read _last_checkpoint -- This gives us the checkpoint if it exists, this gives us the starting point -- for the json files set @tsql = ' copy into #last_checkpoint (info) from ''' + @folder + '_delta_log/_last_checkpoint'' with ( ' + @credential + ' ,FILE_TYPE = ''CSV'' ,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) ' if @debug = 1 print @tsql; set @td_checkpoint1 = getdate(); exec(@tsql); set @td_checkpoint2 = getdate(); print 'Loading _last_checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms' if @debug = 1 begin select '_Last_checkpoint', * from #last_checkpoint end select @checkpointversion = ISNULL(JSON_VALUE(info,'$.version'),'00') from #last_checkpoint; if NOT EXISTS(SELECT * FROM #last_checkpoint) BEGIN SET @checkpointversion = '00' END PRINT 'checkpointversion=' +@checkpointversion set @jsonversionchar = '00000000000000000000'; IF OBJECT_ID('tempdb..#delta_checkpoint') IS NOT NULL DROP TABLE #delta_checkpoint create table #delta_checkpoint ( txn varchar(max), [add] varchar(max), [remove] varchar(max), metaData varchar(max), protocol varchar(max) ) with (distribution=round_robin, heap) --- Code to pull the the add / remove files from the checkpoint files. set @tsql_checkpoint = ' copy into #delta_checkpoint from ''' + @folder + '_delta_log/*.checkpoint.parquet'' with ( ' + @credential + ' ,FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON'' ) ' if @debug = 1 BEGIN print @tsql_checkpoint; END set @td_checkpoint1 = getdate() exec(@tsql_checkpoint); set @td_checkpoint2 = getdate() print 'Loading checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms' SELECT _file_count = count(*) from #delta_checkpoint if @debug = 1 BEGIN print 'No of checkpoint files ' + convert(varchar(10),@checkpoint_file_count); SELECT * FROM #delta_checkpoint order by [add],[remove] END -- Holds the information from the checkpoint files --------- IF OBJECT_ID('tempdb..#delta_files_checkpoint') IS NOT NULL BEGIN; print 'Dropping table #delta_files_checkpoint' DROP TABLE #delta_files_checkpoint END create table #delta_files_checkpoint ( fname varchar(99), ts bigint, dt datetime2, [action] char(1) ) with (distribution=round_robin, heap) -- create a table of add/remove files with the datetimestamp INSERT INTO #delta_files_checkpoint select DISTINCT CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN JSON_VALUE([remove],'$.path') ELSE JSON_VALUE([add],'$.path') END fname, CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp')) ELSE convert(bigint,JSON_VALUE([add],'$.modificationTime')) END updatetime, CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN dateadd(s, convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp')) / 1000 , '1970/1/1') ELSE dateadd(s, convert(bigint,JSON_VALUE([add],'$.modificationTime')) / 1000 , '1970/1/1') END updatetimedt, CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN 'R' ELSE 'A' END [action] from #delta_checkpoint DELETE from #delta_files_checkpoint where fname is null if @debug = 1 SELECT 'checkpoint', * , , @dt FROM #delta_files_checkpoint order by dt desc -- remove files after the given time DELETE FROM #delta_files_checkpoint WHERE ts > if @debug = 1 SELECT 'checkpoint filtered', * , , @dt FROM #delta_files_checkpoint order by dt desc ---- Holds the raw json information ----------------------------- IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL BEGIN; DROP TABLE #delta_json END create table #delta_json ( jsoninput varchar(max) ) with (distribution=round_robin, heap) ---------------------------------------------------------------- set @jsonversionchar= left(substring(@jsonversionchar,0,len(@jsonversionchar)-len(@checkpointversion)+1) + @checkpointversion,len(@jsonversionchar)-1) + '*' -- Read all the delta transaction logs, we cant filter out things by file name. set @tsql = ' copy into #delta_json (jsoninput) from ''' + @folder + '_delta_log/'+ @jsonversionchar +'.json'' with ( ' + @credential + ' ,FILE_TYPE = ''CSV'' ,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) ' if @debug = 1 print @tsql; set @td_checkpoint1 = getdate(); exec(@tsql); set @td_checkpoint2 = getdate(); print 'Loading json files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms' if @debug = 1 SELECT 'JSON Files', * , , @dt FROM #delta_json ---- delta file details.. IF OBJECT_ID('tempdb..#delta_active_json') IS NOT NULL BEGIN; DROP TABLE #delta_active_json END create table #delta_active_json ( jsoninput varchar(max) , ts bigint, dt datetime2, [action] char(1) ) with (distribution=round_robin, heap) ----------------------------------------------------------- -- inserting into temp table, so the date and time is associated to each row insert into #delta_active_json select convert(varchar(max),jsoninput) , convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') ,'' from #delta_json WHERE convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) < if @debug = 1 select 'json filtered by date',*, , @dt from #delta_active_json order by dt desc -- left in for debugging -- shows each version of the JSON if @debug = 1 begin select 'not filtered', convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts , convert(varchar(8000),jsoninput) , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') ,ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY JSON_VALUE(jsoninput,'$.commitInfo.timestamp') DESC) AS "Row Number" ,convert(varchar(8000),jsoninput) , , @dt from #delta_json order by convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) desc end --------------------------------------------------------- -- insert the JSON adds and removes to the existing list from checkpoint insert into #delta_files_checkpoint select substring(value,charindex('path',value)+7 ,67) [path] , convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td ,'A' as [action] from #delta_active_json cross apply STRING_SPLIT(jsoninput,',') where value like '%path%' and charindex('add',value) > 0 union all select substring(value,charindex('path',value)+7 ,67) , convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td , 'R' from #delta_active_json cross apply STRING_SPLIT(jsoninput,',') where value like '%path%' and charindex('remove',value) > 0 if @debug = 1 begin -- all the adds and removes from the json select 'adds', substring(value,charindex('path',value)+7 ,67) [path] , convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td ,'A' as [action] , , @dt from #delta_active_json cross apply STRING_SPLIT(jsoninput,',') where value like '%path%' and charindex('add',value) > 0 select 'removes', substring(value,charindex('path',value)+7 ,67) , convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts , dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td , 'R', , @dt from #delta_active_json cross apply STRING_SPLIT(jsoninput,',') where value like '%path%' and charindex('remove',value) > 0 select 'New list with adds and removes', *, , @dt from #delta_files_checkpoint order by ts desc; select distinct 'distinct list ', fname, dt, [action], , @dt from #delta_files_checkpoint order by dt desc; end ---- Holds the raw json information ----------------------------- IF OBJECT_ID('tempdb..#delta_files') IS NOT NULL BEGIN; DROP TABLE #delta_files END create table #delta_files ( filenames varchar(max), rownumber bigint ) with (distribution=round_robin, heap) insert into #delta_files select fname, ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY fname DESC) from #delta_files_checkpoint where [action] = 'A' and fname not in (select fname from #delta_files_checkpoint where [action] = 'R') if @debug = 1 select '#delta_files', * from #delta_files; if @debug = 1 select 'parquet to read ', fname,ts, , dt, @dt from #delta_files_checkpoint where [action] = 'A' and fname not in (select fname from #delta_files_checkpoint where [action] = 'R') --where ts < --- This batching code might appear over kill, but STRING_AGG falls over when the string -- is too long.. so I am batching up the inserts. DECLARE @flist varchar(max); DECLARE @batchsize int = 100000; DECLARE @iParquetFileCount int =0; DECLARE @iParquetFilePos int =1; select @iParquetFileCount = count(*) from #delta_files IF OBJECT_ID('tempdb..#_files') IS NOT NULL DROP TABLE #_files create table #_files ( fname varchar(999) ) with (distribution=round_robin,heap) ---- creating batches of COPY INTO statements while(@iParquetFilePos <= @iParquetFileCount) BEGIN INSERT INTO #_files SELECT [filenames] FROM #delta_files where rownumber between @iParquetFilePos and @iParquetFilePos + @batchsize -- Had issues with STRING_AGG with if the string gets too long SELECT @flist = 'COPY INTO ' + @dest_table + ' FROM ' + STRING_AGG(CAST(''''+@folder+fname+'''' AS VARCHAR(MAX)), ', ') + ' with ( ' + @credential + ' ,FILE_TYPE = ''PARQUET'' , AUTO_CREATE_TABLE = ''ON'' )' FROM #_files if @debug =1 select '_filelist', * from #_files; if @debug =1 print @flist; if @debug =1 print len(@flist); set @td_checkpoint1 = getdate(); exec(@flist); set @td_checkpoint2 = getdate(); print 'Loading batch of parquet ' + convert(varchar(10),@iParquetFilePos) + '->' + convert(varchar(10), @iParquetFilePos + @batchsize ) + ' took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms' truncate table #_files; set @iParquetFilePos = @iParquetFilePos + @batchsize +1; END -- if there is no file, there is no table and this causes a problem if charindex('#',@dest_table) > 0 set @output_sql = 'IF OBJECT_ID(''tempdb..' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';' ELSE set @output_sql = 'IF OBJECT_ID(''' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';' if @debug = 1 PRINT @output_sql; if @display = 1 exec(@output_sql ); end

 

 

 

Our team publishes blog(s) each week and you can find all these blogs here: https://aka.ms/synapsecseblog  

For deeper level understanding of Synapse implementation best practices, please refer to the Synapse Success by Design site. 

 

MarkPryceMaher_0-1662145763161.png

Author(s): Mark Pryce-Maher is a Program Manager in Azure Synapse Customer Success Engineering (CSE) team. 

 

 

 

 

 

 

 

 

 

 

Published on:

Learn more
Azure Synapse Analytics Blog articles
Azure Synapse Analytics Blog articles

Azure Synapse Analytics Blog articles

Share post:

Related posts

Stay up to date with latest Microsoft Dynamics 365 and Power Platform news!
* Yes, I agree to the privacy policy