Skip to content

Commit

Permalink
Moved the safe queue to it's own header
Browse files Browse the repository at this point in the history
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
AssemblyJohn committed Dec 5, 2024
1 parent 10059a1 commit e450b0e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 106 deletions.
118 changes: 118 additions & 0 deletions include/ocpp/common/safe_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest

#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>

namespace ocpp {

/// \brief Thread safe message queue
template <typename T> class SafeQueue {
using safe_queue_reference = typename std::queue<T>::reference;
using safe_queue_const_reference = typename std::queue<T>::const_reference;

public:
/// \return True if the queue is empty
inline bool empty() const {
std::lock_guard lock(mutex);
return queue.empty();
}

inline safe_queue_reference front() const {
std::lock_guard lock(mutex);
return queue.front();
}

inline safe_queue_const_reference front() {
std::lock_guard lock(mutex);
return queue.front();
}

/// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty
inline T pop() {
std::lock_guard lock(mutex);

T front = std::move(queue.front());
queue.pop();

return front;
}

/// \brief Queues an element and notifies any threads waiting on the internal conditional variable
inline void push(T&& value) {
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(value);
}

notify_waiting_thread();
}

/// \brief Queues an element and notifies any threads waiting on the internal conditional variable
inline void push(const T& value) {
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(value);
}

notify_waiting_thread();
}

/// \brief Clears the queue
inline void clear() {
std::lock_guard<std::mutex> lock(mutex);

std::queue<T> empty;
empty.swap(queue);
}

/// \brief Waits seconds for the queue to receive an element
/// \param seconds Count of seconds to wait, pass in a value <= 0 to wait indefinitely
inline void wait_on_queue(int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()); });
} else {
cv.wait(lock, [&]() { return (false == queue.empty()); });
}
}

/// \brief Same as 'wait_on_queue' but receives an additional predicate to wait upon
template <class Predicate> inline void wait_on_queue(Predicate pred, int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()) or pred(); });
} else {
cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); });
}
}

/// \brief Waits on the queue for a custom event
template <class Predicate> inline void wait_on_custom(Predicate pred, int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return pred(); });
} else {
cv.wait(lock, [&]() { return pred(); });
}
}

/// \brief Notifies a single waiting thread to wake up
inline void notify_waiting_thread() {
cv.notify_one();
}

private:
std::queue<T> queue;

mutable std::mutex mutex;
std::condition_variable cv;
};

} // namespace ocpp
107 changes: 1 addition & 106 deletions include/ocpp/common/websocket/websocket_libwebsockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#define OCPP_WEBSOCKET_TLS_TPM_HPP

#include <ocpp/common/evse_security.hpp>
#include <ocpp/common/safe_queue.hpp>
#include <ocpp/common/websocket/websocket_base.hpp>

#include <condition_variable>
Expand All @@ -20,112 +21,6 @@ namespace ocpp {
struct ConnectionData;
struct WebsocketMessage;

/// \brief Thread safe message queue
template <typename T> class SafeQueue {
using safe_queue_reference = typename std::queue<T>::reference;
using safe_queue_const_reference = typename std::queue<T>::const_reference;

public:
/// \return True if the queue is empty
inline bool empty() const {
std::lock_guard lock(mutex);
return queue.empty();
}

inline safe_queue_reference front() const {
std::lock_guard lock(mutex);
return queue.front();
}

inline safe_queue_const_reference front() {
std::lock_guard lock(mutex);
return queue.front();
}

/// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty
inline T pop() {
std::lock_guard lock(mutex);

T front = std::move(queue.front());
queue.pop();

return front;
}

/// \brief Queues an element and notifies any threads waiting on the internal conditional variable
inline void push(T&& value) {
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(value);
}

notify_waiting_thread();
}

/// \brief Queues an element and notifies any threads waiting on the internal conditional variable
inline void push(const T& value) {
{
std::lock_guard<std::mutex> lock(mutex);
queue.push(value);
}

notify_waiting_thread();
}

/// \brief Clears the queue
inline void clear() {
std::lock_guard<std::mutex> lock(mutex);

std::queue<T> empty;
empty.swap(queue);
}

/// \brief Waits seconds for the queue to receive an element
/// \param seconds Count of seconds to wait, pass in a value <= 0 to wait indefinitely
inline void wait_on_queue(int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()); });
} else {
cv.wait(lock, [&]() { return (false == queue.empty()); });
}
}

/// \brief Same as 'wait_on_queue' but receives an additional predicate to wait upon
template <class Predicate> inline void wait_on_queue(Predicate pred, int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()) or pred(); });
} else {
cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); });
}
}

/// \brief Waits on the queue for a custom event
template <class Predicate> inline void wait_on_custom(Predicate pred, int seconds = -1) {
std::unique_lock<std::mutex> lock(mutex);

if (seconds > 0) {
cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return pred(); });
} else {
cv.wait(lock, [&]() { return pred(); });
}
}

/// \brief Notifies a single waiting thread to wake up
inline void notify_waiting_thread() {
cv.notify_one();
}

private:
std::queue<T> queue;

mutable std::mutex mutex;
std::condition_variable cv;
};

/// \brief Experimental libwebsockets TLS connection
class WebsocketLibwebsockets final : public WebsocketBase {
public:
Expand Down

0 comments on commit e450b0e

Please sign in to comment.