Skip to content

Commit

Permalink
Improved storage logging, normalized storage implementation with othe…
Browse files Browse the repository at this point in the history
…rs for better argument validation and other fixes.
  • Loading branch information
niemyjski committed Mar 29, 2023
1 parent 4d91795 commit 027b3d3
Showing 1 changed file with 154 additions and 57 deletions.
211 changes: 154 additions & 57 deletions src/Foundatio.Redis/Storage/RedisFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
namespace Foundatio.Storage {
public class RedisFileStorage : IFileStorage {
private readonly RedisFileStorageOptions _options;
private readonly ISerializer _serializer;
private readonly ILogger _logger;
private readonly string _fileSpecContainer;

public RedisFileStorage(RedisFileStorageOptions options) {
if (options.ConnectionMultiplexer == null)
throw new ArgumentException("ConnectionMultiplexer is required.");
options.Serializer = options.Serializer ?? DefaultSerializer.Instance;
_logger = options.LoggerFactory?.CreateLogger(typeof(RedisFileStorage)) ?? NullLogger.Instance;

_serializer = options.Serializer ?? DefaultSerializer.Instance;
_logger = options.LoggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;

options.ConnectionMultiplexer.ConnectionRestored += ConnectionMultiplexerOnConnectionRestored;
_fileSpecContainer = $"{options.ContainerName}-filespecs";
_options = options;
Expand All @@ -31,7 +34,7 @@ public RedisFileStorage(RedisFileStorageOptions options) {
public RedisFileStorage(Builder<RedisFileStorageOptionsBuilder, RedisFileStorageOptions> config)
: this(config(new RedisFileStorageOptionsBuilder()).Build()) { }

public ISerializer Serializer => _options.Serializer;
ISerializer IHaveSerializer.Serializer => _serializer;

private IDatabase Database => _options.ConnectionMultiplexer.GetDatabase();

Expand All @@ -40,77 +43,137 @@ public void Dispose() {
}

public async Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
var fileContent = await Run.WithRetriesAsync(() => Database.HashGetAsync(_options.ContainerName, NormalizePath(path)),
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file stream for {Path}", normalizedPath);

var fileContent = await Run.WithRetriesAsync(() => Database.HashGetAsync(_options.ContainerName, normalizedPath),
cancellationToken: cancellationToken, logger: _logger).AnyContext();
if (fileContent.IsNull) return null;

if (fileContent.IsNull) {
_logger.LogError("Unable to get file stream for {Path}: File Not Found", normalizedPath);
return null;
}

return new MemoryStream(fileContent);
}

public async Task<FileSpec> GetFileInfoAsync(string path) {
var fileSpec = await Run.WithRetriesAsync(() => Database.HashGetAsync(_fileSpecContainer, NormalizePath(path)), logger: _logger).AnyContext();
if (!fileSpec.HasValue) return null;
return Serializer.Deserialize<FileSpec>((byte[])fileSpec);
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file info for {Path}", normalizedPath);

var fileSpec = await Run.WithRetriesAsync(() => Database.HashGetAsync(_fileSpecContainer, normalizedPath), logger: _logger).AnyContext();
if (!fileSpec.HasValue) {
_logger.LogError("Unable to get file info for {Path}: File Not Found", normalizedPath);
return null;
}

return _serializer.Deserialize<FileSpec>((byte[])fileSpec);
}

public Task<bool> ExistsAsync(string path) {
return Run.WithRetriesAsync(() => Database.HashExistsAsync(_fileSpecContainer, NormalizePath(path)), logger: _logger);
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Checking if {Path} exists", normalizedPath);

return Run.WithRetriesAsync(() => Database.HashExistsAsync(_fileSpecContainer, normalizedPath), logger: _logger);
}

public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) {
path = NormalizePath(path);
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));
if (stream == null)
throw new ArgumentNullException(nameof(stream));

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Saving {Path}", normalizedPath);

try {
var database = Database;

using var memory = new MemoryStream();
await stream.CopyToAsync(memory, 0x14000, cancellationToken).AnyContext();
var saveFileTask = database.HashSetAsync(_options.ContainerName, path, memory.ToArray());
var saveFileTask = database.HashSetAsync(_options.ContainerName, normalizedPath, memory.ToArray());
long fileSize = memory.Length;
memory.Seek(0, SeekOrigin.Begin);
memory.SetLength(0);
Serializer.Serialize(new FileSpec {
Path = path,

_serializer.Serialize(new FileSpec {
Path = normalizedPath,
Created = DateTime.UtcNow,
Modified = DateTime.UtcNow,
Size = fileSize
}, memory);
var saveSpecTask = database.HashSetAsync(_fileSpecContainer, path, memory.ToArray());
var saveSpecTask = database.HashSetAsync(_fileSpecContainer, normalizedPath, memory.ToArray());
await Run.WithRetriesAsync(() => Task.WhenAll(saveFileTask, saveSpecTask),
cancellationToken: cancellationToken, logger: _logger).AnyContext();
return true;
}
catch (Exception ex) {
_logger.LogError(ex, "Error trying to save file: {Path}", path);
_logger.LogError(ex, "Error saving {Path}: {Message}", normalizedPath, ex.Message);
return false;
}
}

public async Task<bool> RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));
if (String.IsNullOrEmpty(newPath))
throw new ArgumentNullException(nameof(newPath));

string normalizedPath = NormalizePath(path);
string normalizedNewPath = NormalizePath(newPath);
_logger.LogInformation("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath);

try {
var fileStream = await GetFileStreamAsync(path, cancellationToken).AnyContext();
return await DeleteFileAsync(path, cancellationToken).AnyContext() &&
await SaveFileAsync(newPath, fileStream, cancellationToken).AnyContext();
var stream = await GetFileStreamAsync(normalizedPath, cancellationToken).AnyContext();
return await DeleteFileAsync(normalizedPath, cancellationToken).AnyContext() &&
await SaveFileAsync(normalizedNewPath, stream, cancellationToken).AnyContext();
} catch (Exception ex) {
_logger.LogError(ex, "Error trying to rename file {Path} to {NewPath}.", path, newPath);
_logger.LogError(ex, "Error renaming {Path} to {NewPath}: {Message}", normalizedPath, newPath, ex.Message);
return false;
}
}

public async Task<bool> CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));
if (String.IsNullOrEmpty(targetPath))
throw new ArgumentNullException(nameof(targetPath));

