Saturday, July 8, 2017

Using SQL Server Change Tracking with the StreamSets Data Collector Kudu Destination

     I just started using Apache Kudu for a few projects that require near-real-time data, as well as for our ETL logging for analytics. It's a really great storage engine if you like working with relational data and can get around its limitations. My favorite thing about it is that not only can you interact with Kudu tables using Impala and SQL, but it also has an API that lets you insert/update/delete/upsert to tables directly. This means I can send data to Kudu with instructions on what to do with that data. In traditional data warehousing, you would most likely send your deltas to a staging table and then run batched upserts into the destination table to keep it in sync with the source system. With Kudu you could still do this, but thanks to the Java API and StreamSets Data Collector, there really is no need to. 
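
     For example, once a table is backed by Kudu you can modify individual rows straight from Impala. Here's a quick sketch against a hypothetical table (the table and column names are made up; UPSERT and row-level DELETE only work on Kudu-backed tables):

-- Insert-or-update a single row in a hypothetical Kudu-backed table
UPSERT INTO my_kudu_table (id, val) VALUES (1, 'hello');

-- Row-level deletes work too
DELETE FROM my_kudu_table WHERE id = 1;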

     StreamSets Data Collector has a Kudu destination that takes advantage of the Java API to load data. It is tightly coupled with the CDC feature on the JDBC origin. The way this works is that the CDC operation codes:

1 for INSERT
2 for DELETE
3 for UPDATE
4 for UPSERT

are assigned to the Data Collector record header attribute sdc.operation.type, which in turn tells the Kudu destination what to do with the data. Unfortunately, for those of us using SQL Server Change Tracking, the operation codes come in as I, D, or U. So in order to take advantage of Kudu, we have to translate these Change Tracking operations ourselves and assign the result to that header attribute.

     Before we dive into how we do this, I wanted to list a few gotchas I ran into while using Data Collector with Kudu:

    -Unlike the Hive destination, which has the Hive Metadata Processor, the Kudu destination has no equivalent and will not create the table for you. You will have to create the table before kicking off the pipeline or it will fail (see the sketch after this list).
    -Kudu will not accept datetime data types from SQL Server. You will have to get the Unix epoch representation and store it in a bigint column; I will demonstrate how to do this below.
    -Make sure the Max Batch Size (Records) on the JDBC origin does not exceed the Mutation Buffer Space (records) on the Kudu destination, or the pipeline will fail.
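
     Since the table has to exist up front, here's a rough sketch of what the Impala DDL for the destination table might look like. The column types, partitioning, and primary key here are assumptions based on the query later in this post (and the exact partitioning clause can vary by Impala/Kudu version), so adjust them to match your source schema and the fields you actually send:

CREATE TABLE abn_orders (
  abn_note_id BIGINT,
  line INT,
  cm_log_owner_id STRING,
  cm_phy_owner_id STRING,
  order_id BIGINT,
  abn_check_from_date BIGINT,  -- Unix epoch in milliseconds (see the conversion in the query below)
  sys_change_version BIGINT,
  sys_change_operation STRING,
  PRIMARY KEY (abn_note_id, line)
)
PARTITION BY HASH (abn_note_id) PARTITIONS 4
STORED AS KUDU;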

     Ok, for this example we will use a pipeline that looks like this:


Figure 1. JDBC to Kudu Pipeline

For the JDBC Origin make sure that:

    -The Incremental Mode checkbox is checked
    -Initial Offset is set to 0
    -The Offset Column is set to sys_change_version

The source query is going to pull from this table:


Figure 2. Source System Table

using this query:

DECLARE @offset BIGINT = ${offset} 
DECLARE @unixepoch2 datetime2 = '1970-01-01 00:00:00.0000' 
IF (@offset =0 OR @offset IS NULL) 
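   -- Initial load: an offset of 0 (or NULL) means pull the whole table, tag every row as an insert,
   -- and grab the current change version so the next run can start from it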
   SELECT 
        sct.abn_note_id AS abn_note_id, 
        sct.line AS line, 
        sct.cm_log_owner_id AS cm_log_owner_id, 
        sct.cm_phy_owner_id AS cm_phy_owner_id, 
        sct.order_id AS order_id, 
 ((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date,
        c.sys_change_version AS sys_change_version, 
        'I' AS sys_change_operation 
FROM 
        dbo.abn_orders sct 
CROSS APPLY (
   SELECT 
        coalesce(max(sys_change_version), 0) AS sys_change_version 
    FROM 
        changetable(changes dbo.abn_orders, 0) c) AS c 
ELSE 
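   -- Delta load: pull only the rows that changed since the last offset Data Collector recorded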
   SELECT 
        ct.abn_note_id AS abn_note_id, 
        ct.line AS line, 
        sct.cm_log_owner_id AS cm_log_owner_id, 
        sct.cm_phy_owner_id AS cm_phy_owner_id, 
        sct.order_id AS order_id,
((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date, 
        sys_change_version AS sys_change_version, 
        sys_change_operation AS sys_change_operation 
  FROM 
        dbo.abn_orders AS sct 
   RIGHT OUTER JOIN changetable(changes dbo.abn_orders, ${offset}) AS ct 
   ON ct.abn_note_id = sct.abn_note_id AND ct.line = sct.line 
  WHERE 
        sys_change_version > ${offset} 
  ORDER BY 
        sys_change_version 

     In this query you can see we are using an IF...ELSE statement. The first branch runs when the offset is 0, triggering an initial load of the table while grabbing the latest change version to store as the offset in Data Collector. We have to do this to protect ourselves in case we are loading data from outside the change tracking retention window, or in case change tracking was not enabled from the table's inception. The branch after the ELSE handles delta loads: it pulls only the rows that changed since the last recorded offset. Pro tip: if you are using incremental pipelines, make sure to periodically back up /var/lib/sdc/data on your Data Collector server to preserve the offset state of your pipelines. It can definitely come in handy when disaster strikes.
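
     As a refresher, none of this works unless change tracking is enabled at both the database and table level on the source. A minimal sketch (the database name and the 7-day retention are placeholders; set them to whatever fits your environment):

ALTER DATABASE YourSourceDb
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);

ALTER TABLE dbo.abn_orders
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = OFF);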

     You will also notice the functions we apply to the abn_check_from_date field. These convert the SQL Server datetime to a Unix epoch value with millisecond precision that we can store in a bigint column in the Kudu destination table. We can later put a view on top of this value to display it as a timestamp in Hive/Impala.
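
     For example, a view along these lines would expose the bigint as a timestamp (an Impala sketch; the view name is made up, and the division by 1000 is there because Impala interprets a numeric cast to TIMESTAMP as seconds since the epoch):

CREATE VIEW abn_orders_v AS
SELECT
    abn_note_id,
    line,
    cm_log_owner_id,
    cm_phy_owner_id,
    order_id,
    CAST(abn_check_from_date / 1000 AS TIMESTAMP) AS abn_check_from_date,
    sys_change_version
FROM abn_orders;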

     Now that the JDBC origin is taken care of, we need to set the header attribute that tells Kudu what to do with the records coming through the pipeline. We can accomplish this with a JavaScript Evaluator processor. In the script text area, put:

//Loop through the records in this batch
for (var i = 0; i < records.length; i++) {
 try {
  //Deletes map to the Kudu delete operation
  if (records[i].value.sys_change_operation == "D") {
   records[i].attributes['sdc.operation.type'] = '2';
  }
  //Everything else (inserts and updates) maps to an upsert
  else {
   records[i].attributes['sdc.operation.type'] = '4';
  }

  // Write record to processor output
  output.write(records[i]);
 } catch (e) {
  // Send record to error
  error.write(records[i], e);
 }

}

     You can see that if the operation is "D" we set the header attribute to 2 (delete); otherwise we set it to 4 (upsert). I did that for simplicity's sake; you can change this to add cases specifically for inserts and updates if need be.

     That's it! You now have an incremental pipeline that takes advantage of SQL Server Change Tracking and Kudu. The devs over at StreamSets have promised native Change Tracking support in a future release, but for now this solution will get you by.

