Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Support OAuth2 in Go instance #22323

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package pf

import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -195,8 +196,9 @@ CLOSE:
}

const (
authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
authPluginNone = ""
authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
authPluginOAuth2 = "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2"
authPluginNone = ""
)

func (gi *goInstance) setupClient() error {
Expand All @@ -221,6 +223,15 @@ func (gi *goInstance) setupClient() error {
default:
return fmt.Errorf(`unknown token format - expecting "file://" or "token:" prefix`)
}
case authPluginOAuth2:
if ic.authParams == "" {
return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginOAuth2)
}
var authMap map[string]string
if err := json.Unmarshal([]byte(ic.authParams), &authMap); err != nil {
return fmt.Errorf(`unknown auth params format for OAuth2 - expecting a json string of map`)
}
clientOpts.Authentication = pulsar.NewAuthenticationOAuth2(authMap)
case authPluginNone:
clientOpts.Authentication, _ = pulsar.NewAuthentication("", "") // ret: auth.NewAuthDisabled()
default:
Expand Down
24 changes: 24 additions & 0 deletions pulsar-function-go/pf/instanceConf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package pf

import (
"encoding/json"
"fmt"
"testing"

Expand Down Expand Up @@ -320,3 +321,26 @@ func TestInstanceConf_WithEmptyOrInvalidDetails(t *testing.T) {
})
}
}

func TestInstanceConf_WithOAuth2Parameters(t *testing.T) {
cfg := &cfg.Conf{
AutoACK: true,
ClientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationOAuth2",
ClientAuthenticationParameters: `{"issuerUrl":"https://test.com","audience":"test-audience",
"scope":"test-scope","privateKey":"/mnt/secrets/auth.json","type":"client_credentials","clientId":"test-client-id"}`,
}

instanceConf := newInstanceConfWithConf(cfg)
assert.Equal(t, cfg.ClientAuthenticationPlugin, instanceConf.authPlugin)
var authMap map[string]string
err := json.Unmarshal([]byte(instanceConf.authParams), &authMap)
assert.Equal(t, err, nil)
assert.Equal(t, authMap, map[string]string{
"issuerUrl": "https://test.com",
"audience": "test-audience",
"scope": "test-scope",
"privateKey": "/mnt/secrets/auth.json",
"type": "client_credentials",
"clientId": "test-client-id",
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,10 @@ InstanceConfig createGolangInstanceConfig() {
public void testGolangConstructor() throws Exception {
InstanceConfig config = createGolangInstanceConfig();

factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(),
null, null, AuthenticationConfig.builder()
.clientAuthenticationPlugin("com.MyAuth")
.clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}").build());

verifyGolangInstance(config);
}
Expand Down Expand Up @@ -1014,6 +1017,8 @@ private void verifyGolangInstance(InstanceConfig config) throws Exception {
+ ".TestSink\",\"topic\":\"container-output\",\"serDeClassName\":\"org.apache.pulsar.functions"
+ ".runtime.serde.Utf8Serializer\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1"
+ ".0,\"ram\":\"1000\",\"disk\":\"10000\"}}");
assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "com.MyAuth");
assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "{\"authParam1\": \"authParamValue1\"}");

// check padding and xmx
V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.google.gson.reflect.TypeToken;
Expand All @@ -40,6 +41,7 @@
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
Expand Down Expand Up @@ -138,11 +140,12 @@ public void tearDown() {
}

private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir) {
return createProcessRuntimeFactory(extraDependenciesDir, null, false);
return createProcessRuntimeFactory(extraDependenciesDir, null, false, null);
}

private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir, String webServiceUrl,
boolean exposePulsarAdminClientEnabled) {
boolean exposePulsarAdminClientEnabled,
AuthenticationConfig authConfig) {
ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory();

WorkerConfig workerConfig = new WorkerConfig();
Expand All @@ -164,7 +167,7 @@ private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenci
workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(processRuntimeFactoryConfig, Map.class));
processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(),
processRuntimeFactory.initialize(workerConfig, authConfig, new TestSecretsProviderConfigurator(),
Mockito.mock(ConnectorsManager.class), Mockito.mock(FunctionsManager.class), Optional.empty(),
Optional.empty());

Expand Down Expand Up @@ -398,11 +401,83 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) th
assertEquals(String.join(" ", args), expectedArgs);
}

@Test
public void testGoConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.GO);

factory = createProcessRuntimeFactory(null, null, false,
AuthenticationConfig.builder()
.clientAuthenticationPlugin("com.MyAuth")
.clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}").build());

verifyGoInstance(config);
}

