From c8aace956a8a3f7e6d7fca271e9996f70b57b22e Mon Sep 17 00:00:00 2001 From: Anton Dort-Golts Date: Fri, 6 Nov 2020 19:25:12 +0300 Subject: [PATCH] Hybrid logstore (#463) * Refactoring headbook test suite Signed-off-by: Anton Dort-Golts * Impl headbook export test Signed-off-by: Anton Dort-Golts * Impl exportable for ds-backed headbook Signed-off-by: Anton Dort-Golts * Impl exportable for in-memory headbook Signed-off-by: Anton Dort-Golts * Cleanup go modules Signed-off-by: Anton Dort-Golts * Specialize dump/restore signatures in components Signed-off-by: Anton Dort-Golts * Impl export for address books (ds + mem) Signed-off-by: Anton Dort-Golts * Protect against empty dumps Signed-off-by: Anton Dort-Golts * Cover address book export with tests Signed-off-by: Anton Dort-Golts * Move log/thread ID parsing into dedicated functions Signed-off-by: Anton Dort-Golts * Use named error for empty dump restore Signed-off-by: Anton Dort-Golts * Impl export for key books (ds + mem) Signed-off-by: Anton Dort-Golts * Fix key suffix comparison Signed-off-by: Anton Dort-Golts * Cover key book export with tests Signed-off-by: Anton Dort-Golts * Fix resource leak on early return from key iteration Signed-off-by: Anton Dort-Golts * Impl export for metadata books (ds + mem) Signed-off-by: Anton Dort-Golts * Fix value decoding on dumps Signed-off-by: Anton Dort-Golts * Cover metadata book export with tests Signed-off-by: Anton Dort-Golts * Control restore behaviour with flag Signed-off-by: Anton Dort-Golts * Impl hybrid logstore Signed-off-by: Anton Dort-Golts * Add logstore suite to datastore tests Signed-off-by: Anton Dort-Golts * Run standard test suites on a hybrid store Signed-off-by: Anton Dort-Golts * Impl resource finalizer Signed-off-by: Anton Dort-Golts * Logstore kind option + refactoring Signed-off-by: Anton Dort-Golts * Rename: LogstoreKind -> LogstoreType Signed-off-by: Anton Dort-Golts --- common/common.go | 185 ++++++++------ core/logstore/logstore.go | 66 +++++ go.mod | 2 + logstore/logstore.go | 2 + logstore/lstoreds/addr_book.go | 122 +++++++++ logstore/lstoreds/ds_test.go | 30 ++- logstore/lstoreds/headbook.go | 97 ++++++- logstore/lstoreds/keybook.go | 147 +++++++++++ logstore/lstoreds/logstore.go | 27 +- logstore/lstoreds/metadata.go | 125 ++++++++- logstore/lstorehybrid/hybrid_test.go | 152 +++++++++++ logstore/lstorehybrid/logstore.go | 367 +++++++++++++++++++++++++++ logstore/lstoremem/addr_book.go | 80 +++++- logstore/lstoremem/headbook.go | 44 +++- logstore/lstoremem/keybook.go | 63 +++++ logstore/lstoremem/logstore.go | 3 + logstore/lstoremem/metadata.go | 97 +++++-- test/addr_book_suite.go | 32 ++- test/headbook_suite.go | 323 +++++++++++++---------- test/keybook_suite.go | 123 +++++++++ test/metadata_suite.go | 105 +++++++- util/finalizer.go | 44 ++++ 22 files changed, 1971 insertions(+), 265 deletions(-) create mode 100644 logstore/lstorehybrid/hybrid_test.go create mode 100644 logstore/lstorehybrid/logstore.go create mode 100644 util/finalizer.go diff --git a/common/common.go b/common/common.go index ab16b1cb..dfc140b2 100644 --- a/common/common.go +++ b/common/common.go @@ -2,25 +2,25 @@ package common import ( "context" + "fmt" "os" "path/filepath" "time" ipfslite "github.com/hsanjuan/ipfs-lite" - datastore "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" cconnmgr "github.com/libp2p/go-libp2p-core/connmgr" - host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - peerstore 
"github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p-kad-dht/dual" "github.com/libp2p/go-libp2p-peerstore/pstoreds" ma "github.com/multiformats/go-multiaddr" "github.com/textileio/go-threads/core/app" + core "github.com/textileio/go-threads/core/logstore" "github.com/textileio/go-threads/logstore/lstoreds" + "github.com/textileio/go-threads/logstore/lstorehybrid" + "github.com/textileio/go-threads/logstore/lstoremem" "github.com/textileio/go-threads/net" - util "github.com/textileio/go-threads/util" + "github.com/textileio/go-threads/util" "google.golang.org/grpc" ) @@ -37,41 +37,41 @@ type NetBoostrapper interface { } func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error) { - config := &NetConfig{} - for _, opt := range opts { - if err := opt(config); err != nil { - return nil, err - } - } + var ( + config NetConfig + fin = util.NewFinalizer() + ) - if config.HostAddr == nil { - addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0") - if err != nil { + for _, opt := range opts { + if err := opt(&config); err != nil { return nil, err } - config.HostAddr = addr } - if config.ConnManager == nil { - config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20) + if err := setDefaults(&config); err != nil { + return nil, err } ipfsLitePath := filepath.Join(repoPath, defaultIpfsLitePath) if err := os.MkdirAll(ipfsLitePath, os.ModePerm); err != nil { return nil, err } + litestore, err := ipfslite.BadgerDatastore(ipfsLitePath) if err != nil { return nil, err } + fin.Add(litestore) ctx, cancel := context.WithCancel(context.Background()) + fin.Add(util.NewContextCloser(cancel)) + pstore, err := pstoreds.NewPeerstore(ctx, litestore, pstoreds.DefaultOpts()) if err != nil { - litestore.Close() - cancel() - return nil, err + return nil, fin.Cleanup(err) } + fin.Add(pstore) + priv := util.LoadKey(filepath.Join(ipfsLitePath, "key")) h, d, err := ipfslite.SetupLibp2p( ctx, @@ -84,37 +84,17 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error) libp2p.DisableRelay(), ) if err != nil { - cancel() - litestore.Close() - return nil, err + return nil, fin.Cleanup(err) } + lite, err := ipfslite.New(ctx, litestore, h, d, nil) if err != nil { - cancel() - litestore.Close() - return nil, err + return nil, fin.Cleanup(err) } - // Build a logstore - logstorePath := filepath.Join(repoPath, defaultLogstorePath) - if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil { - cancel() - return nil, err - } - logstore, err := ipfslite.BadgerDatastore(logstorePath) + tstore, err := buildLogstore(ctx, config.LSType, repoPath, fin) if err != nil { - cancel() - litestore.Close() - return nil, err - } - tstore, err := lstoreds.NewLogstore(ctx, logstore, lstoreds.DefaultOpts()) - if err != nil { - cancel() - if err := logstore.Close(); err != nil { - return nil, err - } - litestore.Close() - return nil, err + return nil, fin.Cleanup(err) } // Build a network @@ -123,33 +103,89 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error) PubSub: config.PubSub, }, config.GRPCServerOptions, config.GRPCDialOptions) if err != nil { - cancel() - if err := logstore.Close(); err != nil { - return nil, err - } - litestore.Close() - return nil, err + return nil, fin.Cleanup(err) } + fin.Add(h, d, api) return &netBoostrapper{ - cancel: cancel, Net: api, litepeer: lite, - pstore: pstore, - logstore: logstore, - litestore: litestore, - host: h, - dht: d, + finalizer: fin, }, nil } +func buildLogstore(ctx context.Context, lstype 
LogstoreType, repoPath string, fin *util.Finalizer) (core.Logstore, error) { + switch lstype { + case LogstoreInMemory: + return lstoremem.NewLogstore(), nil + + case LogstoreHybrid: + pls, err := persistentLogstore(ctx, repoPath, fin) + if err != nil { + return nil, err + } + mls := lstoremem.NewLogstore() + return lstorehybrid.NewLogstore(pls, mls) + + case LogstorePersistent: + return persistentLogstore(ctx, repoPath, fin) + + default: + return nil, fmt.Errorf("unsupported logstore type: %s", lstype) + } +} + +func persistentLogstore(ctx context.Context, repoPath string, fin *util.Finalizer) (core.Logstore, error) { + logstorePath := filepath.Join(repoPath, defaultLogstorePath) + if err := os.MkdirAll(logstorePath, os.ModePerm); err != nil { + return nil, err + } + + dstore, err := ipfslite.BadgerDatastore(logstorePath) + if err != nil { + return nil, err + } + fin.Add(dstore) + + return lstoreds.NewLogstore(ctx, dstore, lstoreds.DefaultOpts()) +} + +func setDefaults(config *NetConfig) error { + if config.HostAddr == nil { + addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0") + if err != nil { + return err + } + config.HostAddr = addr + } + + if config.ConnManager == nil { + config.ConnManager = connmgr.NewConnManager(100, 400, time.Second*20) + } + + if len(config.LSType) == 0 { + config.LSType = LogstorePersistent + } + + return nil +} + +type LogstoreType string + +const ( + LogstoreInMemory LogstoreType = "in-memory" + LogstorePersistent LogstoreType = "persistent" + LogstoreHybrid LogstoreType = "hybrid" +) + type NetConfig struct { HostAddr ma.Multiaddr ConnManager cconnmgr.ConnManager - Debug bool GRPCServerOptions []grpc.ServerOption GRPCDialOptions []grpc.DialOption + LSType LogstoreType PubSub bool + Debug bool } type NetOption func(c *NetConfig) error @@ -196,15 +232,17 @@ func WithNetPubSub(enabled bool) NetOption { } } +func WithNetLogstore(lt LogstoreType) NetOption { + return func(c *NetConfig) error { + c.LSType = lt + return nil + } +} + type netBoostrapper struct { - cancel context.CancelFunc app.Net litepeer *ipfslite.Peer - pstore peerstore.Peerstore - logstore datastore.Datastore - litestore datastore.Datastore - host host.Host - dht *dual.DHT + finalizer *util.Finalizer } var _ NetBoostrapper = (*netBoostrapper)(nil) @@ -218,22 +256,5 @@ func (tsb *netBoostrapper) GetIpfsLite() *ipfslite.Peer { } func (tsb *netBoostrapper) Close() error { - if err := tsb.Net.Close(); err != nil { - return err - } - tsb.cancel() - if err := tsb.dht.Close(); err != nil { - return err - } - if err := tsb.host.Close(); err != nil { - return err - } - if err := tsb.pstore.Close(); err != nil { - return err - } - if err := tsb.litestore.Close(); err != nil { - return err - } - return tsb.logstore.Close() - // Logstore closed by network + return tsb.finalizer.Cleanup(nil) } diff --git a/core/logstore/logstore.go b/core/logstore/logstore.go index bd8f67bb..55f3a790 100644 --- a/core/logstore/logstore.go +++ b/core/logstore/logstore.go @@ -2,6 +2,7 @@ package logstore import ( "context" + "errors" "fmt" "time" @@ -25,6 +26,9 @@ var ErrLogNotFound = fmt.Errorf("log not found") // ErrLogExists indicates a requested log already exists. var ErrLogExists = fmt.Errorf("log already exists") +// ErrEmptyDump indicates an attempt to restore from an empty dump. +var ErrEmptyDump = errors.New("empty dump") + // Logstore stores log keys, addresses, heads and thread meta data.
type Logstore interface { Close() error @@ -87,6 +91,12 @@ type ThreadMetadata interface { // ClearMetadata clears all metadata under a thread. ClearMetadata(t thread.ID) error + + // DumpMeta packs all the stored metadata. + DumpMeta() (DumpMetadata, error) + + // RestoreMeta restores metadata from the dump. + RestoreMeta(book DumpMetadata) error } // KeyBook stores log keys. @@ -126,6 +136,12 @@ type KeyBook interface { // ThreadsFromKeys returns a list of threads referenced in the book. ThreadsFromKeys() (thread.IDSlice, error) + + // DumpKeys packs all stored keys. + DumpKeys() (DumpKeyBook, error) + + // RestoreKeys restores keys from the dump. + RestoreKeys(book DumpKeyBook) error } // AddrBook stores log addresses. @@ -159,6 +175,12 @@ type AddrBook interface { // ThreadsFromAddrs returns a list of threads referenced in the book. ThreadsFromAddrs() (thread.IDSlice, error) + + // DumpAddrs packs all stored addresses. + DumpAddrs() (DumpAddrBook, error) + + // RestoreAddrs restores addresses from the dump. + RestoreAddrs(book DumpAddrBook) error } // HeadBook stores log heads. @@ -180,4 +202,48 @@ type HeadBook interface { // ClearHeads deletes the head entry for a log. ClearHeads(thread.ID, peer.ID) error + + // DumpHeads packs the entire headbook into a tree structure. + DumpHeads() (DumpHeadBook, error) + + // RestoreHeads restores the headbook from the dump. + RestoreHeads(DumpHeadBook) error } + +type ( + DumpHeadBook struct { + Data map[thread.ID]map[peer.ID][]cid.Cid + } + + ExpiredAddress struct { + Addr ma.Multiaddr + Expires time.Time + } + + DumpAddrBook struct { + Data map[thread.ID]map[peer.ID][]ExpiredAddress + } + + DumpKeyBook struct { + Data struct { + Public map[thread.ID]map[peer.ID]crypto.PubKey + Private map[thread.ID]map[peer.ID]crypto.PrivKey + Read map[thread.ID][]byte + Service map[thread.ID][]byte + } + } + + MetadataKey struct { + T thread.ID + K string + } + + DumpMetadata struct { + Data struct { + Int64 map[MetadataKey]int64 + Bool map[MetadataKey]bool + String map[MetadataKey]string + Bytes map[MetadataKey][]byte + } + } +) diff --git a/go.mod b/go.mod index 6975fe53..c26d2211 100644 --- a/go.mod +++ b/go.mod @@ -41,8 +41,10 @@ require ( github.com/libp2p/go-libp2p v0.10.3 github.com/libp2p/go-libp2p-connmgr v0.2.4 github.com/libp2p/go-libp2p-core v0.6.1 + github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-gostream v0.2.0 github.com/libp2p/go-libp2p-kad-dht v0.8.3 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-pubsub v0.2.4 github.com/libp2p/go-libp2p-swarm v0.2.8 diff --git a/logstore/logstore.go b/logstore/logstore.go index 8f4da2b6..cb6796f1 100644 --- a/logstore/logstore.go +++ b/logstore/logstore.go @@ -12,6 +12,8 @@ import ( "github.com/textileio/go-threads/core/thread" ) +var _ core.Logstore = (*logstore)(nil) + var managedSuffix = "/managed" // logstore is a collection of books for storing thread logs.
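[Usage sketch; not part of the patch. A minimal example of selecting the logstore backend through the new WithNetLogstore option, assuming the module's import path from go.mod; the repo directory is a hypothetical placeholder:

	package main

	import (
		"log"

		"github.com/textileio/go-threads/common"
	)

	func main() {
		// LogstoreHybrid mirrors the persistent Badger-backed logstore in
		// memory: reads are served from memory, writes go to both stores.
		n, err := common.DefaultNetwork(
			"/tmp/threads-repo", // hypothetical repo path
			common.WithNetLogstore(common.LogstoreHybrid),
		)
		if err != nil {
			log.Fatal(err)
		}
		defer n.Close()
	}

LogstorePersistent remains the default when no option is passed (see setDefaults in common/common.go above).]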
diff --git a/logstore/lstoreds/addr_book.go b/logstore/lstoreds/addr_book.go index 76ca5c17..0efce53b 100644 --- a/logstore/lstoreds/addr_book.go +++ b/logstore/lstoreds/addr_book.go @@ -453,3 +453,125 @@ func (ab *DsAddrBook) Close() error { ab.childrenDone.Wait() return nil } + +func (ab *DsAddrBook) DumpAddrs() (logstore.DumpAddrBook, error) { + // avoid interference with garbage collection + ab.gc.running <- struct{}{} + unlock := func() { <-ab.gc.running } + + var dump logstore.DumpAddrBook + data, err := ab.traverse(true) + unlock() + if err != nil { + return dump, fmt.Errorf("traversing datastore: %w", err) + } + + dump.Data = make(map[thread.ID]map[peer.ID][]logstore.ExpiredAddress, len(data)) + + for tid, logs := range data { + lm := make(map[peer.ID][]logstore.ExpiredAddress, len(logs)) + for lid, rec := range logs { + if len(rec.Addrs) > 0 { + var addrs = make([]logstore.ExpiredAddress, len(rec.Addrs)) + for i := 0; i < len(rec.Addrs); i++ { + var r = rec.Addrs[i] + addrs[i] = logstore.ExpiredAddress{ + Addr: r.Addr, + Expires: time.Unix(r.Expiry, 0), + } + } + lm[lid] = addrs + } + } + dump.Data[tid] = lm + } + + return dump, nil +} + +func (ab *DsAddrBook) RestoreAddrs(dump logstore.DumpAddrBook) error { + if !AllowEmptyRestore && len(dump.Data) == 0 { + return logstore.ErrEmptyDump + } + + // avoid interference with garbage collection + ab.gc.running <- struct{}{} + defer func() { <-ab.gc.running }() + + stored, err := ab.traverse(false) + if err != nil { + return fmt.Errorf("traversing datastore: %w", err) + } + + for tid, logs := range stored { + for lid := range logs { + if err := ab.ClearAddrs(tid, lid); err != nil { + return fmt.Errorf("clearing addrs for %s/%s: %w", tid, lid, err) + } + } + } + + var current = time.Now() + for tid, logs := range dump.Data { + for lid, addrs := range logs { + for _, addr := range addrs { + if ttl := addr.Expires.Sub(current); ttl > 0 { + if err := ab.setAddrs(tid, lid, []ma.Multiaddr{addr.Addr}, ttl, ttlOverride); err != nil { + return fmt.Errorf("setting address %s for %s/%s: %w", addr.Addr, tid, lid, err) + } + } + } + } + } + + return nil +} + +func (ab *DsAddrBook) traverse(withAddrs bool) (map[thread.ID]map[peer.ID]*pb.AddrBookRecord, error) { + var data = make(map[thread.ID]map[peer.ID]*pb.AddrBookRecord) + result, err := ab.ds.Query(query.Query{Prefix: logBookBase.String(), KeysOnly: !withAddrs}) + if err != nil { + return nil, err + } + defer result.Close() + + for entry := range result.Next() { + kns := ds.RawKey(entry.Key).Namespaces() + if len(kns) < 3 { + return nil, fmt.Errorf("bad addressbook key detected: %s", entry.Key) + } + + // get thread and log IDs from the key components + ts, ls := kns[len(kns)-2], kns[len(kns)-1] + + // parse thread ID + tid, err := parseThreadID(ts) + if err != nil { + return nil, fmt.Errorf("cannot restore thread ID %s: %w", ts, err) + } + + // parse log ID + lid, err := parseLogID(ls) + if err != nil { + return nil, fmt.Errorf("cannot restore log ID %s: %w", ls, err) + } + + var record *pb.AddrBookRecord + if withAddrs { + var pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} + if err := pr.Unmarshal(entry.Value); err != nil { + return nil, fmt.Errorf("cannot decode addressbook record: %w", err) + } + record = pr.AddrBookRecord + } + + la, exist := data[tid] + if !exist { + la = make(map[peer.ID]*pb.AddrBookRecord) + data[tid] = la + } + la[lid] = record + } + + return data, nil +} diff --git a/logstore/lstoreds/ds_test.go b/logstore/lstoreds/ds_test.go index 08f3a6db..88093ef5 100644 --- 
a/logstore/lstoreds/ds_test.go +++ b/logstore/lstoreds/ds_test.go @@ -13,13 +13,22 @@ import ( pt "github.com/textileio/go-threads/test" ) -type datastoreFactory func(tb testing.TB) (ds.Datastore, func()) +type datastoreFactory func(tb testing.TB) (ds.Batching, func()) var dstores = map[string]datastoreFactory{ "Badger": badgerStore, // "Leveldb": leveldbStore, } +func TestDatastoreLogstore(t *testing.T) { + for name, dsFactory := range dstores { + t.Run(name, func(t *testing.T) { + t.Parallel() + pt.LogstoreTest(t, logstoreFactory(t, dsFactory, DefaultOpts())) + }) + } +} + func TestDatastoreAddrBook(t *testing.T) { for name, dsFactory := range dstores { t.Run(name+" Cacheful", func(t *testing.T) { @@ -69,10 +78,25 @@ func TestDatastoreMetadataBook(t *testing.T) { } } +func logstoreFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.LogstoreFactory { + return func() (core.Logstore, func()) { + store, closeFunc := storeFactory(tb) + ls, err := NewLogstore(context.Background(), store, opts) + if err != nil { + tb.Fatal(err) + } + closer := func() { + _ = ls.Close() + closeFunc() + } + return ls, closer + } +} + func addressBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.AddrBookFactory { return func() (core.AddrBook, func()) { store, closeFunc := storeFactory(tb) - ab, err := NewAddrBook(context.Background(), store.(ds.Batching), opts) + ab, err := NewAddrBook(context.Background(), store, opts) if err != nil { tb.Fatal(err) } @@ -120,7 +144,7 @@ func metadataBookFactory(tb testing.TB, storeFactory datastoreFactory) pt.Metada } } -func badgerStore(tb testing.TB) (ds.Datastore, func()) { +func badgerStore(tb testing.TB) (ds.Batching, func()) { dataPath, err := ioutil.TempDir(os.TempDir(), "badger") if err != nil { tb.Fatal(err) diff --git a/logstore/lstoreds/headbook.go b/logstore/lstoreds/headbook.go index 5681d50d..3d5016d4 100644 --- a/logstore/lstoreds/headbook.go +++ b/logstore/lstoreds/headbook.go @@ -6,6 +6,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p-core/peer" core "github.com/textileio/go-threads/core/logstore" "github.com/textileio/go-threads/core/thread" @@ -92,8 +93,8 @@ func (hb *dsHeadBook) SetHeads(t thread.ID, p peer.ID, heads []cid.Cid) error { } entry := &pb.HeadBookRecord_HeadEntry{Cid: &pb.ProtoCid{Cid: heads[i]}} hr.Heads = append(hr.Heads, entry) - } + data, err := proto.Marshal(&hr) if err != nil { return fmt.Errorf("error when marshaling headbookrecord proto for %v: %w", key, err) @@ -131,3 +132,97 @@ func (hb *dsHeadBook) ClearHeads(t thread.ID, p peer.ID) error { } return nil } + +// Dump the entire headbook into a tree structure. +// Not thread-safe, should not be interleaved with other methods! +func (hb *dsHeadBook) DumpHeads() (core.DumpHeadBook, error) { + data, err := hb.traverse(true) + return core.DumpHeadBook{Data: data}, err +} + +// Restore the headbook from the provided dump, replacing all the local data. +// Not thread-safe, should not be interleaved with other methods!
+func (hb *dsHeadBook) RestoreHeads(dump core.DumpHeadBook) error { + if !AllowEmptyRestore && len(dump.Data) == 0 { + return core.ErrEmptyDump + } + + stored, err := hb.traverse(false) + if err != nil { + return fmt.Errorf("traversing datastore: %w", err) + } + + // wipe out existing headbook + for tid, logs := range stored { + for lid := range logs { + if err := hb.ClearHeads(tid, lid); err != nil { + return fmt.Errorf("clearing heads for %s/%s: %w", tid, lid, err) + } + } + } + + // ... and replace it with the dump + for tid, logs := range dump.Data { + for lid, heads := range logs { + if err := hb.SetHeads(tid, lid, heads); err != nil { + return fmt.Errorf("setting heads for %s/%s: %w", tid, lid, err) + } + } + } + + return nil +} + +func (hb *dsHeadBook) traverse(withHeads bool) (map[thread.ID]map[peer.ID][]cid.Cid, error) { + var data = make(map[thread.ID]map[peer.ID][]cid.Cid) + result, err := hb.ds.Query(query.Query{Prefix: hbBase.String(), KeysOnly: !withHeads}) + if err != nil { + return nil, err + } + defer result.Close() + + for entry := range result.Next() { + kns := ds.RawKey(entry.Key).Namespaces() + if len(kns) < 3 { + return nil, fmt.Errorf("bad headbook key detected: %s", entry.Key) + } + + // get thread and log IDs from the key components + ts, ls := kns[len(kns)-2], kns[len(kns)-1] + + // parse thread ID + tid, err := parseThreadID(ts) + if err != nil { + return nil, fmt.Errorf("cannot restore thread ID %s: %w", ts, err) + } + + // parse log ID + lid, err := parseLogID(ls) + if err != nil { + return nil, fmt.Errorf("cannot restore log ID %s: %w", ls, err) + } + + var heads []cid.Cid + if withHeads { + var hr pb.HeadBookRecord + if err := proto.Unmarshal(entry.Value, &hr); err != nil { + return nil, fmt.Errorf("cannot decode headbook record: %w", err) + } + + heads = make([]cid.Cid, len(hr.Heads)) + for i := range hr.Heads { + heads[i] = hr.Heads[i].Cid.Cid + } + } + + lh, exist := data[tid] + if !exist { + lh = make(map[peer.ID][]cid.Cid) + data[tid] = lh + } + + lh[lid] = heads + } + + return data, nil +} diff --git a/logstore/lstoreds/keybook.go b/logstore/lstoreds/keybook.go index 19bc3d71..e0b9b04d 100644 --- a/logstore/lstoreds/keybook.go +++ b/logstore/lstoreds/keybook.go @@ -222,3 +222,150 @@ func (kb *dsKeyBook) ThreadsFromKeys() (thread.IDSlice, error) { } return ids, nil } + +func (kb *dsKeyBook) DumpKeys() (core.DumpKeyBook, error) { + var ( + dump core.DumpKeyBook + pub = make(map[thread.ID]map[peer.ID]crypto.PubKey) + priv = make(map[thread.ID]map[peer.ID]crypto.PrivKey) + rks = make(map[thread.ID][]byte) + sks = make(map[thread.ID][]byte) + ) + + result, err := kb.ds.Query(query.Query{Prefix: kbBase.String(), KeysOnly: false}) + if err != nil { + return dump, err + } + defer result.Close() + + for entry := range result.Next() { + kns := ds.RawKey(entry.Key).Namespaces() + if len(kns) < 4 { + return dump, fmt.Errorf("bad keybook key detected: %s", entry.Key) + } + + // discriminate by key suffix + switch suffix := "/" + kns[len(kns)-1]; suffix { + case pubSuffix.String(): + ts, ls := kns[2], kns[3] + tid, err := parseThreadID(ts) + if err != nil { + return dump, fmt.Errorf("cannot parse thread ID %s: %w", ts, err) + } + lid, err := parseLogID(ls) + if err != nil { + return dump, fmt.Errorf("cannot parse log ID %s: %w", ls, err) + } + pk, err := crypto.UnmarshalPublicKey(entry.Value) + if err != nil { + return dump, fmt.Errorf("cannot unmarshal public key: %w", err) + } + pkm, ok := pub[tid] + if !ok { + pkm = make(map[peer.ID]crypto.PubKey, 1) + pub[tid] = 
pkm + } + pkm[lid] = pk + + case privSuffix.String(): + ts, ls := kns[2], kns[3] + tid, err := parseThreadID(ts) + if err != nil { + return dump, fmt.Errorf("cannot parse thread ID %s: %w", ts, err) + } + lid, err := parseLogID(ls) + if err != nil { + return dump, fmt.Errorf("cannot parse log ID %s: %w", ls, err) + } + pk, err := crypto.UnmarshalPrivateKey(entry.Value) + if err != nil { + return dump, fmt.Errorf("cannot unmarshal private key: %w", err) + } + pkm, ok := priv[tid] + if !ok { + pkm = make(map[peer.ID]crypto.PrivKey, 1) + priv[tid] = pkm + } + pkm[lid] = pk + + case readSuffix.String(): + ts := kns[2] + tid, err := parseThreadID(ts) + if err != nil { + return dump, fmt.Errorf("cannot restore thread ID %s: %w", ts, err) + } + rks[tid] = entry.Value + + case serviceSuffix.String(): + ts := kns[2] + tid, err := parseThreadID(ts) + if err != nil { + return dump, fmt.Errorf("cannot restore thread ID %s: %w", ts, err) + } + sks[tid] = entry.Value + + default: + return dump, fmt.Errorf("bad suffix %s in a key: %s", suffix, entry.Key) + } + } + + dump.Data.Public = pub + dump.Data.Private = priv + dump.Data.Read = rks + dump.Data.Service = sks + + return dump, nil +} + +func (kb *dsKeyBook) RestoreKeys(dump core.DumpKeyBook) error { + if !AllowEmptyRestore && + len(dump.Data.Public) == 0 && + len(dump.Data.Private) == 0 && + len(dump.Data.Read) == 0 && + len(dump.Data.Service) == 0 { + return core.ErrEmptyDump + } + + // clear all local keys + if err := kb.clearKeys(kbBase); err != nil { + return err + } + + for tid, logs := range dump.Data.Public { + for lid, pubKey := range logs { + if err := kb.AddPubKey(tid, lid, pubKey); err != nil { + return err + } + } + } + + for tid, logs := range dump.Data.Private { + for lid, privKey := range logs { + if err := kb.AddPrivKey(tid, lid, privKey); err != nil { + return err + } + } + } + + for tid, rk := range dump.Data.Read { + key, err := sym.FromBytes(rk) + if err != nil { + return fmt.Errorf("decoding read key for thread %s: %w", tid, err) + } + if err := kb.AddReadKey(tid, key); err != nil { + return err + } + } + + for tid, sk := range dump.Data.Service { + key, err := sym.FromBytes(sk) + if err != nil { + return fmt.Errorf("decoding service key for thread %s: %w", tid, err) + } + if err := kb.AddServiceKey(tid, key); err != nil { + return err + } + } + + return nil +} diff --git a/logstore/lstoreds/logstore.go b/logstore/lstoreds/logstore.go index e853c10e..9e643fa8 100644 --- a/logstore/lstoreds/logstore.go +++ b/logstore/lstoreds/logstore.go @@ -13,6 +13,9 @@ import ( "github.com/whyrusleeping/base32" ) +// Define if storage will accept empty dumps. +var AllowEmptyRestore = false + // Configuration object for datastores type Options struct { // The size of the in-memory cache. A value of 0 or lower disables the cache. 
@@ -87,8 +90,7 @@ func uniqueThreadIds(ds ds.Datastore, prefix ds.Key, extractor func(result query ids := make(thread.IDSlice, 0, len(idset)) for id := range idset { - pid, _ := base32.RawStdEncoding.DecodeString(id) - id, err := thread.Cast(pid) + id, err := parseThreadID(id) if err == nil { ids = append(ids, id) } @@ -123,8 +125,7 @@ func uniqueLogIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.Re ids := make(peer.IDSlice, 0, len(idset)) for id := range idset { - pid, _ := base32.RawStdEncoding.DecodeString(id) - id, err := peer.IDFromBytes(pid) + id, err := parseLogID(id) if err == nil { ids = append(ids, id) } @@ -142,3 +143,21 @@ func dsLogKey(t thread.ID, p peer.ID, baseKey ds.Key) ds.Key { key = key.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) return key } + +func parseThreadID(id string) (thread.ID, error) { + pid, err := base32.RawStdEncoding.DecodeString(id) + if err != nil { + return thread.Undef, err + } + + return thread.Cast(pid) +} + +func parseLogID(id string) (peer.ID, error) { + pid, err := base32.RawStdEncoding.DecodeString(id) + if err != nil { + return "", err + } + + return peer.IDFromBytes(pid) +} diff --git a/logstore/lstoreds/metadata.go b/logstore/lstoreds/metadata.go index 51577913..206a1227 100644 --- a/logstore/lstoreds/metadata.go +++ b/logstore/lstoreds/metadata.go @@ -129,11 +129,128 @@ func (m *dsThreadMetadata) setValue(t thread.ID, key string, val interface{}) er } func (m *dsThreadMetadata) ClearMetadata(t thread.ID) error { - q := query.Query{ - Prefix: tmetaBase.ChildString(base32.RawStdEncoding.EncodeToString(t.Bytes())).String(), - KeysOnly: true, + return m.clearKeys(tmetaBase.ChildString(base32.RawStdEncoding.EncodeToString(t.Bytes())).String()) +} + +func (m *dsThreadMetadata) DumpMeta() (core.DumpMetadata, error) { + var ( + vBool = make(map[core.MetadataKey]bool) + vInt64 = make(map[core.MetadataKey]int64) + vString = make(map[core.MetadataKey]string) + vBytes = make(map[core.MetadataKey][]byte) + + buff bytes.Buffer + dump core.DumpMetadata + dec = gob.NewDecoder(&buff) + ) + + results, err := m.ds.Query(query.Query{Prefix: tmetaBase.String()}) + if err != nil { + return dump, err + } + defer results.Close() + + for entry := range results.Next() { + kns := ds.RawKey(entry.Key).Namespaces() + if len(kns) < 4 { + return dump, fmt.Errorf("bad metabook key detected: %s", entry.Key) + } + + ts, key := kns[2], kns[3] + tid, err := parseThreadID(ts) + if err != nil { + return dump, fmt.Errorf("cannot parse thread ID %s: %w", ts, err) + } + + var mk = core.MetadataKey{T: tid, K: key} + + // we (kinda) don't know about type on the wire, so try to decode into every known type + { + var value int64 + buff.Reset() + buff.Write(entry.Value) + if dec.Decode(&value) == nil { + vInt64[mk] = value + continue + } + } + { + var value bool + buff.Reset() + buff.Write(entry.Value) + if dec.Decode(&value) == nil { + vBool[mk] = value + continue + } + } + { + var value string + buff.Reset() + buff.Write(entry.Value) + if dec.Decode(&value) == nil { + vString[mk] = value + continue + } + } + { + var value []byte + buff.Reset() + buff.Write(entry.Value) + if dec.Decode(&value) == nil { + vBytes[mk] = value + continue + } + } + + return dump, fmt.Errorf("cannot decode value at key: %v, value: %v", mk, entry.Value) + } + + dump.Data.Bool = vBool + dump.Data.Int64 = vInt64 + dump.Data.String = vString + dump.Data.Bytes = vBytes + return dump, nil +} + +func (m *dsThreadMetadata) RestoreMeta(dump core.DumpMetadata) error { + var dataLen = 
len(dump.Data.Bool) + + len(dump.Data.Int64) + + len(dump.Data.String) + + len(dump.Data.Bytes) + if !AllowEmptyRestore && dataLen == 0 { + return core.ErrEmptyDump + } + + if err := m.clearKeys(tmetaBase.String()); err != nil { + return err } - results, err := m.ds.Query(q) + + for mk, val := range dump.Data.Bool { + if err := m.setValue(mk.T, mk.K, val); err != nil { + return err + } + } + for mk, val := range dump.Data.Int64 { + if err := m.setValue(mk.T, mk.K, val); err != nil { + return err + } + } + for mk, val := range dump.Data.String { + if err := m.setValue(mk.T, mk.K, val); err != nil { + return err + } + } + for mk, val := range dump.Data.Bytes { + if err := m.setValue(mk.T, mk.K, val); err != nil { + return err + } + } + + return nil +} + +func (m *dsThreadMetadata) clearKeys(prefix string) error { + results, err := m.ds.Query(query.Query{Prefix: prefix, KeysOnly: true}) if err != nil { return err } diff --git a/logstore/lstorehybrid/hybrid_test.go b/logstore/lstorehybrid/hybrid_test.go new file mode 100644 index 00000000..0f45018f --- /dev/null +++ b/logstore/lstorehybrid/hybrid_test.go @@ -0,0 +1,152 @@ +package lstorehybrid + +import ( + "context" + "io/ioutil" + "os" + "testing" + + badger "github.com/ipfs/go-ds-badger" + core "github.com/textileio/go-threads/core/logstore" + "github.com/textileio/go-threads/logstore/lstoreds" + m "github.com/textileio/go-threads/logstore/lstoremem" + pt "github.com/textileio/go-threads/test" +) + +type storeFactory func(tb testing.TB) (core.Logstore, func()) + +var ( + persist = map[string]storeFactory{ + "lstoreds:Badger": lstoredsBadgerF, + } + + inMem = map[string]storeFactory{ + "lstoremem": lstorememF, + } +) + +func TestHybridLogstore(t *testing.T) { + for psName, psF := range persist { + for msName, msF := range inMem { + t.Run(psName+"+"+msName, func(t *testing.T) { + t.Parallel() + pt.LogstoreTest(t, logstoreFactory(t, psF, msF)) + }) + } + } +} + +func TestHybridAddrBook(t *testing.T) { + for psName, psF := range persist { + for msName, msF := range inMem { + t.Run(psName+"+"+msName, func(t *testing.T) { + t.Parallel() + pt.AddrBookTest(t, adapterAddrBook(logstoreFactory(t, psF, msF))) + }) + } + } +} + +func TestHybridKeyBook(t *testing.T) { + for psName, psF := range persist { + for msName, msF := range inMem { + t.Run(psName+"+"+msName, func(t *testing.T) { + t.Parallel() + pt.KeyBookTest(t, adapterKeyBook(logstoreFactory(t, psF, msF))) + }) + } + } +} + +func TestHybridHeadBook(t *testing.T) { + for psName, psF := range persist { + for msName, msF := range inMem { + t.Run(psName+"+"+msName, func(t *testing.T) { + t.Parallel() + pt.HeadBookTest(t, adapterHeadBook(logstoreFactory(t, psF, msF))) + }) + } + } +} + +func TestHybridMetadataBook(t *testing.T) { + for psName, psF := range persist { + for msName, msF := range inMem { + t.Run(psName+"+"+msName, func(t *testing.T) { + t.Parallel() + pt.MetadataBookTest(t, adapterMetaBook(logstoreFactory(t, psF, msF))) + }) + } + } +} + +/* store factories */ + +func logstoreFactory(tb testing.TB, persistF, memF storeFactory) pt.LogstoreFactory { + return func() (core.Logstore, func()) { + ps, psClose := persistF(tb) + ms, msClose := memF(tb) + + ls, err := NewLogstore(ps, ms) + if err != nil { + tb.Fatal(err) + } + + closer := func() { + _ = ls.Close() + psClose() + msClose() + } + + return ls, closer + } +} + +func lstoredsBadgerF(tb testing.TB) (core.Logstore, func()) { + dataPath, err := ioutil.TempDir(os.TempDir(), "badger") + if err != nil { + tb.Fatal(err) + } + + backend, err := 
badger.NewDatastore(dataPath, nil) + if err != nil { + tb.Fatal(err) + } + + lstore, err := lstoreds.NewLogstore( + context.Background(), + backend, + lstoreds.DefaultOpts(), + ) + if err != nil { + tb.Fatal(err) + } + + closer := func() { + _ = lstore.Close() + _ = backend.Close() + _ = os.RemoveAll(dataPath) + } + + return lstore, closer +} + +func lstorememF(_ testing.TB) (core.Logstore, func()) { + store := m.NewLogstore() + return store, func() { _ = store.Close() } +} + +/* component adapters */ + +func adapterAddrBook(f pt.LogstoreFactory) pt.AddrBookFactory { + return func() (core.AddrBook, func()) { return f() } +} + +func adapterKeyBook(f pt.LogstoreFactory) pt.KeyBookFactory { + return func() (core.KeyBook, func()) { return f() } +} + +func adapterHeadBook(f pt.LogstoreFactory) pt.HeadBookFactory { + return func() (core.HeadBook, func()) { return f() } +} + +func adapterMetaBook(f pt.LogstoreFactory) pt.MetadataBookFactory { + return func() (core.ThreadMetadata, func()) { return f() } +} diff --git a/logstore/lstorehybrid/logstore.go b/logstore/lstorehybrid/logstore.go new file mode 100644 index 00000000..2a264911 --- /dev/null +++ b/logstore/lstorehybrid/logstore.go @@ -0,0 +1,367 @@ +package lstorehybrid + +import ( + "context" + "fmt" + "time" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + core "github.com/textileio/go-threads/core/logstore" + "github.com/textileio/go-threads/core/thread" + sym "github.com/textileio/go-threads/crypto/symmetric" +) + +var _ core.Logstore = (*lstore)(nil) + +type lstore struct { + inMem, persist core.Logstore +} + +func NewLogstore(persist, inMem core.Logstore) (*lstore, error) { + // on start both storages must be synchronized, so we initialize + // the in-memory storage with data from the persistent one + dKeys, err := persist.DumpKeys() + if err != nil { + return nil, fmt.Errorf("dumping keys from persistent storage: %w", err) + } + dAddrs, err := persist.DumpAddrs() + if err != nil { + return nil, fmt.Errorf("dumping addresses from persistent storage: %w", err) + } + dHeads, err := persist.DumpHeads() + if err != nil { + return nil, fmt.Errorf("dumping heads from persistent storage: %w", err) + } + dMeta, err := persist.DumpMeta() + if err != nil { + return nil, fmt.Errorf("dumping metadata from persistent storage: %w", err) + } + + // initialize in-memory storage + if err := inMem.RestoreKeys(dKeys); err != nil { + return nil, fmt.Errorf("initializing in-memory storage with keys: %w", err) + } + if err := inMem.RestoreAddrs(dAddrs); err != nil { + return nil, fmt.Errorf("initializing in-memory storage with addresses: %w", err) + } + if err := inMem.RestoreHeads(dHeads); err != nil { + return nil, fmt.Errorf("initializing in-memory storage with heads: %w", err) + } + if err := inMem.RestoreMeta(dMeta); err != nil { + return nil, fmt.Errorf("initializing in-memory storage with metadata: %w", err) + } + + return &lstore{inMem: inMem, persist: persist}, nil +} + +func (l *lstore) Close() error { + if err := l.persist.Close(); err != nil { + return err + } + return l.inMem.Close() +} + +func (l *lstore) GetInt64(tid thread.ID, key string) (*int64, error) { + return l.inMem.GetInt64(tid, key) +} + +func (l *lstore) PutInt64(tid thread.ID, key string, val int64) error { + if err := l.persist.PutInt64(tid, key, val); err != nil { + return err + } + return l.inMem.PutInt64(tid, key, val) +} + +func (l *lstore) GetString(tid thread.ID, key string) (*string, error) {
return l.inMem.GetString(tid, key) +} + +func (l *lstore) PutString(tid thread.ID, key string, val string) error { + if err := l.persist.PutString(tid, key, val); err != nil { + return err + } + return l.inMem.PutString(tid, key, val) +} + +func (l *lstore) GetBool(tid thread.ID, key string) (*bool, error) { + return l.inMem.GetBool(tid, key) +} + +func (l *lstore) PutBool(tid thread.ID, key string, val bool) error { + if err := l.persist.PutBool(tid, key, val); err != nil { + return err + } + return l.inMem.PutBool(tid, key, val) +} + +func (l *lstore) GetBytes(tid thread.ID, key string) (*[]byte, error) { + return l.inMem.GetBytes(tid, key) +} + +func (l *lstore) PutBytes(tid thread.ID, key string, val []byte) error { + if err := l.persist.PutBytes(tid, key, val); err != nil { + return err + } + return l.inMem.PutBytes(tid, key, val) +} + +func (l *lstore) ClearMetadata(tid thread.ID) error { + if err := l.persist.ClearMetadata(tid); err != nil { + return err + } + return l.inMem.ClearMetadata(tid) +} + +func (l *lstore) PubKey(tid thread.ID, lid peer.ID) (crypto.PubKey, error) { + return l.inMem.PubKey(tid, lid) +} + +func (l *lstore) AddPubKey(tid thread.ID, lid peer.ID, key crypto.PubKey) error { + if err := l.persist.AddPubKey(tid, lid, key); err != nil { + return err + } + return l.inMem.AddPubKey(tid, lid, key) +} + +func (l *lstore) PrivKey(tid thread.ID, lid peer.ID) (crypto.PrivKey, error) { + return l.inMem.PrivKey(tid, lid) +} + +func (l *lstore) AddPrivKey(tid thread.ID, lid peer.ID, key crypto.PrivKey) error { + if err := l.persist.AddPrivKey(tid, lid, key); err != nil { + return err + } + return l.inMem.AddPrivKey(tid, lid, key) +} + +func (l *lstore) ReadKey(tid thread.ID) (*sym.Key, error) { + return l.inMem.ReadKey(tid) +} + +func (l *lstore) AddReadKey(tid thread.ID, key *sym.Key) error { + if err := l.persist.AddReadKey(tid, key); err != nil { + return err + } + return l.inMem.AddReadKey(tid, key) +} + +func (l *lstore) ServiceKey(tid thread.ID) (*sym.Key, error) { + return l.inMem.ServiceKey(tid) +} + +func (l *lstore) AddServiceKey(tid thread.ID, key *sym.Key) error { + if err := l.persist.AddServiceKey(tid, key); err != nil { + return err + } + return l.inMem.AddServiceKey(tid, key) +} + +func (l *lstore) ClearKeys(tid thread.ID) error { + if err := l.persist.ClearKeys(tid); err != nil { + return err + } + return l.inMem.ClearKeys(tid) +} + +func (l *lstore) ClearLogKeys(tid thread.ID, lid peer.ID) error { + if err := l.persist.ClearLogKeys(tid, lid); err != nil { + return err + } + return l.inMem.ClearLogKeys(tid, lid) +} + +func (l *lstore) LogsWithKeys(tid thread.ID) (peer.IDSlice, error) { + return l.inMem.LogsWithKeys(tid) +} + +func (l *lstore) ThreadsFromKeys() (thread.IDSlice, error) { + return l.inMem.ThreadsFromKeys() +} + +func (l *lstore) AddAddr(tid thread.ID, lid peer.ID, addr ma.Multiaddr, dur time.Duration) error { + if err := l.persist.AddAddr(tid, lid, addr, dur); err != nil { + return err + } + return l.inMem.AddAddr(tid, lid, addr, dur) +} + +func (l *lstore) AddAddrs(tid thread.ID, lid peer.ID, addrs []ma.Multiaddr, dur time.Duration) error { + if err := l.persist.AddAddrs(tid, lid, addrs, dur); err != nil { + return err + } + return l.inMem.AddAddrs(tid, lid, addrs, dur) +} + +func (l *lstore) SetAddr(tid thread.ID, lid peer.ID, addr ma.Multiaddr, dur time.Duration) error { + if err := l.persist.SetAddr(tid, lid, addr, dur); err != nil { + return err + } + return l.inMem.SetAddr(tid, lid, addr, dur) +} + +func (l *lstore) SetAddrs(tid 
thread.ID, lid peer.ID, addrs []ma.Multiaddr, dur time.Duration) error { + if err := l.persist.SetAddrs(tid, lid, addrs, dur); err != nil { + return err + } + return l.inMem.SetAddrs(tid, lid, addrs, dur) +} + +func (l *lstore) UpdateAddrs(tid thread.ID, lid peer.ID, oldTTL time.Duration, newTTL time.Duration) error { + if err := l.persist.UpdateAddrs(tid, lid, oldTTL, newTTL); err != nil { + return err + } + return l.inMem.UpdateAddrs(tid, lid, oldTTL, newTTL) +} + +func (l *lstore) Addrs(tid thread.ID, lid peer.ID) ([]ma.Multiaddr, error) { + return l.inMem.Addrs(tid, lid) +} + +func (l *lstore) AddrStream(ctx context.Context, tid thread.ID, lid peer.ID) (<-chan ma.Multiaddr, error) { + return l.inMem.AddrStream(ctx, tid, lid) +} + +func (l *lstore) ClearAddrs(tid thread.ID, lid peer.ID) error { + if err := l.persist.ClearAddrs(tid, lid); err != nil { + return err + } + return l.inMem.ClearAddrs(tid, lid) +} + +func (l *lstore) LogsWithAddrs(tid thread.ID) (peer.IDSlice, error) { + return l.inMem.LogsWithAddrs(tid) +} + +func (l *lstore) ThreadsFromAddrs() (thread.IDSlice, error) { + return l.inMem.ThreadsFromAddrs() +} + +func (l *lstore) AddHead(tid thread.ID, lid peer.ID, cid cid.Cid) error { + if err := l.persist.AddHead(tid, lid, cid); err != nil { + return err + } + return l.inMem.AddHead(tid, lid, cid) +} + +func (l *lstore) AddHeads(tid thread.ID, lid peer.ID, cids []cid.Cid) error { + if err := l.persist.AddHeads(tid, lid, cids); err != nil { + return err + } + return l.inMem.AddHeads(tid, lid, cids) +} + +func (l *lstore) SetHead(tid thread.ID, lid peer.ID, cid cid.Cid) error { + if err := l.persist.SetHead(tid, lid, cid); err != nil { + return err + } + return l.inMem.SetHead(tid, lid, cid) +} + +func (l *lstore) SetHeads(tid thread.ID, lid peer.ID, cids []cid.Cid) error { + if err := l.persist.SetHeads(tid, lid, cids); err != nil { + return err + } + return l.inMem.SetHeads(tid, lid, cids) +} + +func (l *lstore) Heads(tid thread.ID, lid peer.ID) ([]cid.Cid, error) { + return l.inMem.Heads(tid, lid) +} + +func (l *lstore) ClearHeads(tid thread.ID, lid peer.ID) error { + if err := l.persist.ClearHeads(tid, lid); err != nil { + return err + } + return l.inMem.ClearHeads(tid, lid) +} + +func (l *lstore) Threads() (thread.IDSlice, error) { + return l.inMem.Threads() +} + +func (l *lstore) AddThread(info thread.Info) error { + if err := l.persist.AddThread(info); err != nil { + return err + } + return l.inMem.AddThread(info) +} + +func (l *lstore) GetThread(tid thread.ID) (thread.Info, error) { + return l.inMem.GetThread(tid) +} + +func (l *lstore) DeleteThread(tid thread.ID) error { + if err := l.persist.DeleteThread(tid); err != nil { + return err + } + return l.inMem.DeleteThread(tid) +} + +func (l *lstore) AddLog(tid thread.ID, info thread.LogInfo) error { + if err := l.persist.AddLog(tid, info); err != nil { + return err + } + return l.inMem.AddLog(tid, info) +} + +func (l *lstore) GetLog(tid thread.ID, lid peer.ID) (thread.LogInfo, error) { + return l.inMem.GetLog(tid, lid) +} + +func (l *lstore) GetManagedLogs(tid thread.ID) ([]thread.LogInfo, error) { + return l.inMem.GetManagedLogs(tid) +} + +func (l *lstore) DeleteLog(tid thread.ID, lid peer.ID) error { + if err := l.persist.DeleteLog(tid, lid); err != nil { + return err + } + return l.inMem.DeleteLog(tid, lid) +} + +func (l *lstore) DumpMeta() (core.DumpMetadata, error) { + return l.inMem.DumpMeta() +} + +func (l *lstore) RestoreMeta(dump core.DumpMetadata) error { + if err := l.persist.RestoreMeta(dump); err != nil { + 
return err + } + return l.inMem.RestoreMeta(dump) +} + +func (l *lstore) DumpKeys() (core.DumpKeyBook, error) { + return l.inMem.DumpKeys() +} + +func (l *lstore) RestoreKeys(dump core.DumpKeyBook) error { + if err := l.persist.RestoreKeys(dump); err != nil { + return err + } + return l.inMem.RestoreKeys(dump) +} + +func (l *lstore) DumpAddrs() (core.DumpAddrBook, error) { + return l.inMem.DumpAddrs() +} + +func (l *lstore) RestoreAddrs(dump core.DumpAddrBook) error { + if err := l.persist.RestoreAddrs(dump); err != nil { + return err + } + return l.inMem.RestoreAddrs(dump) +} + +func (l *lstore) DumpHeads() (core.DumpHeadBook, error) { + return l.inMem.DumpHeads() +} + +func (l *lstore) RestoreHeads(dump core.DumpHeadBook) error { + if err := l.persist.RestoreHeads(dump); err != nil { + return err + } + return l.inMem.RestoreHeads(dump) +} diff --git a/logstore/lstoremem/addr_book.go b/logstore/lstoremem/addr_book.go index 1ec80760..22d30930 100644 --- a/logstore/lstoremem/addr_book.go +++ b/logstore/lstoremem/addr_book.go @@ -56,6 +56,7 @@ type memoryAddrBook struct { ctx context.Context cancel func() + gcLock sync.Mutex subManager *AddrSubManager } @@ -104,7 +105,10 @@ func (mab *memoryAddrBook) Close() error { // gc garbage collects the in-memory address book. func (mab *memoryAddrBook) gc() { - now := time.Now() + mab.gcLock.Lock() + defer mab.gcLock.Unlock() + + var now = time.Now() for _, s := range mab.segments { s.Lock() for t, pmap := range s.addrs { @@ -324,6 +328,80 @@ func (mab *memoryAddrBook) AddrStream(ctx context.Context, t thread.ID, p peer.I return mab.subManager.AddrStream(ctx, p, initial) } +func (mab *memoryAddrBook) DumpAddrs() (core.DumpAddrBook, error) { + var dump = core.DumpAddrBook{ + Data: make(map[thread.ID]map[peer.ID][]core.ExpiredAddress, 256), + } + + mab.gcLock.Lock() + var now = time.Now() + + for _, segment := range mab.segments { + for tid, logs := range segment.addrs { + lm, exist := dump.Data[tid] + if !exist { + lm = make(map[peer.ID][]core.ExpiredAddress, len(logs)) + dump.Data[tid] = lm + } + + for lid, addrMap := range logs { + for _, ap := range addrMap { + if ap != nil && !ap.ExpiredBy(now) { + lm[lid] = append(lm[lid], core.ExpiredAddress{ + Addr: ap.Addr, + Expires: ap.Expires, + }) + } + } + } + } + } + mab.gcLock.Unlock() + return dump, nil +} + +func (mab *memoryAddrBook) RestoreAddrs(dump core.DumpAddrBook) error { + if !AllowEmptyRestore && len(dump.Data) == 0 { + return core.ErrEmptyDump + } + + mab.gcLock.Lock() + defer mab.gcLock.Unlock() + + // reset segments + for i := range mab.segments { + mab.segments[i] = &addrSegment{ + addrs: make(map[thread.ID]map[peer.ID]map[string]*expiringAddr, len(mab.segments[i].addrs)), + } + } + + var now = time.Now() + for tid, logs := range dump.Data { + for lid, addrs := range logs { + s := mab.segments.get(lid) + am, _ := s.getAddrs(tid, lid) + if am == nil { + if s.addrs[tid] == nil { + s.addrs[tid] = make(map[peer.ID]map[string]*expiringAddr, 1) + } + am = make(map[string]*expiringAddr, len(addrs)) + s.addrs[tid][lid] = am + } + + for _, rec := range addrs { + if rec.Expires.After(now) { + am[string(rec.Addr.Bytes())] = &expiringAddr{ + Addr: rec.Addr, + TTL: rec.Expires.Sub(now), + Expires: rec.Expires, + } + } + } + } + } + return nil +} + type addrSub struct { pubch chan ma.Multiaddr ctx context.Context diff --git a/logstore/lstoremem/headbook.go b/logstore/lstoremem/headbook.go index 159a9a46..8d778577 100644 --- a/logstore/lstoremem/headbook.go +++ b/logstore/lstoremem/headbook.go @@ -4,7 
+4,7 @@ import ( "sync" "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peer" core "github.com/textileio/go-threads/core/logstore" "github.com/textileio/go-threads/core/thread" ) @@ -114,3 +114,45 @@ func (mhb *memoryHeadBook) ClearHeads(t thread.ID, p peer.ID) error { } return nil } + +func (mhb *memoryHeadBook) DumpHeads() (core.DumpHeadBook, error) { + var dump = core.DumpHeadBook{ + Data: make(map[thread.ID]map[peer.ID][]cid.Cid, len(mhb.heads)), + } + + for tid, logs := range mhb.heads { + lm := make(map[peer.ID][]cid.Cid, len(logs)) + for lid, hs := range logs { + heads := make([]cid.Cid, 0, len(hs)) + for head := range hs { + heads = append(heads, head) + } + lm[lid] = heads + } + dump.Data[tid] = lm + } + + return dump, nil +} + +func (mhb *memoryHeadBook) RestoreHeads(dump core.DumpHeadBook) error { + if !AllowEmptyRestore && len(dump.Data) == 0 { + return core.ErrEmptyDump + } + + var restored = make(map[thread.ID]map[peer.ID]map[cid.Cid]struct{}, len(dump.Data)) + for tid, logs := range dump.Data { + lm := make(map[peer.ID]map[cid.Cid]struct{}, len(logs)) + for lid, hs := range logs { + hm := make(map[cid.Cid]struct{}, len(hs)) + for _, head := range hs { + hm[head] = struct{}{} + } + lm[lid] = hm + } + restored[tid] = lm + } + + mhb.heads = restored + return nil +} diff --git a/logstore/lstoremem/keybook.go b/logstore/lstoremem/keybook.go index 692aaa7b..72115f64 100644 --- a/logstore/lstoremem/keybook.go +++ b/logstore/lstoremem/keybook.go @@ -194,3 +194,66 @@ func (mkb *memoryKeyBook) ThreadsFromKeys() (thread.IDSlice, error) { } return tids, nil } + +func (mkb *memoryKeyBook) DumpKeys() (core.DumpKeyBook, error) { + mkb.RLock() + defer mkb.RUnlock() + + var ( + dump core.DumpKeyBook + public = make(map[thread.ID]map[peer.ID]crypto.PubKey, len(mkb.pks)) + private = make(map[thread.ID]map[peer.ID]crypto.PrivKey, len(mkb.sks)) + read = make(map[thread.ID][]byte, len(mkb.rks)) + service = make(map[thread.ID][]byte, len(mkb.fks)) + ) + + for tid, logs := range mkb.pks { + lm := make(map[peer.ID]crypto.PubKey, len(logs)) + for lid, key := range logs { + lm[lid] = key + } + public[tid] = lm + } + + for tid, logs := range mkb.sks { + lm := make(map[peer.ID]crypto.PrivKey, len(logs)) + for lid, key := range logs { + lm[lid] = key + } + private[tid] = lm + } + + for tid, key := range mkb.rks { + read[tid] = key + } + + for tid, key := range mkb.fks { + service[tid] = key + } + + dump.Data.Public = public + dump.Data.Private = private + dump.Data.Read = read + dump.Data.Service = service + + return dump, nil +} + +func (mkb *memoryKeyBook) RestoreKeys(dump core.DumpKeyBook) error { + if !AllowEmptyRestore && + len(dump.Data.Public) == 0 && + len(dump.Data.Private) == 0 && + len(dump.Data.Read) == 0 && + len(dump.Data.Service) == 0 { + return core.ErrEmptyDump + } + + mkb.Lock() + defer mkb.Unlock() + + mkb.pks = dump.Data.Public + mkb.sks = dump.Data.Private + mkb.rks = dump.Data.Read + mkb.fks = dump.Data.Service + return nil +} diff --git a/logstore/lstoremem/logstore.go b/logstore/lstoremem/logstore.go index ea7e1eb4..5057ef8a 100644 --- a/logstore/lstoremem/logstore.go +++ b/logstore/lstoremem/logstore.go @@ -5,6 +5,9 @@ import ( lstore "github.com/textileio/go-threads/logstore" ) +// Define if storage will accept empty dumps. +var AllowEmptyRestore = true + // NewLogstore creates an in-memory threadsafe collection of thread logs. 
func NewLogstore() core.Logstore { return lstore.NewLogstore( diff --git a/logstore/lstoremem/metadata.go b/logstore/lstoremem/metadata.go index 9746d949..dc3feeb3 100644 --- a/logstore/lstoremem/metadata.go +++ b/logstore/lstoremem/metadata.go @@ -1,33 +1,23 @@ package lstoremem import ( + "fmt" "sync" core "github.com/textileio/go-threads/core/logstore" "github.com/textileio/go-threads/core/thread" ) -var internKeys = map[string]bool{ - "Name": true, -} - -type metakey struct { - id thread.ID - key string -} - type memoryThreadMetadata struct { - ds map[metakey]interface{} - dslock sync.RWMutex - interned map[string]interface{} + ds map[core.MetadataKey]interface{} + dslock sync.RWMutex } var _ core.ThreadMetadata = (*memoryThreadMetadata)(nil) func NewThreadMetadata() core.ThreadMetadata { return &memoryThreadMetadata{ - ds: make(map[metakey]interface{}), - interned: make(map[string]interface{}), + ds: make(map[core.MetadataKey]interface{}), } } @@ -88,20 +78,13 @@ func (m *memoryThreadMetadata) GetBytes(t thread.ID, key string) (*[]byte, error func (m *memoryThreadMetadata) putValue(t thread.ID, key string, val interface{}) { m.dslock.Lock() defer m.dslock.Unlock() - if vals, ok := val.(string); ok && internKeys[key] { - if interned, ok := m.interned[vals]; ok { - val = interned - } else { - m.interned[vals] = val - } - } - m.ds[metakey{t, key}] = val + m.ds[core.MetadataKey{T: t, K: key}] = val } func (m *memoryThreadMetadata) getValue(t thread.ID, key string) interface{} { m.dslock.RLock() defer m.dslock.RUnlock() - if v, ok := m.ds[metakey{t, key}]; ok { + if v, ok := m.ds[core.MetadataKey{T: t, K: key}]; ok { return v } return nil @@ -111,9 +94,75 @@ func (m *memoryThreadMetadata) ClearMetadata(t thread.ID) error { m.dslock.Lock() defer m.dslock.Unlock() for k := range m.ds { - if k.id.Equals(t) { + if k.T.Equals(t) { delete(m.ds, k) } } return nil } + +func (m *memoryThreadMetadata) DumpMeta() (core.DumpMetadata, error) { + m.dslock.RLock() + defer m.dslock.RUnlock() + + var ( + dump core.DumpMetadata + vInt64 = make(map[core.MetadataKey]int64) + vBool = make(map[core.MetadataKey]bool) + vString = make(map[core.MetadataKey]string) + vBytes = make(map[core.MetadataKey][]byte) + ) + + for mk, value := range m.ds { + switch v := value.(type) { + case bool: + vBool[mk] = v + case int64: + vInt64[mk] = v + case string: + vString[mk] = v + case []byte: + vBytes[mk] = v + default: + return dump, fmt.Errorf("unsupported value type %T, key: %v, value: %v", value, mk, value) + } + } + + dump.Data.Bool = vBool + dump.Data.Int64 = vInt64 + dump.Data.String = vString + dump.Data.Bytes = vBytes + return dump, nil +} + +func (m *memoryThreadMetadata) RestoreMeta(dump core.DumpMetadata) error { + var dataLen = len(dump.Data.Bool) + + len(dump.Data.Int64) + + len(dump.Data.String) + + len(dump.Data.Bytes) + if !AllowEmptyRestore && dataLen == 0 { + return core.ErrEmptyDump + } + + m.dslock.Lock() + defer m.dslock.Unlock() + + // clear local data + m.ds = make(map[core.MetadataKey]interface{}, dataLen) + + // replace with dump + for mk, val := range dump.Data.Bool { + m.ds[mk] = val + } + for mk, val := range dump.Data.Int64 { + m.ds[mk] = val + } + for mk, val := range dump.Data.String { + m.ds[mk] = val + } + for mk, val := range dump.Data.Bytes { + m.ds[mk] = val + } + + return nil +} diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index 5937d053..ea2fa1c9 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -20,7 +20,8 @@ var addressBookSuite = 
map[string]func(book core.AddrBook) func(*testing.T){ "AddressesExpire": testAddressesExpire, "ClearWithIter": testClearWithIterator, "LogsWithAddresses": testLogsWithAddrs, - "ThreadsWithAddresses": testThreadsFromddrs, + "ThreadsWithAddresses": testThreadsFromAddrs, + "ExportAddressBook": testExportAddressBook, } type AddrBookFactory func() (core.AddrBook, func()) @@ -355,7 +356,7 @@ func testLogsWithAddrs(ab core.AddrBook) func(t *testing.T) { } } -func testThreadsFromddrs(ab core.AddrBook) func(t *testing.T) { +func testThreadsFromAddrs(ab core.AddrBook) func(t *testing.T) { return func(t *testing.T) { // cannot run in parallel as the store is modified. // go runs sequentially in the specified order @@ -363,7 +364,7 @@ func testThreadsFromddrs(ab core.AddrBook) func(t *testing.T) { t.Run("empty addrbook", func(t *testing.T) { if logs, err := ab.ThreadsFromAddrs(); err != nil || len(logs) != 0 { - t.Fatal("expected to find no threads witout errors") + t.Fatal("expected to find no threads without errors") } }) @@ -387,6 +388,31 @@ func testThreadsFromddrs(ab core.AddrBook) func(t *testing.T) { } } +func testExportAddressBook(ab core.AddrBook) func(*testing.T) { + return func(t *testing.T) { + tid := thread.NewIDV1(thread.Raw, 24) + + t.Run("dump and restore", func(t *testing.T) { + ids := GeneratePeerIDs(2) + addrs := GenerateAddrs(2) + + check(t, ab.AddAddr(tid, ids[0], addrs[0], time.Hour)) + check(t, ab.AddAddr(tid, ids[1], addrs[1], time.Hour)) + + dump, err := ab.DumpAddrs() + check(t, err) + + check(t, ab.ClearAddrs(tid, ids[0])) + check(t, ab.ClearAddrs(tid, ids[1])) + + check(t, ab.RestoreAddrs(dump)) + + AssertAddressesEqual(t, addrs[:1], checkedAddrs(t, ab, tid, ids[0])) + AssertAddressesEqual(t, addrs[1:], checkedAddrs(t, ab, tid, ids[1])) + }) + } +} + func checkedAddrs(t *testing.T, ab core.AddrBook, tid thread.ID, id peer.ID) []ma.Multiaddr { addrs, err := ab.Addrs(tid, id) if err != nil { diff --git a/test/headbook_suite.go b/test/headbook_suite.go index 32deb874..c485d4ac 100644 --- a/test/headbook_suite.go +++ b/test/headbook_suite.go @@ -18,6 +18,7 @@ var headBookSuite = map[string]func(hb core.HeadBook) func(*testing.T){ "AddGetHeads": testHeadBookAddHeads, "SetGetHeads": testHeadBookSetHeads, "ClearHeads": testHeadBookClearHeads, + "ExportHeads": testHeadBookExport, } type HeadBookFactory func() (core.HeadBook, func()) @@ -39,125 +40,157 @@ func HeadBookTest(t *testing.T, factory HeadBookFactory) { func testHeadBookAddHeads(hb core.HeadBook) func(t *testing.T) { return func(t *testing.T) { - tid := thread.NewIDV1(thread.Raw, 24) + var ( + numLogs = 2 + numHeads = 3 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + if stored, err := hb.Heads(tid, lid); err != nil || len(stored) > 0 { + t.Error("expected heads to be empty on init without errors") + } - _, pub, _ := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - p, _ := peer.IDFromPublicKey(pub) + if err := hb.AddHeads(tid, lid, heads); err != nil { + t.Fatalf("error when adding heads: %v", err) + } + } - if heads, err := hb.Heads(tid, p); err != nil || len(heads) > 0 { - t.Error("expected heads to be empty on init without errors") + for lid, expected := range logs { + heads, err := hb.Heads(tid, lid) + if err != nil { + t.Fatalf("error while getting heads: %v", err) + } + + if !equalHeads(expected, heads) { + t.Fatalf("heads not equal, expected: %v, actual: %v", expected, heads) + } } + } +} - heads := make([]cid.Cid, 0) - for i := 0; i < 2; i++ { - hash, _ := 
mh.Encode([]byte("foo"+strconv.Itoa(i)), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) +func testHeadBookSetHeads(hb core.HeadBook) func(t *testing.T) { + return func(t *testing.T) { + var ( + numLogs = 2 + numHeads = 3 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + if stored, err := hb.Heads(tid, lid); err != nil || len(stored) > 0 { + t.Error("expected heads to be empty on init without errors") + } - if err := hb.AddHeads(tid, p, []cid.Cid{head}); err != nil { + if err := hb.SetHeads(tid, lid, heads); err != nil { t.Fatalf("error when adding heads: %v", err) } - heads = append(heads, head) } - hbHeads, err := hb.Heads(tid, p) - if err != nil { - t.Fatalf("error while getting heads: %v", err) - } - for _, h := range heads { - var found bool - for _, b := range hbHeads { - if b == h { - found = true - break - } + for lid, expected := range logs { + heads, err := hb.Heads(tid, lid) + if err != nil { + t.Fatalf("error while getting heads: %v", err) } - if !found { - t.Errorf("head %s not found in book", h.String()) + + if !equalHeads(expected, heads) { + t.Fatalf("heads not equal, expected: %v, actual: %v", expected, heads) } } } } -func testHeadBookSetHeads(hb core.HeadBook) func(t *testing.T) { +func testHeadBookClearHeads(hb core.HeadBook) func(t *testing.T) { return func(t *testing.T) { - tid := thread.NewIDV1(thread.Raw, 24) - - _, pub, _ := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - p, _ := peer.IDFromPublicKey(pub) + var ( + numLogs = 2 + numHeads = 2 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + if stored, err := hb.Heads(tid, lid); err != nil || len(stored) > 0 { + t.Error("expected heads to be empty on init without errors") + } - if heads, err := hb.Heads(tid, p); err != nil || len(heads) > 0 { - t.Error("expected heads to be empty on init without errors") + if err := hb.AddHeads(tid, lid, heads); err != nil { + t.Fatalf("error when adding heads: %v", err) + } } - heads := make([]cid.Cid, 0) - for i := 0; i < 2; i++ { - hash, _ := mh.Encode([]byte("foo"+strconv.Itoa(i)), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) - heads = append(heads, head) - } - if err := hb.SetHeads(tid, p, heads); err != nil { - t.Fatalf("error when setting heads: %v", err) + for lid, expected := range logs { + heads, err := hb.Heads(tid, lid) + if err != nil { + t.Fatalf("error while getting heads: %v", err) + } + + if !equalHeads(expected, heads) { + t.Fatalf("heads not equal, expected: %v, actual: %v", expected, heads) + } } - hbHeads, err := hb.Heads(tid, p) - if err != nil { - t.Fatalf("error when getting heads: %v", err) + for lid := range logs { + if err := hb.ClearHeads(tid, lid); err != nil { + t.Fatalf("error when clearing heads: %v", err) + } } - for _, h := range heads { - var found bool - for _, b := range hbHeads { - if b == h { - found = true - break - } + + for lid := range logs { + heads, err := hb.Heads(tid, lid) + if err != nil { + t.Fatalf("error while getting heads: %v", err) } - if !found { - t.Errorf("head %s not found in book", h.String()) + + if len(heads) > 0 { + t.Fatalf("heads not empty after clear") } } } } -func testHeadBookClearHeads(hb core.HeadBook) func(t *testing.T) { +func testHeadBookExport(hb core.HeadBook) func(t *testing.T) { return func(t *testing.T) { - tid := thread.NewIDV1(thread.Raw, 24) - - _, pub, _ := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - p, _ := peer.IDFromPublicKey(pub) - - if heads, err := hb.Heads(tid, p); err != nil || len(heads) > 0 { - 
t.Error("expected heads to be empty on init without errors") - } - - for i := 0; i < 2; i++ { - hash, _ := mh.Encode([]byte("foo"+strconv.Itoa(i)), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) + var ( + numLogs = 2 + numHeads = 3 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + if stored, err := hb.Heads(tid, lid); err != nil || len(stored) > 0 { + t.Error("expected heads to be empty on init without errors") + } - if err := hb.AddHead(tid, p, head); err != nil { + if err := hb.AddHeads(tid, lid, heads); err != nil { t.Fatalf("error when adding heads: %v", err) } } - heads, err := hb.Heads(tid, p) + dump, err := hb.DumpHeads() if err != nil { - t.Fatalf("error when getting heads: %v", err) - } - len1 := len(heads) - if len1 != 2 { - t.Errorf("incorrect heads length %d", len1) + t.Fatalf("error dumping headbook: %v", err) } - if err = hb.ClearHeads(tid, p); err != nil { - t.Fatalf("error when clearing heads: %v", err) + // clear storage explicitly to ensure it will be empty + for lid := range logs { + if err := hb.ClearHeads(tid, lid); err != nil { + t.Fatalf("error when clearing heads: %v", err) + } } - heads, err = hb.Heads(tid, p) - if err != nil { - t.Fatalf("error when getting heads: %v", err) + if err := hb.RestoreHeads(dump); err != nil { + t.Fatalf("error restoring headbook: %v", err) } - len2 := len(heads) - if len2 != 0 { - t.Errorf("incorrect heads length %d", len2) + + for lid, expected := range logs { + heads, err := hb.Heads(tid, lid) + if err != nil { + t.Fatalf("error while getting heads: %v", err) + } + + if !equalHeads(expected, heads) { + t.Fatalf("heads not equal, expected: %v, actual: %v", expected, heads) + } } } } @@ -189,99 +222,115 @@ func BenchmarkHeadBook(b *testing.B, factory HeadBookFactory) { func benchmarkHeads(hb core.HeadBook) func(*testing.B) { return func(b *testing.B) { - tid := thread.NewIDV1(thread.Raw, 24) - - _, pub, err := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - if err != nil { - b.Error(err) - } - - id, err := peer.IDFromPublicKey(pub) - if err != nil { - b.Error(err) + var ( + numLogs = 1 + numHeads = 1 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + _ = hb.AddHeads(tid, lid, heads) } - hash, _ := mh.Encode([]byte("foo"), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) - - _ = hb.AddHead(tid, id, head) - b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = hb.Heads(tid, id) + for lid := range logs { + _, _ = hb.Heads(tid, lid) + } } } } func benchmarkAddHeads(hb core.HeadBook) func(*testing.B) { return func(b *testing.B) { - tid := thread.NewIDV1(thread.Raw, 24) - - _, pub, err := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - if err != nil { - b.Error(err) - } - - id, err := peer.IDFromPublicKey(pub) - if err != nil { - b.Error(err) - } - - hash, _ := mh.Encode([]byte("foo"), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) + var ( + numLogs = 1 + numHeads = 1 + tid, logs = genHeads(numLogs, numHeads) + ) b.ResetTimer() for i := 0; i < b.N; i++ { - _ = hb.AddHeads(tid, id, []cid.Cid{head}) + for lid, heads := range logs { + _ = hb.AddHeads(tid, lid, heads) + } } } } func benchmarkSetHeads(hb core.HeadBook) func(*testing.B) { return func(b *testing.B) { - tid := thread.NewIDV1(thread.Raw, 24) + var ( + numLogs = 1 + numHeads = 1 + tid, logs = genHeads(numLogs, numHeads) + ) - _, pub, err := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - if err != nil { - b.Error(err) + b.ResetTimer() + for i := 0; i < b.N; i++ { + for lid, heads 
:= range logs { + _ = hb.SetHeads(tid, lid, heads) + } } + } +} - id, err := peer.IDFromPublicKey(pub) - if err != nil { - b.Error(err) +func benchmarkClearHeads(hb core.HeadBook) func(*testing.B) { + return func(b *testing.B) { + var ( + numLogs = 1 + numHeads = 1 + tid, logs = genHeads(numLogs, numHeads) + ) + + for lid, heads := range logs { + _ = hb.SetHeads(tid, lid, heads) } - hash, _ := mh.Encode([]byte("foo"), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) - b.ResetTimer() for i := 0; i < b.N; i++ { - _ = hb.SetHeads(tid, id, []cid.Cid{head}) + for lid := range logs { + _ = hb.ClearHeads(tid, lid) + } } } } -func benchmarkClearHeads(hb core.HeadBook) func(*testing.B) { - return func(b *testing.B) { - tid := thread.NewIDV1(thread.Raw, 24) +func genHeads(numLogs, numHeads int) (thread.ID, map[peer.ID][]cid.Cid) { + var ( + logs = make(map[peer.ID][]cid.Cid) + tid = thread.NewIDV1(thread.Raw, 32) + ) - _, pub, err := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) - if err != nil { - b.Error(err) - } + for i := 0; i < numLogs; i++ { + _, pub, _ := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits) + lid, _ := peer.IDFromPublicKey(pub) - id, err := peer.IDFromPublicKey(pub) - if err != nil { - b.Error(err) + heads := make([]cid.Cid, numHeads) + for j := 0; j < numHeads; j++ { + hash, _ := mh.Encode([]byte("h:"+strconv.Itoa(i)+":"+strconv.Itoa(j)), mh.SHA2_256) + heads[j] = cid.NewCidV1(cid.DagCBOR, hash) } - hash, _ := mh.Encode([]byte("foo"), mh.SHA2_256) - head := cid.NewCidV1(cid.DagCBOR, hash) - _ = hb.SetHeads(tid, id, []cid.Cid{head}) + logs[lid] = heads + } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _ = hb.ClearHeads(tid, id) + return tid, logs +} + +func equalHeads(h1, h2 []cid.Cid) bool { + if len(h1) != len(h2) { + return false + } + + sort.Slice(h1, func(i, j int) bool { return h1[i].String() < h1[j].String() }) + sort.Slice(h2, func(i, j int) bool { return h2[i].String() < h2[j].String() }) + + for i := 0; i < len(h1); i++ { + if !h1[i].Equals(h2[i]) { + return false } } + + return true } diff --git a/test/keybook_suite.go b/test/keybook_suite.go index 7bea2253..a98bf19e 100644 --- a/test/keybook_suite.go +++ b/test/keybook_suite.go @@ -25,6 +25,7 @@ var keyBookSuite = map[string]func(kb core.KeyBook) func(*testing.T){ "testKeyBookClearLogKeys": testKeyBookClearLogKeys, "ThreadsFromKeys": testKeyBookThreads, "PubKeyAddedOnRetrieve": testInlinedPubKeyAddedOnRetrieve, + "ExportKeyBook": testKeyBookExport, } type KeyBookFactory func() (core.KeyBook, func()) @@ -420,6 +421,128 @@ func testInlinedPubKeyAddedOnRetrieve(kb core.KeyBook) func(t *testing.T) { } } +func testKeyBookExport(kb core.KeyBook) func(t *testing.T) { + return func(t *testing.T) { + var ( + numThreads = 2 + numLogs = 3 + + public = make(map[thread.ID]map[peer.ID]crypto.PubKey) + private = make(map[thread.ID]map[peer.ID]crypto.PrivKey) + read = make(map[thread.ID][]byte) + service = make(map[thread.ID][]byte) + ) + + // generate key set + for i := 0; i < numThreads; i++ { + tid := thread.NewIDV1(thread.Raw, 24) + + readKey, err := sym.NewRandom() + if err != nil { + t.Error(err) + } + read[tid] = readKey.Bytes() + if err := kb.AddReadKey(tid, readKey); err != nil { + t.Fatal(err) + } + + serviceKey, err := sym.NewRandom() + if err != nil { + t.Error(err) + } + service[tid] = serviceKey.Bytes() + if err := kb.AddServiceKey(tid, serviceKey); err != nil { + t.Fatal(err) + } + + public[tid] = make(map[peer.ID]crypto.PubKey) + private[tid] = make(map[peer.ID]crypto.PrivKey) + + for j := 0; j < 
numLogs; j++ {
+				priv, pub, _ := pt.RandTestKeyPair(crypto.RSA, crypto.MinRsaKeyBits)
+				lid, _ := peer.IDFromPublicKey(pub)
+
+				public[tid][lid] = pub
+				private[tid][lid] = priv
+
+				if err := kb.AddPubKey(tid, lid, pub); err != nil {
+					t.Fatal(err)
+				}
+
+				if err := kb.AddPrivKey(tid, lid, priv); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}
+
+		// make a dump
+		dump, err := kb.DumpKeys()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// purge all keys
+		for tid := range read {
+			if err := kb.ClearKeys(tid); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		// try to restore from the dump
+		if err := kb.RestoreKeys(dump); err != nil {
+			t.Fatal(err)
+		}
+
+		// compare public keys
+		for tid, logs := range public {
+			for lid, key := range logs {
+				pk, err := kb.PubKey(tid, lid)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if !pk.Equals(key) {
+					t.Error("restored public key is different from the original one")
+				}
+			}
+		}
+
+		// compare private keys
+		for tid, logs := range private {
+			for lid, key := range logs {
+				pk, err := kb.PrivKey(tid, lid)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if !pk.Equals(key) {
+					t.Error("restored private key is different from the original one")
+				}
+			}
+		}
+
+		// compare read keys
+		for tid, key := range read {
+			rk, err := kb.ReadKey(tid)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !bytes.Equal(rk.Bytes(), key) {
+				t.Error("restored thread-read key is different from the original one")
+			}
+		}
+
+		// compare service keys
+		for tid, key := range service {
+			sk, err := kb.ServiceKey(tid)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !bytes.Equal(sk.Bytes(), key) {
+				t.Error("restored thread-service key is different from the original one")
+			}
+		}
+	}
+}
+
 var logKeybookBenchmarkSuite = map[string]func(kb core.KeyBook) func(*testing.B){
 	"PubKey":    benchmarkPubKey,
 	"AddPubKey": benchmarkAddPubKey,
diff --git a/test/metadata_suite.go b/test/metadata_suite.go
index 5b539468..c6f7b8e4 100644
--- a/test/metadata_suite.go
+++ b/test/metadata_suite.go
@@ -18,11 +18,12 @@ const (
 )
 
 var metadataBookSuite = map[string]func(mb core.ThreadMetadata) func(*testing.T){
-	"Int64":         testMetadataBookInt64,
-	"String":        testMetadataBookString,
-	"Byte":          testMetadataBookBytes,
-	"NotFound":      testMetadataBookNotFound,
-	"ClearMetadata": testClearMetadata,
+	"Int64":          testMetadataBookInt64,
+	"String":         testMetadataBookString,
+	"Byte":           testMetadataBookBytes,
+	"NotFound":       testMetadataBookNotFound,
+	"ClearMetadata":  testClearMetadata,
+	"ExportMetadata": testMetadataBookExport,
 }
 
 type MetadataBookFactory func() (core.ThreadMetadata, func())
@@ -181,3 +182,97 @@ func testClearMetadata(mb core.ThreadMetadata) func(*testing.T) {
 		}
 	}
 }
+
+func testMetadataBookExport(mb core.ThreadMetadata) func(*testing.T) {
+	return func(t *testing.T) {
+		var (
+			tid = thread.NewIDV1(thread.Raw, 24)
+
+			k1, v1 = "k1", int64(123)
+			k2, v2 = "k2", int64(-612)
+			k3, v3 = "k3", true
+			k4, v4 = "k4", false
+			k5, v5 = "k5", "v5"
+			k6, v6 = "k6", "value6"
+			k7, v7 = "k7", []byte("bytestring value 7")
+			k8, v8 = "k8", []byte("v8")
+
+			check = func(err error, key string, tmpl string) {
+				if err != nil {
+					t.Fatalf(tmpl, key, err)
+				}
+			}
+
+			compare = func(expected, actual interface{}) {
+				e, ok1 := expected.([]byte)
+				a, ok2 := actual.([]byte)
+				if (ok1 && ok2 && !bytes.Equal(e, a)) || (!ok1 && !ok2 && expected != actual) {
+					t.Fatalf(errStrValueMatch, expected, actual)
+				}
+			}
+		)
+
+		check(mb.PutInt64(tid, k1, v1), k1, errStrPut)
+		check(mb.PutInt64(tid, k2, v2), k2, errStrPut)
+		check(mb.PutBool(tid, k3, v3), k3, errStrPut)
+		check(mb.PutBool(tid, k4, v4), k4, 
errStrPut)
+		check(mb.PutString(tid, k5, v5), k5, errStrPut)
+		check(mb.PutString(tid, k6, v6), k6, errStrPut)
+		check(mb.PutBytes(tid, k7, v7), k7, errStrPut)
+		check(mb.PutBytes(tid, k8, v8), k8, errStrPut)
+
+		dump, err := mb.DumpMeta()
+		if err != nil {
+			t.Fatalf("dumping metadata: %v", err)
+		}
+
+		if err := mb.ClearMetadata(tid); err != nil {
+			t.Fatalf("clearing metadata: %v", err)
+		}
+
+		if err := mb.RestoreMeta(dump); err != nil {
+			t.Fatalf("restoring metadata from the dump: %v", err)
+		}
+
+		{
+			v, err := mb.GetInt64(tid, k1)
+			check(err, k1, errStrGet)
+			compare(v1, *v)
+		}
+		{
+			v, err := mb.GetInt64(tid, k2)
+			check(err, k2, errStrGet)
+			compare(v2, *v)
+		}
+		{
+			v, err := mb.GetBool(tid, k3)
+			check(err, k3, errStrGet)
+			compare(v3, *v)
+		}
+		{
+			v, err := mb.GetBool(tid, k4)
+			check(err, k4, errStrGet)
+			compare(v4, *v)
+		}
+		{
+			v, err := mb.GetString(tid, k5)
+			check(err, k5, errStrGet)
+			compare(v5, *v)
+		}
+		{
+			v, err := mb.GetString(tid, k6)
+			check(err, k6, errStrGet)
+			compare(v6, *v)
+		}
+		{
+			v, err := mb.GetBytes(tid, k7)
+			check(err, k7, errStrGet)
+			compare(v7, *v)
+		}
+		{
+			v, err := mb.GetBytes(tid, k8)
+			check(err, k8, errStrGet)
+			compare(v8, *v)
+		}
+	}
+}
diff --git a/util/finalizer.go b/util/finalizer.go
new file mode 100644
index 00000000..61835a20
--- /dev/null
+++ b/util/finalizer.go
@@ -0,0 +1,44 @@
+package util
+
+import (
+	"context"
+	"io"
+
+	"github.com/hashicorp/go-multierror"
+)
+
+// NewFinalizer returns a Finalizer, which collects resources for convenient cleanup.
+func NewFinalizer() *Finalizer {
+	return &Finalizer{}
+}
+
+type Finalizer struct {
+	resources []io.Closer
+}
+
+func (r *Finalizer) Add(cs ...io.Closer) {
+	r.resources = append(r.resources, cs...)
+}
+
+func (r *Finalizer) Cleanup(err error) error {
+	merr := multierror.Append(&multierror.Error{}, err)
+	// release resources in reverse order of addition
+	for i := len(r.resources) - 1; i >= 0; i-- {
+		merr = multierror.Append(merr, r.resources[i].Close())
+	}
+	return merr.ErrorOrNil()
+}
+
+// NewContextCloser wraps a context cancellation function into an io.Closer, so it can be collected by a Finalizer.
+func NewContextCloser(cancel context.CancelFunc) io.Closer {
+	return &ContextCloser{cf: cancel}
+}
+
+type ContextCloser struct {
+	cf context.CancelFunc
+}
+
+func (cc ContextCloser) Close() error {
+	cc.cf()
+	return nil
+}
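
For reference, a minimal usage sketch of the new util.Finalizer. This is not part of the patch: it assumes the patched github.com/textileio/go-threads/util package, and the stubResource type and the "construction failed" error are hypothetical, introduced only for illustration.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/textileio/go-threads/util"
)

// stubResource is a hypothetical io.Closer standing in for a real
// resource (datastore, peerstore, host) created during construction.
type stubResource struct{ name string }

func (s stubResource) Close() error {
	fmt.Println("closed:", s.name)
	return nil
}

func main() {
	fin := util.NewFinalizer()

	// Register resources as they are created.
	fin.Add(stubResource{"litestore"}, stubResource{"logstore"})

	// Context cancellation joins the cleanup chain via NewContextCloser.
	_, cancel := context.WithCancel(context.Background())
	fin.Add(util.NewContextCloser(cancel))

	// On a failed construction step, one Cleanup call releases every
	// collected resource in reverse order, so dependents close before
	// their dependencies, and combines close errors with the cause.
	if err := fin.Cleanup(errors.New("construction failed")); err != nil {
		fmt.Println(err)
	}
}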
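Likewise, a sketch of the dump/restore round-trip exercised by the new export suites, shown here with the in-memory metadata book from this patch. Also illustrative only: it assumes the core.ThreadMetadata method set used in the tests (PutInt64, DumpMeta, ClearMetadata, RestoreMeta, GetInt64), and error handling is elided for brevity.

package main

import (
	"fmt"

	"github.com/textileio/go-threads/core/thread"
	"github.com/textileio/go-threads/logstore/lstoremem"
)

func main() {
	mb := lstoremem.NewThreadMetadata()
	tid := thread.NewIDV1(thread.Raw, 24)

	// Write a value and snapshot the whole book.
	_ = mb.PutInt64(tid, "counter", 42)
	dump, _ := mb.DumpMeta()

	// Wipe the book, then restore it from the snapshot.
	_ = mb.ClearMetadata(tid)
	_ = mb.RestoreMeta(dump)

	v, _ := mb.GetInt64(tid, "counter")
	fmt.Println(*v) // 42
}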