From 4b191b26128cf4978d6a3e44da41abdd2007b30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:54:00 +0100 Subject: [PATCH 1/5] fix: added avro error reason to message header (#41) --- .github/workflows/pr.yaml | 57 ++++++++++--------- .github/workflows/push.yaml | 18 +++--- CHANGELOG.md | 8 +++ .../centralconsumer/centralconsumer.go | 26 ++++----- validator/internal/janitor/janitor.go | 26 ++++++--- validator/internal/validator/avro/avro.go | 7 ++- validator/internal/validator/validator.go | 10 +++- version.txt | 2 +- 8 files changed, 91 insertions(+), 63 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 77a7fe1..1980a28 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -1,8 +1,12 @@ name: PR CI on: - pull_request: - branches: [ develop, main ] + pull_request_target: + types: + - opened + - edited + - synchronize + - reopened env: GO111MODULE: on @@ -13,29 +17,30 @@ env: TEST_ARGS: -v -short -coverprofile=coverage.out jobs: - commitlint: - name: Commit Lint Job + validate_pr_title_job: + if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} + name: Validate PR title runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Set up Node.js - uses: actions/setup-node@v4 + - uses: amannn/action-semantic-pull-request@v5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: - node-version: ${{ env.NODE_VERSION }} - - - name: Install commitlint - run: | - npm install --save-dev @commitlint/{cli,config-conventional} - - - name: Validate PR commits with commitlint - run: npx commitlint --from ${{ github.event.pull_request.head.sha }}~${{ github.event.pull_request.commits }} --to ${{ github.event.pull_request.head.sha }} --verbose + requireScope: false + # Configure additional validation for the subject based on a regex. + # This example ensures the subject doesn't start with an uppercase character. + subjectPattern: ^(?![A-Z]).+$ + # If `subjectPattern` is configured, you can use this property to override + # the default error message that is shown when the pattern doesn't match. + # The variables `subject` and `title` can be used within the message. + subjectPatternError: | + The subject "{subject}" found in the pull request title "{title}" + didn't match the configured pattern. Please ensure that the subject + doesn't start with an uppercase character. # Linting multiple Dockerfiles to ensure adherence to best practices and coding standards. hadolint_job: - name: Hadolint Job + name: Hadolint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest strategy: @@ -61,7 +66,7 @@ jobs: dockerfile: ${{ matrix.dockerfile }} editor_config_job: - name: Editor Config Job + name: Check editor config runs-on: ubuntu-latest steps: - name: Check out repository @@ -79,7 +84,7 @@ jobs: # Ensures that the code adheres to the lint checks defined in .golangci.yaml. lint_job: - name: Go lint job for all components + name: Go lint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest strategy: @@ -125,8 +130,8 @@ jobs: skip-save-cache: true working-directory: ${{ matrix.component }} - licenses_check: - name: 3rd Party Licenses Check + licenses_check_job: + name: 3rd party licenses check if: ${{ github.event.head_commit.committer.name != 'github-actions[bot]' || ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest permissions: @@ -233,7 +238,7 @@ jobs: # Runs unit tests for all components in this repo test_job: - name: Test Job for all components + name: Test job if: ${{ github.base_ref == 'main' && ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest @@ -264,8 +269,8 @@ jobs: run: go test ${{env.TEST_ARGS}} ./... # Builds docker images for all components of the repo to test if they can successfully be built - Build_docker: - name: Test building docker images + test_docker_image_builds_job: + name: Test docker image builds if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index 67d767d..f9abed1 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -2,14 +2,15 @@ name: PUSH CI on: push: - branches: [ develop, main ] + branches: + - main env: GO_VERSION: 1.21 jobs: - commitlint: - name: Commit Lint Job + commitlint_job: + name: Commit lint runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -28,8 +29,8 @@ jobs: - name: Validate current commit (last commit) with commitlint run: npx commitlint --last --verbose - license_headers: - name: Add License Headers + license_headers_job: + name: Add license headers if: github.event.head_commit.committer.name != 'github-actions[bot]' runs-on: ubuntu-latest permissions: @@ -81,9 +82,9 @@ jobs: run: | gh pr create --base ${{ github.ref_name }} --head "add-license-headers-${{ github.run_id }}" --title "style: add license headers" --body "This PR adds license headers to the affected files. Recommendation: Merge this PR using the rebase-merge method" - Upload_docker: - name: Build, Push and Sign Docker Images - if: github.ref_name == 'main' + upload_docker_images_job: + name: Build, push and sign Docker images + if: github.event.head_commit.committer.name == 'github-actions[bot]' runs-on: ubuntu-latest permissions: id-token: write # required to generate JWT token @@ -152,7 +153,6 @@ jobs: run: echo "Docker image syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} already exists. Skipping push." release-please: - if: github.ref_name == 'main' runs-on: ubuntu-latest permissions: contents: write diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d26fab..628fbf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## [1.1.1](https://github.com/dataphos/schema-registry/compare/v1.1.0...v1.1.1) (2024-11-26) + + +### Bug Fixes + +* header schema error ([7328844](https://github.com/dataphos/schema-registry/commit/7328844e6e6309e3c99c7c2f9c750aba984a994e)) +* header validation fix ([d5708e2](https://github.com/dataphos/schema-registry/commit/d5708e271d173ff1a697b07bb1c62b7b506db383)) + ## [1.1.0](https://github.com/dataphos/schema-registry/compare/v1.0.0...v1.1.0) (2024-11-20) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 6e68062..d95f66e 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -559,34 +559,34 @@ func setMessageRawAttributes(message janitor.Message, errCategory string, err er message.RawAttributes["deadLetterErrorReason"] = err.Error() } -func (cc *CentralConsumer) determineError(m janitor.Message, e error, errorLocation string) (janitor.MessageTopicPair, error) { +func (cc *CentralConsumer) determineError(message janitor.Message, e error, errorLocation string) (janitor.MessageTopicPair, error) { opError := &janitor.OpError{} if errors.As(e, &opError) { if opError.Code == errcodes.RegistryUnresponsive { - setMessageRawAttributes(m, "Registry unresponsive", e) + setMessageRawAttributes(message, "Registry unresponsive", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, e + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, e } else if opError.Code == errcodes.SchemaNotRegistered { - setMessageRawAttributes(m, errorLocation+"schema not registered", e) + setMessageRawAttributes(message, errorLocation+"schema not registered", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.MissingDataInHeader { - setMessageRawAttributes(m, errorLocation+"schema id/version missing", e) + setMessageRawAttributes(message, errorLocation+"schema id/version missing", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.InvalidDataInHeader { - setMessageRawAttributes(m, errorLocation+"schema id/version invalid", e) + setMessageRawAttributes(message, errorLocation+"schema id/version invalid", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } else if opError.Code == errcodes.DeadletterMessage { - setMessageRawAttributes(m, errorLocation+"validation failed", e) + setMessageRawAttributes(message, errorLocation+"validation failed", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, nil + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } } - setMessageRawAttributes(m, "Wrong compile", e) + setMessageRawAttributes(message, "Wrong compile", e) releaseIfSet(cc.registrySem) - return janitor.MessageTopicPair{Message: m, Topic: cc.Router.Route(janitor.Deadletter, m)}, e + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, e } func acquireIfSet(sem chan struct{}) { diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index e6edd51..c0c4fd4 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -239,24 +239,32 @@ func InferDestinationTopic(messageSchemaPair MessageSchemaPair, validators Valid if errors.Is(err, validator.ErrBrokenMessage) { setMessageRawAttributes(message, "Broken message", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrWrongCompile) { + } else if errors.Is(err, validator.ErrWrongCompile) { setMessageRawAttributes(message, "Wrong compile", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrFailedValidation) { + } else if errors.Is(err, validator.ErrFailedValidation) { setMessageRawAttributes(message, "Payload validation error", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrUnsupportedFormat) { + } else if errors.Is(err, validator.ErrUnsupportedFormat) { setMessageRawAttributes(message, "Unsupported format", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil - } - if errors.Is(err, validator.ErrDeadletter) { + } else if errors.Is(err, validator.ErrParsingMessage) { + setMessageRawAttributes(message, "Parsing error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrMarshalAvro) { + setMessageRawAttributes(message, "Avro serialization error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrUnmarshalAvro) { + setMessageRawAttributes(message, "Avro deserialization error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else if errors.Is(err, validator.ErrDeadletter) { setMessageRawAttributes(message, "Deadletter error", err) return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, nil + } else { + setMessageRawAttributes(message, "Unknown error", err) + return MessageTopicPair{Message: message, Topic: router.Route(Deadletter, message)}, + intoOpErr(message.ID, errcodes.ValidationFailure, err) } - return MessageTopicPair{}, intoOpErr(message.ID, errcodes.ValidationFailure, err) } var result Result diff --git a/validator/internal/validator/avro/avro.go b/validator/internal/validator/avro/avro.go index a47fc0e..61de25d 100644 --- a/validator/internal/validator/avro/avro.go +++ b/validator/internal/validator/avro/avro.go @@ -16,6 +16,7 @@ package avro import ( "bytes" + "github.com/pkg/errors" "github.com/dataphos/schema-registry-validator/internal/validator" @@ -26,17 +27,17 @@ func New() validator.Validator { return validator.Func(func(message, schema []byte, _, _ string) (bool, error) { parsedSchema, err := avro.Parse(string(schema)) if err != nil { - return false, validator.ErrDeadletter + return false, errors.WithMessage(validator.ErrParsingMessage, err.Error()) } var data interface{} if err = avro.Unmarshal(parsedSchema, message, &data); err != nil { - return false, nil + return false, errors.WithMessage(validator.ErrUnmarshalAvro, err.Error()) } reserializedMessage, err := avro.Marshal(parsedSchema, data) if err != nil { - return false, nil + return false, errors.WithMessage(validator.ErrMarshalAvro, err.Error()) } return bytes.Equal(reserializedMessage, message), nil diff --git a/validator/internal/validator/validator.go b/validator/internal/validator/validator.go index b5a18ba..259fd53 100644 --- a/validator/internal/validator/validator.go +++ b/validator/internal/validator/validator.go @@ -35,8 +35,14 @@ var ErrMissingHeaderSchema = errors.New("Header is missing a schema") // ErrFailedValidation is a special error type to help distinguish messages that have failed in validation. var ErrFailedValidation = errors.New("An error occured while validating message") -// ErrFailedHeaderValidation is a special error type to help distinguish messages' headers that have failed in validation. -var ErrFailedHeaderValidation = errors.New("An error occured while validating message's header") +// ErrParsingMessage is a special error type to help distinguish messages that failed parsing +var ErrParsingMessage = errors.New("Message not parsed correctly") + +// ErrUnmarshalAvro is a special error type to help distinguish avro messages that weren't successfully deserialized +var ErrUnmarshalAvro = errors.New("Avro message not deserialized correctly") + +// ErrMarshalAvro is a special error type to help distinguish avro messages that weren't successfully serialized +var ErrMarshalAvro = errors.New("Avro message not serialized correctly") // ErrUnsupportedFormat is a special error type to help distinguish messages that are in wrong format var ErrUnsupportedFormat = errors.New("Message is not in a supported format") diff --git a/version.txt b/version.txt index 9084fa2..524cb55 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -1.1.0 +1.1.1 From 8d515744bb488f0ad2d7cea97ab7dd11bc14b95f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:55:40 +0100 Subject: [PATCH 2/5] feat: default header schema (#39) --- .../centralconsumer/centralconsumer.go | 103 ++++++++++++------ validator/internal/config/centralconsumer.go | 6 + .../internal/errtemplates/errtemplates.go | 4 +- validator/internal/janitorctl/run.go | 8 +- 4 files changed, 80 insertions(+), 41 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index d95f66e..2444321 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -19,14 +19,17 @@ import ( "encoding/json" "strconv" + "github.com/pkg/errors" + "go.uber.org/multierr" + "github.com/dataphos/lib-brokers/pkg/broker" "github.com/dataphos/lib-logger/logger" + "github.com/dataphos/schema-registry-validator/internal/config" "github.com/dataphos/schema-registry-validator/internal/errcodes" "github.com/dataphos/schema-registry-validator/internal/errtemplates" "github.com/dataphos/schema-registry-validator/internal/janitor" "github.com/dataphos/schema-registry-validator/internal/registry" jsoninternal "github.com/dataphos/schema-registry-validator/internal/validator/json" - "github.com/pkg/errors" ) // Mode is the way Central consumer works; if the mode is Default, one CC will be deployed, and it will validate multiple @@ -70,19 +73,20 @@ type VersionDetails struct { // CentralConsumer models the central consumer process. type CentralConsumer struct { - Registry registry.SchemaRegistry - Validators janitor.Validators - Router janitor.Router - Publisher broker.Publisher - topicIDs Topics - topics map[string]broker.Topic - registrySem chan struct{} - validatorsSem chan struct{} - log logger.Log - mode Mode - schema Schema - encryptionKey string - validateHeaders bool + Registry registry.SchemaRegistry + Validators janitor.Validators + Router janitor.Router + Publisher broker.Publisher + topicIDs Topics + topics map[string]broker.Topic + registrySem chan struct{} + validatorsSem chan struct{} + log logger.Log + mode Mode + schema Schema + encryptionKey string + validateHeaders bool + defaultHeaderSchema config.DefaultHeaderSchema } // Settings holds settings concerning the concurrency limits for various stages of the central consumer pipeline. @@ -95,6 +99,12 @@ type Settings struct { // ValidateHeaders defines if the messages' headers will be validated ValidateHeaders bool + + // DefaultHeaderSchemaId is default ID of the header schema + DefaultHeaderSchemaId string + + // DefaultHeaderSchemaVersion is default version of the header schema + DefaultHeaderSchemaVersion string } // Topics defines the standard destination topics, based on validation results. @@ -191,6 +201,10 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator }, encryptionKey: encryptionKey, validateHeaders: settings.ValidateHeaders, + defaultHeaderSchema: config.DefaultHeaderSchema{ + DefaultHeaderSchemaId: settings.DefaultHeaderSchemaId, + DefaultHeaderSchemaVersion: settings.DefaultHeaderSchemaVersion, + }, }, nil } @@ -329,9 +343,25 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) ) headerId, headerVersion, err = getHeaderSchemaIdAndVersion(message) - if err != nil { - return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + for _, e := range multierr.Errors(err) { + if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderID).Error() { + if cc.defaultHeaderSchema.DefaultHeaderSchemaId != "" { + headerId = cc.defaultHeaderSchema.DefaultHeaderSchemaId + } else { + setMessageRawAttributes(message, "Header error", err) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } + if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() { + if cc.defaultHeaderSchema.DefaultHeaderSchemaVersion != "" { + headerVersion = cc.defaultHeaderSchema.DefaultHeaderSchemaVersion + } else { + setMessageRawAttributes(message, "Header error", err) + return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil + } + } } + acquireIfSet(cc.registrySem) headerSchema, err = janitor.CollectSchema(ctx, headerId, headerVersion, cc.Registry) if err != nil { @@ -438,33 +468,34 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) } func getHeaderSchemaIdAndVersion(message janitor.Message) (string, string, error) { - var id string - var version string var ( - ok, isString bool + id, version string + okId, okVersion, isString bool + errCombined error ) - if _, ok = message.RawAttributes[janitor.AttributeHeaderID]; !ok { - err := errors.Errorf("missing %s from header", janitor.AttributeHeaderID) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + // Check if header ID and header version are present + if _, okId = message.RawAttributes[janitor.AttributeHeaderID]; !okId { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotDefined(janitor.AttributeHeaderID)) } - if _, ok = message.RawAttributes[janitor.AttributeHeaderVersion]; !ok { - err := errors.Errorf("missing %s from header", janitor.AttributeHeaderVersion) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + if _, okVersion = message.RawAttributes[janitor.AttributeHeaderVersion]; !okVersion { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion)) } - if id, isString = message.RawAttributes[janitor.AttributeHeaderID].(string); !isString { - err := errors.Errorf("invalid type %s in header", janitor.AttributeHeaderID) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + + // For those values that are present, check if they are strings (this should always be true, but check just in case) + if okId { + if id, isString = message.RawAttributes[janitor.AttributeHeaderID].(string); !isString { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotAString(janitor.AttributeHeaderID)) + } } - if version, isString = message.RawAttributes[janitor.AttributeHeaderVersion].(string); !isString { - err := errors.Errorf("invalid type %s in header", janitor.AttributeHeaderVersion) - setMessageRawAttributes(message, "Header error", err) - return "", "", err + if okVersion { + if version, isString = message.RawAttributes[janitor.AttributeHeaderVersion].(string); !isString { + errCombined = multierr.Append(errCombined, errtemplates.AttributeNotAString(janitor.AttributeHeaderVersion)) + } } - return id, version, nil + + // Values that are set are going to be modified, those that are not will not be and proper error will be returned + return id, version, errCombined } func (cc *CentralConsumer) getMessageTopicPair(messageSchemaPair janitor.MessageSchemaPair, encryptedMessageData []byte) (janitor.MessageTopicPair, error) { diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index 4545bb0..c31a0e4 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -31,6 +31,7 @@ type CentralConsumer struct { NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` ValidateHeaders bool `toml:"validate_headers"` + DefaultHeaderSchema DefaultHeaderSchema `toml:"default_header_schema"` MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` Mode int `toml:"mode"` @@ -71,6 +72,11 @@ type CentralConsumerShouldLog struct { DeadLetter bool `toml:"dead_letter"` } +type DefaultHeaderSchema struct { + DefaultHeaderSchemaId string `toml:"id"` + DefaultHeaderSchemaVersion string `toml:"version"` +} + // Read loads parameters from configuration file into CentralConsumer struct. func (cfg *CentralConsumer) Read(filename string) error { return fig.Load(cfg, fig.File(filename), fig.Tag("toml"), fig.UseEnv("")) diff --git a/validator/internal/errtemplates/errtemplates.go b/validator/internal/errtemplates/errtemplates.go index bff6b73..c1dd2fb 100644 --- a/validator/internal/errtemplates/errtemplates.go +++ b/validator/internal/errtemplates/errtemplates.go @@ -61,7 +61,7 @@ func UnsupportedBrokerType(name string) error { return errors.Errorf(unsupportedBrokerTypeTemplate, name) } -// UnsupportedRegistryType returns an error stating that the given broker type is not supported. +// UnsupportedRegistryType returns an error stating that the given registry type is not supported. func UnsupportedRegistryType(name string) error { return errors.Errorf(unsupportedRegistryTypeTemplate, name) } @@ -86,7 +86,7 @@ func MustNotBeEmpty(name string) error { return errors.Errorf(mustNotBeEmptyTemplate, name) } -// UnsupportedMessageFormat returns an error stating that the given variable must not be empty. +// UnsupportedMessageFormat returns an error stating that the message's format is not supported for validation. func UnsupportedMessageFormat(format string) error { return errors.Errorf(unsupportedMessageFormatTemplate, format) } diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index f719707..1132656 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -84,9 +84,11 @@ func RunCentralConsumer(configFile string) { Deadletter: cfg.Topics.DeadLetter, }, centralconsumer.Settings{ - NumSchemaCollectors: cfg.NumSchemaCollectors, - NumInferrers: cfg.NumInferrers, - ValidateHeaders: cfg.ValidateHeaders, + NumSchemaCollectors: cfg.NumSchemaCollectors, + NumInferrers: cfg.NumInferrers, + ValidateHeaders: cfg.ValidateHeaders, + DefaultHeaderSchemaId: cfg.DefaultHeaderSchema.DefaultHeaderSchemaId, + DefaultHeaderSchemaVersion: cfg.DefaultHeaderSchema.DefaultHeaderSchemaVersion, }, log, centralconsumer.RouterFlags{ From ef8f07f9b1f5c74adf75b6087ea010aec1ac1674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Thu, 9 Jan 2025 10:26:01 +0100 Subject: [PATCH 3/5] refactor: change message when empty (#44) --- .github/workflows/pr.yaml | 11 ++++++----- registry/server/handler.go | 7 +++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 1980a28..1674f5b 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -40,8 +40,8 @@ jobs: # Linting multiple Dockerfiles to ensure adherence to best practices and coding standards. hadolint_job: - name: Hadolint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} + name: Hadolint runs-on: ubuntu-latest strategy: fail-fast: false @@ -66,6 +66,7 @@ jobs: dockerfile: ${{ matrix.dockerfile }} editor_config_job: + if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} name: Check editor config runs-on: ubuntu-latest steps: @@ -84,8 +85,8 @@ jobs: # Ensures that the code adheres to the lint checks defined in .golangci.yaml. lint_job: - name: Go lint if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} + name: Go lint runs-on: ubuntu-latest strategy: fail-fast: false @@ -131,8 +132,8 @@ jobs: working-directory: ${{ matrix.component }} licenses_check_job: + if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} name: 3rd party licenses check - if: ${{ github.event.head_commit.committer.name != 'github-actions[bot]' || ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest permissions: contents: write @@ -238,8 +239,8 @@ jobs: # Runs unit tests for all components in this repo test_job: + if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} name: Test job - if: ${{ github.base_ref == 'main' && ! contains(github.head_ref, 'release-please--branches--main') }} runs-on: ubuntu-latest strategy: @@ -270,8 +271,8 @@ jobs: # Builds docker images for all components of the repo to test if they can successfully be built test_docker_image_builds_job: - name: Test docker image builds if: ${{ ! contains(github.head_ref, 'release-please--branches--main') }} + name: Test docker image builds runs-on: ubuntu-latest strategy: diff --git a/registry/server/handler.go b/registry/server/handler.go index 7b484fa..c6e268a 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -159,9 +159,8 @@ func (h Handler) GetSpecificationByIdAndVersion(w http.ResponseWriter, r *http.R log.Println(err) } - body, _ := json.Marshal(specification) writeResponse(w, responseBodyAndCode{ - Body: body, + Body: specification, Code: http.StatusOK, }) } @@ -359,8 +358,8 @@ func (h Handler) GetSchemas(w http.ResponseWriter, _ *http.Request) { if err != nil { if errors.Is(err, registry.ErrNotFound) { writeResponse(w, responseBodyAndCode{ - Body: serializeErrorMessage(http.StatusText(http.StatusNotFound)), - Code: http.StatusNotFound, + Body: serializeErrorMessage("No active schemas registered in the Registry"), + Code: http.StatusNoContent, }) return } From c7d09d0bca72e941e123edfedc7d8274e00f71f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Fri, 10 Jan 2025 18:47:25 +0100 Subject: [PATCH 4/5] fix: require schema format (#45) --- .github/workflows/push.yaml | 18 ++++--- registry/cmd/sr-cli/main.go | 2 +- registry/docker/.env | 5 ++ registry/docker/compose.yaml | 53 +++++++++++++++++++ registry/registry/repo.go | 1 + .../registry/repository/postgres/model.go | 1 - registry/server/handler.go | 15 +++++- registry/server/util.go | 18 +++++++ .../net/syntio/validity/checker/Checker.java | 2 +- registry/validity/http/http.go | 2 +- .../centralconsumer/centralconsumer.go | 3 +- validator/internal/janitor/janitor.go | 25 ++++++--- validator/internal/janitor/parse.go | 10 +++- 13 files changed, 132 insertions(+), 23 deletions(-) create mode 100644 registry/docker/.env create mode 100644 registry/docker/compose.yaml diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index f9abed1..7055e80 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -55,7 +55,7 @@ jobs: - name: Add missing license headers run: nwa add -c "Syntio Ltd." -s **/*.xml ./registry ./validator - - name: Check and Commit changes + - name: Check and commit changes id: check_commit run: | git config user.name "github-actions[bot]" @@ -75,7 +75,7 @@ jobs: git checkout -b "add-license-headers-${{ github.run_id }}" git push origin HEAD - - name: Create Pull Request + - name: Create pull request if: env.changes_committed == 'true' env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -83,8 +83,8 @@ jobs: gh pr create --base ${{ github.ref_name }} --head "add-license-headers-${{ github.run_id }}" --title "style: add license headers" --body "This PR adds license headers to the affected files. Recommendation: Merge this PR using the rebase-merge method" upload_docker_images_job: + if: github.event.head_commit.committer.name == 'github-actions[bot]' || startsWith(github.event.head_commit.message.name, 'feat') || startsWith(github.event.head_commit.message.name, 'fix') name: Build, push and sign Docker images - if: github.event.head_commit.committer.name == 'github-actions[bot]' runs-on: ubuntu-latest permissions: id-token: write # required to generate JWT token @@ -111,9 +111,13 @@ jobs: - name: Check out code uses: actions/checkout@v4 - - name: Set Tag + - name: Set image tag run: | - TAG=$(cat version.txt) + if [ ${{ github.event.head_commit.committer.name }} == 'github-actions[bot]' ]; then + TAG=$(cat version.txt) + else + TAG=$(echo $GITHUB_SHA | cut -c 1-7) + fi echo "TAG=$TAG" >> $GITHUB_ENV - name: Docker Hub Login @@ -139,11 +143,11 @@ jobs: docker push syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }} - name: Install cosign - if: ${{ env.TAG_EXISTS == 'false' }} + if: ${{ github.event.head_commit.committer.name == 'github-actions[bot]' && env.TAG_EXISTS == 'false' }} uses: sigstore/cosign-installer@v3.6.0 - name: Sign the Docker image - if: ${{ env.TAG_EXISTS == 'false' }} + if: ${{ github.event.head_commit.committer.name == 'github-actions[bot]' && env.TAG_EXISTS == 'false' }} run: | digest=$(docker inspect --format='{{index .RepoDigests 0}}' syntioinc/dataphos-${{ matrix.component.image-name }}:${{ env.TAG }}) cosign sign --yes "$digest" diff --git a/registry/cmd/sr-cli/main.go b/registry/cmd/sr-cli/main.go index a1c0a5e..e8d1228 100644 --- a/registry/cmd/sr-cli/main.go +++ b/registry/cmd/sr-cli/main.go @@ -100,7 +100,7 @@ func registerSchema(registerCommand *flag.FlagSet) { if !added { log.Print("schema already exists") } else { - log.Print("created schema under the id ", details.VersionID) + log.Printf("created schema under the id %s and version %s", details.SchemaID, details.Version) } } diff --git a/registry/docker/.env b/registry/docker/.env new file mode 100644 index 0000000..af50758 --- /dev/null +++ b/registry/docker/.env @@ -0,0 +1,5 @@ +SR_HOST=host.docker.internal +SR_DBNAME=postgres +SR_USER=postgres +SR_PASSWORD= +SR_TABLE_PREFIX=syntio_schema. diff --git a/registry/docker/compose.yaml b/registry/docker/compose.yaml new file mode 100644 index 0000000..ca34eb5 --- /dev/null +++ b/registry/docker/compose.yaml @@ -0,0 +1,53 @@ +name: schema-registry +services: + postgres: + image: postgres:17 + ports: + - 5432:5432 + environment: + POSTGRES_PASSWORD: # Please provide the password. Needs to match SR_PASSWORD in the .env file. + healthcheck: + test: ["CMD-SHELL", "pg_isready"] + interval: 30s + timeout: 60s + retries: 5 + start_period: 80s + initdb: + depends_on: + postgres: + condition: service_healthy + build: + context: ../../ + dockerfile: ./registry/docker/initdb/Dockerfile + env_file: + - .env + compatibility-checker: + build: + context: ../../ + dockerfile: ./registry/docker/compatibility-checker/Dockerfile + ports: + - 8088:8088 + validity-checker: + build: + context: ../../ + dockerfile: ./registry/docker/validity-checker/Dockerfile + ports: + - 8089:8089 + schema-registry: + depends_on: + initdb: + condition: service_completed_successfully + compatibility-checker: + condition: service_started + validity-checker: + condition: service_started + build: + context: ../../ + dockerfile: ./registry/docker/registry/Dockerfile + ports: + - 8080:8080 + env_file: + - .env + environment: + COMPATIBILITY_CHECKER_URL: http://host.docker.internal:8088 + VALIDITY_CHECKER_URL: http://host.docker.internal:8089 diff --git a/registry/registry/repo.go b/registry/registry/repo.go index 4395b21..bafac99 100644 --- a/registry/registry/repo.go +++ b/registry/registry/repo.go @@ -21,6 +21,7 @@ import ( var ErrNotFound = errors.New("not found") var ErrUnknownComp = errors.New("unknown value for compatibility_mode") var ErrUnknownVal = errors.New("unknown value for validity mode") +var ErrUnknownFormat = errors.New("unknown value for schema format") var ErrNotValid = errors.New("schema is not valid") var ErrNotComp = errors.New("schemas are not compatible") var ErrInvalidValueHeader = errors.New("invalid header value") diff --git a/registry/registry/repository/postgres/model.go b/registry/registry/repository/postgres/model.go index fd5a5a1..43ba7d0 100644 --- a/registry/registry/repository/postgres/model.go +++ b/registry/registry/repository/postgres/model.go @@ -70,7 +70,6 @@ func intoRegistrySchema(schema Schema) registry.Schema { // intoRegistryVersionDetails maps VersionDetails from repository to service layer. func intoRegistryVersionDetails(VersionDetails VersionDetails) registry.VersionDetails { return registry.VersionDetails{ - VersionID: strconv.Itoa(int(VersionDetails.VersionID)), Version: VersionDetails.Version, SchemaID: strconv.Itoa(int(VersionDetails.SchemaID)), Specification: VersionDetails.Specification, diff --git a/registry/server/handler.go b/registry/server/handler.go index c6e268a..3722663 100644 --- a/registry/server/handler.go +++ b/registry/server/handler.go @@ -534,8 +534,19 @@ func (h Handler) SearchSchemas(w http.ResponseWriter, r *http.Request) { // @Failure 500 // @Router /schemas [post] func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) { + registerRequest, err := readSchemaRegisterRequest(r.Body) if err != nil { + if errors.Is(err, registry.ErrUnknownFormat) { + body, _ := json.Marshal(report{ + Message: "Bad request: unknown format value", + }) + writeResponse(w, responseBodyAndCode{ + Body: body, + Code: http.StatusBadRequest, + }) + return + } writeResponse(w, responseBodyAndCode{ Body: serializeErrorMessage(http.StatusText(http.StatusBadRequest)), Code: http.StatusBadRequest, @@ -547,7 +558,7 @@ func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) { if err != nil { if errors.Is(err, registry.ErrUnknownComp) { body, _ := json.Marshal(report{ - Message: "Bad request: unknown value for compatibility_mode", + Message: "Bad request: unknown compatibility_mode value", }) writeResponse(w, responseBodyAndCode{ Body: body, @@ -558,7 +569,7 @@ func (h Handler) PostSchema(w http.ResponseWriter, r *http.Request) { if errors.Is(err, registry.ErrUnknownVal) { body, _ := json.Marshal(report{ - Message: "Bad request: unknown value for validity_mode", + Message: "Bad request: unknown validity_mode value", }) writeResponse(w, responseBodyAndCode{ Body: body, diff --git a/registry/server/util.go b/registry/server/util.go index 3c52d5a..d596a3f 100644 --- a/registry/server/util.go +++ b/registry/server/util.go @@ -18,6 +18,7 @@ import ( "encoding/json" "io" "net/http" + "strings" "github.com/dataphos/schema-registry/registry" ) @@ -27,6 +28,8 @@ type responseBodyAndCode struct { Code int } +var supportedFormats = []string{"json", "avro", "xml", "csv", "protobuf"} + func writeResponse(w http.ResponseWriter, response responseBodyAndCode) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(response.Code) @@ -49,9 +52,24 @@ func readSchemaRegisterRequest(body io.ReadCloser) (registry.SchemaRegistrationR return registry.SchemaRegistrationRequest{}, err } + // check if format is unknown + format := strings.ToLower(schemaRegisterRequest.SchemaType) + if !containsFormat(format) { + return registry.SchemaRegistrationRequest{}, registry.ErrUnknownFormat + } + return schemaRegisterRequest, nil } +func containsFormat(format string) bool { + for _, supportedFormat := range supportedFormats { + if format == supportedFormat { + return true + } + } + return false +} + func readSchemaUpdateRequest(body io.ReadCloser) (registry.SchemaUpdateRequest, error) { encoded, err := io.ReadAll(body) if err != nil { diff --git a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java index 74a9e5b..46ad3e3 100644 --- a/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java +++ b/registry/validity/external/validity-checker/src/main/java/net/syntio/validity/checker/Checker.java @@ -32,7 +32,7 @@ public static boolean checkValidity(String schemaType, String schema, String mod case "full" -> ValidityLevel.FULL; default -> ValidityLevel.NONE; }; - ContentValidator validator = ValidatorFactory.createValidator(schemaType); + ContentValidator validator = ValidatorFactory.createValidator(schemaType.toLowerCase()); if (validator == null) { // in case ValidatorFactory returns null return false; } diff --git a/registry/validity/http/http.go b/registry/validity/http/http.go index 32b9c83..b130163 100644 --- a/registry/validity/http/http.go +++ b/registry/validity/http/http.go @@ -77,7 +77,7 @@ func CheckOverHTTP(ctx context.Context, schemaType, schema, mode, url string) (b var parsedBody checkResponse if err = json.Unmarshal(body, &parsedBody); err != nil { - return false, "error unmarshalling compatibility check body", err + return false, "error unmarshalling validity check body", err } valid := parsedBody.Result diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 2444321..0d0c5a2 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -351,8 +351,7 @@ func (cc *CentralConsumer) Handle(ctx context.Context, message janitor.Message) setMessageRawAttributes(message, "Header error", err) return janitor.MessageTopicPair{Message: message, Topic: cc.Router.Route(janitor.Deadletter, message)}, nil } - } - if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() { + } else if e.Error() == errtemplates.AttributeNotDefined(janitor.AttributeHeaderVersion).Error() { if cc.defaultHeaderSchema.DefaultHeaderSchemaVersion != "" { headerVersion = cc.defaultHeaderSchema.DefaultHeaderSchemaVersion } else { diff --git a/validator/internal/janitor/janitor.go b/validator/internal/janitor/janitor.go index c0c4fd4..1d9aacb 100644 --- a/validator/internal/janitor/janitor.go +++ b/validator/internal/janitor/janitor.go @@ -48,13 +48,25 @@ type Message struct { } const ( + // OldAttributeSchemaID is one of the keys expected to be found in the attributes field of the message. + // It holds the schema id information concerning the data field of the message + // This is planned to be dropped in the future version, but is kept for now to be backwards compatible with + // systems that still use this convention + OldAttributeSchemaID = "schemaId" + // AttributeSchemaID is one of the keys expected to be found in the attributes field of the message. // It holds the schema id information concerning the data field of the message - AttributeSchemaID = "schemaId" + AttributeSchemaID = "schema_id" + + // OldAttributeSchemaVersion is one of the keys expected to be found in the attributes field of the message, + // It holds the schema version information concerning the data field of the message. + // This is planned to be dropped in the future version, but is kept for now to be backwards compatible with + // systems that still use this convention + OldAttributeSchemaVersion = "versionId" // AttributeSchemaVersion is one of the keys expected to be found in the attributes field of the message, // It holds the schema version information concerning the data field of the message. - AttributeSchemaVersion = "versionId" + AttributeSchemaVersion = "version" // AttributeFormat is one of the keys expected to be found in the attributes field of the message. // It holds the format of the data field of the message. @@ -62,15 +74,15 @@ const ( // HeaderValidation is one of the keys that can occur in raw attributes section of header. // It determines if the header will be validated - HeaderValidation = "headerValidation" + HeaderValidation = "validate_header" // AttributeHeaderID is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. // It holds the header's schema id that is used to check header validity - AttributeHeaderID = "headerSchemaId" + AttributeHeaderID = "header_schema_id" // AttributeHeaderVersion is one of the keys expected in raw attributes section of header, but only if HeaderValidation is true. // It holds the header's schema version that is used to check header validity - AttributeHeaderVersion = "headerVersionId" + AttributeHeaderVersion = "header_version" ) // MessageSchemaPair wraps a Message with the Schema relating to this Message. @@ -157,7 +169,8 @@ func generateHeaderData(rawAttributes map[string]interface{}) ([]byte, error) { cleanAttributes := make(map[string]interface{}) for key, value := range rawAttributes { if key == HeaderValidation || key == AttributeHeaderID || key == AttributeHeaderVersion || - key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat { + key == AttributeSchemaID || key == AttributeSchemaVersion || key == AttributeFormat || + key == OldAttributeSchemaID || key == OldAttributeSchemaVersion { continue } else { cleanAttributes[key] = value diff --git a/validator/internal/janitor/parse.go b/validator/internal/janitor/parse.go index fb79578..3ae2bb2 100644 --- a/validator/internal/janitor/parse.go +++ b/validator/internal/janitor/parse.go @@ -56,7 +56,10 @@ func ExtractAttributes(raw map[string]interface{}) (Attributes, error) { schemaID, ok := raw[AttributeSchemaID] if !ok { - return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaID) + schemaID, ok = raw[OldAttributeSchemaID] + if !ok { + return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaID) + } } schemaIDStr, ok = schemaID.(string) if !ok { @@ -65,7 +68,10 @@ func ExtractAttributes(raw map[string]interface{}) (Attributes, error) { version, ok := raw[AttributeSchemaVersion] if !ok { - return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaVersion) + version, ok = raw[OldAttributeSchemaVersion] + if !ok { + return Attributes{}, errtemplates.AttributeNotDefined(AttributeSchemaVersion) + } } versionStr, ok = version.(string) if !ok { From ee75fece7f8cf4d36a95e9d7f6214b6827caf751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0imun=20=C5=A0prem?= <92316028+Simun17@users.noreply.github.com> Date: Tue, 4 Feb 2025 14:17:56 +0100 Subject: [PATCH 5/5] refactor: rename ValidateHeaders to ValidateHeader (#56) --- validator/internal/centralconsumer/centralconsumer.go | 8 ++++---- validator/internal/config/centralconsumer.go | 2 +- validator/internal/janitorctl/run.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/validator/internal/centralconsumer/centralconsumer.go b/validator/internal/centralconsumer/centralconsumer.go index 0d0c5a2..ff5d808 100644 --- a/validator/internal/centralconsumer/centralconsumer.go +++ b/validator/internal/centralconsumer/centralconsumer.go @@ -97,8 +97,8 @@ type Settings struct { // NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing). NumInferrers int - // ValidateHeaders defines if the messages' headers will be validated - ValidateHeaders bool + // ValidateHeader defines if the messages' headers will be validated + ValidateHeader bool // DefaultHeaderSchemaId is default ID of the header schema DefaultHeaderSchemaId string @@ -143,7 +143,7 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator if settings.NumInferrers > 0 { validatorsSem = make(chan struct{}, settings.NumInferrers) } - if settings.ValidateHeaders { + if settings.ValidateHeader { _, ok := validators["json"] if !ok { // if json validation is turned off, this version of json validator is used by default for validating message header @@ -200,7 +200,7 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator Specification: schemaVersion.Specification, }, encryptionKey: encryptionKey, - validateHeaders: settings.ValidateHeaders, + validateHeaders: settings.ValidateHeader, defaultHeaderSchema: config.DefaultHeaderSchema{ DefaultHeaderSchemaId: settings.DefaultHeaderSchemaId, DefaultHeaderSchemaVersion: settings.DefaultHeaderSchemaVersion, diff --git a/validator/internal/config/centralconsumer.go b/validator/internal/config/centralconsumer.go index c31a0e4..aeec02c 100644 --- a/validator/internal/config/centralconsumer.go +++ b/validator/internal/config/centralconsumer.go @@ -30,7 +30,7 @@ type CentralConsumer struct { ShouldLog CentralConsumerShouldLog `toml:"should_log"` NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"` NumInferrers int `toml:"num_inferrers" default:"-1"` - ValidateHeaders bool `toml:"validate_headers"` + ValidateHeader bool `toml:"validate_header"` DefaultHeaderSchema DefaultHeaderSchema `toml:"default_header_schema"` MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"` RunOptions RunOptions `toml:"run_option"` diff --git a/validator/internal/janitorctl/run.go b/validator/internal/janitorctl/run.go index 1132656..57425cc 100644 --- a/validator/internal/janitorctl/run.go +++ b/validator/internal/janitorctl/run.go @@ -86,7 +86,7 @@ func RunCentralConsumer(configFile string) { centralconsumer.Settings{ NumSchemaCollectors: cfg.NumSchemaCollectors, NumInferrers: cfg.NumInferrers, - ValidateHeaders: cfg.ValidateHeaders, + ValidateHeader: cfg.ValidateHeader, DefaultHeaderSchemaId: cfg.DefaultHeaderSchema.DefaultHeaderSchemaId, DefaultHeaderSchemaVersion: cfg.DefaultHeaderSchema.DefaultHeaderSchemaVersion, },