Skip to content

Commit

Permalink
Merge pull request #3061 from pburrows-ns1/develop
Browse files Browse the repository at this point in the history
feat(sinks): enable sinks with token auth.
  • Loading branch information
mfiedorowicz authored Aug 19, 2024
2 parents a1eb427 + 9920aa2 commit eca8480
Show file tree
Hide file tree
Showing 14 changed files with 857 additions and 68 deletions.
53 changes: 53 additions & 0 deletions maestro/config/authentication_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/orb-community/orb/maestro/password"
"github.com/orb-community/orb/pkg/types"
"github.com/orb-community/orb/sinks/authentication_type/basicauth"
"github.com/orb-community/orb/sinks/authentication_type/bearertokenauth"
)

const AuthenticationKey = "authentication"
Expand All @@ -20,7 +21,12 @@ func GetAuthService(authType string, service password.EncryptionService) AuthBui
return &BasicAuthBuilder{
encryptionService: service,
}
case bearertokenauth.AuthType:
return &BearerTokenAuthBuilder{
encryptionService: service,
}
}

return nil
}

Expand Down Expand Up @@ -65,3 +71,50 @@ func (b *BasicAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, er
config[AuthenticationKey] = authcfg
return config, nil
}

type BearerTokenAuthBuilder struct {
encryptionService password.EncryptionService
}

func (b *BearerTokenAuthBuilder) GetExtensionsFromMetadata(c types.Metadata) (Extensions, string) {
authcfg := c.GetSubMetadata(AuthenticationKey)
scheme := authcfg["scheme"].(string)
token := authcfg["token"].(string)

return Extensions{
BearerAuth: &BearerTokenAuthExtension{
Scheme: scheme,
Token: token,
},
}, "bearertokenauth/withscheme"
}

func (b *BearerTokenAuthBuilder) DecodeAuth(config types.Metadata) (types.Metadata, error) {
authCfg := config.GetSubMetadata(AuthenticationKey)
token := authCfg["token"].(string)

decodedToken, err := b.encryptionService.DecodePassword(token)
if err != nil {
return nil, err
}

authCfg["token"] = decodedToken
config[AuthenticationKey] = authCfg

return config, nil
}

func (b *BearerTokenAuthBuilder) EncodeAuth(config types.Metadata) (types.Metadata, error) {
authcfg := config.GetSubMetadata(AuthenticationKey)
token := authcfg["token"].(string)

encodedToken, err := b.encryptionService.EncodePassword(token)
if err != nil {
return nil, err
}

authcfg["token"] = encodedToken
config[AuthenticationKey] = authcfg

return config, nil
}
30 changes: 28 additions & 2 deletions maestro/config/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package config
import (
"context"
"fmt"
"testing"

"go.uber.org/zap"

"github.com/orb-community/orb/maestro/password"
"github.com/orb-community/orb/pkg/types"
"go.uber.org/zap"
"testing"
)

