Skip to content

Commit

Permalink
Add Go support for pulsar function secrets
Browse files Browse the repository at this point in the history
  • Loading branch information
David Rayner authored and DavidRayner committed May 23, 2024
1 parent 951eb51 commit 53908f7
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 14 deletions.
3 changes: 3 additions & 0 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Conf struct {
// Deprecated
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
// SecretProvider config
SecretsProviderClassName string `json:"secretsProviderClassName" yaml:"secretsProviderClassName"`
SecretsProviderConfig string `json:"secretsProviderConfig" yaml:"secretsProviderConfig"`
//source config
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
Expand Down
25 changes: 19 additions & 6 deletions pulsar-function-go/pf/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,26 @@ import (
// message, what are our operating constraints, etc can be accessed by the
// executing function
type FunctionContext struct {
instanceConf *instanceConf
userConfigs map[string]interface{}
logAppender *LogAppender
outputMessage func(topic string) pulsar.Producer
userMetrics sync.Map
record pulsar.Message
instanceConf *instanceConf
userConfigs map[string]interface{}
secrets map[string]interface{}
logAppender *LogAppender
outputMessage func(topic string) pulsar.Producer
userMetrics sync.Map
record pulsar.Message
secretsProvider SecretsProvider
}

// NewFuncContext returns a new Function context
func NewFuncContext() *FunctionContext {
instanceConf := newInstanceConf()
userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig())
secrets := buildUserConfig(instanceConf.funcDetails.SecretsMap)

fc := &FunctionContext{
instanceConf: instanceConf,
userConfigs: userConfigs,
secrets: secrets,
}
return fc
}
Expand Down Expand Up @@ -155,6 +159,15 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}

// GetSecretValue returns the value of a key from the pulsar function's secret
// map
func (c *FunctionContext) GetSecretValue(key string) interface{} {
if c.secretsProvider != nil {
return c.secretsProvider.GetValue(c.secrets, key)
}
return nil
}

// NewOutputMessage send message to the topic @param topicName: The name of the
// topic for output message
func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
Expand Down
18 changes: 18 additions & 0 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (gi *goInstance) startFunction(function function) error {
log.Errorf("setup log appender failed, error is:%v", err)
return err
}
err = gi.setupSecretsProvider()
if err != nil {
log.Errorf("setup secret provider failed, error is:%v", err)
return err
}

idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle)
idleTimer := time.NewTimer(idleDuration)
Expand Down Expand Up @@ -496,6 +501,19 @@ func (gi *goInstance) addLogTopicHandler() {
}
}

func (gi *goInstance) setupSecretsProvider() error {
switch gi.context.instanceConf.secretsProviderClassName {
case "ClearTextSecretsProvider":
gi.context.secretsProvider = &ClearTextSecretsProvider{}
case "EnvironmentBasedSecretsProvider":
gi.context.secretsProvider = &EnvironmentBasedSecretsProvider{}
default:
return fmt.Errorf("unknown secretsProviderClassName: %s",
gi.context.instanceConf.secretsProviderClassName)
}
return nil
}

