From f3f5b2e55286a47e06f4da78abe725cf83812411 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:58:20 +0200 Subject: [PATCH 01/17] style: add license headers (#4) Co-authored-by: github-actions[bot] --- .../external/compatibility-checker/pom.xml | 16 ++++++++++++++++ .../validity/external/validity-checker/pom.xml | 16 ++++++++++++++++ .../validator/external/csv-validator/pom.xml | 16 ++++++++++++++++ .../internal/validator/xml/testdata/data-2.xml | 16 ++++++++++++++++ .../internal/validator/xml/testdata/data-3.xml | 16 ++++++++++++++++ .../validator/xml/testdata/deadletter-1-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/deadletter-2-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/deadletter-3-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/invalid-1-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/invalid-2-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/valid-1-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/valid-2-data.xml | 16 ++++++++++++++++ .../validator/xml/testdata/valid-3-data.xml | 16 ++++++++++++++++ 13 files changed, 208 insertions(+) diff --git a/registry/compatibility/external/compatibility-checker/pom.xml b/registry/compatibility/external/compatibility-checker/pom.xml index 8490adb..5311f0a 100644 --- a/registry/compatibility/external/compatibility-checker/pom.xml +++ b/registry/compatibility/external/compatibility-checker/pom.xml @@ -1,4 +1,20 @@ + + + + + + 4.0.0 diff --git a/validator/internal/validator/xml/testdata/data-2.xml b/validator/internal/validator/xml/testdata/data-2.xml index 3c214b0..d710b0e 100644 --- a/validator/internal/validator/xml/testdata/data-2.xml +++ b/validator/internal/validator/xml/testdata/data-2.xml @@ -1,4 +1,20 @@ + + All my boss diff --git a/validator/internal/validator/xml/testdata/data-3.xml b/validator/internal/validator/xml/testdata/data-3.xml index 4a76903..137fec4 100644 --- a/validator/internal/validator/xml/testdata/data-3.xml +++ b/validator/internal/validator/xml/testdata/data-3.xml @@ -1,4 +1,20 @@ + + My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-1-data.xml b/validator/internal/validator/xml/testdata/deadletter-1-data.xml index c91f644..25f065b 100644 --- a/validator/internal/validator/xml/testdata/deadletter-1-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-1-data.xml @@ -1,4 +1,20 @@ + + beginnersbook> My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-2-data.xml b/validator/internal/validator/xml/testdata/deadletter-2-data.xml index 4a76903..137fec4 100644 --- a/validator/internal/validator/xml/testdata/deadletter-2-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-2-data.xml @@ -1,4 +1,20 @@ + + My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-3-data.xml b/validator/internal/validator/xml/testdata/deadletter-3-data.xml index 1ec13da..9a19d60 100644 --- a/validator/internal/validator/xml/testdata/deadletter-3-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-3-data.xml @@ -1,4 +1,20 @@ + + My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/invalid-1-data.xml b/validator/internal/validator/xml/testdata/invalid-1-data.xml index 3c214b0..d710b0e 100644 --- a/validator/internal/validator/xml/testdata/invalid-1-data.xml +++ b/validator/internal/validator/xml/testdata/invalid-1-data.xml @@ -1,4 +1,20 @@ + + All my boss diff --git a/validator/internal/validator/xml/testdata/invalid-2-data.xml b/validator/internal/validator/xml/testdata/invalid-2-data.xml index 6f1d49a..0e42824 100644 --- a/validator/internal/validator/xml/testdata/invalid-2-data.xml +++ b/validator/internal/validator/xml/testdata/invalid-2-data.xml @@ -1,4 +1,20 @@ + + Myb invalid msg l53222a diff --git a/validator/internal/validator/xml/testdata/valid-1-data.xml b/validator/internal/validator/xml/testdata/valid-1-data.xml index 4a76903..137fec4 100644 --- a/validator/internal/validator/xml/testdata/valid-1-data.xml +++ b/validator/internal/validator/xml/testdata/valid-1-data.xml @@ -1,4 +1,20 @@ + + My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/valid-2-data.xml b/validator/internal/validator/xml/testdata/valid-2-data.xml index 984f1d9..fc45a20 100644 --- a/validator/internal/validator/xml/testdata/valid-2-data.xml +++ b/validator/internal/validator/xml/testdata/valid-2-data.xml @@ -1,4 +1,20 @@ + + My team myselfmeandI diff --git a/validator/internal/validator/xml/testdata/valid-3-data.xml b/validator/internal/validator/xml/testdata/valid-3-data.xml index 5aad282..7ed036f 100644 --- a/validator/internal/validator/xml/testdata/valid-3-data.xml +++ b/validator/internal/validator/xml/testdata/valid-3-data.xml @@ -1,4 +1,20 @@ + + All Me From cf0332450e80d1579150c405ce103c239abd57c2 Mon Sep 17 00:00:00 2001 From: "AzureAD\\LukaAbramusic" Date: Thu, 10 Oct 2024 13:40:15 +0200 Subject: [PATCH 02/17] style: remove headers from xml files --- .../external/compatibility-checker/pom.xml | 16 ---------------- .../validity/external/validity-checker/pom.xml | 16 ---------------- .../validator/external/csv-validator/pom.xml | 16 ---------------- .../internal/validator/xml/testdata/data-2.xml | 16 ---------------- .../internal/validator/xml/testdata/data-3.xml | 16 ---------------- .../validator/xml/testdata/deadletter-1-data.xml | 16 ---------------- .../validator/xml/testdata/deadletter-2-data.xml | 16 ---------------- .../validator/xml/testdata/deadletter-3-data.xml | 16 ---------------- .../validator/xml/testdata/invalid-1-data.xml | 16 ---------------- .../validator/xml/testdata/invalid-2-data.xml | 16 ---------------- .../validator/xml/testdata/valid-1-data.xml | 16 ---------------- .../validator/xml/testdata/valid-2-data.xml | 16 ---------------- .../validator/xml/testdata/valid-3-data.xml | 16 ---------------- 13 files changed, 208 deletions(-) diff --git a/registry/compatibility/external/compatibility-checker/pom.xml b/registry/compatibility/external/compatibility-checker/pom.xml index 5311f0a..8490adb 100644 --- a/registry/compatibility/external/compatibility-checker/pom.xml +++ b/registry/compatibility/external/compatibility-checker/pom.xml @@ -1,20 +1,4 @@ - - - - - - 4.0.0 diff --git a/validator/internal/validator/xml/testdata/data-2.xml b/validator/internal/validator/xml/testdata/data-2.xml index d710b0e..3c214b0 100644 --- a/validator/internal/validator/xml/testdata/data-2.xml +++ b/validator/internal/validator/xml/testdata/data-2.xml @@ -1,20 +1,4 @@ - - All my boss diff --git a/validator/internal/validator/xml/testdata/data-3.xml b/validator/internal/validator/xml/testdata/data-3.xml index 137fec4..4a76903 100644 --- a/validator/internal/validator/xml/testdata/data-3.xml +++ b/validator/internal/validator/xml/testdata/data-3.xml @@ -1,20 +1,4 @@ - - My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-1-data.xml b/validator/internal/validator/xml/testdata/deadletter-1-data.xml index 25f065b..c91f644 100644 --- a/validator/internal/validator/xml/testdata/deadletter-1-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-1-data.xml @@ -1,20 +1,4 @@ - - beginnersbook> My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-2-data.xml b/validator/internal/validator/xml/testdata/deadletter-2-data.xml index 137fec4..4a76903 100644 --- a/validator/internal/validator/xml/testdata/deadletter-2-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-2-data.xml @@ -1,20 +1,4 @@ - - My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/deadletter-3-data.xml b/validator/internal/validator/xml/testdata/deadletter-3-data.xml index 9a19d60..1ec13da 100644 --- a/validator/internal/validator/xml/testdata/deadletter-3-data.xml +++ b/validator/internal/validator/xml/testdata/deadletter-3-data.xml @@ -1,20 +1,4 @@ - - My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/invalid-1-data.xml b/validator/internal/validator/xml/testdata/invalid-1-data.xml index d710b0e..3c214b0 100644 --- a/validator/internal/validator/xml/testdata/invalid-1-data.xml +++ b/validator/internal/validator/xml/testdata/invalid-1-data.xml @@ -1,20 +1,4 @@ - - All my boss diff --git a/validator/internal/validator/xml/testdata/invalid-2-data.xml b/validator/internal/validator/xml/testdata/invalid-2-data.xml index 0e42824..6f1d49a 100644 --- a/validator/internal/validator/xml/testdata/invalid-2-data.xml +++ b/validator/internal/validator/xml/testdata/invalid-2-data.xml @@ -1,20 +1,4 @@ - - Myb invalid msg l53222a diff --git a/validator/internal/validator/xml/testdata/valid-1-data.xml b/validator/internal/validator/xml/testdata/valid-1-data.xml index 137fec4..4a76903 100644 --- a/validator/internal/validator/xml/testdata/valid-1-data.xml +++ b/validator/internal/validator/xml/testdata/valid-1-data.xml @@ -1,20 +1,4 @@ - - My Readers Chaitanya diff --git a/validator/internal/validator/xml/testdata/valid-2-data.xml b/validator/internal/validator/xml/testdata/valid-2-data.xml index fc45a20..984f1d9 100644 --- a/validator/internal/validator/xml/testdata/valid-2-data.xml +++ b/validator/internal/validator/xml/testdata/valid-2-data.xml @@ -1,20 +1,4 @@ - - My team myselfmeandI diff --git a/validator/internal/validator/xml/testdata/valid-3-data.xml b/validator/internal/validator/xml/testdata/valid-3-data.xml index 7ed036f..5aad282 100644 --- a/validator/internal/validator/xml/testdata/valid-3-data.xml +++ b/validator/internal/validator/xml/testdata/valid-3-data.xml @@ -1,20 +1,4 @@ - - All Me From 92796e4887339d3a82d701d6c60de08d539340e9 Mon Sep 17 00:00:00 2001 From: Luka Abramusic <91877528+labramusiq@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:47:39 +0200 Subject: [PATCH 03/17] ci: fix tag_exists condition (#12) --- .github/workflows/push.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index 1ecf352..67d767d 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -132,23 +132,23 @@ jobs: fi - name: Tag and Push Docker image - if: ${{ env.TAG_EXISTS }} == 'false' + if: ${{ env.TAG_EXISTS == 'false' }} run: | docker tag ${{ matrix.component.image-name }}:${{ env.TAG }} syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} docker push syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} - name: Install cosign - if: ${{ env.TAG_EXISTS }} == 'false' + if: ${{ env.TAG_EXISTS == 'false' }} uses: sigstore/cosign-installer@v3.6.0 - name: Sign the Docker image - if: ${{ env.TAG_EXISTS }} == 'false' + if: ${{ env.TAG_EXISTS == 'false' }} run: | digest=$(docker inspect --format='{{index .RepoDigests 0}}' syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }}) cosign sign --yes "$digest" - name: Image already exists - if: ${{ env.TAG_EXISTS }} == 'true' + if: ${{ env.TAG_EXISTS == 'true' }} run: echo "Docker image syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} already exists. Skipping push." release-please: From 8984666d3d1db49ae8a51bc8b3fe8375b6099b0f Mon Sep 17 00:00:00 2001 From: Luka Abramusic <91877528+labramusiq@users.noreply.github.com> Date: Fri, 11 Oct 2024 12:56:46 +0200 Subject: [PATCH 04/17] chore(main): release 1.0.0 (#9) * chore(main): release 1.0.0 * docs: add contributors --- CHANGELOG.md | 27 +++++++++++++++++++++++++++ version.txt | 2 +- 2 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7aba25b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,27 @@ +# Changelog + +## 1.0.0 (2024-10-10) + +**Contributors:** +[@andrijadukic-syntio](https://github.com/andrijadukic-syntio), +[@nikolatomazin](https://github.com/nikolatomazin), +[@Simun17](https://github.com/Simun17), +[@jpecaric2](https://github.com/jpecaric2), +[@jkomericki](https://github.com/jkomericki), +[@robe-rt](https://github.com/robe-rt), +[@PetarBersic](https://github.com/PetarBersic), +[@MarijaSokcevic](https://github.com/MarijaSokcevic), +[@StipeZ](https://github.com/StipeZ), +[@NikaPozar](https://github.com/NikaPozar), +[@mia-ravlic](https://github.com/mia-ravlic), +[@AndreaBarbaric](https://github.com/AndreaBarbaric), +[@Lcero98](https://github.com/Lcero98), +[@mkuju95](https://github.com/mkuju95) + +### ⚠ BREAKING CHANGES + +* initial schema-registry implementation ([#1](https://github.com/dataphos/schema-registry/issues/1)) (#5) + +### Features + +* initial schema-registry implementation ([#1](https://github.com/dataphos/schema-registry/issues/1)) ([#5](https://github.com/dataphos/schema-registry/issues/5)) ([beedaef](https://github.com/dataphos/schema-registry/commit/beedaef818b5490a68318cceea077c361a5f8818)) diff --git a/version.txt b/version.txt index 8acdd82..3eefcb9 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.0.1 +1.0.0 From 2a276dcda58d654ab4bfd10c67a674632a82750a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Thu, 7 Nov 2024 13:24:42 +0100 Subject: [PATCH 05/17] feat: custom header validation --- .../centralconsumer/centralconsumer.go | 86 +++++++++--- validator/internal/config/centralconsumer.go | 1 + validator/internal/errcodes/errcodes.go | 3 + validator/internal/janitor/janitor.go | 122 +++++++++++++++--- validator/internal/janitor/parse.go | 2 +- validator/internal/janitorctl/run.go | 9 +- validator/internal/validator/validator.go | 10 +- 7 files changed, 192 insertions(+), 41 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 85fa02e..087d55b 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -19,11 +19,12 @@ import ( "encoding/json" "strconv" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" + jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json" "github.com/pkg/errors" ) @@ -63,18 +64,19 @@ type VersionDetails struct { // CentralConsumer models the central consumer process. type CentralConsumer struct { - Registry registry.SchemaRegistry - Validators janitor.Validators - Router janitor.Router - Publisher broker.Publisher - topicIDs Topics - topics map[string]broker.Topic - registrySem chan struct{} - validatorsSem chan struct{} - log logger.Log - mode Mode - schema Schema - encryptionKey string + Registry registry.SchemaRegistry + Validators janitor.Validators + Router janitor.Router + Publisher broker.Publisher + topicIDs Topics + topics map[string]broker.Topic + registrySem chan struct{} + validatorsSem chan struct{} + log logger.Log + mode Mode + schema Schema + encryptionKey string + validateHeaders bool } // Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline. @@ -84,6 +86,9 @@ type Settings struct { // NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing). NumInferrers int + + // ValidateHeaders defines if the messages' headers will be validated + ValidateHeaders bool } // Topics defines the standard destination topics, based on validation results. @@ -122,6 +127,13 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator if settings.NumInferrers > 0 { validatorsSem = make(chan struct{}, settings.NumInferrers) } + if settings.ValidateHeaders == true { + _, ok := validators["json"] + if !ok { + // if json validation is turned off, this version of json validator is used by default for validating message header + validators["json"] = jsoninternal.New() + } + } var schemaReturned []byte if mode == OneCCPerTopic { @@ -171,7 +183,8 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator }, Specification: schemaVersion.Specification, }, - encryptionKey: encryptionKey, + encryptionKey: encryptionKey, + validateHeaders: settings.ValidateHeaders, }, nil } @@ -286,6 +299,7 @@ func (cc *CentralConsumer) AsProcessor() *janitor.Processor { func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) (janitor.MessageTopicPair, error) { var ( + schema []byte messageSchemaPair janitor.MessageSchemaPair messageTopicPair janitor.MessageTopicPair specificSchemaVersion VersionDetails @@ -293,14 +307,54 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) encryptedMessageData []byte ) + // header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set + // on the Validator level + if message.RawAttributes[janitor.HeaderValidation] == "true" || + (cc.validateHeaders == true && message.RawAttributes[janitor.HeaderValidation] != "false") { + _, ok := cc.Validators["json"] + // it is possible json validator isn't initialized by this point so we are checking it just in case + if !ok { + cc.Validators["json"] = jsoninternal.New() + } + var ( + headerId string + headerVersion string + headerSchema []byte + ) + + headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + acquireIfSet(cc.registrySem) + headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) + if err != nil { + setMessageRawAttributes(message, err, "Wrong compile") + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + releaseIfSet(cc.registrySem) + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema} + + var isValid bool + isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators) + if err != nil { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } + if !isValid { + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + if cc.mode == Default { acquireIfSet(cc.registrySem) - messageSchemaPair, err = janitor.CollectSchema(ctx, message, cc.Registry) + schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry) if err != nil { setMessageRawAttributes(message, err, "Wrong compile") releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } + messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: schema} releaseIfSet(cc.registrySem) messageTopicPair, err = cc.getMessageTopicPair(messageSchemaPair, encryptedMessageData) diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index c6d264d..4545bb0 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -30,6 +30,7 @@ type CentralConsumer struct { ShouldLog CentralConsumerShouldLog `toml:"should_log"` NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` + ValidateHeaders bool `toml:"validate_headers"` MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` Mode int `toml:"mode"` diff --git a/validator/internal/errcodes/errcodes.go b/validator/internal/errcodes/errcodes.go index 6114ea7..c908eba 100644 --- a/validator/internal/errcodes/errcodes.go +++ b/validator/internal/errcodes/errcodes.go @@ -77,6 +77,9 @@ const ( // CompletedWithErrors marks that the process has completed but errors occurred. CompletedWithErrors = 904 + // MissingHeader declares that something in the header is missing + MissingHeader = 906 + // Miscellaneous is used when no other available error code is fitting. Miscellaneous = 999 ) diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index 6b9af88..e9ea488 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -18,6 +18,7 @@ package janitor import ( "context" + "encoding/json" "strings" "time" @@ -35,18 +36,19 @@ import ( // Message defines a Message used for processing broker messages. // Essentially, Message decorates broker messages with additional, extracted information. type Message struct { - ID string - Key string - RawAttributes map[string]interface{} - Payload []byte - IngestionTime time.Time - SchemaID string - Version string - Format string + ID string + Key string + RawAttributes map[string]interface{} + Payload []byte + IngestionTime time.Time + SchemaID string + Version string + Format string + HeaderValidation bool } const ( - // AttributeSchemaID is one of the keys expected to beł found in the attributes field of the message. + // AttributeSchemaID is one of the keys expected to be found in the attributes field of the message. // It holds the schema id information concerning the data field of the message AttributeSchemaID = "schemaId" @@ -57,6 +59,18 @@ const ( // AttributeFormat is one of the keys expected to be found in the attributes field of the message. // It holds the format of the data field of the message. AttributeFormat = "format" + + // HeaderValidation is one of the keys that can occur in raw attributes section of header. + // It determines if the header will be validated + HeaderValidation = "headerValidation" + + // AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema id that is used to check header validity + AttributeHeaderID = "headerSchemaId" + + // AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. + // It holds the header's schema version that is used to check header validity + AttributeHeaderVersion = "headerVersionId" ) // MessageSchemaPair wraps a Message with the Schema relating to this Message. @@ -65,27 +79,27 @@ type MessageSchemaPair struct { Schema []byte } -// CollectSchema retrieves the schema of the given Message from registry.SchemaRegistry. +// CollectSchema retrieves the schema with the given id and version from registry.SchemaRegistry. // -// If schema retrieval results in registry.ErrNotFound, or Message.SchemaID or Message.Version is an empty string, +// If schema retrieval results in registry.ErrNotFound, or id or version are an empty string, // the Message is put on the results channel with MessageSchemaPair.Schema set to nil. // // The returned error is an instance of OpError for improved error handling (so that the source of this error is identifiable // even if combined with other errors). -func CollectSchema(ctx context.Context, message Message, schemaRegistry registry.SchemaRegistry) (MessageSchemaPair, error) { - if message.SchemaID == "" || message.Version == "" { - return MessageSchemaPair{Message: message, Schema: nil}, nil +func CollectSchema(ctx context.Context, id string, version string, schemaRegistry registry.SchemaRegistry) ([]byte, error) { + if id == "" || version == "" { + return nil, nil } - schema, err := schemaRegistry.Get(ctx, message.SchemaID, message.Version) + schema, err := schemaRegistry.Get(ctx, id, version) if err != nil { if errors.Is(err, registry.ErrNotFound) { - return MessageSchemaPair{Message: message, Schema: nil}, nil + return nil, nil } - return MessageSchemaPair{}, intoOpErr(message.ID, errcodes.RegistryUnresponsive, err) + return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err) } - return MessageSchemaPair{Message: message, Schema: schema}, nil + return schema, nil } // Validators is a convenience type for a map containing validator.Validator instances for available message formats. @@ -103,6 +117,78 @@ func (vs Validators) Validate(message Message, schema []byte) (bool, error) { return v.Validate(message.Payload, schema, message.SchemaID, message.Version) } +func GetHeaderIdAndVersion(message Message) (string, string, error) { + var id string + var version string + var ok bool + + if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID")) + } + if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok { + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version")) + } + return id, version, nil +} + +func ValidateHeader(message Message, schema []byte, validators Validators) (bool, error) { + if len(schema) == 0 { + errMissingHeaderSchema := errors.WithMessage(validator.ErrMissingHeaderSchema, "") + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = errMissingHeaderSchema.Error() + return false, nil + } + headerData, err := generateHeaderData(message.RawAttributes) + if err != nil { + message.RawAttributes["deadLetterErrorCategory"] = "Header schema error" + message.RawAttributes["deadLetterErrorReason"] = err.Error() + return false, err + } + + headerSchemaId, ok := message.RawAttributes[AttributeHeaderID].(string) + if !ok { + return false, errors.New("header ID is not in a supported format") + } + headerSchemaVersion, ok := message.RawAttributes[AttributeHeaderVersion].(string) + if !ok { + return false, errors.New("header version is not in a supported format") + } + + isValid, err := validators["json"].Validate(headerData, schema, headerSchemaId, headerSchemaVersion) + if err != nil { + if errors.Is(err, validator.ErrFailedValidation) { + message.RawAttributes["deadLetterErrorCategory"] = "Header validation error" + message.RawAttributes["deadLetterErrorReason"] = errors.WithMessage(validator.ErrFailedHeaderValidation, err.Error()) + return false, nil + } else if errors.Is(err, validator.ErrDeadletter) { + return false, nil + } + return false, intoOpErr(message.ID, errcodes.ValidationFailure, err) + } + + if isValid { + return true, nil + } + return false, nil +} + +func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) { + cleanAttributes := make(map[string]interface{}) + for key, value := range rawAttributes { + if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion || + key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat { + continue + } else { + cleanAttributes[key] = value + } + } + headerData, err := json.Marshal(cleanAttributes) + if err != nil { + return nil, err + } + return headerData, nil +} + // SchemaGenerators is a convenience type for a map containing schemagen.Generator instances for available message formats. type SchemaGenerators map[string]schemagen.Generator diff --git a/validator/internal/janitor/parse.go b/validator/internal/janitor/parse.go index 1ac9e55..42657a6 100644 --- a/validator/internal/janitor/parse.go +++ b/validator/internal/janitor/parse.go @@ -15,8 +15,8 @@ package janitor import ( - "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/lib-streamproc/pkg/streamproc" + "github.com/dataphos/schema-registry-validator/internal/errtemplates" ) // ParseMessage parses a given broker.Message into Message, by setting Message.Payload to the value of the data field of the given diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index 35b112d..f719707 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -18,16 +18,16 @@ import ( "context" "runtime/debug" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" + "github.com/dataphos/lib-shutdown/pkg/graceful" "github.com/dataphos/schema-registry-validator/internal/centralconsumer" "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/pullercleaner" "github.com/dataphos/schema-registry-validator/internal/registry" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-logger/logger" - "github.com/dataphos/lib-logger/standardlogger" - "github.com/dataphos/lib-shutdown/pkg/graceful" ) type ProcessorInitFunc func(context.Context, registry.SchemaRegistry, broker.Publisher) (*janitor.Processor, error) @@ -86,6 +86,7 @@ func RunCentralConsumer(configFile string) { centralconsumer.Settings{ NumSchemaCollectors: cfg.NumSchemaCollectors, NumInferrers: cfg.NumInferrers, + ValidateHeaders: cfg.ValidateHeaders, }, log, centralconsumer.RouterFlags{ diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index 82ccaac..ce74d22 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -24,13 +24,19 @@ var ErrDeadletter = errors.New("deadletter") var ErrBrokenMessage = errors.New("Message is not in valid format") // ErrWrongCompile is a special error type to help distinguish messages that had fault while compiling. -var ErrWrongCompile = errors.New("There is an error while compiling.") +var ErrWrongCompile = errors.New("There is an error while compiling") // ErrMissingSchema is a special error type to help distinguish messages that are missing schema. var ErrMissingSchema = errors.New("Message is missing a schema") +// ErrMissingHeaderSchema is a special error type to help distinguish headers that are missing schema (header validation). +var ErrMissingHeaderSchema = errors.New("Header is missing a schema") + // ErrFailedValidation is a special error type to help distinguish messages that have failed in validation. -var ErrFailedValidation = errors.New("An error occured while validating message.") +var ErrFailedValidation = errors.New("An error occured while validating message") + +// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. +var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header") // Validator is the interface used to model message validators. type Validator interface { From 470382a03c6d0f90059e7fd815d9fe6518d24e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:50:41 +0100 Subject: [PATCH 06/17] feat: custom header validation --- validator/internal/centralconsumer/centralconsumer.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 087d55b..7534b16 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -127,7 +127,7 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator if settings.NumInferrers > 0 { validatorsSem = make(chan struct{}, settings.NumInferrers) } - if settings.ValidateHeaders == true { + if settings.ValidateHeaders { _, ok := validators["json"] if !ok { // if json validation is turned off, this version of json validator is used by default for validating message header @@ -310,7 +310,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) // header validation is turned on if a message specifies so in the header OR if validateHeaders flag is set // on the Validator level if message.RawAttributes[janitor.HeaderValidation] == "true" || - (cc.validateHeaders == true && message.RawAttributes[janitor.HeaderValidation] != "false") { + (cc.validateHeaders && message.RawAttributes[janitor.HeaderValidation] != "false") { _, ok := cc.Validators["json"] // it is possible json validator isn't initialized by this point so we are checking it just in case if !ok { @@ -334,7 +334,6 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } releaseIfSet(cc.registrySem) - messageSchemaPair = janitor.MessageSchemaPair{Message: message, Schema: headerSchema} var isValid bool isValid, err = janitor.ValidateHeader(message, headerSchema, cc.Validators) From 7c9e625b2434014ea3090a6c0af5ce5bd4f918d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:17:04 +0100 Subject: [PATCH 07/17] fix: log all validation errors instead of only one --- validator/internal/validator/json/json.go | 82 ++++++++++++++++++++--- 1 file changed, 74 insertions(+), 8 deletions(-) diff --git a/validator/internal/validator/json/json.go b/validator/internal/validator/json/json.go index 2e6b2c7..358dfd0 100644 --- a/validator/internal/validator/json/json.go +++ b/validator/internal/validator/json/json.go @@ -17,6 +17,7 @@ package json import ( "bytes" "encoding/json" + _errors "errors" "github.com/pkg/errors" "github.com/dataphos/schema-registry-validator/internal/validator" @@ -29,8 +30,8 @@ import ( func New() validator.Validator { return validator.Func(func(message, schema []byte, _, _ string) (bool, error) { - var v interface{} - if err := json.Unmarshal(message, &v); err != nil { + var parsedMessage interface{} + if err := json.Unmarshal(message, &parsedMessage); err != nil { errBroken := errors.WithMessage(validator.ErrBrokenMessage, "Message is not in a valid format - "+err.Error()) return false, errBroken } @@ -41,10 +42,17 @@ func New() validator.Validator { return false, errCompile } - if err = compiledSchema.Validate(v); err != nil { - errValidation := errors.WithMessage(validator.ErrFailedValidation, err.Error()) - return false, errValidation - + if err = compiledSchema.Validate(parsedMessage); err != nil { + var validationError *jsonschema.ValidationError + ok := _errors.As(err, &validationError) + if !ok { + return false, errors.New("impossible to cast error to *jsonschema.ValidationError") + } + errMessage, err := createErrorMessage(validationError) + if err != nil { + return false, errors.New("couldn't create error message") + } + return false, errors.WithMessage(validator.ErrFailedValidation, errMessage) } return true, nil }) @@ -76,13 +84,33 @@ func NewCached(size int) validator.Validator { } if err := compiledSchema.Validate(parsedMessage); err != nil { - errValidation := errors.WithMessage(validator.ErrFailedValidation, err.Error()) - return false, errValidation + var validationError *jsonschema.ValidationError + ok := _errors.As(err, &validationError) + if !ok { + return false, errors.New("impossible to cast error to *jsonschema.ValidationError") + } + errMessage, err := createErrorMessage(validationError) + if err != nil { + return false, errors.New("couldn't create error message") + } + return false, errors.WithMessagef(validator.ErrFailedValidation, errMessage) } return true, nil }) } +func createErrorMessage(validationError *jsonschema.ValidationError) (string, error) { + errorMap := make(map[string]string) + for _, cause := range validationError.Causes { + errorMap[cause.KeywordLocation] = cause.Message + } + errMessage, err := json.Marshal(errorMap) + if err != nil { + return "", err + } + return string(errMessage), nil +} + func compileSchema(schema []byte) (*jsonschema.Schema, error) { compiler := jsonschema.NewCompiler() if err := compiler.AddResource("schema.json", bytes.NewReader(schema)); err != nil { @@ -110,6 +138,13 @@ func NewGoJsonSchemaValidator() validator.Validator { if err != nil { return false, err } + if !result.Valid() { + errMessage, err := createErrorMessageAlt(result.Errors()) + if err != nil { + return false, err + } + return false, errors.WithMessage(validator.ErrFailedValidation, errMessage) + } return result.Valid(), nil }) @@ -141,7 +176,38 @@ func NewCachedGoJsonSchemaValidator(size int) validator.Validator { if err != nil { return false, err } + if !result.Valid() { + errMessage, err := createErrorMessageAlt(result.Errors()) + if err != nil { + return false, err + } + return false, errors.WithMessage(validator.ErrFailedValidation, errMessage) + } return result.Valid(), nil }) } + +func createErrorMessageAlt(validationError []gojsonschema.ResultError) (string, error) { + errorMap := make(map[string]string) + for _, e := range validationError { + key := e.Details()["context"].(string) + var expected, given, value string + var ok1, ok2 bool + if expected, ok1 = e.Details()["expected"].(string); !ok1 { + value = "invalid value" + } + if given, ok2 = e.Details()["given"].(string); !ok2 { + value = "invalid value" + } + if ok1 && ok2 { + value = "expected " + expected + ", given " + given + } + errorMap[key] = value + } + errMessage, err := json.Marshal(errorMap) + if err != nil { + return "", err + } + return string(errMessage), nil +} From 49215876f36c288bd8e49fd04b7217b92c3418ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:43:40 +0100 Subject: [PATCH 08/17] feat: added kafka publisher ack setting --- validator/internal/config/config.go | 1 + validator/internal/janitorctl/init.go | 34 +++++++++++++++++++-------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/validator/internal/config/config.go b/validator/internal/config/config.go index 816be12..bc0949b 100644 --- a/validator/internal/config/config.go +++ b/validator/internal/config/config.go @@ -76,6 +76,7 @@ type KafkaPublisherSettings struct { BatchSize int `toml:"batch_size" default:"40"` BatchBytes int64 `toml:"batch_bytes" default:"5242880"` Linger time.Duration `toml:"linger" default:"10ms"` + Acks int `toml:"kafka_acks" default:"1"` } type EventhubsPublisherSettings struct { diff --git a/validator/internal/janitorctl/init.go b/validator/internal/janitorctl/init.go index cfca880..30c6163 100644 --- a/validator/internal/janitorctl/init.go +++ b/validator/internal/janitorctl/init.go @@ -23,7 +23,18 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/dataphos/lib-brokers/pkg/broker" + "github.com/dataphos/lib-brokers/pkg/broker/jetstream" + "github.com/dataphos/lib-brokers/pkg/broker/kafka" + "github.com/dataphos/lib-brokers/pkg/broker/pubsub" + "github.com/dataphos/lib-brokers/pkg/broker/pulsar" + "github.com/dataphos/lib-brokers/pkg/broker/servicebus" + "github.com/dataphos/lib-brokers/pkg/brokerutil" + "github.com/dataphos/lib-httputil/pkg/httputil" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-streamproc/pkg/streamproc" "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/errtemplates" @@ -40,16 +51,6 @@ import ( "github.com/dataphos/schema-registry-validator/internal/validator/json" "github.com/dataphos/schema-registry-validator/internal/validator/protobuf" "github.com/dataphos/schema-registry-validator/internal/validator/xml" - "github.com/dataphos/lib-brokers/pkg/broker" - "github.com/dataphos/lib-brokers/pkg/broker/jetstream" - "github.com/dataphos/lib-brokers/pkg/broker/kafka" - "github.com/dataphos/lib-brokers/pkg/broker/pubsub" - "github.com/dataphos/lib-brokers/pkg/broker/pulsar" - "github.com/dataphos/lib-brokers/pkg/broker/servicebus" - "github.com/dataphos/lib-brokers/pkg/brokerutil" - "github.com/dataphos/lib-httputil/pkg/httputil" - "github.com/dataphos/lib-logger/logger" - "github.com/dataphos/lib-streamproc/pkg/streamproc" ) // initializeSchemaRegistry gets the janitor implementation of a schema registry, optionally decorating it with an in-memory lru cache @@ -105,6 +106,7 @@ func initializeSchemaRegistry(ctx context.Context, log logger.Log, cfg *config.R func initKafkaPublisher(ctx context.Context, cfg *config.Producer) (broker.Publisher, error) { var tlsConfig *tls.Config var krbConfig *kafka.KerberosConfig + var ack kgo.Acks if cfg.Kafka.TlsConfig.Enabled { var err error tlsConfig, err = httputil.NewTLSConfig(cfg.Kafka.TlsConfig.ClientCertFile, cfg.Kafka.TlsConfig.ClientKeyFile, cfg.Kafka.TlsConfig.CaCertFile) @@ -124,6 +126,17 @@ func initKafkaPublisher(ctx context.Context, cfg *config.Producer) (broker.Publi } } + switch cfg.Kafka.Settings.Acks { + case 0: + ack = kgo.NoAck() + case 1: + ack = kgo.LeaderAck() + case -1: + ack = kgo.AllISRAcks() + default: + ack = kgo.LeaderAck() + } + return kafka.NewPublisher( ctx, kafka.ProducerConfig{ @@ -139,6 +152,7 @@ func initKafkaPublisher(ctx context.Context, cfg *config.Producer) (broker.Publi BatchSize: cfg.Kafka.Settings.BatchSize, BatchBytes: cfg.Kafka.Settings.BatchBytes, Linger: cfg.Kafka.Settings.Linger, + Acks: ack, }, ) } From 6f833ae67deac18039bc66dcd1975b5e69cfa76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Wed, 13 Nov 2024 10:05:31 +0100 Subject: [PATCH 09/17] fix: missing/invalid headers don't crash Validator anymore --- .../centralconsumer/centralconsumer.go | 51 ++++++++++++++----- validator/internal/errcodes/errcodes.go | 6 +++ validator/internal/janitor/janitor.go | 7 ++- validator/internal/janitor/parse.go | 22 ++++---- validator/internal/janitor/processor.go | 5 +- .../internal/registry/janitorsr/janitorsr.go | 6 ++- validator/internal/registry/registry.go | 1 + validator/internal/validator/validator.go | 3 ++ 8 files changed, 75 insertions(+), 26 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 7534b16..9bb192e 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -21,6 +21,7 @@ import ( "github.com/dataphos/lib-brokers/pkg/broker" "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" @@ -329,7 +330,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) acquireIfSet(cc.registrySem) headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) if err != nil { - setMessageRawAttributes(message, err, "Wrong compile") + setMessageRawAttributes(message, "Wrong compile", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } @@ -349,7 +350,23 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) acquireIfSet(cc.registrySem) schema, err = janitor.CollectSchema(ctx, message.SchemaID, message.Version, cc.Registry) if err != nil { - setMessageRawAttributes(message, err, "Wrong compile") + opError := &janitor.OpError{} + if errors.As(err, &opError) { + if opError.Code == errcodes.RegistryUnresponsive { + setMessageRawAttributes(message, "Registry unresponsive", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } else if opError.Code == errcodes.SchemaNotRegistered { + setMessageRawAttributes(message, "Schema error", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } else if opError.Code == errcodes.MissingHeader || opError.Code == errcodes.InvalidHeader { + setMessageRawAttributes(message, "Header error", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + setMessageRawAttributes(message, "Wrong compile", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } @@ -393,9 +410,19 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) acquireIfSet(cc.registrySem) specificSchemaVersionSpec, err := cc.Registry.Get(ctx, cc.schema.SchemaMetadata.ID, message.Version) if err != nil { - setMessageRawAttributes(message, err, "Wrong compile") - releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + if errors.Is(err, registry.ErrNotFound) { + setMessageRawAttributes(message, "Schema error", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } else if errors.Is(err, registry.InvalidHeader) { + setMessageRawAttributes(message, "Header error", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } else { + setMessageRawAttributes(message, "Wrong compile", err) + releaseIfSet(cc.registrySem) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + } } releaseIfSet(cc.registrySem) @@ -404,7 +431,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) Specification: specificSchemaVersionSpec, }) if err != nil { - setMessageRawAttributes(message, err, "Non number version") + setMessageRawAttributes(message, "Non number version", err) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } @@ -420,7 +447,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) } } else { err = errors.New("unknown CC mode") - setMessageRawAttributes(message, err, "Unknown CC mode") + setMessageRawAttributes(message, "Unknown CC mode", err) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } } @@ -474,12 +501,12 @@ func (cc *CentralConsumer) revalidatedAgainstLatest(ctx context.Context, specifi acquireIfSet(cc.registrySem) specificSchemaVersionBytes, err := cc.Registry.GetLatest(ctx, cc.schema.SchemaMetadata.ID) if err != nil { - setMessageRawAttributes(message, err, "Wrong compile") + setMessageRawAttributes(message, "Wrong compile", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } if err = json.Unmarshal(specificSchemaVersionBytes, &specificSchemaVersion); err != nil { - setMessageRawAttributes(message, err, "Broken message") + setMessageRawAttributes(message, "Broken message", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, errors.Wrap(err, errtemplates.UnmarshallingJSONFailed) } @@ -487,7 +514,7 @@ func (cc *CentralConsumer) revalidatedAgainstLatest(ctx context.Context, specifi err = cc.updateIfNewer(specificSchemaVersion) if err != nil { - setMessageRawAttributes(message, err, "Non number version") + setMessageRawAttributes(message, "Non number version", err) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err } @@ -512,8 +539,8 @@ func (cc *CentralConsumer) updateIfNewer(versionDetails VersionDetails) error { return nil } -func setMessageRawAttributes(message janitor.Message, err error, errMessage string) { - message.RawAttributes["deadLetterErrorCategory"] = errMessage +func setMessageRawAttributes(message janitor.Message, errCategory string, err error) { + message.RawAttributes["deadLetterErrorCategory"] = errCategory message.RawAttributes["deadLetterErrorReason"] = err.Error() } diff --git a/validator/internal/errcodes/errcodes.go b/validator/internal/errcodes/errcodes.go index c908eba..3c002e2 100644 --- a/validator/internal/errcodes/errcodes.go +++ b/validator/internal/errcodes/errcodes.go @@ -77,9 +77,15 @@ const ( // CompletedWithErrors marks that the process has completed but errors occurred. CompletedWithErrors = 904 + // SchemaNotRegistered declares no schema is registered under specified schemaId and versionId + SchemaNotRegistered = 905 + // MissingHeader declares that something in the header is missing MissingHeader = 906 + // InvalidHeader declares that something in the header is broken + InvalidHeader = 907 + // Miscellaneous is used when no other available error code is fitting. Miscellaneous = 999 ) diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index e9ea488..d6fa263 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -112,7 +112,7 @@ type Validators map[string]validator.Validator func (vs Validators) Validate(message Message, schema []byte) (bool, error) { v, ok := vs[strings.ToLower(message.Format)] if !ok { - return false, errtemplates.UnsupportedMessageFormat(message.Format) + return false, errors.WithMessage(validator.ErrUnsupportedFormat, errtemplates.UnsupportedMessageFormat(message.Format).Error()) } return v.Validate(message.Payload, schema, message.SchemaID, message.Version) } @@ -271,6 +271,11 @@ func InferDestinationTopic(messageSchemaPair MessageSchemaPair, validators Valid message.RawAttributes["deadLetterErrorReason"] = err.Error() return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil } + if errors.Is(err, validator.ErrUnsupportedFormat) { + message.RawAttributes["deadLetterErrorCategory"] = "Unsupported format" + message.RawAttributes["deadLetterErrorReason"] = err.Error() + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } if errors.Is(err, validator.ErrDeadletter) { return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil } diff --git a/validator/internal/janitor/parse.go b/validator/internal/janitor/parse.go index 42657a6..fb79578 100644 --- a/validator/internal/janitor/parse.go +++ b/validator/internal/janitor/parse.go @@ -55,19 +55,21 @@ func ExtractAttributes(raw map[string]interface{}) (Attributes, error) { var schemaIDStr, versionStr, formatStr string schemaID, ok := raw[AttributeSchemaID] - if ok { - schemaIDStr, ok = schemaID.(string) - if !ok { - return Attributes{}, errtemplates.AttributeNotAString(AttributeSchemaID) - } + if !ok { + return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaID) + } + schemaIDStr, ok = schemaID.(string) + if !ok { + return Attributes{}, errtemplates.AttributeNotAString(AttributeSchemaID) } version, ok := raw[AttributeSchemaVersion] - if ok { - versionStr, ok = version.(string) - if !ok { - return Attributes{}, errtemplates.AttributeNotAString(AttributeSchemaVersion) - } + if !ok { + return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaVersion) + } + versionStr, ok = version.(string) + if !ok { + return Attributes{}, errtemplates.AttributeNotAString(AttributeSchemaVersion) } format, ok := raw[AttributeFormat] if !ok { diff --git a/validator/internal/janitor/processor.go b/validator/internal/janitor/processor.go index 039c121..b8f2263 100644 --- a/validator/internal/janitor/processor.go +++ b/validator/internal/janitor/processor.go @@ -19,11 +19,11 @@ import ( "golang.org/x/sync/errgroup" - "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/lib-batchproc/pkg/batchproc" "github.com/dataphos/lib-brokers/pkg/broker" "github.com/dataphos/lib-logger/logger" "github.com/dataphos/lib-streamproc/pkg/streamproc" + "github.com/dataphos/schema-registry-validator/internal/errcodes" ) type Processor struct { @@ -84,6 +84,9 @@ func (p *Processor) parseOrSendToDeadletter(ctx context.Context, message streamp p.log.Errorw(err.Error(), errcodes.ParsingMessage, logger.F{ "id": message.ID, }) + if parsed.RawAttributes == nil { + parsed.RawAttributes = make(map[string]interface{}) + } parsed.RawAttributes["deadLetterErrorCategory"] = "Parsing error" parsed.RawAttributes["deadLetterErrorReason"] = err.Error() if err = PublishToTopic(ctx, parsed, p.Topics[p.Deadletter]); err != nil { diff --git a/validator/internal/registry/janitorsr/janitorsr.go b/validator/internal/registry/janitorsr/janitorsr.go index 9b40390..01cdad4 100644 --- a/validator/internal/registry/janitorsr/janitorsr.go +++ b/validator/internal/registry/janitorsr/janitorsr.go @@ -25,10 +25,10 @@ import ( "net/http" "time" - "github.com/dataphos/schema-registry-validator/internal/errtemplates" - "github.com/dataphos/schema-registry-validator/internal/registry" "github.com/dataphos/lib-httputil/pkg/httputil" "github.com/dataphos/lib-retry/pkg/retry" + "github.com/dataphos/schema-registry-validator/internal/errtemplates" + "github.com/dataphos/schema-registry-validator/internal/registry" "github.com/pkg/errors" ) @@ -97,6 +97,8 @@ func (sr *SchemaRegistry) Get(ctx context.Context, id, version string) ([]byte, if response.StatusCode != http.StatusOK { if response.StatusCode == http.StatusNotFound { return nil, errors.Wrapf(registry.ErrNotFound, "fetching schema %s/%s failed", id, version) + } else if response.StatusCode == http.StatusUnprocessableEntity { + return nil, errors.Wrapf(registry.InvalidHeader, "fetching schema %s/%s failed due to invalid type", id, version) } return nil, errors.Wrapf(errtemplates.BadHttpStatusCode(response.StatusCode), "fetching schema %s/%s resulted in a bad status code", id, version) } diff --git a/validator/internal/registry/registry.go b/validator/internal/registry/registry.go index 725f32d..9fd9370 100644 --- a/validator/internal/registry/registry.go +++ b/validator/internal/registry/registry.go @@ -22,6 +22,7 @@ import ( ) var ErrNotFound = errors.New("no schema registered under given id and version") +var InvalidHeader = errors.New("id and/or version are not in supported format") // SchemaRegistry models schema registries. type SchemaRegistry interface { diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index ce74d22..b5a18ba 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -38,6 +38,9 @@ var ErrFailedValidation = errors.New("An error occured while validating message" // ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header") +// ErrUnsupportedFormat is a special error type to help distinguish messages that are in wrong format +var ErrUnsupportedFormat = errors.New("Message is not in a supported format") + // Validator is the interface used to model message validators. type Validator interface { // Validate takes a message and a schema (along with schema id and version, in case they are needed for optimization purposes) From 8a395b5ce9803188d56de5a717f273d34f8c21f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Wed, 13 Nov 2024 10:20:22 +0100 Subject: [PATCH 10/17] fix: missing/invalid headers don't crash Validator anymore --- registry/registry/repo.go | 1 + registry/registry/repository/postgres/postgres.go | 11 ++++++++++- registry/server/handler.go | 11 ++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/registry/registry/repo.go b/registry/registry/repo.go index 0a95a83..bad18be 100644 --- a/registry/registry/repo.go +++ b/registry/registry/repo.go @@ -23,6 +23,7 @@ var ErrUnknownComp = errors.New("unknown value for compatibility_mode") var ErrUnknownVal = errors.New("unknown value for validity mode") var ErrNotValid = errors.New("schema is not valid") var ErrNotComp = errors.New("schemas are not compatible") +var ErrInvalidValueHeader = errors.New("invalid value header") type Repository interface { CreateSchema(schemaRegisterRequest SchemaRegistrationRequest) (VersionDetails, bool, error) diff --git a/registry/registry/repository/postgres/postgres.go b/registry/registry/repository/postgres/postgres.go index 3baaa8c..02346a2 100644 --- a/registry/registry/repository/postgres/postgres.go +++ b/registry/registry/repository/postgres/postgres.go @@ -42,7 +42,16 @@ func New(db *gorm.DB) *Repository { // Returns registry.ErrNotFound in case there's no schema under the given id and version. func (r *Repository) GetSchemaVersionByIdAndVersion(id, version string) (registry.VersionDetails, error) { var details VersionDetails - if err := r.db.Where("schema_id = ? and version = ? and version_deactivated = ?", id, version, false).Take(&details).Error; err != nil { + var err error + _, err = strconv.Atoi(id) + if err != nil { + return registry.VersionDetails{}, registry.ErrInvalidValueHeader + } + _, err = strconv.Atoi(version) + if err != nil { + return registry.VersionDetails{}, registry.ErrInvalidValueHeader + } + if err = r.db.Where("schema_id = ? and version = ? and version_deactivated = ?", id, version, false).Take(&details).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return registry.VersionDetails{}, registry.ErrNotFound } diff --git a/registry/server/handler.go b/registry/server/handler.go index 3677ac8..68e1210 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -27,9 +27,9 @@ import ( "github.com/go-chi/chi/v5" "github.com/pkg/errors" + "github.com/dataphos/lib-logger/logger" "github.com/dataphos/schema-registry/internal/metrics" "github.com/dataphos/schema-registry/registry" - "github.com/dataphos/lib-logger/logger" ) type Handler struct { @@ -89,6 +89,15 @@ func (h Handler) GetSchemaVersionByIdAndVersion(w http.ResponseWriter, r *http.R Code: http.StatusNotFound, }) return + } else if errors.Is(err, registry.ErrInvalidValueHeader) { + body, _ := json.Marshal(report{ + Message: fmt.Sprintf("Id=%v and/or version=%v are not of supported data types", id, version), + }) + writeResponse(w, responseBodyAndCode{ + Body: body, + Code: http.StatusUnprocessableEntity, + }) + return } writeResponse(w, responseBodyAndCode{ From e3ee3a26f35d575f21d17fcadb5bebfe5215ecaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 15 Nov 2024 09:47:08 +0100 Subject: [PATCH 11/17] fix: compatibility and validity checks display reasons if checks fail --- registry/cmd/initdb/initdb.go | 4 +- registry/compatibility/checker_test.go | 5 +- .../external/compatibility-checker/pom.xml | 5 ++ .../syntio/compatibility/CheckerFactory.java | 1 + .../net/syntio/compatibility/Message.java | 4 +- .../compatibility/checker/AvroChecker.java | 6 +- .../syntio/compatibility/checker/Checker.java | 4 +- .../checker/CompatibilityChecker.java | 2 +- .../compatibility/checker/JsonChecker.java | 15 +++- .../checker/ProtobufChecker.java | 17 ++++- .../controller/CheckerController.java | 37 ++++++---- registry/compatibility/externalChecker.go | 31 ++++++-- registry/compatibility/http/http.go | 14 ++-- registry/internal/errcodes/errcodes.go | 1 + registry/registry/schema.go | 1 - registry/server/handler.go | 13 +++- .../java/net/syntio/validity/SchemaTypes.java | 1 + .../net/syntio/validity/ValidatorFactory.java | 1 + .../net/syntio/validity/checker/Checker.java | 15 ++-- .../controller/CheckerController.java | 19 +++-- .../syntio/validity/dto/CheckRequestDto.java | 1 + registry/validity/externalChecker.go | 70 +++++++------------ registry/validity/http/http.go | 14 ++-- 23 files changed, 166 insertions(+), 115 deletions(-) diff --git a/registry/cmd/initdb/initdb.go b/registry/cmd/initdb/initdb.go index 6f1b097..3c97990 100644 --- a/registry/cmd/initdb/initdb.go +++ b/registry/cmd/initdb/initdb.go @@ -17,11 +17,11 @@ package main import ( "runtime/debug" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" "github.com/dataphos/schema-registry/internal/config" "github.com/dataphos/schema-registry/internal/errcodes" "github.com/dataphos/schema-registry/registry/repository/postgres" - "github.com/dataphos/lib-logger/logger" - "github.com/dataphos/lib-logger/standardlogger" ) func main() { diff --git a/registry/compatibility/checker_test.go b/registry/compatibility/checker_test.go index 57cf8cf..17acb83 100644 --- a/registry/compatibility/checker_test.go +++ b/registry/compatibility/checker_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" "os" "path/filepath" "runtime" @@ -34,9 +33,7 @@ func TestNew(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - v, err := New(ctx, "http://localhost:8088", 2*time.Second) - - fmt.Println(v.url) + _, err := New(ctx, "http://localhost:8088", 2*time.Second) if err != nil { t.Fatal(err) diff --git a/registry/compatibility/external/compatibility-checker/pom.xml b/registry/compatibility/external/compatibility-checker/pom.xml index 8490adb..480ed10 100644 --- a/registry/compatibility/external/compatibility-checker/pom.xml +++ b/registry/compatibility/external/compatibility-checker/pom.xml @@ -203,6 +203,11 @@ lombok 1.18.22 + + org.apache.avro + avro + 1.11.1 + io.apicurio diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/CheckerFactory.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/CheckerFactory.java index d30fdde..d4baf83 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/CheckerFactory.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/CheckerFactory.java @@ -22,6 +22,7 @@ import net.syntio.compatibility.checker.ProtobufChecker; public class CheckerFactory { + public static CompatibilityChecker createChecker(String format) throws Exception { return switch (format) { case FileTypes.JSON -> new JsonChecker(); diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/Message.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/Message.java index a2d8834..8abaf54 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/Message.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/Message.java @@ -17,6 +17,7 @@ package net.syntio.compatibility; public class Message { + private final String id; private final String format; private final String schema; @@ -27,12 +28,11 @@ public Message(String id, String format, String schema) { this.schema = schema; } - public String getSchema() { return schema; } - public String getID() { + public String getId() { return id; } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/AvroChecker.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/AvroChecker.java index 048dd62..900890b 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/AvroChecker.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/AvroChecker.java @@ -25,8 +25,9 @@ import java.util.List; public class AvroChecker implements net.syntio.compatibility.checker.CompatibilityChecker { + @Override - public boolean testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { + public List testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { io.confluent.kafka.schemaregistry.CompatibilityLevel avroCompatibilityLevel = switch (level) { case NONE -> io.confluent.kafka.schemaregistry.CompatibilityLevel.NONE; case BACKWARD -> io.confluent.kafka.schemaregistry.CompatibilityLevel.BACKWARD; @@ -42,7 +43,7 @@ public boolean testCompatibility(CompatibilityLevel level, List h } AvroSchema newSchema = new AvroSchema(currentSchema.content()); - List issues = switch (avroCompatibilityLevel) { + return switch (avroCompatibilityLevel) { case BACKWARD -> CompatibilityChecker.BACKWARD_CHECKER.isCompatible(newSchema, newHistory); case BACKWARD_TRANSITIVE -> CompatibilityChecker.BACKWARD_TRANSITIVE_CHECKER.isCompatible(newSchema, newHistory); case FORWARD -> CompatibilityChecker.FORWARD_CHECKER.isCompatible(newSchema, newHistory); @@ -51,6 +52,5 @@ public boolean testCompatibility(CompatibilityLevel level, List h case FULL_TRANSITIVE -> CompatibilityChecker.FULL_TRANSITIVE_CHECKER.isCompatible(newSchema, newHistory); case NONE -> CompatibilityChecker.NO_OP_CHECKER.isCompatible(newSchema, newHistory); }; - return issues.isEmpty(); } } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/Checker.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/Checker.java index e8b735c..646407b 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/Checker.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/Checker.java @@ -25,16 +25,14 @@ import java.util.List; public class Checker { - public static boolean checkCompatibility(Message msg, List history, CompatibilityLevel mode) throws Exception { + public static List checkCompatibility(Message msg, List history, CompatibilityLevel mode) throws Exception { ContentHandle schema = ContentHandle.create(msg.getSchema()); List schemaHistory = new ArrayList<>(); - for (String s : history) { ContentHandle ps = ContentHandle.create(s); schemaHistory.add(ps); } CompatibilityChecker cc = CheckerFactory.createChecker(msg.getFormat()); - return cc.testCompatibility(mode, schemaHistory, schema); } } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/CompatibilityChecker.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/CompatibilityChecker.java index 9d57a96..7e68f92 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/CompatibilityChecker.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/CompatibilityChecker.java @@ -22,5 +22,5 @@ import java.util.List; public interface CompatibilityChecker { - boolean testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema); + List testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema); } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/JsonChecker.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/JsonChecker.java index 23031da..ecb6c0d 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/JsonChecker.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/JsonChecker.java @@ -17,15 +17,26 @@ package net.syntio.compatibility.checker; import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.rules.compatibility.CompatibilityDifference; +import io.apicurio.registry.rules.compatibility.CompatibilityExecutionResult; import io.apicurio.registry.rules.compatibility.CompatibilityLevel; import io.apicurio.registry.rules.compatibility.JsonSchemaCompatibilityChecker; +import java.util.ArrayList; import java.util.List; +import java.util.Set; public class JsonChecker implements CompatibilityChecker { + @Override - public boolean testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { + public List testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { JsonSchemaCompatibilityChecker cc = new JsonSchemaCompatibilityChecker(); - return cc.testCompatibility(level, history, currentSchema).isCompatible(); + CompatibilityExecutionResult res = cc.testCompatibility(level, history, currentSchema); + Set resSet = res.getIncompatibleDifferences(); + List issuesList = new ArrayList<>(); + for (CompatibilityDifference dif : resSet) { + issuesList.add(dif.asRuleViolation().getDescription()); + } + return issuesList; } } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/ProtobufChecker.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/ProtobufChecker.java index ef24e2a..4619717 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/ProtobufChecker.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/checker/ProtobufChecker.java @@ -17,15 +17,26 @@ package net.syntio.compatibility.checker; import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.rules.compatibility.CompatibilityDifference; +import io.apicurio.registry.rules.compatibility.CompatibilityExecutionResult; import io.apicurio.registry.rules.compatibility.CompatibilityLevel; import io.apicurio.registry.rules.compatibility.ProtobufCompatibilityChecker; +import java.util.ArrayList; import java.util.List; +import java.util.Set; public class ProtobufChecker implements CompatibilityChecker { + @Override - public boolean testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { - ProtobufCompatibilityChecker cc = new ProtobufCompatibilityChecker(); - return cc.testCompatibility(level, history, currentSchema).isCompatible(); + public List testCompatibility(CompatibilityLevel level, List history, ContentHandle currentSchema) { + ProtobufCompatibilityChecker cc = new ProtobufCompatibilityChecker(); + CompatibilityExecutionResult res = cc.testCompatibility(level, history, currentSchema); + Set diffs = res.getIncompatibleDifferences(); + List issues = new ArrayList<>(); + for (CompatibilityDifference diff : diffs) { + issues.add(diff.asRuleViolation().getDescription()); + } + return issues; } } diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java index 1fed466..9b2d83f 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java @@ -21,12 +21,14 @@ import net.syntio.compatibility.checker.Checker; import net.syntio.compatibility.dto.CheckRequestDto; import net.syntio.compatibility.dto.CheckResponseDto; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; +import java.util.ArrayList; import java.util.List; @RestController @@ -35,32 +37,41 @@ public class CheckerController { public ResponseEntity check(@RequestBody CheckRequestDto req) { Message latestSchema = req.getMessage(); List schemaHistory = req.getHistory(); + CheckResponseDto res; try { - for (int i = 0; i < schemaHistory.size(); i++) { - schemaHistory.set(i, schemaHistory.get(i).replaceAll("\r\n", "\n")); - } + schemaHistory.replaceAll(s -> s.replaceAll("\r\n", "\n")); String mode = req.getMode(); CompatibilityLevel cl = getCompatibilityLevel(mode); - boolean result; + List issues; if (cl.equals(CompatibilityLevel.NONE)) { - result = true; + issues = new ArrayList<>(); } else { - result = Checker.checkCompatibility(latestSchema, schemaHistory, cl); + issues = Checker.checkCompatibility(latestSchema, schemaHistory, cl); } - CheckResponseDto res = new CheckResponseDto(result); - if (result) { + for (String issue:issues) { + System.out.println(issue); + } + res = new CheckResponseDto(issues.isEmpty()); + if (issues.isEmpty()) { res.setInfo("Schema is compatible"); return ResponseEntity.ok(res); } - res.setInfo("Schema is incompatible"); + res.setInfo("Schema is incompatible: " + String.join("; ", issues)); return ResponseEntity.ok(res); } catch (NullPointerException e) { - System.err.println("Schema history is null."); - return ResponseEntity.badRequest().build(); + res = new CheckResponseDto(false); + res.setInfo("schema history is null"); + return ResponseEntity.status(HttpStatus.PRECONDITION_FAILED).body(res); + } catch (org.everit.json.schema.SchemaException e) { + res = new CheckResponseDto(false); + res.setInfo("schema version unknown or unsupported"); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(res); } catch (Exception e) { - return ResponseEntity.badRequest().build(); + res = new CheckResponseDto(false); + res.setInfo("unknown error"); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(res); } } diff --git a/registry/compatibility/externalChecker.go b/registry/compatibility/externalChecker.go index 3217c1a..43e04cf 100644 --- a/registry/compatibility/externalChecker.go +++ b/registry/compatibility/externalChecker.go @@ -17,17 +17,20 @@ package compatibility import ( "context" "encoding/base64" - "fmt" "os" "strings" "time" "github.com/pkg/errors" - "github.com/dataphos/schema-registry/compatibility/http" - "github.com/dataphos/schema-registry/internal/errtemplates" "github.com/dataphos/lib-httputil/pkg/httputil" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" "github.com/dataphos/lib-retry/pkg/retry" + "github.com/dataphos/schema-registry/compatibility/http" + "github.com/dataphos/schema-registry/internal/config" + "github.com/dataphos/schema-registry/internal/errcodes" + "github.com/dataphos/schema-registry/internal/errtemplates" ) const ( @@ -42,8 +45,9 @@ const ( ) type ExternalChecker struct { - url string + Url string TimeoutBase time.Duration + Log logger.Log } // NewFromEnv loads the needed environment variables and calls New. @@ -73,9 +77,20 @@ func New(ctx context.Context, url string, timeoutBase time.Duration) (*ExternalC return nil, errors.Wrapf(err, "attempting to reach compatibility checker at %s failed", url) } + labels := logger.Labels{ + "product": "Schema Registry", + "component": "compatibility_checker", + } + logLevel, logConfigWarnings := config.GetLogLevel() + log := standardlogger.New(labels, standardlogger.WithLogLevel(logLevel)) + for _, w := range logConfigWarnings { + log.Warn(w) + } + return &ExternalChecker{ - url: url, + Url: url, TimeoutBase: timeoutBase, + Log: log, }, nil } @@ -90,9 +105,12 @@ func (c *ExternalChecker) Check(schemaInfo string, history []string, mode string decodedHistory, err := c.DecodeHistory(history) if err != nil { + c.Log.Error("could not decode", errcodes.SchemaUndecodable) return false, err } - return http.CheckOverHTTP(ctx, schemaInfo, decodedHistory, mode, c.url+"/") + compatible, info, err := http.CheckOverHTTP(ctx, schemaInfo, decodedHistory, mode, c.Url+"/") + c.Log.Info(info) + return compatible, err } func (c *ExternalChecker) DecodeHistory(history []string) ([]string, error) { @@ -100,7 +118,6 @@ func (c *ExternalChecker) DecodeHistory(history []string) ([]string, error) { for i := 0; i < len(history); i++ { decoded, err := base64.StdEncoding.DecodeString(history[i]) if err != nil { - fmt.Println(fmt.Errorf("could not decode").Error()) return nil, err } decodedHistory = append(decodedHistory, string(decoded)) diff --git a/registry/compatibility/http/http.go b/registry/compatibility/http/http.go index e3e4fef..66d26ad 100644 --- a/registry/compatibility/http/http.go +++ b/registry/compatibility/http/http.go @@ -59,10 +59,10 @@ func EstimateHTTPTimeout(size int, base time.Duration) time.Duration { // CheckOverHTTP requests a schema check over HTTP. // Function returns false if schema isn't compatible. -func CheckOverHTTP(ctx context.Context, schema string, history []string, mode, url string) (bool, error) { +func CheckOverHTTP(ctx context.Context, schema string, history []string, mode, url string) (bool, string, error) { response, err := sendCheckRequest(ctx, schema, history, mode, url) if err != nil { - return false, err + return false, "error sending compatibility check request", err } defer func(Body io.ReadCloser) { err := Body.Close() @@ -73,23 +73,23 @@ func CheckOverHTTP(ctx context.Context, schema string, history []string, mode, u body, err := io.ReadAll(response.Body) if err != nil { - return false, err + return false, "error reading compatibility check response", err } var parsedBody checkResponse if err = json.Unmarshal(body, &parsedBody); err != nil { - return false, err + return false, "error unmarshalling compatibility check body", err } compatible := parsedBody.Result switch response.StatusCode { case http.StatusOK: - return compatible, nil + return compatible, parsedBody.Info, nil case http.StatusBadRequest: - return compatible, nil + return compatible, parsedBody.Info, nil default: - return compatible, errors.Errorf("error: status code [%v]", response.StatusCode) + return compatible, parsedBody.Info, errors.Errorf("error: status code [%v]", response.StatusCode) } } diff --git a/registry/internal/errcodes/errcodes.go b/registry/internal/errcodes/errcodes.go index aec85bd..8b23421 100644 --- a/registry/internal/errcodes/errcodes.go +++ b/registry/internal/errcodes/errcodes.go @@ -22,6 +22,7 @@ const ( ExternalCheckerInitialization = 104 ServerShutdown = 200 BadRequest = 400 + SchemaUndecodable = 422 InternalServerError = 500 Miscellaneous = 999 ) diff --git a/registry/registry/schema.go b/registry/registry/schema.go index 2ff9ae4..ed15f2f 100644 --- a/registry/registry/schema.go +++ b/registry/registry/schema.go @@ -403,7 +403,6 @@ func (service *Service) UpdateSchema(id string, schemaUpdateRequest SchemaUpdate return VersionDetails{}, false, err } schemaUpdateRequest.Specification = canonicalSpec - } attributes, err := extractAttributes(schemaUpdateRequest.Specification, schemas.SchemaType, attSearchDepth) diff --git a/registry/server/handler.go b/registry/server/handler.go index 68e1210..7b484fa 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -666,9 +666,16 @@ func (h Handler) PutSchema(w http.ResponseWriter, r *http.Request) { Code: http.StatusNotFound, }) return - } - - if errors.Is(err, registry.ErrNotComp) { + } else if errors.Is(err, registry.ErrNotValid) { + body, _ := json.Marshal(report{ + Message: "Schema is not valid", + }) + writeResponse(w, responseBodyAndCode{ + Body: body, + Code: http.StatusBadRequest, + }) + return + } else if errors.Is(err, registry.ErrNotComp) { body, _ := json.Marshal(report{ Message: "Schemas are not compatible", }) diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/SchemaTypes.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/SchemaTypes.java index 5e9cc1a..523d599 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/SchemaTypes.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/SchemaTypes.java @@ -17,6 +17,7 @@ package net.syntio.validity; public class SchemaTypes { + public static final String JSON = "json"; public static final String AVRO = "avro"; public static final String PROTOBUF = "protobuf"; diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/ValidatorFactory.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/ValidatorFactory.java index 6f399b2..b89ec69 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/ValidatorFactory.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/ValidatorFactory.java @@ -23,6 +23,7 @@ import io.apicurio.registry.rules.validity.XsdContentValidator; public class ValidatorFactory { + public static ContentValidator createValidator(String schema) { return switch (schema) { case SchemaTypes.JSON -> new JsonSchemaContentValidator(); diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java index 6005fae..74a9e5b 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java @@ -17,7 +17,6 @@ package net.syntio.validity.checker; import io.apicurio.registry.content.ContentHandle; -import io.apicurio.registry.rules.RuleViolationException; import io.apicurio.registry.rules.validity.ContentValidator; import io.apicurio.registry.rules.validity.ValidityLevel; @@ -26,21 +25,19 @@ import java.util.Collections; public class Checker { + public static boolean checkValidity(String schemaType, String schema, String mode) { ValidityLevel valLevel = switch (mode.toLowerCase()) { case "syntax-only" -> ValidityLevel.SYNTAX_ONLY; case "full" -> ValidityLevel.FULL; default -> ValidityLevel.NONE; }; - ContentValidator validator = ValidatorFactory.createValidator(schemaType); - ContentHandle contentHandle = ContentHandle.create(schema); - try { - validator.validate(valLevel, contentHandle, Collections.emptyMap()); - return true; - } catch (RuleViolationException e) { - return false; + if (validator == null) { // in case ValidatorFactory returns null + return false; } + ContentHandle contentHandle = ContentHandle.create(schema); + validator.validate(valLevel, contentHandle, Collections.emptyMap()); + return true; } - } diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/controller/CheckerController.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/controller/CheckerController.java index 56a9a8a..4758fbd 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/controller/CheckerController.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/controller/CheckerController.java @@ -16,23 +16,26 @@ package net.syntio.validity.controller; +import io.apicurio.registry.rules.RuleViolation; +import io.apicurio.registry.rules.RuleViolationException; import net.syntio.validity.Message; import net.syntio.validity.checker.Checker; import net.syntio.validity.dto.CheckRequestDto; import net.syntio.validity.dto.CheckResponseDto; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; @RestController public class CheckerController { + @PostMapping(value = "/") public ResponseEntity check(@RequestBody CheckRequestDto req) { Message payload = req.getMessage(); - try { String schemaType = payload.getSchemaType(); String schema = payload.getSchema(); @@ -46,9 +49,17 @@ public ResponseEntity check(@RequestBody CheckRequestDto req) } res.setInfo("Schema is invalid"); return ResponseEntity.ok(res); - + } catch (RuleViolationException e) { + CheckResponseDto res = new CheckResponseDto(false); + StringBuilder reasons = new StringBuilder(); + for (RuleViolation cause:e.getCauses()) { + reasons.append(cause.getDescription()).append(" at ").append(cause.getContext()).append(";"); + } + res.setInfo(reasons.toString()); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(res); } catch (Exception e) { - return ResponseEntity.badRequest().build(); + CheckResponseDto res = new CheckResponseDto(false); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(res); } } diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/dto/CheckRequestDto.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/dto/CheckRequestDto.java index 2d56e77..125b207 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/dto/CheckRequestDto.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/dto/CheckRequestDto.java @@ -19,6 +19,7 @@ import net.syntio.validity.Message; public class CheckRequestDto { + private final Message message; public CheckRequestDto(String schema, String format, String mode) { diff --git a/registry/validity/externalChecker.go b/registry/validity/externalChecker.go index 5bf549a..aeae698 100644 --- a/registry/validity/externalChecker.go +++ b/registry/validity/externalChecker.go @@ -16,20 +16,19 @@ package validity import ( "context" - "encoding/json" - "encoding/xml" - "fmt" - "io" "os" "strings" "time" "github.com/pkg/errors" - "github.com/dataphos/schema-registry/internal/errtemplates" - "github.com/dataphos/schema-registry/validity/http" "github.com/dataphos/lib-httputil/pkg/httputil" + "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/lib-logger/standardlogger" "github.com/dataphos/lib-retry/pkg/retry" + "github.com/dataphos/schema-registry/internal/config" + "github.com/dataphos/schema-registry/internal/errtemplates" + "github.com/dataphos/schema-registry/validity/http" ) const ( @@ -44,8 +43,9 @@ const ( ) type ExternalChecker struct { - url string + Url string TimeoutBase time.Duration + Log logger.Log } // NewExternalCheckerFromEnv loads the needed environment variables and calls NewExternalChecker. @@ -74,8 +74,18 @@ func NewExternalChecker(ctx context.Context, url string, timeoutBase time.Durati return nil, errors.Wrapf(err, "attempting to reach validity checker at %s failed", url) } + labels := logger.Labels{ + "product": "Schema Registry", + "component": "validity_checker", + } + logLevel, logConfigWarnings := config.GetLogLevel() + log := standardlogger.New(labels, standardlogger.WithLogLevel(logLevel)) + for _, w := range logConfigWarnings { + log.Warn(w) + } + return &ExternalChecker{ - url: url, + Url: url, TimeoutBase: timeoutBase, }, nil } @@ -86,21 +96,16 @@ func (c *ExternalChecker) Check(schema, schemaType, mode string) (bool, error) { return true, nil } if strings.ToLower(mode) == "syntax-only" || strings.ToLower(mode) == "full" { - internalCheck, err := internalCheck(schema, schemaType) - if err != nil { - return false, err - } - if !internalCheck { - return false, nil - } - } + size := []byte(schema + schemaType + mode) + ctx, cancel := context.WithTimeout(context.Background(), http.EstimateHTTPTimeout(len(size), c.TimeoutBase)) + defer cancel() - size := []byte(schema + schemaType + mode) - - ctx, cancel := context.WithTimeout(context.Background(), http.EstimateHTTPTimeout(len(size), c.TimeoutBase)) - defer cancel() + valid, info, err := http.CheckOverHTTP(ctx, schemaType, schema, mode, c.Url+"/") + c.Log.Info(info) + return valid, err + } - return http.CheckOverHTTP(ctx, schemaType, schema, mode, c.url+"/") + return false, errors.Errorf("") } func InitExternalValidityChecker(ctx context.Context) (*ExternalChecker, string, error) { @@ -118,29 +123,6 @@ func InitExternalValidityChecker(ctx context.Context) (*ExternalChecker, string, return nil, "", errors.Errorf("unsupported validity mode") } -func internalCheck(schema, schemaType string) (bool, error) { - switch schemaType { - case "json", "avro": - return json.Valid([]byte(schema)), nil - case "xml": - return IsValidXML(schema), nil - case "protobuf": //since there is no builtin protobuf validator, we assume schema is valid and propagate validation to external checker - return true, nil - default: - return false, fmt.Errorf("the schemaType is unavailiable") - } -} - -func IsValidXML(input string) bool { - decoder := xml.NewDecoder(strings.NewReader(input)) - for { - err := decoder.Decode(new(interface{})) - if err != nil { - return err == io.EOF - } - } -} - func CheckIfValidMode(mode *string) bool { if *mode == "" { *mode = defaultGlobalValidityMode diff --git a/registry/validity/http/http.go b/registry/validity/http/http.go index 361ac91..32b9c83 100644 --- a/registry/validity/http/http.go +++ b/registry/validity/http/http.go @@ -58,10 +58,10 @@ func EstimateHTTPTimeout(size int, base time.Duration) time.Duration { // CheckOverHTTP requests a schema check over HTTP. // Function returns false if schema isn't valid. -func CheckOverHTTP(ctx context.Context, schemaType, schema, mode, url string) (bool, error) { +func CheckOverHTTP(ctx context.Context, schemaType, schema, mode, url string) (bool, string, error) { response, err := sendCheckRequest(ctx, schemaType, schema, mode, url) if err != nil { - return false, err + return false, "error sending validity check request", err } defer func(Body io.ReadCloser) { err := Body.Close() @@ -72,23 +72,23 @@ func CheckOverHTTP(ctx context.Context, schemaType, schema, mode, url string) (b body, err := io.ReadAll(response.Body) if err != nil { - return false, err + return false, "error reading validity check response", err } var parsedBody checkResponse if err = json.Unmarshal(body, &parsedBody); err != nil { - return false, err + return false, "error unmarshalling compatibility check body", err } valid := parsedBody.Result switch response.StatusCode { case http.StatusOK: - return valid, nil + return valid, parsedBody.Info, nil case http.StatusBadRequest: - return valid, nil + return valid, parsedBody.Info, nil default: - return valid, errors.Errorf("error: status code [%v]", response.StatusCode) + return valid, parsedBody.Info, errors.Errorf("error: status code [%v]", response.StatusCode) } } From e66c613172aab98e13ff703cfbd015b28cbde440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:44:18 +0100 Subject: [PATCH 12/17] ci: fix incorrect use of error with format --- validator/internal/validator/json/json.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/internal/validator/json/json.go b/validator/internal/validator/json/json.go index 358dfd0..db828f7 100644 --- a/validator/internal/validator/json/json.go +++ b/validator/internal/validator/json/json.go @@ -93,7 +93,7 @@ func NewCached(size int) validator.Validator { if err != nil { return false, errors.New("couldn't create error message") } - return false, errors.WithMessagef(validator.ErrFailedValidation, errMessage) + return false, errors.WithMessage(validator.ErrFailedValidation, errMessage) } return true, nil }) From b0f4352e0dd670590cc9d42fc7ca5fd970492b8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:03:18 +0100 Subject: [PATCH 13/17] fix: fixed minor issues introduced in previous PRs --- registry/validity/externalChecker.go | 1 + .../internal/centralconsumer/centralconsumer.go | 2 +- validator/internal/janitor/janitor.go | 14 +++++++++++--- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/registry/validity/externalChecker.go b/registry/validity/externalChecker.go index aeae698..a411b43 100644 --- a/registry/validity/externalChecker.go +++ b/registry/validity/externalChecker.go @@ -87,6 +87,7 @@ func NewExternalChecker(ctx context.Context, url string, timeoutBase time.Durati return &ExternalChecker{ Url: url, TimeoutBase: timeoutBase, + Log: log, }, nil } diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 9bb192e..f912071 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -325,7 +325,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) headerId, headerVersion, err = janitor.GetHeaderIdAndVersion(message) if err != nil { - return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, err + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } acquireIfSet(cc.registrySem) headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index d6fa263..df5255f 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -94,7 +94,9 @@ func CollectSchema(ctx context.Context, id string, version string, schemaRegistr schema, err := schemaRegistry.Get(ctx, id, version) if err != nil { if errors.Is(err, registry.ErrNotFound) { - return nil, nil + return nil, intoOpErr(id, errcodes.SchemaNotRegistered, err) + } else if errors.Is(err, registry.InvalidHeader) { + return nil, intoOpErr(id, errcodes.InvalidHeader, err) } return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err) } @@ -123,10 +125,16 @@ func GetHeaderIdAndVersion(message Message) (string, string, error) { var ok bool if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok { - return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header ID")) + err := errors.New("missing header ID") + message.RawAttributes["deadLetterErrorCategory"] = "Missing header ID" + message.RawAttributes["deadLetterErrorReason"] = err + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, err) } if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok { - return "", "", intoOpErr(message.ID, errcodes.MissingHeader, errors.New("missing header version")) + err := errors.New("missing header version") + message.RawAttributes["deadLetterErrorCategory"] = "Missing header version" + message.RawAttributes["deadLetterErrorReason"] = err + return "", "", intoOpErr(message.ID, errcodes.MissingHeader, err) } return id, version, nil } From dee9eed634028c6a08d21756fef1561be6033423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 15 Nov 2024 14:24:17 +0100 Subject: [PATCH 14/17] fix: fixed minor issues introduced in previous PRs --- .../net/syntio/compatibility/controller/CheckerController.java | 3 --- .../java/net/syntio/compatibility/dto/CheckRequestDto.java | 1 - 2 files changed, 4 deletions(-) diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java index 9b2d83f..9131f7f 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java @@ -50,9 +50,6 @@ public ResponseEntity check(@RequestBody CheckRequestDto req) issues = Checker.checkCompatibility(latestSchema, schemaHistory, cl); } - for (String issue:issues) { - System.out.println(issue); - } res = new CheckResponseDto(issues.isEmpty()); if (issues.isEmpty()) { res.setInfo("Schema is compatible"); diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/dto/CheckRequestDto.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/dto/CheckRequestDto.java index 9c92540..394bd68 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/dto/CheckRequestDto.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/dto/CheckRequestDto.java @@ -32,7 +32,6 @@ public CheckRequestDto(String payload, List history, String mode) { this.message = transformStringToMessage(payload); } catch (Exception e) { this.message = new Message("", "", ""); - System.err.println("Cannot read message"); } this.history = history; this.mode = mode; From c3c9ee30a8254e6937a54888be9bba2efba75a97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 15 Nov 2024 15:31:40 +0100 Subject: [PATCH 15/17] fix: fixed minor issues introduced in previous PRs --- validator/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/go.mod b/validator/go.mod index 7002b36..4dc0054 100644 --- a/validator/go.mod +++ b/validator/go.mod @@ -18,6 +18,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.1 github.com/santhosh-tekuri/jsonschema/v5 v5.0.2 + github.com/twmb/franz-go v1.13.3 github.com/xeipuuv/gojsonschema v1.2.0 go.uber.org/multierr v1.9.0 go.uber.org/ratelimit v0.2.0 @@ -88,7 +89,6 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/twmb/franz-go v1.13.3 // indirect github.com/twmb/franz-go/pkg/kmsg v1.5.0 // indirect github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect github.com/twmb/franz-go/plugin/kprom v1.0.0 // indirect From 16c05a172422d78e70037d0a7deea8baf5bc38bb Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Tue, 19 Nov 2024 09:55:21 +0100 Subject: [PATCH 16/17] fix: pr comments --- .../controller/CheckerController.java | 4 ++-- registry/registry/repo.go | 2 +- .../centralconsumer/centralconsumer.go | 2 +- validator/internal/errcodes/errcodes.go | 8 +++---- validator/internal/janitor/janitor.go | 14 ++++++------- validator/internal/validator/json/json.go | 21 ++++++------------- 6 files changed, 21 insertions(+), 30 deletions(-) diff --git a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java index 9131f7f..ffd5280 100644 --- a/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java +++ b/registry/compatibility/external/compatibility-checker/src/main/java/net/syntio/compatibility/controller/CheckerController.java @@ -52,10 +52,10 @@ public ResponseEntity check(@RequestBody CheckRequestDto req) res = new CheckResponseDto(issues.isEmpty()); if (issues.isEmpty()) { - res.setInfo("Schema is compatible"); + res.setInfo("schema is compatible"); return ResponseEntity.ok(res); } - res.setInfo("Schema is incompatible: " + String.join("; ", issues)); + res.setInfo("schema is incompatible: " + String.join("; ", issues)); return ResponseEntity.ok(res); } catch (NullPointerException e) { res = new CheckResponseDto(false); diff --git a/registry/registry/repo.go b/registry/registry/repo.go index bad18be..4395b21 100644 --- a/registry/registry/repo.go +++ b/registry/registry/repo.go @@ -23,7 +23,7 @@ var ErrUnknownComp = errors.New("unknown value for compatibility_mode") var ErrUnknownVal = errors.New("unknown value for validity mode") var ErrNotValid = errors.New("schema is not valid") var ErrNotComp = errors.New("schemas are not compatible") -var ErrInvalidValueHeader = errors.New("invalid value header") +var ErrInvalidValueHeader = errors.New("invalid header value") type Repository interface { CreateSchema(schemaRegisterRequest SchemaRegistrationRequest) (VersionDetails, bool, error) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index f912071..897a160 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -360,7 +360,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) setMessageRawAttributes(message, "Schema error", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil - } else if opError.Code == errcodes.MissingHeader || opError.Code == errcodes.InvalidHeader { + } else if opError.Code == errcodes.MissingDataInHeader || opError.Code == errcodes.InvalidDataInHeader { setMessageRawAttributes(message, "Header error", err) releaseIfSet(cc.registrySem) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil diff --git a/validator/internal/errcodes/errcodes.go b/validator/internal/errcodes/errcodes.go index 3c002e2..f694e30 100644 --- a/validator/internal/errcodes/errcodes.go +++ b/validator/internal/errcodes/errcodes.go @@ -80,11 +80,11 @@ const ( // SchemaNotRegistered declares no schema is registered under specified schemaId and versionId SchemaNotRegistered = 905 - // MissingHeader declares that something in the header is missing - MissingHeader = 906 + // MissingDataInHeader declares that something in the header is missing + MissingDataInHeader = 906 - // InvalidHeader declares that something in the header is broken - InvalidHeader = 907 + // InvalidDataInHeader declares that something in the header is broken + InvalidDataInHeader = 907 // Miscellaneous is used when no other available error code is fitting. Miscellaneous = 999 diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index df5255f..6028dfc 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -96,7 +96,7 @@ func CollectSchema(ctx context.Context, id string, version string, schemaRegistr if errors.Is(err, registry.ErrNotFound) { return nil, intoOpErr(id, errcodes.SchemaNotRegistered, err) } else if errors.Is(err, registry.InvalidHeader) { - return nil, intoOpErr(id, errcodes.InvalidHeader, err) + return nil, intoOpErr(id, errcodes.InvalidDataInHeader, err) } return nil, intoOpErr(id, errcodes.RegistryUnresponsive, err) } @@ -125,16 +125,16 @@ func GetHeaderIdAndVersion(message Message) (string, string, error) { var ok bool if id, ok = message.RawAttributes[AttributeHeaderID].(string); !ok { - err := errors.New("missing header ID") - message.RawAttributes["deadLetterErrorCategory"] = "Missing header ID" + err := errors.New("missing header: ID") + message.RawAttributes["deadLetterErrorCategory"] = "Missing header: ID" message.RawAttributes["deadLetterErrorReason"] = err - return "", "", intoOpErr(message.ID, errcodes.MissingHeader, err) + return "", "", intoOpErr(message.ID, errcodes.MissingDataInHeader, err) } if version, ok = message.RawAttributes[AttributeHeaderVersion].(string); !ok { - err := errors.New("missing header version") - message.RawAttributes["deadLetterErrorCategory"] = "Missing header version" + err := errors.New("missing header: version") + message.RawAttributes["deadLetterErrorCategory"] = "Missing header: version" message.RawAttributes["deadLetterErrorReason"] = err - return "", "", intoOpErr(message.ID, errcodes.MissingHeader, err) + return "", "", intoOpErr(message.ID, errcodes.MissingDataInHeader, err) } return id, version, nil } diff --git a/validator/internal/validator/json/json.go b/validator/internal/validator/json/json.go index db828f7..e486ff7 100644 --- a/validator/internal/validator/json/json.go +++ b/validator/internal/validator/json/json.go @@ -18,11 +18,12 @@ import ( "bytes" "encoding/json" _errors "errors" - "github.com/pkg/errors" + "strconv" "github.com/dataphos/schema-registry-validator/internal/validator" lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" "github.com/santhosh-tekuri/jsonschema/v5" _ "github.com/santhosh-tekuri/jsonschema/v5/httploader" "github.com/xeipuuv/gojsonschema" @@ -190,20 +191,10 @@ func NewCachedGoJsonSchemaValidator(size int) validator.Validator { func createErrorMessageAlt(validationError []gojsonschema.ResultError) (string, error) { errorMap := make(map[string]string) - for _, e := range validationError { - key := e.Details()["context"].(string) - var expected, given, value string - var ok1, ok2 bool - if expected, ok1 = e.Details()["expected"].(string); !ok1 { - value = "invalid value" - } - if given, ok2 = e.Details()["given"].(string); !ok2 { - value = "invalid value" - } - if ok1 && ok2 { - value = "expected " + expected + ", given " + given - } - errorMap[key] = value + for index, e := range validationError { + key := "error_" + strconv.Itoa(index+1) + reason := e.Description() + errorMap[key] = reason } errMessage, err := json.Marshal(errorMap) if err != nil { From a49349054813ef52ca8255773481d4ad00ee64fc Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Tue, 19 Nov 2024 13:41:27 +0100 Subject: [PATCH 17/17] ci: updated actions/setup-go --- .github/workflows/pr.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 8771fd1..77a7fe1 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -102,7 +102,7 @@ jobs: # Running lint steps if changes are detected in the component folder - name: Set up Go if: steps.check_changed_files.outputs.any_changed == 'true' - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} # Add all component folders for monorepos @@ -166,7 +166,7 @@ jobs: git pull origin ${{ env.PARENT_BRANCH }} --no-rebase - name: Set up Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} # Add all component folders for monorepos @@ -248,7 +248,7 @@ jobs: uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} # Add all component folders for monorepos