Skip to content

Commit

Permalink
InstrumentProfileConnection::addStateChangeListener
Browse files Browse the repository at this point in the history
InstrumentProfileConnection::removeStateChangeListener
InstrumentProfileConnection::onStateChange
  • Loading branch information
ttldtor committed Oct 14, 2023
1 parent b9f4d27 commit 09b5c03
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 67 deletions.
2 changes: 2 additions & 0 deletions include/dxfeed_graal_cpp_api/internal/Common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ struct DXFeedEventListener {};

struct DXEndpointStateChangeListener {};

struct IpfPropertyChangeListener {};

#if defined(__clang__)
constexpr bool isClangFlavouredCompiler = true;
#else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
namespace dxfcpp {

class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity {
Id<InstrumentProfileConnection> id_;
JavaObjectHandle<InstrumentProfileConnection> handle_;

InstrumentProfileConnection() noexcept;

public:
/// The alias to a type of shared pointer to the InstrumentProfileConnection object
using Ptr = std::shared_ptr<InstrumentProfileConnection>;
Expand Down Expand Up @@ -57,7 +52,16 @@ class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity {
CLOSED
};

static auto stateToString(State state) noexcept {
private:
Id<InstrumentProfileConnection> id_;
JavaObjectHandle<InstrumentProfileConnection> handle_;
JavaObjectHandle<IpfPropertyChangeListener> stateChangeListenerHandle_;
Handler<void(State, State)> onStateChange_{};

InstrumentProfileConnection() noexcept;

public:
static std::string stateToString(State state) noexcept {
switch (state) {
case State::NOT_CONNECTED:
return "NOT_CONNECTED";
Expand Down Expand Up @@ -167,6 +171,47 @@ class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity {
* @ref InstrumentProfileConnection::State::CLOSED "CLOSED" and the background update procedures are terminated.
*/
void close() const noexcept;

/**
* Adds listener that is notified about changes in @ref InstrumentProfileConnection::getState() "state" property.
*
* <p>Installed listener can be removed by `id` with InstrumentProfileConnection::removeStateChangeListener method or by call
* `InstrumentProfileConnection::onStateChange() -= id`;
*
* @tparam StateChangeListener The listener type. It can be any callable with signature: `void(State, State)`
* @param listener The listener to add
* @return the listener id
*/
template <typename StateChangeListener>
std::size_t addStateChangeListener(StateChangeListener &&listener) noexcept
#if __cpp_concepts
requires requires {
{ listener(State{}, State{}) } -> std::same_as<void>;
}
#endif
{
return onStateChange_ += listener;
}

/**
* Removes listener that is notified about changes in @ref InstrumentProfileConnection::getState() "state" property.
* It removes the listener that was previously installed with InstrumentProfileConnection::addStateChangeListener method.
*
* @param listenerId The listener id to remove
*/
void removeStateChangeListener(std::size_t listenerId) noexcept {
onStateChange_ -= listenerId;
}

/**
* Returns the onStateChange @ref Handler<void(ArgTypes...)> "handler" that can be used to add or remove
* listeners.
*
* @return onStateChange handler with `void(State, State)` signature
*/
auto &onStateChange() noexcept {
return onStateChange_;
}
};

} // namespace dxfcpp
31 changes: 21 additions & 10 deletions include/dxfeed_graal_cpp_api/isolated/Isolated.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace ipf {
struct InstrumentProfileReader {
static /* dxfg_instrument_profile_reader_t* */ void *create() noexcept;

static std::int64_t getLastModified(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept;
static std::int64_t
getLastModified(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept;

static bool wasComplete(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept;

Expand All @@ -32,22 +33,32 @@ struct InstrumentProfileReader {
readFromFile(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle,
const std::string &address, const std::string &user, const std::string &password) noexcept;

static std::string resolveSourceURL(const std::string & address) noexcept;
static std::string resolveSourceURL(const std::string &address) noexcept;
};

struct InstrumentProfileCollector {
static /* dxfg_ipf_collector_t* */ void *create() noexcept;
};

struct InstrumentProfileConnection {
static /* dxfg_ipf_connection_t* */ void* createConnection(const std::string& address, /* dxfg_ipf_collector_t* */ void* instrumentProfileCollectorHandle) noexcept;
static std::string getAddress(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static std::int64_t getUpdatePeriod(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static bool setUpdatePeriod(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle, std::int64_t updatePeriod) noexcept;
static dxfcpp::InstrumentProfileConnection::State getState(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static std::int64_t getLastModified(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static bool start(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static bool close(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept;
static /* dxfg_ipf_connection_t* */ void *
createConnection(const std::string &address,
/* dxfg_ipf_collector_t* */ void *instrumentProfileCollectorHandle) noexcept;
static std::string getAddress(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static std::int64_t getUpdatePeriod(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static bool setUpdatePeriod(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle,
std::int64_t updatePeriod) noexcept;
static dxfcpp::InstrumentProfileConnection::State
getState(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static std::int64_t getLastModified(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static bool start(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static bool close(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept;
static bool addStateChangeListener(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle,
/* dxfg_ipf_connection_state_change_listener_t * */ void *listener) noexcept;
};

struct IpfPropertyChangeListener {
static /* dxfg_ipf_connection_state_change_listener_t* */ void* create(/* dxfg_ipf_connection_state_change_listener_func */ void* userFunc, void *userData) noexcept;
};

struct InstrumentProfileList {
Expand Down
99 changes: 49 additions & 50 deletions samples/cpp/DxFeedLiveIpfSample/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,55 +42,54 @@ int main(int argc, char *argv[]) {
// Update period can be used to re-read IPF files, not needed for services supporting IPF "live-update"
connection->setUpdatePeriod(std::chrono::seconds(60));

// connection->addStateChangeListener(
// [](InstrumentProfileConnection::State /* oldState */, InstrumentProfileConnection::State newState) {
// std::cout << "Connection state: " + InstrumentProfileConnection::stateToString(newState) << std::endl;
// });
//
// connection->start();
// // We can wait until we get first full snapshot of instrument profiles
// connection->waitUntilCompleted(std::chrono::seconds(10));
//
// // Data model to keep all instrument profiles mapped by their ticker symbol
// std::unordered_map<std::string, std::shared_ptr<InstrumentProfile>> profiles{};
// std::mutex mutex{};
//
// // It is possible to add listener after connection is started - updates will not be missed in this case
// collector.addUpdateListener([&profiles, &mutex](auto &&instruments) {
// std::cout << "\nInstrument Profiles:" << std::endl;
// // We can observe REMOVED elements - need to add necessary filtering
// // See javadoc for InstrumentProfileCollector for more details
//
// // (1) We can either process instrument profile updates manually
// for (auto &&profile : instruments) {
// std::lock_guard lock{mutex};
//
// if (InstrumentProfileType::REMOVED->getName() == profile->getType()) {
// // Profile was removed - remove it from our data model
// profiles.erase(profile->getSymbol());
// } else {
// // Profile was updated - collector only notifies us if profile was changed
// profiles[profile.getSymbol()] = profile;
// }
// };
//
// {
// std::lock_guard lock{mutex};
// std::cout << "Total number of profiles (1): " + std::to_string(profiles.size()) << std::endl;
// }
//
// // (2) or access the concurrent view of instrument profiles
// std::unordered_set<std::string> symbols =
// StreamSupport.stream(collector->view().spliterator(), false)
// .filter(profile->!InstrumentProfileType.REMOVED.name().equals(profile.getType()))
// .map(InstrumentProfile::getSymbol)
// .collect(Collectors.toSet());
// std::cout << "Total number of profiles (2): " + std::to_string(symbols.size()) << std::endl;
//
// std::cout << "Last modified: " + new Date(collector->getLastUpdateTime()) << std::endl;
// connection->addStateChangeListener([](auto /* InstrumentProfileConnection::State oldState */, auto newState) {
// std::cout << "Connection state: " + InstrumentProfileConnection::stateToString(newState) << std::endl;
// });
//
// std::this_thread::sleep_for(std::chrono::days(365));
//
// connection->close();

connection->start();
// // We can wait until we get first full snapshot of instrument profiles
// connection->waitUntilCompleted(std::chrono::seconds(10));
//
// // Data model to keep all instrument profiles mapped by their ticker symbol
// std::unordered_map<std::string, std::shared_ptr<InstrumentProfile>> profiles{};
// std::mutex mutex{};
//
// // It is possible to add listener after connection is started - updates will not be missed in this case
// collector.addUpdateListener([&profiles, &mutex](auto &&instruments) {
// std::cout << "\nInstrument Profiles:" << std::endl;
// // We can observe REMOVED elements - need to add necessary filtering
// // See javadoc for InstrumentProfileCollector for more details
//
// // (1) We can either process instrument profile updates manually
// for (auto &&profile : instruments) {
// std::lock_guard lock{mutex};
//
// if (InstrumentProfileType::REMOVED->getName() == profile->getType()) {
// // Profile was removed - remove it from our data model
// profiles.erase(profile->getSymbol());
// } else {
// // Profile was updated - collector only notifies us if profile was changed
// profiles[profile.getSymbol()] = profile;
// }
// };
//
// {
// std::lock_guard lock{mutex};
// std::cout << "Total number of profiles (1): " + std::to_string(profiles.size()) << std::endl;
// }
//
// // (2) or access the concurrent view of instrument profiles
// std::unordered_set<std::string> symbols =
// StreamSupport.stream(collector->view().spliterator(), false)
// .filter(profile->!InstrumentProfileType.REMOVED.name().equals(profile.getType()))
// .map(InstrumentProfile::getSymbol)
// .collect(Collectors.toSet());
// std::cout << "Total number of profiles (2): " + std::to_string(symbols.size()) << std::endl;
//
// std::cout << "Last modified: " + new Date(collector->getLastUpdateTime()) << std::endl;
// });
//
std::this_thread::sleep_for(std::chrono::days(365));

connection->close();
}
32 changes: 32 additions & 0 deletions src/internal/Isolate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,38 @@ bool InstrumentProfileConnection::close(
false, dxfcpp::bit_cast<dxfg_ipf_connection_t *>(instrumentProfileConnectionHandle));
}

bool InstrumentProfileConnection::addStateChangeListener(
/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle,
/* dxfg_ipf_connection_state_change_listener_t * */ void *listener) noexcept {
if (!instrumentProfileConnectionHandle || !listener) {
return false;
}

return runIsolatedOrElse(
[](auto threadHandle, auto &&instrumentProfileConnectionHandle, auto &&listener) {
return dxfg_InstrumentProfileConnection_addStateChangeListener(
dxfcpp::bit_cast<graal_isolatethread_t *>(threadHandle), instrumentProfileConnectionHandle,
listener) == 0;
},
false, dxfcpp::bit_cast<dxfg_ipf_connection_t *>(instrumentProfileConnectionHandle),
dxfcpp::bit_cast<dxfg_ipf_connection_state_change_listener_t *>(listener));
}

/* dxfg_ipf_connection_state_change_listener_t* */ void *
IpfPropertyChangeListener::create(/* dxfg_ipf_connection_state_change_listener_func */ void *userFunc,
void *userData) noexcept {
if (!userFunc) {
return nullptr;
}

return runIsolatedOrElse(
[](auto threadHandle, auto &&userFunc, auto &&userData) {
return dxfg_IpfPropertyChangeListener_new(dxfcpp::bit_cast<graal_isolatethread_t *>(threadHandle), userFunc,
userData);
},
nullptr, dxfcpp::bit_cast<dxfg_ipf_connection_state_change_listener_func>(userFunc), userData);
}

bool InstrumentProfileList::release(/* dxfg_instrument_profile_list * */ void *graalInstrumentProfileList) noexcept {
if (!graalInstrumentProfileList) {
return false;
Expand Down
1 change: 1 addition & 0 deletions src/internal/JavaObjectHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ template struct JavaObjectHandle<DXFeedEventListener>;
template struct JavaObjectHandle<InstrumentProfileReader>;
template struct JavaObjectHandle<InstrumentProfileCollector>;
template struct JavaObjectHandle<InstrumentProfileConnection>;
template struct JavaObjectHandle<IpfPropertyChangeListener>;

} // namespace dxfcpp
48 changes: 47 additions & 1 deletion src/ipf/live/InstrumentProfileConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,25 @@

namespace dxfcpp {

static dxfcpp::InstrumentProfileConnection::State graalStateToState(dxfg_ipf_connection_state_t state) {
switch (state) {
case DXFG_IPF_CONNECTION_STATE_NOT_CONNECTED:
return dxfcpp::InstrumentProfileConnection::State::NOT_CONNECTED;
case DXFG_IPF_CONNECTION_STATE_CONNECTING:
return dxfcpp::InstrumentProfileConnection::State::CONNECTING;
case DXFG_IPF_CONNECTION_STATE_CONNECTED:
return dxfcpp::InstrumentProfileConnection::State::CONNECTED;
case DXFG_IPF_CONNECTION_STATE_COMPLETED:
return dxfcpp::InstrumentProfileConnection::State::COMPLETED;
case DXFG_IPF_CONNECTION_STATE_CLOSED:
return dxfcpp::InstrumentProfileConnection::State::CLOSED;
}

return dxfcpp::InstrumentProfileConnection::State::NOT_CONNECTED;
}

InstrumentProfileConnection::InstrumentProfileConnection() noexcept
: id_{Id<InstrumentProfileConnection>::UNKNOWN}, handle_{} {
: id_{Id<InstrumentProfileConnection>::UNKNOWN}, handle_{}, stateChangeListenerHandle_{}, onStateChange_{} {
}

InstrumentProfileConnection::Ptr
Expand All @@ -38,6 +55,35 @@ InstrumentProfileConnection::createConnection(const std::string &address,
connection->handle_ = JavaObjectHandle<InstrumentProfileConnection>(
isolated::ipf::InstrumentProfileConnection::createConnection(address, collector->handle_.get()));

auto onStateChange = [](graal_isolatethread_t * /*thread*/, dxfg_ipf_connection_state_t oldState,
dxfg_ipf_connection_state_t newState, void *userData) {
auto id = Id<InstrumentProfileConnection>::from(
dxfcpp::bit_cast<Id<InstrumentProfileConnection>::ValueType>(userData));
auto connection = ApiContext::getInstance()->getManager<InstrumentProfileConnectionManager>()->getEntity(id);

if constexpr (Debugger::isDebug) {
Debugger::debug("onStateChange: id = " + std::to_string(id.getValue()));
}

if (connection) {
connection->onStateChange_(graalStateToState(oldState), graalStateToState(newState));

if (newState == DXFG_IPF_CONNECTION_STATE_CLOSED) {
ApiContext::getInstance()->getManager<InstrumentProfileConnectionManager>()->unregisterEntity(id);
}
}
};

connection->stateChangeListenerHandle_ =
JavaObjectHandle<IpfPropertyChangeListener>(isolated::ipf::IpfPropertyChangeListener::create(
dxfcpp::bit_cast<void *>(&onStateChange), dxfcpp::bit_cast<void *>(connection->id_.getValue())));

if (!connection->handle_ || !connection->stateChangeListenerHandle_) {
return connection;
}

isolated::ipf::InstrumentProfileConnection::addStateChangeListener(connection->handle_.get(), connection->stateChangeListenerHandle_.get());

return connection;
}

Expand Down

0 comments on commit 09b5c03

Please sign in to comment.