Data Factory Ingestion Framework: Part 1 – Schema Loader

Use Case

A business wants to use cloud technology to enable data science and augment its data warehousing by staging and prepping data in a data lake. There are multiple systems we want to pull from, covering both different system types and multiple instances of each type.

  • Source type examples: SQL Server, Oracle, Teradata, SAP HANA, Azure SQL, flat files, etc.

Hand-coding ingestion for every source could take months of development hours across multiple resources.

We need a way to ingest data by source type. In other words, the only development we ever want to do is add support for a new source type.

Introduction

Over the course of the next four blogs, I will highlight the architecture for building an automated ingestion framework in which you only ever need two Data Factory pipelines, with configuration down to the individual dataset. This is a metadata-driven approach.

  • Part 1: The Schema Loader: ingest source schema/metadata by system type
  • Part 2: The Metadata Model: built using Data Vault – this is the secret behind the solution
  • Part 3: The Data Loader: reads from the Metadata Model and copies data by source type
  • Part 4: Data Processing: the Metadata Model will also feed the processing architecture, using Apache Spark and Scala. Consider this ETL automation at scale

The Schema Loader

The Schema Loader has a single job: extract schema data from a given source system type, whether that be SQL Server, SAP HANA, Teradata, a file system, or any other linked service type provided by Data Factory.

Activities

  1. Get-SourceSystemList: get a list of distinct source systems/databases to extract schema from. The list must include a system type column whose values align with Data Factory’s naming convention for connector types: sql, asql, sapHana, etc. (see the Lookup sketch after this list).
  2. Truncate-Target: run a stored procedure that truncates all staging tables. You will have one staging table per source type.
  3. Extract-Metadata: a ForEach activity containing one If Condition per source type. Within each If Condition lives a copy activity whose purpose is to hold the source-type-specific schema extract query and copy that data to its designated staging table within the metadata model. These must use dynamic linked services so that we only need one copy activity per source type. The data we are pulling includes items like table names, column names, data types, constraints, etc.
  4. Load-Model: run a stored procedure that loads the data from the staging tables into the metadata model. *We have not yet reviewed the metadata model – subscribe to keep up with the blogs.
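
As a rough illustration, Get-SourceSystemList can be a Lookup activity that reads a source system configuration table and returns every enabled record. The dataset, table, and column names below (DS_MetadataModel, ctl.SourceSystem, LinkedService_Type, Server_Name, Database_Name) are assumptions for illustration, not part of the original solution:

{
    "name": "Get-SourceSystemList",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT DISTINCT SourceSystem_Name, LinkedService_Type, Server_Name, Database_Name FROM ctl.SourceSystem WHERE Is_Enabled = 1"
        },
        "dataset": { "referenceName": "DS_MetadataModel", "type": "DatasetReference" },
        "firstRowOnly": false
    }
}

The output array of this Lookup is what feeds the ForEach activity in Extract-Metadata.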

Activity: Extract-Metadata example:

Inside the ForEach activity of Extract-Metadata we first have a list of If Condition activities, one per source type. For this example we have Azure SQL and on-premises SQL Server. Source types follow the native connectors already built into Azure Data Factory. Because we are passing in a list of source systems, we can simply use the expression @equals(item().LinkedService_Type,'sql') to route each record to the activity for its type.
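
A minimal sketch of what one of those If Condition activities might look like when exported as JSON (the activity name is an assumption; the expression is the one shown above, and the type-specific copy activity described in the next section would sit inside ifTrueActivities):

{
    "name": "If-OnPrem-SQL",
    "type": "IfCondition",
    "typeProperties": {
        "expression": {
            "value": "@equals(item().LinkedService_Type,'sql')",
            "type": "Expression"
        },
        "ifTrueActivities": [ ]
    }
}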

Copy Metadata Example

Here you can see that we are also passing in the associated server and database from each source system configuration record. A sketch of how those dataset parameters could be wired up is shown below, followed by an example SQL Server metadata extract query.
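
For example, the copy activity’s source dataset reference can pass values from the current ForEach item into dataset parameters. The dataset name DS_SQL_Dynamic and the column names Server_Name and Database_Name are assumptions for illustration:

"inputs": [
    {
        "referenceName": "DS_SQL_Dynamic",
        "type": "DatasetReference",
        "parameters": {
            "ServerName": { "value": "@item().Server_Name", "type": "Expression" },
            "DatabaseName": { "value": "@item().Database_Name", "type": "Expression" }
        }
    }
]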

-- Capture the SQL Server version (not used in the result set below).
DECLARE @version VARCHAR(1000) = (SELECT @@VERSION);

-- Table variable mirroring the SQL Server staging table in the metadata model.
DECLARE @Tables TABLE
(
    sql_ServerName NVARCHAR(128),
    sql_TableSchema_type NVARCHAR(128),
    sql_TableName_Full NVARCHAR(256),
    sql_Table_Catalog NVARCHAR(128),
    sql_Table_Schema NVARCHAR(128),
    sql_Table_Name NVARCHAR(128),
    sql_Column_Name NVARCHAR(128),
    sql_Ordinal_Position INT,
    sql_Is_Nullable VARCHAR(3),
    sql_Data_Type NVARCHAR(128),
    sql_Character_Maximum_Length INT,
    sql_isPrimaryKey INT
);

INSERT INTO @Tables
(
    sql_ServerName,
    sql_TableSchema_type,
    sql_TableName_Full,
    sql_Table_Catalog,
    sql_Table_Schema,
    sql_Table_Name,
    sql_Column_Name,
    sql_Ordinal_Position,
    sql_Is_Nullable,
    sql_Data_Type,
    sql_Character_Maximum_Length,
    sql_isPrimaryKey
)
SELECT DISTINCT
    @@SERVERNAME AS src_ServerName,
    t.type_desc AS src_TableSchema_type,
    '[' + s.name + ']' + '.[' + t.name + ']' COLLATE DATABASE_DEFAULT AS src_TableName_Full,
    c.TABLE_CATALOG,
    c.TABLE_SCHEMA,
    c.TABLE_NAME,
    c.COLUMN_NAME,
    c.ORDINAL_POSITION,
    c.IS_NULLABLE,
    c.DATA_TYPE,
    c.CHARACTER_MAXIMUM_LENGTH,
    ISNULL(CAST(ix.is_primary_key AS INT), 0) AS is_primary_key
FROM
(
    -- Tables and views are treated the same way for schema extraction.
    SELECT name, type_desc, schema_id, t.object_id
    FROM sys.tables t
    UNION
    SELECT name, type_desc, schema_id, v.object_id
    FROM sys.views v
) t
JOIN sys.schemas s
    ON t.schema_id = s.schema_id
JOIN INFORMATION_SCHEMA.COLUMNS c
    ON c.TABLE_NAME = t.name COLLATE DATABASE_DEFAULT
    AND c.TABLE_SCHEMA = s.name
JOIN sys.columns co
    ON co.object_id = t.object_id
    AND co.name = c.COLUMN_NAME
JOIN sys.types ty
    ON co.user_type_id = ty.user_type_id
LEFT JOIN
(
    -- Flag columns that participate in a primary key.
    SELECT ic.object_id,
           ic.index_id,
           ic.index_column_id,
           ic.column_id,
           i.is_primary_key
    FROM sys.index_columns ic
    JOIN sys.indexes i
        ON i.index_id = ic.index_id
        AND i.object_id = ic.object_id
    WHERE i.is_primary_key = 1
) ix
    ON ix.object_id = co.object_id
    AND ix.column_id = co.column_id
WHERE c.TABLE_CATALOG NOT IN ('master', 'tempdb', 'msdb', 'model');

SELECT * FROM @Tables;

Linked Services

In order for the copy activity in step 3 above to accomplish its task with only one copy activity per source type, we must use dynamic linked services. I created this slide to help explain how dynamic linked services work, and I will go over them in detail in a later blog.
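
As a rough sketch (not the exact definition used in this solution), a parameterized Azure SQL linked service accepts the server and database at runtime. The linked service name and parameter names are assumptions, and authentication settings are omitted for brevity:

{
    "name": "LS_ASQL_Dynamic",
    "properties": {
        "type": "AzureSqlDatabase",
        "parameters": {
            "ServerName": { "type": "String" },
            "DatabaseName": { "type": "String" }
        },
        "typeProperties": {
            "connectionString": "Server=tcp:@{linkedService().ServerName},1433;Database=@{linkedService().DatabaseName};"
        }
    }
}

The datasets built on top of this linked service expose matching parameters, which is how a single copy activity per source type can target any server and database passed in from the source system list.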