Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create own partitioner #554

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ target/
# PyCharm
.idea

# VSCode
.vscode

kafka_2*

tests/ssl_cert
tests/keytab

# Cython extensions
aiokafka/partitioner/murmur2.c
aiokafka/record/_crecords/default_records.c
aiokafka/record/_crecords/legacy_records.c
aiokafka/record/_crecords/memory_records.c
Expand Down
13 changes: 13 additions & 0 deletions aiokafka/partitioner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Partitioner implementations: map a message key to a topic partition."""
from __future__ import absolute_import

from kafka.partitioner.roundrobin import RoundRobinPartitioner

from .default import DefaultPartitioner
from .hashed import (
    HashedPartitioner, Murmur2Partitioner, LegacyPartitioner, murmur2,
)

__all__ = [
    'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner',
    'Murmur2Partitioner', 'LegacyPartitioner', 'murmur2',
]
32 changes: 32 additions & 0 deletions aiokafka/partitioner/default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import absolute_import

import random

from .hashed import murmur2


class DefaultPartitioner(object):
    """Default partitioner.

    Hashes key to partition using murmur2 hashing (from java client)
    If key is None, selects partition randomly from available,
    or from all partitions if none are currently available
    """
    @classmethod
    def __call__(cls, key, all_partitions, available):
        """
        Get the partition corresponding to key
        :param key: partitioning key
        :param all_partitions: list of all partitions sorted by partition ID
        :param available: list of available partitions in no particular order
        :return: one of the values from all_partitions or available
        """
        if key is not None:
            # Mask the murmur2 hash to a non-negative value and map it
            # onto the full, sorted partition list (matches Java client).
            index = (murmur2(key) & 0x7fffffff) % len(all_partitions)
            return all_partitions[index]
        # Keyless message: pick randomly, preferring partitions that are
        # currently available; fall back to the full list otherwise.
        pool = available if available else all_partitions
        return random.choice(pool)
48 changes: 48 additions & 0 deletions aiokafka/partitioner/hashed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import absolute_import

from kafka.partitioner.base import Partitioner

from .murmur2 import murmur2


class Murmur2Partitioner(Partitioner):
    """
    Partitioner selecting the target partition from the murmur2 hash of
    the key, applying the same hashing scheme as the mainline Java client.
    """
    def __call__(self, key, partitions=None, available=None):
        # Prefer partitions that are currently available; otherwise use
        # the full partition list handed in by the caller.
        target = available if available else partitions
        return self.partition(key, target)

    def partition(self, key, partitions=None):
        # Fall back to the partitions supplied at construction time.
        pool = partitions if partitions else self.partitions

        # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
        index = (murmur2(key) & 0x7fffffff) % len(pool)
        return pool[index]


class LegacyPartitioner(object):
    """DEPRECATED -- See Issue 374

    Selects the target partition from Python's builtin hash() of the key.
    NOTE: hash() is not compatible with the Java client's murmur2 scheme.
    """
    def __init__(self, partitions):
        # Default partition list used when partition() is called without one.
        self.partitions = partitions

    def partition(self, key, partitions=None):
        # Fall back to the partitions captured at construction time.
        pool = partitions if partitions else self.partitions
        return pool[hash(key) % len(pool)]


# Default will change to Murmur2 in 0.10 release
HashedPartitioner = LegacyPartitioner
63 changes: 63 additions & 0 deletions aiokafka/partitioner/murmur2.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from cpython cimport (
PyBytes_AS_STRING
)
from libc.stdint cimport uint32_t


# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
def murmur2(bytes in_bytes):
    """Cython Murmur2 implementation.

    Based on java client, see org.apache.kafka.common.utils.Utils.murmur2

    Args:
        in_bytes (bytes): opaque bytes

    Returns: MurmurHash2 of in_bytes (as an unsigned 32-bit value)
    """
    cdef uint32_t length, seed, r, length4, i, i4, extra_bytes
    cdef uint32_t m, h, k
    cdef char* data

    data = PyBytes_AS_STRING(in_bytes)
    # BUGFIX: take the length from the bytes object, not the char*.
    # Cython compiles len() on a char* to strlen(), which stops at the
    # first NUL byte, so keys like b'\x00 ' hashed with the wrong length.
    length = len(in_bytes)
    seed = 0x9747b28c
    # 'm' and 'r' are mixing constants generated offline.
    # They're not really 'magic', they just happen to work well.
    m = 0x5bd1e995
    r = 24

    # Initialize the hash to a random value
    h = seed ^ length
    length4 = length // 4

    # Mix 4 bytes at a time into the hash
    for i in range(length4):
        i4 = i * 4
        # Assemble a little-endian uint32 from the next 4 bytes;
        # '& 0xff' undoes C's signed-char sign extension.
        k = (
            (data[i4 + 0] & 0xff) +
            ((data[i4 + 1] & 0xff) << 8) +
            ((data[i4 + 2] & 0xff) << 16) +
            ((data[i4 + 3] & 0xff) << 24)
        )
        k *= m
        k ^= k >> r  # k ^= k >>> r
        k *= m

        h *= m
        h ^= k

    # Handle the last few bytes of the input array
    # (cascading, matching the Java switch fall-through)
    extra_bytes = length % 4
    if extra_bytes >= 3:
        h ^= (data[(length & ~3) + 2] & 0xff) << 16
    if extra_bytes >= 2:
        h ^= (data[(length & ~3) + 1] & 0xff) << 8
    if extra_bytes >= 1:
        h ^= (data[length & ~3] & 0xff)
        h *= m

    # Final avalanche of the last few input bytes
    h ^= h >> 13  # h >>> 13;
    h *= m
    h ^= h >> 15  # h >>> 15;

    return h
2 changes: 1 addition & 1 deletion aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import traceback
import warnings

from kafka.partitioner.default import DefaultPartitioner
from kafka.codec import has_gzip, has_snappy, has_lz4

from aiokafka.client import AIOKafkaClient
Expand All @@ -17,6 +16,7 @@
)

from .message_accumulator import MessageAccumulator
from ..partitioner import DefaultPartitioner
from .sender import Sender
from .transaction_manager import TransactionManager

Expand Down
2 changes: 1 addition & 1 deletion benchmark/README
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
The batch compose and read benchmarks in this section are written using
``perf`` library, created by Viktor Stinner. For more information on how to get
``perf`` library, created by Victor Stinner. For more information on how to get
reliable results of test runs please consult
http://pyperf.readthedocs.io/en/latest/run_benchmark.html.

Expand Down
29 changes: 29 additions & 0 deletions benchmark/murmur2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import pyperf

from kafka.partitioner.hashed import murmur2 as murmur2_kafka
from aiokafka.partitioner import murmur2


def run_murmur2(loops: int):
    """Time `loops` calls of the Cython murmur2 over a fixed 10-byte key."""
    payload = bytes(range(10))
    start = pyperf.perf_counter()
    for _ in range(loops):
        murmur2(payload)
    # pyperf expects the raw elapsed time for the whole batch of loops.
    return pyperf.perf_counter() - start

def run_murmur2_kafka(loops: int):
    """Time `loops` calls of kafka-python's murmur2 over a fixed 10-byte key."""
    payload = bytes(range(10))
    start = pyperf.perf_counter()
    for _ in range(loops):
        murmur2_kafka(payload)
    # pyperf expects the raw elapsed time for the whole batch of loops.
    return pyperf.perf_counter() - start


