Skip to content

Commit

Permalink
Add WFRedisSubscriber to support redis subscribe/psubscribe.
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed Aug 17, 2024
1 parent 50ea72c commit 85057fe
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 47 deletions.
7 changes: 5 additions & 2 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ cc_library(
cc_library(
name = 'redis',
hdrs = [
'src/factory/RedisTaskImpl.inl',
'src/protocol/RedisMessage.h',
'src/protocol/redis_parser.h',
'src/server/WFRedisServer.h',
'src/client/WFRedisSubscriber.h',
],
includes = [
'src/protocol',
Expand All @@ -126,6 +128,7 @@ cc_library(
'src/factory/RedisTaskImpl.cc',
'src/protocol/RedisMessage.cc',
'src/protocol/redis_parser.c',
'src/client/WFRedisSubscriber.cc',
],
deps = [
':common',
Expand All @@ -135,7 +138,6 @@ cc_library(
cc_library(
name = 'mysql',
hdrs = [
'src/client/WFMySQLConnection.h',
'src/protocol/MySQLMessage.h',
'src/protocol/MySQLMessage.inl',
'src/protocol/MySQLResult.h',
Expand All @@ -146,21 +148,22 @@ cc_library(
'src/protocol/mysql_stream.h',
'src/protocol/mysql_types.h',
'src/server/WFMySQLServer.h',
'src/client/WFMySQLConnection.h',
],
includes = [
'src/protocol',
'src/client',
'src/server',
],
srcs = [
'src/client/WFMySQLConnection.cc',
'src/factory/MySQLTaskImpl.cc',
'src/protocol/MySQLMessage.cc',
'src/protocol/MySQLResult.cc',
'src/protocol/MySQLUtil.cc',
'src/protocol/mysql_byteorder.c',
'src/protocol/mysql_parser.c',
'src/protocol/mysql_stream.c',
'src/client/WFMySQLConnection.cc',
],
deps = [
':common',
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists_Headers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ set(INCLUDE_HEADERS
src/server/WFRedisServer.h
src/server/WFMySQLServer.h
src/client/WFMySQLConnection.h
src/client/WFRedisSubscriber.h
src/client/WFConsulClient.h
src/client/WFDnsClient.h
src/manager/DnsCache.h
Expand Down Expand Up @@ -89,6 +90,7 @@ set(INCLUDE_HEADERS
src/factory/WFResourcePool.h
src/factory/WFMessageQueue.h
src/factory/WFHttpServerTask.h
src/factory/RedisTaskImpl.inl
src/nameservice/WFNameService.h
src/nameservice/WFDnsResolver.h
src/nameservice/WFServiceGovernance.h
Expand Down
7 changes: 7 additions & 0 deletions src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ set(SRC
WFDnsClient.cc
)

if (NOT REDIS STREQUAL "n")
set(SRC
${SRC}
WFRedisSubscriber.cc
)
endif ()

if (NOT MYSQL STREQUAL "n")
set(SRC
${SRC}
Expand Down
93 changes: 93 additions & 0 deletions src/client/WFRedisSubscriber.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright (c) 2024 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han ([email protected])
*/

#include <errno.h>
#include "URIParser.h"
#include "RedisTaskImpl.inl"
#include "WFRedisSubscriber.h"

void WFRedisSubscribeTask::task_extract(WFRedisTask *task)
{
auto *t = (WFRedisSubscribeTask *)task->user_data;

if (t->extract)
t->extract(t);
}

void WFRedisSubscribeTask::task_callback(WFRedisTask *task)
{
auto *t = (WFRedisSubscribeTask *)task->user_data;

t->mutex.lock();
t->task = NULL;
t->mutex.unlock();

t->state = task->get_state();
t->error = task->get_error();
if (t->callback)
t->callback(t);

t->release();
}

int WFRedisSubscriber::init(const std::string& url, SSL_CTX *ssl_ctx)
{
if (URIParser::parse(url, this->uri) >= 0)
{
this->ssl_ctx = ssl_ctx;
return 0;
}

if (this->uri.state == URI_STATE_INVALID)
errno = EINVAL;

return -1;
}

WFRedisTask *
WFRedisSubscriber::create_redis_task(const std::string& command,
const std::vector<std::string>& params)
{
WFRedisTask *task = __WFRedisTaskFactory::create_subscribe_task(this->uri,
WFRedisSubscribeTask::task_extract,
WFRedisSubscribeTask::task_callback);
this->set_ssl_ctx(task);
task->get_req()->set_request(command, params);
return task;
}

WFRedisSubscribeTask *
WFRedisSubscriber::create_subscribe_task(
const std::vector<std::string>& channels,
extract_t extract, callback_t callback)
{
WFRedisTask *task = this->create_redis_task("SUBSCRIBE", channels);
return new WFRedisSubscribeTask(task, std::move(extract),
std::move(callback));
}

WFRedisSubscribeTask *
WFRedisSubscriber::create_psubscribe_task(
const std::vector<std::string>& patterns,
extract_t extract, callback_t callback)
{
WFRedisTask *task = this->create_redis_task("PSUBSCRIBE", patterns);
return new WFRedisSubscribeTask(task, std::move(extract),
std::move(callback));
}

162 changes: 140 additions & 22 deletions src/client/WFRedisSubscriber.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
/*
Copyright (c) 2024 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han ([email protected])
*/

#ifndef _WFREDISSUBSCRIBER_H_
#define _WFREDISSUBSCRIBER_H_

#include <errno.h>
#include <string>
#include <vector>
#include <utility>
Expand All @@ -9,7 +31,27 @@

class WFRedisSubscribeTask : public WFGenericTask
{
pubilc:
public:
/* Note: Call 'get_resp()' only in the 'extract' function or
before the task is started to set response size limit. */
protocol::RedisResponse *get_resp()
{
return this->task->get_resp();
}

public:
/* User needs to call 'release()' exactly once, anywhere. */
void release()
{
if (this->flag.exchange(true))
delete this;
}

public:
/* Note: After 'release()' is called, all the requesting functions
should not be called except in 'extract', because the task
point may have been deleted because 'callback' finished. */

int subscribe(const std::vector<std::string>& channels);
int unsubscribe(const std::vector<std::string>& channels);
int unsubscribe_all();
Expand All @@ -18,12 +60,47 @@ class WFRedisSubscribeTask : public WFGenericTask
int punsubscribe(const std::vector<std::string>& patterns);
int punsubscribe_all();

int ping();

public:
/* User needs to call 'release()' exactly once. */
void release()
/* All 'timeout' proxy functions can only be called only before
the task is started or in 'extract'. */

/* Timeout of waiting for each message. Very useful. */
void set_watch_timeout(int timeout)
{
if (this->flag.exchange(true))
delete this;
this->task->set_watch_timeout(timeout);
}

/* Timeout of receiving a complete message. */
void set_recv_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}

/* Timeout of sending the first subscribe request. */
void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}

/* The default keep alive timeout is 0. If you want to keep
the connection alive, make sure not sending any request
after all channels/patterns were unsubscribed. */
void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}

public:
void set_extract(std::function<void (WFRedisSubscribeTask *)> ex)
{
this->extract = std::move(ex);
}

void set_callback(std::function<void (WFRedisSubscribeTask *)> cb)
{
this->callback = std::move(cb);
}

protected:
Expand All @@ -38,43 +115,84 @@ class WFRedisSubscribeTask : public WFGenericTask
return series_of(this)->pop();
}

protected:
static void task_extract(WFRedisTask *task);
static void task_callback(WFRedisTask *task);

protected:
WFRedisTask *task;
std::mutex mutex;
std::atomic<bool> flag;
std::function<void (WFRedisSubscribeTask *)> extract;
std::function<void (WFRedisSubscribeTask *)> callback;

protected:
static void task_callback(WFRedisTask *task)
{
auto *t = (WFRedisSubscribeTask *)task->user_data;

t->mutex.lock();
t->task = NULL;
t->mutex.unlock();

t->state = task->get_state();
t->error = task->get_error();
t->callback(t);
t->release();
}

public:
WFRedisSubscribeTask(WFRedisTask *task,
std::function<void (WFRedisSubscribeTask *)>&& ex,
std::function<void (WFRedisSubscribeTask *)>&& cb) :
flag(false),
extract(std::move(ex)),
callback(std::move(cb))
{
task->user_data = this;
task->set_callback(WFRedisSubscribeTask::redis_task_callback);
this->task = task;
}

protected:
virtual ~WFRedisSubscribeTask()
{
if (this->task)
this->task->dismiss();
}

friend class WFRedisSubscriber;
};

class WFRedisSubscriber
{
public:
int init(const std::string& url)
{
return this->init(url, NULL);
}

int init(const std::string& url, SSL_CTX *ssl_ctx);

void deinit() { }

public:
using extract_t = std::function<void (WFRedisSubscribeTask *)>;
using callback_t = std::function<void (WFRedisSubscribeTask *)>;

public:
WFRedisSubscribeTask *
create_subscribe_task(const std::vector<std::string>& channels,
extract_t extract, callback_t callback);

WFRedisSubscribeTask *
create_psubscribe_task(const std::vector<std::string>& patterns,
extract_t extract, callback_t callback);

protected:
void set_ssl_ctx(WFRedisTask *task) const
{
using RedisRequest = protocol::RedisRequest;
using RedisResponse = protocol::RedisResponse;
auto *t = (WFComplexClientTask<RedisRequest, RedisResponse> *)task;
/* 'ssl_ctx' can be NULL and will use default. */
t->set_ssl_ctx(this->ssl_ctx);
}

protected:
WFRedisTask *create_redis_task(const std::string& command,
const std::vector<std::string>& params);

protected:
ParsedURI uri;
SSL_CTX *ssl_ctx;

public:
virtual ~WFRedisSubscriber() { }
};

#endif

3 changes: 3 additions & 0 deletions src/client/xmake.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ target("client")
set_kind("object")
add_files("*.cc")
remove_files("WFKafkaClient.cc")
if not has_config("redis") then
remove_files("WFRedisSubscriber.cc")
end
if not has_config("mysql") then
remove_files("WFMySQLConnection.cc")
end
Expand Down
Loading

0 comments on commit 85057fe

Please sign in to comment.