Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix merge conflicts #58

Merged
merged 6 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion validator/cmd/centralconsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

func main() {
configFile := flag.String("f", "", "toml file containing configuration of working environment")
configFile := flag.String("f", "./config/validator.toml", "toml file containing configuration of working environment")
flag.Parse()

janitorctl.RunCentralConsumer(*configFile)
Expand Down
43 changes: 32 additions & 11 deletions validator/config/validator.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ schema_id = ""
schema_version = ""
schema_type = ""

validate_header = "true"

#default_header_schema_id = "5"
#default_header_schema_version = "1"

[consumer]
type = "" # insert "kafka", "pubsub", "servicebus" or "jetstream"
type = "kafka" # insert "kafka", "pubsub", "servicebus" or "jetstream"
encryption_key = ""

[consumer.kafka]
address = "" # insert
topic = "" # insert
group_id = "" # insert
address = "34.116.158.107:9094" # insert
topic = "input-topic" # insert
group_id = "Group" # insert

[consumer.pubsub]
project_id = "" # insert
Expand All @@ -42,10 +47,13 @@ subject = "" # insert
consumer_name = "" # insert

[producer]
type = "" # insert "kafka", "eventhubs", "pubsub", "servicebus" or "jetstream"
type = "kafka" # insert "kafka", "eventhubs", "pubsub", "servicebus" or "jetstream"

[producer.kafka]
address = "" # insert
address = "34.116.158.107:9094" # insert

[producer.kafka.settings]
kafka_acks = "1"

[producer.pubsub]
project_id = "" # insert
Expand All @@ -57,18 +65,31 @@ connection_string = "" # insert
url = "" # insert

[topics]
valid = "" # insert
dead_letter = "" # insert
valid = "valid-topic" # insert
dead_letter = "dead-letter-topic" # insert

[registry]
url = ""
url = "http://34.118.10.21:8080"
type = "janitor" # insert "janitor" or "apicurio"
groupID = "default"
groupID = "group"

[validators]
enable_json = "true"
enable_avro = "true"
enable_csv = "false"
enable_xml = "false"
enable_protobuf = "false"
csv_url = "http://csv-validator-svc:8080"
csv_url = "http://localhost:8081"
xml_url = "http://xml-validator-svc:8081"

[should_log]
missing_schema = "true"
valid = "true"
dead_letter = "true"

[default_header_schema]
id = "9"
version = "1"

[run_option]
err_threshold = 100
8 changes: 4 additions & 4 deletions validator/internal/centralconsumer/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type Settings struct {
// NumInferrers defines the maximum amount of inflight destination topic inference jobs (validation and routing).
NumInferrers int

// ValidateHeaders defines if the messages' headers will be validated
ValidateHeaders bool
// ValidateHeader defines if the messages' headers will be validated
ValidateHeader bool

// DefaultHeaderSchemaId is default ID of the header schema
DefaultHeaderSchemaId string
Expand Down Expand Up @@ -143,7 +143,7 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
if settings.NumInferrers > 0 {
validatorsSem = make(chan struct{}, settings.NumInferrers)
}
if settings.ValidateHeaders {
if settings.ValidateHeader {
_, ok := validators["json"]
if !ok {
// if json validation is turned off, this version of json validator is used by default for validating message header
Expand Down Expand Up @@ -200,7 +200,7 @@ func New(registry registry.SchemaRegistry, publisher broker.Publisher, validator
Specification: schemaVersion.Specification,
},
encryptionKey: encryptionKey,
validateHeaders: settings.ValidateHeaders,
validateHeaders: settings.ValidateHeader,
defaultHeaderSchema: config.DefaultHeaderSchema{
DefaultHeaderSchemaId: settings.DefaultHeaderSchemaId,
DefaultHeaderSchemaVersion: settings.DefaultHeaderSchemaVersion,
Expand Down
2 changes: 1 addition & 1 deletion validator/internal/config/centralconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type CentralConsumer struct {
ShouldLog CentralConsumerShouldLog `toml:"should_log"`
NumSchemaCollectors int `toml:"num_schema_collectors" default:"-1"`
NumInferrers int `toml:"num_inferrers" default:"-1"`
ValidateHeaders bool `toml:"validate_headers"`
ValidateHeader bool `toml:"validate_header"`
DefaultHeaderSchema DefaultHeaderSchema `toml:"default_header_schema"`
MetricsLoggingInterval time.Duration `toml:"metrics_logging_interval" default:"5s"`
RunOptions RunOptions `toml:"run_option"`
Expand Down
2 changes: 1 addition & 1 deletion validator/internal/janitorctl/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func RunCentralConsumer(configFile string) {
centralconsumer.Settings{
NumSchemaCollectors: cfg.NumSchemaCollectors,
NumInferrers: cfg.NumInferrers,
ValidateHeaders: cfg.ValidateHeaders,
ValidateHeader: cfg.ValidateHeader,
DefaultHeaderSchemaId: cfg.DefaultHeaderSchema.DefaultHeaderSchemaId,
DefaultHeaderSchemaVersion: cfg.DefaultHeaderSchema.DefaultHeaderSchemaVersion,
},
Expand Down
Loading