diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go
index 9c66cd935..8ca836a7b 100644
--- a/pkg/kafka/producer.go
+++ b/pkg/kafka/producer.go
@@ -5,23 +5,20 @@ import (
 	"fmt"
 	"github.com/Shopify/sarama"
 	"github.com/linkedin/goavro/v2"
-	"github.com/practo/klog/v2"
-	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
-	"strings"
 	"time"
 )
 
 type AvroProducer struct {
 	producer sarama.SyncProducer
-	registry schemaregistry.SchemaRegistry
 }
 
 func NewAvroProducer(
 	brokers []string,
 	kafkaVersion string,
-	schemaRegistryURL string,
 	configTLS TLSConfig,
-) (*AvroProducer, error) {
+) (
+	*AvroProducer, error,
+) {
 	version, err := sarama.ParseKafkaVersion(kafkaVersion)
 	if err != nil {
 		return nil, fmt.Errorf("Error parsing Kafka version: %v\n", err)
@@ -52,36 +49,9 @@ func NewAvroProducer(
 
 	return &AvroProducer{
 		producer: producer,
-		registry: schemaregistry.NewRegistry(schemaRegistryURL),
 	}, nil
 }
 
-// CreateSchema creates schema if it does not exist
-func (c *AvroProducer) CreateSchema(
-	topic string, scheme string) (int, bool, error) {
-
-	created := false
-
-	schemeStr := strings.ReplaceAll(scheme, "\n", "")
-	schemeStr = strings.ReplaceAll(schemeStr, " ", "")
-
-	schema, err := schemaregistry.GetLatestSchemaWithRetry(
-		c.registry, topic, false, 2,
-	)
-	if schema == nil || schema.Schema() != schemeStr {
-		klog.V(2).Infof("%s: Creating schema for the topic", topic)
-		schema, err = c.registry.CreateSchema(
-			topic, scheme, schemaregistry.Avro, false,
-		)
-		if err != nil {
-			return 0, false, err
-		}
-		created = true
-	}
-
-	return schema.ID(), created, nil
-}
-
 func (c *AvroProducer) Add(
 	topic string,
 	schema string,
diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index 184a5176c..910ee8157 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -12,6 +12,7 @@ import (
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
 	loader "github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/s3sink"
+	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
@@ -53,8 +54,12 @@ type batchProcessor struct {
 
 	maxConcurrency int
 
-	// loaderSchemaID informations for the loader topic
+	// loaderSchemaID stores the schema ID for the loader topic's value
 	loaderSchemaID int
+
+	// schemaIDKey stores the schema ID for the batcher topic's key;
+	// the loader uses it to fetch the primary keys for the table
+	schemaIDKey int
 }
 
 func newBatchProcessor(
@@ -84,7 +89,6 @@ func newBatchProcessor(
 	signaler, err := kafka.NewAvroProducer(
 		strings.Split(kafkaConfig.Brokers, ","),
 		kafkaConfig.Version,
-		viper.GetString("schemaRegistryURL"),
 		kafkaConfig.TLSConfig,
 	)
 	if err != nil {
@@ -101,13 +105,35 @@ func newBatchProcessor(
 		)
 	}
 
-	loaderSchemaID, _, err := signaler.CreateSchema(
+	registry := schemaregistry.NewRegistry(viper.GetString("schemaRegistryURL"))
+	// create the loader value schema if it is not present
+	loaderSchemaID, _, err := schemaregistry.CreateSchema(
+		registry,
 		kafkaLoaderTopicPrefix+topic,
 		loader.JobAvroSchema,
+		false, // key=false means the schema is for the value
+	)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"Error creating schema for topic: %s, err: %v",
+			kafkaLoaderTopicPrefix+topic, err)
+	}
+	schemaKey, err := schemaregistry.GetLatestSchemaWithRetry(
+		registry,
+		topic,
+		true, // key=true means the schema is for the key
+		2,
 	)
 	if err != nil {
 		return nil, fmt.Errorf(
-			"Error creating schema for topic: %s, err: %v", topic, err)
+			"Error fetching schema for topic-key for topic: %s, err: %v",
+			topic, err)
+	}
+	if schemaKey == nil {
+		return nil, fmt.Errorf(
+			"Error: got nil schema for topic-key for topic: %s",
+			topic,
+		)
 	}
 
 	klog.V(2).Infof("%s: autoCommit: %v", topic, saramaConfig.AutoCommit)
@@ -128,6 +154,7 @@ func newBatchProcessor(
 		signaler:       signaler,
 		maxConcurrency: maxConcurrency,
 		loaderSchemaID: loaderSchemaID,
+		schemaIDKey:    schemaKey.ID(),
 	}, nil
 }
 
@@ -228,7 +255,8 @@ func (b *batchProcessor) signalLoad(resp *response) error {
 		resp.endOffset,
 		",",
 		b.s3sink.GetKeyURI(resp.s3Key),
-		resp.batchSchemaID, // schema of upstream topic
+		resp.batchSchemaID, // schema of upstream topic's value
+		b.schemaIDKey,      // schema of upstream topic's key
 		resp.maskSchema,
 		resp.skipMerge,
 		resp.bytesProcessed,
@@ -288,6 +316,7 @@ func (b *batchProcessor) processMessage(
 	r, err := b.schemaTransformer.TransformValue(
 		b.topic,
 		resp.batchSchemaID,
+		b.schemaIDKey,
 		resp.maskSchema,
 	)
 	if err != nil {
diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index b763539f1..c0afe7809 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -16,6 +16,7 @@ var JobAvroSchema string = `{
         {"name": "csvDialect", "type": "string"},
         {"name": "s3Path", "type": "string"},
         {"name": "schemaId", "type": "int"},
+        {"name": "schemaIdKey", "type": "int", "default": -1},
         {"name": "maskSchema", "type": "string"},
         {"name": "skipMerge", "type": "string", "default": ""},
         {"name": "batchBytes", "type": "long", "default": 0}
@@ -23,12 +24,13 @@ var JobAvroSchema string = `{
 }`
 
 type Job struct {
-	UpstreamTopic string                         `json:"upstreamTopic"`
+	UpstreamTopic string                         `json:"upstreamTopic"` // batcher topic
 	StartOffset   int64                          `json:"startOffset"`
 	EndOffset     int64                          `json:"endOffset"`
 	CsvDialect    string                         `json:"csvDialect"`
 	S3Path        string                         `json:"s3Path"`
-	SchemaId      int                            `json:"schemaId"` // schema id of debezium event
+	SchemaId      int                            `json:"schemaId"`    // schema id of the debezium event value for the upstream (batcher) topic
+	SchemaIdKey   int                            `json:"schemaIdKey"` // schema id of the debezium event key for the upstream (batcher) topic
 	MaskSchema    map[string]serializer.MaskInfo `json:"maskSchema"`
 	SkipMerge     bool                           `json:"skipMerge"` // to load using merge strategy or directy COPY
 	BatchBytes    int64                          `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
 }
@@ -36,7 +38,7 @@ type Job struct {
 
 func NewJob(
 	upstreamTopic string, startOffset int64, endOffset int64,
-	csvDialect string, s3Path string, schemaId int,
+	csvDialect string, s3Path string, schemaId int, schemaIdKey int,
 	maskSchema map[string]serializer.MaskInfo, skipMerge bool,
 	batchBytes int64) Job {
 
@@ -47,6 +49,7 @@ func NewJob(
 		CsvDialect:    csvDialect,
 		S3Path:        s3Path,
 		SchemaId:      schemaId,
+		SchemaIdKey:   schemaIdKey,
 		MaskSchema:    maskSchema,
 		SkipMerge:     skipMerge,
 		BatchBytes:    batchBytes,
@@ -84,6 +87,14 @@ func StringMapToJob(data map[string]interface{}) Job {
 			} else if value, ok := v.(int); ok {
 				job.SchemaId = value
 			}
+		case "schemaIdKey":
+			if value, ok := v.(int32); ok {
+				job.SchemaIdKey = int(value)
+			} else if value, ok := v.(int); ok {
+				job.SchemaIdKey = value
+			} else {
+				job.SchemaIdKey = -1 // backward compatibility
+			}
 		case "skipMerge":
 			if value, ok := v.(string); ok {
 				if value == "true" {
@@ -105,7 +116,11 @@ func StringMapToJob(data map[string]interface{}) Job {
 				job.BatchBytes = 0
 			}
 		}
+	}
+	// backward compatibility
+	if job.SchemaIdKey == 0 {
+		job.SchemaIdKey = -1
 	}
 
 	return job
@@ -198,6 +213,7 @@ func (c Job) ToStringMap() map[string]interface{} {
 		"csvDialect":  c.CsvDialect,
 		"s3Path":      c.S3Path,
 		"schemaId":    c.SchemaId,
+		"schemaIdKey": c.SchemaIdKey,
 		"skipMerge":   skipMerge,
 		"maskSchema":  ToSchemaString(c.MaskSchema),
 		"batchBytes":  c.BatchBytes,
diff --git a/pkg/redshiftloader/job_test.go b/pkg/redshiftloader/job_test.go
index f13568d90..092a498d2 100644
--- a/pkg/redshiftloader/job_test.go
+++ b/pkg/redshiftloader/job_test.go
@@ -20,6 +20,7 @@ func TestToStringMap(t *testing.T) {
 		",",
 		"s3path",
 		1,
+		2,
 		maskSchema,
 		false,
 		10,
diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go
index bbefa6d96..3b882c47f 100644
--- a/pkg/redshiftloader/load_processor.go
+++ b/pkg/redshiftloader/load_processor.go
@@ -91,7 +91,7 @@ func newLoadProcessor(
 	partition int32,
 	saramaConfig kafka.SaramaConfig,
 	redshifter *redshift.Redshift,
-) serializer.MessageBatchSyncProcessor {
+) (serializer.MessageBatchSyncProcessor, error) {
 	sink, err := s3sink.NewS3Sink(
 		viper.GetString("s3sink.accessKeyId"),
 		viper.GetString("s3sink.secretAccessKey"),
@@ -99,7 +99,7 @@ func newLoadProcessor(
 		viper.GetString("s3sink.bucket"),
 	)
 	if err != nil {
-		klog.Fatalf("Error creating s3 client: %v\n", err)
+		return nil, fmt.Errorf("Error creating s3 client: %v\n", err)
 	}
 
 	klog.V(3).Infof("%s: auto-commit: %v", topic, saramaConfig.AutoCommit)
@@ -119,7 +119,7 @@ func newLoadProcessor(
 		targetTable:   nil,
 		tableSuffix:   viper.GetString("redshift.tableSuffix"),
 		redshiftStats: viper.GetBool("redshift.stats"),
-	}
+	}, nil
 }
 
 func (b *loadProcessor) ctxCancelled(ctx context.Context) error {
@@ -425,8 +425,11 @@ func (b *loadProcessor) merge(ctx context.Context) error {
 // batch messages.
 // this also intializes b.stagingTable
 func (b *loadProcessor) createStagingTable(
-	ctx context.Context, schemaId int, inputTable redshift.Table) error {
-
+	ctx context.Context,
+	schemaId int,
+	schemaIdKey int,
+	inputTable redshift.Table,
+) error {
 	b.stagingTable = redshift.NewTable(inputTable)
 	b.stagingTable.Name = b.stagingTable.Name + "_staged"
 
@@ -449,8 +452,12 @@ func (b *loadProcessor) createStagingTable(
 		return fmt.Errorf("Error dropping staging table: %v\n", err)
 	}
 
-	primaryKeys, err := b.schemaTransformer.TransformKey(
-		b.upstreamTopic)
+	var primaryKeys []string
+	if schemaIdKey == -1 || schemaIdKey == 0 { // deprecated path: expensive and does not use the cache
+		primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
+	} else { // new, faster way to get the primary keys
+		primaryKeys, err = b.schemaTransformer.PrimaryKeys(schemaIdKey)
+	}
 	if err != nil {
 		return fmt.Errorf("Error getting primarykey for: %s, err: %v\n", b.topic, err)
 	}
@@ -622,8 +629,8 @@ func (b *loadProcessor) processBatch(
 	}
 
 	var inputTable redshift.Table
-	var schemaId int
 	var err error
+	var schemaId, schemaIdKey int
 	b.stagingTable = nil
 	b.targetTable = nil
 	b.upstreamTopic = ""
@@ -637,6 +644,7 @@ func (b *loadProcessor) processBatch(
 		default:
 			job := StringMapToJob(message.Value.(map[string]interface{}))
 			schemaId = job.SchemaId
+			schemaIdKey = job.SchemaIdKey
 			b.batchEndOffset = message.Offset
 			bytesProcessed += job.BatchBytes
 
@@ -651,6 +659,7 @@ func (b *loadProcessor) processBatch(
 			resp, err := b.schemaTransformer.TransformValue(
 				b.upstreamTopic,
 				schemaId,
+				schemaIdKey,
 				job.MaskSchema,
 			)
 			if err != nil {
@@ -699,7 +708,7 @@ func (b *loadProcessor) processBatch(
 
 	// load
 	klog.V(2).Infof("%s, load staging\n", b.topic)
-	err = b.createStagingTable(ctx, schemaId, inputTable)
+	err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable)
 	if err != nil {
 		return bytesProcessed, err
 	}
diff --git a/pkg/redshiftloader/loader_handler.go b/pkg/redshiftloader/loader_handler.go
index d1444b705..77b5859c0 100644
--- a/pkg/redshiftloader/loader_handler.go
+++ b/pkg/redshiftloader/loader_handler.go
@@ -117,13 +117,18 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 	var lastSchemaId *int
 	var err error
 
-	processor := newLoadProcessor(
+	processor, err := newLoadProcessor(
 		h.consumerGroupID,
 		claim.Topic(),
 		claim.Partition(),
 		h.saramaConfig,
 		h.redshifter,
 	)
+	if err != nil {
+		return fmt.Errorf(
+			"Error creating the load processor for topic: %s, err: %v",
+			claim.Topic(), err)
+	}
 	maxBufSize := h.maxSize
 	if h.maxBytesPerBatch != nil {
 		maxBufSize = serializer.DefaultMessageBufferSize
diff --git a/pkg/schemaregistry/schemaregistry.go b/pkg/schemaregistry/schemaregistry.go
index 071fe1231..ccad44530 100644
--- a/pkg/schemaregistry/schemaregistry.go
+++ b/pkg/schemaregistry/schemaregistry.go
@@ -6,6 +6,7 @@ import (
 	"github.com/practo/klog/v2"
 	"github.com/riferrei/srclient"
 	"math/rand"
+	"strings"
 	"time"
 )
 
@@ -53,10 +54,29 @@ func NewRegistry(url string) SchemaRegistry {
 	}
 }
 
+func toSchema(cSchema *srclient.Schema) *Schema {
+	return &Schema{
+		id:      cSchema.ID(),
+		schema:  cSchema.Schema(),
+		version: cSchema.Version(),
+		codec:   cSchema.Codec(),
+	}
+}
+
+func tocSchemaType(schemaType SchemaType) srclient.SchemaType {
+	switch schemaType {
+	case Avro:
+		return srclient.Avro
+	}
+
+	return ""
+}
+
 type cSchemaRegistry struct {
 	client *srclient.SchemaRegistryClient
 }
 
+// GetSchema returns the cached response on a cache hit
 func (c *cSchemaRegistry) GetSchema(schemaID int) (*Schema, error) {
 	cSchema, err := c.client.GetSchema(schemaID)
 	if err != nil {
@@ -66,6 +86,7 @@ func (c *cSchemaRegistry) GetSchema(schemaID int) (*Schema, error) {
 	return toSchema(cSchema), nil
 }
 
+// GetLatestSchema always makes a call to the registry
 func (c *cSchemaRegistry) GetLatestSchema(
 	subject string, key bool) (*Schema, error) {
 	cSchema, err := c.client.GetLatestSchema(subject, key)
@@ -76,6 +97,7 @@ func (c *cSchemaRegistry) GetLatestSchema(
 	return toSchema(cSchema), nil
 }
 
+// CreateSchema creates the schema in the registry if it is not present
 func (c *cSchemaRegistry) CreateSchema(
 	subject string, schema string, schemaType SchemaType,
 	key bool) (*Schema, error) {
@@ -89,24 +111,7 @@ func (c *cSchemaRegistry) CreateSchema(
 	return toSchema(cSchema), nil
 }
 
-func toSchema(cSchema *srclient.Schema) *Schema {
-	return &Schema{
-		id:      cSchema.ID(),
-		schema:  cSchema.Schema(),
-		version: cSchema.Version(),
-		codec:   cSchema.Codec(),
-	}
-}
-
-func tocSchemaType(schemaType SchemaType) srclient.SchemaType {
-	switch schemaType {
-	case Avro:
-		return srclient.Avro
-	}
-
-	return ""
-}
-
+// GetSchemaWithRetry gets the schema from the registry, returning the cached response when available
 func GetSchemaWithRetry(
 	registry SchemaRegistry,
 	schemaId int,
@@ -132,6 +137,7 @@ func GetSchemaWithRetry(
 	}
 }
 
+// GetLatestSchemaWithRetry always fetches the latest schema from the registry
 func GetLatestSchemaWithRetry(
 	registry SchemaRegistry,
 	topic string,
@@ -157,3 +163,26 @@ func GetLatestSchemaWithRetry(
 		time.Sleep(time.Duration(sleepFor) * time.Second)
 	}
 }
+
+// CreateSchema creates the topic's key or value schema if it is not already present
+func CreateSchema(
+	registry SchemaRegistry,
+	topic string,
+	scheme string,
+	key bool,
+) (int, bool, error) {
+	created := false
+	schemeStr := strings.ReplaceAll(scheme, "\n", "")
+	schemeStr = strings.ReplaceAll(schemeStr, " ", "")
+	schema, err := GetLatestSchemaWithRetry(registry, topic, key, 2)
+	if schema == nil || schema.Schema() != schemeStr {
+		klog.V(2).Infof("%s: Creating schema for the topic", topic)
+		schema, err = registry.CreateSchema(topic, scheme, Avro, key)
+		if err != nil {
+			return 0, false, err
+		}
+		created = true
+	}
+
+	return schema.ID(), created, nil
+}
diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go
index bab7ae15c..3fbf23267 100644
--- a/pkg/transformer/debezium/schema.go
+++ b/pkg/transformer/debezium/schema.go
@@ -239,6 +239,8 @@ type schemaTransformer struct {
 	registry schemaregistry.SchemaRegistry
 }
 
+// TransformKey is deprecated as it makes expensive GetLatestSchemaWithRetry calls.
+// Use PrimaryKeys instead.
 func (c *schemaTransformer) TransformKey(topic string) ([]string, error) {
 	s, err := schemaregistry.GetLatestSchemaWithRetry(c.registry, topic, true, 10)
 	if err != nil {
@@ -248,6 +250,15 @@ func (c *schemaTransformer) TransformKey(topic string) ([]string, error) {
 	return c.transformSchemaKey(s.Schema())
 }
 
+func (c *schemaTransformer) PrimaryKeys(schemaID int) ([]string, error) {
+	s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaID, 3)
+	if err != nil {
+		return []string{}, err
+	}
+
+	return c.transformSchemaKey(s.Schema())
+}
+
 func (c *schemaTransformer) transformSchemaKey(
 	schema string) ([]string, error) {
 
@@ -294,7 +305,10 @@ func isPrimaryKey(columnName string, primaryKeys []string) bool {
 	return false
 }
 
-func (c *schemaTransformer) TransformValue(topic string, schemaId int,
+func (c *schemaTransformer) TransformValue(
+	topic string,
+	schemaId int,
+	schemaIdKey int,
 	maskSchema map[string]serializer.MaskInfo) (interface{}, error) {
 
 	s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10)
@@ -302,7 +316,12 @@ func (c *schemaTransformer) TransformValue(topic string, schemaId int,
 		return nil, err
 	}
 
-	primaryKeys, err := c.TransformKey(topic)
+	var primaryKeys []string
+	if schemaIdKey == -1 || schemaIdKey == 0 { // deprecated path: expensive and does not use the cache
+		primaryKeys, err = c.TransformKey(topic)
+	} else { // new, faster way to get the primary keys
+		primaryKeys, err = c.PrimaryKeys(schemaIdKey)
+	}
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go
index 9077132a5..a42d6ee34 100644
--- a/pkg/transformer/transformer.go
+++ b/pkg/transformer/transformer.go
@@ -21,13 +21,20 @@ type MessageTransformer interface {
 }
 
 type SchemaTransformer interface {
-	// TransformKey transforms the topic schema into name of the primary
-	// key and its type.
-	TransformKey(topic string) ([]string, error)
+	// PrimaryKeys returns the list of primary keys for the schema
+	PrimaryKeys(schemaID int) ([]string, error)
 	// Transform value transforms the schemaId for various use cases.
 	// it uses maskSchema to change the type of the schema datatypes if required
-	TransformValue(topic string, schemaId int,
+	TransformValue(
+		topic string,
+		schemaId int,
+		schemaIdKey int,
 		maskSchema map[string]serializer.MaskInfo) (interface{}, error)
+
+	// Deprecated:
+	// TransformKey transforms the topic schema into name of the primary
+	// key and its type.
+	TransformKey(topic string) ([]string, error)
 }
 
 // ParseTopic breaks down the topic string into server, database, table
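
Note: below is a minimal, self-contained sketch of how the pieces introduced in this diff fit together: the batcher registers the loader topic's value schema, resolves the upstream topic's key schema ID once via the schemaregistry package, and ships that ID to the loader inside the Job (schemaIdKey), so the loader can later resolve primary keys through the cached GetSchemaWithRetry path (PrimaryKeys) instead of the deprecated TransformKey call. The registry URL, topic name, "loader-" topic prefix, offsets, s3 path, and the literal value-schema ID are illustrative assumptions, not values taken from this change.

package main

import (
	"log"

	"github.com/practo/tipoca-stream/redshiftsink/pkg/redshiftloader"
	"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
)

func main() {
	// Illustrative values; in the real code these come from viper config
	// and the consumed Kafka topic.
	registryURL := "http://schema-registry:8081" // assumed URL
	topic := "ts.inventory.customers"            // assumed upstream (batcher) topic

	registry := schemaregistry.NewRegistry(registryURL)

	// Batcher side: ensure the loader topic's value schema exists (key=false).
	loaderSchemaID, created, err := schemaregistry.CreateSchema(
		registry,
		"loader-"+topic, // assumed loader topic prefix
		redshiftloader.JobAvroSchema,
		false,
	)
	if err != nil {
		log.Fatalf("creating loader schema: %v", err)
	}
	log.Printf("loader value schema id=%d created=%v", loaderSchemaID, created)

	// Batcher side: resolve the upstream topic's key schema ID once,
	// so the loader does not need a GetLatestSchema call per batch.
	keySchema, err := schemaregistry.GetLatestSchemaWithRetry(registry, topic, true, 2)
	if err != nil || keySchema == nil {
		log.Fatalf("fetching key schema: %v", err)
	}

	// The key schema ID travels to the loader inside the Job (schemaIdKey).
	job := redshiftloader.NewJob(
		topic,
		0, 100, // illustrative offsets
		",",
		"s3://bucket/prefix/batch.csv.gz", // illustrative path
		42,             // schemaId: upstream value schema (illustrative)
		keySchema.ID(), // schemaIdKey: upstream key schema
		nil,            // maskSchema
		false,          // skipMerge
		1024,           // batchBytes
	)

	// Loader side: schemaIdKey != -1 lets it use the cached
	// PrimaryKeys(schemaIdKey) path instead of TransformKey(topic).
	log.Printf("job carries key schema id=%d", job.SchemaIdKey)
}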