string normalizedPath = NormalizePath(path);
string normalizedTargetPath = NormalizePath(targetPath);
_logger.LogInformation("Copying {Path} to {TargetPath}", normalizedPath, normalizedTargetPath);

try {
var file = await GetFileStreamAsync(path, cancellationToken).AnyContext();
if (file == null) return false;
await SaveFileAsync(targetPath, file, cancellationToken).AnyContext();
return true;
using var stream = await GetFileStreamAsync(normalizedPath, cancellationToken).AnyContext();
if (stream == null)
return false;

return await SaveFileAsync(normalizedTargetPath, stream, cancellationToken).AnyContext();
} catch (Exception ex) {
_logger.LogError(ex, "Error trying to copy file {Path} to {TargetPath}.", path, targetPath);
_logger.LogError(ex, "Error copying {Path} to {TargetPath}: {Message}", normalizedPath, normalizedTargetPath, ex.Message);
return false;
}
}

public async Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default) {
path = NormalizePath(path);
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Deleting {Path}", normalizedPath);

var database = Database;
var deleteSpecTask = database.HashDeleteAsync(_fileSpecContainer, path);
var deleteFileTask = database.HashDeleteAsync(_options.ContainerName, path);
var deleteSpecTask = database.HashDeleteAsync(_fileSpecContainer, normalizedPath);
var deleteFileTask = database.HashDeleteAsync(_options.ContainerName, normalizedPath);
await Run.WithRetriesAsync(() => Task.WhenAll(deleteSpecTask, deleteFileTask), cancellationToken: cancellationToken, logger: _logger).AnyContext();
return true;
}
Expand All @@ -119,64 +182,70 @@ public async Task<int> DeleteFilesAsync(string searchPattern = null, Cancellatio
var files = await GetFileListAsync(searchPattern, cancellationToken: cancellationToken).AnyContext();
int count = 0;

_logger.LogInformation("Deleting {FileCount} files matching {SearchPattern}", files, searchPattern);
foreach (var file in files) {
await DeleteFileAsync(file.Path, cancellationToken).AnyContext();
count++;
}

_logger.LogTrace("Finished deleting {FileCount} files matching {SearchPattern}", count, searchPattern);

return count;
}

