From ce5138ca7ff8b390839d153b1568bec8caa3a8e5 Mon Sep 17 00:00:00 2001
From: "Kemal T. Yesilbek"
Date: Thu, 27 Jan 2022 10:06:23 +0100
Subject: [PATCH 1/2] FNV1a_32 bit partitioner class implementation

---
 kafka/partitioner/fnv1a_32.py | 94 +++++++++++++++++++++++++++++++++++
 1 file changed, 94 insertions(+)
 create mode 100644 kafka/partitioner/fnv1a_32.py

diff --git a/kafka/partitioner/fnv1a_32.py b/kafka/partitioner/fnv1a_32.py
new file mode 100644
index 000000000..7a8de7f65
--- /dev/null
+++ b/kafka/partitioner/fnv1a_32.py
@@ -0,0 +1,94 @@
+from __future__ import absolute_import
+
+import random
+
+from kafka.vendor import six
+
+
+class FNV1a32Partitioner(object):
+    """Partitioner with FNV1a 32-bit hash algorithm.
+
+    Hashes key to partition using FNV1a 32-bit hashing.
+    If key is None, selects partition randomly from available,
+    or from all partitions if none are currently available.
+    """
+    @classmethod
+    def __call__(cls, key, all_partitions, available):
+        """
+        Partitioner implementation using FNV1a 32-bit hashing function and
+        decimal conversion with two's complement. If key is passed with None
+        value, the selection of the partition is random.
+
+        The implementation details are selected to make sure the same key
+        is mapped to the same partition in Goka/Sarama. Because of the differences
+        in fundamental operations how two languages (go and python), we had
+        to implement the hash calculation and conversions ourselves. This
+        way, we made sure this partitioner works exactly same as Goka/Sarama.
+        It
+
+        Algorithm details:
+        http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param
+
+        :param key: partitioning key
+        :param all_partitions: list of all partitions sorted by partition ID
+        :param available: list of available partitions in no particular order
+        :return: one of the values from all_partitions or available
+        """
+        if key is None:
+            if available:
+                return random.choice(available)
+            return random.choice(all_partitions)
+
+        key_hash = _get_fnv1a_32(key)
+        key_hash = _get_twos_complement_32bit(key_hash)
+        key_hash = abs(key_hash)
+        idx = key_hash % len(all_partitions)
+        return all_partitions[idx]
+
+
+def _get_twos_complement_32bit(value):
+    """
+    Returns the signed two's complement decimal conversion of the int value.
+
+    Algorithm details:
+    http://sandbox.mc.edu/~bennet/cs110/tc/tctod.html
+
+    Taken from:
+    https://stackoverflow.com/questions/1604464/twos-complement-in-python
+    """
+    bit_base = 32
+    if (value & (1 << (bit_base - 1))) != 0:
+        value = value - (1 << bit_base)
+    return value
+
+
+def _get_fnv1a_32(key):
+    """
+    Returns the FNV1a 32bit hash (int) of the given key (bytes).
+
+    Algorithm details:
+    http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param
+
+    Taken from:
+    https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py
+    """
+    # We set the same init_offset and prime for the FNV hasher as
+    # defined in Golang FNV package to have full compatibility. The Go FNV
+    # is the package Sarama uses for its hashing calculations under the hood
+    # References:
+    # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=31
+    # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=35
+    init_offset = 0x811c9dc5
+    prime = 0x01000193
+    hash_size = 2 ** 32
+
+    # Python2 bytes is really a str, causing the bitwise operations below to fail
+    # so convert to bytearray.
+    if six.PY2:
+        key = bytearray(bytes(key))
+
+    key_hash = init_offset
+    for byte in key:
+        key_hash = key_hash ^ byte
+        key_hash = (key_hash * prime) % hash_size
+    return key_hash

From 7c8c87830e9b8bd9a14514c263b790eb4370feb2 Mon Sep 17 00:00:00 2001
From: "Kemal T. Yesilbek"
Date: Thu, 27 Jan 2022 11:05:26 +0100
Subject: [PATCH 2/2] Added tests and improved docs

---
 kafka/partitioner/fnv1a_32.py | 12 +++++-------
 test/test_partitioner.py      | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/kafka/partitioner/fnv1a_32.py b/kafka/partitioner/fnv1a_32.py
index 7a8de7f65..ae1b04496 100644
--- a/kafka/partitioner/fnv1a_32.py
+++ b/kafka/partitioner/fnv1a_32.py
@@ -20,11 +20,9 @@ def __call__(cls, key, all_partitions, available):
         value, the selection of the partition is random.
 
         The implementation details are selected to make sure the same key
-        is mapped to the same partition in Goka/Sarama. Because of the differences
-        in fundamental operations how two languages (go and python), we had
-        to implement the hash calculation and conversions ourselves. This
-        way, we made sure this partitioner works exactly same as Goka/Sarama.
-        It
+        is mapped to the same partition in Goka/Sarama. It is confirmed
+        that this implementation works the same as the partitioner of
+        github.com/lovoo/goka v1.0.5 with Go version 1.16.
 
         Algorithm details:
         http://www.isthe.com/chongo/tech/comp/fnv/#FNV-param
@@ -73,8 +71,8 @@ def _get_fnv1a_32(key):
     https://github.com/znerol/py-fnvhash/blob/master/fnvhash/__init__.py
     """
     # We set the same init_offset and prime for the FNV hasher as
-    # defined in Golang FNV package to have full compatibility. The Go FNV
-    # is the package Sarama uses for its hashing calculations under the hood
+    # defined in Golang FNV package. The Go FNV is the package Sarama
+    # uses for its hashing calculations under the hood.
     # References:
     # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=31
     # https://cs.opensource.google/go/go/+/refs/tags/go1.17.3:src/hash/fnv/fnv.go;l=35
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 853fbf69e..5edfb8d8f 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -3,6 +3,7 @@
 import pytest
 
 from kafka.partitioner import DefaultPartitioner, murmur2
+from kafka.partitioner.fnv1a_32 import FNV1a32Partitioner, _get_twos_complement_32bit, _get_fnv1a_32
 
 
 def test_default_partitioner():
@@ -36,3 +37,17 @@ def test_murmur2_not_ascii():
     # Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
     murmur2(b'\xa4')
     murmur2(b'\x81' * 1000)
+
+
+@pytest.mark.parametrize("key,partitions,available,expected", [
+    (b"123", [0, 1, 2], [0, 1, 2], 2),
+    (b"123", [0, 1], [0, 1], 1),
+    (b"123", [0], [0], 0),
+    (b"f232oo3232", [0, 1, 2, 3], [0, 1, 2, 3], 2),
+    (b"f232oo3232", [0, 1], [0, 1], 0),
+    (b"f232oo3232", [0], [0], 0),
+])
+def test_fnv1a_32_partitioner(key, partitions, available, expected):
+    partitioner = FNV1a32Partitioner()
+    out = partitioner(key, partitions, available)
+    assert out == expected