This commit is contained in:
Aaron Po
2026-05-20 02:11:36 -04:00
parent e251e7b2a3
commit 7580f47a7d
10 changed files with 133 additions and 59 deletions

View File

@@ -249,6 +249,11 @@ target_compile_definitions(${PROJECT_NAME} PRIVATE
$<$<CONFIG:Debug>:DEBUG>
)
# Remap the absolute source-directory prefix that the compiler embeds in
# file-name macros to an empty string, so paths baked into the binary are short
# and do not depend on where the tree was checked out.
# NOTE(review): -fmacro-prefix-map is a GCC/Clang option; confirm no other
# toolchain is expected to build this target.
target_compile_options(${PROJECT_NAME} PRIVATE
    -fmacro-prefix-map=${CMAKE_SOURCE_DIR}/tooling/pipeline/src/=
)
# 7. Runtime Assets
configure_file(
${CMAKE_SOURCE_DIR}/locations.json
@@ -260,3 +265,4 @@ add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD
${CMAKE_SOURCE_DIR}/prompts
${CMAKE_BINARY_DIR}/prompts
)

View File

@@ -51,6 +51,12 @@ class BiergartenPipelineOrchestrator {
* 2. Resolve context for each city using the injected context service
* 3. Generate brewery data for sampled cities
*
* @note STRUCTURAL CONCURRENCY REQUIREMENT:
* When transitioned to a multithreaded design, this method MUST structurally
* enforce that all deployed worker threads are joined before returning (e.g.
* by using std::jthread or a structured concurrency primitive). This ensures
* workers do not attempt to log to a closed channel during application teardown.
*
* @return true if successful, false if not
*/
bool Run();

View File

@@ -43,23 +43,35 @@ enum class PipelinePhase {
Teardown, ///< Finalization and cleanup.
};
/**
 * @struct LogDTO
 * @brief Caller-supplied subset of a log event: severity, phase, and text.
 *
 * Kept separate from LogEntry so that ILogger::Log() can take the call-site
 * metadata (source location, timestamp, thread id) through its own defaulted
 * parameters instead of requiring every caller to provide it.
 *
 * Every member has a default initializer, so a default-constructed LogDTO
 * never carries indeterminate enum values. The type remains an aggregate and
 * can still be built with designated initializers.
 */
struct LogDTO {
  /// @brief Severity level of the message.
  LogLevel level{};
  /// @brief Pipeline phase the message belongs to.
  PipelinePhase phase{};
  /// @brief Human-readable log text.
  std::string message{};
};
/**
* @struct LogEntry
* @brief Single structured log event.
*
* All fields are value types, which keeps transfer across the bounded channel
* simple and avoids shared ownership.
*
* NOTE: timestamp, thread_id, and origin must be populated by ILogger::Log()
* before the entry is dispatched.
*/
struct LogEntry {
/// @brief Timestamp when the entry was created.
std::chrono::system_clock::time_point timestamp =
std::chrono::system_clock::now();
std::chrono::system_clock::time_point timestamp{};
/// @brief Source location where the log call was made.
std::source_location origin = std::source_location::current();
std::source_location origin{};
/// @brief Thread responsible for emitting the log.
std::thread::id thread_id = std::this_thread::get_id();
std::thread::id thread_id{};
/// @brief Severity level of this entry.

View File

@@ -24,29 +24,30 @@
*/
class LogProducer final : public ILogger {
public:
/**
* @brief Construct a channel-backed producer.
*
* @param channel Reference to the bounded channel used for log transfer.
*/
explicit LogProducer(BoundedChannel<LogEntry>& channel);
/**
* @brief Construct a channel-backed producer.
*
* The producer keeps only a reference to the channel, so the channel must
* outlive this object.
*
* @param channel Reference to the bounded channel used for log transfer.
*/
explicit LogProducer(BoundedChannel<LogEntry>& channel);
// Non-copyable and non-movable: all four copy/move operations are deleted.
LogProducer(const LogProducer&) = delete;
LogProducer& operator=(const LogProducer&) = delete;
LogProducer(LogProducer&&) = delete;
LogProducer& operator=(LogProducer&&) = delete;
LogProducer(const LogProducer&) = delete;
LogProducer& operator=(const LogProducer&) = delete;
LogProducer(LogProducer&&) = delete;
LogProducer& operator=(LogProducer&&) = delete;
~LogProducer() override = default;
~LogProducer() override = default;
/**
* @brief Queue a log message for asynchronous processing.
*
* Blocks while the channel applies backpressure.
*/
void Log(LogEntry log_entry) override;
/**
* @brief Queue a log message for asynchronous processing.
*
* Blocks while the channel applies backpressure. This blocking behavior
* under heavy load is an accepted trade-off for simplicity.
*
* @param log_entry Entry to enqueue; taken by value and moved into the
* channel.
*/
void DoLog(LogEntry log_entry) override;
private:
/// @brief Channel that receives queued entries; held by reference.
BoundedChannel<LogEntry>& channel_;
BoundedChannel<LogEntry>& channel_;
};
#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_CHANNEL_LOGGER_H_

View File

@@ -34,9 +34,31 @@ class ILogger {
/**
* @brief Submit a log message to the logging subsystem.
*
* @param log_entry Structured log entry data.
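* @param payload User-provided log data (level, phase, message).
* @param origin Source location of the call site; captured automatically
* through the default argument.
* @param timestamp Time recorded for the entry; defaults to
* std::chrono::system_clock::now(), evaluated on each call.
* @param thread_id Thread recorded for the entry; defaults to the id of the
* calling thread.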
*/
virtual void Log(LogEntry log_entry) = 0;
void Log(LogDTO payload,
         std::source_location origin = std::source_location::current(),
         std::chrono::system_clock::time_point timestamp =
             std::chrono::system_clock::now(),
         std::thread::id thread_id = std::this_thread::get_id()) {
  // Non-virtual front end: assemble the complete entry here so that concrete
  // loggers only have to implement the transport step (DoLog).
  LogEntry record;

  // Call-site metadata, normally supplied by the defaulted parameters.
  record.origin = origin;
  record.timestamp = timestamp;
  record.thread_id = thread_id;

  // Caller-supplied fields; the message is moved to avoid a second copy.
  record.level = payload.level;
  record.phase = payload.phase;
  record.message = std::move(payload.message);

  DoLog(std::move(record));
}
protected:
/**
* @brief Underlying implementation to transport the log entry.
*
* Called by Log() after it has filled in the timestamp, thread id, origin,
* level, phase, and message of the entry.
*
* Implementations must be thread-safe as DoLog can be called concurrently
* from multiple worker threads.
*
* @param log_entry Populated entry to transport; passed by value.
*/
virtual void DoLog(LogEntry log_entry) = 0;
};
#endif // BIERGARTEN_PIPELINE_INCLUDES_SERVICES_LOGGING_LOGGER_H_

View File

@@ -71,10 +71,10 @@ std::optional<ApplicationOptions> ParseArguments(
return usage_stream.str();
})();
if (logger) {
logger->Log({.level = LogLevel::Info,
logger->Log(LogDTO{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = title});
logger->Log({.level = LogLevel::Info,
logger->Log(LogDTO{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = usage});
}
@@ -90,7 +90,7 @@ std::optional<ApplicationOptions> ParseArguments(
std::stringstream help_stream;
help_stream << "\n" << desc;
if (logger) {
logger->Log({.level = LogLevel::Info,
logger->Log(LogDTO{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = help_stream.str()});
}
@@ -113,7 +113,7 @@ std::optional<ApplicationOptions> ParseArguments(
const std::string msg =
"Invalid arguments: --mocked and --model are mutually exclusive";
if (logger) {
logger->Log({.level = LogLevel::Error,
logger->Log(LogDTO{.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
.message = msg});
} else {
@@ -126,7 +126,7 @@ std::optional<ApplicationOptions> ParseArguments(
const std::string msg =
"Invalid arguments: either --mocked or --model must be specified";
if (logger) {
logger->Log({.level = LogLevel::Error,
logger->Log(LogDTO{.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
.message = msg});
} else {
@@ -170,7 +170,7 @@ std::optional<ApplicationOptions> ParseArguments(
const std::string msg =
"Sampling parameters are ignored when using --mocked";
if (logger) {
logger->Log({.level = LogLevel::Warn,
logger->Log(LogDTO{.level = LogLevel::Warn,
.phase = PipelinePhase::Startup,
.message = msg});
} else {
@@ -196,7 +196,7 @@ std::optional<ApplicationOptions> ParseArguments(
std::string("Failed to parse command-line arguments: ") +
exception.what();
if (logger) {
logger->Log({.level = LogLevel::Error,
logger->Log(LogDTO{.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
.message = msg});
}
@@ -205,7 +205,7 @@ std::optional<ApplicationOptions> ParseArguments(
const std::string msg =
"Failed to parse command-line arguments: unknown error";
if (logger) {
logger->Log({.level = LogLevel::Error,
logger->Log(LogDTO{.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
.message = msg});
}

View File

@@ -34,4 +34,4 @@ void BiergartenPipelineOrchestrator::LogResults() const {
logger_->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Teardown,
.message = oss.str()});
}
}

View File

@@ -66,8 +66,8 @@ int main(const int argc, char** argv) {
const LlamaBackendState llama_backend_state;
#endif
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "STARTING PIPELINE"});
const std::optional<ApplicationOptions> parsed_options =
@@ -89,8 +89,8 @@ int main(const int argc, char** argv) {
prompt_directory = std::make_unique<PromptDirectory>(
options.pipeline.prompt_dir, log_producer);
} catch (const std::exception& dir_error) {
log_producer->Log({.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Error,
.phase = PipelinePhase::Startup,
.message = std::format("Invalid --prompt-dir: {}",
dir_error.what())});
@@ -108,7 +108,7 @@ int main(const int argc, char** argv) {
{
log_producer->Log(
{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.phase = PipelinePhase::Startup,
.message = "Prompt formatter: none (mock mode)"});
}
return std::unique_ptr<IPromptFormatter>(nullptr);
@@ -116,7 +116,7 @@ int main(const int argc, char** argv) {
{
log_producer->Log(
{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.phase = PipelinePhase::Startup,
.message = "Prompt formatter: Gemma4JinjaPromptFormatter"});
}
return std::unique_ptr<IPromptFormatter>(
@@ -125,15 +125,15 @@ int main(const int argc, char** argv) {
di::bind<WebClient>().to([options, log_producer] {
if (options.generator.use_mocked) {
{
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "Web client: none (mock mode)"});
}
return std::unique_ptr<WebClient>(nullptr);
}
{
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "Web client: HttpWebClient"});
}
return std::unique_ptr<WebClient>(
@@ -144,15 +144,15 @@ int main(const int argc, char** argv) {
const auto& inj) -> std::unique_ptr<IEnrichmentService> {
if (options.generator.use_mocked) {
{
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "Enrichment: mock"});
}
return std::make_unique<MockEnrichmentService>();
}
{
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "Enrichment: Wikipedia"});
}
return std::make_unique<WikipediaEnrichmentService>(
@@ -164,8 +164,8 @@ int main(const int argc, char** argv) {
&log_producer](const auto& inj) -> std::unique_ptr<DataGenerator> {
if (options.generator.use_mocked) {
{
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.message = "Generator: mock"});
}
return std::make_unique<MockGenerator>();
@@ -173,7 +173,7 @@ int main(const int argc, char** argv) {
{
log_producer->Log(
{.level = LogLevel::Info,
.phase = PipelinePhase::Startup,
.phase = PipelinePhase::Startup,
.message = std::format(
"Generator: LlamaGenerator | model={} | temp={:.2f} "
"top_p={:.2f} top_k={} n_ctx={} seed={}",
@@ -190,23 +190,23 @@ int main(const int argc, char** argv) {
injector.create<std::unique_ptr<BiergartenPipelineOrchestrator>>();
if (!orchestrator->Run()) {
log_producer->Log({.level = LogLevel::Error,
.phase = PipelinePhase::Teardown,
log_producer->Log({.level = LogLevel::Error,
.phase = PipelinePhase::Teardown,
.message = "Pipeline execution failed"});
return shutdown(EXIT_FAILURE);
}
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Teardown,
log_producer->Log({.level = LogLevel::Info,
.phase = PipelinePhase::Teardown,
.message = std::format("Pipeline complete in {} ms",
timer.Elapsed())});
return shutdown(EXIT_SUCCESS);
} catch (const std::exception& exception) {
const LogEntry log_entry{.level = LogLevel::Error,
.phase = PipelinePhase::Teardown,
.message = exception.what()};
const LogDTO log_entry{.level = LogLevel::Error,
.phase = PipelinePhase::Teardown,
.message = exception.what()};
if (log_producer) {
log_producer->Log(log_entry);
} else {

View File

@@ -13,6 +13,29 @@
#include "concurrency/bounded_channel.h"
#include "services/logging/log_entry.h"
namespace {
/**
 * @brief Map a pipeline phase to the label shown in log output.
 *
 * @param phase Phase to describe.
 * @return A view of a string literal naming the phase, or "Unknown" when the
 *         value matches none of the listed enumerators.
 */
[[nodiscard]] constexpr std::string_view PipelinePhaseToString(
    PipelinePhase phase) {
  // Fallback for values outside the enumerators handled below.
  std::string_view label = "Unknown";
  switch (phase) {
    case PipelinePhase::Startup:
      label = "Startup";
      break;
    case PipelinePhase::UserGeneration:
      label = "User Generation";
      break;
    case PipelinePhase::BreweryAndBeerGeneration:
      label = "Brewery & Beer Gen";
      break;
    case PipelinePhase::CheckinGeneration:
      label = "Checkin Gen";
      break;
    case PipelinePhase::RatingGeneration:
      label = "Rating Gen";
      break;
    case PipelinePhase::FollowGeneration:
      label = "Follow Gen";
      break;
    case PipelinePhase::Teardown:
      label = "Teardown";
      break;
  }
  return label;
}
} // namespace
// Initializes channel_ from the caller-supplied channel; the constructor does
// no other work.
LogDispatcher::LogDispatcher(BoundedChannel<LogEntry>& channel)
: channel_(channel) {}
@@ -28,7 +51,11 @@ void LogDispatcher::Run() {
const auto& log = entry.value();
logger->log(ToSpdlogLevel(log.level), log.message);
logger->log(ToSpdlogLevel(log.level),
"{:<20} │ thread: {:016x} │ [{}:{}] │ {}",
PipelinePhaseToString(log.phase),
std::hash<std::thread::id>{}(log.thread_id),
log.origin.file_name(), log.origin.line(), log.message);
}
}

View File

@@ -16,4 +16,4 @@
LogProducer::LogProducer(BoundedChannel<LogEntry>& channel)
: channel_(channel) {}
void LogProducer::Log(LogEntry entry) { channel_.Send(std::move(entry)); }
/// Transport step for ILogger::Log(): moves the populated entry into the
/// channel.
void LogProducer::DoLog(LogEntry log_entry) {
  channel_.Send(std::move(log_entry));
}