Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-725: Allow for global compression #512

Merged
merged 6 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable {
*/
SchemaSet getSchemaSet();

/**
* Return whether compression is enabled by default for all create, setData and getData operations.
*
* @return if compression is enabled
*/
boolean compressionEnabled();

/**
* Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
* done from the {@link #runSafe(Runnable)} thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static class Builder {
private List<AuthInfo> authInfos = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private boolean compressionEnabled = false;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
Expand Down Expand Up @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) {
return this;
}

/**
* By default, each write or read call must explicitly use compression.
* Call this method to enable compression by default on all read and write calls.
* <p>
* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
* @return this
*/
public Builder enableCompression() {
this.compressionEnabled = true;
return this;
}

/**
* @param zookeeperFactory the zookeeper factory to use
* @return this
Expand Down Expand Up @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() {
return compressionProvider;
}

public boolean compressionEnabled() {
return compressionEnabled;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Compressible<T> {
* @return this
*/
public T compressed();

/**
* Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T uncompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Decompressible<T> {
* @return this
*/
public T decompressed();

/**
* Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T undecompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class CreateBuilderImpl
acling = new ACLing(client.getAclProvider());
createParentsIfNeeded = false;
createParentsAsContainers = false;
compress = false;
compress = client.compressionEnabled();
setDataIfExists = false;
storingStat = null;
ttl = -1;
Expand Down Expand Up @@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable<T> compressed() {
return this;
}

@Override
public ACLCreateModePathAndBytesable<T> uncompressed() {
CreateBuilderImpl.this.uncompressed();
return this;
}

@Override
public T forPath(String path) throws Exception {
return forPath(path, client.getDefaultData());
Expand All @@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception {

@Override
public CreateBackgroundModeStatACLable compressed() {
compress = true;
return withCompression(true);
}

@Override
public CreateBackgroundModeStatACLable uncompressed() {
return withCompression(false);
}

private CreateBackgroundModeStatACLable withCompression(boolean compress) {
this.compress = compress;
return new CreateBackgroundModeStatACLable() {
@Override
public CreateBackgroundModeACLable storingStatIn(Stat stat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final boolean compressionEnabled;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
Expand Down Expand Up @@ -184,6 +185,7 @@ public void process(WatchedEvent watchedEvent) {
builder.getSimulatedSessionExpirationPercent(),
builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
compressionEnabled = builder.compressionEnabled();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
Expand Down Expand Up @@ -283,6 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
compressionEnabled = parent.compressionEnabled;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = parent.namespace;
Expand Down Expand Up @@ -618,6 +621,11 @@ public SchemaSet getSchemaSet() {
return schemaSet;
}

@Override
public boolean compressionEnabled() {
return compressionEnabled;
}

ACLProvider getAclProvider() {
return aclProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
responseStat = null;
watching = new Watching(client);
backgrounding = new Backgrounding();
decompress = false;
decompress = client.compressionEnabled();
}

public GetDataBuilderImpl(
Expand All @@ -64,7 +64,16 @@ public GetDataBuilderImpl(

@Override
public GetDataWatchBackgroundStatable decompressed() {
decompress = true;
return withDecompression(true);
}

@Override
public GetDataWatchBackgroundStatable undecompressed() {
return withDecompression(false);
}

private GetDataWatchBackgroundStatable withDecompression(boolean decompress) {
this.decompress = decompress;
return new GetDataWatchBackgroundStatable() {
@Override
public ErrorListenerPathable<byte[]> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SetDataBuilderImpl
this.client = client;
backgrounding = new Backgrounding();
version = -1;
compress = false;
compress = client.compressionEnabled();
}

public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
Expand Down Expand Up @@ -94,12 +94,27 @@ public VersionPathAndBytesable<T> compressed() {
compress = true;
return this;
}

@Override
public VersionPathAndBytesable<T> uncompressed() {
compress = false;
return this;
}
};
}

@Override
public SetDataBackgroundVersionable compressed() {
compress = true;
return withCompression(true);
}

@Override
public SetDataBackgroundVersionable uncompressed() {
return withCompression(false);
}

public SetDataBackgroundVersionable withCompression(boolean compress) {
this.compress = compress;
return new SetDataBackgroundVersionable() {
@Override
public ErrorListenerPathAndBytesable<Stat> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
this.client = client;
responseStat = null;
decompress = false;
decompress = client.compressionEnabled();
}

@Override
Expand All @@ -44,6 +44,12 @@ public StatPathable<byte[]> decompressed() {
return this;
}

@Override
public StatPathable<byte[]> undecompressed() {
decompress = false;
return this;
}

@Override
public Pathable<byte[]> storingStatIn(Stat stat) {
responseStat = stat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
Expand Down Expand Up @@ -97,6 +99,73 @@ public void testSetData() throws Exception {
}
}

@Test
public void testSetDataGlobalCompression() throws Exception {
final byte[] data = "here's a string".getBytes();
final byte[] gzipedData = GzipCompressionProvider.doCompress(data);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableCompression()
.build();
try {
client.start();

// Create with explicit compression
client.create().compressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Create explicitly without compression
client.delete().forPath("/a/b/c");
client.create().uncompressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
assertThrows(
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Create with implicit (global) compression
client.delete().forPath("/a/b/c");
client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData with explicit compression
client.setData().compressed().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData explicitly without compression
client.setData().uncompressed().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
assertThrows(
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData with implicit (global) compression
client.setData().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testSimple() throws Exception {
final byte[] data = "here's a string".getBytes();
Expand Down
Loading
Loading