diff --git a/pkg/sr/api.go b/pkg/sr/api.go index 2d06838f..4eeb866c 100644 --- a/pkg/sr/api.go +++ b/pkg/sr/api.go @@ -404,7 +404,7 @@ func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteH // DELETE /subjects/{subject}?permanent={x} path := pathSubject(subject) if how == HardDelete { - path += "?permanent=true" + ctx = WithParams(ctx, hardDelete) } var versions []int defer func() { sort.Ints(versions) }() @@ -419,7 +419,7 @@ func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, // DELETE /subjects/{subject}/versions/{version}?permanent={x} path := pathSubjectVersion(subject, version) if how == HardDelete { - path += "?permanent=true" + ctx = WithParams(ctx, hardDelete) } return cl.delete(ctx, path, nil) } diff --git a/pkg/sr/client.go b/pkg/sr/client.go index 850a1c20..0a53618c 100644 --- a/pkg/sr/client.go +++ b/pkg/sr/client.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "net/http" + "net/url" "time" ) @@ -100,21 +101,25 @@ func (cl *Client) do(ctx context.Context, method, path string, v, into any) erro urls := cl.urls start: - url := fmt.Sprintf("%s%s", urls[0], path) + reqURL, err := url.JoinPath(urls[0], path) + if err != nil { + return fmt.Errorf("unable to join path for %q and %q: %w", urls[0], path, err) + } + urls = urls[1:] var reqBody io.Reader if v != nil { marshaled, err := json.Marshal(v) if err != nil { - return fmt.Errorf("unable to encode body for %s %q: %w", method, url, err) + return fmt.Errorf("unable to encode body for %s %q: %w", method, reqURL, err) } reqBody = bytes.NewReader(marshaled) } - req, err := http.NewRequestWithContext(ctx, method, url, reqBody) + req, err := http.NewRequestWithContext(ctx, method, reqURL, reqBody) if err != nil { - return fmt.Errorf("unable to create request for %s %q: %v", method, url, err) + return fmt.Errorf("unable to create request for %s %q: %v", method, reqURL, err) } req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json") req.Header.Set("Accept", "application/vnd.schemaregistry.v1+json") @@ -127,7 +132,7 @@ start: resp, err := cl.httpcl.Do(req) if err != nil { if len(urls) == 0 { - return fmt.Errorf("unable to %s %q: %w", method, url, err) + return fmt.Errorf("unable to %s %q: %w", method, reqURL, err) } goto start } @@ -135,13 +140,13 @@ start: body, err := io.ReadAll(resp.Body) resp.Body.Close() if err != nil { - return fmt.Errorf("unable to read response body from %s %q: %w", method, url, err) + return fmt.Errorf("unable to read response body from %s %q: %w", method, reqURL, err) } if resp.StatusCode >= 300 { e := &ResponseError{ Method: method, - URL: url, + URL: reqURL, Raw: body, } _ = json.Unmarshal(body, e) // best effort @@ -154,7 +159,7 @@ start: *into = body // return raw body to caller default: if err := json.Unmarshal(body, into); err != nil { - return fmt.Errorf("unable to decode ok response body from %s %q: %w", method, url, err) + return fmt.Errorf("unable to decode ok response body from %s %q: %w", method, reqURL, err) } } } diff --git a/pkg/sr/params.go b/pkg/sr/params.go index 33eb3c56..59c680c5 100644 --- a/pkg/sr/params.go +++ b/pkg/sr/params.go @@ -22,6 +22,7 @@ type Param struct { subject string page *int limit int + hardDelete bool } // WithParams adds query parameters to the given context. This is a merge @@ -83,6 +84,9 @@ func (p Param) apply(req *http.Request) { if p.limit > 0 { q.Set("limit", fmt.Sprintf("%d", p.limit)) } + if p.hardDelete { + q.Set("permanent", "true") + } req.URL.RawQuery = q.Encode() } @@ -130,6 +134,9 @@ func mergeParams(p ...Param) Param { if p.limit > 0 { merged.limit = p.limit } + if p.hardDelete { + merged.hardDelete = p.hardDelete + } } return merged } @@ -167,6 +174,9 @@ var ( // DeletedOnly is a Param that configures whether to return only // deleted schemas or subjects in certain get operations. DeletedOnly = Param{deletedOnly: true} + + // hardDelete is internal, and set when DeleteHow == HardDelete. + hardDelete = Param{hardDelete: true} ) // Format returns a Param that configures how schema's are returned in certain