In June, Databricks announced that they are open sourcing Delta Lake 2.0. Delta Lake is quickly becoming the format of choice in data science and data engineering.
To import Delta Lake data into a Synapse dedicated SQL pool, you would need Azure Data Factory/Synapse Pipelines or Spark to handle the Delta Lake files.
This is not ideal, because it adds the extra overhead of complexity, time, and cost.
As an intellectual challenge, I wondered if it's possible to import Delta Lake files directly into the dedicated SQL Pool and support features like time-travel. It turned out to be a great little project and a great way of learning about Delta Lake.
I have uploaded the script to the Azure Synapse Toolbox GitHub site. The Azure Synapse Toolbox is an open-source library of useful tools and scripts to use with Azure Synapse Analytics.
Here is a breakdown of how the script works (an illustrative sketch of the _delta_log folder it reads follows the list):
- Read the _last_checkpoint file; this contains the highest checkpoint version number.
- Read all the checkpoint files; these contain the 'Add' and 'Remove' files that need to be included, and their timestamps.
- Read the latest JSON transaction files (since the last checkpoint); these contain the most recent 'Add' and 'Remove' files that need to be included, and their timestamps.
- Filter the 'Add' and 'Remove' files based on the 'modificationTime' and 'deletionTimestamp'.
- Return the data from all the 'Add' files, excluding the 'Remove' files (batching up the COPY INTO statements).
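For reference, here is a rough sketch of what the script finds in a table's _delta_log folder (the version numbers, timestamps, and file names below are illustrative):

_last_checkpoint                            -- {"version":20,"size":25}
00000000000000000020.checkpoint.parquet    -- Parquet snapshot of all add/remove actions up to version 20
00000000000000000021.json                  -- newline-delimited JSON actions, e.g.:
    {"commitInfo":{"timestamp":1657132620000, ...}}
    {"add":{"path":"part-00000-....snappy.parquet","modificationTime":1657132620000, ...}}
    {"remove":{"path":"part-00001-....snappy.parquet","deletionTimestamp":1657132620000, ...}}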
/*
IF OBJECT_ID('mpmtest') IS NOT NULL
BEGIN;
DROP TABLE mpmtest
END
declare @path varchar(400), @dt datetime2, @credential varchar(500),@outputable varchar(500),@display int, @debug int;
set @path = 'https://storageaccount.blob.core.windows.net/container/delta/demo/'
--set @dt = convert(datetime2,'2022/07/06 18:37:00'); --getdate(); -- for time travel --
set @dt = getdate(); -- for time travel --
set @credential = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____''';
set @outputable = 'mpmtest' -- leave empty for temp table
set @display = 1; -- if 1 display results
set @debug = 1;
exec delta @path,@dt,@credential,@outputable,@display, @debug
--select count(*) from mpmtest;
*/
create PROC [dbo].[delta]
@folder [varchar](4000),
@dt [datetime2],
@credential [varchar](4000),
@dest_table [varchar](4000),
@display int, -- 0 dont display, 1 display
@debug int -- 0 dont debug , 1 debug mode
AS
begin
-- version info
-- 0.1 - Very basic, but sort of worked.
-- 0.2 - Added support for remove files.
-- 0.3 - Fixed bug in authentication
-- 0.4 - Improved performance in handling add/remove files
-- 0.5 - Added support for checkpoint files
-- 0.6 - Changed the json handling, added latest checkpoint handling
DECLARE @ts bigint, @tsmil bigint, @json varchar(8000), @tsql varchar(8000);
DECLARE @json_remove varchar(8000), @tsql_checkpoint varchar(8000);
DECLARE @td_checkpoint1 datetime2, @td_checkpoint2 datetime2;
DECLARE @last_checkpoint varchar(25), @jsonversionchar varchar(25),@checkpointversion varchar(25) ;
DECLARE @output_sql varchar(max);
DECLARE @checkpoint_file_count bigint;
-- default: AAD authentication
IF (@credential IS NULL)
SET @credential = ''
IF (@credential <> '')
SET @credential = ' CREDENTIAL = (' + @credential + ')'
-- default : display results
IF @display IS NULL
set @display = 1
-- default : debug is off
IF @debug IS NULL
SET @debug = 0
-- if the datetime is null, get the latest version
IF (@dt IS NULL)
set @dt = getdate()
-- number of milliseconds since 1970
-- DATEDIFF doesn't return a bigint, so I need to do this in two parts
set @ts = datediff( s, '1970/1/1', convert(date,@dt) )
set @ts = @ts * 1000;
set @tsmil = datediff(ms, convert(date,@dt) , @dt)
set @ts = @ts + @tsmil;
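-- e.g. @dt = '2022-07-06 18:37:00' gives @ts = 1657132620000 (the transaction-log
-- timestamps are epoch milliseconds, so this assumes @dt is effectively UTC)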
if (@dest_table IS NULL or @dest_table = '') -- '' also falls back to a temp table, matching the usage notes above
set @dest_table = '#tmp_output'
---- Holds the raw json information -----------------------------
IF OBJECT_ID('tempdb..#last_checkpoint') IS NOT NULL
DROP TABLE #last_checkpoint
create table #last_checkpoint
(
info varchar(max)
) with (distribution=round_robin, heap)
------ read _last_checkpoint
-- This gives us the latest checkpoint, if it exists, which is the starting point
-- for the json files
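-- COPY INTO has no JSON file type, so the file is loaded as CSV, using characters that
-- don't appear in the JSON (vertical tab 0x0b, carriage return 0x0d) as field/row
-- terminators, which lands the raw JSON text in a single varchar column.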
set @tsql = '
copy into #last_checkpoint (info)
from ''' + @folder + '_delta_log/_last_checkpoint''
with ( ' + @credential + ' ,FILE_TYPE = ''CSV''
,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '
if @debug = 1
print @tsql;
set @td_checkpoint1 = getdate();
exec(@tsql);
set @td_checkpoint2 = getdate();
print 'Loading _last_checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
if @debug = 1
begin
select '_Last_checkpoint', * from #last_checkpoint
end
select @checkpointversion = ISNULL(JSON_VALUE(info,'$.version'),'00') from #last_checkpoint;
if NOT EXISTS(SELECT * FROM #last_checkpoint)
BEGIN
SET @checkpointversion = '00'
END
PRINT 'checkpointversion=' +@checkpointversion
set @jsonversionchar = '00000000000000000000';
IF OBJECT_ID('tempdb..#delta_checkpoint') IS NOT NULL
DROP TABLE #delta_checkpoint
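-- A checkpoint parquet file holds one action per row; the txn/add/remove/metaData/protocol
-- columns contain the details of the corresponding action type, and only one of them is
-- populated on any given row.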
create table #delta_checkpoint
(
txn varchar(max),
[add] varchar(max),
[remove] varchar(max),
metaData varchar(max),
protocol varchar(max)
) with (distribution=round_robin, heap)
--- Code to pull the add/remove files from the checkpoint files.
set @tsql_checkpoint = '
copy into #delta_checkpoint from ''' + @folder + '_delta_log/*.checkpoint.parquet''
with ( ' + @credential + ' ,FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON'' ) '
if @debug = 1
BEGIN
print @tsql_checkpoint;
END
set @td_checkpoint1 = getdate()
exec(@tsql_checkpoint);
set @td_checkpoint2 = getdate()
print 'Loading checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
SELECT @checkpoint_file_count = count(*) from #delta_checkpoint
if @debug = 1
BEGIN
print 'No of checkpoint files ' + convert(varchar(10),@checkpoint_file_count);
SELECT * FROM #delta_checkpoint order by [add],[remove]
END
-- Holds the information from the checkpoint files ---------
IF OBJECT_ID('tempdb..#delta_files_checkpoint') IS NOT NULL
BEGIN;
print 'Dropping table #delta_files_checkpoint'
DROP TABLE #delta_files_checkpoint
END
create table #delta_files_checkpoint
(
fname varchar(99),
ts bigint,
dt datetime2,
[action] char(1)
) with (distribution=round_robin, heap)
-- create a table of add/remove files with the datetimestamp
INSERT INTO #delta_files_checkpoint
select DISTINCT
CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN
JSON_VALUE([remove],'$.path')
ELSE
JSON_VALUE([add],'$.path')
END fname,
CASE
WHEN JSON_VALUE([add],'$.path') is NULL THEN
convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp'))
ELSE
convert(bigint,JSON_VALUE([add],'$.modificationTime'))
END updatetime,
CASE
WHEN JSON_VALUE([add],'$.path') is NULL THEN
dateadd(s, convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp')) / 1000 , '1970/1/1')
ELSE
dateadd(s, convert(bigint,JSON_VALUE([add],'$.modificationTime')) / 1000 , '1970/1/1')
END updatetimedt,
CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN
'R'
ELSE
'A'
END [action]
from #delta_checkpoint
DELETE from #delta_files_checkpoint where fname is null
if @debug = 1
SELECT 'checkpoint', *, @ts, @dt FROM #delta_files_checkpoint order by dt desc
-- remove files after the given time
DELETE FROM #delta_files_checkpoint WHERE ts > @ts
if @debug = 1
SELECT 'checkpoint filtered', *, @ts, @dt FROM #delta_files_checkpoint order by dt desc
---- Holds the raw json information -----------------------------
IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL
BEGIN;
DROP TABLE #delta_json
END
create table #delta_json
(
jsoninput varchar(max)
) with (distribution=round_robin, heap)
----------------------------------------------------------------
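-- Build the wildcard for the JSON transaction logs: zero-pad the checkpoint version to 20
-- characters, then replace the last digit with '*' (e.g. version 20 -> '0000000000000000002*').
-- Delta writes a checkpoint every 10 commits by default, so the logs written since the last
-- checkpoint all share those first 19 digits.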
set @jsonversionchar= left(substring(@jsonversionchar,0,len(@jsonversionchar)-len(@checkpointversion)+1) + @checkpointversion,len(@jsonversionchar)-1) + '*'
-- Read all the delta transaction logs; we can't filter things out by file name.
set @tsql = '
copy into #delta_json (jsoninput)
from ''' + @folder + '_delta_log/'+ @jsonversionchar +'.json''
with ( ' + @credential + ' ,FILE_TYPE = ''CSV''
,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '
if @debug = 1
print @tsql;
set @td_checkpoint1 = getdate();
exec(@tsql);
set @td_checkpoint2 = getdate();
print 'Loading json files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
if @debug = 1
SELECT 'JSON Files', *, @ts, @dt FROM #delta_json
---- delta file details..
IF OBJECT_ID('tempdb..#delta_active_json') IS NOT NULL
BEGIN;
DROP TABLE #delta_active_json
END
create table #delta_active_json
(
jsoninput varchar(max) ,
ts bigint,
dt datetime2,
[action] char(1)
) with (distribution=round_robin, heap)
-----------------------------------------------------------
-- inserting into a temp table, so the date and time are associated with each row
insert into #delta_active_json
select
convert(varchar(max),jsoninput)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')
,''
from #delta_json
WHERE convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) < @ts
if @debug = 1
select 'json filtered by date', *, @ts, @dt from #delta_active_json order by dt desc
-- left in for debugging -- shows each version of the JSON
if @debug = 1
begin
select 'not filtered',
convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts
, convert(varchar(8000),jsoninput)
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')
,ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY JSON_VALUE(jsoninput,'$.commitInfo.timestamp') DESC) AS "Row Number"
,convert(varchar(8000),jsoninput)
, @ts, @dt
from #delta_json
order by convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) desc
end
---------------------------------------------------------
-- insert the JSON adds and removes to the existing list from checkpoint
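-- The paths are pulled out with string functions: charindex('path',value)+7 skips past
-- the path":" prefix, and 67 characters is the length of a default
-- part-00000-<GUID>.c000.snappy.parquet file name, so this assumes the standard naming format.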
insert into #delta_files_checkpoint
select substring(value,charindex('path',value)+7 ,67) [path]
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
,'A' as [action]
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('add',value) > 0
union all
select substring(value,charindex('path',value)+7 ,67)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
, 'R'
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('remove',value) > 0
if @debug = 1
begin
-- all the adds and removes from the json
select 'adds', substring(value,charindex('path',value)+7 ,67) [path]
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
,'A' as [action], @ts, @dt
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('add',value) > 0
select 'removes', substring(value,charindex('path',value)+7 ,67)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
, 'R', @ts, @dt
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('remove',value) > 0
select 'New list with adds and removes', *, @ts, @dt from #delta_files_checkpoint order by ts desc;
select distinct 'distinct list ', fname, dt, [action], @ts, @dt from #delta_files_checkpoint order by dt desc;
end
---- Holds the raw json information -----------------------------
IF OBJECT_ID('tempdb..#delta_files') IS NOT NULL
BEGIN;
DROP TABLE #delta_files
END
create table #delta_files
(
filenames varchar(max), rownumber bigint
) with (distribution=round_robin, heap)
insert into #delta_files
select fname,
ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY fname DESC)
from #delta_files_checkpoint where [action] = 'A' and
fname not in (select fname from #delta_files_checkpoint where [action] = 'R')
if @debug = 1
select '#delta_files', * from #delta_files;
if @debug = 1
select 'parquet to read ', fname, ts, @ts, dt, @dt from #delta_files_checkpoint where [action] = 'A' and
fname not in (select fname from #delta_files_checkpoint where [action] = 'R')
--where ts < @ts
--- This batching code might appear to be overkill, but STRING_AGG falls over when the
-- string gets too long, so I am batching up the inserts.
DECLARE @flist varchar(max);
DECLARE @batchsize int = 100000;
DECLARE @iParquetFileCount int =0;
DECLARE @iParquetFilePos int =1;
select @iParquetFileCount = count(*) from #delta_files
IF OBJECT_ID('tempdb..#_files') IS NOT NULL
DROP TABLE #_files
create table #_files
( fname varchar(999) ) with (distribution=round_robin,heap)
---- creating batches of COPY INTO statements
while(@iParquetFilePos <= @iParquetFileCount)
BEGIN
INSERT INTO #_files
SELECT [filenames] FROM #delta_files where rownumber between @iParquetFilePos and @iParquetFilePos + @batchsize
-- Had issues with STRING_AGG if the string gets too long
SELECT @flist = 'COPY INTO ' + @dest_table + ' FROM '
+ STRING_AGG(CAST(''''+@folder+fname+'''' AS VARCHAR(MAX)), ',
') + ' with ( ' + @credential + '
,FILE_TYPE = ''PARQUET'' , AUTO_CREATE_TABLE = ''ON'' )'
FROM #_files
if @debug =1
select '_filelist', * from #_files;
if @debug =1 print @flist;
if @debug =1 print len(@flist);
set @td_checkpoint1 = getdate();
exec(@flist);
set @td_checkpoint2 = getdate();
print 'Loading batch of parquet ' + convert(varchar(10),@iParquetFilePos) + '->' + convert(varchar(10), @iParquetFilePos + @batchsize ) + ' took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
truncate table #_files;
set @iParquetFilePos = @iParquetFilePos + @batchsize +1;
END
-- if no files were loaded, the destination table was never created, which would cause an error
if charindex('#',@dest_table) > 0
set @output_sql = 'IF OBJECT_ID(''tempdb..' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'
ELSE
set @output_sql = 'IF OBJECT_ID(''' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'
if @debug = 1
PRINT @output_sql;
if @display = 1
exec(@output_sql );
end
Our team publishes blogs each week, and you can find all of them here: https://aka.ms/synapsecseblog
For a deeper understanding of Synapse implementation best practices, please refer to the Synapse Success by Design site.
Author(s): Mark Pryce-Maher is a Program Manager on the Azure Synapse Customer Success Engineering (CSE) team.