Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tutorial and docs for redis subscriber #1620

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,9 @@ cc_binary(
srcs = ['tutorial/tutorial-14-consul_cli.cc'],
deps = [':consul'],
)

cc_binary(
name = 'redis_subscriber',
srcs = ['tutorial/tutorial-18-redis_subscriber.cc'],
deps = [':redis'],
)
1 change: 1 addition & 0 deletions README_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ sudo dnf install workflow
* [异步MySQL客户端:mysql_cli](docs/tutorial-12-mysql_cli.md)
* [异步kafka客户端:kafka_cli](docs/tutorial-13-kafka_cli.md)
* [异步DNS客户端:dns_cli](docs/tutorial-17-dns_cli.md)
* [Redis订阅客户端:redis_subscriber](docs/tutorial-18-redis_subscriber.md)

#### 编程范式

Expand Down
116 changes: 116 additions & 0 deletions docs/tutorial-18-redis_subscriber.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Redis订阅模式

## 示例代码
[tutorial-18-redis_subscriber.cc](/tutorial/tutorial-18-redis_subscriber.cc)

## 创建订阅客户端和任务
在Workflow中,一个客户端网络任务通常是向服务端发出一个请求并接收一个回复,而Redis订阅任务不同,它会先发出一个订阅请求,然后源源不断地接收服务端推送过来的消息,在这个过程中,客户端还可以新增或取消channels、patterns。

用于实现Redis订阅功能的任务是`WFRedisSubscribeTask`,与普通的Redis任务不同,它不从任务工厂产生,而是需要使用`WFRedisSubscriber`来创建。例如

```cpp
WFRedisSubscriber suber;

if (suber.init(url) != 0)
{
std::cerr << "Subscriber init failed " << strerror(errno) << std::endl;
exit(1);
}

// ...

WFRedisSubscribeTask *task;
task = suber.create_subscribe_task(channels, extract, callback);

task->set_watch_timeout(1000000); // 1000秒
task->start();

// 这里可以使用task的相关接口改变订阅内容
// ...

task->release();
suber.deinit();
```

初始化`WFRedisSubscriber`需要使用`Redis URL`,这与普通Redis任务相同,不再赘述。创建订阅任务时,需要提供三个参数

- channels/patterns: 一个或多个被订阅的channel(subscribe)或pattern(psubscribe)
- extract: 收到服务端推送消息时的处理函数
- callback: 任务结束后的回调函数

这个例子中为`watch_timeout`设置了一个很长的时间,若这个时间较短,且服务端长时间未推送消息,则连接会因为超时而断开,订阅任务也会直接失败,请根据实际情况合理设置。

当任务处理完成后,需要通过`task->release()`来释放这个任务,这也是与其他任务的一个不同之处。

## 处理订阅消息
服务端推送的消息由创建任务时指定的`extract`函数处理。后续描述中,subscribe对应channel,psubscribe对应pattern。

1. 服务端推送的消息格式是具有三个元素的数组,第一个元素是字符串"message"或"pmessage",第二个元素是该消息的channel或pattern的名称,第三个元素是消息的内容。
2. subscribe或psubscribe请求的回复是具有三个元素的数组,第一个元素是字符串"subscribe"或"psubscribe",第二个元素是channel或pattern的名称,第三个元素是当前通过subscribe或psubscribe命令已经订阅了多少个channel或pattern,是一个整数。如果一个请求订阅了多个channel或pattern,会有多个回复。
3. unsubscribe或punsubscribe请求的回复是具有三个元素的数组,格式与订阅命令相似。当取消订阅但不指定channel或pattern时,表示取消所有该类型的订阅,对于所有已经订阅的channel或pattern,返回一个回复消息。若当前类型未订阅任何channel或pattern,则返回一个消息,其中名称部分为nil。

更多详情可参阅redis文档。

处理消息的一个示例如下,简单地将内容打印到标准输出

```cpp
void extract(WFRedisSubscribeTask *task)
{
auto *resp = task->get_resp();
protocol::RedisValue value;

resp->get_result(value);

if (value.is_array())
{
for (size_t i = 0; i < value.arr_size(); i++)
{
if (value[i].is_string())
std::cout << value[i].string_value();
else if (value[i].is_int())
std::cout << value[i].int_value();
else if (value[i].is_nil())
std::cout << "nil";
else
std::cout << "Unexpected value in array!";

std::cout << "\n";
}
}
else
std::cout << "Unexpected value!\n";
}
```

## 改变订阅内容
在任务过程中,可以通过下述接口新增或取消订阅,注意在带有channels或patterns参数的接口中,请勿传入空数组。

```cpp
// ...

task->start();

// 新增订阅一组channels
task->subscribe(channels);

// 取消订阅一组channels
task->unsubscribe(channels);

// 取消订阅所有channels
task->unsubscribe();

// 新增订阅一组patterns
task->psubscribe(patterns);

// 取消订阅一组patterns
task->punsubscribe(patterns);

// 取消订阅所有patterns
task->punsubscribe();

task->release();
```

