Skip to content

Commit

Permalink
GateIO: Abstract WS handler func routing
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Jan 31, 2025
1 parent f9b84ab commit ccb8f26
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 128 deletions.
1 change: 0 additions & 1 deletion exchanges/gateio/gateio_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,7 +2001,6 @@ type WSResponse struct {
Event string `json:"event"`
Result json.RawMessage `json:"result"`
RequestID string `json:"request_id"`
assetType asset.Item
}

// WsTicker websocket ticker information.
Expand Down
27 changes: 18 additions & 9 deletions exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
)

var (
errParsingWSField = errors.New("error parsing WS field")
errChannelParts = errors.New("should contain asset and channel")
errParsingWSField = errors.New("error parsing WS field")
errChannelParts = errors.New("should contain asset and channel")
errMsgAssetTypeInvalid = errors.New("message AssetType prefix does not match connection AssetType")
)

const (
Expand Down Expand Up @@ -160,7 +161,7 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st
}

// wsHandleData handles websocket data
func (g *Gateio) wsHandleData(ctx context.Context, respRaw []byte) error {
func (g *Gateio) wsHandleData(ctx context.Context, a asset.Item, respRaw []byte) error {
push, err := parseWSHeader(respRaw)
if err != nil {
return err
Expand All @@ -169,18 +170,26 @@ func (g *Gateio) wsHandleData(ctx context.Context, respRaw []byte) error {
if channelParts := strings.Split(push.Channel, "."); len(channelParts) != 2 {
return fmt.Errorf("%w `channel` (`%s`)", errParsingWSField, push.Channel)
} else {
// Assign to push so that we can abstract CoinM/USDT as just Futures
if push.assetType, err = asset.New(channelParts[0]); err != nil {
if a.String() != channelParts[0] {
return fmt.Errorf("%w `channel`; %w: `%s`", errParsingWSField, asset.ErrInvalidAsset, channelParts[0])
}
push.Channel = channelParts[1]
}

switch push.assetType {
case asset.Spot:
return g.wsHandleSpotData(ctx, push, respRaw)
assetHandlers, ok := wsHandlerFuncs[a]

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / lint

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: wsHandlerFuncs

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: wsHandlerFuncs

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: wsHandlerFuncs

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: wsHandlerFuncs

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

declared and not used: assetHandlers

Check failure on line 179 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: wsHandlerFuncs
if !ok {
return fmt.Errorf("%w `channel`; %w: `%s`", errParsingWSField, asset.ErrNotSupported, a)
}
return fmt.Errorf("%w `channel`; %w: `%s`", errParsingWSField, asset.ErrNotSupported, push.assetType)

handler, ok := wsHandlerFuncs[a][push.Channel]

Check failure on line 184 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: wsHandlerFuncs

Check failure on line 184 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: wsHandlerFuncs

Check failure on line 184 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: wsHandlerFuncs

Check failure on line 184 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: wsHandlerFuncs

Check failure on line 184 in exchanges/gateio/gateio_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: wsHandlerFuncs
if !ok {
g.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: g.Name + ": " + stream.UnhandledMessage + ": " + string(respRaw),
}
return errors.New(stream.UnhandledMessage)
}

return handler(ctx, a, push, respRaw)
}

// wsHandleSpotData handles spot data
Expand Down
16 changes: 2 additions & 14 deletions exchanges/gateio/gateio_websocket_delivery_futures.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,5 @@
package gateio

import (
"context"
"errors"
"strconv"
"time"

"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
)

const (
// delivery real trading urls
deliveryRealUSDTTradingURL = "wss://fx-ws.gateio.ws/v4/ws/delivery/usdt"
Expand All @@ -26,6 +12,7 @@ const (

var fetchedFuturesCurrencyPairSnapshotOrderbook = make(map[string]bool)

/*
// GenerateDeliveryFuturesDefaultSubscriptions returns delivery futures default subscriptions params.
func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.List, error) {
_, err := g.GetCredentials(context.Background())
Expand Down Expand Up @@ -174,3 +161,4 @@ func (g *Gateio) generateDeliveryFuturesPayload(ctx context.Context, conn stream
}
return outbound, nil
}
*/
18 changes: 3 additions & 15 deletions exchanges/gateio/gateio_websocket_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
Expand All @@ -23,6 +21,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
)

/*
// GenerateFuturesDefaultSubscriptions returns default subscriptions information.
func (g *Gateio) GenerateFuturesDefaultSubscriptions(a asset.Item) (subscription.List, error) {
channelsToSubscribe := defaultFuturesSubscriptions
Expand Down Expand Up @@ -73,21 +72,10 @@ func (g *Gateio) FuturesSubscribe(ctx context.Context, conn stream.Connection, c
func (g *Gateio) FuturesUnsubscribe(ctx context.Context, conn stream.Connection, channelsToUnsubscribe subscription.List) error {
return g.handleSubscription(ctx, conn, unsubscribeEvent, channelsToUnsubscribe, g.generateFuturesPayload)
}
*/

// WsHandleFuturesData handles futures websocket data
func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.Item) error {
push, err := parseWSHeader(respRaw)
if err != nil {
return err
}

if push.Event == subscribeEvent || push.Event == unsubscribeEvent {
if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID)
}
return nil
}

func (g *Gateio) wsHandleFuturesData(_ context.Context, a asset.Item, push *WSResponse, respRaw []byte) error {
switch push.Channel {
case futuresTickersChannel:

Check failure on line 80 in exchanges/gateio/gateio_websocket_futures.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: futuresTickersChannel

Check failure on line 80 in exchanges/gateio/gateio_websocket_futures.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: futuresTickersChannel

Check failure on line 80 in exchanges/gateio/gateio_websocket_futures.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: futuresTickersChannel

Check failure on line 80 in exchanges/gateio/gateio_websocket_futures.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: futuresTickersChannel

Check failure on line 80 in exchanges/gateio/gateio_websocket_futures.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: futuresTickersChannel
return g.processFuturesTickers(respRaw, a)
Expand Down
112 changes: 23 additions & 89 deletions exchanges/gateio/gateio_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,114 +180,48 @@ func (g *Gateio) SetDefaults() {

// Setup sets user configuration
func (g *Gateio) Setup(exch *config.Exchange) error {
err := exch.Validate()
if err != nil {
if err := exch.Validate(); err != nil {
return err
}
if !exch.Enabled {
g.SetEnabled(false)
return nil
}
err = g.SetupDefaults(exch)
if err != nil {
if err = g.SetupDefaults(exch); err != nil {
return err
}

err = g.Websocket.Setup(&stream.WebsocketSetup{
if err := g.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
Features: &g.Features.Supports.WebsocketCapabilities,
FillsFeed: g.Features.Enabled.FillsFeed,
TradeFeed: g.Features.Enabled.TradeFeed,
GenerateSubscriptions: g.generateSubscriptions,
UseMultiConnectionManagement: true,
RateLimitDefinitions: packageRateLimits,
})
if err != nil {
return err
}
// Spot connection
err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: gateioWebsocketEndpoint,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: g.WsHandleData,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
Connector: g.WsConnect,
Authenticate: g.authenticateSpot,
MessageFilter: asset.Spot,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
})
if err != nil {
return err
}
// Futures connection - USDT margined
err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: usdtFuturesWebsocketURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.USDTMarginedFutures)
},
Subscriber: g.FuturesSubscribe,
Unsubscriber: g.FuturesUnsubscribe,
Connector: g.WsConnect,
MessageFilter: asset.USDTMarginedFutures,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
})
if err != nil {
}); err != nil {
return err
}

// Futures connection - BTC margined
err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: btcFuturesWebsocketURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.CoinMarginedFutures)
},
Subscriber: g.FuturesSubscribe,
Unsubscriber: g.FuturesUnsubscribe,
Connector: g.WsConnect,
MessageFilter: asset.CoinMarginedFutures,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
})
if err != nil {
return err
}

