Optimize ingestion pipeline files handling (microsoft#1017)
flanakin authored Sep 30, 2024
1 parent d4549e8 commit 21b6612
Showing 1 changed file with 183 additions and 183 deletions.
366 changes: 183 additions & 183 deletions src/templates/finops-hub/modules/dataFactory.bicep
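The change centers on the 'Convert to Parquet' Switch activity, which the commit moves so it runs before the old-file cleanup instead of after it (compare the added block in the first hunk with the removed block in the last one). The Switch routes each exported blob by file extension using the 'on' expression visible in the diff; the Python sketch below mirrors that routing logic for reference only. The function name and sample paths are illustrative and not part of the template.

def classify_export_blob(blob_path):
    # Mirrors the Switch activity's 'on' expression from the diff:
    #   @last(array(split(replace(pipeline().parameters.blobPath, '.csv.gz', '.csv_gz'), '.')))
    # '.csv.gz' is first collapsed to '.csv_gz' so a gzipped CSV is not classified as 'gz'.
    normalized = blob_path.replace('.csv.gz', '.csv_gz')
    return normalized.split('.')[-1]

# Routing implied by the Switch cases shown in the diff (sample paths are made up):
print(classify_export_blob('exports/daily/part_0.csv'))      # 'csv'     -> Convert CSV File
print(classify_export_blob('exports/daily/part_0.parquet'))  # 'parquet' -> Move Parquet File
print(classify_export_blob('exports/daily/part_0.csv.gz'))   # 'csv_gz'  -> defaultActivities (Unsupported File Type)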
@@ -2394,6 +2394,187 @@ resource pipeline_ToIngestion 'Microsoft.DataFactory/factories/pipelines@2018-06
errorCode: 'SchemaLoadFailed'
}
}
{ // Convert to Parquet
name: 'Convert to Parquet'
description: 'Convert CSV to parquet and move the file to the ${ingestionContainerName} container.'
type: 'Switch'
dependsOn: [
{
activity: 'Load Schema Mappings'
dependencyConditions: [
'Succeeded'
]
}
]
userProperties: []
typeProperties: {
on: {
value: '@last(array(split(replace(pipeline().parameters.blobPath, \'.csv.gz\', \'.csv_gz\'), \'.\')))'
type: 'Expression'
}
cases: [
{
value: 'csv'
activities: [
{
name: 'Convert CSV File'
type: 'Copy'
dependsOn: []
policy: {
timeout: '0.00:10:00'
retry: 0
retryIntervalInSeconds: 30
secureOutput: false
secureInput: false
}
userProperties: []
typeProperties: {
source: {
type: 'DelimitedTextSource'
additionalColumns: {
type: 'Expression'
value: '@activity(\'Load Schema Mappings\').output.firstRow.additionalColumns'
}
storeSettings: {
type: 'AzureBlobFSReadSettings'
recursive: true
enablePartitionDiscovery: false
}
formatSettings: {
type: 'DelimitedTextReadSettings'
}
}
sink: {
type: 'ParquetSink'
storeSettings: {
type: 'AzureBlobFSWriteSettings'
}
formatSettings: {
type: 'ParquetWriteSettings'
fileExtension: '.parquet'
}
}
enableStaging: false
parallelCopies: 1
validateDataConsistency: false
translator: {
value: '@activity(\'Load Schema Mappings\').output.firstRow.translator'
type: 'Expression'
}
}
inputs: [
{
referenceName: dataset_msexports.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@pipeline().parameters.blobPath'
type: 'Expression'
}
}
}
]
outputs: [
{
referenceName: dataset_ingestion.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@concat(pipeline().parameters.destinationFolder, \'/\', pipeline().parameters.destinationFile)'
type: 'Expression'
}
}
}
]
}
]
}
{
value: 'parquet'
activities: [
{
name: 'Move Parquet File'
type: 'Copy'
dependsOn: []
policy: {
timeout: '0.00:05:00'
retry: 0
retryIntervalInSeconds: 30
secureOutput: false
secureInput: false
}
userProperties: []
typeProperties: {
source: {
type: 'ParquetSource'
storeSettings: {
type: 'AzureBlobFSReadSettings'
recursive: true
enablePartitionDiscovery: false
}
formatSettings: {
type: 'ParquetReadSettings'
}
}
sink: {
type: 'ParquetSink'
storeSettings: {
type: 'AzureBlobFSWriteSettings'
}
formatSettings: {
type: 'ParquetWriteSettings'
fileExtension: '.parquet'
}
}
enableStaging: false
parallelCopies: 1
validateDataConsistency: false
}
inputs: [
{
referenceName: dataset_msexports_parquet.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@pipeline().parameters.blobPath'
type: 'Expression'
}
}
}
]
outputs: [
{
referenceName: dataset_ingestion.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@concat(pipeline().parameters.destinationFolder, \'/\', pipeline().parameters.destinationFile)'
type: 'Expression'
}
}
}
]
}
]
}
]
defaultActivities: [
{
name: 'Unsupported File Type'
type: 'Fail'
dependsOn: []
userProperties: []
typeProperties: {
message: {
value: '@concat(\'Unable to ingest the specified export file because the file type is not supported. File: \', pipeline().parameters.blobPath)'
type: 'Expression'
}
errorCode: 'UnsupportedExportFileType'
}
}
]
}
}
{ // Get Existing Parquet Files
name: 'Get Existing Parquet Files'
description: 'Get the previously ingested files so we can remove any older data. This is necessary to avoid data duplication in reports.'
@@ -2457,7 +2638,7 @@ resource pipeline_ToIngestion 'Microsoft.DataFactory/factories/pipelines@2018-06
type: 'ForEach'
dependsOn: [
{
activity: 'Load Schema Mappings'
activity: 'Convert to Parquet'
dependencyConditions: [
'Succeeded'
]
@@ -2495,7 +2676,7 @@ resource pipeline_ToIngestion 'Microsoft.DataFactory/factories/pipelines@2018-06
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@item()'
value: '@concat(pipeline().parameters.destinationFolder, \'/\', item().name)'
type: 'Expression'
}
}
@@ -2606,187 +2787,6 @@ resource pipeline_ToIngestion 'Microsoft.DataFactory/factories/pipelines@2018-06
]
}
}
{ // Convert to Parquet
name: 'Convert to Parquet'
description: 'Convert CSV to parquet and move the file to the ${ingestionContainerName} container.'
type: 'Switch'
dependsOn: [
{
activity: 'For Each Old File'
dependencyConditions: [
'Succeeded'
]
}
]
userProperties: []
typeProperties: {
on: {
value: '@last(array(split(replace(pipeline().parameters.blobPath, \'.csv.gz\', \'.csv_gz\'), \'.\')))'
type: 'Expression'
}
cases: [
{
value: 'csv'
activities: [
{
name: 'Convert CSV File'
type: 'Copy'
dependsOn: []
policy: {
timeout: '0.00:05:00'
retry: 0
retryIntervalInSeconds: 30
secureOutput: false
secureInput: false
}
userProperties: []
typeProperties: {
source: {
type: 'DelimitedTextSource'
additionalColumns: {
type: 'Expression'
value: '@activity(\'Load Schema Mappings\').output.firstRow.additionalColumns'
}
storeSettings: {
type: 'AzureBlobFSReadSettings'
recursive: true
enablePartitionDiscovery: false
}
formatSettings: {
type: 'DelimitedTextReadSettings'
}
}
sink: {
type: 'ParquetSink'
storeSettings: {
type: 'AzureBlobFSWriteSettings'
}
formatSettings: {
type: 'ParquetWriteSettings'
fileExtension: '.parquet'
}
}
enableStaging: false
parallelCopies: 1
validateDataConsistency: false
translator: {
value: '@activity(\'Load Schema Mappings\').output.firstRow.translator'
type: 'Expression'
}
}
inputs: [
{
referenceName: dataset_msexports.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@pipeline().parameters.blobPath'
type: 'Expression'
}
}
}
]
outputs: [
{
referenceName: dataset_ingestion.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@concat(pipeline().parameters.destinationFolder, \'/\', pipeline().parameters.destinationFile)'
type: 'Expression'
}
}
}
]
}
]
}
{
value: 'parquet'
activities: [
{
name: 'Move Parquet File'
type: 'Copy'
dependsOn: []
policy: {
timeout: '0.00:05:00'
retry: 0
retryIntervalInSeconds: 30
secureOutput: false
secureInput: false
}
userProperties: []
typeProperties: {
source: {
type: 'ParquetSource'
storeSettings: {
type: 'AzureBlobFSReadSettings'
recursive: true
enablePartitionDiscovery: false
}
formatSettings: {
type: 'ParquetReadSettings'
}
}
sink: {
type: 'ParquetSink'
storeSettings: {
type: 'AzureBlobFSWriteSettings'
}
formatSettings: {
type: 'ParquetWriteSettings'
fileExtension: '.parquet'
}
}
enableStaging: false
parallelCopies: 1
validateDataConsistency: false
}
inputs: [
{
referenceName: dataset_msexports_parquet.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@pipeline().parameters.blobPath'
type: 'Expression'
}
}
}
]
outputs: [
{
referenceName: dataset_ingestion.name
type: 'DatasetReference'
parameters: {
blobPath: {
value: '@concat(pipeline().parameters.destinationFolder, \'/\', pipeline().parameters.destinationFile)'
type: 'Expression'
}
}
}
]
}
]
}
]
defaultActivities: [
{
name: 'Unsupported File Type'
type: 'Fail'
dependsOn: []
userProperties: []
typeProperties: {
message: {
value: '@concat(\'Unable to ingest the specified export file because the file type is not supported. File: \', pipeline().parameters.blobPath)'
type: 'Expression'
}
errorCode: 'UnsupportedExportFileType'
}
}
]
}
}
]
parameters: {
blobPath: {

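A second detail worth calling out from the middle hunks: 'For Each Old File' now depends on 'Convert to Parquet' rather than 'Load Schema Mappings', and the blob path passed to the per-file dataset changes from the raw '@item()' to a path built from the destination folder and the item's name. A rough Python sketch of the new path construction follows, assuming the loop items are Get Metadata-style child items; the sample folder and file values are illustrative, not taken from the template.

def old_file_blob_path(destination_folder, item):
    # New expression from the diff:
    #   @concat(pipeline().parameters.destinationFolder, '/', item().name)
    # The previous expression passed '@item()' through unchanged.
    return destination_folder + '/' + item['name']

# Example (hypothetical folder and child item; actual values come from the pipeline run):
print(old_file_blob_path('focuscost/2024/09', {'name': 'part_0000.parquet', 'type': 'File'}))
# focuscost/2024/09/part_0000.parquet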