Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add transaction result subscription #9955

Merged
merged 10 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

- [9930](https://github.com/vegaprotocol/vega/issues/9930) - `LiquidityFeeSettings` can now be used in market proposals to choose how liquidity fees are calculated.
- [9982](https://github.com/vegaprotocol/vega/issues/9982) - Remove fees and minimal transfer amount from vested account
- [9955](https://github.com/vegaprotocol/vega/issues/9955) - Add data node subscription for transaction results.

### 🐛 Fixes

Expand All @@ -23,7 +24,6 @@
- [9952](https://github.com/vegaprotocol/vega/issues/9952) - `PnL` flickering fix.
- [9977](https://github.com/vegaprotocol/vega/issues/9977) - Transfer infra fees directly to general account without going through vesting.


## 0.73.0

### 🚨 Breaking changes
Expand All @@ -41,7 +41,6 @@
- [9408](https://github.com/vegaprotocol/vega/issues/9408) - Enforce pagination range.
- [9757](https://github.com/vegaprotocol/vega/issues/9757) - Liquidity provisions `API` shows the pending `LP` instead of the current when an update is accepted by the network.


### 🛠 Improvements

- [8051](https://github.com/vegaprotocol/vega/issues/8051) - Upgrade to comet `0.38.0`
Expand Down
1 change: 1 addition & 0 deletions cmd/data-node/commands/start/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (l *NodeCommand) createGRPCServer(config api.Config) *api.GRPCServer {
l.paidLiquidityFeesStatsService,
l.partyLockedBalancesService,
l.partyVestingBalancesService,
l.transactionResultsService,
)
return grpcServer
}
6 changes: 6 additions & 0 deletions cmd/data-node/commands/start/sqlsubscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type SQLSubscribers struct {
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats
partyLockedBalancesService *service.PartyLockedBalances
partyVestingBalancesService *service.PartyVestingBalances
transactionResultsService *service.TransactionResults

// Subscribers
accountSub *sqlsubscribers.Account
Expand Down Expand Up @@ -180,6 +181,7 @@ type SQLSubscribers struct {
volumeDiscountProgramSub *sqlsubscribers.VolumeDiscountProgram
paidLiquidityFeesStatsSub *sqlsubscribers.PaidLiquidityFeesStats
vestingSummarySub *sqlsubscribers.VestingBalancesSummary
transactionResultsSub *sqlsubscribers.TransactionResults
}

func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
Expand Down Expand Up @@ -233,6 +235,7 @@ func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
s.volumeDiscountProgramSub,
s.paidLiquidityFeesStatsSub,
s.vestingSummarySub,
s.transactionResultsSub,
}
}

Expand Down Expand Up @@ -345,6 +348,9 @@ func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger,
s.partyLockedBalancesService = service.NewPartyLockedBalances(s.partyLockedBalancesStore)
s.partyVestingBalancesService = service.NewPartyVestingBalances(s.partyVestingBalancesStore)

s.transactionResultsSub = sqlsubscribers.NewTransactionResults(log)
s.transactionResultsService = service.NewTransactionResults(s.transactionResultsSub)

toInit := []interface{ Initialise(context.Context) error }{
s.marketDepthService,
s.marketDataService,
Expand Down
16 changes: 16 additions & 0 deletions core/events/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ type TransactionResult struct {
evt *eventspb.TransactionResult
}

func (tr *TransactionResult) PartyID() string {
return tr.evt.PartyId
}

func (tr *TransactionResult) Status() bool {
return tr.evt.Status
}

func (tr *TransactionResult) Hash() string {
return tr.evt.Hash
}

func NewTransactionResultEventSuccess(
ctx context.Context,
hash, party string,
Expand Down Expand Up @@ -186,6 +198,10 @@ func (t TransactionResult) Proto() eventspb.TransactionResult {
return *t.evt
}

func (t TransactionResult) TransactionResult() TransactionResult {
return t
}

func (t TransactionResult) StreamMessage() *eventspb.BusEvent {
busEvent := newBusEventFromBase(t.Base)
busEvent.Event = &eventspb.BusEvent_TransactionResult{
Expand Down
4 changes: 4 additions & 0 deletions datanode/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type GRPCServer struct {
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats
partyLockedBalances *service.PartyLockedBalances
partyVestingBalances *service.PartyVestingBalances
transactionResults *service.TransactionResults

eventObserver *eventObserver

Expand Down Expand Up @@ -217,6 +218,7 @@ func NewGRPCServer(
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats,
partyLockedBalances *service.PartyLockedBalances,
partyVestingBalances *service.PartyVestingBalances,
transactionResults *service.TransactionResults,
) *GRPCServer {
// setup logger
log = log.Named(namedLogger)
Expand Down Expand Up @@ -282,6 +284,7 @@ func NewGRPCServer(
paidLiquidityFeesStatsService: paidLiquidityFeesStatsService,
partyLockedBalances: partyLockedBalances,
partyVestingBalances: partyVestingBalances,
transactionResults: transactionResults,

eventObserver: &eventObserver{
log: log,
Expand Down Expand Up @@ -517,6 +520,7 @@ func (g *GRPCServer) Start(ctx context.Context, lis net.Listener) error {
partyLockedBalances: g.partyLockedBalances,
partyVestingBalances: g.partyVestingBalances,
vestingStats: g.vestingStatsService,
transactionResults: g.transactionResults,
}

protoapi.RegisterTradingDataServiceServer(g.srv, tradingDataSvcV2)
Expand Down
33 changes: 33 additions & 0 deletions datanode/api/trading_data_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"code.vegaprotocol.io/vega/core/events"
"code.vegaprotocol.io/vega/core/risk"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/datanode/candlesv2"
Expand Down Expand Up @@ -124,6 +125,7 @@ type TradingDataServiceV2 struct {
partyLockedBalances *service.PartyLockedBalances
partyVestingBalances *service.PartyVestingBalances
vestingStats *service.VestingStats
transactionResults *service.TransactionResults
}

func (t *TradingDataServiceV2) GetPartyVestingStats(
Expand Down Expand Up @@ -4735,3 +4737,34 @@ func (t *TradingDataServiceV2) GetVolumeDiscountStats(ctx context.Context, req *
},
}, nil
}

// ObserveTransactionResults opens a subscription to the transaction results.
func (t *TradingDataServiceV2) ObserveTransactionResults(req *v2.ObserveTransactionResultsRequest, srv v2.TradingDataService_ObserveTransactionResultsServer) error {
// Wrap context from the request into cancellable. We can close internal chan on error.
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()

tradesChan, ref := t.transactionResults.Observe(ctx, t.config.StreamRetries, req.PartyIds, req.Hashes, req.Status)

if t.log.GetLevel() == logging.DebugLevel {
t.log.Debug("Transaction results subscriber - new rpc stream", logging.Uint64("ref", ref))
}

return observeBatch(ctx, t.log, "TransactionResults", tradesChan, ref, func(results []events.TransactionResult) error {
protos := make([]*eventspb.TransactionResult, 0, len(results))
for _, v := range results {
p := v.Proto()
protos = append(protos, &p)
}

batches := batch(protos, snapshotPageSize)

for _, batch := range batches {
response := &v2.ObserveTransactionResultsResponse{TransactionResults: batch}
if err := srv.Send(response); err != nil {
return errors.Wrap(err, "sending transaction results")
}
}
return nil
})
}
3 changes: 3 additions & 0 deletions datanode/api/trading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
vgtesting "code.vegaprotocol.io/vega/datanode/libs/testing"
"code.vegaprotocol.io/vega/datanode/service"
"code.vegaprotocol.io/vega/datanode/sqlstore"
"code.vegaprotocol.io/vega/datanode/sqlsubscribers"
"code.vegaprotocol.io/vega/libs/subscribers"
"code.vegaprotocol.io/vega/logging"
v2 "code.vegaprotocol.io/vega/protos/data-node/api/v2"
Expand Down Expand Up @@ -160,6 +161,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
paidLiquidityFeesStatsService := service.NewPaidLiquidityFeesStats(sqlstore.NewPaidLiquidityFeesStats(sqlConn))
partyLockedBalances := service.NewPartyLockedBalances(sqlstore.NewPartyLockedBalances(sqlConn))
partyVestingBalances := service.NewPartyVestingBalances(sqlstore.NewPartyVestingBalances(sqlConn))
transactionResults := service.NewTransactionResults(sqlsubscribers.NewTransactionResults(logger))

g := api.NewGRPCServer(
logger,
Expand Down Expand Up @@ -216,6 +218,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
paidLiquidityFeesStatsService,
partyLockedBalances,
partyVestingBalances,
transactionResults,
)
if g == nil {
err = fmt.Errorf("failed to create gRPC server")
Expand Down
20 changes: 20 additions & 0 deletions datanode/gateway/graphql/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions datanode/service/stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package service

import (
"code.vegaprotocol.io/vega/datanode/sqlstore"
"code.vegaprotocol.io/vega/datanode/sqlsubscribers"
)

type (
Expand Down Expand Up @@ -72,6 +73,9 @@ type (
PartyVestingBalances struct {
*sqlstore.PartyVestingBalance
}
TransactionResults struct {
*sqlsubscribers.TransactionResults
}
)

type (
Expand Down Expand Up @@ -206,3 +210,7 @@ func NewPartyLockedBalances(store *sqlstore.PartyLockedBalance) *PartyLockedBala
func NewPartyVestingBalances(store *sqlstore.PartyVestingBalance) *PartyVestingBalances {
return &PartyVestingBalances{PartyVestingBalance: store}
}

func NewTransactionResults(subscriber *sqlsubscribers.TransactionResults) *TransactionResults {
return &TransactionResults{TransactionResults: subscriber}
}
82 changes: 82 additions & 0 deletions datanode/sqlsubscribers/transaction_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// Copyright (c) 2022 Gobalsky Labs Limited
//
// Use of this software is governed by the Business Source License included
// in the LICENSE.DATANODE file and at https://www.mariadb.com/bsl11.
//
// Change Date: 18 months from the later of the date of the first publicly
// available Distribution of this version of the repository, and 25 June 2022.
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by version 3 or later of the GNU General
// Public License.

package sqlsubscribers

import (
"context"

"code.vegaprotocol.io/vega/core/events"
"code.vegaprotocol.io/vega/datanode/utils"
"code.vegaprotocol.io/vega/libs/slice"
"code.vegaprotocol.io/vega/logging"
)

type TransactionResultEvent interface {
events.Event
TransactionResult() events.TransactionResult
}

type TransactionResults struct {
subscriber
observer utils.Observer[events.TransactionResult]
}

func NewTransactionResults(log *logging.Logger) *TransactionResults {
return &TransactionResults{
observer: utils.NewObserver[events.TransactionResult]("transaction_result", log, 5, 5),
}
}

func (tr *TransactionResults) Push(ctx context.Context, evt events.Event) error {
switch e := evt.(type) {
case TransactionResultEvent:
tr.observer.Notify([]events.TransactionResult{e.TransactionResult()})
return nil
default:
return nil
}
}

func (tr *TransactionResults) Types() []events.Type {
return []events.Type{events.TransactionResultEvent}
}

func (tr *TransactionResults) Observe(ctx context.Context, retries int,
partyIDs []string, hashes []string, status *bool,
) (transactions <-chan []events.TransactionResult, ref uint64) {
ch, ref := tr.observer.Observe(ctx,
retries,
func(tre events.TransactionResult) bool {
partiesOk := len(partyIDs) == 0 || slice.Contains(partyIDs, tre.PartyID())
hashesOk := len(hashes) == 0 || slice.Contains(hashes, tre.Hash())
statusOK := status == nil || *status == tre.Status()

return partiesOk && hashesOk && statusOK
})
return ch, ref
}
Loading
Loading