From d372ce7c07101f03862c696a709852d95eb2eafa Mon Sep 17 00:00:00 2001 From: GeorgH93 Date: Tue, 27 Jul 2021 02:05:08 +0200 Subject: [PATCH] Add async frame recording --- openVulkanoCpp/AR/ArRecorder.cpp | 67 ++++++++++++++++++++++++++++++-- openVulkanoCpp/AR/ArRecorder.hpp | 26 ++++++++++++- 2 files changed, 88 insertions(+), 5 deletions(-) diff --git a/openVulkanoCpp/AR/ArRecorder.cpp b/openVulkanoCpp/AR/ArRecorder.cpp index 4bfe98e..1d33491 100644 --- a/openVulkanoCpp/AR/ArRecorder.cpp +++ b/openVulkanoCpp/AR/ArRecorder.cpp @@ -12,6 +12,7 @@ #include "IO/Files/Pnm.hpp" #include "IO/AppFolders.hpp" #include "Base/BlockProfiler.hpp" +#include "Base/Logger.hpp" #include "Image/YuvUtils.hpp" #include #include @@ -42,7 +43,7 @@ namespace openVulkanoCpp::AR } ArRecorder::ArRecorder(ArSession* session) - : m_session(session) + : m_session(session), m_asyncProcessor(this) { m_settings.path = GeneratePath(AppFolders::GetAppDataHomeDir(), "ar_recording"); session->OnNewFrameHighResolution += EventHandler(this, &ArRecorder::SaveHighResolution); @@ -216,7 +217,7 @@ namespace openVulkanoCpp::AR if (!m_recording) return; if (m_settings.asyncRecording) { - + m_asyncProcessor.Queue(frame, false); } else Write(frame.get()); } @@ -226,8 +227,68 @@ namespace openVulkanoCpp::AR if (!m_recording) return; if (m_settings.asyncRecording) { - + m_asyncProcessor.Queue(frame, true); } else Write(frame.get(), true); } + + //region AsyncProcessor + ArRecorder::AsyncProcessor::AsyncProcessor(ArRecorder* recorder) + : recorder(recorder), processingThread(&ArRecorder::AsyncProcessor::Handler, this) + {} + + ArRecorder::AsyncProcessor::~AsyncProcessor() + { + requestExit = true; + queueMutex.lock(); + newDataAvailable.notify_one(); + if (processingThread.joinable()) processingThread.join(); + } + + void ArRecorder::AsyncProcessor::Queue(const std::shared_ptr& frame, bool highRes) + { + if (requestExit) return; // no need to queue up on shutdown + { + std::unique_lock lock(queueMutex); + if (highRes) highResFrameQueue.push(frame); + else frameQueue.push(frame); + } + newDataAvailable.notify_all(); + } + + void ArRecorder::AsyncProcessor::Handler() + { + Utils::SetThreadName("ArRecorder"); + std::unique_lock lock(queueMutex); + do + { + newDataAvailable.wait(lock, [this]{ return !frameQueue.empty() || !highResFrameQueue.empty() || requestExit; }); + while(!highResFrameQueue.empty()) + { + auto frame = std::move(highResFrameQueue.front()); + highResFrameQueue.pop(); + if (frame->IsSaved()) continue; + lock.unlock(); + recorder->Write(frame.get(), true); + lock.lock(); + } + if (requestExit) break; + while(!frameQueue.empty()) + { + if (frameQueue.size() > 3) + { + Logger::AR->warn("Falling behind saving frames, skipping ..."); + //while(frameQueue.size() > 3) frameQueue.pop(); + } + auto frame = std::move(frameQueue.front()); + frameQueue.pop(); + if (frame->IsSaved()) continue; + lock.unlock(); + recorder->Write(frame.get(), false); + lock.lock(); + } + } + while (!requestExit); + } + //endregion } diff --git a/openVulkanoCpp/AR/ArRecorder.hpp b/openVulkanoCpp/AR/ArRecorder.hpp index 81054f1..c98fc78 100644 --- a/openVulkanoCpp/AR/ArRecorder.hpp +++ b/openVulkanoCpp/AR/ArRecorder.hpp @@ -7,9 +7,14 @@ #pragma once #include "Math/ByteSize.hpp" -#include +#include +#include #include +#include #include +#include +#include +#include namespace openVulkanoCpp { @@ -50,6 +55,21 @@ namespace openVulkanoCpp::AR class ArRecorder final { + struct AsyncProcessor final + { + ArRecorder* recorder; + std::thread processingThread; + std::queue> frameQueue, highResFrameQueue; + std::mutex queueMutex; + std::condition_variable newDataAvailable; + std::atomic_bool requestExit{}; + + explicit AsyncProcessor(ArRecorder* recorder); + ~AsyncProcessor(); + void Queue(const std::shared_ptr& frame, bool highRes); + void Handler(); + }; + ArSession* m_session; std::unique_ptr m_colorWriter, m_depthWriter, m_confidenceWriter, m_metadataWriter, m_highResWriter; RecordingSettings m_settings; @@ -57,6 +77,8 @@ namespace openVulkanoCpp::AR IEventHandler* m_newFrameHandler = nullptr; + AsyncProcessor m_asyncProcessor; + void Write(ArFrame* frame, bool highRes = false); void WriteColorImage(ArFrame* arFrame, MultiPartArchiveWriter* colorWriter, bool highRes); void WriteDepthImage(ArFrame *arFrame, MultiPartArchiveWriter* depthWriter, MultiPartArchiveWriter* confWriter); @@ -147,7 +169,7 @@ namespace openVulkanoCpp::AR void SetArchivePartMaxFileSize(size_t maxPartSize = 2_GiB) { m_settings.archiveSize = maxPartSize; } - size_t GetArchivePartMaxFileSize() const { m_settings.archiveSize; } + size_t GetArchivePartMaxFileSize() const { return m_settings.archiveSize; } const RecordingSettings& GetRecordingSettings() const { return m_settings; } };