diff --git a/exchanges/gateio/gateio_test.go b/exchanges/gateio/gateio_test.go index 469a4ee9d37..3896f3a9443 100644 --- a/exchanges/gateio/gateio_test.go +++ b/exchanges/gateio/gateio_test.go @@ -2407,7 +2407,7 @@ const wsTickerPushDataJSON = `{"time": 1606291803, "channel": "spot.tickers", "e func TestWsTickerPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsTickerPushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsTickerPushDataJSON)); err != nil { t.Errorf("%s websocket ticker push data error: %v", g.Name, err) } } @@ -2416,7 +2416,7 @@ const wsTradePushDataJSON = `{ "time": 1606292218, "channel": "spot.trades", "ev func TestWsTradePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsTradePushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsTradePushDataJSON)); err != nil { t.Errorf("%s websocket trade push data error: %v", g.Name, err) } } @@ -2425,7 +2425,7 @@ const wsCandlestickPushDataJSON = `{"time": 1606292600, "channel": "spot.candles func TestWsCandlestickPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsCandlestickPushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsCandlestickPushDataJSON)); err != nil { t.Errorf("%s websocket candlestick push data error: %v", g.Name, err) } } @@ -2434,7 +2434,7 @@ const wsOrderbookTickerJSON = `{"time": 1606293275, "channel": "spot.book_ticker func TestWsOrderbookTickerPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsOrderbookTickerJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsOrderbookTickerJSON)); err != nil { t.Errorf("%s websocket orderbook push data error: %v", g.Name, err) } } @@ -2446,11 +2446,11 @@ const ( func TestWsOrderbookSnapshotPushData(t *testing.T) { t.Parallel() - err := g.WsHandleSpotData(context.Background(), []byte(wsOrderbookSnapshotPushDataJSON)) + err := g.WsHandleSpotData(context.Background(), nil, []byte(wsOrderbookSnapshotPushDataJSON)) if err != nil { t.Errorf("%s websocket orderbook snapshot push data error: %v", g.Name, err) } - if err = g.WsHandleSpotData(context.Background(), []byte(wsOrderbookUpdatePushDataJSON)); err != nil { + if err = g.WsHandleSpotData(context.Background(), nil, []byte(wsOrderbookUpdatePushDataJSON)); err != nil { t.Errorf("%s websocket orderbook update push data error: %v", g.Name, err) } } @@ -2459,7 +2459,7 @@ const wsSpotOrderPushDataJSON = `{"time": 1605175506, "channel": "spot.orders", func TestWsPushOrders(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsSpotOrderPushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsSpotOrderPushDataJSON)); err != nil { t.Errorf("%s websocket orders push data error: %v", g.Name, err) } } @@ -2468,7 +2468,7 @@ const wsUserTradePushDataJSON = `{"time": 1605176741, "channel": "spot.usertrade func TestWsUserTradesPushDataJSON(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsUserTradePushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsUserTradePushDataJSON)); err != nil { t.Errorf("%s websocket users trade push data error: %v", g.Name, err) } } @@ -2477,7 +2477,7 @@ const wsBalancesPushDataJSON = `{"time": 1605248616, "channel": "spot.balances", func TestBalancesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsBalancesPushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsBalancesPushDataJSON)); err != nil { t.Errorf("%s websocket balances push data error: %v", g.Name, err) } } @@ -2486,7 +2486,7 @@ const wsMarginBalancePushDataJSON = `{"time": 1605248616, "channel": "spot.fundi func TestMarginBalancePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsMarginBalancePushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsMarginBalancePushDataJSON)); err != nil { t.Errorf("%s websocket margin balance push data error: %v", g.Name, err) } } @@ -2495,7 +2495,7 @@ const wsCrossMarginBalancePushDataJSON = `{"time": 1605248616,"channel": "spot.c func TestCrossMarginBalancePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsCrossMarginBalancePushDataJSON)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsCrossMarginBalancePushDataJSON)); err != nil { t.Errorf("%s websocket cross margin balance push data error: %v", g.Name, err) } } @@ -2504,7 +2504,7 @@ const wsCrossMarginBalanceLoan = `{ "time":1658289372, "channel":"spot.cross_loa func TestCrossMarginBalanceLoan(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(context.Background(), []byte(wsCrossMarginBalanceLoan)); err != nil { + if err := g.WsHandleSpotData(context.Background(), nil, []byte(wsCrossMarginBalanceLoan)); err != nil { t.Errorf("%s websocket cross margin loan push data error: %v", g.Name, err) } } @@ -2513,7 +2513,7 @@ const wsFuturesTickerPushDataJSON = `{"time": 1541659086, "channel": "futures.ti func TestFuturesTicker(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesTickerPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesTickerPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket push data error: %v", g.Name, err) } } @@ -2522,7 +2522,7 @@ const wsFuturesTradesPushDataJSON = `{"channel": "futures.trades","event": "upda func TestFuturesTrades(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesTradesPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesTradesPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket push data error: %v", g.Name, err) } } @@ -2533,7 +2533,7 @@ const ( func TestOrderbookData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesOrderbookTickerJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesOrderbookTickerJSON), asset.Futures); err != nil { t.Errorf("%s websocket orderbook ticker push data error: %v", g.Name, err) } } @@ -2542,7 +2542,7 @@ const wsFuturesOrderPushDataJSON = `{ "channel": "futures.orders", "event": "upd func TestFuturesOrderPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesOrderPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesOrderPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures order push data error: %v", g.Name, err) } } @@ -2551,7 +2551,7 @@ const wsFuturesUsertradesPushDataJSON = `{"time": 1543205083, "channel": "future func TestFuturesUserTrades(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesUsertradesPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesUsertradesPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures user trades push data error: %v", g.Name, err) } } @@ -2560,7 +2560,7 @@ const wsFuturesLiquidationPushDataJSON = `{"channel": "futures.liquidates", "eve func TestFuturesLiquidationPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesLiquidationPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesLiquidationPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures liquidation push data error: %v", g.Name, err) } } @@ -2569,7 +2569,7 @@ const wsFuturesAutoDelevergesNotification = `{"channel": "futures.auto_deleverag func TestFuturesAutoDeleverges(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesAutoDelevergesNotification), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesAutoDelevergesNotification), asset.Futures); err != nil { t.Errorf("%s websocket futures auto deleverge push data error: %v", g.Name, err) } } @@ -2578,7 +2578,7 @@ const wsFuturesPositionClosePushDataJSON = ` {"channel": "futures.position_close func TestPositionClosePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesPositionClosePushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesPositionClosePushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures position close push data error: %v", g.Name, err) } } @@ -2587,7 +2587,7 @@ const wsFuturesBalanceNotificationPushDataJSON = `{"channel": "futures.balances" func TestFuturesBalanceNotification(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesBalanceNotificationPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesBalanceNotificationPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures balance notification push data error: %v", g.Name, err) } } @@ -2596,7 +2596,7 @@ const wsFuturesReduceRiskLimitNotificationPushDataJSON = `{"time": 1551858330, " func TestFuturesReduceRiskLimitPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesReduceRiskLimitNotificationPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesReduceRiskLimitNotificationPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures reduce risk limit notification push data error: %v", g.Name, err) } } @@ -2605,7 +2605,7 @@ const wsFuturesPositionsNotificationPushDataJSON = `{"time": 1588212926,"channel func TestFuturesPositionsNotification(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesPositionsNotificationPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesPositionsNotificationPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures positions change notification push data error: %v", g.Name, err) } } @@ -2614,7 +2614,7 @@ const wsFuturesAutoOrdersPushDataJSON = `{"time": 1596798126,"channel": "futures func TestFuturesAutoOrderPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleFuturesData(context.Background(), []byte(wsFuturesAutoOrdersPushDataJSON), asset.Futures); err != nil { + if err := g.WsHandleFuturesData(context.Background(), nil, []byte(wsFuturesAutoOrdersPushDataJSON), asset.Futures); err != nil { t.Errorf("%s websocket futures auto orders push data error: %v", g.Name, err) } } @@ -2625,7 +2625,7 @@ const optionsContractTickerPushDataJSON = `{"time": 1630576352, "channel": "opti func TestOptionsContractTickerPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsContractTickerPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsContractTickerPushDataJSON)); err != nil { t.Errorf("%s websocket options contract ticker push data failed with error %v", g.Name, err) } } @@ -2634,7 +2634,7 @@ const optionsUnderlyingTickerPushDataJSON = `{"time": 1630576352, "channel": "op func TestOptionsUnderlyingTickerPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsUnderlyingTickerPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsUnderlyingTickerPushDataJSON)); err != nil { t.Errorf("%s websocket options underlying ticker push data error: %v", g.Name, err) } } @@ -2643,7 +2643,7 @@ const optionsContractTradesPushDataJSON = `{"time": 1630576356, "channel": "opti func TestOptionsContractTradesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsContractTradesPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsContractTradesPushDataJSON)); err != nil { t.Errorf("%s websocket contract trades push data error: %v", g.Name, err) } } @@ -2652,7 +2652,7 @@ const optionsUnderlyingTradesPushDataJSON = `{"time": 1630576356, "channel": "op func TestOptionsUnderlyingTradesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsUnderlyingTradesPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsUnderlyingTradesPushDataJSON)); err != nil { t.Errorf("%s websocket underlying trades push data error: %v", g.Name, err) } } @@ -2661,7 +2661,7 @@ const optionsUnderlyingPricePushDataJSON = `{ "time": 1630576356, "channel": "op func TestOptionsUnderlyingPricePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsUnderlyingPricePushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsUnderlyingPricePushDataJSON)); err != nil { t.Errorf("%s websocket underlying price push data error: %v", g.Name, err) } } @@ -2670,7 +2670,7 @@ const optionsMarkPricePushDataJSON = `{ "time": 1630576356, "channel": "options. func TestOptionsMarkPricePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsMarkPricePushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsMarkPricePushDataJSON)); err != nil { t.Errorf("%s websocket mark price push data error: %v", g.Name, err) } } @@ -2679,7 +2679,7 @@ const optionsSettlementsPushDataJSON = `{ "time": 1630576356, "channel": "option func TestSettlementsPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsSettlementsPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsSettlementsPushDataJSON)); err != nil { t.Errorf("%s websocket options settlements push data error: %v", g.Name, err) } } @@ -2688,7 +2688,7 @@ const optionsContractPushDataJSON = `{"time": 1630576356, "channel": "options.co func TestOptionsContractPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsContractPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsContractPushDataJSON)); err != nil { t.Errorf("%s websocket options contracts push data error: %v", g.Name, err) } } @@ -2700,10 +2700,10 @@ const ( func TestOptionsCandlesticksPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsContractCandlesticksPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsContractCandlesticksPushDataJSON)); err != nil { t.Errorf("%s websocket options contracts candlestick push data error: %v", g.Name, err) } - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsUnderlyingCandlesticksPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsUnderlyingCandlesticksPushDataJSON)); err != nil { t.Errorf("%s websocket options underlying candlestick push data error: %v", g.Name, err) } } @@ -2717,17 +2717,17 @@ const ( func TestOptionsOrderbookPushData(t *testing.T) { t.Parallel() - err := g.WsHandleOptionsData(context.Background(), []byte(optionsOrderbookTickerPushDataJSON)) + err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsOrderbookTickerPushDataJSON)) if err != nil { t.Errorf("%s websocket options orderbook ticker push data error: %v", g.Name, err) } - if err = g.WsHandleOptionsData(context.Background(), []byte(optionsOrderbookSnapshotPushDataJSON)); err != nil { + if err = g.WsHandleOptionsData(context.Background(), nil, []byte(optionsOrderbookSnapshotPushDataJSON)); err != nil { t.Errorf("%s websocket options orderbook snapshot push data error: %v", g.Name, err) } - if err = g.WsHandleOptionsData(context.Background(), []byte(optionsOrderbookUpdatePushDataJSON)); err != nil { + if err = g.WsHandleOptionsData(context.Background(), nil, []byte(optionsOrderbookUpdatePushDataJSON)); err != nil { t.Errorf("%s websocket options orderbook update push data error: %v", g.Name, err) } - if err = g.WsHandleOptionsData(context.Background(), []byte(optionsOrderbookSnapshotUpdateEventPushDataJSON)); err != nil { + if err = g.WsHandleOptionsData(context.Background(), nil, []byte(optionsOrderbookSnapshotUpdateEventPushDataJSON)); err != nil { t.Errorf("%s websocket options orderbook snapshot update event push data error: %v", g.Name, err) } } @@ -2736,7 +2736,7 @@ const optionsOrderPushDataJSON = `{"time": 1630654851,"channel": "options.orders func TestOptionsOrderPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsOrderPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsOrderPushDataJSON)); err != nil { t.Errorf("%s websocket options orders push data error: %v", g.Name, err) } } @@ -2745,7 +2745,7 @@ const optionsUsersTradesPushDataJSON = `{ "time": 1639144214, "channel": "option func TestOptionUserTradesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsUsersTradesPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsUsersTradesPushDataJSON)); err != nil { t.Errorf("%s websocket options orders push data error: %v", g.Name, err) } } @@ -2754,7 +2754,7 @@ const optionsLiquidatesPushDataJSON = `{ "channel": "options.liquidates", "event func TestOptionsLiquidatesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsLiquidatesPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsLiquidatesPushDataJSON)); err != nil { t.Errorf("%s websocket options liquidates push data error: %v", g.Name, err) } } @@ -2763,7 +2763,7 @@ const optionsSettlementPushDataJSON = `{ "channel": "options.user_settlements", func TestOptionsSettlementPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsSettlementPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsSettlementPushDataJSON)); err != nil { t.Errorf("%s websocket options settlement push data error: %v", g.Name, err) } } @@ -2772,7 +2772,7 @@ const optionsPositionClosePushDataJSON = `{"channel": "options.position_closes", func TestOptionsPositionClosePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsPositionClosePushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsPositionClosePushDataJSON)); err != nil { t.Errorf("%s websocket options position close push data error: %v", g.Name, err) } } @@ -2781,7 +2781,7 @@ const optionsBalancePushDataJSON = `{ "channel": "options.balances", "event": "u func TestOptionsBalancePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsBalancePushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsBalancePushDataJSON)); err != nil { t.Errorf("%s websocket options balance push data error: %v", g.Name, err) } } @@ -2790,7 +2790,7 @@ const optionsPositionPushDataJSON = `{"time": 1630654851, "channel": "options.po func TestOptionsPositionPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(context.Background(), []byte(optionsPositionPushDataJSON)); err != nil { + if err := g.WsHandleOptionsData(context.Background(), nil, []byte(optionsPositionPushDataJSON)); err != nil { t.Errorf("%s websocket options position push data error: %v", g.Name, err) } } @@ -2802,11 +2802,11 @@ const ( func TestFuturesOrderbookPushData(t *testing.T) { t.Parallel() - err := g.WsHandleFuturesData(context.Background(), []byte(futuresOrderbookPushData), asset.Futures) + err := g.WsHandleFuturesData(context.Background(), nil, []byte(futuresOrderbookPushData), asset.Futures) if err != nil { t.Error(err) } - err = g.WsHandleFuturesData(context.Background(), []byte(futuresOrderbookUpdatePushData), asset.Futures) + err = g.WsHandleFuturesData(context.Background(), nil, []byte(futuresOrderbookUpdatePushData), asset.Futures) if err != nil { t.Error(err) } @@ -2816,7 +2816,7 @@ const futuresCandlesticksPushData = `{"time": 1678469467, "time_ms": 16784694679 func TestFuturesCandlestickPushData(t *testing.T) { t.Parallel() - err := g.WsHandleFuturesData(context.Background(), []byte(futuresCandlesticksPushData), asset.Futures) + err := g.WsHandleFuturesData(context.Background(), nil, []byte(futuresCandlesticksPushData), asset.Futures) if err != nil { t.Error(err) } diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 5849e72d706..ee5430f5b95 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -166,18 +166,18 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st } // WsHandleSpotData handles spot data -func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error { +func (g *Gateio) WsHandleSpotData(_ context.Context, conn stream.Connection, respRaw []byte) error { push, err := parseWSHeader(respRaw) if err != nil { return err } if push.RequestID != "" { - return g.Websocket.Match.RequireMatchWithData(push.RequestID, respRaw) + return conn.RequireMatchWithData(push.RequestID, respRaw) } if push.Event == subscribeEvent || push.Event == unsubscribeEvent { - return g.Websocket.Match.RequireMatchWithData(push.ID, respRaw) + return conn.RequireMatchWithData(push.ID, respRaw) } switch push.Channel { // TODO: Convert function params below to only use push.Result diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 628824a08ca..7820a21ad78 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -148,17 +148,14 @@ func (g *Gateio) FuturesUnsubscribe(ctx context.Context, conn stream.Connection, } // WsHandleFuturesData handles futures websocket data -func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.Item) error { +func (g *Gateio) WsHandleFuturesData(_ context.Context, conn stream.Connection, respRaw []byte, a asset.Item) error { push, err := parseWSHeader(respRaw) if err != nil { return err } if push.Event == subscribeEvent || push.Event == unsubscribeEvent { - if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) { - return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID) - } - return nil + return conn.RequireMatchWithData(push.ID, respRaw) } switch push.Channel { diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 7ec39737e6e..9c66f77dec5 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -293,17 +293,14 @@ func (g *Gateio) OptionsUnsubscribe(ctx context.Context, conn stream.Connection, } // WsHandleOptionsData handles options websocket data -func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error { +func (g *Gateio) WsHandleOptionsData(_ context.Context, conn stream.Connection, respRaw []byte) error { push, err := parseWSHeader(respRaw) if err != nil { return err } if push.Event == subscribeEvent || push.Event == unsubscribeEvent { - if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) { - return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID) - } - return nil + return conn.RequireMatchWithData(push.ID, respRaw) } switch push.Channel { diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 239e3179245..52bd7475e5c 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -230,8 +230,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error { URL: futuresWebsocketUsdtURL, ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - Handler: func(ctx context.Context, incoming []byte) error { - return g.WsHandleFuturesData(ctx, incoming, asset.Futures) + Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error { + return g.WsHandleFuturesData(ctx, conn, incoming, asset.Futures) }, Subscriber: g.FuturesSubscribe, Unsubscriber: g.FuturesUnsubscribe, @@ -249,8 +249,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error { URL: futuresWebsocketBtcURL, ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - Handler: func(ctx context.Context, incoming []byte) error { - return g.WsHandleFuturesData(ctx, incoming, asset.Futures) + Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error { + return g.WsHandleFuturesData(ctx, conn, incoming, asset.Futures) }, Subscriber: g.FuturesSubscribe, Unsubscriber: g.FuturesUnsubscribe, @@ -269,8 +269,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error { URL: deliveryRealUSDTTradingURL, ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - Handler: func(ctx context.Context, incoming []byte) error { - return g.WsHandleFuturesData(ctx, incoming, asset.DeliveryFutures) + Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error { + return g.WsHandleFuturesData(ctx, conn, incoming, asset.DeliveryFutures) }, Subscriber: g.DeliveryFuturesSubscribe, Unsubscriber: g.DeliveryFuturesUnsubscribe, diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index 832e74c5526..740dc5ff2ab 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -37,6 +37,9 @@ type Connection interface { SetProxy(string) GetURL() string Shutdown() error + + // RequireMatchWithData routes incoming data using the connection specific match system to the correct handler + RequireMatchWithData(signature any, incoming []byte) error } // Inspector is used to verify messages via SendMessageReturnResponsesWithInspection @@ -79,7 +82,7 @@ type ConnectionSetup struct { // Handler defines the function that will be called when a message is // received from the exchange's websocket server. This function should // handle the incoming message and pass it to the appropriate data handler. - Handler func(ctx context.Context, incoming []byte) error + Handler func(ctx context.Context, conn Connection, incoming []byte) error // BespokeGenerateMessageID is a function that returns a unique message ID. // This is useful for when an exchange connection requires a unique or // structured message ID for each message sent. diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 737c0eadc7f..e54e498bf61 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -300,6 +300,13 @@ func (w *Websocket) getConnectionFromSetup(c *ConnectionSetup) *WebsocketConnect if c.URL != "" { connectionURL = c.URL } + match := w.Match + if w.useMultiConnectionManagement { + // If we are using multi connection management, we can decouple + // the match from the global match and have a match per connection. + match = NewMatch() + } + return &WebsocketConnection{ ExchangeName: w.exchangeName, URL: connectionURL, @@ -310,7 +317,7 @@ func (w *Websocket) getConnectionFromSetup(c *ConnectionSetup) *WebsocketConnect readMessageErrors: w.ReadMessageErrors, shutdown: w.ShutdownC, Wg: &w.Wg, - Match: w.Match, + Match: match, RateLimit: c.RateLimit, Reporter: c.ConnectionLevelReporter, bespokeGenerateMessageID: c.BespokeGenerateMessageID, @@ -1094,14 +1101,14 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List) } // Reader reads and handles data from a specific connection -func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error) { +func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, conn Connection, message []byte) error) { defer w.Wg.Done() for { resp := conn.ReadMessage() if resp.Raw == nil { return // Connection has been closed } - if err := handler(ctx, resp.Raw); err != nil { + if err := handler(ctx, conn, resp.Raw); err != nil { w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err) } } diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 55fd71682e6..9182181e735 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -379,3 +379,8 @@ func removeURLQueryString(url string) string { } return url } + +// RequireMatchWithData routes incoming data using the connection specific match system to the correct handler +func (w *WebsocketConnection) RequireMatchWithData(signature any, incoming []byte) error { + return w.Match.RequireMatchWithData(signature, incoming) +} diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index b6f3a762404..a00e90497c5 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -251,7 +251,7 @@ func TestConnectionMessageErrors(t *testing.T) { err = ws.Connect() require.ErrorIs(t, err, errWebsocketDataHandlerUnset) - ws.connectionManager[0].Setup.Handler = func(context.Context, []byte) error { + ws.connectionManager[0].Setup.Handler = func(context.Context, Connection, []byte) error { return errDastardlyReason } err = ws.Connect() @@ -269,7 +269,7 @@ func TestConnectionMessageErrors(t *testing.T) { err = ws.Connect() require.ErrorIs(t, err, errDastardlyReason) - ws.connectionManager[0].Setup.Handler = func(context.Context, []byte) error { + ws.connectionManager[0].Setup.Handler = func(context.Context, Connection, []byte) error { return errDastardlyReason } err = ws.Connect() @@ -461,7 +461,7 @@ func TestSubscribeUnsubscribe(t *testing.T) { Unsubscriber: func(ctx context.Context, c Connection, s subscription.List) error { return currySimpleUnsubConn(multi)(ctx, c, s) }, - Handler: func(context.Context, []byte) error { return nil }, + Handler: func(context.Context, Connection, []byte) error { return nil }, } require.NoError(t, multi.SetupNewConnection(amazingCandidate)) @@ -1168,7 +1168,7 @@ func TestFlushChannels(t *testing.T) { Unsubscriber: func(ctx context.Context, c Connection, s subscription.List) error { return currySimpleUnsubConn(w)(ctx, c, s) }, - Handler: func(context.Context, []byte) error { return nil }, + Handler: func(context.Context, Connection, []byte) error { return nil }, } require.NoError(t, w.SetupNewConnection(amazingCandidate)) require.NoError(t, w.FlushChannels(), "FlushChannels must not error") @@ -1266,7 +1266,7 @@ func TestSetupNewConnection(t *testing.T) { err = multi.SetupNewConnection(connSetup) require.ErrorIs(t, err, errWebsocketDataHandlerUnset) - connSetup.Handler = func(context.Context, []byte) error { return nil } + connSetup.Handler = func(context.Context, Connection, []byte) error { return nil } connSetup.MessageFilter = []string{"slices are super naughty and not comparable"} err = multi.SetupNewConnection(connSetup) require.ErrorIs(t, err, errMessageFilterNotComparable) @@ -1566,3 +1566,18 @@ func TestGetConnection(t *testing.T) { require.NoError(t, err) assert.Same(t, expected, conn) } + +func TestWebsocketConnectionRequireMatchWithData(t *testing.T) { + t.Parallel() + ws := WebsocketConnection{Match: NewMatch()} + err := ws.RequireMatchWithData(0, nil) + require.ErrorIs(t, err, ErrSignatureNotMatched) + + ch, err := ws.Match.Set(0, 1) + require.NoError(t, err) + + err = ws.RequireMatchWithData(0, []byte("test")) + require.NoError(t, err) + + require.Equal(t, []byte("test"), <-ch) +}