Skip to content

Commit

Permalink
fix: recalculate stats when the program is updated
Browse files Browse the repository at this point in the history
Signed-off-by: Elias Van Ootegem <[email protected]>
  • Loading branch information
EVODelavega committed Sep 18, 2024
1 parent 3a48987 commit 907189c
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- [11672](https://github.com/vegaprotocol/vega/issues/11672) - Add missing fees in GraphQL bindings.
- [11681](https://github.com/vegaprotocol/vega/issues/11681) - Account for conflicts inserting funding payment records.
- [11696](https://github.com/vegaprotocol/vega/issues/11696) - Add binding for estimate fees API.
- [11699](https://github.com/vegaprotocol/vega/issues/11699) - Update factors of programs when they are updated.


## 0.78.2
Expand Down
15 changes: 11 additions & 4 deletions core/referral/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,14 @@ func (e *Engine) OnReferralProgramMaxPartyNotionalVolumeByQuantumPerEpochUpdate(
func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case vegapb.EpochAction_EPOCH_ACTION_START:
pp := e.currentProgram
e.currentEpoch = ep.Seq
e.applyProgramUpdate(ctx, ep.StartTime, ep.Seq)
if pp != nil && pp != e.currentProgram && !e.programHasEnded {
e.computeReferralSetsStats(ctx, ep, false)
}
case vegapb.EpochAction_EPOCH_ACTION_END:
e.computeReferralSetsStats(ctx, ep)
e.computeReferralSetsStats(ctx, ep, true)
}
}

Expand Down Expand Up @@ -495,7 +499,7 @@ func (e *Engine) loadReferralSetsFromSnapshot(setsProto []*snapshotpb.ReferralSe
}
}

func (e *Engine) computeReferralSetsStats(ctx context.Context, epoch types.Epoch) {
func (e *Engine) computeReferralSetsStats(ctx context.Context, epoch types.Epoch, sendEvents bool) {
priorEpoch := uint64(0)
if epoch.Seq > MaximumWindowLength {
priorEpoch = epoch.Seq - MaximumWindowLength
Expand All @@ -522,10 +526,10 @@ func (e *Engine) computeReferralSetsStats(ctx context.Context, epoch types.Epoch
return
}

e.computeFactorsByReferee(ctx, epoch.Seq, takerVolumePerReferee, referrersTakerVolume)
e.computeFactorsByReferee(ctx, epoch.Seq, takerVolumePerReferee, referrersTakerVolume, sendEvents)
}

func (e *Engine) computeFactorsByReferee(ctx context.Context, epoch uint64, takerVolumePerReferee, referrersTakesVolume map[types.PartyID]*num.Uint) {
func (e *Engine) computeFactorsByReferee(ctx context.Context, epoch uint64, takerVolumePerReferee, referrersTakesVolume map[types.PartyID]*num.Uint, sendEvents bool) {
e.factorsByReferee = map[types.PartyID]*types.RefereeStats{}

allStats := map[types.ReferralSetID]*types.ReferralSetStats{}
Expand Down Expand Up @@ -595,6 +599,9 @@ func (e *Engine) computeFactorsByReferee(ctx context.Context, epoch uint64, take
}
}

if !sendEvents {
return
}
setIDs := maps.Keys(allStats)
slices.Sort(setIDs)
for _, setID := range setIDs {
Expand Down
48 changes: 35 additions & 13 deletions core/referral/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,41 @@ func TestTakingAndRestoringSnapshotSucceeds(t *testing.T) {

// Simulating end of epoch.
// The program should be applied.
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer1)).Return(num.UintFromUint64(10)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer2)).Return(num.UintFromUint64(20)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer3)).Return(num.UintFromUint64(30)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer4)).Return(num.UintFromUint64(40)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee1)).Return(num.UintFromUint64(50)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee2)).Return(num.UintFromUint64(60)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee3)).Return(num.UintFromUint64(70)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee4)).Return(num.UintFromUint64(80)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee5)).Return(num.UintFromUint64(90)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee6)).Return(num.UintFromUint64(100)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee7)).Return(num.UintFromUint64(110)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee8)).Return(num.UintFromUint64(120)).Times(1)
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee9)).Return(num.UintFromUint64(130)).Times(1)
epochEndVals := map[string]*num.Uint{
string(referrer1): num.UintFromUint64(10),
string(referrer2): num.UintFromUint64(20),
string(referrer3): num.UintFromUint64(30),
string(referrer4): num.UintFromUint64(40),
string(referee1): num.UintFromUint64(50),
string(referee2): num.UintFromUint64(60),
string(referee3): num.UintFromUint64(70),
string(referee4): num.UintFromUint64(80),
string(referee5): num.UintFromUint64(90),
string(referee6): num.UintFromUint64(100),
string(referee7): num.UintFromUint64(110),
string(referee8): num.UintFromUint64(120),
string(referee9): num.UintFromUint64(130),
}
te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(gomock.Any()).DoAndReturn(func(k string) *num.Uint {
v, ok := epochEndVals[k]
if !ok {
return num.UintZero()
}
return v
}).Times(len(epochEndVals) * 2)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer1)).Return(num.UintFromUint64(10)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer2)).Return(num.UintFromUint64(20)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer3)).Return(num.UintFromUint64(30)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referrer4)).Return(num.UintFromUint64(40)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee1)).Return(num.UintFromUint64(50)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee2)).Return(num.UintFromUint64(60)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee3)).Return(num.UintFromUint64(70)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee4)).Return(num.UintFromUint64(80)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee5)).Return(num.UintFromUint64(90)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee6)).Return(num.UintFromUint64(100)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee7)).Return(num.UintFromUint64(110)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee8)).Return(num.UintFromUint64(120)).Times(1)
// te1.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee9)).Return(num.UintFromUint64(130)).Times(1)

