Skip to content

Commit

Permalink
Add support for SASL/OAUTHBEARER (#230)
Browse files Browse the repository at this point in the history
* add support for SASL/OAUTHBEARER

* rename oauthbearer -> oauth

* mention OAUTHBEARER in reference config
  • Loading branch information
bachmanity1 authored Dec 6, 2023
1 parent 406d198 commit 4fa1a11
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
8 changes: 7 additions & 1 deletion docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ kafka:
username: ""
# Password to use for PLAIN or SCRAM mechanism
password: ""
# Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
# Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
mechanism: "PLAIN"
# GSSAPI / Kerberos config properties
gssapi:
Expand All @@ -58,6 +58,12 @@ kafka:
password: ""
realm: ""
enableFast: true
# OAUTHBEARER config properties
oauth:
tokenEndpoint: ""
clientId: ""
clientSecret: ""
scope: ""

minion:
consumerGroups:
Expand Down
14 changes: 14 additions & 0 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/twmb/franz-go/pkg/kversion"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/kerberos"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"go.uber.org/zap"
Expand Down Expand Up @@ -108,6 +110,18 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
}.AsMechanism()
opts = append(opts, kgo.SASL(kerberosMechanism))
}

// OAuthBearer
if cfg.SASL.Mechanism == "OAUTHBEARER" {
mechanism := oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) {
token, err := cfg.SASL.OAuthBearer.getToken(ctx)
return oauth.Auth{
Zid: cfg.SASL.OAuthBearer.ClientID,
Token: token,
}, err
})
opts = append(opts, kgo.SASL(mechanism))
}
}

// Configure TLS
Expand Down
5 changes: 3 additions & 2 deletions kafka/config_sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type SASLConfig struct {
Mechanism string `koanf:"mechanism"`

// SASL Mechanisms that require more configuration than username & password
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
OAuthBearer OAuthBearerConfig `koanf:"oauth"`
}

// SetDefaults for SASL Config
Expand All @@ -38,7 +39,7 @@ func (c *SASLConfig) Validate() error {
case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI:
// Valid and supported
case SASLMechanismOAuthBearer:
return fmt.Errorf("sasl mechanism '%v' is valid but not yet supported. Please submit an issue if you need it", c.Mechanism)
return c.OAuthBearer.Validate()
default:
return fmt.Errorf("given sasl mechanism '%v' is invalid", c.Mechanism)
}
Expand Down
73 changes: 73 additions & 0 deletions kafka/config_sasl_oauthbearer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
)

type OAuthBearerConfig struct {
TokenEndpoint string `koanf:"tokenEndpoint"`
ClientID string `koanf:"clientId"`
ClientSecret string `koanf:"clientSecret"`
Scope string `koanf:"scope"`
}

func (c *OAuthBearerConfig) Validate() error {
if c.TokenEndpoint == "" {
return fmt.Errorf("OAuthBearer token endpoint is not specified")
}
if c.ClientID == "" || c.ClientSecret == "" {
return fmt.Errorf("OAuthBearer client credentials are not specified")
}
return nil
}

// same as AcquireToken in Console https://github.com/redpanda-data/console/blob/master/backend/pkg/config/kafka_sasl_oauth.go#L56
func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) {
authHeaderValue := base64.StdEncoding.EncodeToString([]byte(c.ClientID + ":" + c.ClientSecret))

queryParams := url.Values{
"grant_type": []string{"client_credentials"},
"scope": []string{c.Scope},
}

req, err := http.NewRequestWithContext(ctx, "POST", c.TokenEndpoint, strings.NewReader(queryParams.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create HTTP request: %w", err)
}

req.URL.RawQuery = queryParams.Encode()

req.Header.Set("Authorization", "Basic "+authHeaderValue)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

client := &http.Client{}

resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)
}

var tokenResponse map[string]interface{}
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&tokenResponse); err != nil {
return "", fmt.Errorf("failed to parse token response: %w", err)
}

accessToken, ok := tokenResponse["access_token"].(string)
if !ok {
return "", fmt.Errorf("access_token not found in token response")
}

return accessToken, nil
}

0 comments on commit 4fa1a11

Please sign in to comment.