diff --git a/tooling/pipeline/CMakeLists.txt b/tooling/pipeline/CMakeLists.txt index 12df845..b31ce3e 100644 --- a/tooling/pipeline/CMakeLists.txt +++ b/tooling/pipeline/CMakeLists.txt @@ -249,6 +249,11 @@ target_compile_definitions(${PROJECT_NAME} PRIVATE $<$:DEBUG> ) +target_compile_options(biergarten-pipeline PRIVATE + -fmacro-prefix-map=${CMAKE_SOURCE_DIR}/tooling/pipeline/src/= +) + + # 7. Runtime Assets configure_file( ${CMAKE_SOURCE_DIR}/locations.json @@ -260,3 +265,4 @@ add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD ${CMAKE_SOURCE_DIR}/prompts ${CMAKE_BINARY_DIR}/prompts ) + diff --git a/tooling/pipeline/includes/biergarten_pipeline_orchestrator.h b/tooling/pipeline/includes/biergarten_pipeline_orchestrator.h index 4be0d41..514d843 100644 --- a/tooling/pipeline/includes/biergarten_pipeline_orchestrator.h +++ b/tooling/pipeline/includes/biergarten_pipeline_orchestrator.h @@ -51,6 +51,12 @@ class BiergartenPipelineOrchestrator { * 2. Resolve context for each city using the injected context service * 3. Generate brewery data for sampled cities * + * @note STRUCTURAL CONCURRENCY REQUIREMENT: + * When transitioned to a multithreaded design, this method MUST structurally + * enforce that all deployed worker threads are joined before returning (e.g. + * by using std::jthread or a structured concurrency primitive). This ensures + * workers do not attempt to log to a closed channel during application teardown. + * * @return true if successful, false if not */ bool Run(); diff --git a/tooling/pipeline/includes/services/logging/log_entry.h b/tooling/pipeline/includes/services/logging/log_entry.h index 9c8e09e..1227430 100644 --- a/tooling/pipeline/includes/services/logging/log_entry.h +++ b/tooling/pipeline/includes/services/logging/log_entry.h @@ -43,23 +43,35 @@ enum class PipelinePhase { Teardown, ///< Finalization and cleanup. }; +/** + * @struct LogDTO + * @brief User-provided subset of log fields. Used to capture call-site info transparently. + */ +struct LogDTO { + LogLevel level; + PipelinePhase phase; + std::string message; +}; + /** * @struct LogEntry * @brief Single structured log event. * * All fields are value types, which keeps transfer across the bounded channel * simple and avoids shared ownership. + * + * NOTE: timestamp, thread_id, and origin must be populated by ILogger::Log() + * before the entry is dispatched. */ struct LogEntry { /// @brief Timestamp when the entry was created. - std::chrono::system_clock::time_point timestamp = - std::chrono::system_clock::now(); + std::chrono::system_clock::time_point timestamp{}; /// @brief Source location where the log call was made. - std::source_location origin = std::source_location::current(); + std::source_location origin{}; /// @brief Thread responsible for emitting the log. - std::thread::id thread_id = std::this_thread::get_id(); + std::thread::id thread_id{}; /// @brief Severity level of this entry. diff --git a/tooling/pipeline/includes/services/logging/log_producer.h b/tooling/pipeline/includes/services/logging/log_producer.h index fb769ac..d032f89 100644 --- a/tooling/pipeline/includes/services/logging/log_producer.h +++ b/tooling/pipeline/includes/services/logging/log_producer.h @@ -24,29 +24,30 @@ */ class LogProducer final : public ILogger { public: - /** - * @brief Construct a channel-backed producer. - * - * @param channel Reference to the bounded channel used for log transfer. - */ - explicit LogProducer(BoundedChannel& channel); + /** + * @brief Construct a channel-backed producer. + * + * @param channel Reference to the bounded channel used for log transfer. + */ + explicit LogProducer(BoundedChannel& channel); - LogProducer(const LogProducer&) = delete; - LogProducer& operator=(const LogProducer&) = delete; - LogProducer(LogProducer&&) = delete; - LogProducer& operator=(LogProducer&&) = delete; + LogProducer(const LogProducer&) = delete; + LogProducer& operator=(const LogProducer&) = delete; + LogProducer(LogProducer&&) = delete; + LogProducer& operator=(LogProducer&&) = delete; - ~LogProducer() override = default; + ~LogProducer() override = default; - /** - * @brief Queue a log message for asynchronous processing. - * - * Blocks while the channel applies backpressure. - */ - void Log(LogEntry log_entry) override; + /** + * @brief Queue a log message for asynchronous processing. + * + * Blocks while the channel applies backpressure. This blocking behavior + * under heavy load is an accepted trade-off for simplicity. + */ + void DoLog(LogEntry log_entry) override; private: - BoundedChannel& channel_; + BoundedChannel& channel_; }; #endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_ diff --git a/tooling/pipeline/includes/services/logging/logger.h b/tooling/pipeline/includes/services/logging/logger.h index 5303106..5171744 100644 --- a/tooling/pipeline/includes/services/logging/logger.h +++ b/tooling/pipeline/includes/services/logging/logger.h @@ -34,9 +34,31 @@ class ILogger { /** * @brief Submit a log message to the logging subsystem. * - * @param log_entry Structured log entry data. + * @param payload User-provided log data (level, phase, message). + * @param origin Auto-captured source location of the call site. */ - virtual void Log(LogEntry log_entry) = 0; + void Log(LogDTO payload, + std::source_location origin = std::source_location::current(), + std::chrono::system_clock::time_point timestamp = std::chrono::system_clock::now(), + std::thread::id thread_id = std::this_thread::get_id()) { + LogEntry entry; + entry.timestamp = timestamp; + entry.thread_id = thread_id; + entry.level = payload.level; + entry.phase = payload.phase; + entry.message = std::move(payload.message); + entry.origin = origin; + DoLog(std::move(entry)); + } + + protected: + /** + * @brief Underlying implementation to transport the log entry. + * + * Implementations must be thread-safe as DoLog can be called concurrently + * from multiple worker threads. + */ + virtual void DoLog(LogEntry log_entry) = 0; }; #endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_ diff --git a/tooling/pipeline/src/application_options/parse_arguments.cc b/tooling/pipeline/src/application_options/parse_arguments.cc index 00743e3..5e9ae84 100644 --- a/tooling/pipeline/src/application_options/parse_arguments.cc +++ b/tooling/pipeline/src/application_options/parse_arguments.cc @@ -71,10 +71,10 @@ std::optional ParseArguments( return usage_stream.str(); })(); if (logger) { - logger->Log({.level = LogLevel::Info, + logger->Log(LogDTO{.level = LogLevel::Info, .phase = PipelinePhase::Startup, .message = title}); - logger->Log({.level = LogLevel::Info, + logger->Log(LogDTO{.level = LogLevel::Info, .phase = PipelinePhase::Startup, .message = usage}); } @@ -90,7 +90,7 @@ std::optional ParseArguments( std::stringstream help_stream; help_stream << "\n" << desc; if (logger) { - logger->Log({.level = LogLevel::Info, + logger->Log(LogDTO{.level = LogLevel::Info, .phase = PipelinePhase::Startup, .message = help_stream.str()}); } @@ -113,7 +113,7 @@ std::optional ParseArguments( const std::string msg = "Invalid arguments: --mocked and --model are mutually exclusive"; if (logger) { - logger->Log({.level = LogLevel::Error, + logger->Log(LogDTO{.level = LogLevel::Error, .phase = PipelinePhase::Startup, .message = msg}); } else { @@ -126,7 +126,7 @@ std::optional ParseArguments( const std::string msg = "Invalid arguments: either --mocked or --model must be specified"; if (logger) { - logger->Log({.level = LogLevel::Error, + logger->Log(LogDTO{.level = LogLevel::Error, .phase = PipelinePhase::Startup, .message = msg}); } else { @@ -170,7 +170,7 @@ std::optional ParseArguments( const std::string msg = "Sampling parameters are ignored when using --mocked"; if (logger) { - logger->Log({.level = LogLevel::Warn, + logger->Log(LogDTO{.level = LogLevel::Warn, .phase = PipelinePhase::Startup, .message = msg}); } else { @@ -196,7 +196,7 @@ std::optional ParseArguments( std::string("Failed to parse command-line arguments: ") + exception.what(); if (logger) { - logger->Log({.level = LogLevel::Error, + logger->Log(LogDTO{.level = LogLevel::Error, .phase = PipelinePhase::Startup, .message = msg}); } @@ -205,7 +205,7 @@ std::optional ParseArguments( const std::string msg = "Failed to parse command-line arguments: unknown error"; if (logger) { - logger->Log({.level = LogLevel::Error, + logger->Log(LogDTO{.level = LogLevel::Error, .phase = PipelinePhase::Startup, .message = msg}); } diff --git a/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc index 8843502..21db335 100644 --- a/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc +++ b/tooling/pipeline/src/biergarten_pipeline_orchestrator/log_results.cc @@ -34,4 +34,4 @@ void BiergartenPipelineOrchestrator::LogResults() const { logger_->Log({.level = LogLevel::Info, .phase = PipelinePhase::Teardown, .message = oss.str()}); -} \ No newline at end of file +} diff --git a/tooling/pipeline/src/main.cc b/tooling/pipeline/src/main.cc index b2c7a11..25ed693 100644 --- a/tooling/pipeline/src/main.cc +++ b/tooling/pipeline/src/main.cc @@ -66,8 +66,8 @@ int main(const int argc, char** argv) { const LlamaBackendState llama_backend_state; #endif - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "STARTING PIPELINE"}); const std::optional parsed_options = @@ -89,8 +89,8 @@ int main(const int argc, char** argv) { prompt_directory = std::make_unique( options.pipeline.prompt_dir, log_producer); } catch (const std::exception& dir_error) { - log_producer->Log({.level = LogLevel::Error, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Error, + .phase = PipelinePhase::Startup, .message = std::format("Invalid --prompt-dir: {}", dir_error.what())}); @@ -108,7 +108,7 @@ int main(const int argc, char** argv) { { log_producer->Log( {.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + .phase = PipelinePhase::Startup, .message = "Prompt formatter: none (mock mode)"}); } return std::unique_ptr(nullptr); @@ -116,7 +116,7 @@ int main(const int argc, char** argv) { { log_producer->Log( {.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + .phase = PipelinePhase::Startup, .message = "Prompt formatter: Gemma4JinjaPromptFormatter"}); } return std::unique_ptr( @@ -125,15 +125,15 @@ int main(const int argc, char** argv) { di::bind().to([options, log_producer] { if (options.generator.use_mocked) { { - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "Web client: none (mock mode)"}); } return std::unique_ptr(nullptr); } { - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "Web client: HttpWebClient"}); } return std::unique_ptr( @@ -144,15 +144,15 @@ int main(const int argc, char** argv) { const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { { - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "Enrichment: mock"}); } return std::make_unique(); } { - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "Enrichment: Wikipedia"}); } return std::make_unique( @@ -164,8 +164,8 @@ int main(const int argc, char** argv) { &log_producer](const auto& inj) -> std::unique_ptr { if (options.generator.use_mocked) { { - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Startup, .message = "Generator: mock"}); } return std::make_unique(); @@ -173,7 +173,7 @@ int main(const int argc, char** argv) { { log_producer->Log( {.level = LogLevel::Info, - .phase = PipelinePhase::Startup, + .phase = PipelinePhase::Startup, .message = std::format( "Generator: LlamaGenerator | model={} | temp={:.2f} " "top_p={:.2f} top_k={} n_ctx={} seed={}", @@ -190,23 +190,23 @@ int main(const int argc, char** argv) { injector.create>(); if (!orchestrator->Run()) { - log_producer->Log({.level = LogLevel::Error, - .phase = PipelinePhase::Teardown, + log_producer->Log({.level = LogLevel::Error, + .phase = PipelinePhase::Teardown, .message = "Pipeline execution failed"}); return shutdown(EXIT_FAILURE); } - log_producer->Log({.level = LogLevel::Info, - .phase = PipelinePhase::Teardown, + log_producer->Log({.level = LogLevel::Info, + .phase = PipelinePhase::Teardown, .message = std::format("Pipeline complete in {} ms", timer.Elapsed())}); return shutdown(EXIT_SUCCESS); } catch (const std::exception& exception) { - const LogEntry log_entry{.level = LogLevel::Error, - .phase = PipelinePhase::Teardown, - .message = exception.what()}; + const LogDTO log_entry{.level = LogLevel::Error, + .phase = PipelinePhase::Teardown, + .message = exception.what()}; if (log_producer) { log_producer->Log(log_entry); } else { diff --git a/tooling/pipeline/src/services/logging/log_dispatcher.cc b/tooling/pipeline/src/services/logging/log_dispatcher.cc index 421e337..3655047 100644 --- a/tooling/pipeline/src/services/logging/log_dispatcher.cc +++ b/tooling/pipeline/src/services/logging/log_dispatcher.cc @@ -13,6 +13,29 @@ #include "concurrency/bounded_channel.h" #include "services/logging/log_entry.h" +namespace { +[[nodiscard]] constexpr std::string_view PipelinePhaseToString( + PipelinePhase phase) { + switch (phase) { + case PipelinePhase::Startup: + return "Startup"; + case PipelinePhase::UserGeneration: + return "User Generation"; + case PipelinePhase::BreweryAndBeerGeneration: + return "Brewery & Beer Gen"; + case PipelinePhase::CheckinGeneration: + return "Checkin Gen"; + case PipelinePhase::RatingGeneration: + return "Rating Gen"; + case PipelinePhase::FollowGeneration: + return "Follow Gen"; + case PipelinePhase::Teardown: + return "Teardown"; + } + return "Unknown"; +} +} // namespace + LogDispatcher::LogDispatcher(BoundedChannel& channel) : channel_(channel) {} @@ -28,7 +51,11 @@ void LogDispatcher::Run() { const auto& log = entry.value(); - logger->log(ToSpdlogLevel(log.level), log.message); + logger->log(ToSpdlogLevel(log.level), + "{:<20} │ thread: {:016x} │ [{}:{}] │ {}", + PipelinePhaseToString(log.phase), + std::hash{}(log.thread_id), + log.origin.file_name(), log.origin.line(), log.message); } } diff --git a/tooling/pipeline/src/services/logging/log_producer.cc b/tooling/pipeline/src/services/logging/log_producer.cc index 9c0718c..18278ad 100644 --- a/tooling/pipeline/src/services/logging/log_producer.cc +++ b/tooling/pipeline/src/services/logging/log_producer.cc @@ -16,4 +16,4 @@ LogProducer::LogProducer(BoundedChannel& channel) : channel_(channel) {} -void LogProducer::Log(LogEntry entry) { channel_.Send(std::move(entry)); } +void LogProducer::DoLog(LogEntry entry) { channel_.Send(std::move(entry)); }