Skip to content

Commit

Permalink
Update to match delegate style
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Dec 3, 2024
1 parent 86955e8 commit b6ecaf3
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 127 deletions.
29 changes: 10 additions & 19 deletions cmd/conformance/gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"context"
"crypto/sha256"
"errors"
"flag"
"fmt"
Expand All @@ -38,6 +37,7 @@ var (
listen = flag.String("listen", ":2024", "Address:port to listen on")
spanner = flag.String("spanner", "", "Spanner resource URI ('projects/.../...')")
signer = flag.String("signer", "", "Note signer to use to sign checkpoints")
persistentDedup = flag.Bool("gcp_dedup", false, "Set to true to enable persistent dedupe storage")
additionalSigners = []string{}
)

Expand Down Expand Up @@ -66,13 +66,17 @@ func main() {
if err != nil {
klog.Exitf("Failed to create new GCP storage: %v", err)
}
dedupeAdd := tessera.InMemoryDedupe(storage.Add, 256)

gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner))
if err != nil {
klog.Exitf("Failed to create new GCP dedupe storage: %v", err)
// Handle dedup configuration
addDelegate := storage.Add
if *persistentDedup {
gcpDedup, err := gcp.NewDedupeStorage(ctx, fmt.Sprintf("%s_dedup", *spanner))
if err != nil {
klog.Exitf("Failed to create new GCP dedupe storage: %v", err)
}
addDelegate = tessera.PersistentDedupe(ctx, gcpDedup, addDelegate)
}
dedup := tessera.NewDeduper(ctx, gcpDedup)
dedupeAdd := tessera.InMemoryDedupe(addDelegate, 256)

// Expose a HTTP handler for the conformance test writes.
// This should accept arbitrary bytes POSTed to /add, and return an ascii
Expand All @@ -84,16 +88,6 @@ func main() {
return
}

id := sha256.Sum256(b)
if idx, err := dedup.Index(r.Context(), id[:]); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
} else if idx != nil {
_, _ = w.Write([]byte(fmt.Sprintf("%d", *idx)))
return
}

idx, err := dedupeAdd(r.Context(), tessera.NewEntry(b))()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
Expand All @@ -107,9 +101,6 @@ func main() {
}
// Write out the assigned index
_, _ = w.Write([]byte(fmt.Sprintf("%d", idx)))
if err := dedup.Set(r.Context(), id[:], idx); err != nil {
klog.Warningf("Failed to set dedup %x -> %d: %v", id, idx, err)
}
})

h2s := &http2.Server{}
Expand Down
131 changes: 131 additions & 0 deletions dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package tessera
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/globocom/go-buffer"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"k8s.io/klog/v2"
)

// InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying
Expand Down Expand Up @@ -62,3 +65,131 @@ func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture {
}
return f
}

// DedupeStorage describes a struct which can store and retrieve hash to index mappings.
//
// PersistentDedupe uses an implementation of this interface as its backing store: Index is
// consulted before an entry is handed to the delegate Add function, and Set receives batches
// of mappings for entries the delegate has assigned an index to.
type DedupeStorage interface {
	// Set must persist the provided id->index mappings.
	// It must return an error if it fails to store one or more of the mappings, but it does not
	// have to store all mappings atomically.
	Set(context.Context, []DedupeMapping) error
	// Index must return the index of a previously stored ID, or nil if the ID is unknown.
	// Callers in this file check the error first and treat a nil index with a nil error as
	// "not found".
	Index(context.Context, []byte) (*uint64, error)
}

// DedupeMapping represents an ID -> index mapping.
type DedupeMapping struct {
	// ID identifies an entry; persistentDedup populates it from Entry.Identity().
	ID []byte
	// Idx is the index the delegate Add function returned for that entry.
	Idx uint64
}

// persistentDedup wraps a delegate Add function with lookups against a DedupeStorage, so that
// entries whose identity is already stored resolve to their existing index instead of being
// added again.
type persistentDedup struct {
	// ctx is the context passed to PersistentDedupe; flush derives its storage write
	// timeout from it.
	ctx context.Context
	// storage holds the persisted ID -> index mappings.
	storage DedupeStorage
	// delegate is the wrapped Add function, called for entries with no stored index.
	delegate func(ctx context.Context, e *Entry) IndexFuture

	// mu guards the counters below.
	mu sync.Mutex
	// numLookups counts calls to Index.
	numLookups uint64
	// numWrites counts mappings successfully written to storage by flush.
	numWrites uint64
	// numDBDedups counts Index calls for which storage returned a non-nil index.
	numDBDedups uint64
	// numPushErrs counts failed attempts to push a mapping into buf.
	numPushErrs uint64

	// buf batches mappings queued by Set and hands them to flush.
	buf *buffer.Buffer
}

