Skip to content

Commit

Permalink
Introduce synchronized websocket interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jimirocks committed Jun 4, 2023
1 parent a2276e2 commit 1c2f276
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 20 deletions.
65 changes: 46 additions & 19 deletions src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class LoxoneWebSocket {
private CountDownLatch authSeqLatch;
private CountDownLatch visuLatch;

private SyncCommandGuard<?> syncCommandGuard;

private int authTimeoutSeconds = 3;
private int visuTimeoutSeconds = 3;
private int retries = 5;
Expand Down Expand Up @@ -115,7 +117,7 @@ public void registerListener(@NotNull final LoxoneEventListener listener) {
eventListeners.add(listener);
}

public void sendCommand(@NotNull final Command<?> command) {
public synchronized void sendCommand(@NotNull final Command<?> command) {
requireNonNull(command, "command can't be null");
if (command.isWsSupported()) {
sendWithRetry(command, retries);
Expand All @@ -124,10 +126,27 @@ public void sendCommand(@NotNull final Command<?> command) {
}
}

public void sendSecureCommand(@NotNull final ControlCommand<?> command) {
public synchronized void sendSecureCommand(@NotNull final ControlCommand<?> command) {
sendSecureWithRetry(command, retries);
}

@SuppressWarnings("unchecked")
public synchronized <T> T commandRequest(@NotNull final Command<T> command) {
requireNonNull(command, "command can't be null");
if (command.isWsSupported()) {
try {
syncCommandGuard = new SyncCommandGuard<>(command);
sendWithRetry(command, retries);

return (T) syncCommandGuard.waitForResponse(retries * authTimeoutSeconds);
} finally {
syncCommandGuard = null;
}
} else {
throw new IllegalArgumentException("Only websocket commands are supported");
}
}

public void close() {
scheduler.shutdownNow();
closeWebSocket();
Expand Down Expand Up @@ -328,7 +347,7 @@ void sendInternal(final Command<?> command) {
LOG.debug("Sending websocket message: " + command.getCommand());
webSocketClient.send(command.getCommand());
// KEEP_ALIVE command has no response at all
if (!KEEP_ALIVE.getCommand().equals(command.getCommand())) {
if (!KEEP_ALIVE.getCommand().equals(command.getCommand()) && syncCommandGuard == null) {
commands.add(command);
}
}
Expand All @@ -339,7 +358,12 @@ void sendInternal(final Command<?> command) {
*/
void processMessage(final String message) {
try {
final Command<?> command = commands.remove();
Command<?> command;
if (syncCommandGuard != null) {
command = syncCommandGuard.getCommand();
} else {
command= commands.remove();
}
if (!Void.class.equals(command.getResponseType())) {
final Object parsedMessage = Codec.readMessage(message, command.getResponseType());
if (parsedMessage instanceof LoxoneMessage) {
Expand Down Expand Up @@ -467,26 +491,29 @@ private boolean checkLoxoneMessage(final Command<?> command, final LoxoneMessage

@SuppressWarnings("unchecked")
private void processCommand(final Command<?> command, final Object message, final boolean isError) {
CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED;
final Iterator<CommandResponseListener<?>> listeners = commandResponseListeners.iterator();
while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) {
@SuppressWarnings("rawtypes")
final CommandResponseListener next = listeners.next();
if (isError && next instanceof LoxoneMessageCommandResponseListener) {
if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) {
if (syncCommandGuard != null) {
syncCommandGuard.receive(message);
} else {
CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED;
final Iterator<CommandResponseListener<?>> listeners = commandResponseListeners.iterator();
while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) {
@SuppressWarnings("rawtypes") final CommandResponseListener next = listeners.next();
if (isError && next instanceof LoxoneMessageCommandResponseListener) {
if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) {
commandState = commandState.fold(next.onCommand(command, message));
}
} else if (next.accepts(message.getClass())) {
commandState = commandState.fold(next.onCommand(command, message));
}
} else if (next.accepts(message.getClass())) {
commandState = commandState.fold(next.onCommand(command, message));
}
}

if (commandState == CommandResponseListener.State.IGNORED) {
LOG.warn("No command listener registered, ignoring command=" + command);
}
if (commandState == CommandResponseListener.State.IGNORED) {
LOG.warn("No command listener registered, ignoring command=" + command);
}

if (command != null && command.getCommand().startsWith(C_SYS_ENC)) {
LOG.warn("Encrypted message receive is not supported");
if (command != null && command.getCommand().startsWith(C_SYS_ENC)) {
LOG.warn("Encrypted message receive is not supported");
}
}
}

Expand Down
56 changes: 56 additions & 0 deletions src/main/java/cz/smarteon/loxone/SyncCommandGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cz.smarteon.loxone;

import cz.smarteon.loxone.message.LoxoneMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SyncCommandGuard<T> {

private static final Logger LOG = LoggerFactory.getLogger(SyncCommandGuard.class);

private final CountDownLatch latch;

private final Command<T> command;

private Object response;

SyncCommandGuard(final Command<T> command) {
this.command = command;
latch = new CountDownLatch(1);
}

@SuppressWarnings("unchecked")
T waitForResponse(int seconds) {
try {
if (latch.await(seconds, TimeUnit.SECONDS)) {
try {
return (T) response;
} catch (ClassCastException cce) {
if (response instanceof LoxoneMessage<?>) {
LoxoneMessage<?> error = (LoxoneMessage<?>) response;
throw new LoxoneException("Error received of " + error.getControl() + " code " + error.getCode());
} else {
throw new LoxoneException("Unrecognizable error received to " + command.getCommand());
}
}
} else {
throw new LoxoneException("Timeout waiting for sync command response " + command.getCommand());
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for sync command request completion", e);
throw new LoxoneException("Interrupted while waiting for sync command request completion");
}
}

void receive(final Object response) {
this.response = response;
latch.countDown();
}

Command<T> getCommand() {
return command;
}
}
17 changes: 16 additions & 1 deletion src/test/kotlin/LoxoneAT.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cz.smarteon.loxone

import cz.smarteon.loxone.app.SwitchControl
import cz.smarteon.loxone.message.ControlCommand
import cz.smarteon.loxone.message.JsonValue
import cz.smarteon.loxone.message.LoxoneMessage
import io.mockk.every
Expand Down Expand Up @@ -101,6 +102,20 @@ class LoxoneAT {

@Test
@Order(4)
fun `should pulse on switch sync`() {
val response = device?.let { device ->
loxone.webSocket().commandRequest(ControlCommand.genericControlCommand(device.uuid.toString(), "Pulse"))
}

expectThat(response){
isA<LoxoneMessage<*>>()
.get { value }.isA<JsonValue>()
.get { jsonNode.textValue() }.isEqualTo("1")
}
}

@Test
@Order(5)
fun `should pulse on secured switch`() {
val latch = commands.expectCommand(".*${secDevice?.uuid}/Pulse")
secDevice?.let {secDevice -> loxone.sendControlPulse(secDevice) }
Expand All @@ -119,7 +134,7 @@ class LoxoneAT {
}

@Test
@Order(5)
@Order(6)
fun `should refresh token`() {
val evaluator = mockk<TokenStateEvaluator> {
every { evaluate(any()) } answers { mockk {
Expand Down

0 comments on commit 1c2f276

Please sign in to comment.