diff --git a/docs/reference-config.yaml b/docs/reference-config.yaml index e449f63..402f4e1 100644 --- a/docs/reference-config.yaml +++ b/docs/reference-config.yaml @@ -46,7 +46,7 @@ kafka: username: "" # Password to use for PLAIN or SCRAM mechanism password: "" - # Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI + # Mechanism to use for SASL Authentication. Valid values are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER mechanism: "PLAIN" # GSSAPI / Kerberos config properties gssapi: @@ -58,6 +58,12 @@ kafka: password: "" realm: "" enableFast: true + # OAUTHBEARER config properties + oauth: + tokenEndpoint: "" + clientId: "" + clientSecret: "" + scope: "" minion: consumerGroups: diff --git a/kafka/client_config_helper.go b/kafka/client_config_helper.go index 35f5bb5..12d71be 100644 --- a/kafka/client_config_helper.go +++ b/kafka/client_config_helper.go @@ -1,6 +1,7 @@ package kafka import ( + "context" "crypto/tls" "crypto/x509" "encoding/pem" @@ -15,6 +16,7 @@ import ( "github.com/twmb/franz-go/pkg/kversion" "github.com/twmb/franz-go/pkg/sasl" "github.com/twmb/franz-go/pkg/sasl/kerberos" + "github.com/twmb/franz-go/pkg/sasl/oauth" "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sasl/scram" "go.uber.org/zap" @@ -108,6 +110,18 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { }.AsMechanism() opts = append(opts, kgo.SASL(kerberosMechanism)) } + + // OAuthBearer + if cfg.SASL.Mechanism == "OAUTHBEARER" { + mechanism := oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) { + token, err := cfg.SASL.OAuthBearer.getToken(ctx) + return oauth.Auth{ + Zid: cfg.SASL.OAuthBearer.ClientID, + Token: token, + }, err + }) + opts = append(opts, kgo.SASL(mechanism)) + } } // Configure TLS diff --git a/kafka/config_sasl.go b/kafka/config_sasl.go index 522f522..73c82de 100644 --- a/kafka/config_sasl.go +++ b/kafka/config_sasl.go @@ -18,7 +18,8 @@ type SASLConfig struct { Mechanism string `koanf:"mechanism"` // SASL Mechanisms that require more configuration than username & password - GSSAPI SASLGSSAPIConfig `koanf:"gssapi"` + GSSAPI SASLGSSAPIConfig `koanf:"gssapi"` + OAuthBearer OAuthBearerConfig `koanf:"oauth"` } // SetDefaults for SASL Config @@ -38,7 +39,7 @@ func (c *SASLConfig) Validate() error { case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI: // Valid and supported case SASLMechanismOAuthBearer: - return fmt.Errorf("sasl mechanism '%v' is valid but not yet supported. Please submit an issue if you need it", c.Mechanism) + return c.OAuthBearer.Validate() default: return fmt.Errorf("given sasl mechanism '%v' is invalid", c.Mechanism) } diff --git a/kafka/config_sasl_oauthbearer.go b/kafka/config_sasl_oauthbearer.go new file mode 100644 index 0000000..0a8b5ec --- /dev/null +++ b/kafka/config_sasl_oauthbearer.go @@ -0,0 +1,73 @@ +package kafka + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" +) + +type OAuthBearerConfig struct { + TokenEndpoint string `koanf:"tokenEndpoint"` + ClientID string `koanf:"clientId"` + ClientSecret string `koanf:"clientSecret"` + Scope string `koanf:"scope"` +} + +func (c *OAuthBearerConfig) Validate() error { + if c.TokenEndpoint == "" { + return fmt.Errorf("OAuthBearer token endpoint is not specified") + } + if c.ClientID == "" || c.ClientSecret == "" { + return fmt.Errorf("OAuthBearer client credentials are not specified") + } + return nil +} + +// same as AcquireToken in Console https://github.com/redpanda-data/console/blob/master/backend/pkg/config/kafka_sasl_oauth.go#L56 +func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) { + authHeaderValue := base64.StdEncoding.EncodeToString([]byte(c.ClientID + ":" + c.ClientSecret)) + + queryParams := url.Values{ + "grant_type": []string{"client_credentials"}, + "scope": []string{c.Scope}, + } + + req, err := http.NewRequestWithContext(ctx, "POST", c.TokenEndpoint, strings.NewReader(queryParams.Encode())) + if err != nil { + return "", fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.URL.RawQuery = queryParams.Encode() + + req.Header.Set("Authorization", "Basic "+authHeaderValue) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode) + } + + var tokenResponse map[string]interface{} + decoder := json.NewDecoder(resp.Body) + if err := decoder.Decode(&tokenResponse); err != nil { + return "", fmt.Errorf("failed to parse token response: %w", err) + } + + accessToken, ok := tokenResponse["access_token"].(string) + if !ok { + return "", fmt.Errorf("access_token not found in token response") + } + + return accessToken, nil +}