Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCP] Dedup storage experiment #363

Merged
merged 8 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update to match delegate style
  • Loading branch information
AlCutter committed Dec 5, 2024
commit eca09d49c25f79da204ca39d934b5ae1485ccfe1
29 changes: 10 additions & 19 deletions cmd/conformance/gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"context"
"crypto/sha256"
"errors"
"flag"
"fmt"
Expand All @@ -38,6 +37,7 @@ var (
listen = flag.String("listen", ":2024", "Address:port to listen on")
spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')")
signer = flag.String("signer", "", "Note signer to use to sign checkpoints")
persistentDedup = flag.Bool("gcp_dedup", false, "Set to true to enable persistent dedupe storage")
additionalSigners = []string{}
)

Expand Down Expand Up @@ -66,13 +66,17 @@ func main() {
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256)

gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner))
if err != nil {
klog.Exitf("Failed to create new GCP dedupe storage: %v", err)
// Handle dedup configuration
addDelegate := storage.Add
if *persistentDedup {
gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner))
if err != nil {
klog.Exitf("Failed to create new GCP dedupe storage: %v", err)
}
addDelegate = tessera.PersistentDedupe(ctx, gcpDedup, addDelegate)
}
dedup := tessera.NewDeduper(ctx, gcpDedup)
dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256)

// Expose a HTTP handler for the conformance test writes.
// This should accept arbitrary bytes POSTed to /add, and return an ascii
Expand All @@ -84,16 +88,6 @@ func main() {
return
}

id := sha256.Sum256(b)
if idx, err := dedup.Index(r.Context(), id[:]); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
} else if idx != nil {
_, _ = w.Write([]byte(fmt.Sprintf("%d", *idx)))
return
}

idx, err := dedupeAdd(r.Context(), tessera.NewEntry(b))()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
Expand All @@ -107,9 +101,6 @@ func main() {
}
// Write out the assigned index
_, _ = w.Write([]byte(fmt.Sprintf("%d", idx)))
if err := dedup.Set(r.Context(), id[:], idx); err != nil {
klog.Warningf("Failed to set dedup %x -> %d: %v", id, idx, err)
}
})

h2s := &http2.Server{}
Expand Down
131 changes: 131 additions & 0 deletions dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package tessera
import (
"context"
"sync"
"time"

"github.com/globocom/go-buffer"
"github.com/hashicorp/golang-lru/v2/expirable"
"k8s.io/klog/v2"
)

// InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying
Expand Down Expand Up @@ -62,3 +65,131 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture {
}
return f
}

// DedupeStorage is implemented by stores which can persist and look up
// ID -> index mappings.
type DedupeStorage interface {
	// Set must persist the provided id->index mappings.
	// An error must be returned if one or more of the mappings could not be
	// stored, but the mappings are not required to be stored atomically.
	Set(ctx context.Context, mappings []DedupeMapping) error
	// Index must return the index previously stored for id, or nil if id is
	// unknown.
	Index(ctx context.Context, id []byte) (*uint64, error)
}

// DedupeMapping represents an ID -> index mapping.
type DedupeMapping struct {
// ID is the identifier being mapped; persistentDedup.add supplies Entry.Identity() here.
ID []byte
// Idx is the index associated with ID.
Idx uint64
}

// persistentDedup wraps a delegate add function with lookups against a
// DedupeStorage, and pushes newly assigned mappings into a buffer whose
// flusher writes them back to that storage.
type persistentDedup struct {
// ctx is the context given to PersistentDedupe; flush derives its write timeout context from it.
ctx context.Context
// storage is read by Index and written by flush.
storage DedupeStorage
// delegate is called by add when storage holds no index for the entry.
delegate func(ctx context.Context, e *Entry) IndexFuture

// mu guards the counters below.
mu sync.Mutex
// numLookups counts calls to Index.
numLookups uint64
// numWrites counts mappings passed to a successful storage.Set call in flush.
numWrites uint64
// numDBDedups counts Index calls for which storage returned a non-nil index.
numDBDedups uint64
// numPushErrs counts buffer pushes in Set which returned an error.
numPushErrs uint64

// buf receives DedupeMapping values from Set and is configured with flush as its flusher.
buf *buffer.Buffer
}

