Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add delta compression support #74

Merged
merged 1 commit into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions centrifuge/api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ package io.github.centrifugal.centrifuge {
method public io.github.centrifugal.centrifuge.ClientState! getState();
method public io.github.centrifugal.centrifuge.Subscription! getSubscription(String);
method public void history(String, io.github.centrifugal.centrifuge.HistoryOptions!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.HistoryResult!>!);
method public io.github.centrifugal.centrifuge.Subscription! newSubscription(String, io.github.centrifugal.centrifuge.SubscriptionOptions!, io.github.centrifugal.centrifuge.SubscriptionEventListener!) throws io.github.centrifugal.centrifuge.DuplicateSubscriptionException;
method public io.github.centrifugal.centrifuge.Subscription! newSubscription(String, io.github.centrifugal.centrifuge.SubscriptionEventListener!) throws io.github.centrifugal.centrifuge.DuplicateSubscriptionException;
method public void presence(String, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceResult!>!);
method public void presenceStats(String, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceStatsResult!>!);
method public void publish(String, byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PublishResult!>!);
method public void removeSubscription(io.github.centrifugal.centrifuge.Subscription!);
method public void rpc(String, byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.RPCResult!>!);
method public void send(byte[]!, io.github.centrifugal.centrifuge.CompletionCallback!);
method public void setToken(String);
}

