From 46252c8f67a54a46c93fe8e924ccf768f0d3bc07 Mon Sep 17 00:00:00 2001 From: kedixa <1204837541@qq.com> Date: Wed, 11 Sep 2024 16:42:18 +0800 Subject: [PATCH 1/2] tutorial and docs for redis subscriber --- BUILD | 6 + README_cn.md | 1 + docs/tutorial-18-redis_subscriber.md | 116 ++++++++++++++++++ tutorial/CMakeLists.txt | 1 + tutorial/tutorial-18-redis_subscriber.cc | 145 +++++++++++++++++++++++ tutorial/xmake.lua | 2 +- 6 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 docs/tutorial-18-redis_subscriber.md create mode 100644 tutorial/tutorial-18-redis_subscriber.cc diff --git a/BUILD b/BUILD index a9fd41b966..63394bef0e 100644 --- a/BUILD +++ b/BUILD @@ -377,3 +377,9 @@ cc_binary( srcs = ['tutorial/tutorial-14-consul_cli.cc'], deps = [':consul'], ) + +cc_binary( + name = 'redis_subscriber', + srcs = ['tutorial/tutorial-18-redis_subscriber.cc'], + deps = [':redis'], +) diff --git a/README_cn.md b/README_cn.md index f0f712edce..6fbe9f1dbc 100644 --- a/README_cn.md +++ b/README_cn.md @@ -127,6 +127,7 @@ sudo dnf install workflow * [异步MySQL客户端:mysql_cli](docs/tutorial-12-mysql_cli.md) * [异步kafka客户端:kafka_cli](docs/tutorial-13-kafka_cli.md) * [异步DNS客户端:dns_cli](docs/tutorial-17-dns_cli.md) + * [Redis订阅客户端:redis_subscriber](docs/tutorial-18-redis_subscriber.md) #### 编程范式 diff --git a/docs/tutorial-18-redis_subscriber.md b/docs/tutorial-18-redis_subscriber.md new file mode 100644 index 0000000000..f7fc07b19b --- /dev/null +++ b/docs/tutorial-18-redis_subscriber.md @@ -0,0 +1,116 @@ +# Redis订阅模式 + +## 示例代码 +[tutorial-18-redis_subscriber.cc](/tutorial/tutorial-18-redis_subscriber.cc) + +## 创建订阅客户端和任务 +在Workflow中,一个客户端网络任务通常是向服务端发出一个请求并接收一个回复,而Redis订阅任务不同,它会先发出一个订阅请求,然后源源不断地接收服务端推送过来的消息,在这个过程中,客户端还可以新增或取消channels、patterns。 + +用于实现Redis订阅功能的任务是`WFRedisSubscribeTask`,与普通的Redis任务不同,它不从任务工厂产生,而是需要使用`WFRedisSubscriber`来创建。例如 + +```cpp +WFRedisSubscriber suber; + +if (suber.init(url) != 0) +{ + std::cerr << "Subscriber init failed " << strerror(errno) << std::endl; + exit(1); +} + +// ... + +WFRedisSubscribeTask *task; +task = suber.create_subscribe_task(channels, extract, callback); + +task->set_watch_timeout(100000000); // 100000秒 +task->start(); + +// 这里可以使用task的相关接口改变订阅内容 +// ... + +task->release(); +suber.deinit(); +``` + +初始化`WFRedisSubscriber`需要使用`Redis URL`,这与普通Redis任务相同,不再赘述。创建订阅任务时,需要提供三个参数 + +- channels/patterns: 一个或多个被订阅的channel(subscribe)或pattern(psubscribe) +- extract: 收到服务端推送消息时的处理函数 +- callback: 任务结束后的回调函数 + +这个例子中为`watch_timeout`设置了一个很长的时间,若这个时间较短,且服务端长时间未推送消息,则连接会因为超时而断开,订阅任务也会直接失败,请根据实际情况合理设置。 + +当任务处理完成后,需要通过`task->release()`来释放这个任务,这也是与其他任务的一个不同之处。 + +## 处理订阅消息 +服务端推送的消息由创建任务时指定的`extract`函数处理。后续描述中,subscribe对应channel,psubscribe对应pattern。 + +1. 服务端推送的消息格式是具有三个元素的数组,第一个元素是字符串"message"或"pmessage",第二个元素是该消息的channel或pattern的名称,第三个元素是消息的内容。 +2. subscribe或psubscribe请求的回复是具有三个元素的数组,第一个元素是字符串"subscribe"或"psubscribe",第二个元素是channel或pattern的名称,第三个元素是当前通过subscribe或psubscribe命令已经订阅了多少个channel或pattern,是一个整数。如果一个请求订阅了多个channel或pattern,会有多个回复。 +3. unsubscribe或punsubscribe请求的回复是具有三个元素的数组,格式与订阅命令相似。当取消订阅但不指定channel或pattern时,表示取消所有该类型的订阅,对于所有已经订阅的channel或pattern,返回一个回复消息。若当前类型未订阅任何channel或pattern,则返回一个消息,其中名称部分为nil。 + +更多详情可参阅redis文档。 + +处理消息的一个示例如下,简单地将内容打印到标准输出 + +```cpp +void extract(WFRedisSubscribeTask *task) +{ + auto *resp = task->get_resp(); + protocol::RedisValue value; + + resp->get_result(value); + + if (value.is_array()) + { + for (size_t i = 0; i < value.arr_size(); i++) + { + if (value[i].is_string()) + std::cout << value[i].string_value(); + else if (value[i].is_int()) + std::cout << value[i].int_value(); + else if (value[i].is_nil()) + std::cout << "nil"; + else + std::cout << "Unexpected value in array!"; + + std::cout << "\n"; + } + } + else + std::cout << "Unexpected value!\n"; +} +``` + +## 改变订阅内容 +在任务过程中,可以通过下述接口新增或取消订阅,注意在带有channels或patterns参数的接口中,请勿传入空数组。 + +```cpp +// ... + +task->start(); + +// 新增订阅一组channels +task->subscribe(channels); + +// 取消订阅一组channels +task->unsubscribe(channels); + +// 取消订阅所有channels +task->unsubscribe(); + +// 新增订阅一组patterns +task->psubscribe(patterns); + +// 取消订阅一组patterns +task->punsubscribe(patterns); + +// 取消订阅所有patterns +task->punsubscribe(); + +task->release(); +``` + +当所有channels和patterns都被取消订阅后,任务会直接结束,此后不能再新增订阅,请注意该细节。也可以直接通过`task->quit()`来主动结束任务。 + +此外,订阅模式下可以通过`task->ping()`或`task->ping(message)`向Redis服务器发起`ping`请求。当任务设置了较小的`watch_timeout`,但服务端可能长时间没有消息推送时,通过定时发出`ping`请求可以令服务端推送`pong`响应,此时任务便不会因为超时而失败。 diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index 5f58ddb931..4d244625b6 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -67,6 +67,7 @@ if (NOT REDIS STREQUAL "n") set(TUTORIAL_LIST tutorial-02-redis_cli tutorial-03-wget_to_redis + tutorial-18-redis_subscriber ) foreach(src ${TUTORIAL_LIST}) string(REPLACE "-" ";" arr ${src}) diff --git a/tutorial/tutorial-18-redis_subscriber.cc b/tutorial/tutorial-18-redis_subscriber.cc new file mode 100644 index 0000000000..ccfee2a782 --- /dev/null +++ b/tutorial/tutorial-18-redis_subscriber.cc @@ -0,0 +1,145 @@ +#include +#include +#include +#include +#include + +#include "workflow/WFRedisSubscriber.h" +#include "workflow/WFFacilities.h" +#include "workflow/StringUtil.h" + +void extract(WFRedisSubscribeTask *task) +{ + auto *resp = task->get_resp(); + protocol::RedisValue value; + + resp->get_result(value); + + if (value.is_array()) + { + for (size_t i = 0; i < value.arr_size(); i++) + { + if (value[i].is_string()) + std::cout << value[i].string_value(); + else if (value[i].is_int()) + std::cout << value[i].int_value(); + else if (value[i].is_nil()) + std::cout << "nil"; + else + std::cout << "Unexpected value in array!"; + + std::cout << "\n"; + } + } + else + std::cout << "Unexpected value!\n"; +} + +int main(int argc, char *argv[]) +{ + if (argc < 3) + { + std::cerr << argv[0] << " []..." << std::endl; + exit(1); + } + + std::string url = argv[1]; + if (strncasecmp(argv[1], "redis://", 8) != 0 && + strncasecmp(argv[1], "rediss://", 9) != 0) + { + url = "redis://" + url; + } + + WFRedisSubscriber suber; + + if (suber.init(url) != 0) + { + std::cerr << "Subscriber init failed " << strerror(errno) << std::endl; + exit(1); + } + + std::vector channels; + for (int i = 2; i < argc; i++) + channels.push_back(argv[i]); + + WFFacilities::WaitGroup wg(1); + bool finished = false; + + auto callback = [&](WFRedisSubscribeTask *task) + { + std::cout << "state = " << task->get_state() + << ", error = " << task->get_error() << std::endl; + + finished = true; + wg.done(); + }; + + WFRedisSubscribeTask *task; + task = suber.create_subscribe_task(channels, extract, callback); + + task->set_watch_timeout(100000000); + task->start(); + + std::string line; + + while (!finished) + { + std::string cmd; + std::vector params; + + if (std::getline(std::cin, line)) + params = StringUtil::split_filter_empty(line, ' '); + + if (finished) + break; + + if (params.empty()) + { + task->unsubscribe(); + task->punsubscribe(); + break; + } + + cmd = params[0]; + params.erase(params.begin()); + + for (char &c : cmd) + c = std::toupper(c); + + int ret; + if (cmd == "SUBSCRIBE") + ret = task->subscribe(params); + else if (cmd == "UNSUBSCRIBE") + ret = task->unsubscribe(params); + else if (cmd == "PSUBSCRIBE") + ret = task->psubscribe(params); + else if (cmd == "PUNSUBSCRIBE") + ret = task->punsubscribe(params); + else if (cmd == "PING") + { + if (params.empty()) + ret = task->ping(); + else + ret = task->ping(params[0]); + } + else if (cmd == "QUIT") + ret = task->quit(); + else + { + std::cerr << "Invalid command " << cmd << std::endl; + ret = 0; + } + + if (ret < 0) + { + std::cerr << "Send command failed " << strerror(errno) << std::endl; + break; + } + } + + task->release(); + wg.wait(); + suber.deinit(); + + return 0; +} diff --git a/tutorial/xmake.lua b/tutorial/xmake.lua index 9611bc51c8..d1ae955114 100644 --- a/tutorial/xmake.lua +++ b/tutorial/xmake.lua @@ -11,7 +11,7 @@ function all_examples() local item = {} local s = path.filename(x) if ((s == "upstream_unittest.cc" and not has_config("upstream")) or - ((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc") and not has_config("redis")) or + ((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc" or s == "tutorial-18-redis_subscriber.cc") and not has_config("redis")) or (s == "tutorial-12-mysql_cli.cc" and not has_config("mysql")) or (s == "tutorial-14-consul_cli.cc" and not has_config("consul")) or (s == "tutorial-13-kafka_cli.cc")) then From 54f6595aa9f944dab5e44f56dd6019906c608e2c Mon Sep 17 00:00:00 2001 From: kedixa <1204837541@qq.com> Date: Wed, 11 Sep 2024 17:27:13 +0800 Subject: [PATCH 2/2] redis_subscriber skip empty line --- docs/tutorial-18-redis_subscriber.md | 2 +- tutorial/tutorial-18-redis_subscriber.cc | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/tutorial-18-redis_subscriber.md b/docs/tutorial-18-redis_subscriber.md index f7fc07b19b..3e830458b3 100644 --- a/docs/tutorial-18-redis_subscriber.md +++ b/docs/tutorial-18-redis_subscriber.md @@ -22,7 +22,7 @@ if (suber.init(url) != 0) WFRedisSubscribeTask *task; task = suber.create_subscribe_task(channels, extract, callback); -task->set_watch_timeout(100000000); // 100000秒 +task->set_watch_timeout(1000000); // 1000秒 task->start(); // 这里可以使用task的相关接口改变订阅内容 diff --git a/tutorial/tutorial-18-redis_subscriber.cc b/tutorial/tutorial-18-redis_subscriber.cc index ccfee2a782..fef4abf13a 100644 --- a/tutorial/tutorial-18-redis_subscriber.cc +++ b/tutorial/tutorial-18-redis_subscriber.cc @@ -77,7 +77,7 @@ int main(int argc, char *argv[]) WFRedisSubscribeTask *task; task = suber.create_subscribe_task(channels, extract, callback); - task->set_watch_timeout(100000000); + task->set_watch_timeout(1000000); task->start(); std::string line; @@ -88,7 +88,12 @@ int main(int argc, char *argv[]) std::vector params; if (std::getline(std::cin, line)) + { + if (line.empty()) + continue; + params = StringUtil::split_filter_empty(line, ' '); + } if (finished) break;