Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Add Go support for pulsar function secrets #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Conf struct {
// Deprecated
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
// SecretProvider config
SecretsProviderClassName string `json:"secretsProviderClassName" yaml:"secretsProviderClassName"`
SecretsProviderConfig string `json:"secretsProviderConfig" yaml:"secretsProviderConfig"`
//source config
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
Expand Down
25 changes: 19 additions & 6 deletions pulsar-function-go/pf/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,26 @@ import (
// message, what are our operating constraints, etc can be accessed by the
// executing function
type FunctionContext struct {
instanceConf *instanceConf
userConfigs map[string]interface{}
logAppender *LogAppender
outputMessage func(topic string) pulsar.Producer
userMetrics sync.Map
record pulsar.Message
instanceConf *instanceConf
userConfigs map[string]interface{}
secrets map[string]interface{}
logAppender *LogAppender
outputMessage func(topic string) pulsar.Producer
userMetrics sync.Map
record pulsar.Message
secretsProvider SecretsProvider
}

// NewFuncContext returns a new Function context
func NewFuncContext() *FunctionContext {
instanceConf := newInstanceConf()
userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig())
secrets := buildUserConfig(instanceConf.funcDetails.SecretsMap)

fc := &FunctionContext{
instanceConf: instanceConf,
userConfigs: userConfigs,
secrets: secrets,
}
return fc
}
Expand Down Expand Up @@ -155,6 +159,15 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
return c.userConfigs
}

// GetSecretValue returns the value of a key from the pulsar function's secret
// map
func (c *FunctionContext) GetSecretValue(key string) interface{} {
if c.secretsProvider != nil {
return c.secretsProvider.GetValue(c.secrets, key)
}
return nil
}

// NewOutputMessage send message to the topic @param topicName: The name of the
// topic for output message
func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
Expand Down
18 changes: 18 additions & 0 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (gi *goInstance) startFunction(function function) error {
log.Errorf("setup log appender failed, error is:%v", err)
return err
}
err = gi.setupSecretsProvider()
if err != nil {
log.Errorf("setup secret provider failed, error is:%v", err)
return err
}

idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle)
idleTimer := time.NewTimer(idleDuration)
Expand Down Expand Up @@ -496,6 +501,19 @@ func (gi *goInstance) addLogTopicHandler() {
}
}

func (gi *goInstance) setupSecretsProvider() error {
switch gi.context.instanceConf.secretsProviderClassName {
case "ClearTextSecretsProvider":
gi.context.secretsProvider = &ClearTextSecretsProvider{}
case "EnvironmentBasedSecretsProvider":
gi.context.secretsProvider = &EnvironmentBasedSecretsProvider{}
default:
return fmt.Errorf("unknown secretsProviderClassName: %s",
gi.context.instanceConf.secretsProviderClassName)
}
return nil
}

