diff --git a/frontends/concrete-python/benchmarks/levenshtein_distance.py b/frontends/concrete-python/benchmarks/levenshtein_distance.py new file mode 100644 index 0000000000..5431c5bc96 --- /dev/null +++ b/frontends/concrete-python/benchmarks/levenshtein_distance.py @@ -0,0 +1,167 @@ +""" +Benchmarks of the levenshtein distance example. +""" + +from functools import lru_cache +from pathlib import Path +from typing import Tuple + +import py_progress_tracker as progress + +from concrete import fhe +from examples.levenshtein_distance.levenshtein_distance import Alphabet, LevenshteinDistance + + +@lru_cache +def levenshtein_on_server( + server: fhe.Server, + x: Tuple[fhe.Value], + y: Tuple[fhe.Value], + evaluation_keys: fhe.EvaluationKeys, +): + """ + Compute levenshtein distance on a server. + """ + + if len(x) == 0: + return server.run( + len(y), + function_name="constant", + evaluation_keys=evaluation_keys, + ) + + if len(y) == 0: + return server.run( + len(x), + function_name="constant", + evaluation_keys=evaluation_keys, + ) + + if_equal = levenshtein_on_server(server, x[1:], y[1:], evaluation_keys) + case_1 = levenshtein_on_server(server, x[1:], y, evaluation_keys) + case_2 = levenshtein_on_server(server, x, y[1:], evaluation_keys) + case_3 = if_equal + + is_equal = server.run( + x[0], + y[0], + function_name="equal", + evaluation_keys=evaluation_keys, + ) + result = server.run( + is_equal, + if_equal, + case_1, + case_2, + case_3, + function_name="mix", + evaluation_keys=evaluation_keys, + ) + + return result + + +def targets(): + """ + Generates targets to benchmark. + """ + + result = [] + for alphabet in ["ACTG", "string"]: + for max_string_length in [2, 4, 8]: + result.append( + { + "id": ( + f"levenshtein-distance :: " + f"alphabet = {alphabet} | max_string_size = {max_string_length}" + ), + "name": ( + f"Levenshtein distance between two strings " + f"of length {max_string_length} " + f"from {alphabet} alphabet" + ), + "parameters": { + "alphabet": alphabet, + "max_string_length": max_string_length, + }, + } + ) + return result + + +@progress.track(targets()) +def main(alphabet, max_string_length): + """ + Benchmark a target. + + Args: + alphabet: + alphabet of the inputs + max_string_length: + maximum size of the inputs + """ + + cached_server = Path(f"levenshtein.{alphabet}.{max_string_length}.server.zip") + alphabet = Alphabet.init_by_name(alphabet) + + print("Compiling...") + if cached_server.exists(): + server = fhe.Server.load(cached_server) + client = fhe.Client(server.client_specs, keyset_cache_directory=".keys") + else: + levenshtein_distance = LevenshteinDistance( + alphabet, + max_string_length, + configuration=fhe.Configuration( + enable_unsafe_features=True, + use_insecure_key_cache=True, + insecure_key_cache_location=".keys", + ), + ) + levenshtein_distance.module.server.save(cached_server) + + server = levenshtein_distance.module.server + client = levenshtein_distance.module.client + + print("Generating keys...") + client.keygen() + + print("Warming up...") + + sample_a = alphabet.random_string(max_string_length) + sample_b = alphabet.random_string(max_string_length) + + encrypted_sample_a = tuple( + client.encrypt(ai, None, function_name="equal")[0] for ai in alphabet.encode(sample_a) + ) + encrypted_sample_b = tuple( + client.encrypt(None, bi, function_name="equal")[1] for bi in alphabet.encode(sample_b) + ) + + levenshtein_on_server( + server, + encrypted_sample_a, + encrypted_sample_b, + client.evaluation_keys, + ) + + for i in range(5): + print(f"Running subsample {i + 1} out of 5...") + + sample_a = alphabet.random_string(max_string_length) + sample_b = alphabet.random_string(max_string_length) + + encrypted_sample_a = tuple( + client.encrypt(ai, None, function_name="equal")[0] for ai in alphabet.encode(sample_a) + ) + encrypted_sample_b = tuple( + client.encrypt(None, bi, function_name="equal")[1] for bi in alphabet.encode(sample_b) + ) + + with progress.measure(id="evaluation-time-ms", label="Evaluation Time (ms)"): + levenshtein_on_server( + server, + encrypted_sample_a, + encrypted_sample_b, + client.evaluation_keys, + ) diff --git a/frontends/concrete-python/examples/levenshtein_distance/levenshtein_distance.py b/frontends/concrete-python/examples/levenshtein_distance/levenshtein_distance.py index f2f5112801..c31d12a823 100644 --- a/frontends/concrete-python/examples/levenshtein_distance/levenshtein_distance.py +++ b/frontends/concrete-python/examples/levenshtein_distance/levenshtein_distance.py @@ -1,19 +1,19 @@ -# Computing Levenstein distance between strings, https://en.wikipedia.org/wiki/Levenshtein_distance +# Computing Levenstein distance between strings (https://en.wikipedia.org/wiki/Levenshtein_distance) # ruff: noqa:S311,N805 # pylint: disable=no-self-argument +# mypy: disable-error-code="valid-type" import argparse import random import time from functools import lru_cache +from typing import Optional import numpy as np from concrete import fhe -# mypy: disable-error-code="valid-type" - class Alphabet: letters: str = "" @@ -62,9 +62,6 @@ def return_available_alphabets() -> list: @staticmethod def init_by_name(alphabet_name: str) -> "Alphabet": """Set the alphabet.""" - assert ( - alphabet_name in Alphabet.return_available_alphabets() - ), f"Unknown alphabet {alphabet_name}" if alphabet_name == "string": return Alphabet.lowercase() @@ -74,19 +71,19 @@ def init_by_name(alphabet_name: str) -> "Alphabet": return Alphabet.anycase() if alphabet_name == "name": return Alphabet.name() + if alphabet_name == "ACTG": + return Alphabet.dna() - assert alphabet_name == "ACTG", f"Unknown alphabet {alphabet_name}" - return Alphabet.dna() + raise ValueError(f"Unknown alphabet {alphabet_name}") def random_pick_in_values(self) -> int: """Pick the integer-encoding of a random char in an alphabet.""" return np.random.randint(len(self.mapping_to_int)) - def _random_string(self, length: int) -> str: + def random_string(self, length: int) -> str: """Pick a random string in the alphabet.""" ans = "".join([random.choice(list(self.mapping_to_int)) for _ in range(length)]) - check = [c in self.letters for c in ans] - assert all(check), "Issue in generation" + assert all([c in self.letters for c in ans]), "Issue in generation" return ans def prepare_random_patterns(self, len_min: int, len_max: int, nb_strings: int) -> list: @@ -99,8 +96,8 @@ def prepare_random_patterns(self, len_min: int, len_max: int, nb_strings: int) - for length_2 in range(len_min, len_max + 1): list_patterns += [ ( - self._random_string(length_1), - self._random_string(length_2), + self.random_string(length_1), + self.random_string(length_2), ) for _ in range(1) ] @@ -125,35 +122,46 @@ def encode(self, string: str) -> tuple: class LevenshteinDistance: alphabet: Alphabet - module: fhe.module # type: ignore + max_string_length: int + module: fhe.Module + configuration: fhe.Configuration def __init__( - self, alphabet: Alphabet, max_string_length: int, show_mlir: bool, show_optimizer: bool + self, + alphabet: Alphabet, + max_string_length: int, + compiled: bool = True, + configuration: Optional[fhe.Configuration] = None, + **kwargs, ): - self.alphabet = alphabet + if configuration is None: + configuration = fhe.Configuration() - self._compile_module(max_string_length, show_mlir, show_optimizer) + self.alphabet = alphabet + self.max_string_length = max_string_length + self.configuration = configuration.fork( + p_error=10**-20, + comparison_strategy_preference=fhe.ComparisonStrategy.ONE_TLU_PROMOTED, + min_max_strategy_preference=fhe.MinMaxStrategy.ONE_TLU_PROMOTED, + **kwargs, + ) + if compiled: + self._compile_module() - def calculate(self, a: str, b: str, mode: str, show_distance: bool = False): + def calculate(self, a: str, b: str, mode: str, show_distance: bool = False) -> int: """Compute a distance between two strings, either in fhe or in simulate.""" if mode == "simulate": - self._compute_in_simulation([(a, b)], show_distance=show_distance) - else: - assert mode == "fhe", "Only 'simulate' and 'fhe' mode are available" - self._compute_in_fhe([(a, b)], show_distance=show_distance) - - def calculate_and_return(self, a: str, b: str, mode: str) -> int: - """Return distance between two strings, either in fhe or in simulate.""" - if mode == "simulate": - return self._compute_and_return_in_simulation(a, b) + return self._compute_in_simulation([(a, b)])[0] assert mode == "fhe", "Only 'simulate' and 'fhe' mode are available" - return self._compute_and_return_in_fhe(a, b) + return self._compute_in_fhe([(a, b)], show_distance=show_distance)[0] - def calculate_list(self, pairs_to_compute_on: list, mode: str): + def calculate_list(self, pairs_to_compute_on: list, mode: str) -> list: """Compute a distance between strings of a list, either in fhe or in simulate.""" + results = [] for a, b in pairs_to_compute_on: - self.calculate(a, b, mode) + results.append(self.calculate(a, b, mode)) + return results def _encode_and_encrypt_strings(self, a: str, b: str) -> tuple: """Encode a string, ie map it to integers using the alphabet, and then encrypt @@ -166,7 +174,7 @@ def _encode_and_encrypt_strings(self, a: str, b: str) -> tuple: return a_enc, b_enc - def _compile_module(self, max_string_length: int, show_mlir: bool, show_optimizer: bool): + def _compile_module(self): """Compile the FHE module.""" assert len(self.alphabet.mapping_to_int) > 0, "Mapping not defined" @@ -180,10 +188,10 @@ def _compile_module(self, max_string_length: int, show_mlir: bool, show_optimize inputset_mix = [ ( np.random.randint(2), - np.random.randint(max_string_length), - np.random.randint(max_string_length), - np.random.randint(max_string_length), - np.random.randint(max_string_length), + np.random.randint(self.max_string_length), + np.random.randint(self.max_string_length), + np.random.randint(self.max_string_length), + np.random.randint(self.max_string_length), ) for _ in range(1000) ] @@ -195,68 +203,54 @@ def _compile_module(self, max_string_length: int, show_mlir: bool, show_optimize "mix": inputset_mix, "constant": [i for i in range(len(self.alphabet.mapping_to_int))], }, - show_mlir=show_mlir, - p_error=10**-20, - show_optimizer=show_optimizer, - comparison_strategy_preference=fhe.ComparisonStrategy.ONE_TLU_PROMOTED, - min_max_strategy_preference=fhe.MinMaxStrategy.ONE_TLU_PROMOTED, + self.configuration, ) - def _compute_and_return_in_simulation(self, a: str, b: str) -> int: - """Check equality between distance in simulation and clear distance, and return.""" - a_as_int = self.alphabet.encode(a) - b_as_int = self.alphabet.encode(b) - - l1_simulate = levenshtein_simulate(self.module, a_as_int, b_as_int) - l1_clear = levenshtein_clear(a_as_int, b_as_int) - - assert l1_simulate == l1_clear, f" {l1_simulate=} and {l1_clear=} are different" - - return int(l1_simulate) - - def _compute_in_simulation(self, list_patterns: list, show_distance: bool = False): + def _compute_in_simulation(self, list_patterns: list) -> list: """Check equality between distance in simulation and clear distance.""" + results = [] for a, b in list_patterns: print(f" Computing Levenshtein between strings '{a}' and '{b}'", end="") - l1_simulate = self._compute_and_return_in_simulation(a, b) - - if not show_distance: - print(" - OK") - else: - print(f" - distance is {l1_simulate}") - - def _compute_and_return_in_fhe(self, a: str, b: str): - """Check equality between distance in FHE and clear distance.""" - a_enc, b_enc = self._encode_and_encrypt_strings(a, b) - - l1_fhe_enc = levenshtein_fhe(self.module, a_enc, b_enc) - l1_fhe = self.module.mix.decrypt(l1_fhe_enc) # type: ignore - l1_clear = levenshtein_clear(a, b) + a_as_int = self.alphabet.encode(a) + b_as_int = self.alphabet.encode(b) - assert l1_fhe == l1_clear, f" {l1_fhe=} and {l1_clear=} are different" + l1_simulate = levenshtein_simulate(self.module, a_as_int, b_as_int) + l1_clear = levenshtein_clear(a_as_int, b_as_int) - return l1_fhe + results.append(l1_simulate) + assert l1_simulate == l1_clear, f" {l1_simulate=} and {l1_clear=} are different" + print(" - OK") + return results - def _compute_in_fhe(self, list_patterns: list, show_distance: bool = False): + def _compute_in_fhe(self, list_patterns: list, show_distance: bool = False) -> list: """Check equality between distance in FHE and clear distance.""" self.module.keygen() # type: ignore # Checks in FHE + results = [] for a, b in list_patterns: print(f" Computing Levenshtein between strings '{a}' and '{b}'", end="") + a_enc, b_enc = self._encode_and_encrypt_strings(a, b) + time_begin = time.time() - l1_fhe = self._compute_and_return_in_fhe(a, b) + l1_fhe_enc = levenshtein_fhe(self.module, a_enc, b_enc) time_end = time.time() + l1_fhe = self.module.mix.decrypt(l1_fhe_enc) # type: ignore + l1_clear = levenshtein_clear(a, b) + + results.append(l1_fhe) + assert l1_fhe == l1_clear, f" {l1_fhe=} and {l1_clear=} are different" + if not show_distance: print(f" - OK in {time_end - time_begin:.2f} seconds") else: print(f" - distance is {l1_fhe}, computed in {time_end - time_begin:.2f} seconds") + return results -# Module FHE @fhe.module() class LevenshsteinModule: @fhe.function({"x": "encrypted", "y": "encrypted"}) @@ -278,8 +272,13 @@ def constant(x): } ) def mix(is_equal, if_equal, case_1, case_2, case_3): - """Compute the min of (case_1, case_2, case_3), and then return `if_equal` if `is_equal` is - True, or the min in the other case.""" + """ + Mix the previous results. + + Return `if_equal` if `is_equal` is True, + otherwise the minimum value between `case_1`, `case_2` and `case_3`. + """ + min_12 = np.minimum(case_1, case_2) min_123 = np.minimum(min_12, case_3) @@ -290,16 +289,28 @@ def mix(is_equal, if_equal, case_1, case_2, case_3): # - input 2 of mix # - input 3 of mix # - input 4 of mix - # or just be the final output + # or just be the final output. # # There is a single output of equal, it goes to input 0 of mix composition = fhe.Wired( { + # There is a single output of equal, it can go to + # - input 0 of mix fhe.Wire(fhe.AllOutputs(equal), fhe.Input(mix, 0)), + # There is a single output of mix, it can go to + # - input 1 of mix + # - input 2 of mix + # - input 3 of mix + # - input 4 of mix fhe.Wire(fhe.AllOutputs(mix), fhe.Input(mix, 1)), fhe.Wire(fhe.AllOutputs(mix), fhe.Input(mix, 2)), fhe.Wire(fhe.AllOutputs(mix), fhe.Input(mix, 3)), fhe.Wire(fhe.AllOutputs(mix), fhe.Input(mix, 4)), + # There is a single output of constant, it can go to + # - input 1 of mix + # - input 2 of mix + # - input 3 of mix + # - input 4 of mix fhe.Wire(fhe.AllOutputs(constant), fhe.Input(mix, 1)), fhe.Wire(fhe.AllOutputs(constant), fhe.Input(mix, 2)), fhe.Wire(fhe.AllOutputs(constant), fhe.Input(mix, 3)), @@ -450,11 +461,20 @@ def main(): # Options by the user args = manage_args() + max_string_length = args.max_string_length + configuration_overwrites = {} + if args.show_mlir: + configuration_overwrites["show_mlir"] = True + if args.show_optimizer: + configuration_overwrites["show_optimizer"] = True + # Do what the user requested if args.autotest: alphabet = Alphabet.init_by_name(args.alphabet) levenshtein_distance = LevenshteinDistance( - alphabet, args.max_string_length, args.show_mlir, args.show_optimizer + alphabet, + max_string_length, + **configuration_overwrites, ) print(f"Making random tests with alphabet {args.alphabet}") @@ -476,7 +496,9 @@ def main(): alphabet = Alphabet.init_by_name(alphabet_name) levenshtein_distance = LevenshteinDistance( - alphabet, args.max_string_length, args.show_mlir, args.show_optimizer + alphabet, + max_string_length, + **configuration_overwrites, ) list_patterns = alphabet.prepare_random_patterns( args.max_string_length, args.max_string_length, 3 @@ -512,7 +534,9 @@ def main(): alphabet = Alphabet.init_by_name(args.alphabet) levenshtein_distance = LevenshteinDistance( - alphabet, args.max_string_length, args.show_mlir, args.show_optimizer + alphabet, + max_string_length, + **configuration_overwrites, ) levenshtein_distance.calculate(string0, string1, mode="fhe", show_distance=True) print("") diff --git a/frontends/concrete-python/tests/execution/test_examples.py b/frontends/concrete-python/tests/execution/test_examples.py index e26eb0d46a..b32bffa32f 100644 --- a/frontends/concrete-python/tests/execution/test_examples.py +++ b/frontends/concrete-python/tests/execution/test_examples.py @@ -8,7 +8,11 @@ import pytest from examples.key_value_database.static_size import StaticKeyValueDatabase -from examples.levenshtein_distance.levenshtein_distance import Alphabet, LevenshteinDistance +from examples.levenshtein_distance.levenshtein_distance import ( + Alphabet, + LevenshteinDistance, + levenshtein_clear, +) def test_static_kvdb(helpers): @@ -181,3 +185,89 @@ def query(db: StaticKeyValueDatabase, key: int) -> Optional[int]: ) assert query(db, maximum_key) == minimum_value + + +@pytest.mark.parametrize( + "mode", + [ + "simulate", + "fhe", + ], +) +def test_levenshtein_distance(mode, helpers): + """ + Test levenshtein distance example. + """ + + configuration = helpers.configuration() + if mode == "simulate": + configuration = configuration.fork(fhe_execution=False, fhe_simulation=True) + + alphabet = Alphabet.lowercase() + max_string_length = 5 + + levenshtein_distance = LevenshteinDistance(alphabet, max_string_length, configuration) + levenshtein_distance.module.keygen() + + samples = [ + # same + ("hello", "hello", 0), + # one character missing from the end + ("hell", "hello", 1), + ("hello", "hell", 1), + # one character missing from the start + ("ello", "hello", 1), + ("hello", "ello", 1), + # one character missing from the middle + ("hllo", "hello", 1), + ("hello", "hllo", 1), + # two characters missing from the start and the end + ("ell", "hello", 2), + ("hello", "ell", 2), + # three characters missing from the start, the end and the middle + ("el", "hello", 3), + ("hello", "el", 3), + # shifted one character + ("hello", "elloh", 2), + ("elloh", "hello", 2), + # shifted two characters + ("hello", "llohe", 4), + ("llohe", "hello", 4), + # shifted three characters + ("hello", "lohel", 4), + ("lohel", "hello", 4), + # shifted four characters + ("hello", "ohell", 2), + ("ohell", "hello", 2), + # completely different + ("hello", "numpy", 5), + ] + + for str1, str2, expected_distance in samples: + actual_distance = levenshtein_distance.calculate(str1, str2, mode, show_distance=True) + assert actual_distance == expected_distance + + +@pytest.mark.parametrize( + "alphabet_name", + Alphabet.return_available_alphabets(), +) +@pytest.mark.parametrize( + "max_length", + [2, 3], +) +def test_levenshtein_distance_randomly(alphabet_name, max_length, helpers): + """ + Test levenshtein distance example with randomly generated strings. + """ + + configuration = helpers.configuration().fork(fhe_execution=False, fhe_simulation=True) + + alphabet = Alphabet.init_by_name(alphabet_name) + levenshtein_distance = LevenshteinDistance(alphabet, max_length, configuration) + levenshtein_distance.module.keygen() + + for str1, str2 in alphabet.prepare_random_patterns(0, max_length, nb_strings=3): + expected_distance = levenshtein_clear(str1, str2) + actual_distance = levenshtein_distance.calculate(str1, str2, "simulate", show_distance=True) + assert actual_distance == expected_distance