Skip to content

Commit

Permalink
feat: custom header validation
Browse files Browse the repository at this point in the history
* Add feature for validation of custom header

* Dataphos Validator's required properties in the header are now removed from the header that is generated to more accurately validate the schema

* Added and modified error messages a bit

* Added setting of the Validator (CC) validateHeaders which can be set to true, meaning header validation is set on the Validator level. This however can be trumped by setting the headerValidation flag in the header of a message whose header user would want to validate.

* Fix potential error and add a comment
  • Loading branch information
Simun17 authored Nov 7, 2024
1 parent 8984666 commit f35aa6a
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 41 deletions.
86 changes: 70 additions & 16 deletions validator/internal/centralconsumer/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
"encoding/json"
"strconv"

"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
"github.com/dataphos/schema-registry-validator/internal/janitor"
"github.com/dataphos/schema-registry-validator/internal/registry"
"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -63,18 +64,19 @@ type VersionDetails struct {

// CentralConsumer models the central consumer process.
type CentralConsumer struct {
Registry registry.SchemaRegistry
Validators janitor.Validators
Router janitor.Router
Publisher broker.Publisher
topicIDs Topics
topics map[string]broker.Topic
registrySem chan struct{}
validatorsSem chan struct{}
log logger.Log
mode Mode
schema Schema
encryptionKey string
Registry registry.SchemaRegistry
Validators janitor.Validators
Router janitor.Router
Publisher broker.Publisher
topicIDs Topics
topics map[string]broker.Topic
registrySem chan struct{}
validatorsSem chan struct{}
log logger.Log
mode Mode
schema Schema
encryptionKey string
validateHeaders bool
}

// Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline.
Expand All @@ -84,6 +86,9 @@ type Settings struct {

// NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing).
NumInferrers int

// ValidateHeaders defines if the messages' headers will be validated
ValidateHeaders bool
}

// Topics defines the standard destination topics, based on validation results.
Expand Down Expand Up @@ -122,6 +127,13 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
if settings.NumInferrers > 0 {
validatorsSem = make(chan struct{}, settings.NumInferrers)
}
if settings.ValidateHeaders == true {

Check failure on line 130 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `settings.ValidateHeaders` (gosimple)

Check failure on line 130 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `settings.ValidateHeaders` (gosimple)

Check failure on line 130 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `settings.ValidateHeaders` (gosimple)
_, ok := validators["json"]
if !ok {
// if json validation is turned off, this version of json validator is used by default for validating message header
validators["json"] = jsoninternal.New()
}
}

var schemaReturned []byte
if mode == OneCCPerTopic {
Expand Down Expand Up @@ -171,7 +183,8 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
},
Specification: schemaVersion.Specification,
},
encryptionKey: encryptionKey,
encryptionKey: encryptionKey,
validateHeaders: settings.ValidateHeaders,
}, nil
}

Expand Down Expand Up @@ -286,21 +299,62 @@ func (cc *CentralConsumer) AsProcessor() *janitor.Processor {

func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) (janitor.MessageTopicPair, error) {
var (
schema []byte
messageSchemaPair janitor.MessageSchemaPair
messageTopicPair janitor.MessageTopicPair
specificSchemaVersion VersionDetails
err error
encryptedMessageData []byte
)

// header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set
// on the Validator level
if message.RawAttributes[janitor.HeaderValidation] == "true" ||
(cc.validateHeaders == true && message.RawAttributes[janitor.HeaderValidation] != "false") {

Check failure on line 313 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `cc.validateHeaders` (gosimple)

Check failure on line 313 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `cc.validateHeaders` (gosimple)

Check failure on line 313 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

S1002: should omit comparison to bool constant, can be simplified to `cc.validateHeaders` (gosimple)
_, ok := cc.Validators["json"]
// it is possible json validator isn't initialized by this point so we are checking it just in case
if !ok {
cc.Validators["json"] = jsoninternal.New()
}
var (
headerId string
headerVersion string
headerSchema []byte
)

headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message)
if err != nil {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
acquireIfSet(cc.registrySem)
headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry)
if err != nil {
setMessageRawAttributes(message, err, "Wrong compile")
releaseIfSet(cc.registrySem)
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
releaseIfSet(cc.registrySem)
messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema}

Check failure on line 337 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

ineffectual assignment to messageSchemaPair (ineffassign)

Check failure on line 337 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

ineffectual assignment to messageSchemaPair (ineffassign)

Check failure on line 337 in validator/internal/centralconsumer/centralconsumer.go

View workflow job for this annotation

GitHub Actions / Go lint job for all components (./validator)

ineffectual assignment to messageSchemaPair (ineffassign)

var isValid bool
isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators)
if err != nil {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
if !isValid {
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil
}
}

