Add bounded channels

This commit is contained in:
Aaron Po
2026-05-14 19:50:08 -04:00
parent 688c28f85d
commit f93b14897b
2 changed files with 134 additions and 0 deletions

View File

@@ -0,0 +1,77 @@
//
// Created by aaronpo on 29/04/2026.
//
#ifndef CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_
#define CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <cstddef>
/**
* @file BoundedChannel.h
* @brief A thread-safe, bounded multi-producer/multi-consumer channel.
*/
// ---------------------------------------------------------------------------
// BoundedChannel<T>
//
// Models a synchronous channel with a fixed-capacity internal buffer.
// Producers block when the buffer is full (backpressure).
// Consumers block when the buffer be empty.
// Calling close() unblocks all waiting threads and signals exhaustion.
// ---------------------------------------------------------------------------
template <typename T>
class BoundedChannel {
// -------------------------------------------------------------------------
// Internal state — all access must be guarded by mutex_.
// -------------------------------------------------------------------------
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
std::size_t capacity_;
bool closed_ = false;
public:
/**
* @brief Construct a bounded channel with the given capacity.
* @param capacity Maximum number of items the channel may hold.
*/
explicit BoundedChannel(std::size_t capacity) : capacity_(capacity) {}
/**
* @brief Send an item into the channel. Blocks when the channel is full.
* @param item Move-only item to enqueue.
*/
void Send(T item);
/**
* @brief Receive an item from the channel. Blocks when the channel is
* empty.
* @return std::optional<T> containing the item, or std::nullopt when the
* channel is closed and drained.
*/
std::optional<T> Receive();
/**
* @brief Close the channel and unblock all waiting threads. Idempotent.
*/
void close();
};
// Include the template implementation
#include "BoundedChannel.tcc"
#endif // CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_

View File

@@ -0,0 +1,57 @@
#include "BoundedChannel.h"
template <typename T>
void BoundedChannel<T>::Send(T item) {
// Acquire exclusive ownership of the mutex; released automatically on scope exit.
std::unique_lock lock(mutex_);
// Block until there is space in the queue or the channel has been closed.
// The predicate guards against spurious wakeups.
not_full_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
// If the channel was closed while waiting, discard the item and return.
if (closed_) return;
// Move the item into the queue to avoid an unnecessary copy.
queue_.push(std::move(item));
// Wake one blocked Receive() call to signal that data is now available.
not_empty_.notify_one();
}
template <typename T>
std::optional<T> BoundedChannel<T>::Receive() {
// Acquire exclusive ownership of the mutex.
std::unique_lock lock(mutex_);
// Block until the queue is non-empty or the channel has been closed.
// The predicate guards against spurious wakeups.
not_empty_.wait(lock, [&] { return !queue_.empty() || closed_; });
// If woken due to closure and no items remain, signal exhaustion via nullopt.
if (queue_.empty()) return std::nullopt;
// Move the front item out of the queue to avoid an unnecessary copy.
T item = std::move(queue_.front());
queue_.pop();
// Wake one blocked Send() call to signal that a slot has opened.
not_full_.notify_one();
return item;
}
template <typename T>
void BoundedChannel<T>::close() {
// Acquire exclusive ownership of the mutex to ensure visibility of the flag.
std::unique_lock lock(mutex_);
// Mark the channel as closed; subsequent Send() calls will be dropped.
closed_ = true;
// Wake all blocked Send() callers so they can observe the closed flag and exit.
not_full_.notify_all();
// Wake all blocked Receive() callers so they can drain remaining items or return nullopt.
not_empty_.notify_all();
}