Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support opentelemetry tracing #10633

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,27 @@ github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw=
go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/bridge/opencensus v0.40.0 h1:pqDiayRhBgoqy1vwnscik+TizcImJ58l053NScJyZso=
go.opentelemetry.io/otel/bridge/opencensus v0.40.0/go.mod h1:1NvVHb6tLTe5A9qCYz+eErW0t8iPn4ZfR6tDKcqlGTM=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0/go.mod h1:qcTO4xHAxZLaLxPd60TdE88rxtItPHgHWqOhOGRr0as=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0/go.mod h1:sTt30Evb7hJB/gEk27qLb1+l9n4Tb8HvHkR0Wx3S6CU=
go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/telemetry v0.0.0-20240208230135-b75ee8823808/go.mod h1:KG1lNk5ZFNssSZLrpVb4sMXKMpGwGXOxSG3rnu2gZQQ=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
Expand Down
10 changes: 6 additions & 4 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module cloud.google.com/go/pubsub

go 1.21

toolchain go1.21.0

require (
cloud.google.com/go v0.115.0
cloud.google.com/go/iam v1.1.12
Expand All @@ -11,6 +13,9 @@ require (
github.com/googleapis/gax-go/v2 v2.13.0
go.einride.tech/aip v0.67.1
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
Expand All @@ -34,10 +39,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
Expand Down
186 changes: 173 additions & 13 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"cloud.google.com/go/pubsub/internal/distribution"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -67,6 +71,8 @@ type messageIterator struct {
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
projectID string
subID string
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
Expand Down Expand Up @@ -106,6 +112,13 @@ type messageIterator struct {
// by the response in StreamingPull and can change mid Receive. Must be accessed
// with the lock held.
enableOrdering bool

// enableTracing enables span creation for this subscriber iterator.
enableTracing bool
// This maps trace ackID (string) to root subscribe spans(trace.Span), used for otel tracing.
// Active ackIDs in this map should also exist 1:1 with ids in keepAliveDeadlines.
// Elements are removed when messages are acked, nacked, or expired in iterator.handleKeepAlives()
activeSpans sync.Map
}

// newMessageIterator starts and returns a new messageIterator.
Expand Down Expand Up @@ -134,12 +147,17 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
receiptTicker := time.NewTicker(100 * time.Millisecond)
cctx, cancel := context.WithCancel(context.Background())
cctx = withSubscriptionKey(cctx, subName)

projectID, subID := parseResourceName(subName)

it := &messageIterator{
ctx: cctx,
cancel: cancel,
ps: ps,
po: po,
subc: subc,
projectID: projectID,
subID: subID,
subName: subName,
kaTick: time.After(keepAlivePeriod),
ackTicker: ackTicker,
Expand Down Expand Up @@ -269,7 +287,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
if err != nil {
return nil, it.fail(err)
}

recordStat(it.ctx, PullCount, int64(len(rmsgs)))

now := time.Now()
msgs, err := convertMessages(rmsgs, now, it.done)
if err != nil {
Expand Down Expand Up @@ -309,6 +329,25 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
pendingMessages[ackID] = m
}
}

if it.enableTracing {
ctx := context.Background()
if m.Attributes != nil {
ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(m))
}
attr := getSubscriberOpts(it.projectID, it.subID, m)
_, span := startSpan(ctx, subscribeSpanName, it.subID, attr...)
span.SetAttributes(
attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery),
attribute.String(ackIDAttribute, ackID),
semconv.MessagingBatchMessageCount(len(msgs)),
semconv.CodeFunction("receive"),
)
// Always store the subscribe span, even if sampling isn't enabled.
// This is useful since we need to propagate the sampling flag
// to the callback in Receive, so traces have an unbroken sampling decision.
it.activeSpans.Store(ackID, span)
}
}
deadline := it.ackDeadline()
it.mu.Unlock()
Expand All @@ -328,7 +367,7 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {

// If exactly once is enabled, we should wait until modack responses are successes
// before attempting to process messages.
it.sendModAck(ackIDs, deadline, false)
it.sendModAck(ackIDs, deadline, false, true)
for ackID, ar := range ackIDs {
ctx := context.Background()
_, err := ar.Get(ctx)
Expand Down Expand Up @@ -498,16 +537,16 @@ func (it *messageIterator) sender() {
}
if sendNacks {
// Nack indicated by modifying the deadline to zero.
it.sendModAck(nacks, 0, false)
it.sendModAck(nacks, 0, false, false)
}
if sendModAcks {
it.sendModAck(modAcks, dl, true)
it.sendModAck(modAcks, dl, true, false)
}
if sendPing {
it.pingStream()
}
if sendReceipt {
it.sendModAck(receipts, dl, true)
it.sendModAck(receipts, dl, true, true)
}
}
}
Expand All @@ -520,11 +559,23 @@ func (it *messageIterator) handleKeepAlives() {
now := time.Now()
for id, expiry := range it.keepAliveDeadlines {
if expiry.Before(now) {
// Message is now expired.
// This delete will not result in skipping any map items, as implied by
// the spec at https://golang.org/ref/spec#For_statements, "For
// statements with range clause", note 3, and stated explicitly at
// https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ.
delete(it.keepAliveDeadlines, id)
if it.enableTracing {
// get the parent span context for this ackID for otel tracing.
// This message is now expired, so if the ackID is still valid,
// mark that span as expired and end the span.
s, ok := it.activeSpans.LoadAndDelete(id)
if ok {
span := s.(trace.Span)
span.SetAttributes(attribute.String(resultAttribute, resultExpired))
span.End()
}
}
} else {
// Use a success AckResult since we don't propagate ModAcks back to the user.
it.pendingModAcks[id] = newSuccessAckResult()
Expand All @@ -539,8 +590,8 @@ type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
ackIDs := make([]string, 0, len(m))
for k := range m {
ackIDs = append(ackIDs, k)
for ackID := range m {
ackIDs = append(ackIDs, ackID)
}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
Expand All @@ -563,11 +614,10 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}

st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
// Retry acks/modacks/nacks in a separate goroutine.
go func() {
retryAckFunc(toRetry)
}()
Expand All @@ -581,14 +631,57 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF
// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
// For each ackID (message), setup links to the main subscribe span.
// If this is a nack, also remove it from active spans.
// If the ackID is not found, don't create any more spans.
if it.enableTracing {
var links []trace.Link
subscribeSpans := make([]trace.Span, 0, len(ackIDs))
for _, ackID := range ackIDs {
// get the main subscribe span context for this ackID for otel tracing.
s, ok := it.activeSpans.LoadAndDelete(ackID)
if ok {
subscribeSpan := s.(trace.Span)
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultAcked))
subscribeSpans = append(subscribeSpans, subscribeSpan)
subscribeSpan.AddEvent(eventAckStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventAckEnd)
// Only add this link if the span is sampled, otherwise we're creating invalid links.
if subscribeSpan.SpanContext().IsSampled() {
links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
}
}
}

// Create the single ack span for this request, and for each
// message, add Subscribe<->Ack links.
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
_, ackSpan := startSpan(context.Background(), ackSpanName, it.subID, opts...)
defer ackSpan.End()
ackSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendAck"))
if ackSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: ackSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(ackSpanName),
},
})
}
}
}
return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: it.subName,
AckIds: ackIds,
AckIds: ackIDs,
})
}, it.retryAcks, func(ctx context.Context, toSend []string) {
recordStat(it.ctx, AckCount, int64(len(toSend)))
addAcks(toSend)

})
}

