Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Consume plane setup #283

Open
wants to merge 16 commits into
base: consume-plane
Choose a base branch
from
3 changes: 3 additions & 0 deletions cmd/service/metro/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/razorpay/metro/internal/config"
"github.com/razorpay/metro/pkg/logger"
"github.com/razorpay/metro/service"
consumeplane "github.com/razorpay/metro/service/consume-plane"
openapiserver "github.com/razorpay/metro/service/openapi-server"
"github.com/razorpay/metro/service/web"
"github.com/razorpay/metro/service/worker"
Expand All @@ -29,6 +30,8 @@ func NewComponent(component string, cfg config.Config) (*Component, error) {
svc, err = worker.NewService(&cfg.Worker, &cfg.Registry, &cfg.Cache)
case OpenAPIServer:
svc, err = openapiserver.NewService(&cfg.OpenAPIServer)
case ConsumePlane:
svc, err = consumeplane.NewService(&cfg.ConsumePlane, &cfg.Registry)
}

if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/service/metro/metro.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ const (
Worker = "worker"
// OpenAPIServer to server swagger docs
OpenAPIServer = "openapi-server"
// ConsumePlane component serves as a broker interface
ConsumePlane = "consume-plane"
)

var validComponents = []string{Web, Worker, OpenAPIServer}
var validComponents = []string{Web, Worker, OpenAPIServer, ConsumePlane}
var component *Component

// isValidComponent validates if the input component is a valid metro component
Expand Down
27 changes: 27 additions & 0 deletions config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
errorLevel = 1

[web]
replicaCount = 1
consumePlaneDeployment = "metro.svc.cluster.local"
[web.broker]
variant = "kafka"
[web.broker.brokerconfig]
Expand All @@ -28,6 +30,31 @@
GrpcServerAddress = "0.0.0.0:8081"
HttpServerAddress = "0.0.0.0:8082"
InternalHttpServerAddress = "0.0.0.0:9000"
[web.httpclientconfig]
connectTimeoutMs = 2000
connKeepAliveMs = 0
expectContinueTimeoutMs = 0
idleConnTimeoutMs = 60000
maxAllIdleConns = 1000
maxHostIdleConns = 1000
responseHeaderTimeoutMs = 25000
tlsHandshakeTimeoutMs = 2000


[consumePlane]
replicaCount = 1
ordinalID = 0
[consumePlane.broker]
variant = "kafka"
[consumePlane.broker.brokerconfig]
brokers = ["localhost:9092"]
enableTLS = false
[consumePlane.broker.brokerconfig.consumePlane]
[consumePlane.interfaces]
[consumePlane.interfaces.api]
GrpcServerAddress = "0.0.0.0:8088"
HttpServerAddress = "0.0.0.0:8089"
InternalHttpServerAddress = "0.0.0.0:9003"

[worker]
[worker.broker]
Expand Down
17 changes: 17 additions & 0 deletions config/dev_docker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
errorLevel = 1

[web]
replicaCount = 1
consumePlaneDeployment = "metro.svc.cluster.local"
[web.broker]
variant = "kafka"
[web.broker.brokerconfig]
Expand All @@ -28,6 +30,21 @@
HttpServerAddress = "0.0.0.0:8082"
InternalHttpServerAddress = "0.0.0.0:9000"

[consumePlane]
replicaCount = 1
ordinalID = 0
[consumePlane.broker]
variant = "kafka"
[consumePlane.broker.brokerconfig]
brokers = ["localhost:9092"]
enableTLS = false
[consumePlane.broker.brokerconfig.consumePlane]
[consumePlane.interfaces]
[consumePlane.interfaces.api]
GrpcServerAddress = "0.0.0.0:8085"
HttpServerAddress = "0.0.0.0:8086"
InternalHttpServerAddress = "0.0.0.0:9003"

[worker]
[worker.broker]
variant = "kafka"
Expand Down
2 changes: 2 additions & 0 deletions config/func.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
errorLevel = 1

[web]
replicaCount = 1
consumePlaneDeployment = "int.perf.razorpay.in"
[web.broker]
variant = "kafka"
[web.broker.brokerconfig]
Expand Down
2 changes: 2 additions & 0 deletions config/perf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
errorLevel = 1

[web]
replicaCount = 1
consumePlaneDeployment = "int.perf.razorpay.in"
[web.broker]
variant = "kafka"
[web.broker.brokerconfig]
Expand Down
2 changes: 2 additions & 0 deletions config/stage.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
errorLevel = 1

[web]
replicaCount = 1
consumePlaneDeployment = "int.stage.razorpay.in"
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no ConsumerPlane entry in stage, perf, func files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix this.

