Commit

add chunk compaction feature
tmm360 committed Jul 24, 2024
2 parents 975d486 + 02b889e commit 89d6700
Showing 29 changed files with 787 additions and 210 deletions.
4 changes: 2 additions & 2 deletions src/BeeNet.Client/BeeClient.cs
@@ -879,15 +879,15 @@ public async Task<string> RefreshAuthAsync(
},
cancellationToken).ConfigureAwait(false)).Key;

public async Task<SwarmHash> ResolveSwarmAddressToHashAsync(SwarmAddress address)
public async Task<SwarmChunkReference> ResolveAddressToChunkReferenceAsync(SwarmAddress address)
{
var chunkStore = new BeeClientChunkStore(this);

var rootManifest = new ReferencedMantarayManifest(
chunkStore,
address.Hash);

return await rootManifest.ResolveResourceHashAsync(address).ConfigureAwait(false);
return await rootManifest.ResolveAddressToChunkReferenceAsync(address).ConfigureAwait(false);
}

public Task ReuploadContentAsync(
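
A usage sketch of the renamed method follows. It is not part of this commit: the BeeClient namespace, its constructor overload, the SwarmAddress constructor and the address values are assumptions for illustration only. The point is that the call now returns a SwarmChunkReference, so callers get the encryption metadata of the resolved chunk together with its hash.

using System;
using Etherna.BeeNet;        // assumed namespace for BeeClient
using Etherna.BeeNet.Models;

// Hypothetical client and address, for illustration only.
var client = new BeeClient("http://localhost:1633");
var address = new SwarmAddress(
    "36b7efd913ca4cf880b8eeac5093fa27b0825906c600685b6abdd6566e6cfe8f", // hypothetical root hash
    "images/photo.jpg");                                                // hypothetical path

var chunkRef = await client.ResolveAddressToChunkReferenceAsync(address);

Console.WriteLine(chunkRef.Hash);                   // hash of the resolved chunk
Console.WriteLine(chunkRef.UseRecursiveEncryption); // true if child chunks are encrypted too
Console.WriteLine(chunkRef.EncryptionKey);          // XOR key, null for unencrypted content
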
59 changes: 59 additions & 0 deletions src/BeeNet.Core/Models/IReadOnlyPostageBuckets.cs
@@ -0,0 +1,59 @@
// Copyright 2021-present Etherna SA
// This file is part of Bee.Net.
//
// Bee.Net is free software: you can redistribute it and/or modify it under the terms of the
// GNU Lesser General Public License as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// Bee.Net is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License along with Bee.Net.
// If not, see <https://www.gnu.org/licenses/>.

using System.Collections.Generic;

namespace Etherna.BeeNet.Models
{
public interface IReadOnlyPostageBuckets
{
// Properties.
/// <summary>
/// The highest number of collisions currently recorded in any bucket
/// </summary>
uint MaxBucketCollisions { get; }

/// <summary>
/// The lowest number of collisions currently recorded in any bucket
/// </summary>
uint MinBucketCollisions { get; }

/// <summary>
/// Minimum required postage batch depth
/// </summary>
int RequiredPostageBatchDepth { get; }

/// <summary>
/// Total number of chunks added across all buckets
/// </summary>
long TotalChunks { get; }

// Methods.
/// <summary>
/// Number of buckets, grouped by collision count
/// </summary>
/// <returns>Array indexed by collision count, with the number of buckets as values</returns>
int[] CountBucketsByCollisions();

/// <summary>
/// Get a copy of all buckets
/// </summary>
/// <returns>All the buckets</returns>
uint[] GetBuckets();

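/// <summary>
/// Get the buckets that currently have exactly the given number of collisions
/// </summary>
/// <returns>The ids of the matching buckets</returns>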
IEnumerable<uint> GetBucketsByCollisions(uint collisions);

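/// <summary>
/// Get the number of collisions currently recorded for a bucket
/// </summary>
/// <returns>The bucket's collision counter</returns>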
uint GetCollisions(uint bucketId);
}
}
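
The read-only interface lets monitoring or reporting code inspect bucket usage without being able to mutate it. A possible consumer is sketched below; the PostageBucketsReport class is hypothetical and not part of this commit, and it only calls members declared by the interface above.

using System;
using Etherna.BeeNet.Models;

public static class PostageBucketsReport
{
    // Prints a read-only summary of bucket usage.
    public static void ReportUtilization(IReadOnlyPostageBuckets buckets)
    {
        Console.WriteLine($"Total chunks:         {buckets.TotalChunks}");
        Console.WriteLine($"Collisions (min/max): {buckets.MinBucketCollisions}/{buckets.MaxBucketCollisions}");
        Console.WriteLine($"Required batch depth: {buckets.RequiredPostageBatchDepth}");

        // CountBucketsByCollisions() returns an array indexed by collision count.
        var histogram = buckets.CountBucketsByCollisions();
        for (var collisions = 0; collisions < histogram.Length; collisions++)
            if (histogram[collisions] > 0)
                Console.WriteLine($"{histogram[collisions]} bucket(s) with {collisions} collision(s)");
    }
}
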
197 changes: 160 additions & 37 deletions src/BeeNet.Core/Models/PostageBuckets.cs
@@ -13,21 +13,27 @@
// If not, see <https://www.gnu.org/licenses/>.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;

namespace Etherna.BeeNet.Models
{
/// <summary>
/// A thread-safe implementation of a postage bucket array
/// </summary>
public class PostageBuckets
[SuppressMessage("Reliability", "CA2002:Do not lock on objects with weak identity")]
public class PostageBuckets : IReadOnlyPostageBuckets, IDisposable
{
// Consts.
public const int BucketsSize = 1 << PostageBatch.BucketDepth;

// Fields.
private readonly ConcurrentDictionary<uint, uint> _buckets; //<index, collisions>
private readonly uint[] _buckets;
private readonly Dictionary<uint, HashSet<uint>> bucketsByCollisions; //<collisions, bucketId[]>
private readonly ReaderWriterLockSlim bucketsLock = new(LockRecursionPolicy.NoRecursion);
private bool disposed;

// Constructor.
public PostageBuckets(
@@ -38,56 +44,173 @@ public PostageBuckets(
throw new ArgumentOutOfRangeException(nameof(initialBuckets),
$"Initial buckets must have length {BucketsSize}, or be null");

_buckets = ArrayToDictionary(initialBuckets ?? []);
//init "buckets" and reverse index "bucketsByCollisions"
_buckets = initialBuckets ?? new uint[BucketsSize];
bucketsByCollisions = new Dictionary<uint, HashSet<uint>> { [0] = [] };
for (uint i = 0; i < BucketsSize; i++)
bucketsByCollisions[0].Add(i);

//init counters
MaxBucketCollisions = 0;
MinBucketCollisions = 0;
TotalChunks = 0;
}

// Dispose.
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (disposed) return;

// Dispose managed resources.
if (disposing)
bucketsLock.Dispose();

disposed = true;
}

// Properties.
public ReadOnlySpan<uint> Buckets => DictionaryToArray(_buckets);
public uint MaxBucketCount { get; private set; }
public uint MaxBucketCollisions { get; private set; }
public uint MinBucketCollisions { get; private set; }
public int RequiredPostageBatchDepth => CollisionsToRequiredPostageBatchDepth(MaxBucketCollisions);
public long TotalChunks { get; private set; }

// Methods.
public int[] CountBucketsByCollisions()
{
bucketsLock.EnterReadLock();
try
{
return bucketsByCollisions.Select(pair => pair.Value.Count).ToArray();
}
finally
{
bucketsLock.ExitReadLock();
}
}

public uint[] GetBuckets()
{
bucketsLock.EnterReadLock();
try
{
return _buckets.ToArray();
}
finally
{
bucketsLock.ExitReadLock();
}
}

public IEnumerable<uint> GetBucketsByCollisions(uint collisions)
{
bucketsLock.EnterReadLock();
try
{
return bucketsByCollisions.TryGetValue(collisions, out var bucketsSet)
? bucketsSet
: Array.Empty<uint>();
}
finally
{
bucketsLock.ExitReadLock();
}
}

public uint GetCollisions(uint bucketId)
{
_buckets.TryGetValue(bucketId, out var collisions);
return collisions;
bucketsLock.EnterReadLock();
try
{
return _buckets[bucketId];
}
finally
{
bucketsLock.ExitReadLock();
}
}

public void IncrementCollisions(uint bucketId)
{
_buckets.AddOrUpdate(
bucketId,
_ =>
{
TotalChunks++;
if (1 > MaxBucketCount)
MaxBucketCount = 1;
return 1;
},
(_, c) =>
{
TotalChunks++;
if (c + 1 > MaxBucketCount)
MaxBucketCount = c + 1;
return c + 1;
});
/*
 * A single lock must cover _buckets, bucketsByCollisions and the counters, because updates
 * have to be atomic across all of them. A ConcurrentDictionary would lock more finely on
 * single values, but it can't make operations atomic with respect to other objects such as
 * the counters and "bucketsByCollisions".
 */
bucketsLock.EnterWriteLock();
try
{
// Update collections.
_buckets[bucketId]++;

bucketsByCollisions.TryAdd(_buckets[bucketId], []);
bucketsByCollisions[_buckets[bucketId] - 1].Remove(bucketId);
bucketsByCollisions[_buckets[bucketId]].Add(bucketId);

// Update counters.
if (_buckets[bucketId] > MaxBucketCollisions)
MaxBucketCollisions = _buckets[bucketId];

MinBucketCollisions = bucketsByCollisions.OrderBy(p => p.Key)
.First(p => p.Value.Count > 0)
.Key;

TotalChunks++;
}
finally
{
bucketsLock.ExitWriteLock();
}
}

public void ResetBucketCollisions(uint bucketId) =>
_buckets.AddOrUpdate(bucketId, _ => 0, (_, _) => 0);
public void ResetBucketCollisions(uint bucketId)
{
bucketsLock.EnterWriteLock();
try
{
// Update collections.
var oldCollisions = _buckets[bucketId];
_buckets[bucketId] = 0;

bucketsByCollisions[oldCollisions].Remove(bucketId);
bucketsByCollisions[0].Add(bucketId);

// Update counters.
MaxBucketCollisions = bucketsByCollisions.OrderByDescending(p => p.Key)
.First(p => p.Value.Count > 0)
.Key;

MinBucketCollisions = 0;
}
finally
{
bucketsLock.ExitWriteLock();
}
}

// Helpers.
private static ConcurrentDictionary<uint, uint> ArrayToDictionary(uint[] buckets) =>
new(buckets.Select((c, i) => (c, (uint)i))
.ToDictionary<(uint value, uint index), uint, uint>(pair => pair.index, pair => pair.value));

private static uint[] DictionaryToArray(ConcurrentDictionary<uint, uint> dictionary)
// Static methods.
public static int CollisionsToRequiredPostageBatchDepth(uint collisions)
{
var outArray = new uint[BucketsSize];
for (uint i = 0; i < BucketsSize; i++)
if (dictionary.TryGetValue(i, out var value))
outArray[i] = value;
return outArray;
if (collisions == 0)
return PostageBatch.MinDepth;
return Math.Max(
(int)Math.Ceiling(Math.Log2(collisions)) + PostageBatch.BucketDepth,
PostageBatch.MinDepth);
}

public static uint PostageBatchDepthToMaxCollisions(int postageBatchDepth)
{
#pragma warning disable CA1512 //only supported from .net 8
if (postageBatchDepth < PostageBatch.MinDepth)
throw new ArgumentOutOfRangeException(nameof(postageBatchDepth));
#pragma warning restore CA1512

return (uint)Math.Pow(2, postageBatchDepth - PostageBatch.BucketDepth);
}
}
}
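
Besides the per-instance counters, the rewrite adds two public static helpers that convert between collision counts and postage batch depths. The worked sketch below is not part of this commit; the concrete values in its comments assume PostageBatch.BucketDepth = 16 and PostageBatch.MinDepth = 17 (the Swarm defaults), and it assumes the PostageBuckets constructor can be called without an explicit bucket array.

using System;
using Etherna.BeeNet.Models;

// Assuming BucketDepth = 16: a batch of depth 20 gives 2^(20 - 16) = 16 slots per bucket.
uint maxCollisions = PostageBuckets.PostageBatchDepthToMaxCollisions(20);

// With 100 collisions in the fullest bucket: ceil(log2(100)) + 16 = 7 + 16 = 23,
// never less than PostageBatch.MinDepth.
int requiredDepth = PostageBuckets.CollisionsToRequiredPostageBatchDepth(100);

// The same value is exposed per instance through RequiredPostageBatchDepth.
using var buckets = new PostageBuckets();   // assumes a default for initialBuckets
buckets.IncrementCollisions(42);            // record a chunk falling into bucket 42
Console.WriteLine($"{maxCollisions} slots per bucket; required depth {requiredDepth}; " +
                  $"depth for current buckets {buckets.RequiredPostageBatchDepth}");
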
9 changes: 9 additions & 0 deletions src/BeeNet.Core/Models/SwarmChunkReference.cs
@@ -0,0 +1,9 @@
namespace Etherna.BeeNet.Models
{
public class SwarmChunkReference(SwarmHash hash, XorEncryptKey? encryptionKey, bool useRecursiveEncryption)
{
public XorEncryptKey? EncryptionKey { get; } = encryptionKey;
public SwarmHash Hash { get; } = hash;
public bool UseRecursiveEncryption { get; } = useRecursiveEncryption;
}
}
@@ -12,10 +12,11 @@
// You should have received a copy of the GNU Lesser General Public License along with Bee.Net.
// If not, see <https://www.gnu.org/licenses/>.

using Nethereum.Hex.HexConvertors.Extensions;
using System;
using System.Security.Cryptography;

namespace Etherna.BeeNet.Manifest
namespace Etherna.BeeNet.Models
{
public class XorEncryptKey
{
@@ -34,6 +35,23 @@ public XorEncryptKey(byte[] bytes)

this.bytes = bytes;
}

public XorEncryptKey(string key)
{
ArgumentNullException.ThrowIfNull(key, nameof(key));

try
{
bytes = key.HexToByteArray();
}
catch (FormatException)
{
throw new ArgumentException("Invalid key", nameof(key));
}

if (!IsValidKey(bytes))
throw new ArgumentOutOfRangeException(nameof(key));
}

// Builders.
public static XorEncryptKey BuildNewRandom()
@@ -61,5 +79,14 @@ public void EncryptDecrypt(Span<byte> data)
for (var i = 0; i < data.Length; i++)
data[i] = (byte)(data[i] ^ bytes[i % bytes.Length]);
}

public override string ToString() => bytes.ToHex();

// Static methods.
public static bool IsValidKey(byte[] value)
{
ArgumentNullException.ThrowIfNull(value, nameof(value));
return value.Length == KeySize;
}
}
}
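
Because XOR encryption is symmetric, applying EncryptDecrypt twice with the same key restores the original bytes, and the new string constructor together with ToString round-trips a key through its hex form. The sketch below is not part of this commit, but it relies only on members visible in this diff.

using System;
using System.Text;
using Etherna.BeeNet.Models;

var key = XorEncryptKey.BuildNewRandom();

var data = Encoding.UTF8.GetBytes("hello swarm");
key.EncryptDecrypt(data); // first pass: encrypt in place
key.EncryptDecrypt(data); // second pass with the same key: decrypt
Console.WriteLine(Encoding.UTF8.GetString(data)); // "hello swarm"

// The new constructor parses the hex string produced by ToString().
var sameKey = new XorEncryptKey(key.ToString());
Console.WriteLine(sameKey.ToString() == key.ToString()); // True
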
(Diffs for the remaining changed files are not shown here.)
