diff --git a/pkg/sr/api.go b/pkg/sr/api.go index 112dd033..8a1fbde5 100644 --- a/pkg/sr/api.go +++ b/pkg/sr/api.go @@ -9,62 +9,110 @@ import ( "sync/atomic" ) +// TODO +// * /contexts (get) // looks niche right now + // This file is an implementation of: // // https://docs.confluent.io/platform/current/schema-registry/develop/api.html // +type ( + // SchemaReference is a way for a one schema to reference another. The + // details for how referencing is done are type specific; for example, + // JSON objects that use the key "$ref" can refer to another schema via + // URL. For more details on references, see the following link: + // + // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references + // https://docs.confluent.io/platform/current/schema-registry/develop/api.html + SchemaReference struct { + Name string `json:"name"` + Subject string `json:"subject"` + Version int `json:"version"` + } + + // SchemaMetadata is arbitrary information about the schema or its + // constituent parts, such as whether a field contains sensitive + // information or who created a data contract. + SchemaMetadata struct { + Tags map[string][]string `json:"tags,omitempty"` + Properties map[string]string `json:"properties,omitempty"` + Sensitive []string `json:"sensitive,omitempty"` + } + + // SchemaRule specifies integrity constraints or data policies in a + // data contract. These data rules or policies can enforce that a field + // that contains sensitive information must be encrypted, or that a + // message containing an invalid age must be sent to a dead letter + // queue + // + // https://docs.confluent.io/platform/current/schema-registry/fundamentals/data-contracts.html#rules + SchemaRule struct { + Name string `json:"name"` // Name is a user-defined name to reference the rule. + Doc string `json:"doc,omitempty"` // Doc is an optional description of the rule. + Kind SchemaRuleKind `json:"kind"` // Kind is the type of rule. + Mode SchemaRuleMode `json:"mode"` // Mode is the mode of the rule. + Type string `json:"type"` // Type is the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata. + Tags []string `json:"tags"` // Tags to which this rule applies. + Params map[string]string `json:"params,omitempty"` // Optional params for the rule. + Expr string `json:"expr"` // Expr is the rule expression. + OnSuccess string `json:"onSuccess,omitempty"` // OnSuccess is an optional action to execute if the rule succeeds, otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as "NONE,ERROR" for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ + OnFailure string `json:"onFailure,omitempty"` // OnFailure is an optional action to execute if the rule fails, otherwise the built-in action type NONE is used. See OnSuccess for more details. + Disabled bool `json:"disabled,omitempty"` // Disabled specifies whether the rule is disabled. + } + + // SchemaRuleSet groups migration rules and domain validation rules. + SchemaRuleSet struct { + MigrationRules []SchemaRule `json:"migrationRules,omitempty"` + DomainRules []SchemaRule `json:"domainRules,omitempty"` + } + + // Schema is the object form of a schema for the HTTP API. + Schema struct { + // Schema is the actual unescaped text of a schema. + Schema string `json:"schema"` + + // Type is the type of a schema. The default type is avro. + Type SchemaType `json:"schemaType,omitempty"` + + // References declares other schemas this schema references. See the + // docs on SchemaReference for more details. + References []SchemaReference `json:"references,omitempty"` + + // SchemaMetadata is arbitrary information about the schema. + SchemaMetadata SchemaMetadata `json:"metadata,omitempty"` + + // SchemaRuleSet is a set of rules that govern the schema. + SchemaRuleSet SchemaRuleSet `json:"ruleSet,omitempty"` + } + + // SubjectSchema pairs the subject, global identifier, and version of a + // schema with the schema itself. + SubjectSchema struct { + // Subject is the subject for this schema. This usually corresponds to + // a Kafka topic, and whether this is for a key or value. For example, + // "foo-key" would be the subject for the foo topic for serializing the + // key field of a record. + Subject string `json:"subject"` + + // Version is the version of this subject. + Version int `json:"version"` + + // ID is the globally unique ID of the schema. + ID int `json:"id"` + + Schema + } +) + // SupportedTypes returns the schema types that are supported in the schema // registry. func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error) { // GET /schemas/types var types []SchemaType defer func() { sort.Slice(types, func(i, j int) bool { return types[i] < types[j] }) }() - return types, cl.get(ctx, "/schemas/types", &types) -} - -// SchemaReference is a way for a one schema to reference another. The details -// for how referencing is done are type specific; for example, JSON objects -// that use the key "$ref" can refer to another schema via URL. For more details -// on references, see the following link: -// -// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references -// https://docs.confluent.io/platform/current/schema-registry/develop/api.html -type SchemaReference struct { - Name string `json:"name"` - Subject string `json:"subject"` - Version int `json:"version"` -} - -// Schema is the object form of a schema for the HTTP API. -type Schema struct { - // Schema is the actual unescaped text of a schema. - Schema string `json:"schema"` - - // Type is the type of a schema. The default type is avro. - Type SchemaType `json:"schemaType,omitempty"` - - // References declares other schemas this schema references. See the - // docs on SchemaReference for more details. - References []SchemaReference `json:"references,omitempty"` -} - -// SubjectSchema pairs the subject, global identifier, and version of a schema -// with the schema itself. -type SubjectSchema struct { - // Subject is the subject for this schema. This usually corresponds to - // a Kafka topic, and whether this is for a key or value. For example, - // "foo-key" would be the subject for the foo topic for serializing the - // key field of a record. - Subject string `json:"subject"` - - // Version is the version of this subject. - Version int `json:"version"` - - // ID is the globally unique ID of the schema. - ID int `json:"id"` - - Schema + err := cl.get(ctx, "/schemas/types", &types) + return types, err } // CommSubjectSchemas splits l and r into three sets: what is unique in l, what @@ -118,33 +166,24 @@ func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSch return luniq, runiq, common } -// HideShowDeleted is a typed bool indicating whether queries should show -// or hide soft deleted schemas / subjects. -type HideShowDeleted bool - -const ( - // HideDeleted hides soft deleted schemas or subjects. - HideDeleted = false - // ShowDeleted shows soft deleted schemas or subjects. - ShowDeleted = true -) - // Subjects returns subjects available in the registry. -func (cl *Client) Subjects(ctx context.Context, deleted HideShowDeleted) ([]string, error) { - // GET /subjects?deleted={x} +// +// This supports params [SubjectPrefix], [ShowDeleted], and [DeletedOnly]. +func (cl *Client) Subjects(ctx context.Context) ([]string, error) { + // GET /subjects var subjects []string - path := "/subjects" - if deleted { - path += "?deleted=true" - } - return subjects, cl.get(ctx, path, &subjects) + err := cl.get(ctx, "/subjects", &subjects) + return subjects, err } // SchemaByID returns the schema for a given schema ID. +// +// This supports params [Subject], [Format], and [FetchMaxID]. func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error) { // GET /schemas/ids/{id} var s Schema - return s, cl.get(ctx, fmt.Sprintf("/schemas/ids/%d", id), &s) + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d", id), &s) + return s, err } // SchemaTextByID returns the actual text of a schema. @@ -156,6 +195,8 @@ func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error) { // this will return // // {"type":"boolean"} +// +// This supports params [Subject], [Format]. func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error) { // GET /schemas/ids/{id}/schema var s []byte @@ -165,6 +206,32 @@ func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error) { return string(s), nil } +// SubjectsByID returns the subjects associated with a schema ID. +// +// This supports params [Subject] and [ShowDeleted]. +func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error) { + // GET /schemas/ids/{id}/subjects + var subjects []string + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d/subjects", id), &subjects) + return subjects, err +} + +// SchemaVersion is a subject version pair. +type SubjectVersion struct { + Subject string `json:"subject"` + Version int `json:"version"` +} + +// SchemaVersionsByID returns all subject versions associated with a schema ID. +// +// This supports params [Subject] and [ShowDeleted]. +func (cl *Client) SchemaVersionsByID(ctx context.Context, id int) ([]SubjectVersion, error) { + // GET /schemas/ids/{id}/versions + var versions []SubjectVersion + err := cl.get(ctx, fmt.Sprintf("/schemas/ids/%d/versions", id), &versions) + return versions, err +} + func pathSubject(subject string) string { return fmt.Sprintf("/subjects/%s", url.PathEscape(subject)) } func pathSubjectWithVersion(subject string) string { return pathSubject(subject) + "/versions" } func pathSubjectVersion(subject string, version int) string { @@ -178,43 +245,77 @@ func pathConfig(subject string) string { if subject == "" { return "/config" } - return fmt.Sprintf("/config/%s?defaultToGlobal=true", url.PathEscape(subject)) + return fmt.Sprintf("/config/%s", url.PathEscape(subject)) } -func pathMode(subject string, force bool) string { +func pathMode(subject string) string { if subject == "" { - if force { - return "/mode?force=true" - } return "/mode" } - if force { - return fmt.Sprintf("/mode/%s?force=true", url.PathEscape(subject)) - } - // set (no force), or delete return fmt.Sprintf("/mode/%s", url.PathEscape(subject)) } +// SubjectVersions returns all versions for a given subject. +// +// This supports params [ShowDeleted] and [DeletedOnly]. +func (cl *Client) SubjectVersions(ctx context.Context, subject string) ([]int, error) { + // GET /subjects/{subject}/versions + var versions []int + err := cl.get(ctx, pathSubject(subject), &versions) + return versions, err +} + // SchemaByVersion returns the schema for a given subject and version. You can // use -1 as the version to return the latest schema. -func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int, deleted HideShowDeleted) (SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error) { // GET /subjects/{subject}/versions/{version} var ss SubjectSchema path := pathSubjectVersion(subject, version) - if deleted { - path += "?deleted=true" + err := cl.get(ctx, path, &ss) + return ss, err +} + +// SchemaTextByVersion returns the actual text of a schema, by subject and +// version. You can use -1 as the version to return the latest schema. +// +// For example, if the schema for an ID is +// +// "{\"type\":\"boolean\"}" +// +// this will return +// +// {"type":"boolean"} +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error) { + // GET /subjects/{subject}/versions/{version}/schema + var s []byte + path := pathSubjectVersion(subject, version) + "/schema" + if err := cl.get(ctx, path, &s); err != nil { + return "", err } - return ss, cl.get(ctx, path, &ss) + return string(s), nil +} + +// AllSchemas returns all schemas for all subjects. +// +// This supports params [SubjectPrefix], [ShowDeleted], and [LatestOnly]. +func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error) { + // GET /schemas => []SubjectSchema + var ss []SubjectSchema + err := cl.get(ctx, "/schemas", &ss) + return ss, err } // Schemas returns all schemas for the given subject. -func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, error) { // GET /subjects/{subject}/versions => []int (versions) var versions []int path := pathSubjectWithVersion(subject) - if deleted { - path += "?deleted=true" - } if err := cl.get(ctx, path, &versions); err != nil { return nil, err } @@ -234,7 +335,7 @@ func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowD wg.Add(1) go func() { defer wg.Done() - s, err := cl.SchemaByVersion(cctx, subject, version, deleted) + s, err := cl.SchemaByVersion(cctx, subject, version) schemas[slot] = s if err != nil && atomic.SwapUint32(&errOnce, 1) == 0 { firstErr = err @@ -248,12 +349,13 @@ func (cl *Client) Schemas(ctx context.Context, subject string, deleted HideShowD } // CreateSchema attempts to create a schema in the given subject. +// +// This supports param [Normalize]. func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) { // POST /subjects/{subject}/versions => returns ID + // Newer SR returns the full SubjectSchema, but old does not, so we + // re-request to find the full information. path := pathSubjectWithVersion(subject) - if cl.normalize { - path += "?normalize=true" - } var id struct { ID int `json:"id"` } @@ -261,7 +363,7 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S return SubjectSchema{}, err } - usages, err := cl.SchemaUsagesByID(ctx, id.ID, HideDeleted) + usages, err := cl.SchemaUsagesByID(ctx, id.ID) if err != nil { return SubjectSchema{}, err } @@ -275,14 +377,13 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S // LookupSchema checks to see if a schema is already registered and if so, // returns its ID and version in the SubjectSchema. +// +// This supports params Normalize and Deleted. func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) { // POST /subjects/{subject}/ - path := pathSubject(subject) - if cl.normalize { - path += "?normalize=true" - } var ss SubjectSchema - return ss, cl.post(ctx, path, s, &ss) + err := cl.post(ctx, pathSubject(subject), s, &ss) + return ss, err } // DeleteHow is a typed bool indicating how subjects or schemas should be @@ -307,7 +408,8 @@ func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteH } var versions []int defer func() { sort.Ints(versions) }() - return versions, cl.delete(ctx, path, &versions) + err := cl.delete(ctx, path, &versions) + return versions, err } // DeleteSchema deletes the schema at the given version. You must soft delete @@ -324,7 +426,9 @@ func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, // SchemaReferences returns all schemas that references the input // subject-version. You can use -1 to check the latest version. -func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error) { // GET /subjects/{subject}/versions/{version}/referencedby // SchemaUsagesByID var ids []int @@ -345,7 +449,7 @@ func (cl *Client) SchemaReferences(ctx context.Context, subject string, version wg.Add(1) go func() { defer wg.Done() - idSchemas, err := cl.SchemaUsagesByID(cctx, id, deleted) + idSchemas, err := cl.SchemaUsagesByID(cctx, id) mu.Lock() defer mu.Unlock() schemas = append(schemas, idSchemas...) @@ -363,7 +467,9 @@ func (cl *Client) SchemaReferences(ctx context.Context, subject string, version // SchemaUsagesByID returns all usages of a given schema ID. A single schema's // can be reused in many subject-versions; this function can be used to map a // schema to all subject-versions that use it. -func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShowDeleted) ([]SubjectSchema, error) { +// +// This supports param [ShowDeleted]. +func (cl *Client) SchemaUsagesByID(ctx context.Context, id int) ([]SubjectSchema, error) { // GET /schemas/ids/{id}/versions // SchemaByVersion type subjectVersion struct { @@ -372,9 +478,6 @@ func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShow } var subjectVersions []subjectVersion path := fmt.Sprintf("/schemas/ids/%d/versions", id) - if deleted { - path += "?deleted=true" - } if err := cl.get(ctx, path, &subjectVersions); err != nil { return nil, err } @@ -393,7 +496,7 @@ func (cl *Client) SchemaUsagesByID(ctx context.Context, id int, deleted HideShow wg.Add(1) go func() { defer wg.Done() - s, err := cl.SchemaByVersion(cctx, sv.Subject, sv.Version, deleted) + s, err := cl.SchemaByVersion(cctx, sv.Subject, sv.Version) schemas[slot] = s if err != nil && atomic.SwapUint32(&errOnce, 1) == 0 { firstErr = err @@ -434,15 +537,28 @@ const GlobalSubject = "" // CompatibilityResult is the compatibility level for a subject. type CompatibilityResult struct { - Subject string // The subject this compatibility result is for, or empty for the global level. - Level CompatibilityLevel // The subject (or global) compatibility level. - Err error // The error received for getting this compatibility level. + Subject string `json:"-"` // The subject this compatibility result is for, or empty for the global compatibility.. + + Level CompatibilityLevel `json:"compatibilityLevel"` // The subject (or global) compatibility level. + Alias string `json:"alias"` // The subject alias, if any. + Normalize bool `json:"normalize"` // Whether or not schemas are normalized by default. + Group string `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. + DefaultMetadata SchemaMetadata `json:"defaultMetadata"` // Default metadata used for schema registration. + OverrideMetadata SchemaMetadata `json:"overrideMetadata"` // Override metadata used for schema registration. + DefaultRuleSet SchemaRuleSet `json:"defaultRuleSet"` // Default rule set used for schema registration. + OverrideRuleSet SchemaRuleSet `json:"overrideRuleSet"` // Override rule set used for schema registration. + + Err error `json:"-"` // The error received for getting this compatibility. } -// CompatibilityLevel returns the subject level and global level compatibility -// of each requested subject. The global level can be requested by using either -// an empty subject or by specifying no subjects. -func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult { +// Compatibility returns the subject compatibility and global compatibility of +// each requested subject. The global compatibility can be requested by using +// either an empty subject or by specifying no subjects. +// +// This supports params [DefaultToGlobal]. +// +// This can return 200 or 500 per result. +func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult { // GET /config/{subject} // GET /config if len(subjects) == 0 { @@ -458,15 +574,11 @@ func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) [] wg.Add(1) go func() { defer wg.Done() - var c struct { - Level CompatibilityLevel `json:"compatibilityLevel"` - } + var c CompatibilityResult err := cl.get(ctx, pathConfig(subject), &c) - results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, - } + c.Subject = subject + c.Err = err + results[slot] = c }() } wg.Wait() @@ -474,10 +586,30 @@ func (cl *Client) CompatibilityLevel(ctx context.Context, subjects ...string) [] return results } -// SetCompatibilityLevel sets the compatibility level for each requested -// subject. The global level can be set by either using an empty subject or by +// SetCompatibility contains information used for setting global or per-subject +// compatibility configuration. +// +// The main difference between this and the CompatibilityResult is that this +// struct marshals the compatibility level as "compatibility". +type SetCompatibility struct { + Subject string `json:"-"` // The subject this compatibility set is for, or empty for the global compatibility.. + + Level CompatibilityLevel `json:"compatibility"` // The subject (or global) compatibility level. + Alias string `json:"alias"` // The subject alias, if any. + Normalize bool `json:"normalize"` // Whether or not schemas are normalized by default. + Group string `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility. + DefaultMetadata SchemaMetadata `json:"defaultMetadata"` // Default metadata used for schema registration. + OverrideMetadata SchemaMetadata `json:"overrideMetadata"` // Override metadata used for schema registration. + DefaultRuleSet SchemaRuleSet `json:"defaultRuleSet"` // Default rule set used for schema registration. + OverrideRuleSet SchemaRuleSet `json:"overrideRuleSet"` // Override rule set used for schema registration. + + Err error `json:"-"` // The error received for setting this compatibility. +} + +// SetCompatibilitysets the compatibility for each requested subject. The +// global compatibility can be set by either using an empty subject or by // specifying no subjects. If specifying no subjects, this returns one element. -func (cl *Client) SetCompatibilityLevel(ctx context.Context, level CompatibilityLevel, subjects ...string) []CompatibilityResult { +func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult { // PUT /config/{subject} // PUT /config if len(subjects) == 0 { @@ -493,14 +625,19 @@ func (cl *Client) SetCompatibilityLevel(ctx context.Context, level Compatibility wg.Add(1) go func() { defer wg.Done() - c := struct { - Level CompatibilityLevel `json:"compatibility"` - }{level} + c := compat err := cl.put(ctx, pathConfig(subject), c, &c) results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, + Subject: subject, + Level: c.Level, + Alias: c.Alias, + Normalize: c.Normalize, + Group: c.Group, + DefaultMetadata: c.DefaultMetadata, + OverrideMetadata: c.OverrideMetadata, + DefaultRuleSet: c.DefaultRuleSet, + OverrideRuleSet: c.OverrideRuleSet, + Err: err, } }() } @@ -509,12 +646,15 @@ func (cl *Client) SetCompatibilityLevel(ctx context.Context, level Compatibility return results } -// ResetCompatibilityLevel deletes any subject-level compatibility level and -// reverts to the global default. -func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...string) []CompatibilityResult { +// ResetCompatibility deletes any subject-level compatibility and reverts to the +// global default. The global compatibility can be reset by either using an +// empty subject or by specifying no subjects. +// +// This can return 200 or 500. +func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult { // DELETE /config/{subject} if len(subjects) == 0 { - return nil + subjects = append(subjects, GlobalSubject) } var ( wg sync.WaitGroup @@ -526,14 +666,18 @@ func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...strin wg.Add(1) go func() { defer wg.Done() - var c struct { - Level CompatibilityLevel `json:"compatibility"` - } + var c SetCompatibility // unmarshals with "compatibility" err := cl.delete(ctx, pathConfig(subject), &c) results[slot] = CompatibilityResult{ - Subject: subject, - Level: c.Level, - Err: err, + Subject: subject, + Level: c.Level, + Alias: c.Alias, + Group: c.Group, + DefaultMetadata: c.DefaultMetadata, + OverrideMetadata: c.OverrideMetadata, + DefaultRuleSet: c.DefaultRuleSet, + OverrideRuleSet: c.OverrideRuleSet, + Err: err, } }() } @@ -542,20 +686,27 @@ func (cl *Client) ResetCompatibilityLevel(ctx context.Context, subjects ...strin return results } +// CheckCompatibilityResult is the response from the check compatibility endpoint. +type CheckCompatibilityResult struct { + Is bool `json:"is_compatible"` // Is is true if the schema is compatible. + Messages []string `json:"messages"` // Messages contains reasons a schema is not compatible. +} + // CheckCompatibility checks if a schema is compatible with the given version // that exists. You can use -1 to check compatibility with the latest version, // and -2 to check compatibility against all versions. -func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (bool, error) { - // POST /compatibility/subjects/{subject}/versions/{version}?reason=true - // POST /compatibility/subjects/{subject}/versions?reason=true - path := "/compatibility" + pathSubjectVersion(subject, version) + "?verbose=true" +// +// This supports params [Normalize] and [Verbose]. +func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error) { + // POST /compatibility/subjects/{subject}/versions/{version} + // POST /compatibility/subjects/{subject}/versions + path := "/compatibility" + pathSubjectVersion(subject, version) if version == -2 { - path = "/compatibility" + pathSubjectWithVersion(subject) + "?verbose=true" + path = "/compatibility" + pathSubjectWithVersion(subject) } - var is struct { - Is bool `json:"is_compatible"` - } - return is.Is, cl.post(ctx, path, s, &is) + var is CheckCompatibilityResult + err := cl.post(ctx, path, s, &is) + return is, err } // ModeResult is the mode for a subject. @@ -572,6 +723,8 @@ type modeResponse struct { // Mode returns the subject and global mode of each requested subject. The // global mode can be requested by using either an empty subject or by // specifying no subjects. +// +// This supports params [DefaultToGlobal]. func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { // GET /mode/{subject} // GET /mode @@ -589,7 +742,7 @@ func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { go func() { defer wg.Done() var m modeResponse - err := cl.get(ctx, pathMode(subject, false), &m) + err := cl.get(ctx, pathMode(subject), &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, @@ -604,9 +757,10 @@ func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult { // SetMode sets the mode for each requested subject. The global mode can be set // by either using an empty subject or by specifying no subjects. If specifying -// no subjects, this returns one element. Force can be used to force setting -// the mode even if the registry has existing schemas. -func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects ...string) []ModeResult { +// no subjects, this returns one element. +// +// This supports params [Force]. +func (cl *Client) SetMode(ctx context.Context, mode Mode, subjects ...string) []ModeResult { // PUT /mode/{subject} // PUT /mode if len(subjects) == 0 { @@ -623,7 +777,7 @@ func (cl *Client) SetMode(ctx context.Context, mode Mode, force bool, subjects . go func() { defer wg.Done() var m modeResponse - err := cl.put(ctx, pathMode(subject, force), mode, &m) + err := cl.put(ctx, pathMode(subject), mode, &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, @@ -653,7 +807,7 @@ func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResul go func() { defer wg.Done() var m modeResponse - err := cl.delete(ctx, pathMode(subject, false), &m) + err := cl.delete(ctx, pathMode(subject), &m) results[slot] = ModeResult{ Subject: subject, Mode: m.Mode, diff --git a/pkg/sr/client.go b/pkg/sr/client.go index 1c4b1147..850a1c20 100644 --- a/pkg/sr/client.go +++ b/pkg/sr/client.go @@ -50,16 +50,15 @@ func (e *ResponseError) Error() string { // Client talks to a schema registry and contains helper functions to serialize // and deserialize objects according to schemas. type Client struct { - urls []string - httpcl *http.Client - ua string + urls []string + httpcl *http.Client + ua string + defParams Param basicAuth *struct { user string pass string } - - normalize bool } // NewClient returns a new schema registry client. @@ -123,6 +122,7 @@ start: if cl.basicAuth != nil { req.SetBasicAuth(cl.basicAuth.user, cl.basicAuth.pass) } + cl.applyParams(ctx, req) resp, err := cl.httpcl.Do(req) if err != nil { diff --git a/pkg/sr/config.go b/pkg/sr/config.go index db1570c4..499e0ab6 100644 --- a/pkg/sr/config.go +++ b/pkg/sr/config.go @@ -67,14 +67,6 @@ func DialTLSConfig(c *tls.Config) Opt { }} } -// Normalize sets the client to add the "?normalize=true" query parameter when -// getting or creating schemas. This can help collapse duplicate schemas into -// one, but can also be done with a configuration parameter on the schema -// registry itself. -func Normalize() Opt { - return opt{func(cl *Client) { cl.normalize = true }} -} - // BasicAuth sets basic authorization to use for every request. func BasicAuth(user, pass string) Opt { return opt{func(cl *Client) { @@ -84,3 +76,10 @@ func BasicAuth(user, pass string) Opt { }{user, pass} }} } + +// DefaultParams sets default parameters to apply to every request. +func DefaultParams(ps ...Param) Opt { + return opt{func(cl *Client) { + cl.defParams = mergeParams(ps...) + }} +} diff --git a/pkg/sr/enums.go b/pkg/sr/enums.go index 6ed8cc58..2c1b1ca7 100644 --- a/pkg/sr/enums.go +++ b/pkg/sr/enums.go @@ -157,3 +157,110 @@ func (m *Mode) UnmarshalText(text []byte) error { } return nil } + +// SchemaRuleKind as an enum representing the kind of schema rule. +type SchemaRuleKind int + +const ( + SchemaRuleKindTransform SchemaRuleKind = iota + SchemaRuleKindCondition +) + +func (k SchemaRuleKind) String() string { + switch k { + case SchemaRuleKindTransform: + return "TRANSFORM" + case SchemaRuleKindCondition: + return "CONDITION" + default: + return "" + } +} + +func (k SchemaRuleKind) MarshalText() ([]byte, error) { + s := k.String() + if s == "" { + return nil, fmt.Errorf("unknown schema rule kind %d", k) + } + return []byte(s), nil +} + +func (k *SchemaRuleKind) UnmarshalText(text []byte) error { + switch s := strings.ToUpper(string(text)); s { + default: + return fmt.Errorf("unknown schema rule kind %q", s) + case "TRANSFORM": + *k = SchemaRuleKindTransform + case "CONDITION": + *k = SchemaRuleKindCondition + } + return nil +} + +// Mode specifies a schema rule's mode. +// +// Migration rules can be specified for an UPGRADE, DOWNGRADE, or both +// (UPDOWN). Migration rules are used during complex schema evolution. +// +// Domain rules can be specified during serialization (WRITE), deserialization +// (READ) or both (WRITEREAD). +// +// Domain rules can be used to transform the domain values in a message +// payload. +type SchemaRuleMode int + +const ( + SchemaRuleModeUpgrade SchemaRuleMode = iota + SchemaRuleModeDowngrade + SchemaRuleModeUpdown + SchemaRuleModeWrite + SchemaRuleModeRead + SchemaRuleModeWriteRead +) + +func (m SchemaRuleMode) String() string { + switch m { + case SchemaRuleModeUpgrade: + return "UPGRADE" + case SchemaRuleModeDowngrade: + return "DOWNGRADE" + case SchemaRuleModeUpdown: + return "UPDOWN" + case SchemaRuleModeWrite: + return "WRITE" + case SchemaRuleModeRead: + return "READ" + case SchemaRuleModeWriteRead: + return "WRITEREAD" + default: + return "" + } +} + +func (m SchemaRuleMode) MarshalText() ([]byte, error) { + s := m.String() + if s == "" { + return nil, fmt.Errorf("unknown schema rule mode %d", m) + } + return []byte(s), nil +} + +func (m *SchemaRuleMode) UnmarshalText(text []byte) error { + switch s := strings.ToUpper(string(text)); s { + default: + return fmt.Errorf("unknown schema rule mode %q", s) + case "UPGRADE": + *m = SchemaRuleModeUpgrade + case "DOWNGRADE": + *m = SchemaRuleModeDowngrade + case "UPDOWN": + *m = SchemaRuleModeUpdown + case "WRITE": + *m = SchemaRuleModeWrite + case "READ": + *m = SchemaRuleModeRead + case "WRITEREAD": + *m = SchemaRuleModeWriteRead + } + return nil +} diff --git a/pkg/sr/params.go b/pkg/sr/params.go new file mode 100644 index 00000000..33eb3c56 --- /dev/null +++ b/pkg/sr/params.go @@ -0,0 +1,195 @@ +package sr + +import ( + "context" + "fmt" + "net/http" +) + +// Param is a parameter that can be passed to various APIs. Each API documents +// the parameters they accept. +type Param struct { + normalize bool + verbose bool + fetchMaxID bool + defaultToGlobal bool + force bool + latestOnly bool + showDeleted bool + deletedOnly bool + format string + subjectPrefix string + subject string + page *int + limit int +} + +// WithParams adds query parameters to the given context. This is a merge +// operation: any non-zero parameter is kept. The variadic nature of this +// allows for a nicer api: +// +// sr.WithParams(ctx, sr.Format("default"), sr.FetchMaxID) +func WithParams(ctx context.Context, p ...Param) context.Context { + return context.WithValue(ctx, ¶msKey, mergeParams(p...)) +} + +func (cl *Client) applyParams(ctx context.Context, req *http.Request) { + p := cl.defParams + user, ok := ctx.Value(¶msKey).(Param) + if ok { + p = mergeParams(p, user) + } + p.apply(req) +} + +func (p Param) apply(req *http.Request) { + q := req.URL.Query() + if p.normalize { + q.Set("normalize", "true") + } + if p.verbose { + q.Set("verbose", "true") + } + if p.fetchMaxID { + q.Set("fetchMaxId", "true") + } + if p.defaultToGlobal { + q.Set("defaultToGlobal", "true") + } + if p.force { + q.Set("force", "true") + } + if p.latestOnly { + q.Set("latestOnly", "true") + } + if p.showDeleted { + q.Set("deleted", "true") + } + if p.deletedOnly { + q.Set("deletedOnly", "true") + } + if p.format != "" { + q.Set("format", p.format) + } + if p.subjectPrefix != "" { + q.Set("subjectPrefix", p.subjectPrefix) + } + if p.subject != "" { + q.Set("subject", p.subject) + } + if p.page != nil && *p.page >= 0 { + q.Set("page", fmt.Sprintf("%d", *p.page)) + } + if p.limit > 0 { + q.Set("limit", fmt.Sprintf("%d", p.limit)) + } + req.URL.RawQuery = q.Encode() +} + +var paramsKey = "params_key" + +func mergeParams(p ...Param) Param { + var merged Param + for _, p := range p { + if p.normalize { + merged.normalize = true + } + if p.verbose { + merged.verbose = true + } + if p.fetchMaxID { + merged.fetchMaxID = true + } + if p.defaultToGlobal { + merged.defaultToGlobal = true + } + if p.force { + merged.force = true + } + if p.latestOnly { + merged.latestOnly = true + } + if p.showDeleted { + merged.showDeleted = true + } + if p.deletedOnly { + merged.deletedOnly = true + } + if p.format != "" { + merged.format = p.format + } + if p.subjectPrefix != "" { + merged.subjectPrefix = p.subjectPrefix + } + if p.subject != "" { + merged.subject = p.subject + } + if p.page != nil && *p.page >= 0 { + merged.page = p.page + } + if p.limit > 0 { + merged.limit = p.limit + } + } + return merged +} + +var ( + // Normalize is a Param that configures whether or not to normalize + // schema's in certain create- or get-schema operations. + Normalize = Param{normalize: true} + + // Verbose is a Param that configures whether or not to return verbose + // error messages when checking compatibility. + Verbose = Param{verbose: true} + + // FetchMaxID is a Param that configures whether or not to fetch the + // max schema ID in certain get-schema operations. + FetchMaxID = Param{fetchMaxID: true} + + // DefaultToGlobal is a Param that changes get-compatibility or + // get-mode to return the global compatibility or mode if the requested + // subject does not exist. + DefaultToGlobal = Param{defaultToGlobal: true} + + // Force is a Param that updating the mode if you are setting the mode + // to IMPORT and schemas currently exist. + Force = Param{force: true} + + // LatestOnly is a Param that configures whether or not to return only + // the latest schema in certain get-schema operations. + LatestOnly = Param{latestOnly: true} + + // ShowDeleted is a Param that configures whether or not to return + // deleted schemas or subjects in certain get operations. + ShowDeleted = Param{showDeleted: true} + + // DeletedOnly is a Param that configures whether to return only + // deleted schemas or subjects in certain get operations. + DeletedOnly = Param{deletedOnly: true} +) + +// Format returns a Param that configures how schema's are returned in certain +// get-schema operations. +// +// For Avro schemas, the Format param supports "default" or "resolved". For +// Protobuf schemas, the Format param supports "default", "ignore_extensions", +// or "serialized". +func Format(f string) Param { return Param{format: f} } + +// SubjectPrefix returns a Param that filters subjects by prefix when listing +// schemas. +func SubjectPrefix(pfx string) Param { return Param{subjectPrefix: pfx} } + +// Subject returns a Param limiting which subject is returned in certain +// list-schema or list-subject operations. +func Subject(s string) Param { return Param{subject: s} } + +/* +TODO once we know the header that is returned for pagination, we can use these. +// Page returns a Param for certain paginating APIs. +func Page(page int) Param { return Param{page: &page} } + +// Limit returns a Param for certain paginating APIs. +func Limit(limit int) Param { return Param{limit: limit} } +*/