Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish ongoing game data #11223

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

### 🛠 Improvements

- [](https://github.com/vegaprotocol/vega/issues/xxx)
- [11209](https://github.com/vegaprotocol/vega/issues/11209) - Publish ongoing games data.

### 🐛 Fixes

Expand Down
1 change: 1 addition & 0 deletions cmd/data-node/commands/start/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (l *NodeCommand) createGRPCServer(config api.Config) *api.GRPCServer {
l.gamesService,
l.marginModesService,
l.timeWeightedNotionalPositionService,
l.gameScoreService,
)
return grpcServer
}
8 changes: 7 additions & 1 deletion cmd/data-node/commands/start/sqlsubscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type SQLSubscribers struct {
gamesStore *sqlstore.Games
marginModesStore *sqlstore.MarginModes
timeWeightedNotionalPositionStore *sqlstore.TimeWeightedNotionalPosition
gameScoreStore *sqlstore.GameScores

// Services
candleService *candlesv2.Svc
Expand Down Expand Up @@ -138,6 +139,7 @@ type SQLSubscribers struct {
gamesService *service.Games
marginModesService *service.MarginModes
timeWeightedNotionalPositionService *service.TimeWeightedNotionalPosition
gameScoreService *service.GameScore

// Subscribers
accountSub *sqlsubscribers.Account
Expand Down Expand Up @@ -190,6 +192,7 @@ type SQLSubscribers struct {
transactionResultsSub *sqlsubscribers.TransactionResults
marginModesSub *sqlsubscribers.MarginModes
timeWeightedNotionalPositionSub *sqlsubscribers.TimeWeightedNotionalPosition
gameScoreSub *sqlsubscribers.GameScore
}

func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
Expand Down Expand Up @@ -246,6 +249,7 @@ func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
s.transactionResultsSub,
s.marginModesSub,
s.timeWeightedNotionalPositionSub,
s.gameScoreSub,
}
}

Expand Down Expand Up @@ -307,6 +311,7 @@ func (s *SQLSubscribers) CreateAllStores(ctx context.Context, Log *logging.Logge
s.gamesStore = sqlstore.NewGames(transactionalConnectionSource)
s.marginModesStore = sqlstore.NewMarginModes(transactionalConnectionSource)
s.timeWeightedNotionalPositionStore = sqlstore.NewTimeWeightedNotionalPosition(transactionalConnectionSource)
s.gameScoreStore = sqlstore.NewGameScores(transactionalConnectionSource)
}

func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger, candlesConfig candlesv2.Config) error {
Expand Down Expand Up @@ -363,7 +368,7 @@ func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger,
s.gamesService = service.NewGames(s.gamesStore)
s.marginModesService = service.NewMarginModes(s.marginModesStore)
s.timeWeightedNotionalPositionService = service.NewTimeWeightedNotionalPosition(s.timeWeightedNotionalPositionStore)

s.gameScoreService = service.NewGameScore(s.gameScoreStore, log)
s.transactionResultsSub = sqlsubscribers.NewTransactionResults(log)
s.transactionResultsService = service.NewTransactionResults(s.transactionResultsSub)

Expand Down Expand Up @@ -432,4 +437,5 @@ func (s *SQLSubscribers) SetupSQLSubscribers() {
s.vestingSummarySub = sqlsubscribers.NewVestingBalancesSummary(s.partyVestingBalancesStore, s.partyLockedBalancesStore)
s.marginModesSub = sqlsubscribers.NewMarginModes(s.marginModesService)
s.timeWeightedNotionalPositionSub = sqlsubscribers.NewTimeWeightedNotionalPosition(s.timeWeightedNotionalPositionService)
s.gameScoreSub = sqlsubscribers.NewGameScore(s.gameScoreStore)
}
1 change: 0 additions & 1 deletion core/banking/cancel_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func TestCancelTransfer(t *testing.T) {

e.assets.EXPECT().Get(gomock.Any()).Times(2).Return(
assets.NewAsset(&mockAsset{quantum: num.DecimalFromFloat(100), name: asset}), nil)
e.tsvc.EXPECT().GetTimeNow().Times(2)
e.broker.EXPECT().Send(gomock.Any()).Times(2)
assert.NoError(t, e.TransferFunds(ctx, transfer))

Expand Down
4 changes: 3 additions & 1 deletion core/banking/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"sync/atomic"
"time"

"code.vegaprotocol.io/vega/core/assets"
"code.vegaprotocol.io/vega/core/events"
Expand Down Expand Up @@ -272,6 +273,7 @@ func (e *Engine) loadRecurringTransfers(
ctx context.Context, r *checkpoint.RecurringTransfers,
) []events.Event {
evts := []events.Event{}
e.nextMetricUpdate = time.Unix(0, r.NextMetricUpdate)
for _, v := range r.RecurringTransfers {
transfer := types.RecurringTransferFromEvent(v)
e.recurringTransfers = append(e.recurringTransfers, transfer)
Expand Down Expand Up @@ -334,7 +336,7 @@ func (e *Engine) getRecurringTransfers() *checkpoint.RecurringTransfers {
for _, v := range e.recurringTransfers {
out.RecurringTransfers = append(out.RecurringTransfers, v.IntoEvent(nil, e.getGameID(v)))
}

out.NextMetricUpdate = e.nextMetricUpdate.UnixNano()
return out
}

Expand Down
7 changes: 0 additions & 7 deletions core/banking/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ func TestDepositFinalisedAfterCheckpoint(t *testing.T) {
func testSimpledScheduledTransfer(t *testing.T) {
e := getTestEngine(t)

e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()

// let's do a massive fee, easy to test.
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
e.OnTick(context.Background(), time.Unix(10, 0))
Expand Down Expand Up @@ -229,7 +224,6 @@ func TestGovernanceScheduledTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down Expand Up @@ -290,7 +284,6 @@ func TestGovernanceRecurringTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down
3 changes: 0 additions & 3 deletions core/banking/deduplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestAssetActionDeduplication(t *testing.T) {
asset1 := assets.NewAsset(erc20Asset)

t.Run("Generate asset list", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -71,7 +70,6 @@ func TestAssetActionDeduplication(t *testing.T) {
})

t.Run("Generate duplicated asset list and ", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -88,7 +86,6 @@ func TestAssetActionDeduplication(t *testing.T) {
// set, which might happen with the introduction of the second bridge. We have
// to ensure the event is acknowledge as a duplicate.
t.Run("Generate a duplicated event but updated with the chain ID", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, "1"))

Expand Down
29 changes: 29 additions & 0 deletions core/banking/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type MarketActivityTracker interface {
MarkPaidProposer(asset, market, payoutAsset string, marketsInScope []string, funder string)
MarketTrackedForAsset(market, asset string) bool
TeamStatsForMarkets(allMarketsForAssets, onlyTheseMarkets []string) map[string]map[string]*num.Uint
PublishGameMetric(ctx context.Context, dispatchStrategy []*vega.DispatchStrategy, now time.Time)
}

type EthereumEventSource interface {
Expand Down Expand Up @@ -206,6 +207,9 @@ type Engine struct {

maxGovTransferQunatumMultiplier num.Decimal
maxGovTransferFraction num.Decimal

metricUpdateFrequency time.Duration
nextMetricUpdate time.Time
}

type withdrawalRef struct {
Expand Down Expand Up @@ -259,6 +263,7 @@ func New(log *logging.Logger,
minTransferQuantumMultiple: num.DecimalZero(),
minWithdrawQuantumMultiple: num.DecimalZero(),
marketActivityTracker: marketActivityTracker,
nextMetricUpdate: time.Time{},
hashToStrategy: map[string]*dispatchStrategyCacheEntry{},
primaryBridgeState: &bridgeState{
active: true,
Expand Down Expand Up @@ -310,6 +315,28 @@ func (e *Engine) ReloadConf(cfg Config) {
e.cfg = cfg
}

func (e *Engine) OnBlockEnd(ctx context.Context, now time.Time) {
if !now.Before(e.nextMetricUpdate) {
e.publishMetricData(ctx, now)
e.nextMetricUpdate = now.Add(e.metricUpdateFrequency)
}
}

// publishMetricData requests the market activity tracker to publish and event
// for each game with the current metric data for each party.
func (e *Engine) publishMetricData(ctx context.Context, now time.Time) {
hashes := make([]string, 0, len(e.hashToStrategy))
for hash := range e.hashToStrategy {
hashes = append(hashes, hash)
}
sort.Strings(hashes)
dss := make([]*vega.DispatchStrategy, 0, len(hashes))
for _, hash := range hashes {
dss = append(dss, e.hashToStrategy[hash].ds)
}
e.marketActivityTracker.PublishGameMetric(ctx, dss, now)
}

func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case proto.EpochAction_EPOCH_ACTION_START:
Expand All @@ -320,6 +347,8 @@ func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
e.distributeRecurringGovernanceTransfers(ctx)
e.applyPendingFeeDiscountsUpdates(ctx)
e.sendTeamsStats(ctx, ep.Seq)
// as the metrics are going to be published here, we want to progress the next update.
e.nextMetricUpdate = e.timeService.GetTimeNow().Add(e.metricUpdateFrequency)
default:
e.log.Panic("epoch action should never be UNSPECIFIED", logging.String("epoch", ep.String()))
}
Expand Down
10 changes: 4 additions & 6 deletions core/banking/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func getTestEngine(t *testing.T) *testEngine {
col := mocks.NewMockCollateral(ctrl)
assets := mocks.NewMockAssets(ctrl)
tsvc := mocks.NewMockTimeService(ctrl)
tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()
notary := mocks.NewMockNotary(ctrl)
broker := bmocks.NewMockBroker(ctrl)
top := mocks.NewMockTopology(ctrl)
Expand Down Expand Up @@ -118,7 +122,6 @@ func testDepositSuccess(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -147,7 +150,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42))

// then we call the callback from the fake witness
Expand All @@ -161,7 +163,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
eng.OnTick(context.Background(), time.Now())

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid2", 43))

// then we call the callback from the fake witness
Expand All @@ -188,7 +189,6 @@ func testDepositFailure(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -217,7 +217,6 @@ func testDepositError(t *testing.T) {
eng.witness.err = expectError

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand All @@ -236,7 +235,6 @@ func testDepositFailureNotBuiltin(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand Down
12 changes: 12 additions & 0 deletions core/banking/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading