Skip to content

Commit

Permalink
feat: add AzureBlobOptions and IFileSaver interface; implement AzureS…
Browse files Browse the repository at this point in the history
…torageAccount provider and file handling
  • Loading branch information
Stephane Royer committed Dec 19, 2024
1 parent 91b9131 commit bce15fc
Show file tree
Hide file tree
Showing 17 changed files with 432 additions and 12 deletions.
8 changes: 4 additions & 4 deletions documentation/docs/recipes/2_normalize.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ The file would look like the following:

```csv title="post.csv"
title,author,email,timestamp,category,link,post
FundProcess features,Stéphane Royer,stephane.royer@fundprocess.lu,20210109113005,Category 2,https://www.fundprocess.lu/features/,
coucou features,Stéphane Royer,stephane.royer@coucou.lu,20210109113005,Category 2,https://www.coucou.lu/features/,
ETL.NET revealed,Paillave,[email protected],20210508181126,Category 2,,"This a post, about ETL.NET"
ETL.NET page,Paillave,[email protected],20210504164510,Category 1,https://paillave.github.io/Etl.Net/,
FundProcess presentation,Stéphane Royer,stephane.royer@fundprocess.lu,20210203124051,Category 2,,"This a ""post"", about FundProcess"
FundProcess website,Stéphane Royer,stephane.royer@fundprocess.lu,20210106103005,Category 1,http://www.fundprocess.lu,
coucou presentation,Stéphane Royer,stephane.royer@coucou.lu,20210203124051,Category 2,,"This a ""post"", about coucou"
coucou website,Stéphane Royer,stephane.royer@coucou.lu,20210106103005,Category 1,http://www.coucou.lu,
ETL.NET nuget,Paillave,[email protected],20200504164510,Category 1,http://www.nuget.org/packages/Etl.Net,
ETL.NET information,Paillave,[email protected],20200518071024,Category 3,,"This ""another post"" about ETL.NET"
FundProcess information,Stéphane Royer,stephane.royer@fundprocess.lu,20210819124550,Category 1,,This another post about FundProcess
coucou information,Stéphane Royer,stephane.royer@coucou.lu,20210819124550,Category 1,,This another post about coucou
```

