Skip to content

Commit

Permalink
Merge pull request #9701 from vegaprotocol/9691
Browse files Browse the repository at this point in the history
fix: refactor referral engine snapshot
  • Loading branch information
jeremyletang authored Oct 9, 2023
2 parents 8c9e2d5 + 07af6f0 commit 8b2fc7f
Show file tree
Hide file tree
Showing 8 changed files with 2,266 additions and 2,618 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@
- [9280](https://github.com/vegaprotocol/vega/issues/9280) - Get block height directly from `blocks` table.
- [9675](https://github.com/vegaprotocol/vega/issues/9675) - Fix snapshot issue with not applying `providersCalculationStep` at epoch start.
- [9693](https://github.com/vegaprotocol/vega/issues/9693) - Add missing validation for general account public key in governance transfer
- [9691](https://github.com/vegaprotocol/vega/issues/9691) - Refactor referral engine snapshot
- [8570](https://github.com/vegaprotocol/vega/issues/8570) - Ensure pagination doesn't trigger a sequential scan on block-explorer transactions table.

## 0.72.1
Expand Down
51 changes: 25 additions & 26 deletions core/referral/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,35 +386,34 @@ func (e *Engine) notifyReferralSetStatsUpdated(ctx context.Context, stats *types
e.broker.Send(events.NewReferralSetStatsUpdatedEvent(ctx, stats))
}

func (e *Engine) loadCurrentReferralProgramFromSnapshot(program *vegapb.ReferralProgram) {
if program == nil {
e.currentProgram = nil
return
func (e *Engine) load(referralProgramState *types.PayloadReferralProgramState) {
if referralProgramState.CurrentProgram != nil {
e.currentProgram = types.NewReferralProgramFromProto(referralProgramState.CurrentProgram)
}
if referralProgramState.NewProgram != nil {
e.newProgram = types.NewReferralProgramFromProto(referralProgramState.NewProgram)
}
e.latestProgramVersion = referralProgramState.LastProgramVersion
e.programHasEnded = referralProgramState.ProgramHasEnded
e.loadReferralSetsFromSnapshot(referralProgramState.Sets)
e.loadFactorsByReferee(referralProgramState.FactorByReferee)
}

func (e *Engine) loadFactorsByReferee(factors []*snapshotpb.FactorByReferee) {
e.factorsByReferee = make(map[types.PartyID]*types.RefereeStats, len(factors))
for _, fbr := range factors {
party := types.PartyID(fbr.Party)
discountFactor, _ := num.UnmarshalBinaryDecimal(fbr.DiscountFactor)
takerVolume := num.UintFromBytes(fbr.TakerVolume)
e.factorsByReferee[party] = &types.RefereeStats{
DiscountFactor: discountFactor,
TakerVolume: takerVolume,
}
}

e.currentProgram = types.NewReferralProgramFromProto(program)
}

func (e *Engine) loadNewReferralProgramFromSnapshot(program *vegapb.ReferralProgram) {
if program == nil {
e.newProgram = nil
return
}

e.newProgram = types.NewReferralProgramFromProto(program)
}

func (e *Engine) loadReferralMiscFromSnapshot(misc *snapshotpb.ReferralMisc) {
e.latestProgramVersion = misc.LastProgramVersion
e.programHasEnded = misc.ProgramHasEnded
}

func (e *Engine) loadReferralSetsFromSnapshot(setsProto *snapshotpb.ReferralSets) {
if setsProto == nil {
return
}

for _, setProto := range setsProto.Sets {
func (e *Engine) loadReferralSetsFromSnapshot(setsProto []*snapshotpb.ReferralSet) {
for _, setProto := range setsProto {
setID := types.ReferralSetID(setProto.Id)

newSet := &types.ReferralSet{
Expand Down
162 changes: 41 additions & 121 deletions core/referral/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"sort"

"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/libs/num"
"code.vegaprotocol.io/vega/libs/proto"
vegapb "code.vegaprotocol.io/vega/protos/vega"
snapshotpb "code.vegaprotocol.io/vega/protos/vega/snapshot/v1"
"golang.org/x/exp/maps"
)
Expand All @@ -33,11 +31,8 @@ type SnapshottedEngine struct {
stopped bool

// Keys need to be computed when the engine is instantiated as they are dynamic.
hashKeys []string
currentProgramKey string
newProgramKey string
referralSetsKey string
referralMiscKey string
hashKeys []string
key string
}

func (e *SnapshottedEngine) Namespace() types.SnapshotNamespace {
Expand All @@ -59,17 +54,8 @@ func (e *SnapshottedEngine) LoadState(_ context.Context, p *types.Payload) ([]ty
}

switch data := p.Data.(type) {
case *types.PayloadCurrentReferralProgram:
e.loadCurrentReferralProgramFromSnapshot(data.CurrentReferralProgram)
return nil, nil
case *types.PayloadNewReferralProgram:
e.loadNewReferralProgramFromSnapshot(data.NewReferralProgram)
return nil, nil
case *types.PayloadReferralSets:
e.loadReferralSetsFromSnapshot(data.Sets)
return nil, nil
case *types.PayloadReferralMisc:
e.loadReferralMiscFromSnapshot(data.ReferralMisc)
case *types.PayloadReferralProgramState:
e.load(data)
return nil, nil
default:
return nil, types.ErrUnknownSnapshotType
Expand All @@ -90,22 +76,46 @@ func (e *SnapshottedEngine) serialise(k string) ([]byte, error) {
}

switch k {
case e.currentProgramKey:
return e.serialiseCurrentReferralProgram()
case e.newProgramKey:
return e.serialiseNewReferralProgram()
case e.referralSetsKey:
return e.serialiseReferralSets()
case e.referralMiscKey:
return e.serialiseReferralMisc()
case e.key:
return e.serialiseReferralProgram()
default:
return nil, types.ErrSnapshotKeyDoesNotExist
}
}

func (e *SnapshottedEngine) serialiseReferralSets() ([]byte, error) {
setsProto := make([]*snapshotpb.ReferralSet, 0, len(e.sets))
func (e *SnapshottedEngine) serialiseReferralProgram() ([]byte, error) {
referralProgramData := &snapshotpb.ReferralProgramData{
LastProgramVersion: e.latestProgramVersion,
ProgramHasEnded: e.programHasEnded,
}

payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_ReferralProgram{
ReferralProgram: referralProgramData,
},
}

if e.currentProgram != nil {
referralProgramData.CurrentProgram = e.currentProgram.IntoProto()
}
if e.newProgram != nil {
referralProgramData.NewProgram = e.newProgram.IntoProto()
}

referralProgramData.FactorByReferee = make([]*snapshotpb.FactorByReferee, 0, len(e.factorsByReferee))
for pi, rs := range e.factorsByReferee {
df, _ := rs.DiscountFactor.MarshalBinary()
tv := rs.TakerVolume.Bytes()
referralProgramData.FactorByReferee = append(referralProgramData.FactorByReferee, &snapshotpb.FactorByReferee{
Party: pi.String(), DiscountFactor: df, TakerVolume: tv[:],
})
}

sort.Slice(referralProgramData.FactorByReferee, func(i, j int) bool {
return referralProgramData.FactorByReferee[i].Party < referralProgramData.FactorByReferee[j].Party
})

referralProgramData.Sets = make([]*snapshotpb.ReferralSet, 0, len(e.sets))
setIDs := maps.Keys(e.sets)

sort.SliceStable(setIDs, func(i, j int) bool {
Expand Down Expand Up @@ -155,55 +165,7 @@ func (e *SnapshottedEngine) serialiseReferralSets() ([]byte, error) {
setProto.RunningVolumes = runningVolumesProto
}

setsProto = append(setsProto, setProto)
}

payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_ReferralSets{
ReferralSets: &snapshotpb.ReferralSets{
Sets: setsProto,
},
},
}

serialisedSets, err := proto.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("could not serialize referral sets payload: %w", err)
}

return serialisedSets, nil
}

func (e *SnapshottedEngine) serialiseCurrentReferralProgram() ([]byte, error) {
var programSnapshot *vegapb.ReferralProgram
if e.currentProgram != nil {
programSnapshot = e.currentProgram.IntoProto()
}

payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_CurrentReferralProgram{
CurrentReferralProgram: &snapshotpb.CurrentReferralProgram{
ReferralProgram: programSnapshot,
},
},
}

serialisedCurrentReferralProgram, err := proto.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("could not serialize current referral program payload: %w", err)
}

return serialisedCurrentReferralProgram, nil
}

func (e *SnapshottedEngine) serialiseReferralMisc() ([]byte, error) {
payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_ReferralMisc{
ReferralMisc: &snapshotpb.ReferralMisc{
LastProgramVersion: e.latestProgramVersion,
ProgramHasEnded: e.programHasEnded,
},
},
referralProgramData.Sets = append(referralProgramData.Sets, setProto)
}

serialised, err := proto.Marshal(payload)
Expand All @@ -214,51 +176,9 @@ func (e *SnapshottedEngine) serialiseReferralMisc() ([]byte, error) {
return serialised, nil
}

func (e *SnapshottedEngine) serialiseNewReferralProgram() ([]byte, error) {
var programSnapshot *vegapb.ReferralProgram
if e.newProgram != nil {
programSnapshot = e.newProgram.IntoProto()
}

payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_NewReferralProgram{
NewReferralProgram: &snapshotpb.NewReferralProgram{
ReferralProgram: programSnapshot,
},
},
}

serialisedNewReferralProgram, err := proto.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("could not serialize new referral program payload: %w", err)
}

return serialisedNewReferralProgram, nil
}

func (e *SnapshottedEngine) buildHashKeys() {
e.currentProgramKey = (&types.PayloadCurrentReferralProgram{}).Key()
e.newProgramKey = (&types.PayloadNewReferralProgram{}).Key()
e.referralSetsKey = (&types.PayloadReferralSets{}).Key()
e.referralMiscKey = (&types.PayloadReferralMisc{}).Key()

e.hashKeys = append([]string{}, e.currentProgramKey, e.newProgramKey, e.referralSetsKey, e.referralMiscKey)
}

func (e *Engine) OnStateLoaded(ctx context.Context) error {
if e.programHasEnded {
return nil
}

// we need to regenerate the statistics based on the restored state and we call
// computeFactorsByReferee to do this
partiesTakerVolume := map[types.PartyID]*num.Uint{}
for partyID := range e.referees {
volumeForEpoch := e.marketActivityTracker.NotionalTakerVolumeForParty(string(partyID))
partiesTakerVolume[partyID] = volumeForEpoch
}
e.computeFactorsByReferee(ctx, e.currentEpoch, partiesTakerVolume)
return nil
e.key = (&types.PayloadReferralProgramState{}).Key()
e.hashKeys = append([]string{}, e.key)
}

func NewSnapshottedEngine(broker Broker, timeSvc TimeService, mat MarketActivityTracker, staking StakingBalances) *SnapshottedEngine {
Expand Down
49 changes: 3 additions & 46 deletions core/referral/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"code.vegaprotocol.io/vega/libs/num"
vgtest "code.vegaprotocol.io/vega/libs/test"
"code.vegaprotocol.io/vega/paths"
vegapb "code.vegaprotocol.io/vega/protos/vega"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -114,12 +113,6 @@ func TestTakingAndRestoringSnapshotSucceeds(t *testing.T) {
// Set new program.
te1.engine.UpdateProgram(program2)

// Take a snapshot.
hash1, err := snapshotEngine1.SnapshotNow(ctx)
require.NoError(t, err)

epochAtSnapshot := te1.currentEpoch

// Simulating end of epoch.
// The program should be updated with the new one.
postSnapshotActions := func(te *testEngine) {
Expand Down Expand Up @@ -165,13 +158,9 @@ func TestTakingAndRestoringSnapshotSucceeds(t *testing.T) {
}
postSnapshotActions(te1)

state1 := map[string][]byte{}
for _, key := range te1.engine.Keys() {
state, additionalProvider, err := te1.engine.GetState(key)
require.NoError(t, err)
assert.Empty(t, additionalProvider)
state1[key] = state
}
// Take a snapshot.
hash1, err := snapshotEngine1.SnapshotNow(ctx)
require.NoError(t, err)

closeSnapshotEngine1()

Expand All @@ -180,45 +169,13 @@ func TestTakingAndRestoringSnapshotSucceeds(t *testing.T) {
snapshotEngine2 := newSnapshotEngine(t, vegaPath, now, te2.engine)
defer snapshotEngine2.Close()

// Simulate restoration of the epoch at the time of the snapshot
te2.currentEpoch = epochAtSnapshot
te2.engine.OnEpochRestore(ctx, types.Epoch{
Seq: epochAtSnapshot,
Action: vegapb.EpochAction_EPOCH_ACTION_START,
})
// Simulate restoration of the network parameter at the time of the snapshot
require.NoError(t, te2.engine.OnReferralProgramMaxPartyNotionalVolumeByQuantumPerEpochUpdate(ctx, maxVolumeParams))

// OnStateLoaded will recalculate referrer stats and send out all these events
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee1)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee2)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee3)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee4)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee5)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee6)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee7)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee8)).Return(num.UintFromUint64(100)).Times(1)
te2.marketActivityTracker.EXPECT().NotionalTakerVolumeForParty(string(referee9)).Return(num.UintFromUint64(100)).Times(1)
te2.staking.EXPECT().GetAvailableBalance(gomock.Any()).AnyTimes().Return(num.NewUint(100), nil)
expectReferralSetStatsUpdatedEvent(t, te2, 4)
// This triggers the state restoration from the local snapshot.
require.NoError(t, snapshotEngine2.Start(ctx))

// Comparing the hash after restoration, to ensure it produces the same result.
hash2, _, _ := snapshotEngine2.Info()
require.Equal(t, hash1, hash2)

postSnapshotActions(te2)

state2 := map[string][]byte{}
for _, key := range te2.engine.Keys() {
state, additionalProvider, err := te2.engine.GetState(key)
require.NoError(t, err)
assert.Empty(t, additionalProvider)
state2[key] = state
}

for key := range state1 {
assert.Equalf(t, state1[key], state2[key], "Key %q does not have the same data", key)
}
}
Loading

0 comments on commit 8b2fc7f

Please sign in to comment.