Add NonderterministicFastCommit to speed up migrations when ordering isn't required #403
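For orientation, here is a minimal sketch of how a migration might call the new API once its slab changes are staged in a PersistentSlabStorage. It assumes atree's module path github.com/onflow/atree; the package and helper names are hypothetical and not part of this PR.

package migration // hypothetical package for this sketch

import (
	"fmt"
	"runtime"

	"github.com/onflow/atree"
)

// commitMigratedSlabs flushes all staged slab changes after a migration step.
// Write order to the underlying storage is nondeterministic, which is fine
// here because the migration doesn't depend on commit ordering.
func commitMigratedSlabs(storage *atree.PersistentSlabStorage) error {
	err := storage.NonderterministicFastCommit(runtime.NumCPU())
	if err != nil {
		return fmt.Errorf("failed to commit migrated slabs: %w", err)
	}
	return nil
}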
@@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
 }
 
 func (s *PersistentSlabStorage) Commit() error {
-	var err error
-
 	// this part ensures the keys are sorted so commit operation is deterministic
 	keysWithOwners := s.sortedOwnedDeltaKeys()
 
-	for _, id := range keysWithOwners {
+	return s.commit(keysWithOwners)
+}
+
+func (s *PersistentSlabStorage) commit(keys []SlabID) error {
+	var err error
+
+	for _, id := range keys {
 		slab := s.deltas[id]
 
 		// deleted slabs
@@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
	return nil
}

// NonderterministicFastCommit commits changes in nondeterministic order.
// This is used by migration program when ordering isn't required.
func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error {
	// No changes
	if len(s.deltas) == 0 {
		return nil
	}

	type slabToBeEncoded struct {
		slabID SlabID
		slab   Slab
	}

	type encodedSlab struct {
		slabID SlabID
		data   []byte
		err    error
	}

	// Define encoder (worker) to encode slabs in parallel
	encoder := func(
		wg *sync.WaitGroup,
		done <-chan struct{},
		jobs <-chan slabToBeEncoded,
		results chan<- encodedSlab,
	) {
		defer wg.Done()

		for job := range jobs {
			// Check if goroutine is signaled to stop before proceeding.
			select {
			case <-done:
				return
			default:
			}

			id := job.slabID
			slab := job.slab

			if slab == nil {
				results <- encodedSlab{
					slabID: id,
					data:   nil,
					err:    nil,
				}
				continue
			}

			// Serialize
			data, err := EncodeSlab(slab, s.cborEncMode)
			results <- encodedSlab{
				slabID: id,
				data:   data,
				err:    err,
			}
		}
	}

	// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
	modifiedSlabCount := 0
	// Deleted slabs need to be removed from underlying storage.
	deletedSlabCount := 0
	for k, v := range s.deltas {
		// Ignore slabs not owned by accounts
		if k.address == AddressUndefined {
			continue
		}
		if v == nil {
			deletedSlabCount++
		} else {
			modifiedSlabCount++
		}
	}
Review comment on this counting loop: Just a raw idea, anyway we can capture this data as part of updates, so we can skip iteration here.

Reply: @ramtinms I like the idea of skipping iterations if possible! By "updates", do you mean in …? To reduce iterations here in …
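To illustrate the idea being discussed (not part of this PR), here is a rough sketch of maintaining the two counters as deltas are recorded, so the commit path could skip this counting pass. The types are simplified stand-ins, not atree's actual SlabID/Slab or PersistentSlabStorage fields.

package sketch // hypothetical

// deltaStore is a simplified stand-in for PersistentSlabStorage's deltas map.
// As in this PR, a nil value marks a pending deletion.
type deltaStore struct {
	deltas   map[string][]byte
	modified int // entries with pending updates
	deleted  int // entries with pending deletions
}

// setDelta records a change and keeps the counters in sync, so a later
// commit doesn't need to iterate the whole map just to count entries.
func (d *deltaStore) setDelta(id string, data []byte) {
	if old, ok := d.deltas[id]; ok {
		// Undo the previous entry's contribution before re-counting it.
		if old == nil {
			d.deleted--
		} else {
			d.modified--
		}
	}
	if data == nil {
		d.deleted++
	} else {
		d.modified++
	}
	d.deltas[id] = data
}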
	if modifiedSlabCount == 0 && deletedSlabCount == 0 {
		return nil
	}

	if modifiedSlabCount < 2 {
		// Avoid goroutine overhead
		ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount)
		for k := range s.deltas {
			// Ignore slabs not owned by accounts
			if k.address == AddressUndefined {
				continue
			}
			ids = append(ids, k)
		}

		return s.commit(ids)
	}

	if numWorkers > modifiedSlabCount {
		numWorkers = modifiedSlabCount
	}

	var wg sync.WaitGroup

	// Create done signal channel
	done := make(chan struct{})

	// Create job queue
	jobs := make(chan slabToBeEncoded, modifiedSlabCount)

	// Create result queue
	results := make(chan encodedSlab, modifiedSlabCount)

	defer func() {
		// This ensures that all goroutines are stopped before output channel is closed.

		// Wait for all goroutines to finish
		wg.Wait()

		// Close output channel
		close(results)
	}()

	// Launch workers to encode slabs
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go encoder(&wg, done, jobs, results)
	}

	// Send jobs
	deletedSlabIDs := make([]SlabID, 0, deletedSlabCount)
	for k, v := range s.deltas {
		// ignore the ones that are not owned by accounts
		if k.address == AddressUndefined {
			continue
		}
		if v == nil {
			deletedSlabIDs = append(deletedSlabIDs, k)
		} else {
			jobs <- slabToBeEncoded{k, v}
		}
	}
	close(jobs)
	// Remove deleted slabs from underlying storage.
	for _, id := range deletedSlabIDs {
		err := s.baseStorage.Remove(id)
		if err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
			return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
		}

		// Deleted slabs are removed from deltas and added to read cache so that:
		// 1. next read is from in-memory read cache
		// 2. deleted slabs are not re-committed in next commit
		s.cache[id] = nil
		delete(s.deltas, id)
	}
	// Process encoded slabs
	for i := 0; i < modifiedSlabCount; i++ {
		result := <-results

		if result.err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// result.err is already categorized by Encode().
			return result.err
		}

		id := result.slabID
		data := result.data

		if data == nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// This is unexpected because deleted slabs are processed separately.
			return NewEncodingErrorf("unexpected encoded empty data")
		}
		// Store
		err := s.baseStorage.Store(id, data)
		if err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
			return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
		}

		s.cache[id] = s.deltas[id]
		// It's safe to remove slab from deltas because
		// iteration is on non-temp slabs and temp slabs
		// are still in deltas.
		delete(s.deltas, id)
	}

	// Do NOT reset deltas because slabs with empty address are not saved.

	return nil
}

func (s *PersistentSlabStorage) DropDeltas() {
	s.deltas = make(map[SlabID]Slab)
}
@@ -0,0 +1,134 @@
/*
 * Atree - Scalable Arrays and Ordered Maps
 *
 * Copyright 2024 Dapper Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package atree

import (
	"encoding/binary"
	"math/rand"
	"runtime"
	"strconv"
	"testing"

	"github.com/fxamacker/cbor/v2"
	"github.com/stretchr/testify/require"
)
func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	r := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	slabs := make([]Slab, numberOfSlabs)
	for i := 0; i < numberOfSlabs; i++ {
		addr := generateRandomAddress(r)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(i))

		id := SlabID{addr, index}

		slabs[i] = generateLargeSlab(id)
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			b.StopTimer()

			baseStorage := NewInMemBaseStorage()
			storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

			for _, slab := range slabs {
				err = storage.Store(slab.SlabID(), slab)
				require.NoError(b, err)
			}

			b.StartTimer()

			err := storage.FastCommit(runtime.NumCPU())
			require.NoError(b, err)
		}
	})
}
func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	r := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	slabs := make([]Slab, numberOfSlabs)
	for i := 0; i < numberOfSlabs; i++ {
		addr := generateRandomAddress(r)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(i))

		id := SlabID{addr, index}

		slabs[i] = generateLargeSlab(id)
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			b.StopTimer()

			baseStorage := NewInMemBaseStorage()
			storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

			for _, slab := range slabs {
				err = storage.Store(slab.SlabID(), slab)
				require.NoError(b, err)
			}

			b.StartTimer()

			err := storage.NonderterministicFastCommit(runtime.NumCPU())
			require.NoError(b, err)
		}
	})
}
func BenchmarkStorageFastCommit(b *testing.B) {
	fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

	benchmarkFastCommit(b, fixedSeed, 10)
	benchmarkFastCommit(b, fixedSeed, 100)
	benchmarkFastCommit(b, fixedSeed, 1_000)
	benchmarkFastCommit(b, fixedSeed, 10_000)
	benchmarkFastCommit(b, fixedSeed, 100_000)
	benchmarkFastCommit(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
	fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

	benchmarkNondeterministicFastCommit(b, fixedSeed, 10)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 100)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}
Review comment on the new benchmark file:
Would it be possible to get a diff of this new function and the existing deterministic version of it? It would make it easier to review, by seeing how they differ.
If the functions only differ in a small way, maybe it's possible to extract the shared code and only make the parts that differ configurable?
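One possible shape for that suggestion, sketched against the names used in this PR (fastCommit and ownedDeltaKeys are hypothetical helpers, not existing atree functions): both entry points build the key list up front and delegate to a shared parallel-commit helper, so only the key ordering differs.

// Hypothetical refactor inside package atree; only the key ordering differs
// between the two exported entry points.

func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
	// Deterministic: commit owned delta keys in sorted order.
	return s.fastCommit(numWorkers, s.sortedOwnedDeltaKeys())
}

func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error {
	// Nondeterministic: commit owned delta keys in map-iteration order.
	return s.fastCommit(numWorkers, s.ownedDeltaKeys())
}

// fastCommit would hold the shared logic: spawn encoder workers, remove
// deleted slabs, store encoded slabs, and update the cache and deltas.
func (s *PersistentSlabStorage) fastCommit(numWorkers int, keys []SlabID) error {
	// ... shared worker-pool, encode, and store logic ...
	return nil
}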