Skip to content

Commit

Permalink
Limit Payload Queue Size (#31)
Browse files Browse the repository at this point in the history
* Limit Payload Queue Size

* try to fix ubuntu test

* minor changes

* removed debug flag on ubuntu workflow
  • Loading branch information
kamalnadjieb authored Aug 12, 2022
1 parent b92379d commit 65e2129
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ jobs:
- name: build
run: cmake --build build --config Debug --parallel 10
- name: test
run: cd build ; ctest -j 10 -C Debug --output-on-failure
run: cd build ; ctest -j 10 --output-on-failure
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.1)
cmake_minimum_required(VERSION 3.8)

##
## PROJECT
Expand Down Expand Up @@ -68,7 +68,7 @@ add_library(${PROJECT_NAME}::${NADJIEB_MJPEG_STREAMER_TARGET_NAME} ALIAS ${NADJI
if (${CMAKE_VERSION} VERSION_LESS "3.8.0")
target_compile_features(${NADJIEB_MJPEG_STREAMER_TARGET_NAME} INTERFACE cxx_range_for)
else()
target_compile_features(${NADJIEB_MJPEG_STREAMER_TARGET_NAME} INTERFACE cxx_std_11)
target_compile_features(${NADJIEB_MJPEG_STREAMER_TARGET_NAME} INTERFACE cxx_std_17)
endif()

target_include_directories(
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ check-single-includes:
@for x in $(SRCS); do \
echo "Checking self-sufficiency of $$x..." ; \
echo "#include <$$x>\nint main() {}\n" | $(SED) 's|include/||' > single_include_test.cpp; \
$(CXX) $(CXXFLAGS) -Iinclude -std=c++11 single_include_test.cpp -o single_include_test; \
$(CXX) $(CXXFLAGS) -Iinclude -std=c++17 single_include_test.cpp -o single_include_test; \
rm -f single_include_test.cpp single_include_test; \
done
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ int main() {

// By default "/shutdown" is the target to graceful shutdown the streamer
// if you want to change the target to graceful shutdown:
// streamer.setShutdownTarget("/stop");
// streamer.setShutdownTarget("/stop");

// By default 1 worker is used for streaming
// if you want to use 4 workers:
// streamer.start(8080, 4);
// By default std::thread::hardware_concurrency() workers are used for streaming
// if you want to use 4 workers instead:
// streamer.start(8080, 4);
streamer.start(8080);

// Visit /shutdown or another defined target to stop the loop and graceful shutdown
Expand Down
4 changes: 2 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
cmake_minimum_required(VERSION 3.1)
cmake_minimum_required(VERSION 3.8)
project(nadjieb_mjpeg_streamer_examples LANGUAGES CXX)

find_package(nadjieb_mjpeg_streamer REQUIRED)
find_package(OpenCV 4.2 REQUIRED)

add_executable(example ./example.cpp)
target_compile_features(example PRIVATE cxx_std_11)
target_compile_features(example PRIVATE cxx_std_17)
target_link_libraries(example
PRIVATE
nadjieb_mjpeg_streamer::nadjieb_mjpeg_streamer
Expand Down
4 changes: 2 additions & 2 deletions examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ int main() {
// if you want to change the target to graceful shutdown:
// streamer.setShutdownTarget("/stop");

// By default 1 worker is used for streaming
// if you want to use 4 workers:
// By default std::thread::hardware_concurrency() workers are used for streaming
// if you want to use 4 workers instead:
// streamer.start(8080, 4);
streamer.start(8080);

Expand Down
17 changes: 15 additions & 2 deletions include/nadjieb/mjpeg_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MJPEGStreamer : public nadjieb::utils::NonCopyable {
public:
virtual ~MJPEGStreamer() { stop(); }

void start(int port, int num_workers = 1) {
void start(int port, int num_workers = std::thread::hardware_concurrency()) {
publisher_.start(num_workers);
listener_.withOnMessageCallback(on_message_cb_).withOnBeforeCloseCallback(on_before_close_cb_).runAsync(port);

Expand Down Expand Up @@ -104,6 +104,19 @@ class MJPEGStreamer : public nadjieb::utils::NonCopyable {
return cb_res;
}

if (!publisher_.pathExists(req.getTarget())) {
nadjieb::net::HTTPResponse not_found_res;
not_found_res.setVersion(req.getVersion());
not_found_res.setStatusCode(404);
not_found_res.setStatusText("Not Found");
auto not_found_res_str = not_found_res.serialize();

nadjieb::net::sendViaSocket(sockfd, not_found_res_str.c_str(), not_found_res_str.size(), 0);

cb_res.close_conn = true;
return cb_res;
}

nadjieb::net::HTTPResponse init_res;
init_res.setVersion(req.getVersion());
init_res.setStatusCode(200);
Expand All @@ -116,7 +129,7 @@ class MJPEGStreamer : public nadjieb::utils::NonCopyable {

nadjieb::net::sendViaSocket(sockfd, init_res_str.c_str(), init_res_str.size(), 0);

publisher_.add(req.getTarget(), sockfd);
publisher_.add(sockfd, req.getTarget());

return cb_res;
};
Expand Down
97 changes: 49 additions & 48 deletions include/nadjieb/net/publisher.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nadjieb/net/socket.hpp>
#include <nadjieb/net/topic.hpp>
#include <nadjieb/utils/non_copyable.hpp>
#include <nadjieb/utils/runnable.hpp>

Expand All @@ -20,7 +21,7 @@ class Publisher : public nadjieb::utils::NonCopyable, public nadjieb::utils::Run
public:
virtual ~Publisher() { stop(); }

void start(int num_workers = 1) {
void start(int num_workers = std::thread::hardware_concurrency()) {
state_ = nadjieb::utils::State::BOOTING;
end_publisher_ = false;
workers_.reserve(num_workers);
Expand All @@ -44,72 +45,73 @@ class Publisher : public nadjieb::utils::NonCopyable, public nadjieb::utils::Run
workers_.clear();
}

path2clients_.clear();
topics_.clear();
path_by_client_.clear();

while (!payloads_.empty()) {
payloads_.pop();
}
state_ = nadjieb::utils::State::TERMINATED;
}

void add(const std::string& path, const SocketFD& sockfd) {
void add(const SocketFD& sockfd, const std::string& path) {
if (end_publisher_) {
return;
}

const std::lock_guard<std::mutex> lock(p2c_mtx_);
path2clients_[path].emplace_back(NADJIEB_MJPEG_STREAMER_POLLFD{sockfd, POLLWRNORM, 0});
topics_[path].addClient(sockfd);

std::unique_lock<std::mutex> lock(path_by_client_mtx_);
path_by_client_[sockfd] = path;
}

bool pathExists(const std::string& path) { return (topics_.find(path) != topics_.end()); }

void removeClient(const SocketFD& sockfd) {
const std::lock_guard<std::mutex> lock(p2c_mtx_);
for (auto it = path2clients_.begin(); it != path2clients_.end();) {
it->second.erase(
std::remove_if(
it->second.begin(), it->second.end(),
[&](const NADJIEB_MJPEG_STREAMER_POLLFD& pfd) { return pfd.fd == sockfd; }),
it->second.end());

if (it->second.empty()) {
it = path2clients_.erase(it);
} else {
++it;
}
}
std::unique_lock<std::mutex> lock(path_by_client_mtx_);
topics_[path_by_client_[sockfd]].removeClient(sockfd);

path_by_client_.erase(sockfd);
}

void enqueue(const std::string& path, const std::string& buffer) {
if (end_publisher_) {
return;
}

if (path2clients_.find(path) == path2clients_.end()) {
return;
}
topics_[path].setBuffer(buffer);

std::unique_lock<std::mutex> payloads_lock(payloads_mtx_);
payloads_.emplace(path, buffer);
payloads_lock.unlock();
condition_.notify_one();
}
for (const auto& client : topics_[path].getClients()) {
if (topics_[path].getQueueSize(client.fd) > LIMIT_QUEUE_PER_CLIENT) {
continue;
}

std::unique_lock<std::mutex> payloads_lock(payloads_mtx_);
payloads_.emplace(path, client);
topics_[path].increaseQueue(client.fd);
payloads_lock.unlock();

bool hasClient(const std::string& path) {
const std::lock_guard<std::mutex> lock(p2c_mtx_);
return path2clients_.find(path) != path2clients_.end() && !path2clients_[path].empty();
condition_.notify_one();
}
}

bool hasClient(const std::string& path) { return topics_[path].hasClient(); }

private:
typedef std::pair<std::string, std::string> Payload;
typedef std::pair<std::string, NADJIEB_MJPEG_STREAMER_POLLFD> Payload;

std::condition_variable condition_;
std::vector<std::thread> workers_;
std::queue<Payload> payloads_;
std::unordered_map<std::string, std::vector<NADJIEB_MJPEG_STREAMER_POLLFD>> path2clients_;
std::unordered_map<SocketFD, std::string> path_by_client_;
std::unordered_map<std::string, Topic> topics_;
std::mutex cv_mtx_;
std::mutex p2c_mtx_;
std::mutex path_by_client_mtx_;
std::mutex payloads_mtx_;
bool end_publisher_ = true;

const static int LIMIT_QUEUE_PER_CLIENT = 5;

void worker() {
while (!end_publisher_) {
std::unique_lock<std::mutex> cv_lock(cv_mtx_);
Expand All @@ -123,34 +125,33 @@ class Publisher : public nadjieb::utils::NonCopyable, public nadjieb::utils::Run

Payload payload = std::move(payloads_.front());
payloads_.pop();
topics_[payload.first].decreaseQueue(payload.second.fd);

payloads_lock.unlock();
cv_lock.unlock();

auto buffer = topics_[payload.first].getBuffer();
std::string res_str
= "--nadjiebmjpegstreamer\r\n"
"Content-Type: image/jpeg\r\n"
"Content-Length: "
+ std::to_string(payload.second.size()) + "\r\n\r\n" + payload.second;

const std::lock_guard<std::mutex> lock(p2c_mtx_);
for (auto& client : path2clients_[payload.first]) {
auto socket_count = pollSockets(&client, 1, 1);
+ std::to_string(buffer.size()) + "\r\n\r\n" + buffer;

if (socket_count == NADJIEB_MJPEG_STREAMER_SOCKET_ERROR) {
throw std::runtime_error("pollSockets() failed\n");
}
auto socket_count = pollSockets(&payload.second, 1, 1);

if (socket_count == 0) {
continue;
}
if (socket_count == NADJIEB_MJPEG_STREAMER_SOCKET_ERROR) {
throw std::runtime_error("pollSockets() failed\n");
}

if (client.revents != POLLWRNORM) {
throw std::runtime_error("revents != POLLWRNORM\n");
}
if (socket_count == 0) {
continue;
}

sendViaSocket(client.fd, res_str.c_str(), res_str.size(), 0);
if (payload.second.revents != POLLWRNORM) {
throw std::runtime_error("revents != POLLWRNORM\n");
}

sendViaSocket(payload.second.fd, res_str.c_str(), res_str.size(), 0);
}
}
};
Expand Down
82 changes: 82 additions & 0 deletions include/nadjieb/net/topic.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#pragma once

#include <nadjieb/net/socket.hpp>

#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

namespace nadjieb {
namespace net {
// Per-stream-path state: the latest frame buffer plus the set of subscribed
// client sockets and a per-client count of payloads still queued for
// delivery (the publisher uses the count to cap each client's backlog).
//
// Thread-safety: each piece of state is guarded by its own shared_mutex;
// read-only accessors take shared locks, mutators take unique locks.
class Topic {
   public:
    /// Replace the latest frame buffer (exclusive lock blocks readers).
    void setBuffer(const std::string& buffer) {
        std::unique_lock lock(buffer_mtx_);
        buffer_ = buffer;
    }

    /// Snapshot the latest frame buffer (shared lock; readers run concurrently).
    std::string getBuffer() {
        std::shared_lock lock(buffer_mtx_);
        return buffer_;
    }

    /// Register a client socket for polling and reset its pending-payload count.
    void addClient(const SocketFD& sockfd) {
        std::unique_lock client_lock(client_by_sockfd_mtx_);
        client_by_sockfd_[sockfd] = NADJIEB_MJPEG_STREAMER_POLLFD{sockfd, POLLWRNORM, 0};

        std::unique_lock queue_size_lock(queue_size_by_sockfd_mtx_);
        queue_size_by_sockfd_[sockfd] = 0;
    }

    /// Drop a client socket and its pending-payload count.
    void removeClient(const SocketFD& sockfd) {
        std::unique_lock client_lock(client_by_sockfd_mtx_);
        client_by_sockfd_.erase(sockfd);

        std::unique_lock queue_size_lock(queue_size_by_sockfd_mtx_);
        queue_size_by_sockfd_.erase(sockfd);
    }

    /// True if at least one client is subscribed to this topic.
    bool hasClient() {
        std::shared_lock lock(client_by_sockfd_mtx_);
        return !client_by_sockfd_.empty();
    }

    /// Copy out the pollfd entries of all subscribed clients.
    std::vector<NADJIEB_MJPEG_STREAMER_POLLFD> getClients() {
        std::shared_lock lock(client_by_sockfd_mtx_);

        std::vector<NADJIEB_MJPEG_STREAMER_POLLFD> clients;
        clients.reserve(client_by_sockfd_.size());
        for (const auto& client : client_by_sockfd_) {
            clients.push_back(client.second);
        }

        return clients;
    }

    /// Number of payloads currently queued for this client (0 if unknown).
    /// Uses find() rather than operator[]: operator[] inserts a new entry
    /// when the key is absent, i.e. it mutates the map — doing that while
    /// holding only a shared (reader) lock is a data race.
    int getQueueSize(const SocketFD& sockfd) {
        std::shared_lock queue_size_lock(queue_size_by_sockfd_mtx_);
        auto it = queue_size_by_sockfd_.find(sockfd);
        return (it == queue_size_by_sockfd_.end()) ? 0 : it->second;
    }

    /// Record one more payload queued for this client.
    void increaseQueue(const SocketFD& sockfd) {
        std::unique_lock queue_size_lock(queue_size_by_sockfd_mtx_);
        ++queue_size_by_sockfd_[sockfd];
    }

    /// Record one payload delivered. Guarded so a client removed between
    /// enqueue and delivery is not resurrected with a negative count
    /// (operator[] would re-insert the erased key at 0 and decrement it).
    void decreaseQueue(const SocketFD& sockfd) {
        std::unique_lock queue_size_lock(queue_size_by_sockfd_mtx_);
        auto it = queue_size_by_sockfd_.find(sockfd);
        if (it != queue_size_by_sockfd_.end() && it->second > 0) {
            --it->second;
        }
    }

   private:
    std::string buffer_;            // latest encoded frame for this path
    std::shared_mutex buffer_mtx_;  // guards buffer_

    std::unordered_map<SocketFD, NADJIEB_MJPEG_STREAMER_POLLFD> client_by_sockfd_;
    std::shared_mutex client_by_sockfd_mtx_;  // guards client_by_sockfd_

    std::unordered_map<SocketFD, int> queue_size_by_sockfd_;  // pending payloads per client
    std::shared_mutex queue_size_by_sockfd_mtx_;              // guards queue_size_by_sockfd_
};
}  // namespace net
}  // namespace nadjieb
Loading

0 comments on commit 65e2129

Please sign in to comment.