Skip to content

Commit

Permalink
Merge pull request #11223 from vegaprotocol/fix_ti_schema
Browse files Browse the repository at this point in the history
feat: publish ongoing game data
  • Loading branch information
jeremyletang authored May 14, 2024
2 parents 4d001a6 + d681c09 commit 9b6a803
Show file tree
Hide file tree
Showing 69 changed files with 28,810 additions and 19,124 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

### 🛠 Improvements

- [](https://github.com/vegaprotocol/vega/issues/xxx)
- [11209](https://github.com/vegaprotocol/vega/issues/11209) - Publish ongoing games data.

### 🐛 Fixes

Expand Down
1 change: 1 addition & 0 deletions cmd/data-node/commands/start/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (l *NodeCommand) createGRPCServer(config api.Config) *api.GRPCServer {
l.gamesService,
l.marginModesService,
l.timeWeightedNotionalPositionService,
l.gameScoreService,
)
return grpcServer
}
8 changes: 7 additions & 1 deletion cmd/data-node/commands/start/sqlsubscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type SQLSubscribers struct {
gamesStore *sqlstore.Games
marginModesStore *sqlstore.MarginModes
timeWeightedNotionalPositionStore *sqlstore.TimeWeightedNotionalPosition
gameScoreStore *sqlstore.GameScores

// Services
candleService *candlesv2.Svc
Expand Down Expand Up @@ -138,6 +139,7 @@ type SQLSubscribers struct {
gamesService *service.Games
marginModesService *service.MarginModes
timeWeightedNotionalPositionService *service.TimeWeightedNotionalPosition
gameScoreService *service.GameScore

// Subscribers
accountSub *sqlsubscribers.Account
Expand Down Expand Up @@ -190,6 +192,7 @@ type SQLSubscribers struct {
transactionResultsSub *sqlsubscribers.TransactionResults
marginModesSub *sqlsubscribers.MarginModes
timeWeightedNotionalPositionSub *sqlsubscribers.TimeWeightedNotionalPosition
gameScoreSub *sqlsubscribers.GameScore
}

func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
Expand Down Expand Up @@ -246,6 +249,7 @@ func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
s.transactionResultsSub,
s.marginModesSub,
s.timeWeightedNotionalPositionSub,
s.gameScoreSub,
}
}

Expand Down Expand Up @@ -307,6 +311,7 @@ func (s *SQLSubscribers) CreateAllStores(ctx context.Context, Log *logging.Logge
s.gamesStore = sqlstore.NewGames(transactionalConnectionSource)
s.marginModesStore = sqlstore.NewMarginModes(transactionalConnectionSource)
s.timeWeightedNotionalPositionStore = sqlstore.NewTimeWeightedNotionalPosition(transactionalConnectionSource)
s.gameScoreStore = sqlstore.NewGameScores(transactionalConnectionSource)
}

func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger, candlesConfig candlesv2.Config) error {
Expand Down Expand Up @@ -363,7 +368,7 @@ func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger,
s.gamesService = service.NewGames(s.gamesStore)
s.marginModesService = service.NewMarginModes(s.marginModesStore)
s.timeWeightedNotionalPositionService = service.NewTimeWeightedNotionalPosition(s.timeWeightedNotionalPositionStore)

s.gameScoreService = service.NewGameScore(s.gameScoreStore, log)
s.transactionResultsSub = sqlsubscribers.NewTransactionResults(log)
s.transactionResultsService = service.NewTransactionResults(s.transactionResultsSub)

Expand Down Expand Up @@ -432,4 +437,5 @@ func (s *SQLSubscribers) SetupSQLSubscribers() {
s.vestingSummarySub = sqlsubscribers.NewVestingBalancesSummary(s.partyVestingBalancesStore, s.partyLockedBalancesStore)
s.marginModesSub = sqlsubscribers.NewMarginModes(s.marginModesService)
s.timeWeightedNotionalPositionSub = sqlsubscribers.NewTimeWeightedNotionalPosition(s.timeWeightedNotionalPositionService)
s.gameScoreSub = sqlsubscribers.NewGameScore(s.gameScoreStore)
}
1 change: 0 additions & 1 deletion core/banking/cancel_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func TestCancelTransfer(t *testing.T) {

e.assets.EXPECT().Get(gomock.Any()).Times(2).Return(
assets.NewAsset(&mockAsset{quantum: num.DecimalFromFloat(100), name: asset}), nil)
e.tsvc.EXPECT().GetTimeNow().Times(2)
e.broker.EXPECT().Send(gomock.Any()).Times(2)
assert.NoError(t, e.TransferFunds(ctx, transfer))

Expand Down
4 changes: 3 additions & 1 deletion core/banking/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"sync/atomic"
"time"

"code.vegaprotocol.io/vega/core/assets"
"code.vegaprotocol.io/vega/core/events"
Expand Down Expand Up @@ -272,6 +273,7 @@ func (e *Engine) loadRecurringTransfers(
ctx context.Context, r *checkpoint.RecurringTransfers,
) []events.Event {
evts := []events.Event{}
e.nextMetricUpdate = time.Unix(0, r.NextMetricUpdate)
for _, v := range r.RecurringTransfers {
transfer := types.RecurringTransferFromEvent(v)
e.recurringTransfers = append(e.recurringTransfers, transfer)
Expand Down Expand Up @@ -334,7 +336,7 @@ func (e *Engine) getRecurringTransfers() *checkpoint.RecurringTransfers {
for _, v := range e.recurringTransfers {
out.RecurringTransfers = append(out.RecurringTransfers, v.IntoEvent(nil, e.getGameID(v)))
}

out.NextMetricUpdate = e.nextMetricUpdate.UnixNano()
return out
}

Expand Down
7 changes: 0 additions & 7 deletions core/banking/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ func TestDepositFinalisedAfterCheckpoint(t *testing.T) {
func testSimpledScheduledTransfer(t *testing.T) {
e := getTestEngine(t)

e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()

// let's do a massive fee, easy to test.
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
e.OnTick(context.Background(), time.Unix(10, 0))
Expand Down Expand Up @@ -229,7 +224,6 @@ func TestGovernanceScheduledTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down Expand Up @@ -290,7 +284,6 @@ func TestGovernanceRecurringTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down
3 changes: 0 additions & 3 deletions core/banking/deduplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestAssetActionDeduplication(t *testing.T) {
asset1 := assets.NewAsset(erc20Asset)

t.Run("Generate asset list", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -71,7 +70,6 @@ func TestAssetActionDeduplication(t *testing.T) {
})

t.Run("Generate duplicated asset list and ", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -88,7 +86,6 @@ func TestAssetActionDeduplication(t *testing.T) {
// set, which might happen with the introduction of the second bridge. We have
// to ensure the event is acknowledge as a duplicate.
t.Run("Generate a duplicated event but updated with the chain ID", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, "1"))

Expand Down
29 changes: 29 additions & 0 deletions core/banking/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type MarketActivityTracker interface {
MarkPaidProposer(asset, market, payoutAsset string, marketsInScope []string, funder string)
MarketTrackedForAsset(market, asset string) bool
TeamStatsForMarkets(allMarketsForAssets, onlyTheseMarkets []string) map[string]map[string]*num.Uint
PublishGameMetric(ctx context.Context, dispatchStrategy []*vega.DispatchStrategy, now time.Time)
}

type EthereumEventSource interface {
Expand Down Expand Up @@ -206,6 +207,9 @@ type Engine struct {

maxGovTransferQunatumMultiplier num.Decimal
maxGovTransferFraction num.Decimal

metricUpdateFrequency time.Duration
nextMetricUpdate time.Time
}

type withdrawalRef struct {
Expand Down Expand Up @@ -259,6 +263,7 @@ func New(log *logging.Logger,
minTransferQuantumMultiple: num.DecimalZero(),
minWithdrawQuantumMultiple: num.DecimalZero(),
marketActivityTracker: marketActivityTracker,
nextMetricUpdate: time.Time{},
hashToStrategy: map[string]*dispatchStrategyCacheEntry{},
primaryBridgeState: &bridgeState{
active: true,
Expand Down Expand Up @@ -310,6 +315,28 @@ func (e *Engine) ReloadConf(cfg Config) {
e.cfg = cfg
}

func (e *Engine) OnBlockEnd(ctx context.Context, now time.Time) {
if !now.Before(e.nextMetricUpdate) {
e.publishMetricData(ctx, now)
e.nextMetricUpdate = now.Add(e.metricUpdateFrequency)
}
}

// publishMetricData requests the market activity tracker to publish and event
// for each game with the current metric data for each party.
func (e *Engine) publishMetricData(ctx context.Context, now time.Time) {
hashes := make([]string, 0, len(e.hashToStrategy))
for hash := range e.hashToStrategy {
hashes = append(hashes, hash)
}
sort.Strings(hashes)
dss := make([]*vega.DispatchStrategy, 0, len(hashes))
for _, hash := range hashes {
dss = append(dss, e.hashToStrategy[hash].ds)
}
e.marketActivityTracker.PublishGameMetric(ctx, dss, now)
}

func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case proto.EpochAction_EPOCH_ACTION_START:
Expand All @@ -320,6 +347,8 @@ func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
e.distributeRecurringGovernanceTransfers(ctx)
e.applyPendingFeeDiscountsUpdates(ctx)
e.sendTeamsStats(ctx, ep.Seq)
// as the metrics are going to be published here, we want to progress the next update.
e.nextMetricUpdate = e.timeService.GetTimeNow().Add(e.metricUpdateFrequency)
default:
e.log.Panic("epoch action should never be UNSPECIFIED", logging.String("epoch", ep.String()))
}
Expand Down
10 changes: 4 additions & 6 deletions core/banking/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func getTestEngine(t *testing.T) *testEngine {
col := mocks.NewMockCollateral(ctrl)
assets := mocks.NewMockAssets(ctrl)
tsvc := mocks.NewMockTimeService(ctrl)
tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()
notary := mocks.NewMockNotary(ctrl)
broker := bmocks.NewMockBroker(ctrl)
top := mocks.NewMockTopology(ctrl)
Expand Down Expand Up @@ -118,7 +122,6 @@ func testDepositSuccess(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -147,7 +150,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42))

// then we call the callback from the fake witness
Expand All @@ -161,7 +163,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
eng.OnTick(context.Background(), time.Now())

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid2", 43))

// then we call the callback from the fake witness
Expand All @@ -188,7 +189,6 @@ func testDepositFailure(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -217,7 +217,6 @@ func testDepositError(t *testing.T) {
eng.witness.err = expectError

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand All @@ -236,7 +235,6 @@ func testDepositFailureNotBuiltin(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand Down
12 changes: 12 additions & 0 deletions core/banking/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9b6a803

Please sign in to comment.