From 447b6c75dc3a2f886e1992e4a3e1d838decb66b8 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 30 Jul 2024 16:30:29 +0100 Subject: [PATCH 1/8] Dedup experiment --- cmd/conformance/gcp/main.go | 20 +++++++ deduper.go | 107 ++++++++++++++++++++++++++++++++++++ storage/gcp/gcp.go | 63 +++++++++++++++++++++ 3 files changed, 190 insertions(+) create mode 100644 deduper.go diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 9bd9d9be..d790a294 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -17,6 +17,7 @@ package main import ( "context" + "crypto/sha256" "errors" "flag" "fmt" @@ -67,6 +68,12 @@ func main() { } dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256) + gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner)) + if err != nil { + klog.Exitf("Failed to create new GCP dedupe storage: %v", err) + } + dedup := tessera.NewDeduper(ctx, gcpDedup) + // Expose a HTTP handler for the conformance test writes. // This should accept arbitrary bytes POSTed to /add, and return an ascii // decimal representation of the index assigned to the entry. @@ -77,6 +84,16 @@ func main() { return } + id := sha256.Sum256(b) + if idx, err := dedup.Index(r.Context(), id[:]); err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } else if idx != nil { + _, _ = w.Write([]byte(fmt.Sprintf("%d", *idx))) + return + } + idx, err := dedupeAdd(r.Context(), tessera.NewEntry(b))() if err != nil { if errors.Is(err, tessera.ErrPushback) { @@ -90,6 +107,9 @@ func main() { } // Write out the assigned index _, _ = w.Write([]byte(fmt.Sprintf("%d", idx))) + if err := dedup.Set(r.Context(), id[:], idx); err != nil { + klog.Warningf("Failed to set dedup %x -> %d: %v", id, idx, err) + } }) h2s := &http2.Server{} diff --git a/deduper.go b/deduper.go new file mode 100644 index 00000000..9a24f738 --- /dev/null +++ b/deduper.go @@ -0,0 +1,107 @@ +package tessera + +import ( + "context" + "sync" + "time" + + "github.com/globocom/go-buffer" + "k8s.io/klog/v2" +) + +type DeduperStorage interface { + Set(context.Context, []DedupEntry) error + Index(context.Context, []byte) (*uint64, error) +} + +type Deduper struct { + ctx context.Context + storage DeduperStorage + + mu sync.Mutex + numLookups uint64 + numWrites uint64 + numCacheDedups uint64 + numDBDedups uint64 + numPushErrs uint64 + + buf *buffer.Buffer +} + +func NewDeduper(ctx context.Context, s DeduperStorage) *Deduper { + r := &Deduper{ + ctx: ctx, + storage: s, + } + + r.buf = buffer.New( + buffer.WithSize(64), + buffer.WithFlushInterval(200*time.Millisecond), + buffer.WithFlusher(buffer.FlusherFunc(r.flush)), + buffer.WithPushTimeout(15*time.Second), + ) + go func(ctx context.Context) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.mu.Lock() + klog.Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs) + r.mu.Unlock() + } + } + }(ctx) + return r +} + +type DedupEntry struct { + ID []byte + Idx uint64 +} + +func (s *Deduper) inc(p *uint64) { + s.mu.Lock() + defer s.mu.Unlock() + (*p)++ +} + +func (s *Deduper) Index(ctx context.Context, h []byte) (*uint64, error) { + s.inc(&s.numLookups) + r, err := s.storage.Index(ctx, h) + if r != nil { + s.inc(&s.numDBDedups) + } + return r, err +} + +func (s *Deduper) Set(_ context.Context, h []byte, idx uint64) error { + err := s.buf.Push(DedupEntry{ID: h, Idx: idx}) + if err 
!= nil { + s.inc(&s.numPushErrs) + // This means there's pressure flushing dedup writes out, so discard this write. + if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +func (s *Deduper) flush(items []interface{}) { + entries := make([]DedupEntry, len(items)) + for i := range items { + entries[i] = items[i].(DedupEntry) + } + + ctx, c := context.WithTimeout(s.ctx, 15*time.Second) + defer c() + + if err := s.storage.Set(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + s.mu.Lock() + s.numWrites += uint64(len(entries)) + s.mu.Unlock() +} diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 6b5fce2f..c3ce1994 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -783,3 +783,66 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e } return r.Attrs.LastModified, r.Close() } + +// NewDedupeStorage returns a struct which can be used to store identity -> index mappings backed +// by Spanner. +// +// Note that updates to this dedup storage is logically entriely separate from any updates +// happening to the log storage. +func NewDedupeStorage(ctx context.Context, spannerDB string) (*DedupStorage, error) { + /* + Schema for reference: + + CREATE TABLE IDSeq ( + id INT64 NOT NULL, + h BYTES(MAX) NOT NULL, + idx INT64 NOT NULL, + ) PRIMARY KEY (id, h); + */ + dedupDB, err := spanner.NewClient(ctx, spannerDB) + if err != nil { + return nil, fmt.Errorf("failed to connect to Spanner: %v", err) + } + + return &DedupStorage{ + dbPool: dedupDB, + }, nil +} + +type DedupStorage struct { + dbPool *spanner.Client +} + +func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) { + var idx int64 + if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil { + if c := spanner.ErrCode(err); c == codes.NotFound { + return nil, nil + } + return nil, err + } else { + if err := row.Column(0, &idx); err != nil { + return nil, fmt.Errorf("failed to read dedup index: %v", err) + } + idx := uint64(idx) + return &idx, nil + } +} + +func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupEntry) error { + m := make([]*spanner.MutationGroup, 0, len(entries)) + for _, e := range entries { + m = append(m, &spanner.MutationGroup{ + Mutations: []*spanner.Mutation{spanner.Insert("IDSeq", []string{"id", "h", "idx"}, []interface{}{0, e.ID, int64(e.Idx)})}, + }) + } + + i := d.dbPool.BatchWrite(ctx, m) + return i.Do(func(r *spannerpb.BatchWriteResponse) error { + s := r.GetStatus() + if c := codes.Code(s.Code); c != codes.OK && c != codes.AlreadyExists { + return fmt.Errorf("failed to write dedup record: %v (%v)", s.GetMessage(), c) + } + return nil + }) +} From eca09d49c25f79da204ca39d934b5ae1485ccfe1 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 3 Dec 2024 16:50:05 +0000 Subject: [PATCH 2/8] Update to match delegate style --- cmd/conformance/gcp/main.go | 29 +++----- dedupe.go | 131 ++++++++++++++++++++++++++++++++++++ deduper.go | 107 ----------------------------- storage/gcp/gcp.go | 2 +- 4 files changed, 142 insertions(+), 127 deletions(-) delete mode 100644 deduper.go diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index d790a294..e64e4479 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -17,7 +17,6 @@ package main import ( "context" - "crypto/sha256" "errors" "flag" "fmt" @@ -38,6 +37,7 @@ var ( listen = flag.String("listen", ":2024", "Address:port to listen on") spanner = 
flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") signer = flag.String("signer", "", "Note signer to use to sign checkpoints") + persistentDedup = flag.Bool("gcp_dedup", false, "Set to true to enable persistent dedupe storage") additionalSigners = []string{} ) @@ -66,13 +66,17 @@ func main() { if err != nil { klog.Exitf("Failed to create new GCP storage: %v", err) } - dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256) - gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner)) - if err != nil { - klog.Exitf("Failed to create new GCP dedupe storage: %v", err) + // Handle dedup configuration + addDelegate := storage.Add + if *persistentDedup { + gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner)) + if err != nil { + klog.Exitf("Failed to create new GCP dedupe storage: %v", err) + } + addDelegate = tessera.PersistentDedupe(ctx, gcpDedup, addDelegate) } - dedup := tessera.NewDeduper(ctx, gcpDedup) + dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256) // Expose a HTTP handler for the conformance test writes. // This should accept arbitrary bytes POSTed to /add, and return an ascii @@ -84,16 +88,6 @@ func main() { return } - id := sha256.Sum256(b) - if idx, err := dedup.Index(r.Context(), id[:]); err != nil { - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte(err.Error())) - return - } else if idx != nil { - _, _ = w.Write([]byte(fmt.Sprintf("%d", *idx))) - return - } - idx, err := dedupeAdd(r.Context(), tessera.NewEntry(b))() if err != nil { if errors.Is(err, tessera.ErrPushback) { @@ -107,9 +101,6 @@ func main() { } // Write out the assigned index _, _ = w.Write([]byte(fmt.Sprintf("%d", idx))) - if err := dedup.Set(r.Context(), id[:], idx); err != nil { - klog.Warningf("Failed to set dedup %x -> %d: %v", id, idx, err) - } }) h2s := &http2.Server{} diff --git a/dedupe.go b/dedupe.go index 23b9b2fa..5290524a 100644 --- a/dedupe.go +++ b/dedupe.go @@ -17,8 +17,11 @@ package tessera import ( "context" "sync" + "time" + "github.com/globocom/go-buffer" "github.com/hashicorp/golang-lru/v2/expirable" + "k8s.io/klog/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -62,3 +65,131 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { } return f } + +// DedupeStorage describes a struct which can store and retrieve hash to index mappings. +type DedupeStorage interface { + // Set must persist the provided id->index mappings. + // It must return an error if it fails to store one or more of the mappings, but it does not + // have to store all mappings atomically. + Set(context.Context, []DedupeMapping) error + // Index must return the index of a previously stored ID, or nil if the ID is unknown. + Index(context.Context, []byte) (*uint64, error) +} + +// DedupeMapping represents an ID -> index mapping. 
+type DedupeMapping struct { + ID []byte + Idx uint64 +} + +type persistentDedup struct { + ctx context.Context + storage DedupeStorage + delegate func(ctx context.Context, e *Entry) IndexFuture + + mu sync.Mutex + numLookups uint64 + numWrites uint64 + numDBDedups uint64 + numPushErrs uint64 + + buf *buffer.Buffer +} + +// PersistentDedup returns a wrapped Add method which will return the previously seen index for the given entry, if such an entry exists +func PersistentDedupe(ctx context.Context, s DedupeStorage, delegate func(ctx context.Context, e *Entry) IndexFuture) func(context.Context, *Entry) IndexFuture { + r := &persistentDedup{ + ctx: ctx, + storage: s, + delegate: delegate, + } + + // TODO(al): Make these configurable + r.buf = buffer.New( + buffer.WithSize(64), + buffer.WithFlushInterval(200*time.Millisecond), + buffer.WithFlusher(buffer.FlusherFunc(r.flush)), + buffer.WithPushTimeout(15*time.Second), + ) + go func(ctx context.Context) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.mu.Lock() + klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs) + r.mu.Unlock() + } + } + }(ctx) + return r.add +} + +// add adds the entry to the underlying delegate only if e isn't already known. In either case, +// an IndexFuture will be returned that the client can use to get the sequence number of this entry. +func (d *persistentDedup) add(ctx context.Context, e *Entry) IndexFuture { + idx, err := d.Index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + i, err := d.delegate(ctx, e)() + if err != nil { + return func() (uint64, error) { return 0, err } + } + + err = d.Set(ctx, e.Identity(), i) + return func() (uint64, error) { + return i, err + } +} + +func (d *persistentDedup) inc(p *uint64) { + d.mu.Lock() + defer d.mu.Unlock() + (*p)++ +} + +func (d *persistentDedup) Index(ctx context.Context, h []byte) (*uint64, error) { + d.inc(&d.numLookups) + r, err := d.storage.Index(ctx, h) + if r != nil { + d.inc(&d.numDBDedups) + } + return r, err +} + +func (d *persistentDedup) Set(_ context.Context, h []byte, idx uint64) error { + err := d.buf.Push(DedupeMapping{ID: h, Idx: idx}) + if err != nil { + d.inc(&d.numPushErrs) + // This means there's pressure flushing dedup writes out, so discard this write. 
+ if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +func (d *persistentDedup) flush(items []interface{}) { + entries := make([]DedupeMapping, len(items)) + for i := range items { + entries[i] = items[i].(DedupeMapping) + } + + ctx, c := context.WithTimeout(d.ctx, 15*time.Second) + defer c() + + if err := d.storage.Set(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + d.mu.Lock() + d.numWrites += uint64(len(entries)) + d.mu.Unlock() +} diff --git a/deduper.go b/deduper.go deleted file mode 100644 index 9a24f738..00000000 --- a/deduper.go +++ /dev/null @@ -1,107 +0,0 @@ -package tessera - -import ( - "context" - "sync" - "time" - - "github.com/globocom/go-buffer" - "k8s.io/klog/v2" -) - -type DeduperStorage interface { - Set(context.Context, []DedupEntry) error - Index(context.Context, []byte) (*uint64, error) -} - -type Deduper struct { - ctx context.Context - storage DeduperStorage - - mu sync.Mutex - numLookups uint64 - numWrites uint64 - numCacheDedups uint64 - numDBDedups uint64 - numPushErrs uint64 - - buf *buffer.Buffer -} - -func NewDeduper(ctx context.Context, s DeduperStorage) *Deduper { - r := &Deduper{ - ctx: ctx, - storage: s, - } - - r.buf = buffer.New( - buffer.WithSize(64), - buffer.WithFlushInterval(200*time.Millisecond), - buffer.WithFlusher(buffer.FlusherFunc(r.flush)), - buffer.WithPushTimeout(15*time.Second), - ) - go func(ctx context.Context) { - t := time.NewTicker(time.Second) - for { - select { - case <-ctx.Done(): - return - case <-t.C: - r.mu.Lock() - klog.Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs) - r.mu.Unlock() - } - } - }(ctx) - return r -} - -type DedupEntry struct { - ID []byte - Idx uint64 -} - -func (s *Deduper) inc(p *uint64) { - s.mu.Lock() - defer s.mu.Unlock() - (*p)++ -} - -func (s *Deduper) Index(ctx context.Context, h []byte) (*uint64, error) { - s.inc(&s.numLookups) - r, err := s.storage.Index(ctx, h) - if r != nil { - s.inc(&s.numDBDedups) - } - return r, err -} - -func (s *Deduper) Set(_ context.Context, h []byte, idx uint64) error { - err := s.buf.Push(DedupEntry{ID: h, Idx: idx}) - if err != nil { - s.inc(&s.numPushErrs) - // This means there's pressure flushing dedup writes out, so discard this write. - if err != buffer.ErrTimeout { - return err - } - } - return nil -} - -func (s *Deduper) flush(items []interface{}) { - entries := make([]DedupEntry, len(items)) - for i := range items { - entries[i] = items[i].(DedupEntry) - } - - ctx, c := context.WithTimeout(s.ctx, 15*time.Second) - defer c() - - if err := s.storage.Set(ctx, entries); err != nil { - klog.Infof("Failed to flush dedup entries: %v", err) - return - } - s.mu.Lock() - s.numWrites += uint64(len(entries)) - s.mu.Unlock() -} diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index c3ce1994..0ed6b7f1 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -829,7 +829,7 @@ func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) { } } -func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupEntry) error { +func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupeMapping) error { m := make([]*spanner.MutationGroup, 0, len(entries)) for _, e := range entries { m = append(m, &spanner.MutationGroup{ From aa33870e2975902156580969d0083da80f575049 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 3 Dec 2024 17:32:58 +0000 Subject: [PATCH 3/8] Experiment warning! 
--- cmd/conformance/gcp/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index e64e4479..3a2a6c59 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -37,7 +37,7 @@ var ( listen = flag.String("listen", ":2024", "Address:port to listen on") spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')") signer = flag.String("signer", "", "Note signer to use to sign checkpoints") - persistentDedup = flag.Bool("gcp_dedup", false, "Set to true to enable persistent dedupe storage") + persistentDedup = flag.Bool("gcp_dedup", false, "EXPERIMENTAL: Set to true to enable persistent dedupe storage") additionalSigners = []string{} ) @@ -69,6 +69,8 @@ func main() { // Handle dedup configuration addDelegate := storage.Add + + // PersistentDedup is currently experimental, so there's no terraform or documentation yet! if *persistentDedup { gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner)) if err != nil { From 05a010f26506187bde0ffcfa754b226e661cd3a0 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 3 Dec 2024 18:03:57 +0000 Subject: [PATCH 4/8] Move impl into GCP --- cmd/conformance/gcp/main.go | 5 +- dedupe.go | 131 ------------------------------------ storage/gcp/gcp.go | 130 +++++++++++++++++++++++++++++++---- 3 files changed, 120 insertions(+), 146 deletions(-) diff --git a/cmd/conformance/gcp/main.go b/cmd/conformance/gcp/main.go index 3a2a6c59..e34b08ed 100644 --- a/cmd/conformance/gcp/main.go +++ b/cmd/conformance/gcp/main.go @@ -72,11 +72,10 @@ func main() { // PersistentDedup is currently experimental, so there's no terraform or documentation yet! if *persistentDedup { - gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner)) + addDelegate, err = gcp.NewDedupe(ctx, fmt.Sprintf("%s_dedup", *spanner), addDelegate) if err != nil { - klog.Exitf("Failed to create new GCP dedupe storage: %v", err) + klog.Exitf("Failed to create new GCP dedupe: %v", err) } - addDelegate = tessera.PersistentDedupe(ctx, gcpDedup, addDelegate) } dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256) diff --git a/dedupe.go b/dedupe.go index 5290524a..23b9b2fa 100644 --- a/dedupe.go +++ b/dedupe.go @@ -17,11 +17,8 @@ package tessera import ( "context" "sync" - "time" - "github.com/globocom/go-buffer" "github.com/hashicorp/golang-lru/v2/expirable" - "k8s.io/klog/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -65,131 +62,3 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { } return f } - -// DedupeStorage describes a struct which can store and retrieve hash to index mappings. -type DedupeStorage interface { - // Set must persist the provided id->index mappings. - // It must return an error if it fails to store one or more of the mappings, but it does not - // have to store all mappings atomically. - Set(context.Context, []DedupeMapping) error - // Index must return the index of a previously stored ID, or nil if the ID is unknown. - Index(context.Context, []byte) (*uint64, error) -} - -// DedupeMapping represents an ID -> index mapping. 
-type DedupeMapping struct { - ID []byte - Idx uint64 -} - -type persistentDedup struct { - ctx context.Context - storage DedupeStorage - delegate func(ctx context.Context, e *Entry) IndexFuture - - mu sync.Mutex - numLookups uint64 - numWrites uint64 - numDBDedups uint64 - numPushErrs uint64 - - buf *buffer.Buffer -} - -// PersistentDedup returns a wrapped Add method which will return the previously seen index for the given entry, if such an entry exists -func PersistentDedupe(ctx context.Context, s DedupeStorage, delegate func(ctx context.Context, e *Entry) IndexFuture) func(context.Context, *Entry) IndexFuture { - r := &persistentDedup{ - ctx: ctx, - storage: s, - delegate: delegate, - } - - // TODO(al): Make these configurable - r.buf = buffer.New( - buffer.WithSize(64), - buffer.WithFlushInterval(200*time.Millisecond), - buffer.WithFlusher(buffer.FlusherFunc(r.flush)), - buffer.WithPushTimeout(15*time.Second), - ) - go func(ctx context.Context) { - t := time.NewTicker(time.Second) - for { - select { - case <-ctx.Done(): - return - case <-t.C: - r.mu.Lock() - klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs) - r.mu.Unlock() - } - } - }(ctx) - return r.add -} - -// add adds the entry to the underlying delegate only if e isn't already known. In either case, -// an IndexFuture will be returned that the client can use to get the sequence number of this entry. -func (d *persistentDedup) add(ctx context.Context, e *Entry) IndexFuture { - idx, err := d.Index(ctx, e.Identity()) - if err != nil { - return func() (uint64, error) { return 0, err } - } - if idx != nil { - return func() (uint64, error) { return *idx, nil } - } - - i, err := d.delegate(ctx, e)() - if err != nil { - return func() (uint64, error) { return 0, err } - } - - err = d.Set(ctx, e.Identity(), i) - return func() (uint64, error) { - return i, err - } -} - -func (d *persistentDedup) inc(p *uint64) { - d.mu.Lock() - defer d.mu.Unlock() - (*p)++ -} - -func (d *persistentDedup) Index(ctx context.Context, h []byte) (*uint64, error) { - d.inc(&d.numLookups) - r, err := d.storage.Index(ctx, h) - if r != nil { - d.inc(&d.numDBDedups) - } - return r, err -} - -func (d *persistentDedup) Set(_ context.Context, h []byte, idx uint64) error { - err := d.buf.Push(DedupeMapping{ID: h, Idx: idx}) - if err != nil { - d.inc(&d.numPushErrs) - // This means there's pressure flushing dedup writes out, so discard this write. 
- if err != buffer.ErrTimeout { - return err - } - } - return nil -} - -func (d *persistentDedup) flush(items []interface{}) { - entries := make([]DedupeMapping, len(items)) - for i := range items { - entries[i] = items[i].(DedupeMapping) - } - - ctx, c := context.WithTimeout(d.ctx, 15*time.Second) - defer c() - - if err := d.storage.Set(ctx, entries); err != nil { - klog.Infof("Failed to flush dedup entries: %v", err) - return - } - d.mu.Lock() - d.numWrites += uint64(len(entries)) - d.mu.Unlock() -} diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 0ed6b7f1..aa4a374a 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -37,11 +37,13 @@ import ( "io" "net/http" "os" + "sync/atomic" "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + "github.com/globocom/go-buffer" "github.com/google/go-cmp/cmp" "github.com/transparency-dev/merkle/rfc6962" tessera "github.com/transparency-dev/trillian-tessera" @@ -784,12 +786,20 @@ func (s *gcsStorage) lastModified(ctx context.Context, obj string) (time.Time, e return r.Attrs.LastModified, r.Close() } -// NewDedupeStorage returns a struct which can be used to store identity -> index mappings backed -// by Spanner. +// NewDedupe returns wrapped Add func which will use Spanner to maintain a mapping of +// previously seen entries and their assigned indices. Future calls with the same entry +// will return the previously assigned index, as yet unseen entries will be passed to the provided +// delegate function to have an index assigned. // -// Note that updates to this dedup storage is logically entriely separate from any updates -// happening to the log storage. -func NewDedupeStorage(ctx context.Context, spannerDB string) (*DedupStorage, error) { +// For performance reasons, the ID -> index associations returned by the delegate are buffered before +// being flushed to Spanner. This can result in duplicates occuring in some circumstances, but in +// general this should not be a problem. +// +// Note that the storage for this mapping is entirely separate and unconnected to the storage used for +// maintaining the Merkle tree. +// +// This functionality is experimental! 
+func NewDedupe(ctx context.Context, spannerDB string, delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture) (func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture, error) { /* Schema for reference: @@ -804,16 +814,49 @@ func NewDedupeStorage(ctx context.Context, spannerDB string) (*DedupStorage, err return nil, fmt.Errorf("failed to connect to Spanner: %v", err) } - return &DedupStorage{ - dbPool: dedupDB, - }, nil + r := &dedupStorage{ + ctx: ctx, + dbPool: dedupDB, + delegate: delegate, + } + + // TODO(al): Make these configurable + r.buf = buffer.New( + buffer.WithSize(64), + buffer.WithFlushInterval(200*time.Millisecond), + buffer.WithFlusher(buffer.FlusherFunc(r.flush)), + buffer.WithPushTimeout(15*time.Second), + ) + go func(ctx context.Context) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites.Load(), r.numLookups.Load(), r.numDBDedups.Load(), r.numPushErrs.Load()) + } + } + }(ctx) + return r.add, nil } -type DedupStorage struct { - dbPool *spanner.Client +type dedupStorage struct { + ctx context.Context + dbPool *spanner.Client + delegate func(ctx context.Context, e *tessera.Entry) tessera.IndexFuture + + numLookups atomic.Uint64 + numWrites atomic.Uint64 + numDBDedups atomic.Uint64 + numPushErrs atomic.Uint64 + + buf *buffer.Buffer } -func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) { +// index returns the index (if any) previously associated with the provided hash +func (d *dedupStorage) index(ctx context.Context, h []byte) (*uint64, error) { + d.numLookups.Add(1) var idx int64 if row, err := d.dbPool.Single().ReadRow(ctx, "IDSeq", spanner.Key{0, h}, []string{"idx"}); err != nil { if c := spanner.ErrCode(err); c == codes.NotFound { @@ -825,11 +868,16 @@ func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) { return nil, fmt.Errorf("failed to read dedup index: %v", err) } idx := uint64(idx) + d.numDBDedups.Add(1) return &idx, nil } } -func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupeMapping) error { +// storeMappings stores the associations between the keys and IDs in a non-atomic fashion +// (i.e. it does not store all or none in a transactional sense). +// +// Returns an error if one or more mappings cannot be stored. +func (d *dedupStorage) storeMappings(ctx context.Context, entries []dedupeMapping) error { m := make([]*spanner.MutationGroup, 0, len(entries)) for _, e := range entries { m = append(m, &spanner.MutationGroup{ @@ -846,3 +894,61 @@ func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupeMapping) return nil }) } + +// dedupeMapping represents an ID -> index mapping. +type dedupeMapping struct { + ID []byte + Idx uint64 +} + +// add adds the entry to the underlying delegate only if e isn't already known. In either case, +// an IndexFuture will be returned that the client can use to get the sequence number of this entry. 
+func (d *dedupStorage) add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + idx, err := d.index(ctx, e.Identity()) + if err != nil { + return func() (uint64, error) { return 0, err } + } + if idx != nil { + return func() (uint64, error) { return *idx, nil } + } + + i, err := d.delegate(ctx, e)() + if err != nil { + return func() (uint64, error) { return 0, err } + } + + err = d.enqueueMapping(ctx, e.Identity(), i) + return func() (uint64, error) { + return i, err + } +} + +// enqueueMapping buffers the provided ID -> index mapping ready to be flushed to storage. +func (d *dedupStorage) enqueueMapping(_ context.Context, h []byte, idx uint64) error { + err := d.buf.Push(dedupeMapping{ID: h, Idx: idx}) + if err != nil { + d.numPushErrs.Add(1) + // This means there's pressure flushing dedup writes out, so discard this write. + if err != buffer.ErrTimeout { + return err + } + } + return nil +} + +// flush writes enqueued mappings to storage. +func (d *dedupStorage) flush(items []interface{}) { + entries := make([]dedupeMapping, len(items)) + for i := range items { + entries[i] = items[i].(dedupeMapping) + } + + ctx, c := context.WithTimeout(d.ctx, 15*time.Second) + defer c() + + if err := d.storeMappings(ctx, entries); err != nil { + klog.Infof("Failed to flush dedup entries: %v", err) + return + } + d.numWrites.Add(uint64(len(entries))) +} From 59e0e292c59dfcc526ac767e8c7ac98a1eac7bda Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 3 Dec 2024 18:18:33 +0000 Subject: [PATCH 5/8] InMemory shouldn't lock --- dedupe.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dedupe.go b/dedupe.go index 23b9b2fa..c81ff069 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,7 +16,6 @@ package tessera import ( "context" - "sync" "github.com/hashicorp/golang-lru/v2/expirable" ) @@ -44,16 +43,17 @@ func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, si type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + // mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes + cache *expirable.LRU[string, IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - d.mu.Lock() - defer d.mu.Unlock() + // XXX: This can't lock like this, or it won't compose well with other dedupe "layers" which also block. + // d.mu.Lock() + // defer d.mu.Unlock() f, ok := d.cache.Get(id) if !ok { From ce53102400334200eb7195904f4b89b3cdc1636d Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Tue, 3 Dec 2024 18:37:17 +0000 Subject: [PATCH 6/8] Use non-expiring cache's PeekOrAdd --- dedupe.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/dedupe.go b/dedupe.go index c81ff069..8e6ecd42 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,8 +16,10 @@ package tessera import ( "context" + "fmt" + "sync" - "github.com/hashicorp/golang-lru/v2/expirable" + lru "github.com/hashicorp/golang-lru/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -34,31 +36,42 @@ import ( // InMemoryDedupe. 
This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { + c, err := lru.New[string, IndexFuture](int(size)) + if err != nil { + panic(fmt.Errorf("lru.New(%d): %v", size, err)) + } dedupe := &inMemoryDedupe{ delegate: delegate, - cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0), + cache: c, } return dedupe.add } type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - // mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + cache *lru.Cache[string, IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - // XXX: This can't lock like this, or it won't compose well with other dedupe "layers" which also block. - // d.mu.Lock() - // defer d.mu.Unlock() - f, ok := d.cache.Get(id) - if !ok { - f = d.delegate(ctx, e) - d.cache.Add(id, f) + // However many calls with the same entry come in and are deduped, we should only call delegate + // once for each unique entry: + f := sync.OnceValues(func() (uint64, error) { + return d.delegate(ctx, e)() + }) + + // if we've seen this entry before, discard our f and replace + // with the one we created last time, otherwise store f against id. + if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok { + f = prev } + + // Someone MUST resolve the future or the entry will never actually get assigned a sequence number. + // We may as well do it here for now to avoid anyone getting confused if they forget to do so in the + // personality. + _, _ = f() return f } From 4b166df8ba2a34c87cf118f68a9cd85adf58609e Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 5 Dec 2024 17:33:06 +0000 Subject: [PATCH 7/8] simplify --- dedupe.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/dedupe.go b/dedupe.go index 8e6ecd42..20a31917 100644 --- a/dedupe.go +++ b/dedupe.go @@ -36,7 +36,7 @@ import ( // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { - c, err := lru.New[string, IndexFuture](int(size)) + c, err := lru.New[string, func() IndexFuture](int(size)) if err != nil { panic(fmt.Errorf("lru.New(%d): %v", size, err)) } @@ -49,7 +49,7 @@ func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, si type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - cache *lru.Cache[string, IndexFuture] + cache *lru.Cache[string, func() IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. 
In either case, @@ -59,8 +59,8 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { // However many calls with the same entry come in and are deduped, we should only call delegate // once for each unique entry: - f := sync.OnceValues(func() (uint64, error) { - return d.delegate(ctx, e)() + f := sync.OnceValue(func() IndexFuture { + return d.delegate(ctx, e) }) // if we've seen this entry before, discard our f and replace @@ -69,9 +69,5 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { f = prev } - // Someone MUST resolve the future or the entry will never actually get assigned a sequence number. - // We may as well do it here for now to avoid anyone getting confused if they forget to do so in the - // personality. - _, _ = f() - return f + return f() } From 5cccf8cad919e2ce71085d09a0b4f2c5489e309a Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 5 Dec 2024 17:33:14 +0000 Subject: [PATCH 8/8] Fix test to resolve futures --- dedupe_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dedupe_test.go b/dedupe_test.go index 8bc2aa1d..c29b0851 100644 --- a/dedupe_test.go +++ b/dedupe_test.go @@ -60,13 +60,15 @@ func TestDedupe(t *testing.T) { dedupeAdd := tessera.InMemoryDedupe(delegate, 256) // Add foo, bar, baz to prime the cache to make things interesting - dedupeAdd(ctx, tessera.NewEntry([]byte("foo"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("bar"))) - dedupeAdd(ctx, tessera.NewEntry([]byte("baz"))) + for _, s := range []string{"foo", "bar", "baz"} { + if _, err := dedupeAdd(ctx, tessera.NewEntry([]byte(s)))(); err != nil { + t.Fatalf("dedupeAdd(%q): %v", s, err) + } + } idx, err := dedupeAdd(ctx, tessera.NewEntry([]byte(tC.newValue)))() if err != nil { - t.Fatal(err) + t.Fatalf("dedupeAdd(%q): %v", tC.newValue, err) } if idx != tC.wantIdx { t.Errorf("got != want (%d != %d)", idx, tC.wantIdx)
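
For illustration, here is a minimal, standalone sketch of the in-memory dedupe pattern the series converges on in patches 6 and 7: an LRU keyed by entry identity whose values are memoised future constructors, so however many duplicate adds race in, the delegate is invoked at most once per identity. This is not code from the patches; IndexFuture, newInMemoryDedupe and the string-keyed delegate below are simplified stand-ins for the tessera types.

package main

import (
	"context"
	"fmt"
	"sync"

	lru "github.com/hashicorp/golang-lru/v2"
)

// IndexFuture stands in for tessera.IndexFuture: resolving it returns the index
// assigned to an entry.
type IndexFuture func() (uint64, error)

// newInMemoryDedupe wraps delegate so repeated adds of the same identity share a single
// underlying call, mirroring the PeekOrAdd + sync.OnceValue approach in dedupe.go.
func newInMemoryDedupe(delegate func(ctx context.Context, id string) IndexFuture, size int) func(context.Context, string) IndexFuture {
	cache, err := lru.New[string, func() IndexFuture](size)
	if err != nil {
		panic(fmt.Errorf("lru.New(%d): %v", size, err))
	}
	return func(ctx context.Context, id string) IndexFuture {
		// Memoise the delegate call; it runs at most once no matter how many callers share f.
		f := sync.OnceValue(func() IndexFuture { return delegate(ctx, id) })
		// If a future for this identity is already cached, reuse it and discard ours.
		if prev, ok, _ := cache.PeekOrAdd(id, f); ok {
			f = prev
		}
		return f()
	}
}

func main() {
	next := uint64(0)
	// delegate stands in for the underlying storage Add func: it assigns the next free index.
	delegate := func(_ context.Context, _ string) IndexFuture {
		n := next
		next++
		return func() (uint64, error) { return n, nil }
	}

	add := newInMemoryDedupe(delegate, 256)
	ctx := context.Background()

	a, _ := add(ctx, "foo")()
	b, _ := add(ctx, "foo")() // deduped: resolves to the same index as the first "foo"
	c, _ := add(ctx, "bar")()
	fmt.Println(a, b, c) // prints: 0 0 1
}

Caching the memoised constructor rather than a resolved index means no lock is held while the delegate runs, which is what the "InMemory shouldn't lock" and "simplify" commits above are working towards, and it lets this layer compose with the Spanner-backed wrapper from NewDedupe, which can itself block on a lookup.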