From 09b5c03ffddec81450b51b6d88af9b2bd39adacd Mon Sep 17 00:00:00 2001 From: ttldtor Date: Sat, 14 Oct 2023 23:21:42 +0300 Subject: [PATCH] InstrumentProfileConnection::addStateChangeListener InstrumentProfileConnection::removeStateChangeListener InstrumentProfileConnection::onStateChange --- .../dxfeed_graal_cpp_api/internal/Common.hpp | 2 + .../ipf/live/InstrumentProfileConnection.hpp | 57 +++++++++-- .../isolated/Isolated.hpp | 31 ++++-- samples/cpp/DxFeedLiveIpfSample/src/main.cpp | 99 +++++++++---------- src/internal/Isolate.cpp | 32 ++++++ src/internal/JavaObjectHandle.cpp | 1 + src/ipf/live/InstrumentProfileConnection.cpp | 48 ++++++++- 7 files changed, 203 insertions(+), 67 deletions(-) diff --git a/include/dxfeed_graal_cpp_api/internal/Common.hpp b/include/dxfeed_graal_cpp_api/internal/Common.hpp index baae089e..14e3465b 100644 --- a/include/dxfeed_graal_cpp_api/internal/Common.hpp +++ b/include/dxfeed_graal_cpp_api/internal/Common.hpp @@ -46,6 +46,8 @@ struct DXFeedEventListener {}; struct DXEndpointStateChangeListener {}; +struct IpfPropertyChangeListener {}; + #if defined(__clang__) constexpr bool isClangFlavouredCompiler = true; #else diff --git a/include/dxfeed_graal_cpp_api/ipf/live/InstrumentProfileConnection.hpp b/include/dxfeed_graal_cpp_api/ipf/live/InstrumentProfileConnection.hpp index 5f20498b..3868454b 100644 --- a/include/dxfeed_graal_cpp_api/ipf/live/InstrumentProfileConnection.hpp +++ b/include/dxfeed_graal_cpp_api/ipf/live/InstrumentProfileConnection.hpp @@ -14,11 +14,6 @@ namespace dxfcpp { class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity { - Id id_; - JavaObjectHandle handle_; - - InstrumentProfileConnection() noexcept; - public: /// The alias to a type of shared pointer to the InstrumentProfileConnection object using Ptr = std::shared_ptr; @@ -57,7 +52,16 @@ class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity { CLOSED }; - static auto stateToString(State state) noexcept { + private: + Id id_; + JavaObjectHandle handle_; + JavaObjectHandle stateChangeListenerHandle_; + Handler onStateChange_{}; + + InstrumentProfileConnection() noexcept; + + public: + static std::string stateToString(State state) noexcept { switch (state) { case State::NOT_CONNECTED: return "NOT_CONNECTED"; @@ -167,6 +171,47 @@ class DXFCPP_EXPORT InstrumentProfileConnection final : public SharedEntity { * @ref InstrumentProfileConnection::State::CLOSED "CLOSED" and the background update procedures are terminated. */ void close() const noexcept; + + /** + * Adds listener that is notified about changes in @ref InstrumentProfileConnection::getState() "state" property. + * + *

