Skip to content

Commit

Permalink
ingest: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-burghardt committed May 22, 2024
1 parent a69ca69 commit fb6fe06
Show file tree
Hide file tree
Showing 13 changed files with 615 additions and 28 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Visual Studio Code
.vscode
.vscode
captive-core*/
101 changes: 101 additions & 0 deletions cmd/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package cmd

import (
"go/types"

_ "github.com/lib/pq"
"github.com/spf13/cobra"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/ingest"
)

type ingestCmd struct{}

func (c *ingestCmd) Command() *cobra.Command {
cfg := ingest.Configs{}
cfgOpts := config.ConfigOptions{
{
Name: "database-url",
Usage: "Database connection URL.",
OptType: types.String,
ConfigKey: &cfg.DatabaseURL,
FlagDefault: "postgres://postgres@localhost:5432/wallet-backend?sslmode=disable",
Required: true,
},
{
Name: "network-passphrase",
Usage: "Stellar Network Passphrase to connect.",
OptType: types.String,
ConfigKey: &cfg.NetworkPassphrase,
FlagDefault: network.TestNetworkPassphrase,
Required: true,
},
{
Name: "captive-core-bin-path",
Usage: "Path to Captive Core's binary file.",
OptType: types.String,
ConfigKey: &cfg.CaptiveCoreBinPath,
FlagDefault: "/usr/local/bin/stellar-core",
Required: true,
},
{
Name: "captive-core-config-dir",
Usage: "Path to Captive Core's configuration files directory.",
OptType: types.String,
ConfigKey: &cfg.CaptiveCoreConfigDir,
FlagDefault: "./internal/ingest/config",
Required: true,
},
{
Name: "ledger-cursor-name",
Usage: "Name of last synced ledger cursor. Attention: there should never be more than one container running with a same cursor name.",
OptType: types.String,
ConfigKey: &cfg.LedgerCursorName,
FlagDefault: "last_synced_ledger",
Required: false,
},
{
Name: "start",
Usage: "Ledger number from which ingestion should start. When not present, ingestion will resume from last synced ledger.",
OptType: types.Int,
ConfigKey: &cfg.StartLedger,
FlagDefault: 0,
Required: false,
},
{
Name: "end",
Usage: "Ledger number up to which ingestion should run. When not present, ingestion run indefinitely (live ingestion requires it to be empty).",
OptType: types.Int,
ConfigKey: &cfg.EndLedger,
FlagDefault: 0,
Required: false,
},
}

cmd := &cobra.Command{
Use: "ingest",
Short: "Run Ingestion service",
PersistentPreRun: func(_ *cobra.Command, _ []string) {
cfgOpts.Require()
if err := cfgOpts.SetValues(); err != nil {
log.Fatalf("Error setting values of config options: %s", err.Error())
}
},
Run: func(_ *cobra.Command, _ []string) {
c.Run(cfg)
},
}
if err := cfgOpts.Init(cmd); err != nil {
log.Fatalf("Error initializing a config option: %s", err.Error())
}
return cmd
}

func (c *ingestCmd) Run(cfg ingest.Configs) {
err := ingest.Ingest(cfg)
if err != nil {
log.Fatalf("Error running Ingest: %s", err.Error())
}
}
12 changes: 5 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package cmd

import (
"log"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/log"
)

// rootCmd represents the base command when called without any subcommands
Expand All @@ -30,9 +28,9 @@ func Execute() {
}

func init() {
logger := supportlog.New()
logger.SetLevel(logrus.TraceLevel)
log.DefaultLogger = log.New()
log.DefaultLogger.SetLevel(logrus.TraceLevel)

rootCmd.AddCommand((&serveCmd{Logger: logger}).Command())
rootCmd.AddCommand((&ingestCmd{Logger: logger}).Command())
rootCmd.AddCommand((&serveCmd{}).Command())
rootCmd.AddCommand((&ingestCmd{}).Command())
}
16 changes: 6 additions & 10 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,14 @@ import (
_ "github.com/lib/pq"
"github.com/spf13/cobra"
"github.com/stellar/go/support/config"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/serve"
)

type serveCmd struct {
Logger *supportlog.Entry
}
type serveCmd struct{}

