From f35aa6a1a128e7a7369f23ee835732f4aee22a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Thu, 7 Nov 2024 13:13:34 +0100 Subject: [PATCH] feat: custom header validation * Add validation of custom message headers * Remove Dataphos Validator's own required properties from the generated header data before validation, so that the header is validated more accurately against its schema * Add new error messages and adjust existing ones * Add the Validator (CC) setting validateHeaders; when set to true, header validation is enabled at the Validator level. An individual message can override this setting through the headerValidation flag in its own header. * Fix potential error and add a comment --- .../centralconsumer/centralconsumer.go | 86 +++++++++--- validator/internal/config/centralconsumer.go | 1 + validator/internal/errcodes/errcodes.go | 3 + validator/internal/janitor/janitor.go | 122 +++++++++++++++--- validator/internal/janitor/parse.go | 2 +- validator/internal/janitorctl/run.go | 9 +- validator/internal/validator/validator.go | 10 +- 7 files changed, 192 insertions(+), 41 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 85fa02e..087d55b 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -19,11 +19,12 @@ import ( "encoding/json" "strconv" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" + jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json" 
"github.com/pkg/errors" ) @@ -63,18 +64,19 @@ type VersionDetails struct { // CentralConsumer models the central consumer process. type CentralConsumer struct { - Registry registry.SchemaRegistry - Validators janitor.Validators - Router janitor.Router - Publisher broker.Publisher - topicIDs Topics - topics map[string]broker.Topic - registrySem chan struct{} - validatorsSem chan struct{} - log logger.Log - mode Mode - schema Schema - encryptionKey string + Registry registry.SchemaRegistry + Validators janitor.Validators + Router janitor.Router + Publisher broker.Publisher + topicIDs Topics + topics map[string]broker.Topic + registrySem chan struct{} + validatorsSem chan struct{} + log logger.Log + mode Mode + schema Schema + encryptionKey string + validateHeaders bool } // Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline. @@ -84,6 +86,9 @@ type Settings struct { // NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing). NumInferrers int + + // ValidateHeaders defines if the messages' headers will be validated + ValidateHeaders bool } // Topics defines the standard destination topics, based on validation results. 
@@ -122,6 +127,13 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator if settings.NumInferrers > 0 { validatorsSem = make(chan struct{}, settings.NumInferrers) } + if settings.ValidateHeaders == true { + _, ok := validators["json"] + if !ok { + // if json validation is turned off, this version of json validator is used by default for validating message header + validators["json"] = jsoninternal.New() + } + } var schemaReturned []byte if mode == OneCCPerTopic { @@ -171,7 +183,8 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator }, Specification: schemaVersion.Specification, }, - encryptionKey: encryptionKey, + encryptionKey: encryptionKey, + validateHeaders: settings.ValidateHeaders, }, nil } @@ -286,6 +299,7 @@ func (cc *CentralConsumer) AsProcessor() *janitor.Processor { func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) (janitor.MessageTopicPair, error) { var ( + schema []byte messageSchemaPair janitor.MessageSchemaPair messageTopicPair janitor.MessageTopicPair specificSchemaVersion VersionDetails @@ -293,14 +307,54 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) encryptedMessageData []byte ) + // header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set + // on the Validator level + if message.RawAttributes[janitor.HeaderValidation] == "true" || + (cc.validateHeaders == true && message.RawAttributes[janitor.HeaderValidation] != "false") { + _, ok := cc.Validators["json"] + // it is possible json validator isn't initialized by this point so we are checking it just in case + if !ok { + cc.Validators["json"] = jsoninternal.New() + } + var ( + headerId string + headerVersion string + headerSchema []byte + ) + + headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, 
message)}, err + } + acquireIfSet(cc.registrySem) + headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) + if err != nil { + setMessageRawAttributes(message, err, "Wrong compile") + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + releaseIfSet(cc.registrySem) + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema} + + var isValid bool + isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + if !isValid { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + if cc.mode == Default { acquireIfSet(cc.registrySem) - messageSchemaPair, err = janitor.CollectSchema(ctx, message, cc.Registry) + schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry) if err != nil { setMessageRawAttributes(message, err, "Wrong compile") releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: schema} releaseIfSet(cc.registrySem) messageTopicPair, err = cc.getMessageTopicPair(messageSchemaPair, encryptedMessageData) diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index c6d264d..4545bb0 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -30,6 +30,7 @@ type CentralConsumer struct { ShouldLog CentralConsumerShouldLog `toml:"should_log"` NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` + ValidateHeaders bool `toml:"validate_headers"` 
MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` Mode int `toml:"mode"` diff --git a/validator/internal/errcodes/errcodes.go b/validator/internal/errcodes/errcodes.go index 6114ea7..c908eba 100644 --- a/validator/internal/errcodes/errcodes.go +++ b/validator/internal/errcodes/errcodes.go @@ -77,6 +77,9 @@ const ( // CompletedWithErrors marks that the process has completed but errors occurred. CompletedWithErrors = 904 + // MissingHeader declares that something in the header is missing + MissingHeader = 906 + // Miscellaneous is used when no other available error code is fitting. Miscellaneous = 999 ) diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index 6b9af88..e9ea488 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -18,6 +18,7 @@ package janitor import ( "context" + "encoding/json" "strings" "time" @@ -35,18 +36,19 @@ import ( // Message defines a Message used for processing broker messages. // Essentially, Message decorates broker messages with additional, extracted information. type Message struct { - ID string - Key string - RawAttributes map[string]interface{} - Payload []byte - IngestionTime time.Time - SchemaID string - Version string - Format string + ID string + Key string + RawAttributes map[string]interface{} + Payload []byte + IngestionTime time.Time + SchemaID string + Version string + Format string + HeaderValidation bool } const ( - // AttributeSchemaID is one of the keys expected to beł found in the attributes field of the message. + // AttributeSchemaID is one of the keys expected to be found in the attributes field of the message. // It holds the schema id information concerning the data field of the message AttributeSchemaID = "schemaId" @@ -57,6 +59,18 @@ const ( // AttributeFormat is one of the keys expected to be found in the attributes field of the message. 
// It holds the format of the data field of the message. AttributeFormat = "format" + + // HeaderValidation is one of the keys that can occur in raw attributes section of header. + // It determines if the header will be validated + HeaderValidation = "headerValidation" + + // AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema id that is used to check header validity + AttributeHeaderID = "headerSchemaId" + + // AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema version that is used to check header validity + AttributeHeaderVersion = "headerVersionId" ) // MessageSchemaPair wraps a Message with the Schema relating to this Message. @@ -65,27 +79,27 @@ type MessageSchemaPair struct { Schema []byte } -// CollectSchema retrieves the schema of the given Message from registry.SchemaRegistry. +// CollectSchema retrieves the schema with the given id and version from registry.SchemaRegistry. // -// If schema retrieval results in registry.ErrNotFound, or Message.SchemaID or Message.Version is an empty string, +// If schema retrieval results in registry.ErrNotFound, or id or version are an empty string, // the Message is put on the results channel with MessageSchemaPair.Schema set to nil. // // The returned error is an instance of OpError for improved error handling (so that the source of this error is identifiable // even if combined with other errors). 
-func CollectSchema(ctx context.Context, message Message, schemaRegistry registry.SchemaRegistry) (MessageSchemaPair, error) { - if message.SchemaID == "" || message.Version == "" { - return MessageSchemaPair{Message: message, Schema: nil}, nil +func CollectSchema(ctx context.Context, id string, version string, schemaRegistry registry.SchemaRegistry) ([]byte, error) { + if id == "" || version == "" { + return nil, nil } - schema, err := schemaRegistry.Get(ctx, message.SchemaID, message.Version) + schema, err := schemaRegistry.Get(ctx, id, version) if err != nil { if errors.Is(err, registry.ErrNotFound) { - return MessageSchemaPair{Message: message, Schema: nil}, nil + return nil, nil } - return MessageSchemaPair{}, intoOpErr(message.ID, errcodes.RegistryUnresponsive, err) + return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err) } - return MessageSchemaPair{Message: message, Schema: schema}, nil + return schema, nil } // Validators is a convenience type for a map containing validator.Validator instances for available message formats. 
@@ -103,6 +117,78 @@ func (vs Validators) Validate(message Message, schema []byte) (bool, error) { return v.Validate(message.Payload, schema, message.SchemaID, message.Version) } +func GetHeaderIdAndVersion(message Message) (string, string, error) { + var id string + var version string + var ok bool + + if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID")) + } + if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version")) + } + return id, version, nil +} + +func ValidateHeader(message Message, schema []byte, validators Validators) (bool, error) { + if len(schema) == 0 { + errMissingHeaderSchema := errors.WithMessage(validator.ErrMissingHeaderSchema, "") + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = errMissingHeaderSchema.Error() + return false, nil + } + headerData, err := generateHeaderData(message.RawAttributes) + if err != nil { + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = err.Error() + return false, err + } + + headerSchemaId, ok := message.RawAttributes[AttributeHeaderID].(string) + if !ok { + return false, errors.New("header ID is not in a supported format") + } + headerSchemaVersion, ok := message.RawAttributes[AttributeHeaderVersion].(string) + if !ok { + return false, errors.New("header version is not in a supported format") + } + + isValid, err := validators["json"].Validate(headerData, schema, headerSchemaId, headerSchemaVersion) + if err != nil { + if errors.Is(err, validator.ErrFailedValidation) { + message.RawAttributes["deadLetterErrorCategory"] = "Header validation error" + message.RawAttributes["deadLetterErrorReason"] = 
errors.WithMessage(validator.ErrFailedHeaderValidation, err.Error()) + return false, nil + } else if errors.Is(err, validator.ErrDeadletter) { + return false, nil + } + return false, intoOpErr(message.ID, errcodes.ValidationFailure, err) + } + + if isValid { + return true, nil + } + return false, nil +} + +func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) { + cleanAttributes := make(map[string]interface{}) + for key, value := range rawAttributes { + if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion || + key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat { + continue + } else { + cleanAttributes[key] = value + } + } + headerData, err := json.Marshal(cleanAttributes) + if err != nil { + return nil, err + } + return headerData, nil +} + // SchemaGenerators is a convenience type for a map containing schemagen.Generator instances for available message formats. type SchemaGenerators map[string]schemagen.Generator diff --git a/validator/internal/janitor/parse.go b/validator/internal/janitor/parse.go index 1ac9e55..42657a6 100644 --- a/validator/internal/janitor/parse.go +++ b/validator/internal/janitor/parse.go @@ -15,8 +15,8 @@ package janitor import ( - "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/lib-streamproc/pkg/streamproc" + "github.com/dataphos/schema-registry-validator/internal/errtemplates" ) // ParseMessage parses a given broker.Message into Message, by setting Message.Payload to the value of the data field of the given diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index 35b112d..f719707 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -18,16 +18,16 @@ import ( "context" "runtime/debug" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" + 
"github.com/dataphos/lib-shutdown/pkg/graceful" "github.com/dataphos/schema-registry-validator/internal/centralconsumer" "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/pullercleaner" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" - "github.com/dataphos/lib-logger/standardlogger" - "github.com/dataphos/lib-shutdown/pkg/graceful" ) type ProcessorInitFunc func(context.Context, registry.SchemaRegistry, broker.Publisher) (*janitor.Processor, error) @@ -86,6 +86,7 @@ func RunCentralConsumer(configFile string) { centralconsumer.Settings{ NumSchemaCollectors: cfg.NumSchemaCollectors, NumInferrers: cfg.NumInferrers, + ValidateHeaders: cfg.ValidateHeaders, }, log, centralconsumer.RouterFlags{ diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index 82ccaac..ce74d22 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -24,13 +24,19 @@ var ErrDeadletter = errors.New("deadletter") var ErrBrokenMessage = errors.New("Message is not in valid format") // ErrWrongCompile is a special error type to help distinguish messages that had fault while compiling. -var ErrWrongCompile = errors.New("There is an error while compiling.") +var ErrWrongCompile = errors.New("There is an error while compiling") // ErrMissingSchema is a special error type to help distinguish messages that are missing schema. var ErrMissingSchema = errors.New("Message is missing a schema") +// ErrMissingHeaderSchema is a special error type to help distinguish headers that are missing schema (header validation). 
+var ErrMissingHeaderSchema = errors.New("Header is missing a schema") + // ErrFailedValidation is a special error type to help distinguish messages that have failed in validation. -var ErrFailedValidation = errors.New("An error occured while validating message.") +var ErrFailedValidation = errors.New("An error occured while validating message") + +// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. +var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header") // Validator is the interface used to model message validators. type Validator interface {