public class ClientInfo {
Expand Down Expand Up @@ -46,6 +48,10 @@ package io.github.centrifugal.centrifuge {
method public void onDone(Throwable);
}

public class ConfigurationError {
method public Throwable getError();
}

public class ConnectedEvent {
ctor public ConnectedEvent();
method public String getClient();
Expand Down Expand Up @@ -82,6 +88,7 @@ package io.github.centrifugal.centrifuge {

public class ErrorEvent {
method public Throwable getError();
method public Integer getHttpResponseCode();
}

public abstract class EventListener {
Expand All @@ -99,6 +106,17 @@ package io.github.centrifugal.centrifuge {
method public void onUnsubscribed(io.github.centrifugal.centrifuge.Client!, io.github.centrifugal.centrifuge.ServerUnsubscribedEvent!);
}

public class Fossil {
ctor public Fossil();
method public static byte[]! applyDelta(byte[]!, byte[]!);
method public static long checksum(byte[]!);
}

public class FossilTest {
ctor public FossilTest();
method public void testApplyDelta();
}

public class HistoryOptions {
method public int getLimit();
method public boolean getReverse();
Expand Down Expand Up @@ -182,6 +200,7 @@ package io.github.centrifugal.centrifuge {
public class Publication {
ctor public Publication();
method public byte[]! getData();
method public io.github.centrifugal.centrifuge.ClientInfo! getInfo();
method public long getOffset();
}

Expand Down Expand Up @@ -284,11 +303,13 @@ package io.github.centrifugal.centrifuge {

public class Subscription {
method public String getChannel();
method public byte[]! getPrevData();
method public io.github.centrifugal.centrifuge.SubscriptionState! getState();
method public void history(io.github.centrifugal.centrifuge.HistoryOptions!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.HistoryResult!>!);
method public void presence(io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceResult!>!);
method public void presenceStats(io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PresenceStatsResult!>!);
method public void publish(byte[]!, io.github.centrifugal.centrifuge.ResultCallback<io.github.centrifugal.centrifuge.PublishResult!>!);
method public void setPrevData(byte[]!);
method public void subscribe();
method public void unsubscribe();
}
Expand All @@ -311,6 +332,7 @@ package io.github.centrifugal.centrifuge {
public class SubscriptionOptions {
ctor public SubscriptionOptions();
method public byte[]! getData();
method public String getDelta();
method public int getMaxResubscribeDelay();
method public int getMinResubscribeDelay();
method public String getToken();
Expand All @@ -319,6 +341,7 @@ package io.github.centrifugal.centrifuge {
method public boolean isPositioned();
method public boolean isRecoverable();
method public void setData(byte[]!);
method public void setDelta(String);
method public void setJoinLeave(boolean);
method public void setMaxResubscribeDelay(int);
method public void setMinResubscribeDelay(int);
Expand Down Expand Up @@ -369,6 +392,14 @@ package io.github.centrifugal.centrifuge {
method public Throwable getError();
}

public class UnauthorizedException {
ctor public UnauthorizedException();
}

public class UnclassifiedError {
method public Throwable getError();
}

public class UnsubscribedEvent {
ctor public UnsubscribedEvent(int, String);
method public int getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ private void failUnauthorized() {
this.processDisconnect(DISCONNECTED_UNAUTHORIZED, "unauthorized", false);
}

private void processReply(Protocol.Reply reply) {
private void processReply(Protocol.Reply reply) throws Exception {
if (reply.getId() > 0) {
CompletableFuture<Protocol.Reply> cf = this.futures.get(reply.getId());
if (cf != null) cf.complete(reply);
Expand All @@ -868,12 +868,18 @@ private void processReply(Protocol.Reply reply) {
}
}

private void handlePub(String channel, Protocol.Publication pub) {
private void handlePub(String channel, Protocol.Publication pub) throws Exception {
ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo());
Subscription sub = this.getSub(channel);
if (sub != null) {
PublicationEvent event = new PublicationEvent();
event.setData(pub.getData().toByteArray());
byte[] pubData = pub.getData().toByteArray();
byte[] prevData = sub.getPrevData();
if (prevData != null && pub.getDelta()) {
pubData = Fossil.applyDelta(prevData, pubData);
}
sub.setPrevData(pubData);
event.setData(pubData);
event.setInfo(info);
event.setOffset(pub.getOffset());
event.setTags(pub.getTagsMap());
Expand Down Expand Up @@ -974,7 +980,7 @@ private void handleDisconnect(Protocol.Disconnect disconnect) {
}
}

private void handlePush(Protocol.Push push) {
private void handlePush(Protocol.Push push) throws Exception {
String channel = push.getChannel();
if (push.hasPub()) {
this.handlePub(channel, push.getPub());
Expand Down
191 changes: 191 additions & 0 deletions centrifuge/src/main/java/io/github/centrifugal/centrifuge/Fossil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package io.github.centrifugal.centrifuge;

import java.io.ByteArrayOutputStream;


public class Fossil {

private static final int[] zValue = new int[] {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7,
8, 9, -1, -1, -1, -1, -1, -1, -1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, -1, -1, -1,
-1, 36, -1, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, -1, -1, -1, 63, -1,
};

// Reader class
static class Reader {
private byte[] a;
private int pos;

public Reader(byte[] array) {
this.a = array;
this.pos = 0;
}

public boolean haveBytes() {
return this.pos < this.a.length;
}

public int getByte() {
if (this.pos >= this.a.length) {
throw new IndexOutOfBoundsException("out of bounds");
}
int b = this.a[this.pos++] & 0xFF;
return b;
}

public char getChar() {
return (char) getByte();
}

public int getInt() {
int v = 0;
int c;
while (haveBytes() && (c = zValue[getByte() & 0x7F]) >= 0) {
v = (v << 6) + c;
}
this.pos--;
return v;
}
}

// Writer class
static class Writer {
private ByteArrayOutputStream a = new ByteArrayOutputStream();

public byte[] toByteArray() {
return a.toByteArray();
}

// Copy from array 'arr' from 'start' to 'end' (exclusive)
public void putArray(byte[] arr, int start, int end) {
if (start < 0 || end > arr.length || start > end) {
throw new IndexOutOfBoundsException("Invalid start or end index");
}
a.write(arr, start, end - start);
}
}

// Checksum function
public static long checksum(byte[] arr) {
int sum0 = 0, sum1 = 0, sum2 = 0, sum3 = 0;
int z = 0;
int N = arr.length;
while (N >= 16) {
sum0 += (arr[z + 0] & 0xFF);
sum1 += (arr[z + 1] & 0xFF);
sum2 += (arr[z + 2] & 0xFF);
sum3 += (arr[z + 3] & 0xFF);

sum0 += (arr[z + 4] & 0xFF);
sum1 += (arr[z + 5] & 0xFF);
sum2 += (arr[z + 6] & 0xFF);
sum3 += (arr[z + 7] & 0xFF);

sum0 += (arr[z + 8] & 0xFF);
sum1 += (arr[z + 9] & 0xFF);
sum2 += (arr[z + 10] & 0xFF);
sum3 += (arr[z + 11] & 0xFF);

sum0 += (arr[z + 12] & 0xFF);
sum1 += (arr[z + 13] & 0xFF);
sum2 += (arr[z + 14] & 0xFF);
sum3 += (arr[z + 15] & 0xFF);

z += 16;
N -= 16;
}
while (N >= 4) {
sum0 += (arr[z + 0] & 0xFF);
sum1 += (arr[z + 1] & 0xFF);
sum2 += (arr[z + 2] & 0xFF);
sum3 += (arr[z + 3] & 0xFF);
z += 4;
N -= 4;
}
sum3 += (sum2 << 8) + (sum1 << 16) + (sum0 << 24);
switch (N) {
case 3:
sum3 += (arr[z + 2] & 0xFF) << 8;
case 2:
sum3 += (arr[z + 1] & 0xFF) << 16;
case 1:
sum3 += (arr[z + 0] & 0xFF) << 24;
break;
default:
break;
}
return sum3 & 0xFFFFFFFFL;
}

/**
* Apply a delta byte array to a source byte array, returning the target byte array.
*/
public static byte[] applyDelta(byte[] source, byte[] delta) throws Exception {
int total = 0;
Reader zDelta = new Reader(delta);
int lenSrc = source.length;
int lenDelta = delta.length;

int limit = zDelta.getInt();
char c = zDelta.getChar();
if (c != '\n') {
throw new Exception("size integer not terminated by '\\n'");
}
Writer zOut = new Writer();
while (zDelta.haveBytes()) {
int cnt = zDelta.getInt();
int ofst;

c = zDelta.getChar();
switch (c) {
case '@':
ofst = zDelta.getInt();
if (zDelta.haveBytes() && zDelta.getChar() != ',') {
throw new Exception("copy command not terminated by ','");
}
total += cnt;
if (total > limit) {
throw new Exception("copy exceeds output file size");
}
if (ofst + cnt > lenSrc) {
throw new Exception("copy extends past end of input");
}
zOut.putArray(source, ofst, ofst + cnt);
break;

case ':':
total += cnt;
if (total > limit) {
throw new Exception("insert command gives an output larger than predicted");
}
if (cnt > lenDelta - zDelta.pos) {
throw new Exception("insert count exceeds size of delta");
}
zOut.putArray(zDelta.a, zDelta.pos, zDelta.pos + cnt);
zDelta.pos += cnt;
break;

case ';':
byte[] out = zOut.toByteArray();
long checksumValue = checksum(out);
if (cnt != (int) checksumValue) {
throw new Exception("bad checksum");
}
if (total != limit) {
throw new Exception("generated size does not match predicted size");
}
return out;

default:
System.out.println(c);
throw new Exception("unknown delta operator");
}
}
throw new Exception("unterminated delta");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ void setInfo(ClientInfo info) {

private ClientInfo info;


public long getOffset() {
return offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class Subscription {
private int resubscribeAttempts = 0;
private String token;
private com.google.protobuf.ByteString data;
private String delta;
private boolean deltaNegotiated;
private byte[] prevData;

Subscription(final Client client, final String channel, final SubscriptionEventListener listener, final SubscriptionOptions options) {
this.client = client;
Expand All @@ -38,6 +41,9 @@ public class Subscription {
if (opts.getData() != null) {
this.data = com.google.protobuf.ByteString.copyFrom(opts.getData());
}
this.prevData = null;
this.delta = "";
this.deltaNegotiated = false;
}

Subscription(final Client client, final String channel, final SubscriptionEventListener listener) {
Expand Down Expand Up @@ -172,6 +178,7 @@ void moveToSubscribed(Protocol.SubscribeResult result) {
this.recover = true;
}
this.setEpoch(result.getEpoch());
this.deltaNegotiated = result.getDelta();

byte[] data = null;
if (result.getData() != null) {
Expand Down Expand Up @@ -255,6 +262,7 @@ Protocol.SubscribeRequest createSubscribeRequest() {
builder.setPositioned(this.opts.isPositioned());
builder.setRecoverable(this.opts.isRecoverable());
builder.setJoinLeave(this.opts.isJoinLeave());
builder.setDelta(this.opts.getDelta());

return builder.build();
}
Expand Down Expand Up @@ -459,4 +467,12 @@ private void presenceStatsSynchronized(ResultCallback<PresenceStatsResult> cb) {
f.complete(null);
}
}

public byte[] getPrevData() {
return prevData;
}

public void setPrevData(byte[] prevData) {
this.prevData = prevData;
}
}
Loading
Loading