Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvdb/postgres: remove global application level lock #8529

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// to open channels to.
scores, err = chooseN(numChans, scores)
if err != nil {
return fmt.Errorf("unable to make weighted choice: %v",
return fmt.Errorf("unable to make weighted choice: %w",
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
err)
}

Expand Down
2 changes: 1 addition & 1 deletion autopilot/combinedattach.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *WeightedCombAttachment) NodeScores(g ChannelGraph, chans []LocalChannel
g, chans, chanSize, nodes,
)
if err != nil {
return nil, fmt.Errorf("unable to get sub score: %v",
return nil, fmt.Errorf("unable to get sub score: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
)
if err != nil {
return nil, fmt.Errorf("unable to get sub score: %v",
return nil, fmt.Errorf("unable to get sub score: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion brontide/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (l *Listener) listen() {
// rejectedConnErr is a helper function that prepends the remote address of the
// failed connection attempt to the original error message.
func rejectedConnErr(err error, remoteAddr string) error {
return fmt.Errorf("unable to accept connection from %v: %v", remoteAddr,
return fmt.Errorf("unable to accept connection from %v: %w", remoteAddr,
err)
}

Expand Down
8 changes: 4 additions & 4 deletions cert/selfsigned.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func GenCertPair(org string, tlsExtraIPs, tlsExtraDomains []string,
&template, &priv.PublicKey, priv,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create certificate: %v",
return nil, nil, fmt.Errorf("failed to create certificate: %w",
err)
}

Expand All @@ -270,21 +270,21 @@ func GenCertPair(org string, tlsExtraIPs, tlsExtraDomains []string,
certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes},
)
if err != nil {
return nil, nil, fmt.Errorf("failed to encode certificate: %v",
return nil, nil, fmt.Errorf("failed to encode certificate: %w",
err)
}

keybytes, err := x509.MarshalECPrivateKey(priv)
if err != nil {
return nil, nil, fmt.Errorf("unable to encode privkey: %v",
return nil, nil, fmt.Errorf("unable to encode privkey: %w",
err)
}
keyBuf := &bytes.Buffer{}
err = pem.Encode(
keyBuf, &pem.Block{Type: "EC PRIVATE KEY", Bytes: keybytes},
)
if err != nil {
return nil, nil, fmt.Errorf("failed to encode private key: %v",
return nil, nil, fmt.Errorf("failed to encode private key: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for txid %v: %v",
return nil, fmt.Errorf("unable to query for txid %v: %w",
outpoint.Hash, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/btcdnotify/btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for txid %v: %v",
return nil, fmt.Errorf("unable to query for txid %v: %w",
outpoint.Hash, err)
}
}
Expand Down
16 changes: 8 additions & 8 deletions chainntnfs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,12 @@ func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash,
for reorgHash != chainHash {
reorgHeader, err := chainConn.GetBlockHeader(&reorgHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
return 0, fmt.Errorf("unable to get header for hash=%v: %w",
reorgHash, err)
}
chainHeader, err := chainConn.GetBlockHeader(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
return 0, fmt.Errorf("unable to get header for hash=%v: %w",
chainHash, err)
}
reorgHash = reorgHeader.PrevBlock
Expand All @@ -497,7 +497,7 @@ func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash,

verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v",
return 0, fmt.Errorf("unable to get verbose header for hash=%v: %w",
chainHash, err)
}

Expand Down Expand Up @@ -719,7 +719,7 @@ func ConfDetailsFromTxIndex(chainConn TxIndexConn, r ConfRequest,
}

return nil, TxNotFoundIndex,
fmt.Errorf("unable to query for txid %v: %v",
fmt.Errorf("unable to query for txid %v: %w",
r.TxID, err)
}

Expand All @@ -728,13 +728,13 @@ func ConfDetailsFromTxIndex(chainConn TxIndexConn, r ConfRequest,
rawTx, err := hex.DecodeString(rawTxRes.Hex)
if err != nil {
return nil, TxNotFoundIndex,
fmt.Errorf("unable to deserialize tx %v: %v",
fmt.Errorf("unable to deserialize tx %v: %w",
r.TxID, err)
}
var tx wire.MsgTx
if err := tx.Deserialize(bytes.NewReader(rawTx)); err != nil {
return nil, TxNotFoundIndex,
fmt.Errorf("unable to deserialize tx %v: %v",
fmt.Errorf("unable to deserialize tx %v: %w",
r.TxID, err)
}

Expand All @@ -759,13 +759,13 @@ func ConfDetailsFromTxIndex(chainConn TxIndexConn, r ConfRequest,
if err != nil {
return nil, TxNotFoundIndex,
fmt.Errorf("unable to get block hash %v for "+
"historical dispatch: %v", rawTxRes.BlockHash, err)
"historical dispatch: %w", rawTxRes.BlockHash, err)
}
block, err := chainConn.GetBlock(blockHash)
if err != nil {
return nil, TxNotFoundIndex,
fmt.Errorf("unable to get block with hash %v for "+
"historical dispatch: %v", blockHash, err)
"historical dispatch: %w", blockHash, err)
}

// In the modern chain (the only one we really care about for LN), the
Expand Down
4 changes: 2 additions & 2 deletions chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequ
// can compute the current block hash.
blockHash, err := n.p2pNode.GetBlockHash(int64(scanHeight))
if err != nil {
return nil, fmt.Errorf("unable to get header for height=%v: %v",
return nil, fmt.Errorf("unable to get header for height=%v: %w",
scanHeight, err)
}

Expand All @@ -601,7 +601,7 @@ func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequ
)
if err != nil {
return nil, fmt.Errorf("unable to retrieve regular filter for "+
"height=%v: %v", scanHeight, err)
"height=%v: %w", scanHeight, err)
}

// In the case that the filter exists, we'll attempt to see if
Expand Down
4 changes: 2 additions & 2 deletions chainreg/chainregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,11 +730,11 @@ func NewChainControl(walletConfig lnwallet.Config,

lnWallet, err := lnwallet.NewLightningWallet(walletConfig)
if err != nil {
return nil, ccCleanup, fmt.Errorf("unable to create wallet: %v",
return nil, ccCleanup, fmt.Errorf("unable to create wallet: %w",
err)
}
if err := lnWallet.Startup(); err != nil {
return nil, ccCleanup, fmt.Errorf("unable to create wallet: %v",
return nil, ccCleanup, fmt.Errorf("unable to create wallet: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion channeldb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func WriteElement(w io.Writer, element interface{}) error {

if e.PubKey != nil {
if err := binary.Write(w, byteOrder, true); err != nil {
return fmt.Errorf("error writing serialized element: %s", err)
return fmt.Errorf("error writing serialized element: %w", err)
}

return WriteElement(w, e.PubKey)
Expand Down
2 changes: 1 addition & 1 deletion channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
if err != nil {
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
"chan_point=%v: %w", outPoint, err)
}
oChannel.Db = c

Expand Down
12 changes: 9 additions & 3 deletions channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,9 +1497,15 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
// If we reach this point, then there are no longer any edges
// that connect this node, so we can delete it.
if err := c.deleteLightningNode(nodes, nodePubKey[:]); err != nil {
log.Warnf("Unable to prune node %x from the "+
"graph: %v", nodePubKey, err)
continue
if errors.Is(err, ErrGraphNodeNotFound) ||
errors.Is(err, ErrGraphNodesNotFound) {

log.Warnf("Unable to prune node %x from the "+
"graph: %v", nodePubKey, err)
continue
}

return err
}

log.Infof("Pruned unconnected node %x from channel graph",
Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration21/current/current_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WriteElement(w io.Writer, element interface{}) error {

if e.PubKey != nil {
if err := binary.Write(w, byteOrder, true); err != nil {
return fmt.Errorf("error writing serialized element: %s", err)
return fmt.Errorf("error writing serialized element: %w", err)
}

return WriteElement(w, e.PubKey)
Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration21/legacy/legacy_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func WriteElement(w io.Writer, element interface{}) error {

if e.PubKey != nil {
if err := binary.Write(w, byteOrder, true); err != nil {
return fmt.Errorf("error writing serialized element: %s", err)
return fmt.Errorf("error writing serialized element: %w", err)
}

return WriteElement(w, e.PubKey)
Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration25/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func findOpenChannels(openChanBucket kvdb.RBucket) ([]*OpenChannel, error) {
// open channels as they don't have any revocation logs and
// their current commitments reflect the initial balances.
if err := FetchChanCommitments(chanBucket, c); err != nil {
return fmt.Errorf("unable to fetch chan commits: %v",
return fmt.Errorf("unable to fetch chan commits: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration27/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func findHistoricalChannels(historicalBucket kvdb.RBucket) ([]*OpenChannel,
// Try to fetch channel info in old format.
err = fetchChanInfoCompatible(chanBucket, c, true)
if err != nil {
return fmt.Errorf("%s: fetch chan info got: %v",
return fmt.Errorf("%s: fetch chan info got: %w",
c.FundingOutpoint, err)
}

Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration30/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func writeRevocationLogs(openChanBucket kvdb.RwBucket,
logEntrykey := mig24.MakeLogKey(entry.commitHeight)
err = logBucket.Put(logEntrykey[:], b.Bytes())
if err != nil {
return fmt.Errorf("putRevocationLog err: %v",
return fmt.Errorf("putRevocationLog err: %w",
err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration_01_to_11/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func WriteElement(w io.Writer, element interface{}) error {

if e.PubKey != nil {
if err := binary.Write(w, byteOrder, true); err != nil {
return fmt.Errorf("error writing serialized element: %s", err)
return fmt.Errorf("error writing serialized element: %w", err)
}

return WriteElement(w, e.PubKey)
Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration_01_to_11/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ func deserializeChanEdgePolicy(r io.Reader,

node, err := fetchLightningNode(nodes, pub[:])
if err != nil {
return nil, fmt.Errorf("unable to fetch node: %x, %v",
return nil, fmt.Errorf("unable to fetch node: %x, %w",
pub[:], err)
}
edge.Node = &node
Expand Down
6 changes: 3 additions & 3 deletions channeldb/migration_01_to_11/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func MigratePruneEdgeUpdateIndex(tx kvdb.RwTx) error {
// well.
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return fmt.Errorf("error creating edge index bucket: %s", err)
return fmt.Errorf("error creating edge index bucket: %w", err)
}
if edgeIndex == nil {
return fmt.Errorf("unable to create/fetch edge index " +
Expand Down Expand Up @@ -546,7 +546,7 @@ func MigratePruneEdgeUpdateIndex(tx kvdb.RwTx) error {
return nil
})
if err != nil {
return fmt.Errorf("unable to gather existing edge policies: %v",
return fmt.Errorf("unable to gather existing edge policies: %w",
err)
}

Expand All @@ -560,7 +560,7 @@ func MigratePruneEdgeUpdateIndex(tx kvdb.RwTx) error {
return nil
})
if err != nil {
return fmt.Errorf("unable to gather existing edge updates: %v",
return fmt.Errorf("unable to gather existing edge updates: %w",
err)
}

Expand Down
2 changes: 1 addition & 1 deletion channeldb/migration_01_to_11/zpay32/amountunits.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func encodeAmount(msat lnwire.MilliSatoshi) (string, error) {
// Should always be expressible in pico BTC.
pico, err := fromMSat['p'](msat)
if err != nil {
return "", fmt.Errorf("unable to express %d msat as pBTC: %v",
return "", fmt.Errorf("unable to express %d msat as pBTC: %w",
msat, err)
}
shortened := strconv.FormatUint(pico, 10) + "p"
Expand Down
Loading
Loading