From fb1fd967b34f6e5505e604447fe8224ffbc17359 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Fri, 22 Mar 2024 11:11:25 +0800 Subject: [PATCH] [improve][fn] Support OAuth2 in Go instance --- pulsar-function-go/pf/instance.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 1064aece46fe88..bd3c2a19ad6550 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -21,6 +21,7 @@ package pf import ( "context" + "encoding/json" "fmt" "math" "strconv" @@ -195,8 +196,9 @@ CLOSE: } const ( - authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken" - authPluginNone = "" + authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken" + authPluginOAuth2 = "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2" + authPluginNone = "" ) func (gi *goInstance) setupClient() error { @@ -221,6 +223,15 @@ func (gi *goInstance) setupClient() error { default: return fmt.Errorf(`unknown token format - expecting "file://" or "token:" prefix`) } + case authPluginOAuth2: + if ic.authParams == "" { + return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginToken) + } + var authMap map[string]string + if err := json.Unmarshal([]byte(ic.authParams), &authMap); err != nil { + return fmt.Errorf(`unknown auth params format for OAuth2 - expecting a json string of map`) + } + clientOpts.Authentication = pulsar.NewAuthenticationOAuth2(authMap) case authPluginNone: clientOpts.Authentication, _ = pulsar.NewAuthentication("", "") // ret: auth.NewAuthDisabled() default: