stream: force subscription store check as stop gap for wrapper side implementation #1717

Open
wants to merge 17 commits into
base: master
from

Conversation

shazbert
Collaborator

@shazbert shazbert commented Nov 18, 2024

PR Description

I noticed in Bybit that subscriptions weren't being added when I was flushing the connection every hour, so I abstracted adding and removing subscriptions into the websocket package so that I can merge this into the open PR chain. This means we don't need to toil in the wrappers as well, but there might be some edge cases I am not considering.

This only impacts exchanges that are upgraded to the websocket multi connection.

Added in checks in stream package so that the outbound subscriptions are added or removed from store.

Fixes Bybit and Deribit.

Type of change

Please delete options that are not relevant and add an x in [] as item is complete.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How has this been tested

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration and also consider improving test coverage whilst working on a certain feature or package.

  • go test ./... -race
  • golangci-lint run
  • Test X

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation and regenerated documentation via the documentation tool
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally and on Github Actions with my changes
  • Any dependent changes have been merged and published in downstream modules

@shazbert shazbert added review me This pull request is ready for review medium priority labels Nov 18, 2024
@shazbert shazbert requested a review from a team November 18, 2024 04:43
@shazbert shazbert self-assigned this Nov 18, 2024

codecov bot commented Nov 18, 2024

Codecov Report

Attention: Patch coverage is 15.38462% with 11 lines in your changes missing coverage. Please review.

Project coverage is 37.13%. Comparing base (85ecd0d) to head (f1ca7bd).
Report is 4 commits behind head on master.

Files with missing lines                 Patch %   Lines
exchanges/stream/websocket.go            10.00%    6 Missing and 3 partials ⚠️
exchanges/gateio/gateio_websocket.go     33.33%    1 Missing and 1 partial ⚠️

@@            Coverage Diff             @@
##           master    #1717      +/-   ##
==========================================
- Coverage   37.15%   37.13%   -0.02%     
==========================================
  Files         414      414              
  Lines      180198   180199       +1     
==========================================
- Hits        66950    66916      -34     
- Misses     105389   105428      +39     
+ Partials     7859     7855       -4     
Files with missing lines                 Coverage Δ
exchanges/gateio/gateio_websocket.go     60.90% <33.33%> (+0.07%) ⬆️
exchanges/stream/websocket.go            85.22% <10.00%> (-0.87%) ⬇️

... and 12 files with indirect coverage changes


@gbjk
Collaborator

gbjk commented Nov 22, 2024

I have concerns about building this out further:

  • Some exchanges set PendingState and rely upon it
  • Some exchanges add temporary subs until they have a better known sub key
  • Fan out mechanics mean that sometimes the subs we add aren't the ones we expected to
  • We often need to get the sub added in wsHandleData directly, to ensure that the next message gets parsed for orderbook, etc
  • I think this doesn't allow for Subscribe to be called directly

Collaborator

@gbjk gbjk left a comment

Given comment, I don't see a way to move this forward right now.
I feel like the manageSubs is the only place we can properly manage sub state.
One edge case in the back of my mind is resubscribing, as well, particularly on orderbook fail.

exchanges/stream/websocket.go (outdated, resolved)
@shazbert
Collaborator Author

These are all fair points. An option I will investigate is checking the connection's subscription store after subscribing or unsubscribing to see whether the subscriptions have been added or removed, so that it at least complains that this needs to be handled in the exchange packages themselves.

@shazbert shazbert added blocked and removed medium priority review me This pull request is ready for review labels Nov 25, 2024
@gbjk
Copy link
Collaborator

gbjk commented Nov 25, 2024

@shazbert Just realised that I haven't mentioned:

I completely agree about abstracting from exchange implementations. I'm just not yet ready to see how to do it.
My intention was to finish merging all the exchange sub changes, then handle how multi-asset sub confs should work (which is GateIO + multi-sockets + assets ), and then review what hinge points stop us moving stuff up.

I've been looking to abstract this, but I don't think we're ready yet because too much is in flight and unmerged.
My hope is that they all get ParallelChanOp and GenerateSubs for free, at least, and then maybe some more.

@shazbert shazbert added bug review me This pull request is ready for review and removed blocked labels Jan 15, 2025
@shazbert shazbert changed the title stream/gateio: move adding and removing subscriptions from websocket wrapper to stream package stream: force subscription store count check as stop gap for wrapper side implementation Jan 15, 2025
@shazbert shazbert requested a review from gbjk January 15, 2025 02:42
@shazbert
Collaborator Author

@shazbert Just realised that I haven't mentioned:

I completely agree about abstracting from exchange implementations. I'm just not yet ready to see how to do it. My intention was to finish merging all the exchange sub changes, then handle how multi-asset sub confs should work (which is GateIO + multi-sockets + assets ), and then review what hinge points stop us moving stuff up.

I've been looking to abstract this, but I don't think we're ready yet because too much is in flight and unmerged. My hope is that they all get ParallelChanOp and GenerateSubs for free, at least, and then maybe some more.

@gbjk I have added in new code now with fixes, when you have time could you please check them out.

if err := w.UnsubscribeChannels(c, unsubs); err != nil {
return err
}
if diff := prevState - store.Len(); diff != len(unsubs) {
Collaborator

This doesn't feel right ...

  1. Shouldn't Unsubscribe be erroring if it wasn't successful?
  2. Shouldn't the state of all subs in unsubs be changing?

That said, it's not too bad either.

Collaborator Author

Shouldn't Unsubscribe be erroring if it wasn't successful ?

I think it should, I just added it as a catch all in the event we forgot to remove it from the store when it was successful. Then it should complain. should 😬

Shouldn't the state of all subs in unsubs be changing

Now you are making me think this is all completely wrong 😆. This specifically didn't catch any issues. Can you suggest a better way as a back up check? Cause I am drooling at my screen trying to figure it out 🤤.

Collaborator

Sorry for late reply on this.
I think I'd want to see, ranging over the subs and unsubs, that each has changed State and that the store contains (or doesn't contain) each one. ContainsAll and ContainsNone or something.
I really don't like checking len as a catchall, because it could false positive.
We're not locking store when we do any of this, after all.
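
A minimal sketch of the per-subscription check suggested above, assuming a simplified store keyed by channel name. The `store` type and the `containsAll`/`containsNone` helpers are hypothetical stand-ins and not the real `subscription.Store` API. It also shows one way a length comparison can pass even though the wrong entry was removed.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical, simplified stand-in for a subscription store,
// keyed by channel name only.
type store struct {
	mu   sync.RWMutex
	subs map[string]struct{}
}

func newStore(keys ...string) *store {
	s := &store{subs: make(map[string]struct{}, len(keys))}
	for _, k := range keys {
		s.subs[k] = struct{}{}
	}
	return s
}

func (s *store) remove(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.subs, key)
}

func (s *store) size() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.subs)
}

// containsAll reports whether every key is present, under a single read lock.
func (s *store) containsAll(keys ...string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range keys {
		if _, ok := s.subs[k]; !ok {
			return false
		}
	}
	return true
}

// containsNone reports whether none of the keys is present.
func (s *store) containsNone(keys ...string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range keys {
		if _, ok := s.subs[k]; ok {
			return false
		}
	}
	return true
}

func main() {
	s := newStore("ticker", "orderbook", "trades")
	prev := s.size()
	unsubs := []string{"orderbook"}

	// Simulate a buggy wrapper that removes the wrong entry from the store.
	s.remove("trades")

	fmt.Println("length check passes:", prev-s.size() == len(unsubs))
	fmt.Println("membership check passes:", s.containsNone(unsubs...))
	fmt.Println("remaining subs intact:", s.containsAll("ticker", "orderbook"))
}
```

Because each helper holds the read lock for the whole loop, the answer reflects one consistent view of the store, which a before/after length comparison cannot guarantee.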

Collaborator

@gbjk gbjk left a comment

Looks good.
It's not too invasive, and doesn't change too much, so I'm happy.

I'm working on sub management and routing separately for gateio, but this doesn't tread on any toes.

Collaborator

@gloriousCode gloriousCode left a comment

A strict rule, but it makes sense

exchanges/bybit/bybit_websocket.go (resolved)
if method == "unsubscribe" {
return d.Websocket.RemoveSubscriptions(d.Websocket.Conn, subs...)
}
return d.Websocket.AddSubscriptions(d.Websocket.Conn, subs...)
Collaborator

@gloriousCode gloriousCode Jan 28, 2025

Is there something I am missing with this?
[image]
This inclusion does not appear to make much sense to me, given that by this point all subscriptions have been set to the Subscribed state due to the for loop above this.

Why aren't you handling the unsubscribe in the for loop above?

Collaborator Author

Nice catch, didn't even notice that. None of those subs were being added to the subscription store and my changes complained. I will check it out.

exchanges/stream/websocket_test.go (resolved)
exchanges/stream/websocket.go (resolved)
Ryan O'Hara-Reid added 3 commits January 29, 2025 09:33
…ifference method as its only used locally and Diff method can be accessed directly
…mplate for edge case perps with settlement
@shazbert shazbert requested review from gloriousCode and gbjk January 29, 2025 02:08
}
for _, s := range subs {
if _, ok := subAck[s.QualifiedChannel]; ok {
err = common.AppendError(err, d.Websocket.AddSuccessfulSubscriptions(d.Websocket.Conn, s))
if strings.Contains(method, "subscribe") {
Collaborator

I haven't tested yet, but "unsubscribe" contains the string "subscribe" and so both would likely fall in here
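
A small sketch of the pitfall noted above: `strings.Contains(method, "subscribe")` is true for both directions. The method names, including the prefixed forms, and the two helper functions are assumptions for illustration only. Checking for the unsubscribe suffix first is one way to separate the two cases.

```go
package main

import (
	"fmt"
	"strings"
)

// isUnsubscribe reports whether method ends in "unsubscribe".
// Hypothetical helper; the method names used here are assumptions.
func isUnsubscribe(method string) bool {
	return strings.HasSuffix(method, "unsubscribe")
}

// isSubscribe reports whether method ends in "subscribe" but is not an
// unsubscribe request.
func isSubscribe(method string) bool {
	return !isUnsubscribe(method) && strings.HasSuffix(method, "subscribe")
}

func main() {
	methods := []string{"subscribe", "unsubscribe", "public/subscribe", "private/unsubscribe"}
	for _, m := range methods {
		fmt.Printf("%-20s contains=%-5t isSubscribe=%-5t isUnsubscribe=%t\n",
			m, strings.Contains(m, "subscribe"), isSubscribe(m), isUnsubscribe(m))
	}
}
```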

Collaborator

@gbjk gbjk left a comment

Good work.

Only nits.

I'm asking for a change to not tear everything down on single sub failure, but if you don't make that change, it's no worse than before, so I'm fine.

exchanges/bybit/bybit_types.go (outdated, resolved)
exchanges/bybit/bybit_websocket.go (outdated, resolved)
@@ -123,6 +123,8 @@ var (
}
)

var errSubscriptionFailureDetected = errors.New("subscription failure detected")
Collaborator

This feels like it's stream.ErrSubscriptionFailure in disguise 🤠

@@ -779,6 +781,7 @@ func (d *Deribit) GetSubscriptionTemplate(_ *subscription.Subscription) (*templa
"channelName": channelName,
"interval": channelInterval,
"isSymbolChannel": isSymbolChannel,
"fmt": formatPerpetualPairWithSettlement,
Collaborator

Nit: Don't think we can call this fmt when it's specific to Futures.

Collaborator Author

It calls it against every pair, are you asking me to only call it against futures?

Collaborator

Nope. But I think it's misleading that it's not really format for every pair, even if it's called for all of them.
So I guess I'd vote for: Rename the template func to fmtFuturePairs or rename the func called to formatChannelPair.
What's catching me is the disconnect between what's in the template and the actual func name.
Can also ignore.

exchanges/deribit/deribit_websocket.go (outdated, resolved)
@@ -827,16 +831,23 @@ func (d *Deribit) handleSubscription(method string, subs subscription.List) erro
subAck[c] = true
}
if len(subAck) != len(subs) {
- err = common.ErrUnknownError
+ err = errSubscriptionFailureDetected
Collaborator

So, this would be covering ... any subs that come back that we didn't expect, which won't be reported in the for range below?
It feels like it'd be better to delete from subAck as we range subs, and then error to say "Unexpected channels in Result" or something.
Cos otherwise, this error will be impossible to diagnose and I'd probably say misleading.

Comment on lines 464 to 466
if len(subs) != 0 && w.connectionManager[i].Subscriptions.Len() != len(subs) {
multiConnectFatalError = fmt.Errorf("%v %w expecting %d subscribed %v", w.exchangeName, errSubscriptionsNotAdded, len(subs), subs)
break
Collaborator

I still have this problem:
We call connect on an exchange, one pair on one asset fails to subscribe (like TRUMPEW the other day) and all connections get torn down for all asset types and subs.
I still don't agree with that.
The biggest problem I think I have is the strong coupling between connections and subs.
If a consumer wants to connect the websocket without subs, and then later call Subscribe manually, we both allow it, and don't support it at the same time.

I think this is a wider topic, because this change doesn't make 454/460 worse, but it doubles down on the concept.

I think we need to say "Connecting is connecting, and Subscribing is separate" especially when new subs or resubs could connect new subs.

I'll continue to work on this, so I'm just highlighting for alignment on direction.

In the meantime, I'd gently request again to log the error, and return it, but not tear down all of the subs and conns for all assets.

Collaborator Author

We call connect on an exchange, one pair on one asset fails to subscribe (like TRUMPEW the other day) and all connections get torn down for all asset types and subs.

Do you remember what specifically caused this?

Coupling connect() with subs is not great. Regarding "Connecting is connecting, and Subscribing is separate", I think the connect function might be changed to handle connections that don't require subscriptions and to spawn a middleware handler that performs a JIT connection based on current/incoming subscriptions, and that also drops a connection when unsubscribing leaves its subs empty. That way it scales a bit better with respect to a connection's max sub capacity. I don't currently have a design that could help you in that respect, though.

In the meantime, I'd gently request again to log the error, and return it, but not tear down all of the subs and conns for all assets.

Sure, if it's easier for everyone else I will just log it and I can just modify my own trading branch to be throwing the baby out with the bathwater myself. 🤷

Collaborator Author

@gloriousCode @thrasher- I pivot off these errors in my own trading branches, which retry connect until they establish a clean slate or baseline of full subscriptions across all the trading pairs that I require. Happy to just log this out like GK suggests if we all agree, and I will update tests.

Collaborator

Do you remember what specifically caused this?

A badly listed futures pair erroring on orderbook.

I pivot off these errors in my own trading branches, which retry connect until they establish a clean slate or baseline of full subscriptions across all the trading pairs that I require.

Okay. This might be the crux of our disagreement.
You should not need to disconnect and unsub everything just because one thing fails, if your goal is to ensure it eventually works.
connectionManager should be monitoring and reconnecting, and if it doesn't have requisite granularity we should improve that.
And I think subscriptionManager should manage subscriptions.

What I'm aiming for is smaller responsibilities for functions that have names (and meanings) like connect, so they're more versatile (or composable) and easier to test.

Can you throw me any other requirements you might have like "retry subs and conns until everything is working", if you have any ?

I see exactly the same situation, and also wanna know "When's everything looking good?".

Collaborator Author

What I'm aiming for is smaller responsibilities for functions that have names (and meanings) like connect, so they're more versatile (or composable) and easier to test.

Good direction to have, I agree.

Can you throw me any other requirements you might have like "retry subs and conns until everything is working", if you have any ? && "When's everything looking good?"

Pretty much just this; Personally, I am still going to rely on ensuring a clean slate of full subscriptions before proceeding. If even one subscription fails, it indicates a potential issue with the connection or the subscription logic, and I prefer to retry until everything is established correctly, or bring the instance offline and fix it.

Collaborator

@gloriousCode gloriousCode Jan 29, 2025

  • I see gk's issue with regards to killing all connections, but I don't see it as something huge because we already kill everything when Subscribe is called and return errors. The refactoring you speak of gk will change that section regardless, but I get not wanting to tangle further.
  • Logging the issue isn't actionable via code and so I don't think that's the best way to go

I propose we move subscribing for multi-connections to the end of the connect() function. It is a small change™️ that satisfies the following criteria:

  • The default handling of websocket connections via websocketroutine_manager.go will log any errors returned
  • It won't by default kill all connections when subscriptions have issues
  • It allows users who call Connect() directly to handle any errors that are returned, and resubscribe/reconnect/panic("help!")

Gist:
https://gist.github.com/gloriousCode/609d25ea1ee9b954ecbb95b505c76609

It comes with the caveat that I haven't tested things thoroughly. I tried things like separating subscribing, but that's way too large a refactor for this PR and comes with many extra considerations that I don't think are what is desired out of this PR - which is to ensure all subscriptions are subscribed to and that the caller knows it

Does this solution make people happy?

Collaborator

Yep, that's a good compromise with minimal intrusion.

exchanges/stream/websocket.go (outdated, resolved)
Collaborator

@gbjk gbjk left a comment

Great work.
Sorry for change request. Spent a while debating. 😢

Comment on lines 210 to 212
// PartitionByPresence returns two lists; one with the subscriptions that are present in the store and one with the
// subscriptions that are not
func (s *Store) PartitionByPresence(compare List) (matched, unmatched List) {
Collaborator

Thought about this for a while, and I think it's cute, but wrong to have a function where you're always using one or the other return.
I thought about giving it a bool for whether you want it there or not.
Took me a while to work out why "Presence" is wrong; It's a noun. Present is more correct, having an adjective form, but still sounds weird.

I'd vote for 2 funcs:

  • Contained returns a List of subs in compare which are in the store already
    • alt. Filter, Matching, Match, Union, Contains (but sounds boolish)
  • Missing returns a List of subs in compare which are not in the store already
    • alt. Absent, NotContained, Excluded

Alternative note: FilterMatching is probably the best name for keeping a single func, but adding a "contained or missing bool"

I'm feeling a tingle about how similar this is to Diff, and yet not.
Also around Venn and terminology. Spent a while looking at slices, the wiki on set theory, and any precedent for this on Stack Exchange, but came up with nothing good.
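
A sketch of the two-function split voted for above, assuming a simplified store keyed by strings rather than the real `subscription.List` and key types. The `Store` shown here is a hypothetical stand-in; only the method names `Contained` and `Missing` come from the suggestion. The named results with an explicit return are a style choice, not a requirement.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a hypothetical, simplified subscription store keyed by string.
type Store struct {
	mu sync.RWMutex
	m  map[string]struct{}
}

func NewStore(keys ...string) *Store {
	s := &Store{m: make(map[string]struct{}, len(keys))}
	for _, k := range keys {
		s.m[k] = struct{}{}
	}
	return s
}

// Contained returns the entries of compare that are already in the store.
func (s *Store) Contained(compare []string) (matched []string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range compare {
		if _, ok := s.m[k]; ok {
			matched = append(matched, k)
		}
	}
	return matched
}

// Missing returns the entries of compare that are not in the store.
func (s *Store) Missing(compare []string) (missing []string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range compare {
		if _, ok := s.m[k]; !ok {
			missing = append(missing, k)
		}
	}
	return missing
}

func main() {
	s := NewStore("ticker", "orderbook")
	compare := []string{"ticker", "trades"}
	fmt.Println("contained:", s.Contained(compare))
	fmt.Println("missing:", s.Missing(compare))
}
```

After a subscribe call, a non-empty `Missing` result means some subs were not added; after an unsubscribe call, a non-empty `Contained` result means some were not removed.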

Collaborator Author

Yeah, good suggestion. I will introduce Contained and Missing, as it is a better approach and single responsibility is more efficient even though it increases the code base. If you wanted a set theory naming convention we could have Intersection and Difference, but that becomes less obvious for people with little to no experience with that terminology. (me)

@shazbert shazbert changed the title stream: force subscription store count check as stop gap for wrapper side implementation stream: force subscription store check as stop gap for wrapper side implementation Jan 29, 2025
@shazbert shazbert requested a review from gbjk January 30, 2025 21:15
Comment on lines 59 to 60
errSubscriptionsNotAdded = errors.New("subscriptions not added")
errSubscriptionsNotRemoved = errors.New("subscriptions not removed")
Collaborator

Now that these are being returned by Connect(), I think it would be nice to export these so you can distinguish a connection error from a subscription error and act accordingly.

}
s.mu.RLock()
defer s.mu.RUnlock()
var matched List
Collaborator

Can move this to the signature if you want, and/but still use explicit return with it.

}
s.mu.RLock()
defer s.mu.RUnlock()
var unmatched List
Collaborator

Whilst considering maybe moving this to the signature, can also consider calling it missing for consistency.

Collaborator

@gbjk gbjk left a comment

All good from me 👍 Great work.

Comments are entirely optional notes.

@shazbert shazbert requested a review from gloriousCode February 2, 2025 23:54
Collaborator

@gloriousCode gloriousCode left a comment

tACK!

@gloriousCode gloriousCode added the gcrc GloriousCode Review Complete label Feb 9, 2025
Labels
bug gcrc GloriousCode Review Complete review me This pull request is ready for review

3 participants