Skip to content

Commit 5182c87

Browse files
committed
opt(flow):1.10.23, 添加DummyAction,修改ParallelAction
1. 修改了ParallelAction,实现了任一成功与任一失败的结束条件; 2. 在ParallelAction与SequenceAction中,添加了ToString();
1 parent 4bbd707 commit 5182c87

9 files changed

+261
-30
lines changed

Diff for: modules/flow/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ set(TBOX_FLOW_HEADERS
5151
actions/composite_action.h
5252
actions/wrapper_action.h
5353
actions/succ_fail_action.h
54+
actions/dummy_action.h
5455
to_graphviz.h)
5556

5657
set(TBOX_FLOW_SOURCES

Diff for: modules/flow/Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ HEAD_FILES = \
4646
actions/composite_action.h \
4747
actions/wrapper_action.h \
4848
actions/succ_fail_action.h \
49+
actions/dummy_action.h \
4950
to_graphviz.h \
5051

5152
CPP_SRC_FILES = \

Diff for: modules/flow/actions/dummy_action.h

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* .============.
3+
* // M A K E / \
4+
* // C++ DEV / \
5+
* // E A S Y / \/ \
6+
* ++ ----------. \/\ .
7+
* \\ \ \ /\ /
8+
* \\ \ \ /
9+
* \\ \ \ /
10+
* -============'
11+
*
12+
* Copyright (c) 2024 Hevake and contributors, all rights reserved.
13+
*
14+
* This file is part of cpp-tbox (https://github.com/cpp-main/cpp-tbox)
15+
* Use of this source code is governed by MIT license that can be found
16+
* in the LICENSE file in the root of the source tree. All contributing
17+
* project authors may be found in the CONTRIBUTORS.md file in the root
18+
* of the source tree.
19+
*/
20+
#ifndef TBOX_FLOW_DUMMY_ACTION_H_20241107
21+
#define TBOX_FLOW_DUMMY_ACTION_H_20241107
22+
23+
#include "../action.h"
24+
25+
namespace tbox {
26+
namespace flow {
27+
28+
//! 木偶动作
29+
//! 其自身不会主动发起结束动作,也不会触发阻塞。由外部通过调用emitFinish()与emitBlock()来实现
30+
class DummyAction : public Action {
31+
public:
32+
explicit DummyAction(event::Loop &loop) : Action(loop, "Dummy") {}
33+
34+
virtual bool isReady() const { return true; }
35+
36+
inline void emitFinish(bool is_succ, const Reason &reason = Reason()) { finish(is_succ, reason); }
37+
inline void emitBlock(const Reason &reason) { block(reason); }
38+
};
39+
40+
}
41+
}
42+
43+
#endif //TBOX_FLOW_DUMMY_ACTION_H_20241107

Diff for: modules/flow/actions/parallel_action.cpp

+49-23
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ namespace flow {
2929

3030
using namespace std::placeholders;
3131

32-
ParallelAction::ParallelAction(event::Loop &loop)
32+
ParallelAction::ParallelAction(event::Loop &loop, Mode mode)
3333
: AssembleAction(loop, "Parallel")
34+
, mode_(mode)
3435
{ }
3536

3637
ParallelAction::~ParallelAction() {
@@ -46,6 +47,7 @@ void ParallelAction::toJson(Json &js) const {
4647
action->toJson(js_child);
4748
js_children.push_back(std::move(js_child));
4849
}
50+
js["mode"] = ToString(mode_);
4951
}
5052

5153
int ParallelAction::addChild(Action *action) {
@@ -55,7 +57,7 @@ int ParallelAction::addChild(Action *action) {
5557
int index = children_.size();
5658
children_.push_back(action);
5759
action->setFinishCallback(std::bind(&ParallelAction::onChildFinished, this, index, _1));
58-
action->setBlockCallback(std::bind(&ParallelAction::onChildBlocked, this, index, _1));
60+
action->setBlockCallback(std::bind(&ParallelAction::onChildBlocked, this, index, _1, _2));
5961
return index;
6062

6163
} else {
@@ -77,26 +79,29 @@ void ParallelAction::onStart() {
7779

7880
for (size_t index = 0; index < children_.size(); ++index) {
7981
Action *action = children_.at(index);
80-
if (!action->start())
82+
if (!action->start()) {
8183
finished_children_[index] = false;
84+
//! 如果是任一失败都退出,那么要直接结束
85+
if (mode_ == Mode::kAnyFail) {
86+
finish(true);
87+
stopAllActions();
88+
return;
89+
}
90+
}
8291
}
8392

84-
tryFinish();
93+
//! 如果全部都启动失败了,那么就直接结束
94+
if (finished_children_.size() == children_.size())
95+
finish(true);
8596
}
8697

8798
void ParallelAction::onStop() {
88-
for (Action *action : children_)
89-
action->stop();
90-
99+
stopAllActions();
91100
AssembleAction::onStop();
92101
}
93102

94103
void ParallelAction::onPause() {
95-
for (Action *action : children_) {
96-
if (action->state() == State::kRunning)
97-
action->pause();
98-
}
99-
104+
pauseAllActions();
100105
AssembleAction::onPause();
101106
}
102107

@@ -114,31 +119,52 @@ void ParallelAction::onReset() {
114119
child->reset();
115120

116121
finished_children_.clear();
117-
blocked_children_.clear();
118-
119122
AssembleAction::onReset();
120123
}
121124

122-
void ParallelAction::tryFinish() {
123-
if ((finished_children_.size() + blocked_children_.size()) == children_.size())
124-
finish(true);
125+
void ParallelAction::stopAllActions() {
126+
for (Action *action : children_) {
127+
action->stop();
128+
}
129+
}
130+
131+
void ParallelAction::pauseAllActions() {
132+
for (Action *action : children_) {
133+
action->pause();
134+
}
125135
}
126136

127137
void ParallelAction::onChildFinished(int index, bool is_succ) {
128138
if (state() == State::kRunning) {
129139
finished_children_[index] = is_succ;
130-
tryFinish();
140+
141+
if ((mode_ == Mode::kAnySucc && is_succ) ||
142+
(mode_ == Mode::kAnyFail && !is_succ)) {
143+
stopAllActions();
144+
finish(true);
145+
146+
} else if (finished_children_.size() == children_.size()) {
147+
finish(true);
148+
}
131149
}
132150
}
133151

134-
void ParallelAction::onChildBlocked(int index, const Reason &why) {
152+
void ParallelAction::onChildBlocked(int index, const Reason &why, const Trace &trace) {
135153
if (state() == State::kRunning) {
136-
blocked_children_[index] = why;
137-
//!FIXME:目前遇到block的动作,仅停止它。将来需要更精细化的处理
138-
children_.at(index)->stop();
139-
tryFinish();
154+
pauseAllActions();
155+
block(why, trace);
140156
}
141157
}
142158

159+
std::string ParallelAction::ToString(Mode mode) {
160+
const char *tbl[] = { "AllFinish", "AnyFail", "AnySucc" };
161+
162+
auto index = static_cast<size_t>(mode);
163+
if (0 <= index && index < NUMBER_OF_ARRAY(tbl))
164+
return tbl[index];
165+
166+
return std::to_string(index);
167+
}
168+
143169
}
144170
}

Diff for: modules/flow/actions/parallel_action.h

+17-6
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,28 @@ namespace flow {
3333
*/
3434
class ParallelAction : public AssembleAction {
3535
public:
36-
explicit ParallelAction(event::Loop &loop);
36+
//! 模式
37+
enum class Mode {
38+
kAllFinish, //!< 全部结束
39+
kAnyFail, //!< 任一失败
40+
kAnySucc, //!< 任一成功
41+
};
42+
43+
explicit ParallelAction(event::Loop &loop, Mode mode = Mode::kAllFinish);
3744
virtual ~ParallelAction();
3845

3946
virtual void toJson(Json &js) const override;
4047

4148
virtual int addChild(Action *action) override;
4249
virtual bool isReady() const override;
4350

51+
inline void setMode(Mode mode) { mode_ = mode; }
52+
4453
using FinishedChildren = std::map<int, bool>;
45-
using BlockedChildren = std::map<int, Reason>;
4654

4755
FinishedChildren getFinishedChildren() const { return finished_children_; }
48-
BlockedChildren getBlockedChildren() const { return blocked_children_; }
56+
57+
static std::string ToString(Mode mode);
4958

5059
protected:
5160
virtual void onStart() override;
@@ -55,15 +64,17 @@ class ParallelAction : public AssembleAction {
5564
virtual void onReset() override;
5665

5766
private:
58-
void tryFinish();
67+
void stopAllActions();
68+
void pauseAllActions();
69+
5970
void onChildFinished(int index, bool is_succ);
60-
void onChildBlocked(int index, const Reason &why);
71+
void onChildBlocked(int index, const Reason &why, const Trace &trace);
6172

6273
private:
74+
Mode mode_;
6375
std::vector<Action*> children_;
6476

6577
FinishedChildren finished_children_;
66-
BlockedChildren blocked_children_;
6778
};
6879

6980
}

Diff for: modules/flow/actions/parallel_action_test.cpp

+136
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "parallel_action.h"
2525
#include "function_action.h"
2626
#include "sleep_action.h"
27+
#include "dummy_action.h"
2728

2829
namespace tbox {
2930
namespace flow {
@@ -96,5 +97,140 @@ TEST(ParallelAction, SleepFunctionAction) {
9697
EXPECT_TRUE(nodelay_action_succ);
9798
}
9899

100+
TEST(ParallelAction, AllFinish) {
101+
auto loop = event::Loop::New();
102+
SetScopeExitAction([loop] { delete loop; });
103+
104+
auto para_action = new ParallelAction(*loop, ParallelAction::Mode::kAllFinish);
105+
SetScopeExitAction([para_action] { delete para_action; });
106+
107+
auto ta1 = new DummyAction(*loop);
108+
auto ta2 = new DummyAction(*loop);
109+
auto ta3 = new DummyAction(*loop);
110+
para_action->addChild(ta1);
111+
para_action->addChild(ta2);
112+
para_action->addChild(ta3);
113+
114+
para_action->setFinishCallback(
115+
[loop](bool is_succ, const Action::Reason&, const Action::Trace&) {
116+
EXPECT_TRUE(is_succ);
117+
loop->exitLoop();
118+
}
119+
);
120+
121+
para_action->start();
122+
loop->runNext([ta1] { ta1->emitFinish(true); });
123+
loop->runNext([ta2] { ta2->emitFinish(false); });
124+
loop->runNext([ta3] { ta3->emitFinish(false); });
125+
loop->runLoop();
126+
127+
EXPECT_EQ(ta1->state(), Action::State::kFinished);
128+
EXPECT_EQ(ta2->state(), Action::State::kFinished);
129+
EXPECT_EQ(ta3->state(), Action::State::kFinished);
130+
EXPECT_EQ(ta1->result(), Action::Result::kSuccess);
131+
EXPECT_EQ(ta2->result(), Action::Result::kFail);
132+
EXPECT_EQ(ta2->result(), Action::Result::kFail);
133+
}
134+
135+
TEST(ParallelAction, AnyFail) {
136+
auto loop = event::Loop::New();
137+
SetScopeExitAction([loop] { delete loop; });
138+
139+
auto para_action = new ParallelAction(*loop, ParallelAction::Mode::kAnyFail);
140+
SetScopeExitAction([para_action] { delete para_action; });
141+
142+
auto ta1 = new DummyAction(*loop);
143+
auto ta2 = new DummyAction(*loop);
144+
auto ta3 = new DummyAction(*loop);
145+
para_action->addChild(ta1);
146+
para_action->addChild(ta2);
147+
para_action->addChild(ta3);
148+
149+
para_action->setFinishCallback(
150+
[loop](bool is_succ, const Action::Reason&, const Action::Trace&) {
151+
EXPECT_TRUE(is_succ);
152+
loop->exitLoop();
153+
}
154+
);
155+
156+
para_action->start();
157+
loop->runNext([ta1] { ta1->emitFinish(true); });
158+
loop->runNext([ta2] { ta2->emitFinish(false); });
159+
loop->runLoop();
160+
161+
EXPECT_EQ(ta1->state(), Action::State::kFinished);
162+
EXPECT_EQ(ta1->result(), Action::Result::kSuccess);
163+
EXPECT_EQ(ta2->state(), Action::State::kFinished);
164+
EXPECT_EQ(ta2->result(), Action::Result::kFail);
165+
EXPECT_EQ(ta3->state(), Action::State::kStoped);
166+
}
167+
168+
TEST(ParallelAction, AnySucc) {
169+
auto loop = event::Loop::New();
170+
SetScopeExitAction([loop] { delete loop; });
171+
172+
auto para_action = new ParallelAction(*loop, ParallelAction::Mode::kAnySucc);
173+
SetScopeExitAction([para_action] { delete para_action; });
174+
175+
auto ta1 = new DummyAction(*loop);
176+
auto ta2 = new DummyAction(*loop);
177+
auto ta3 = new DummyAction(*loop);
178+
para_action->addChild(ta1);
179+
para_action->addChild(ta2);
180+
para_action->addChild(ta3);
181+
182+
para_action->setFinishCallback(
183+
[loop](bool is_succ, const Action::Reason&, const Action::Trace&) {
184+
EXPECT_TRUE(is_succ);
185+
loop->exitLoop();
186+
}
187+
);
188+
189+
para_action->start();
190+
loop->runNext([ta1] { ta1->emitFinish(false); });
191+
loop->runNext([ta2] { ta2->emitFinish(true); });
192+
loop->runLoop();
193+
194+
EXPECT_EQ(ta1->state(), Action::State::kFinished);
195+
EXPECT_EQ(ta1->result(), Action::Result::kFail);
196+
EXPECT_EQ(ta2->state(), Action::State::kFinished);
197+
EXPECT_EQ(ta2->result(), Action::Result::kSuccess);
198+
EXPECT_EQ(ta3->state(), Action::State::kStoped);
199+
}
200+
201+
TEST(ParallelAction, Block) {
202+
auto loop = event::Loop::New();
203+
SetScopeExitAction([loop] { delete loop; });
204+
205+
auto para_action = new ParallelAction(*loop, ParallelAction::Mode::kAnySucc);
206+
SetScopeExitAction([para_action] { delete para_action; });
207+
208+
auto ta1 = new DummyAction(*loop);
209+
auto ta2 = new DummyAction(*loop);
210+
auto ta3 = new DummyAction(*loop);
211+
para_action->addChild(ta1);
212+
para_action->addChild(ta2);
213+
para_action->addChild(ta3);
214+
215+
para_action->setBlockCallback(
216+
[=] (const Action::Reason &r, const Action::Trace &t) {
217+
EXPECT_EQ(r.code, 111);
218+
ASSERT_EQ(t.size(), 2u);
219+
EXPECT_EQ(t[0].id, ta2->id());
220+
loop->exitLoop();
221+
}
222+
);
223+
224+
para_action->start();
225+
loop->runNext([ta1] { ta1->emitFinish(false); });
226+
loop->runNext([ta2] { ta2->emitBlock(Action::Reason(111)); });
227+
loop->runLoop();
228+
229+
EXPECT_EQ(ta1->state(), Action::State::kFinished);
230+
EXPECT_EQ(ta1->result(), Action::Result::kFail);
231+
EXPECT_EQ(ta2->state(), Action::State::kPause);
232+
EXPECT_EQ(ta3->state(), Action::State::kPause);
233+
}
234+
99235
}
100236
}

0 commit comments

Comments
 (0)