private void verifyGoInstance(InstanceConfig config) throws Exception {
String goExec = "/usr/bin/exec";
ProcessRuntime container = factory.createContainer(config, goExec, null, null, null,30l);
List<String> args = container.getProcessArgs();

int totalArgs = 3;

assertEquals(args.size(), totalArgs);
assertEquals(args.get(0), goExec);
assertEquals(args.get(1), "-instance-conf");
String functionDetails =
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails());

HashMap goInstanceConfig = new ObjectMapper().readValue(args.get(2), HashMap.class);
assertEquals(goInstanceConfig.get("pulsarServiceURL"), pulsarServiceUrl);
assertEquals(goInstanceConfig.get("stateStorageServiceUrl"), stateStorageServiceUrl);
assertEquals(goInstanceConfig.get("pulsarWebServiceUrl"), "");
assertEquals(goInstanceConfig.get("instanceID"), config.getInstanceId());
assertEquals(goInstanceConfig.get("funcID"), config.getFunctionId());
assertEquals(goInstanceConfig.get("funcVersion"), config.getFunctionVersion());
assertEquals(goInstanceConfig.get("maxBufTuples"), config.getMaxBufferedTuples());
assertEquals(goInstanceConfig.get("port"), config.getPort());
assertEquals(goInstanceConfig.get("clusterName"), config.getClusterName());
assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0);
assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
assertEquals(goInstanceConfig.get("tenant"), TEST_TENANT);
assertEquals(goInstanceConfig.get("nameSpace"), TEST_NAMESPACE);
assertEquals(goInstanceConfig.get("name"), TEST_NAME);
assertEquals(goInstanceConfig.get("className"), "");
assertEquals(goInstanceConfig.get("logTopic"), TEST_NAME + "-log");
assertEquals(goInstanceConfig.get("processingGuarantees"), config.getFunctionDetails().getProcessingGuarantees().getNumber());
assertEquals(goInstanceConfig.get("secretsMap"), config.getFunctionDetails().getSecretsMap());
assertEquals(goInstanceConfig.get("userConfig"), config.getFunctionDetails().getUserConfig());
assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "com.MyAuth");
assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "{\"authParam1\": \"authParamValue1\"}");
assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), "");
assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), false);
assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false);
assertEquals(goInstanceConfig.get("runtime"), FunctionDetails.Runtime.GO.getNumber());
assertEquals(goInstanceConfig.get("autoAck"), config.getFunctionDetails().getAutoAck());
assertEquals(goInstanceConfig.get("parallelism"), config.getFunctionDetails().getParallelism());
assertEquals(goInstanceConfig.get("subscriptionType"), 0);
assertEquals(goInstanceConfig.get("timeoutMs"), 0);
assertEquals(goInstanceConfig.get("subscriptionName"), config.getFunctionDetails().getSource().getSubscriptionName());
assertEquals(goInstanceConfig.get("cleanupSubscription"), config.getFunctionDetails().getSource().getCleanupSubscription());
assertEquals(goInstanceConfig.get("subscriptionPosition"), config.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "test_src");
assertEquals(goInstanceConfig.get("sourceSchemaType"), "");
assertEquals(goInstanceConfig.get("receiverQueueSize"), 0);
assertEquals(goInstanceConfig.get("sinkSpecsTopic"), "test-function-container-output");
assertEquals(goInstanceConfig.get("sinkSchemaType"), "");
assertEquals(goInstanceConfig.get("cpu"), 0.0);
assertEquals(goInstanceConfig.get("ram"), 0);
assertEquals(goInstanceConfig.get("disk"), 0);
assertEquals(goInstanceConfig.get("maxMessageRetries"), 0);
assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
assertEquals(goInstanceConfig.get("metricsPort"), config.getMetricsPort());
assertEquals(goInstanceConfig.get("functionDetails"), functionDetails);
}

@Test
public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientEnabled() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);

factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, true);
factory = createProcessRuntimeFactory(null, null, true, null);

verifyJavaInstance(config, null, defaultWebServiceUrl);
}
Expand All @@ -411,7 +486,7 @@ public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientEnable
public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientDisabled() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, false);
factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, false, null);

verifyJavaInstance(config, null, defaultWebServiceUrl);
}
Expand All @@ -420,7 +495,7 @@ public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientDisabl
public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientEnabled() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);

factory = createProcessRuntimeFactory(null, null, true);
factory = createProcessRuntimeFactory(null, null, true, null);

verifyJavaInstance(config, null, null);
}
Expand All @@ -429,7 +504,7 @@ public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientEna
public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientDisabled() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);

factory = createProcessRuntimeFactory(null, null, false);
factory = createProcessRuntimeFactory(null, null, false, null);

verifyJavaInstance(config, null, null);
}
Expand Down
Loading