The normalized structure where this file must be imported is this one:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Collections;
using Microsoft.Extensions.FileProviders;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureBlobDirectoryContents : IDirectoryContents, IEnumerable<AzureBlobFileInfo>
{
private readonly List<AzureBlobFileInfo> _blobs;
public bool Exists { get; }
internal AzureBlobDirectoryContents(List<AzureBlobFileInfo> blobs)
=> (_blobs, Exists) = (blobs, true);
public IEnumerator<IFileInfo> GetEnumerator()
=> _blobs.ToList().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator()
=> GetEnumerator();

IEnumerator<AzureBlobFileInfo> IEnumerable<AzureBlobFileInfo>.GetEnumerator()
=> _blobs.ToList().GetEnumerator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.FileProviders;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureBlobFileInfo : IFileInfo
{
private readonly BlobClient? _blobClient = null;
private readonly BlobContainerClient? _blobContainerClient = null;
internal AzureBlobFileInfo(BlobClient blobClient)
{
_blobClient = blobClient;

var properties = blobClient.GetProperties()?.Value ?? throw new InvalidOperationException("Cannot get blob properties.");
Name = blobClient.Name.Split('/').Last();
Length = properties.ContentLength;
LastModified = properties.LastModified;
PhysicalPath = blobClient.Name;
}
internal AzureBlobFileInfo(BlobContainerClient blobContainerClient, BlobHierarchyItem blobHierarchyItem)
{
_blobContainerClient = blobContainerClient;
if (blobHierarchyItem.IsPrefix)
{
IsDirectory = true;
Name = blobHierarchyItem.Prefix.TrimEnd('/').Split('/').Last();
PhysicalPath = blobHierarchyItem.Prefix;
}
else
{
_blobClient = blobContainerClient.GetBlobClient(blobHierarchyItem.Blob.Name);
Name = blobHierarchyItem.Blob.Name.Split('/').Last();
Length = blobHierarchyItem.Blob.Properties.ContentLength ?? 0;
LastModified = blobHierarchyItem.Blob.Properties.LastModified ?? DateTimeOffset.MinValue;
PhysicalPath = blobHierarchyItem.Blob.Name;
}
}
public async Task DeleteAsync(CancellationToken cancellationToken = default)
{
if (_blobClient == null)
throw new InvalidOperationException("Cannot delete a directory.");
await _blobClient.DeleteAsync(cancellationToken: cancellationToken);
}
public async Task SaveStreamAsync(Stream stream, CancellationToken cancellationToken = default)
{
if (_blobClient == null)
throw new InvalidOperationException("Cannot save a stream for a directory.");
await _blobClient.UploadAsync(stream, cancellationToken);
var properties = _blobClient.GetProperties()?.Value ?? throw new InvalidOperationException("Cannot get blob properties.");
Length = properties.ContentLength;
LastModified = properties.LastModified;
}
public Task<AzureBlobFileInfo> SaveStream(string name, Stream stream, CancellationToken cancellationToken = default)
{
if (_blobContainerClient == null)
throw new InvalidOperationException("Cannot save a stream for a file.");
return _blobContainerClient.SaveFileAsync($"{PhysicalPath}{name}", stream, true, null, cancellationToken);
}
public Stream CreateReadStream() => _blobClient != null
? _blobClient.OpenRead()
: throw new InvalidOperationException("Cannot open a stream for a directory!");

public bool Exists => true;
public long Length { get; private set; }
public string? PhysicalPath { get; }
public string Name { get; }
public DateTimeOffset LastModified { get; private set; }
public bool IsDirectory { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Azure.Storage.Blobs;
using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Primitives;

namespace Paillave.Etl.AzureStorageAccountFileProvider;
public class AzureBlobFileProvider : IFileProvider, IFileSaver
{
private readonly BlobContainerClient _blobContainerClient;

public AzureBlobFileProvider(AzureBlobOptions azureBlobOptions)
=> _blobContainerClient = azureBlobOptions.GetBlobContainerClient();

public IDirectoryContents GetDirectoryContents(string subpath)
=> _blobContainerClient.GetDirectoryContents(subpath);

public IFileInfo GetFileInfo(string subpath)
=> _blobContainerClient.GetFileInfo(subpath);
public async Task<IFileInfo> SaveFileAsync(string subpath, Stream stream, CancellationToken cancellationToken = default)
=> await _blobContainerClient.SaveFileAsync(subpath, stream, true, null, cancellationToken);
public Task DeleteFileAsync(string subpath, CancellationToken cancellationToken = default)
=> _blobContainerClient.GetFileInfo(subpath).DeleteAsync(cancellationToken);

private class AzureBlobFileProviderChangeToken : IChangeToken
{
private readonly BlobContainerClient _blobContainerClient;
private readonly string _filter;
private readonly List<Action<object>> _callbacks = new();

public AzureBlobFileProviderChangeToken(string filter, BlobContainerClient blobContainerClient)
=> (_filter, _blobContainerClient) = (filter, blobContainerClient);

public bool ActiveChangeCallbacks => false;
public bool HasChanged => false;
public IDisposable RegisterChangeCallback(Action<object?> callback, object? state)
{
_callbacks.Add(callback);
return new DisposableAction(() => _callbacks.Remove(callback));
}
}
public IChangeToken Watch(string filter) => new AzureBlobFileProviderChangeToken(filter, _blobContainerClient);


private class DisposableAction : IDisposable
{
private readonly Action _action;
public DisposableAction(Action action) => _action = action;
public void Dispose() => _action();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureBlobOptions
{
public Uri? BaseUri { get; set; }
public bool? DefaultAzureCredential { get; set; }
public required string DocumentContainer { get; set; }
public string? ConnectionString { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Paillave.Etl.Core;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureStorageAccountProviderProcessorAdapter : ProviderProcessorAdapterBase<AzureBlobOptions,
AzureStorageAccountAdapterProviderParameters,
AzureStorageAccountAdapterProcessorParameters>
{
public override string Description => "Get file and create document in azure storage account";
public override string Name => "AzureStorageAccount";

protected override IFileValueProvider CreateProvider(string code, string name, string connectionName,
AzureBlobOptions connectionParameters,
AzureStorageAccountAdapterProviderParameters inputParameters)
=> new AzureStorageAccountFileValueProvider(code, name, connectionName, connectionParameters, inputParameters);

protected override IFileValueProcessor CreateProcessor(string code, string name, string connectionName,
AzureBlobOptions connectionParameters,
AzureStorageAccountAdapterProcessorParameters outputParameters)
=> new AzureStorageAccountFileValueProcessor(code, name, connectionName, connectionParameters, outputParameters);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Paillave.Etl.Core;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureStorageAccountFileValue : FileValueBase<AzureStorageAccountFileValueMetadata>
{
private readonly AzureBlobOptions _azureBlobOptions;
private readonly AzureBlobFileInfo _fileInfo;
public AzureStorageAccountFileValue(AzureBlobFileInfo fileInfo, string connectorCode, string connectorName, string connectionName, AzureBlobOptions azureBlobOptions)
: base(new AzureStorageAccountFileValueMetadata
{
BaseUri = azureBlobOptions.BaseUri,
Name = fileInfo.Name,
Folder = fileInfo.PhysicalPath,
DocumentContainer = azureBlobOptions.DocumentContainer,
ConnectionName = connectionName,
ConnectorCode = connectorCode,
ConnectorName = connectorName,
})
{
_fileInfo = fileInfo;
_azureBlobOptions = azureBlobOptions;
}

public override string Name => _fileInfo.Name;

public override Stream GetContent()
=> _fileInfo.CreateReadStream();

public override StreamWithResource OpenContent()
=> new(GetContent());

protected override void DeleteFile()
=> _fileInfo.DeleteAsync().Wait();
}
public class AzureStorageAccountFileValueMetadata : FileValueMetadataBase
{
public Uri? BaseUri { get; set; }
public string? Folder { get; set; }
public required string Name { get; set; }
public string? DocumentContainer { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using Paillave.Etl.Core;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureStorageAccountAdapterProcessorParameters
{
public string? SubFolder { get; set; }
public bool? OverwriteIfAlreadyExists { get; set; } = false;
}

public class AzureStorageAccountFileValueProcessor : FileValueProcessorBase<AzureBlobOptions, AzureStorageAccountAdapterProcessorParameters>
{
public AzureStorageAccountFileValueProcessor(string code, string name, string connectionName, AzureBlobOptions connectionParameters, AzureStorageAccountAdapterProcessorParameters processorParameters)
: base(code, name, connectionName, connectionParameters, processorParameters) { }

public override ProcessImpact PerformanceImpact => ProcessImpact.Average;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
protected override void Process(IFileValue fileValue, AzureBlobOptions connectionParameters, AzureStorageAccountAdapterProcessorParameters processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
{
IDictionary<string, string>? metadata = ExtractMetadataRecursively(fileValue.Metadata);
var blobContainerClient = connectionParameters.GetBlobContainerClient();
var subpath = string.IsNullOrWhiteSpace(processorParameters.SubFolder)
? fileValue.Name
: $"{processorParameters.SubFolder.TrimEnd('/')}/{fileValue.Name}";
blobContainerClient.SaveFileAsync(
subpath,
fileValue.GetContent(),
processorParameters.OverwriteIfAlreadyExists ?? false,
metadata,
cancellationToken).Wait();
push(fileValue);
}

private IDictionary<string, string>? ExtractMetadataRecursively(IFileValueMetadata metadata)
{
if (metadata == null)
return null;
var result = new Dictionary<string, string>();
if (metadata.Properties != null)
{
foreach (var property in metadata.Properties.GetType().GetProperties())
{
var value = property.GetValue(metadata.Properties);
if (value != null)
{
var stringValue = value.ToString();
if (!string.IsNullOrWhiteSpace(stringValue))
result[property.Name] = stringValue;
}
}
}
return result;
}

protected override void Test(AzureBlobOptions connectionParameters, AzureStorageAccountAdapterProcessorParameters processorParameters)
{
var blobContainerClient = connectionParameters.GetBlobContainerClient();
var fileValueName = Guid.NewGuid().ToString();
var subPath = string.IsNullOrWhiteSpace(processorParameters.SubFolder) ? fileValueName : $"{processorParameters.SubFolder.TrimEnd('/')}/{fileValueName}";
var ms = new MemoryStream();
blobContainerClient.SaveFileAsync(subPath, ms).Wait();
blobContainerClient.GetBlobClient(subPath).Delete();
}

private byte[] ToByteArray(Stream stream)
{
using var memoryStream = new MemoryStream();
stream.Seek(0, SeekOrigin.Begin);
stream.CopyTo(memoryStream);
return memoryStream.ToArray();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Microsoft.Extensions.FileSystemGlobbing;
using Paillave.Etl.Core;

namespace Paillave.Etl.AzureStorageAccountFileProvider;

public class AzureStorageAccountAdapterProviderParameters
{
public string? SubFolder { get; set; }
public string? FileNamePattern { get; set; }
}

public class AzureStorageAccountFileValueProvider : FileValueProviderBase<AzureBlobOptions, AzureStorageAccountAdapterProviderParameters>
{
public AzureStorageAccountFileValueProvider(string code, string name, string connectionName,
AzureBlobOptions connectionParameters,
AzureStorageAccountAdapterProviderParameters inputParameters)
: base(code, name, connectionName, connectionParameters, inputParameters) { }

public override ProcessImpact PerformanceImpact => ProcessImpact.Average;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
protected override void Provide(Action<IFileValue> pushFileValue, AzureBlobOptions connectionParameters,
AzureStorageAccountAdapterProviderParameters providerParameters, CancellationToken cancellationToken,
IExecutionContext context)
{
var searchPattern = string.IsNullOrEmpty(providerParameters.FileNamePattern) ? "*" : providerParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);
var blobContainerClient = connectionParameters.GetBlobContainerClient();
foreach (var blobHierarchyItem in blobContainerClient.GetDirectoryContents(providerParameters.SubFolder ?? string.Empty, cancellationToken).Cast<AzureBlobFileInfo>())
{
if (cancellationToken.IsCancellationRequested) break;
if (!blobHierarchyItem.IsDirectory)
{
if (matcher.Match(blobHierarchyItem.Name).HasMatches)
pushFileValue(new AzureStorageAccountFileValue(blobHierarchyItem, this.Code, this.Name, this.ConnectionName, connectionParameters));
}
}
}

protected override void Test(AzureBlobOptions connectionParameters, AzureStorageAccountAdapterProviderParameters inputParameters)
{
connectionParameters.GetBlobContainerClient().GetDirectoryContents(inputParameters.SubFolder ?? string.Empty);
}
}
Loading

0 comments on commit bce15fc

Please sign in to comment.