Commit
Better explanation of the choice of semaphore count
tmm360 committed Jul 24, 2024
1 parent 89d6700 commit 5556bbf
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/BeeNet.Util/Hashing/Pipeline/ChunkFeederPipelineStage.cs
@@ -54,6 +54,18 @@ public ChunkFeederPipelineStage(
* Allocate double the semaphores compared to the current chunk concurrency.
* This avoids a race condition where: a chunk completes its hashing, its semaphore is returned to
* the pool and locked by another chunk, and only after this does the direct child of the first
* chunk try to wait on its parent.
*
* This is possible because the order in which semaphores return to the queue is not guaranteed.
*
* Explanation:
* While the task of chunk A is still waiting to lock on its predecessor A-1, all the previous
* chunks have completed their tasks. However, they did not complete in order, and the semaphore
* that belonged to chunk A-1 happens to return to the queue before the other "concurrency - 1"
* semaphores (the only still-active task being A's). Because of this, it can be assigned to any
* next task from A+1 onward. If this happens before A locks on the semaphore of A-1, we are in
* a deadlock.
*
* By doubling the semaphores instead, we guarantee that the queue never holds fewer elements
* than the concurrency level, so a previous chunk's semaphore can't be reused until its direct
* successor has completed and released it.
*/
for (int i = 0; i < chunkConcurrency * 2; i++)
chunkSemaphorePool.Enqueue(new SemaphoreSlim(1, 1));
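The deadlock scenario in the comment can be checked deterministically. The following is a minimal Python sketch (not part of the commit) that models the FIFO semaphore queue with concurrency = 2; the `simulate` function and its chunk ids are hypothetical names introduced for illustration. It shows that with only `concurrency` semaphores, chunk A-1's semaphore can be re-dequeued by chunk A+1 before chunk A waits on it, while with `2 * concurrency` semaphores it cannot.

```python
from collections import deque

def simulate(pool_size):
    """Model the semaphore-pool race with concurrency = 2.

    Chunks: A-1 (id 0), A (id 1), A+1 (id 2).
    Returns True if A-1's semaphore is still intact when chunk A waits
    on it (no deadlock), False if A+1 re-acquired it first.
    """
    pool = deque(range(pool_size))  # FIFO queue of semaphore ids
    sem = {}                        # chunk id -> assigned semaphore id

    sem[0] = pool.popleft()         # chunk A-1 starts, takes a semaphore
    sem[1] = pool.popleft()         # chunk A starts, takes a semaphore
    pool.append(sem[0])             # chunk A-1 completes: its semaphore returns to the queue
    sem[2] = pool.popleft()         # chunk A+1 starts BEFORE A waits on its parent

    # Deadlock iff A+1 dequeued the very semaphore A is about to wait on:
    # A would block on a semaphore now locked by A+1, while A+1 waits on A.
    return sem[2] != sem[0]

assert simulate(pool_size=2) is False  # concurrency semaphores: deadlock possible
assert simulate(pool_size=4) is True   # doubled semaphores: parent's semaphore intact
```

With `pool_size == 4`, A-1's returned semaphore lands behind the two still-queued ones, so A+1 dequeues a different semaphore and chunk A can wait on its parent safely, matching the invariant stated in the comment.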
@@ -103,6 +115,8 @@ public async Task<SwarmChunkReference> HashDataAsync(Stream dataStream)
if (chunkReadSize > 0 || //write only chunks with data
passedBytes == 0) //or if the first and only one is empty
{
var chunkNumberId = passedBytes / SwarmChunk.DataSize;

// Copy read data from buffer to a new chunk data byte[]. Include also span
var chunkData = new byte[SwarmChunk.SpanSize + chunkReadSize];
chunkBuffer.AsSpan(0, chunkReadSize).CopyTo(chunkData.AsSpan(SwarmChunk.SpanSize));
@@ -127,7 +141,7 @@ public async Task<SwarmChunkReference> HashDataAsync(Stream dataStream)
var feedArgs = new HasherPipelineFeedArgs(
span: chunkData[..SwarmChunk.SpanSize],
data: chunkData,
numberId: passedBytes / SwarmChunk.DataSize,
numberId: chunkNumberId,
prevChunkSemaphore: prevChunkSemaphore);

//run task