From 7aeb4ade49862f3b654c7df0dc5bbc812106a11f Mon Sep 17 00:00:00 2001 From: oteffahi <70609372+oteffahi@users.noreply.github.com> Date: Wed, 24 Jul 2024 15:44:43 +0200 Subject: [PATCH] Add publisher delete and queryable reply messages to ACL (#1259) * Expose reply key_expr to interceptors * Add reply message to ACL logic and config * Update DEFAULT_CONFIG * Update ACL get/queryable tests, add reply tests * Improve reply matching * Specify all existing message types in ACL interceptor matching * Add reply to authentication qbl tests configs * Add delete message to ACL logic and config * Add delete message to DEFAULT_CONFIG, format messages * Add delete message to ACL pub/sub tests * Fix clippy errors * Reorder message matching * Revert "Expose reply key_expr to interceptors", use wire_expr for ACL filtering of reply messages This reverts commit 3a78d5da5b98bfbc18cbbee6c359e61fed8f6827. * Revert key_expr parsing change to reply ingress Ingress reply messages are not affected by the unimplemented key_expr in routing/dispatcher/queries.rs --- DEFAULT_CONFIG.json5 | 6 +- commons/zenoh-config/src/lib.rs | 2 + .../net/routing/interceptor/access_control.rs | 113 ++++++- .../net/routing/interceptor/authorization.rs | 6 + zenoh/tests/acl.rs | 276 ++++++++++++++++-- zenoh/tests/authentication.rs | 18 +- 6 files changed, 372 insertions(+), 49 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 33c9b3acdd..2dfb5b2fff 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -199,7 +199,8 @@ // /// Id has to be unique within the rule set // "id": "rule1", // "messages": [ - // "put", "query", "declare_subscriber", "declare_queryable" + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", // ], // "flows":["egress","ingress"], // "permission": "allow", @@ -210,7 +211,8 @@ // { // "id": "rule2", // "messages": [ - // "put", "query", "declare_subscriber", "declare_queryable" + // "put", "delete", "declare_subscriber", + // "query", "reply", "declare_queryable", // ], // "flows":["ingress"], // "permission": "allow", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 270cf950c3..f5fc01aa63 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -166,9 +166,11 @@ pub struct PolicyRule { #[serde(rename_all = "snake_case")] pub enum AclMessage { Put, + Delete, DeclareSubscriber, Query, DeclareQueryable, + Reply, } #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 9e749e1258..6af064a878 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -26,7 +26,7 @@ use zenoh_config::{ }; use zenoh_protocol::{ core::ZenohIdProto, - network::{Declare, DeclareBody, NetworkBody, NetworkMessage, Push, Request}, + network::{Declare, DeclareBody, NetworkBody, NetworkMessage, Push, Request, Response}, zenoh::{PushBody, RequestBody}, }; use zenoh_result::ZResult; @@ -235,6 +235,21 @@ impl InterceptorTrait for IngressAclEnforcer { .or_else(|| ctx.full_expr()); match &ctx.msg.body { + NetworkBody::Request(Request { + payload: RequestBody::Query(_), + .. + }) => { + if self.action(AclMessage::Query, "Query (ingress)", key_expr?) == Permission::Deny + { + return None; + } + } + NetworkBody::Response(Response { .. }) => { + if self.action(AclMessage::Reply, "Reply (ingress)", key_expr?) == Permission::Deny + { + return None; + } + } NetworkBody::Push(Push { payload: PushBody::Put(_), .. @@ -243,11 +258,12 @@ impl InterceptorTrait for IngressAclEnforcer { return None; } } - NetworkBody::Request(Request { - payload: RequestBody::Query(_), + NetworkBody::Push(Push { + payload: PushBody::Del(_), .. }) => { - if self.action(AclMessage::Query, "Query (ingress)", key_expr?) == Permission::Deny + if self.action(AclMessage::Delete, "Delete (ingress)", key_expr?) + == Permission::Deny { return None; } @@ -278,7 +294,38 @@ impl InterceptorTrait for IngressAclEnforcer { return None; } } - _ => {} + // Unfiltered Declare messages + NetworkBody::Declare(Declare { + body: DeclareBody::DeclareKeyExpr(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::DeclareFinal(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::DeclareToken(_), + .. + }) => {} + // Unfiltered Undeclare messages + NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareKeyExpr(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareToken(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareQueryable(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareSubscriber(_), + .. + }) => {} + // Unfiltered remaining message types + NetworkBody::Interest(_) | NetworkBody::OAM(_) | NetworkBody::ResponseFinal(_) => {} } Some(ctx) } @@ -305,6 +352,22 @@ impl InterceptorTrait for EgressAclEnforcer { .or_else(|| ctx.full_expr()); match &ctx.msg.body { + NetworkBody::Request(Request { + payload: RequestBody::Query(_), + .. + }) => { + if self.action(AclMessage::Query, "Query (egress)", key_expr?) == Permission::Deny { + return None; + } + } + NetworkBody::Response(Response { wire_expr, .. }) => { + // @TODO: Remove wire_expr usage when issue #1255 is implemented + if self.action(AclMessage::Reply, "Reply (egress)", wire_expr.as_str()) + == Permission::Deny + { + return None; + } + } NetworkBody::Push(Push { payload: PushBody::Put(_), .. @@ -313,11 +376,12 @@ impl InterceptorTrait for EgressAclEnforcer { return None; } } - NetworkBody::Request(Request { - payload: RequestBody::Query(_), + NetworkBody::Push(Push { + payload: PushBody::Del(_), .. }) => { - if self.action(AclMessage::Query, "Query (egress)", key_expr?) == Permission::Deny { + if self.action(AclMessage::Delete, "Delete (egress)", key_expr?) == Permission::Deny + { return None; } } @@ -347,7 +411,38 @@ impl InterceptorTrait for EgressAclEnforcer { return None; } } - _ => {} + // Unfiltered Declare messages + NetworkBody::Declare(Declare { + body: DeclareBody::DeclareKeyExpr(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::DeclareFinal(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::DeclareToken(_), + .. + }) => {} + // Unfiltered Undeclare messages + NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareKeyExpr(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareToken(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareQueryable(_), + .. + }) + | NetworkBody::Declare(Declare { + body: DeclareBody::UndeclareSubscriber(_), + .. + }) => {} + // Unfiltered remaining message types + NetworkBody::Interest(_) | NetworkBody::OAM(_) | NetworkBody::ResponseFinal(_) => {} } Some(ctx) } diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 8b8789fc3b..a7446382d1 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -184,15 +184,19 @@ impl PermissionPolicy { struct ActionPolicy { query: PermissionPolicy, put: PermissionPolicy, + delete: PermissionPolicy, declare_subscriber: PermissionPolicy, declare_queryable: PermissionPolicy, + reply: PermissionPolicy, } impl ActionPolicy { fn action(&self, action: AclMessage) -> &PermissionPolicy { match action { AclMessage::Query => &self.query, + AclMessage::Reply => &self.reply, AclMessage::Put => &self.put, + AclMessage::Delete => &self.delete, AclMessage::DeclareSubscriber => &self.declare_subscriber, AclMessage::DeclareQueryable => &self.declare_queryable, } @@ -200,7 +204,9 @@ impl ActionPolicy { fn action_mut(&mut self, action: AclMessage) -> &mut PermissionPolicy { match action { AclMessage::Query => &mut self.query, + AclMessage::Reply => &mut self.reply, AclMessage::Put => &mut self.put, + AclMessage::Delete => &mut self.delete, AclMessage::DeclareSubscriber => &mut self.declare_subscriber, AclMessage::DeclareQueryable => &mut self.declare_queryable, } diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index 0a08090569..13104338b7 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -23,6 +23,7 @@ mod test { config, config::{EndPoint, WhatAmI}, prelude::*, + sample::SampleKind, Config, Session, }; use zenoh_core::{zlock, ztimeout}; @@ -43,12 +44,21 @@ mod test { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_acl_get_queryable() { + zenoh::try_init_log_from_env(); test_get_qbl_deny(27448).await; test_get_qbl_allow(27448).await; test_get_qbl_allow_then_deny(27448).await; test_get_qbl_deny_then_allow(27448).await; } + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_acl_queryable_reply() { + zenoh::try_init_log_from_env(); + // Only test cases not covered by `test_acl_get_queryable` + test_reply_deny(27449).await; + test_reply_allow_then_deny(27449).await; + } + async fn get_basic_router_config(port: u16) -> Config { let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); @@ -105,12 +115,20 @@ mod test { { let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); + let deleted_clone = deleted.clone(); let subscriber = sub_session .declare_subscriber(KEY_EXPR) .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); + if sample.kind() == SampleKind::Put { + let mut temp_value = zlock!(temp_recv_value); + *temp_value = sample.payload().deserialize::().unwrap(); + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } }) .await .unwrap(); @@ -119,6 +137,10 @@ mod test { publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!(*zlock!(deleted))); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; @@ -147,22 +169,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(*zlock!(deleted)); ztimeout!(subscriber.undeclare()).unwrap(); } @@ -184,9 +216,10 @@ mod test { { "id": "r1", "permission": "deny", - "flows": ["egress"], + "flows": ["egress", "ingress"], "messages": [ "put", + "delete", "declare_subscriber" ], "key_exprs": [ @@ -218,22 +251,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_ne!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(!(*zlock!(deleted))); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; @@ -257,6 +300,7 @@ mod test { "flows": ["egress", "ingress"], "messages": [ "put", + "delete", "declare_subscriber" ], "key_exprs": [ @@ -288,22 +332,32 @@ mod test { { let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); let received_value = Arc::new(Mutex::new(String::new())); + let deleted = Arc::new(Mutex::new(false)); + let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { + let deleted_clone = deleted.clone(); + let subscriber = sub_session + .declare_subscriber(KEY_EXPR) + .callback(move |sample| { + if sample.kind() == SampleKind::Put { let mut temp_value = zlock!(temp_recv_value); *temp_value = sample.payload().deserialize::().unwrap(); - })) + } else if sample.kind() == SampleKind::Delete { + let mut deleted = zlock!(deleted_clone); + *deleted = true; + } + }) + .await .unwrap(); tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); + publisher.put(VALUE).await.unwrap(); tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); + + publisher.delete().await.unwrap(); + tokio::time::sleep(SLEEP).await; + assert!(*zlock!(deleted)); ztimeout!(subscriber.undeclare()).unwrap(); } close_sessions(sub_session, pub_session).await; @@ -320,9 +374,24 @@ mod test { r#"{ "enabled": true, "default_permission": "deny", - "rules": [], - "subjects": [], - "policies": [], + "rules": [ + { + "id": "allow reply", + "permission": "allow", + "messages": ["reply"], + "flows": ["egress", "ingress"], + "key_exprs": ["test/demo"], + } + ], + "subjects": [ + { "id": "all" } + ], + "policies": [ + { + "rules": ["allow reply"], + "subjects": ["all"], + } + ], }"#, ) .unwrap(); @@ -435,7 +504,8 @@ mod test { "flows": ["egress", "ingress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo" @@ -512,7 +582,7 @@ mod test { { "id": "r1", "permission": "deny", - "flows": ["egress"], + "flows": ["egress", "ingress"], "messages": [ "query", "declare_queryable" @@ -576,4 +646,146 @@ mod test { close_sessions(get_session, qbl_session).await; close_router_session(session).await; } + + async fn test_reply_deny(port: u16) { + println!("test_reply_deny"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "deny", + "rules": [ + { + "id": "allow get/declare qbl", + "permission": "allow", + "messages": ["query", "declare_queryable"], + "key_exprs": ["test/demo"], + } + ], + "subjects": [ + { "id": "all" } + ], + "policies": [ + { + "rules": ["allow get/declare qbl"], + "subjects": ["all"], + } + ], + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!("Error : {:?}", e), + } + } + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } + + async fn test_reply_allow_then_deny(port: u16) { + println!("test_reply_allow_then_deny"); + + let mut config_router = get_basic_router_config(port).await; + config_router + .insert_json5( + "access_control", + r#"{ + "enabled": true, + "default_permission": "allow", + "rules": [ + { + "id": "r1", + "permission": "deny", + "messages": ["reply"], + "flows": ["egress", "ingress"], + "key_exprs": ["test/demo"], + }, + ], + "subjects": [ + { + "id": "s1", + "interfaces": [ + "lo", "lo0" + ], + } + ], + "policies": [ + { + "rules": ["r1"], + "subjects": ["s1"], + } + ] + }"#, + ) + .unwrap(); + println!("Opening router session"); + + let session = ztimeout!(zenoh::open(config_router)).unwrap(); + + let (get_session, qbl_session) = get_client_sessions(port).await; + { + let mut received_value = String::new(); + + let qbl = ztimeout!(qbl_session + .declare_queryable(KEY_EXPR) + .callback(move |sample| { + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() + }); + }); + })) + .unwrap(); + + tokio::time::sleep(SLEEP).await; + let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); + while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { + match reply.result() { + Ok(sample) => { + received_value = sample.payload().deserialize::().unwrap(); + break; + } + Err(e) => println!("Error : {:?}", e), + } + } + tokio::time::sleep(SLEEP).await; + assert_ne!(received_value, VALUE); + ztimeout!(qbl.undeclare()).unwrap(); + } + close_sessions(get_session, qbl_session).await; + close_router_session(session).await; + } } diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index 09dd3b74eb..8d1e404617 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -945,7 +945,8 @@ client2name:client2passwd"; "flows": ["egress", "ingress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply", ], "key_exprs": [ "test/demo" @@ -1030,7 +1031,8 @@ client2name:client2passwd"; "flows": ["egress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo" @@ -1256,7 +1258,8 @@ client2name:client2passwd"; "flows": ["egress", "ingress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo" @@ -1342,7 +1345,8 @@ client2name:client2passwd"; "flows": ["egress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo" @@ -1568,7 +1572,8 @@ client2name:client2passwd"; "flows": ["ingress", "egress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo" @@ -1654,7 +1659,8 @@ client2name:client2passwd"; "flows": ["egress"], "messages": [ "query", - "declare_queryable" + "declare_queryable", + "reply" ], "key_exprs": [ "test/demo"