Skip to content

Commit

Permalink
CURATOR-725: Allow for global compression (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman authored Jan 17, 2025
1 parent 914f2f7 commit 8eb6f9a
Show file tree
Hide file tree
Showing 25 changed files with 1,100 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable {
*/
SchemaSet getSchemaSet();

/**
* Return whether compression is enabled by default for all create, setData and getData operations.
*
* @return if compression is enabled
*/
boolean compressionEnabled();

/**
* Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
* done from the {@link #runSafe(Runnable)} thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static class Builder {
private List<AuthInfo> authInfos = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private boolean compressionEnabled = false;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
Expand Down Expand Up @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) {
return this;
}

/**
* By default, each write or read call must explicitly use compression.
* Call this method to enable compression by default on all read and write calls.
* <p>
* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
* @return this
*/
public Builder enableCompression() {
this.compressionEnabled = true;
return this;
}

/**
* @param zookeeperFactory the zookeeper factory to use
* @return this
Expand Down Expand Up @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() {
return compressionProvider;
}

public boolean compressionEnabled() {
return compressionEnabled;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Compressible<T> {
* @return this
*/
public T compressed();

/**
* Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T uncompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Decompressible<T> {
* @return this
*/
public T decompressed();

/**
* Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T undecompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class CreateBuilderImpl
acling = new ACLing(client.getAclProvider());
createParentsIfNeeded = false;
createParentsAsContainers = false;
compress = false;
compress = client.compressionEnabled();
setDataIfExists = false;
storingStat = null;
ttl = -1;
Expand Down Expand Up @@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable<T> compressed() {
return this;
}

@Override
public ACLCreateModePathAndBytesable<T> uncompressed() {
CreateBuilderImpl.this.uncompressed();
return this;
}

@Override
public T forPath(String path) throws Exception {
return forPath(path, client.getDefaultData());
Expand All @@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception {

@Override
public CreateBackgroundModeStatACLable compressed() {
compress = true;
return withCompression(true);
}

@Override
public CreateBackgroundModeStatACLable uncompressed() {
return withCompression(false);
}

private CreateBackgroundModeStatACLable withCompression(boolean compress) {
this.compress = compress;
return new CreateBackgroundModeStatACLable() {
@Override
public CreateBackgroundModeACLable storingStatIn(Stat stat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final boolean compressionEnabled;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
Expand Down Expand Up @@ -185,6 +186,7 @@ public void process(WatchedEvent watchedEvent) {
builder.getSimulatedSessionExpirationPercent(),
builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
compressionEnabled = builder.compressionEnabled();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
Expand Down Expand Up @@ -284,6 +286,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
compressionEnabled = parent.compressionEnabled;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = parent.namespace;
Expand Down Expand Up @@ -628,6 +631,11 @@ public SchemaSet getSchemaSet() {
return schemaSet;
}

@Override
public boolean compressionEnabled() {
return compressionEnabled;
}

ACLProvider getAclProvider() {
return aclProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
responseStat = null;
watching = new Watching(client);
backgrounding = new Backgrounding();
decompress = false;
decompress = client.compressionEnabled();
}

public GetDataBuilderImpl(
Expand All @@ -64,7 +64,16 @@ public GetDataBuilderImpl(

@Override
public GetDataWatchBackgroundStatable decompressed() {
decompress = true;
return withDecompression(true);
}

@Override
public GetDataWatchBackgroundStatable undecompressed() {
return withDecompression(false);
}

private GetDataWatchBackgroundStatable withDecompression(boolean decompress) {
this.decompress = decompress;
return new GetDataWatchBackgroundStatable() {
@Override
public ErrorListenerPathable<byte[]> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SetDataBuilderImpl
this.client = client;
backgrounding = new Backgrounding();
version = -1;
compress = false;
compress = client.compressionEnabled();
}

public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
Expand Down Expand Up @@ -94,12 +94,27 @@ public VersionPathAndBytesable<T> compressed() {
compress = true;
return this;
}

@Override
public VersionPathAndBytesable<T> uncompressed() {
compress = false;
return this;
}
};
}

@Override
public SetDataBackgroundVersionable compressed() {
compress = true;
return withCompression(true);
}

@Override
public SetDataBackgroundVersionable uncompressed() {
return withCompression(false);
}

public SetDataBackgroundVersionable withCompression(boolean compress) {
this.compress = compress;
return new SetDataBackgroundVersionable() {
@Override
public ErrorListenerPathAndBytesable<Stat> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
this.client = client;
responseStat = null;
decompress = false;
decompress = client.compressionEnabled();
}

@Override
Expand All @@ -44,6 +44,12 @@ public StatPathable<byte[]> decompressed() {
return this;
}

@Override
public StatPathable<byte[]> undecompressed() {
decompress = false;
return this;
}

@Override
public Pathable<byte[]> storingStatIn(Stat stat) {
responseStat = stat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
Expand Down Expand Up @@ -97,6 +99,73 @@ public void testSetData() throws Exception {
}
}

@Test
public void testSetDataGlobalCompression() throws Exception {
final byte[] data = "here's a string".getBytes();
final byte[] gzipedData = GzipCompressionProvider.doCompress(data);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableCompression()
.build();
try {
client.start();

// Create with explicit compression
client.create().compressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Create explicitly without compression
client.delete().forPath("/a/b/c");
client.create().uncompressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
assertThrows(
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Create with implicit (global) compression
client.delete().forPath("/a/b/c");
client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData with explicit compression
client.setData().compressed().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData explicitly without compression
client.setData().uncompressed().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c"));
assertThrows(
ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c"));
assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c"));
assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// SetData with implicit (global) compression
client.setData().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().forPath("/a/b/c"));
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c"));
assertEquals(
gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testSimple() throws Exception {
final byte[] data = "here's a string".getBytes();
Expand Down
Loading

0 comments on commit 8eb6f9a

Please sign in to comment.