Skip to content

Commit

Permalink
Feature/header validation (#1)
Browse files Browse the repository at this point in the history
feat: custom header validation
  • Loading branch information
Simun17 authored Nov 6, 2024
1 parent 8984666 commit 833192b
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 41 deletions.
86 changes: 70 additions & 16 deletions validator/internal/centralconsumer/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
"encoding/json"
"strconv"

"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
"github.com/dataphos/schema-registry-validator/internal/janitor"
"github.com/dataphos/schema-registry-validator/internal/registry"
"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -63,18 +64,19 @@ type VersionDetails struct {

// CentralConsumer models the central consumer process.
type CentralConsumer struct {
Registry registry.SchemaRegistry
Validators janitor.Validators
Router janitor.Router
Publisher broker.Publisher
topicIDs Topics
topics map[string]broker.Topic
registrySem chan struct{}
validatorsSem chan struct{}
log logger.Log
mode Mode
schema Schema
encryptionKey string
Registry registry.SchemaRegistry
Validators janitor.Validators
Router janitor.Router
Publisher broker.Publisher
topicIDs Topics
topics map[string]broker.Topic
registrySem chan struct{}
validatorsSem chan struct{}
log logger.Log
mode Mode
schema Schema
encryptionKey string
validateHeaders bool
}

// Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline.
Expand All @@ -84,6 +86,9 @@ type Settings struct {

// NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing).
NumInferrers int

// ValidateHeaders defines if the messages' headers will be validated
ValidateHeaders bool
}

// Topics defines the standard destination topics, based on validation results.
Expand Down Expand Up @@ -122,6 +127,13 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
if settings.NumInferrers > 0 {
validatorsSem = make(chan struct{}, settings.NumInferrers)
}
if settings.ValidateHeaders == true {

Check failure on line 130 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `settings.ValidateHeaders` (gosimple)
_, ok := validators["json"]
if !ok {
// if json validation is turned off, this version of json validator is used by default for validating message header
validators["json"] = jsoninternal.New()
}
}

var schemaReturned []byte
if mode == OneCCPerTopic {
Expand Down Expand Up @@ -171,7 +183,8 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
},
Specification: schemaVersion.Specification,
},
encryptionKey: encryptionKey,
encryptionKey: encryptionKey,
validateHeaders: settings.ValidateHeaders,
}, nil
}

Expand Down Expand Up @@ -286,21 +299,62 @@ func (cc *CentralConsumer) AsProcessor() *janitor.Processor {

func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) (janitor.MessageTopicPair, error) {
var (
schema []byte
messageSchemaPair janitor.MessageSchemaPair
messageTopicPair janitor.MessageTopicPair
specificSchemaVersion VersionDetails
err error
encryptedMessageData []byte
)

// header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set
// on the Validator level
if message.RawAttributes[janitor.HeaderValidation] == "true" ||
(cc.validateHeaders == true && message.RawAttributes[janitor.HeaderValidation] != "false") {

Check failure on line 313 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `cc.validateHeaders` (gosimple)
_, ok := cc.Validators["json"]
// it is possible json validator isn't initialized by this point so we are checking it just in case
if !ok {
cc.Validators["json"] = jsoninternal.New()
}
var (
headerId string
headerVersion string
headerSchema []byte
)

headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message)
if err != nil {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
acquireIfSet(cc.registrySem)
headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry)
if err != nil {
setMessageRawAttributes(message, err, "Wrong compile")
releaseIfSet(cc.registrySem)
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
releaseIfSet(cc.registrySem)
messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema}

Check failure on line 337 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

ineffectual assignment to messageSchemaPair (ineffassign)

var isValid bool
isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators)
if err != nil {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
if !isValid {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil
}
}

if cc.mode == Default {
acquireIfSet(cc.registrySem)
messageSchemaPair, err = janitor.CollectSchema(ctx, message, cc.Registry)
schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry)
if err != nil {
setMessageRawAttributes(message, err, "Wrong compile")
releaseIfSet(cc.registrySem)
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: schema}
releaseIfSet(cc.registrySem)

