Skip to content

Commit

Permalink
feat: allow flagged users to link
Browse files Browse the repository at this point in the history
* remove logic to stop flagged users linking
* remove unnecessary logging
* cleanup link wallet method
  • Loading branch information
clD11 committed Oct 16, 2023
1 parent c0dd131 commit d85d726
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 102 deletions.
98 changes: 24 additions & 74 deletions services/wallet/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,18 @@ var (
Help: "A counter for seeing how many custodian accounts have been linked 10 times",
ConstLabels: prometheus.Labels{"service": "wallet"},
})
// counter for flagged unusual
countLinkingFlaggedUnusual = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "count_linking_flagged_unusual",
Help: "provides a count of unusual linkings flagged results",
ConstLabels: prometheus.Labels{"service": "wallet"},
})
)

func init() {
prometheus.MustRegister(tooManyCardsCounter)
prometheus.MustRegister(metricTxLockGauge)
prometheus.MustRegister(tenLinkagesReached)
prometheus.MustRegister(countLinkingFlaggedUnusual)
}

// Datastore holds the interface for the wallet datastore
type Datastore interface {
datastore.Datastore
LinkWallet(ctx context.Context, ID string, providerID string, providerLinkingID uuid.UUID, anonymousAddress *uuid.UUID, depositProvider, country string) error
LinkWallet(ctx context.Context, id string, providerID string, providerLinkingID uuid.UUID, depositProvider, country string) error
GetLinkingLimitInfo(ctx context.Context, providerLinkingID string) (map[string]LinkingInfo, error)
HasPriorLinking(ctx context.Context, walletID uuid.UUID, providerLinkingID uuid.UUID) (bool, error)
// GetLinkingsByProviderLinkingID gets the wallet linking info by provider linking id
Expand Down Expand Up @@ -560,51 +552,26 @@ var (
ErrGeoResetDifferent = errors.New("geo reset is different")
)

// LinkWallet links a wallet together
func (pg *Postgres) LinkWallet(ctx context.Context, ID string, userDepositDestination string, providerLinkingID uuid.UUID, anonymousAddress *uuid.UUID, depositProvider, country string) error {
sublogger := logger(ctx).With().Str("wallet_id", ID).Logger()
sublogger.Debug().Msg("linking wallet")

// rep check
if repClient, ok := ctx.Value(appctx.ReputationClientCTXKey).(reputation.Client); ok {
walletID, err := uuid.FromString(ID)
if err != nil {
sublogger.Warn().Err(err).Msg("invalid wallet id")
return fmt.Errorf("invalid wallet id, not uuid: %w", err)
}
// we have a client, check the value for ID
reputable, cohorts, err := repClient.IsLinkingReputable(ctx, walletID, country)
if err != nil {
sublogger.Warn().Err(err).Msg("failed to check reputation")
return fmt.Errorf("failed to check wallet rep: %w", err)
}
// LinkWallet links a rewards wallet to the given deposit provider.
func (pg *Postgres) LinkWallet(ctx context.Context, id string, userDepositDestination string, providerLinkingID uuid.UUID, depositProvider, country string) error {
walletID, err := uuid.FromString(id)
if err != nil {
return fmt.Errorf("invalid wallet id, not uuid: %w", err)
}

var (
isTooYoung = false
geoResetDifferent = false
)
for _, v := range cohorts {
if isTooYoung = (v == reputation.CohortTooYoung); isTooYoung {
break
}
if geoResetDifferent = (v == reputation.CohortGeoResetDifferent); geoResetDifferent {
break
}
}
repClient, ok := ctx.Value(appctx.ReputationClientCTXKey).(reputation.Client)
if !ok {
return ErrNoReputationClient
}

if !reputable && !isTooYoung && !geoResetDifferent {
sublogger.Info().Msg("wallet linking attempt failed - unusual activity")
countLinkingFlaggedUnusual.Inc()
return ErrUnusualActivity
} else if geoResetDifferent {
sublogger.Info().Msg("wallet linking attempt failed - geo reset is different")
return ErrGeoResetDifferent
}
// TODO(clD11): We no longer need to act on the response and only require a successful call to reputation to
// continue linking. As part of the wallet refactor we should clean this up.
if _, _, err := repClient.IsLinkingReputable(ctx, walletID, country); err != nil {
return fmt.Errorf("failed to check wallet rep: %w", err)
}

ctx, tx, rollback, commit, err := getTx(ctx, pg)
if err != nil {
sublogger.Error().Err(err).Msg("error getting tx")
return fmt.Errorf("error getting tx: %w", err)
}
defer func() {
Expand All @@ -613,52 +580,35 @@ func (pg *Postgres) LinkWallet(ctx context.Context, ID string, userDepositDestin
}()

metricTxLockGauge.Inc()
err = waitAndLockTx(ctx, tx, providerLinkingID)
if err != nil {
sublogger.Error().Err(err).Msg("error acquiring tx lock")
if err := waitAndLockTx(ctx, tx, providerLinkingID); err != nil {
return fmt.Errorf("error acquiring tx lock: %w", err)
}

id, err := uuid.FromString(ID)
if err != nil {
return errorutils.Wrap(err, "error invalid id")
}

// connect custodian link (does the link limit checking in insert)
if err = pg.ConnectCustodialWallet(ctx, &CustodianLink{
WalletID: &id,
if err := pg.ConnectCustodialWallet(ctx, &CustodianLink{
WalletID: &walletID,
Custodian: depositProvider,
LinkingID: &providerLinkingID,
}, userDepositDestination); err != nil {
sublogger.Error().Err(err).
Msg("error connect custodian wallet")
return fmt.Errorf("error connect custodian wallet: %w", err)
}

// TODO(clD11): the below verified wallets calls were added as a quick fix and should be addressed in the wallet refactor.
if VerifiedWalletEnable {
err := pg.InsertVerifiedWalletOutboxTx(ctx, tx, id, true)
if err != nil {
if err := pg.InsertVerifiedWalletOutboxTx(ctx, tx, walletID, true); err != nil {
return fmt.Errorf("failed to update verified wallet: %w", err)
}
}

if directVerifiedWalletEnable {
client, ok := ctx.Value(appctx.ReputationClientCTXKey).(reputation.Client)
if !ok {
return ErrNoReputationClient
op := func() (interface{}, error) {
return nil, repClient.UpdateReputationSummary(ctx, walletID.String(), true)
}
upsertReputationSummary := func() (interface{}, error) {
return nil, client.UpdateReputationSummary(ctx, ID, true)
}
_, err = backoff.Retry(ctx, upsertReputationSummary, retryPolicy, canRetry(nonRetriableErrors))
if err != nil {
if _, err := backoff.Retry(ctx, op, retryPolicy, canRetry(nonRetriableErrors)); err != nil {
return fmt.Errorf("failed to update verified wallet: %w", err)
}
}

err = commit()
if err != nil {
sublogger.Error().Err(err).Msg("error committing tx")
if err := commit(); err != nil {
sentry.CaptureException(fmt.Errorf("error failed to commit link wallet transaction: %w", err))
return fmt.Errorf("error committing tx: %w", err)
}
Expand Down
55 changes: 36 additions & 19 deletions services/wallet/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,16 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
pg, _, err := NewPostgres()
suite.Require().NoError(err)

for i := 0; i < 1; i++ {
mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

for i := 0; i < 1; i++ {
// seed 3 wallets with same linkingID
userDepositDestination, providerLinkingID := suite.seedWallet(pg)

Expand All @@ -214,8 +222,7 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
PublicKey: "hBrtClwIppLmu/qZ8EhGM1TQZUwDUosbOrVu1jMwryY=",
}

err = pg.UpsertWallet(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err = pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")

runs := 2
Expand All @@ -225,15 +232,12 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
for i := 0; i < runs; i++ {
go func() {
defer wg.Done()
err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
walletInfo.ID, userDepositDestination, providerLinkingID, walletInfo.AnonymousAddress, walletInfo.Provider, "")
err = pg.LinkWallet(ctx, walletInfo.ID, userDepositDestination, providerLinkingID, walletInfo.Provider, "")
}()
}

wg.Wait()

used, max, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, max, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == max, fmt.Sprintf("used %d should not exceed max %d", used, max))
Expand All @@ -244,6 +248,15 @@ func (suite *WalletPostgresTestSuite) seedWallet(pg Datastore) (string, uuid.UUI
userDepositDestination := uuid.NewV4().String()
providerLinkingID := uuid.NewV4()

mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

walletCount := 3
for i := 0; i < walletCount; i++ {
altCurrency := altcurrency.BAT
Expand All @@ -256,16 +269,14 @@ func (suite *WalletPostgresTestSuite) seedWallet(pg Datastore) (string, uuid.UUI
AnonymousAddress: nil,
}

err := pg.UpsertWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err := pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")

err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
walletInfo.ID, userDepositDestination, providerLinkingID, walletInfo.AnonymousAddress, "uphold", "")
err = pg.LinkWallet(ctx, walletInfo.ID, userDepositDestination, providerLinkingID, "uphold", "")
suite.Require().NoError(err, "link wallet should succeed")
}

used, _, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, _, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == walletCount, fmt.Sprintf("used %d", used))
Expand All @@ -277,6 +288,15 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
pg, _, err := NewPostgres()
suite.Require().NoError(err)

mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

wallets := make([]*walletutils.Info, 10, 10)

for i := 0; i < len(wallets); i++ {
Expand All @@ -289,7 +309,7 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
PublicKey: "hBrtClwIppLmu/qZ8EhGM1TQZUwDUosbOrVu1jMwryY=",
}
wallets[i] = walletInfo
err := pg.UpsertWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err := pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")
}

Expand All @@ -302,15 +322,12 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
for i := 0; i < len(wallets); i++ {
go func(index int) {
defer wg.Done()
err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
wallets[index].ID, userDepositDestination, providerLinkingID, wallets[index].AnonymousAddress, wallets[index].Provider, "")
err = pg.LinkWallet(ctx, wallets[index].ID, userDepositDestination, providerLinkingID, wallets[index].Provider, "")
}(i)
}

wg.Wait()

used, max, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, max, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == max, fmt.Sprintf("used %d should not exceed max %d", used, max))
Expand Down
8 changes: 4 additions & 4 deletions services/wallet/instrumented_datastore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions services/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (service *Service) LinkBitFlyerWallet(ctx context.Context, walletID uuid.UU
// we also validated that this "info" signed the request to perform the linking with http signature
// we assume that since we got linkingInfo signed from BF that they are KYC
providerLinkingID := uuid.NewV5(ClaimNamespace, accountHash)
err = service.Datastore.LinkWallet(ctx, walletID.String(), depositID, providerLinkingID, nil, depositProvider, country)
err = service.Datastore.LinkWallet(ctx, walletID.String(), depositID, providerLinkingID, depositProvider, country)
if err != nil {
if errors.Is(err, ErrUnusualActivity) {
return "", handlers.WrapError(err, "unable to link - unusual activity", http.StatusBadRequest)
Expand Down Expand Up @@ -497,7 +497,7 @@ func (service *Service) LinkZebPayWallet(ctx context.Context, walletID uuid.UUID
}

providerLinkingID := uuid.NewV5(ClaimNamespace, claims.AccountID)
if err := service.Datastore.LinkWallet(ctx, walletID.String(), claims.DepositID, providerLinkingID, nil, depositProvider, country); err != nil {
if err := service.Datastore.LinkWallet(ctx, walletID.String(), claims.DepositID, providerLinkingID, depositProvider, country); err != nil {
if errors.Is(err, ErrUnusualActivity) {
return "", handlers.WrapError(err, "unable to link - unusual activity", http.StatusBadRequest)
}
Expand Down Expand Up @@ -567,7 +567,7 @@ func (service *Service) LinkGeminiWallet(ctx context.Context, walletID uuid.UUID

// we assume that since we got linking_info(VerificationToken) signed from Gemini that they are KYC
providerLinkingID := uuid.NewV5(ClaimNamespace, accountID)
err = service.Datastore.LinkWallet(ctx, walletID.String(), depositID, providerLinkingID, nil, depositProvider, country)
err = service.Datastore.LinkWallet(ctx, walletID.String(), depositID, providerLinkingID, depositProvider, country)
if err != nil {
if errors.Is(err, ErrUnusualActivity) {
return "", handlers.WrapError(err, "unable to link - unusual activity", http.StatusBadRequest)
Expand All @@ -589,7 +589,7 @@ func (service *Service) LinkGeminiWallet(ctx context.Context, walletID uuid.UUID
}

// LinkUpholdWallet links an uphold.Wallet and transfers funds.
func (service *Service) LinkUpholdWallet(ctx context.Context, wallet uphold.Wallet, transaction string, anonymousAddress *uuid.UUID) (string, error) {
func (service *Service) LinkUpholdWallet(ctx context.Context, wallet uphold.Wallet, transaction string, _ *uuid.UUID) (string, error) {
const depositProvider = "uphold"
// do not confirm this transaction yet
info := wallet.GetWalletInfo()
Expand Down Expand Up @@ -669,7 +669,7 @@ func (service *Service) LinkUpholdWallet(ctx context.Context, wallet uphold.Wall

providerLinkingID := uuid.NewV5(ClaimNamespace, userID)
// tx.Destination will be stored as UserDepositDestination in the wallet info upon linking
err = service.Datastore.LinkWallet(ctx, info.ID, transactionInfo.Destination, providerLinkingID, anonymousAddress, depositProvider, country)
err = service.Datastore.LinkWallet(ctx, info.ID, transactionInfo.Destination, providerLinkingID, depositProvider, country)
if err != nil {
if errors.Is(err, ErrUnusualActivity) {
return "", handlers.WrapError(err, "unable to link - unusual activity", http.StatusBadRequest)
Expand Down

0 comments on commit d85d726

Please sign in to comment.