private Task<IEnumerable<FileSpec>> GetFileListAsync(string searchPattern = null, int? limit = null, int? skip = null, CancellationToken cancellationToken = default) {
if (limit.HasValue && limit.Value <= 0)
return Task.FromResult<IEnumerable<FileSpec>>(new List<FileSpec>());
private Task<List<FileSpec>> GetFileListAsync(string searchPattern = null, int? limit = null, int? skip = null, CancellationToken cancellationToken = default) {
if (limit is <= 0)
return Task.FromResult(new List<FileSpec>());

searchPattern = NormalizePath(searchPattern);
string prefix = searchPattern;
Regex patternRegex = null;
int wildcardPos = searchPattern?.IndexOf('*') ?? -1;
if (searchPattern != null && wildcardPos >= 0) {
patternRegex = new Regex("^" + Regex.Escape(searchPattern).Replace("\\*", ".*?") + "$");
patternRegex = new Regex($"^{Regex.Escape(searchPattern).Replace("\\*", ".*?")}$");
int slashPos = searchPattern.LastIndexOf('/');
prefix = slashPos >= 0 ? searchPattern.Substring(0, slashPos) : String.Empty;
}
prefix = prefix ?? String.Empty;

prefix ??= String.Empty;
int pageSize = limit ?? Int32.MaxValue;
return Task.FromResult(Database.HashScan(_fileSpecContainer, prefix + "*")
.Select(entry => Serializer.Deserialize<FileSpec>((byte[])entry.Value))

_logger.LogTrace(
s => s.Property("SearchPattern", searchPattern).Property("Limit", limit).Property("Skip", skip),
"Getting file list matching {Prefix} and {Pattern}...", prefix, patternRegex
);

return Task.FromResult(Database.HashScan(_fileSpecContainer, $"{prefix}*")
.Select(entry => _serializer.Deserialize<FileSpec>((byte[])entry.Value))
.Where(fileSpec => patternRegex == null || patternRegex.IsMatch(fileSpec.Path))
.Take(pageSize));
.Take(pageSize)
.ToList()
);
}

public async Task<PagedFileListResult> GetPagedFileListAsync(int pageSize = 100, string searchPattern = null, CancellationToken cancellationToken = default) {
if (pageSize <= 0)
return PagedFileListResult.Empty;

searchPattern = NormalizePath(searchPattern);

var result = new PagedFileListResult(r => Task.FromResult(GetFiles(searchPattern, 1, pageSize)));
var criteria = GetRequestCriteria(searchPattern);
var result = new PagedFileListResult(r => Task.FromResult(GetFiles(criteria, 1, pageSize)));
await result.NextPageAsync().AnyContext();
return result;
}

private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {
private NextPageResult GetFiles(SearchCriteria criteria, int page, int pageSize) {
int pagingLimit = pageSize;
int skip = (page - 1) * pagingLimit;
if (pagingLimit < Int32.MaxValue)
pagingLimit = pagingLimit + 1;

string prefix = searchPattern;
Regex patternRegex = null;
int wildcardPos = searchPattern?.IndexOf('*') ?? -1;
if (searchPattern != null && wildcardPos >= 0) {
patternRegex = new Regex("^" + Regex.Escape(searchPattern).Replace("\\*", ".*?") + "$");
int slashPos = searchPattern.LastIndexOf('/');
prefix = slashPos >= 0 ? searchPattern.Substring(0, slashPos) : String.Empty;
}
prefix = prefix ?? String.Empty;

var list = Database.HashScan(_fileSpecContainer, prefix + "*")
.Select(entry => Serializer.Deserialize<FileSpec>((byte[])entry.Value))
.Where(fileSpec => patternRegex == null || patternRegex.IsMatch(fileSpec.Path))
pagingLimit++;

_logger.LogTrace(
s => s.Property("Limit", pagingLimit).Property("Skip", skip),
"Getting files matching {Prefix} and {Pattern}...", criteria.Prefix, criteria.Pattern
);

var list = Database.HashScan(_fileSpecContainer, $"{criteria.Prefix}*")
.Select(entry => _serializer.Deserialize<FileSpec>((byte[])entry.Value))
.Where(fileSpec => criteria.Pattern == null || criteria.Pattern.IsMatch(fileSpec.Path))
.Skip(skip)
.Take(pagingLimit)
.ToList();
Expand All @@ -191,16 +260,44 @@ private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {
Success = true,
HasMore = hasMore,
Files = list,
NextPageFunc = hasMore ? r => Task.FromResult(GetFiles(searchPattern, page + 1, pageSize)) : (Func<PagedFileListResult, Task<NextPageResult>>)null
NextPageFunc = hasMore ? _ => Task.FromResult(GetFiles(criteria, page + 1, pageSize)) : null
};
}

private string NormalizePath(string path) {
return path?.Replace('\\', '/');
}

private class SearchCriteria {
public string Prefix { get; set; }
public Regex Pattern { get; set; }
}

private SearchCriteria GetRequestCriteria(string searchPattern) {
if (String.IsNullOrEmpty(searchPattern))
return new SearchCriteria { Prefix = String.Empty };

string normalizedSearchPattern = NormalizePath(searchPattern);
int wildcardPos = normalizedSearchPattern.IndexOf('*');
bool hasWildcard = wildcardPos >= 0;

string prefix = normalizedSearchPattern;
Regex patternRegex = null;

if (hasWildcard) {
patternRegex = new Regex($"^{Regex.Escape(normalizedSearchPattern).Replace("\\*", ".*?")}$");
int slashPos = normalizedSearchPattern.LastIndexOf('/');
prefix = slashPos >= 0 ? normalizedSearchPattern.Substring(0, slashPos) : String.Empty;
}

return new SearchCriteria {
Prefix = prefix,
Pattern = patternRegex
};
}

private void ConnectionMultiplexerOnConnectionRestored(object sender, ConnectionFailedEventArgs connectionFailedEventArgs) {
if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Redis connection restored.");
_logger.LogInformation("Redis connection restored");
}
}
}

0 comments on commit 027b3d3

Please sign in to comment.