// TODO: Add BTC margined delivery futures.
// Futures connection - Delivery - USDT margined
err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: deliveryRealUSDTTradingURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.DeliveryFutures)
},
Subscriber: g.DeliveryFuturesSubscribe,
Unsubscriber: g.DeliveryFuturesUnsubscribe,
Connector: g.WsConnect,
MessageFilter: asset.DeliveryFutures,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
})
if err != nil {
return err
}

// Futures connection - Options
return g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: optionsWebsocketURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: g.WsHandleOptionsData,
Subscriber: g.OptionsSubscribe,
Unsubscriber: g.OptionsUnsubscribe,
Connector: g.WsConnect,
MessageFilter: asset.Options,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
})
var errs error
for _, a := range g.GetEnabledAssets() {
if err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
URL: wsURLs[a],
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, msg []byte) error { return g.wsHandleData(ctx, msg, a) },
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
Connector: g.WsConnect,
// TODO: Abstract auth spot
Authenticate: g.authenticateSpot,
MessageFilter: a,
BespokeGenerateMessageID: g.GenerateWebsocketMessageID,
}); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w: %s", err, a))
}
}
return errs
}

// UpdateTicker updates and returns the ticker for a currency pair
Expand Down

0 comments on commit ccb8f26

Please sign in to comment.