diff --git a/backtester/data/kline/api/api.go b/backtester/data/kline/api/api.go index 7fd822727db..c5cded9b2e2 100644 --- a/backtester/data/kline/api/api.go +++ b/backtester/data/kline/api/api.go @@ -30,7 +30,7 @@ func LoadData(ctx context.Context, dataType int64, startDate, endDate time.Time, return nil, fmt.Errorf("could not retrieve candle data for %v %v %v, %v", exch.GetName(), a, fPair, err) } case common.DataTrade: - var trades []trade.Data + var trades []trade.Trade trades, err = exch.GetHistoricTrades(ctx, fPair, a, diff --git a/backtester/data/kline/csv/csv.go b/backtester/data/kline/csv/csv.go index 95b84172b35..6750be73d83 100644 --- a/backtester/data/kline/csv/csv.go +++ b/backtester/data/kline/csv/csv.go @@ -105,7 +105,7 @@ func LoadData(dataType int64, filepath, exchangeName string, interval time.Durat } resp.Item = &candles case common.DataTrade: - var trades []trade.Data + var trades []trade.Trade for { row, errCSV := csvData.Read() if errCSV != nil { @@ -115,7 +115,7 @@ func LoadData(dataType int64, filepath, exchangeName string, interval time.Durat return nil, errCSV } - t := trade.Data{} + t := trade.Trade{} v, errParse := strconv.ParseInt(row[0], 10, 32) if errParse != nil { return nil, errParse diff --git a/backtester/data/kline/live/live.go b/backtester/data/kline/live/live.go index 674fda1b1c2..d86c7c0b542 100644 --- a/backtester/data/kline/live/live.go +++ b/backtester/data/kline/live/live.go @@ -48,7 +48,7 @@ func LoadData(ctx context.Context, timeToRetrieve time.Time, exch exchange.IBotE return nil, fmt.Errorf("could not retrieve live candle data for %v %v %v, %v", exch.GetName(), a, currencyPair, err) } case common.DataTrade: - var trades []trade.Data + var trades []trade.Trade trades, err = exch.GetHistoricTrades(ctx, pFmt, a, diff --git a/cmd/exchange_wrapper_issues/main.go b/cmd/exchange_wrapper_issues/main.go index 0e1aa3aa350..c9ac5dd52ce 100644 --- a/cmd/exchange_wrapper_issues/main.go +++ b/cmd/exchange_wrapper_issues/main.go @@ -93,7 +93,7 @@ func main() { wrapperConfig.OrderSubmission.OrderType = orderTypeOverride } if orderSideOverride != "BUY" { - wrapperConfig.OrderSubmission.OrderSide = orderSideOverride + wrapperConfig.OrderSubmission.OrderSide = orderSideOverrideDefaultSaveInterval } if orderPriceOverride > 0 { wrapperConfig.OrderSubmission.Price = orderPriceOverride @@ -432,7 +432,7 @@ func testWrappers(e exchange.IBotExchange, base *exchange.Base, config *Config) Response: jsonifyInterface([]interface{}{nil}), }) - var getHistoricTradesResponse []trade.Data + var getHistoricTradesResponse []trade.Trade getHistoricTradesResponse, err = e.GetHistoricTrades(context.TODO(), p, assetTypes[i], time.Now().Add(-time.Hour), time.Now()) msg = "" if err != nil { @@ -446,7 +446,7 @@ func testWrappers(e exchange.IBotExchange, base *exchange.Base, config *Config) Response: jsonifyInterface([]interface{}{getHistoricTradesResponse}), }) - var getRecentTradesResponse []trade.Data + var getRecentTradesResponse []trade.Trade getRecentTradesResponse, err = e.GetRecentTrades(context.TODO(), p, assetTypes[i]) msg = "" if err != nil { diff --git a/engine/datahistory_manager_test.go b/engine/datahistory_manager_test.go index 7a955033b79..263cec7fafa 100644 --- a/engine/datahistory_manager_test.go +++ b/engine/datahistory_manager_test.go @@ -1499,7 +1499,7 @@ func (d dataHistoryJobResultService) GetJobResultsBetween(_ string, _, _ time.Ti return nil, nil } -func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([]trade.Data, error) { +func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([]trade.Trade, error) { cp, err := currency.NewPairFromStrings(base, quote) if err != nil { return nil, err @@ -1508,7 +1508,7 @@ func dataHistoryTraderLoader(exch, a, base, quote string, start, _ time.Time) ([ if err != nil { return nil, err } - return []trade.Data{ + return []trade.Trade{ { Exchange: exch, CurrencyPair: cp, @@ -1546,7 +1546,7 @@ func dataHistoryCandleLoader(exch string, cp currency.Pair, a asset.Item, interv }, nil } -func dataHistoryTradeSaver(...trade.Data) error { +func dataHistoryTradeSaver(...trade.Trade) error { return nil } @@ -1595,8 +1595,8 @@ func (f dhmExchange) GetHistoricCandlesExtended(_ context.Context, p currency.Pa }, nil } -func (f dhmExchange) GetHistoricTrades(_ context.Context, p currency.Pair, a asset.Item, startTime, _ time.Time) ([]trade.Data, error) { - return []trade.Data{ +func (f dhmExchange) GetHistoricTrades(_ context.Context, p currency.Pair, a asset.Item, startTime, _ time.Time) ([]trade.Trade, error) { + return []trade.Trade{ { Exchange: testExchange, CurrencyPair: p, diff --git a/engine/datahistory_manager_types.go b/engine/datahistory_manager_types.go index 4b17ded0b5a..738b180fd39 100644 --- a/engine/datahistory_manager_types.go +++ b/engine/datahistory_manager_types.go @@ -133,8 +133,8 @@ type DataHistoryManager struct { maxResultInsertions int64 verbose bool candleLoader func(string, currency.Pair, asset.Item, kline.Interval, time.Time, time.Time) (*kline.Item, error) - tradeLoader func(string, string, string, string, time.Time, time.Time) ([]trade.Data, error) - tradeSaver func(...trade.Data) error + tradeLoader func(string, string, string, string, time.Time, time.Time) ([]trade.Trade, error) + tradeSaver func(...trade.Trade) error candleSaver func(*kline.Item, bool) (uint64, error) } diff --git a/engine/engine.go b/engine/engine.go index 8ad14d87ed6..25760f9f37d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -214,13 +214,13 @@ func validateSettings(b *Engine, s *Settings, flagSet FlagSet) { b.Settings.EventManagerDelay = EventSleepDelay } - if b.Settings.TradeBufferProcessingInterval != trade.DefaultProcessorIntervalTime { + if b.Settings.TradeBufferProcessingInterval != trade.DefaultSaveInterval { if b.Settings.TradeBufferProcessingInterval >= time.Second { trade.BufferProcessorIntervalTime = b.Settings.TradeBufferProcessingInterval } else { - b.Settings.TradeBufferProcessingInterval = trade.DefaultProcessorIntervalTime + b.Settings.TradeBufferProcessingInterval = trade.DefaultSaveInterval gctlog.Warnf(gctlog.Global, "-tradeprocessinginterval must be >= to 1 second, using default value of %v", - trade.DefaultProcessorIntervalTime) + trade.DefaultSaveInterval) } } diff --git a/engine/rpcserver.go b/engine/rpcserver.go index 72e7e76e8b6..03ce907b80d 100644 --- a/engine/rpcserver.go +++ b/engine/rpcserver.go @@ -3219,7 +3219,7 @@ func (s *RPCServer) GetSavedTrades(_ context.Context, r *gctrpc.GetSavedTradesRe if err != nil { return nil, err } - var trades []trade.Data + var trades []trade.Trade trades, err = trade.GetTradesInRange(r.Exchange, r.AssetType, r.Pair.Base, r.Pair.Quote, start, end) if err != nil { return nil, err @@ -3468,7 +3468,7 @@ func (s *RPCServer) FindMissingSavedTradeIntervals(_ context.Context, r *gctrpc. iterationTime = iterationTime.Add(time.Hour) } - var trades []trade.Data + var trades []trade.Trade trades, err = trade.GetTradesInRange( r.ExchangeName, r.AssetType, @@ -3563,7 +3563,7 @@ func (s *RPCServer) GetHistoricTrades(r *gctrpc.GetSavedTradesRequest, stream gc if err != nil { return err } - var trades []trade.Data + var trades []trade.Trade start, err := time.Parse(common.SimpleTimeFormatWithTimezone, r.Start) if err != nil { return fmt.Errorf("%w cannot parse start time %v", errInvalidTimes, err) @@ -3644,7 +3644,7 @@ func (s *RPCServer) GetRecentTrades(ctx context.Context, r *gctrpc.GetSavedTrade return nil, err } - var trades []trade.Data + var trades []trade.Trade trades, err = exch.GetRecentTrades(ctx, cp, a) if err != nil { return nil, err diff --git a/engine/rpcserver_test.go b/engine/rpcserver_test.go index c7d2575c955..56a1710dcd6 100644 --- a/engine/rpcserver_test.go +++ b/engine/rpcserver_test.go @@ -884,7 +884,7 @@ func TestGetHistoricCandles(t *testing.T) { if len(results.Candle) == 0 { t.Error("expected results") } - err = trade.SaveTradesToDatabase(trade.Data{ + err = trade.SaveTradesToDatabase(trade.Trade{ TID: "test123", Exchange: testExchange, CurrencyPair: cp, @@ -956,7 +956,7 @@ func TestFindMissingSavedTradeIntervals(t *testing.T) { t.Errorf("expected a status message") } // one trade response - err = trade.SaveTradesToDatabase(trade.Data{ + err = trade.SaveTradesToDatabase(trade.Trade{ TID: "test1234", Exchange: testExchange, CurrencyPair: cp, @@ -989,7 +989,7 @@ func TestFindMissingSavedTradeIntervals(t *testing.T) { } // two trades response - err = trade.SaveTradesToDatabase(trade.Data{ + err = trade.SaveTradesToDatabase(trade.Trade{ TID: "test123", Exchange: testExchange, CurrencyPair: cp, diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index f6412d8bd0f..9bbb1019797 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -347,7 +347,7 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int m.printAccountHoldingsChangeSummary(d[x]) } } - case []trade.Data: + case []trade.Trade: if m.verbose { log.Infof(log.Trade, "%+v", d) } diff --git a/exchanges/alphapoint/alphapoint_wrapper.go b/exchanges/alphapoint/alphapoint_wrapper.go index a11a6203c12..b8f5451193a 100644 --- a/exchanges/alphapoint/alphapoint_wrapper.go +++ b/exchanges/alphapoint/alphapoint_wrapper.go @@ -262,12 +262,12 @@ func (a *Alphapoint) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (a *Alphapoint) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Data, error) { +func (a *Alphapoint) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Trade, error) { return nil, common.ErrNotYetImplemented } // GetHistoricTrades returns historic trade data within the timeframe provided -func (a *Alphapoint) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (a *Alphapoint) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrNotYetImplemented } diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 91b68a11ded..020fbebcb50 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -338,7 +338,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error { err) } return b.Websocket.Trade.Update(saveTradeData, - trade.Data{ + trade.Trade{ CurrencyPair: pair, Timestamp: t.TimeStamp, Price: t.Price.Float64(), diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index 475f274814d..df895131925 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -766,14 +766,14 @@ func (b *Binance) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Data, error) { +func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Trade, error) { const limit = 1000 rFmt, err := b.GetPairFormat(a, true) if err != nil { return nil, err } pFmt := p.Format(rFmt) - resp := make([]trade.Data, 0, limit) + resp := make([]trade.Trade, 0, limit) switch a { case asset.Spot: tradeData, err := b.GetMostRecentTrades(ctx, @@ -783,7 +783,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset. } for i := range tradeData { - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: strconv.FormatInt(tradeData[i].ID, 10), Exchange: b.Name, CurrencyPair: p, @@ -800,7 +800,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset. } for i := range tradeData { - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: strconv.FormatInt(tradeData[i].ID, 10), Exchange: b.Name, CurrencyPair: p, @@ -817,7 +817,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset. } for i := range tradeData { - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: strconv.FormatInt(tradeData[i].ID, 10), Exchange: b.Name, CurrencyPair: p, @@ -830,7 +830,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset. } if b.IsSaveTradeDataEnabled() { - err := trade.AddTradesToBuffer(b.Name, resp...) + err := trade.Add(b.Name, resp...) if err != nil { return nil, err } @@ -841,7 +841,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset. } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, from, to time.Time) ([]trade.Data, error) { +func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, from, to time.Time) ([]trade.Trade, error) { if err := b.CurrencyPairs.IsAssetEnabled(a); err != nil { return nil, err } @@ -862,9 +862,9 @@ func (b *Binance) GetHistoricTrades(ctx context.Context, p currency.Pair, a asse if err != nil { return nil, fmt.Errorf("%w %v", err, pFmt) } - result := make([]trade.Data, len(trades)) + result := make([]trade.Trade, len(trades)) for i := range trades { - result[i] = trade.Data{ + result[i] = trade.Trade{ CurrencyPair: p, TID: strconv.FormatInt(trades[i].ATradeID, 10), Amount: trades[i].Quantity, diff --git a/exchanges/binanceus/binanceus_types.go b/exchanges/binanceus/binanceus_types.go index 07fb569a262..a0d97f64ba5 100644 --- a/exchanges/binanceus/binanceus_types.go +++ b/exchanges/binanceus/binanceus_types.go @@ -146,8 +146,8 @@ type AggregatedTrade struct { } // toTradeData this method converts the AggregatedTrade data into an instance of trade.Data -func (a *AggregatedTrade) toTradeData(p currency.Pair, exchange string, aType asset.Item) *trade.Data { - return &trade.Data{ +func (a *AggregatedTrade) toTradeData(p currency.Pair, exchange string, aType asset.Item) *trade.Trade { + return &trade.Trade{ CurrencyPair: p, TID: strconv.FormatInt(a.ATradeID, 10), Amount: a.Quantity, diff --git a/exchanges/binanceus/binanceus_websocket.go b/exchanges/binanceus/binanceus_websocket.go index 15101a0a5ad..9e839af3eb4 100644 --- a/exchanges/binanceus/binanceus_websocket.go +++ b/exchanges/binanceus/binanceus_websocket.go @@ -346,7 +346,7 @@ func (bi *Binanceus) wsHandleData(respRaw []byte) error { } return bi.Websocket.Trade.Update(saveTradeData, - trade.Data{ + trade.Trade{ CurrencyPair: pair, Timestamp: t.TimeStamp, Price: price, diff --git a/exchanges/binanceus/binanceus_wrapper.go b/exchanges/binanceus/binanceus_wrapper.go index d3992d4e73e..f93aedeeb53 100644 --- a/exchanges/binanceus/binanceus_wrapper.go +++ b/exchanges/binanceus/binanceus_wrapper.go @@ -464,7 +464,7 @@ func (bi *Binanceus) GetWithdrawalsHistory(ctx context.Context, c currency.Code, } // GetRecentTrades returns the most recent trades for a currency and asset -func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { if p.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } @@ -476,9 +476,9 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset if err != nil { return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { - resp[i] = trade.Data{ + resp[i] = trade.Trade{ TID: strconv.FormatInt(tradeData[i].ID, 10), Exchange: bi.Name, AssetType: assetType, @@ -490,7 +490,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset } if bi.IsSaveTradeDataEnabled() { - err := trade.AddTradesToBuffer(bi.Name, resp...) + err := trade.Add(bi.Name, resp...) if err != nil { return nil, err } @@ -500,7 +500,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset } // GetHistoricTrades returns historic trade data within the timeframe provided -func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if p.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } @@ -516,7 +516,7 @@ func (bi *Binanceus) GetHistoricTrades(ctx context.Context, p currency.Pair, ass if err != nil { return nil, err } - result := make([]trade.Data, len(trades)) + result := make([]trade.Trade, len(trades)) exName := bi.Name for i := range trades { t := trades[i].toTradeData(p, exName, assetType) diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index ee1b854b0b0..46c107843d8 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -977,7 +977,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType } wsTrade.Price = price } - tradeHolder = append(tradeHolder, wsTrade) + tradeHolder = append(tTradeHolder, wsTrade) } trades := make([]trade.Data, len(tradeHolder)) for i := range tradeHolder { @@ -989,7 +989,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType } price := tradeHolder[i].Price if price == 0 && tradeHolder[i].Rate > 0 { - price = tradeHolder[i].Rate + price = tradeHoldTrade].Rate } trades[i] = trade.Data{ TID: strconv.FormatInt(tradeHolder[i].ID, 10), diff --git a/exchanges/bitflyer/bitflyer_wrapper.go b/exchanges/bitflyer/bitflyer_wrapper.go index 767c107be02..51232c7e563 100644 --- a/exchanges/bitflyer/bitflyer_wrapper.go +++ b/exchanges/bitflyer/bitflyer_wrapper.go @@ -299,7 +299,7 @@ func (b *Bitflyer) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ a } // GetRecentTrades returns recent historic trades -func (b *Bitflyer) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *Bitflyer) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = b.FormatExchangeCurrency(p, assetType) if err != nil { @@ -309,7 +309,7 @@ func (b *Bitflyer) GetRecentTrades(ctx context.Context, p currency.Pair, assetTy if err != nil { return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { var timestamp time.Time timestamp, err = time.Parse("2006-01-02T15:04:05.999999999", tradeData[i].ExecDate) @@ -321,7 +321,7 @@ func (b *Bitflyer) GetRecentTrades(ctx context.Context, p currency.Pair, assetTy if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ TID: strconv.FormatInt(tradeData[i].ID, 10), Exchange: b.Name, CurrencyPair: p, @@ -343,7 +343,7 @@ func (b *Bitflyer) GetRecentTrades(ctx context.Context, p currency.Pair, assetTy } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *Bitflyer) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (b *Bitflyer) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/bithumb/bithumb_websocket.go b/exchanges/bithumb/bithumb_websocket.go index 35f8359df39..cc2aa30ed56 100644 --- a/exchanges/bithumb/bithumb_websocket.go +++ b/exchanges/bithumb/bithumb_websocket.go @@ -137,7 +137,7 @@ func (b *Bithumb) wsHandleData(respRaw []byte) error { return err } - toBuffer := make([]trade.Data, len(trades.List)) + toBuffer := make([]trade.Trade, len(trades.List)) var lu time.Time for x := range trades.List { lu, err = time.ParseInLocation(tradeTimeLayout, @@ -147,7 +147,7 @@ func (b *Bithumb) wsHandleData(respRaw []byte) error { return err } - toBuffer[x] = trade.Data{ + toBuffer[x] = trade.Trade{ Exchange: b.Name, AssetType: asset.Spot, CurrencyPair: trades.List[x].Symbol, diff --git a/exchanges/bithumb/bithumb_wrapper.go b/exchanges/bithumb/bithumb_wrapper.go index 64a463249f6..91b3a3ba079 100644 --- a/exchanges/bithumb/bithumb_wrapper.go +++ b/exchanges/bithumb/bithumb_wrapper.go @@ -399,7 +399,7 @@ func (b *Bithumb) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *Bithumb) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *Bithumb) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = b.FormatExchangeCurrency(p, assetType) if err != nil { @@ -409,7 +409,7 @@ func (b *Bithumb) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp if err != nil { return nil, err } - resp := make([]trade.Data, len(tradeData.Data)) + resp := make([]trade.Trade, len(tradeData.Data)) for i := range tradeData.Data { var side order.Side side, err = order.StringToOrderSide(tradeData.Data[i].Type) @@ -421,7 +421,7 @@ func (b *Bithumb) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: b.Name, CurrencyPair: p, AssetType: assetType, @@ -442,7 +442,7 @@ func (b *Bithumb) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *Bithumb) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (b *Bithumb) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index eea24d81326..b833a893d79 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -600,12 +600,12 @@ func (b *Bitmex) GetServerTime(_ context.Context, _ asset.Item) (time.Time, erro } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *Bitmex) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *Bitmex) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { return b.GetHistoricTrades(ctx, p, assetType, time.Now().Add(-time.Minute*15), time.Now()) } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *Bitmex) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (b *Bitmex) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if assetType == asset.Index { return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, assetType) } @@ -624,7 +624,7 @@ func (b *Bitmex) GetHistoricTrades(ctx context.Context, p currency.Pair, assetTy EndTime: timestampEnd.UTC().Format("2006-01-02T15:04:05.000Z"), } ts := timestampStart - var resp []trade.Data + var resp []trade.Trade allTrades: for { req.StartTime = ts.UTC().Format("2006-01-02T15:04:05.000Z") @@ -647,7 +647,7 @@ allTrades: // These have a size of 0 and are used only to indicate a changing price. continue } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: b.Name, CurrencyPair: p, AssetType: assetType, diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 8fe4f2fc1fa..8911c250187 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -157,10 +157,10 @@ func (b *Bitstamp) handleWSTrade(msg []byte) error { if wsTradeTemp.Data.Type == 1 { side = order.Sell } - return trade.AddTradesToBuffer(b.Name, trade.Data{ + return trade.AddTradesToBuffer(b.Name, trade.Trade{ Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0), CurrencyPair: p, - AssetType: asset.Spot, + AssetType: Add Exchange: b.Name, Price: wsTradeTemp.Data.Price, Amount: wsTradeTemp.Data.Amount, diff --git a/exchanges/bitstamp/bitstamp_wrapper.go b/exchanges/bitstamp/bitstamp_wrapper.go index 36d02f80195..a5adc575ab3 100644 --- a/exchanges/bitstamp/bitstamp_wrapper.go +++ b/exchanges/bitstamp/bitstamp_wrapper.go @@ -447,7 +447,7 @@ func (b *Bitstamp) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *Bitstamp) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *Bitstamp) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { p, err := b.FormatExchangeCurrency(p, assetType) if err != nil { return nil, err @@ -458,13 +458,13 @@ func (b *Bitstamp) GetRecentTrades(ctx context.Context, p currency.Pair, assetTy return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { s := order.Buy if tradeData[i].Type == 1 { s = order.Sell } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: b.Name, TID: strconv.FormatInt(tradeData[i].TradeID, 10), CurrencyPair: p, @@ -486,7 +486,7 @@ func (b *Bitstamp) GetRecentTrades(ctx context.Context, p currency.Pair, assetTy } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *Bitstamp) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (b *Bitstamp) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 742d7f4b711..8585db9c863 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -204,7 +204,7 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error { side = order.Sell } - return trade.AddTradesToBuffer(b.Name, trade.Data{ + return trade.Add(b.Name, trade.Trade{ Timestamp: t.Timestamp, CurrencyPair: p, AssetType: asset.Spot, diff --git a/exchanges/btcmarkets/btcmarkets_wrapper.go b/exchanges/btcmarkets/btcmarkets_wrapper.go index e8abb596a0a..ad67e9b5e25 100644 --- a/exchanges/btcmarkets/btcmarkets_wrapper.go +++ b/exchanges/btcmarkets/btcmarkets_wrapper.go @@ -415,7 +415,7 @@ func (b *BTCMarkets) GetWithdrawalsHistory(ctx context.Context, c currency.Code, } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *BTCMarkets) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *BTCMarkets) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = b.FormatExchangeCurrency(p, assetType) if err != nil { @@ -428,7 +428,7 @@ func (b *BTCMarkets) GetRecentTrades(ctx context.Context, p currency.Pair, asset return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { var side order.Side if tradeData[i].Side != "" { @@ -437,7 +437,7 @@ func (b *BTCMarkets) GetRecentTrades(ctx context.Context, p currency.Pair, asset return nil, err } } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: b.Name, TID: tradeData[i].TradeID, CurrencyPair: p, @@ -459,7 +459,7 @@ func (b *BTCMarkets) GetRecentTrades(ctx context.Context, p currency.Pair, asset } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *BTCMarkets) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (b *BTCMarkets) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 1d9e590632b..a5f95e1ed88 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -249,7 +249,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error { if err != nil { return err } - var trades []trade.Data + var trades []trade.Trade for x := range tradeHistory.Data { side := order.Buy if tradeHistory.Data[x].Gain == -1 { @@ -269,7 +269,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error { if err != nil { return err } - trades = append(trades, trade.Data{ + trades = append(trades, trade.Trade{ Timestamp: time.UnixMilli(tradeHistory.Data[x].TransactionTime), CurrencyPair: p, AssetType: a, @@ -280,7 +280,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error { TID: strconv.FormatInt(tradeHistory.Data[x].ID, 10), }) } - return trade.AddTradesToBuffer(b.Name, trades...) + return trade.Add(b.Name, trades...) case strings.Contains(topic, "orderBookL2Api"): // TODO: Fix orderbook updates. var t wsOrderBook err = json.Unmarshal(respRaw, &t) diff --git a/exchanges/btse/btse_wrapper.go b/exchanges/btse/btse_wrapper.go index 72d25f42925..3f4c2f49807 100644 --- a/exchanges/btse/btse_wrapper.go +++ b/exchanges/btse/btse_wrapper.go @@ -443,7 +443,7 @@ func (b *BTSE) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ asset } // GetRecentTrades returns the most recent trades for a currency and asset -func (b *BTSE) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (b *BTSE) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = b.FormatExchangeCurrency(p, assetType) if err != nil { @@ -462,7 +462,7 @@ func (b *BTSE) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { tradeTimestamp := time.UnixMilli(tradeData[i].Time) var side order.Side @@ -470,7 +470,7 @@ func (b *BTSE) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: b.Name, TID: strconv.FormatInt(tradeData[i].SerialID, 10), CurrencyPair: p, @@ -491,7 +491,7 @@ func (b *BTSE) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a } // GetHistoricTrades returns historic trade data within the timeframe provided -func (b *BTSE) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (b *BTSE) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/bybit/bybit_websocket.go b/exchanges/bybit/bybit_websocket.go index 9d1b84268c1..a1a4994fe1d 100644 --- a/exchanges/bybit/bybit_websocket.go +++ b/exchanges/bybit/bybit_websocket.go @@ -655,7 +655,7 @@ func (by *Bybit) wsProcessPublicTrade(assetType asset.Item, resp *WebsocketRespo if err != nil { return err } - tradeDatas := make([]trade.Data, len(result)) + tradeDatas := make([]trade.Trade, len(result)) for x := range result { cp, err := by.MatchSymbolWithAvailablePairs(result[x].Symbol, assetType, hasPotentialDelimiter(assetType)) if err != nil { @@ -665,7 +665,7 @@ func (by *Bybit) wsProcessPublicTrade(assetType asset.Item, resp *WebsocketRespo if err != nil { return err } - tradeDatas[x] = trade.Data{ + tradeDatas[x] = trade.Trade{ Timestamp: result[x].OrderFillTimestamp.Time(), CurrencyPair: cp, AssetType: assetType, @@ -676,7 +676,7 @@ func (by *Bybit) wsProcessPublicTrade(assetType asset.Item, resp *WebsocketRespo TID: result[x].TradeID, } } - return trade.AddTradesToBuffer(by.Name, tradeDatas...) + return trade.Add(by.Name, tradeDatas...) } func (by *Bybit) wsProcessOrderbook(assetType asset.Item, resp *WebsocketResponse) error { diff --git a/exchanges/bybit/bybit_wrapper.go b/exchanges/bybit/bybit_wrapper.go index acb3ae66949..3d787df50f9 100644 --- a/exchanges/bybit/bybit_wrapper.go +++ b/exchanges/bybit/bybit_wrapper.go @@ -712,7 +712,7 @@ func (by *Bybit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType } if by.IsSaveTradeDataEnabled() { - err := trade.AddTradesToBuffer(by.Name, resp...) + err := trade.Add(by.Name, resp...) if err != nil { return nil, err } diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index e18f52553f0..ea81665efac 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -241,7 +241,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { if !c.IsSaveTradeDataEnabled() { return nil } - return trade.AddTradesToBuffer(c.Name, trade.Data{ + return trade.Add(c.Name, trade.Trade{ Timestamp: wsOrder.Time, Exchange: c.Name, CurrencyPair: p, diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index d5d9173db78..36778ac6fed 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -394,7 +394,7 @@ func (c *CoinbasePro) GetWithdrawalsHistory(_ context.Context, _ currency.Code, } // GetRecentTrades returns the most recent trades for a currency and asset -func (c *CoinbasePro) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (c *CoinbasePro) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = c.FormatExchangeCurrency(p, assetType) if err != nil { @@ -405,14 +405,14 @@ func (c *CoinbasePro) GetRecentTrades(ctx context.Context, p currency.Pair, asse if err != nil { return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { var side order.Side side, err = order.StringToOrderSide(tradeData[i].Side) if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: c.Name, TID: strconv.FormatInt(tradeData[i].TradeID, 10), CurrencyPair: p, @@ -434,7 +434,7 @@ func (c *CoinbasePro) GetRecentTrades(ctx context.Context, p currency.Pair, asse } // GetHistoricTrades returns historic trade data within the timeframe provided -func (c *CoinbasePro) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (c *CoinbasePro) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index 7e1dece60e9..6d00260b055 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -277,7 +277,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error { if err != nil { return err } - var trades []trade.Data + var trades []trade.Trade for i := range tradeSnap.Trades { pairs, err := c.GetEnabledPairs(asset.Spot) if err != nil { @@ -299,7 +299,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error { } } - trades = append(trades, trade.Data{ + trades = append(trades, trade.Trade{ Timestamp: time.Unix(0, tradeSnap.Trades[i].Timestamp*1000), CurrencyPair: p, AssetType: asset.Spot, @@ -310,7 +310,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error { TID: strconv.FormatInt(tradeSnap.Trades[i].TransID, 10), }) } - return trade.AddTradesToBuffer(c.Name, trades...) + return trade.Add(c.Name, trades...) case "inst_trade_update": if !c.IsSaveTradeDataEnabled() { return nil @@ -341,7 +341,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error { } } - return trade.AddTradesToBuffer(c.Name, trade.Data{ + return trade.Add(c.Name, trade.Trade{ Timestamp: time.Unix(0, tradeUpdate.Timestamp*1000), CurrencyPair: p, AssetType: asset.Spot, diff --git a/exchanges/coinut/coinut_wrapper.go b/exchanges/coinut/coinut_wrapper.go index 20ae1cf7094..87c9b070541 100644 --- a/exchanges/coinut/coinut_wrapper.go +++ b/exchanges/coinut/coinut_wrapper.go @@ -444,7 +444,7 @@ func (c *COINUT) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ ass } // GetRecentTrades returns the most recent trades for a currency and asset -func (c *COINUT) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (c *COINUT) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = c.FormatExchangeCurrency(p, assetType) if err != nil { @@ -459,14 +459,14 @@ func (c *COINUT) GetRecentTrades(ctx context.Context, p currency.Pair, assetType if err != nil { return nil, err } - resp := make([]trade.Data, len(tradeData.Trades)) + resp := make([]trade.Trade, len(tradeData.Trades)) for i := range tradeData.Trades { var side order.Side side, err = order.StringToOrderSide(tradeData.Trades[i].Side) if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: c.Name, TID: strconv.FormatInt(tradeData.Trades[i].TransactionID, 10), CurrencyPair: p, @@ -488,7 +488,7 @@ func (c *COINUT) GetRecentTrades(ctx context.Context, p currency.Pair, assetType } // GetHistoricTrades returns historic trade data within the timeframe provided -func (c *COINUT) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (c *COINUT) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/deribit/deribit_test.go b/exchanges/deribit/deribit_test.go index 84b92c7b2d2..6d8b67bf26c 100644 --- a/exchanges/deribit/deribit_test.go +++ b/exchanges/deribit/deribit_test.go @@ -3332,7 +3332,7 @@ func TestGetWithdrawalsHistory(t *testing.T) { func TestGetRecentTrades(t *testing.T) { t.Parallel() - var result []trade.Data + var result []trade.Trade var err error for assetType, cp := range assetTypeToPairsMap { result, err = d.GetRecentTrades(context.Background(), cp, assetType) diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index d99dc86293d..5d577d29830 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -355,7 +355,7 @@ func (d *Deribit) processUserOrderChanges(respRaw []byte, channels []string) err if err != nil { return err } - td := make([]trade.Data, len(changeData.Trades)) + td := make([]trade.Trade, len(changeData.Trades)) for x := range changeData.Trades { var side order.Side side, err = order.StringToOrderSide(changeData.Trades[x].Direction) @@ -369,7 +369,7 @@ func (d *Deribit) processUserOrderChanges(respRaw []byte, channels []string) err return err } - td[x] = trade.Data{ + td[x] = trade.Trade{ CurrencyPair: cp, Exchange: d.Name, Timestamp: changeData.Trades[x].Timestamp.Time(), @@ -380,7 +380,7 @@ func (d *Deribit) processUserOrderChanges(respRaw []byte, channels []string) err AssetType: a, } } - err = trade.AddTradesToBuffer(d.Name, td...) + err = trade.Add(d.Name, td...) if err != nil { return err } @@ -462,7 +462,7 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error { if len(tradeList) == 0 { return fmt.Errorf("%v, empty list of trades found", common.ErrNoResponse) } - tradeDatas := make([]trade.Data, len(tradeList)) + tradeDatas := make([]trade.Trade, len(tradeList)) for x := range tradeDatas { var cp currency.Pair var a asset.Item @@ -474,7 +474,7 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error { if err != nil { return err } - tradeDatas[x] = trade.Data{ + tradeDatas[x] = trade.Trade{ CurrencyPair: cp, Exchange: d.Name, Timestamp: tradeList[x].Timestamp.Time(), @@ -485,7 +485,7 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error { AssetType: a, } } - return trade.AddTradesToBuffer(d.Name, tradeDatas...) + return trade.Add(d.Name, tradeDatas...) } func (d *Deribit) processIncrementalTicker(respRaw []byte, channels []string) error { diff --git a/exchanges/deribit/deribit_wrapper.go b/exchanges/deribit/deribit_wrapper.go index 38023875caa..4e3a567b415 100644 --- a/exchanges/deribit/deribit_wrapper.go +++ b/exchanges/deribit/deribit_wrapper.go @@ -530,7 +530,7 @@ func (d *Deribit) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (d *Deribit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (d *Deribit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { if !d.SupportsAsset(assetType) { return nil, fmt.Errorf("%s: %w - %s", d.Name, asset.ErrNotSupported, assetType) } @@ -539,7 +539,7 @@ func (d *Deribit) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp return nil, err } instrumentID := d.formatPairString(assetType, p) - resp := []trade.Data{} + resp := []trade.Trade{} var trades *PublicTradesData if d.Websocket.IsConnected() { trades, err = d.WSRetrieveLastTradesByInstrument( @@ -557,7 +557,7 @@ func (d *Deribit) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp if trades.Trades[a].Direction == sideBUY { sideData = order.Buy } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: trades.Trades[a].TradeID, Exchange: d.Name, Price: trades.Trades[a].Price, @@ -572,7 +572,7 @@ func (d *Deribit) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp } // GetHistoricTrades returns historic trade data within the timeframe provided -func (d *Deribit) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (d *Deribit) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if common.StartEndTimeCheck(timestampStart, timestampEnd) != nil { return nil, fmt.Errorf("invalid time range supplied. Start: %v End %v", timestampStart, @@ -589,7 +589,7 @@ func (d *Deribit) GetHistoricTrades(ctx context.Context, p currency.Pair, assetT default: return nil, fmt.Errorf("%w asset type %v", asset.ErrNotSupported, assetType) } - var resp []trade.Data + var resp []trade.Trade var tradesData *PublicTradesData var hasMore = true for hasMore { @@ -615,7 +615,7 @@ func (d *Deribit) GetHistoricTrades(ctx context.Context, p currency.Pair, assetT if tradesData.Trades[t].Direction == sideBUY { sideData = order.Buy } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: tradesData.Trades[t].TradeID, Exchange: d.Name, Price: tradesData.Trades[t].Price, diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 4369b1dd110..2b79baa6bb1 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -1187,7 +1187,7 @@ func (b *Base) ValidateKline(pair currency.Pair, a asset.Item, interval kline.In // AddTradesToBuffer is a helper function that will only // add trades to the buffer if it is allowed -func (b *Base) AddTradesToBuffer(trades ...trade.Data) error { +func (b *Base) AddTradesToBuffer(trades ...trade.Trade) error { if !b.IsSaveTradeDataEnabled() { return nil } @@ -1195,7 +1195,7 @@ func (b *Base) AddTradesToBuffer(trades ...trade.Data) error { } // IsSaveTradeDataEnabled checks the state of -// SaveTradeData in a concurrent-friendly manner +// SaveTradeDaAddt-friendly manner func (b *Base) IsSaveTradeDataEnabled() bool { b.settingsMutex.RLock() isEnabled := b.Features.Enabled.SaveTradeData diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index a5db589d047..f32cf9e9358 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -2986,11 +2986,11 @@ func (f *FakeBase) UpdateAccountInfo(context.Context, asset.Item) (account.Holdi return account.Holdings{}, nil } -func (f *FakeBase) GetRecentTrades(context.Context, currency.Pair, asset.Item) ([]trade.Data, error) { +func (f *FakeBase) GetRecentTrades(context.Context, currency.Pair, asset.Item) ([]trade.Trade, error) { return nil, nil } -func (f *FakeBase) GetHistoricTrades(context.Context, currency.Pair, asset.Item, time.Time, time.Time) ([]trade.Data, error) { +func (f *FakeBase) GetHistoricTrades(context.Context, currency.Pair, asset.Item, time.Time, time.Time) ([]trade.Trade, error) { return nil, nil } diff --git a/exchanges/exmo/exmo_wrapper.go b/exchanges/exmo/exmo_wrapper.go index 611d387cd61..d9d48fa1c63 100644 --- a/exchanges/exmo/exmo_wrapper.go +++ b/exchanges/exmo/exmo_wrapper.go @@ -424,7 +424,7 @@ func (e *EXMO) GetWithdrawalsHistory(ctx context.Context, _ currency.Code, _ ass } // GetRecentTrades returns the most recent trades for a currency and asset -func (e *EXMO) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (e *EXMO) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = e.FormatExchangeCurrency(p, assetType) if err != nil { @@ -437,14 +437,14 @@ func (e *EXMO) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a } mapData := tradeData[p.String()] - resp := make([]trade.Data, len(mapData)) + resp := make([]trade.Trade, len(mapData)) for i := range mapData { var side order.Side side, err = order.StringToOrderSide(mapData[i].Type) if err != nil { return nil, err } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: e.Name, TID: strconv.FormatInt(mapData[i].TradeID, 10), CurrencyPair: p, @@ -466,7 +466,7 @@ func (e *EXMO) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a } // GetHistoricTrades returns historic trade data within the timeframe provided -func (e *EXMO) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (e *EXMO) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index ea53f12029a..4fe8daaa6b3 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -208,7 +208,7 @@ func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error { return g.processCrossMarginLoans(respRaw) case spotPongChannel: default: - g.Websocket.DataHandler <- stream.UnhandledMessageWarning{ + g.Websocket.DataHandler <- stream.UnhandledMessageWarningTrade Message: g.Name + stream.UnhandledMessage + string(respRaw), } return errors.New(stream.UnhandledMessage) diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index ba393277553..a3a589f5699 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -350,9 +350,9 @@ func (g *Gateio) processFuturesTrades(data []byte, assetType asset.Item) error { return err } - trades := make([]trade.Data, len(resp.Result)) + trades := make([]trade.Trade, len(resp.Result)) for x := range resp.Result { - trades[x] = trade.Data{ + trades[x] = trade.Trade{ Timestamp: resp.Result[x].CreateTimeMs.Time(), CurrencyPair: resp.Result[x].Contract, AssetType: assetType, diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 091a9ad1f0f..babed57bb9a 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -399,9 +399,9 @@ func (g *Gateio) processOptionsTradesPushData(data []byte) error { if err != nil { return err } - trades := make([]trade.Data, len(resp.Result)) + trades := make([]trade.Trade, len(resp.Result)) for x := range resp.Result { - trades[x] = trade.Data{ + trades[x] = trade.Trade{ Timestamp: resp.Result[x].CreateTimeMs.Time(), CurrencyPair: resp.Result[x].Contract, AssetType: asset.Options, diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 9d9929f6e56..a3ae7ba8f6c 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -926,12 +926,12 @@ func (g *Gateio) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ a withdrawalHistories[x] = exchange.WithdrawalHistory{ Status: records[x].Status, TransferID: records[x].ID, - Currency: records[x].Currency, + Currency: records[x].Currency,Trade Amount: records[x].Amount.Float64(), CryptoTxID: records[x].TransactionID, CryptoToAddress: records[x].WithdrawalAddress, Timestamp: records[x].Timestamp.Time(), - } + }Trade } return withdrawalHistories, nil } @@ -942,14 +942,14 @@ func (g *Gateio) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.I if err != nil { return nil, err } - var resp []trade.Data + var resp []trade.DataTrade switch a { case asset.Spot, asset.Margin, asset.CrossMargin: var tradeData []Trade if p.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } - tradeData, err = g.GetMarketTrades(ctx, p, 0, "", false, time.Time{}, time.Time{}, 0) + tradeData, err = TradetMarketTrades(ctx, p, 0, "", false, time.Time{}, time.Time{}, 0) if err != nil { return nil, err } @@ -971,9 +971,9 @@ func (g *Gateio) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.I Timestamp: tradeData[i].CreateTimeMs.Time(), } } - case asset.Futures: + case asset.Futures:Trade var settle currency.Code - settle, err = getSettlementFromCurrency(p) + settle, err = getTradelementFromCurrency(p) if err != nil { return nil, err } @@ -994,9 +994,9 @@ func (g *Gateio) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.I Timestamp: futuresTrades[i].CreateTime.Time(), } } - case asset.DeliveryFutures: + case asset.DeliveryFuTrades: var settle currency.Code - settle, err = getSettlementFromCurrency(p) + settle, err = getTradelementFromCurrency(p) if err != nil { return nil, err } @@ -1012,9 +1012,9 @@ func (g *Gateio) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.I Exchange: g.Name, CurrencyPair: p, AssetType: a, - Price: deliveryTrades[i].Price.Float64(), + Price: deliTradeTrades[i].Price.Float64(), Amount: deliveryTrades[i].Size, - Timestamp: deliveryTrades[i].CreateTime.Time(), + Timestamp: dTradeeryTrades[i].CreateTime.Time(), } } case asset.Options: @@ -1036,7 +1036,7 @@ func (g *Gateio) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.I } } default: - return nil, fmt.Errorf("%w asset type: %v", asset.ErrNotSupported, a) + return nil, fmt.Errorf("%w asset type: %v", asset.ErrNotSupported, a)Trade } err = g.AddTradesToBuffer(resp...) if err != nil { diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index cecf8bba1d2..03d090ac50f 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -335,7 +335,7 @@ func (g *Gemini) wsHandleData(respRaw []byte) error { return err } - tradeEvent := trade.Data{ + tradeEvent := trade.Trade{ Timestamp: time.UnixMilli(result.Timestamp), CurrencyPair: pair, AssetType: asset.Spot, @@ -346,7 +346,7 @@ func (g *Gemini) wsHandleData(respRaw []byte) error { TID: strconv.FormatInt(result.EventID, 10), } - return trade.AddTradesToBuffer(g.Name, tradeEvent) + return trade.Add(g.Name, tradeEvent) case "subscription_ack": var result WsSubscriptionAcknowledgementResponse err := json.Unmarshal(respRaw, &result) @@ -542,7 +542,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error { return nil } - trades := make([]trade.Data, len(result.Trades)) + trades := make([]trade.Trade, len(result.Trades)) for x := range result.Trades { tSide, err := order.StringToOrderSide(result.Trades[x].Side) if err != nil { @@ -551,7 +551,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error { Err: err, } } - trades[x] = trade.Data{ + trades[x] = trade.Trade{ Timestamp: time.UnixMilli(result.Trades[x].Timestamp), CurrencyPair: pair, AssetType: asset.Spot, @@ -563,7 +563,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error { } } - return trade.AddTradesToBuffer(g.Name, trades...) + return trade.Add(g.Name, trades...) } func channelName(s *subscription.Subscription) string { diff --git a/exchanges/gemini/gemini_wrapper.go b/exchanges/gemini/gemini_wrapper.go index 6c71964fb5e..56d4e700307 100644 --- a/exchanges/gemini/gemini_wrapper.go +++ b/exchanges/gemini/gemini_wrapper.go @@ -426,12 +426,12 @@ func (g *Gemini) GetWithdrawalsHistory(ctx context.Context, c currency.Code, a a } // GetRecentTrades returns the most recent trades for a currency and asset -func (g *Gemini) GetRecentTrades(ctx context.Context, pair currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (g *Gemini) GetRecentTrades(ctx context.Context, pair currency.Pair, assetType asset.Item) ([]trade.Trade, error) { return g.GetHistoricTrades(ctx, pair, assetType, time.Time{}, time.Time{}) } // GetHistoricTrades returns historic trade data within the timeframe provided -func (g *Gemini) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (g *Gemini) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if err := common.StartEndTimeCheck(timestampStart, timestampEnd); err != nil && !errors.Is(err, common.ErrDateUnset) { return nil, fmt.Errorf("invalid time range supplied. Start: %v End %v %w", timestampStart, timestampEnd, err) } @@ -440,7 +440,7 @@ func (g *Gemini) GetHistoricTrades(ctx context.Context, p currency.Pair, assetTy if err != nil { return nil, err } - var resp []trade.Data + var resp []trade.Trade ts := timestampStart limit := 500 allTrades: @@ -465,7 +465,7 @@ allTrades: if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: g.Name, TID: strconv.FormatInt(tradeData[i].TID, 10), CurrencyPair: p, diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 7b384d3be97..7c531bf0749 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -223,7 +223,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error { if err != nil { return err } - var trades []trade.Data + var trades []trade.Trade p, err := currency.NewPairFromString(tradeSnapshot.Params.Symbol) if err != nil { return &order.ClassificationError{ @@ -239,7 +239,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error { Err: err, } } - trades = append(trades, trade.Data{ + trades = append(trades, trade.Trade{ Timestamp: tradeSnapshot.Params.Data[i].Timestamp, Exchange: h.Name, CurrencyPair: p, @@ -250,7 +250,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error { TID: strconv.FormatInt(tradeSnapshot.Params.Data[i].ID, 10), }) } - return trade.AddTradesToBuffer(h.Name, trades...) + return trade.Add(h.Name, trades...) case "activeOrders": var o wsActiveOrdersResponse err := json.Unmarshal(respRaw, &o) diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index 8a57bc7ee78..b98fabc35b4 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -387,12 +387,12 @@ func (h *HitBTC) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ ass } // GetRecentTrades returns the most recent trades for a currency and asset -func (h *HitBTC) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (h *HitBTC) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { return h.GetHistoricTrades(ctx, p, assetType, time.Now().Add(-time.Minute*15), time.Now()) } // GetHistoricTrades returns historic trade data within the timeframe provided -func (h *HitBTC) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (h *HitBTC) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if err := common.StartEndTimeCheck(timestampStart, timestampEnd); err != nil { return nil, fmt.Errorf("invalid time range supplied. Start: %v End %v %w", timestampStart, timestampEnd, err) } @@ -402,7 +402,7 @@ func (h *HitBTC) GetHistoricTrades(ctx context.Context, p currency.Pair, assetTy return nil, err } ts := timestampStart - var resp []trade.Data + var resp []trade.Trade limit := 1000 allTrades: for { @@ -427,7 +427,7 @@ allTrades: if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: h.Name, TID: strconv.FormatInt(tradeData[i].ID, 10), CurrencyPair: p, diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 0ff45913cce..cad22dc4a34 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -409,13 +409,13 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { if err != nil { return err } - var trades []trade.Data + var trades []trade.Trade for i := range t.Tick.Data { side := order.Buy if t.Tick.Data[i].Direction != "buy" { side = order.Sell } - trades = append(trades, trade.Data{ + trades = append(trades, trade.Trade{ Exchange: h.Name, AssetType: a, CurrencyPair: p, @@ -426,7 +426,7 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { TID: strconv.FormatFloat(t.Tick.Data[i].TradeID, 'f', -1, 64), }) } - return trade.AddTradesToBuffer(h.Name, trades...) + return trade.Add(h.Name, trades...) case strings.Contains(init.Channel, "detail"), strings.Contains(init.Rep, "detail"): var wsTicker WsTick diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index 76c461cb196..63d6b8eb21c 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -912,8 +912,8 @@ func (h *HUOBI) GetWithdrawalsHistory(ctx context.Context, c currency.Code, a as } // GetRecentTrades returns the most recent trades for a currency and asset -func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Data, error) { - var resp []trade.Data +func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Trade, error) { + var resp []trade.Trade pFmt, err := h.GetPairFormat(a, true) if err != nil { return nil, err @@ -934,7 +934,7 @@ func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.It if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: h.Name, TID: strconv.FormatFloat(sTrades[i].Trades[j].TradeID, 'f', -1, 64), CurrencyPair: p, @@ -961,7 +961,7 @@ func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.It return nil, err } } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: h.Name, TID: strconv.FormatInt(fTrades.Data[i].Data[j].ID, 10), CurrencyPair: p, @@ -987,7 +987,7 @@ func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.It return nil, err } } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: h.Name, TID: strconv.FormatInt(cTrades.Data[i].ID, 10), CurrencyPair: p, @@ -1010,7 +1010,7 @@ func (h *HUOBI) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.It } // GetHistoricTrades returns historic trade data within the timeframe provided -func (h *HUOBI) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (h *HUOBI) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/interfaces.go b/exchanges/interfaces.go index 0f6071a864d..6317c682486 100644 --- a/exchanges/interfaces.go +++ b/exchanges/interfaces.go @@ -53,8 +53,8 @@ type IBotExchange interface { GetAvailablePairs(a asset.Item) (currency.Pairs, error) SetPairs(pairs currency.Pairs, a asset.Item, enabled bool) error GetAssetTypes(enabled bool) asset.Items - GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Data, error) - GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, startTime, endTime time.Time) ([]trade.Data, error) + GetRecentTrades(ctx context.Context, p currency.Pair, a asset.Item) ([]trade.Trade, error) + GetHistoricTrades(ctx context.Context, p currency.Pair, a asset.Item, startTime, endTime time.Time) ([]trade.Trade, error) GetFeeByType(ctx context.Context, f *FeeBuilder) (float64, error) GetLastPairsUpdateTime() int64 GetWithdrawPermissions() uint32 diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 5b276055181..fb08925db4e 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -534,7 +534,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { if !k.IsSaveTradeDataEnabled() { return nil } - trades := make([]trade.Data, len(data)) + trades := make([]trade.Trade, len(data)) for i := range data { t, ok := data[i].([]interface{}) if !ok { @@ -563,7 +563,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { tSide = order.Sell } - trades[i] = trade.Data{ + trades[i] = trade.Trade{ AssetType: asset.Spot, CurrencyPair: pair, Exchange: k.Name, @@ -573,7 +573,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { Side: tSide, } } - return trade.AddTradesToBuffer(k.Name, trades...) + return trade.Add(k.Name, trades...) } // wsProcessOrderBook handles both partial and full orderbook updates diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 610350c27f8..c468a35ac33 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -666,13 +666,13 @@ func (k *Kraken) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ a } // GetRecentTrades returns the most recent trades for a currency and asset -func (k *Kraken) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (k *Kraken) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = k.FormatExchangeCurrency(p, assetType) if err != nil { return nil, err } - var resp []trade.Data + var resp []trade.Trade switch assetType { case asset.Spot: var tradeData []RecentTrades @@ -685,7 +685,7 @@ func (k *Kraken) GetRecentTrades(ctx context.Context, p currency.Pair, assetType if tradeData[i].BuyOrSell == "s" { side = order.Sell } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: strconv.FormatInt(tradeData[i].TradeID, 10), Exchange: k.Name, CurrencyPair: p, @@ -707,7 +707,7 @@ func (k *Kraken) GetRecentTrades(ctx context.Context, p currency.Pair, assetType if strings.EqualFold(tradeData.Elements[i].ExecutionEvent.OuterExecutionHolder.Execution.MakerOrder.Direction, "sell") { side = order.Sell } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: tradeData.Elements[i].UID, Exchange: k.Name, CurrencyPair: p, @@ -732,7 +732,7 @@ func (k *Kraken) GetRecentTrades(ctx context.Context, p currency.Pair, assetType } // GetHistoricTrades returns historic trade data within the timeframe provided -func (k *Kraken) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (k *Kraken) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index 86c3ac38b33..4b40cbd398d 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -2062,7 +2062,7 @@ func TestGetServerTime(t *testing.T) { func TestGetRecentTrades(t *testing.T) { t.Parallel() - var result []trade.Data + var result []trade.Trade var err error for assetType, tp := range assertToTradablePairMap { result, err = ku.GetRecentTrades(context.Background(), tp, assetType) diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index e364b2b84fc..1fbd798f32a 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -766,7 +766,7 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument, topic string) er CurrencyPair: pair, Timestamp: response.Time.Time(), Price: response.Price, - Amount: response.Size, + Amount: response.Size,Trade Side: side, Exchange: ku.Name, TID: response.TradeID, diff --git a/exchanges/kucoin/kucoin_wrapper.go b/exchanges/kucoin/kucoin_wrapper.go index 12a2bc42216..bb08fc40abf 100644 --- a/exchanges/kucoin/kucoin_wrapper.go +++ b/exchanges/kucoin/kucoin_wrapper.go @@ -579,12 +579,12 @@ func (ku *Kucoin) GetWithdrawalsHistory(ctx context.Context, c currency.Code, as } // GetRecentTrades returns the most recent trades for a currency and asset -func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { p, err := ku.FormatExchangeCurrency(p, assetType) if err != nil { return nil, err } - var resp []trade.Data + var resp []trade.Trade switch assetType { case asset.Futures: tradeData, err := ku.GetFuturesTradeHistory(ctx, p.String()) @@ -597,7 +597,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: tradeData[i].TradeID, Exchange: ku.Name, CurrencyPair: p, @@ -619,7 +619,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: tradeData[i].Sequence, Exchange: ku.Name, CurrencyPair: p, @@ -634,7 +634,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, assetType) } if ku.IsSaveTradeDataEnabled() { - err := trade.AddTradesToBuffer(ku.Name, resp...) + err := trade.Add(ku.Name, resp...) if err != nil { return nil, err } @@ -644,7 +644,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp } // GetHistoricTrades returns historic trade data within the timeframe provided -func (ku *Kucoin) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (ku *Kucoin) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported } diff --git a/exchanges/lbank/lbank_wrapper.go b/exchanges/lbank/lbank_wrapper.go index 486f49bcae9..600e0583eab 100644 --- a/exchanges/lbank/lbank_wrapper.go +++ b/exchanges/lbank/lbank_wrapper.go @@ -371,12 +371,12 @@ func (l *Lbank) GetWithdrawalsHistory(ctx context.Context, c currency.Code, a as } // GetRecentTrades returns the most recent trades for a currency and asset -func (l *Lbank) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (l *Lbank) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { return l.GetHistoricTrades(ctx, p, assetType, time.Now().Add(-time.Minute*15), time.Now()) } // GetHistoricTrades returns historic trade data within the timeframe provided -func (l *Lbank) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (l *Lbank) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if err := common.StartEndTimeCheck(timestampStart, timestampEnd); err != nil { return nil, fmt.Errorf("invalid time range supplied. Start: %v End %v %w", timestampStart, timestampEnd, err) } @@ -385,7 +385,7 @@ func (l *Lbank) GetHistoricTrades(ctx context.Context, p currency.Pair, assetTyp if err != nil { return nil, err } - var resp []trade.Data + var resp []trade.Trade ts := timestampStart limit := 600 allTrades: @@ -407,7 +407,7 @@ allTrades: if strings.Contains(tradeData[i].Type, "sell") { side = order.Sell } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: l.Name, TID: tradeData[i].TID, CurrencyPair: p, diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index 8303452521a..5055e4e2952 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -973,14 +973,14 @@ func (ok *Okx) wsProcessTrades(data []byte) error { if err != nil { return err } - trades := make([]trade.Data, 0, len(response.Data)*len(assets)) + trades := make([]trade.Trade, 0, len(response.Data)*len(assets)) for i := range response.Data { pair, err := currency.NewPairFromString(response.Data[i].InstrumentID) if err != nil { return err } for j := range assets { - trades = append(trades, trade.Data{ + trades = append(trades, trade.Trade{ Amount: response.Data[i].Quantity.Float64(), AssetType: assets[j], CurrencyPair: pair, @@ -992,7 +992,7 @@ func (ok *Okx) wsProcessTrades(data []byte) error { }) } } - return trade.AddTradesToBuffer(ok.Name, trades...) + return trade.Add(ok.Name, trades...) } // wsProcessOrders handles websocket order push data responses. diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index 344fa4ba883..8ea01991e33 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -624,7 +624,7 @@ func (ok *Okx) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ ass } // GetRecentTrades returns the most recent trades for a currency and asset -func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { pairFormat, err := ok.GetPairFormat(assetType, true) if err != nil { return nil, err @@ -638,9 +638,9 @@ func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for x := range tradeData { - resp[x] = trade.Data{ + resp[x] = trade.Trade{ TID: tradeData[x].TradeID, Exchange: ok.Name, CurrencyPair: p, @@ -652,7 +652,7 @@ func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a } } if ok.IsSaveTradeDataEnabled() { - err = trade.AddTradesToBuffer(ok.Name, resp...) + err = trade.Add(ok.Name, resp...) if err != nil { return nil, err } @@ -662,7 +662,7 @@ func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a } // GetHistoricTrades retrieves historic trade data within the timeframe provided -func (ok *Okx) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (ok *Okx) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if timestampStart.Before(time.Now().Add(-kline.ThreeMonth.Duration())) { return nil, errOnlyThreeMonthsSupported } @@ -674,7 +674,7 @@ func (ok *Okx) GetHistoricTrades(ctx context.Context, p currency.Pair, assetType if p.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } - var resp []trade.Data + var resp []trade.Trade instrumentID := pairFormat.Format(p) tradeIDEnd := "" allTrades: @@ -694,7 +694,7 @@ allTrades: // reached end of trades to crawl break allTrades } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ TID: trades[i].TradeID, Exchange: ok.Name, CurrencyPair: p, @@ -708,7 +708,7 @@ allTrades: tradeIDEnd = trades[len(trades)-1].TradeID } if ok.IsSaveTradeDataEnabled() { - err = trade.AddTradesToBuffer(ok.Name, resp...) + err = trade.Add(ok.Name, resp...) if err != nil { return nil, err } diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index 2d2ce08ce4e..c5240248af8 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -1136,7 +1136,7 @@ func (p *Poloniex) processTrades(currencyID float64, subData []interface{}) erro return fmt.Errorf("%w time not float64", errTypeAssertionFailure) } - return p.AddTradesToBuffer(trade.Data{ + return p.AddTradesToBuffer(trade.Trade{ TID: tradeID, Exchange: p.Name, CurrencyPair: pair, diff --git a/exchanges/poloniex/poloniex_wrapper.go b/exchanges/poloniex/poloniex_wrapper.go index 183e96dd29c..5dd364032a3 100644 --- a/exchanges/poloniex/poloniex_wrapper.go +++ b/exchanges/poloniex/poloniex_wrapper.go @@ -466,12 +466,12 @@ func (p *Poloniex) GetWithdrawalsHistory(ctx context.Context, c currency.Code, _ } // GetRecentTrades returns the most recent trades for a currency and asset -func (p *Poloniex) GetRecentTrades(ctx context.Context, pair currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (p *Poloniex) GetRecentTrades(ctx context.Context, pair currency.Pair, assetType asset.Item) ([]trade.Trade, error) { return p.GetHistoricTrades(ctx, pair, assetType, time.Now().Add(-time.Minute*15), time.Now()) } // GetHistoricTrades returns historic trade data within the timeframe provided -func (p *Poloniex) GetHistoricTrades(ctx context.Context, pair currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Data, error) { +func (p *Poloniex) GetHistoricTrades(ctx context.Context, pair currency.Pair, assetType asset.Item, timestampStart, timestampEnd time.Time) ([]trade.Trade, error) { if err := common.StartEndTimeCheck(timestampStart, timestampEnd); err != nil { return nil, fmt.Errorf("invalid time range supplied. Start: %v End %v %w", timestampStart, timestampEnd, err) } @@ -481,7 +481,7 @@ func (p *Poloniex) GetHistoricTrades(ctx context.Context, pair currency.Pair, as return nil, err } - var resp []trade.Data + var resp []trade.Trade ts := timestampStart allTrades: for { @@ -507,7 +507,7 @@ allTrades: if err != nil { return nil, err } - resp = append(resp, trade.Data{ + resp = append(resp, trade.Trade{ Exchange: p.Name, TID: tradeData[i].TradeID, CurrencyPair: pair, diff --git a/exchanges/sharedtestvalues/customex.go b/exchanges/sharedtestvalues/customex.go index e7735dd793d..6d9bb5df95d 100644 --- a/exchanges/sharedtestvalues/customex.go +++ b/exchanges/sharedtestvalues/customex.go @@ -122,12 +122,12 @@ func (c *CustomEx) GetAssetTypes(_ bool) asset.Items { } // GetRecentTrades is a mock method for CustomEx -func (c *CustomEx) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Data, error) { +func (c *CustomEx) GetRecentTrades(_ context.Context, _ currency.Pair, _ asset.Item) ([]trade.Trade, error) { return nil, nil } // GetHistoricTrades is a mock method for CustomEx -func (c *CustomEx) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (c *CustomEx) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, nil } diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 737c0eadc7f..0b8537bf847 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -199,7 +199,6 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { return err } - w.Trade.Setup(w.exchangeName, s.TradeFeed, w.DataHandler) w.Fills.Setup(s.FillsFeed, w.DataHandler) if s.MaxWebsocketSubscriptionsPerConnection < 0 { diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 26b20f1ee74..8b8776e0c55 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -12,7 +12,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" - "github.com/thrasher-corp/gocryptotrader/exchanges/trade" ) // Websocket functionality list and state consts @@ -81,9 +80,6 @@ type Websocket struct { // Orderbook is a local buffer of orderbooks Orderbook buffer.Orderbook - // Trade is a notifier of occurring trades - Trade trade.Trade - // Fills is a notifier of occurring fills Fills fill.Fills diff --git a/exchanges/trade/trade.go b/exchanges/trade/trade.go index e6427a50d97..478d298ed0c 100644 --- a/exchanges/trade/trade.go +++ b/exchanges/trade/trade.go @@ -1,6 +1,7 @@ package trade import ( + "context" "errors" "fmt" "sort" @@ -19,45 +20,8 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) -// Setup creates the trade processor if trading is supported -func (p *Processor) setup(wg *sync.WaitGroup) { - p.mutex.Lock() - p.bufferProcessorInterval = BufferProcessorIntervalTime - p.mutex.Unlock() - go p.Run(wg) -} - -// Setup configures necessary fields to the `Trade` structure that govern trade data -// processing. -func (t *Trade) Setup(exchangeName string, tradeFeedEnabled bool, c chan interface{}) { - t.exchangeName = exchangeName - t.dataHandler = c - t.tradeFeedEnabled = tradeFeedEnabled -} - -// Update processes trade data, either by saving it or routing it through -// the data channel. -func (t *Trade) Update(save bool, data ...Data) error { - if len(data) == 0 { - // nothing to do - return nil - } - - if t.tradeFeedEnabled { - t.dataHandler <- data - } - - if save { - if err := AddTradesToBuffer(t.exchangeName, data...); err != nil { - return err - } - } - - return nil -} - -// AddTradesToBuffer will push trade data onto the buffer -func AddTradesToBuffer(exchangeName string, data ...Data) error { +// Add will push trade data onto the buffer +func Add(exchangeName string, trades ...Trade) error { cfg := database.DB.GetConfig() if database.DB == nil || cfg == nil || !cfg.Enabled { return nil @@ -65,15 +29,13 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error { if len(data) == 0 { return nil } - if atomic.AddInt32(&processor.started, 0) == 0 { - var wg sync.WaitGroup - wg.Add(1) - processor.setup(&wg) - wg.Wait() + if processor.started.CompareAndSwap(false, true) { + ctx := context.Background() + processor.Start(ctx) } - validDatas := make([]Data, 0, len(data)) var errs error for i := range data { + trade if data[i].Price == 0 || data[i].Amount == 0 || data[i].CurrencyPair.IsEmpty() || @@ -102,7 +64,9 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error { errs = common.AppendError(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", exchangeName, data[i])) } data[i].ID = uu - validDatas = append(validDatas, data[i]) + go func() { + processor.queue <- data[i] + }() } processor.mutex.Lock() processor.buffer = append(processor.buffer, validDatas...) @@ -110,6 +74,13 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error { return errs } +// Start creates a processor pipeline and orphans a go routine +func (p *Processor) Start(ctx context.Context) { + p.BufferProcessorInterval = BufferProcessorIntervalTime + p.queue = make(chan *Trade, 4096) + go p.Run(wg) +} + // Run will save trade data to the database in batches func (p *Processor) Run(wg *sync.WaitGroup) { wg.Done() @@ -142,7 +113,7 @@ func (p *Processor) Run(wg *sync.WaitGroup) { } // SaveTradesToDatabase converts trades and saves results to database -func SaveTradesToDatabase(trades ...Data) error { +func SaveTradesToDatabase(trades ...Trade) error { sqlTrades, err := tradeToSQLData(trades...) if err != nil { return err @@ -152,7 +123,7 @@ func SaveTradesToDatabase(trades ...Data) error { // GetTradesInRange calls db function to return trades in range // to minimise tradesql package usage -func GetTradesInRange(exchangeName, assetType, base, quote string, startDate, endDate time.Time) ([]Data, error) { +func GetTradesInRange(exchangeName, assetType, base, quote string, startDate, endDate time.Time) ([]Trade, error) { if exchangeName == "" || assetType == "" || base == "" || quote == "" || startDate.IsZero() || endDate.IsZero() { return nil, errors.New("invalid arguments received") } @@ -174,7 +145,7 @@ func HasTradesInRanges(exchangeName, assetType, base, quote string, rangeHolder return tradesql.VerifyTradeInIntervals(exchangeName, assetType, base, quote, rangeHolder) } -func tradeToSQLData(trades ...Data) ([]tradesql.Data, error) { +func tradeToSQLData(trades ...Trade) ([]tradesql.Data, error) { sort.Sort(ByDate(trades)) results := make([]tradesql.Data, len(trades)) for i := range trades { @@ -199,8 +170,8 @@ func tradeToSQLData(trades ...Data) ([]tradesql.Data, error) { } // SQLDataToTrade converts sql data to glorious trade data -func SQLDataToTrade(dbTrades ...tradesql.Data) ([]Data, error) { - result := make([]Data, len(dbTrades)) +func SQLDataToTrade(dbTrades ...tradesql.Data) ([]Trade, error) { + result := make([]Trade, len(dbTrades)) for i := range dbTrades { cp, err := currency.NewPairFromStrings(dbTrades[i].Base, dbTrades[i].Quote) if err != nil { @@ -214,7 +185,7 @@ func SQLDataToTrade(dbTrades ...tradesql.Data) ([]Data, error) { if err != nil { return nil, err } - result[i] = Data{ + result[i] = Trade{ ID: uuid.FromStringOrNil(dbTrades[i].ID), Timestamp: dbTrades[i].Timestamp.UTC(), Exchange: dbTrades[i].Exchange, @@ -229,7 +200,7 @@ func SQLDataToTrade(dbTrades ...tradesql.Data) ([]Data, error) { } // ConvertTradesToCandles turns trade data into kline.Items -func ConvertTradesToCandles(interval kline.Interval, trades ...Data) (*kline.Item, error) { +func ConvertTradesToCandles(interval kline.Interval, trades ...Trade) (*kline.Item, error) { if len(trades) == 0 { return nil, ErrNoTradesSupplied } @@ -247,8 +218,8 @@ func ConvertTradesToCandles(interval kline.Interval, trades ...Data) (*kline.Ite return &candles, nil } -func groupTradesToInterval(interval kline.Interval, times ...Data) map[int64][]Data { - groupedData := make(map[int64][]Data) +func groupTradesToInterval(interval kline.Interval, times ...Trade) map[int64][]Trade { + groupedData := make(map[int64][]Trade) for i := range times { nearestInterval := getNearestInterval(times[i].Timestamp, interval) groupedData[nearestInterval] = append( @@ -263,7 +234,7 @@ func getNearestInterval(t time.Time, interval kline.Interval) int64 { return t.Truncate(interval.Duration()).UTC().Unix() } -func classifyOHLCV(t time.Time, datas ...Data) (c kline.Candle) { +func classifyOHLCV(t time.Time, datas ...Trade) (c kline.Candle) { sort.Sort(ByDate(datas)) c.Open = datas[0].Price c.Close = datas[len(datas)-1].Price @@ -288,12 +259,12 @@ func classifyOHLCV(t time.Time, datas ...Data) (c kline.Candle) { // FilterTradesByTime removes any trades that are not between the start // and end times -func FilterTradesByTime(trades []Data, startTime, endTime time.Time) []Data { +func FilterTradesByTime(trades []Trade, startTime, endTime time.Time) []Trade { if startTime.IsZero() || endTime.IsZero() { // can't filter without boundaries return trades } - var filteredTrades []Data + var filteredTrades []Trade for i := range trades { if trades[i].Timestamp.After(startTime) && trades[i].Timestamp.Before(endTime) { filteredTrades = append(filteredTrades, trades[i]) diff --git a/exchanges/trade/trade_test.go b/exchanges/trade/trade_test.go index 9944f632647..58d426d61e1 100644 --- a/exchanges/trade/trade_test.go +++ b/exchanges/trade/trade_test.go @@ -38,7 +38,7 @@ func TestAddTradesToBuffer(t *testing.T) { t.Error(err) } cp, _ := currency.NewPairFromString("BTC-USD") - err = AddTradesToBuffer("test!", []Data{ + err = Add("test!", []Trade{ { Timestamp: time.Now(), Exchange: "test!", @@ -56,7 +56,7 @@ func TestAddTradesToBuffer(t *testing.T) { t.Error("expected the processor to have started") } - err = AddTradesToBuffer("test!", []Data{ + err = Add("test!", []Trade{ { Timestamp: time.Now(), Exchange: "test!", @@ -74,7 +74,7 @@ func TestAddTradesToBuffer(t *testing.T) { processor.buffer = nil processor.mutex.Unlock() - err = AddTradesToBuffer("test!", []Data{ + err = Add("test!", []Trade{ { Timestamp: time.Now(), Exchange: "test!", @@ -131,7 +131,7 @@ func TestSqlDataToTrade(t *testing.T) { func TestTradeToSQLData(t *testing.T) { t.Parallel() cp := currency.NewPair(currency.BTC, currency.USD) - sqlData, err := tradeToSQLData(Data{ + sqlData, err := tradeToSQLData(Trade{ Timestamp: time.Now(), Exchange: "test!", CurrencyPair: cp, @@ -158,7 +158,7 @@ func TestConvertTradesToCandles(t *testing.T) { t.Parallel() cp, _ := currency.NewPairFromString("BTC-USD") startDate := time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC) - candles, err := ConvertTradesToCandles(kline.FifteenSecond, []Data{ + candles, err := ConvertTradesToCandles(kline.FifteenSecond, []Trade{ { Timestamp: startDate, Exchange: "test!", @@ -219,7 +219,7 @@ func TestShutdown(t *testing.T) { func TestFilterTradesByTime(t *testing.T) { t.Parallel() - trades := []Data{ + trades := []Trade{ { Exchange: "test", Timestamp: time.Now().Add(-time.Second), @@ -237,7 +237,7 @@ func TestFilterTradesByTime(t *testing.T) { func TestSaveTradesToDatabase(t *testing.T) { t.Parallel() - err := SaveTradesToDatabase(Data{}) + err := SaveTradesToDatabase(Trade{}) if err != nil && err.Error() != "exchange name/uuid not set, cannot insert" { t.Error(err) } diff --git a/exchanges/trade/trade_types.go b/exchanges/trade/trade_types.go index 0a7ec8a0b98..caf4b32c217 100644 --- a/exchanges/trade/trade_types.go +++ b/exchanges/trade/trade_types.go @@ -2,7 +2,7 @@ package trade import ( "errors" - "sync" + "sync/atomic" "time" "github.com/gofrs/uuid" @@ -11,29 +11,16 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/order" ) -// DefaultProcessorIntervalTime is the default timer -// to process queued trades and save them to the database -const DefaultProcessorIntervalTime = time.Second * 15 +const DefaultSaveInterval = time.Second * 15 var ( processor Processor - // BufferProcessorIntervalTime is the interval to save trade buffer data to the database. - // Change this by changing the runtime param `-tradeprocessinginterval=15s` - BufferProcessorIntervalTime = DefaultProcessorIntervalTime // ErrNoTradesSupplied is returned when an attempt is made to process trades, but is an empty slice ErrNoTradesSupplied = errors.New("no trades supplied") ) -// Trade used to hold data and methods related to trade dissemination and -// storage +// Trade defines trade data type Trade struct { - exchangeName string - dataHandler chan interface{} - tradeFeedEnabled bool -} - -// Data defines trade data -type Data struct { ID uuid.UUID `json:"ID,omitempty"` TID string Exchange string @@ -45,17 +32,16 @@ type Data struct { Timestamp time.Time } -// Processor used for processing trade data in batches -// and saving them to the database +// Processor used for processing trade data in batches and saving them to the database type Processor struct { - mutex sync.Mutex - started int32 + started atomic.Bool bufferProcessorInterval time.Duration - buffer []Data + queue chan *Trade + buffer []*Trade } // ByDate sorts trades by date ascending -type ByDate []Data +type ByDate []Trade func (b ByDate) Len() int { return len(b) diff --git a/exchanges/yobit/yobit_wrapper.go b/exchanges/yobit/yobit_wrapper.go index 251c64b6f48..525b3b2dddd 100644 --- a/exchanges/yobit/yobit_wrapper.go +++ b/exchanges/yobit/yobit_wrapper.go @@ -321,7 +321,7 @@ func (y *Yobit) GetWithdrawalsHistory(_ context.Context, _ currency.Code, _ asse } // GetRecentTrades returns the most recent trades for a currency and asset -func (y *Yobit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Data, error) { +func (y *Yobit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType asset.Item) ([]trade.Trade, error) { var err error p, err = y.FormatExchangeCurrency(p, assetType) if err != nil { @@ -334,14 +334,14 @@ func (y *Yobit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType return nil, err } - resp := make([]trade.Data, len(tradeData)) + resp := make([]trade.Trade, len(tradeData)) for i := range tradeData { tradeTS := time.Unix(tradeData[i].Timestamp, 0) side := order.Buy if tradeData[i].Type == "ask" { side = order.Sell } - resp[i] = trade.Data{ + resp[i] = trade.Trade{ Exchange: y.Name, TID: strconv.FormatInt(tradeData[i].TID, 10), CurrencyPair: p, @@ -363,7 +363,7 @@ func (y *Yobit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType } // GetHistoricTrades returns historic trade data within the timeframe provided -func (y *Yobit) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Data, error) { +func (y *Yobit) GetHistoricTrades(_ context.Context, _ currency.Pair, _ asset.Item, _, _ time.Time) ([]trade.Trade, error) { return nil, common.ErrFunctionNotSupported }