Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NonderterministicFastCommit to speed up migrations when ordering isn't required #403

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 205 additions & 2 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
}

func (s *PersistentSlabStorage) Commit() error {
var err error

// this part ensures the keys are sorted so commit operation is deterministic
keysWithOwners := s.sortedOwnedDeltaKeys()

for _, id := range keysWithOwners {
return s.commit(keysWithOwners)
}

func (s *PersistentSlabStorage) commit(keys []SlabID) error {
var err error

for _, id := range keys {
slab := s.deltas[id]

// deleted slabs
Expand Down Expand Up @@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
return nil
}

// NonderterministicFastCommit commits changes in nondeterministic order.
// This is used by migration programs when ordering isn't required.
func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to get a diff of this new function and the existing deterministic version of it? It would make it easier to review, by seeing how they differ.

If the functions only differ in a small way, maybe it's possible to extract the shared code and only make the parts that differ configurable?

// No changes
if len(s.deltas) == 0 {
return nil
}

// slabToBeEncoded is a unit of work sent to an encoder worker through the
// jobs channel: a slab together with the ID it is stored under in deltas.
type slabToBeEncoded struct {
slabID SlabID
slab Slab
}

// encodedSlab is the outcome an encoder worker sends on the results channel
// for a single slab.
type encodedSlab struct {
slabID SlabID // ID of the slab this result belongs to
data []byte // encoded slab returned by EncodeSlab; nil when the job's slab was nil
err error // error returned by EncodeSlab; nil when the job's slab was nil
}

// Define encoder (worker) to encode slabs in parallel.
//
// Each worker receives slabs from jobs, serializes them with EncodeSlab using
// s.cborEncMode, and sends one encodedSlab per job to results. The worker
// exits when jobs is closed and drained, or, after receiving a job, when it
// finds that done has been closed. wg.Done is called on exit in both cases.
encoder := func(
wg *sync.WaitGroup,
done <-chan struct{},
jobs <-chan slabToBeEncoded,
results chan<- encodedSlab,
) {
defer wg.Done()

for job := range jobs {
// Check if goroutine is signaled to stop before proceeding.
// The select is non-blocking: if done is not closed, the default
// case falls through and the received job is processed.
select {
case <-done:
return
default:
}

id := job.slabID
slab := job.slab

// A nil slab is not encoded; it is reported back with nil data and nil error.
if slab == nil {
results <- encodedSlab{
slabID: id,
data: nil,
err: nil,
}
continue
}

// Serialize. Any encoding error is forwarded in the result for the
// receiver to handle; the worker itself keeps processing jobs.
data, err := EncodeSlab(slab, s.cborEncMode)
results <- encodedSlab{
slabID: id,
data: data,
err: err,
}
}
}

// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
modifiedSlabCount := 0
// Deleted slabs need to be removed from underlying storage.
deletedSlabCount := 0
for k, v := range s.deltas {
// Ignore slabs not owned by accounts
if k.address == AddressUndefined {
continue
}
if v == nil {
turbolent marked this conversation as resolved.
Show resolved Hide resolved
deletedSlabCount++
} else {
modifiedSlabCount++
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a raw idea: is there any way we can capture this data as part of updates, so we can skip the iteration here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a raw idea: is there any way we can capture this data as part of updates, so we can skip the iteration here?

@ramtinms I like the idea of skipping iterations if possible!

By "updates", do you mean in Store() when modified or deleted slabs are stored in deltas? If so, it may be less memory efficient for non-migration use cases.

To reduce iterations here in NonderterministicFastCommit(), I pushed commit a2a4f5c to iterate only once to get modified and deleted slabs to process later.


if modifiedSlabCount == 0 && deletedSlabCount == 0 {
return nil
}

if modifiedSlabCount < 2 {
// Avoid goroutine overhead
ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount)
for k := range s.deltas {
// Ignore slabs not owned by accounts
if k.address == AddressUndefined {
continue
}
ids = append(ids, k)
}

return s.commit(ids)
}

if numWorkers > modifiedSlabCount {
numWorkers = modifiedSlabCount
}

var wg sync.WaitGroup

// Create done signal channel
done := make(chan struct{})

// Create job queue
jobs := make(chan slabToBeEncoded, modifiedSlabCount)

// Create result queue
results := make(chan encodedSlab, modifiedSlabCount)

defer func() {
// This ensures that all goroutines are stopped before output channel is closed.

// Wait for all goroutines to finish
wg.Wait()

// Close output channel
close(results)
}()

// Launch workers to encode slabs
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go encoder(&wg, done, jobs, results)
}

// Send jobs
deletedSlabIDs := make([]SlabID, 0, deletedSlabCount)
for k, v := range s.deltas {
// ignore the ones that are not owned by accounts
if k.address == AddressUndefined {
continue
}
if v == nil {
deletedSlabIDs = append(deletedSlabIDs, k)
} else {
jobs <- slabToBeEncoded{k, v}
}
}
close(jobs)

// Remove deleted slabs from underlying storage.
for _, id := range deletedSlabIDs {

err := s.baseStorage.Remove(id)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
}

// Deleted slabs are removed from deltas and added to read cache so that:
// 1. next read is from in-memory read cache
// 2. deleted slabs are not re-committed in next commit
s.cache[id] = nil
delete(s.deltas, id)
}

// Process encoded slabs
for i := 0; i < modifiedSlabCount; i++ {
result := <-results

if result.err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// result.err is already categorized by Encode().
return result.err
}

id := result.slabID
data := result.data

if data == nil {
// Closing done channel signals goroutines to stop.
close(done)
// This is unexpected because deleted slabs are processed separately.
return NewEncodingErrorf("unexpectd encoded empty data")
}

// Store
err := s.baseStorage.Store(id, data)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
}

