Skip to content

Commit

Permalink
Merge pull request #9955 from vegaprotocol/feat-9802
Browse files Browse the repository at this point in the history
feat: add transaction result subscription
  • Loading branch information
jeremyletang authored Nov 6, 2023
2 parents bc7c4fa + 749aed9 commit d34d273
Show file tree
Hide file tree
Showing 17 changed files with 2,002 additions and 1,338 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

- [9930](https://github.com/vegaprotocol/vega/issues/9930) - `LiquidityFeeSettings` can now be used in market proposals to choose how liquidity fees are calculated.
- [9982](https://github.com/vegaprotocol/vega/issues/9982) - Remove fees and minimal transfer amount from vested account
- [9955](https://github.com/vegaprotocol/vega/issues/9955) - Add data node subscription for transaction results.

### 🐛 Fixes

Expand All @@ -23,7 +24,6 @@
- [9952](https://github.com/vegaprotocol/vega/issues/9952) - `PnL` flickering fix.
- [9977](https://github.com/vegaprotocol/vega/issues/9977) - Transfer infra fees directly to general account without going through vesting.


## 0.73.0

### 🚨 Breaking changes
Expand All @@ -41,7 +41,6 @@
- [9408](https://github.com/vegaprotocol/vega/issues/9408) - Enforce pagination range.
- [9757](https://github.com/vegaprotocol/vega/issues/9757) - Liquidity provisions `API` shows the pending `LP` instead of the current when an update is accepted by the network.


### 🛠 Improvements

- [8051](https://github.com/vegaprotocol/vega/issues/8051) - Upgrade to comet `0.38.0`
Expand Down
1 change: 1 addition & 0 deletions cmd/data-node/commands/start/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (l *NodeCommand) createGRPCServer(config api.Config) *api.GRPCServer {
l.paidLiquidityFeesStatsService,
l.partyLockedBalancesService,
l.partyVestingBalancesService,
l.transactionResultsService,
)
return grpcServer
}
6 changes: 6 additions & 0 deletions cmd/data-node/commands/start/sqlsubscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type SQLSubscribers struct {
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats
partyLockedBalancesService *service.PartyLockedBalances
partyVestingBalancesService *service.PartyVestingBalances
transactionResultsService *service.TransactionResults

// Subscribers
accountSub *sqlsubscribers.Account
Expand Down Expand Up @@ -180,6 +181,7 @@ type SQLSubscribers struct {
volumeDiscountProgramSub *sqlsubscribers.VolumeDiscountProgram
paidLiquidityFeesStatsSub *sqlsubscribers.PaidLiquidityFeesStats
vestingSummarySub *sqlsubscribers.VestingBalancesSummary
transactionResultsSub *sqlsubscribers.TransactionResults
}

func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
Expand Down Expand Up @@ -233,6 +235,7 @@ func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
s.volumeDiscountProgramSub,
s.paidLiquidityFeesStatsSub,
s.vestingSummarySub,
s.transactionResultsSub,
}
}

Expand Down Expand Up @@ -345,6 +348,9 @@ func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger,
s.partyLockedBalancesService = service.NewPartyLockedBalances(s.partyLockedBalancesStore)
s.partyVestingBalancesService = service.NewPartyVestingBalances(s.partyVestingBalancesStore)

s.transactionResultsSub = sqlsubscribers.NewTransactionResults(log)
s.transactionResultsService = service.NewTransactionResults(s.transactionResultsSub)

toInit := []interface{ Initialise(context.Context) error }{
s.marketDepthService,
s.marketDataService,
Expand Down
16 changes: 16 additions & 0 deletions core/events/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ type TransactionResult struct {
evt *eventspb.TransactionResult
}

func (tr *TransactionResult) PartyID() string {
return tr.evt.PartyId
}

func (tr *TransactionResult) Status() bool {
return tr.evt.Status
}

func (tr *TransactionResult) Hash() string {
return tr.evt.Hash
}

func NewTransactionResultEventSuccess(
ctx context.Context,
hash, party string,
Expand Down Expand Up @@ -186,6 +198,10 @@ func (t TransactionResult) Proto() eventspb.TransactionResult {
return *t.evt
}

func (t TransactionResult) TransactionResult() TransactionResult {
return t
}

func (t TransactionResult) StreamMessage() *eventspb.BusEvent {
busEvent := newBusEventFromBase(t.Base)
busEvent.Event = &eventspb.BusEvent_TransactionResult{
Expand Down
4 changes: 4 additions & 0 deletions datanode/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type GRPCServer struct {
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats
partyLockedBalances *service.PartyLockedBalances
partyVestingBalances *service.PartyVestingBalances
transactionResults *service.TransactionResults

eventObserver *eventObserver

Expand Down Expand Up @@ -217,6 +218,7 @@ func NewGRPCServer(
paidLiquidityFeesStatsService *service.PaidLiquidityFeesStats,
partyLockedBalances *service.PartyLockedBalances,
partyVestingBalances *service.PartyVestingBalances,
transactionResults *service.TransactionResults,
) *GRPCServer {
// setup logger
log = log.Named(namedLogger)
Expand Down Expand Up @@ -282,6 +284,7 @@ func NewGRPCServer(
paidLiquidityFeesStatsService: paidLiquidityFeesStatsService,
partyLockedBalances: partyLockedBalances,
partyVestingBalances: partyVestingBalances,
transactionResults: transactionResults,

eventObserver: &eventObserver{
log: log,
Expand Down Expand Up @@ -517,6 +520,7 @@ func (g *GRPCServer) Start(ctx context.Context, lis net.Listener) error {
partyLockedBalances: g.partyLockedBalances,
partyVestingBalances: g.partyVestingBalances,
vestingStats: g.vestingStatsService,
transactionResults: g.transactionResults,
}

protoapi.RegisterTradingDataServiceServer(g.srv, tradingDataSvcV2)
Expand Down
33 changes: 33 additions & 0 deletions datanode/api/trading_data_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"code.vegaprotocol.io/vega/core/events"
"code.vegaprotocol.io/vega/core/risk"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/datanode/candlesv2"
Expand Down Expand Up @@ -124,6 +125,7 @@ type TradingDataServiceV2 struct {
partyLockedBalances *service.PartyLockedBalances
partyVestingBalances *service.PartyVestingBalances
vestingStats *service.VestingStats
transactionResults *service.TransactionResults
}

func (t *TradingDataServiceV2) GetPartyVestingStats(
Expand Down Expand Up @@ -4735,3 +4737,34 @@ func (t *TradingDataServiceV2) GetVolumeDiscountStats(ctx context.Context, req *
},
}, nil
}

// ObserveTransactionResults opens a subscription to the transaction results.
func (t *TradingDataServiceV2) ObserveTransactionResults(req *v2.ObserveTransactionResultsRequest, srv v2.TradingDataService_ObserveTransactionResultsServer) error {
// Wrap context from the request into cancellable. We can close internal chan on error.
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()

tradesChan, ref := t.transactionResults.Observe(ctx, t.config.StreamRetries, req.PartyIds, req.Hashes, req.Status)

if t.log.GetLevel() == logging.DebugLevel {
t.log.Debug("Transaction results subscriber - new rpc stream", logging.Uint64("ref", ref))
}

return observeBatch(ctx, t.log, "TransactionResults", tradesChan, ref, func(results []events.TransactionResult) error {
protos := make([]*eventspb.TransactionResult, 0, len(results))
for _, v := range results {
p := v.Proto()
protos = append(protos, &p)
}

batches := batch(protos, snapshotPageSize)

for _, batch := range batches {
response := &v2.ObserveTransactionResultsResponse{TransactionResults: batch}
if err := srv.Send(response); err != nil {
return errors.Wrap(err, "sending transaction results")
}
}
return nil
})
}
3 changes: 3 additions & 0 deletions datanode/api/trading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
vgtesting "code.vegaprotocol.io/vega/datanode/libs/testing"
"code.vegaprotocol.io/vega/datanode/service"
"code.vegaprotocol.io/vega/datanode/sqlstore"
"code.vegaprotocol.io/vega/datanode/sqlsubscribers"
"code.vegaprotocol.io/vega/libs/subscribers"
"code.vegaprotocol.io/vega/logging"
v2 "code.vegaprotocol.io/vega/protos/data-node/api/v2"
Expand Down Expand Up @@ -160,6 +161,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
paidLiquidityFeesStatsService := service.NewPaidLiquidityFeesStats(sqlstore.NewPaidLiquidityFeesStats(sqlConn))
partyLockedBalances := service.NewPartyLockedBalances(sqlstore.NewPartyLockedBalances(sqlConn))
partyVestingBalances := service.NewPartyVestingBalances(sqlstore.NewPartyVestingBalances(sqlConn))
transactionResults := service.NewTransactionResults(sqlsubscribers.NewTransactionResults(logger))

g := api.NewGRPCServer(
logger,
Expand Down Expand Up @@ -216,6 +218,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
paidLiquidityFeesStatsService,
partyLockedBalances,
partyVestingBalances,
transactionResults,
)
if g == nil {
err = fmt.Errorf("failed to create gRPC server")
Expand Down
20 changes: 20 additions & 0 deletions datanode/gateway/graphql/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions datanode/service/stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package service

import (
"code.vegaprotocol.io/vega/datanode/sqlstore"
"code.vegaprotocol.io/vega/datanode/sqlsubscribers"
)

type (
Expand Down Expand Up @@ -72,6 +73,9 @@ type (
PartyVestingBalances struct {
*sqlstore.PartyVestingBalance
}
TransactionResults struct {
*sqlsubscribers.TransactionResults
}
)

type (
Expand Down Expand Up @@ -206,3 +210,7 @@ func NewPartyLockedBalances(store *sqlstore.PartyLockedBalance) *PartyLockedBala
func NewPartyVestingBalances(store *sqlstore.PartyVestingBalance) *PartyVestingBalances {
return &PartyVestingBalances{PartyVestingBalance: store}
}

func NewTransactionResults(subscriber *sqlsubscribers.TransactionResults) *TransactionResults {
return &TransactionResults{TransactionResults: subscriber}
}
82 changes: 82 additions & 0 deletions datanode/sqlsubscribers/transaction_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// Copyright (c) 2022 Gobalsky Labs Limited
//
// Use of this software is governed by the Business Source License included
// in the LICENSE.DATANODE file and at https://www.mariadb.com/bsl11.
//
// Change Date: 18 months from the later of the date of the first publicly
// available Distribution of this version of the repository, and 25 June 2022.
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by version 3 or later of the GNU General
// Public License.

package sqlsubscribers

import (
"context"

"code.vegaprotocol.io/vega/core/events"
"code.vegaprotocol.io/vega/datanode/utils"
"code.vegaprotocol.io/vega/libs/slice"
"code.vegaprotocol.io/vega/logging"
)

type TransactionResultEvent interface {
events.Event
TransactionResult() events.TransactionResult
}

type TransactionResults struct {
subscriber
observer utils.Observer[events.TransactionResult]
}

func NewTransactionResults(log *logging.Logger) *TransactionResults {
return &TransactionResults{
observer: utils.NewObserver[events.TransactionResult]("transaction_result", log, 5, 5),
}
}

func (tr *TransactionResults) Push(ctx context.Context, evt events.Event) error {
switch e := evt.(type) {
case TransactionResultEvent:
tr.observer.Notify([]events.TransactionResult{e.TransactionResult()})
return nil
default:
return nil
}
}

func (tr *TransactionResults) Types() []events.Type {
return []events.Type{events.TransactionResultEvent}
}

func (tr *TransactionResults) Observe(ctx context.Context, retries int,
partyIDs []string, hashes []string, status *bool,
) (transactions <-chan []events.TransactionResult, ref uint64) {
ch, ref := tr.observer.Observe(ctx,
retries,
func(tre events.TransactionResult) bool {
partiesOk := len(partyIDs) == 0 || slice.Contains(partyIDs, tre.PartyID())
hashesOk := len(hashes) == 0 || slice.Contains(hashes, tre.Hash())
statusOK := status == nil || *status == tre.Status()

return partiesOk && hashesOk && statusOK
})
return ch, ref
}
Loading

0 comments on commit d34d273

Please sign in to comment.