func TestReturnConfigYamlFromSink(t *testing.T) {
Expand Down Expand Up @@ -97,6 +99,30 @@ func TestReturnConfigYamlFromSink(t *testing.T) {
want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-22\n protocol_version: 2.0.0\nextensions:\n pprof:\n endpoint: 0.0.0.0:1888\n basicauth/exporter:\n client_auth:\n username: otlp-user\n password: dbpass\nexporters:\n otlphttp:\n endpoint: https://acme.com/otlphttp/push\n auth:\n authenticator: basicauth/exporter\nservice:\n extensions:\n - pprof\n - basicauth/exporter\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - otlphttp\n`,
wantErr: false,
},
{
name: "otlp, token auth",
args: args{
in0: context.Background(),
kafkaUrlConfig: "kafka:9092",
sink: &DeploymentRequest{
SinkID: "sink-id-22",
OwnerID: "22",
Backend: "otlphttp",
Config: types.Metadata{
"exporter": types.Metadata{
"endpoint": "https://acme.com/otlphttp/push",
},
"authentication": types.Metadata{
"type": "bearertokenauth",
"scheme": "Api-Token",
"token": "abcdefg",
},
},
},
},
want: `---\nreceivers:\n kafka:\n brokers:\n - kafka:9092\n topic: otlp_metrics-sink-id-22\n protocol_version: 2.0.0\nextensions:\n pprof:\n endpoint: 0.0.0.0:1888\n bearertokenauth/withscheme:\n scheme: Api-Token\n token: abcdefg\nexporters:\n otlphttp:\n endpoint: https://acme.com/otlphttp/push\n auth:\n authenticator: bearertokenauth/withscheme\nservice:\n extensions:\n - pprof\n - bearertokenauth/withscheme\n pipelines:\n metrics:\n receivers:\n - kafka\n exporters:\n - otlphttp\n`,
wantErr: false,
},
}
for _, tt := range tests {
logger := zap.NewNop()
Expand Down
14 changes: 7 additions & 7 deletions maestro/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package config

import (
"database/sql/driver"
"github.com/orb-community/orb/pkg/types"
"time"

"github.com/orb-community/orb/pkg/types"
)

type SinkData struct {
Expand Down Expand Up @@ -82,8 +83,8 @@ type Extensions struct {
PProf *PProfExtension `json:"pprof,omitempty" yaml:"pprof,omitempty" :"p_prof"`
ZPages *ZPagesExtension `json:"zpages,omitempty" yaml:"zpages,omitempty" :"z_pages"`
// Exporters Authentication
BasicAuth *BasicAuthenticationExtension `json:"basicauth/exporter,omitempty" yaml:"basicauth/exporter,omitempty" :"basic_auth"`
//BearerAuth *BearerAuthExtension `json:"bearerauth/exporter,omitempty" yaml:"bearerauth/exporter,omitempty" :"bearer_auth"`
BasicAuth *BasicAuthenticationExtension `json:"basicauth/exporter,omitempty" yaml:"basicauth/exporter,omitempty" :"basic_auth"`
BearerAuth *BearerTokenAuthExtension `json:"bearertokenauth/withscheme,omitempty" yaml:"bearertokenauth/withscheme,omitempty"`
}

type HealthCheckExtension struct {
Expand Down Expand Up @@ -115,10 +116,9 @@ type BasicAuthenticationExtension struct {
ClientAuth *ClientAuth `json:"client_auth" yaml:"client_auth"`
}

type BearerAuthExtension struct {
BearerAuth *struct {
Token string `json:"token" yaml:"token"`
} `json:"client_auth" yaml:"client_auth"`
type BearerTokenAuthExtension struct {
Scheme string `json:"scheme" yaml:"scheme"`
Token string `json:"token" yaml:"token"`
}

type Exporters struct {
Expand Down
39 changes: 27 additions & 12 deletions pkg/errors/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,44 @@ var (
// ErrExporterFieldNotFound indicates that exporter field was not found
ErrExporterFieldNotFound = New("malformed entity specification. exporter field is expected on configuration field")

// ErrEndpointNotFound indicates that endpoint field was not found on exporter field for otlp backend
ErrEndpointNotFound = New("malformed entity specification. endpoint field is expected on exporter field")

// ErrInvalidEndpoint indicates that endpoint field is not valid
ErrInvalidEndpoint = New("malformed entity specification. endpoint field is invalid")

// ErrAuthFieldNotFound indicates that authentication field was not found on configuration field
ErrAuthFieldNotFound = New("malformed entity specification. authentication fields are expected on configuration field")

// ErrAuthTypeNotFound indicates that authentication type field was not found on the authentication field
ErrAuthTypeNotFound = New("malformed entity specification: authentication type field is expected on configuration field")

// ErrInvalidAuthType indicates invalid authentication type
ErrInvalidAuthType = New("malformed entity specification. type key on authentication field is invalid")
// ErrAuthInvalidType indicates invalid authentication type
ErrAuthInvalidType = New("malformed entity specification. type key on authentication field is invalid")

// ErrPasswordNotFound indicates that password key was not found
ErrPasswordNotFound = New("malformed entity specification. password key is expected on authentication field")
// ErrAuthUsernameNotFound indicates that username key was not found
ErrAuthUsernameNotFound = New("malformed entity specification. username key is expected on authentication field")

// ErrEndPointNotFound indicates that endpoint field was not found on exporter field for otlp backend
ErrEndpointNotFound = New("malformed entity specification. endpoint field is expected on exporter field")
// ErrAuthPasswordNotFound indicates that password key was not found
ErrAuthPasswordNotFound = New("malformed entity specification. password key is expected on authentication field")

// ErrInvalidEndpoint indicates that endpoint field is not valid
ErrInvalidEndpoint = New("malformed entity specification. endpoint field is invalid")
// ErrAuthSchemeNotFound indicates that scheme key was not found
ErrAuthSchemeNotFound = New("malformed entity specification. scheme key is expected on authentication field")

// ErrAuthTokenNotFound indicates that token key was not found
ErrAuthTokenNotFound = New("malformed entity specification. token key is expected on authentication field")

// ErrAuthInvalidPasswordType indicates invalid password key on authentication field
ErrAuthInvalidPasswordType = New("malformed entity specification. password key on authentication field is invalid")

// ErrAuthInvalidSchemeType indicates invalid scheme key on authentication field
ErrAuthInvalidSchemeType = New("malformed entity specification. scheme key on authentication field is invalid")

// ErrInvalidPasswordType indicates invalid password key on authentication field
ErrInvalidPasswordType = New("malformed entity specification. password key on authentication field is invalid")
// ErrAuthInvalidTokenType indicates invalid token key on authentication field
ErrAuthInvalidTokenType = New("malformed entity specification. token key on authentication field is invalid")

// ErrInvalidUsernameType indicates invalid username key on authentication field
ErrInvalidUsernameType = New("malformed entity specification. username key on authentication field is invalid")
// ErrAuthInvalidUsernameType indicates invalid username key on authentication field
ErrAuthInvalidUsernameType = New("malformed entity specification. username key on authentication field is invalid")

// ErrRemoteHostNotFound indicates that remote host field was not found
ErrRemoteHostNotFound = New("malformed entity specification. remote host is expected on exporter field")
Expand Down
2 changes: 1 addition & 1 deletion sinks/api/http/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func TestAuthenticationTypesEndpoints(t *testing.T) {
err = json.Unmarshal(body, &authResponse)
require.NoError(t, err, "must not error")
require.NotNil(t, authResponse, "response must not be nil")
require.Equal(t, 1, len(authResponse.AuthenticationTypes), "must contain basicauth for now")
require.Equal(t, 2, len(authResponse.AuthenticationTypes), "must contain basicauth and bearertokenauth")
},
},
"view authentication type basicauth": {
Expand Down
6 changes: 3 additions & 3 deletions sinks/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func GetConfigurationAndMetadataFromMeta(backendName string, config types.Metada
}
authTypeSvc, ok := authentication_type.GetAuthType(authtype.(string))
if !ok {
err = errors.Wrap(errors.ErrInvalidAuthType, errors.New("invalid required field authentication type"))
err = errors.Wrap(errors.ErrAuthInvalidType, errors.New("invalid required field authentication type"))
return
}
configSvc.Authentication = authTypeSvc
Expand Down Expand Up @@ -119,12 +119,12 @@ func GetConfigurationAndMetadataFromYaml(backendName string, config string) (con
case string:
break
default:
err = errors.ErrInvalidAuthType
err = errors.ErrAuthInvalidType
return
}
authTypeSvc, ok := authentication_type.GetAuthType(authtype.(string))
if !ok {
err = errors.Wrap(errors.ErrInvalidAuthType, errors.New("invalid required field authentication type"))
err = errors.Wrap(errors.ErrAuthInvalidType, errors.New("invalid required field authentication type"))
return
}
configSvc.Authentication = authTypeSvc
Expand Down
18 changes: 14 additions & 4 deletions sinks/api/http/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,25 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) {
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrBackendNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrPasswordNotFound):
case errors.Contains(errorVal, errors.ErrAuthUsernameNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthPasswordNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthTypeNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrInvalidUsernameType):
case errors.Contains(errorVal, errors.ErrAuthInvalidUsernameType):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthInvalidPasswordType):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthInvalidTokenType):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthTokenNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrAuthInvalidSchemeType):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrInvalidPasswordType):
case errors.Contains(errorVal, errors.ErrAuthSchemeNotFound):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrInvalidAuthType):
case errors.Contains(errorVal, errors.ErrAuthInvalidType):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(errorVal, errors.ErrRemoteHostNotFound):
w.WriteHeader(http.StatusBadRequest)
Expand Down
71 changes: 45 additions & 26 deletions sinks/authentication_type/basicauth/authentication.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package basicauth