// PersistentDedupe returns a wrapped Add method which will return the previously seen index for
// the given entry, if such an entry exists in s. Entries which are not found in s are passed to
// delegate, and the resulting ID -> index mapping is queued to be written to s.
//
// Mapping writes are buffered and flushed to s in batches. A background goroutine logs the
// dedup counters once a second (at verbosity 1) until ctx is done.
func PersistentDedupe(ctx context.Context, s DedupeStorage, delegate func(ctx context.Context, e *Entry) IndexFuture) func(context.Context, *Entry) IndexFuture {
	r := &persistentDedup{
		ctx:      ctx,
		storage:  s,
		delegate: delegate,
	}

	// TODO(al): Make these configurable
	r.buf = buffer.New(
		buffer.WithSize(64),
		buffer.WithFlushInterval(200*time.Millisecond),
		buffer.WithFlusher(buffer.FlusherFunc(r.flush)),
		buffer.WithPushTimeout(15*time.Second),
	)
	go func(ctx context.Context) {
		t := time.NewTicker(time.Second)
		// Stop the ticker when the stats loop exits so it does not outlive ctx.
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				r.mu.Lock()
				klog.V(1).Infof("DEDUP: # Writes %d, # Lookups %d, # DB hits %v, # buffer Push discards %d", r.numWrites, r.numLookups, r.numDBDedups, r.numPushErrs)
				r.mu.Unlock()
			}
		}
	}(ctx)
	return r.add
}

// add looks e up in the dedup storage and, when an index is already recorded for it, returns a
// future resolving to that index. Otherwise e is passed to the delegate, the delegate's future
// is resolved immediately, and the new identity -> index mapping is queued via Set.
//
// The returned IndexFuture carries any lookup or delegate error with a zero index. An error
// from Set is returned alongside the index the delegate assigned.
func (d *persistentDedup) add(ctx context.Context, e *Entry) IndexFuture {
	// failed builds a future which only reports cause.
	failed := func(cause error) IndexFuture {
		return func() (uint64, error) { return 0, cause }
	}

	known, lookupErr := d.Index(ctx, e.Identity())
	switch {
	case lookupErr != nil:
		return failed(lookupErr)
	case known != nil:
		return func() (uint64, error) { return *known, nil }
	}

	assigned, addErr := d.delegate(ctx, e)()
	if addErr != nil {
		return failed(addErr)
	}

	setErr := d.Set(ctx, e.Identity(), assigned)
	return func() (uint64, error) { return assigned, setErr }
}

// inc adds one to the counter p points at while holding d.mu.
func (d *persistentDedup) inc(p *uint64) {
	d.mu.Lock()
	// Deferred so the lock is released even if the write through p panics.
	defer d.mu.Unlock()
	*p += 1
}

// Index asks the underlying storage for the index recorded against h and returns the result
// unchanged. It counts every call as a lookup, and every non-nil result as a DB dedup hit.
func (d *persistentDedup) Index(ctx context.Context, h []byte) (*uint64, error) {
	d.inc(&d.numLookups)

	found, err := d.storage.Index(ctx, h)
	if found == nil {
		return found, err
	}

	d.inc(&d.numDBDedups)
	return found, err
}

// Set queues the h -> idx mapping in the write buffer so that a later flush can persist it.
//
// The write is best-effort: if the buffer reports a push timeout the mapping is discarded and
// nil is returned. Any other Push error is returned to the caller. Every failed Push is counted
// in numPushErrs. The context parameter is unused.
func (d *persistentDedup) Set(_ context.Context, h []byte, idx uint64) error {
	err := d.buf.Push(DedupeMapping{ID: h, Idx: idx})
	if err == nil {
		return nil
	}

	d.inc(&d.numPushErrs)
	// errors.Is rather than != so the timeout is still recognised if it is ever wrapped.
	if errors.Is(err, buffer.ErrTimeout) {
		// This means there's pressure flushing dedup writes out, so discard this write.
		return nil
	}
	return err
}

// flush is the write buffer's flusher callback. It converts the buffered items to
// DedupeMapping values and writes them to storage in a single Set call, bounded by a
// 15 second timeout derived from d.ctx.
//
// A failed write is logged and the batch is dropped. On success numWrites grows by the
// batch size. Every item is expected to be a DedupeMapping; the type assertion panics otherwise.
func (d *persistentDedup) flush(items []interface{}) {
	mappings := make([]DedupeMapping, 0, len(items))
	for _, raw := range items {
		mappings = append(mappings, raw.(DedupeMapping))
	}

	flushCtx, cancel := context.WithTimeout(d.ctx, 15*time.Second)
	defer cancel()

	err := d.storage.Set(flushCtx, mappings)
	if err != nil {
		klog.Infof("Failed to flush dedup entries: %v", err)
		return
	}

	written := uint64(len(mappings))
	d.mu.Lock()
	d.numWrites += written
	d.mu.Unlock()
}
107 changes: 0 additions & 107 deletions deduper.go

This file was deleted.

2 changes: 1 addition & 1 deletion storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (d *DedupStorage) Index(ctx context.Context, h []byte) (*uint64, error) {
}
}

func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupEntry) error {
func (d *DedupStorage) Set(ctx context.Context, entries []tessera.DedupeMapping) error {
m := make([]*spanner.MutationGroup, 0, len(entries))
for _, e := range entries {
m = append(m, &spanner.MutationGroup{
Expand Down

0 comments on commit b6ecaf3

Please sign in to comment.