Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify chained profile implementation. #87

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2022 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2022 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.connectorio.addons.profile.ProfileFactoryRegistry;
import org.connectorio.addons.profile.internal.util.NestedMapCreator;
Expand All @@ -48,15 +47,19 @@
class ConnectorioProfile implements StateProfile {

private final Logger logger = LoggerFactory.getLogger(ConnectorioProfile.class);
private final ProfileCallback callback;
private final StackedProfileCallback callback;
private final Executor executor;
private final ProfileContext context;
private final LinkedList<StateProfile> profileChain = new LinkedList<>();
private final LinkedList<StateProfile> callbackChain = new LinkedList<>();

private final ProfileFactoryRegistry registry;
private final ExecutorService executor = Executors.newSingleThreadExecutor();

ConnectorioProfile(ProfileCallback callback, ProfileContext context, ProfileFactoryRegistry registry) {
this.callback = callback;
this(context.getExecutorService(), callback, context, registry);
}

ConnectorioProfile(Executor executor, ProfileCallback callback, ProfileContext context, ProfileFactoryRegistry registry) {
this.executor = executor;
this.context = context;
this.registry = registry;

Expand All @@ -67,7 +70,8 @@ class ConnectorioProfile implements StateProfile {
}

ItemChannelLink link = determnineLink(callback);
StackedProfileCallback chainedCallback = new StackedProfileCallback(link);

this.callback = new StackedProfileCallback(callback, callbackChain);
for (Entry<String, Object> entry : config.entrySet()) {
if ("profile".equals(entry.getKey())) {
continue;
Expand All @@ -79,7 +83,7 @@ class ConnectorioProfile implements StateProfile {
Map<String, Object> profileCfg = (Map<String, Object>) entry.getValue();
String profileType = (String) profileCfg.get("profile");
logger.debug("Creating profile {} for config key {}", profileType, entry.getKey());
Profile createdProfile = getProfileFromFactories(getConfiguredProfileTypeUID(profileType), profileCfg, chainedCallback);
Profile createdProfile = getProfileFromFactories(getConfiguredProfileTypeUID(profileType), profileCfg, new NavigableCallback(link, callbackChain.size(), this.callback));
if (createdProfile == null) {
Optional<String> supported = registry.getAll().stream()
.map(ProfileFactory::getSupportedProfileTypeUIDs)
Expand All @@ -91,7 +95,7 @@ class ConnectorioProfile implements StateProfile {
if (!(createdProfile instanceof StateProfile)) {
throw new IllegalArgumentException("Could not create profile " + profileType + " or it is not state profile");
}
profileChain.add((StateProfile) createdProfile);
callbackChain.add((StateProfile) createdProfile);
}
}

Expand Down Expand Up @@ -155,36 +159,15 @@ public void onStateUpdateFromHandler(State state) {
}

private void handleReading(boolean incoming, Type type, Consumer<StateProfile> head) {
context.getExecutorService().execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Iterator<StateProfile> delegate = incoming ? profileChain.iterator() : profileChain.descendingIterator();
Iterator<StateProfile> iterator = new Iterator<StateProfile>() {
int pos = 0;
@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public StateProfile next() {
pos++;
return delegate.next();
}

public String toString() {
return "Iterator [" + pos + ", " + profileChain + "]";
}
};
ChainedProfileCallback callback = new ChainedProfileCallback(iterator, ConnectorioProfile.this.callback);
logger.trace("Setting chained callback for {} to {}", type, callback);
StackedProfileCallback.set(callback);
Iterator<StateProfile> iterator = incoming ? callbackChain.iterator() : callbackChain.descendingIterator();
logger.trace("Firing chained profiles for {} to {}", type, callback);
head.accept(iterator.next());
} catch (Throwable e) {
logger.warn("Uncaught error found while calling profile chain for {}", type, e);
} finally {
StackedProfileCallback.set(null);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.connectorio.addons.profile.internal;

import org.openhab.core.thing.link.ItemChannelLink;
import org.openhab.core.thing.profiles.ProfileCallback;
import org.openhab.core.types.Command;
import org.openhab.core.types.State;

/**
* Callback implementation which is aware of its position in chain.
*
* When this callback is asked to dispatch command or sate it passes it to chain with upper (state)
* or lower (command) element index. This construction allows to move state/command information
* across entire chain without too complex logic. Finalization of the call happens n stacked profile
* callback which know chain boundaries.
*/
public class NavigableCallback implements ProfileCallback {

private final ItemChannelLink link;
private final int index;
private final StackedProfileCallback stack;

public NavigableCallback(ItemChannelLink link, int index, StackedProfileCallback stack) {
this.link = link;
this.index = index;
this.stack = stack;
}

@Override
public void handleCommand(Command command) {
stack.handleCommand(index - 1, command);
}

@Override
public void sendCommand(Command command) {
stack.sendCommand(index - 1, command);
}

@Override
public void sendUpdate(State state) {
stack.sendUpdate(index + 1, state);
}

@Override
public String toString() {
return "Chained Callback [" + link + " at index " + index + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,63 @@
*/
package org.connectorio.addons.profile.internal;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import org.openhab.core.thing.link.ItemChannelLink;
import org.openhab.core.thing.profiles.ProfileCallback;
import org.openhab.core.thing.profiles.StateProfile;
import org.openhab.core.types.Command;
import org.openhab.core.types.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StackedProfileCallback implements ProfileCallback {

private final static ThreadLocal<ProfileCallback> DELEGATE = new ThreadLocal<>();
/**
* Stacked callback is a reference point for created profiles to communicate with framework.
*
* Since profiles can call callback at any time, this instance must be present when profile is being created.
* This leads to situation that we have to bridge it into future.
* More over, because this callback can be called from handler to item and from item to handler
* it has to work in both directions, independently of the creation time.
*/
public class StackedProfileCallback {

private final Logger logger = LoggerFactory.getLogger(StackedProfileCallback.class);
private final ItemChannelLink link;

public StackedProfileCallback(ItemChannelLink link) {
this.link = link;
private final ProfileCallback callback;
private final LinkedList<StateProfile> chain;

public StackedProfileCallback(ProfileCallback callback, LinkedList<StateProfile> chain) {
this.callback = callback;
this.chain = chain;
}

@Override
public void handleCommand(Command command) {
public void handleCommand(int index, Command command) {
if (index == -1) {
callback.handleCommand(command);
return;
}
logger.trace("Passing command {} to profile chain", command);
getDelegate().handleCommand(command);
chain.get(index).onCommandFromItem(command);
}

@Override
public void sendCommand(Command command) {
public void sendCommand(int index, Command command) {
if (index == -1) {
callback.handleCommand(command);
return;
}
logger.trace("Sending command {} toi profile chain", command);
getDelegate().sendCommand(command);
chain.get(index).onCommandFromHandler(command);
}

@Override
public void sendUpdate(State state) {
logger.trace("Sending state {} to profile chain", state);
getDelegate().sendUpdate(state);
}

private ProfileCallback getDelegate() {
ProfileCallback callback = DELEGATE.get();
logger.trace("Callback looked up on thread stack {}", callback);
if (callback != null) {
return callback;
public void sendUpdate(int index, State state) {
if (index >= chain.size()) {
callback.sendUpdate(state);
return;
}

throw new IllegalStateException("No callback found on thread stack");
}

static void set(ProfileCallback callback) {
DELEGATE.set(callback);
logger.trace("Sending state {} to profile chain", state);
chain.get(index).onStateUpdateFromHandler(state);
}

}
Loading
Loading