pubsub: Significant redelivery while existing messages are still being "processed" #10437

Closed
jameshartig opened this issue Jun 25, 2024 · 50 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@jameshartig
Contributor

jameshartig commented Jun 25, 2024

Client

PubSub v1.39.0

Environment

CentOS on GCE

Go Environment

N/A

Code

I'll try to include the relevant code below as it pertains to this issue.

e.g.

pubsub.NewClientWithConfig(ctx, project, &pubsub.ClientConfig{}, 
	 option.WithGRPCConnectionPool(4),
)
...
sub.ReceiveSettings.MaxOutstandingMessages = 400000
sub.ReceiveSettings.MaxOutstandingBytes = -1
sub.ReceiveSettings.MaxExtension = 2*time.Hour + 30*time.Minute + 5*time.Minute
sub.ReceiveSettings.MinExtensionPeriod = 10*time.Minute
sub.ReceiveSettings.Synchronous = false
sub.ReceiveSettings.NumGoroutines = 1
...
var waitingIDs sync.Map
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	var qbm queuedExitMessage
	if err := json.Unmarshal(msg.Data, &qbm); err != nil {
		// nack
		return
	}

	if qbm.Time.Add(minTime).Before(time.Now()) {
		// check to see if we already have this message ID in memory and if
		// we do then we need to cancel the old one
		waitingCancelledCh := make(chan struct{})
		olderMsgCancelChI, ok := waitingIDs.Swap(msg.ID, waitingCancelledCh)
		// clean it up once we're done as long as it's the same channel
		defer waitingIDs.CompareAndDelete(msg.ID, waitingCancelledCh)
		// if there was already a channel in there then close it to cause it
		// to stop waiting
		if ok {
			close(olderMsgCancelChI.(chan struct{}))
		}
		select {
		case <-ctx.Done():
			// nack
			return
		case <-time.After(time.Duration(srimErr)):
			// continue to handle message
		case <-waitingCancelledCh:
			// nack since another goroutine got this message
			return
		}
	}

	// ... process message
})

Expected behavior

I expect minimal redeliveries while a message is being handled, since the client should still be modacking it.

Actual behavior

We see messages being redelivered in spikes frequently. Since we only keep track of "already processing" messages on a per VM basis we can't accurately track when we get redeliveries to different VMs.

Screenshots

There are around 400k-450k unacked messages at any given time.
image

That said, the messages are each pretty small and only total 50MB-60MB:
image

We let messages get as old as ~2 hours before we start to actually "process" them:
image

We are generally "processing" the message in less than 10 seconds at the 90th percentile:
image

This is a chart of all of the times we had to cancel an existing "pending" message because we received another one.
image
You'll notice it spikes as high as 300k which means we are getting redeliveries of most of the outstanding messages.

This chart shows the total messages being held in memory per datacenter:
image
You'll notice that it's pretty normal until 6/22/2024 at 20:00 UTC where it then started to increase dramatically. Then it returned to normal on 6/24 but then started to be erratic again on 6/24 at 20:00 UTC. Another thing to notice is that it spikes above 400k which means we have significantly more messages being held in-memory than are even outstanding in the subscription.

Even worse is if you look at the messages being held by the pub/sub client itself (this comes from the OutstandingMessagesView) (green is sum (across 2-3 servers) and yellow is average):
image
When things were normal the client was holding onto 400k-500k messages but when things go wrong the client is holding onto 1 million+ messages (presumably in flow control) which seems insane since there shouldn't be more than 400k messages in the subscription.

We set MinExtensionPeriod to 10 minutes since we expect to hold onto the messages for a long time and don't need to modAck all of the time. This seems to be working since we see a spike in modAck's every 5 minutes:
image

What's odd though is that around when we submit the modAck's we see the spike in new pulled messages and spike in duplicate messages:
image

Additional context

NumGoroutines was previously set to 2 but after the issues on 6/22, we lowered it to 1 without much of a difference. WithGRPCConnectionPool was previously set to 2 (default) but we raised it to 4 at the same time as lowering NumGoroutines and again, there wasn't much difference. We also tried lowering the number of VMs supporting this workload from 3 -> 2 but that seemed to have made things worse.

The Acknowledgement deadline on the subscription is set to the default (10 seconds). We haven't changed this but could if you think that would help.

We were previously "abusing" the backoff delay and kept nack'ing messages until the ~2 hours was up but after talking to a Pub/Sub PM we were told that the backoff delay isn't a guarantee and that only a certain number of messages will respect that backoff delay. This approach (holding the messages in memory in our handler) was suggested as a better approach. Things were working pretty normally until recently so I'm not sure if there was some change on the Pub/Sub side. We didn't make any changes to our code on 6/22. It's also a bit interesting that things were okay on 6/24 until 20:00 UTC again like on 6/22.

I'm wondering if there's some memory/message limit on the forwarders that we might be hitting which causes them to crash and then another forwarder redelivers the messages. My assumption is based on us not having issues when the total number of undelivered messages stays under ~420k, because on 6/22, 6/23, and 6/24 we hit new peaks of ~430k, ~451k, and ~445k outstanding messages, respectively. If that's the case then would increasing the number of goroutines actually help so that messages could potentially be spread across more forwarders? We could also try to decrease the size of the messages but they're already pretty small.

Finally, related to above, I limited this to only europe-west1 because we have the most traffic there and the other regions seem relatively unaffected.

@hongalex I apologize in advance for the complicated issues.

@jameshartig jameshartig added the triage me I really want to be triaged. label Jun 25, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jun 25, 2024
@hongalex hongalex added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. and removed triage me I really want to be triaged. labels Jun 26, 2024
@hongalex
Member

Thanks as always for the detailed logs.

What's odd though is that around when we submit the modAck's we see the spike in new pulled messages and spike in duplicate messages:

I think this might be the other way around, where pulled messages = more modacks issued at the time they are pulled in (to acknowledge receipt of these messages).

If that's the case then would actually increasing the number of goroutines help so that messages could be potentially spread across more forwarders? We could also try to decrease the size of the messages but they're already pretty small.

There's no guarantee that increasing the number of streams means you'll hit another server. On top of that, this level of processing shouldn't be OOMing the servers so that doesn't seem right either. I'll check to see if there are any changes around then that might've resulted in this.

Just to confirm, this region was processing the data at the same speed (around 400k messages per client outstanding) without issue prior to 6/20?

@jameshartig
Contributor Author

I think this might be the other way around, where pulled messages = more modacks issued at the time they are pulled in (to acknowledge receipt of these messages).

Good point, yes that's probably what it is.

Just to confirm, this region was processing the data at the same speed (around 400k messages per client outstanding) without issue prior to 6/20?

Looking back we did have a bit of an issue on 6/18/24 starting at 7:00:00 UTC but it wasn't as bad.

Also, we seem to be doing well today (6/26) so far:
image

Other than a small blip when we had a VM get preempted and restarted things seem to be pretty even and working well. I'll update if anything changes later in the day.

@jameshartig
Contributor Author

jameshartig commented Jun 26, 2024

Update:

Around 20:17:00 UTC there was a jump in redeliveries:
image

I don't see any significant increase in the number of unacked messages around that time:
image

We also didn't have any preemptions or any service restarts but it does seem like at that same time there was a new streaming pull connection opened:
image
(The title of this graph says "by subscription" but it's actually "by hostname")
But that seems to happen pretty frequently so I'm not sure if that's related.

Let me know what else I can gather on my end.

@hongalex
Member

So it seems like the issue is because of messages expiring. Specifically, messages are being pulled into the client library but not being processed (because concurrency control, MaxOutstandingMessages, is reached). This is supported by the fact that your end to end latency is very high around the time of these expirations (>1 hour). I think server side flow control is working most of the time, but when the stream is torn down and recreated, the server is erroneously delivering more messages. My current hypothesis goes something like

  1. Client reads 400k messages from stream
  2. Client acquires flow control resources for this
  3. Client tries to pull more messages from stream, but seeing that flow control limits are reached, server doesn't deliver any more messages
  4. Stream is restarted and connects to a different server, which forgets about the previous flow control limits for this stream, and delivers another 400k messages, while the messages from step 1 are still being processed

These steps happen randomly and eventually you build up a large number of outstanding messages in your client that can't be processed in time. Since your MaxExtension is set to 2 hours, the client could keep these messages around for a while, while accumulating more messages and resulting in a larger in-memory backlog it won't be able to process. Eventually, these expire and are redelivered.
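
The hypothesis above can be pictured with a toy model. This is a sketch under loud assumptions: the `server` type and `simulate` function are hypothetical, each new connection is assumed to start with an empty delivery count, and lease state on the real backend is ignored entirely.

```go
package main

import "fmt"

// server models one forwarder's view of a single stream. It only counts
// messages delivered on this connection, which is the assumption being
// illustrated here, not a description of the real backend.
type server struct {
	limit     int
	delivered int
}

// deliver hands out up to n messages without exceeding the per-stream limit.
func (s *server) deliver(n int) int {
	room := s.limit - s.delivered
	if n > room {
		n = room
	}
	s.delivered += n
	return n
}

// simulate returns how many messages the client ends up holding when the
// stream is reconnected `reconnects` times and `backlog` messages are
// available for delivery on every connection.
func simulate(limit, backlog, reconnects int) int {
	held := 0
	for i := 0; i <= reconnects; i++ {
		s := &server{limit: limit} // a fresh server remembers nothing
		held += s.deliver(backlog)
	}
	return held
}

func main() {
	fmt.Println(simulate(400_000, 450_000, 0)) // one connection, limit respected
	fmt.Println(simulate(400_000, 450_000, 2)) // two reconnects, limit applied three times
}
```

Under this model the client-side total grows by one full limit per reconnect, which is the same order of magnitude as the 1 million+ figure reported earlier, though the model does not establish that this is the actual cause.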

I'll try to craft a version of the library that includes a separate setting for #callbacks, as promised in #9727, which should let you set num_callbacks to > 400k and thus your application won't get bogged down by messages received but not being processed in these off cases. The long term solution is to fix the behavior in step 4, but that requires a lot more planning.

@jameshartig
Contributor Author

  4. Stream is restarted and connects to a different server, which forgets about the previous flow control limits for this stream, and delivers another 400k messages, while the messages from step 1 are still being processed

I was thinking something like this was happening but I shrugged it off initially because I assumed the new server would not have any messages to deliver because all of the existing messages in the subscription were already leased by a client (and being modack'd). Or does the stream being killed/restarted invalidate all of the leases being held by the outstanding messages delivered on that stream?

These steps happen randomly and eventually you build up a large number of outstanding messages in your client that can't be processed in time.

I'm also a bit confused why it's working for several hours of the day then suddenly decides to redeliver a bunch of messages despite the stream (seemingly) being restarted constantly throughout the day. Could it be that it's reconnecting to the same forwarder most of the time but when it connects to a different one (in step 4) that causes the issue?

Or I'm trying to see if it's somehow related to MaxOutstandingMessages=400000 and when the number of unacked messages grows higher than 400k we start to have problems. I'm not sure what that might have to do with it given your explanation but it seems too coincidental that things are fine until the unacked messages grows over 400k in the subscription. If I raise the MaxOutstandingMessages to 500k could the problem be delayed further?

@hongalex
Member

because I assumed the new server would not have any messages to deliver because all of the existing messages in the subscription were already leased by a client (and being modack'd).

I was assuming you have a constant flow of messages coming into your subscription. Is your publish throughput less sustained and done more in bursts?

Could it be that it's reconnecting to the same forwarder most of the time but when it connects to a different one (in step 4) that causes the issue?

Yeah that's what I'm thinking is happening.

I'm not sure if increasing MaxOutstandingMessages will help here. I think it might be best to try the new version of the library that tweaks callbacks independently (maybe setting that to 500k) and seeing if this issue is better. I'll update here once that's ready.

@hongalex
Member

hongalex commented Jun 28, 2024

@jameshartig When you get the chance, can you pull in this version of the library:
go get cloud.google.com/go/pubsub@pubsub-max-callbacks.

This introduces a new setting to ReceiveSettings called MaxCallbacks which configures how many callbacks will be invoked at any given time (superseding the previous way of configuring this via MaxOutstandingMessages). MaxCallbacks defaults to 2 * MaxOutstandingMessages, but that is subject to change.

For your case, I recommend keeping MaxOutstandingMessages to 400k and MaxCallbacks to 500k and seeing if that helps.

@jameshartig
Contributor Author

Apologies for the delay.

I was assuming you have a constant flow of messages coming into your subscription. Is your publish throughput less sustained and done more in bursts?

Yes, we do have a constant stream. I see what you meant now but I still wouldn't expect to see a huge spike in redelivered messages like we do see.

For your case, I recommend keeping MaxOutstandingMessages to 400k and MaxCallbacks to 500k and seeing if that helps.

I'll work on getting that out today and report back.

@jameshartig
Contributor Author

This introduces a new setting to ReceiveSettings called MaxCallbacks which configures how many callbacks will be invoked at any given time (superseding the previous way of configuring this via MaxOutstandingMessages). MaxCallbacks defaults to 2 * MaxOutstandingMessages, but that is subject to change.

Separately, I'm not sure how best to set the 2 fields in conjunction for a variety of applications.

  1. Some applications we want to limit the parallel messages to as-close-to-one-as-possible for various reasons. Obviously understanding that in practice it means it's not guaranteed and only per instance. Would that mean we set MaxCallbacks=1 and MaxOutstandingMessages=1?
  2. Others have low throughput and just expect ~10 messages to be outstanding at any given time but we don't care if it's 10 or 20 (or maybe even 30+ in most cases but <100). Would I set MaxOutstandingMessages=10 and let the client automatically decide MaxCallbacks=20?
  3. The application for this issue obviously has some limits because of RAM constraints so we want a hard cap on MaxCallbacks of x (for now 500k) but then should we be required to set MaxOutstandingMessages to some fraction of x (for now 75%)? I think this is a similar situation to the application in pubsub: Many acks/modacks could cause other acks/modacks to be delayed #9727 except that is more CPU-bound.

We try to hide most of this complexity from our developers, except when necessary (which is maybe only the 3rd case above) so I'm trying to figure out how to support 1 & 2 automatically. Currently our framework exposes a knob that is now going to be MaxCallbacks.

@hongalex
Member

hongalex commented Jul 1, 2024

Some applications we want to limit the parallel messages to as-close-to-one-as-possible for various reasons. Obviously understanding that in practice it means it's not guaranteed and only per instance. Would that mean we set MaxCallbacks=1 and MaxOutstandingMessages=1?

Yes, that's currently the behavior in the main branch. MaxOutstandingMessages is the same as the number of callbacks to start, and configures server side flow control (per stream). You might run into the issue where the streams "forget" about the current flow control when reconnecting, but this is a current limitation with the server that we're actively looking to address.

Others have low throughput and just expect ~10 messages to be outstanding at any given time but we don't care if it's 10 or 20 (or maybe even 30+ in most cases but <100). Would I set MaxOutstandingMessages=10 and let the client automatically decide MaxCallbacks=20?

That sounds like a decent tradeoff. Most of the time, setting MaxOutstandingMessages = 10 means that when there is no redelivery (and streams = 1), client side concurrency will never kick in, and there will only be 10 messages processed simultaneously.

The application for this issue obviously has some limits because of RAM constraints so we want a hard cap on MaxCallbacks of x (for now 500k) but then should we be required to set MaxOutstandingMessages to some fraction of x (for now 75%)? I think this is a similar situation to the application in #9727 except that is more CPU-bound.

I think the upper bound of RAM should determine MaxCallbacks, since most of the time, the client will only be server side flow controlled, or in other words, only have MaxOutstandingMessages in memory at any given time.

@jameshartig
Contributor Author

jameshartig commented Jul 1, 2024

We deployed the pubsub-max-callbacks branch with MaxCallbacks=500000 but quickly noticed things go south:
image

During the same period the number of unacked messages was around 450k-460k:
image

Yet we are seeing over 1 million messages currently being held in memory.

@jameshartig
Contributor Author

Alternatively, what if we set MaxOutstandingMessages to something significantly lower, like 50k or 100k. Can we reasonably expect that we will get the oldest messages first?

@hongalex
Member

hongalex commented Jul 2, 2024

Thanks for trying that fix.

In my dashboards, I can see that the ack latencies for messages (time between when server delivers a message until a message is acked) are very high, consistently around 2 hours, with a spike in expired messages around the time of your last screenshots. In addition, the messages held in memory are being lease extended. If your original statement about processing times being about 10s at p99 still stands, this points to the fact the library is pulling in messages as they are available, but the limiting factor continues to be number of simultaneous callbacks in memory.

Alternatively, what if we set MaxOutstandingMessages to something significantly lower, like 50k or 100k.

I was actually going to suggest this. Given that your ack latencies are high to the server, it's possible your client isn't actually able to process 400k messages simultaneously.

Can we reasonably expect that we will get the oldest messages first?

Pub/Sub delivers the oldest messages on a best-effort basis, yes.

@jameshartig
Contributor Author

jameshartig commented Jul 2, 2024

In my dashboards, I can see that the ack latencies for messages (time between when server delivers a message until a message is acked) are very high

Apologies for the confusion. If you look back at the code in the original issue it sleeps for ~2 hours (it actually depends on the timestamp embedded in the message itself) and then processes the message. When I talked about the processing time being under 10 seconds I meant after the sleeping.

with a spike in expired messages around the time of your last screenshots.

This is interesting. I wonder if this is from us nack'ing all the messages being held when the application shut down during the deploy of the new version around that time.

Also, we set MinExtensionPeriod: 10 * time.Minute and it seems to be acking every 10 minutes:
image

However, since it has to mod ack 400k+ messages it would break them up into 160 batches. I set the grpc pool to 4 which should be more than enough to handle those immediately but if it takes more than 5 seconds (gracePeriod) to mod ack them then the messages would expire and be redelivered. We are seeing some significant GC pauses but I don't think they're anywhere close to 5 seconds.
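
The batch count is a ceiling division. In this sketch the figure of 2,500 ack IDs per request is an assumption inferred from the "400k messages, 160 batches" numbers above, and `batches` is a hypothetical helper.

```go
package main

import "fmt"

// batches returns how many requests are needed to send n ack IDs when each
// request carries at most perRequest IDs (ceiling division).
func batches(n, perRequest int) int {
	if n <= 0 || perRequest <= 0 {
		return 0
	}
	return (n + perRequest - 1) / perRequest
}

func main() {
	// 2,500 IDs per request is inferred from the numbers in this comment.
	fmt.Println(batches(400_000, 2_500))
	fmt.Println(batches(450_000, 2_500))
}
```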

If we tracked the time since the last modack and the deadline on the last modack we could see in the client if we are modack'ing it too late and maybe increment some sort of counter so we can see if that's happening. It could also be that the modack request itself is taking longer than 5 seconds but that doesn't seem like the case:
image

Thoughts?

@hongalex
Member

hongalex commented Jul 2, 2024

This is interesting. I wonder if this is from us nack'ing all the messages being held when the application shut down during the deploy of the new version around that time.

Yeah that's probably what it is actually. There's a pretty clear correlation between nack and ack expirations (since they are the same).

If we tracked the time since the last modack and the deadline on the last modack we could see in the client if we are modack'ing it too late and maybe increment some sort of counter so we can see if that's happening

I could try this, though I'm a bit skeptical this is happening since the expiration rate doesn't seem to be that high relative to your processing rate. I think the expirations seem to be closely correlated with nacks.

Just to confirm, from this comment, the problem is that your in-memory process rate is lower than before the change right? Did you keep the settings since then, or did you switch to different settings?

@jameshartig
Contributor Author

jameshartig commented Jul 2, 2024

I could try this, though I'm a bit skeptical this is happening since the expiration rate doesn't seem to be that high relative to your processing rate. I think the expirations seem to be closely correlated with nacks.

Yeah that makes sense. To confirm, for the last several hours there haven't been many expirations but the number of messages we are holding onto keeps going up. Assuming that the expired graph represents modack expirations then I agree that the modack theory probably isn't what's happening.
image
image

Just to confirm, from #10437 (comment), the problem is that your in-memory process rate is lower than before the change right?

I'm hoping that the sum of the messages we keep in-memory (i.e. waiting for 2h sleep) across all of the instances is approximately the number of unacked messages in the subscription. Ideally each instance would get a distinct subset of the messages. The problem I outlined in that comment is that after the change we almost immediately got way more messages sent to our handler than we should have.

The publish rate for this subscription is 50-70 messages a second. So we expect to have approximately 60*7200=432k messages unacked at any given time but we should be able to process significantly more than the publish rate (except for the 2 hour sleep).

Also, we are only experiencing this issue in europe-west1 (each region has its own separate topic). That happens to be our largest region by about 2x. In the other regions the sum of the messages being held by our handler matches up exactly with the unacked messages in the subscription and we don't really see any duplicate messages.

Did you keep the settings since then, or did you switch to different settings?

We initially rolled back after that earlier comment but have since rolled back out the changes you requested. We haven't seen much improvement with or without the MaxCallbacks change.

@jameshartig
Contributor Author

We just put MaxCallbacks=500000 and MaxOutstandingMessages=150000 live and unfortunately it didn't really help. We quickly approached 900k+ messages in-memory (in sleep by our handler).

@jameshartig
Contributor Author

We have been noticing some CPU spikes and I managed to pull a pprof/profile during one of them and it was:
image

Which makes me think that we're creating too many timers.

@hongalex
Member

hongalex commented Jul 3, 2024

There definitely are a good amount of timers in iterator.go but I can't see how that would take up the most amount of time, especially if you only have 1 iterator when NumGoroutines=1. The number of timers shouldn't be correlated with the number of messages either. I'll dedicate some time soon to trying to repro your setup and see if I get the same pprof graph.

I think the question that's bugging me now is why you are pulling in more messages when server side flow control should have stopped delivering messages after the first 150k. Could you try setting MaxOutstandingBytes to something to see if that caps your in memory messages at all?

If that doesn't work, I would next suggest opening a ticket so the backend team could look at this too.

@jameshartig
Contributor Author

jameshartig commented Jul 3, 2024

Our handler was creating a timer for every single message to time how long to wait until it's been 2 hours which I believe was the source of the problem. I'm going to work on optimizing that code a bit to see if that helps at all. I temporarily switched it to pool the timers so there shouldn't be more than ~7200 timers. I'll let that run over the next day to see if there's any impact.

I think the question that's bugging me now is why you are pulling in more messages when server side flow control should have stopped delivering messages after the first 150k.

Here are the opencensus metrics for OutstandingMessages after MaxOutstandingMessages=150000:
image

If it helps, there seems to be a jump every 30 minutes, I'm not sure if that's significant.

It somewhat lines up with the StreamOpen counter increasing:
image

Could you try setting MaxOutstandingBytes to something to see if that caps your in memory messages at all?

I'll need to do some calculation on how many bytes to set it to, or do you mean just set it to any number that's non-negative?

@hongalex
Member

hongalex commented Jul 3, 2024

If it helps, there seems to be a jump every 30 minutes, I'm not sure if that's significant.
It somewhat lines up with the StreamOpen counter increasing:

Right, I think this tracks with my current understanding of the situation. When streams are reopened, there's a chance that the server will deliver more messages because it currently does not have a way of knowing how many messages are already in memory by each client. This is something we're working on fixing.

Re: bytes, this might not be necessary anymore. From your recent "Client Outstanding Messages" graph, it does seem like each client holds around 150k messages, with some temporary increases beyond that around when there are stream opens. This doesn't align with your previous comment where you were holding 900k messages in memory. Perhaps I'm misunderstanding something, or do you think the opencensus metrics are incorrect?

@jameshartig
Contributor Author

Re: bytes, this might not be necessary anymore. From your recent "Client Outstanding Messages" graph, it does seem like each client holds around 150k messages, with some temporary increases beyond that around when there are stream opens. This doesn't align with your previous #10437 (comment) where you were holding 900k messages in memory. Perhaps I'm misunderstanding something, or do you think the opencensus metrics are incorrect?

The above screenshots were after a deploy during which it does seem to work but then the next day...
image

@hongalex
Member

hongalex commented Jul 3, 2024

Yeah, in that case, could you try setting MaxOutstandingBytes to 150,000 * average size of message in bytes. If this fixes the issue, it would point to an issue with the logic of how we do server side flow control.
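
The suggested sizing is a single multiplication. In this sketch the 125-byte average is not stated anywhere in the thread; it is implied by the values deployed afterwards (18,750,000 / 150,000), and `bytesLimit` is a hypothetical helper.

```go
package main

import "fmt"

// bytesLimit returns a MaxOutstandingBytes value sized to hold maxMessages
// messages of avgBytes each.
func bytesLimit(maxMessages, avgBytes int) int {
	return maxMessages * avgBytes
}

func main() {
	const avgBytes = 125 // implied by 18,750,000 / 150,000 later in the thread
	fmt.Println(bytesLimit(150_000, avgBytes))
	fmt.Println(bytesLimit(200_000, avgBytes))
}
```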

@jameshartig
Contributor Author

On Friday I deployed MaxOutstandingBytes=18750000, MaxOutstandingMessages=150000, and MaxCallbacks=500000. Over the last 12 hours it does seem like the limits are being respected.

Here is the outstanding bytes from the client:
image

Here is the outstanding messages from the client:
image

That said, I'm still seeing significant redelivery of duplicate messages:
image

and I'm also now seeing significant expired ack deadlines (as reported by stackdriver):
image

We had to make a change to our handler to rate limit parts of the handler since we started getting bursts of thousands of duplicate messages.

@jameshartig
Contributor Author

We recently eclipsed 450k messages in the subscription backlog which could be significant since 150k * 3 VMs = 450k so I just deployed MaxOutstandingBytes=25000000, MaxOutstandingMessages=200000, and MaxCallbacks=500000. I'm not sure if any of the above could somehow be caused by this.

@hongalex
Member

hongalex commented Jul 8, 2024

It seems suspicious that the message expiration tends to happen when your backlog increases. I'm curious to see if the behavior is better when total number of messages > backlog (with your recent change).

Also wanted to double check: your application only nacks messages if 1) the message is currently held in memory or 2) you are in the middle of shutting down. Is that right? I just wanted to make sure the expirations are resulting in nacks (with redeliveries) rather than the other way around (nacks gets recorded as expiration).

@jameshartig
Contributor Author

It seems suspicious that the message expiration tends to happen when your backlog increases. I'm curious to see if the behavior is better when total number of messages > backlog (with your recent change).

Exactly. I'll report back with the 200k later today but so far things seem to be working well.

Also wanted to double check: your application only nacks messages if 1) the message is currently held in memory or 2) you are in the middle of shutting down. Is that right?

That used to be the case. Now we nack messages in 3 situations:

  1. If we get the same messageID again that we are already holding in memory we will nack the previously-received one.
  2. The application is shutting down. (this didn't happen at all during the 12 hour window shared above)
  3. We hit the newly-added database query rate limit.

We are debating disabling the rate limit that we recently added but need to first be confident in the rate of messages. This is part of the issue with configuring MaxCallbacks=500000: we can't actually handle 500k concurrent messages, since that would result in 500k concurrent database queries.

@jameshartig
Contributor Author

jameshartig commented Jul 8, 2024

Things were going well until 18:50 UTC:

Outstanding bytes:
image

Outstanding messages:
image

There were ~425k messages in the backlog so the total was tracking pretty close.

There weren't any stream retries at 18:50:
image

All of the 3 instances saw jumps right at the exact same time. Later the stream did reconnect between 19:05 and 19:10 and that did result in some redelivery.

Most of the "new" messages were immediately put to sleep:
image

This is confirmed by a huge jump in the 90th percentile of received message "seconds since publish":
image

But since the duplicate delivered messages didn't jump much at 18:50 most of the messages must've been redelivered to another instance:
image

However, I'm not sure what happened at 19:25 when there was a huge redelivery to the same instance.

There weren't any expired acks until close to 19:20:
image

I suspected it might be caused by us holding onto messages for longer than MaxExtension but if that was the case then I'd expect the number of messages held in memory to not increase much since the messages would be over 2 hours old already and we would just immediately process them.

Is there anything on the server-side that is tracking health of each stream/instance besides mod acks? It seems like something at 18:50 decided that it needed to redeliver all of the messages in the backlog and to mostly other instances.

But if all of the messages were actually redelivered to other instances then I would expect to see a steady stream of expired acks as the "old" instances finally processed the original messages and acked them. Besides that one spike in expired acks I haven't seen any significant increase. I'll report back if something changes over the next few hours.

Update: There's been a bunch of expired acks since 19:40 UTC:
image

@jameshartig
Contributor Author

We also had the same problem happen in another datacenter with a different subscription at 19:51 UTC:
image

@jameshartig
Contributor Author

jameshartig commented Jul 8, 2024

If I zoom out on the outstanding messages graph:
image

15:35 UTC: I deployed MaxOutstandingBytes=25000000
16:04: Unsure why nack'd messages were not delivered until now
18:50: Spike in message redelivery

There's a difference of 2 hours and ~46 minutes between getting a bunch of messages and the spike in redelivery. MaxExtension=2h35m and MinExtensionPeriod=10m. It's either a coincidence or what happened is that MaxExtension was hit for the messages and they were redelivered after 10 minutes (which was the last modack). There's no OpenCensus counter for that case in handleKeepAlives and unfortunately the histogram we use to track how long a message is stuck in our handler tops out at 7200 seconds. But our handler has ctx, cancel := context.WithTimeout(ctx, maxExtension) at the top so I'd expect to see a lot of context cancellations if we were somehow still holding onto it longer. I'm going to add some larger histogram buckets to verify.

Update:

After deploying a change, that included changing MaxExtension=2h45m at 23:11 there was a big spike at 3:06 (after 2h55m which matches the new MaxExtension+MinExtensionPeriod) and again at 6:39 (after 3h33 minutes?).
image

@jameshartig
Contributor Author

jameshartig commented Jul 9, 2024

I believe the issue is that when I configured MaxOutstandingBytes the flow controller is now holding onto messages since there isn't any MaxCallbacks knob for bytes:

	fc := newSubscriptionFlowController(FlowControlSettings{
		MaxOutstandingMessages: s.ReceiveSettings.MaxCallbacks,
		// TODO(hongalex): introduce client side setting for this, relegating
		// maxBytes for server side flow control only.
		MaxOutstandingBytes:   maxBytes,
		LimitExceededBehavior: FlowControlBlock,
	})

So I think I need to undo setting MaxOutstandingBytes. Thoughts?

Also another OpenCensus metric that would be helpful is how many bytes and messages are blocked in Acquire.
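The suspected behavior can be illustrated with a simplified flow controller. This is a sketch under assumptions, not the library's implementation: `flowController` and `tryAcquire` are hypothetical, the 1 KB message size is made up, and the real controller blocks (FlowControlBlock) rather than returning false.

```go
package main

import "fmt"

// flowController is a simplified, hypothetical stand-in for the client's
// subscription flow controller: it admits a message only if both the message
// count and the byte total stay within their limits.
type flowController struct {
	maxMessages, maxBytes int // a negative value disables that limit
	messages, bytes       int
}

// tryAcquire reports whether a message of the given size was admitted.
func (f *flowController) tryAcquire(size int) bool {
	if f.maxMessages >= 0 && f.messages+1 > f.maxMessages {
		return false
	}
	if f.maxBytes >= 0 && f.bytes+size > f.maxBytes {
		return false
	}
	f.messages++
	f.bytes += size
	return true
}

func main() {
	// Message limit taken from MaxCallbacks, byte limit from MaxOutstandingBytes.
	fc := &flowController{maxMessages: 500000, maxBytes: 25000000}
	admitted := 0
	for fc.tryAcquire(1000) { // assume 1 KB messages purely for illustration
		admitted++
	}
	fmt.Println("admitted before the byte limit is hit:", admitted) // 25000
}
```

With these numbers the byte limit is reached long before the 500k message limit, so any further messages would wait in the flow controller instead of reaching the handler.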

@hongalex
Member

hongalex commented Jul 10, 2024

Sorry, just catching up on the information you've provided (thanks for doing so).

So I think I need to undo setting MaxOutstandingBytes. Thoughts?

Yeah that's a good point. It seems like server side flow control is being exceeded anyway, which is frustrating.

Also another OpenCensus metric that would be helpful is how many bytes and messages are blocked in Acquire

I added to the pubsub-max-callbacks branch 2 more metrics, FlowControlledMessagesView and FlowControlledBytesView, which are sum views that sum the number of messages and bytes that come in.

If you need something which is a LastValue view, similar to OutstandingMessages, I could probably add it in as a temporary fix, though I'm not sure if I would add it as a permanent feature in the library.

After deploying a change, that included changing MaxExtension=2h45m at 23:11 there was a big spike at 3:06 (after 2h55m which matches the new MaxExtension+MinExtensionPeriod) and again at 6:39 (after 3h33 minutes?).

So, this does seem to mean (as you noted) that messages aren't being processed quickly after the ~2 hour sleep, and might be timing out after even 45 minutes. This might be something that can be tracked using the OpenTelemetry tracing view (since user callback time is tracked there) to verify. If it would be helpful, I can create another branch that merges the otel branch + callbacks + opencensus metrics. Side note, unfortunately the otel tracing is delayed just a bit longer since we're waiting to bump the min go version from go1.20->go1.21 (which is the min version that otel requires now).

Is there anything on the server-side that is tracking health of each stream/instance besides mod acks? It seems like something at 18:50 decided that it needed to redeliver all of the messages in the backlog and to mostly other instances.

I believe there should be more information we can gather, though unfortunately I don't have access to this and you would need to open a ticket to look into this more.

@jameshartig
Contributor Author

jameshartig commented Jul 10, 2024

I added to the pubsub-max-callbacks branch 2 more metrics, FlowControlledMessagesView and FlowControlledBytesView, which are sum views that sum the number of messages and bytes that come in.

Unfortunately, there's a fmt.Printf in the latest commit. Can you remove that and I'll deploy that change so I can track those views as well?

What's the difference between FlowControlledMessages and PullCount?

If you need something which is a LastValue view, similar to OutstandingMessages, I could probably add it in as a temporary fix, though I'm not sure if I would add it as a permanent feature in the library.

I'm interested to hear why a LastValue view wouldn't be added but a Count view would be? Or were you saying that probably neither will make it into the library permanently?

So, this does seem to mean (as you noted) that messages aren't being processed quickly after the ~2 hour sleep, and might be timing out after even 45 minutes.

I thought so too but I've added a lot of metrics around that and I can't seem to find any messages that are taking longer than a few seconds. But it could be that messages are being stuck in flow control and are never making it to my handler. I'll be able to confirm that once I disable the MaxOutstandingBytes.

This might be something that can be tracked using the OpenTelemetry tracing view (since user callback time is tracked there) to verify. If it would be helpful, I can create another branch that merges the otel branch + callbacks + opencensus metrics.

I was about to suggest that. I do think that would help. Thanks!

Side note, unfortunately the otel tracing is delayed just a bit longer since we're waiting to bump the min go version from go1.20->go1.21 (which is the min version that otel requires now).

That's a shame but at least for us we run 1.22 internally so this is fine.

@jameshartig
Contributor Author

After going back to MaxOutstandingBytes=-1 we're back to significant duplicate delivery:
image

Each instance has ~300k messages but there are only ~370k messages unacked:
image

I'm also not seeing many expired acks other than a big spike at 14:53 which coincided with a bunch of nacks on our end because we received new duplicates.
image

I think the tracing will help pinpoint some of the problem with specific message IDs that I can use to open a ticket.

@hongalex
Member

Removed the printf, but yes PullCount is essentially the same as FlowControlledMessages currently (apart from when they are recorded). My original idea was to use FlowControlledMessages as a proxy for when messages were added to flow control, and then OutstandingMessages for after the acquire happens. It might be easier to just track via a LastValue view though.

I'm interested to hear why a LastValue view wouldn't be added but a Count view would be? Or were you saying that probably neither will make it into the library permanently?

Originally I was thinking neither would be added since we deprecated OpenCensus a while back, but given that OpenTelemetry metrics is still being worked on, I could see adding these metrics if they prove to be useful.

I'll focus on getting those changes as promised and will update later today.

@hongalex
Member

Created cloud.google.com/go/pubsub@pubsub-otel-trace-callbacks which merges the current changes in the tracing PR, callbacks, and adds SubscriberFlowControlMessagesView and SubscriberFlowControlBytesView.

Something I realized when I was adding this is that the number of messages stuck by the flow controller when streams=1 should be limited to the batch of messages returned by StreamingPullResponse. After a single message is blocked on flow control, we stop trying to acquire flow control resources for other messages pulled, and also stop pulling on the stream. It's possible the messages are delivered to a client side gRPC buffer, but I think that buffer size is pretty small too.

@jameshartig
Contributor Author

Created cloud.google.com/go/pubsub@pubsub-otel-trace-callbacks which merges the current changes in the tracing PR, callbacks, and adds SubscriberFlowControlMessagesView and SubscriberFlowControlBytesView.

I deployed that to production. My plan is to filter traces to messaging.gcp_pubsub.result: nacked and then assuming that those are re-deliveries, look to see what happened with the same messaging.message.id earlier.
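The filtering plan above could look roughly like the following once spans are exported somewhere queryable. This is a sketch under assumptions: the `span` struct and `earlierDeliveries` are hypothetical, only the two attribute names come from the traces, and spans are assumed to be sorted by start time.

```go
package main

import "fmt"

// span is a hypothetical, flattened view of an exported trace span.
type span struct {
	TraceID   string
	MessageID string // value of messaging.message.id
	Result    string // value of messaging.gcp_pubsub.result
}

// earlierDeliveries returns, for every message ID that has a nacked span,
// the trace IDs of spans for the same message seen before the latest nack.
func earlierDeliveries(spans []span) map[string][]string {
	seen := map[string][]string{}
	out := map[string][]string{}
	for _, s := range spans {
		if s.Result == "nacked" {
			// Copy so later appends to seen do not alias the result.
			out[s.MessageID] = append([]string(nil), seen[s.MessageID]...)
		}
		seen[s.MessageID] = append(seen[s.MessageID], s.TraceID)
	}
	return out
}

func main() {
	spans := []span{
		{TraceID: "t1", MessageID: "m1", Result: "expired"},
		{TraceID: "t2", MessageID: "m2", Result: "acked"},
		{TraceID: "t3", MessageID: "m1", Result: "nacked"},
	}
	for id, traces := range earlierDeliveries(spans) {
		fmt.Println(id, "was seen earlier in traces", traces)
	}
}
```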

Something I realized when I was adding this is that the number of messages stuck by the flow controller when streams=1 should be limited to the batch of messages returned by StreamingPullResponse. After a single message is blocked on flow control, we stop trying to acquire flow control resources for other messages pulled, and also stop pulling on the stream. It's possible the messages are delivered to a client side gRPC buffer, but I think that buffer size is pretty small too.

Now that we're back live with MaxOutstandingMessages=200000 and MaxCallbacks=500000 I believe it won't block on flow control until 500k so it should still keep trying to acquire messages even after 200k but ideally the server wouldn't send any new ones, correct?

@hongalex
Member

Yeah that's how it should behave. If MaxOutstandingMessages does regularly get exceeded and that doesn't happen around when stream retries are happening, there might be an issue with the server that needs to be investigated further.

@jameshartig
Contributor Author

Since updating to your new branch I haven't seen any issues. I'm not sure why but I'll continue to monitor.
image

I'll rollback on Monday to confirm it was the branch change that fixed something.

@hongalex
Member

That's very curious considering there shouldn't be anything new to that branch besides adding metrics. If it continues to behave well, it would point to the issue being a temporary server side issue, but I'm not that sure.

On the chance that the issue resurfaces, I'm also wondering if you can scale up the number of subscriber clients. I think you currently have 3 and I'm wondering if creating more could alleviate the issue too.

@hongalex hongalex added priority: p2 Moderately-important priority. Fix may not be included in next release. and removed priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. labels Jul 12, 2024
@jameshartig
Contributor Author

The issue happened earlier today and here's the trace (abdbc06f25197b8b0015c9853788bd76):

image

Looks like the 3rd modack took too long (longer than 10 minutes) based on these events:
image

But it looks like it was redelivered before that since another instance got the message 10 minutes into the span (which is only 4 minutes after the last modack):
image

That is interesting though because that makes me think that the server never got the 6 minute modack at all and redelivered it 10 minutes after it was initially modacked.

Unfortunately I'm missing some spans because of some internal rate limiting so I can't see the details on the specific modack. Also, I believe that the message was actually delivered more times than shown above because the nack at the end was a result of (another) duplicate delivery.

Here's another case (ba5c1fb1667b36a5002f2afdfecf8dfb):
image

Message was modacked every 10 minutes until 1hr 11min into the span when it should've modacked again but instead it was nacked because the message was received again:
image

There was a CPU usage spike from this go service around the time that it should've modack'd but I don't have any CPU profile to see what the usage was from:
image

The second time though things seemed to go smoothly:
image

So a few things here:

  1. Seems like something happened with a modack that caused some messages to be redelivered. We probably would need to talk to the backend team via a ticket to understand that.
  2. Occasionally modacks are taking longer than 10 minutes for some reason. Looking at the traces for ModifyAckDeadline I don't really see many taking more than 50ms but I'm also limited by the GCP Console showing 1000. If there are 150k messages that need to be modacked and we can do them in batches of 2500 then it should be able to do them relatively quickly, like <5 seconds.
  3. When a message is acked/nacked too closely to when it needs to be modacked that could cause the message to expire on the server if the ack/nack doesn't happen quickly enough. Maybe consider changing the logic in the iterator's done to not immediately remove the keepAliveDeadlines?
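The estimate in point 2 can be worked through as follows. A rough sketch: `modackBatches` is a hypothetical helper, and the 50ms per-request latency is the upper end of what the traces showed, not a measured average.

```go
package main

import (
	"fmt"
	"time"
)

// modackBatches returns how many requests are needed to extend n messages
// when each request carries at most batchSize ack IDs.
func modackBatches(n, batchSize int) int {
	return (n + batchSize - 1) / batchSize
}

func main() {
	batches := modackBatches(150000, 2500)
	perRequest := 50 * time.Millisecond
	// Worst case if the requests were issued one after another.
	sequential := time.Duration(batches) * perRequest
	fmt.Println(batches, "batches,", sequential, "if sent sequentially")
}
```

That is 60 requests and about 3 seconds even when sent one at a time, which is why a modack taking more than 10 minutes points at something other than request volume.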

@jameshartig
Contributor Author

Separately, I'm going to be deploying a change that sets GOMAXPROCS=2. We use automaxprocs and this service has a CPU limit of 180%, which unfortunately meant that GOMAXPROCS was 1 previously. I'm curious if spinning up 60+ goroutines for modacks might be overwhelming the scheduler or the single active thread.
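For reference, this is the kind of rounding that turns a 180% limit into a single proc. A minimal sketch, assuming the quota is rounded down with a floor of 1, which matches the behavior described above; `procsForQuota` is a hypothetical helper, not an automaxprocs function.

```go
package main

import (
	"fmt"
	"math"
)

// procsForQuota rounds a fractional CPU quota down to a whole GOMAXPROCS
// value, never going below 1. Hypothetical helper for illustration.
func procsForQuota(quota float64) int {
	n := int(math.Floor(quota))
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	fmt.Println(procsForQuota(1.8)) // a 180% CPU limit yields 1
}
```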

@hongalex
Member

Yeah the ticket is a good idea since then someone can drill down into the message ID level and see what's happening there.

When a message is acked/nacked too closely to when it needs to be modacked that could cause the message to expire on the server if the ack/nack doesn't happen quickly enough. Maybe consider changing the logic in the iterator's done to not immediately remove the keepAliveDeadlines?

Yeah I can see this being a problem. As I understand it, if an ack takes place just before the modack is issued and takes longer than the grace period (5 seconds), then the message might expire. One possibility is to extend the gracePeriod, or make it dependent on how long the modack deadlines are, where larger extensions = larger gracePeriod. I prefer this slightly over delaying the removal from keepAliveDeadlines, since it's not clear how long we should keep the message in that map (do we need another gracePeriod for this?).

Related, nacks are unaffected by this, because nacks are modacks with the deadline set to 0, which tells the server to redeliver the message, behaving exactly like a message expiration.

@jameshartig
Contributor Author

One possibility is to extend the gracePeriod, or make it dependent on how long the modack deadlines are, where larger extensions = larger gracePeriod.

That seems like a good idea. I'm going to chase down a CPU profile to see if I can understand if that's causing the missed modacks.

The GOMAXPROCS=2 didn't seem to prevent the issue.

@jameshartig
Contributor Author

Yeah the ticket is a good idea since then someone can drill down into the message ID level and see what's happening there.

The ticket is 52368550. They provided a list of the calls related to messageID 11704992227715245 which is from the first trace above (abdbc06f25197b8b0015c9853788bd76). The server agrees with the client that there was a modack at 4:26 so I've asked for clarification on why it was then redelivered 3 minutes later.

I'm going to chase down a CPU profile to see if I can understand if that's causing the missed modacks.

I added back the timer batching to reduce the number of overall timers that the Go runtime has to manage and that seems to have helped eliminate the CPU saturation spikes but it hasn't helped reduce the redeliveries.

I just deployed a change swapping out the otel span batch processor with our own to help reduce the dropped spans to get a clearer picture of the issue when it happens.

@hongalex
Member

Just following up here since it's been a while since I responded. I'm following the ticket closely and after the investigations are complete, they'll update on the ticket. If the suggestion is that the modacks are not being issued in time, I'll implement the change described in my previous comment.

Thanks for clarifying that the CPU usage doesn't necessarily correlate with redeliveries.

@jameshartig
Contributor Author

jameshartig commented Aug 23, 2024

I heard that there have been some changes on the server side to improve things. What is your recommendation as far as the max-callbacks branch/code? We've been running this whole time with MaxOutstandingMessages=200000 and MaxCallbacks=500000. But we've also lowered the goroutines to 1, so should we be good to switch back to v1.42.0 and just leave goroutines at 1 to avoid the issue of the server getting an inflated number of messages to send?

@hongalex
Member

We plan to support configuring callbacks as a feature, but I'm working on making sure this is designed consistently with Pub/Sub libraries in other languages. If the server side changes fix the problem of overdelivering and you're ok with setting NumGoroutines to 1, then yeah you don't need the ability to set the number of callbacks.

@jameshartig
Contributor Author

We are okay with setting NumGoroutines to 1 since we don't need more throughput. We also plan on testing exactly-once delivery to see if that will help with the redeliveries as well.

I believe this can be closed unless you want to leave it open to track the NumCallbacks feature.

@hongalex
Member

Yeah I'll close this for now. We're tracking callbacks in another location (across languages, so not specific to this Github issue yet). Thanks for your patience as the issue was resolved.
