Skip to content

Commit

Permalink
Support EXPANSION and NONSCALING options (#54)
Browse files Browse the repository at this point in the history
* Support EXPANSION and NONSCALING options

* edit

* Placed BF tests contiguously

* add tests and modify source

* address review and further changes
  • Loading branch information
sazzad16 authored Oct 4, 2021
1 parent 8b89abf commit 40ad6ea
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 35 deletions.
63 changes: 51 additions & 12 deletions src/main/java/io/rebloom/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.Pool;
import redis.clients.jedis.util.SafeEncoder;
Expand Down Expand Up @@ -97,10 +98,27 @@ Jedis _conn() {
*/
public void createFilter(String name, long initCapacity, double errorRate) {
try (Jedis conn = _conn()) {
String rep = sendCommand(conn, Command.RESERVE, SafeEncoder.encode(name), Protocol.toByteArray(errorRate), Protocol.toByteArray(initCapacity)).getStatusCodeReply();
if (!rep.equals("OK")) {
throw new JedisException(rep);
String rep = sendCommand(conn, Command.RESERVE, SafeEncoder.encode(name),
Protocol.toByteArray(errorRate), Protocol.toByteArray(initCapacity)).getStatusCodeReply();
checkOK(rep);
}
}

public void bfReserve(String key, double errorRate, long capacity) {
bfReserve(key, errorRate, capacity, null);
}

public void bfReserve(String key, double errorRate, long capacity, ReserveParams params) {
try (Jedis conn = _conn()) {
final List<byte[]> args = new ArrayList<>();
args.add(SafeEncoder.encode(key));
args.add(Protocol.toByteArray(errorRate));
args.add(Protocol.toByteArray(capacity));
if (params != null) {
args.addAll(params.getParams());
}
String response = sendCommand(conn, Command.RESERVE, args).getStatusCodeReply();
checkOK(response);
}
}

Expand Down Expand Up @@ -154,6 +172,33 @@ private boolean[] sendMultiCommand(Command cmd, byte[] name, byte[]... values) {
}
}

public boolean[] bfInsert(String key, String... items) {
return bfInsert(key, (InsertOptions) null, (ReserveParams) null, items);
}

public boolean[] bfInsert(String key, InsertOptions insertOptions, ReserveParams reserveParams, String... items) {
final List<byte[]> args = new ArrayList<>();
args.add(SafeEncoder.encode(key));
if (insertOptions != null) {
args.addAll(insertOptions.getOptions());
}
if (reserveParams != null) {
args.addAll(reserveParams.getParams());
}
args.add(Keywords.ITEMS.getRaw());
for (String item : items) {
args.add(SafeEncoder.encode(item));
}
try (Jedis conn = _conn()) {
List<Object> listResp = sendCommand(conn, Command.INSERT, args).getObjectMultiBulkReply();
final int lastIndex = listResp.size() - 1;
if (listResp.get(lastIndex) instanceof JedisDataException) {
listResp.remove(lastIndex);
}
return toBooleanArray(BuilderFactory.BOOLEAN_LIST.build(listResp));
}
}

private static boolean[] toBooleanArray(List<Boolean> list) {
if (list == null) {
return null;
Expand Down Expand Up @@ -504,9 +549,7 @@ public void cfCreate(String key, CFReserveOptions options) {

String rep = sendCommand(conn, CuckooCommand.RESERVE, fullArgs).getStatusCodeReply();

if (!rep.equals("OK")) {
throw new JedisException(rep);
}
checkOK(rep);
}
}

Expand All @@ -518,9 +561,7 @@ public void cfCreate(String key, long capacity) {
Protocol.toByteArray(capacity) //
).getStatusCodeReply();

if (!rep.equals("OK")) {
throw new JedisException(rep);
}
checkOK(rep);
}
}

Expand Down Expand Up @@ -665,9 +706,7 @@ public void cfLoadChunk(String key, Map.Entry<Long, byte[]> idp) {
idp.getValue() //
).getStatusCodeReply();

if (!rep.equals("OK")) {
throw new JedisException(rep);
}
checkOK(rep);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/rebloom/client/InsertOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
public class InsertOptions {
private final List<byte[]> options = new ArrayList<>();

public static InsertOptions insertOptions() {
return new InsertOptions();
}
/**
* If specified, should be followed by the desired capacity for the filter to be created
* @param capacity
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/rebloom/client/Keywords.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Locale;

public enum Keywords implements ProtocolCommand {
CAPACITY,
ERROR,
NOCREATE,
EXPANSION,
NONSCALING,
ITEMS;

private final byte[] raw;

Keywords() {
raw = SafeEncoder.encode(this.name().toLowerCase(Locale.ENGLISH));
raw = SafeEncoder.encode(name());
}

@Override
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/rebloom/client/ReserveParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.rebloom.client;

import static io.rebloom.client.Keywords.EXPANSION;
import static io.rebloom.client.Keywords.NONSCALING;
import static redis.clients.jedis.Protocol.toByteArray;

import java.util.ArrayList;
import java.util.List;

/**
* To be used by both BF.RESERVE and BR.INSERT commands.
* <p>
* Supported arguments:
* <ul><li>EXPANSION</li><li>NONSCALING</li></ul>
*/
public class ReserveParams {

private int expansion = 0;
private boolean nonScaling = false;

public ReserveParams() {
}

/**
* @return ReserveParams
* @see ReserveParams
*/
public static ReserveParams reserveParams() {
return new ReserveParams();
}

public ReserveParams expansion(int expansion) {
this.expansion = expansion;
return this;
}

public ReserveParams nonScaling() {
this.nonScaling = true;
return this;
}

public List<byte[]> getParams() {
List<byte[]> args = new ArrayList<>();
if (expansion > 0) {
args.add(EXPANSION.getRaw());
args.add(toByteArray(expansion));
}
if (nonScaling) {
args.add(NONSCALING.getRaw());
}
return args;
}
}
91 changes: 71 additions & 20 deletions src/test/java/io/rebloom/client/ClientTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.rebloom.client;

import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.Map;
import org.junit.Test;
Expand All @@ -8,9 +10,6 @@
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;

import static junit.framework.TestCase.*;
import static org.junit.Assert.assertThrows;

/**
* @author Mark Nunberg
*/
Expand Down Expand Up @@ -51,6 +50,39 @@ public void reserveAlreadyExists() {
cl.createFilter("myBloom", 100, 0.1);
}

@Test
public void reserveV2() {
cl.bfReserve("reserve-basic", 0.001, 2);
assertArrayEquals(new boolean[]{true}, cl.bfInsert("reserve-basic", "a"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("reserve-basic", "b"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("reserve-basic", "c"));
}

@Test
public void reserveEmptyParams() {
cl.bfReserve("empty-param", 0.001, 2, ReserveParams.reserveParams());
assertArrayEquals(new boolean[]{true}, cl.bfInsert("empty-param", "a"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("empty-param", "b"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("empty-param", "c"));
}

@Test
public void reserveNonScaling() {
cl.bfReserve("nonscaling", 0.001, 2, ReserveParams.reserveParams().nonScaling());
assertArrayEquals(new boolean[]{true}, cl.bfInsert("nonscaling", "a"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("nonscaling", "b"));
assertArrayEquals(new boolean[]{}, cl.bfInsert("nonscaling", "c"));
}

@Test
public void reserveExpansion() {
// bf.reserve bfexpansion 0.001 1000 expansion 4
cl.bfReserve("bfexpansion", 0.001, 1000, ReserveParams.reserveParams().expansion(4));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("bfexpansion", "a"));
assertArrayEquals(new boolean[]{true}, cl.bfInsert("bfexpansion",
InsertOptions.insertOptions().nocreate(), (ReserveParams) null, "b"));
}

@Test
public void addExistsString() {
assertTrue(cl.add("newFilter", "foo"));
Expand Down Expand Up @@ -130,23 +162,6 @@ public void testExample() {
client.createFilter("specialBloom", 10000, 0.0001);
client.add("specialBloom", "foo");
}

@Test
public void createTopKFilter() {
cl.topkCreateFilter("aaa", 30, 2000, 7, 0.925);

assertEquals(Arrays.asList(null, null), cl.topkAdd("aaa", "bb", "cc"));

assertEquals(Arrays.asList(true, false, true), cl.topkQuery("aaa", "bb", "gg", "cc"));

assertEquals(Arrays.asList(1L, 0L, 1L), cl.topkCount("aaa", "bb", "gg", "cc"));

assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc").contains(s) || s == null));

assertEquals(null, cl.topkIncrBy("aaa", "ff", 10));

assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc", "ff").contains(s) || s == null));
}

@Test
public void testInsert() {
Expand Down Expand Up @@ -178,4 +193,40 @@ public void testInfo() {
Exception exception = assertThrows(JedisDataException.class, () -> cl.info("not_exist"));
assertEquals("ERR not found", exception.getMessage());
}

@Test
public void insertNonScaling() {
boolean[] insert = cl.bfInsert("nonscaling_err", InsertOptions.insertOptions().capacity(4),
ReserveParams.reserveParams().nonScaling(), "a", "b", "c");
assertEquals(3, insert.length);

insert = cl.bfInsert("nonscaling_err", "d", "e");
assertEquals(1, insert.length);
}

@Test
public void insertExpansion() {
// BF.INSERT bfexpansion CAPACITY 3 expansion 3 ITEMS a b c d e f g h j k l o i u y t r e w q
boolean[] insert = cl.bfInsert("bfexpansion", InsertOptions.insertOptions().capacity(3),
ReserveParams.reserveParams().expansion(3), "a", "b", "c", "d", "e", "f", "g", "h",
"j", "k", "l", "o", "i", "u", "y", "t", "r", "e", "w", "q");
assertEquals(20, insert.length);
}

@Test
public void createTopKFilter() {
cl.topkCreateFilter("aaa", 30, 2000, 7, 0.925);

assertEquals(Arrays.asList(null, null), cl.topkAdd("aaa", "bb", "cc"));

assertEquals(Arrays.asList(true, false, true), cl.topkQuery("aaa", "bb", "gg", "cc"));

assertEquals(Arrays.asList(1L, 0L, 1L), cl.topkCount("aaa", "bb", "gg", "cc"));

assertTrue(cl.topkList("aaa").stream().allMatch(s -> Arrays.asList("bb", "cc").contains(s) || s == null));

assertEquals(null, cl.topkIncrBy("aaa", "ff", 10));

assertTrue(cl.topkList("aaa").stream().allMatch(s -> Arrays.asList("bb", "cc", "ff").contains(s) || s == null));
}
}

0 comments on commit 40ad6ea

Please sign in to comment.