From a71341d68b8d0d77bf562cbc5ce7283f79f4cd7d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 28 Aug 2023 19:44:50 -0600 Subject: [PATCH] pkg/sr: update for new APIs and to be forward compatible With an HTTP API, the API can add query parameters periodically, any addition of which breaks a Go API where all query parameters are spelled out as method arguments. Now, we have moved all query parameters into values in a context, and added a helper sr.WithParams to make it easy to build parameters. All APIs document which parameters they support. We also have global parameters on the client itself, if you always want to show deleted or something. This also adds metadata and ruleset to the schema type, even though the validation of these is really only possible in the Java client (as documented via limitations). Lastly, this adds support for a few missing APIs: * SubjectsByID * SchemaVersionsByID * SubjectVersions * SchemaTextByVersion * AllSchemas This breaks all CompatibilityLevel APIs, since the compatibility level stuff has been expanded to a whole bunch of compatibility / configuration settings. Namely, we just drop the "Level". This breaks all APIs that had ShowHideDeleted arguments, favoring instead the new ShowDeleted Param. We keep the DeleteHow param, because it is important at call sites to see that you are using hard deletion. --- pkg/sr/api.go | 450 +++++++++++++++++++++++++++++++---------------- pkg/sr/client.go | 10 +- pkg/sr/config.go | 15 +- pkg/sr/enums.go | 107 +++++++++++ pkg/sr/params.go | 195 ++++++++++++++++++++ 5 files changed, 616 insertions(+), 161 deletions(-) create mode 100644 pkg/sr/params.go diff --git a/pkg/sr/api.go b/pkg/sr/api.go index 112dd033..8a1fbde5 100644 --- a/pkg/sr/api.go +++ b/pkg/sr/api.go @@ -9,62 +9,110 @@ import ( "sync/atomic" ) +// TODO +// * /contexts (get) // looks niche right now + // This file is an implementation of: // // https://docs.confluent.io/platform/current/schema-registry/develop/api.html // +type ( + // SchemaReference is a way for a one schema to reference another. The + // details for how referencing is done are type specific; for example, + // JSON objects that use the key "$ref" can refer to another schema via + // URL. For more details on references, see the following link: + // + // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references + // https://docs.confluent.io/platform/current/schema-registry/develop/api.html + SchemaReference struct { + Name string `json:"name"` + Subject string `json:"subject"` + Version int `json:"version"` + } + + // SchemaMetadata is arbitrary information about the schema or its + // constituent parts, such as whether a field contains sensitive + // information or who created a data contract. + SchemaMetadata struct { + Tags map[string][]string `json:"tags,omitempty"` + Properties map[string]string `json:"properties,omitempty"` + Sensitive []string `json:"sensitive,omitempty"` + } + + // SchemaRule specifies integrity constraints or data policies in a + // data contract. These data rules or policies can enforce that a field + // that contains sensitive information must be encrypted, or that a + // message containing an invalid age must be sent to a dead letter + // queue + // + // https://docs.confluent.io/platform/current/schema-registry/fundamentals/data-contracts.html#rules + SchemaRule struct { + Name string `json:"name"` // Name is a user-defined name to reference the rule. + Doc string `json:"doc,omitempty"` // Doc is an optional description of the rule. + Kind SchemaRuleKind `json:"kind"` // Kind is the type of rule. + Mode SchemaRuleMode `json:"mode"` // Mode is the mode of the rule. + Type string `json:"type"` // Type is the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata. + Tags []string `json:"tags"` // Tags to which this rule applies. + Params map[string]string `json:"params,omitempty"` // Optional params for the rule. + Expr string `json:"expr"` // Expr is the rule expression. + OnSuccess string `json:"onSuccess,omitempty"` // OnSuccess is an optional action to execute if the rule succeeds, otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as "NONE,ERROR" for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ + OnFailure string `json:"onFailure,omitempty"` // OnFailure is an optional action to execute if the rule fails, otherwise the built-in action type NONE is used. See OnSuccess for more details. + Disabled bool `json:"disabled,omitempty"` // Disabled specifies whether the rule is disabled. + } + + // SchemaRuleSet groups migration rules and domain validation rules. + SchemaRuleSet struct { + MigrationRules []SchemaRule `json:"migrationRules,omitempty"` + DomainRules []SchemaRule `json:"domainRules,omitempty"` + } + + // Schema is the object form of a schema for the HTTP API. + Schema struct { + // Schema is the actual unescaped text of a schema. + Schema string `json:"schema"` + + // Type is the type of a schema. The default type is avro. + Type SchemaType `json:"schemaType,omitempty"` + + // References declares other schemas this schema references. See the + // docs on SchemaReference for more details. + References []SchemaReference `json:"references,omitempty"` + + // SchemaMetadata is arbitrary information about the schema. + SchemaMetadata SchemaMetadata `json:"metadata,omitempty"` + + // SchemaRuleSet is a set of rules that govern the schema. + SchemaRuleSet SchemaRuleSet `json:"ruleSet,omitempty"` + } + + // SubjectSchema pairs the subject, global identifier, and version of a + // schema with the schema itself. + SubjectSchema struct { + // Subject is the subject for this schema. This usually corresponds to + // a Kafka topic, and whether this is for a key or value. For example, + // "foo-key" would be the subject for the foo topic for serializing the + // key field of a record. + Subject string `json:"subject"` + + // Version is the version of this subject. + Version int `json:"version"` + + // ID is the globally unique ID of the schema. + ID int `json:"id"` + + Schema + } +) + // SupportedTypes returns the schema types that are supported in the schema // registry. func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error) { // GET /schemas/types var types []SchemaType defer func() { sort.Slice(types, func(i, j int) bool { return types[i] < types[j] }) }() - return types, cl.get(ctx, "/schemas/types", &types) -} - -// SchemaReference is a way for a one schema to reference another. The details -// for how referencing is done are type specific; for example, JSON objects -// that use the key "$ref" can refer to another schema via URL. For more details -// on references, see the following link: -// -// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references -// https://docs.confluent.io/platform/current/schema-registry/develop/api.html -type SchemaReference struct { - Name string `json:"name"` - Subject string `json:"subject"` - Version int `json:"version"` -} - -// Schema is the object form of a schema for the HTTP API. -type Schema struct { - // Schema is the actual unescaped text of a schema. - Schema string `json:"schema"` - - // Type is the type of a schema. The default type is avro. - Type SchemaType `json:"schemaType,omitempty"` - - // References declares other schemas this schema references. See the - // docs on SchemaReference for more details. - References []SchemaReference `json:"references,omitempty"` -} - -// SubjectSchema pairs the subject, global identifier, and version of a schema -// with the schema itself. -type SubjectSchema struct { - // Subject is the subject for this schema. This usually corresponds to - // a Kafka topic, and whether this is for a key or value. For example, - // "foo-key" would be the subject for the foo topic for serializing the - // key field of a record. - Subject string `json:"subject"` - - // Version is the version of this subject. - Version int `json:"version"` - - // ID is the globally unique ID of the schema. - ID int `json:"id"` - - Schema + err := cl.get(ctx, "/schemas/types", &types) + return types, err } // CommSubjectSchemas splits l and r into three sets: what is unique in l, what @@ -118,33 +166,24 @@ func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSch return luniq, runiq, common } -// HideShowDeleted is a typed bool indicating whether queries should show -// or hide soft deleted schemas / subjects. -type HideShowDeleted bool - -const ( - // HideDeleted hides soft deleted schemas or subjects. - HideDeleted = false - // ShowDeleted shows soft deleted schemas or subjects. - ShowDeleted = true -) - // Subjects returns subjects available in the registry. -func (cl *Client) Subjects(ctx context.Context, deleted HideShowDeleted) ([]string, error) { - // GET /subjects?deleted={x} +// +// This supports params [SubjectPrefix], [ShowDeleted], and [DeletedOnly]. +func (cl *Client) Subjects(ctx context.Context) ([]string, error) { + // GET /subjects var subjects []string - path := "/subjects" - if deleted { - path += "?deleted=true" - } - return subjects, cl.get(ctx, path, &subjects) + err := cl.get(ctx, "/subjects", &subjects) + return subjects, err } // SchemaByID returns the schema for a given schema ID. +// +// This supports params [Subject], [Format], and [FetchMaxID]. func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error) { // GET /schemas/ids/{id} var s Schema - return s, cl.get(ctx, fmt.Sprintf("/schemas/ids/%d", id), &s) + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d", id), &s) + return s, err } // SchemaTextByID returns the actual text of a schema. @@ -156,6 +195,8 @@ func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error) { // this will return // // {"type":"boolean"} +// +// This supports params [Subject], [Format]. func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error) { // GET /schemas/ids/{id}/schema var s []byte @@ -165,6 +206,32 @@ func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error) { return string(s), nil } +// SubjectsByID returns the subjects associated with a schema ID. +// +// This supports params [Subject] and [ShowDeleted]. +func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error) { + // GET /schemas/ids/{id}/subjects + var subjects []string + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d/subjects", id), &subjects) + return subjects, err +} + +// SchemaVersion is a subject version pair. +type SubjectVersion struct { + Subject string `json:"subject"` + Version int `json:"version"` +} + +// SchemaVersionsByID returns all subject versions associated with a schema ID. +// +// This supports params [Subject] and [ShowDeleted]. +func (cl *Client) SchemaVersionsByID(ctx context.Context, id int) ([]SubjectVersion, error) { + // GET /schemas/ids/{id}/versions + var versions []SubjectVersion + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d/versions", id), &versions) + return versions, err +} + func pathSubject(subject string) string { return fmt.Sprintf("/subjects/%s", url.PathEscape(subject)) } func pathSubjectWithVersion(subject string) string { return pathSubject(subject) + "/versions" } func pathSubjectVersion(subject string, version int) string { @@ -178,43 +245,77 @@ func pathConfig(subject string) string { if subject == "" { return "/config" } - return fmt.Sprintf("/config/%s?defaultToGlobal=true", url.PathEscape(subject)) + return fmt.Sprintf("/config/%s", url.PathEscape(subject)) } -func pathMode(subject string, force bool) string { +func pathMode(subject string) string { if subject == "" { - if force { - return "/mode?force=true" - } return "/mode" } - if force { - return fmt.Sprintf("/mode/%s?force=true", url.PathEscape(subject)) - } - // set (no force), or delete return fmt.Sprintf("/mode/%s", url.PathEscape(subject)) } +// SubjectVersions returns all versions for a given subject. +// +// This supports params [ShowDeleted] and [DeletedOnly]. +func (cl *Client) SubjectVersions(ctx context.Context, subject string) ([]int, error) { + // GET /subjects/{subject}/versions + var versions []int + err := cl.get(ctx, pathSubject(subject), &versions) + return versions, err +} + // SchemaByVersion returns the schema for a given subject and version. You can // use -1 as the version to return the latest schema. -func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int, deleted HideShowDeleted) (SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error) { // GET /subjects/{subject}/versions/{version} var ss SubjectSchema path := pathSubjectVersion(subject, version) - if deleted { - path += "?deleted=true" + err := cl.get(ctx, path, &ss) + return ss, err +} + +// SchemaTextByVersion returns the actual text of a schema, by subject and +// version. You can use -1 as the version to return the latest schema. +// +// For example, if the schema for an ID is +// +// "{\"type\":\"boolean\"}" +// +// this will return +// +// {"type":"boolean"} +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error) { + // GET /subjects/{subject}/versions/{version}/schema + var s []byte + path := pathSubjectVersion(subject, version) + "/schema" + if err := cl.get(ctx, path, &s); err != nil { + return "", err } - return ss, cl.get(ctx, path, &ss) + return string(s), nil +} + +// AllSchemas returns all schemas for all subjects. +// +// This supports params [SubjectPrefix], [ShowDeleted], and [LatestOnly]. +func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error) { + // GET /schemas => []SubjectSchema + var ss []SubjectSchema + err := cl.get(ctx, "/schemas", &ss) + return ss, err } // Schemas returns all schemas for the given subject. -func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, error) { // GET /subjects/{subject}/versions => []int (versions) var versions []int path := pathSubjectWithVersion(subject) - if deleted { - path += "?deleted=true" - } if err := cl.get(ctx, path, &versions); err != nil { return nil, err } @@ -234,7 +335,7 @@ func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowD wg.Add(1) go func() { defer wg.Done() - s, err := cl.SchemaByVersion(cctx, subject, version, deleted) + s, err := cl.SchemaByVersion(cctx, subject, version) schemas[slot] = s if err != nil && atomic.SwapUint32(&errOnce, 1) == 0 { firstErr = err @@ -248,12 +349,13 @@ func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowD } // CreateSchema attempts to create a schema in the given subject. +// +// This supports param [Normalize]. func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) { // POST /subjects/{subject}/versions => returns ID + // Newer SR returns the full SubjectSchema, but old does not, so we + // re-request to find the full information. path := pathSubjectWithVersion(subject) - if cl.normalize { - path += "?normalize=true" - } var id struct { ID int `json:"id"` } @@ -261,7 +363,7 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S return SubjectSchema{}, err } - usages, err := cl.SchemaUsagesByID(ctx, id.ID, HideDeleted) + usages, err := cl.SchemaUsagesByID(ctx, id.ID) if err != nil { return SubjectSchema{}, err } @@ -275,14 +377,13 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S // LookupSchema checks to see if a schema is already registered and if so, // returns its ID and version in the SubjectSchema. +// +// This supports params Normalize and Deleted. func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) { // POST /subjects/{subject}/ - path := pathSubject(subject) - if cl.normalize { - path += "?normalize=true" - } var ss SubjectSchema - return ss, cl.post(ctx, path, s, &ss) + err := cl.post(ctx, pathSubject(subject), s, &ss) + return ss, err } // DeleteHow is a typed bool indicating how subjects or schemas should be @@ -307,7 +408,8 @@ func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteH } var versions []int defer func() { sort.Ints(versions) }() - return versions, cl.delete(ctx, path, &versions) + err := cl.delete(ctx, path, &versions) + return versions, err } // DeleteSchema deletes the schema at the given version. You must soft delete @@ -324,7 +426,9 @@ func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, // SchemaReferences returns all schemas that references the input // subject-version. You can use -1 to check the latest version. -func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error) { // GET /subjects/{subject}/versions/{version}/referencedby // SchemaUsagesByID var ids []int @@ -345,7 +449,7 @@ func (cl *Client) SchemaReferences(ctx context.Context, subject string, version wg.Add(1) go func() { defer wg.Done() - idSchemas, err := cl.SchemaUsagesByID(cctx, id, deleted) + idSchemas, err := cl.SchemaUsagesByID(cctx, id) mu.Lock() defer mu.Unlock() schemas = append(schemas, idSchemas...) @@ -363,7 +467,9 @@ func (cl *Client) SchemaReferences(ctx context.Context, subject string, version // SchemaUsagesByID returns all usages of a given schema ID. A single schema's // can be reused in many subject-versions; this function can be used to map a // schema to all subject-versions that use it. -func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaUsagesByID(ctx context.Context, id int) ([]SubjectSchema, error) { // GET /schemas/ids/{id}/versions // SchemaByVersion type subjectVersion struct { @@ -372,9 +478,6 @@ func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShow } var subjectVersions []subjectVersion path := fmt.Sprintf("/schemas/ids/%d/versions", id) - if deleted { - path += "?deleted=true" - } if err := cl.get(ctx, path, &subjectVersions); err != nil { return nil, err } @@ -393,7 +496,7 @@ func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShow wg.Add(1) go func() { defer wg.Done() - s, err := cl.SchemaByVersion(cctx, sv.Subject, sv.Version, deleted) + s, err := cl.SchemaByVersion(cctx, sv.Subject, sv.Version) schemas[slot] = s if err != nil && atomic.SwapUint32(&errOnce, 1) == 0 { firstErr = err @@ -434,15 +537,28 @@ const GlobalSubject = "" // CompatibilityResult is the compatibility level for a subject. type CompatibilityResult struct { - Subject string // The subject this compatibility result is for, or empty for the global level. - Level CompatibilityLevel // The subject (or global) compatibility level. - Err error // The error received for getting this compatibility level. + Subject string `json:"-"` // The subject this compatibility result is for, or empty for the global compatibility.. + + Level CompatibilityLevel `json:"compatibilityLevel"` // The subject (or global) compatibility level. + Alias string `json:"alias"` // The subject alias, if any. + Normalize bool `json:"normalize"` // Whether or not schemas are normalized by default. + Group string `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. + DefaultMetadata SchemaMetadata `json:"defaultMetadata"` // Default metadata used for schema registration. + OverrideMetadata SchemaMetadata `json:"overrideMetadata"` // Override metadata used for schema registration. + DefaultRuleSet SchemaRuleSet `json:"defaultRuleSet"` // Default rule set used for schema registration. + OverrideRuleSet SchemaRuleSet `json:"overrideRuleSet"` // Override rule set used for schema registration. + + Err error `json:"-"` // The error received for getting this compatibility. } -// CompatibilityLevel returns the subject level and global level compatibility -// of each requested subject. The global level can be requested by using either -// an empty subject or by specifying no subjects. -func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult { +// Compatibility returns the subject compatibility and global compatibility of +// each requested subject. The global compatibility can be requested by using +// either an empty subject or by specifying no subjects. +// +// This supports params [DefaultToGlobal]. +// +// This can return 200 or 500 per result. +func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult { // GET /config/{subject} // GET /config if len(subjects) == 0 { @@ -458,15 +574,11 @@ func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) [] wg.Add(1) go func() { defer wg.Done() - var c struct { - Level CompatibilityLevel `json:"compatibilityLevel"` - } + var c CompatibilityResult err := cl.get(ctx, pathConfig(subject), &c) - results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, - } + c.Subject = subject + c.Err = err + results[slot] = c }() } wg.Wait() @@ -474,10 +586,30 @@ func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) [] return results } -// SetCompatibilityLevel sets the compatibility level for each requested -// subject. The global level can be set by either using an empty subject or by +// SetCompatibility contains information used for setting global or per-subject +// compatibility configuration. +// +// The main difference between this and the CompatibilityResult is that this +// struct marshals the compatibility level as "compatibility". +type SetCompatibility struct { + Subject string `json:"-"` // The subject this compatibility set is for, or empty for the global compatibility.. + + Level CompatibilityLevel `json:"compatibility"` // The subject (or global) compatibility level. + Alias string `json:"alias"` // The subject alias, if any. + Normalize bool `json:"normalize"` // Whether or not schemas are normalized by default. + Group string `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. + DefaultMetadata SchemaMetadata `json:"defaultMetadata"` // Default metadata used for schema registration. + OverrideMetadata SchemaMetadata `json:"overrideMetadata"` // Override metadata used for schema registration. + DefaultRuleSet SchemaRuleSet `json:"defaultRuleSet"` // Default rule set used for schema registration. + OverrideRuleSet SchemaRuleSet `json:"overrideRuleSet"` // Override rule set used for schema registration. + + Err error `json:"-"` // The error received for setting this compatibility. +} + +// SetCompatibilitysets the compatibility for each requested subject. The +// global compatibility can be set by either using an empty subject or by // specifying no subjects. If specifying no subjects, this returns one element. -func (cl *Client) SetCompatibilityLevel(ctx context.Context, level CompatibilityLevel, subjects ...string) []CompatibilityResult { +func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult { // PUT /config/{subject} // PUT /config if len(subjects) == 0 { @@ -493,14 +625,19 @@ func (cl *Client) SetCompatibilityLevel(ctx context.Context, level Compatibility wg.Add(1) go func() { defer wg.Done() - c := struct { - Level CompatibilityLevel `json:"compatibility"` - }{level} + c := compat err := cl.put(ctx, pathConfig(subject), c, &c) results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, + Subject: subject, + Level: c.Level, + Alias: c.Alias, + Normalize: c.Normalize, + Group: c.Group, + DefaultMetadata: c.DefaultMetadata, + OverrideMetadata: c.OverrideMetadata, + DefaultRuleSet: c.DefaultRuleSet, + OverrideRuleSet: c.OverrideRuleSet, + Err: err, } }() } @@ -509,12 +646,15 @@ func (cl *Client) SetCompatibilityLevel(ctx context.Context, level Compatibility return results } -// ResetCompatibilityLevel deletes any subject-level compatibility level and -// reverts to the global default. -func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult { +// ResetCompatibility deletes any subject-level compatibility and reverts to the +// global default. The global compatibility can be reset by either using an +// empty subject or by specifying no subjects. +// +// This can return 200 or 500. +func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult { // DELETE /config/{subject} if len(subjects) == 0 { - return nil + subjects = append(subjects, GlobalSubject) } var ( wg sync.WaitGroup @@ -526,14 +666,18 @@ func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...strin wg.Add(1) go func() { defer wg.Done() - var c struct { - Level CompatibilityLevel `json:"compatibility"` - } + var c SetCompatibility // unmarshals with "compatibility" err := cl.delete(ctx, pathConfig(subject), &c) results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, + Subject: subject, + Level: c.Level, + Alias: c.Alias, + Group: c.Group, + DefaultMetadata: c.DefaultMetadata, + OverrideMetadata: c.OverrideMetadata, + DefaultRuleSet: c.DefaultRuleSet, + OverrideRuleSet: c.OverrideRuleSet, + Err: err, } }() } @@ -542,20 +686,27 @@ func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...strin return results } +// CheckCompatibilityResult is the response from the check compatibility endpoint. +type CheckCompatibilityResult struct { + Is bool `json:"is_compatible"` // Is is true if the schema is compatible. + Messages []string `json:"messages"` // Messages contains reasons a schema is not compatible. +} + // CheckCompatibility checks if a schema is compatible with the given version // that exists. You can use -1 to check compatibility with the latest version, // and -2 to check compatibility against all versions. -func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (bool, error) { - // POST /compatibility/subjects/{subject}/versions/{version}?reason=true - // POST /compatibility/subjects/{subject}/versions?reason=true - path := "/compatibility" + pathSubjectVersion(subject, version) + "?verbose=true" +// +// This supports params [Normalize] and [Verbose]. +func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error) { + // POST /compatibility/subjects/{subject}/versions/{version} + // POST /compatibility/subjects/{subject}/versions + path := "/compatibility" + pathSubjectVersion(subject, version) if version == -2 { - path = "/compatibility" + pathSubjectWithVersion(subject) + "?verbose=true" + path = "/compatibility" + pathSubjectWithVersion(subject) } - var is struct { - Is bool `json:"is_compatible"` - } - return is.Is, cl.post(ctx, path, s, &is) + var is CheckCompatibilityResult + err := cl.post(ctx, path, s, &is) + return is, err } // ModeResult is the mode for a subject. @@ -572,6 +723,8 @@ type modeResponse struct { // Mode returns the subject and global mode of each requested subject. The // global mode can be requested by using either an empty subject or by // specifying no subjects. +// +// This supports params [DefaultToGlobal]. func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { // GET /mode/{subject} // GET /mode @@ -589,7 +742,7 @@ func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { go func() { defer wg.Done() var m modeResponse - err := cl.get(ctx, pathMode(subject, false), &m) + err := cl.get(ctx, pathMode(subject), &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, @@ -604,9 +757,10 @@ func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { // SetMode sets the mode for each requested subject. The global mode can be set // by either using an empty subject or by specifying no subjects. If specifying -// no subjects, this returns one element. Force can be used to force setting -// the mode even if the registry has existing schemas. -func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects ...string) []ModeResult { +// no subjects, this returns one element. +// +// This supports params [Force]. +func (cl *Client) SetMode(ctx context.Context, mode Mode, subjects ...string) []ModeResult { // PUT /mode/{subject} // PUT /mode if len(subjects) == 0 { @@ -623,7 +777,7 @@ func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects . go func() { defer wg.Done() var m modeResponse - err := cl.put(ctx, pathMode(subject, force), mode, &m) + err := cl.put(ctx, pathMode(subject), mode, &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, @@ -653,7 +807,7 @@ func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResul go func() { defer wg.Done() var m modeResponse - err := cl.delete(ctx, pathMode(subject, false), &m) + err := cl.delete(ctx, pathMode(subject), &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, diff --git a/pkg/sr/client.go b/pkg/sr/client.go index 1c4b1147..850a1c20 100644 --- a/pkg/sr/client.go +++ b/pkg/sr/client.go @@ -50,16 +50,15 @@ func (e *ResponseError) Error() string { // Client talks to a schema registry and contains helper functions to serialize // and deserialize objects according to schemas. type Client struct { - urls []string - httpcl *http.Client - ua string + urls []string + httpcl *http.Client + ua string + defParams Param basicAuth *struct { user string pass string } - - normalize bool } // NewClient returns a new schema registry client. @@ -123,6 +122,7 @@ start: if cl.basicAuth != nil { req.SetBasicAuth(cl.basicAuth.user, cl.basicAuth.pass) } + cl.applyParams(ctx, req) resp, err := cl.httpcl.Do(req) if err != nil { diff --git a/pkg/sr/config.go b/pkg/sr/config.go index db1570c4..499e0ab6 100644 --- a/pkg/sr/config.go +++ b/pkg/sr/config.go @@ -67,14 +67,6 @@ func DialTLSConfig(c *tls.Config) Opt { }} } -// Normalize sets the client to add the "?normalize=true" query parameter when -// getting or creating schemas. This can help collapse duplicate schemas into -// one, but can also be done with a configuration parameter on the schema -// registry itself. -func Normalize() Opt { - return opt{func(cl *Client) { cl.normalize = true }} -} - // BasicAuth sets basic authorization to use for every request. func BasicAuth(user, pass string) Opt { return opt{func(cl *Client) { @@ -84,3 +76,10 @@ func BasicAuth(user, pass string) Opt { }{user, pass} }} } + +// DefaultParams sets default parameters to apply to every request. +func DefaultParams(ps ...Param) Opt { + return opt{func(cl *Client) { + cl.defParams = mergeParams(ps...) + }} +} diff --git a/pkg/sr/enums.go b/pkg/sr/enums.go index 6ed8cc58..2c1b1ca7 100644 --- a/pkg/sr/enums.go +++ b/pkg/sr/enums.go @@ -157,3 +157,110 @@ func (m *Mode) UnmarshalText(text []byte) error { } return nil } + +// SchemaRuleKind as an enum representing the kind of schema rule. +type SchemaRuleKind int + +const ( + SchemaRuleKindTransform SchemaRuleKind = iota + SchemaRuleKindCondition +) + +func (k SchemaRuleKind) String() string { + switch k { + case SchemaRuleKindTransform: + return "TRANSFORM" + case SchemaRuleKindCondition: + return "CONDITION" + default: + return "" + } +} + +func (k SchemaRuleKind) MarshalText() ([]byte, error) { + s := k.String() + if s == "" { + return nil, fmt.Errorf("unknown schema rule kind %d", k) + } + return []byte(s), nil +} + +func (k *SchemaRuleKind) UnmarshalText(text []byte) error { + switch s := strings.ToUpper(string(text)); s { + default: + return fmt.Errorf("unknown schema rule kind %q", s) + case "TRANSFORM": + *k = SchemaRuleKindTransform + case "CONDITION": + *k = SchemaRuleKindCondition + } + return nil +} + +// Mode specifies a schema rule's mode. +// +// Migration rules can be specified for an UPGRADE, DOWNGRADE, or both +// (UPDOWN). Migration rules are used during complex schema evolution. +// +// Domain rules can be specified during serialization (WRITE), deserialization +// (READ) or both (WRITEREAD). +// +// Domain rules can be used to transform the domain values in a message +// payload. +type SchemaRuleMode int + +const ( + SchemaRuleModeUpgrade SchemaRuleMode = iota + SchemaRuleModeDowngrade + SchemaRuleModeUpdown + SchemaRuleModeWrite + SchemaRuleModeRead + SchemaRuleModeWriteRead +) + +func (m SchemaRuleMode) String() string { + switch m { + case SchemaRuleModeUpgrade: + return "UPGRADE" + case SchemaRuleModeDowngrade: + return "DOWNGRADE" + case SchemaRuleModeUpdown: + return "UPDOWN" + case SchemaRuleModeWrite: + return "WRITE" + case SchemaRuleModeRead: + return "READ" + case SchemaRuleModeWriteRead: + return "WRITEREAD" + default: + return "" + } +} + +func (m SchemaRuleMode) MarshalText() ([]byte, error) { + s := m.String() + if s == "" { + return nil, fmt.Errorf("unknown schema rule mode %d", m) + } + return []byte(s), nil +} + +func (m *SchemaRuleMode) UnmarshalText(text []byte) error { + switch s := strings.ToUpper(string(text)); s { + default: + return fmt.Errorf("unknown schema rule mode %q", s) + case "UPGRADE": + *m = SchemaRuleModeUpgrade + case "DOWNGRADE": + *m = SchemaRuleModeDowngrade + case "UPDOWN": + *m = SchemaRuleModeUpdown + case "WRITE": + *m = SchemaRuleModeWrite + case "READ": + *m = SchemaRuleModeRead + case "WRITEREAD": + *m = SchemaRuleModeWriteRead + } + return nil +} diff --git a/pkg/sr/params.go b/pkg/sr/params.go new file mode 100644 index 00000000..33eb3c56 --- /dev/null +++ b/pkg/sr/params.go @@ -0,0 +1,195 @@ +package sr + +import ( + "context" + "fmt" + "net/http" +) + +// Param is a parameter that can be passed to various APIs. Each API documents +// the parameters they accept. +type Param struct { + normalize bool + verbose bool + fetchMaxID bool + defaultToGlobal bool + force bool + latestOnly bool + showDeleted bool + deletedOnly bool + format string + subjectPrefix string + subject string + page *int + limit int +} + +// WithParams adds query parameters to the given context. This is a merge +// operation: any non-zero parameter is kept. The variadic nature of this +// allows for a nicer api: +// +// sr.WithParams(ctx, sr.Format("default"), sr.FetchMaxID) +func WithParams(ctx context.Context, p ...Param) context.Context { + return context.WithValue(ctx, ¶msKey, mergeParams(p...)) +} + +func (cl *Client) applyParams(ctx context.Context, req *http.Request) { + p := cl.defParams + user, ok := ctx.Value(¶msKey).(Param) + if ok { + p = mergeParams(p, user) + } + p.apply(req) +} + +func (p Param) apply(req *http.Request) { + q := req.URL.Query() + if p.normalize { + q.Set("normalize", "true") + } + if p.verbose { + q.Set("verbose", "true") + } + if p.fetchMaxID { + q.Set("fetchMaxId", "true") + } + if p.defaultToGlobal { + q.Set("defaultToGlobal", "true") + } + if p.force { + q.Set("force", "true") + } + if p.latestOnly { + q.Set("latestOnly", "true") + } + if p.showDeleted { + q.Set("deleted", "true") + } + if p.deletedOnly { + q.Set("deletedOnly", "true") + } + if p.format != "" { + q.Set("format", p.format) + } + if p.subjectPrefix != "" { + q.Set("subjectPrefix", p.subjectPrefix) + } + if p.subject != "" { + q.Set("subject", p.subject) + } + if p.page != nil && *p.page >= 0 { + q.Set("page", fmt.Sprintf("%d", *p.page)) + } + if p.limit > 0 { + q.Set("limit", fmt.Sprintf("%d", p.limit)) + } + req.URL.RawQuery = q.Encode() +} + +var paramsKey = "params_key" + +func mergeParams(p ...Param) Param { + var merged Param + for _, p := range p { + if p.normalize { + merged.normalize = true + } + if p.verbose { + merged.verbose = true + } + if p.fetchMaxID { + merged.fetchMaxID = true + } + if p.defaultToGlobal { + merged.defaultToGlobal = true + } + if p.force { + merged.force = true + } + if p.latestOnly { + merged.latestOnly = true + } + if p.showDeleted { + merged.showDeleted = true + } + if p.deletedOnly { + merged.deletedOnly = true + } + if p.format != "" { + merged.format = p.format + } + if p.subjectPrefix != "" { + merged.subjectPrefix = p.subjectPrefix + } + if p.subject != "" { + merged.subject = p.subject + } + if p.page != nil && *p.page >= 0 { + merged.page = p.page + } + if p.limit > 0 { + merged.limit = p.limit + } + } + return merged +} + +var ( + // Normalize is a Param that configures whether or not to normalize + // schema's in certain create- or get-schema operations. + Normalize = Param{normalize: true} + + // Verbose is a Param that configures whether or not to return verbose + // error messages when checking compatibility. + Verbose = Param{verbose: true} + + // FetchMaxID is a Param that configures whether or not to fetch the + // max schema ID in certain get-schema operations. + FetchMaxID = Param{fetchMaxID: true} + + // DefaultToGlobal is a Param that changes get-compatibility or + // get-mode to return the global compatibility or mode if the requested + // subject does not exist. + DefaultToGlobal = Param{defaultToGlobal: true} + + // Force is a Param that updating the mode if you are setting the mode + // to IMPORT and schemas currently exist. + Force = Param{force: true} + + // LatestOnly is a Param that configures whether or not to return only + // the latest schema in certain get-schema operations. + LatestOnly = Param{latestOnly: true} + + // ShowDeleted is a Param that configures whether or not to return + // deleted schemas or subjects in certain get operations. + ShowDeleted = Param{showDeleted: true} + + // DeletedOnly is a Param that configures whether to return only + // deleted schemas or subjects in certain get operations. + DeletedOnly = Param{deletedOnly: true} +) + +// Format returns a Param that configures how schema's are returned in certain +// get-schema operations. +// +// For Avro schemas, the Format param supports "default" or "resolved". For +// Protobuf schemas, the Format param supports "default", "ignore_extensions", +// or "serialized". +func Format(f string) Param { return Param{format: f} } + +// SubjectPrefix returns a Param that filters subjects by prefix when listing +// schemas. +func SubjectPrefix(pfx string) Param { return Param{subjectPrefix: pfx} } + +// Subject returns a Param limiting which subject is returned in certain +// list-schema or list-subject operations. +func Subject(s string) Param { return Param{subject: s} } + +/* +TODO once we know the header that is returned for pagination, we can use these. +// Page returns a Param for certain paginating APIs. +func Page(page int) Param { return Param{page: &page} } + +// Limit returns a Param for certain paginating APIs. +func Limit(limit int) Param { return Param{limit: limit} } +*/