// PersistentDedupe returns a wrapped Add method which will return the previously seen index for
// the given entry, if such an entry exists in s. Entries which are not found in s are passed on
// to delegate, and the resulting id->index mapping is pushed into a buffer whose flusher writes
// to s.
//
// A background goroutine logs dedup counters once per second at klog verbosity 1; it exits when
// ctx is done.
func PersistentDedupe(ctx context.Context, s DedupeStorage, delegate func(ctx context.Context, e *Entry) IndexFuture) func(context.Context, *Entry) IndexFuture {
	r := &persistentDedup{
		ctx:      ctx,
		storage:  s,
		delegate: delegate,
	}

	// TODO(al): Make these configurable
	r.buf = buffer.New(
		buffer.WithSize(64),
		buffer.WithFlushInterval(200*time.Millisecond),
		buffer.WithFlusher(buffer.FlusherFunc(r.flush)),
		buffer.WithPushTimeout(15*time.Second),
	)
	go func(ctx context.Context) {
		t := time.NewTicker(time.Second)
		// Stop the ticker when the stats loop exits so its resources are released.
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				r.mu.Lock()
				klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs)
				r.mu.Unlock()
			}
		}
	}(ctx)
	return r.add
}

// add forwards e to the underlying delegate only when storage has no index recorded for
// e's identity. Either way the caller receives an IndexFuture which yields the entry's
// sequence number.
//
// Note that the delegate's future is resolved before add returns, and the new mapping is
// handed to Set; any error from Set is reported by the returned future alongside the index.
func (d *persistentDedup) add(ctx context.Context, e *Entry) IndexFuture {
	known, err := d.Index(ctx, e.Identity())
	switch {
	case err != nil:
		return func() (uint64, error) { return 0, err }
	case known != nil:
		// Already stored: hand back the recorded index without touching the delegate.
		return func() (uint64, error) { return *known, nil }
	}

	assigned, err := d.delegate(ctx, e)()
	if err != nil {
		return func() (uint64, error) { return 0, err }
	}

	setErr := d.Set(ctx, e.Identity(), assigned)
	return func() (uint64, error) { return assigned, setErr }
}

// inc adds one to the counter pointed to by p while holding d.mu.
func (d *persistentDedup) inc(p *uint64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	*p += 1
}

// Index looks up h in the underlying storage and returns whatever index and error the
// storage reports. Every call is counted as a lookup; calls for which the storage
// returned a non-nil index are additionally counted as DB dedup hits.
func (d *persistentDedup) Index(ctx context.Context, h []byte) (*uint64, error) {
	d.inc(&d.numLookups)
	idx, err := d.storage.Index(ctx, h)
	if idx == nil {
		return nil, err
	}
	d.inc(&d.numDBDedups)
	return idx, err
}

// Set pushes the h -> idx mapping into the write buffer. The context argument is unused.
//
// Any push error is counted. A push timeout is swallowed and nil is returned; every other
// push error is returned to the caller.
func (d *persistentDedup) Set(_ context.Context, h []byte, idx uint64) error {
	pushErr := d.buf.Push(DedupeMapping{ID: h, Idx: idx})
	if pushErr == nil {
		return nil
	}

	d.inc(&d.numPushErrs)
	if pushErr == buffer.ErrTimeout {
		// This means there's pressure flushing dedup writes out, so discard this write.
		return nil
	}
	return pushErr
}

// flush is the buffer's flusher: it converts items to DedupeMapping values and writes
// them to storage with a 15 second timeout derived from d.ctx. Every item must be a
// DedupeMapping. A failed write is logged and otherwise dropped; a successful write adds
// the number of entries to numWrites.
func (d *persistentDedup) flush(items []interface{}) {
	entries := make([]DedupeMapping, 0, len(items))
	for _, it := range items {
		entries = append(entries, it.(DedupeMapping))
	}

	ctx, cancel := context.WithTimeout(d.ctx, 15*time.Second)
	defer cancel()

	err := d.storage.Set(ctx, entries)
	if err != nil {
		klog.Infof("Failed to flush dedup entries: %v", err)
		return
	}

	written := uint64(len(entries))
	d.mu.Lock()
	d.numWrites += written
	d.mu.Unlock()
}
107 changes: 0 additions & 107 deletions deduper.go

This file was deleted.

2 changes: 1 addition & 1 deletion storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) {
}
}

func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupEntry) error {
func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupeMapping) error {
m := make([]*spanner.MutationGroup, 0, len(entries))
for _, e := range entries {
m = append(m, &spanner.MutationGroup{
Expand Down