diff --git a/ingest/README.md b/ingest/README.md index e07dd64ec8..cf3d38f8da 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -23,13 +23,9 @@ From a high level, the ingestion library is broken down into a few modular compo [ Ledger Backend ] | - one of... | - --------|-----+------|----------| - | | | | - Captive Database Remote etc. - Core Captive - Core + Captive + Core ``` This is described in a little more detail in [`doc.go`](./doc.go), its accompanying examples, the documentation within this package, and the rest of this tutorial. @@ -37,13 +33,7 @@ This is described in a little more detail in [`doc.go`](./doc.go), its accompany # Hello, World! -As is tradition, we'll start with a simplistic example that ingests a single ledger from the network. We're immediately faced with a decision, though: _What's the backend?_ We'll use a **Captive Stellar-Core backend** in this example because it requires (little-to-)no setup, but there are couple of alternatives available. You could also use: - - - a **database** (via `NewDatabaseBackend()`), which would ingest ledgers stored in a Stellar-Core database, or - - - a **remote Captive Core** instance (via `NewRemoteCaptive()`), which works much like Captive Core, but points to an instance that isn't (necessarily) running locally. - -With that in mind, here's a minimalist example of the ingestion library: +As is tradition, we'll start with a simplistic example that ingests a single ledger from the network. We'll use the **Captive Stellar-Core backend** to ingest the ledger: ```go package main diff --git a/ingest/ledgerbackend/database_backend.go b/ingest/ledgerbackend/database_backend.go deleted file mode 100644 index 98333c679c..0000000000 --- a/ingest/ledgerbackend/database_backend.go +++ /dev/null @@ -1,271 +0,0 @@ -package ledgerbackend - -import ( - "context" - "database/sql" - "sort" - "time" - - "github.com/stellar/go/network" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -const ( - latestLedgerSeqQuery = "select ledgerseq, closetime from ledgerheaders order by ledgerseq desc limit 1" - txHistoryQuery = "select txbody, txresult, txmeta, txindex from txhistory where ledgerseq = ? " - ledgerHeaderQuery = "select ledgerhash, data from ledgerheaders where ledgerseq = ? " - ledgerSequenceAfterQuery = "select ledgerseq from ledgerheaders where ledgerseq > ? " - txFeeHistoryQuery = "select txchanges, txindex from txfeehistory where ledgerseq = ? " - upgradeHistoryQuery = "select ledgerseq, upgradeindex, upgrade, changes from upgradehistory where ledgerseq = ? order by upgradeindex asc" - orderBy = "order by txindex asc" - dbDriver = "postgres" -) - -// Ensure DatabaseBackend implements LedgerBackend -var _ LedgerBackend = (*DatabaseBackend)(nil) - -// DatabaseBackend implements a database data store. -type DatabaseBackend struct { - networkPassphrase string - session session -} - -func NewDatabaseBackend(dataSourceName, networkPassphrase string) (*DatabaseBackend, error) { - session, err := createSession(dataSourceName) - if err != nil { - return nil, err - } - - return NewDatabaseBackendFromSession(session, networkPassphrase) -} - -func NewDatabaseBackendFromSession(session db.SessionInterface, networkPassphrase string) (*DatabaseBackend, error) { - return &DatabaseBackend{ - session: session, - networkPassphrase: networkPassphrase, - }, nil -} - -func (dbb *DatabaseBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { - _, err := dbb.GetLedger(ctx, ledgerRange.from) - if err != nil { - return errors.Wrap(err, "error getting ledger") - } - - if ledgerRange.bounded { - _, err := dbb.GetLedger(ctx, ledgerRange.to) - if err != nil { - return errors.Wrap(err, "error getting ledger") - } - } - - return nil -} - -// IsPrepared returns true if a given ledgerRange is prepared. -func (*DatabaseBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { - return true, nil -} - -// GetLatestLedgerSequence returns the most recent ledger sequence number present in the database. -func (dbb *DatabaseBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { - var ledger []ledgerHeader - err := dbb.session.SelectRaw(ctx, &ledger, latestLedgerSeqQuery) - if err != nil { - return 0, errors.Wrap(err, "couldn't select ledger sequence") - } - if len(ledger) == 0 { - return 0, errors.New("no ledgers exist in ledgerheaders table") - } - - return ledger[0].LedgerSeq, nil -} - -func sortByHash(transactions []xdr.TransactionEnvelope, passphrase string) error { - hashes := make([]xdr.Hash, len(transactions)) - txByHash := map[xdr.Hash]xdr.TransactionEnvelope{} - for i, tx := range transactions { - hash, err := network.HashTransactionInEnvelope(tx, passphrase) - if err != nil { - return errors.Wrap(err, "cannot hash transaction") - } - hashes[i] = hash - txByHash[hash] = tx - } - - sort.Slice(hashes, func(i, j int) bool { - a := hashes[i] - b := hashes[j] - for k := range a { - if a[k] < b[k] { - return true - } - if a[k] > b[k] { - return false - } - } - return false - }) - - for i, hash := range hashes { - transactions[i] = txByHash[hash] - } - return nil -} - -// GetLedger will block until the ledger is -// available in the backend (even for UnaboundedRange). -// Please note that requesting a ledger sequence far after current ledger will -// block the execution for a long time. -func (dbb *DatabaseBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { - for { - exists, meta, err := dbb.getLedgerQuery(ctx, sequence) - if err != nil { - return xdr.LedgerCloseMeta{}, err - } - - if exists { - return meta, nil - } else { - // Check if there are ledgers after `sequence`. If so, it's likely - // the requested sequence was removed during maintenance so return - // error. - ledgerAfterExist, oldestSequenceAfter, err := dbb.getLedgerAfterExist(ctx, sequence) - if err != nil { - return xdr.LedgerCloseMeta{}, err - } - - if ledgerAfterExist { - return xdr.LedgerCloseMeta{}, errors.Errorf("requested ledger already removed (oldest sequence after %d is %d)", sequence, oldestSequenceAfter) - } - time.Sleep(time.Second) - } - } -} - -// getLedgerAfterExist returns true (and sequence number) if there's a ledger in -// the Stellar-Core DB with the sequence number higher than sequence. -func (dbb *DatabaseBackend) getLedgerAfterExist(ctx context.Context, sequence uint32) (bool, uint32, error) { - var fetchedSequence uint32 - err := dbb.session.GetRaw(ctx, &fetchedSequence, ledgerSequenceAfterQuery, sequence) - // Return errors... - if err != nil { - switch err { - case sql.ErrNoRows: - // Ledger was not found - return false, fetchedSequence, nil - default: - return false, fetchedSequence, errors.Wrapf(err, "Error getting ledger after %d", sequence) - } - } - - return true, fetchedSequence, nil -} - -// getLedgerQuery returns the LedgerCloseMeta for the given ledger sequence number. -// The first returned value is false when the ledger does not exist in the database. -func (dbb *DatabaseBackend) getLedgerQuery(ctx context.Context, sequence uint32) (bool, xdr.LedgerCloseMeta, error) { - lcm := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{}, - } - - // Query - ledgerheader - var lRow ledgerHeaderHistory - - err := dbb.session.GetRaw(ctx, &lRow, ledgerHeaderQuery, sequence) - // Return errors... - if err != nil { - switch err { - case sql.ErrNoRows: - // Ledger was not found - return false, xdr.LedgerCloseMeta{}, nil - default: - return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "Error getting ledger header") - } - } - - // ...otherwise store the header - lcm.V0.LedgerHeader = xdr.LedgerHeaderHistoryEntry{ - Hash: lRow.Hash, - Header: lRow.Header, - Ext: xdr.LedgerHeaderHistoryEntryExt{}, - } - - // Query - txhistory - var txhRows []txHistory - err = dbb.session.SelectRaw(ctx, &txhRows, txHistoryQuery+orderBy, sequence) - // Return errors... - if err != nil { - return false, lcm, errors.Wrap(err, "Error getting txHistory") - } - - // ...otherwise store the data - for i, tx := range txhRows { - // Sanity check index. Note that first TXIndex in a ledger is 1 - if i != int(tx.TXIndex)-1 { - return false, xdr.LedgerCloseMeta{}, errors.New("transactions read from DB history table are misordered") - } - - lcm.V0.TxSet.Txs = append(lcm.V0.TxSet.Txs, tx.TXBody) - lcm.V0.TxProcessing = append(lcm.V0.TxProcessing, xdr.TransactionResultMeta{ - Result: tx.TXResult, - TxApplyProcessing: tx.TXMeta, - }) - } - - if err = sortByHash(lcm.V0.TxSet.Txs, dbb.networkPassphrase); err != nil { - return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "could not sort txset") - } - - // Query - txfeehistory - var txfhRows []txFeeHistory - err = dbb.session.SelectRaw(ctx, &txfhRows, txFeeHistoryQuery+orderBy, sequence) - // Return errors... - if err != nil { - return false, lcm, errors.Wrap(err, "Error getting txFeeHistory") - } - - // ...otherwise store the data - for i, tx := range txfhRows { - // Sanity check index. Note that first TXIndex in a ledger is 1 - if i != int(tx.TXIndex)-1 { - return false, xdr.LedgerCloseMeta{}, errors.New("transactions read from DB fee history table are misordered") - } - lcm.V0.TxProcessing[i].FeeProcessing = tx.TXChanges - } - - // Query - upgradehistory - var upgradeHistoryRows []upgradeHistory - err = dbb.session.SelectRaw(ctx, &upgradeHistoryRows, upgradeHistoryQuery, sequence) - // Return errors... - if err != nil { - return false, lcm, errors.Wrap(err, "Error getting upgradeHistoryRows") - } - - // ...otherwise store the data - lcm.V0.UpgradesProcessing = make([]xdr.UpgradeEntryMeta, len(upgradeHistoryRows)) - for i, upgradeHistoryRow := range upgradeHistoryRows { - lcm.V0.UpgradesProcessing[i] = xdr.UpgradeEntryMeta{ - Upgrade: upgradeHistoryRow.Upgrade, - Changes: upgradeHistoryRow.Changes, - } - } - - return true, lcm, nil -} - -// CreateSession returns a new db.Session that connects to the given DB settings. -func createSession(dataSourceName string) (*db.Session, error) { - if dataSourceName == "" { - return nil, errors.New("missing DatabaseBackend.DataSourceName (e.g. \"postgres://stellar:postgres@localhost:8002/core\")") - } - - return db.Open(dbDriver, dataSourceName) -} - -// Close disconnects an active database session. -func (dbb *DatabaseBackend) Close() error { - return dbb.session.Close() -} diff --git a/ingest/ledgerbackend/hash_order_test.go b/ingest/ledgerbackend/hash_order_test.go deleted file mode 100644 index 537c9d5148..0000000000 --- a/ingest/ledgerbackend/hash_order_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package ledgerbackend - -import ( - "testing" - - "github.com/stellar/go/network" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestHashOrder(t *testing.T) { - source := xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") - account := source.ToMuxedAccount() - original := []xdr.TransactionEnvelope{ - { - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: account, - SeqNum: 1, - }, - }, - }, - { - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: account, - SeqNum: 2, - }, - }, - }, - { - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - SourceAccount: account, - SeqNum: 3, - }, - }, - }, - } - - require.NoError(t, sortByHash(original, network.TestNetworkPassphrase)) - hashes := map[int]xdr.Hash{} - - for i, tx := range original { - var err error - hashes[i], err = network.HashTransactionInEnvelope(tx, network.TestNetworkPassphrase) - if err != nil { - assert.NoError(t, err) - } - } - - for i := range original { - if i == 0 { - continue - } - prev := hashes[i-1] - cur := hashes[i] - for j := range prev { - if !assert.True(t, prev[j] < cur[j]) { - break - } else { - break - } - } - } -} diff --git a/ingest/ledgerbackend/ledger_backend.go b/ingest/ledgerbackend/ledger_backend.go index 572de2e183..eddc98fa05 100644 --- a/ingest/ledgerbackend/ledger_backend.go +++ b/ingest/ledgerbackend/ledger_backend.go @@ -21,60 +21,3 @@ type LedgerBackend interface { IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) Close() error } - -// session is the interface needed to access a persistent database session. -// TODO can't use this until we add Close() to the existing db.Session object -type session interface { - GetRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) error - SelectRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) error - Close() error -} - -// ledgerHeaderHistory is a helper struct used to unmarshall header fields from a stellar-core DB. -type ledgerHeaderHistory struct { - Hash xdr.Hash `db:"ledgerhash"` - Header xdr.LedgerHeader `db:"data"` -} - -// ledgerHeader holds a row of data from the stellar-core `ledgerheaders` table. -type ledgerHeader struct { - LedgerHash string `db:"ledgerhash"` - PrevHash string `db:"prevhash"` - BucketListHash string `db:"bucketlisthash"` - CloseTime int64 `db:"closetime"` - LedgerSeq uint32 `db:"ledgerseq"` - Data xdr.LedgerHeader `db:"data"` -} - -// txHistory holds a row of data from the stellar-core `txhistory` table. -type txHistory struct { - TXID string `db:"txid"` - LedgerSeq uint32 `db:"ledgerseq"` - TXIndex uint32 `db:"txindex"` - TXBody xdr.TransactionEnvelope `db:"txbody"` - TXResult xdr.TransactionResultPair `db:"txresult"` - TXMeta xdr.TransactionMeta `db:"txmeta"` -} - -// txFeeHistory holds a row of data from the stellar-core `txfeehistory` table. -type txFeeHistory struct { - TXID string `db:"txid"` - LedgerSeq uint32 `db:"ledgerseq"` - TXIndex uint32 `db:"txindex"` - TXChanges xdr.LedgerEntryChanges `db:"txchanges"` -} - -// scpHistory holds a row of data from the stellar-core `scphistory` table. -// type scpHistory struct { -// NodeID string `db:"nodeid"` -// LedgerSeq uint32 `db:"ledgerseq"` -// Envelope string `db:"envelope"` -// } - -// upgradeHistory holds a row of data from the stellar-core `upgradehistory` table. -type upgradeHistory struct { - LedgerSeq uint32 `db:"ledgerseq"` - UpgradeIndex uint32 `db:"upgradeindex"` - Upgrade xdr.LedgerUpgrade `db:"upgrade"` - Changes xdr.LedgerEntryChanges `db:"changes"` -} diff --git a/ingest/ledgerbackend/remote_captive_core.go b/ingest/ledgerbackend/remote_captive_core.go deleted file mode 100644 index 764114fa3b..0000000000 --- a/ingest/ledgerbackend/remote_captive_core.go +++ /dev/null @@ -1,262 +0,0 @@ -package ledgerbackend - -import ( - "bytes" - "context" - "encoding/json" - "io/ioutil" - "net/http" - "net/url" - "path" - "strconv" - "sync" - "time" - - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// PrepareRangeResponse describes the status of the pending PrepareRange operation. -type PrepareRangeResponse struct { - LedgerRange Range `json:"ledgerRange"` - StartTime time.Time `json:"startTime"` - Ready bool `json:"ready"` - ReadyDuration int `json:"readyDuration"` -} - -// LatestLedgerSequenceResponse is the response for the GetLatestLedgerSequence command. -type LatestLedgerSequenceResponse struct { - Sequence uint32 `json:"sequence"` -} - -// LedgerResponse is the response for the GetLedger command. -type LedgerResponse struct { - Ledger Base64Ledger `json:"ledger"` -} - -// Base64Ledger extends xdr.LedgerCloseMeta with JSON encoding and decoding -type Base64Ledger xdr.LedgerCloseMeta - -func (r *Base64Ledger) UnmarshalJSON(b []byte) error { - var base64 string - if err := json.Unmarshal(b, &base64); err != nil { - return err - } - - var parsed xdr.LedgerCloseMeta - if err := xdr.SafeUnmarshalBase64(base64, &parsed); err != nil { - return err - } - *r = Base64Ledger(parsed) - - return nil -} - -func (r Base64Ledger) MarshalJSON() ([]byte, error) { - base64, err := xdr.MarshalBase64(xdr.LedgerCloseMeta(r)) - if err != nil { - return nil, err - } - return json.Marshal(base64) -} - -// RemoteCaptiveStellarCore is an http client for interacting with a remote captive core server. -type RemoteCaptiveStellarCore struct { - url *url.URL - client *http.Client - lock *sync.Mutex - prepareRangePollInterval time.Duration -} - -// RemoteCaptiveOption values can be passed into NewRemoteCaptive to customize a RemoteCaptiveStellarCore instance. -type RemoteCaptiveOption func(c *RemoteCaptiveStellarCore) - -// PrepareRangePollInterval configures how often the captive core server will be polled when blocking -// on the PrepareRange operation. -func PrepareRangePollInterval(d time.Duration) RemoteCaptiveOption { - return func(c *RemoteCaptiveStellarCore) { - c.prepareRangePollInterval = d - } -} - -// NewRemoteCaptive returns a new RemoteCaptiveStellarCore instance. -// -// Only the captiveCoreURL parameter is required. -func NewRemoteCaptive(captiveCoreURL string, options ...RemoteCaptiveOption) (RemoteCaptiveStellarCore, error) { - u, err := url.Parse(captiveCoreURL) - if err != nil { - return RemoteCaptiveStellarCore{}, errors.Wrap(err, "unparseable url") - } - - client := RemoteCaptiveStellarCore{ - prepareRangePollInterval: time.Second, - url: u, - client: &http.Client{Timeout: 10 * time.Second}, - lock: &sync.Mutex{}, - } - for _, option := range options { - option(&client) - } - return client, nil -} - -func decodeResponse(response *http.Response, payload interface{}) error { - defer response.Body.Close() - - if response.StatusCode != http.StatusOK { - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return errors.Wrap(err, "failed to read response body") - } - - return errors.New(string(body)) - } - - if err := json.NewDecoder(response.Body).Decode(payload); err != nil { - return errors.Wrap(err, "failed to decode json payload") - } - return nil -} - -// GetLatestLedgerSequence returns the sequence of the latest ledger available -// in the backend. This method returns an error if not in a session (start with -// PrepareRange). -// -// Note that for UnboundedRange the returned sequence number is not necessarily -// the latest sequence closed by the network. It's always the last value available -// in the backend. -func (c RemoteCaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (sequence uint32, err error) { - // TODO: Have a context on this request so we can cancel all outstanding - // requests, not just PrepareRange. - u := *c.url - u.Path = path.Join(u.Path, "latest-sequence") - request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) - if err != nil { - return 0, errors.Wrap(err, "cannot construct http request") - } - - response, err := c.client.Do(request) - if err != nil { - return 0, errors.Wrap(err, "failed to execute request") - } - - var parsed LatestLedgerSequenceResponse - if err = decodeResponse(response, &parsed); err != nil { - return 0, err - } - - return parsed.Sequence, nil -} - -// Close cancels any pending PrepareRange requests. -func (c RemoteCaptiveStellarCore) Close() error { - return nil -} - -// PrepareRange prepares the given range (including from and to) to be loaded. -// Captive stellar-core backend needs to initalize Stellar-Core state to be -// able to stream ledgers. -// Stellar-Core mode depends on the provided ledgerRange: -// - For BoundedRange it will start Stellar-Core in catchup mode. -// - For UnboundedRange it will first catchup to starting ledger and then run -// it normally (including connecting to the Stellar network). -// -// Please note that using a BoundedRange, currently, requires a full-trust on -// history archive. This issue is being fixed in Stellar-Core. -func (c RemoteCaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error { - // TODO: removing createContext call here means we could technically have - // multiple prepareRange requests happening at the same time. Do we still - // need to enforce that? - - timer := time.NewTimer(c.prepareRangePollInterval) - defer timer.Stop() - - for { - ready, err := c.IsPrepared(ctx, ledgerRange) - if err != nil { - return err - } - if ready { - return nil - } - - select { - case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "shutting down") - case <-timer.C: - timer.Reset(c.prepareRangePollInterval) - } - } -} - -// IsPrepared returns true if a given ledgerRange is prepared. -func (c RemoteCaptiveStellarCore) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { - // TODO: Have some way to cancel all outstanding requests, not just - // PrepareRange. - u := *c.url - u.Path = path.Join(u.Path, "prepare-range") - rangeBytes, err := json.Marshal(ledgerRange) - if err != nil { - return false, errors.Wrap(err, "cannot serialize range") - } - body := bytes.NewReader(rangeBytes) - request, err := http.NewRequestWithContext(ctx, "POST", u.String(), body) - if err != nil { - return false, errors.Wrap(err, "cannot construct http request") - } - request.Header.Add("Content-Type", "application/json; charset=utf-8") - - var response *http.Response - response, err = c.client.Do(request) - if err != nil { - return false, errors.Wrap(err, "failed to execute request") - } - - var parsed PrepareRangeResponse - if err = decodeResponse(response, &parsed); err != nil { - return false, err - } - - return parsed.Ready, nil -} - -// GetLedger long-polls a remote stellar core backend, until the requested -// ledger is ready. - -// Call PrepareRange first to instruct the backend which ledgers to fetch. -// -// Requesting a ledger on non-prepared backend will return an error. -// -// Because data is streamed from Stellar-Core ledger after ledger user should -// request sequences in a non-decreasing order. If the requested sequence number -// is less than the last requested sequence number, an error will be returned. -func (c RemoteCaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { - for { - // TODO: Have some way to cancel all outstanding requests, not just - // PrepareRange. - u := *c.url - u.Path = path.Join(u.Path, "ledger", strconv.FormatUint(uint64(sequence), 10)) - request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) - if err != nil { - return xdr.LedgerCloseMeta{}, errors.Wrap(err, "cannot construct http request") - } - - response, err := c.client.Do(request) - if err != nil { - return xdr.LedgerCloseMeta{}, errors.Wrap(err, "failed to execute request") - } - - if response.StatusCode == http.StatusRequestTimeout { - response.Body.Close() - // This request timed out. Retry. - continue - } - - var parsed LedgerResponse - if err = decodeResponse(response, &parsed); err != nil { - return xdr.LedgerCloseMeta{}, err - } - - return xdr.LedgerCloseMeta(parsed.Ledger), nil - } -} diff --git a/ingest/ledgerbackend/remote_captive_core_test.go b/ingest/ledgerbackend/remote_captive_core_test.go deleted file mode 100644 index 393b981589..0000000000 --- a/ingest/ledgerbackend/remote_captive_core_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package ledgerbackend - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/stellar/go/xdr" -) - -func TestGetLedgerSucceeds(t *testing.T) { - expectedLedger := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: 64, - }, - }, - }, - } - called := 0 - var encodeFailed int64 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called++ - if nil != json.NewEncoder(w).Encode(LedgerResponse{ - Ledger: Base64Ledger(expectedLedger), - }) { - atomic.AddInt64(&encodeFailed, 1) - } - })) - defer server.Close() - - client, err := NewRemoteCaptive(server.URL) - require.NoError(t, err) - - ledger, err := client.GetLedger(context.Background(), 64) - require.NoError(t, err) - require.Equal(t, 1, called) - require.Equal(t, expectedLedger, ledger) - require.Equal(t, int64(0), atomic.LoadInt64(&encodeFailed)) -} - -func TestGetLedgerTakesAWhile(t *testing.T) { - expectedLedger := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: 64, - }, - }, - }, - } - called := 0 - var encodeFailed int64 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called++ - if called == 1 { - // TODO: Check this is what the server really does. - w.WriteHeader(http.StatusRequestTimeout) - return - } - if nil != json.NewEncoder(w).Encode(LedgerResponse{ - Ledger: Base64Ledger(expectedLedger), - }) { - atomic.AddInt64(&encodeFailed, 1) - } - })) - defer server.Close() - - client, err := NewRemoteCaptive(server.URL) - require.NoError(t, err) - - ledger, err := client.GetLedger(context.Background(), 64) - require.NoError(t, err) - require.Equal(t, 2, called) - require.Equal(t, expectedLedger, ledger) - require.Equal(t, int64(0), atomic.LoadInt64(&encodeFailed)) -} diff --git a/services/horizon/internal/ingest/database_backend_test.go b/services/horizon/internal/ingest/database_backend_test.go deleted file mode 100644 index 69de728116..0000000000 --- a/services/horizon/internal/ingest/database_backend_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package ingest - -import ( - "testing" - - "github.com/stellar/go/ingest/ledgerbackend" - "github.com/stellar/go/network" - "github.com/stellar/go/services/horizon/internal/test" -) - -func TestGetLatestLedger(t *testing.T) { - tt := test.Start(t) - tt.ScenarioWithoutHorizon("base") - defer tt.Finish() - - backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.CoreSession(), network.TestNetworkPassphrase) - tt.Assert.NoError(err) - seq, err := backend.GetLatestLedgerSequence(tt.Ctx) - tt.Assert.NoError(err) - tt.Assert.Equal(uint32(3), seq) -} - -func TestGetLatestLedgerNotFound(t *testing.T) { - tt := test.Start(t) - tt.ScenarioWithoutHorizon("base") - defer tt.Finish() - - _, err := tt.CoreDB.Exec(`DELETE FROM ledgerheaders`) - tt.Assert.NoError(err, "failed to remove ledgerheaders") - - backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.CoreSession(), network.TestNetworkPassphrase) - tt.Assert.NoError(err) - _, err = backend.GetLatestLedgerSequence(tt.Ctx) - tt.Assert.EqualError(err, "no ledgers exist in ledgerheaders table") -}