diff --git a/es/balance_extractor.go b/es/balance_extractor.go index 2b334d1..3cfb28a 100644 --- a/es/balance_extractor.go +++ b/es/balance_extractor.go @@ -98,8 +98,6 @@ func (e *BalanceExtractor) created(change xdr.LedgerEntryChange) { func (e *BalanceExtractor) updated(change xdr.LedgerEntryChange) { updated := change.MustUpdated().Data - e.index++ - pagingToken := PagingToken{EffectIndex: e.index}.Merge(e.basePagingToken) switch x := updated.Type; x { case xdr.LedgerEntryTypeAccount: @@ -109,6 +107,8 @@ func (e *BalanceExtractor) updated(change xdr.LedgerEntryChange) { if oldBalance != account.Balance { diff := account.Balance - oldBalance + e.index++ + pagingToken := PagingToken{EffectIndex: e.index}.Merge(e.basePagingToken) e.balances = append( e.balances, @@ -122,6 +122,8 @@ func (e *BalanceExtractor) updated(change xdr.LedgerEntryChange) { if oldBalance != line.Balance { diff := line.Balance - oldBalance + e.index++ + pagingToken := PagingToken{EffectIndex: e.index}.Merge(e.basePagingToken) e.balances = append( e.balances, diff --git a/es/ledger_serializer.go b/es/ledger_serializer.go index 5f38e0b..1d5ca44 100644 --- a/es/ledger_serializer.go +++ b/es/ledger_serializer.go @@ -40,7 +40,7 @@ func (s *ledgerSerializer) serialize() { if transaction.Successful { changes := s.feeRows[transaction.Index-1].Changes - s.serializeBalances(changes, transaction, nil, BalanceSourceFee, FeeEffectPagingTokenGroup) + s.serializeBalances(changes, transaction, nil, BalanceSourceFee) } s.serializeOperations(transactionRow, transaction) @@ -48,6 +48,7 @@ func (s *ledgerSerializer) serialize() { } func (s *ledgerSerializer) serializeOperations(transactionRow db.TxHistoryRow, transaction *Transaction) { + effectsCount := 0 xdrs := transactionRow.Operations() for index, xdr := range xdrs { @@ -58,10 +59,10 @@ func (s *ledgerSerializer) serializeOperations(transactionRow db.TxHistoryRow, t if transaction.Successful { metas := transactionRow.MetasFor(index) if metas != nil { - s.serializeBalances(metas.Changes, transaction, operation, BalanceSourceMeta, BalanceEffectPagingTokenGroup) + effectsCount = s.serializeBalances(metas.Changes, transaction, operation, BalanceSourceMeta) } - s.serializeTrades(result, transaction, operation) + s.serializeTrades(result, transaction, operation, effectsCount) h := ProduceSignerHistory(operation) if h != nil { @@ -71,39 +72,44 @@ func (s *ledgerSerializer) serializeOperations(transactionRow db.TxHistoryRow, t } } -func (s *ledgerSerializer) serializeBalances(changes xdr.LedgerEntryChanges, transaction *Transaction, operation *Operation, source BalanceSource, effectGroup int) { - if len(changes) > 0 { - pagingToken := PagingToken{ - LedgerSeq: s.ledger.Seq, - TransactionOrder: transaction.Index, - EffectGroup: effectGroup, - } +func (s *ledgerSerializer) serializeBalances(changes xdr.LedgerEntryChanges, transaction *Transaction, operation *Operation, source BalanceSource) int { + if len(changes) == 0 { + return 0 + } - if operation != nil { - pagingToken.OperationOrder = operation.Index - } + pagingToken := PagingToken{ + LedgerSeq: s.ledger.Seq, + TransactionOrder: transaction.Index, + } - balances := ProduceBalances(changes, s.ledger.CloseTime, source, pagingToken) + if operation != nil { + pagingToken.OperationOrder = operation.Index + } - if len(balances) > 0 { - for _, balance := range balances { - SerializeForBulk(balance, s.buffer) - } + balances := ProduceBalances(changes, s.ledger.CloseTime, source, pagingToken) + + if len(balances) > 0 { + for _, balance := range balances { + SerializeForBulk(balance, s.buffer) } } + + return len(balances) } -func (s *ledgerSerializer) serializeTrades(result *xdr.OperationResult, transaction *Transaction, operation *Operation) { +func (s *ledgerSerializer) serializeTrades(result *xdr.OperationResult, transaction *Transaction, operation *Operation, startIndex int) int { pagingToken := PagingToken{ LedgerSeq: s.ledger.Seq, TransactionOrder: transaction.Index, OperationOrder: operation.Index, } - trades := ProduceTrades(result, operation, s.ledger.CloseTime, pagingToken) + trades := ProduceTrades(result, operation, s.ledger.CloseTime, pagingToken, startIndex) if len(trades) > 0 { for _, trade := range trades { SerializeForBulk(&trade, s.buffer) } } + + return len(trades) } diff --git a/es/paging_token.go b/es/paging_token.go index b264985..49fde30 100644 --- a/es/paging_token.go +++ b/es/paging_token.go @@ -11,7 +11,6 @@ type PagingToken struct { LedgerSeq int TransactionOrder int OperationOrder int - EffectGroup int EffectIndex int } @@ -19,20 +18,7 @@ var ( ledgerFormat = "%012d" transactionFormat = "%04d" operationFormat = "%04d" - effectGroupFormat = "%04d" effectIndexFormat = "%04d" - - // BalanceEffectPagingTokenGroup represents balance entry taken from result meta - BalanceEffectPagingTokenGroup = 1 - - // FeeEffectPagingTokenGroup represents balance entry taken from fee meta - FeeEffectPagingTokenGroup = 2 - - // TradeEffectPagingTokenGroup represent trade effects - TradeEffectPagingTokenGroup = 3 - - // SignerHistoryEffectPagingTokenGroup represent signer effects - SignerHistoryEffectPagingTokenGroup = 4 ) // String returns string representation of order @@ -40,7 +26,6 @@ func (o PagingToken) String() (result string) { return fmt.Sprintf(ledgerFormat, o.LedgerSeq) + "-" + fmt.Sprintf(transactionFormat, o.TransactionOrder) + "-" + fmt.Sprintf(operationFormat, o.OperationOrder) + "-" + - fmt.Sprintf(effectGroupFormat, o.EffectGroup) + "-" + fmt.Sprintf(effectIndexFormat, o.EffectIndex) } @@ -69,12 +54,6 @@ func (o PagingToken) Merge(n PagingToken) (result PagingToken) { result.OperationOrder = n.OperationOrder } - if o.EffectGroup != 0 { - result.EffectGroup = o.EffectGroup - } else { - result.EffectGroup = n.EffectGroup - } - if o.EffectIndex != 0 { result.EffectIndex = o.EffectIndex } else { diff --git a/es/serialize.go b/es/serialize.go index 03386a6..99929e4 100644 --- a/es/serialize.go +++ b/es/serialize.go @@ -15,14 +15,14 @@ func SerializeForBulk(obj Indexable, b *bytes.Buffer) { if id != nil { meta = fmt.Sprintf( - `{ "index": { "_index": "%s", "_id": "%s", "_type": "_doc" } }%s`, + `{ "create": { "_index": "%s", "_id": "%s", "_type": "_doc" } }%s`, obj.IndexName(), *id, "\n", ) } else { meta = fmt.Sprintf( - `{ "index": { "_index": "%s", "_type": "_doc" } }%s`, obj.IndexName(), "\n", + `{ "create": { "_index": "%s", "_type": "_doc" } }%s`, obj.IndexName(), "\n", ) } diff --git a/es/signer_history.go b/es/signer_history.go index 5cea8a8..0093b2e 100644 --- a/es/signer_history.go +++ b/es/signer_history.go @@ -29,7 +29,6 @@ func ProduceSignerHistory(o *Operation) (h *SignerHistory) { LedgerSeq: o.Seq, TransactionOrder: o.TxIndex, OperationOrder: o.Index, - EffectGroup: SignerHistoryEffectPagingTokenGroup, } entry := &SignerHistory{ diff --git a/es/trade_extractor.go b/es/trade_extractor.go index 784444a..eb7e0b4 100644 --- a/es/trade_extractor.go +++ b/es/trade_extractor.go @@ -18,12 +18,13 @@ type TradeExtractor struct { } // ProduceTrades returns trades -func ProduceTrades(r *xdr.OperationResult, op *Operation, closeTime time.Time, pagingToken PagingToken) (trades []Trade) { +func ProduceTrades(r *xdr.OperationResult, op *Operation, closeTime time.Time, pagingToken PagingToken, startIndex int) (trades []Trade) { extractor := &TradeExtractor{ result: r, closeTime: closeTime, pagingToken: pagingToken, operation: op, + tokenIndex: startIndex, } if extractor == nil { @@ -130,7 +131,7 @@ func (e *TradeExtractor) fetchFromPathPaymentStrictSend(result xdr.PathPaymentSt func (e *TradeExtractor) fetchClaims(claims []xdr.ClaimOfferAtom, accountID string) (trades []Trade) { for _, claim := range claims { - pagingTokenA := PagingToken{EffectGroup: TradeEffectPagingTokenGroup, EffectIndex: e.tokenIndex + 1}.Merge(e.pagingToken) + pagingTokenA := PagingToken{EffectIndex: e.tokenIndex + 1}.Merge(e.pagingToken) tradeA := Trade{ PagingToken: pagingTokenA, @@ -138,7 +139,7 @@ func (e *TradeExtractor) fetchClaims(claims []xdr.ClaimOfferAtom, accountID stri LedgerCloseTime: e.closeTime, } - pagingTokenB := PagingToken{EffectGroup: TradeEffectPagingTokenGroup, EffectIndex: e.tokenIndex + 2}.Merge(e.pagingToken) + pagingTokenB := PagingToken{EffectIndex: e.tokenIndex + 2}.Merge(e.pagingToken) tradeB := Trade{ PagingToken: pagingTokenB,