func (gi *goInstance) closeLogTopic() {
log.Info("closing log topic...")
if gi.context.logAppender == nil {
Expand Down
14 changes: 9 additions & 5 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type instanceConf struct {
metricsPort int
authPlugin string
authParams string
secretsProviderClassName string
secretsProviderConfig string
tlsTrustCertsPath string
tlsAllowInsecure bool
tlsHostnameVerification bool
Expand Down Expand Up @@ -118,11 +120,13 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
},
UserConfig: cfg.UserConfig,
},
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
authPlugin: cfg.ClientAuthenticationPlugin,
authParams: cfg.ClientAuthenticationParameters,
secretsProviderClassName: cfg.SecretsProviderClassName,
secretsProviderConfig: cfg.SecretsProviderConfig,
tlsTrustCertsPath: cfg.TLSTrustCertsFilePath,
tlsAllowInsecure: cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
// parse the raw function details and ignore the unmarshal error(fallback to original way)
if cfg.FunctionDetails != "" {
Expand Down
35 changes: 35 additions & 0 deletions pulsar-function-go/pf/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package pf
import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -115,3 +116,37 @@ func Test_goInstance_handlerMsg(t *testing.T) {
assert.Equal(t, "output", string(output))
assert.Equal(t, message, fc.record)
}

func Test_goInstance_clearTextSecretsProvider(t *testing.T) {
fc := NewFuncContext()
fc.secrets = map[string]interface{}{
"mySecret": "hello world",
}
fc.instanceConf.secretsProviderClassName = "ClearTextSecretsProvider"
instance := &goInstance{
context: fc,
}

assert.Nil(t, instance.context.secretsProvider)
assert.Nil(t, instance.setupSecretsProvider())
assert.NotNil(t, instance.context.secretsProvider)

assert.Equal(t, fc.GetSecretValue("mySecret"), "hello world")
assert.Nil(t, fc.GetSecretValue("notASecret"))
}

func Test_goInstance_environmentBasedSecretsProvider(t *testing.T) {
fc := NewFuncContext()
fc.instanceConf.secretsProviderClassName = "EnvironmentBasedSecretsProvider"
instance := &goInstance{
context: fc,
}

assert.Nil(t, instance.context.secretsProvider)
assert.Nil(t, instance.setupSecretsProvider())
assert.NotNil(t, instance.context.secretsProvider)

os.Setenv("MY_ENV_VAR", "hello world")
assert.Equal(t, fc.GetSecretValue("MY_ENV_VAR"), "hello world")
assert.Nil(t, fc.GetSecretValue("NOT_AN_ENV_VAR"))
}
52 changes: 52 additions & 0 deletions pulsar-function-go/pf/secretsProvider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package pf

import (
"os"

log "github.com/apache/pulsar/pulsar-function-go/logutil"
)

type SecretsProvider interface {
GetValue(secrets map[string]interface{}, key string) interface{}
}

type ClearTextSecretsProvider struct{}

func (p *ClearTextSecretsProvider) GetValue(secrets map[string]interface{}, key string) interface{} {
val, ok := secrets[key]
if !ok {
log.Debugf("secret key %s not present in function secrets", key)
return nil
}
return val
}

type EnvironmentBasedSecretsProvider struct{}

func (p *EnvironmentBasedSecretsProvider) GetValue(_ map[string]interface{}, key string) interface{} {
val, ok := os.LookupEnv(key)
if !ok {
log.Debugf("secret key %s does not match an environment variable", key)
return nil
}
return val
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class GoInstanceConfig {
private boolean autoAck;
private int parallelism;

private String secretsProviderClassName = "";
private String secretsProviderConfig = "";

private int subscriptionType;
private long timeoutMs;
private String subscriptionName = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
String pulsarServiceUrl,
String stateStorageServiceUrl,
String pulsarWebServiceUrl,
String secretsProviderClassName,
String secretsProviderConfig,
boolean k8sRuntime) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
Expand Down Expand Up @@ -285,6 +287,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
}

if (secretsProviderClassName != null) {
goInstanceConfig.setSecretsProviderClassName(secretsProviderClassName);
}

if (secretsProviderConfig != null) {
goInstanceConfig.setSecretsProviderConfig(secretsProviderConfig);
}

goInstanceConfig.setKillAfterIdleMs(0);
goInstanceConfig.setPort(instanceConfig.getPort());

Expand Down Expand Up @@ -329,7 +339,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName,
pulsarServiceUrl, stateStorageServiceUrl, pulsarWebServiceUrl,
k8sRuntime);
secretsProviderClassName, secretsProviderConfig, k8sRuntime);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setExposePulsarAdminClientEnabled(true);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181", "http://localhost:8080", k8sRuntime);
List<String> commands = RuntimeUtils.getGoInstanceCmd(
instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181",
"http://localhost:8080", "ClearTextSecretsProvider", "", k8sRuntime);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down Expand Up @@ -174,6 +176,8 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
Assert.assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "/secret/ca.cert.pem");
Assert.assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), true);
Assert.assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false);
Assert.assertEquals(goInstanceConfig.get("secretsProviderClassName"), "ClearTextSecretsProvider");
Assert.assertEquals(goInstanceConfig.get("secretsProviderConfig"), "");
}

@DataProvider(name = "k8sRuntime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String getSecretsProviderClassName(Function.FunctionDetails functionDetai
case PYTHON:
return "secretsprovider.ClearTextSecretsProvider";
case GO:
return "";
return "ClearTextSecretsProvider";
default:
throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
}
Expand Down
Loading