mirror of
https://github.com/aaronpo97/the-biergarten-app.git
synced 2026-06-01 01:54:00 +00:00
Implement BoundedChannel and multithreaded logging infra
This commit is contained in:
73
tooling/pipeline/includes/concurrency/bounded_channel.h
Normal file
73
tooling/pipeline/includes/concurrency/bounded_channel.h
Normal file
@@ -0,0 +1,73 @@
|
||||
#ifndef BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
|
||||
#define BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
|
||||
|
||||
#include <condition_variable>
|
||||
#include <cstddef>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
|
||||
/**
|
||||
* @file bounded_channel.h
|
||||
* @brief Thread-safe, bounded multi-producer/multi-consumer synchronous channel.
|
||||
*
|
||||
* Intent: Enables asynchronous inter-thread communication with backpressure.
|
||||
* Models a synchronous channel where producers/consumers block on capacity limits.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @class BoundedChannel
|
||||
* @brief MPMC channel with fixed capacity and blocking semantics.
|
||||
*
|
||||
* Producers block when buffer is full; consumers block when empty.
|
||||
* Close() unblocks all waiters and signals channel exhaustion.
|
||||
*/
|
||||
template <typename T>
|
||||
class BoundedChannel {
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal state — all access must be guarded by mutex_.
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
std::queue<T> queue_;
|
||||
|
||||
std::mutex mutex_;
|
||||
|
||||
std::condition_variable not_full_;
|
||||
|
||||
std::condition_variable not_empty_;
|
||||
|
||||
std::size_t capacity_;
|
||||
|
||||
bool closed_ = false;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a bounded channel with the given capacity.
|
||||
* @param capacity Maximum number of items the channel may hold.
|
||||
*/
|
||||
explicit BoundedChannel(std::size_t capacity) : capacity_(capacity) {}
|
||||
|
||||
/**
|
||||
* @brief Send an item into the channel. Blocks when the channel is full.
|
||||
* @param item Move-only item to enqueue.
|
||||
*/
|
||||
void Send(T item);
|
||||
|
||||
/**
|
||||
* @brief Receive an item from the channel. Blocks when the channel is
|
||||
* empty.
|
||||
* @return std::optional<T> containing the item, or std::nullopt when the
|
||||
* channel is closed and drained.
|
||||
*/
|
||||
std::optional<T> Receive();
|
||||
|
||||
/**
|
||||
* @brief Close the channel and unblock all waiting threads. Idempotent.
|
||||
*/
|
||||
void Close();
|
||||
};
|
||||
|
||||
// Include the template implementation
|
||||
#include "bounded_channel.tcc"
|
||||
|
||||
#endif // BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
|
||||
57
tooling/pipeline/includes/concurrency/bounded_channel.tcc
Normal file
57
tooling/pipeline/includes/concurrency/bounded_channel.tcc
Normal file
@@ -0,0 +1,57 @@
|
||||
#include "bounded_channel.h"
|
||||
|
||||
template <typename T>
|
||||
void BoundedChannel<T>::Send(T item) {
|
||||
// Acquire exclusive ownership of the mutex; released automatically on scope exit.
|
||||
std::unique_lock lock(mutex_);
|
||||
|
||||
// Block until there is space in the queue or the channel has been closed.
|
||||
// The predicate guards against spurious wakeups.
|
||||
not_full_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
|
||||
|
||||
// If the channel was closed while waiting, discard the item and return.
|
||||
if (closed_) return;
|
||||
|
||||
// Move the item into the queue to avoid an unnecessary copy.
|
||||
queue_.push(std::move(item));
|
||||
|
||||
// Wake one blocked Receive() call to signal that data is now available.
|
||||
not_empty_.notify_one();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::optional<T> BoundedChannel<T>::Receive() {
|
||||
// Acquire exclusive ownership of the mutex.
|
||||
std::unique_lock lock(mutex_);
|
||||
|
||||
// Block until the queue is non-empty or the channel has been closed.
|
||||
// The predicate guards against spurious wakeups.
|
||||
not_empty_.wait(lock, [&] { return !queue_.empty() || closed_; });
|
||||
|
||||
// If woken due to closure and no items remain, signal exhaustion via nullopt.
|
||||
if (queue_.empty()) return std::nullopt;
|
||||
|
||||
// Move the front item out of the queue to avoid an unnecessary copy.
|
||||
T item = std::move(queue_.front());
|
||||
queue_.pop();
|
||||
|
||||
// Wake one blocked Send() call to signal that a slot has opened.
|
||||
not_full_.notify_one();
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void BoundedChannel<T>::Close() {
|
||||
// Acquire exclusive ownership of the mutex to ensure visibility of the flag.
|
||||
std::unique_lock lock(mutex_);
|
||||
|
||||
// Mark the channel as closed; subsequent Send() calls will be dropped.
|
||||
closed_ = true;
|
||||
|
||||
// Wake all blocked Send() callers so they can observe the closed flag and exit.
|
||||
not_full_.notify_all();
|
||||
|
||||
// Wake all blocked Receive() callers so they can drain remaining items or return nullopt.
|
||||
not_empty_.notify_all();
|
||||
}
|
||||
Reference in New Issue
Block a user