Expand All @@ -598,13 +691,80 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) {
deadlineSec := int32(deadline / time.Second)
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
isNack := deadline == 0
var spanName, eventStart, eventEnd string
if isNack {
spanName = nackSpanName
eventStart = eventNackStart
eventEnd = eventNackEnd
} else {
spanName = modackSpanName
eventStart = eventModackStart
eventEnd = eventModackEnd
}
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
if it.enableTracing {
// For each ackID (message), link back to the main subscribe span.
// If this is a nack, also remove it from active spans.
// If the ackID is not found, don't create any more spans.
links := make([]trace.Link, 0, len(ackIDs))
subscribeSpans := make([]trace.Span, 0, len(ackIDs))
for _, ackID := range ackIDs {
// get the parent span context for this ackID for otel tracing.
var s any
var ok bool
if isNack {
s, ok = it.activeSpans.LoadAndDelete(ackID)
} else {
s, ok = it.activeSpans.Load(ackID)
}
if ok {
subscribeSpan := s.(trace.Span)
subscribeSpans = append(subscribeSpans, subscribeSpan)
if isNack {
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultNacked))
}
subscribeSpan.AddEvent(eventStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventEnd)

// Only add this link if the span is sampled, otherwise we're creating invalid links.
if subscribeSpan.SpanContext().IsSampled() {
links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
}
}
}

// Create the single modack/nack span for this request, and for each
// message, add Subscribe<->Modack links.
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
_, mSpan := startSpan(context.Background(), spanName, it.subID, opts...)
defer mSpan.End()
if !isNack {
mSpan.SetAttributes(
semconv.MessagingGCPPubsubMessageAckDeadline(int(deadlineSec)),
attribute.Bool(receiptModackAttribute, isReceipt))
}
mSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendModAck"))
if mSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: mSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(spanName),
},
})
}
}
}
return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: deadlineSec,
AckIds: ackIds,
AckIds: ackIDs,
})
}, func(toRetry map[string]*ipubsub.AckResult) {
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
Expand Down
2 changes: 1 addition & 1 deletion pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func TestIterator_StreamingPullExactlyOnce(t *testing.T) {
func TestAddToDistribution(t *testing.T) {
c, _ := newFake(t)

iter := newMessageIterator(c.subc, "some-sub", &pullOptions{})
iter := newMessageIterator(c.subc, "projects/p/subscriptions/some-sub", &pullOptions{})

// Start with a datapoint that's too small that should be bounded to 10s.
receiveTime := time.Now().Add(time.Duration(-1) * time.Second)
Expand Down
Loading
Loading