func (gi *goInstance) closeLogTopic() {
log.Info("closing log topic...")
if gi.context.logAppender == nil {
Expand Down
14 changes: 9 additions & 5 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type instanceConf struct {
metricsPort int
authPlugin string
authParams string
secretsProviderClassName string
secretsProviderConfig string
tlsTrustCertsPath string
tlsAllowInsecure bool
tlsHostnameVerification bool
Expand Down Expand Up @@ -118,11 +120,13 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
},
UserConfig: cfg.UserConfig,
},
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
secretsProviderClassName: cfg.SecretsProviderClassName,
secretsProviderConfig: cfg.SecretsProviderConfig,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
// parse the raw function details and ignore the unmarshal error(fallback to original way)
if cfg.FunctionDetails != "" {
Expand Down
35 changes: 35 additions & 0 deletions pulsar-function-go/pf/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package pf
import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -115,3 +116,37 @@ func Test_goInstance_handlerMsg(t *testing.T) {
assert.Equal(t, "output", string(output))
assert.Equal(t, message, fc.record)
}

func Test_goInstance_clearTextSecretsProvider(t *testing.T) {
fc := NewFuncContext()
fc.secrets = map[string]interface{}{
"mySecret": "hello world",
}
fc.instanceConf.secretsProviderClassName = "ClearTextSecretsProvider"
instance := &goInstance{
context: fc,
}

assert.Nil(t, instance.context.secretsProvider)
assert.Nil(t, instance.setupSecretsProvider())
assert.NotNil(t, instance.context.secretsProvider)

assert.Equal(t, fc.GetSecretValue("mySecret"), "hello world")
assert.Nil(t, fc.GetSecretValue("notASecret"))
}

func Test_goInstance_environmentBasedSecretsProvider(t *testing.T) {
fc := NewFuncContext()
fc.instanceConf.secretsProviderClassName = "EnvironmentBasedSecretsProvider"
instance := &goInstance{
context: fc,
}

assert.Nil(t, instance.context.secretsProvider)
assert.Nil(t, instance.setupSecretsProvider())
assert.NotNil(t, instance.context.secretsProvider)

os.Setenv("MY_ENV_VAR", "hello world")
assert.Equal(t, fc.GetSecretValue("MY_ENV_VAR"), "hello world")
assert.Nil(t, fc.GetSecretValue("NOT_AN_ENV_VAR"))
}
52 changes: 52 additions & 0 deletions pulsar-function-go/pf/secretsProvider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package pf

import (
"os"

log "github.com/apache/pulsar/pulsar-function-go/logutil"
)

type SecretsProvider interface {
GetValue(secrets map[string]interface{}, key string) interface{}
}

type ClearTextSecretsProvider struct{}

func (p *ClearTextSecretsProvider) GetValue(secrets map[string]interface{}, key string) interface{} {
val, ok := secrets[key]
if !ok {
log.Debugf("secret key %s not present in function secrets", key)
return nil
}
return val
}

type EnvironmentBasedSecretsProvider struct{}

func (p *EnvironmentBasedSecretsProvider) GetValue(_ map[string]interface{}, key string) interface{} {
val, ok := os.LookupEnv(key)
if !ok {
log.Debugf("secret key %s does not match an environment variable", key)
return nil
}
return val
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class GoInstanceConfig {
private boolean autoAck;
private int parallelism;

private String secretsProviderClassName = "";
private String secretsProviderConfig = "";

private int subscriptionType;
private long timeoutMs;
private String subscriptionName = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
String pulsarServiceUrl,
String stateStorageServiceUrl,
String pulsarWebServiceUrl,
String secretsProviderClassName,
String secretsProviderConfig,
boolean k8sRuntime) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
Expand Down Expand Up @@ -285,6 +287,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
}

if (secretsProviderClassName != null) {
goInstanceConfig.setSecretsProviderClassName(secretsProviderClassName);
}

if (secretsProviderConfig != null) {
goInstanceConfig.setSecretsProviderConfig(secretsProviderConfig);
}

goInstanceConfig.setKillAfterIdleMs(0);
goInstanceConfig.setPort(instanceConfig.getPort());

Expand Down Expand Up @@ -329,7 +339,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName,
pulsarServiceUrl, stateStorageServiceUrl, pulsarWebServiceUrl,
k8sRuntime);
secretsProviderClassName, secretsProviderConfig, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setExposePulsarAdminClientEnabled(true);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181", "http://localhost:8080", k8sRuntime);
List<String> commands = RuntimeUtils.getGoInstanceCmd(
instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181",
"http://localhost:8080", "ClearTextSecretsProvider", "", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down Expand Up @@ -174,6 +176,8 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
Assert.assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "/secret/ca.cert.pem");
Assert.assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), true);
Assert.assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false);
Assert.assertEquals(goInstanceConfig.get("secretsProviderClassName"), "ClearTextSecretsProvider");
Assert.assertEquals(goInstanceConfig.get("secretsProviderConfig"), "");
}

@DataProvider(name = "k8sRuntime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String getSecretsProviderClassName(Function.FunctionDetails functionDetai
case PYTHON:
return "secretsprovider.ClearTextSecretsProvider";
case GO:
return "";
return "ClearTextSecretsProvider";
default:
throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
}
Expand Down

0 comments on commit 53908f7

Please sign in to comment.