[web.broker]
variant = "kafka"
[web.broker.brokerconfig]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/dvsekhvalnov/jose2go v0.0.0-20201001154944-b09cfaf05951 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/getsentry/sentry-go v0.11.0
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/razorpay/metro/pkg/monitoring/sentry"
"github.com/razorpay/metro/pkg/registry"
"github.com/razorpay/metro/pkg/tracing"
consumeplane "github.com/razorpay/metro/service/consume-plane"
openapiserver "github.com/razorpay/metro/service/openapi-server"
"github.com/razorpay/metro/service/web"
worker "github.com/razorpay/metro/service/worker"
Expand All @@ -19,6 +20,7 @@ type Config struct {
Sentry sentry.Config
Web web.Config
Worker worker.Config
ConsumePlane consumeplane.Config
Registry registry.Config
Cache cache.Config
OpenAPIServer openapiserver.Config
Expand Down
143 changes: 143 additions & 0 deletions internal/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package consumer

import (
"context"
"fmt"
"io"
"time"

"github.com/razorpay/metro/internal/subscriber"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/pkg/logger"
"github.com/razorpay/metro/pkg/messagebroker"
metrov1 "github.com/razorpay/metro/rpc/proto/v1"
)

// IConsumer defines the set of methods to access a consumer object
type IConsumer interface {
Run() error
Acknowledge(ctx context.Context, req *ParsedAcknowledgeRequest)
ModifyAckDeadline(ctx context.Context, req *ParsedModifyAckDeadlineRequest)
Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error)
}

// Consumer entity represents a single subscription-partition specific client
type Consumer struct {
computedHash int
subscriberID string
subscription *subscription.Model
subscriberCore subscriber.ICore
subscriptionSubscriber subscriber.ISubscriber
ctx context.Context
errChan chan error
}

// DefaultNumMessageCount ...
var DefaultNumMessageCount int32 = 10

// NewConsumer intializes a consumer entity
func NewConsumer(ctx context.Context, computedHash int, subscriberID string, subscription *subscription.Model, subCore subscriber.ICore, subs subscriber.ISubscriber) *Consumer {
con := &Consumer{
ctx: ctx,
computedHash: computedHash,
subscriberID: subscriberID,
subscription: subscription,
subscriptionSubscriber: subs,
errChan: make(chan error),
}
Comment on lines +40 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you have not set subCore in the intitiated Consumer Instance?

return con
}

// Fetch retrieves messages for a given consumer, it takes ackDeadline, retry and maxMessages into account.
func (c *Consumer) Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) {
respChan := make(chan *metrov1.PullResponse)
defer close(respChan)
c.subscriptionSubscriber.GetRequestChannel() <- (&subscriber.PullRequest{
MaxNumOfMessages: int32(messageCount),
RespChan: respChan,
}).WithContext(ctx)

select {
case resp := <-respChan:
return resp, nil
case <-ctx.Done():
return &metrov1.PullResponse{}, ctx.Err()
}

}

// Acknowledge send an ACK for a set of messages
func (c *Consumer) Acknowledge(ctx context.Context, ackMsgs []*subscriber.AckMessage) {
for _, ackMsg := range ackMsgs {
c.subscriptionSubscriber.GetAckChannel() <- ackMsg.WithContext(ctx)
}
}

// ModifyAckDeadline allows modification of Ack deadline for a messages(s).
// Deadline of 0 indicates a Nack operation.
func (c *Consumer) ModifyAckDeadline(ctx context.Context, mackMsgs []*subscriber.AckMessage) {
for _, modAckMsg := range mackMsgs {
modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline)
modAckReq = modAckReq.WithContext(ctx)
Comment on lines +80 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can merge into single statement?
modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline).WithContext(ctx)

c.subscriptionSubscriber.GetModAckChannel() <- modAckReq
}
}

// Run ensures that the lifecycle of a consumer is instantiated.
func (c *Consumer) Run() error {
// stream ack timeout
streamAckDeadlineSecs := int32(30) // init with some sane value
timeout := time.NewTicker(time.Duration(streamAckDeadlineSecs) * time.Second)
for {
select {
case <-c.ctx.Done():
logger.Ctx(c.ctx).Infow("stopping subscriber from <-s.ctx.Done()")
c.stop()
return c.ctx.Err()
case <-timeout.C:
logger.Ctx(c.ctx).Infow("stopping subscriber from <-timeout.C")
c.stop()
return fmt.Errorf("stream: ack deadline seconds crossed")
case err := <-c.errChan:
logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.errChan")
c.stop()
if err == io.EOF {
// return will close stream from server side
logger.Ctx(c.ctx).Errorw("stream: EOF received from client")
} else if err != nil {
logger.Ctx(c.ctx).Errorw("stream: error received from client", "error", err.Error())
}
return nil
case err := <-c.subscriptionSubscriber.GetErrorChannel():
// streamManagerSubscriberErrors.WithLabelValues(env, s.subscriberID, s.subscriptionSubscriber.GetSubscriptionName(), err.Error()).Inc()
if messagebroker.IsErrorRecoverable(err) {
// no need to stop the subscriber in such cases. just log and return
logger.Ctx(c.ctx).Errorw("subscriber: got recoverable error", err.Error())
return nil
}

logger.Ctx(c.ctx).Errorw("subscriber: got un-recoverable error", "error", err.Error())
logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.subscriptionSubscriber.GetErrorChannel()")
c.stop()
return err

default:
timeout.Reset(time.Duration(streamAckDeadlineSecs) * time.Second)
}
}
}

func (c *Consumer) stop() {
c.subscriptionSubscriber.Stop()
c.closeSubscriberChannels()

logger.Ctx(c.ctx).Infow("stopped subscriber...", "subscriberId", c.subscriberID)

}

func (c *Consumer) closeSubscriberChannels() {
close(c.errChan)
close(c.subscriptionSubscriber.GetRequestChannel())
close(c.subscriptionSubscriber.GetAckChannel())
close(c.subscriptionSubscriber.GetModAckChannel())
}
Loading