Skip to content

Commit 581cf2e

Browse files
committed
Go: [BREAKING] explicit destruction for futures and transactions
Stop relying on Go's GC finalizers and instead use explicit Close() calls to release resources, which is more Go-idiomatic. NOTE: this would be a breaking change for users because without any code breakage they would now have huge memory leaks on their FoundationDB clients; a release management solution must be devised for this problem.
1 parent 1d03904 commit 581cf2e

File tree

8 files changed

+121
-116
lines changed

8 files changed

+121
-116
lines changed

bindings/go/src/fdb/database.go

+32-22
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import "C"
2828

2929
import (
3030
"errors"
31-
"runtime"
3231
)
3332

3433
// ErrMultiVersionClientUnavailable is returned when the multi-version client API is unavailable.
@@ -92,6 +91,7 @@ func (opt DatabaseOptions) setOpt(code int, param []byte) error {
9291
// preferable to use the (Database).Transact method, which handles
9392
// automatically creating and committing a transaction with appropriate retry
9493
// behavior.
94+
// Close() must be called on the returned transaction to avoid a memory leak.
9595
func (d Database) CreateTransaction() (Transaction, error) {
9696
var outt *C.FDBTransaction
9797

@@ -100,10 +100,6 @@ func (d Database) CreateTransaction() (Transaction, error) {
100100
}
101101

102102
t := &transaction{outt, d}
103-
// transactions cannot be destroyed explicitly if any future is still potentially used
104-
// thus the GC is used to figure out when all Go wrapper objects for futures have gone out of scope,
105-
// making the transaction ready to be garbage-collected.
106-
runtime.SetFinalizer(t, (*transaction).destroy)
107103

108104
return Transaction{t}, nil
109105
}
@@ -114,18 +110,21 @@ func (d Database) CreateTransaction() (Transaction, error) {
114110
// if the data directory is writeable by creating a validation file. The address must be a
115111
// process address is the form of IP:Port pair.
116112
func (d Database) RebootWorker(address string, checkFile bool, suspendDuration int) error {
117-
t := &futureInt64{
118-
future: newFutureWithDb(
119-
d.database,
120-
nil,
121-
C.fdb_database_reboot_worker(
122-
d.ptr,
123-
byteSliceToPtr([]byte(address)),
124-
C.int(len(address)),
125-
C.fdb_bool_t(boolToInt(checkFile)),
126-
C.int(suspendDuration),
127-
),
113+
f := newFutureWithDb(
114+
d.database,
115+
nil,
116+
C.fdb_database_reboot_worker(
117+
d.ptr,
118+
byteSliceToPtr([]byte(address)),
119+
C.int(len(address)),
120+
C.fdb_bool_t(boolToInt(checkFile)),
121+
C.int(suspendDuration),
128122
),
123+
)
124+
defer f.Close()
125+
126+
t := &futureInt64{
127+
future: f,
129128
}
130129

131130
dbVersion, err := t.Get()
@@ -145,8 +144,11 @@ func (d Database) GetClientStatus() ([]byte, error) {
145144
return nil, errAPIVersionUnset
146145
}
147146

147+
f := newFutureWithDb(d.database, nil, C.fdb_database_get_client_status(d.ptr))
148+
defer f.Close()
149+
148150
st := &futureByteSlice{
149-
future: newFutureWithDb(d.database, nil, C.fdb_database_get_client_status(d.ptr)),
151+
future: f,
150152
}
151153

152154
b, err := st.Get()
@@ -173,7 +175,10 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi
173175
// fdb.Error
174176
var ep Error
175177
if errors.As(err, &ep) {
176-
processedErr := onError(ep).Get()
178+
f := onError(ep)
179+
defer f.Close()
180+
181+
processedErr := f.Get()
177182
var newEp Error
178183
if !errors.As(processedErr, &newEp) || newEp.Code != ep.Code {
179184
// override original error only if not an Error or code changed
@@ -214,18 +219,22 @@ func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNi
214219
// Transaction and Database objects.
215220
func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{}, error) {
216221
tr, err := d.CreateTransaction()
217-
// Any error here is non-retryable
218222
if err != nil {
223+
// Any error here is non-retryable
219224
return nil, err
220225
}
226+
defer tr.Close()
221227

222228
wrapped := func() (ret interface{}, err error) {
223229
defer panicToError(&err)
224230

225231
ret, err = f(tr)
226232

227233
if err == nil {
228-
err = tr.Commit().Get()
234+
f := tr.Commit()
235+
defer f.Close()
236+
237+
err = f.Get()
229238
}
230239

231240
return
@@ -263,14 +272,14 @@ func (d Database) ReadTransact(f func(ReadTransaction) (interface{}, error)) (in
263272
// Any error here is non-retryable
264273
return nil, err
265274
}
275+
defer tr.Close()
266276

267277
wrapped := func() (ret interface{}, err error) {
268278
defer panicToError(&err)
269279

270280
ret, err = f(tr)
271281

272-
// read-only transactions are not committed and will be destroyed automatically via GC,
273-
// once all the futures go out of scope
282+
// read-only transactions are not committed and will be destroyed automatically by the deferred Close()
274283

275284
return
276285
}
@@ -300,6 +309,7 @@ func (d Database) LocalityGetBoundaryKeys(er ExactRange, limit int, readVersion
300309
if err != nil {
301310
return nil, err
302311
}
312+
defer tr.Close()
303313

304314
if readVersion != 0 {
305315
tr.SetReadVersion(readVersion)

bindings/go/src/fdb/directory/allocator.go

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (hca highContentionAllocator) allocate(tr fdb.Transaction, s subspace.Subsp
9595
// Increment the allocation count for the current window
9696
tr.Add(hca.counters.Sub(start), oneBytes)
9797
countFuture := tr.Snapshot().Get(hca.counters.Sub(start))
98+
defer countFuture.Close()
9899

99100
allocatorMutex.Unlock()
100101

bindings/go/src/fdb/directory/directory_layer.go

+45-26
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,12 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti
8787
return nil, errors.New("the root directory cannot be opened")
8888
}
8989

90-
existingNode := dl.find(rtr, path).prefetchMetadata(rtr)
91-
if existingNode.exists() {
90+
existingNode, exists := dl.find(rtr, path).prefetchMetadata(rtr)
91+
if exists {
92+
defer existingNode._layer.Close()
93+
}
94+
95+
if exists {
9296
if existingNode.isInPartition(nil, false) {
9397
subpath := existingNode.getPartitionSubpath()
9498
enc, err := existingNode.getContents(dl, nil)
@@ -225,9 +229,11 @@ func (dl directoryLayer) Exists(rt fdb.ReadTransactor, path []string) (bool, err
225229
return false, err
226230
}
227231

228-
node := dl.find(rtr, path).prefetchMetadata(rtr)
229-
if !node.exists() {
232+
node, exists := dl.find(rtr, path).prefetchMetadata(rtr)
233+
if !exists {
230234
return false, nil
235+
} else {
236+
defer node._layer.Close()
231237
}
232238

233239
if node.isInPartition(nil, false) {
@@ -252,9 +258,11 @@ func (dl directoryLayer) List(rt fdb.ReadTransactor, path []string) ([]string, e
252258
return nil, err
253259
}
254260

255-
node := dl.find(rtr, path).prefetchMetadata(rtr)
256-
if !node.exists() {
261+
node, exists := dl.find(rtr, path).prefetchMetadata(rtr)
262+
if !exists {
257263
return nil, ErrDirNotExists
264+
} else {
265+
defer node._layer.Close()
258266
}
259267

260268
if node.isInPartition(nil, true) {
@@ -291,8 +299,14 @@ func (dl directoryLayer) Move(t fdb.Transactor, oldPath []string, newPath []stri
291299
return nil, errors.New("the destination directory cannot be a subdirectory of the source directory")
292300
}
293301

294-
oldNode := dl.find(tr, oldPath).prefetchMetadata(tr)
295-
newNode := dl.find(tr, newPath).prefetchMetadata(tr)
302+
oldNode, oldNodeExists := dl.find(tr, oldPath).prefetchMetadata(tr)
303+
if oldNodeExists {
304+
defer oldNode._layer.Close()
305+
}
306+
newNode, newNodeExists := dl.find(tr, newPath).prefetchMetadata(tr)
307+
if newNodeExists {
308+
defer newNode._layer.Close()
309+
}
296310

297311
if !oldNode.exists() {
298312
return nil, errors.New("the source directory does not exist")
@@ -349,10 +363,11 @@ func (dl directoryLayer) Remove(t fdb.Transactor, path []string) (bool, error) {
349363
return false, errors.New("the root directory cannot be removed")
350364
}
351365

352-
node := dl.find(tr, path).prefetchMetadata(tr)
353-
354-
if !node.exists() {
366+
node, exists := dl.find(tr, path).prefetchMetadata(tr)
367+
if !exists {
355368
return false, nil
369+
} else {
370+
defer node._layer.Close()
356371
}
357372

358373
if node.isInPartition(nil, false) {
@@ -416,16 +431,10 @@ func (dl directoryLayer) subdirNames(rtr fdb.ReadTransaction, node subspace.Subs
416431
sd := node.Sub(_SUBDIRS)
417432

418433
rr := rtr.GetRange(sd, fdb.RangeOptions{})
419-
ri := rr.Iterator()
420434

421435
var ret []string
422-
423-
for ri.Advance() {
424-
kv, err := ri.Get()
425-
if err != nil {
426-
return nil, err
427-
}
428-
436+
kvs := rr.GetSliceOrPanic()
437+
for _, kv := range kvs {
429438
p, err := sd.Unpack(kv.Key)
430439
if err != nil {
431440
return nil, err
@@ -441,13 +450,10 @@ func (dl directoryLayer) subdirNodes(tr fdb.Transaction, node subspace.Subspace)
441450
sd := node.Sub(_SUBDIRS)
442451

443452
rr := tr.GetRange(sd, fdb.RangeOptions{})
444-
ri := rr.Iterator()
445453

446454
var ret []subspace.Subspace
447-
448-
for ri.Advance() {
449-
kv := ri.MustGet()
450-
455+
kvs := rr.GetSliceOrPanic()
456+
for _, kv := range kvs {
451457
ret = append(ret, dl.nodeWithPrefix(kv.Value))
452458
}
453459

@@ -507,7 +513,10 @@ func (dl directoryLayer) isPrefixFree(rtr fdb.ReadTransaction, prefix []byte) (b
507513
}
508514

509515
func (dl directoryLayer) checkVersion(rtr fdb.ReadTransaction, tr *fdb.Transaction) error {
510-
version, err := rtr.Get(dl.rootNode.Sub([]byte("version"))).Get()
516+
f := rtr.Get(dl.rootNode.Sub([]byte("version")))
517+
defer f.Close()
518+
519+
version, err := f.Get()
511520
if err != nil {
512521
return err
513522
}
@@ -590,8 +599,18 @@ func (dl directoryLayer) nodeWithPrefix(prefix []byte) subspace.Subspace {
590599

591600
func (dl directoryLayer) find(rtr fdb.ReadTransaction, path []string) *node {
592601
n := &node{dl.rootNode, []string{}, path, nil}
602+
futures := make([]fdb.FutureByteSlice, len(path))
593603
for i := range path {
594-
n = &node{dl.nodeWithPrefix(rtr.Get(n.subspace.Sub(_SUBDIRS, path[i])).MustGet()), path[:i+1], path, nil}
604+
futures[i] = rtr.Get(n.subspace.Sub(_SUBDIRS, path[i]))
605+
}
606+
defer func() {
607+
for _, f := range futures {
608+
f.Close()
609+
}
610+
}()
611+
612+
for i, f := range futures {
613+
n = &node{dl.nodeWithPrefix(f.MustGet()), path[:i+1], path, nil}
595614
if !n.exists() || bytes.Compare(n.layer(rtr).MustGet(), []byte("partition")) == 0 {
596615
return n
597616
}

bindings/go/src/fdb/directory/node.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ func (n *node) exists() bool {
4343
return true
4444
}
4545

46-
func (n *node) prefetchMetadata(rtr fdb.ReadTransaction) *node {
46+
func (n *node) prefetchMetadata(rtr fdb.ReadTransaction) (*node, bool) {
4747
if n.exists() {
4848
n.layer(rtr)
49+
return n, true
4950
}
50-
return n
51+
return n, false
5152
}
5253

5354
func (n *node) layer(rtr fdb.ReadTransaction) fdb.FutureByteSlice {

0 commit comments

Comments
 (0)