Azure Data Factory is a cloud-based service that ingests data from multiple sources, transforms it using technologies such as MapReduce & machine learning, and publishes the output to visualisation tools & BI applications.

With Data Factory, organisations can utilise a single orchestration engine for moving and transforming big data. Since the service runs in Azure, it is offered as a managed service by Microsoft with an SLA of 99.9% uptime. With no upfront costs and an affordable, utilisation-based pricing structure, companies can adopt an opex model instead of a capex one to meet their ever-increasing big data processing needs.

 

This post explores the capabilities of the Data Factory and demonstrates an end-to-end analytics solution which performs the following tasks:

  • Ingesting a CSV file containing sales records from Blob Storage.
  • Running a MapReduce operation (using HiveQL) in an HDInsight Hadoop cluster to aggregate the sales records by year, quarter, product category and subcategory.
  • Loading the output dataset containing the aggregated records into a SQL Database.
  • Visualising the output using Power BI.

Data Ingestion & Transformation

The following tasks need to be performed in the Data Factory to ingest and process the data. The exercise makes use of JSON snippets that are uploaded via the Azure Portal, but these operations can also be performed using other tools such as Visual Studio & PowerShell.

Prerequisites

To perform the steps described below, an Azure Data Factory, an Azure Blob Storage account, an HDInsight Hadoop cluster and an Azure SQL Database need to be provisioned using an Azure subscription. A free trial is currently available (Link). The output is visualised using Power BI; however, any other visualisation tool such as Tableau can also be used.
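
The Data Factory itself can also be provisioned from PowerShell rather than the portal. The sketch below is illustrative only; it assumes the AzureRM cmdlets current at the time of writing and a hypothetical resource group named ajrg (the Blob Storage account, HDInsight cluster and SQL Database can be created in the portal or with their respective cmdlets).

# Illustrative sketch only - resource group name and location are hypothetical
Login-AzureRmAccount

# Create a resource group to hold the Data Factory
New-AzureRmResourceGroup -Name "ajrg" -Location "West US"

# Create the Data Factory that will host the linked services, datasets and pipeline
New-AzureRmDataFactory -ResourceGroupName "ajrg" -Name "ajdf" -Location "West US"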

 

Data Factory supports ingesting data from a range of platforms (View the full list here). When data is to be sourced from on-premises data stores, the Data Management Gateway provides a secure channel for moving the data into the cloud. In this example, we will make use of Azure Blob Storage and ingest a CSV file.

 

The file used in this exercise contains around 15,000 sales records. The snapshot below shows the format of the file along with a few sample values.
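
For illustration, each row follows the column order defined later in the input Dataset (Category, SubCategory, YearNm, QuarterNm, MonthNm, DayNo, Amount). The sample values below are hypothetical and only indicate the expected shape of the data:

Bikes,Mountain Bikes,2013,1,January,15,1249.99
Accessories,Helmets,2013,2,April,2,34.99
Clothing,Jerseys,2014,3,August,21,49.99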

Creating Linked Services

A Linked Service in the Data Factory establishes the connection to the underlying data source. We need to create linked services to connect to Blob Storage (for input & output CSV files), the HDInsight Hadoop cluster and the SQL database.

 

Using the following JSON template, a Linked Service can be created to point to an existing Blob Storage account. The elements specific to the Data Factory Hub and the Blob Storage account used in this exercise (the hub name and the connection string) would need to be changed as needed.

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "description": "Azure BLOB Storage",
        "hubName": "ajdf_hub",
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=ajst;AccountKey=**********"
        }
    }
}

A Linked Service for the HDInsight Hadoop cluster can be created using the snippet below. The environment-specific elements (the hub name, cluster URI and credentials) need to be changed as needed.

{
    "name": "HDInsightLinkedService",
    "properties": {
        "description": "Hadoop Cluster",
        "hubName": "ajdf_hub",
        "type": "HDInsight",
        "typeProperties": {
            "clusterUri": "https://ajhdi.azurehdinsight.net/",
            "userName": "admin",
            "password": "**********",
            "linkedServiceName": "AzureStorageLinkedService"
        }
    }
}

A Linked Service for the SQL Database can be created using the snippet below. The environment-specific elements (the hub name and the connection string) need to be changed as needed.

{
    "name": "AzureSqlLinkedService",
    "properties": {
        "description": "SQL Db",
        "hubName": "ajdf_hub",
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Data Source=tcp:ajsqlsvr.database.windows.net,1433;Initial Catalog=ajsqldb;Integrated Security=False;User ID=ajitan14@ajsqlsvr;Password=**********;Connect Timeout=30;Encrypt=True"
        }
    }
}

Creating Datasets

A Dataset in the Data Factory describes the structure of the data along with details such as its storage location and frequency of availability. We need to create Datasets for the input CSV file containing the sales records, the output CSV file containing the aggregated sales records, and the SQL table which will store the aggregated data for visualisation.

 

The JSON configuration for the Input Dataset is shown below. The environment-specific elements (such as the file name and folder path) need to be changed as needed. The availability of the file has been set to monthly.

{
    "name": "AzureBlobInput",
    "properties": {
        "structure": [
            { "name": "Category", "type": "String" },
            { "name": "SubCategory", "type": "String" },
            { "name": "YearNm", "type": "String" },
            { "name": "QuarterNm", "type": "Int32" },
            { "name": "MonthNm", "type": "String" },
            { "name": "DayNo", "type": "Int32" },
            { "name": "Amount", "type": "Decimal" }
        ],
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "fileName": "Sales.csv",
            "folderPath": "ajstbl/Input",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": ","
            }
        },
        "availability": {
            "frequency": "Month",
            "interval": 1
        },
        "external": true,
        "policy": {}
    }
}

The JSON configuration for the Output Dataset is shown below. The environment-specific elements (such as the folder path) need to be changed as needed.

{
    "name": "AzureBlobOutput",
    "properties": {
        "structure": [
            { "name": "YearNm", "type": "String" },
            { "name": "QuarterNm", "type": "Int32" },
            { "name": "Category", "type": "String" },
            { "name": "SubCategory", "type": "String" },
            { "name": "TotalAmount", "type": "Decimal" }
        ],
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "folderPath": "ajstbl/Output",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": ","
            }
        },
        "availability": {
            "frequency": "Month",
            "interval": 1
        }
    }
}

The JSON configuration for the SQL Table's Dataset is shown below. The name of the table in the database needs to be changed as needed.

{
    "name": "AzureSQLDataset",
    "properties": {
        "structure": [
            { "name": "YearNm", "type": "String" },
            { "name": "QuarterNm", "type": "Int32" },
            { "name": "Category", "type": "String" },
            { "name": "SubCategory", "type": "String" },
            { "name": "TotalAmount", "type": "Decimal" }
        ],
        "published": false,
        "type": "AzureSqlTable",
        "linkedServiceName": "AzureSqlLinkedService",
        "typeProperties": {
            "tableName": "SalesAggregated"
        },
        "availability": {
            "frequency": "Month",
            "interval": 1
        }
    }
}
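
The three Dataset definitions can likewise be deployed from PowerShell rather than the portal. A sketch under the same assumptions as above (AzureRM.DataFactories cmdlets, hypothetical local file paths):

# Deploy each dataset definition from its local JSON file (paths are hypothetical)
foreach ($ds in "AzureBlobInput", "AzureBlobOutput", "AzureSQLDataset") {
    New-AzureRmDataFactoryDataset -ResourceGroupName "ajrg" `
        -DataFactoryName "ajdf" `
        -File "C:\ADF\$ds.json"
}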

The CREATE TABLE statement to be used on the SQL Database is shown below. The table name can be changed as desired, but it must be consistent with the JSON configuration for the Dataset.

CREATE TABLE [dbo].[SalesAggregated](
[YearNm] [varchar](25) NULL,
[QuarterNm] [varchar](25) NULL,
[Category] [varchar](50) NULL,
[SubCategory] [varchar](50) NULL,
[TotalAmount] [decimal](18, 2) NULL
)
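
The statement can be run from SQL Server Management Studio or, if preferred, from PowerShell. A sketch assuming the Invoke-Sqlcmd cmdlet from the SQL Server PowerShell module and the server, database and credentials from the linked service above (all placeholders):

# Create the destination table on the Azure SQL Database (credentials are placeholders)
Invoke-Sqlcmd -ServerInstance "ajsqlsvr.database.windows.net" `
    -Database "ajsqldb" `
    -Username "ajitan14" `
    -Password "**********" `
    -Query "CREATE TABLE [dbo].[SalesAggregated]([YearNm] [varchar](25) NULL, [QuarterNm] [varchar](25) NULL, [Category] [varchar](50) NULL, [SubCategory] [varchar](50) NULL, [TotalAmount] [decimal](18,2) NULL)"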

Building a HiveQL Script

The ingested sales records will be processed by a HiveQL script, which will run a MapReduce operation and aggregate the sales amounts. The output will be saved as an external table on Blob Storage. The parameters for the input and output table locations will be set in the Pipeline’s configuration (described in the next section). Once built, the script needs to be uploaded to Azure Storage so that it can be referenced in the Pipeline.

DROP TABLE IF EXISTS SalesRecords;

CREATE TABLE SalesRecords (
    Category string,
    SubCategory string,
    YearNm string,
    QuarterNm string,
    MonthNm string,
    DayNo int,
    Amount decimal
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

LOAD DATA INPATH '${hiveconf:inputtable}' OVERWRITE INTO TABLE SalesRecords;

DROP TABLE IF EXISTS SalesAggregated;

CREATE EXTERNAL TABLE SalesAggregated (
    YearNm string,
    QuarterNm string,
    Category string,
    SubCategory string,
    TotalAmount decimal
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '${hiveconf:outputtable}';

INSERT INTO TABLE SalesAggregated
SELECT
    YearNm,
    QuarterNm,
    Category,
    SubCategory,
    SUM(Amount) AS TotalAmount
FROM SalesRecords
GROUP BY
    YearNm,
    QuarterNm,
    Category,
    SubCategory;
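
One way to upload the script to Azure Storage is with the Azure Storage PowerShell cmdlets. A minimal sketch, assuming the script has been saved locally as HiveQL_Script.hql and that the ajstbl container already exists:

# Upload the HiveQL script to the Scripts folder of the ajstbl container
# (local path is hypothetical; the account key is a placeholder)
$ctx = New-AzureStorageContext -StorageAccountName "ajst" -StorageAccountKey "**********"
Set-AzureStorageBlobContent -File "C:\ADF\HiveQL_Script.hql" `
    -Container "ajstbl" `
    -Blob "Scripts/HiveQL_Script.hql" `
    -Context $ctx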

Creating the Pipeline

The Pipeline in the Data Factory performs the activities of moving and/or transforming the data. In this exercise, the Pipeline contains two activities: running a MapReduce operation on the data contained in the input CSV file, and copying the data from the output CSV file to the SQL database.

 

Data Factory facilitates transforming the ingested data using many technologies. The full list is available here. In this exercise, an HDInsight Hadoop cluster is used to run a Hive activity.

 

The Pipeline has been scheduled to execute immediately after it is deployed (achieved by setting pipelineMode to 'Scheduled' and the start & end times in the past). The environment-specific elements, including the location of the HiveQL script described in the previous section, need to be changed as needed.

{
    "name": "ProcessSalesRecords",
    "properties": {
        "description": "This pipeline aggregates the sales records",
        "activities": [
            {
                "type": "HDInsightHive",
                "typeProperties": {
                    "scriptPath": "ajstbl/Scripts/HiveQL_Script.hql",
                    "scriptLinkedService": "AzureStorageLinkedService",
                    "defines": {
                        "inputtable": "wasbs://ajstbl@ajst.blob.core.windows.net/Input",
                        "outputtable": "wasbs://ajstbl@ajst.blob.core.windows.net/Output"
                    }
                },
                "inputs": [ { "name": "AzureBlobInput" } ],
                "outputs": [ { "name": "AzureBlobOutput" } ],
                "policy": {
                    "concurrency": 1,
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Month",
                    "interval": 1
                },
                "name": "HiveActivity",
                "linkedServiceName": "HDInsightLinkedService"
            },
            {
                "type": "Copy",
                "typeProperties": {
                    "source": { "type": "BlobSource" },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 1000,
                        "writeBatchTimeout": "00:30:00"
                    }
                },
                "inputs": [ { "name": "AzureBlobOutput" } ],
                "outputs": [ { "name": "AzureSQLDataset" } ],
                "policy": {
                    "timeout": "1.00:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Month",
                    "interval": 1
                },
                "name": "CopyActivity"
            }
        ],
        "start": "2015-07-01T00:00:00Z",
        "end": "2015-07-02T00:00:00Z",
        "isPaused": false,
        "hubName": "ajdf_hub",
        "pipelineMode": "Scheduled"
    }
}
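
As with the Linked Services and Datasets, the Pipeline definition can be deployed from PowerShell instead of the portal. A sketch under the same assumptions (AzureRM.DataFactories cmdlets, hypothetical local path):

# Deploy the pipeline; with pipelineMode "Scheduled" and a start/end window in the
# past, the slices become due immediately and execution begins after deployment.
New-AzureRmDataFactoryPipeline -ResourceGroupName "ajrg" `
    -DataFactoryName "ajdf" `
    -File "C:\ADF\ProcessSalesRecords.json"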

Diagram

If the components described above have been configured as expected, the Diagram in the Data Factory should look like this (the pipeline has been expanded):

Reviewing the Output

Once the Pipeline is deployed, it will start executing immediately. Data Factory enables extensive monitoring which can be used to review the process. Details on this are available here.
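
Alongside the portal's monitoring views, the status of individual dataset slices can be checked from PowerShell. A sketch assuming the Get-AzureRmDataFactorySlice cmdlet and the names used above:

# Check the status of the output dataset's slices for the scheduled window
Get-AzureRmDataFactorySlice -ResourceGroupName "ajrg" `
    -DataFactoryName "ajdf" `
    -DatasetName "AzureBlobOutput" `
    -StartDateTime "2015-07-01T00:00:00Z"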

 

Once the output CSV file is generated, it can be viewed in the Storage folder as shown below. Data Factory will then kick off the Copy activity, and when it completes, the data should be available in the destination table in the SQL database.
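
A quick way to confirm that the copy has landed is to query the destination table, for example with the Invoke-Sqlcmd cmdlet (server, database and credentials are placeholders):

# Count the aggregated rows copied into the destination table (credentials are placeholders)
Invoke-Sqlcmd -ServerInstance "ajsqlsvr.database.windows.net" -Database "ajsqldb" `
    -Username "ajitan14" -Password "**********" `
    -Query "SELECT COUNT(*) AS RowsLoaded FROM dbo.SalesAggregated"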

Visualising the Data

The result set can be viewed and analysed in any visualisation tool or BI application. A sample report built in Power BI is shown below:

Conclusion

Azure Data Factory is a comprehensive orchestration platform for use in Cloud Analytics. It can be used to source data from on-premises & cloud-based data repositories and transform it using a wide range of processing methodologies. The output can also be stored in a host of supported platforms.

 

As the example described in this blog post shows, configuring the process is intuitive. Managing a deployed solution is well supported through monitoring, alerting and PowerShell automation. Not only does the Data Factory provide the framework for moving and transforming data, it also gives an organisation detailed information on data lineage.

About the author

Ajit is an Ambassador for Microsoft technologies at Servian. He has over a decade of experience in delivering Business Intelligence and Analytics solutions using Microsoft and other technologies across several industries including Equipment Rental, Financial Services and Telecommunications. He holds a Master's degree in Information Technology from UTS in Sydney and the Specialist in Big Data Analytics Solutions certification. He currently specialises in Cloud Analytics solutions using Microsoft Azure.
Ajit Ananthram