messageTopicPair, err = cc.getMessageTopicPair(messageSchemaPair, encryptedMessageData)
Expand Down
1 change: 1 addition & 0 deletions validator/internal/config/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CentralConsumer struct {
ShouldLog CentralConsumerShouldLog `toml:"should_log"`
NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"`
NumInferrers int `toml:"num_inferrers" default:"-1"`
ValidateHeaders bool `toml:"validate_headers"`
MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"`
RunOptions RunOptions `toml:"run_option"`
Mode int `toml:"mode"`
Expand Down
3 changes: 3 additions & 0 deletions validator/internal/errcodes/errcodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ const (
// CompletedWithErrors marks that the process has completed but errors occurred.
CompletedWithErrors = 904

// MissingHeader declares that something in the header is missing
MissingHeader = 906

// Miscellaneous is used when no other available error code is fitting.
Miscellaneous = 999
)
122 changes: 104 additions & 18 deletions validator/internal/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package janitor

import (
"context"
"encoding/json"
"strings"
"time"

Expand All @@ -35,18 +36,19 @@ import (
// Message defines a Message used for processing broker messages.
// Essentially, Message decorates broker messages with additional, extracted information.
type Message struct {
ID string
Key string
RawAttributes map[string]interface{}
Payload []byte
IngestionTime time.Time
SchemaID string
Version string
Format string
ID string
Key string
RawAttributes map[string]interface{}
Payload []byte
IngestionTime time.Time
SchemaID string
Version string
Format string
HeaderValidation bool
}

const (
// AttributeSchemaID is one of the keys expected to beł found in the attributes field of the message.
// AttributeSchemaID is one of the keys expected to be found in the attributes field of the message.
// It holds the schema id information concerning the data field of the message
AttributeSchemaID = "schemaId"

Expand All @@ -57,6 +59,18 @@ const (
// AttributeFormat is one of the keys expected to be found in the attributes field of the message.
// It holds the format of the data field of the message.
AttributeFormat = "format"

// HeaderValidation is one of the keys that can occur in raw attributes section of header.
// It determines if the header will be validated
HeaderValidation = "headerValidation"

// AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema id that is used to check header validity
AttributeHeaderID = "headerSchemaId"

// AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema version that is used to check header validity
AttributeHeaderVersion = "headerVersionId"
)

// MessageSchemaPair wraps a Message with the Schema relating to this Message.
Expand All @@ -65,27 +79,27 @@ type MessageSchemaPair struct {
Schema []byte
}

// CollectSchema retrieves the schema of the given Message from registry.SchemaRegistry.
// CollectSchema retrieves the schema with the given id and version from registry.SchemaRegistry.
//
// If schema retrieval results in registry.ErrNotFound, or Message.SchemaID or Message.Version is an empty string,
// If schema retrieval results in registry.ErrNotFound, or id or version are an empty string,
// a nil schema is returned together with a nil error.
//
// The returned error is an instance of OpError for improved error handling (so that the source of this error is identifiable
// even if combined with other errors).
func CollectSchema(ctx context.Context, message Message, schemaRegistry registry.SchemaRegistry) (MessageSchemaPair, error) {
if message.SchemaID == "" || message.Version == "" {
return MessageSchemaPair{Message: message, Schema: nil}, nil
func CollectSchema(ctx context.Context, id string, version string, schemaRegistry registry.SchemaRegistry) ([]byte, error) {
if id == "" || version == "" {
return nil, nil
}

schema, err := schemaRegistry.Get(ctx, message.SchemaID, message.Version)
schema, err := schemaRegistry.Get(ctx, id, version)
if err != nil {
if errors.Is(err, registry.ErrNotFound) {
return MessageSchemaPair{Message: message, Schema: nil}, nil
return nil, nil
}
return MessageSchemaPair{}, intoOpErr(message.ID, errcodes.RegistryUnresponsive, err)
return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err)
}

return MessageSchemaPair{Message: message, Schema: schema}, nil
return schema, nil
}

// Validators is a convenience type for a map containing validator.Validator instances for available message formats.
Expand All @@ -103,6 +117,78 @@ func (vs Validators) Validate(message Message, schema []byte) (bool, error) {
return v.Validate(message.Payload, schema, message.SchemaID, message.Version)
}

// GetHeaderIdAndVersion reads the header schema id and header schema version from the raw attributes of the message.
//
// The values are looked up under AttributeHeaderID and AttributeHeaderVersion and must be stored as strings.
// If either lookup does not yield a string (the key is absent or holds a value of another type), empty strings are
// returned together with an error built by intoOpErr from the message ID and errcodes.MissingHeader.
// The id is checked before the version, so a message lacking both reports the missing id.
func GetHeaderIdAndVersion(message Message) (string, string, error) {
	headerID, isString := message.RawAttributes[AttributeHeaderID].(string)
	if !isString {
		return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID"))
	}

	headerVersion, isString := message.RawAttributes[AttributeHeaderVersion].(string)
	if !isString {
		return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version"))
	}

	return headerID, headerVersion, nil
}

// ValidateHeader checks the header attributes of the message against the given header schema, using the
// validator stored under the "json" key of validators.
//
// The data being validated is the JSON document produced by generateHeaderData from message.RawAttributes.
//
// Return values:
//   - (true, nil) if the validator accepted the header.
//   - (false, nil) if the message should be dead-lettered: the schema is empty, the validator returned
//     validator.ErrFailedValidation or validator.ErrDeadletter, or the validator reported the header as invalid.
//     For the empty-schema and failed-validation cases the "deadLetterErrorCategory" and "deadLetterErrorReason"
//     raw attributes are set on the message; the reason is always stored as a string.
//   - (false, err) if the header data cannot be generated, the header schema id or version attribute is not a
//     string, no "json" validator is available, or the validator failed with any other error.
func ValidateHeader(message Message, schema []byte, validators Validators) (bool, error) {
	if len(schema) == 0 {
		message.RawAttributes["deadLetterErrorCategory"] = "Header schema error"
		// Use the sentinel's text directly; wrapping it with an empty message would prefix the reason with ": ".
		message.RawAttributes["deadLetterErrorReason"] = validator.ErrMissingHeaderSchema.Error()
		return false, nil
	}

	headerData, err := generateHeaderData(message.RawAttributes)
	if err != nil {
		message.RawAttributes["deadLetterErrorCategory"] = "Header schema error"
		message.RawAttributes["deadLetterErrorReason"] = err.Error()
		return false, err
	}

	headerSchemaID, ok := message.RawAttributes[AttributeHeaderID].(string)
	if !ok {
		return false, errors.New("header ID is not in a supported format")
	}
	headerSchemaVersion, ok := message.RawAttributes[AttributeHeaderVersion].(string)
	if !ok {
		return false, errors.New("header version is not in a supported format")
	}

	// Calling Validate on a missing map entry would dereference a nil interface and panic,
	// so report the misconfiguration as an error instead.
	jsonValidator, ok := validators["json"]
	if !ok || jsonValidator == nil {
		return false, intoOpErr(message.ID, errcodes.ValidationFailure, errors.New("json validator is not available for header validation"))
	}

	isValid, err := jsonValidator.Validate(headerData, schema, headerSchemaID, headerSchemaVersion)
	if err != nil {
		switch {
		case errors.Is(err, validator.ErrFailedValidation):
			message.RawAttributes["deadLetterErrorCategory"] = "Header validation error"
			// Store the text, not the error value, so the attribute has the same type as the other reasons.
			message.RawAttributes["deadLetterErrorReason"] = errors.WithMessage(validator.ErrFailedHeaderValidation, err.Error()).Error()
			return false, nil
		case errors.Is(err, validator.ErrDeadletter):
			return false, nil
		default:
			return false, intoOpErr(message.ID, errcodes.ValidationFailure, err)
		}
	}

	return isValid, nil
}

// generateHeaderData builds the JSON document that represents the header of a message.
//
// It copies rawAttributes into a new map, leaving out the HeaderValidation, AttributeHeaderID,
// AttributeHeaderVersion, AttributeSchemaID, AttributeSchemaVersion and AttributeFormat keys, and returns the
// JSON encoding of the remaining attributes. The input map is not modified. Any error from json.Marshal is
// returned as is, with a nil byte slice.
func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) {
	headerAttributes := make(map[string]interface{})
	for name, attribute := range rawAttributes {
		// These keys are skipped so that they do not end up in the document being validated.
		isExcluded := name == HeaderValidation ||
			name == AttributeHeaderID ||
			name == AttributeHeaderVersion ||
			name == AttributeSchemaID ||
			name == AttributeSchemaVersion ||
			name == AttributeFormat
		if !isExcluded {
			headerAttributes[name] = attribute
		}
	}

	return json.Marshal(headerAttributes)
}

// SchemaGenerators is a convenience type for a map containing schemagen.Generator instances for available message formats.
type SchemaGenerators map[string]schemagen.Generator

Expand Down
2 changes: 1 addition & 1 deletion validator/internal/janitor/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package janitor

import (
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
"github.com/dataphos/lib-streamproc/pkg/streamproc"
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
)

// ParseMessage parses a given broker.Message into Message, by setting Message.Payload to the value of the data field of the given
Expand Down
9 changes: 5 additions & 4 deletions validator/internal/janitorctl/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (
"context"
"runtime/debug"

"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/lib-shutdown/pkg/graceful"
"github.com/dataphos/schema-registry-validator/internal/centralconsumer"
"github.com/dataphos/schema-registry-validator/internal/config"
"github.com/dataphos/schema-registry-validator/internal/errcodes"
"github.com/dataphos/schema-registry-validator/internal/janitor"
"github.com/dataphos/schema-registry-validator/internal/pullercleaner"
"github.com/dataphos/schema-registry-validator/internal/registry"
"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/lib-shutdown/pkg/graceful"
)

type ProcessorInitFunc func(context.Context, registry.SchemaRegistry, broker.Publisher) (*janitor.Processor, error)
Expand Down Expand Up @@ -86,6 +86,7 @@ func RunCentralConsumer(configFile string) {
centralconsumer.Settings{
NumSchemaCollectors: cfg.NumSchemaCollectors,
NumInferrers: cfg.NumInferrers,
ValidateHeaders: cfg.ValidateHeaders,
},
log,
centralconsumer.RouterFlags{
Expand Down
10 changes: 8 additions & 2 deletions validator/internal/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ var ErrDeadletter = errors.New("deadletter")
var ErrBrokenMessage = errors.New("Message is not in valid format")

// ErrWrongCompile is a special error type to help distinguish messages that had fault while compiling.
var ErrWrongCompile = errors.New("There is an error while compiling.")
var ErrWrongCompile = errors.New("There is an error while compiling")

// ErrMissingSchema is a special error type to help distinguish messages that are missing schema.
var ErrMissingSchema = errors.New("Message is missing a schema")

// ErrMissingHeaderSchema is a special error type to help distinguish headers that are missing schema (header validation).
var ErrMissingHeaderSchema = errors.New("Header is missing a schema")

// ErrFailedValidation is a special error type to help distinguish messages that have failed in validation.
var ErrFailedValidation = errors.New("An error occured while validating message.")
var ErrFailedValidation = errors.New("An error occured while validating message")

// ErrFailedHeaderValidation is a special error type to help distinguish messages whose headers have failed validation.
var ErrFailedHeaderValidation = errors.New("An error occurred while validating message's header")

// Validator is the interface used to model message validators.
type Validator interface {
Expand Down

0 comments on commit 833192b

Please sign in to comment.