From da79c9fdefdba12d973478285496e356cb53edfd Mon Sep 17 00:00:00 2001 From: GeorgH93 Date: Tue, 31 Oct 2023 16:19:22 +0100 Subject: [PATCH] Add ThreadSafeQueue --- .../Concurent/Containers/ThreadSafeQueue.hpp | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 openVulkanoCpp/Data/Concurent/Containers/ThreadSafeQueue.hpp diff --git a/openVulkanoCpp/Data/Concurent/Containers/ThreadSafeQueue.hpp b/openVulkanoCpp/Data/Concurent/Containers/ThreadSafeQueue.hpp new file mode 100644 index 0000000..4ec4d60 --- /dev/null +++ b/openVulkanoCpp/Data/Concurent/Containers/ThreadSafeQueue.hpp @@ -0,0 +1,119 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace OpenVulkano +{ + template + class ThreadSafeQueue final + { + std::queue m_queue; + mutable std::mutex m_mutex; + std::condition_variable m_newDataAvailable; + bool m_end = false; + std::atomic_int m_waiting = 0; + + inline std::optional PopInternal() + { + if (m_queue.empty()) return {}; + T tmp = std::move(m_queue.front()); + m_queue.pop(); + return tmp; + } + + public: + ThreadSafeQueue() = default; + + ThreadSafeQueue(const ThreadSafeQueue &) = delete ; + ThreadSafeQueue& operator=(const ThreadSafeQueue &) = delete ; + + ThreadSafeQueue(ThreadSafeQueue&& other) noexcept + { + std::scoped_lock lock(m_mutex, other.m_mutex); + m_queue = std::move(other.m_queue); + } + + ~ThreadSafeQueue() + { + m_end = true; + m_newDataAvailable.notify_all(); + while (m_waiting) { std::this_thread::yield(); } + } + + size_t Size() const + { + std::lock_guard lock(m_mutex); + return m_queue.size(); + } + + bool Empty() const + { + std::lock_guard lock(m_mutex); + return m_queue.empty(); + } + + std::optional Pop() + { + std::unique_lock lock(m_mutex); + return PopInternal(); + } + + std::optional PopWait() + { + m_waiting++; + std::unique_lock lock(m_mutex); + m_newDataAvailable.wait(lock, [this]() { return !m_queue.empty() || m_end; }); + auto tmp = PopInternal(); + m_waiting--; + return tmp; + } + + std::optional Peak() const + { + std::lock_guard lock(m_mutex); + if (m_queue.empty()) return {}; + return m_queue.front(); + } + + size_t Push(const T& item) + { + std::unique_lock lock(m_mutex); + if (m_end) return 0; + m_queue.push(item); + m_newDataAvailable.notify_one(); + return m_queue.size(); + } + + size_t Push(T&& item) + { + std::unique_lock lock(m_mutex); + if (m_end) return 0; + m_queue.push(std::forward(item)); + m_newDataAvailable.notify_one(); + return m_queue.size(); + } + + ThreadSafeQueue& operator=(ThreadSafeQueue&& other) noexcept + { + if (this != &other) + { + std::scoped_lock lock(m_mutex, other.m_mutex); + + m_queue = std::move(other.m_queue); + m_newDataAvailable.notify_one(); + } + return *this; + } + }; +}