expectReferralProgramStartedEvent(t, te1)
lastEpochStartTime := program1.EndOfProgramTimestamp.Add(-2 * time.Hour)
Expand Down
9 changes: 9 additions & 0 deletions core/volumediscount/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ func New(broker Broker, marketActivityTracker MarketActivityTracker) *Engine {
func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case vegapb.EpochAction_EPOCH_ACTION_START:
// whatever current program is
pp := e.currentProgram
e.applyProgramUpdate(ctx, ep.StartTime, ep.Seq)
// has the program changed, and is the new current program active?
if pp != nil && pp != e.currentProgram && !e.programHasEnded {
// calculate volume for the window of the new program
e.calculatePartiesVolumeForWindow(int(e.currentProgram.WindowLength))
// update the factors
e.computeFactorsByParty(ctx, ep.Seq)
}
case vegapb.EpochAction_EPOCH_ACTION_END:
e.updateNotionalVolumeForEpoch()
if !e.programHasEnded {
Expand Down
2 changes: 2 additions & 0 deletions core/volumediscount/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func TestVolumeDiscountProgramLifecycle(t *testing.T) {
e := evt.(*events.VolumeDiscountProgramUpdated)
require.Equal(t, p2.IntoProto(), e.GetVolumeDiscountProgramUpdated().Program)
}).Times(1)
// expect the stats updated event
expectStatsUpdated(t, broker)
engine.OnEpoch(context.Background(), types.Epoch{Action: vega.EpochAction_EPOCH_ACTION_START, StartTime: now.Add(time.Hour * 1)})

// // expire the program
Expand Down
6 changes: 6 additions & 0 deletions core/volumerebate/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ func New(broker Broker, marketActivityTracker MarketActivityTracker) *Engine {
func (e *Engine) OnEpoch(ctx context.Context, ep types.Epoch) {
switch ep.Action {
case vegapb.EpochAction_EPOCH_ACTION_START:
pp := e.currentProgram
e.applyProgramUpdate(ctx, ep.StartTime, ep.Seq)
if pp != nil && pp != e.currentProgram && !e.programHasEnded {
// update state based on the new program window length
e.updateState()
e.computeFactorsByParty(ctx, ep.Seq)
}
case vegapb.EpochAction_EPOCH_ACTION_END:
e.updateState()
if !e.programHasEnded {
Expand Down
18 changes: 14 additions & 4 deletions core/volumerebate/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestVolumeRebateProgramLifecycle(t *testing.T) {
broker := mocks.NewMockBroker(ctrl)
marketActivityTracker := mocks.NewMockMarketActivityTracker(ctrl)
engine := volumerebate.NewSnapshottedEngine(broker, marketActivityTracker)
marketActivityTracker.EXPECT().CalculateTotalMakerContributionInQuantum(gomock.Any()).Return(map[string]*num.Uint{}, map[string]num.Decimal{}).Times(1)

// test snapshot with empty engine
hashEmpty, _, err := engine.GetState(key)
Expand All @@ -83,7 +84,10 @@ func TestVolumeRebateProgramLifecycle(t *testing.T) {

// expect an event for the started program
broker.EXPECT().Send(gomock.Any()).DoAndReturn(func(evt events.Event) {
e := evt.(*events.VolumeRebateProgramStarted)
e, ok := evt.(*events.VolumeRebateProgramStarted)
if !ok {
return
}
require.Equal(t, p1.IntoProto(), e.GetVolumeRebateProgramStarted().Program)
}).Times(1)

Expand Down Expand Up @@ -118,14 +122,20 @@ func TestVolumeRebateProgramLifecycle(t *testing.T) {

// // expect a program updated event
broker.EXPECT().Send(gomock.Any()).DoAndReturn(func(evt events.Event) {
e := evt.(*events.VolumeRebateProgramUpdated)
e, ok := evt.(*events.VolumeRebateProgramUpdated)
if !ok {
return
}
require.Equal(t, p2.IntoProto(), e.GetVolumeRebateProgramUpdated().Program)
}).Times(1)
}).Times(2)
engine.OnEpoch(context.Background(), types.Epoch{Action: vega.EpochAction_EPOCH_ACTION_START, StartTime: now.Add(time.Hour * 1)})

// // expire the program
broker.EXPECT().Send(gomock.Any()).DoAndReturn(func(evt events.Event) {
e := evt.(*events.VolumeRebateProgramEnded)
e, ok := evt.(*events.VolumeRebateProgramEnded)
if !ok {
return
}
require.Equal(t, p2.Version, e.GetVolumeRebateProgramEnded().Version)
}).Times(1)
engine.OnEpoch(context.Background(), types.Epoch{Action: vega.EpochAction_EPOCH_ACTION_START, StartTime: now.Add(time.Hour * 2)})
Expand Down

0 comments on commit 907189c

Please sign in to comment.