Codebase Cleanup, Badger Query Optimization, PostgreSQL Index Creation #12

Merged · 13 commits · May 18, 2020
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -27,5 +27,5 @@ jobs:

# specify any bash command here prefixed with `run: `
- run: go get -v -t -d ./...
- - run: go test -v -race -short -coverprofile=coverage.txt ./...
+ - run: go test -v -race -short -coverprofile=coverage.txt -timeout 1800s ./...
- run: bash <(curl -s https://codecov.io/bash)
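For context: `go test` kills a test binary that runs longer than 10 minutes by default, so the added `-timeout 1800s` raises that ceiling to 30 minutes, presumably to give the slower badger datastore tests room to finish under `-race`.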
3 changes: 3 additions & 0 deletions README.md
@@ -1,5 +1,8 @@
# go-datastores

[![codecov](https://codecov.io/gh/RTradeLtd/go-datastores/branch/master/graph/badge.svg)](https://codecov.io/gh/RTradeLtd/go-datastores) [![Build Status](https://travis-ci.com/RTradeLtd/go-datastores.svg?branch=master)](https://travis-ci.com/RTradeLtd/go-datastores) [![RTradeLtd](https://circleci.com/gh/RTradeLtd/go-datastores.svg?style=shield)](https://app.circleci.com/pipelines/github/RTradeLtd/go-datastores) [![Go Report Card](https://goreportcard.com/badge/github.com/RTradeLtd/go-datastores)](https://goreportcard.com/report/github.com/RTradeLtd/go-datastores)


`go-datastores` is a collection of IPFS datastores used by TemporalX, kept in a single monorepo for easy maintenance. The majority of these datastores are forked from upstream repositories, with minor modifications to facilitate easier integration with TemporalX, along with performance improvements and optimizations where possible. Keeping them together also lets us pull in every datastore we need from a single repository.

If you are a user of TemporalX and want to use a datastore that we do not yet support, you can submit a PR and we'll enable usage of that datastore in our next release.
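For readers new to these packages, here is a minimal usage sketch of the badger datastore. It assumes the `dsbadger.NewDatastore(path, *Options)` constructor and the `Put`/`Get` API that appear in this PR's test code; the import path is inferred from the repo layout, and the snippet is illustrative, not part of the README diff.

```go
package main

import (
	"fmt"
	"log"

	dsbadger "github.com/RTradeLtd/go-datastores/badger" // path assumed from repo layout
	ds "github.com/ipfs/go-datastore"
)

func main() {
	// nil options fall back to the package defaults, mirroring
	// NewDatastore(path, nil) as used in the tests in this PR.
	store, err := dsbadger.NewDatastore("/tmp/badger-ds", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	key := ds.NewKey("/a/b/c")
	if err := store.Put(key, []byte("abc")); err != nil {
		log.Fatal(err)
	}
	val, err := store.Get(key)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s => %s\n", key, val)
}
```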
164 changes: 0 additions & 164 deletions badger/datastore.go
@@ -1,7 +1,6 @@
package dsbadger

import (
"context"
"errors"
"log"
"time"
@@ -519,169 +518,6 @@ func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
return t.query(q)
}

func (t *txn) query(q dsq.Query) (dsq.Results, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = !q.KeysOnly
prefix := ds.NewKey(q.Prefix).String()
if prefix != "/" {
opt.Prefix = []byte(prefix + "/")
}
// Handle ordering
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// We order by key by default.
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// Reverse order by key
opt.Reverse = true
default:
// Ok, we have a weird order we can't handle. Let's
// perform the _base_ query (prefix, filter, etc.), then
// handle sort/offset/limit later.

// Skip the stuff we can't apply.
baseQuery := q
baseQuery.Limit = 0
baseQuery.Offset = 0
baseQuery.Orders = nil

// perform the base query.
res, err := t.query(baseQuery)
if err != nil {
return nil, err
}

// fix the query
res = dsq.ResultsReplaceQuery(res, q)

// Remove the parts we've already applied.
naiveQuery := q
naiveQuery.Prefix = ""
naiveQuery.Filters = nil

// Apply the rest of the query
return dsq.NaiveQueryApply(naiveQuery, res), nil
}
}
var (
it = t.txn.NewIterator(opt)
done = make(chan bool)
resultChan = make(chan dsq.Result)
entries = make([]dsq.Entry, 0)
)
go func() {
defer func() {
done <- true
}()
if t.ds.closed.IsSet() {
return
}
// this iterator is part of an implicit transaction, so when
// we're done we must discard the transaction. It's safe to
// discard the txn because it contains only the iterator.
if t.implicit {
defer t.discard()
}
defer it.Close()
// All iterators must be started by rewinding.
it.Rewind()
// skip to the offset
for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() {
// On the happy path, we have no filters and we can go
// on our way.
if len(q.Filters) == 0 {
skipped++
continue
}
// On the sad path, we need to apply filters before
// counting the item as "skipped" as the offset comes
// _after_ the filter.
item := it.Item()
matches := true
check := func(value []byte) error {
e := dsq.Entry{Key: string(item.Key()), Value: value, Size: int(item.ValueSize())}
// Only calculate expirations if we need them.
if q.ReturnExpirations {
e.Expiration = expires(item)
}
matches = filter(q.Filters, e)
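// Note: filter reports true when an entry should be dropped, so
// matches == true here actually means "filtered out"; only entries
// that survive the filters count toward the offset (see !matches below).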
return nil
}
// Maybe check with the value, only if we need it.
var err error
if q.KeysOnly {
err = check(nil)
} else {
err = item.Value(check)
}
if err != nil {
select {
case resultChan <- dsq.Result{Error: err}:
case <-t.ds.closing:
return
case <-ctx.Done():
return
}
}
if !matches {
skipped++
}
}
for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() {
item := it.Item()
e := dsq.Entry{Key: string(item.Key())}
// Maybe get the value
var result dsq.Result
if !q.KeysOnly {
b, err := item.ValueCopy(nil)
if err != nil {
result = dsq.Result{Error: err}
} else {
e.Value = b
e.Size = len(b)
result = dsq.Result{Entry: e}
}
} else {
e.Size = int(item.ValueSize())
result = dsq.Result{Entry: e}
}

if q.ReturnExpirations {
result.Expiration = expires(item)
}

// Finally, filter it (unless we're dealing with an error).
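// (filter returns true for entries that should be excluded, so a
// true result skips the send below.)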
if result.Error == nil && filter(q.Filters, e) {
continue
}

select {
case resultChan <- result:
sent++
case <-ctx.Done():
return
case <-t.ds.closing:
return
}
}
}()
for {
select {
case <-done:
goto FINISHED
case result := <-resultChan:
if result.Error != nil {
log.Println("query result failure: ", result.Error)
}
entries = append(entries, result.Entry)
}
}
FINISHED:
return dsq.ResultsWithEntries(q, entries), nil
}

func (t *txn) Commit() error {
if t.ds.closed.IsSet() {
return ErrClosed
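Deleting the 164-line `query` implementation above is the "Badger Query Optimization" half of this PR: results were previously produced on a goroutine and funneled through `resultChan`, with any ordering other than by key handled by re-running the base query and post-processing it via `dsq.NaiveQueryApply`. As an illustration of the caller side, here is a sketch (assumed example code, not from the PR) of a query that would take that ordering-fallback path:

```go
package main

import (
	"fmt"
	"log"

	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
)

// listByValue issues a query whose ordering the badger iterator cannot
// satisfy natively, so the implementation falls back to running the
// base query (prefix, filters) and then sorting/paging the results.
func listByValue(d ds.Datastore) {
	res, err := d.Query(dsq.Query{
		Prefix: "/a",
		// OrderByValue is neither OrderByKey nor OrderByKeyDescending,
		// so it cannot be expressed as a badger iterator option.
		Orders: []dsq.Order{dsq.OrderByValue{}},
		Offset: 5,
		Limit:  10,
	})
	if err != nil {
		log.Fatal(err)
	}
	entries, err := res.Rest()
	if err != nil {
		log.Fatal(err)
	}
	for _, e := range entries {
		fmt.Println(e.Key, string(e.Value))
	}
}
```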
79 changes: 13 additions & 66 deletions badger/ds_test.go
@@ -10,23 +10,12 @@ import (
"testing"
"time"

"github.com/RTradeLtd/go-datastores/testutils"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
)

var testcases = map[string]string{
"/a": "a",
"/a/b": "ab",
"/a/b/c": "abc",
"/a/b/d": "a/b/d",
"/a/c": "ac",
"/a/d": "ad",
"/e": "e",
"/f": "f",
"/g": "",
}

// returns a datastore, and a function to call on exit.
// (this garbage collects). So:
//
@@ -66,26 +55,6 @@ func newDSSync(t *testing.T, sync bool) (*Datastore, func()) {
}
}

func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) {
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {
t.Fatal(err)
}
}

for k, v := range testcases {
dsk := ds.NewKey(k)
v2, err := d.Get(dsk)
if err != nil {
t.Fatal(err)
}
if string(v2) != v {
t.Errorf("%s values differ: %s != %s", k, v, v2)
}
}
}

func Test_Sync(t *testing.T) {
type args struct {
sync bool
@@ -129,12 +98,12 @@ func TestQuery(t *testing.T) {
d, done := newDS(t)
defer done()

- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)
rs, err := d.Query(dsq.Query{Prefix: "/a/"})
if err != nil {
t.Fatal(err)
}
- expectMatches(t, []string{
+ testutils.ExpectMatches(t, []string{
"/a/b",
"/a/b/c",
"/a/b/d",
@@ -147,7 +116,7 @@ func TestQuery(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- expectMatches(t, []string{
+ testutils.ExpectMatches(t, []string{
"/a/b/d",
"/a/c",
}, rs)
@@ -156,7 +125,7 @@
func TestHas(t *testing.T) {
d, done := newDS(t)
defer done()
- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)

has, err := d.Has(ds.NewKey("/a/b/c"))
if err != nil {
@@ -180,14 +149,14 @@ func TestHas(t *testing.T) {
func TestGetSize(t *testing.T) {
d, done := newDS(t)
defer done()
- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)

size, err := d.GetSize(ds.NewKey("/a/b/c"))
if err != nil {
t.Error(err)
}

- if size != len(testcases["/a/b/c"]) {
+ if size != len(testutils.TestCases["/a/b/c"]) {
t.Error("")
}

@@ -200,7 +169,7 @@ func TestGetSize(t *testing.T) {
func TestNotExistGet(t *testing.T) {
d, done := newDS(t)
defer done()
- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)

has, err := d.Has(ds.NewKey("/a/b/c/d"))
if err != nil {
@@ -227,7 +196,7 @@ func TestNotExistGet(t *testing.T) {
func TestDelete(t *testing.T) {
d, done := newDS(t)
defer done()
- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)

has, err := d.Has(ds.NewKey("/a/b/c"))
if err != nil {
@@ -270,28 +239,6 @@ func TestGetEmpty(t *testing.T) {
}
}

func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
}

if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}
for _, k := range expect {
found := false
for _, e := range actual {
if e.Key == k {
found = true
}
}
if !found {
t.Error(k, "not found")
}
}
}

func TestBatching(t *testing.T) {
d, done := newDS(t)
defer done()
@@ -301,7 +248,7 @@ func TestBatching(t *testing.T) {
t.Fatal(err)
}

- for k, v := range testcases {
+ for k, v := range testutils.TestCases {
err := b.Put(ds.NewKey(k), []byte(v))
if err != nil {
t.Fatal(err)
@@ -313,7 +260,7 @@
t.Fatal(err)
}

- for k, v := range testcases {
+ for k, v := range testutils.TestCases {
val, err := d.Get(ds.NewKey(k))
if err != nil {
t.Fatal(err)
@@ -351,7 +298,7 @@ func TestBatching(t *testing.T) {
t.Fatal(err)
}

- expectMatches(t, []string{
+ testutils.ExpectMatches(t, []string{
"/a",
"/a/b/d",
"/a/c",
@@ -656,7 +603,7 @@ func TestDiskUsage(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- addTestCases(t, d, testcases)
+ testutils.AddTestCases(t, d, testutils.TestCases)
d.Close()

d, err = NewDatastore(path, nil)
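The `ds_test.go` changes swap the file-local fixtures and helpers for a shared `testutils` package whose source is not shown in this diff. Based on the `testcases` map, `addTestCases`, and `expectMatches` helpers deleted above, a plausible reconstruction of that package (an assumption for illustration, not the PR's verbatim code) looks like this:

```go
package testutils

import (
	"testing"

	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
)

// TestCases mirrors the key/value fixtures deleted from badger/ds_test.go.
var TestCases = map[string]string{
	"/a":     "a",
	"/a/b":   "ab",
	"/a/b/c": "abc",
	"/a/b/d": "a/b/d",
	"/a/c":   "ac",
	"/a/d":   "ad",
	"/e":     "e",
	"/f":     "f",
	"/g":     "",
}

// Datastore is the minimal surface the helpers need; the real package
// may well accept a concrete type or ds.Datastore instead.
type Datastore interface {
	Put(key ds.Key, value []byte) error
	Get(key ds.Key) ([]byte, error)
}

// AddTestCases writes every fixture and reads each one back, failing the
// test on any mismatch, as the deleted addTestCases did.
func AddTestCases(t *testing.T, d Datastore, testcases map[string]string) {
	for k, v := range testcases {
		dsk := ds.NewKey(k)
		if err := d.Put(dsk, []byte(v)); err != nil {
			t.Fatal(err)
		}
	}
	for k, v := range testcases {
		dsk := ds.NewKey(k)
		v2, err := d.Get(dsk)
		if err != nil {
			t.Fatal(err)
		}
		if string(v2) != v {
			t.Errorf("%s values differ: %s != %s", k, v, v2)
		}
	}
}

// ExpectMatches drains the query results and checks that they contain
// exactly the expected keys, as the deleted expectMatches did.
func ExpectMatches(t *testing.T, expect []string, actualR dsq.Results) {
	actual, err := actualR.Rest()
	if err != nil {
		t.Error(err)
	}
	if len(actual) != len(expect) {
		t.Error("mismatched result count:", expect, actual)
	}
	for _, k := range expect {
		found := false
		for _, e := range actual {
			if e.Key == k {
				found = true
			}
		}
		if !found {
			t.Error(k, "not found")
		}
	}
}
```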