You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hi all! We are currently developing an stream processing program using RxCpp, but we are having issues on how to correctly approach the following situation.
Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.
The main questions is:
What would be the correct way of implementing a bifurcation, depending on current operation result?
One possible implementation we come across is the following:
We define the OperationResult class, just a wrapper over the event being processed and a boolean indicating if the operation succeeded or failed:
template <classT> classOperationResult{
private:
T m_event;
bool m_success;
public:OperationResult(bool succeded, T event) : m_success{succeded}, m_event{event}{}
boolsuccess() const{
returnthis->m_success;
}
T event() const{
returnthis->m_event;
}
};
And the Operation class, that set up the rxcpp pipeline, where each operation exposes 2 observables, one that emits successfully processed items and one that emits the failed ones.
template <classT> classOperation
{
usingfunction_t = std::function<OperationResult<T>(OperationResult<T>)>;
usingobservable_t = rxcpp::observable<OperationResult<T>>;
private:
std::string m_name;
function_t m_fn;
bool m_inputConnected;
public:Operation(const std::string & name, function_t fn) : m_name{name}, m_fn{fn}, m_inputConnected{false}{}
// This function sets up the rxcpp pipeline, implementing the bifurcation depending on OperationResult// in a way that no new observables/subscriptions are performed
std::pair<observable_t, observable_t> connect(constobservable_t & input){
if (this->m_inputConnected){
throwstd::runtime_error("Error, operation " + this->m_name + " is already connected");
}else{
auto result = input.map(this->m_fn).publish();
auto failure = result.filter([](OperationResult<T> result) { return !result.success(); });
auto success = result.filter([](OperationResult<T> result) { return result.success(); });
result.connect();
this->m_inputConnected = true;
returnstd::pair(failure, success);
}
}
};
This allows for setting up a logical OR between multiple Operations like this:
// Operations
Operation<int> op0("is pair",
[](OperationResult<int> res)
{
cout << "pair got " << res.event() << endl;
return OperationResult<int>(res.event() % 2 == 0, res.event());
});
Operation<int> op1("is greater than 3",
[](OperationResult<int> res)
{
cout << "greater than 3 got " << res.event() << endl;
return OperationResult<int>(res.event() > 3, res.event());
});
Operation<int> op2("equals 1",
[](OperationResult<int> res)
{
cout << "equals 1 got " << res.event() << endl;
return OperationResult<int>(res.event() == 1, res.event());
});
// Input to the observable chainauto input_sbj = subjects::subject<OperationResult<int>>();
auto input = input_sbj.get_observable();
// Subscriber to be called if one of the 3 operation succeededauto success_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
{ cout << "[Success] got " << res.event() << endl; });
// Subscriber to be called if all operations failedauto error_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
{ cout << "[Error] got " << res.event() << endl; });
// Logical ORauto outs = op0.connect(input);
outs.second.subscribe(success_subscriber);
outs = op1.connect(outs.first);
outs.second.subscribe(success_subscriber);
outs = op2.connect(outs.first);
outs.second.subscribe(success_subscriber);
outs.first.subscribe(error_subscriber);
// Emit inputauto s = input_sbj.get_subscriber();
s.on_next(OperationResult(false, 0)); // Success
s.on_next(OperationResult(false, 1)); // Success
s.on_next(OperationResult(false, 3)); // Error
s.on_next(OperationResult(false, 5)); // Success
s.on_completed();
Any help or insight you can give me is much appreciated!
The text was updated successfully, but these errors were encountered:
JavierBejMen
changed the title
Implementing logical OR operation between multiple operations
Implementing logical OR between multiple operations
Jan 27, 2022
Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.
Hi all! We are currently developing an stream processing program using RxCpp, but we are having issues on how to correctly approach the following situation.
Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.
The main questions is:
One possible implementation we come across is the following:
We define the
OperationResult
class, just a wrapper over the event being processed and a boolean indicating if the operation succeeded or failed:And the
Operation
class, that set up the rxcpp pipeline, where each operation exposes 2 observables, one that emits successfully processed items and one that emits the failed ones.This allows for setting up a
logical OR
between multiple Operations like this:Output:
Any help or insight you can give me is much appreciated!
The text was updated successfully, but these errors were encountered: