From 53908f713e75ebdb54e68bff238318c97d21a581 Mon Sep 17 00:00:00 2001 From: David Rayner Date: Mon, 29 Apr 2024 11:45:06 +0100 Subject: [PATCH] Add Go support for pulsar function secrets --- pulsar-function-go/conf/conf.go | 3 ++ pulsar-function-go/pf/context.go | 25 ++++++--- pulsar-function-go/pf/instance.go | 18 +++++++ pulsar-function-go/pf/instanceConf.go | 14 +++-- pulsar-function-go/pf/instance_test.go | 35 +++++++++++++ pulsar-function-go/pf/secretsProvider.go | 52 +++++++++++++++++++ .../instance/go/GoInstanceConfig.java | 3 ++ .../functions/runtime/RuntimeUtils.java | 12 ++++- .../functions/runtime/RuntimeUtilsTest.java | 6 ++- .../DefaultSecretsProviderConfigurator.java | 2 +- 10 files changed, 156 insertions(+), 14 deletions(-) create mode 100644 pulsar-function-go/pf/secretsProvider.go diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go index 1442a0f865f4a..d86eff7b1285c 100644 --- a/pulsar-function-go/conf/conf.go +++ b/pulsar-function-go/conf/conf.go @@ -60,6 +60,9 @@ type Conf struct { // Deprecated AutoACK bool `json:"autoAck" yaml:"autoAck"` Parallelism int32 `json:"parallelism" yaml:"parallelism"` + // SecretProvider config + SecretsProviderClassName string `json:"secretsProviderClassName" yaml:"secretsProviderClassName"` + SecretsProviderConfig string `json:"secretsProviderConfig" yaml:"secretsProviderConfig"` //source config SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"` TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"` diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go index 0b269a568ffec..1eab666d7bc04 100644 --- a/pulsar-function-go/pf/context.go +++ b/pulsar-function-go/pf/context.go @@ -34,22 +34,26 @@ import ( // message, what are our operating constraints, etc can be accessed by the // executing function type FunctionContext struct { - instanceConf *instanceConf - userConfigs map[string]interface{} - logAppender *LogAppender - outputMessage func(topic string) pulsar.Producer - userMetrics sync.Map - record pulsar.Message + instanceConf *instanceConf + userConfigs map[string]interface{} + secrets map[string]interface{} + logAppender *LogAppender + outputMessage func(topic string) pulsar.Producer + userMetrics sync.Map + record pulsar.Message + secretsProvider SecretsProvider } // NewFuncContext returns a new Function context func NewFuncContext() *FunctionContext { instanceConf := newInstanceConf() userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig()) + secrets := buildUserConfig(instanceConf.funcDetails.SecretsMap) fc := &FunctionContext{ instanceConf: instanceConf, userConfigs: userConfigs, + secrets: secrets, } return fc } @@ -155,6 +159,15 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} { return c.userConfigs } +// GetSecretValue returns the value of a key from the pulsar function's secret +// map +func (c *FunctionContext) GetSecretValue(key string) interface{} { + if c.secretsProvider != nil { + return c.secretsProvider.GetValue(c.secrets, key) + } + return nil +} + // NewOutputMessage send message to the topic @param topicName: The name of the // topic for output message func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer { diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 1064aece46fe8..7b8be108286b0 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -138,6 +138,11 @@ func (gi *goInstance) startFunction(function function) error { log.Errorf("setup log appender failed, error is:%v", err) return err } + err = gi.setupSecretsProvider() + if err != nil { + log.Errorf("setup secret provider failed, error is:%v", err) + return err + } idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle) idleTimer := time.NewTimer(idleDuration) @@ -496,6 +501,19 @@ func (gi *goInstance) addLogTopicHandler() { } } +func (gi *goInstance) setupSecretsProvider() error { + switch gi.context.instanceConf.secretsProviderClassName { + case "ClearTextSecretsProvider": + gi.context.secretsProvider = &ClearTextSecretsProvider{} + case "EnvironmentBasedSecretsProvider": + gi.context.secretsProvider = &EnvironmentBasedSecretsProvider{} + default: + return fmt.Errorf("unknown secretsProviderClassName: %s", + gi.context.instanceConf.secretsProviderClassName) + } + return nil +} + func (gi *goInstance) closeLogTopic() { log.Info("closing log topic...") if gi.context.logAppender == nil { diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go index 844a2bc9b89a3..f4e0789704d18 100644 --- a/pulsar-function-go/pf/instanceConf.go +++ b/pulsar-function-go/pf/instanceConf.go @@ -48,6 +48,8 @@ type instanceConf struct { metricsPort int authPlugin string authParams string + secretsProviderClassName string + secretsProviderConfig string tlsTrustCertsPath string tlsAllowInsecure bool tlsHostnameVerification bool @@ -118,11 +120,13 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf { }, UserConfig: cfg.UserConfig, }, - authPlugin: cfg.ClientAuthenticationPlugin, - authParams: cfg.ClientAuthenticationParameters, - tlsTrustCertsPath: cfg.TLSTrustCertsFilePath, - tlsAllowInsecure: cfg.TLSAllowInsecureConnection, - tlsHostnameVerification: cfg.TLSHostnameVerificationEnable, + authPlugin: cfg.ClientAuthenticationPlugin, + authParams: cfg.ClientAuthenticationParameters, + secretsProviderClassName: cfg.SecretsProviderClassName, + secretsProviderConfig: cfg.SecretsProviderConfig, + tlsTrustCertsPath: cfg.TLSTrustCertsFilePath, + tlsAllowInsecure: cfg.TLSAllowInsecureConnection, + tlsHostnameVerification: cfg.TLSHostnameVerificationEnable, } // parse the raw function details and ignore the unmarshal error(fallback to original way) if cfg.FunctionDetails != "" { diff --git a/pulsar-function-go/pf/instance_test.go b/pulsar-function-go/pf/instance_test.go index bf45ae3a8917e..81a529404d08c 100644 --- a/pulsar-function-go/pf/instance_test.go +++ b/pulsar-function-go/pf/instance_test.go @@ -22,6 +22,7 @@ package pf import ( "context" "fmt" + "os" "strconv" "testing" "time" @@ -115,3 +116,37 @@ func Test_goInstance_handlerMsg(t *testing.T) { assert.Equal(t, "output", string(output)) assert.Equal(t, message, fc.record) } + +func Test_goInstance_clearTextSecretsProvider(t *testing.T) { + fc := NewFuncContext() + fc.secrets = map[string]interface{}{ + "mySecret": "hello world", + } + fc.instanceConf.secretsProviderClassName = "ClearTextSecretsProvider" + instance := &goInstance{ + context: fc, + } + + assert.Nil(t, instance.context.secretsProvider) + assert.Nil(t, instance.setupSecretsProvider()) + assert.NotNil(t, instance.context.secretsProvider) + + assert.Equal(t, fc.GetSecretValue("mySecret"), "hello world") + assert.Nil(t, fc.GetSecretValue("notASecret")) +} + +func Test_goInstance_environmentBasedSecretsProvider(t *testing.T) { + fc := NewFuncContext() + fc.instanceConf.secretsProviderClassName = "EnvironmentBasedSecretsProvider" + instance := &goInstance{ + context: fc, + } + + assert.Nil(t, instance.context.secretsProvider) + assert.Nil(t, instance.setupSecretsProvider()) + assert.NotNil(t, instance.context.secretsProvider) + + os.Setenv("MY_ENV_VAR", "hello world") + assert.Equal(t, fc.GetSecretValue("MY_ENV_VAR"), "hello world") + assert.Nil(t, fc.GetSecretValue("NOT_AN_ENV_VAR")) +} diff --git a/pulsar-function-go/pf/secretsProvider.go b/pulsar-function-go/pf/secretsProvider.go new file mode 100644 index 0000000000000..a85c9f4e2e46d --- /dev/null +++ b/pulsar-function-go/pf/secretsProvider.go @@ -0,0 +1,52 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package pf + +import ( + "os" + + log "github.com/apache/pulsar/pulsar-function-go/logutil" +) + +type SecretsProvider interface { + GetValue(secrets map[string]interface{}, key string) interface{} +} + +type ClearTextSecretsProvider struct{} + +func (p *ClearTextSecretsProvider) GetValue(secrets map[string]interface{}, key string) interface{} { + val, ok := secrets[key] + if !ok { + log.Debugf("secret key %s not present in function secrets", key) + return nil + } + return val +} + +type EnvironmentBasedSecretsProvider struct{} + +func (p *EnvironmentBasedSecretsProvider) GetValue(_ map[string]interface{}, key string) interface{} { + val, ok := os.LookupEnv(key) + if !ok { + log.Debugf("secret key %s does not match an environment variable", key) + return nil + } + return val +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java index 467ec74921330..ab768bea38c66 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java @@ -57,6 +57,9 @@ public class GoInstanceConfig { private boolean autoAck; private int parallelism; + private String secretsProviderClassName = ""; + private String secretsProviderConfig = ""; + private int subscriptionType; private long timeoutMs; private String subscriptionName = ""; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 6160626c958ef..8ff32e0cf667c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -135,6 +135,8 @@ public static List getGoInstanceCmd(InstanceConfig instanceConfig, String pulsarServiceUrl, String stateStorageServiceUrl, String pulsarWebServiceUrl, + String secretsProviderClassName, + String secretsProviderConfig, boolean k8sRuntime) throws IOException { final List args = new LinkedList<>(); GoInstanceConfig goInstanceConfig = new GoInstanceConfig(); @@ -285,6 +287,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort()); } + if (secretsProviderClassName != null) { + goInstanceConfig.setSecretsProviderClassName(secretsProviderClassName); + } + + if (secretsProviderConfig != null) { + goInstanceConfig.setSecretsProviderConfig(secretsProviderConfig); + } + goInstanceConfig.setKillAfterIdleMs(0); goInstanceConfig.setPort(instanceConfig.getPort()); @@ -329,7 +339,7 @@ public static List getCmd(InstanceConfig instanceConfig, if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) { return getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, pulsarWebServiceUrl, - k8sRuntime); + secretsProviderClassName, secretsProviderConfig, k8sRuntime); } if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java index b19be92e6ba81..2f3091e9e1c94 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java @@ -123,7 +123,9 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException { instanceConfig.setFunctionDetails(functionDetails); instanceConfig.setExposePulsarAdminClientEnabled(true); - List commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181", "http://localhost:8080", k8sRuntime); + List commands = RuntimeUtils.getGoInstanceCmd( + instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181", + "http://localhost:8080", "ClearTextSecretsProvider", "", k8sRuntime); if (k8sRuntime) { goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class); } else { @@ -174,6 +176,8 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException { Assert.assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "/secret/ca.cert.pem"); Assert.assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), true); Assert.assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false); + Assert.assertEquals(goInstanceConfig.get("secretsProviderClassName"), "ClearTextSecretsProvider"); + Assert.assertEquals(goInstanceConfig.get("secretsProviderConfig"), ""); } @DataProvider(name = "k8sRuntime") diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java index 02d9d00634e3a..31629c87e9971 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java @@ -40,7 +40,7 @@ public String getSecretsProviderClassName(Function.FunctionDetails functionDetai case PYTHON: return "secretsprovider.ClearTextSecretsProvider"; case GO: - return ""; + return "ClearTextSecretsProvider"; default: throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime()); }