# Register both implementations so pyperf reports them side by side.
runner = pyperf.Runner()
runner.bench_time_func('murmur2 cython realization', run_murmur2)
runner.bench_time_func('murmur2 python realization', run_murmur2_kafka)
7 changes: 7 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@
extra_compile_args=CFLAGS,
extra_link_args=LDFLAGS
),
Extension(
'aiokafka.partitioner.murmur2',
['aiokafka/partitioner/murmur2' + ext],
libraries=LIBRARIES,
extra_compile_args=CFLAGS,
extra_link_args=LDFLAGS
),
]


Expand Down
72 changes: 72 additions & 0 deletions tests/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import absolute_import

import pytest

from aiokafka.partitioner import (
DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner, murmur2,
)


def test_default_partitioner():
    partitioner = DefaultPartitioner()
    all_partitions = available = list(range(100))

    # Identical keys must map to the same partition deterministically.
    first = partitioner(b'foo', all_partitions, available)
    second = partitioner(b'foo', all_partitions, available)
    assert first == second
    assert first in all_partitions

    # A None key picks among the available partitions...
    assert partitioner(None, all_partitions, [123]) == 123

    # ...and falls back to the full partition list when none are available.
    assert partitioner(None, all_partitions, []) in all_partitions


def test_roundrobin_partitioner():
    partitioner = RoundRobinPartitioner()
    all_partitions = available = list(range(100))
    top = all_partitions[-1]

    # The partitioner should walk the partitions in order, one per call.
    for expected in range(top + 1):
        assert partitioner(None, all_partitions, available) == expected

    # A second pass keeps cycling from the beginning again.
    for expected in range(top // 2 + 1):
        assert partitioner(None, all_partitions, available) == expected

    # Test dynamic partition re-assignment: shrink the available set
    # and continue the cycle from where the previous pass stopped.
    available = available[:-25]
    for expected in range(top // 2 + 1, max(available) + 1):
        assert partitioner(None, all_partitions, available) == expected

    # Growing the partition set keeps the cycle going past the old max.
    resume = max(available) + 1
    all_partitions = list(range(200))
    available = all_partitions
    for expected in range(resume, all_partitions[-1] + 1):
        assert partitioner(None, all_partitions, available) == expected


@pytest.mark.parametrize("bytes_payload,partition_number", [
    (b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566),
    (b'\x00 ', 742)
])
def test_murmur2_java_compatibility(bytes_payload, partition_number):
    # Expected values were produced by Kafka's
    # org.apache.kafka.clients.producer.Partitioner over 1000 partitions.
    partitioner = Murmur2Partitioner(range(1000))
    assert partitioner.partition(bytes_payload) == partition_number


def test_murmur2_not_ascii():
    # Regression check: murmur2() must accept bytes that do not
    # ascii-encode (historical py2 encoding bug) without raising.
    for payload in (b'\xa4', b'\x81' * 1000):
        murmur2(payload)