Skip to content

Commit

Permalink
Fixup leaked pebble snapshots and iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 16, 2024
1 parent 407d0f6 commit 1520713
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 21 deletions.
12 changes: 8 additions & 4 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
// ...
io := pebble.IterOptions{}
i := cho.db.NewIter(&io)
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Listen(address)
Expand All @@ -212,7 +214,6 @@ func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
}
_ = i.Close()
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
Expand Down Expand Up @@ -388,7 +389,9 @@ func (cho *Chotki) Close() error {
cho.lock.Unlock()
return ErrClosed
}
_ = cho.db.Close()
if err := cho.db.Close(); err != nil {
return err
}
cho.db = nil
// todo
cho.last = rdx.ID0
Expand Down Expand Up @@ -437,10 +440,11 @@ func (cho *Chotki) ObjectIterator(oid rdx.ID) *pebble.Iterator {
if it.SeekGE(fro) {
id, rdt := OKeyIdRdt(it.Key())
if rdt == 'O' && id == oid {
// An iterator is returned from a function, it cannot be closed
return it
}
}
_ = it.Close()
it.Close()
return nil
}

Expand All @@ -454,14 +458,14 @@ func GetFieldTLV(reader pebble.Reader, id rdx.ID) (rdt byte, tlv []byte) {
LowerBound: []byte{'O'},
UpperBound: []byte{'P'},
})
defer it.Close()
if it.SeekGE(key) {
fact, r := OKeyIdRdt(it.Key())
if fact == id {
tlv = it.Value()
rdt = r
}
}
_ = it.Close()
return
}

Expand Down
2 changes: 2 additions & 0 deletions debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (cho *Chotki) DumpObjects() {
UpperBound: []byte{'P'},
}
i := cho.db.NewIter(&io)
defer i.Close()
for i.SeekGE([]byte{'O'}); i.Valid(); i.Next() {
_, _ = fmt.Fprintln(os.Stderr, ChotkiKVString(i.Key(), i.Value()))
}
Expand All @@ -44,6 +45,7 @@ func (cho *Chotki) DumpVV() {
UpperBound: []byte{'W'},
}
i := cho.db.NewIter(&io)
defer i.Close()
for i.SeekGE(VKey(rdx.ID0)); i.Valid(); i.Next() {
id := rdx.IDFromBytes(i.Key()[1:])
vv := make(rdx.VV)
Expand Down
28 changes: 18 additions & 10 deletions object_example_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package chotki

import (
"github.com/drpcorg/chotki/rdx"
"github.com/learn-decentralized-systems/toyqueue"
"github.com/stretchr/testify/assert"
"io"
"os"
"testing"

"github.com/drpcorg/chotki/rdx"
"github.com/learn-decentralized-systems/toyqueue"
"github.com/stretchr/testify/assert"
)

func TestORMExample(t *testing.T) {
_ = os.RemoveAll("cho1e")
_ = os.RemoveAll("cho1f")
defer func() {
_ = os.RemoveAll("cho1e")
_ = os.RemoveAll("cho1f")
}()

var a, b Chotki
err := a.Create(0x1e, "test replica")
assert.Nil(t, err)
Expand Down Expand Up @@ -54,18 +60,20 @@ func TestORMExample(t *testing.T) {
err = toyqueue.Pump(&syncera, &syncerb)
assert.Equal(t, io.EOF, err)

var exb Example
itb := b.ObjectIterator(rdx.IDFromString("1e-2"))
assert.NotNil(t, itb)

var exb Example
err = exb.Load(itb)
assert.Nil(t, err)

assert.Equal(t, "Ivan Petrov", exb.Name)
assert.Equal(t, int64(102), exb.Score)

err = a.Close()
assert.Nil(t, err)
err = b.Close()
assert.Nil(t, err)
_ = os.RemoveAll("cho1e")
_ = os.RemoveAll("cho1f")
assert.Nil(t, ita.Close())
assert.Nil(t, itb.Close())
assert.Nil(t, syncera.Close())
assert.Nil(t, syncerb.Close())
assert.Nil(t, a.Close())
assert.Nil(t, b.Close())
}
13 changes: 9 additions & 4 deletions objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, t
if it == nil {
return rdx.BadId, nil, ErrUnknownObject
}
defer it.Close()

tid = rdx.IDFromZipBytes(it.Value())
for it.Next() {
id, rdt := OKeyIdRdt(it.Key())
Expand All @@ -114,7 +116,6 @@ func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, t
}
tlvs = append(tlvs, it.Value())
}
_ = it.Close()
return
}

Expand All @@ -124,6 +125,8 @@ func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact toyqu
err = ErrUnknownObject
return
}
defer it.Close()

tid = rdx.IDFromZipBytes(it.Value())
decl, err = cho.ClassFields(tid)
if err != nil {
Expand All @@ -144,7 +147,6 @@ func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact toyqu
}
fact = append(fact, it.Value())
}
_ = it.Close()
return
}

Expand All @@ -153,13 +155,14 @@ func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv toyqueue.Records
if it == nil {
return rdx.BadId, nil, ErrUnknownObject
}
defer it.Close()

tid = rdx.IDFromZipBytes(it.Value())
for it.Next() {
cp := make([]byte, len(it.Value()))
copy(cp, it.Value())
tlv = append(tlv, cp)
}
_ = it.Close()
return
}

Expand All @@ -178,7 +181,10 @@ func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)
if db == nil {
return 0, nil, ErrClosed
}

it := cho.db.NewIter(&pebble.IterOptions{})
defer it.Close()

key := OKey(fid, 0)
if !it.SeekGE(key) {
return 0, nil, pebble.ErrNotFound
Expand All @@ -189,7 +195,6 @@ func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)
return 0, nil, pebble.ErrNotFound
}
tlv = it.Value()
_ = it.Close()
return
}

Expand Down
27 changes: 24 additions & 3 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,35 @@ var SendStates = []string{

func (sync *Syncer) Close() error {
sync.lock.Lock()
defer sync.lock.Unlock()

if sync.Host == nil {
sync.lock.Unlock()
return toyqueue.ErrClosed
}

if sync.snap != nil {
_ = sync.snap.Close()
sync.snap = nil
}

if sync.ffit != nil {
if err := sync.ffit.Close(); err != nil {
return err
}
sync.ffit = nil
}

if sync.vvit != nil {
if err := sync.vvit.Close(); err != nil {
return err
}
sync.vvit = nil
}

_ = sync.Host.RemovePacketHose(sync.Name)
sync.SetFeedState(SendNone) //fixme
sync.lock.Unlock()
_, _ = fmt.Fprintf(os.Stderr, "connection %s closed: %s\n", sync.Name, sync.reason)
_, _ = fmt.Fprintf(os.Stderr, "connection %s closed: %v\n", sync.Name, sync.reason)

return nil
}

Expand Down

0 comments on commit 1520713

Please sign in to comment.