diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index fe69517f07c31..0a7caad4c1f2b 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -134,6 +134,12 @@ Map The user-specified annotations that are set to the internal Service. The value should be in the form of a1:v1,a2:v2 + +
kubernetes.internal-service.labels
+ (none) + Map + The user-specified labels that are set to the internal Service. The value should be in the form of a1:v1,a2:v2 +
kubernetes.jobmanager.annotations
(none) @@ -242,6 +248,12 @@

Enum

The exposed type of the rest service. The exposed rest service could be used to access the Flinkā€™s Web UI and REST endpoint.

Possible values: + +
kubernetes.rest-service.labels
+ (none) + Map + The user-specified labels that are set to the rest Service. The value should be in the form of a1:v1,a2:v2 +
kubernetes.secrets
(none) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 271e1479c2183..5180b199160b9 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -377,6 +377,14 @@ public class KubernetesConfigOptions { "The user-specified annotations that are set to the rest Service. The value should be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption> REST_SERVICE_LABELS = + key("kubernetes.rest-service.labels") + .mapType() + .noDefaultValue() + .withDescription( + "The user-specified labels that are set to the rest Service. The value should be " + + "in the form of a1:v1,a2:v2"); + public static final ConfigOption> INTERNAL_SERVICE_ANNOTATIONS = key("kubernetes.internal-service.annotations") .mapType() @@ -385,6 +393,14 @@ public class KubernetesConfigOptions { "The user-specified annotations that are set to the internal Service. The value should be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption> INTERNAL_SERVICE_LABELS = + key("kubernetes.internal-service.labels") + .mapType() + .noDefaultValue() + .withDescription( + "The user-specified labels that are set to the internal Service. The value should be " + + "in the form of a1:v1,a2:v2"); + /** * Defines the configuration key of that external resource in Kubernetes. This is used as a * suffix in an actual config. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 81bfde80bfe85..2b7b4575ebc00 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -118,12 +118,24 @@ public Map getRestServiceAnnotations() { .orElse(Collections.emptyMap()); } + public Map getRestServiceLabels() { + return flinkConfig + .getOptional(KubernetesConfigOptions.REST_SERVICE_LABELS) + .orElse(Collections.emptyMap()); + } + public Map getInternalServiceAnnotations() { return flinkConfig .getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS) .orElse(Collections.emptyMap()); } + public Map getInternalServiceLabels() { + return flinkConfig + .getOptional(KubernetesConfigOptions.INTERNAL_SERVICE_LABELS) + .orElse(Collections.emptyMap()); + } + public int getJobManagerMemoryMB() { return clusterSpecification.getMasterMemoryMB(); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java index 675ffb87e1c19..b9163d5e9a601 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java @@ -50,6 +50,7 @@ public Service buildUpInternalService( .withNewMetadata() .withName(serviceName) .withLabels(kubernetesJobManagerParameters.getCommonLabels()) + .addToLabels(kubernetesJobManagerParameters.getInternalServiceLabels()) .withAnnotations(kubernetesJobManagerParameters.getInternalServiceAnnotations()) .endMetadata() .withNewSpec() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java index 7952796b8d68f..ee14b8b4af3fb 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java @@ -53,6 +53,7 @@ public Service buildUpExternalRestService( .withNewMetadata() .withName(serviceName) .withLabels(kubernetesJobManagerParameters.getCommonLabels()) + .addToLabels(kubernetesJobManagerParameters.getRestServiceLabels()) .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations()) .endMetadata() .withNewSpec() diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java index eba124773fcef..4f084cb598e3a 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java @@ -64,6 +64,7 @@ protected void setupFlinkConfig() { this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels); this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations); this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_ANNOTATIONS, userAnnotations); + this.flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_LABELS, userLabels); this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector); this.flinkConfig.set( JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY)); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java index a3bc516f62ac5..15e3859955db1 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java @@ -50,12 +50,21 @@ class ExternalServiceDecoratorTest extends KubernetesJobManagerTestBase { } }; + private Map customizedLabels = + new HashMap() { + { + put("label1", "label-value1"); + put("label2", "label-value2"); + } + }; + @Override protected void onSetup() throws Exception { super.onSetup(); this.flinkConfig.set( KubernetesConfigOptions.REST_SERVICE_ANNOTATIONS, customizedAnnotations); + this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_LABELS, customizedLabels); this.externalServiceDecorator = new ExternalServiceDecorator(this.kubernetesJobManagerParameters); } @@ -74,6 +83,7 @@ void testBuildAccompanyingKubernetesResources() throws IOException { .isEqualTo(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID)); final Map expectedLabels = getCommonLabels(); + expectedLabels.putAll(customizedLabels); assertThat(restService.getMetadata().getLabels()).isEqualTo(expectedLabels); assertThat(restService.getSpec().getType()) @@ -88,8 +98,9 @@ void testBuildAccompanyingKubernetesResources() throws IOException { .build()); assertThat(restService.getSpec().getPorts()).isEqualTo(expectedServicePorts); - expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); - assertThat(restService.getSpec().getSelector()).isEqualTo(expectedLabels); + final Map expectedSelectors = getCommonLabels(); + expectedSelectors.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); + assertThat(restService.getSpec().getSelector()).isEqualTo(expectedSelectors); final Map resultAnnotations = restService.getMetadata().getAnnotations(); assertThat(resultAnnotations).isEqualTo(customizedAnnotations); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java index 92f91648290d2..2b77f533e353b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java @@ -67,6 +67,7 @@ void testBuildAccompanyingKubernetesResources() throws IOException { .isEqualTo(InternalServiceDecorator.getInternalServiceName(CLUSTER_ID)); final Map expectedLabels = getCommonLabels(); + expectedLabels.putAll(userLabels); assertThat(internalService.getMetadata().getLabels()).isEqualTo(expectedLabels); assertThat(internalService.getMetadata().getAnnotations()).isEqualTo(userAnnotations); @@ -86,8 +87,9 @@ void testBuildAccompanyingKubernetesResources() throws IOException { .build()); assertThat(internalService.getSpec().getPorts()).isEqualTo(expectedServicePorts); - expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); - assertThat(internalService.getSpec().getSelector()).isEqualTo(expectedLabels); + final Map expectedSelectors = getCommonLabels(); + expectedSelectors.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); + assertThat(internalService.getSpec().getSelector()).isEqualTo(expectedSelectors); } @Test diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index e9b6aafadb970..e5f6458b8cec7 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -276,7 +276,7 @@ void testServices() throws IOException { assertThat(restServiceCandidates).hasSize(1); final Service resultInternalService = internalServiceCandidates.get(0); - assertThat(resultInternalService.getMetadata().getLabels()).hasSize(2); + assertThat(resultInternalService.getMetadata().getLabels()).hasSize(4); assertThat(resultInternalService.getSpec().getType()).isNull(); assertThat(resultInternalService.getSpec().getClusterIP()) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 8883d0b94ee1b..4704f4aeff2e8 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -109,6 +109,20 @@ void testGetRestServiceAnnotations() { assertThat(resultAnnotations).isEqualTo(expectedAnnotations); } + @Test + void testGetRestServiceLabels() { + final Map expectedLabels = new HashMap<>(); + expectedLabels.put("a1", "v1"); + expectedLabels.put("a2", "v2"); + + flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_LABELS, expectedLabels); + + final Map resultLabels = + kubernetesJobManagerParameters.getRestServiceLabels(); + + assertThat(resultLabels).isEqualTo(expectedLabels); + } + @Test void testGetInternalServiceAnnotations() { final Map expectedAnnotations = new HashMap<>(); @@ -123,6 +137,20 @@ void testGetInternalServiceAnnotations() { assertThat(resultAnnotations).isEqualTo(expectedAnnotations); } + @Test + void testGetInternalServiceLabels() { + final Map expectedLabels = new HashMap<>(); + expectedLabels.put("a1", "v1"); + expectedLabels.put("a2", "v2"); + + flinkConfig.set(KubernetesConfigOptions.INTERNAL_SERVICE_LABELS, expectedLabels); + + final Map resultLabels = + kubernetesJobManagerParameters.getInternalServiceLabels(); + + assertThat(resultLabels).isEqualTo(expectedLabels); + } + @Test void testGetJobManagerMemoryMB() { assertThat(kubernetesJobManagerParameters.getJobManagerMemoryMB())