s.cache[id] = s.deltas[id]
// It's safe to remove slab from deltas because
// iteration is on non-temp slabs and temp slabs
// are still in deltas.
delete(s.deltas, id)
}

// Do NOT reset deltas because slabs with empty address are not saved.

return nil
}

// DropDeltas discards every entry currently held in deltas by replacing the
// map with a new, empty one. Nothing is written to or removed from the base
// storage by this call.
func (s *PersistentSlabStorage) DropDeltas() {
	s.deltas = map[SlabID]Slab{}
}
Expand Down
134 changes: 134 additions & 0 deletions storage_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Atree - Scalable Arrays and Ordered Maps
*
* Copyright 2024 Dapper Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package atree

import (
"encoding/binary"
"math/rand"
"runtime"
"strconv"
"testing"

"github.com/fxamacker/cbor/v2"
"github.com/stretchr/testify/require"
)

// benchmarkFastCommit runs a sub-benchmark, named after numberOfSlabs, that
// times PersistentSlabStorage.FastCommit with runtime.NumCPU() workers.
//
// The slabs are generated once up front: each gets an address drawn from a
// random source seeded with seed and a slab index holding its position
// encoded as a big-endian uint64. On every iteration a fresh in-memory base
// storage is created and populated with those slabs while the timer is
// stopped, so only the FastCommit call is measured.
func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	// Build the input once; every iteration stores the same set of slabs.
	rng := rand.New(rand.NewSource(seed))
	slabs := make([]Slab, numberOfSlabs)
	for n := range slabs {
		address := generateRandomAddress(rng)

		var slabIndex SlabIndex
		binary.BigEndian.PutUint64(slabIndex[:], uint64(n))

		slabs[n] = generateLargeSlab(SlabID{address, slabIndex})
	}

	name := strconv.Itoa(numberOfSlabs)
	b.Run(name, func(b *testing.B) {
		for n := 0; n < b.N; n++ {
			// Setup is excluded from the measurement.
			b.StopTimer()

			storage := NewPersistentSlabStorage(NewInMemBaseStorage(), encMode, decMode, nil, nil)
			for _, slab := range slabs {
				require.NoError(b, storage.Store(slab.SlabID(), slab))
			}

			b.StartTimer()

			require.NoError(b, storage.FastCommit(runtime.NumCPU()))
		}
	})
}

// benchmarkNondeterministicFastCommit runs a sub-benchmark, named after
// numberOfSlabs, that times PersistentSlabStorage.NonderterministicFastCommit
// with runtime.NumCPU() workers.
//
// The slabs are generated once up front: each gets an address drawn from a
// random source seeded with seed and a slab index holding its position
// encoded as a big-endian uint64. On every iteration a fresh in-memory base
// storage is created and populated with those slabs while the timer is
// stopped, so only the commit call is measured.
func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	// Build the input once; every iteration stores the same set of slabs.
	rng := rand.New(rand.NewSource(seed))
	slabs := make([]Slab, numberOfSlabs)
	for n := range slabs {
		address := generateRandomAddress(rng)

		var slabIndex SlabIndex
		binary.BigEndian.PutUint64(slabIndex[:], uint64(n))

		slabs[n] = generateLargeSlab(SlabID{address, slabIndex})
	}

	name := strconv.Itoa(numberOfSlabs)
	b.Run(name, func(b *testing.B) {
		for n := 0; n < b.N; n++ {
			// Setup is excluded from the measurement.
			b.StopTimer()

			storage := NewPersistentSlabStorage(NewInMemBaseStorage(), encMode, decMode, nil, nil)
			for _, slab := range slabs {
				require.NoError(b, storage.Store(slab.SlabID(), slab))
			}

			b.StartTimer()

			require.NoError(b, storage.NonderterministicFastCommit(runtime.NumCPU()))
		}
	})
}

// BenchmarkStorageFastCommit benchmarks FastCommit for slab counts from 10 up
// to 1,000,000, growing by a factor of ten each step.
func BenchmarkStorageFastCommit(b *testing.B) {
	// The seed is a fixed constant on purpose (rather than time, etc.).
	const fixedSeed int64 = 1234567

	slabCounts := []int{10, 100, 1_000, 10_000, 100_000, 1_000_000}
	for _, numberOfSlabs := range slabCounts {
		benchmarkFastCommit(b, fixedSeed, numberOfSlabs)
	}
}

// BenchmarkStorageNondeterministicFastCommit benchmarks
// NonderterministicFastCommit for slab counts from 10 up to 1,000,000,
// growing by a factor of ten each step.
func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
	// The seed is a fixed constant on purpose (rather than time, etc.).
	const fixedSeed int64 = 1234567

	slabCounts := []int{10, 100, 1_000, 10_000, 100_000, 1_000_000}
	for _, numberOfSlabs := range slabCounts {
		benchmarkNondeterministicFastCommit(b, fixedSeed, numberOfSlabs)
	}
}
Loading
Loading