Skip to content

Commit

Permalink
[MDAPI-79] [C++] Retrieve promise-based events from feed
Browse files Browse the repository at this point in the history
DXFeed::getLastEventsPromises
  • Loading branch information
ttldtor committed Oct 18, 2024
1 parent 22e9a88 commit c67e75c
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 36 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ jobs:
-DCMAKE_C_COMPILER=${{matrix.config.cc}}
-DCMAKE_CXX_COMPILER=${{matrix.config.cxx}}
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake
if: ${{ contains(matrix.config.os, 'linux') }}
Expand All @@ -96,13 +97,15 @@ jobs:
-DCMAKE_C_COMPILER=${{matrix.config.cc}}
-DCMAKE_CXX_COMPILER=${{matrix.config.cxx}}
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake
if: ${{ contains(matrix.config.os, 'win') }}
run: >
cmake -B ${{github.workspace}}/build
-DCMAKE_BUILD_TYPE=${{matrix.buildType}}
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Build
if: ${{ !contains(matrix.config.os, 'win') }}
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake [Samples][Tools]
if: ${{ contains(matrix.config.os, 'windows') && !contains(matrix.buildType, 'Deb') }}
Expand All @@ -129,6 +130,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
cmake -B ${{github.workspace}}/build-Tools
-DCMAKE_BUILD_TYPE=${{matrix.buildType}}
Expand All @@ -139,6 +141,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake [Lib]
if: ${{ contains(matrix.config.os, 'macos') }}
Expand All @@ -155,6 +158,7 @@ jobs:
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake [Samples][Tools]
if: ${{ contains(matrix.config.os, 'macos') && !contains(matrix.buildType, 'Deb') }}
Expand All @@ -170,6 +174,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
cmake -B ${{github.workspace}}/build-Tools
-DCMAKE_BUILD_TYPE=${{matrix.buildType}}
Expand All @@ -182,6 +187,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake [Lib]
if: ${{ contains(matrix.config.os, 'linux') }}
Expand All @@ -198,6 +204,7 @@ jobs:
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Configure CMake [Samples][Tools]
if: ${{ contains(matrix.config.os, 'linux') && !contains(matrix.buildType, 'Deb') }}
Expand All @@ -213,6 +220,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_TOOLS=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
cmake -B ${{github.workspace}}/build-Tools
-DCMAKE_BUILD_TYPE=${{matrix.buildType}}
Expand All @@ -225,6 +233,7 @@ jobs:
-DDXFCXX_INSTALL_LIB=OFF
-DDXFCXX_INSTALL_SAMPLES=OFF
-DDXFCXX_BUILD_DOC=OFF
-DDXFCXX_FEATURE_STACKTRACE=ON
- name: Build [Lib]
if: ${{ !contains(matrix.config.os, 'win') }}
Expand Down
120 changes: 118 additions & 2 deletions include/dxfeed_graal_cpp_api/api/DXFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ struct DXFCPP_EXPORT DXFeed : SharedEntity {
/**
* Requests the last event for the specified event type and symbol.
* This method works only for event types that implement LastingEvent marker "interface".
* This method requests the data from the uplink data provider, creates new event of the specified `eventType`,
* This method requests the data from the uplink data provider, creates new event of the specified event type,
* and completes the resulting promise with this event.
*
* <p>The promise is cancelled when the underlying DXEndpoint is @ref DXEndpoint::close() "closed".
Expand All @@ -430,18 +430,134 @@ struct DXFCPP_EXPORT DXFeed : SharedEntity {
return std::make_shared<Promise<std::shared_ptr<E>>>(getLastEventPromiseImpl(E::TYPE, symbol));
}

/**
* Requests the last events for the specified event type and a collection of symbols.
* This method works only for event types that implement LastingEvent marker "interface".
* This method requests the data from the the uplink data provider,
* creates new events of the specified evet type, and completes the resulting promises with these events.
*
* <p>This is a bulk version of DXFeed::getLastEventPromise() method.
*
* <p>The promise is cancelled when the the underlying DXEndpoint is @ref DXEndpoint::close() "closed".
* If the event is not available for any transient reason (no subscription, no connection to uplink, etc),
* then the resulting promise completes when the issue is resolved, which may involve an arbitrarily long wait.
* Use Promise::await() method to specify timeout while waiting for promise to complete.
* If the event is permanently not available (not supported), then the promise
* completes exceptionally with JavaException "IllegalArgumentException".
*
* <p>Use the following pattern of code to acquire multiple events (either for multiple symbols and/or multiple
* events) and wait with a single timeout for all of them:
*
* ```cpp
* std::vector<dxfcpp::SymbolWrapper> symbols{"AAPL&Q", "IBM&Q"};
* auto promises = DXFeed::getInstance()->getLastEventsPromises<Quote>(symbols.begin(), symbols.end());
*
* // combine the list of promises into one with Promises utility method and wait
* Promises::allOf(*promises)->awaitWithoutException(std::chrono::seconds(timeout));
*
* // now iterate the promises to retrieve results
* for (const auto& promise : *promises) {
* doSomethingWith(promise->getResult()); // InvalidArgumentException if result is nullptr
* }
* ```
*
* <p>Note, that this method does not work when DXEndpoint was created with @ref DXEndpoint::Role::STREAM_FEED "STREAM_FEED"
* role (promise completes exceptionally).
*
* @tparam E The event type.
* @tparam SymbolIt The symbols collection's iterator type.
* @param begin The beginning of the collection of symbols (SymbolWrapper).
* @param end The end of the collection of symbols (SymbolWrapper).
* @return The list of promises for the result of the requests, one item in list per symbol.
*/
template <Derived<LastingEvent> E, typename SymbolIt>
std::shared_ptr<PromiseList<E>> getLastEventsPromises(SymbolIt begin, SymbolIt end) const {
auto list = SymbolWrapper::SymbolListUtils::toGraalListUnique(begin, end);

return std::make_shared<std::shared_ptr<PromiseList<E>>>(getLastEventsPromisesImpl(E::TYPE, list.get()));
return PromiseList<E>::create(getLastEventsPromisesImpl(E::TYPE, list.get()));
}

/**
* Requests the last events for the specified event type and a collection of symbols.
* This method works only for event types that implement LastingEvent marker "interface".
* This method requests the data from the the uplink data provider,
* creates new events of the specified evet type, and completes the resulting promises with these events.
*
* <p>This is a bulk version of DXFeed::getLastEventPromise() method.
*
* <p>The promise is cancelled when the the underlying DXEndpoint is @ref DXEndpoint::close() "closed".
* If the event is not available for any transient reason (no subscription, no connection to uplink, etc),
* then the resulting promise completes when the issue is resolved, which may involve an arbitrarily long wait.
* Use Promise::await() method to specify timeout while waiting for promise to complete.
* If the event is permanently not available (not supported), then the promise
* completes exceptionally with JavaException "IllegalArgumentException".
*
* <p>Use the following pattern of code to acquire multiple events (either for multiple symbols and/or multiple
* events) and wait with a single timeout for all of them:
*
* ```cpp
* std::vector<dxfcpp::SymbolWrapper> symbols{"AAPL&Q", "IBM&Q"};
* auto promises = DXFeed::getInstance()->getLastEventsPromises<Quote>(symbols);
*
* // combine the list of promises into one with Promises utility method and wait
* Promises::allOf(*promises)->awaitWithoutException(std::chrono::seconds(timeout));
*
* // now iterate the promises to retrieve results
* for (const auto& promise : *promises) {
* doSomethingWith(promise->getResult()); // InvalidArgumentException if result is nullptr
* }
* ```
*
* <p>Note, that this method does not work when DXEndpoint was created with @ref DXEndpoint::Role::STREAM_FEED "STREAM_FEED"
* role (promise completes exceptionally).
*
* @tparam E The event type.
* @tparam SymbolsCollection The symbols collection's type.
* @param collection The symbols collection.
* @return The list of promises for the result of the requests, one item in list per symbol.
*/
template <Derived<LastingEvent> E, ConvertibleToSymbolWrapperCollection SymbolsCollection>
std::shared_ptr<PromiseList<E>> getLastEventsPromises(SymbolsCollection &&collection) const {
return getLastEventsPromises<E>(std::begin(collection), std::end(collection));
}

/**
* Requests the last events for the specified event type and a collection of symbols.
* This method works only for event types that implement LastingEvent marker "interface".
* This method requests the data from the the uplink data provider,
* creates new events of the specified evet type, and completes the resulting promises with these events.
*
* <p>This is a bulk version of DXFeed::getLastEventPromise() method.
*
* <p>The promise is cancelled when the the underlying DXEndpoint is @ref DXEndpoint::close() "closed".
* If the event is not available for any transient reason (no subscription, no connection to uplink, etc),
* then the resulting promise completes when the issue is resolved, which may involve an arbitrarily long wait.
* Use Promise::await() method to specify timeout while waiting for promise to complete.
* If the event is permanently not available (not supported), then the promise
* completes exceptionally with JavaException "IllegalArgumentException".
*
* <p>Use the following pattern of code to acquire multiple events (either for multiple symbols and/or multiple
* events) and wait with a single timeout for all of them:
*
* ```cpp
* auto promises = DXFeed::getInstance()->getLastEventsPromises<Quote>({"AAPL&Q", "IBM&Q"});
*
* // combine the list of promises into one with Promises utility method and wait
* Promises::allOf(*promises)->awaitWithoutException(std::chrono::seconds(timeout));
*
* // now iterate the promises to retrieve results
* for (const auto& promise : *promises) {
* doSomethingWith(promise->getResult()); // InvalidArgumentException if result is nullptr
* }
* ```
*
* <p>Note, that this method does not work when DXEndpoint was created with @ref DXEndpoint::Role::STREAM_FEED "STREAM_FEED"
* role (promise completes exceptionally).
*
* @tparam E The event type.
* @param collection The symbols collection.
* @return The list of promises for the result of the requests, one item in list per symbol.
*/
template <Derived<LastingEvent> E>
std::shared_ptr<PromiseList<E>> getLastEventsPromises(std::initializer_list<SymbolWrapper> collection) const {
return getLastEventsPromises<E>(collection.begin(), collection.end());
Expand Down
57 changes: 24 additions & 33 deletions include/dxfeed_graal_cpp_api/promise/Promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ DXFCPP_BEGIN_NAMESPACE

struct EventType;
struct JavaException;
struct Promises;

template <class T, class U>
concept Derived = std::is_base_of_v<U, T>;
Expand Down Expand Up @@ -171,13 +172,8 @@ template <typename P> struct CommonPromiseMixin {
* @throws PromiseException if computation has completed exceptionally.
*/
bool awaitWithoutException(const std::chrono::milliseconds &timeoutInMilliseconds) const {
auto timeout = timeoutInMilliseconds.count();

if (timeout > std::numeric_limits<std::int32_t>::max()) {
timeout = std::numeric_limits<std::int32_t>::max();
}

return static_cast<const P *>(this)->impl.awaitWithoutException(timeout);
return static_cast<const P *>(this)->impl.awaitWithoutException(
dxfcpp::fitToType<std::int32_t>(timeoutInMilliseconds.count()));
}

/**
Expand Down Expand Up @@ -246,13 +242,7 @@ template <typename P> struct VoidPromiseMixin {
* @throws PromiseException if computation has completed exceptionally.
*/
void await(const std::chrono::milliseconds &timeoutInMilliseconds) const & {
auto timeout = timeoutInMilliseconds.count();

if (timeout > std::numeric_limits<std::int32_t>::max()) {
timeout = std::numeric_limits<std::int32_t>::max();
}

static_cast<const P *>(this)->impl.await(timeout);
static_cast<const P *>(this)->impl.await(dxfcpp::fitToType<std::int32_t>(timeoutInMilliseconds.count()));

return getResult();
}
Expand Down Expand Up @@ -312,13 +302,7 @@ template <typename E, typename P> struct EventPromiseMixin {
* @throws PromiseException if computation has completed exceptionally.
*/
std::shared_ptr<E> await(const std::chrono::milliseconds &timeoutInMilliseconds) const & {
auto timeout = timeoutInMilliseconds.count();

if (timeout > std::numeric_limits<std::int32_t>::max()) {
timeout = std::numeric_limits<std::int32_t>::max();
}

static_cast<const P *>(this)->impl.await(timeout);
static_cast<const P *>(this)->impl.await(dxfcpp::fitToType<std::int32_t>(timeoutInMilliseconds.count()));

return getResult();
}
Expand Down Expand Up @@ -373,13 +357,7 @@ template <typename E, typename P> struct EventsPromiseMixin {
* @throws PromiseException if computation has completed exceptionally.
*/
std::vector<std::shared_ptr<E>> await(const std::chrono::milliseconds &timeoutInMilliseconds) const & {
auto timeout = timeoutInMilliseconds.count();

if (timeout > std::numeric_limits<std::int32_t>::max()) {
timeout = std::numeric_limits<std::int32_t>::max();
}

static_cast<const P *>(this)->impl.await(static_cast<std::int32_t>(timeout));
static_cast<const P *>(this)->impl.await(dxfcpp::fitToType<std::int32_t>(timeoutInMilliseconds.count()));

return getResult();
}
Expand All @@ -397,10 +375,13 @@ template <typename T> struct Promise {};
template <> struct Promise<void> : CommonPromiseMixin<Promise<void>>, VoidPromiseMixin<Promise<void>> {
friend struct CommonPromiseMixin<Promise>;
friend struct VoidPromiseMixin<Promise>;
friend struct Promises;

private:
VoidPromiseImpl impl;

explicit Promise(void *handle) : impl(handle) {
public:
explicit Promise(void *handle, bool own = true) : impl(handle, own) {
}

Promise(const Promise &) = delete;
Expand All @@ -418,15 +399,18 @@ struct Promise<std::shared_ptr<E>> : CommonPromiseMixin<Promise<std::shared_ptr<
EventPromiseMixin<E, Promise<std::shared_ptr<E>>> {
friend struct CommonPromiseMixin<Promise>;
friend struct EventPromiseMixin<E, Promise>;
friend struct Promises;

private:
EventPromiseImpl impl;

explicit Promise(void *handle) : impl(handle) {
public:
explicit Promise(void *handle, bool own = true) : impl(handle, own) {
}

Promise(const Promise &) = delete;
Promise &operator=(const Promise &) = delete;
Promise(Promise &&) noexcept = delete;
Promise(Promise &&) noexcept = default;
Promise &operator=(Promise &&) noexcept = delete;
};

Expand All @@ -446,6 +430,8 @@ struct PromiseListImpl {
* @tparam E The event type.
*/
template <typename E> struct PromiseList {
friend struct Promises;

using data_type = std::vector<Promise<std::shared_ptr<E>>>;

using iterator_category = std::random_access_iterator_tag;
Expand All @@ -464,14 +450,16 @@ template <typename E> struct PromiseList {
using reverse_iterator = typename data_type::reverse_iterator;
using const_reverse_iterator = typename data_type::const_reverse_iterator;

private:
PromiseListImpl impl;

data_type data_;

public:
explicit PromiseList(void *handle = nullptr) : impl(handle) {
}

std::shared_ptr<PromiseList> create(void *handle) {
static std::shared_ptr<PromiseList> create(void *handle) {
if (!handle) {
return {};
}
Expand Down Expand Up @@ -595,10 +583,13 @@ struct Promise<std::vector<std::shared_ptr<E>>> : CommonPromiseMixin<Promise<std
EventsPromiseMixin<E, Promise<std::vector<std::shared_ptr<E>>>> {
friend struct CommonPromiseMixin<Promise>;
friend struct EventsPromiseMixin<E, Promise>;
friend struct Promises;

private:
EventsPromiseImpl impl;

explicit Promise(void *handle) : impl(handle) {
public:
explicit Promise(void *handle, bool own = true) : impl(handle, own) {
}

Promise(const Promise &) = delete;
Expand Down
2 changes: 1 addition & 1 deletion src/event/EventMapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ DXFCPP_BEGIN_NAMESPACE

std::shared_ptr<EventType> EventMapper::fromGraal(void *graalNativeEvent) {
if (!graalNativeEvent) {
throw InvalidArgumentException("The `graalNativeEvent` is nullptr");
throw InvalidArgumentException("EventMapper::fromGraal: The `graalNativeEvent` is nullptr");
}

// TODO: implement other types [EN-8235]
Expand Down

0 comments on commit c67e75c

Please sign in to comment.