Skip to content

Commit

Permalink
Merge pull request #636 from hhromic/sr-client-joinpath
Browse files Browse the repository at this point in the history
pkg/sr: improve base URL and resource path joining
  • Loading branch information
twmb authored Dec 14, 2023
2 parents c09dc92 + cd96c38 commit 070f8f9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/sr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteH
// DELETE /subjects/{subject}?permanent={x}
path := pathSubject(subject)
if how == HardDelete {
path += "?permanent=true"
ctx = WithParams(ctx, hardDelete)
}
var versions []int
defer func() { sort.Ints(versions) }()
Expand All @@ -419,7 +419,7 @@ func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int,
// DELETE /subjects/{subject}/versions/{version}?permanent={x}
path := pathSubjectVersion(subject, version)
if how == HardDelete {
path += "?permanent=true"
ctx = WithParams(ctx, hardDelete)
}
return cl.delete(ctx, path, nil)
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"time"
)

Expand Down Expand Up @@ -100,21 +101,25 @@ func (cl *Client) do(ctx context.Context, method, path string, v, into any) erro
urls := cl.urls

start:
url := fmt.Sprintf("%s%s", urls[0], path)
reqURL, err := url.JoinPath(urls[0], path)
if err != nil {
return fmt.Errorf("unable to join path for %q and %q: %w", urls[0], path, err)
}

urls = urls[1:]

var reqBody io.Reader
if v != nil {
marshaled, err := json.Marshal(v)
if err != nil {
return fmt.Errorf("unable to encode body for %s %q: %w", method, url, err)
return fmt.Errorf("unable to encode body for %s %q: %w", method, reqURL, err)
}
reqBody = bytes.NewReader(marshaled)
}

req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
req, err := http.NewRequestWithContext(ctx, method, reqURL, reqBody)
if err != nil {
return fmt.Errorf("unable to create request for %s %q: %v", method, url, err)
return fmt.Errorf("unable to create request for %s %q: %v", method, reqURL, err)
}
req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")
req.Header.Set("Accept", "application/vnd.schemaregistry.v1+json")
Expand All @@ -127,21 +132,21 @@ start:
resp, err := cl.httpcl.Do(req)
if err != nil {
if len(urls) == 0 {
return fmt.Errorf("unable to %s %q: %w", method, url, err)
return fmt.Errorf("unable to %s %q: %w", method, reqURL, err)
}
goto start
}

body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return fmt.Errorf("unable to read response body from %s %q: %w", method, url, err)
return fmt.Errorf("unable to read response body from %s %q: %w", method, reqURL, err)
}

if resp.StatusCode >= 300 {
e := &ResponseError{
Method: method,
URL: url,
URL: reqURL,
Raw: body,
}
_ = json.Unmarshal(body, e) // best effort
Expand All @@ -154,7 +159,7 @@ start:
*into = body // return raw body to caller
default:
if err := json.Unmarshal(body, into); err != nil {
return fmt.Errorf("unable to decode ok response body from %s %q: %w", method, url, err)
return fmt.Errorf("unable to decode ok response body from %s %q: %w", method, reqURL, err)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/sr/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Param struct {
subject string
page *int
limit int
hardDelete bool
}

// WithParams adds query parameters to the given context. This is a merge
Expand Down Expand Up @@ -83,6 +84,9 @@ func (p Param) apply(req *http.Request) {
if p.limit > 0 {
q.Set("limit", fmt.Sprintf("%d", p.limit))
}
if p.hardDelete {
q.Set("permanent", "true")
}
req.URL.RawQuery = q.Encode()
}

Expand Down Expand Up @@ -130,6 +134,9 @@ func mergeParams(p ...Param) Param {
if p.limit > 0 {
merged.limit = p.limit
}
if p.hardDelete {
merged.hardDelete = p.hardDelete
}
}
return merged
}
Expand Down Expand Up @@ -167,6 +174,9 @@ var (
// DeletedOnly is a Param that configures whether to return only
// deleted schemas or subjects in certain get operations.
DeletedOnly = Param{deletedOnly: true}

// hardDelete is internal, and set when DeleteHow == HardDelete.
hardDelete = Param{hardDelete: true}
)

// Format returns a Param that configures how schema's are returned in certain
Expand Down

0 comments on commit 070f8f9

Please sign in to comment.