-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Consume plane setup #283
base: consume-plane
Are you sure you want to change the base?
Conversation
vnktram
commented
Dec 13, 2021
•
edited
Loading
edited
- Create consume plane lifecycle manager
- Consumer setup
- Fetch, Ack, ModAck endpoints via consume plane
- Consume plane deployment Consume/deployment #284
- Integrate consume plane with web pods Consume/web integration #293
- Consumer update based on registry events/updates.
- Unit tests. (TBD as part of a dedicated PR)
- Setup prometheus metrics scraping
Codecov Report
@@ Coverage Diff @@
## consume-plane #283 +/- ##
=================================================
- Coverage 45.63% 41.39% -4.24%
=================================================
Files 110 117 +7
Lines 5125 5587 +462
=================================================
- Hits 2339 2313 -26
- Misses 2611 3105 +494
+ Partials 175 169 -6
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
* Integrate consume plane with web pods * Remove unused file * Fix lint issues * Proto commit
replicaCount = 1 | ||
consumePlaneDeployment = "int.stage.razorpay.in" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why no ConsumerPlane entry in stage, perf, func files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix this.
con := &Consumer{ | ||
ctx: ctx, | ||
computedHash: computedHash, | ||
subscriberID: subscriberID, | ||
subscription: subscription, | ||
subscriptionSubscriber: subs, | ||
errChan: make(chan error), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you have not set subCore in the intitiated Consumer Instance?
modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline) | ||
modAckReq = modAckReq.WithContext(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can merge into single statement?
modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline).WithContext(ctx)
return &Consumer{ | ||
ctx: m.ctx, | ||
computedHash: hash, | ||
subscriberID: subscriberID, | ||
subscription: sub, | ||
subscriberCore: m.subscriberCore, | ||
subscriptionSubscriber: subscriber, | ||
}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use NewConsumer func instead to keep the initiation from single point.
parsedReq.Subscription = req.Subscription | ||
if req.AckIds != nil && len(req.AckIds) > 0 { | ||
ackMessages := make([]*subscriber.AckMessage, 0) | ||
parsedReq.AckIDs = req.AckIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
repeated assignment. Remove from here. Its been handled below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the result of the merge. WIll fix it.
ackMessages = append(ackMessages, ackMessage) | ||
modifyDeadlineMsgIdsWithSecs[ackMessage.MessageID] = req.AckDeadlineSeconds | ||
} | ||
parsedReq.AckIDs = req.AckIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
repeated assignment. Remove from one place
func (c *Core) NewOpinionatedSubscriber(ctx context.Context, | ||
subscriberID string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment explaining in brief what is an OpinionatedSubscriber
?