Skip to content

Commit

Permalink
feat: Add delta compression support (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
itismoej authored Oct 5, 2024
1 parent b2dc056 commit 3e0a8c4
Show file tree
Hide file tree
Showing 24 changed files with 625 additions and 6 deletions.
31 changes: 31 additions & 0 deletions centrifuge/api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ package io.github.centrifugal.centrifuge {
method public io.github.centrifugal.centrifuge.ClientState! getState();
method public io.github.centrifugal.centrifuge.Subscription! getSubscription(String);
method public void history(String, io.github.centrifugal.centrifuge.HistoryOptions!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.HistoryResult!>!);
method public io.github.centrifugal.centrifuge.Subscription! newSubscription(String, io.github.centrifugal.centrifuge.SubscriptionOptions!, io.github.centrifugal.centrifuge.SubscriptionEventListener!) throws io.github.centrifugal.centrifuge.DuplicateSubscriptionException;
method public io.github.centrifugal.centrifuge.Subscription! newSubscription(String, io.github.centrifugal.centrifuge.SubscriptionEventListener!) throws io.github.centrifugal.centrifuge.DuplicateSubscriptionException;
method public void presence(String, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceResult!>!);
method public void presenceStats(String, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceStatsResult!>!);
method public void publish(String, byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PublishResult!>!);
method public void removeSubscription(io.github.centrifugal.centrifuge.Subscription!);
method public void rpc(String, byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.RPCResult!>!);
method public void send(byte[]!, io.github.centrifugal.centrifuge.CompletionCallback!);
method public void setToken(String);
}

public class ClientInfo {
Expand Down Expand Up @@ -46,6 +48,10 @@ package io.github.centrifugal.centrifuge {
method public void onDone(Throwable);
}

public class ConfigurationError {
method public Throwable getError();
}

public class ConnectedEvent {
ctor public ConnectedEvent();
method public String getClient();
Expand Down Expand Up @@ -82,6 +88,7 @@ package io.github.centrifugal.centrifuge {

public class ErrorEvent {
method public Throwable getError();
method public Integer getHttpResponseCode();
}

public abstract class EventListener {
Expand All @@ -99,6 +106,17 @@ package io.github.centrifugal.centrifuge {
method public void onUnsubscribed(io.github.centrifugal.centrifuge.Client!, io.github.centrifugal.centrifuge.ServerUnsubscribedEvent!);
}

public class Fossil {
ctor public Fossil();
method public static byte[]! applyDelta(byte[]!, byte[]!);
method public static long checksum(byte[]!);
}

public class FossilTest {
ctor public FossilTest();
method public void testApplyDelta();
}

public class HistoryOptions {
method public int getLimit();
method public boolean getReverse();
Expand Down Expand Up @@ -182,6 +200,7 @@ package io.github.centrifugal.centrifuge {
public class Publication {
ctor public Publication();
method public byte[]! getData();
method public io.github.centrifugal.centrifuge.ClientInfo! getInfo();
method public long getOffset();
}

Expand Down Expand Up @@ -284,11 +303,13 @@ package io.github.centrifugal.centrifuge {

public class Subscription {
method public String getChannel();
method public byte[]! getPrevData();
method public io.github.centrifugal.centrifuge.SubscriptionState! getState();
method public void history(io.github.centrifugal.centrifuge.HistoryOptions!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.HistoryResult!>!);
method public void presence(io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceResult!>!);
method public void presenceStats(io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceStatsResult!>!);
method public void publish(byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PublishResult!>!);
method public void setPrevData(byte[]!);
method public void subscribe();
method public void unsubscribe();
}
Expand All @@ -311,6 +332,7 @@ package io.github.centrifugal.centrifuge {
public class SubscriptionOptions {
ctor public SubscriptionOptions();
method public byte[]! getData();
method public String getDelta();
method public int getMaxResubscribeDelay();
method public int getMinResubscribeDelay();
method public String getToken();
Expand All @@ -319,6 +341,7 @@ package io.github.centrifugal.centrifuge {
method public boolean isPositioned();
method public boolean isRecoverable();
method public void setData(byte[]!);
method public void setDelta(String);
method public void setJoinLeave(boolean);
method public void setMaxResubscribeDelay(int);
method public void setMinResubscribeDelay(int);
Expand Down Expand Up @@ -369,6 +392,14 @@ package io.github.centrifugal.centrifuge {
method public Throwable getError();
}

public class UnauthorizedException {
ctor public UnauthorizedException();
}

public class UnclassifiedError {
method public Throwable getError();
}

public class UnsubscribedEvent {
ctor public UnsubscribedEvent(int, String);
method public int getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ private void failUnauthorized() {
this.processDisconnect(DISCONNECTED_UNAUTHORIZED, "unauthorized", false);
}

private void processReply(Protocol.Reply reply) {
private void processReply(Protocol.Reply reply) throws Exception {
if (reply.getId() > 0) {
CompletableFuture<Protocol.Reply> cf = this.futures.get(reply.getId());
if (cf != null) cf.complete(reply);
Expand All @@ -868,12 +868,18 @@ private void processReply(Protocol.Reply reply) {
}
}

private void handlePub(String channel, Protocol.Publication pub) {
private void handlePub(String channel, Protocol.Publication pub) throws Exception {
ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo());
Subscription sub = this.getSub(channel);
if (sub != null) {
PublicationEvent event = new PublicationEvent();
event.setData(pub.getData().toByteArray());
byte[] pubData = pub.getData().toByteArray();
byte[] prevData = sub.getPrevData();
if (prevData != null && pub.getDelta()) {
pubData = Fossil.applyDelta(prevData, pubData);
}
sub.setPrevData(pubData);
event.setData(pubData);
event.setInfo(info);
event.setOffset(pub.getOffset());
event.setTags(pub.getTagsMap());
Expand Down Expand Up @@ -974,7 +980,7 @@ private void handleDisconnect(Protocol.Disconnect disconnect) {
}
}

private void handlePush(Protocol.Push push) {
private void handlePush(Protocol.Push push) throws Exception {
String channel = push.getChannel();
if (push.hasPub()) {
this.handlePub(channel, push.getPub());
Expand Down
191 changes: 191 additions & 0 deletions centrifuge/src/main/java/io/github/centrifugal/centrifuge/Fossil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package io.github.centrifugal.centrifuge;

import java.io.ByteArrayOutputStream;


public class Fossil {

private static final int[] zValue = new int[] {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7,
8, 9, -1, -1, -1, -1, -1, -1, -1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, -1, -1, -1,
-1, 36, -1, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, -1, -1, -1, 63, -1,
};

// Reader class
static class Reader {
private byte[] a;
private int pos;

public Reader(byte[] array) {
this.a = array;
this.pos = 0;
}

public boolean haveBytes() {
return this.pos < this.a.length;
}

public int getByte() {
if (this.pos >= this.a.length) {
throw new IndexOutOfBoundsException("out of bounds");
}
int b = this.a[this.pos++] & 0xFF;
return b;
}

public char getChar() {
return (char) getByte();
}

public int getInt() {
int v = 0;
int c;
while (haveBytes() && (c = zValue[getByte() & 0x7F]) >= 0) {
v = (v << 6) + c;
}
this.pos--;
return v;
}
}

// Writer class
static class Writer {
private ByteArrayOutputStream a = new ByteArrayOutputStream();

public byte[] toByteArray() {
return a.toByteArray();
}

// Copy from array 'arr' from 'start' to 'end' (exclusive)
public void putArray(byte[] arr, int start, int end) {
if (start < 0 || end > arr.length || start > end) {
throw new IndexOutOfBoundsException("Invalid start or end index");
}
a.write(arr, start, end - start);
}
}

// Checksum function
public static long checksum(byte[] arr) {
int sum0 = 0, sum1 = 0, sum2 = 0, sum3 = 0;
int z = 0;
int N = arr.length;
while (N >= 16) {
sum0 += (arr[z + 0] & 0xFF);
sum1 += (arr[z + 1] & 0xFF);
sum2 += (arr[z + 2] & 0xFF);
sum3 += (arr[z + 3] & 0xFF);

sum0 += (arr[z + 4] & 0xFF);
sum1 += (arr[z + 5] & 0xFF);
sum2 += (arr[z + 6] & 0xFF);
sum3 += (arr[z + 7] & 0xFF);

sum0 += (arr[z + 8] & 0xFF);
sum1 += (arr[z + 9] & 0xFF);
sum2 += (arr[z + 10] & 0xFF);
sum3 += (arr[z + 11] & 0xFF);

sum0 += (arr[z + 12] & 0xFF);
sum1 += (arr[z + 13] & 0xFF);
sum2 += (arr[z + 14] & 0xFF);
sum3 += (arr[z + 15] & 0xFF);

z += 16;
N -= 16;
}
while (N >= 4) {
sum0 += (arr[z + 0] & 0xFF);
sum1 += (arr[z + 1] & 0xFF);
sum2 += (arr[z + 2] & 0xFF);
sum3 += (arr[z + 3] & 0xFF);
z += 4;
N -= 4;
}
sum3 += (sum2 << 8) + (sum1 << 16) + (sum0 << 24);
switch (N) {
case 3:
sum3 += (arr[z + 2] & 0xFF) << 8;
case 2:
sum3 += (arr[z + 1] & 0xFF) << 16;
case 1:
sum3 += (arr[z + 0] & 0xFF) << 24;
break;
default:
break;
}
return sum3 & 0xFFFFFFFFL;
}

/**
* Apply a delta byte array to a source byte array, returning the target byte array.
*/
public static byte[] applyDelta(byte[] source, byte[] delta) throws Exception {
int total = 0;
Reader zDelta = new Reader(delta);
int lenSrc = source.length;
int lenDelta = delta.length;

int limit = zDelta.getInt();
char c = zDelta.getChar();
if (c != '\n') {
throw new Exception("size integer not terminated by '\\n'");
}
Writer zOut = new Writer();
while (zDelta.haveBytes()) {
int cnt = zDelta.getInt();
int ofst;

c = zDelta.getChar();
switch (c) {
case '@':
ofst = zDelta.getInt();
if (zDelta.haveBytes() && zDelta.getChar() != ',') {
throw new Exception("copy command not terminated by ','");
}
total += cnt;
if (total > limit) {
throw new Exception("copy exceeds output file size");
}
if (ofst + cnt > lenSrc) {
throw new Exception("copy extends past end of input");
}
zOut.putArray(source, ofst, ofst + cnt);
break;

case ':':
total += cnt;
if (total > limit) {
throw new Exception("insert command gives an output larger than predicted");
}
if (cnt > lenDelta - zDelta.pos) {
throw new Exception("insert count exceeds size of delta");
}
zOut.putArray(zDelta.a, zDelta.pos, zDelta.pos + cnt);
zDelta.pos += cnt;
break;

case ';':
byte[] out = zOut.toByteArray();
long checksumValue = checksum(out);
if (cnt != (int) checksumValue) {
throw new Exception("bad checksum");
}
if (total != limit) {
throw new Exception("generated size does not match predicted size");
}
return out;

default:
System.out.println(c);
throw new Exception("unknown delta operator");
}
}
throw new Exception("unterminated delta");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ void setInfo(ClientInfo info) {

private ClientInfo info;


public long getOffset() {
return offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class Subscription {
private int resubscribeAttempts = 0;
private String token;
private com.google.protobuf.ByteString data;
private String delta;
private boolean deltaNegotiated;
private byte[] prevData;

Subscription(final Client client, final String channel, final SubscriptionEventListener listener, final SubscriptionOptions options) {
this.client = client;
Expand All @@ -38,6 +41,9 @@ public class Subscription {
if (opts.getData() != null) {
this.data = com.google.protobuf.ByteString.copyFrom(opts.getData());
}
this.prevData = null;
this.delta = "";
this.deltaNegotiated = false;
}

Subscription(final Client client, final String channel, final SubscriptionEventListener listener) {
Expand Down Expand Up @@ -172,6 +178,7 @@ void moveToSubscribed(Protocol.SubscribeResult result) {
this.recover = true;
}
this.setEpoch(result.getEpoch());
this.deltaNegotiated = result.getDelta();

byte[] data = null;
if (result.getData() != null) {
Expand Down Expand Up @@ -255,6 +262,7 @@ Protocol.SubscribeRequest createSubscribeRequest() {
builder.setPositioned(this.opts.isPositioned());
builder.setRecoverable(this.opts.isRecoverable());
builder.setJoinLeave(this.opts.isJoinLeave());
builder.setDelta(this.opts.getDelta());

return builder.build();
}
Expand Down Expand Up @@ -459,4 +467,12 @@ private void presenceStatsSynchronized(ResultCallback<PresenceStatsResult> cb) {
f.complete(null);
}
}

public byte[] getPrevData() {
return prevData;
}

public void setPrevData(byte[] prevData) {
this.prevData = prevData;
}
}
Loading

0 comments on commit 3e0a8c4

Please sign in to comment.