import (
"strings"

"gopkg.in/yaml.v3"

"github.com/orb-community/orb/pkg/errors"
"github.com/orb-community/orb/pkg/types"
"github.com/orb-community/orb/sinks/authentication_type"
"github.com/orb-community/orb/sinks/backend"
"gopkg.in/yaml.v3"
)

const (
AuthType = "basicauth"
UsernameConfigFeature = "username"
PasswordConfigFeature = "password"
)
Expand All @@ -32,11 +36,9 @@ var (
}
)

const AuthType = "basicauth"

type AuthConfig struct {
Username string `json:"username" ,yaml:"username"`
Password string `json:"password" ,yaml:"password"`
Username *string `json:"username" yaml:"username"`
Password *string `json:"password" yaml:"password"`
encryptionService authentication_type.PasswordService
}

Expand All @@ -56,27 +58,32 @@ func (a *AuthConfig) GetFeatureConfig() []authentication_type.ConfigFeature {
func (a *AuthConfig) ValidateConfiguration(inputFormat string, input interface{}) error {
switch inputFormat {
case "object":
if _, ok := input.(types.Metadata)[UsernameConfigFeature]; !ok {
return errors.Wrap(errors.ErrAuthUsernameNotFound, errors.New("username field was not found"))
}

if _, ok := input.(types.Metadata)[PasswordConfigFeature]; !ok {
return errors.Wrap(errors.ErrAuthPasswordNotFound, errors.New("password field was not found"))
}

for key, value := range input.(types.Metadata) {
if _, ok := value.(string); !ok {
if key == "password" {
return errors.Wrap(errors.ErrInvalidPasswordType, errors.New("invalid auth type for field: "+key))
}
if key == "type" {
return errors.Wrap(errors.ErrInvalidAuthType, errors.New("invalid auth type for field: "+key))
}
if key == "username" {
return errors.Wrap(errors.ErrInvalidUsernameType, errors.New("invalid auth type for field: "+key))
}
}
vs := value.(string)
if key == UsernameConfigFeature {
if len(vs) == 0 {
return errors.New("username cannot be empty")
if _, ok := value.(string); !ok {
return errors.Wrap(errors.ErrAuthInvalidUsernameType, errors.New("invalid auth type for field: "+key))
}

if len(strings.Fields(value.(string))) == 0 {
return errors.Wrap(errors.ErrAuthInvalidUsernameType, errors.New("invalid authentication username"))
}
}

if key == PasswordConfigFeature {
if len(vs) == 0 {
return errors.New("password cannot be empty")
if _, ok := value.(string); !ok {
return errors.Wrap(errors.ErrAuthInvalidPasswordType, errors.New("invalid auth type for field: "+key))
}

if len(strings.Fields(value.(string))) == 0 {
return errors.Wrap(errors.ErrAuthInvalidPasswordType, errors.New("invalid authentication password"))
}
}
}
Expand All @@ -85,12 +92,24 @@ func (a *AuthConfig) ValidateConfiguration(inputFormat string, input interface{}
if err != nil {
return err
}
if len(a.Username) == 0 {
return errors.New("username cannot be empty")
} else if len(a.Password) == 0 {
return errors.New("password cannot be empty")

if a.Username == nil {
return errors.Wrap(errors.ErrAuthUsernameNotFound, errors.New("username field was not found"))
}

if len(strings.Fields(*a.Username)) == 0 {
return errors.Wrap(errors.ErrAuthInvalidUsernameType, errors.New("invalid authentication username"))
}

if a.Password == nil {
return errors.Wrap(errors.ErrAuthPasswordNotFound, errors.New("password field was not found"))
}

if len(strings.Fields(*a.Password)) == 0 {
return errors.Wrap(errors.ErrAuthInvalidPasswordType, errors.New("invalid authentication password"))
}
}

return nil
}

Expand Down Expand Up @@ -156,7 +175,7 @@ func (a *AuthConfig) EncodeInformation(outputFormat string, input interface{}) (
inputMeta := input.(types.Metadata)
authMeta := inputMeta.GetSubMetadata(authentication_type.AuthenticationKey)
if _, ok := authMeta[PasswordConfigFeature].(string); !ok {
return nil, errors.Wrap(errors.ErrPasswordNotFound, errors.New("password field was not found"))
return nil, errors.Wrap(errors.ErrAuthPasswordNotFound, errors.New("password field was not found"))
}
encoded, err := a.encryptionService.EncodePassword(authMeta[PasswordConfigFeature].(string))
if err != nil {
Expand Down
Loading

0 comments on commit eca8480

Please sign in to comment.