stream: force subscription store check as stop gap for wrapper side implementation #1717
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #1717 +/- ##
==========================================
- Coverage 37.15% 37.13% -0.02%
==========================================
Files 414 414
Lines 180198 180199 +1
==========================================
- Hits 66950 66916 -34
- Misses 105389 105428 +39
+ Partials 7859 7855 -4
I have concerns about building this out further:
Given comment, I don't see a way to move this forward right now.
I feel like the manageSubs is the only place we can properly manage sub state.
One edge case in the back of my mind is resubscribing, as well, particularly on orderbook fail.
These are all fair points. One option I will investigate is checking the connection's subscription status after subscribing or unsubscribing to see whether the subscriptions have actually been added or removed, so that it at least complains when the handling needs to be added in the exchange packages themselves.
@shazbert Just realised that I haven't mentioned: I completely agree about abstracting from exchange implementations. I'm just not yet able to see how to do it. I've been looking to abstract this, but I don't think we're ready yet because too much is in flight and unmerged.
@gbjk I have now added new code with fixes. When you have time, could you please check them out?
exchanges/stream/websocket.go
Outdated
if err := w.UnsubscribeChannels(c, unsubs); err != nil {
	return err
}
if diff := prevState - store.Len(); diff != len(unsubs) {
This doesn't feel right ...
- Shouldn't Unsubscribe be erroring if it wasn't successful ?
- Shouldn't the state of all subs in unsubs be changing ?
That said, it's not too bad either.
"Shouldn't Unsubscribe be erroring if it wasn't successful ?"
I think it should. I just added this as a catch-all in case we forget to remove a sub from the store when the unsubscribe was successful. Then it should complain. Should 😬
"Shouldn't the state of all subs in unsubs be changing ?"
Now you are making me think this is all completely wrong 😆. This specifically didn't catch any issues. Can you suggest a better way as a backup check? 'Cause I am drooling at my screen trying to figure it out 🤤.
Sorry for the late reply on this.
I think I'd want to see, ranging over the subs and unsubs, that each one has changed State, and that the store contains (or doesn't contain) each one: ContainsAll and ContainsNone, or something.
I really don't like checking len as a catch-all, because it could give a false positive.
We're not locking the store when we do any of this, after all.
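A minimal sketch of why a length comparison can pass while the store is still wrong, and what per-item checks along the lines of ContainsAll and ContainsNone might look like. The store type, its string keys, and the method names below are hypothetical stand-ins for illustration, not the subscription package's actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical, simplified subscription store keyed by channel name.
type store struct {
	mu   sync.RWMutex
	subs map[string]struct{}
}

func newStore(keys ...string) *store {
	s := &store{subs: make(map[string]struct{}, len(keys))}
	for _, k := range keys {
		s.subs[k] = struct{}{}
	}
	return s
}

// len returns the number of stored keys under a read lock.
func (s *store) len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.subs)
}

// containsAll reports whether every key is present in the store.
func (s *store) containsAll(keys []string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range keys {
		if _, ok := s.subs[k]; !ok {
			return false
		}
	}
	return true
}

// containsNone reports whether none of the keys is present in the store.
func (s *store) containsNone(keys []string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range keys {
		if _, ok := s.subs[k]; ok {
			return false
		}
	}
	return true
}

func main() {
	// The store held three subs and we asked to remove two of them.
	prevLen := 3
	unsubs := []string{"ticker", "trades"}

	// Simulated end state: "ticker" was removed as requested, "trades" was not,
	// and an unrelated change removed "orderbook" in the meantime.
	s := newStore("trades")

	fmt.Println("length check passes:", prevLen-s.len() == len(unsubs))
	fmt.Println("containsNone(unsubs):", s.containsNone(unsubs))
}
```

In this simulated state the length arithmetic agrees with the request even though one of the requested subs is still stored, which is the false positive described above; the per-item check does not have that blind spot.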
Looks good.
It's not too invasive, and doesn't change too much, so I'm happy.
I'm working on sub management and routing separately for gateio, but this doesn't tread on any toes.
A strict rule, but it makes sense
if method == "unsubscribe" {
	return d.Websocket.RemoveSubscriptions(d.Websocket.Conn, subs...)
}
return d.Websocket.AddSubscriptions(d.Websocket.Conn, subs...)
Nice catch, didn't even notice that. None of those subs were being added to the subscription store and my changes complained. I will check it out.
…ifference method as its only used locally and Diff method can be accessed directly
…mplate for edge case perps with settlement
}
for _, s := range subs {
	if _, ok := subAck[s.QualifiedChannel]; ok {
		err = common.AppendError(err, d.Websocket.AddSuccessfulSubscriptions(d.Websocket.Conn, s))
		if strings.Contains(method, "subscribe") {
I haven't tested yet, but "unsubscribe" contains the string "subscribe" and so both would likely fall in here
Good work.
Only nits.
I'm asking for a change to not tear everything down on single sub failure, but if you don't make that change, it's no worse than before, so I'm fine.
@@ -123,6 +123,8 @@ var (
 	}
 )

+var errSubscriptionFailureDetected = errors.New("subscription failure detected")
This feels like it's stream.ErrSubscriptionFailure in disguise 🤠
@@ -779,6 +781,7 @@ func (d *Deribit) GetSubscriptionTemplate(_ *subscription.Subscription) (*templa
 	"channelName": channelName,
 	"interval": channelInterval,
 	"isSymbolChannel": isSymbolChannel,
+	"fmt": formatPerpetualPairWithSettlement,
Nit: Don't think we can call this fmt when it's specific to Futures.
It calls it against every pair, are you asking me to only call it against futures?
Nope. But I think it's misleading, because it doesn't really format every pair, even if it's called for all of them.
So I guess I'd vote for: rename the template func to fmtFuturePairs, or rename the func called to formatChannelPair.
What's catching me is the disconnect between what's in the template and the actual func name.
Can also ignore.
@@ -827,16 +831,23 @@ func (d *Deribit) handleSubscription(method string, subs subscription.List) erro
 	subAck[c] = true
 }
 if len(subAck) != len(subs) {
-	err = common.ErrUnknownError
+	err = errSubscriptionFailureDetected
So, this would be covering ... any subs that come back that we didn't expect, which won't be reported in the for range below?
It feels like it'd be better to delete from subAck as we range subs, and then error to say "Unexpected channels in Result" or something.
Cos otherwise, this error will be impossible to diagnose and I'd probably say misleading.
exchanges/stream/websocket.go
Outdated
if len(subs) != 0 && w.connectionManager[i].Subscriptions.Len() != len(subs) {
	multiConnectFatalError = fmt.Errorf("%v %w expecting %d subscribed %v", w.exchangeName, errSubscriptionsNotAdded, len(subs), subs)
	break
I still have this problem:
We call connect on an exchange, one pair on one asset fails to subscribe (like TRUMPEW the other day) and all connections get torn down for all asset types and subs.
I still don't agree with that.
The biggest problem I think I have is the strong coupling between connections and subs.
If a consumer wants to connect the websocket without subs, and then later call Subscribe manually, we both allow it, and don't support it at the same time.
I think this is a wider topic, because this change doesn't make 454/460 worse, but it doubles down on the concept.
I think we need to say "Connecting is connecting, and Subscribing is separate" especially when new subs or resubs could connect new subs.
I'll continue to work on this, so I'm just highlighting for alignment on direction.
In the meantime, I'd gently request again to log the error, and return it, but not tear down all of the subs and conns for all assets.
"We call connect on an exchange, one pair on one asset fails to subscribe (like TRUMPEW the other day) and all connections get torn down for all asset types and subs."
Do you remember what specifically caused this?
Coupling connect() with subs is not great. Regarding "Connecting is connecting, and Subscribing is separate", though, I think the connect function might be changed to handle connections that don't require subscriptions and to spawn a middleware handler that performs a JIT connection based on current/incoming subscriptions, and which also drops connections on unsubscribe when subs is empty. That way it scales a bit better with respect to a connection's max sub capacity. I don't currently have a design that could help you in that respect, though.
"In the meantime, I'd gently request again to log the error, and return it, but not tear down all of the subs and conns for all assets."
Sure, if it's easier for everyone else I will just log it and I can just modify my own trading branch to be throwing the baby out with the bathwater myself. 🤷
@gloriousCode @thrasher- I pivot off these errors in my own trading branches, which retry connect until it establishes a clean slate or baseline of full subscriptions across all trading pairs that I require. Happy to just log this out like GK suggests if we all agree, and I will update tests.
"Do you remember what specifically caused this?"
A badly listed futures pair erroring on orderbook.
"I pivot off these errors in my own trading branches, which retry connect until it establishes a clean slate or baseline of full subscriptions across all trading pairs that I require."
Okay. This might be the crux of our disagreement.
You should not need to disconnect and unsub everything just because one thing fails, if your goal is to ensure it eventually works.
connectionManager should be monitoring and reconnecting, and if it doesn't have the requisite granularity we should improve that.
And I think subscriptionManager should manage subscriptions.
What I'm aiming for is smaller responsibilities for functions that have names (and meanings) like connect, so they're more versatile (or composable) and easier to test.
Can you throw me any other requirements you might have, like "retry subs and conns until everything is working", if you have any?
I see exactly the same situation, and also wanna know "When's everything looking good?".
"What I'm aiming for is smaller responsibilities for functions that have names (and meanings) like connect, so they're more versatile (or composable) and easier to test."
Good direction to have, I agree.
"Can you throw me any other requirements you might have like "retry subs and conns until everything is working", if you have any?" and "When's everything looking good?"
Pretty much just this. Personally, I am still going to rely on ensuring a clean slate of full subscriptions before proceeding. If even one subscription fails, it indicates a potential issue with the connection or the subscription logic, and I prefer to retry until everything is established correctly, or bring the instance offline and fix it.
- I see gk's issue with regards to killing all connections, but I don't see it as something huge because we already kill everything when Subscribe is called and returns errors. The refactoring you speak of, gk, will change that section regardless, but I get not wanting to tangle further.
- Logging the issue isn't actionable via code, so I don't think that's the best way to go.
I propose we move subscribing for multi-connections to the end of the connect() function. It is a small change™️ that satisfies the following criteria:
- The default handling of websocket connections via websocketroutine_manager.go will log any errors returned
- It won't by default kill all connections when subscriptions have issues
- It allows users who call Connect() directly to handle any errors that are returned, and resubscribe/reconnect/panic("help!")
Gist:
https://gist.github.com/gloriousCode/609d25ea1ee9b954ecbb95b505c76609
It comes with the caveat that I haven't tested things thoroughly. I tried things like separating subscribing, but that's way too large a refactor for this PR and comes with many extra considerations that I don't think are what is desired out of this PR, which is to ensure all subscriptions are subscribed to and that the caller knows it.
Does this solution make people happy?
Yep, that's a good compromise with minimal intrusion.
Great work.
Sorry for change request. Spent a while debating. 😢
exchanges/subscription/store.go
Outdated
// PartitionByPresence returns two lists; one with the subscriptions that are present in the store and one with the
// subscriptions that are not
func (s *Store) PartitionByPresence(compare List) (matched, unmatched List) {
Thought about this for a while, and I think it's cute, but wrong to have a function where you're always using one or the other return.
I thought about giving it a bool for whether you want it there or not.
Took me a while to work out why "Presence" is wrong: it's a noun. Present is more correct, having an adjective form, but still sounds weird.
I'd vote for 2 funcs:
- Contained returns a List of subs in compare which are in the store already
  - alt. Filter, Matching, Match, Union, Contains (but sounds boolish)
- Missing returns a List of subs in compare which are not in the store already
  - alt. Absent, NotContained, Excluded
Alternative note: FilterMatching is probably the best name for keeping a single func, but adding a "contained or missing" bool.
I'm feeling a tingle about how similar this is to Diff, and yet not.
Also around Venn and terminology. Spent a while looking at slices and at wiki on set theory and any precedent on this in stack exchange, but came up with nothing good.
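A minimal sketch of the two-function split suggested above. The Store here is a simplified, hypothetical version keyed by plain strings rather than the package's real subscription and List types, so the signatures are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a simplified, hypothetical store keyed by channel name.
type Store struct {
	mu sync.RWMutex
	m  map[string]struct{}
}

// Contained returns the entries of compare that are already in the store.
func (s *Store) Contained(compare []string) (matched []string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range compare {
		if _, ok := s.m[k]; ok {
			matched = append(matched, k)
		}
	}
	return matched
}

// Missing returns the entries of compare that are not in the store.
func (s *Store) Missing(compare []string) (missing []string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, k := range compare {
		if _, ok := s.m[k]; !ok {
			missing = append(missing, k)
		}
	}
	return missing
}

func main() {
	s := &Store{m: map[string]struct{}{"ticker": {}, "trades": {}}}
	compare := []string{"ticker", "orderbook"}
	fmt.Println("contained:", s.Contained(compare))
	fmt.Println("missing:", s.Missing(compare))
}
```

Each call site then asks only for the list it needs: after subscribing, Missing should come back empty; after unsubscribing, Contained should.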
Yeah, good suggestion. I will introduce Contained and Missing, as it is a better approach and single responsibility is more efficient even though it increases the code base. If you wanted a set theory naming convention, we could have Intersection and Difference, but that becomes less obvious for people with little to no experience with that terminology. (me)
…untering subscription specific error paths
exchanges/stream/websocket.go
Outdated
errSubscriptionsNotAdded = errors.New("subscriptions not added")
errSubscriptionsNotRemoved = errors.New("subscriptions not removed")
Now that these are being returned by Connect(), I think it would be nice to export these so callers can distinguish an error from connecting from an error from subscribing and act accordingly.
exchanges/subscription/store.go
Outdated
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	var matched List
Can move this to the signature if you want, and/but still use explicit return with it.
exchanges/subscription/store.go
Outdated
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	var unmatched List
Whilst considering maybe moving this to the signature, can also consider calling it missing for consistency.
All good from me 👍 Great work.
Comments are entirely optional notes.
tACK!
PR Description
Noticed in Bybit that subscriptions weren't being added when I was flushing the connection every hour, so I abstracted adding and removing to the websocket package; then I can merge this into the open PR chain.
This is so we don't need to toil in the wrappers as well, but there might be some edge cases I am not considering.
This only impacts exchanges that have been upgraded to the websocket multi connection.
Added checks in the stream package so that the outbound subscriptions are added to or removed from the store.
Fixes Bybit and Deribit