func (c *serveCmd) Command() *cobra.Command {
cfg := serve.Configs{
Logger: c.Logger,
}
cfg := serve.Configs{}
cfgOpts := config.ConfigOptions{
{
Name: "port",
Expand All @@ -42,20 +38,20 @@ func (c *serveCmd) Command() *cobra.Command {
Run: func(_ *cobra.Command, _ []string) {
cfgOpts.Require()
if err := cfgOpts.SetValues(); err != nil {
c.Logger.Fatalf("Error setting values of config options: %s", err.Error())
log.Fatalf("Error setting values of config options: %s", err.Error())
}
c.Run(cfg)
},
}
if err := cfgOpts.Init(cmd); err != nil {
c.Logger.Fatalf("Error initializing a config option: %s", err.Error())
log.Fatalf("Error initializing a config option: %s", err.Error())
}
return cmd
}

func (c *serveCmd) Run(cfg serve.Configs) {
err := serve.Serve(cfg)
if err != nil {
c.Logger.Fatalf("Error running Serve: %s", err.Error())
log.Fatalf("Error running Serve: %s", err.Error())
}
}
6 changes: 3 additions & 3 deletions internal/data/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Payment struct {

func (m *PaymentModel) GetLatestLedgerSynced(ctx context.Context, cursorName string) (uint32, error) {
var lastSyncedLedger uint32
err := m.db.QueryRowxContext(ctx, `SELECT value FROM ingestion_store WHERE key = $1`, cursorName).Scan(&lastSyncedLedger)
err := m.db.QueryRowxContext(ctx, `SELECT value FROM ingest_store WHERE key = $1`, cursorName).Scan(&lastSyncedLedger)
// First run, key does not exist yet
if err == sql.ErrNoRows {
return 0, nil
Expand All @@ -49,7 +49,7 @@ func (m *PaymentModel) GetLatestLedgerSynced(ctx context.Context, cursorName str

func (m *PaymentModel) UpdateLatestLedgerSynced(ctx context.Context, cursorName string, ledger uint32) error {
const query = `
INSERT INTO ingestion_store (key, value) VALUES ($1, $2)
INSERT INTO ingest_store (key, value) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = excluded.value
`
_, err := m.db.ExecContext(ctx, query, cursorName, ledger)
Expand All @@ -71,7 +71,7 @@ func (m *PaymentModel) BeginTx(ctx context.Context) (*sqlx.Tx, error) {

func (m *PaymentModel) AddPayment(ctx context.Context, tx *sqlx.Tx, payment Payment) error {
const query = `
INSERT INTO ingestion_payments (
INSERT INTO ingest_payments (
operation_id, operation_type, transaction_id, transaction_hash, from_address, to_address, src_asset_code, src_asset_issuer, src_amount,
dest_asset_code, dest_asset_issuer, dest_amount, created_at, memo
)
Expand Down
34 changes: 34 additions & 0 deletions internal/db/migrations/2024-05-22.0-ingest_payments.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- +migrate Up

CREATE TABLE ingest_store (
key varchar(255) NOT NULL,
value varchar(255) NOT NULL,
PRIMARY KEY (key)
);

CREATE TABLE ingest_payments (
operation_id bigint NOT NULL,
operation_type text NOT NULL,
transaction_id bigint NOT NULL,
transaction_hash text NOT NULL,
from_address text NOT NULL,
to_address text NOT NULL,
src_asset_code text NOT NULL,
src_asset_issuer text NOT NULL,
src_amount bigint NOT NULL,
dest_asset_code text NOT NULL,
dest_asset_issuer text NOT NULL,
dest_amount bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
memo text,
PRIMARY KEY (operation_id)
);

CREATE INDEX from_address_idx ON ingest_payments (from_address);
CREATE INDEX to_address_idx ON ingest_payments (to_address);

-- +migrate Down

DROP TABLE ingest_payments;

DROP TABLE ingest_store;
25 changes: 25 additions & 0 deletions internal/ingest/config/stellar-core_pubnet.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Stellar Pubnet validators
[[HOME_DOMAINS]]
HOME_DOMAIN="www.stellar.org"
QUALITY="HIGH"

[[VALIDATORS]]
NAME="SDF 1"
PUBLIC_KEY="GCGB2S2KGYARPVIA37HYZXVRM2YZUEXA6S33ZU5BUDC6THSB62LZSTYH"
ADDRESS="core-live-a.stellar.org:11625"
HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"
HOME_DOMAIN="www.stellar.org"

[[VALIDATORS]]
NAME="SDF 2"
PUBLIC_KEY="GCM6QMP3DLRPTAZW2UZPCPX2LF3SXWXKPMP3GKFZBDSF3QZGV2G5QSTK"
ADDRESS="core-live-b.stellar.org:11625"
HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"
HOME_DOMAIN="www.stellar.org"

[[VALIDATORS]]
NAME="SDF 3"
PUBLIC_KEY="GABMKJM6I25XI4K7U6XWMULOUQIQ27BCTMLS6BYYSOWKTBUXVRJSXHYQ"
ADDRESS="core-live-c.stellar.org:11625"
HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}"
HOME_DOMAIN="www.stellar.org"
25 changes: 25 additions & 0 deletions internal/ingest/config/stellar-core_testnet.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Stellar Testnet validators
[[HOME_DOMAINS]]
HOME_DOMAIN="testnet.stellar.org"
QUALITY="HIGH"

[[VALIDATORS]]
NAME="sdftest1"
HOME_DOMAIN="testnet.stellar.org"
PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y"
ADDRESS="core-testnet1.stellar.org"
HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_001/{0} -o {1}"

[[VALIDATORS]]
NAME="sdftest2"
HOME_DOMAIN="testnet.stellar.org"
PUBLIC_KEY="GCUCJTIYXSOXKBSNFGNFWW5MUQ54HKRPGJUTQFJ5RQXZXNOLNXYDHRAP"
ADDRESS="core-testnet2.stellar.org"
HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_002/{0} -o {1}"

[[VALIDATORS]]
NAME="sdftest3"
HOME_DOMAIN="testnet.stellar.org"
PUBLIC_KEY="GC2V2EFSXN6SQTWVYA5EPJPBWWIMSD2XQNKUOHGEKB535AQE2I6IXV2Z"
ADDRESS="core-testnet3.stellar.org"
HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_003/{0} -o {1}"
Loading

0 comments on commit fb6fe06

Please sign in to comment.