当所有channels和patterns都被取消订阅后,任务会直接结束,此后不能再新增订阅,请注意该细节。也可以直接通过`task->quit()`来主动结束任务。

此外,订阅模式下可以通过`task->ping()`或`task->ping(message)`向Redis服务器发起`ping`请求。当任务设置了较小的`watch_timeout`,但服务端可能长时间没有消息推送时,通过定时发出`ping`请求可以令服务端推送`pong`响应,此时任务便不会因为超时而失败。
1 change: 1 addition & 0 deletions tutorial/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ if (NOT REDIS STREQUAL "n")
set(TUTORIAL_LIST
tutorial-02-redis_cli
tutorial-03-wget_to_redis
tutorial-18-redis_subscriber
)
foreach(src ${TUTORIAL_LIST})
string(REPLACE "-" ";" arr ${src})
Expand Down
150 changes: 150 additions & 0 deletions tutorial/tutorial-18-redis_subscriber.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include <cerrno>
#include <cctype>
#include <cstring>
#include <iostream>
#include <string>

#include "workflow/WFRedisSubscriber.h"
#include "workflow/WFFacilities.h"
#include "workflow/StringUtil.h"

void extract(WFRedisSubscribeTask *task)
{
auto *resp = task->get_resp();
protocol::RedisValue value;

resp->get_result(value);

if (value.is_array())
{
for (size_t i = 0; i < value.arr_size(); i++)
{
if (value[i].is_string())
std::cout << value[i].string_value();
else if (value[i].is_int())
std::cout << value[i].int_value();
else if (value[i].is_nil())
std::cout << "nil";
else
std::cout << "Unexpected value in array!";

std::cout << "\n";
}
}
else
std::cout << "Unexpected value!\n";
}

int main(int argc, char *argv[])
{
if (argc < 3)
{
std::cerr << argv[0] << " <URL> <Channel> [<Channel>]..." << std::endl;
exit(1);
}

std::string url = argv[1];
if (strncasecmp(argv[1], "redis://", 8) != 0 &&
strncasecmp(argv[1], "rediss://", 9) != 0)
{
url = "redis://" + url;
}

WFRedisSubscriber suber;

if (suber.init(url) != 0)
{
std::cerr << "Subscriber init failed " << strerror(errno) << std::endl;
exit(1);
}

std::vector<std::string> channels;
for (int i = 2; i < argc; i++)
channels.push_back(argv[i]);

WFFacilities::WaitGroup wg(1);
bool finished = false;

auto callback = [&](WFRedisSubscribeTask *task)
{
std::cout << "state = " << task->get_state()
<< ", error = " << task->get_error() << std::endl;

finished = true;
wg.done();
};

WFRedisSubscribeTask *task;
task = suber.create_subscribe_task(channels, extract, callback);

task->set_watch_timeout(1000000);
task->start();

std::string line;

while (!finished)
{
std::string cmd;
std::vector<std::string> params;

if (std::getline(std::cin, line))
{
if (line.empty())
continue;

params = StringUtil::split_filter_empty(line, ' ');
}

if (finished)
break;

if (params.empty())
{
task->unsubscribe();
task->punsubscribe();
break;
}

cmd = params[0];
params.erase(params.begin());

for (char &c : cmd)
c = std::toupper(c);

int ret;
if (cmd == "SUBSCRIBE")
ret = task->subscribe(params);
else if (cmd == "UNSUBSCRIBE")
ret = task->unsubscribe(params);
else if (cmd == "PSUBSCRIBE")
ret = task->psubscribe(params);
else if (cmd == "PUNSUBSCRIBE")
ret = task->punsubscribe(params);
else if (cmd == "PING")
{
if (params.empty())
ret = task->ping();
else
ret = task->ping(params[0]);
}
else if (cmd == "QUIT")
ret = task->quit();
else
{
std::cerr << "Invalid command " << cmd << std::endl;
ret = 0;
}

if (ret < 0)
{
std::cerr << "Send command failed " << strerror(errno) << std::endl;
break;
}
}

task->release();
wg.wait();
suber.deinit();

return 0;
}
2 changes: 1 addition & 1 deletion tutorial/xmake.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function all_examples()
local item = {}
local s = path.filename(x)
if ((s == "upstream_unittest.cc" and not has_config("upstream")) or
((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc") and not has_config("redis")) or
((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc" or s == "tutorial-18-redis_subscriber.cc") and not has_config("redis")) or
(s == "tutorial-12-mysql_cli.cc" and not has_config("mysql")) or
(s == "tutorial-14-consul_cli.cc" and not has_config("consul")) or
(s == "tutorial-13-kafka_cli.cc")) then
Expand Down
Loading