diff --git a/tooling/pipeline/CMakeLists.txt b/tooling/pipeline/CMakeLists.txt index abdf592..f64db59 100644 --- a/tooling/pipeline/CMakeLists.txt +++ b/tooling/pipeline/CMakeLists.txt @@ -155,13 +155,13 @@ target_sources(${PROJECT_NAME} PRIVATE src/application_options/parse_arguments.cc ) -# --- biergarten_data_generator --- +# --- biergarten_pipeline_orchestrator --- target_sources(${PROJECT_NAME} PRIVATE - src/biergarten_data_generator/log_results.cc - src/biergarten_data_generator/biergarten_data_generator.cc - src/biergarten_data_generator/generate_breweries.cc - src/biergarten_data_generator/run.cc - src/biergarten_data_generator/query_cities_with_countries.cc + src/biergarten_pipeline_orchestrator/log_results.cc + src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc + src/biergarten_pipeline_orchestrator/generate_breweries.cc + src/biergarten_pipeline_orchestrator/run.cc + src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc ) # --- web_client --- @@ -202,17 +202,23 @@ target_sources(${PROJECT_NAME} PRIVATE # --- services: sqlite --- target_sources(${PROJECT_NAME} PRIVATE - src/services/sqlite/process_record.cc - src/services/sqlite/sqlite_export_service.cc - src/services/sqlite/finalize.cc - src/services/sqlite/initialize.cc - src/services/sqlite/helpers/sqlite_connection_helpers.cc - src/services/sqlite/helpers/sqlite_statement_helpers.cc + src/services/sqlite/process_record.cc + src/services/sqlite/sqlite_export_service.cc + src/services/sqlite/finalize.cc + src/services/sqlite/initialize.cc + src/services/sqlite/helpers/sqlite_connection_helpers.cc + src/services/sqlite/helpers/sqlite_statement_helpers.cc +) + +# --- services: logging --- +target_sources(${PROJECT_NAME} PRIVATE + "src/services/logging/channel_logger.cc" + src/services/logging/log_consumer.cc ) # --- services (top-level) --- target_sources(${PROJECT_NAME} PRIVATE - src/services/prompt_directory.cc + src/services/prompt_directory.cc ) # 6. 
Include Directories, Link Libraries & Compile Definitions diff --git a/tooling/pipeline/includes/biergarten_data_generator.h b/tooling/pipeline/includes/biergarten_data_generator.h index e74ba02..573df8a 100644 --- a/tooling/pipeline/includes/biergarten_data_generator.h +++ b/tooling/pipeline/includes/biergarten_data_generator.h @@ -3,7 +3,10 @@ /** * @file biergarten_data_generator.h - * @brief Core orchestration class for pipeline data generation. + * @brief Orchestration for end-to-end brewery data generation pipeline. + * + * Intent: Coordinates location loading, enrichment, and generation phases + * to produce a complete dataset. Coordinates dependencies via composition root. */ #include @@ -21,19 +24,21 @@ * This class encapsulates the core logic for generating brewery data. * It handles location loading, city enrichment, and brewery generation. */ -class BiergartenDataGenerator { +class BiergartenPipelineOrchestrator { public: - /** - * @brief Construct a BiergartenDataGenerator with injected dependencies. - * - * @param context_service Context provider for sampled locations. - * @param generator Brewery and user data generator. - * @param exporter Storage backend for generated brewery data. - */ - BiergartenDataGenerator(std::unique_ptr context_service, - std::unique_ptr generator, - std::unique_ptr exporter, - const ApplicationOptions& application_options); +/** + * @brief Constructs the orchestrator with injected pipeline dependencies. + * + * @param context_service Provides regional context for locations. + * @param generator Implementation (Llama or Mock) for brewery/user generation. + * @param exporter Database backend for persisting generated records. + * @param application_options CLI configuration and paths. + */ + BiergartenPipelineOrchestrator( + std::unique_ptr context_service, + std::unique_ptr generator, + std::unique_ptr exporter, + const ApplicationOptions& application_options); /** * @brief Run the data generation pipeline. 
@@ -54,10 +59,11 @@ class BiergartenDataGenerator {
   /// @brief Generator dependency selected in the composition root.
   std::unique_ptr generator_;
 
-  /// @brief Storage backend for generated brewery records.
-  std::unique_ptr exporter_;
+  /// @brief Storage backend for generated brewery records.
+  std::unique_ptr exporter_;
 
-  const ApplicationOptions application_options_;
+  /// @brief CLI configuration: paths, model settings, generation parameters.
+  ApplicationOptions application_options_;
 
   /**
    * @brief Load locations from JSON and sample cities.
diff --git a/tooling/pipeline/includes/concurrency/bounded_channel.h b/tooling/pipeline/includes/concurrency/bounded_channel.h
new file mode 100644
index 0000000..ff4fdc2
--- /dev/null
+++ b/tooling/pipeline/includes/concurrency/bounded_channel.h
@@ -0,0 +1,82 @@
+#ifndef BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
+#define BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
+
+#include <condition_variable>
+#include <cstddef>
+#include <mutex>
+#include <optional>
+#include <queue>
+
+/**
+ * @file bounded_channel.h
+ * @brief Thread-safe, bounded multi-producer/multi-consumer channel.
+ *
+ * Intent: Enables asynchronous inter-thread communication with backpressure.
+ * Producers block while the buffer is full; consumers block while it is empty.
+ */
+
+/**
+ * @class BoundedChannel
+ * @brief MPMC channel with fixed capacity and blocking semantics.
+ *
+ * Producers block when buffer is full; consumers block when empty.
+ * Close() unblocks all waiters; items already queued can still be received,
+ * after which Receive() reports exhaustion with std::nullopt.
+ *
+ * @tparam T Element type; must be movable.
+ */
+template <typename T>
+class BoundedChannel {
+  // -------------------------------------------------------------------------
+  // Internal state — all access must be guarded by mutex_.
+  // -------------------------------------------------------------------------
+
+  std::queue<T> queue_;
+
+  std::mutex mutex_;
+
+  std::condition_variable not_full_;
+
+  std::condition_variable not_empty_;
+
+  /// Fixed at construction and never below 1, so reads need no extra care.
+  const std::size_t capacity_;
+
+  bool closed_ = false;
+
+ public:
+  /**
+   * @brief Construct a bounded channel with the given capacity.
+   * @param capacity Maximum number of items the channel may hold. A value of
+   *                 0 is raised to 1: with no slot at all, Send() could never
+   *                 make progress and would block until Close().
+   */
+  explicit BoundedChannel(std::size_t capacity)
+      : capacity_(capacity == 0 ? 1 : capacity) {}
+
+  /**
+   * @brief Send an item into the channel. Blocks when the channel is full.
+   * @param item Item to enqueue; taken by value and moved into the queue.
+   * @return true if the item was enqueued, false if the channel was closed
+   *         (before the call or while waiting) and the item was discarded.
+   */
+  bool Send(T item);
+
+  /**
+   * @brief Receive an item from the channel. Blocks when the channel is
+   * empty.
+   * @return std::optional containing the item, or std::nullopt when the
+   * channel is closed and drained.
+   */
+  std::optional<T> Receive();
+
+  /**
+   * @brief Close the channel and unblock all waiting threads. Idempotent.
+   */
+  void Close();
+};
+
+// Include the template implementation
+#include "bounded_channel.tcc"
+
+#endif  // BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_
diff --git a/tooling/pipeline/includes/concurrency/bounded_channel.tcc b/tooling/pipeline/includes/concurrency/bounded_channel.tcc
new file mode 100644
index 0000000..951bfbd
--- /dev/null
+++ b/tooling/pipeline/includes/concurrency/bounded_channel.tcc
@@ -0,0 +1,60 @@
+#include "bounded_channel.h"
+
+#include <utility>
+
+template <typename T>
+bool BoundedChannel<T>::Send(T item) {
+  // Acquire exclusive ownership of the mutex; released automatically on scope exit.
+  std::unique_lock<std::mutex> lock(mutex_);
+
+  // Block until there is space in the queue or the channel has been closed.
+  // The predicate guards against spurious wakeups.
+  not_full_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
+
+  // A closed channel accepts nothing more: discard the item and tell the caller.
+  if (closed_) return false;
+
+  // Move the item into the queue to avoid an unnecessary copy.
+  queue_.push(std::move(item));
+
+  // Wake one blocked Receive() call to signal that data is now available.
+  not_empty_.notify_one();
+  return true;
+}
+
+template <typename T>
+std::optional<T> BoundedChannel<T>::Receive() {
+  // Acquire exclusive ownership of the mutex.
+  std::unique_lock<std::mutex> lock(mutex_);
+
+  // Block until the queue is non-empty or the channel has been closed.
+  // The predicate guards against spurious wakeups.
+  not_empty_.wait(lock, [&] { return !queue_.empty() || closed_; });
+
+  // If woken due to closure and no items remain, signal exhaustion via nullopt.
+  if (queue_.empty()) return std::nullopt;
+
+  // Move the front item out of the queue to avoid an unnecessary copy.
+  T item = std::move(queue_.front());
+  queue_.pop();
+
+  // Wake one blocked Send() call to signal that a slot has opened.
+  not_full_.notify_one();
+
+  return item;
+}
+
+template <typename T>
+void BoundedChannel<T>::Close() {
+  // Acquire exclusive ownership of the mutex to ensure visibility of the flag.
+  std::unique_lock<std::mutex> lock(mutex_);
+
+  // Mark the channel as closed; subsequent Send() calls will be dropped.
+  closed_ = true;
+
+  // Wake all blocked Send() callers so they can observe the closed flag and exit.
+  not_full_.notify_all();
+
+  // Wake all blocked Receive() callers so they can drain remaining items or return nullopt.
+  not_empty_.notify_all();
+}
diff --git a/tooling/pipeline/includes/services/logging/channel_logger.h b/tooling/pipeline/includes/services/logging/channel_logger.h
new file mode 100644
index 0000000..d2b5bfa
--- /dev/null
+++ b/tooling/pipeline/includes/services/logging/channel_logger.h
@@ -0,0 +1,54 @@
+/**
+ * @file services/logging/channel_logger.h
+ * @brief Channel-backed producer for asynchronous pipeline logging.
+ *
+ * Intent: Decouple logging from synchronous I/O by forwarding entries to a
+ * bounded channel. LogConsumer drains the channel on a dedicated thread.
+ */
+
+#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_
+#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_
+
+#include <string_view>
+
+#include "concurrency/bounded_channel.h"
+#include "services/logging/log_entry.h"
+#include "services/logging/logger.h"
+
+/**
+ * @class ChannelLogger
+ * @brief ILogger implementation that sends entries to a BoundedChannel.
+ *
+ * Non-copyable, non-movable. Holds a non-owning reference to the channel.
+ */
+class ChannelLogger final : public ILogger {
+ public:
+  /**
+   * @brief Construct a channel-backed logger.
+   *
+   * @param channel Reference to bounded channel for log entry transfer.
+   *                Channel must outlive this logger instance.
+   */
+  explicit ChannelLogger(BoundedChannel<LogEntry>& channel);
+
+  ChannelLogger(const ChannelLogger&) = delete;
+  ChannelLogger& operator=(const ChannelLogger&) = delete;
+  ChannelLogger(ChannelLogger&&) = delete;
+  ChannelLogger& operator=(ChannelLogger&&) = delete;
+
+  ~ChannelLogger() override = default;
+
+  /**
+   * @brief Queue a log entry for asynchronous processing.
+   *
+   * Blocks if the channel is full (backpressure). If the channel is closed
+   * the entry is discarded and the call returns immediately.
+   */
+  void Log(LogLevel level, PipelinePhase phase,
+           std::string_view message) override;
+
+ private:
+  BoundedChannel<LogEntry>& channel_;
+};
+
+#endif  // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_
diff --git a/tooling/pipeline/includes/services/logging/log_consumer.h b/tooling/pipeline/includes/services/logging/log_consumer.h
new file mode 100644
index 0000000..1865e1e
--- /dev/null
+++ b/tooling/pipeline/includes/services/logging/log_consumer.h
@@ -0,0 +1,59 @@
+/**
+ * @file services/logging/log_consumer.h
+ * @brief Dedicated log consumer/drain for asynchronous pipeline logging.
+ *
+ * Intent: Dequeue LogEntry values from a BoundedChannel on a dedicated thread
+ * and forward them to spdlog for I/O and formatting. Decouples application
+ * logic from logging latency.
+ */
+
+#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_
+#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_
+
+#include <string>
+
+#include <spdlog/spdlog.h>
+
+#include "concurrency/bounded_channel.h"
+#include "services/logging/log_entry.h"
+
+/**
+ * @class LogConsumer
+ * @brief Consumes log entries from channel and forwards to spdlog.
+ *
+ * Non-copyable, non-movable. Designed to run on its own dedicated std::thread.
+ * Drains the channel until closure, then exits cleanly.
+ */
+class LogConsumer {
+ public:
+  /**
+   * @brief Construct a log consumer.
+   *
+   * @param channel Reference to bounded channel for log entry retrieval.
+   *                Channel must outlive this consumer instance.
+   */
+  explicit LogConsumer(BoundedChannel<LogEntry>& channel);
+
+  LogConsumer(const LogConsumer&) = delete;
+  LogConsumer& operator=(const LogConsumer&) = delete;
+  LogConsumer(LogConsumer&&) = delete;
+  LogConsumer& operator=(LogConsumer&&) = delete;
+
+  /**
+   * @brief Main loop: drain channel and forward entries to spdlog.
+   *
+   * Intended to be called once on a dedicated thread. Returns when:
+   * - Channel is closed AND all queued entries are drained.
+   *
+   * Thread-safe for use from multiple ChannelLogger instances on other threads.
+   */
+  void Run();
+
+ private:
+  BoundedChannel<LogEntry>& channel_;
+
+  static spdlog::level::level_enum ToSpdlogLevel(LogLevel level);
+  static std::string ToString(PipelinePhase phase);
+};
+
+#endif  // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_
diff --git a/tooling/pipeline/includes/services/logging/log_entry.h b/tooling/pipeline/includes/services/logging/log_entry.h
new file mode 100644
index 0000000..5b47677
--- /dev/null
+++ b/tooling/pipeline/includes/services/logging/log_entry.h
@@ -0,0 +1,65 @@
+/**
+ * @file services/logging/log_entry.h
+ * @brief Value-type log entry structure for asynchronous pipeline logging.
+ *
+ * Intent: Lightweight, move-safe data transfer between logging producer
+ * (ChannelLogger) and consumer (LogConsumer) via BoundedChannel.
+ */
+
+#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_
+#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_
+
+#include <chrono>
+#include <string>
+
+/**
+ * @enum LogLevel
+ * @brief Severity levels for log entries.
+ */
+enum class LogLevel {
+  Debug,  ///< Development/debugging information.
+  Info,   ///< General informational messages.
+  Warn,   ///< Warning conditions.
+  Error,  ///< Error conditions.
+};
+
+/**
+ * @enum PipelinePhase
+ * @brief Execution phases for contextual logging.
+ *
+ * Used to tag log entries by their processing stage, enabling phase-specific
+ * analysis and filtering of the execution timeline.
+ */
+enum class PipelinePhase {
+  Startup,                   ///< Initialization and validation.
+  UserGeneration,            ///< User profile generation.
+  BreweryAndBeerGeneration,  ///< Brewery and beer data generation.
+  CheckinGeneration,         ///< Checkin (visit) record generation.
+  RatingGeneration,          ///< Rating and review generation.
+  FollowGeneration,          ///< Follow relationship generation.
+  Teardown,                  ///< Finalization and cleanup.
+};
+
+/**
+ * @struct LogEntry
+ * @brief Single log event for asynchronous processing.
+ *
+ * All fields are value types, allowing safe move semantics across
+ * BoundedChannel without shared ownership or synchronization overhead.
+ */
+struct LogEntry {
+  /// @brief Timestamp when entry was created.
+  std::chrono::system_clock::time_point timestamp =
+      std::chrono::system_clock::now();
+
+  /// @brief Severity level of this entry; Info unless set explicitly.
+  LogLevel level = LogLevel::Info;
+
+  /// @brief Pipeline phase when entry was logged; Startup unless set explicitly.
+  PipelinePhase phase = PipelinePhase::Startup;
+
+  /// @brief Log message text.
+  std::string message;
+};
+
+#endif  // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_
diff --git a/tooling/pipeline/includes/services/logging/logger.h b/tooling/pipeline/includes/services/logging/logger.h
new file mode 100644
index 0000000..71a61e8
--- /dev/null
+++ b/tooling/pipeline/includes/services/logging/logger.h
@@ -0,0 +1,45 @@
+/**
+ * @file services/logging/logger.h
+ * @brief Abstract logging interface for pipeline components.
+ *
+ * Intent: Decouple logging from channel/worker implementation details.
+ * All pipeline components depend on ILogger, enabling swappable backends.
+ */
+
+#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_
+#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "services/logging/log_entry.h"
+
+/**
+ * @class ILogger
+ * @brief Minimal interface for submitting log entries.
+ *
+ * Non-copyable and non-movable. Implementations are typically short-lived,
+ * created and owned by the composition root.
+ */
+class ILogger {
+ public:
+  ILogger() = default;
+  ILogger(const ILogger&) = delete;
+  ILogger& operator=(const ILogger&) = delete;
+  ILogger(ILogger&&) = delete;
+  ILogger& operator=(ILogger&&) = delete;
+  virtual ~ILogger() = default;
+
+  /**
+   * @brief Submit a log entry to the logging subsystem.
+   *
+   * @param level Log level (Debug, Info, Warn, Error).
+   * @param phase Pipeline execution phase (Startup, UserGeneration, Teardown, etc.).
+   * @param message Log message text.
+   */
+  virtual void Log(LogLevel level, PipelinePhase phase,
+                   std::string_view message) = 0;
+};
+
+#endif  // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_
diff --git a/tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc
similarity index 77%
rename from tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc
rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc
index 71875b3..c60bfac 100644
--- a/tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc
+++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc
@@ -1,5 +1,5 @@
 /**
- * @file biergarten_data_generator/biergarten_data_generator.cc
+ * @file biergarten_pipeline_orchestrator/biergarten_data_generator.cc
  * @brief BiergartenDataGenerator constructor implementation.
  */
 
@@ -7,7 +7,7 @@
 
 #include
 
-BiergartenDataGenerator::BiergartenDataGenerator(
+BiergartenPipelineOrchestrator::BiergartenPipelineOrchestrator(
     std::unique_ptr context_service,
     std::unique_ptr generator,
     std::unique_ptr exporter,
diff --git a/tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc
similarity index 92%
rename from tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc
rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc
index 934c3dd..4401390 100644
--- a/tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc
+++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc
@@ -1,5 +1,5 @@
 /**
- * @file biergarten_data_generator/generate_breweries.cc
+ * @file biergarten_pipeline_orchestrator/generate_breweries.cc
  * @brief BiergartenDataGenerator::GenerateBreweries() implementation.
*/ @@ -7,7 +7,7 @@ #include "biergarten_data_generator.h" -void BiergartenDataGenerator::GenerateBreweries( +void BiergartenPipelineOrchestrator::GenerateBreweries( std::span cities) { spdlog::info("\n=== SAMPLE BREWERY GENERATION ==="); diff --git a/tooling/pipeline/src/biergarten_data_generator/log_results.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc similarity index 88% rename from tooling/pipeline/src/biergarten_data_generator/log_results.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc index 975729e..340490f 100644 --- a/tooling/pipeline/src/biergarten_data_generator/log_results.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/log_results.cc + * @file biergarten_pipeline_orchestrator/log_results.cc * @brief BiergartenDataGenerator::LogResults() implementation. */ @@ -7,7 +7,7 @@ #include "biergarten_data_generator.h" -void BiergartenDataGenerator::LogResults() const { +void BiergartenPipelineOrchestrator::LogResults() const { spdlog::info("\n=== GENERATED DATA DUMP ==="); size_t index = 1; for (const auto& [location, brewery] : generated_breweries_) { diff --git a/tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc similarity index 88% rename from tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc index c17654f..4920693 100644 --- a/tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/query_cities_with_countries.cc + * @file biergarten_pipeline_orchestrator/query_cities_with_countries.cc * @brief 
BiergartenDataGenerator::QueryCitiesWithCountries() implementation. */ @@ -13,7 +13,7 @@ #include "biergarten_data_generator.h" #include "json_handling/json_loader.h" -std::vector BiergartenDataGenerator::QueryCitiesWithCountries() { +std::vector BiergartenPipelineOrchestrator::QueryCitiesWithCountries() { spdlog::info("\n=== GEOGRAPHIC DATA OVERVIEW ==="); const std::filesystem::path locations_path = "locations.json"; diff --git a/tooling/pipeline/src/biergarten_data_generator/run.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc similarity index 93% rename from tooling/pipeline/src/biergarten_data_generator/run.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc index 4ee2b46..c3a42e1 100644 --- a/tooling/pipeline/src/biergarten_data_generator/run.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/run.cc + * @file biergarten_pipeline_orchestrator/run.cc * @brief BiergartenDataGenerator::Run() implementation. 
*/ @@ -9,7 +9,7 @@ #include "biergarten_data_generator.h" -bool BiergartenDataGenerator::Run() { +bool BiergartenPipelineOrchestrator::Run() { try { exporter_->Initialize(); diff --git a/tooling/pipeline/src/main.cc b/tooling/pipeline/src/main.cc index cd204c9..4d84a3f 100644 --- a/tooling/pipeline/src/main.cc +++ b/tooling/pipeline/src/main.cc @@ -12,8 +12,10 @@ #include #include #include +#include #include "biergarten_data_generator.h" +#include "concurrency/bounded_channel.h" #include "data_generation/llama_generator.h" #include "data_generation/mock_generator.h" #include "data_generation/prompt_formatting/gemma4_jinja_prompt_formatter.h" @@ -25,12 +27,22 @@ #include "services/enrichment/enrichment_service.h" #include "services/enrichment/mock_enrichment.h" #include "services/enrichment/wikipedia_service.h" +#include "services/logging/channel_logger.h" +#include "services/logging/log_consumer.h" +#include "services/logging/log_entry.h" +#include "services/logging/logger.h" #include "services/prompting/prompt_directory.h" #include "web_client/http_web_client.h" namespace di = boost::di; +static constexpr size_t kLogMaxCount = 512; int main(const int argc, char** argv) { + auto log_channel = std::make_shared>(kLogMaxCount); + ChannelLogger channel_logger(*log_channel); + LogConsumer log_worker(*log_channel); + std::thread log_thread([&log_worker] { log_worker.Run(); }); + try { Timer timer; spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] %v"); @@ -46,6 +58,8 @@ int main(const int argc, char** argv) { ParseArguments(argc, argv); if (!parsed_options.has_value()) { + log_channel->Close(); + log_thread.join(); return 0; } @@ -54,66 +68,97 @@ int main(const int argc, char** argv) { const auto sampling = options.generator.sampling.value_or(SamplingOptions{}); + // ----------------------------------------------------------------------- + // Prompt directory + // Conditionally constructed before the injector; moved into LlamaGenerator. 
+ // ----------------------------------------------------------------------- std::unique_ptr prompt_directory; if (!options.generator.use_mocked) { try { prompt_directory = std::make_unique(options.pipeline.prompt_dir); } catch (const std::exception& dir_error) { - spdlog::error("[Startup] Invalid --prompt-dir: {}", dir_error.what()); + channel_logger.Log( + LogLevel::Error, PipelinePhase::Startup, + std::string("Invalid --prompt-dir: ") + dir_error.what()); + log_channel->Close(); + log_thread.join(); return 1; } } + // ----------------------------------------------------------------------- + // Dependency injection + // ----------------------------------------------------------------------- const auto injector = di::make_injector( di::bind().to(options), di::bind().to(model_path), di::bind().to(), di::bind().to(), di::bind().to(), + di::bind().to( + [log_channel](const auto&) -> std::unique_ptr { + return std::make_unique(*log_channel); + }), di::bind().to( [options](const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { return std::make_unique(); } - return std::make_unique( inj.template create>()); }), di::bind().to( - [options, model_path, sampling, &prompt_directory]( + [&options, &model_path, &sampling, &prompt_directory, + &channel_logger]( const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { - spdlog::info( - "[Generator] Using MockGenerator (no model path provided)"); + channel_logger.Log( + LogLevel::Info, PipelinePhase::Startup, + "Using MockGenerator (no model path provided)"); return std::make_unique(); } - - spdlog::info( - "[Generator] Using LlamaGenerator: {} (temperature={}, " - "top-p={}, top-k={}, n_ctx={}, seed={})", - model_path, sampling.temperature, sampling.top_p, - sampling.top_k, sampling.n_ctx, sampling.seed); + channel_logger.Log( + LogLevel::Info, PipelinePhase::Startup, + "Using LlamaGenerator: " + model_path + + " (temperature=" + std::to_string(sampling.temperature) + + ", top-p=" + 
std::to_string(sampling.top_p) + + ", top-k=" + std::to_string(sampling.top_k) + + ", n_ctx=" + std::to_string(sampling.n_ctx) + + ", seed=" + std::to_string(sampling.seed) + ")"); return std::make_unique( options, model_path, inj.template create>(), std::move(prompt_directory)); - }) + })); - ); + // ----------------------------------------------------------------------- + // Pipeline execution + // ----------------------------------------------------------------------- + const auto orchestrator = + injector.create>(); - const auto generator = - injector.create>(); - - if (!generator->Run()) { - spdlog::error("Pipeline execution failed"); + if (!orchestrator->Run()) { + channel_logger.Log(LogLevel::Error, PipelinePhase::Teardown, + "Pipeline execution failed"); + log_channel->Close(); + log_thread.join(); return 1; } - spdlog::info("Pipeline executed successfully in {} ms", timer.Elapsed()); + channel_logger.Log(LogLevel::Info, PipelinePhase::Teardown, + "Pipeline executed successfully in " + + std::to_string(timer.Elapsed()) + " ms"); + + log_channel->Close(); + log_thread.join(); return 0; + } catch (const std::exception& exception) { + // Channel may be in an unknown state; fall back to spdlog directly. 
spdlog::critical("Unhandled fatal error in main: {}", exception.what()); + log_channel->Close(); + log_thread.join(); return 1; } -} +} \ No newline at end of file diff --git a/tooling/pipeline/src/services/logging/channel_logger.cc b/tooling/pipeline/src/services/logging/channel_logger.cc new file mode 100644 index 0000000..d9f7039 --- /dev/null +++ b/tooling/pipeline/src/services/logging/channel_logger.cc @@ -0,0 +1,23 @@ +/** + * @file services/logging/channel_logger.cc + */ + +#include +#include +#include +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/channel_logger.h" +#include "services/logging/log_entry.h" + +ChannelLogger::ChannelLogger(BoundedChannel& channel) + : channel_(channel) {} + +void ChannelLogger::Log(LogLevel level, PipelinePhase phase, + const std::string_view message) { + channel_.Send(LogEntry{.timestamp = std::chrono::system_clock::now(), + .level = level, + .phase = phase, + .message = std::string(message)}); +} diff --git a/tooling/pipeline/src/services/logging/log_consumer.cc b/tooling/pipeline/src/services/logging/log_consumer.cc new file mode 100644 index 0000000..9e27f80 --- /dev/null +++ b/tooling/pipeline/src/services/logging/log_consumer.cc @@ -0,0 +1,74 @@ +/** + * @file services/logging/log_consumer.cc + * @brief Dedicated log drain worker implementation. + * + * LogConsumer drains LogEntry items from a BoundedChannel and forwards them + * to spdlog for final output. + */ + +#include "services/logging/log_consumer.h" + +#include + +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/log_entry.h" + +LogConsumer::LogConsumer(BoundedChannel& channel) + : channel_(channel) {} + +void LogConsumer::Run() { + while (true) { + auto entry = channel_.Receive(); + if (!entry.has_value()) { + // Channel is closed and drained. 
+ break; + } + + const LogEntry& log_entry = entry.value(); + auto logger = spdlog::default_logger(); + + const std::string formatted_message = [&] { + std::string msg = std::string(log_entry.message); + msg += " [phase=" + ToString(log_entry.phase) + "]"; + return msg; + }(); + + logger->log(ToSpdlogLevel(log_entry.level), formatted_message); + } +} + +spdlog::level::level_enum LogConsumer::ToSpdlogLevel(LogLevel level) { + switch (level) { + case LogLevel::Debug: + return spdlog::level::debug; + case LogLevel::Info: + return spdlog::level::info; + case LogLevel::Warn: + return spdlog::level::warn; + case LogLevel::Error: + return spdlog::level::err; + } + return spdlog::level::info; +} + +std::string LogConsumer::ToString(PipelinePhase phase) { + switch (phase) { + case PipelinePhase::Startup: + return "Startup"; + case PipelinePhase::UserGeneration: + return "UserGeneration"; + case PipelinePhase::BreweryAndBeerGeneration: + return "BreweryAndBeerGeneration"; + case PipelinePhase::CheckinGeneration: + return "CheckinGeneration"; + case PipelinePhase::RatingGeneration: + return "RatingGeneration"; + case PipelinePhase::FollowGeneration: + return "FollowGeneration"; + case PipelinePhase::Teardown: + return "Teardown"; + } + return "Unknown"; +}