diff --git a/tooling/pipeline/includes/BoundedChannel.h b/tooling/pipeline/includes/BoundedChannel.h new file mode 100644 index 0000000..3753d8c --- /dev/null +++ b/tooling/pipeline/includes/BoundedChannel.h @@ -0,0 +1,77 @@ +// +// Created by aaronpo on 29/04/2026. +// + +#ifndef CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ +#define CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ + +#include +#include +#include +#include +#include + +/** + * @file BoundedChannel.h + * @brief A thread-safe, bounded multi-producer/multi-consumer channel. + */ + +// --------------------------------------------------------------------------- +// BoundedChannel +// +// Models a synchronous channel with a fixed-capacity internal buffer. +// Producers block when the buffer is full (backpressure). +// Consumers block when the buffer be empty. +// Calling close() unblocks all waiting threads and signals exhaustion. +// --------------------------------------------------------------------------- +template +class BoundedChannel { + + // ------------------------------------------------------------------------- + // Internal state — all access must be guarded by mutex_. + // ------------------------------------------------------------------------- + + std::queue queue_; + + std::mutex mutex_; + + std::condition_variable not_full_; + + std::condition_variable not_empty_; + + std::size_t capacity_; + + bool closed_ = false; + + public: + + /** + * @brief Construct a bounded channel with the given capacity. + * @param capacity Maximum number of items the channel may hold. + */ + explicit BoundedChannel(std::size_t capacity) : capacity_(capacity) {} + + /** + * @brief Send an item into the channel. Blocks when the channel is full. + * @param item Move-only item to enqueue. + */ + void Send(T item); + + /** + * @brief Receive an item from the channel. Blocks when the channel is + * empty. + * @return std::optional containing the item, or std::nullopt when the + * channel is closed and drained. + */ + std::optional Receive(); + + /** + * @brief Close the channel and unblock all waiting threads. Idempotent. + */ + void close(); +}; + +// Include the template implementation +#include "BoundedChannel.tcc" + +#endif // CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ diff --git a/tooling/pipeline/includes/BoundedChannel.tcc b/tooling/pipeline/includes/BoundedChannel.tcc new file mode 100644 index 0000000..37a5aa3 --- /dev/null +++ b/tooling/pipeline/includes/BoundedChannel.tcc @@ -0,0 +1,57 @@ +#include "BoundedChannel.h" + +template +void BoundedChannel::Send(T item) { + // Acquire exclusive ownership of the mutex; released automatically on scope exit. + std::unique_lock lock(mutex_); + + // Block until there is space in the queue or the channel has been closed. + // The predicate guards against spurious wakeups. + not_full_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; }); + + // If the channel was closed while waiting, discard the item and return. + if (closed_) return; + + // Move the item into the queue to avoid an unnecessary copy. + queue_.push(std::move(item)); + + // Wake one blocked Receive() call to signal that data is now available. + not_empty_.notify_one(); +} + +template +std::optional BoundedChannel::Receive() { + // Acquire exclusive ownership of the mutex. + std::unique_lock lock(mutex_); + + // Block until the queue is non-empty or the channel has been closed. + // The predicate guards against spurious wakeups. + not_empty_.wait(lock, [&] { return !queue_.empty() || closed_; }); + + // If woken due to closure and no items remain, signal exhaustion via nullopt. + if (queue_.empty()) return std::nullopt; + + // Move the front item out of the queue to avoid an unnecessary copy. + T item = std::move(queue_.front()); + queue_.pop(); + + // Wake one blocked Send() call to signal that a slot has opened. + not_full_.notify_one(); + + return item; +} + +template +void BoundedChannel::close() { + // Acquire exclusive ownership of the mutex to ensure visibility of the flag. + std::unique_lock lock(mutex_); + + // Mark the channel as closed; subsequent Send() calls will be dropped. + closed_ = true; + + // Wake all blocked Send() callers so they can observe the closed flag and exit. + not_full_.notify_all(); + + // Wake all blocked Receive() callers so they can drain remaining items or return nullopt. + not_empty_.notify_all(); +} \ No newline at end of file