diff --git a/tooling/pipeline/CMakeLists.txt b/tooling/pipeline/CMakeLists.txt index abdf592..f64db59 100644 --- a/tooling/pipeline/CMakeLists.txt +++ b/tooling/pipeline/CMakeLists.txt @@ -155,13 +155,13 @@ target_sources(${PROJECT_NAME} PRIVATE src/application_options/parse_arguments.cc ) -# --- biergarten_data_generator --- +# --- biergarten_pipeline_orchestrator --- target_sources(${PROJECT_NAME} PRIVATE - src/biergarten_data_generator/log_results.cc - src/biergarten_data_generator/biergarten_data_generator.cc - src/biergarten_data_generator/generate_breweries.cc - src/biergarten_data_generator/run.cc - src/biergarten_data_generator/query_cities_with_countries.cc + src/biergarten_pipeline_orchestrator/log_results.cc + src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc + src/biergarten_pipeline_orchestrator/generate_breweries.cc + src/biergarten_pipeline_orchestrator/run.cc + src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc ) # --- web_client --- @@ -202,17 +202,23 @@ target_sources(${PROJECT_NAME} PRIVATE # --- services: sqlite --- target_sources(${PROJECT_NAME} PRIVATE - src/services/sqlite/process_record.cc - src/services/sqlite/sqlite_export_service.cc - src/services/sqlite/finalize.cc - src/services/sqlite/initialize.cc - src/services/sqlite/helpers/sqlite_connection_helpers.cc - src/services/sqlite/helpers/sqlite_statement_helpers.cc + src/services/sqlite/process_record.cc + src/services/sqlite/sqlite_export_service.cc + src/services/sqlite/finalize.cc + src/services/sqlite/initialize.cc + src/services/sqlite/helpers/sqlite_connection_helpers.cc + src/services/sqlite/helpers/sqlite_statement_helpers.cc +) + +# --- services: logging --- +target_sources(${PROJECT_NAME} PRIVATE + "src/services/logging/channel_logger.cc" + src/services/logging/log_consumer.cc ) # --- services (top-level) --- target_sources(${PROJECT_NAME} PRIVATE - src/services/prompt_directory.cc + src/services/prompt_directory.cc ) # 6. Include Directories, Link Libraries & Compile Definitions diff --git a/tooling/pipeline/includes/biergarten_data_generator.h b/tooling/pipeline/includes/biergarten_data_generator.h index e74ba02..34d7ac3 100644 --- a/tooling/pipeline/includes/biergarten_data_generator.h +++ b/tooling/pipeline/includes/biergarten_data_generator.h @@ -2,7 +2,7 @@ #define BIERGARTEN_PIPELINE_INCLUDES_BIERGARTEN_DATA_GENERATOR_H_ /** - * @file biergarten_data_generator.h + * @file biergarten_pipeline_orchestrator.h * @brief Core orchestration class for pipeline data generation. */ @@ -21,7 +21,7 @@ * This class encapsulates the core logic for generating brewery data. * It handles location loading, city enrichment, and brewery generation. */ -class BiergartenDataGenerator { +class BiergartenPipelineOrchestrator { public: /** * @brief Construct a BiergartenDataGenerator with injected dependencies. @@ -30,10 +30,11 @@ class BiergartenDataGenerator { * @param generator Brewery and user data generator. * @param exporter Storage backend for generated brewery data. */ - BiergartenDataGenerator(std::unique_ptr context_service, - std::unique_ptr generator, - std::unique_ptr exporter, - const ApplicationOptions& application_options); + BiergartenPipelineOrchestrator( + std::unique_ptr context_service, + std::unique_ptr generator, + std::unique_ptr exporter, + const ApplicationOptions& application_options); /** * @brief Run the data generation pipeline. @@ -57,7 +58,7 @@ class BiergartenDataGenerator { /// @brief Storage backend for generated brewery records. std::unique_ptr exporter_; - const ApplicationOptions application_options_; + ApplicationOptions application_options_; /** * @brief Load locations from JSON and sample cities. diff --git a/tooling/pipeline/includes/BoundedChannel.h b/tooling/pipeline/includes/concurrency/bounded_channel.h similarity index 86% rename from tooling/pipeline/includes/BoundedChannel.h rename to tooling/pipeline/includes/concurrency/bounded_channel.h index 3753d8c..8e31031 100644 --- a/tooling/pipeline/includes/BoundedChannel.h +++ b/tooling/pipeline/includes/concurrency/bounded_channel.h @@ -2,17 +2,17 @@ // Created by aaronpo on 29/04/2026. // -#ifndef CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ -#define CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ +#ifndef BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_ +#define BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_ #include +#include #include #include #include -#include /** - * @file BoundedChannel.h + * @file bounded_channel.h * @brief A thread-safe, bounded multi-producer/multi-consumer channel. */ @@ -26,7 +26,6 @@ // --------------------------------------------------------------------------- template class BoundedChannel { - // ------------------------------------------------------------------------- // Internal state — all access must be guarded by mutex_. // ------------------------------------------------------------------------- @@ -44,7 +43,6 @@ class BoundedChannel { bool closed_ = false; public: - /** * @brief Construct a bounded channel with the given capacity. * @param capacity Maximum number of items the channel may hold. @@ -68,10 +66,10 @@ class BoundedChannel { /** * @brief Close the channel and unblock all waiting threads. Idempotent. */ - void close(); + void Close(); }; // Include the template implementation -#include "BoundedChannel.tcc" +#include "bounded_channel.tcc" -#endif // CONCURRENCY_INTRO_CPP_INCLUDES_BOUNDEDCHANNEL_H_ +#endif // BIERGARTEN_PIPELINE_INCLUDES_CONCURRENCY_BOUNDED_CHANNEL_H_ diff --git a/tooling/pipeline/includes/BoundedChannel.tcc b/tooling/pipeline/includes/concurrency/bounded_channel.tcc similarity index 96% rename from tooling/pipeline/includes/BoundedChannel.tcc rename to tooling/pipeline/includes/concurrency/bounded_channel.tcc index 37a5aa3..951bfbd 100644 --- a/tooling/pipeline/includes/BoundedChannel.tcc +++ b/tooling/pipeline/includes/concurrency/bounded_channel.tcc @@ -1,4 +1,4 @@ -#include "BoundedChannel.h" +#include "bounded_channel.h" template void BoundedChannel::Send(T item) { @@ -42,7 +42,7 @@ std::optional BoundedChannel::Receive() { } template -void BoundedChannel::close() { +void BoundedChannel::Close() { // Acquire exclusive ownership of the mutex to ensure visibility of the flag. std::unique_lock lock(mutex_); diff --git a/tooling/pipeline/includes/services/logging/channel_logger.h b/tooling/pipeline/includes/services/logging/channel_logger.h new file mode 100644 index 0000000..8968904 --- /dev/null +++ b/tooling/pipeline/includes/services/logging/channel_logger.h @@ -0,0 +1,37 @@ +/** + * @file services/logging/channel_logger.h + * @brief Channel-backed implementation of the Logger interface. + * + * ChannelLogger constructs LogEntry values and forwards them to a + * BoundedChannel for asynchronous consumption by LogConsumer. + * The channel is injected by reference; ChannelLogger does not own it. + */ + +#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_ +#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_ + +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/log_entry.h" +#include "services/logging/logger.h" + +class ChannelLogger final : public ILogger { + public: + explicit ChannelLogger(BoundedChannel& channel); + + ChannelLogger(const ChannelLogger&) = delete; + ChannelLogger& operator=(const ChannelLogger&) = delete; + ChannelLogger(ChannelLogger&&) = delete; + ChannelLogger& operator=(ChannelLogger&&) = delete; + + ~ChannelLogger() override = default; + + void Log(LogLevel level, PipelinePhase phase, + std::string_view message) override; + + private: + BoundedChannel& channel_; +}; + +#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_ diff --git a/tooling/pipeline/includes/services/logging/log_consumer.h b/tooling/pipeline/includes/services/logging/log_consumer.h new file mode 100644 index 0000000..20c26b6 --- /dev/null +++ b/tooling/pipeline/includes/services/logging/log_consumer.h @@ -0,0 +1,44 @@ +/** + * @file services/logging/log_consumer.h + * @brief Dedicated log drain worker for the pipeline logging channel. + * + * LogConsumer runs on its own thread, draining LogEntry items from a + * BoundedChannel and forwarding them to spdlog. Exits cleanly + * when the channel is closed. + */ + +#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_ +#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_ + +#include + +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/log_entry.h" + +class LogConsumer { + public: + explicit LogConsumer(BoundedChannel& channel); + + LogConsumer(const LogConsumer&) = delete; + LogConsumer& operator=(const LogConsumer&) = delete; + LogConsumer(LogConsumer&&) = delete; + LogConsumer& operator=(LogConsumer&&) = delete; + + /** + * @brief Drains the channel until it is closed. + * + * Intended to be run on a dedicated std::thread. Exits when: + * - The channel is closed and the queue is fully drained. + */ + void Run(); + + private: + BoundedChannel& channel_; + + static spdlog::level::level_enum ToSpdlogLevel(LogLevel level); + static std::string ToString(PipelinePhase phase); +}; + +#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_CONSUMER_H_ diff --git a/tooling/pipeline/includes/services/logging/log_entry.h b/tooling/pipeline/includes/services/logging/log_entry.h new file mode 100644 index 0000000..38416ac --- /dev/null +++ b/tooling/pipeline/includes/services/logging/log_entry.h @@ -0,0 +1,41 @@ +/** + * @file services/logging/log_entry.h + * @brief POD struct representing a single log event in the pipeline. + * + * LogEntry is produced by PipelineLogger and consumed by LogWorker via + * BoundedChannel. All fields are value types so entries are + * safely movable across the channel without shared ownership. + */ + +#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_ +#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_ + +#include +#include + +enum class LogLevel { + Debug, + Info, + Warn, + Error, +}; + +enum class PipelinePhase { + Startup, + UserGeneration, + BreweryAndBeerGeneration, + CheckinGeneration, + RatingGeneration, + FollowGeneration, + Teardown, +}; + +struct LogEntry { + std::chrono::system_clock::time_point timestamp = + std::chrono::system_clock::now(); + LogLevel level; + PipelinePhase phase; + std::string message; +}; + +#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOG_ENTRY_H_ \ No newline at end of file diff --git a/tooling/pipeline/includes/services/logging/logger.h b/tooling/pipeline/includes/services/logging/logger.h new file mode 100644 index 0000000..6e54ae2 --- /dev/null +++ b/tooling/pipeline/includes/services/logging/logger.h @@ -0,0 +1,31 @@ +/** + * @file services/logging/logger.h + * @brief Abstract interface for pipeline logging. + * + * Kept intentionally narrow. Components that need to log depend on Logger, + * not on PipelineLogger or any channel type. + */ + +#ifndef BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_ +#define BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_ + +#include +#include +#include + +#include "services/logging/log_entry.h" + +class ILogger { + public: + ILogger() = default; + ILogger(const ILogger&) = delete; + ILogger& operator=(const ILogger&) = delete; + ILogger(ILogger&&) = delete; + ILogger& operator=(ILogger&&) = delete; + virtual ~ILogger() = default; + + virtual void Log(LogLevel level, PipelinePhase phase, + std::string_view message) = 0; +}; + +#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_ diff --git a/tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc similarity index 77% rename from tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc index 71875b3..c60bfac 100644 --- a/tooling/pipeline/src/biergarten_data_generator/biergarten_data_generator.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/biergarten_data_generator.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/biergarten_data_generator.cc + * @file biergarten_pipeline_orchestrator/biergarten_pipeline_orchestrator.cc * @brief BiergartenDataGenerator constructor implementation. */ @@ -7,7 +7,7 @@ #include -BiergartenDataGenerator::BiergartenDataGenerator( +BiergartenPipelineOrchestrator::BiergartenPipelineOrchestrator( std::unique_ptr context_service, std::unique_ptr generator, std::unique_ptr exporter, diff --git a/tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc similarity index 92% rename from tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc index 934c3dd..4401390 100644 --- a/tooling/pipeline/src/biergarten_data_generator/generate_breweries.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/generate_breweries.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/generate_breweries.cc + * @file biergarten_pipeline_orchestrator/generate_breweries.cc * @brief BiergartenDataGenerator::GenerateBreweries() implementation. */ @@ -7,7 +7,7 @@ #include "biergarten_data_generator.h" -void BiergartenDataGenerator::GenerateBreweries( +void BiergartenPipelineOrchestrator::GenerateBreweries( std::span cities) { spdlog::info("\n=== SAMPLE BREWERY GENERATION ==="); diff --git a/tooling/pipeline/src/biergarten_data_generator/log_results.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc similarity index 88% rename from tooling/pipeline/src/biergarten_data_generator/log_results.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc index 975729e..340490f 100644 --- a/tooling/pipeline/src/biergarten_data_generator/log_results.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/log_results.cc + * @file biergarten_pipeline_orchestrator/log_results.cc * @brief BiergartenDataGenerator::LogResults() implementation. */ @@ -7,7 +7,7 @@ #include "biergarten_data_generator.h" -void BiergartenDataGenerator::LogResults() const { +void BiergartenPipelineOrchestrator::LogResults() const { spdlog::info("\n=== GENERATED DATA DUMP ==="); size_t index = 1; for (const auto& [location, brewery] : generated_breweries_) { diff --git a/tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc similarity index 88% rename from tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc index c17654f..4920693 100644 --- a/tooling/pipeline/src/biergarten_data_generator/query_cities_with_countries.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/query_cities_with_countries.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/query_cities_with_countries.cc + * @file biergarten_pipeline_orchestrator/query_cities_with_countries.cc * @brief BiergartenDataGenerator::QueryCitiesWithCountries() implementation. */ @@ -13,7 +13,7 @@ #include "biergarten_data_generator.h" #include "json_handling/json_loader.h" -std::vector BiergartenDataGenerator::QueryCitiesWithCountries() { +std::vector BiergartenPipelineOrchestrator::QueryCitiesWithCountries() { spdlog::info("\n=== GEOGRAPHIC DATA OVERVIEW ==="); const std::filesystem::path locations_path = "locations.json"; diff --git a/tooling/pipeline/src/biergarten_data_generator/run.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc similarity index 93% rename from tooling/pipeline/src/biergarten_data_generator/run.cc rename to tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc index 4ee2b46..c3a42e1 100644 --- a/tooling/pipeline/src/biergarten_data_generator/run.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/run.cc @@ -1,5 +1,5 @@ /** - * @file biergarten_data_generator/run.cc + * @file biergarten_pipeline_orchestrator/run.cc * @brief BiergartenDataGenerator::Run() implementation. */ @@ -9,7 +9,7 @@ #include "biergarten_data_generator.h" -bool BiergartenDataGenerator::Run() { +bool BiergartenPipelineOrchestrator::Run() { try { exporter_->Initialize(); diff --git a/tooling/pipeline/src/main.cc b/tooling/pipeline/src/main.cc index cd204c9..4d84a3f 100644 --- a/tooling/pipeline/src/main.cc +++ b/tooling/pipeline/src/main.cc @@ -12,8 +12,10 @@ #include #include #include +#include #include "biergarten_data_generator.h" +#include "concurrency/bounded_channel.h" #include "data_generation/llama_generator.h" #include "data_generation/mock_generator.h" #include "data_generation/prompt_formatting/gemma4_jinja_prompt_formatter.h" @@ -25,12 +27,22 @@ #include "services/enrichment/enrichment_service.h" #include "services/enrichment/mock_enrichment.h" #include "services/enrichment/wikipedia_service.h" +#include "services/logging/channel_logger.h" +#include "services/logging/log_consumer.h" +#include "services/logging/log_entry.h" +#include "services/logging/logger.h" #include "services/prompting/prompt_directory.h" #include "web_client/http_web_client.h" namespace di = boost::di; +static constexpr size_t kLogMaxCount = 512; int main(const int argc, char** argv) { + auto log_channel = std::make_shared>(kLogMaxCount); + ChannelLogger channel_logger(*log_channel); + LogConsumer log_worker(*log_channel); + std::thread log_thread([&log_worker] { log_worker.Run(); }); + try { Timer timer; spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] %v"); @@ -46,6 +58,8 @@ int main(const int argc, char** argv) { ParseArguments(argc, argv); if (!parsed_options.has_value()) { + log_channel->Close(); + log_thread.join(); return 0; } @@ -54,66 +68,97 @@ int main(const int argc, char** argv) { const auto sampling = options.generator.sampling.value_or(SamplingOptions{}); + // ----------------------------------------------------------------------- + // Prompt directory + // Conditionally constructed before the injector; moved into LlamaGenerator. + // ----------------------------------------------------------------------- std::unique_ptr prompt_directory; if (!options.generator.use_mocked) { try { prompt_directory = std::make_unique(options.pipeline.prompt_dir); } catch (const std::exception& dir_error) { - spdlog::error("[Startup] Invalid --prompt-dir: {}", dir_error.what()); + channel_logger.Log( + LogLevel::Error, PipelinePhase::Startup, + std::string("Invalid --prompt-dir: ") + dir_error.what()); + log_channel->Close(); + log_thread.join(); return 1; } } + // ----------------------------------------------------------------------- + // Dependency injection + // ----------------------------------------------------------------------- const auto injector = di::make_injector( di::bind().to(options), di::bind().to(model_path), di::bind().to(), di::bind().to(), di::bind().to(), + di::bind().to( + [log_channel](const auto&) -> std::unique_ptr { + return std::make_unique(*log_channel); + }), di::bind().to( [options](const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { return std::make_unique(); } - return std::make_unique( inj.template create>()); }), di::bind().to( - [options, model_path, sampling, &prompt_directory]( + [&options, &model_path, &sampling, &prompt_directory, + &channel_logger]( const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { - spdlog::info( - "[Generator] Using MockGenerator (no model path provided)"); + channel_logger.Log( + LogLevel::Info, PipelinePhase::Startup, + "Using MockGenerator (no model path provided)"); return std::make_unique(); } - - spdlog::info( - "[Generator] Using LlamaGenerator: {} (temperature={}, " - "top-p={}, top-k={}, n_ctx={}, seed={})", - model_path, sampling.temperature, sampling.top_p, - sampling.top_k, sampling.n_ctx, sampling.seed); + channel_logger.Log( + LogLevel::Info, PipelinePhase::Startup, + "Using LlamaGenerator: " + model_path + + " (temperature=" + std::to_string(sampling.temperature) + + ", top-p=" + std::to_string(sampling.top_p) + + ", top-k=" + std::to_string(sampling.top_k) + + ", n_ctx=" + std::to_string(sampling.n_ctx) + + ", seed=" + std::to_string(sampling.seed) + ")"); return std::make_unique( options, model_path, inj.template create>(), std::move(prompt_directory)); - }) + })); - ); + // ----------------------------------------------------------------------- + // Pipeline execution + // ----------------------------------------------------------------------- + const auto orchestrator = + injector.create>(); - const auto generator = - injector.create>(); - - if (!generator->Run()) { - spdlog::error("Pipeline execution failed"); + if (!orchestrator->Run()) { + channel_logger.Log(LogLevel::Error, PipelinePhase::Teardown, + "Pipeline execution failed"); + log_channel->Close(); + log_thread.join(); return 1; } - spdlog::info("Pipeline executed successfully in {} ms", timer.Elapsed()); + channel_logger.Log(LogLevel::Info, PipelinePhase::Teardown, + "Pipeline executed successfully in " + + std::to_string(timer.Elapsed()) + " ms"); + + log_channel->Close(); + log_thread.join(); return 0; + } catch (const std::exception& exception) { + // Channel may be in an unknown state; fall back to spdlog directly. spdlog::critical("Unhandled fatal error in main: {}", exception.what()); + log_channel->Close(); + log_thread.join(); return 1; } -} +} \ No newline at end of file diff --git a/tooling/pipeline/src/services/logging/channel_logger.cc b/tooling/pipeline/src/services/logging/channel_logger.cc new file mode 100644 index 0000000..d9f7039 --- /dev/null +++ b/tooling/pipeline/src/services/logging/channel_logger.cc @@ -0,0 +1,23 @@ +/** + * @file services/logging/channel_logger.cc + */ + +#include +#include +#include +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/channel_logger.h" +#include "services/logging/log_entry.h" + +ChannelLogger::ChannelLogger(BoundedChannel& channel) + : channel_(channel) {} + +void ChannelLogger::Log(LogLevel level, PipelinePhase phase, + const std::string_view message) { + channel_.Send(LogEntry{.timestamp = std::chrono::system_clock::now(), + .level = level, + .phase = phase, + .message = std::string(message)}); +} diff --git a/tooling/pipeline/src/services/logging/log_consumer.cc b/tooling/pipeline/src/services/logging/log_consumer.cc new file mode 100644 index 0000000..9e27f80 --- /dev/null +++ b/tooling/pipeline/src/services/logging/log_consumer.cc @@ -0,0 +1,74 @@ +/** + * @file services/logging/log_consumer.cc + * @brief Dedicated log drain worker implementation. + * + * LogConsumer drains LogEntry items from a BoundedChannel and forwards them + * to spdlog for final output. + */ + +#include "services/logging/log_consumer.h" + +#include + +#include + +#include "concurrency/bounded_channel.h" +#include "services/logging/log_entry.h" + +LogConsumer::LogConsumer(BoundedChannel& channel) + : channel_(channel) {} + +void LogConsumer::Run() { + while (true) { + auto entry = channel_.Receive(); + if (!entry.has_value()) { + // Channel is closed and drained. + break; + } + + const LogEntry& log_entry = entry.value(); + auto logger = spdlog::default_logger(); + + const std::string formatted_message = [&] { + std::string msg = std::string(log_entry.message); + msg += " [phase=" + ToString(log_entry.phase) + "]"; + return msg; + }(); + + logger->log(ToSpdlogLevel(log_entry.level), formatted_message); + } +} + +spdlog::level::level_enum LogConsumer::ToSpdlogLevel(LogLevel level) { + switch (level) { + case LogLevel::Debug: + return spdlog::level::debug; + case LogLevel::Info: + return spdlog::level::info; + case LogLevel::Warn: + return spdlog::level::warn; + case LogLevel::Error: + return spdlog::level::err; + } + return spdlog::level::info; +} + +std::string LogConsumer::ToString(PipelinePhase phase) { + switch (phase) { + case PipelinePhase::Startup: + return "Startup"; + case PipelinePhase::UserGeneration: + return "UserGeneration"; + case PipelinePhase::BreweryAndBeerGeneration: + return "BreweryAndBeerGeneration"; + case PipelinePhase::CheckinGeneration: + return "CheckinGeneration"; + case PipelinePhase::RatingGeneration: + return "RatingGeneration"; + case PipelinePhase::FollowGeneration: + return "FollowGeneration"; + case PipelinePhase::Teardown: + return "Teardown"; + } + return "Unknown"; +}