From 6ff445a4625503473b119a613c149138cdfc417e Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Fri, 20 Dec 2024 15:42:26 +0100 Subject: [PATCH 1/7] fix: removed line of code that caused unwanted return --- registry/server/handler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/registry/server/handler.go b/registry/server/handler.go index 7b484fa..9318c2d 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -159,9 +159,8 @@ func (h Handler) GetSpecificationByIdAndVersion(w http.ResponseWriter, r *http.R log.Println(err) } - body, _ := json.Marshal(specification) writeResponse(w, responseBodyAndCode{ - Body: body, + Body: specification, Code: http.StatusOK, }) } From c5515226ebacc326a24a43099dd1e158a866937d Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Fri, 20 Dec 2024 16:23:12 +0100 Subject: [PATCH 2/7] refactor: changed message when there are no registered schemas --- registry/server/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/server/handler.go b/registry/server/handler.go index 9318c2d..36ab242 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -358,7 +358,7 @@ func (h Handler) GetSchemas(w http.ResponseWriter, _ *http.Request) { if err != nil { if errors.Is(err, registry.ErrNotFound) { writeResponse(w, responseBodyAndCode{ - Body: serializeErrorMessage(http.StatusText(http.StatusNotFound)), + Body: serializeErrorMessage("Schema Registry is empty"), Code: http.StatusNotFound, }) return From 4b191b26128cf4978d6a3e44da41abdd2007b30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:54:00 +0100 Subject: [PATCH 3/7] fix: added avro error reason to message header (#41) --- .github/workflows/pr.yaml | 57 ++++++++++--------- .github/workflows/push.yaml | 18 +++--- CHANGELOG.md | 8 +++ .../centralconsumer/centralconsumer.go | 26 ++++----- validator/internal/janitor/janitor.go | 26 ++++++--- validator/internal/validator/avro/avro.go | 7 ++- validator/internal/validator/validator.go | 10 +++- version.txt | 2 +- 8 files changed, 91 insertions(+), 63 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 77a7fe1..1980a28 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -1,8 +1,12 @@ name: PR CI on: - pull_request: - branches: [ develop, main ] + pull_request_target: + types: + - opened + - edited + - synchronize + - reopened env: GO111MODULE: on @@ -13,29 +17,30 @@ env: TEST_ARGS: -v -short -coverprofile=coverage.out jobs: - commitlint: - name: Commit Lint Job + validate_pr_title_job: + if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} + name: Validate PR title runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Set up Node.js - uses: actions/setup-node@v4 + - uses: amannn/action-semantic-pull-request@v5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: - node-version: ${{ env.NODE_VERSION }} - - - name: Install commitlint - run: | - npm install --save-dev @commitlint/{cli,config-conventional} - - - name: Validate PR commits with commitlint - run: npx commitlint --from ${{ github.event.pull_request.head.sha }}~${{ github.event.pull_request.commits }} --to ${{ github.event.pull_request.head.sha }} --verbose + requireScope: false + # Configure additional validation for the subject based on a regex. + # This example ensures the subject doesn't start with an uppercase character. + subjectPattern: ^(?![A-Z]).+$ + # If `subjectPattern` is configured, you can use this property to override + # the default error message that is shown when the pattern doesn't match. + # The variables `subject` and `title` can be used within the message. + subjectPatternError: | + The subject "{subject}" found in the pull request title "{title}" + didn't match the configured pattern. Please ensure that the subject + doesn't start with an uppercase character. # Linting multiple Dockerfiles to ensure adherence to best practices and coding standards. hadolint_job: - name: Hadolint Job + name: Hadolint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest strategy: @@ -61,7 +66,7 @@ jobs: dockerfile: ${{ matrix.dockerfile }} editor_config_job: - name: Editor Config Job + name: Check editor config runs-on: ubuntu-latest steps: - name: Check out repository @@ -79,7 +84,7 @@ jobs: # Ensures that the code adheres to the lint checks defined in .golangci.yaml. lint_job: - name: Go lint job for all components + name: Go lint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest strategy: @@ -125,8 +130,8 @@ jobs: skip-save-cache: true working-directory: ${{ matrix.component }} - licenses_check: - name: 3rd Party Licenses Check + licenses_check_job: + name: 3rd party licenses check if: ${{ github.event.head_commit.committer.name != 'github-actions[bot]' || ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest permissions: @@ -233,7 +238,7 @@ jobs: # Runs unit tests for all components in this repo test_job: - name: Test Job for all components + name: Test job if: ${{ github.base_ref == 'main' && ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest @@ -264,8 +269,8 @@ jobs: run: go test ${{env.TEST_ARGS}} ./... # Builds docker images for all components of the repo to test if they can successfully be built - Build_docker: - name: Test building docker images + test_docker_image_builds_job: + name: Test docker image builds if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index 67d767d..f9abed1 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -2,14 +2,15 @@ name: PUSH CI on: push: - branches: [ develop, main ] + branches: + - main env: GO_VERSION: 1.21 jobs: - commitlint: - name: Commit Lint Job + commitlint_job: + name: Commit lint runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -28,8 +29,8 @@ jobs: - name: Validate current commit (last commit) with commitlint run: npx commitlint --last --verbose - license_headers: - name: Add License Headers + license_headers_job: + name: Add license headers if: github.event.head_commit.committer.name != 'github-actions[bot]' runs-on: ubuntu-latest permissions: @@ -81,9 +82,9 @@ jobs: run: | gh pr create --base ${{ github.ref_name }} --head "add-license-headers-${{ github.run_id }}" --title "style: add license headers" --body "This PR adds license headers to the affected files. Recommendation: Merge this PR using the rebase-merge method" - Upload_docker: - name: Build, Push and Sign Docker Images - if: github.ref_name == 'main' + upload_docker_images_job: + name: Build, push and sign Docker images + if: github.event.head_commit.committer.name == 'github-actions[bot]' runs-on: ubuntu-latest permissions: id-token: write # required to generate JWT token @@ -152,7 +153,6 @@ jobs: run: echo "Docker image syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} already exists. Skipping push." release-please: - if: github.ref_name == 'main' runs-on: ubuntu-latest permissions: contents: write diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d26fab..628fbf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## [1.1.1](https://github.com/dataphos/schema-registry/compare/v1.1.0...v1.1.1) (2024-11-26) + + +### Bug Fixes + +* header schema error ([7328844](https://github.com/dataphos/schema-registry/commit/7328844e6e6309e3c99c7c2f9c750aba984a994e)) +* header validation fix ([d5708e2](https://github.com/dataphos/schema-registry/commit/d5708e271d173ff1a697b07bb1c62b7b506db383)) + ## [1.1.0](https://github.com/dataphos/schema-registry/compare/v1.0.0...v1.1.0) (2024-11-20) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 6e68062..d95f66e 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -559,34 +559,34 @@ func setMessageRawAttributes(message janitor.Message, errCategory string, err er message.RawAttributes["deadLetterErrorReason"] = err.Error() } -func (cc *CentralConsumer) determineError(m janitor.Message, e error, errorLocation string) (janitor.MessageTopicPair, error) { +func (cc *CentralConsumer) determineError(message janitor.Message, e error, errorLocation string) (janitor.MessageTopicPair, error) { opError := &janitor.OpError{} if errors.As(e, &opError) { if opError.Code == errcodes.RegistryUnresponsive { - setMessageRawAttributes(m, "Registry unresponsive", e) + setMessageRawAttributes(message, "Registry unresponsive", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, e + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, e } else if opError.Code == errcodes.SchemaNotRegistered { - setMessageRawAttributes(m, errorLocation+"schema not registered", e) + setMessageRawAttributes(message, errorLocation+"schema not registered", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.MissingDataInHeader { - setMessageRawAttributes(m, errorLocation+"schema id/version missing", e) + setMessageRawAttributes(message, errorLocation+"schema id/version missing", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.InvalidDataInHeader { - setMessageRawAttributes(m, errorLocation+"schema id/version invalid", e) + setMessageRawAttributes(message, errorLocation+"schema id/version invalid", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.DeadletterMessage { - setMessageRawAttributes(m, errorLocation+"validation failed", e) + setMessageRawAttributes(message, errorLocation+"validation failed", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } } - setMessageRawAttributes(m, "Wrong compile", e) + setMessageRawAttributes(message, "Wrong compile", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, e + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, e } func acquireIfSet(sem chan struct{}) { diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index e6edd51..c0c4fd4 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -239,24 +239,32 @@ func InferDestinationTopic(messageSchemaPair MessageSchemaPair, validators Valid if errors.Is(err, validator.ErrBrokenMessage) { setMessageRawAttributes(message, "Broken message", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrWrongCompile) { + } else if errors.Is(err, validator.ErrWrongCompile) { setMessageRawAttributes(message, "Wrong compile", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrFailedValidation) { + } else if errors.Is(err, validator.ErrFailedValidation) { setMessageRawAttributes(message, "Payload validation error", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrUnsupportedFormat) { + } else if errors.Is(err, validator.ErrUnsupportedFormat) { setMessageRawAttributes(message, "Unsupported format", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrDeadletter) { + } else if errors.Is(err, validator.ErrParsingMessage) { + setMessageRawAttributes(message, "Parsing error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrMarshalAvro) { + setMessageRawAttributes(message, "Avro serialization error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrUnmarshalAvro) { + setMessageRawAttributes(message, "Avro deserialization error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrDeadletter) { setMessageRawAttributes(message, "Deadletter error", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else { + setMessageRawAttributes(message, "Unknown error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, + intoOpErr(message.ID, errcodes.ValidationFailure, err) } - return MessageTopicPair{}, intoOpErr(message.ID, errcodes.ValidationFailure, err) } var result Result diff --git a/validator/internal/validator/avro/avro.go b/validator/internal/validator/avro/avro.go index a47fc0e..61de25d 100644 --- a/validator/internal/validator/avro/avro.go +++ b/validator/internal/validator/avro/avro.go @@ -16,6 +16,7 @@ package avro import ( "bytes" + "github.com/pkg/errors" "github.com/dataphos/schema-registry-validator/internal/validator" @@ -26,17 +27,17 @@ func New() validator.Validator { return validator.Func(func(message, schema []byte, _, _ string) (bool, error) { parsedSchema, err := avro.Parse(string(schema)) if err != nil { - return false, validator.ErrDeadletter + return false, errors.WithMessage(validator.ErrParsingMessage, err.Error()) } var data interface{} if err = avro.Unmarshal(parsedSchema, message, &data); err != nil { - return false, nil + return false, errors.WithMessage(validator.ErrUnmarshalAvro, err.Error()) } reserializedMessage, err := avro.Marshal(parsedSchema, data) if err != nil { - return false, nil + return false, errors.WithMessage(validator.ErrMarshalAvro, err.Error()) } return bytes.Equal(reserializedMessage, message), nil diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index b5a18ba..259fd53 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -35,8 +35,14 @@ var ErrMissingHeaderSchema = errors.New("Header is missing a schema") // ErrFailedValidation is a special error type to help distinguish messages that have failed in validation. var ErrFailedValidation = errors.New("An error occured while validating message") -// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. -var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header") +// ErrParsingMessage is a special error type to help distinguish messages that failed parsing +var ErrParsingMessage = errors.New("Message not parsed correctly") + +// ErrUnmarshalAvro is a special error type to help distinguish avro messages that weren't successfully deserialized +var ErrUnmarshalAvro = errors.New("Avro message not deserialized correctly") + +// ErrMarshalAvro is a special error type to help distinguish avro messages that weren't successfully serialized +var ErrMarshalAvro = errors.New("Avro message not serialized correctly") // ErrUnsupportedFormat is a special error type to help distinguish messages that are in wrong format var ErrUnsupportedFormat = errors.New("Message is not in a supported format") diff --git a/version.txt b/version.txt index 9084fa2..524cb55 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -1.1.0 +1.1.1 From 8d515744bb488f0ad2d7cea97ab7dd11bc14b95f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:55:40 +0100 Subject: [PATCH 4/7] feat: default header schema (#39) --- .../centralconsumer/centralconsumer.go | 103 ++++++++++++------ validator/internal/config/centralconsumer.go | 6 + .../internal/errtemplates/errtemplates.go | 4 +- validator/internal/janitorctl/run.go | 8 +- 4 files changed, 80 insertions(+), 41 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index d95f66e..2444321 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -19,14 +19,17 @@ import ( "encoding/json" "strconv" + "github.com/pkg/errors" + "go.uber.org/multierr" + "github.com/dataphos/lib-brokers/pkg/broker" "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json" - "github.com/pkg/errors" ) // Mode is the way Central consumer works; if the mode is Default, one CC will be deployed, and it will validate multiple @@ -70,19 +73,20 @@ type VersionDetails struct { // CentralConsumer models the central consumer process. type CentralConsumer struct { - Registry registry.SchemaRegistry - Validators janitor.Validators - Router janitor.Router - Publisher broker.Publisher - topicIDs Topics - topics map[string]broker.Topic - registrySem chan struct{} - validatorsSem chan struct{} - log logger.Log - mode Mode - schema Schema - encryptionKey string - validateHeaders bool + Registry registry.SchemaRegistry + Validators janitor.Validators + Router janitor.Router + Publisher broker.Publisher + topicIDs Topics + topics map[string]broker.Topic + registrySem chan struct{} + validatorsSem chan struct{} + log logger.Log + mode Mode + schema Schema + encryptionKey string + validateHeaders bool + defaultHeaderSchema config.DefaultHeaderSchema } // Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline. @@ -95,6 +99,12 @@ type Settings struct { // ValidateHeaders defines if the messages' headers will be validated ValidateHeaders bool + + // DefaultHeaderSchemaId is default ID of the header schema + DefaultHeaderSchemaId string + + // DefaultHeaderSchemaVersion is default version of the header schema + DefaultHeaderSchemaVersion string } // Topics defines the standard destination topics, based on validation results. @@ -191,6 +201,10 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator }, encryptionKey: encryptionKey, validateHeaders: settings.ValidateHeaders, + defaultHeaderSchema: config.DefaultHeaderSchema{ + DefaultHeaderSchemaId: settings.DefaultHeaderSchemaId, + DefaultHeaderSchemaVersion: settings.DefaultHeaderSchemaVersion, + }, }, nil } @@ -329,9 +343,25 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) ) headerId, headerVersion, err = getHeaderSchemaIdAndVersion(message) - if err != nil { - return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + for _, e := range multierr.Errors(err) { + if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderID).Error() { + if cc.defaultHeaderSchema.DefaultHeaderSchemaId != "" { + headerId = cc.defaultHeaderSchema.DefaultHeaderSchemaId + } else { + setMessageRawAttributes(message, "Header error", err) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() { + if cc.defaultHeaderSchema.DefaultHeaderSchemaVersion != "" { + headerVersion = cc.defaultHeaderSchema.DefaultHeaderSchemaVersion + } else { + setMessageRawAttributes(message, "Header error", err) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } } + acquireIfSet(cc.registrySem) headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) if err != nil { @@ -438,33 +468,34 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) } func getHeaderSchemaIdAndVersion(message janitor.Message) (string, string, error) { - var id string - var version string var ( - ok, isString bool + id, version string + okId, okVersion, isString bool + errCombined error ) - if _, ok = message.RawAttributes[janitor.AttributeHeaderID]; !ok { - err := errors.Errorf("missing %s from header", janitor.AttributeHeaderID) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + // Check if header ID and header version are present + if _, okId = message.RawAttributes[janitor.AttributeHeaderID]; !okId { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotDefined(janitor.AttributeHeaderID)) } - if _, ok = message.RawAttributes[janitor.AttributeHeaderVersion]; !ok { - err := errors.Errorf("missing %s from header", janitor.AttributeHeaderVersion) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + if _, okVersion = message.RawAttributes[janitor.AttributeHeaderVersion]; !okVersion { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion)) } - if id, isString = message.RawAttributes[janitor.AttributeHeaderID].(string); !isString { - err := errors.Errorf("invalid type %s in header", janitor.AttributeHeaderID) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + + // For those values that are present, check if they are strings (this should always be true, but check just in case) + if okId { + if id, isString = message.RawAttributes[janitor.AttributeHeaderID].(string); !isString { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotAString(janitor.AttributeHeaderID)) + } } - if version, isString = message.RawAttributes[janitor.AttributeHeaderVersion].(string); !isString { - err := errors.Errorf("invalid type %s in header", janitor.AttributeHeaderVersion) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + if okVersion { + if version, isString = message.RawAttributes[janitor.AttributeHeaderVersion].(string); !isString { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotAString(janitor.AttributeHeaderVersion)) + } } - return id, version, nil + + // Values that are set are going to be modified, those that are not will not be and proper error will be returned + return id, version, errCombined } func (cc *CentralConsumer) getMessageTopicPair(messageSchemaPair janitor.MessageSchemaPair, encryptedMessageData []byte) (janitor.MessageTopicPair, error) { diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index 4545bb0..c31a0e4 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -31,6 +31,7 @@ type CentralConsumer struct { NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` ValidateHeaders bool `toml:"validate_headers"` + DefaultHeaderSchema DefaultHeaderSchema `toml:"default_header_schema"` MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` Mode int `toml:"mode"` @@ -71,6 +72,11 @@ type CentralConsumerShouldLog struct { DeadLetter bool `toml:"dead_letter"` } +type DefaultHeaderSchema struct { + DefaultHeaderSchemaId string `toml:"id"` + DefaultHeaderSchemaVersion string `toml:"version"` +} + // Read loads parameters from configuration file into CentralConsumer struct. func (cfg *CentralConsumer) Read(filename string) error { return fig.Load(cfg, fig.File(filename), fig.Tag("toml"), fig.UseEnv("")) diff --git a/validator/internal/errtemplates/errtemplates.go b/validator/internal/errtemplates/errtemplates.go index bff6b73..c1dd2fb 100644 --- a/validator/internal/errtemplates/errtemplates.go +++ b/validator/internal/errtemplates/errtemplates.go @@ -61,7 +61,7 @@ func UnsupportedBrokerType(name string) error { return errors.Errorf(unsupportedBrokerTypeTemplate, name) } -// UnsupportedRegistryType returns an error stating that the given broker type is not supported. +// UnsupportedRegistryType returns an error stating that the given registry type is not supported. func UnsupportedRegistryType(name string) error { return errors.Errorf(unsupportedRegistryTypeTemplate, name) } @@ -86,7 +86,7 @@ func MustNotBeEmpty(name string) error { return errors.Errorf(mustNotBeEmptyTemplate, name) } -// UnsupportedMessageFormat returns an error stating that the given variable must not be empty. +// UnsupportedMessageFormat returns an error stating that the message's format is not supported for validation. func UnsupportedMessageFormat(format string) error { return errors.Errorf(unsupportedMessageFormatTemplate, format) } diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index f719707..1132656 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -84,9 +84,11 @@ func RunCentralConsumer(configFile string) { Deadletter: cfg.Topics.DeadLetter, }, centralconsumer.Settings{ - NumSchemaCollectors: cfg.NumSchemaCollectors, - NumInferrers: cfg.NumInferrers, - ValidateHeaders: cfg.ValidateHeaders, + NumSchemaCollectors: cfg.NumSchemaCollectors, + NumInferrers: cfg.NumInferrers, + ValidateHeaders: cfg.ValidateHeaders, + DefaultHeaderSchemaId: cfg.DefaultHeaderSchema.DefaultHeaderSchemaId, + DefaultHeaderSchemaVersion: cfg.DefaultHeaderSchema.DefaultHeaderSchemaVersion, }, log, centralconsumer.RouterFlags{ From d99a99ba8c09c3cb74cab3725cf1fba086ab8529 Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Tue, 7 Jan 2025 15:08:54 +0100 Subject: [PATCH 5/7] changed the message --- registry/server/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/server/handler.go b/registry/server/handler.go index 36ab242..9f47f99 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -358,7 +358,7 @@ func (h Handler) GetSchemas(w http.ResponseWriter, _ *http.Request) { if err != nil { if errors.Is(err, registry.ErrNotFound) { writeResponse(w, responseBodyAndCode{ - Body: serializeErrorMessage("Schema Registry is empty"), + Body: serializeErrorMessage("No active schemas registered in the Registry"), Code: http.StatusNotFound, }) return From 86c63909a95e167c9a7782b2998d6fcc41f2a167 Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Wed, 8 Jan 2025 13:32:07 +0100 Subject: [PATCH 6/7] change code --- registry/server/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/server/handler.go b/registry/server/handler.go index 9f47f99..c6e268a 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -359,7 +359,7 @@ func (h Handler) GetSchemas(w http.ResponseWriter, _ *http.Request) { if errors.Is(err, registry.ErrNotFound) { writeResponse(w, responseBodyAndCode{ Body: serializeErrorMessage("No active schemas registered in the Registry"), - Code: http.StatusNotFound, + Code: http.StatusNoContent, }) return } From 8b5bfc2200d77bcd4a54b89efdb105e3061c923f Mon Sep 17 00:00:00 2001 From: Simun Sprem Date: Mon, 20 Jan 2025 15:56:28 +0100 Subject: [PATCH 7/7] fix: changed code from 204 to 200 when SR is empty --- registry/server/handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/registry/server/handler.go b/registry/server/handler.go index c6e268a..53bcdbc 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -357,9 +357,10 @@ func (h Handler) GetSchemas(w http.ResponseWriter, _ *http.Request) { schemas, err := h.Service.GetSchemas() if err != nil { if errors.Is(err, registry.ErrNotFound) { + body, _ := json.Marshal(report{Message: "No active schemas registered in the Registry"}) writeResponse(w, responseBodyAndCode{ - Body: serializeErrorMessage("No active schemas registered in the Registry"), - Code: http.StatusNoContent, + Body: body, + Code: http.StatusOK, }) return }