Skip to content

Commit

Permalink
Trades: Abstract and simplify trade processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Dec 20, 2024
1 parent 50448ec commit c37c010
Show file tree
Hide file tree
Showing 67 changed files with 247 additions and 295 deletions.
2 changes: 1 addition & 1 deletion backtester/data/kline/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func LoadData(ctx context.Context, dataType int64, startDate, endDate time.Time,
return nil, fmt.Errorf("could not retrieve candle data for %v %v %v, %v", exch.GetName(), a, fPair, err)
}
case common.DataTrade:
var trades []trade.Data
var trades []trade.Trade
trades, err = exch.GetHistoricTrades(ctx,
fPair,
a,
Expand Down
4 changes: 2 additions & 2 deletions backtester/data/kline/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func LoadData(dataType int64, filepath, exchangeName string, interval time.Durat
}
resp.Item = &candles
case common.DataTrade:
var trades []trade.Data
var trades []trade.Trade
for {
row, errCSV := csvData.Read()
if errCSV != nil {
Expand All @@ -115,7 +115,7 @@ func LoadData(dataType int64, filepath, exchangeName string, interval time.Durat
return nil, errCSV
}

t := trade.Data{}
t := trade.Trade{}
v, errParse := strconv.ParseInt(row[0], 10, 32)
if errParse != nil {
return nil, errParse
Expand Down
2 changes: 1 addition & 1 deletion backtester/data/kline/live/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func LoadData(ctx context.Context, timeToRetrieve time.Time, exch exchange.IBotE
return nil, fmt.Errorf("could not retrieve live candle data for %v %v %v, %v", exch.GetName(), a, currencyPair, err)
}
case common.DataTrade:
var trades []trade.Data
var trades []trade.Trade
trades, err = exch.GetHistoricTrades(ctx,
pFmt,
a,
Expand Down
6 changes: 3 additions & 3 deletions cmd/exchange_wrapper_issues/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
wrapperConfig.OrderSubmission.OrderType = orderTypeOverride
}
if orderSideOverride != "BUY" {
wrapperConfig.OrderSubmission.OrderSide = orderSideOverride
wrapperConfig.OrderSubmission.OrderSide = orderSideOverrideDefaultSaveInterval
}
if orderPriceOverride > 0 {
wrapperConfig.OrderSubmission.Price = orderPriceOverride
Expand Down Expand Up @@ -432,7 +432,7 @@ func testWrappers(e exchange.IBotExchange, base *exchange.Base, config *Config)
Response: jsonifyInterface([]interface{}{nil}),
})

var getHistoricTradesResponse []trade.Data
var getHistoricTradesResponse []trade.Trade
getHistoricTradesResponse, err = e.GetHistoricTrades(context.TODO(), p, assetTypes[i], time.Now().Add(-time.Hour), time.Now())
msg = ""
if err != nil {
Expand All @@ -446,7 +446,7 @@ func testWrappers(e exchange.IBotExchange, base *exchange.Base, config *Config)
Response: jsonifyInterface([]interface{}{getHistoricTradesResponse}),
})

var getRecentTradesResponse []trade.Data
var getRecentTradesResponse []trade.Trade
getRecentTradesResponse, err = e.GetRecentTrades(context.TODO(), p, assetTypes[i])
msg = ""
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions engine/datahistory_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ func (d dataHistoryJobResultService) GetJobResultsBetween(_ string, _, _ time.Ti
return nil, nil
}

func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([]trade.Data, error) {
func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([]trade.Trade, error) {
cp, err := currency.NewPairFromStrings(base, quote)
if err != nil {
return nil, err
Expand All @@ -1508,7 +1508,7 @@ func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([
if err != nil {
return nil, err
}
return []trade.Data{
return []trade.Trade{
{
Exchange: exch,
CurrencyPair: cp,
Expand Down Expand Up @@ -1546,7 +1546,7 @@ func dataHistoryCandleLoader(exch string, cp currency.Pair, a asset.Item, interv
}, nil
}

func dataHistoryTradeSaver(...trade.Data) error {
func dataHistoryTradeSaver(...trade.Trade) error {
return nil
}

Expand Down Expand Up @@ -1595,8 +1595,8 @@ func (f dhmExchange) GetHistoricCandlesExtended(_ context.Context, p currency.Pa
}, nil
}

func (f dhmExchange) GetHistoricTrades(_ context.Context, p currency.Pair, a asset.Item, startTime, _ time.Time) ([]trade.Data, error) {
return []trade.Data{
func (f dhmExchange) GetHistoricTrades(_ context.Context, p currency.Pair, a asset.Item, startTime, _ time.Time) ([]trade.Trade, error) {
return []trade.Trade{
{
Exchange: testExchange,
CurrencyPair: p,
Expand Down
4 changes: 2 additions & 2 deletions engine/datahistory_manager_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ type DataHistoryManager struct {
maxResultInsertions int64
verbose bool
candleLoader func(string, currency.Pair, asset.Item, kline.Interval, time.Time, time.Time) (*kline.Item, error)
tradeLoader func(string, string, string, string, time.Time, time.Time) ([]trade.Data, error)
tradeSaver func(...trade.Data) error
tradeLoader func(string, string, string, string, time.Time, time.Time) ([]trade.Trade, error)
tradeSaver func(...trade.Trade) error
candleSaver func(*kline.Item, bool) (uint64, error)
}

Expand Down
6 changes: 3 additions & 3 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ func validateSettings(b *Engine, s *Settings, flagSet FlagSet) {
b.Settings.EventManagerDelay = EventSleepDelay
}

if b.Settings.TradeBufferProcessingInterval != trade.DefaultProcessorIntervalTime {
if b.Settings.TradeBufferProcessingInterval != trade.DefaultSaveInterval {
if b.Settings.TradeBufferProcessingInterval >= time.Second {
trade.BufferProcessorIntervalTime = b.Settings.TradeBufferProcessingInterval
} else {
b.Settings.TradeBufferProcessingInterval = trade.DefaultProcessorIntervalTime
b.Settings.TradeBufferProcessingInterval = trade.DefaultSaveInterval
gctlog.Warnf(gctlog.Global, "-tradeprocessinginterval must be >= to 1 second, using default value of %v",
trade.DefaultProcessorIntervalTime)
trade.DefaultSaveInterval)
}
}

Expand Down
8 changes: 4 additions & 4 deletions engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3219,7 +3219,7 @@ func (s *RPCServer) GetSavedTrades(_ context.Context, r *gctrpc.GetSavedTradesRe
if err != nil {
return nil, err
}
var trades []trade.Data
var trades []trade.Trade
trades, err = trade.GetTradesInRange(r.Exchange, r.AssetType, r.Pair.Base, r.Pair.Quote, start, end)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3468,7 +3468,7 @@ func (s *RPCServer) FindMissingSavedTradeIntervals(_ context.Context, r *gctrpc.
iterationTime = iterationTime.Add(time.Hour)
}

var trades []trade.Data
var trades []trade.Trade
trades, err = trade.GetTradesInRange(
r.ExchangeName,
r.AssetType,
Expand Down Expand Up @@ -3563,7 +3563,7 @@ func (s *RPCServer) GetHistoricTrades(r *gctrpc.GetSavedTradesRequest, stream gc
if err != nil {
return err
}
var trades []trade.Data
var trades []trade.Trade
start, err := time.Parse(common.SimpleTimeFormatWithTimezone, r.Start)
if err != nil {
return fmt.Errorf("%w cannot parse start time %v", errInvalidTimes, err)
Expand Down Expand Up @@ -3644,7 +3644,7 @@ func (s *RPCServer) GetRecentTrades(ctx context.Context, r *gctrpc.GetSavedTrade
return nil, err
}

var trades []trade.Data
var trades []trade.Trade
trades, err = exch.GetRecentTrades(ctx, cp, a)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions engine/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ func TestGetHistoricCandles(t *testing.T) {
if len(results.Candle) == 0 {
t.Error("expected results")
}
err = trade.SaveTradesToDatabase(trade.Data{
err = trade.SaveTradesToDatabase(trade.Trade{
TID: "test123",
Exchange: testExchange,
CurrencyPair: cp,
Expand Down Expand Up @@ -956,7 +956,7 @@ func TestFindMissingSavedTradeIntervals(t *testing.T) {
t.Errorf("expected a status message")
}
// one trade response
err = trade.SaveTradesToDatabase(trade.Data{
err = trade.SaveTradesToDatabase(trade.Trade{
TID: "test1234",
Exchange: testExchange,
CurrencyPair: cp,
Expand Down Expand Up @@ -989,7 +989,7 @@ func TestFindMissingSavedTradeIntervals(t *testing.T) {
}

// two trades response
err = trade.SaveTradesToDatabase(trade.Data{
err = trade.SaveTradesToDatabase(trade.Trade{
TID: "test123",
Exchange: testExchange,
CurrencyPair: cp,
Expand Down
2 changes: 1 addition & 1 deletion engine/websocketroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
m.printAccountHoldingsChangeSummary(d[x])
}
}
case []trade.Data:
case []trade.Trade:
if m.verbose {
log.Infof(log.Trade, "%+v", d)
}
Expand Down
4 changes: 2 additions & 2 deletions exchanges/alphapoint/alphapoint_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ func (a *Alphapoint) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _
}

// GetRecentTrades returns the most recent trades for a currency and asset
func (a *Alphapoint) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Data, error) {
func (a *Alphapoint) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Trade, error) {
return nil, common.ErrNotYetImplemented
}

// GetHistoricTrades returns historic trade data within the timeframe provided
func (a *Alphapoint) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) {
func (a *Alphapoint) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) {
return nil, common.ErrNotYetImplemented
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}
return b.Websocket.Trade.Update(saveTradeData,
trade.Data{
trade.Trade{
CurrencyPair: pair,
Timestamp: t.TimeStamp,
Price: t.Price.Float64(),
Expand Down
18 changes: 9 additions & 9 deletions exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,14 +766,14 @@ func (b *Binance) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _
}

// GetRecentTrades returns the most recent trades for a currency and asset
func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Data, error) {
func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Trade, error) {
const limit = 1000
rFmt, err := b.GetPairFormat(a, true)
if err != nil {
return nil, err
}
pFmt := p.Format(rFmt)
resp := make([]trade.Data, 0, limit)
resp := make([]trade.Trade, 0, limit)
switch a {
case asset.Spot:
tradeData, err := b.GetMostRecentTrades(ctx,
Expand All @@ -783,7 +783,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

for i := range tradeData {
resp = append(resp, trade.Data{
resp = append(resp, trade.Trade{
TID: strconv.FormatInt(tradeData[i].ID, 10),
Exchange: b.Name,
CurrencyPair: p,
Expand All @@ -800,7 +800,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

for i := range tradeData {
resp = append(resp, trade.Data{
resp = append(resp, trade.Trade{
TID: strconv.FormatInt(tradeData[i].ID, 10),
Exchange: b.Name,
CurrencyPair: p,
Expand All @@ -817,7 +817,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

for i := range tradeData {
resp = append(resp, trade.Data{
resp = append(resp, trade.Trade{
TID: strconv.FormatInt(tradeData[i].ID, 10),
Exchange: b.Name,
CurrencyPair: p,
Expand All @@ -830,7 +830,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

if b.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(b.Name, resp...)
err := trade.Add(b.Name, resp...)
if err != nil {
return nil, err
}
Expand All @@ -841,7 +841,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

// GetHistoricTrades returns historic trade data within the timeframe provided
func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, from, to time.Time) ([]trade.Data, error) {
func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, from, to time.Time) ([]trade.Trade, error) {
if err := b.CurrencyPairs.IsAssetEnabled(a); err != nil {
return nil, err
}
Expand All @@ -862,9 +862,9 @@ func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asse
if err != nil {
return nil, fmt.Errorf("%w %v", err, pFmt)
}
result := make([]trade.Data, len(trades))
result := make([]trade.Trade, len(trades))
for i := range trades {
result[i] = trade.Data{
result[i] = trade.Trade{
CurrencyPair: p,
TID: strconv.FormatInt(trades[i].ATradeID, 10),
Amount: trades[i].Quantity,
Expand Down
4 changes: 2 additions & 2 deletions exchanges/binanceus/binanceus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ type AggregatedTrade struct {
}

// toTradeData this method converts the AggregatedTrade data into an instance of trade.Data
func (a *AggregatedTrade) toTradeData(p currency.Pair, exchange string, aType asset.Item) *trade.Data {
return &trade.Data{
func (a *AggregatedTrade) toTradeData(p currency.Pair, exchange string, aType asset.Item) *trade.Trade {
return &trade.Trade{
CurrencyPair: p,
TID: strconv.FormatInt(a.ATradeID, 10),
Amount: a.Quantity,
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (bi *Binanceus) wsHandleData(respRaw []byte) error {
}

return bi.Websocket.Trade.Update(saveTradeData,
trade.Data{
trade.Trade{
CurrencyPair: pair,
Timestamp: t.TimeStamp,
Price: price,
Expand Down
12 changes: 6 additions & 6 deletions exchanges/binanceus/binanceus_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (bi *Binanceus) GetWithdrawalsHistory(ctx context.Context, c currency.Code,
}

// GetRecentTrades returns the most recent trades for a currency and asset
func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) {
func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) {
if p.IsEmpty() {
return nil, currency.ErrCurrencyPairEmpty
}
Expand All @@ -476,9 +476,9 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset
if err != nil {
return nil, err
}
resp := make([]trade.Data, len(tradeData))
resp := make([]trade.Trade, len(tradeData))
for i := range tradeData {
resp[i] = trade.Data{
resp[i] = trade.Trade{
TID: strconv.FormatInt(tradeData[i].ID, 10),
Exchange: bi.Name,
AssetType: assetType,
Expand All @@ -490,7 +490,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset
}

if bi.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(bi.Name, resp...)
err := trade.Add(bi.Name, resp...)
if err != nil {
return nil, err
}
Expand All @@ -500,7 +500,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset
}

// GetHistoricTrades returns historic trade data within the timeframe provided
func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) {
func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) {
if p.IsEmpty() {
return nil, currency.ErrCurrencyPairEmpty
}
Expand All @@ -516,7 +516,7 @@ func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, ass
if err != nil {
return nil, err
}
result := make([]trade.Data, len(trades))
result := make([]trade.Trade, len(trades))
exName := bi.Name
for i := range trades {
t := trades[i].toTradeData(p, exName, assetType)
Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
wsTrade.Price = price
}
tradeHolder = append(tradeHolder, wsTrade)
tradeHolder = append(tTradeHolder, wsTrade)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
Expand All @@ -989,7 +989,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
price = tradeHoldTrade].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
Expand Down
Loading

0 comments on commit c37c010

Please sign in to comment.