Skip to content

Commit

Permalink
[feat]PIP 167: Make it Configurable to Require Subscription Permissio…
Browse files Browse the repository at this point in the history
…n for Consumer (#246)
  • Loading branch information
michaeljmarshall authored Apr 11, 2024
1 parent c3db468 commit 5ae9877
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 5 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
6 changes: 6 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,16 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ " or last position eg: *.pulsar.service, pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = """
If a namespace has no roles configured in the subscription permission for a given subscription name,
allow all roles that have permission to consume a the topic to consume from the subscription.
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
permission.
""")
private boolean grantImplicitPermissionOnSubscription = true;

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,18 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// Reject request if role is unauthorized to access subscription.
// If isGrantImplicitPermissionOnSubscription is true, role must be in the set of roles.
// Otherwise, set of roles must be null or empty, or role must be in set of roles.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
boolean isUnauthorized;
if (roles == null || roles.isEmpty()) {
isUnauthorized = !conf.isGrantImplicitPermissionOnSubscription();
} else {
isUnauthorized = !roles.contains(role);
}
if (isUnauthorized) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,83 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriberPermissionRequired() throws Exception {
log.info("-- Starting {} test --", methodName);

// Simplify test by skipping configuration of topic level policies
conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
conf.setGrantImplicitPermissionOnSubscription(false);
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub";
final String namespace = "my-property/ns-sub-auth-req";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

// Initialize cluster and create namespace and topic
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
assertNull(superAdmin.namespaces().getPublishRate(namespace));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
try {
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
}
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
}

// Grant the role permission.
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));

// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@ public void test() throws Exception {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,null));
// Include a subscription name. The subscription doesn't need to exist for the purpose of this test, but this
// tests the case when service.getConfig().isGrantImplicitPermissionOnSubscription() is true because we
// have not granted permission for this role on the subscription named "sub".
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));

// Grant permission to a different role for sub and expect failure
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("no-ones-role"));
// Even though other-role has permission to consume from the topic, the "sub" subscription is locked down and
// only roles with permission granted via grantPermissionOnSubscription have permission to consume from that
// subscription.
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* Class that initializes the WebSocketService disabling {@link WebSocketProxyConfiguration#setGrantImplicitPermissionOnSubscription(boolean)}.
* We must have this class on its own because the WebSocketProxyConfiguration is converted to the ServiceConfiguration
* on start up, so it is not a dynamic property that we can change after the service has started.
*/

@Test(groups = "websocket")
public class ProxyAuthorizationWithoutImplicitPermissionOnSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
private WebSocketService service;
private final String configClusterName = "c1";

@BeforeClass
@Override
protected void setup() throws Exception {
conf.setClusterName(configClusterName);
internalSetup();

WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Set<String> superUser = Sets.newHashSet("");
config.setAuthorizationEnabled(true);
config.setSuperUserRoles(superUser);
config.setClusterName("c1");
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setGrantImplicitPermissionOnSubscription(false);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}

@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
if (service != null) {
service.close();
}
log.info("Finished Cleaning Up Test setup");
}


@Test
public void testAuthorizationServiceDirectly() throws Exception {
AuthorizationService auth = service.getAuthorizationService();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));

// Expect false because we disabled the implicit permission on subscription
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));

// Grant permission
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("other-role"));

// Expect only true for "other-role" because we granted permission for only that one
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));


assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));

admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
waitForChange();

assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
}

private static void waitForChange() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
+ "presents at first or last position. For example: *.pulsar.service,pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;

@FieldContext(
doc = """
If a namespace has no roles configured in the subscription permission for a given subscription name,
allow all roles that have permission to consume a the topic to consume from the subscription.
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
permission.
""")
private boolean grantImplicitPermissionOnSubscription = true;

@FieldContext(doc = "Proxy authentication settings used to connect to brokers")
private String brokerClientAuthenticationPlugin;

Expand Down

0 comments on commit 5ae9877

Please sign in to comment.