Skip to content

Commit

Permalink
feat: publish ongoing game data
Browse files Browse the repository at this point in the history
  • Loading branch information
ze97286 committed May 1, 2024
1 parent 3323afc commit 38bc93c
Show file tree
Hide file tree
Showing 35 changed files with 4,300 additions and 3,359 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [11143](https://github.com/vegaprotocol/vega/issues/11143) - Add support for new asset proposal in batch governance proposal
- [11182](https://github.com/vegaprotocol/vega/issues/11182) - Remove reduce only restriction on spot markets stop orders.
- [11153](https://github.com/vegaprotocol/vega/issues/11153) - Add check on start-up that bridge `RPC-endpoints` are functional.
- [11209](https://github.com/vegaprotocol/vega/issues/11209) - Publish ongoing games data.

### 🐛 Fixes

Expand Down
1 change: 0 additions & 1 deletion core/banking/cancel_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func TestCancelTransfer(t *testing.T) {

e.assets.EXPECT().Get(gomock.Any()).Times(2).Return(
assets.NewAsset(&mockAsset{quantum: num.DecimalFromFloat(100), name: asset}), nil)
e.tsvc.EXPECT().GetTimeNow().Times(2)
e.broker.EXPECT().Send(gomock.Any()).Times(2)
assert.NoError(t, e.TransferFunds(ctx, transfer))

Expand Down
4 changes: 3 additions & 1 deletion core/banking/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"sync/atomic"
"time"

"code.vegaprotocol.io/vega/core/assets"
"code.vegaprotocol.io/vega/core/events"
Expand Down Expand Up @@ -272,6 +273,7 @@ func (e *Engine) loadRecurringTransfers(
ctx context.Context, r *checkpoint.RecurringTransfers,
) []events.Event {
evts := []events.Event{}
e.nextMetricUpdate = time.Unix(0, r.NextMetricUpdate)
for _, v := range r.RecurringTransfers {
transfer := types.RecurringTransferFromEvent(v)
e.recurringTransfers = append(e.recurringTransfers, transfer)
Expand Down Expand Up @@ -334,7 +336,7 @@ func (e *Engine) getRecurringTransfers() *checkpoint.RecurringTransfers {
for _, v := range e.recurringTransfers {
out.RecurringTransfers = append(out.RecurringTransfers, v.IntoEvent(nil, e.getGameID(v)))
}

out.NextMetricUpdate = e.nextMetricUpdate.UnixNano()
return out
}

Expand Down
7 changes: 0 additions & 7 deletions core/banking/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ func TestDepositFinalisedAfterCheckpoint(t *testing.T) {
func testSimpledScheduledTransfer(t *testing.T) {
e := getTestEngine(t)

e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()

// let's do a massive fee, easy to test.
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
e.OnTick(context.Background(), time.Unix(10, 0))
Expand Down Expand Up @@ -229,7 +224,6 @@ func TestGovernanceScheduledTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down Expand Up @@ -290,7 +284,6 @@ func TestGovernanceRecurringTransfer(t *testing.T) {
}

e.broker.EXPECT().Send(gomock.Any()).Times(1)
e.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Unix(10, 0))
require.NoError(t, e.NewGovernanceTransfer(ctx, "1", "some reference", transfer))

checkp, err := e.Checkpoint()
Expand Down
3 changes: 0 additions & 3 deletions core/banking/deduplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestAssetActionDeduplication(t *testing.T) {
asset1 := assets.NewAsset(erc20Asset)

t.Run("Generate asset list", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -71,7 +70,6 @@ func TestAssetActionDeduplication(t *testing.T) {
})

t.Run("Generate duplicated asset list and ", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))

Expand All @@ -88,7 +86,6 @@ func TestAssetActionDeduplication(t *testing.T) {
// set, which might happen with the introduction of the second bridge. We have
// to ensure the event is acknowledge as a duplicate.
t.Run("Generate a duplicated event but updated with the chain ID", func(t *testing.T) {
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, "1"))

Expand Down
29 changes: 29 additions & 0 deletions core/banking/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type MarketActivityTracker interface {
MarkPaidProposer(asset, market, payoutAsset string, marketsInScope []string, funder string)
MarketTrackedForAsset(market, asset string) bool
TeamStatsForMarkets(allMarketsForAssets, onlyTheseMarkets []string) map[string]map[string]*num.Uint
PublishGameMetric(ctx context.Context, dispatchStrategy []*vega.DispatchStrategy, now time.Time)
}

type EthereumEventSource interface {
Expand Down Expand Up @@ -206,6 +207,9 @@ type Engine struct {

maxGovTransferQunatumMultiplier num.Decimal
maxGovTransferFraction num.Decimal

metricUpdateFrequency time.Duration
nextMetricUpdate time.Time
}

type withdrawalRef struct {
Expand Down Expand Up @@ -259,6 +263,7 @@ func New(log *logging.Logger,
minTransferQuantumMultiple: num.DecimalZero(),
minWithdrawQuantumMultiple: num.DecimalZero(),
marketActivityTracker: marketActivityTracker,
nextMetricUpdate: time.Time{},
hashToStrategy: map[string]*dispatchStrategyCacheEntry{},
primaryBridgeState: &bridgeState{
active: true,
Expand Down Expand Up @@ -310,6 +315,28 @@ func (e *Engine) ReloadConf(cfg Config) {
e.cfg = cfg
}

func (e *Engine) OnBlockEnd(ctx context.Context, now time.Time) {
if !now.Before(e.nextMetricUpdate) {
e.publishMetricData(ctx, now)
e.nextMetricUpdate = now.Add(e.metricUpdateFrequency)
}
}

// publishMetricData requests the market activity tracker to publish and event
// for each game with the current metric data for each party.
func (e *Engine) publishMetricData(ctx context.Context, now time.Time) {
hashes := make([]string, 0, len(e.hashToStrategy))
for hash := range e.hashToStrategy {
hashes = append(hashes, hash)
}
sort.Strings(hashes)
dss := make([]*vega.DispatchStrategy, 0, len(hashes))
for _, hash := range hashes {
dss = append(dss, e.hashToStrategy[hash].ds)
}
e.marketActivityTracker.PublishGameMetric(ctx, dss, now)
}

func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case proto.EpochAction_EPOCH_ACTION_START:
Expand All @@ -320,6 +347,8 @@ func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
e.distributeRecurringGovernanceTransfers(ctx)
e.applyPendingFeeDiscountsUpdates(ctx)
e.sendTeamsStats(ctx, ep.Seq)
// as the metrics are going to be published here, we want to progress the next update.
e.nextMetricUpdate = e.timeService.GetTimeNow().Add(e.metricUpdateFrequency)
default:
e.log.Panic("epoch action should never be UNSPECIFIED", logging.String("epoch", ep.String()))
}
Expand Down
10 changes: 4 additions & 6 deletions core/banking/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func getTestEngine(t *testing.T) *testEngine {
col := mocks.NewMockCollateral(ctrl)
assets := mocks.NewMockAssets(ctrl)
tsvc := mocks.NewMockTimeService(ctrl)
tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()
notary := mocks.NewMockNotary(ctrl)
broker := bmocks.NewMockBroker(ctrl)
top := mocks.NewMockTopology(ctrl)
Expand Down Expand Up @@ -118,7 +122,6 @@ func testDepositSuccess(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -147,7 +150,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42))

// then we call the callback from the fake witness
Expand All @@ -161,7 +163,6 @@ func testDepositSuccessNoTxDuplicate(t *testing.T) {
eng.OnTick(context.Background(), time.Now())

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
require.NoError(t, eng.DepositBuiltinAsset(context.Background(), bad, "depositid2", 43))

// then we call the callback from the fake witness
Expand All @@ -188,7 +189,6 @@ func testDepositFailure(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.NoError(t, err)

Expand Down Expand Up @@ -217,7 +217,6 @@ func testDepositError(t *testing.T) {
eng.witness.err = expectError

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand All @@ -236,7 +235,6 @@ func testDepositFailureNotBuiltin(t *testing.T) {
}

// call the deposit function
eng.tsvc.EXPECT().GetTimeNow().Times(1).Return(time.Now())
err := eng.DepositBuiltinAsset(context.Background(), bad, "depositid", 42)
assert.EqualError(t, err, expectError.Error())
}
Expand Down
12 changes: 12 additions & 0 deletions core/banking/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 0 additions & 19 deletions core/banking/oneoff_transfers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func testRejectedIfDoesntReachMinimalAmount(t *testing.T) {
e.OnMinTransferQuantumMultiple(context.Background(), num.DecimalFromFloat(1))
// asset exists
e.assets.EXPECT().Get(gomock.Any()).Times(1).Return(assets.NewAsset(&mockAsset{name: assetNameETH, quantum: num.DecimalFromFloat(100)}), nil)
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)

assert.EqualError(t,
Expand All @@ -78,7 +77,6 @@ func testInvalidTransferKind(t *testing.T) {
transfer := &types.TransferFunds{
Kind: types.TransferCommandKind(-1),
}
e.tsvc.EXPECT().GetTimeNow().Times(1)
assert.EqualError(t,
e.TransferFunds(ctx, transfer),
banking.ErrUnsupportedTransferKind.Error(),
Expand All @@ -88,8 +86,6 @@ func testInvalidTransferKind(t *testing.T) {
func testOneOffTransferNotEnoughFundsToTransfer(t *testing.T) {
e := getTestEngine(t)

e.tsvc.EXPECT().GetTimeNow().Times(1)

ctx := context.Background()
transfer := &types.TransferFunds{
Kind: types.TransferCommandKindOneOff,
Expand Down Expand Up @@ -145,7 +141,6 @@ func testOneOffTransferInvalidTransfers(t *testing.T) {
var baseCpy types.TransferBase

t.Run("invalid from account", func(t *testing.T) {
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)
baseCpy := transferBase
transfer.OneOff.TransferBase = &baseCpy
Expand All @@ -157,7 +152,6 @@ func testOneOffTransferInvalidTransfers(t *testing.T) {
})

t.Run("invalid to account", func(t *testing.T) {
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)
baseCpy = transferBase
transfer.OneOff.TransferBase = &baseCpy
Expand All @@ -169,7 +163,6 @@ func testOneOffTransferInvalidTransfers(t *testing.T) {
})

t.Run("unsupported from account type", func(t *testing.T) {
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)
baseCpy = transferBase
transfer.OneOff.TransferBase = &baseCpy
Expand All @@ -181,7 +174,6 @@ func testOneOffTransferInvalidTransfers(t *testing.T) {
})

t.Run("unsuported to account type", func(t *testing.T) {
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)
baseCpy = transferBase
transfer.OneOff.TransferBase = &baseCpy
Expand All @@ -193,7 +185,6 @@ func testOneOffTransferInvalidTransfers(t *testing.T) {
})

t.Run("zero funds transfer", func(t *testing.T) {
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.broker.EXPECT().Send(gomock.Any()).Times(1)
baseCpy = transferBase
transfer.OneOff.TransferBase = &baseCpy
Expand Down Expand Up @@ -232,7 +223,6 @@ func testValidOneOffTransfer(t *testing.T) {
}

// asset exists
e.tsvc.EXPECT().GetTimeNow().Times(1)
e.assets.EXPECT().Get(gomock.Any()).Times(1).Return(
assets.NewAsset(&mockAsset{name: assetNameETH, quantum: num.DecimalFromFloat(100)}), nil)
e.col.EXPECT().GetPartyGeneralAccount(gomock.Any(), gomock.Any()).Times(1).Return(&fromAcc, nil)
Expand Down Expand Up @@ -277,18 +267,12 @@ func testValidOneOffTransfer(t *testing.T) {
})

e.broker.EXPECT().Send(gomock.Any()).Times(3)
e.tsvc.EXPECT().GetTimeNow().AnyTimes()
assert.NoError(t, e.TransferFunds(ctx, transfer))
}

func testValidOneOffTransferWithDeliverOnInThePastStraightAway(t *testing.T) {
e := getTestEngine(t)

e.tsvc.EXPECT().GetTimeNow().DoAndReturn(
func() time.Time {
return time.Unix(10, 0)
}).AnyTimes()

// let's do a massive fee, easy to test
e.OnTransferFeeFactorUpdate(context.Background(), num.NewDecimalFromFloat(1))
e.OnTick(context.Background(), time.Unix(10, 0))
Expand Down Expand Up @@ -391,9 +375,6 @@ func testValidOneOffTransferWithDeliverOn(t *testing.T) {
Balance: num.NewUint(100),
}

// Time given to e.Transferfunds - base time Unix(10,0)
e.tsvc.EXPECT().GetTimeNow().Times(2).Return(time.Unix(10, 0))

// asset exists
e.assets.EXPECT().Get(gomock.Any()).Times(1).Return(assets.NewAsset(&mockAsset{name: assetNameETH, quantum: num.DecimalFromFloat(100)}), nil)
e.col.EXPECT().GetPartyGeneralAccount(gomock.Any(), gomock.Any()).Times(1).Return(&fromAcc, nil)
Expand Down
14 changes: 12 additions & 2 deletions core/banking/recurring_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,25 @@ func (e *Engine) dispatchRequired(ctx context.Context, ds *vegapb.DispatchStrate
if ds.EntityScope == vegapb.EntityScope_ENTITY_SCOPE_INDIVIDUALS {
hasNonZeroMetric := false
partyMetrics := e.marketActivityTracker.CalculateMetricForIndividuals(ctx, ds)
gs := events.NewPartyGameScoresEvent(ctx, num.UintFromUint64(e.currentEpoch).String(), e.hashDispatchStrategy(ds), e.timeService.GetTimeNow(), partyMetrics)
e.broker.Send(gs)
hasEligibleParties := false
for _, pm := range partyMetrics {
if !pm.Score.IsZero() {
hasNonZeroMetric = true
}
if pm.IsEligible {
hasEligibleParties = true
}
if hasNonZeroMetric && hasEligibleParties {
break
}
}
return hasNonZeroMetric || (len(partyMetrics) > 0 && ds.DistributionStrategy == vegapb.DistributionStrategy_DISTRIBUTION_STRATEGY_RANK)
return hasNonZeroMetric || (hasEligibleParties && ds.DistributionStrategy == vegapb.DistributionStrategy_DISTRIBUTION_STRATEGY_RANK)
} else {
tcs, _ := e.marketActivityTracker.CalculateMetricForTeams(ctx, ds)
tcs, pcs := e.marketActivityTracker.CalculateMetricForTeams(ctx, ds)
gs := events.NewTeamGameScoresEvent(ctx, num.UintFromUint64(e.currentEpoch).String(), e.hashDispatchStrategy(ds), e.timeService.GetTimeNow(), tcs, pcs)
e.broker.Send(gs)
return len(tcs) > 0
}
case vegapb.DispatchMetric_DISPATCH_METRIC_VALIDATOR_RANKING:
Expand Down
Loading

0 comments on commit 38bc93c

Please sign in to comment.