Below is a full Azure Data Factory (ADF) project with a sample dataset, code, and detailed steps for creating a pipeline to move data from Azure Blob Storage to an Azure SQL Database, including transformations.
The project demonstrates a complete ETL pipeline that:
- Extracts data from Azure Blob Storage (CSV format)
- Transforms data with validation and enrichment
- Loads data into Azure SQL Database
- Implements monitoring, error handling, and security best practices
- Showcases modern ADF features like Data Flows and pipeline orchestration
Save the following dataset as sample_data.csv and upload it to an Azure Blob Storage container (data-container is used throughout this guide).
EmployeeID,Name,Department,Salary,HireDate,IsActive,ManagerID
101,John Doe,Engineering,60000,2020-01-15,true,201
102,Jane Smith,Marketing,55000,2021-03-22,true,202
103,Michael Brown,Sales,45000,2019-11-08,true,203
104,Linda White,HR,50000,2020-07-12,true,204
105,Robert Johnson,Engineering,65000,2018-05-03,false,201
106,Sarah Davis,Marketing,52000,2022-01-10,true,202
- Azure Subscription: Ensure you have an active subscription with permission to create resources.
- Azure Blob Storage: Create a storage account and a container (for example, data-container), and upload the sample_data.csv file.
- Azure SQL Database:
  - Create a table for the data:
CREATE TABLE Employee (
    EmployeeID INT PRIMARY KEY,
    Name NVARCHAR(100) NOT NULL,
    Department NVARCHAR(50) NOT NULL,
    Salary DECIMAL(10,2) NOT NULL,
    HireDate DATE NOT NULL,
    IsActive BIT NOT NULL DEFAULT 1,
    ManagerID INT NULL,
    CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
    ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);

-- Create indexes for better query performance
CREATE NONCLUSTERED INDEX IX_Employee_Department ON Employee(Department);
CREATE NONCLUSTERED INDEX IX_Employee_ManagerID ON Employee(ManagerID);
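The Copy activity JSON later in this guide runs a preCopyScript that truncates dbo.Employee_Staging. If you keep that option, create the staging table too; a minimal sketch mirroring the Employee columns (adjust as needed):

CREATE TABLE dbo.Employee_Staging (
    EmployeeID INT NOT NULL,
    Name NVARCHAR(100) NOT NULL,
    Department NVARCHAR(50) NOT NULL,
    Salary DECIMAL(10,2) NOT NULL,
    HireDate DATE NOT NULL,
    IsActive BIT NOT NULL,
    ManagerID INT NULL
);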
- Azure Data Factory Studio: no installation is required; it is the browser-based authoring UI opened from your Data Factory resource in the Azure Portal.
- Optional: Azure Data Studio, for querying the SQL Database:
  https://learn.microsoft.com/en-us/azure-data-studio/download-azure-data-studio?tabs=win-install%2Cwin-user-install%2Credhat-install%2Cwindows-uninstall%2Credhat-uninstall#download-azure-data-studio
- Navigate to the Azure Portal.
- Search for Data Factory and click Create.
- Fill in:
- Subscription: Select your subscription.
- Resource Group: Create or select one.
- Region: Choose a nearby region.
- Data Factory Name: Provide a unique name.
- Click Review + Create, then Create.
- Security Configuration:
- Enable Managed Identity for the Data Factory
- Configure Azure Key Vault for storing connection strings securely
- Set up Private Endpoints if required for enhanced security
- Use Managed Identity: Enable system-assigned managed identity for ADF
- Key Vault Integration: Store sensitive information in Azure Key Vault
- Network Security: Configure firewall rules and private endpoints
- RBAC: Assign appropriate roles (Data Factory Contributor, etc.)
- Monitoring: Enable diagnostic logging and alerts
Linked Services are used to connect ADF to external resources.
- In ADF Studio, go to Manage > Linked Services.
- Click + New and select Azure Key Vault.
- Configure:
- Azure subscription: Select your subscription
- Azure key vault name: Select your Key Vault
- Authentication method: Use Managed Identity
- Test the connection and save as AzureKeyVaultLS.
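For reference, the resulting linked service definition looks roughly like this (substitute your own vault name):

{
  "name": "AzureKeyVaultLS",
  "properties": {
    "type": "AzureKeyVault",
    "typeProperties": {
      "baseUrl": "https://<your-key-vault-name>.vault.azure.net/"
    }
  }
}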
- Click + New and select Azure Blob Storage.
- Configure:
- Account Selection Method: From Azure subscription
- Storage Account Name: Select your storage account
- Authentication method: Use Managed Identity (recommended)
- Test the connection and save as AzureBlobStorageLS.
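With managed identity authentication, the linked service JSON is roughly as follows; the Data Factory identity also needs a Storage Blob Data Reader (or Contributor) role assignment on the storage account:

{
  "name": "AzureBlobStorageLS",
  "properties": {
    "type": "AzureBlobStorage",
    "typeProperties": {
      "serviceEndpoint": "https://<your-storage-account>.blob.core.windows.net/"
    }
  }
}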
- Create another Linked Service for Azure SQL Database.
- Configure:
- Server Name: Enter the server FQDN
- Database Name: Enter the database name
- Authentication Type: Azure SQL Database authentication
- Username: Include the SQL user name in the connection string, or reference it from Key Vault
- Password: Reference the Key Vault secret (for example, sql-password) through the AzureKeyVaultLS linked service (see the JSON sketch below)
- Test the connection and save as AzureSQLDatabaseLS.
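Rather than inline expressions, ADF resolves Key Vault secrets through secret references in the linked service definition; a sketch assuming a secret named sql-password (server, database, and user names are placeholders):

{
  "name": "AzureSQLDatabaseLS",
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": "Server=tcp:<your-server>.database.windows.net,1433;Database=<your-database>;User ID=<sql-username>;",
      "password": {
        "type": "AzureKeyVaultSecret",
        "store": {
          "referenceName": "AzureKeyVaultLS",
          "type": "LinkedServiceReference"
        },
        "secretName": "sql-password"
      }
    }
  }
}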
Datasets represent the data structure in the source and destination.
- Go to Author > Datasets, click + New Dataset.
- Select Azure Blob Storage and DelimitedText.
- Configure:
- Linked Service: Select AzureBlobStorageLS
- File Path: Container: data-container, File: sample_data.csv
- Column delimiter: Comma (,)
- Row delimiter: Default (\r\n or \n)
- Enable First Row as Header
- Schema: Import schema to validate data structure
- Preview Data to ensure correct parsing
- Save as BlobInputDataset.
- Add another dataset for the Azure SQL Database.
- Select Azure SQL Database and configure:
- Linked Service: Select AzureSQLDatabaseLS
- Table Name: dbo.Employee (include the schema prefix)
- Import Schema: Yes, to validate the table structure
- Test Connection to ensure table accessibility
- Save as SQLSinkDataset.
- In Author > Pipelines, click + New Pipeline.
- Name the pipeline EmployeeDataProcessing.
- Add Parameters for dynamic configuration (these match the pipeline JSON later in this guide):
  - SourceContainer: String, default value: "data-container"
  - ProcessDate: String, default value: @utcnow('yyyy-MM-dd')
- Add a Variable RecordCount: Integer, default value: 0
- Drag and drop the Copy Data activity onto the canvas.
- Name it CopyEmployeeData.
- Configure the Source tab:
  - Dataset: Select BlobInputDataset
  - Data Consistency Verification: Enable checksum validation
- Configure the Sink tab:
  - Dataset: Select SQLSinkDataset
  - Write Behavior: Upsert (recommended for data updates)
  - Key Columns: EmployeeID
- Configure Mapping:
- Auto-map columns or manually map them one-to-one (see the JSON sketch below):
  - EmployeeID → EmployeeID
  - Name → Name
  - Department → Department
  - Salary → Salary
  - HireDate → HireDate
  - IsActive → IsActive
  - ManagerID → ManagerID
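If you choose explicit mapping, the Copy activity stores it as a translator block inside its typeProperties; a minimal sketch (the typeConversion flag is optional):

"translator": {
  "type": "TabularTranslator",
  "mappings": [
    { "source": { "name": "EmployeeID" }, "sink": { "name": "EmployeeID" } },
    { "source": { "name": "Name" }, "sink": { "name": "Name" } },
    { "source": { "name": "Department" }, "sink": { "name": "Department" } },
    { "source": { "name": "Salary" }, "sink": { "name": "Salary" } },
    { "source": { "name": "HireDate" }, "sink": { "name": "HireDate" } },
    { "source": { "name": "IsActive" }, "sink": { "name": "IsActive" } },
    { "source": { "name": "ManagerID" }, "sink": { "name": "ManagerID" } }
  ],
  "typeConversion": true
}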
- Create a Mapping Data Flow for advanced transformations:
- Add Data Flow activity to pipeline
- Create transformations: Filter active employees, validate data, add audit columns
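Added to the pipeline JSON, the Data Flow activity looks roughly like this (the data flow name EmployeeTransformations and the compute sizing are assumptions):

{
  "name": "TransformEmployeeData",
  "type": "ExecuteDataFlow",
  "typeProperties": {
    "dataFlow": {
      "referenceName": "EmployeeTransformations",
      "type": "DataFlowReference"
    },
    "compute": {
      "computeType": "General",
      "coreCount": 8
    },
    "traceLevel": "Fine"
  }
}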
- Validation Activity: Add before Copy Data
- Validate source file exists and has data
- Check SQL Database connectivity
- Failure-Path ("Try-Catch") Logic:
  - Treat Copy Data as the "try" step
  - Attach activities to its Failed dependency condition as the "catch" path (see the sketch below):
    - Web Activity to send failure notifications
    - Stored Procedure to log errors
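ADF has no literal try/catch; the catch path is simply an activity that depends on CopyEmployeeData with the Failed condition. A sketch of a failure-notification Web Activity (the webhook URL is a placeholder):

{
  "name": "NotifyOnFailure",
  "type": "WebActivity",
  "dependsOn": [
    {
      "activity": "CopyEmployeeData",
      "dependencyConditions": ["Failed"]
    }
  ],
  "typeProperties": {
    "url": "https://<your-webhook-endpoint>",
    "method": "POST",
    "headers": { "Content-Type": "application/json" },
    "body": "{\"pipeline\": \"@{pipeline().Pipeline}\", \"runId\": \"@{pipeline().RunId}\", \"error\": \"@{activity('CopyEmployeeData').error.message}\"}"
  }
}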
- Add Data Validation:
  - Lookup Activity: Count source records
  - If Condition: Proceed only if records > 0 (see the sketch below)
  - Data Flow: Add data quality checks (null validation, format validation)
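A sketch of the record-count gate, assuming a Lookup named CountSourceRecords with firstRowOnly set to false so its output exposes a count property (the Lookup activity caps results at roughly 5,000 rows, which is fine for this sample). A Wait activity stands in for CopyEmployeeData and its downstream steps, which would be nested under ifTrueActivities:

{
  "name": "CountSourceRecords",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "DelimitedTextSource",
      "storeSettings": { "type": "AzureBlobStorageReadSettings", "recursive": false },
      "formatSettings": { "type": "DelimitedTextReadSettings" }
    },
    "dataset": {
      "referenceName": "BlobInputDataset",
      "type": "DatasetReference"
    },
    "firstRowOnly": false
  }
},
{
  "name": "ProceedIfRecordsExist",
  "type": "IfCondition",
  "dependsOn": [
    { "activity": "CountSourceRecords", "dependencyConditions": ["Succeeded"] }
  ],
  "typeProperties": {
    "expression": {
      "value": "@greater(activity('CountSourceRecords').output.count, 0)",
      "type": "Expression"
    },
    "ifTrueActivities": [
      { "name": "ProceedWithLoad", "type": "Wait", "typeProperties": { "waitTimeInSeconds": 1 } }
    ]
  }
}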
- Add Audit Logging:
- Create audit table in SQL Database
- Log pipeline start/end times, record counts, status
- Pipeline Validation: Click Validate All to check for errors
- Debug Mode: Click Debug to test the pipeline
- Monitor Progress: Watch the Output window and activity details
- Data Verification: Query destination table to verify data accuracy
- Pre-Publish Validation:
- Run Validate All to check for any issues
- Test in Debug mode one final time
- Publish: Click Publish All to save changes to the service
- Create Triggers:
Manual Trigger:
- Use Trigger Now for ad-hoc runs
Schedule Trigger:
{ "name": "DailyEmployeeSync", "type": "ScheduleTrigger", "typeProperties": { "recurrence": { "frequency": "Day", "interval": 1, "startTime": "2024-01-01T06:00:00Z", "timeZone": "UTC" } } }Event-Based Trigger (when new file uploaded):
{ "name": "FileUploadTrigger", "type": "BlobEventsTrigger", "typeProperties": { "blobPathBeginsWith": "/data-container/employee", "blobPathEndsWith": ".csv", "ignoreEmptyBlobs": true, "events": ["Microsoft.Storage.BlobCreated"] } }
{
"name": "EmployeeDataProcessing",
"properties": {
"parameters": {
"SourceContainer": {
"type": "string",
"defaultValue": "data-container"
},
"ProcessDate": {
"type": "string",
"defaultValue": "@utcnow('yyyy-MM-dd')"
}
},
"variables": {
"RecordCount": {
"type": "Integer",
"defaultValue": 0
}
},
"activities": [
{
"name": "ValidateSourceFile",
"type": "Validation",
"typeProperties": {
"dataset": {
"referenceName": "BlobInputDataset",
"type": "DatasetReference"
},
"timeout": "0.00:05:00",
"sleep": 30,
"minimumSize": 1
}
},
{
"name": "CopyEmployeeData",
"type": "Copy",
"dependsOn": [
{
"activity": "ValidateSourceFile",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": false,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings",
"skipLineCount": 0
}
},
"sink": {
"type": "AzureSqlSink",
"preCopyScript": "TRUNCATE TABLE dbo.Employee_Staging",
"writeBehavior": "upsert",
"upsertSettings": {
"useTempDB": true,
"keys": ["EmployeeID"]
},
"sqlWriterUseTableLock": false,
"tableOption": "autoCreate"
},
"enableStaging": false,
"dataIntegrationUnits": 4
},
"inputs": [
{
"referenceName": "BlobInputDataset",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "SQLSinkDataset",
"type": "DatasetReference"
}
]
},
{
"name": "LogPipelineExecution",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "CopyEmployeeData",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"storedProcedureName": "sp_LogPipelineExecution",
"storedProcedureParameters": {
"PipelineName": {
"value": "@pipeline().Pipeline",
"type": "String"
},
"RunId": {
"value": "@pipeline().RunId",
"type": "String"
},
"RecordsProcessed": {
"value": "@activity('CopyEmployeeData').output.rowsCopied",
"type": "Int32"
},
"Status": {
"value": "Success",
"type": "String"
}
}
},
"linkedServiceName": {
"referenceName": "AzureSQLDatabaseLS",
"type": "LinkedServiceReference"
}
}
]
}
}

{
"name": "BlobInputDataset",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorageLS",
"type": "LinkedServiceReference"
},
"parameters": {
"Container": {
"type": "string",
"defaultValue": "data-container"
},
"FileName": {
"type": "string",
"defaultValue": "sample_data.csv"
}
},
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"container": {
"value": "@dataset().Container",
"type": "Expression"
},
"fileName": {
"value": "@dataset().FileName",
"type": "Expression"
}
},
"columnDelimiter": ",",
"rowDelimiter": "\n",
"firstRowAsHeader": true,
"quoteChar": "\"",
"escapeChar": "\\"
},
"schema": [
{
"name": "EmployeeID",
"type": "String"
},
{
"name": "Name",
"type": "String"
},
{
"name": "Department",
"type": "String"
},
{
"name": "Salary",
"type": "String"
},
{
"name": "HireDate",
"type": "String"
},
{
"name": "IsActive",
"type": "String"
},
{
"name": "ManagerID",
"type": "String"
}
]
}
}

{
"name": "SQLSinkDataset",
"properties": {
"linkedServiceName": {
"referenceName": "AzureSQLDatabaseLS",
"type": "LinkedServiceReference"
},
"parameters": {
"TableName": {
"type": "string",
"defaultValue": "Employee"
},
"SchemaName": {
"type": "string",
"defaultValue": "dbo"
}
},
"type": "AzureSqlTable",
"typeProperties": {
"schema": {
"value": "@dataset().SchemaName",
"type": "Expression"
},
"table": {
"value": "@dataset().TableName",
"type": "Expression"
}
},
"schema": [
{
"name": "EmployeeID",
"type": "int",
"precision": 10
},
{
"name": "Name",
"type": "nvarchar",
"precision": 100
},
{
"name": "Department",
"type": "nvarchar",
"precision": 50
},
{
"name": "Salary",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "HireDate",
"type": "date"
},
{
"name": "IsActive",
"type": "bit"
},
{
"name": "ManagerID",
"type": "int",
"precision": 10
},
{
"name": "CreatedDate",
"type": "datetime2",
"scale": 7
},
{
"name": "ModifiedDate",
"type": "datetime2",
"scale": 7
}
]
}
}
- Azure Monitor Integration:
- Enable diagnostic settings for Data Factory
- Configure Log Analytics workspace
- Set up custom dashboards
- Alert Rules:
  {
    "alertName": "Pipeline Failure Alert",
    "condition": "PipelineRuns | where Status == 'Failed'",
    "threshold": "Greater than 0",
    "actionGroup": "DataFactoryAlerts"
  }
- Performance Monitoring:
- Track pipeline duration trends
- Monitor data throughput
- Set up cost alerts
- Common Issues:
- Authentication failures: Check managed identity permissions
- Data type mismatches: Verify schema mapping
- Connection timeouts: Check network configuration
- Performance issues: Optimize data integration units (DIUs)
- Diagnostic Queries:
// Failed pipeline runs in the last 24 hours (resource-specific diagnostic table)
ADFPipelineRun
| where TimeGenerated > ago(24h)
| where Status == "Failed"
| project TimeGenerated, PipelineName, RunId, ErrorMessage
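A companion activity-level query, assuming the same resource-specific destination tables (open the failed rows to inspect the full error payload):

// Failed activities in the last 24 hours
ADFActivityRun
| where TimeGenerated > ago(24h)
| where Status == "Failed"
| project TimeGenerated, PipelineName, ActivityName, ActivityType, Status
| order by TimeGenerated desc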
- Aggregate: Calculate department-wise salary summaries
- Conditional Split: Separate active/inactive employees
- Derived Column: Add calculated fields (tenure, salary grade)
- Filter: Remove test data or apply business rules
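A rough Mapping Data Flow script sketch combining the filter, derived column, and aggregate steps above (stream names and expressions are illustrative, not the exact script ADF generates; the conditional split and sink are omitted):

source(output(
    EmployeeID as string,
    Name as string,
    Department as string,
    Salary as string,
    HireDate as string,
    IsActive as string,
    ManagerID as string
  ),
  allowSchemaDrift: true) ~> BlobSource
BlobSource filter(toBoolean(IsActive)) ~> ActiveEmployees
ActiveEmployees derive(TenureYears = year(currentDate()) - year(toDate(HireDate)),
    SalaryGrade = iif(toDecimal(Salary) >= 60000, 'Senior', 'Standard')) ~> AddCalculatedFields
AddCalculatedFields aggregate(groupBy(Department),
    AvgSalary = avg(toDecimal(Salary)),
    EmployeeCount = count()) ~> DepartmentSummary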
- Use Azure DevOps or GitHub Actions for pipeline deployment
- Implement environment-specific configurations (Dev/Test/Prod)
- Version control ADF artifacts using Git integration
- Environment-specific parameters
- Dynamic file paths with date patterns
- Configurable retry policies
- External configuration management
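For the dynamic file paths mentioned above, the Copy activity can pass expressions to the parameters already defined on BlobInputDataset; the employee_ file-name prefix here is an assumption:

"inputs": [
  {
    "referenceName": "BlobInputDataset",
    "type": "DatasetReference",
    "parameters": {
      "Container": "@pipeline().parameters.SourceContainer",
      "FileName": "@concat('employee_', formatDateTime(utcnow(), 'yyyyMMdd'), '.csv')"
    }
  }
]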
- Private endpoints for all data sources
- Customer-managed keys for encryption
- Network isolation with VNet integration
- Regular access reviews and least privilege principles
- Right-size integration runtime
- Use Auto-pause for Spark clusters
- Implement data lifecycle policies
- Monitor and optimize DIU usage
CREATE TABLE dbo.PipelineExecutionLog (
LogID BIGINT IDENTITY(1,1) PRIMARY KEY,
PipelineName NVARCHAR(100) NOT NULL,
RunId NVARCHAR(50) NOT NULL,
StartTime DATETIME2 DEFAULT GETUTCDATE(),
EndTime DATETIME2 NULL,
Status NVARCHAR(20) NOT NULL,
RecordsProcessed INT NULL,
ErrorMessage NVARCHAR(MAX) NULL,
CreatedDate DATETIME2 DEFAULT GETUTCDATE()
);
GO
-- Stored procedure for logging
CREATE PROCEDURE sp_LogPipelineExecution
@PipelineName NVARCHAR(100),
@RunId NVARCHAR(50),
@RecordsProcessed INT = NULL,
@Status NVARCHAR(20) = 'Success',
@ErrorMessage NVARCHAR(MAX) = NULL
AS
BEGIN
INSERT INTO dbo.PipelineExecutionLog
(PipelineName, RunId, Status, RecordsProcessed, ErrorMessage, EndTime)
VALUES
(@PipelineName, @RunId, @Status, @RecordsProcessed, @ErrorMessage, GETUTCDATE());
END;
GO

-- View for data quality metrics
CREATE VIEW vw_EmployeeDataQuality AS
SELECT
Department,
COUNT(*) as TotalEmployees,
COUNT(CASE WHEN IsActive = 1 THEN 1 END) as ActiveEmployees,
AVG(Salary) as AvgSalary,
MIN(HireDate) as EarliestHireDate,
MAX(HireDate) as LatestHireDate
FROM dbo.Employee
GROUP BY Department;
- Data Integration Units (DIUs):
- Start with 4 DIUs for small datasets
- Scale up to 32+ DIUs for large datasets (>1GB)
- Monitor performance and adjust accordingly
- Parallel Processing:
- Use degree of copy parallelism for large tables
- Implement data partitioning strategies
- Consider using multiple copy activities for very large datasets
- Staging and Compression:
- Enable staging for better performance with large datasets
- Use appropriate compression (gzip, snappy) for blob storage
- Consider Parquet format for analytical workloads
- Incremental Loading:
- Implement change data capture (CDC) patterns
- Use watermark columns for incremental sync
- Schedule regular full refreshes for data integrity
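A minimal watermark sketch, assuming the source gains a ModifiedDate column, a Lookup activity reads the current watermark before the copy, and a stored procedure advances it after a successful run (all object names here are assumptions):

CREATE TABLE dbo.WatermarkTable (
    TableName NVARCHAR(100) PRIMARY KEY,
    WatermarkValue DATETIME2 NOT NULL
);

-- Seed the watermark once
INSERT INTO dbo.WatermarkTable (TableName, WatermarkValue)
VALUES ('Employee', '1900-01-01');
GO

-- Called by the pipeline after a successful incremental load
CREATE PROCEDURE sp_UpdateWatermark
    @TableName NVARCHAR(100),
    @NewWatermarkValue DATETIME2
AS
BEGIN
    UPDATE dbo.WatermarkTable
    SET WatermarkValue = @NewWatermarkValue
    WHERE TableName = @TableName;
END;

The Copy activity source query then filters on ModifiedDate greater than the looked-up watermark value.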
This comprehensive project demonstrates creating a production-ready Azure Data Factory pipeline with security, monitoring, error handling, and performance optimization. The solution follows Azure best practices and includes all necessary components for enterprise-grade data integration.