if cc.mode == Default {
acquireIfSet(cc.registrySem)
messageSchemaPair, err = janitor.CollectSchema(ctx, message, cc.Registry)
schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry)
if err != nil {
setMessageRawAttributes(message, err, "Wrong compile")
releaseIfSet(cc.registrySem)
return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err
}
messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: schema}
releaseIfSet(cc.registrySem)

messageTopicPair, err = cc.getMessageTopicPair(messageSchemaPair, encryptedMessageData)
Expand Down
1 change: 1 addition & 0 deletions validator/internal/config/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CentralConsumer struct {
ShouldLog CentralConsumerShouldLog `toml:"should_log"`
NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"`
NumInferrers int `toml:"num_inferrers" default:"-1"`
ValidateHeaders bool `toml:"validate_headers"`
MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"`
RunOptions RunOptions `toml:"run_option"`
Mode int `toml:"mode"`
Expand Down
3 changes: 3 additions & 0 deletions validator/internal/errcodes/errcodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ const (
// CompletedWithErrors marks that the process has completed but errors occurred.
CompletedWithErrors = 904

// MissingHeader declares that something in the header is missing
MissingHeader = 906

// Miscellaneous is used when no other available error code is fitting.
Miscellaneous = 999
)
122 changes: 104 additions & 18 deletions validator/internal/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package janitor

import (
"context"
"encoding/json"
"strings"
"time"

Expand All @@ -35,18 +36,19 @@ import (
// Message defines a Message used for processing broker messages.
// Essentially, Message decorates broker messages with additional, extracted information.
type Message struct {
ID string
Key string
RawAttributes map[string]interface{}
Payload []byte
IngestionTime time.Time
SchemaID string
Version string
Format string
ID string
Key string
RawAttributes map[string]interface{}
Payload []byte
IngestionTime time.Time
SchemaID string
Version string
Format string
HeaderValidation bool
}

const (
// AttributeSchemaID is one of the keys expected to beł found in the attributes field of the message.
// AttributeSchemaID is one of the keys expected to be found in the attributes field of the message.
// It holds the schema id information concerning the data field of the message
AttributeSchemaID = "schemaId"

Expand All @@ -57,6 +59,18 @@ const (
// AttributeFormat is one of the keys expected to be found in the attributes field of the message.
// It holds the format of the data field of the message.
AttributeFormat = "format"

// HeaderValidation is one of the keys that can occur in raw attributes section of header.
// It determines if the header will be validated
HeaderValidation = "headerValidation"

// AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema id that is used to check header validity
AttributeHeaderID = "headerSchemaId"

// AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true.
// It holds the header's schema version that is used to check header validity
AttributeHeaderVersion = "headerVersionId"
)

// MessageSchemaPair wraps a Message with the Schema relating to this Message.
Expand All @@ -65,27 +79,27 @@ type MessageSchemaPair struct {
Schema []byte
}

// CollectSchema retrieves the schema of the given Message from registry.SchemaRegistry.
// CollectSchema retrieves the schema with the given id and version from registry.SchemaRegistry.
//
// If schema retrieval results in registry.ErrNotFound, or Message.SchemaID or Message.Version is an empty string,
// If schema retrieval results in registry.ErrNotFound, or id or version are an empty string,
// the Message is put on the results channel with MessageSchemaPair.Schema set to nil.
//
// The returned error is an instance of OpError for improved error handling (so that the source of this error is identifiable
// even if combined with other errors).
func CollectSchema(ctx context.Context, message Message, schemaRegistry registry.SchemaRegistry) (MessageSchemaPair, error) {
if message.SchemaID == "" || message.Version == "" {
return MessageSchemaPair{Message: message, Schema: nil}, nil
func CollectSchema(ctx context.Context, id string, version string, schemaRegistry registry.SchemaRegistry) ([]byte, error) {
if id == "" || version == "" {
return nil, nil
}

schema, err := schemaRegistry.Get(ctx, message.SchemaID, message.Version)
schema, err := schemaRegistry.Get(ctx, id, version)
if err != nil {
if errors.Is(err, registry.ErrNotFound) {
return MessageSchemaPair{Message: message, Schema: nil}, nil
return nil, nil
}
return MessageSchemaPair{}, intoOpErr(message.ID, errcodes.RegistryUnresponsive, err)
return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err)
}

return MessageSchemaPair{Message: message, Schema: schema}, nil
return schema, nil
}

// Validators is a convenience type for a map containing validator.Validator instances for available message formats.
Expand All @@ -103,6 +117,78 @@ func (vs Validators) Validate(message Message, schema []byte) (bool, error) {
return v.Validate(message.Payload, schema, message.SchemaID, message.Version)
}

func GetHeaderIdAndVersion(message Message) (string, string, error) {
var id string
var version string
var ok bool

if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok {
return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID"))
}
if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok {
return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version"))
}
return id, version, nil
}

func ValidateHeader(message Message, schema []byte, validators Validators) (bool, error) {
if len(schema) == 0 {
errMissingHeaderSchema := errors.WithMessage(validator.ErrMissingHeaderSchema, "")
message.RawAttributes["deadLetterErrorCategory"] = "Header schema error"
message.RawAttributes["deadLetterErrorReason"] = errMissingHeaderSchema.Error()
return false, nil
}
headerData, err := generateHeaderData(message.RawAttributes)
if err != nil {
message.RawAttributes["deadLetterErrorCategory"] = "Header schema error"
message.RawAttributes["deadLetterErrorReason"] = err.Error()
return false, err
}

headerSchemaId, ok := message.RawAttributes[AttributeHeaderID].(string)
if !ok {
return false, errors.New("header ID is not in a supported format")
}
headerSchemaVersion, ok := message.RawAttributes[AttributeHeaderVersion].(string)
if !ok {
return false, errors.New("header version is not in a supported format")
}

isValid, err := validators["json"].Validate(headerData, schema, headerSchemaId, headerSchemaVersion)
if err != nil {
if errors.Is(err, validator.ErrFailedValidation) {
message.RawAttributes["deadLetterErrorCategory"] = "Header validation error"
message.RawAttributes["deadLetterErrorReason"] = errors.WithMessage(validator.ErrFailedHeaderValidation, err.Error())
return false, nil
} else if errors.Is(err, validator.ErrDeadletter) {
return false, nil
}
return false, intoOpErr(message.ID, errcodes.ValidationFailure, err)
}

if isValid {
return true, nil
}
return false, nil
}

func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) {
cleanAttributes := make(map[string]interface{})
for key, value := range rawAttributes {
if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion ||
key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat {
continue
} else {
cleanAttributes[key] = value
}
}
headerData, err := json.Marshal(cleanAttributes)
if err != nil {
return nil, err
}
return headerData, nil
}

// SchemaGenerators is a convenience type for a map containing schemagen.Generator instances for available message formats.
type SchemaGenerators map[string]schemagen.Generator

Expand Down
2 changes: 1 addition & 1 deletion validator/internal/janitor/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package janitor

import (
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
"github.com/dataphos/lib-streamproc/pkg/streamproc"
"github.com/dataphos/schema-registry-validator/internal/errtemplates"
)

// ParseMessage parses a given broker.Message into Message, by setting Message.Payload to the value of the data field of the given
Expand Down
9 changes: 5 additions & 4 deletions validator/internal/janitorctl/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (
"context"
"runtime/debug"

"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/lib-shutdown/pkg/graceful"
"github.com/dataphos/schema-registry-validator/internal/centralconsumer"
"github.com/dataphos/schema-registry-validator/internal/config"
"github.com/dataphos/schema-registry-validator/internal/errcodes"
"github.com/dataphos/schema-registry-validator/internal/janitor"
"github.com/dataphos/schema-registry-validator/internal/pullercleaner"
"github.com/dataphos/schema-registry-validator/internal/registry"
"github.com/dataphos/lib-brokers/pkg/broker"
"github.com/dataphos/lib-logger/logger"
"github.com/dataphos/lib-logger/standardlogger"
"github.com/dataphos/lib-shutdown/pkg/graceful"
)

type ProcessorInitFunc func(context.Context, registry.SchemaRegistry, broker.Publisher) (*janitor.Processor, error)
Expand Down Expand Up @@ -86,6 +86,7 @@ func RunCentralConsumer(configFile string) {
centralconsumer.Settings{
NumSchemaCollectors: cfg.NumSchemaCollectors,
NumInferrers: cfg.NumInferrers,
ValidateHeaders: cfg.ValidateHeaders,
},
log,
centralconsumer.RouterFlags{
Expand Down
10 changes: 8 additions & 2 deletions validator/internal/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ var ErrDeadletter = errors.New("deadletter")
var ErrBrokenMessage = errors.New("Message is not in valid format")

// ErrWrongCompile is a special error type to help distinguish messages that had fault while compiling.
var ErrWrongCompile = errors.New("There is an error while compiling.")
var ErrWrongCompile = errors.New("There is an error while compiling")

// ErrMissingSchema is a special error type to help distinguish messages that are missing schema.
var ErrMissingSchema = errors.New("Message is missing a schema")

// ErrMissingHeaderSchema is a special error type to help distinguish headers that are missing schema (header validation).
var ErrMissingHeaderSchema = errors.New("Header is missing a schema")

// ErrFailedValidation is a special error type to help distinguish messages that have failed in validation.
var ErrFailedValidation = errors.New("An error occured while validating message.")
var ErrFailedValidation = errors.New("An error occured while validating message")

// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation.
var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header")

// Validator is the interface used to model message validators.
type Validator interface {
Expand Down

0 comments on commit f35aa6a

Please sign in to comment.