Installed listener can be removed by `id` with InstrumentProfileConnection::removeStateChangeListener method or by call + * `InstrumentProfileConnection::onStateChange() -= id`; + * + * @tparam StateChangeListener The listener type. It can be any callable with signature: `void(State, State)` + * @param listener The listener to add + * @return the listener id + */ + template + std::size_t addStateChangeListener(StateChangeListener &&listener) noexcept +#if __cpp_concepts + requires requires { + { listener(State{}, State{}) } -> std::same_as; + } +#endif + { + return onStateChange_ += listener; + } + + /** + * Removes listener that is notified about changes in @ref InstrumentProfileConnection::getState() "state" property. + * It removes the listener that was previously installed with InstrumentProfileConnection::addStateChangeListener method. + * + * @param listenerId The listener id to remove + */ + void removeStateChangeListener(std::size_t listenerId) noexcept { + onStateChange_ -= listenerId; + } + + /** + * Returns the onStateChange @ref Handler "handler" that can be used to add or remove + * listeners. + * + * @return onStateChange handler with `void(State, State)` signature + */ + auto &onStateChange() noexcept { + return onStateChange_; + } }; } // namespace dxfcpp \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/isolated/Isolated.hpp b/include/dxfeed_graal_cpp_api/isolated/Isolated.hpp index c0918828..716f6afd 100644 --- a/include/dxfeed_graal_cpp_api/isolated/Isolated.hpp +++ b/include/dxfeed_graal_cpp_api/isolated/Isolated.hpp @@ -20,7 +20,8 @@ namespace ipf { struct InstrumentProfileReader { static /* dxfg_instrument_profile_reader_t* */ void *create() noexcept; - static std::int64_t getLastModified(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept; + static std::int64_t + getLastModified(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept; static bool wasComplete(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle) noexcept; @@ -32,7 +33,7 @@ struct InstrumentProfileReader { readFromFile(/* dxfg_instrument_profile_reader_t * */ void *graalInstrumentProfileReaderHandle, const std::string &address, const std::string &user, const std::string &password) noexcept; - static std::string resolveSourceURL(const std::string & address) noexcept; + static std::string resolveSourceURL(const std::string &address) noexcept; }; struct InstrumentProfileCollector { @@ -40,14 +41,24 @@ struct InstrumentProfileCollector { }; struct InstrumentProfileConnection { - static /* dxfg_ipf_connection_t* */ void* createConnection(const std::string& address, /* dxfg_ipf_collector_t* */ void* instrumentProfileCollectorHandle) noexcept; - static std::string getAddress(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; - static std::int64_t getUpdatePeriod(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; - static bool setUpdatePeriod(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle, std::int64_t updatePeriod) noexcept; - static dxfcpp::InstrumentProfileConnection::State getState(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; - static std::int64_t getLastModified(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; - static bool start(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; - static bool close(/* dxfg_ipf_connection_t * */ void* instrumentProfileConnectionHandle) noexcept; + static /* dxfg_ipf_connection_t* */ void * + createConnection(const std::string &address, + /* dxfg_ipf_collector_t* */ void *instrumentProfileCollectorHandle) noexcept; + static std::string getAddress(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static std::int64_t getUpdatePeriod(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static bool setUpdatePeriod(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle, + std::int64_t updatePeriod) noexcept; + static dxfcpp::InstrumentProfileConnection::State + getState(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static std::int64_t getLastModified(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static bool start(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static bool close(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle) noexcept; + static bool addStateChangeListener(/* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle, + /* dxfg_ipf_connection_state_change_listener_t * */ void *listener) noexcept; +}; + +struct IpfPropertyChangeListener { + static /* dxfg_ipf_connection_state_change_listener_t* */ void* create(/* dxfg_ipf_connection_state_change_listener_func */ void* userFunc, void *userData) noexcept; }; struct InstrumentProfileList { diff --git a/samples/cpp/DxFeedLiveIpfSample/src/main.cpp b/samples/cpp/DxFeedLiveIpfSample/src/main.cpp index eacbe091..44e46cd5 100644 --- a/samples/cpp/DxFeedLiveIpfSample/src/main.cpp +++ b/samples/cpp/DxFeedLiveIpfSample/src/main.cpp @@ -42,55 +42,54 @@ int main(int argc, char *argv[]) { // Update period can be used to re-read IPF files, not needed for services supporting IPF "live-update" connection->setUpdatePeriod(std::chrono::seconds(60)); -// connection->addStateChangeListener( -// [](InstrumentProfileConnection::State /* oldState */, InstrumentProfileConnection::State newState) { -// std::cout << "Connection state: " + InstrumentProfileConnection::stateToString(newState) << std::endl; -// }); -// -// connection->start(); -// // We can wait until we get first full snapshot of instrument profiles -// connection->waitUntilCompleted(std::chrono::seconds(10)); -// -// // Data model to keep all instrument profiles mapped by their ticker symbol -// std::unordered_map> profiles{}; -// std::mutex mutex{}; -// -// // It is possible to add listener after connection is started - updates will not be missed in this case -// collector.addUpdateListener([&profiles, &mutex](auto &&instruments) { -// std::cout << "\nInstrument Profiles:" << std::endl; -// // We can observe REMOVED elements - need to add necessary filtering -// // See javadoc for InstrumentProfileCollector for more details -// -// // (1) We can either process instrument profile updates manually -// for (auto &&profile : instruments) { -// std::lock_guard lock{mutex}; -// -// if (InstrumentProfileType::REMOVED->getName() == profile->getType()) { -// // Profile was removed - remove it from our data model -// profiles.erase(profile->getSymbol()); -// } else { -// // Profile was updated - collector only notifies us if profile was changed -// profiles[profile.getSymbol()] = profile; -// } -// }; -// -// { -// std::lock_guard lock{mutex}; -// std::cout << "Total number of profiles (1): " + std::to_string(profiles.size()) << std::endl; -// } -// -// // (2) or access the concurrent view of instrument profiles -// std::unordered_set symbols = -// StreamSupport.stream(collector->view().spliterator(), false) -// .filter(profile->!InstrumentProfileType.REMOVED.name().equals(profile.getType())) -// .map(InstrumentProfile::getSymbol) -// .collect(Collectors.toSet()); -// std::cout << "Total number of profiles (2): " + std::to_string(symbols.size()) << std::endl; -// -// std::cout << "Last modified: " + new Date(collector->getLastUpdateTime()) << std::endl; +// connection->addStateChangeListener([](auto /* InstrumentProfileConnection::State oldState */, auto newState) { +// std::cout << "Connection state: " + InstrumentProfileConnection::stateToString(newState) << std::endl; // }); -// -// std::this_thread::sleep_for(std::chrono::days(365)); -// -// connection->close(); + + connection->start(); + // // We can wait until we get first full snapshot of instrument profiles + // connection->waitUntilCompleted(std::chrono::seconds(10)); + // + // // Data model to keep all instrument profiles mapped by their ticker symbol + // std::unordered_map> profiles{}; + // std::mutex mutex{}; + // + // // It is possible to add listener after connection is started - updates will not be missed in this case + // collector.addUpdateListener([&profiles, &mutex](auto &&instruments) { + // std::cout << "\nInstrument Profiles:" << std::endl; + // // We can observe REMOVED elements - need to add necessary filtering + // // See javadoc for InstrumentProfileCollector for more details + // + // // (1) We can either process instrument profile updates manually + // for (auto &&profile : instruments) { + // std::lock_guard lock{mutex}; + // + // if (InstrumentProfileType::REMOVED->getName() == profile->getType()) { + // // Profile was removed - remove it from our data model + // profiles.erase(profile->getSymbol()); + // } else { + // // Profile was updated - collector only notifies us if profile was changed + // profiles[profile.getSymbol()] = profile; + // } + // }; + // + // { + // std::lock_guard lock{mutex}; + // std::cout << "Total number of profiles (1): " + std::to_string(profiles.size()) << std::endl; + // } + // + // // (2) or access the concurrent view of instrument profiles + // std::unordered_set symbols = + // StreamSupport.stream(collector->view().spliterator(), false) + // .filter(profile->!InstrumentProfileType.REMOVED.name().equals(profile.getType())) + // .map(InstrumentProfile::getSymbol) + // .collect(Collectors.toSet()); + // std::cout << "Total number of profiles (2): " + std::to_string(symbols.size()) << std::endl; + // + // std::cout << "Last modified: " + new Date(collector->getLastUpdateTime()) << std::endl; + // }); + // + std::this_thread::sleep_for(std::chrono::days(365)); + + connection->close(); } \ No newline at end of file diff --git a/src/internal/Isolate.cpp b/src/internal/Isolate.cpp index 0d39f3ec..53bad9c5 100644 --- a/src/internal/Isolate.cpp +++ b/src/internal/Isolate.cpp @@ -452,6 +452,38 @@ bool InstrumentProfileConnection::close( false, dxfcpp::bit_cast(instrumentProfileConnectionHandle)); } +bool InstrumentProfileConnection::addStateChangeListener( + /* dxfg_ipf_connection_t * */ void *instrumentProfileConnectionHandle, + /* dxfg_ipf_connection_state_change_listener_t * */ void *listener) noexcept { + if (!instrumentProfileConnectionHandle || !listener) { + return false; + } + + return runIsolatedOrElse( + [](auto threadHandle, auto &&instrumentProfileConnectionHandle, auto &&listener) { + return dxfg_InstrumentProfileConnection_addStateChangeListener( + dxfcpp::bit_cast(threadHandle), instrumentProfileConnectionHandle, + listener) == 0; + }, + false, dxfcpp::bit_cast(instrumentProfileConnectionHandle), + dxfcpp::bit_cast(listener)); +} + +/* dxfg_ipf_connection_state_change_listener_t* */ void * +IpfPropertyChangeListener::create(/* dxfg_ipf_connection_state_change_listener_func */ void *userFunc, + void *userData) noexcept { + if (!userFunc) { + return nullptr; + } + + return runIsolatedOrElse( + [](auto threadHandle, auto &&userFunc, auto &&userData) { + return dxfg_IpfPropertyChangeListener_new(dxfcpp::bit_cast(threadHandle), userFunc, + userData); + }, + nullptr, dxfcpp::bit_cast(userFunc), userData); +} + bool InstrumentProfileList::release(/* dxfg_instrument_profile_list * */ void *graalInstrumentProfileList) noexcept { if (!graalInstrumentProfileList) { return false; diff --git a/src/internal/JavaObjectHandle.cpp b/src/internal/JavaObjectHandle.cpp index 582750f1..60114fde 100644 --- a/src/internal/JavaObjectHandle.cpp +++ b/src/internal/JavaObjectHandle.cpp @@ -33,5 +33,6 @@ template struct JavaObjectHandle; template struct JavaObjectHandle; template struct JavaObjectHandle; template struct JavaObjectHandle; +template struct JavaObjectHandle; } // namespace dxfcpp \ No newline at end of file diff --git a/src/ipf/live/InstrumentProfileConnection.cpp b/src/ipf/live/InstrumentProfileConnection.cpp index 0c124132..edc55728 100644 --- a/src/ipf/live/InstrumentProfileConnection.cpp +++ b/src/ipf/live/InstrumentProfileConnection.cpp @@ -18,8 +18,25 @@ namespace dxfcpp { +static dxfcpp::InstrumentProfileConnection::State graalStateToState(dxfg_ipf_connection_state_t state) { + switch (state) { + case DXFG_IPF_CONNECTION_STATE_NOT_CONNECTED: + return dxfcpp::InstrumentProfileConnection::State::NOT_CONNECTED; + case DXFG_IPF_CONNECTION_STATE_CONNECTING: + return dxfcpp::InstrumentProfileConnection::State::CONNECTING; + case DXFG_IPF_CONNECTION_STATE_CONNECTED: + return dxfcpp::InstrumentProfileConnection::State::CONNECTED; + case DXFG_IPF_CONNECTION_STATE_COMPLETED: + return dxfcpp::InstrumentProfileConnection::State::COMPLETED; + case DXFG_IPF_CONNECTION_STATE_CLOSED: + return dxfcpp::InstrumentProfileConnection::State::CLOSED; + } + + return dxfcpp::InstrumentProfileConnection::State::NOT_CONNECTED; +} + InstrumentProfileConnection::InstrumentProfileConnection() noexcept - : id_{Id::UNKNOWN}, handle_{} { + : id_{Id::UNKNOWN}, handle_{}, stateChangeListenerHandle_{}, onStateChange_{} { } InstrumentProfileConnection::Ptr @@ -38,6 +55,35 @@ InstrumentProfileConnection::createConnection(const std::string &address, connection->handle_ = JavaObjectHandle( isolated::ipf::InstrumentProfileConnection::createConnection(address, collector->handle_.get())); + auto onStateChange = [](graal_isolatethread_t * /*thread*/, dxfg_ipf_connection_state_t oldState, + dxfg_ipf_connection_state_t newState, void *userData) { + auto id = Id::from( + dxfcpp::bit_cast::ValueType>(userData)); + auto connection = ApiContext::getInstance()->getManager()->getEntity(id); + + if constexpr (Debugger::isDebug) { + Debugger::debug("onStateChange: id = " + std::to_string(id.getValue())); + } + + if (connection) { + connection->onStateChange_(graalStateToState(oldState), graalStateToState(newState)); + + if (newState == DXFG_IPF_CONNECTION_STATE_CLOSED) { + ApiContext::getInstance()->getManager()->unregisterEntity(id); + } + } + }; + + connection->stateChangeListenerHandle_ = + JavaObjectHandle(isolated::ipf::IpfPropertyChangeListener::create( + dxfcpp::bit_cast(&onStateChange), dxfcpp::bit_cast(connection->id_.getValue()))); + + if (!connection->handle_ || !connection->stateChangeListenerHandle_) { + return connection; + } + + isolated::ipf::InstrumentProfileConnection::addStateChangeListener(connection->handle_.get(), connection->stateChangeListenerHandle_.get()); + return connection; }