Skip to content

Commit

Permalink
feat(payments): Atlar connector: Add more bank details to Accounts (i… (
Browse files Browse the repository at this point in the history
#1736)

Co-authored-by: Lukas Wagner <[email protected]>
  • Loading branch information
paul-nicolas and lwagner-getmomo authored Nov 4, 2024
1 parent f9e565c commit f54c2c1
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func ExternalAccountFromAtlarData(
}, nil
}

func ExtractAccountMetadata(account *atlar_models.Account) metadata.Metadata {
func ExtractAccountMetadata(account *atlar_models.Account, bank *atlar_models.ThirdParty) metadata.Metadata {
result := metadata.Metadata{}
result = result.Merge(ComputeAccountMetadataBool("fictive", account.Fictive))
result = result.Merge(ComputeAccountMetadata("bank/id", account.Bank.ID))
result = result.Merge(ComputeAccountMetadata("bank/name", account.Bank.Name))
result = result.Merge(ComputeAccountMetadata("bank/id", bank.ID))
result = result.Merge(ComputeAccountMetadata("bank/name", bank.Name))
result = result.Merge(ComputeAccountMetadata("bank/bic", account.Bank.Bic))
result = result.Merge(IdentifiersToMetadata(account.Identifiers))
result = result.Merge(ComputeAccountMetadata("alias", account.Alias))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ import (
"github.com/get-momo/atlar-v1-go-client/client/accounts"
)

func (c *Client) GetV1AccountsID(ctx context.Context, id string) (*accounts.GetV1AccountsIDOK, error) {
f := connectors.ClientMetrics(ctx, "atlar", "list_accounts")
now := time.Now()
defer f(ctx, now)

accountsParams := accounts.GetV1AccountsIDParams{
Context: ctx,
ID: id,
}

return c.client.Accounts.GetV1AccountsID(&accountsParams)
}

func (c *Client) GetV1Accounts(ctx context.Context, token string, pageSize int64) (*accounts.GetV1AccountsOK, error) {
f := connectors.ClientMetrics(ctx, "atlar", "list_accounts")
now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package client

import (
"context"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/get-momo/atlar-v1-go-client/client/third_parties"
)

func (c *Client) GetV1BetaThirdPartiesID(ctx context.Context, id string) (*third_parties.GetV1betaThirdPartiesIDOK, error) {
f := connectors.ClientMetrics(ctx, "atlar", "list_third_parties")
now := time.Now()
defer f(ctx, now)

params := third_parties.GetV1betaThirdPartiesIDParams{
Context: ctx,
ID: id,
}

return c.client.ThirdParties.GetV1betaThirdPartiesID(&params)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import "github.com/formancehq/payments/cmd/connectors/internal/connectors/curren
var (
supportedCurrenciesWithDecimal = map[string]int{
"EUR": currency.ISO4217Currencies["EUR"], // Euro
"DKK": currency.ISO4217Currencies["DKK"],
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func FetchAccountsTask(config Config, client *client.Client) task.Task {

token = pagedAccounts.Payload.NextToken

if err := ingestAccountsBatch(ctx, connectorID, ingester, pagedAccounts); err != nil {
if err := ingestAccountsBatch(ctx, connectorID, taskID, ingester, pagedAccounts, client); err != nil {
otel.RecordError(span, err)
return err
}
Expand Down Expand Up @@ -107,9 +107,19 @@ func FetchAccountsTask(config Config, client *client.Client) task.Task {
func ingestAccountsBatch(
ctx context.Context,
connectorID models.ConnectorID,
taskID models.TaskID,
ingester ingestion.Ingester,
pagedAccounts *accounts.GetV1AccountsOK,
client *client.Client,
) error {
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskFetchAccounts.ingestAccountsBatch",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

accountsBatch := ingestion.AccountBatch{}
balanceBatch := ingestion.BalanceBatch{}

Expand All @@ -124,6 +134,14 @@ func ingestAccountsBatch(
return fmt.Errorf("failed to parse opening date: %w", err)
}

requestCtx, cancel := contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
thirdPartyResponse, err := client.GetV1BetaThirdPartiesID(requestCtx, account.ThirdPartyID)
if err != nil {
otel.RecordError(span, err)
return err
}

accountsBatch = append(accountsBatch, &models.Account{
ID: models.AccountID{
Reference: *account.ID,
Expand All @@ -135,7 +153,7 @@ func ingestAccountsBatch(
DefaultAsset: currency.FormatAsset(supportedCurrenciesWithDecimal, account.Currency),
AccountName: account.Name,
Type: models.AccountTypeInternal,
Metadata: ExtractAccountMetadata(account),
Metadata: ExtractAccountMetadata(account, thirdPartyResponse.Payload),
RawData: raw,
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func FetchTransactionsTask(config Config, client *client.Client) task.Task {

token = pagedTransactions.Payload.NextToken

if err := ingestPaymentsBatch(ctx, connectorID, ingester, pagedTransactions); err != nil {
if err := ingestPaymentsBatch(ctx, connectorID, taskID, ingester, client, pagedTransactions); err != nil {
otel.RecordError(span, err)
return err
}
Expand All @@ -66,13 +66,15 @@ func FetchTransactionsTask(config Config, client *client.Client) task.Task {
func ingestPaymentsBatch(
ctx context.Context,
connectorID models.ConnectorID,
taskID models.TaskID,
ingester ingestion.Ingester,
client *client.Client,
pagedTransactions *transactions.GetV1TransactionsOK,
) error {
batch := ingestion.PaymentBatch{}

for _, item := range pagedTransactions.Payload.Items {
batchElement, err := atlarTransactionToPaymentBatchElement(connectorID, item)
batchElement, err := atlarTransactionToPaymentBatchElement(ctx, connectorID, taskID, item, client)
if err != nil {
return err
}
Expand All @@ -91,9 +93,20 @@ func ingestPaymentsBatch(
}

func atlarTransactionToPaymentBatchElement(
ctx context.Context,
connectorID models.ConnectorID,
taskID models.TaskID,
transaction *atlar_models.Transaction,
client *client.Client,
) (*ingestion.PaymentBatchElement, error) {
ctx, span := connectors.StartSpan(
ctx,
"atlar.atlarTransactionToPaymentBatchElement",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
)
defer span.End()

if _, ok := supportedCurrenciesWithDecimal[*transaction.Amount.Currency]; !ok {
// Discard transactions with unsupported currencies
return nil, nil
Expand All @@ -117,6 +130,22 @@ func atlarTransactionToPaymentBatchElement(
return nil, err
}

requestCtx, cancel := contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
accountResponse, err := client.GetV1AccountsID(requestCtx, *transaction.Account.ID)
if err != nil {
otel.RecordError(span, err)
return nil, err
}

requestCtx, cancel = contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
thirdPartyResponse, err := client.GetV1BetaThirdPartiesID(requestCtx, *&accountResponse.Payload.ThirdPartyID)
if err != nil {
otel.RecordError(span, err)
return nil, err
}

paymentId := models.PaymentID{
PaymentReference: models.PaymentReference{
Reference: transaction.ID,
Expand All @@ -137,7 +166,7 @@ func atlarTransactionToPaymentBatchElement(
Amount: amount,
InitialAmount: amount,
Asset: currency.FormatAsset(supportedCurrenciesWithDecimal, *transaction.Amount.Currency),
Metadata: ExtractPaymentMetadata(paymentId, transaction),
Metadata: ExtractPaymentMetadata(paymentId, transaction, accountResponse.Payload, thirdPartyResponse.Payload),
RawData: raw,
},
}
Expand Down Expand Up @@ -197,7 +226,7 @@ func determinePaymentScheme(item *atlar_models.Transaction) models.PaymentScheme
return models.PaymentSchemeSepa
}

func ExtractPaymentMetadata(paymentId models.PaymentID, transaction *atlar_models.Transaction) []*models.PaymentMetadata {
func ExtractPaymentMetadata(paymentId models.PaymentID, transaction *atlar_models.Transaction, account *atlar_models.Account, bank *atlar_models.ThirdParty) []*models.PaymentMetadata {
result := []*models.PaymentMetadata{}
if transaction.Date != "" {
result = append(result, ComputePaymentMetadata(paymentId, "date", transaction.Date))
Expand All @@ -207,6 +236,9 @@ func ExtractPaymentMetadata(paymentId models.PaymentID, transaction *atlar_model
}
result = append(result, ComputePaymentMetadata(paymentId, "remittanceInformation/type", *transaction.RemittanceInformation.Type))
result = append(result, ComputePaymentMetadata(paymentId, "remittanceInformation/value", *transaction.RemittanceInformation.Value))
result = append(result, ComputePaymentMetadata(paymentId, "bank/id", bank.ID))
result = append(result, ComputePaymentMetadata(paymentId, "bank/name", bank.Name))
result = append(result, ComputePaymentMetadata(paymentId, "bank/bic", account.Bank.Bic))
result = append(result, ComputePaymentMetadata(paymentId, "btc/domain", transaction.Characteristics.BankTransactionCode.Domain))
result = append(result, ComputePaymentMetadata(paymentId, "btc/family", transaction.Characteristics.BankTransactionCode.Family))
result = append(result, ComputePaymentMetadata(paymentId, "btc/subfamily", transaction.Characteristics.BankTransactionCode.Subfamily))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func UpdatePaymentStatusTask(
err = ingestAtlarTransaction(ctx,
ingester,
connectorID,
taskID,
client,
getCreditTransferResponse.Payload.Reconciliation.BookedTransactionID,
)
Expand Down Expand Up @@ -356,18 +357,30 @@ func ingestAtlarTransaction(
ctx context.Context,
ingester ingestion.Ingester,
connectorID models.ConnectorID,
taskID models.TaskID,
client *client.Client,
transactionId string,
) error {

transactionResponse, err := client.GetV1TransactionsID(ctx, transactionId)
ctx, span := connectors.StartSpan(
ctx,
"atlar.taskUpdatePaymentStatus.ingestAtlarTransaction",
attribute.String("connectorID", connectorID.String()),
attribute.String("taskID", taskID.String()),
attribute.String("transactionID", transactionId),
)
defer span.End()

requestCtx, cancel := contextutil.DetachedWithTimeout(ctx, 30*time.Second)
defer cancel()
transactionResponse, err := client.GetV1TransactionsID(requestCtx, transactionId)
if err != nil {
otel.RecordError(span, err)
return fmt.Errorf("failed to get atlar transaction: %w, %w", err, task.ErrRetryable)

}

batchElement, err := atlarTransactionToPaymentBatchElement(connectorID, transactionResponse.Payload)
batchElement, err := atlarTransactionToPaymentBatchElement(ctx, connectorID, taskID, transactionResponse.Payload, client)
if err != nil {
otel.RecordError(span, err)
return err
}
if batchElement == nil {
Expand All @@ -378,6 +391,7 @@ func ingestAtlarTransaction(

err = ingester.IngestPayments(ctx, batch)
if err != nil {
otel.RecordError(span, err)
return err
}

Expand Down

0 comments on commit f54c2c1

Please sign in to comment.