From 38dd908c1a9c666af83352dbd676492673e38eff Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Tue, 8 Oct 2024 18:50:47 +0300 Subject: [PATCH] delta improvements, introducing since option (#75) --- .../github/centrifugal/centrifuge/Client.java | 27 ++++-------- .../centrifugal/centrifuge/Subscription.java | 41 +++++++++++++------ .../centrifuge/SubscriptionOptions.java | 13 ++++++ .../centrifugal/centrifuge/example/Main.java | 2 +- 4 files changed, 52 insertions(+), 31 deletions(-) diff --git a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Client.java b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Client.java index 285b5dc..02239ac 100644 --- a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Client.java +++ b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Client.java @@ -465,7 +465,12 @@ private void sendSubscribeSynchronized(String channel, Protocol.SubscribeRequest if (Client.this.getState() != ClientState.CONNECTED) { return; } - this.handleSubscribeReply(channel, reply); + try { + this.handleSubscribeReply(channel, reply); + } catch (Exception e) { + // Should never happen. + e.printStackTrace(); + } this.futures.remove(cmd.getId()); }).orTimeout(this.opts.getTimeout(), TimeUnit.MILLISECONDS).exceptionally(e -> { this.executor.submit(() -> { @@ -602,7 +607,7 @@ public void removeSubscription(Subscription sub) { } } - private void handleSubscribeReply(String channel, Protocol.Reply reply) { + private void handleSubscribeReply(String channel, Protocol.Reply reply) throws Exception { Subscription sub = this.getSub(channel); if (sub != null) { Protocol.SubscribeResult result; @@ -868,25 +873,11 @@ private void processReply(Protocol.Reply reply) throws Exception { } } - private void handlePub(String channel, Protocol.Publication pub) throws Exception { + void handlePub(String channel, Protocol.Publication pub) throws Exception { ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo()); Subscription sub = this.getSub(channel); if (sub != null) { - PublicationEvent event = new PublicationEvent(); - byte[] pubData = pub.getData().toByteArray(); - byte[] prevData = sub.getPrevData(); - if (prevData != null && pub.getDelta()) { - pubData = Fossil.applyDelta(prevData, pubData); - } - sub.setPrevData(pubData); - event.setData(pubData); - event.setInfo(info); - event.setOffset(pub.getOffset()); - event.setTags(pub.getTagsMap()); - if (pub.getOffset() > 0) { - sub.setOffset(pub.getOffset()); - } - sub.getListener().onPublication(sub, event); + sub.handlePublication(pub); } else { ServerSubscription serverSub = this.getServerSub(channel); if (serverSub != null) { diff --git a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Subscription.java b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Subscription.java index bddec26..194906d 100644 --- a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Subscription.java +++ b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/Subscription.java @@ -27,7 +27,6 @@ public class Subscription { private int resubscribeAttempts = 0; private String token; private com.google.protobuf.ByteString data; - private String delta; private boolean deltaNegotiated; private byte[] prevData; @@ -42,8 +41,12 @@ public class Subscription { this.data = com.google.protobuf.ByteString.copyFrom(opts.getData()); } this.prevData = null; - this.delta = ""; this.deltaNegotiated = false; + if (opts.getSince() != null) { + this.offset = opts.getSince().getOffset(); + this.epoch = opts.getSince().getEpoch(); + this.recover = true; + } } Subscription(final Client client, final String channel, final SubscriptionEventListener listener) { @@ -70,7 +73,7 @@ long getOffset() { return offset; } - void setOffset(long offset) { + private void setOffset(long offset) { this.offset = offset; } @@ -172,7 +175,26 @@ void moveToUnsubscribed(boolean sendUnsubscribe, int code, String reason) { this._unsubscribe(sendUnsubscribe, code, reason); } - void moveToSubscribed(Protocol.SubscribeResult result) { + void handlePublication(Protocol.Publication pub) throws Exception { + ClientInfo info = ClientInfo.fromProtocolClientInfo(pub.getInfo()); + PublicationEvent event = new PublicationEvent(); + byte[] pubData = pub.getData().toByteArray(); + byte[] prevData = this.getPrevData(); + if (prevData != null && pub.getDelta()) { + pubData = Fossil.applyDelta(prevData, pubData); + } + this.setPrevData(pubData); + event.setData(pubData); + event.setInfo(info); + event.setOffset(pub.getOffset()); + event.setTags(pub.getTagsMap()); + if (pub.getOffset() > 0) { + this.setOffset(pub.getOffset()); + } + this.listener.onPublication(this, event); + } + + void moveToSubscribed(Protocol.SubscribeResult result) throws Exception { this.setState(SubscriptionState.SUBSCRIBED); if (result.getRecoverable()) { this.recover = true; @@ -189,12 +211,7 @@ void moveToSubscribed(Protocol.SubscribeResult result) { if (result.getPublicationsCount() > 0) { for (Protocol.Publication publication : result.getPublicationsList()) { - PublicationEvent publicationEvent = new PublicationEvent(); - publicationEvent.setData(publication.getData().toByteArray()); - publicationEvent.setOffset(publication.getOffset()); - publicationEvent.setInfo(ClientInfo.fromProtocolClientInfo(publication.getInfo())); - this.listener.onPublication(this, publicationEvent); - this.setOffset(publication.getOffset()); + this.client.handlePub(this.channel, publication); } } else { this.setOffset(result.getOffset()); @@ -468,11 +485,11 @@ private void presenceStatsSynchronized(ResultCallback cb) { } } - public byte[] getPrevData() { + private byte[] getPrevData() { return prevData; } - public void setPrevData(byte[] prevData) { + private void setPrevData(byte[] prevData) { this.prevData = prevData; } } diff --git a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/SubscriptionOptions.java b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/SubscriptionOptions.java index 6aaa541..d4bb7ac 100644 --- a/centrifuge/src/main/java/io/github/centrifugal/centrifuge/SubscriptionOptions.java +++ b/centrifuge/src/main/java/io/github/centrifugal/centrifuge/SubscriptionOptions.java @@ -90,9 +90,22 @@ public String getDelta() { return delta; } + // setDelta allows using delta compression for subscription. The delta compression + // must be also enabled on server side. The only value at this point is "fossil". + // See https://centrifugal.dev/docs/server/delta_compression. public void setDelta(String delta) { this.delta = delta; } private String delta = ""; + + public void setSince(StreamPosition streamPosition) { + this.since = streamPosition; + } + + public StreamPosition getSince() { + return since; + } + + private StreamPosition since; } diff --git a/example/src/main/java/io/github/centrifugal/centrifuge/example/Main.java b/example/src/main/java/io/github/centrifugal/centrifuge/example/Main.java index 95b6838..dbc27f0 100644 --- a/example/src/main/java/io/github/centrifugal/centrifuge/example/Main.java +++ b/example/src/main/java/io/github/centrifugal/centrifuge/example/Main.java @@ -152,7 +152,7 @@ public void onLeave(Subscription sub, LeaveEvent event) { Subscription sub; SubscriptionOptions subOpts = new SubscriptionOptions(); // You can set `delta` to `"fossil"` for using delta compression via - // `subOpts.setDelta("fossil")`; + // subOpts.setDelta("fossil"); try { sub = client.newSubscription("chat:index", subOpts, subListener); } catch (DuplicateSubscriptionException e) {