Friday, February 28, 2014

Loading a SCD Type 2 Persistent Staging Table using a Dynamic Merge Statement in SQL Server 2012

     A persistent staging database is a layer in your data warehouse architecture that stores data from the source systems untransformed, with change tracking. Think of this layer as an insurance policy against bad design choices and ETL bugs in the architecture layers closer to presentation. If a redesign needs to be made, or a transformation in your ETL was wrong, you won't need to hit the source systems again to get the data back. All the data you need to reload resides in your persistent staging database, with change history!

    In this post I'll detail a way, with certain caveats, to create a single stored procedure that your SSIS packages can call to pull data from a staging table and perform SCD Type 2 transformations into a persistent staging table using a SQL MERGE statement. While doing a simple Google search on this topic I came upon a really good post about merge statements written by Dallas Snider. I thought I'd use his solution as a starting point to implement what I was trying to accomplish: a dynamic way to generate the statement by utilizing metadata from the persistent staging table.

     To use the stored procedure there are a few caveats that need to be in place:
  • The stage table and the persistent stage table need to be in different databases
  • The stage table and the persistent stage table need to have the same name
  • The stage table and the persistent stage table need to have the same fields, with same names, except for the control fields in the persistent staging table. These control fields include:
  1. EffectiveStartDT (The datetime the record was valid from, defaults to 1/1/1900)
  2. EffectiveExpireDT (The datetime the record was valid to, defaults to 1/1/4000)
  3. CurrentRowYN (A Y or N flag that determines whether or not the record is the current version)
  4. PSInsertDT (The datetime the record was inserted into the persistent staging table)
  5. Checksum (The binary checksum of the record, this will be compared against incoming data from staging to see if a record has changed)
  • The persistent staging table must have a primary key on a combination of the natural key(s) (sometimes referred to as a business key) and the EffectiveStartDT.
  • None of the fields can be a BLOB (e.g. ntext, text, image) since the binary_checksum function can't handle those data types. There is a work-around that I'll discuss later in this post.
  • This is for daily changes. If you have multiple loads a day, with multiple changes in a single day, the record will simply be overwritten without performing an SCD Type 2 (essentially an SCD Type 1).
  • Only one version of a record can exist in the staging table 
  • The staging database and the persistent staging database have to be on the same server
     Let's demonstrate how to use this stored procedure, then walk through the code behind it. First we'll create some sample data to work with by creating three databases (Source, Staging and PersistentStaging) and move data from a table called "Department" through the three databases.


use Source
go


create table Department
(
DepartmentID int,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40)
)

insert into Department
(DepartmentID, DepartmentName, DepartmentLocation, DepartmentUnit)
values
(1, 'Accounts Receivable', 'New York', 'Accounting'),
(2, 'Corporate Ethics', 'Nashville', 'Human Resources'),
(3, 'Help Desk', 'Nashville', 'Information Systems')

go


use Staging
go


create table Department
(
DepartmentID int,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40)
)


use PersistentStaging
go


create table Department
(
DepartmentID int not null,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40),
EffectiveStartDT datetime not null,
EffectiveExpireDT datetime,
CurrentRowYN char(1),
PSInsertDT datetime,
CheckSum varchar(35),
constraint pk_department primary key (DepartmentID, EffectiveStartDT)
)
go

 Now our goal is to get department data from the source database, stage the data in the staging database, then load SCD type 2 changes into our persistent staging database. Our SSIS package to do this will look something like this:

Figure 1. Sequence Container to Persistently Stage Data

     The package will start off by truncating the stage table, to prepare it for receiving fresh data from the source system:


Figure 2. Truncate Staging Table
     After truncating the staging table, it's now ready to be populated with data from the source system table in the data flow task:


Figure 3. Load Source Data into Staging

      With the data now in staging we can call our stored procedure that will compare the data in the staging table to the data in the persistent staging table. If a record with the natural key doesn't exist yet, we simply insert a new record into the persistent staging table. If it does, and a change has happened, we expire the previous record and insert the new version of the record into persistent staging. This happens by calling a stored procedure called "PersistRecordFromStaging" and passing a parameter consisting of the table name:


Figure 4. Executing Stored Procedure to Persistently Stage Data
You'll notice that in this example the Execute SQL Task has its result set property set to Single row. This is because the stored procedure will return a single row containing two fields.
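The SQLStatement in that task is simply the procedure call; the two columns it returns map to SSIS variables in the Result Set tab:

```sql
-- Called once per table; result set property must be Single row
exec dbo.PersistRecordFromStaging 'Department'
-- Returns a single row with two columns: UpdateCount and InsertCount
```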

 The stored procedure will dynamically generate this merge statement based on the metadata of the persistent staging table:


BEGIN TRY
BEGIN TRAN
-->Inserts new records, and records that have been updated as a SCD type 2
SET NOCOUNT ON;
SET XACT_ABORT ON
INSERT INTO
 PersistentStaging.dbo.Department
(

 DepartmentID,
 DepartmentName,
 DepartmentLocation,
 DepartmentUnit,
 EffectiveStartDT,
 EffectiveExpireDT,
 CurrentRowYN,
 PSInsertDT,
 CheckSum
)
SELECT

 DepartmentID,
 DepartmentName,
 DepartmentLocation,
 DepartmentUnit,
 EffectiveStartDT,
 EffectiveExpireDT,
 CurrentRowYN,
 PSInsertDT,
 CheckSum

FROM
(
MERGE into PersistentStaging.dbo.Department AS target
 USING (
SELECT

DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit
,binary_checksum(
convert(varchar, DepartmentID ),
DepartmentName,
DepartmentLocation,
DepartmentUnit) as CheckSum

 FROM Staging.dbo.Department with (nolock)) As source
(

 DepartmentID,
 DepartmentName,
 DepartmentLocation,
 DepartmentUnit,
 CheckSum

)
 ON
(

 target.DepartmentID = source.DepartmentID AND 1=1)
-->If change occurs we deactivate the previous record
WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN='Y' and target.[EffectiveStartDT]<>convert(date,sysdatetime())
THEN
update set [EffectiveExpireDT] =dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))),[CurrentRowYN]='N'
-->If change occurs on same day as previous change, we take the net
WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN='Y' and target.[EffectiveStartDT]=convert(date,sysdatetime())
THEN DELETE
-->If this record has never been inserted into this table, insert
WHEN NOT MATCHED THEN
INSERT
(

 DepartmentID,
 DepartmentName,
 DepartmentLocation,
 DepartmentUnit,
 EffectiveStartDT,
 EffectiveExpireDT,
 CurrentRowYN,
 PSInsertDT,
 CheckSum
)
VALUES
(

 source.DepartmentID,
 source.DepartmentName,
 source.DepartmentLocation,
 source.DepartmentUnit
,'1/1/1900',
'1/1/4000',
'Y',
'2014-02-27 19:14:15',
source.[Checksum]

)
OUTPUT

 source.DepartmentID,
 source.DepartmentName,
 source.DepartmentLocation,
 source.DepartmentUnit
,convert(date,sysdatetime()) as [EffectiveStartDT],
'1/1/4000' as [EffectiveExpireDT],
'Y'as [CurrentRowYN],
'Feb  27 2014  7:14PM' as [PSInsertDT],
source.[Checksum],
$action as action

)AS CHANGES
(

 DepartmentID,
 DepartmentName,
 DepartmentLocation,
 DepartmentUnit,
 EffectiveStartDT,
 EffectiveExpireDT,
 CurrentRowYN,
 PSInsertDT,
 CheckSum
,action
)
-->If we have records that have been deleted or updated, we do an insert
where action='UPDATE' or action='DELETE';
SELECT @UpdateCount=@@ROWCOUNT;
SELECT @InsertCount = count(*) from PersistentStaging.dbo.Department where '1/1/1900' = [EffectiveStartDT] and '1/1/4000' = [EffectiveExpireDT] and [PSInsertDT]='2014-02-27 19:14:15'
COMMIT TRAN
END TRY
BEGIN CATCH
IF XACT_STATE() = -1
ROLLBACK
RAISERROR (N'Error has occurred %s %d.', -- Message text.
           10, -- Severity,
           1, -- State,
           N'number', -- First argument.
           5); -- Second argument.
THROW

END CATCH;
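One piece of the generated statement worth a note is the EffectiveExpireDT expression in the first MATCHED branch. Since the datetime type only resolves to roughly 3 milliseconds, subtracting 3 ms from today's midnight yields the last representable moment of the previous day:

```sql
-- DATEDIFF(dd,0,sysdatetime())  --> whole days elapsed since 1900-01-01
-- DATEADD(dd, <days>, -1)       --> yesterday at midnight (base of -1 = 1899-12-31)
-- dateadd(day,1, ...)           --> today at midnight
-- dateadd(ms,-3, ...)           --> previous day at 23:59:59.997
select dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))) as EffectiveExpireDT
```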

The stored procedure returns the number of updates (records that had an SCD Type 2 change performed) and the number of inserts (brand-new records never seen by the persistent staging table):


Figure 5. Execute SQL Task Result Set

 These can be stored in SSIS variables you create in the package:


Figure 6. SSIS Variables to Store Inserts/Updates
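These counts can then be logged to an audit table by a follow-up execute sql task. That step isn't shown in this post; a minimal sketch of what it might run (the table and column names here are my own illustration, not from the actual package) looks like:

```sql
-- Hypothetical audit table to capture the counts per load
create table dbo.PersistentStagingAudit
(
AuditID int identity(1,1) primary key,
TableName varchar(100),
InsertCount int,
UpdateCount int,
AuditDT datetime default sysdatetime()
)

-- Execute SQL Task statement; the ? placeholders map to the SSIS
-- variables holding the table name and the two counts
insert into dbo.PersistentStagingAudit (TableName, InsertCount, UpdateCount)
values (?, ?, ?)
```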

     After we persistently stage our data, we call another execute sql task that will log our inserts and updates to an audit table. Let's run the package and see what the data in staging and persistent staging looks like:


select * from Staging.dbo.department

select * from PersistentStaging.dbo.department

Figure 7. Data Preview
     As you can see, data came into staging and was loaded into persistent staging, which set the effective start, expiration, current row flag, and insert datetime, and recorded the row checksum. Now, let's say we fast forward a day, and sometime between this data warehouse load and the last load the Help Desk department moved from Nashville to Cleveland (sorry, help desk people).

use Source

update department
set DepartmentLocation='Cleveland'
where DepartmentName='Help Desk'

go

     Now, let's run the package again and see what happened to our data:

select * from Staging.dbo.department

select * from PersistentStaging.dbo.department

Figure 8. Data Change
    From looking at the data in persistent staging you can see that the new record for the Help Desk department was inserted and the previous record was expired to the end of the previous day. The code for the stored procedure and the user-defined function it uses follows:

Use PersistentStaging
GO

Create function [dbo].[udfColumnsForBinaryChecksum] ( @TableName varchar(100))
returns varchar(8000)
as

-- =========================================================================

-- Author: Jim Ferris http://dennysjymbo.blogspot.com/

-- Create date: 2014-02-28

-- Description: returns a comma-separated list of fields from a table for binary checksum input

-- Example: select dbo.udfColumnsForBinaryChecksum ('Department')

-- =========================================================================

begin

declare @returnValues varchar(8000)=' '
Select

 @returnValues = @returnValues +CHAR(10) + case when NUMERIC_PRECISION is not null then 'convert(varchar, '+ COLUMN_NAME +' ) ' else  COLUMN_NAME end + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
Set @returnValues=SUBSTRING( @returnValues,0,len(@returnValues))
return @returnValues;
end

GO
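Against the Department table above, the function produces the column list with numeric columns wrapped in a convert so binary_checksum gets consistent input:

```sql
select dbo.udfColumnsForBinaryChecksum('Department')
-- Result (line breaks condensed):
-- convert(varchar, DepartmentID ) , DepartmentName, DepartmentLocation, DepartmentUnit
```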


CREATE PROCEDURE [dbo].[PersistRecordFromStaging]
(
@TableName AS varchar(100)
)
AS

--> =========================================================================

--> Author: Jim Ferris http://dennysjymbo.blogspot.com/

--> Create date: 2014-02-28

--> Description: Persistently stages data from staging to persistent staging

--> Example: exec dbo.PersistRecordFromStaging 'Department'

--> =========================================================================

--> SET NOCOUNT ON added to prevent extra result sets from interfering with SELECT statements.

SET NOCOUNT ON;
-->Variable declarations

DECLARE @DatabaseNameStage AS varchar(100) ='Staging',
@DatabaseNamePstage AS varchar(100) ='PersistentStaging',
@StagingOwner AS varchar(25) = 'dbo',
@PersistentStagingOwner AS varchar(25) = 'dbo',
@LogTableName AS varchar(100),
@SQL_Statement AS nvarchar(max),
@StageTableName AS varchar(100),
@CurrentDateTime as datetime = sysdatetime(),
@Fields AS varchar(max)=' ',
@TotalFields as varchar(max) =' ',
@Join as varchar(max) =' ',
@Source as varchar(max) =' ',
@Output as varchar(max) =' ' ,
@Selection as varchar(max) =' ',
@Changes as varchar(max) =' ' ,
@UpdateCount int,
@InsertCount int
-->Initialize variables

SET @LogTableName = @DatabaseNamePstage + '.'+@PersistentStagingOwner+'.' + @TableName
SET @StageTableName = @DatabaseNameStage + '.'+@StagingOwner+'.' + @TableName

BEGIN TRY
BEGIN TRAN
SET XACT_ABORT ON
-->Returns a string of all fields in the table

SELECT @TotalFields=@TotalFields + CHAR(10) + ' ' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @TotalFields= @TotalFields +CHAR(10)+ 'EffectiveStartDT,'+ CHAR(10)+'EffectiveExpireDT,'+ CHAR(10)+'CurrentRowYN,'+ CHAR(10)+'PSInsertDT,'+ CHAR(10)+'CheckSum'

-->Returns a string of all fields minus the control fields in a table

SELECT @Fields = @Fields +CHAR(10) + ' ' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Fields= @Fields + CHAR(10)+'Checksum'
-->Returns all the fields in the table minus the control fields, adding the source roleplaying identifier

SELECT @Source = @Source +CHAR(10) + ' source.' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Source=SUBSTRING( @Source,0,len(@Source)) + CHAR(10) +',''1/1/1900'','+ CHAR(10) +'''1/1/4000'','+ CHAR(10) +'''Y'','+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime,20)+''','+ CHAR(10) +'source.[Checksum]'+ CHAR(10)
-->Returns all the fields in a table, minus the control fields, with the addition of the binary checksum

SELECT @Selection = @Selection +CHAR(10) + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Selection=SUBSTRING( @Selection,0,len(@Selection)) + CHAR(10) + ',binary_checksum('+dbo.udfColumnsForBinaryChecksum(@TableName)+') as CheckSum'
-->Returns all the fields in a table, minus the control fields, with the addition of the values needed for an insert

SELECT @Output = @Output +CHAR(10) + ' source.' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Output=SUBSTRING( @Output,0,len(@Output)) + CHAR(10) +',convert(date,sysdatetime()) as [EffectiveStartDT],'+ CHAR(10) +'''1/1/4000'' as [EffectiveExpireDT],'+ CHAR(10) +'''Y''as [CurrentRowYN],'+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime)+''''+ ' as [PSInsertDT],'+ CHAR(10) +'source.[Checksum],'+ CHAR(10) +'$action as action' + CHAR(10)

-->Returns the changes output
SELECT @Changes = @Changes +CHAR(10) + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Changes= @Changes +CHAR(10)+ 'EffectiveStartDT,'+ CHAR(10)+'EffectiveExpireDT,'+ CHAR(10)+'CurrentRowYN,'+ CHAR(10)+'PSInsertDT,'+ CHAR(10)+'CheckSum'


-->Returns the join statement for the join between the staging and the persistent staging tables
SELECT @Join =@Join + CHAR(10) + ' target.' + ccu.COLUMN_NAME + ' = source.' + ccu.COLUMN_NAME + ' AND'
FROM
INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON tc.CONSTRAINT_NAME = ccu.Constraint_name
JOIN INFORMATION_SCHEMA.COLUMNS c ON ccu.TABLE_NAME = c.TABLE_NAME AND ccu.COLUMN_NAME = c.COLUMN_NAME
WHERE
tc.CONSTRAINT_TYPE = 'Primary Key' and ccu.COLUMN_NAME <> 'EffectiveStartDT'
and ccu.TABLE_NAME = @TableName
SET @Join =@Join + ' 1=1)'
-->Begin generating merge statement

Select @SQL_Statement=convert(nvarchar(max), N'')

--+'Declare @UpdateCount int=0, @InsertCount int=0'+ CHAR(10)
+ '-->Inserts new records, and records that have been updated as a SCD type 2'
+ CHAR(10) + 'SET NOCOUNT ON;'
+ CHAR(10) + 'SET XACT_ABORT ON '
+ CHAR(10) + 'INSERT INTO'
+ CHAR(10) + ' ' + @LogTableName
+ CHAR(10) + '('
+ CHAR(10) + @TotalFields +')'
+ CHAR(10) + 'SELECT '
+ CHAR(10) + @TotalFields
+ CHAR(10) + 'FROM'
+ CHAR(10) + '('
+ CHAR(10) + 'MERGE into '+@LogTableName+' AS target'
+ CHAR(10) + ' USING ('
+ CHAR(10) + 'SELECT'
+ CHAR(10) + @Selection + CHAR(10)
+ CHAR(10) +' FROM '+@StageTableName+' with (nolock)) As source'
+ CHAR(10) + '('
+ CHAR(10) +@Fields
+ CHAR(10) + ')'
+ CHAR(10) + ' ON'
+ CHAR(10) + '('
+ CHAR(10) + @Join
+ CHAR(10) + '-->If change occurs we deactivate the previous record'
+ CHAR(10) + 'WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN=''Y'' and target.[EffectiveStartDT]<>convert(date,sysdatetime())'
+ CHAR(10) + 'THEN'
+ CHAR(10) + 'update set [EffectiveExpireDT] =dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))),[CurrentRowYN]=''N'''
+ CHAR(10) +'-->If change occurs on same day as previous change, we take the net'
+ CHAR(10) + 'WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN=''Y'' and target.[EffectiveStartDT]=convert(date,sysdatetime())'
+ CHAR(10) + 'THEN DELETE '
+ CHAR(10) + '-->If this record has never been inserted into this table, insert'
+ CHAR(10) + 'WHEN NOT MATCHED THEN '
+ CHAR(10) + 'INSERT'
+ CHAR(10) + '('
+ CHAR(10) + @TotalFields +')'
+ CHAR(10) + 'VALUES'
+ CHAR(10) + '('
+ CHAR(10) + @Source
+ CHAR(10) + ')'
+ CHAR(10) + 'OUTPUT'
+ CHAR(10) + @Output
+ CHAR(10) +')'
+'AS CHANGES' + CHAR(10)
+'('+ CHAR(10)
+ @Changes
+ ',action'+ CHAR(10)
+')'+ CHAR(10)
+'-->If we have records that have been deleted or updated, we do an insert'+ CHAR(10)
+'where action=''UPDATE'' or action=''DELETE'';'+ CHAR(10)
+ 'SELECT @UpdateCount=@@ROWCOUNT;'
+ CHAR(10) + 'SELECT @InsertCount = count(*) from ' + @LogTableName+' where ''1/1/1900'' = [EffectiveStartDT] and ''1/1/4000'' = [EffectiveExpireDT] and [PSInsertDT]='+''''+convert(varchar,@CurrentDateTime,20)+''''

exec sp_executesql @SQL_Statement,N'@UpdateCount int OUTPUT, @InsertCount int OUTPUT', @UpdateCount OUTPUT,@InsertCount OUTPUT;
SELECT @UpdateCount as UpdateCount, @InsertCount as InsertCount
--print @sql_statement
COMMIT TRAN
END TRY
BEGIN CATCH
IF XACT_STATE() = -1
ROLLBACK
RAISERROR (N'Error has occurred %s %d.', -- Message text.
           10, -- Severity,
           1, -- State,
           N'number', -- First argument.
           5); -- Second argument.
THROW

END CATCH;


     This stored procedure will dynamically generate the merge statement for any persistent staging table as long as you follow the rules stated at the beginning of this post. This should speed up your dev time on these kinds of loads tremendously. To get around the BLOB issue, you can change the user-defined function to something like this:

USE PersistentStaging
go
ALTER function [dbo].[udfColumnsForBinaryChecksum] ( @TableName varchar(100))

returns varchar(8000)

as

begin
   declare @returnValues varchar(8000)=' '

Select @returnValues = @returnValues +CHAR(10) + case when character_maximum_length=-1 then 'convert(varchar(4000),left('+ COLUMN_NAME +',4000)) ' else COLUMN_NAME end + ','
                FROM
                     INFORMATION_SCHEMA.COLUMNS
                WHERE
                    TABLE_NAME = @TableName
                     AND COLUMN_NAME NOT IN
                        (
                        'EffectiveStartDT',
                        'EffectiveExpireDT',
                        'CurrentRowYN',
                        'PSInsertDT',
                        'CheckSum'
                        )
        Set @returnValues=SUBSTRING( @returnValues,0,len(@returnValues))
        return @returnValues;

end

     Now this will only check the first 4,000 characters of the text. You could probably increase the likelihood of catching a change in the BLOB by concatenating it with a right(4000) as well. It all depends on the situation you have and how big the BLOB is. I wrote another post on how to generate a checksum on BLOBs such as this. You could probably alter the dynamic SQL to incorporate that approach and pull the checksum from the staging table rather than have it generated on the fly in SQL.
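As a sketch of that right(4000) idea (an assumption on my part; note that the legacy text/ntext types don't support left/right directly and would need an explicit convert first), the case expression could hash both ends of the value:

```sql
-- Hypothetical variant: checksum the first and last 4000 characters so
-- changes at either end of a long value are caught (edits confined to
-- the middle of the value would still be missed)
case when character_maximum_length=-1
     then 'convert(varchar(4000),left('+ COLUMN_NAME +',4000)), '
         +'convert(varchar(4000),right('+ COLUMN_NAME +',4000)) '
     else COLUMN_NAME end
```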