From 0f0a540463630e4db8a9f467dace618014a45834 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Fri, 22 Mar 2024 11:11:25 +0800 Subject: [PATCH 1/4] [improve][fn] Support OAuth2 in Go instance --- pulsar-function-go/pf/instance.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index 1064aece46fe8..bd3c2a19ad655 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -21,6 +21,7 @@ package pf import ( "context" + "encoding/json" "fmt" "math" "strconv" @@ -195,8 +196,9 @@ CLOSE: } const ( - authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken" - authPluginNone = "" + authPluginToken = "org.apache.pulsar.client.impl.auth.AuthenticationToken" + authPluginOAuth2 = "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2" + authPluginNone = "" ) func (gi *goInstance) setupClient() error { @@ -221,6 +223,15 @@ func (gi *goInstance) setupClient() error { default: return fmt.Errorf(`unknown token format - expecting "file://" or "token:" prefix`) } + case authPluginOAuth2: + if ic.authParams == "" { + return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginToken) + } + var authMap map[string]string + if err := json.Unmarshal([]byte(ic.authParams), &authMap); err != nil { + return fmt.Errorf(`unknown auth params format for OAuth2 - expecting a json string of map`) + } + clientOpts.Authentication = pulsar.NewAuthenticationOAuth2(authMap) case authPluginNone: clientOpts.Authentication, _ = pulsar.NewAuthentication("", "") // ret: auth.NewAuthDisabled() default: From 670278c53e2899242c7990bc7636dcb0924b8a5f Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Sun, 7 Apr 2024 10:07:19 +0800 Subject: [PATCH 2/4] Add tests --- .../kubernetes/KubernetesRuntimeTest.java | 7 +- .../runtime/process/ProcessRuntimeTest.java | 89 +++++++++++++++++-- 2 files changed, 88 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 980f763f7c303..0b498939fbb65 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -952,7 +952,10 @@ InstanceConfig createGolangInstanceConfig() { public void testGolangConstructor() throws Exception { InstanceConfig config = createGolangInstanceConfig(); - factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), + null, null, AuthenticationConfig.builder() + .clientAuthenticationPlugin("com.MyAuth") + .clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}").build()); verifyGolangInstance(config); } @@ -1014,6 +1017,8 @@ private void verifyGolangInstance(InstanceConfig config) throws Exception { + ".TestSink\",\"topic\":\"container-output\",\"serDeClassName\":\"org.apache.pulsar.functions" + ".runtime.serde.Utf8Serializer\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1" + ".0,\"ram\":\"1000\",\"disk\":\"10000\"}}"); + assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "com.MyAuth"); + assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "{\"authParam1\": \"authParamValue1\"}"); // check padding and xmx V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index f63f24dc25624..3997d91284e36 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; import com.google.gson.reflect.TypeToken; @@ -40,6 +41,7 @@ import org.apache.commons.lang3.JavaVersion; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; @@ -138,11 +140,12 @@ public void tearDown() { } private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir) { - return createProcessRuntimeFactory(extraDependenciesDir, null, false); + return createProcessRuntimeFactory(extraDependenciesDir, null, false, null); } private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir, String webServiceUrl, - boolean exposePulsarAdminClientEnabled) { + boolean exposePulsarAdminClientEnabled, + AuthenticationConfig authConfig) { ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory(); WorkerConfig workerConfig = new WorkerConfig(); @@ -164,7 +167,7 @@ private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenci workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName()); workerConfig.setFunctionRuntimeFactoryConfigs( ObjectMapperFactory.getMapper().getObjectMapper().convertValue(processRuntimeFactoryConfig, Map.class)); - processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(), + processRuntimeFactory.initialize(workerConfig, authConfig, new TestSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Mockito.mock(FunctionsManager.class), Optional.empty(), Optional.empty()); @@ -398,11 +401,83 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) th assertEquals(String.join(" ", args), expectedArgs); } + @Test + public void testGoConstructor() throws Exception { + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.GO); + + factory = createProcessRuntimeFactory(null, null, false, + AuthenticationConfig.builder() + .clientAuthenticationPlugin("com.MyAuth") + .clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}").build()); + + verifyGoInstance(config); + } + + private void verifyGoInstance(InstanceConfig config) throws Exception { + String goExec = "/usr/bin/exec"; + ProcessRuntime container = factory.createContainer(config, goExec, null, null, null,30l); + List args = container.getProcessArgs(); + + int totalArgs = 3; + + assertEquals(args.size(), totalArgs); + assertEquals(args.get(0), goExec); + assertEquals(args.get(1), "-instance-conf"); + String functionDetails = + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()); + + HashMap goInstanceConfig = new ObjectMapper().readValue(args.get(2), HashMap.class); + assertEquals(goInstanceConfig.get("pulsarServiceURL"), pulsarServiceUrl); + assertEquals(goInstanceConfig.get("stateStorageServiceUrl"), stateStorageServiceUrl); + assertEquals(goInstanceConfig.get("pulsarWebServiceUrl"), ""); + assertEquals(goInstanceConfig.get("instanceID"), config.getInstanceId()); + assertEquals(goInstanceConfig.get("funcID"), config.getFunctionId()); + assertEquals(goInstanceConfig.get("funcVersion"), config.getFunctionVersion()); + assertEquals(goInstanceConfig.get("maxBufTuples"), config.getMaxBufferedTuples()); + assertEquals(goInstanceConfig.get("port"), config.getPort()); + assertEquals(goInstanceConfig.get("clusterName"), config.getClusterName()); + assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0); + assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0); + assertEquals(goInstanceConfig.get("tenant"), TEST_TENANT); + assertEquals(goInstanceConfig.get("nameSpace"), TEST_NAMESPACE); + assertEquals(goInstanceConfig.get("name"), TEST_NAME); + assertEquals(goInstanceConfig.get("className"), ""); + assertEquals(goInstanceConfig.get("logTopic"), TEST_NAME + "-log"); + assertEquals(goInstanceConfig.get("processingGuarantees"), config.getFunctionDetails().getProcessingGuarantees().getNumber()); + assertEquals(goInstanceConfig.get("secretsMap"), config.getFunctionDetails().getSecretsMap()); + assertEquals(goInstanceConfig.get("userConfig"), config.getFunctionDetails().getUserConfig()); + assertEquals(goInstanceConfig.get("clientAuthenticationPlugin"), "com.MyAuth"); + assertEquals(goInstanceConfig.get("clientAuthenticationParameters"), "{\"authParam1\": \"authParamValue1\"}"); + assertEquals(goInstanceConfig.get("tlsTrustCertsFilePath"), ""); + assertEquals(goInstanceConfig.get("tlsHostnameVerificationEnable"), false); + assertEquals(goInstanceConfig.get("tlsAllowInsecureConnection"), false); + assertEquals(goInstanceConfig.get("runtime"), FunctionDetails.Runtime.GO.getNumber()); + assertEquals(goInstanceConfig.get("autoAck"), config.getFunctionDetails().getAutoAck()); + assertEquals(goInstanceConfig.get("parallelism"), config.getFunctionDetails().getParallelism()); + assertEquals(goInstanceConfig.get("subscriptionType"), 0); + assertEquals(goInstanceConfig.get("timeoutMs"), 0); + assertEquals(goInstanceConfig.get("subscriptionName"), config.getFunctionDetails().getSource().getSubscriptionName()); + assertEquals(goInstanceConfig.get("cleanupSubscription"), config.getFunctionDetails().getSource().getCleanupSubscription()); + assertEquals(goInstanceConfig.get("subscriptionPosition"), config.getFunctionDetails().getSource().getSubscriptionPosition().getNumber()); + assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "test_src"); + assertEquals(goInstanceConfig.get("sourceSchemaType"), ""); + assertEquals(goInstanceConfig.get("receiverQueueSize"), 0); + assertEquals(goInstanceConfig.get("sinkSpecsTopic"), "test-function-container-output"); + assertEquals(goInstanceConfig.get("sinkSchemaType"), ""); + assertEquals(goInstanceConfig.get("cpu"), 0.0); + assertEquals(goInstanceConfig.get("ram"), 0); + assertEquals(goInstanceConfig.get("disk"), 0); + assertEquals(goInstanceConfig.get("maxMessageRetries"), 0); + assertEquals(goInstanceConfig.get("deadLetterTopic"), ""); + assertEquals(goInstanceConfig.get("metricsPort"), config.getMetricsPort()); + assertEquals(goInstanceConfig.get("functionDetails"), functionDetails); + } + @Test public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientEnabled() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true); - factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, true); + factory = createProcessRuntimeFactory(null, null, true, null); verifyJavaInstance(config, null, defaultWebServiceUrl); } @@ -411,7 +486,7 @@ public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientEnable public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientDisabled() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); - factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, false); + factory = createProcessRuntimeFactory(null, defaultWebServiceUrl, false, null); verifyJavaInstance(config, null, defaultWebServiceUrl); } @@ -420,7 +495,7 @@ public void testJavaConstructorWithWebServiceUrlAndExposePulsarAdminClientDisabl public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientEnabled() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true); - factory = createProcessRuntimeFactory(null, null, true); + factory = createProcessRuntimeFactory(null, null, true, null); verifyJavaInstance(config, null, null); } @@ -429,7 +504,7 @@ public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientEna public void testJavaConstructorWithoutWebServiceUrlAndExposePulsarAdminClientDisabled() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); - factory = createProcessRuntimeFactory(null, null, false); + factory = createProcessRuntimeFactory(null, null, false, null); verifyJavaInstance(config, null, null); } From c50df84347e796f2be08485e6d26d40e337e1a94 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Sun, 7 Apr 2024 10:09:08 +0800 Subject: [PATCH 3/4] Fix log --- pulsar-function-go/pf/instance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index bd3c2a19ad655..8efd9412a7214 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -225,7 +225,7 @@ func (gi *goInstance) setupClient() error { } case authPluginOAuth2: if ic.authParams == "" { - return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginToken) + return fmt.Errorf("auth plugin %s given, but authParams is empty", authPluginOAuth2) } var authMap map[string]string if err := json.Unmarshal([]byte(ic.authParams), &authMap); err != nil { From 4c004a9695bc462a7eb18ebf22a2dfc27e4bdbdc Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Sun, 7 Apr 2024 10:15:36 +0800 Subject: [PATCH 4/4] Add test --- pulsar-function-go/pf/instanceConf_test.go | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go index cc5f46e2fe12b..ca0ab57e780d7 100644 --- a/pulsar-function-go/pf/instanceConf_test.go +++ b/pulsar-function-go/pf/instanceConf_test.go @@ -20,6 +20,7 @@ package pf import ( + "encoding/json" "fmt" "testing" @@ -320,3 +321,26 @@ func TestInstanceConf_WithEmptyOrInvalidDetails(t *testing.T) { }) } } + +func TestInstanceConf_WithOAuth2Parameters(t *testing.T) { + cfg := &cfg.Conf{ + AutoACK: true, + ClientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationOAuth2", + ClientAuthenticationParameters: `{"issuerUrl":"https://test.com","audience":"test-audience", +"scope":"test-scope","privateKey":"/mnt/secrets/auth.json","type":"client_credentials","clientId":"test-client-id"}`, + } + + instanceConf := newInstanceConfWithConf(cfg) + assert.Equal(t, cfg.ClientAuthenticationPlugin, instanceConf.authPlugin) + var authMap map[string]string + err := json.Unmarshal([]byte(instanceConf.authParams), &authMap) + assert.Equal(t, err, nil) + assert.Equal(t, authMap, map[string]string{ + "issuerUrl": "https://test.com", + "audience": "test-audience", + "scope": "test-scope", + "privateKey": "/mnt/secrets/auth.json", + "type": "client_credentials", + "clientId": "test-client-id", + }) +}