Skip to content

Commit

Permalink
Merge pull request #3483 from sever-sever/T6364
Browse files Browse the repository at this point in the history
T6364: CGNAT drop hard limit that allows only one translation rule
  • Loading branch information
dmbaturin authored May 19, 2024
2 parents 65c8e9e + 2371c26 commit bc34540
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 44 deletions.
1 change: 1 addition & 0 deletions interface-definitions/nat_cgnat.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<validator name="ipv4-host"/>
<validator name="ipv4-range"/>
</constraint>
<multi/>
</properties>
</leafNode>
</children>
Expand Down
110 changes: 66 additions & 44 deletions src/conf_mode/nat_cgnat.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,6 @@ def verify(config):
if 'rule' not in config:
raise ConfigError(f'Rule must be defined!')

# As PoC allow only one rule for CGNAT translations
# one internal pool and one external pool
if len(config['rule']) > 1:
raise ConfigError(f'Only one rule is allowed for translations!')

for pool in ('external', 'internal'):
if pool not in config['pool']:
raise ConfigError(f'{pool} pool must be defined!')
Expand All @@ -208,6 +203,8 @@ def verify(config):
internal_pools_query = "keys(pool.internal)"
internal_pools: list = jmespath.search(internal_pools_query, config)

used_external_pools = {}
used_internal_pools = {}
for rule, rule_config in config['rule'].items():
if 'source' not in rule_config:
raise ConfigError(f'Rule "{rule}" source pool must be defined!')
Expand All @@ -217,57 +214,82 @@ def verify(config):
if 'translation' not in rule_config:
raise ConfigError(f'Rule "{rule}" translation pool must be defined!')

# Check if pool exists
internal_pool = rule_config['source']['pool']
if internal_pool not in internal_pools:
raise ConfigError(f'Internal pool "{internal_pool}" does not exist!')

external_pool = rule_config['translation']['pool']
if external_pool not in external_pools:
raise ConfigError(f'External pool "{external_pool}" does not exist!')

# Check pool duplication in different rules
if external_pool in used_external_pools:
raise ConfigError(
f'External pool "{external_pool}" is already used in rule '
f'{used_external_pools[external_pool]} and cannot be used in '
f'rule {rule}!'
)

if internal_pool in used_internal_pools:
raise ConfigError(
f'Internal pool "{internal_pool}" is already used in rule '
f'{used_internal_pools[internal_pool]} and cannot be used in '
f'rule {rule}!'
)

used_external_pools[external_pool] = rule
used_internal_pools[internal_pool] = rule


def generate(config):
if not config:
return None
# first external pool as we allow only one as PoC
ext_pool_name = jmespath.search("rule.*.translation | [0]", config).get('pool')
int_pool_name = jmespath.search("rule.*.source | [0]", config).get('pool')
ext_query = f'pool.external."{ext_pool_name}".range | keys(@)'
int_query = f'pool.internal."{int_pool_name}".range'
external_ranges = jmespath.search(ext_query, config)
internal_ranges = [jmespath.search(int_query, config)]

external_list_count = []
external_list_hosts = []
internal_list_count = []
internal_list_hosts = []
for ext_range in external_ranges:
# External hosts count
e_count = IPOperations(ext_range).get_ips_count()
external_list_count.append(e_count)
# External hosts list
e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
external_list_hosts.extend(e_hosts)
for int_range in internal_ranges:
# Internal hosts count
i_count = IPOperations(int_range).get_ips_count()
internal_list_count.append(i_count)
# Internal hosts list
i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
internal_list_hosts.extend(i_hosts)

external_host_count = sum(external_list_count)
internal_host_count = sum(internal_list_count)
ports_per_user = int(
jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
)
external_port_range: str = jmespath.search(
f'pool.external."{ext_pool_name}".external_port_range', config
)

proto_maps, other_maps = generate_port_rules(
external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
)
proto_maps = []
other_maps = []

for rule, rule_config in config['rule'].items():
ext_pool_name: str = rule_config['translation']['pool']
int_pool_name: str = rule_config['source']['pool']

external_ranges: list = [range for range in config['pool']['external'][ext_pool_name]['range']]
internal_ranges: list = [range for range in config['pool']['internal'][int_pool_name]['range']]
external_list_hosts_count = []
external_list_hosts = []
internal_list_hosts_count = []
internal_list_hosts = []

for ext_range in external_ranges:
# External hosts count
e_count = IPOperations(ext_range).get_ips_count()
external_list_hosts_count.append(e_count)
# External hosts list
e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
external_list_hosts.extend(e_hosts)

for int_range in internal_ranges:
# Internal hosts count
i_count = IPOperations(int_range).get_ips_count()
internal_list_hosts_count.append(i_count)
# Internal hosts list
i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
internal_list_hosts.extend(i_hosts)

external_host_count = sum(external_list_hosts_count)
internal_host_count = sum(internal_list_hosts_count)
ports_per_user = int(
jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
)
external_port_range: str = jmespath.search(
f'pool.external."{ext_pool_name}".external_port_range', config
)

rule_proto_maps, rule_other_maps = generate_port_rules(
external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
)

proto_maps.extend(rule_proto_maps)
other_maps.extend(rule_other_maps)

config['proto_map_elements'] = ', '.join(proto_maps)
config['other_map_elements'] = ', '.join(other_maps)
Expand Down

0 comments on commit bc34540

Please sign in to comment.