diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 85fa02e..087d55b 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -19,11 +19,12 @@ import ( "encoding/json" "strconv" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" + jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json" "github.com/pkg/errors" ) @@ -63,18 +64,19 @@ type VersionDetails struct { // CentralConsumer models the central consumer process. type CentralConsumer struct { - Registry registry.SchemaRegistry - Validators janitor.Validators - Router janitor.Router - Publisher broker.Publisher - topicIDs Topics - topics map[string]broker.Topic - registrySem chan struct{} - validatorsSem chan struct{} - log logger.Log - mode Mode - schema Schema - encryptionKey string + Registry registry.SchemaRegistry + Validators janitor.Validators + Router janitor.Router + Publisher broker.Publisher + topicIDs Topics + topics map[string]broker.Topic + registrySem chan struct{} + validatorsSem chan struct{} + log logger.Log + mode Mode + schema Schema + encryptionKey string + validateHeaders bool } // Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline. @@ -84,6 +86,9 @@ type Settings struct { // NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing). 
NumInferrers int + + // ValidateHeaders defines if the messages' headers will be validated + ValidateHeaders bool } // Topics defines the standard destination topics, based on validation results. @@ -122,6 +127,13 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator if settings.NumInferrers > 0 { validatorsSem = make(chan struct{}, settings.NumInferrers) } + if settings.ValidateHeaders { + _, ok := validators["json"] + if !ok { + // if json validation is turned off, this version of json validator is used by default for validating message header + validators["json"] = jsoninternal.New() + } + } var schemaReturned []byte if mode == OneCCPerTopic { @@ -171,7 +183,8 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator }, Specification: schemaVersion.Specification, }, - encryptionKey: encryptionKey, + encryptionKey: encryptionKey, + validateHeaders: settings.ValidateHeaders, }, nil } @@ -286,6 +299,7 @@ func (cc *CentralConsumer) AsProcessor() *janitor.Processor { func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) (janitor.MessageTopicPair, error) { var ( + schema []byte messageSchemaPair janitor.MessageSchemaPair messageTopicPair janitor.MessageTopicPair specificSchemaVersion VersionDetails @@ -293,14 +307,54 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) encryptedMessageData []byte ) + // header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set + // on the Validator level + if message.RawAttributes[janitor.HeaderValidation] == "true" || + (cc.validateHeaders && message.RawAttributes[janitor.HeaderValidation] != "false") { + _, ok := cc.Validators["json"] + // it is possible json validator isn't initialized by this point so we are checking it just in case + if !ok { + cc.Validators["json"] = jsoninternal.New() + } + var ( + headerId string + headerVersion string + headerSchema 
[]byte + ) + + headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + acquireIfSet(cc.registrySem) + headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) + if err != nil { + setMessageRawAttributes(message, err, "Wrong compile") + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + releaseIfSet(cc.registrySem) + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema} + + var isValid bool + isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + if !isValid { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + if cc.mode == Default { acquireIfSet(cc.registrySem) - messageSchemaPair, err = janitor.CollectSchema(ctx, message, cc.Registry) + schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry) if err != nil { setMessageRawAttributes(message, err, "Wrong compile") releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: schema} releaseIfSet(cc.registrySem) messageTopicPair, err = cc.getMessageTopicPair(messageSchemaPair, encryptedMessageData) diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index c6d264d..4545bb0 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -30,6 +30,7 @@ type CentralConsumer struct { ShouldLog CentralConsumerShouldLog 
`toml:"should_log"` NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` + ValidateHeaders bool `toml:"validate_headers"` MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` Mode int `toml:"mode"` diff --git a/validator/internal/errcodes/errcodes.go b/validator/internal/errcodes/errcodes.go index 6114ea7..c908eba 100644 --- a/validator/internal/errcodes/errcodes.go +++ b/validator/internal/errcodes/errcodes.go @@ -77,6 +77,9 @@ const ( // CompletedWithErrors marks that the process has completed but errors occurred. CompletedWithErrors = 904 + // MissingHeader declares that something in the header is missing + MissingHeader = 906 + // Miscellaneous is used when no other available error code is fitting. Miscellaneous = 999 ) diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index 6b9af88..e9ea488 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -18,6 +18,7 @@ package janitor import ( "context" + "encoding/json" "strings" "time" @@ -35,18 +36,19 @@ import ( // Message defines a Message used for processing broker messages. // Essentially, Message decorates broker messages with additional, extracted information. type Message struct { - ID string - Key string - RawAttributes map[string]interface{} - Payload []byte - IngestionTime time.Time - SchemaID string - Version string - Format string + ID string + Key string + RawAttributes map[string]interface{} + Payload []byte + IngestionTime time.Time + SchemaID string + Version string + Format string + HeaderValidation bool } const ( - // AttributeSchemaID is one of the keys expected to beł found in the attributes field of the message. + // AttributeSchemaID is one of the keys expected to be found in the attributes field of the message. 
// It holds the schema id information concerning the data field of the message AttributeSchemaID = "schemaId" @@ -57,6 +59,18 @@ const ( // AttributeFormat is one of the keys expected to be found in the attributes field of the message. // It holds the format of the data field of the message. AttributeFormat = "format" + + // HeaderValidation is one of the keys that can occur in raw attributes section of header. + // It determines if the header will be validated + HeaderValidation = "headerValidation" + + // AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema id that is used to check header validity + AttributeHeaderID = "headerSchemaId" + + // AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema version that is used to check header validity + AttributeHeaderVersion = "headerVersionId" ) // MessageSchemaPair wraps a Message with the Schema relating to this Message. @@ -65,27 +79,27 @@ type MessageSchemaPair struct { Schema []byte } -// CollectSchema retrieves the schema of the given Message from registry.SchemaRegistry. +// CollectSchema retrieves the schema with the given id and version from registry.SchemaRegistry. // -// If schema retrieval results in registry.ErrNotFound, or Message.SchemaID or Message.Version is an empty string, +// If schema retrieval results in registry.ErrNotFound, or id or version are an empty string, // the Message is put on the results channel with MessageSchemaPair.Schema set to nil. // // The returned error is an instance of OpError for improved error handling (so that the source of this error is identifiable // even if combined with other errors). 
-func CollectSchema(ctx context.Context, message Message, schemaRegistry registry.SchemaRegistry) (MessageSchemaPair, error) { - if message.SchemaID == "" || message.Version == "" { - return MessageSchemaPair{Message: message, Schema: nil}, nil +func CollectSchema(ctx context.Context, id string, version string, schemaRegistry registry.SchemaRegistry) ([]byte, error) { + if id == "" || version == "" { + return nil, nil } - schema, err := schemaRegistry.Get(ctx, message.SchemaID, message.Version) + schema, err := schemaRegistry.Get(ctx, id, version) if err != nil { if errors.Is(err, registry.ErrNotFound) { - return MessageSchemaPair{Message: message, Schema: nil}, nil + return nil, nil } - return MessageSchemaPair{}, intoOpErr(message.ID, errcodes.RegistryUnresponsive, err) + return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err) } - return MessageSchemaPair{Message: message, Schema: schema}, nil + return schema, nil } // Validators is a convenience type for a map containing validator.Validator instances for available message formats. 
@@ -103,6 +117,78 @@ func (vs Validators) Validate(message Message, schema []byte) (bool, error) { return v.Validate(message.Payload, schema, message.SchemaID, message.Version) } +func GetHeaderIdAndVersion(message Message) (string, string, error) { + var id string + var version string + var ok bool + + if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID")) + } + if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version")) + } + return id, version, nil +} + +func ValidateHeader(message Message, schema []byte, validators Validators) (bool, error) { + if len(schema) == 0 { + errMissingHeaderSchema := errors.WithMessage(validator.ErrMissingHeaderSchema, "") + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = errMissingHeaderSchema.Error() + return false, nil + } + headerData, err := generateHeaderData(message.RawAttributes) + if err != nil { + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = err.Error() + return false, err + } + + headerSchemaId, ok := message.RawAttributes[AttributeHeaderID].(string) + if !ok { + return false, errors.New("header ID is not in a supported format") + } + headerSchemaVersion, ok := message.RawAttributes[AttributeHeaderVersion].(string) + if !ok { + return false, errors.New("header version is not in a supported format") + } + + isValid, err := validators["json"].Validate(headerData, schema, headerSchemaId, headerSchemaVersion) + if err != nil { + if errors.Is(err, validator.ErrFailedValidation) { + message.RawAttributes["deadLetterErrorCategory"] = "Header validation error" + message.RawAttributes["deadLetterErrorReason"] = 
errors.WithMessage(validator.ErrFailedHeaderValidation, err.Error()).Error() + return false, nil + } else if errors.Is(err, validator.ErrDeadletter) { + return false, nil + } + return false, intoOpErr(message.ID, errcodes.ValidationFailure, err) + } + + if isValid { + return true, nil + } + return false, nil +} + +func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) { + cleanAttributes := make(map[string]interface{}) + for key, value := range rawAttributes { + if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion || + key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat { + continue + } else { + cleanAttributes[key] = value + } + } + headerData, err := json.Marshal(cleanAttributes) + if err != nil { + return nil, err + } + return headerData, nil +} + // SchemaGenerators is a convenience type for a map containing schemagen.Generator instances for available message formats. type SchemaGenerators map[string]schemagen.Generator diff --git a/validator/internal/janitor/parse.go b/validator/internal/janitor/parse.go index 1ac9e55..42657a6 100644 --- a/validator/internal/janitor/parse.go +++ b/validator/internal/janitor/parse.go @@ -15,8 +15,8 @@ package janitor import ( - "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/lib-streamproc/pkg/streamproc" + "github.com/dataphos/schema-registry-validator/internal/errtemplates" ) // ParseMessage parses a given broker.Message into Message, by setting Message.Payload to the value of the data field of the given diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index 35b112d..f719707 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -18,16 +18,16 @@ import ( "context" "runtime/debug" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" + 
"github.com/dataphos/lib-shutdown/pkg/graceful" "github.com/dataphos/schema-registry-validator/internal/centralconsumer" "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/pullercleaner" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" - "github.com/dataphos/lib-logger/standardlogger" - "github.com/dataphos/lib-shutdown/pkg/graceful" ) type ProcessorInitFunc func(context.Context, registry.SchemaRegistry, broker.Publisher) (*janitor.Processor, error) @@ -86,6 +86,7 @@ func RunCentralConsumer(configFile string) { centralconsumer.Settings{ NumSchemaCollectors: cfg.NumSchemaCollectors, NumInferrers: cfg.NumInferrers, + ValidateHeaders: cfg.ValidateHeaders, }, log, centralconsumer.RouterFlags{ diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index 82ccaac..ce74d22 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -24,13 +24,19 @@ var ErrDeadletter = errors.New("deadletter") var ErrBrokenMessage = errors.New("Message is not in valid format") // ErrWrongCompile is a special error type to help distinguish messages that had fault while compiling. -var ErrWrongCompile = errors.New("There is an error while compiling.") +var ErrWrongCompile = errors.New("There is an error while compiling") // ErrMissingSchema is a special error type to help distinguish messages that are missing schema. var ErrMissingSchema = errors.New("Message is missing a schema") +// ErrMissingHeaderSchema is a special error type to help distinguish headers that are missing schema (header validation). 
+var ErrMissingHeaderSchema = errors.New("Header is missing a schema") + // ErrFailedValidation is a special error type to help distinguish messages that have failed in validation. -var ErrFailedValidation = errors.New("An error occured while validating message.") +var ErrFailedValidation = errors.New("An error occurred while validating message") + +// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. +var ErrFailedHeaderValidation = errors.New("An error occurred while validating message's header") // Validator is the interface used to model message validators. type Validator interface {