diff --git a/src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java b/src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java index 8f57f51..9405384 100644 --- a/src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java +++ b/src/main/java/cz/smarteon/loxone/LoxoneWebSocket.java @@ -69,6 +69,8 @@ public class LoxoneWebSocket { private CountDownLatch authSeqLatch; private CountDownLatch visuLatch; + private SyncCommandGuard syncCommandGuard; + private int authTimeoutSeconds = 3; private int visuTimeoutSeconds = 3; private int retries = 5; @@ -115,7 +117,7 @@ public void registerListener(@NotNull final LoxoneEventListener listener) { eventListeners.add(listener); } - public void sendCommand(@NotNull final Command command) { + public synchronized void sendCommand(@NotNull final Command command) { requireNonNull(command, "command can't be null"); if (command.isWsSupported()) { sendWithRetry(command, retries); @@ -124,10 +126,27 @@ public void sendCommand(@NotNull final Command command) { } } - public void sendSecureCommand(@NotNull final ControlCommand command) { + public synchronized void sendSecureCommand(@NotNull final ControlCommand command) { sendSecureWithRetry(command, retries); } + @SuppressWarnings("unchecked") + public synchronized T commandRequest(@NotNull final Command command) { + requireNonNull(command, "command can't be null"); + if (command.isWsSupported()) { + try { + syncCommandGuard = new SyncCommandGuard<>(command); + sendWithRetry(command, retries); + + return (T) syncCommandGuard.waitForResponse(retries * authTimeoutSeconds); + } finally { + syncCommandGuard = null; + } + } else { + throw new IllegalArgumentException("Only websocket commands are supported"); + } + } + public void close() { scheduler.shutdownNow(); closeWebSocket(); @@ -328,7 +347,7 @@ void sendInternal(final Command command) { LOG.debug("Sending websocket message: " + command.getCommand()); webSocketClient.send(command.getCommand()); // KEEP_ALIVE command has no response at all - if (!KEEP_ALIVE.getCommand().equals(command.getCommand())) { + if (!KEEP_ALIVE.getCommand().equals(command.getCommand()) && syncCommandGuard == null) { commands.add(command); } } @@ -339,7 +358,12 @@ void sendInternal(final Command command) { */ void processMessage(final String message) { try { - final Command command = commands.remove(); + Command command; + if (syncCommandGuard != null) { + command = syncCommandGuard.getCommand(); + } else { + command= commands.remove(); + } if (!Void.class.equals(command.getResponseType())) { final Object parsedMessage = Codec.readMessage(message, command.getResponseType()); if (parsedMessage instanceof LoxoneMessage) { @@ -467,26 +491,29 @@ private boolean checkLoxoneMessage(final Command command, final LoxoneMessage @SuppressWarnings("unchecked") private void processCommand(final Command command, final Object message, final boolean isError) { - CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED; - final Iterator> listeners = commandResponseListeners.iterator(); - while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) { - @SuppressWarnings("rawtypes") - final CommandResponseListener next = listeners.next(); - if (isError && next instanceof LoxoneMessageCommandResponseListener) { - if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) { + if (syncCommandGuard != null) { + syncCommandGuard.receive(message); + } else { + CommandResponseListener.State commandState = CommandResponseListener.State.IGNORED; + final Iterator> listeners = commandResponseListeners.iterator(); + while (listeners.hasNext() && commandState != CommandResponseListener.State.CONSUMED) { + @SuppressWarnings("rawtypes") final CommandResponseListener next = listeners.next(); + if (isError && next instanceof LoxoneMessageCommandResponseListener) { + if (((LoxoneMessageCommandResponseListener) next).acceptsErrorResponses()) { + commandState = commandState.fold(next.onCommand(command, message)); + } + } else if (next.accepts(message.getClass())) { commandState = commandState.fold(next.onCommand(command, message)); } - } else if (next.accepts(message.getClass())) { - commandState = commandState.fold(next.onCommand(command, message)); } - } - if (commandState == CommandResponseListener.State.IGNORED) { - LOG.warn("No command listener registered, ignoring command=" + command); - } + if (commandState == CommandResponseListener.State.IGNORED) { + LOG.warn("No command listener registered, ignoring command=" + command); + } - if (command != null && command.getCommand().startsWith(C_SYS_ENC)) { - LOG.warn("Encrypted message receive is not supported"); + if (command != null && command.getCommand().startsWith(C_SYS_ENC)) { + LOG.warn("Encrypted message receive is not supported"); + } } } diff --git a/src/main/java/cz/smarteon/loxone/SyncCommandGuard.java b/src/main/java/cz/smarteon/loxone/SyncCommandGuard.java new file mode 100644 index 0000000..cc61dff --- /dev/null +++ b/src/main/java/cz/smarteon/loxone/SyncCommandGuard.java @@ -0,0 +1,56 @@ +package cz.smarteon.loxone; + +import cz.smarteon.loxone.message.LoxoneMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +class SyncCommandGuard { + + private static final Logger LOG = LoggerFactory.getLogger(SyncCommandGuard.class); + + private final CountDownLatch latch; + + private final Command command; + + private Object response; + + SyncCommandGuard(final Command command) { + this.command = command; + latch = new CountDownLatch(1); + } + + @SuppressWarnings("unchecked") + T waitForResponse(int seconds) { + try { + if (latch.await(seconds, TimeUnit.SECONDS)) { + try { + return (T) response; + } catch (ClassCastException cce) { + if (response instanceof LoxoneMessage) { + LoxoneMessage error = (LoxoneMessage) response; + throw new LoxoneException("Error received of " + error.getControl() + " code " + error.getCode()); + } else { + throw new LoxoneException("Unrecognizable error received to " + command.getCommand()); + } + } + } else { + throw new LoxoneException("Timeout waiting for sync command response " + command.getCommand()); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for sync command request completion", e); + throw new LoxoneException("Interrupted while waiting for sync command request completion"); + } + } + + void receive(final Object response) { + this.response = response; + latch.countDown(); + } + + Command getCommand() { + return command; + } +} diff --git a/src/test/kotlin/LoxoneAT.kt b/src/test/kotlin/LoxoneAT.kt index fa5c27a..fce443d 100644 --- a/src/test/kotlin/LoxoneAT.kt +++ b/src/test/kotlin/LoxoneAT.kt @@ -1,6 +1,7 @@ package cz.smarteon.loxone import cz.smarteon.loxone.app.SwitchControl +import cz.smarteon.loxone.message.ControlCommand import cz.smarteon.loxone.message.JsonValue import cz.smarteon.loxone.message.LoxoneMessage import io.mockk.every @@ -101,6 +102,20 @@ class LoxoneAT { @Test @Order(4) + fun `should pulse on switch sync`() { + val response = device?.let { device -> + loxone.webSocket().commandRequest(ControlCommand.genericControlCommand(device.uuid.toString(), "Pulse")) + } + + expectThat(response){ + isA>() + .get { value }.isA() + .get { jsonNode.textValue() }.isEqualTo("1") + } + } + + @Test + @Order(5) fun `should pulse on secured switch`() { val latch = commands.expectCommand(".*${secDevice?.uuid}/Pulse") secDevice?.let {secDevice -> loxone.sendControlPulse(secDevice) } @@ -119,7 +134,7 @@ class LoxoneAT { } @Test - @Order(5) + @Order(6) fun `should refresh token`() { val evaluator = mockk { every { evaluate(any()) } answers { mockk {