diff --git a/pipeline/diagrams/biergarten-weizen-theme.puml b/pipeline/diagrams/biergarten-weizen-theme.puml new file mode 100644 index 0000000..b31305d --- /dev/null +++ b/pipeline/diagrams/biergarten-weizen-theme.puml @@ -0,0 +1,34 @@ +skinparam shadowing false +skinparam backgroundColor #FCFCF7 +skinparam defaultFontName "DM Sans" +skinparam defaultFontColor #14180C +skinparam titleFontName "Volkhov" +skinparam titleFontColor #14180C +skinparam ArrowColor #656F33 +skinparam NoteBackgroundColor #DBEEDD +skinparam NoteFontColor #14180C +skinparam NoteBorderColor #4A5837 +skinparam SwimlaneBorderColor #4A5837 +skinparam SwimlaneBorderThickness 1 +skinparam activityStartColor #EBECE3 +skinparam activityEndColor #4A5837 +skinparam activityStopColor #4A5837 +skinparam ActivityBackgroundColor #EBECE3 +skinparam ActivityBorderColor #4A5837 +skinparam ActivityDiamondBackgroundColor #CBD2B5 +skinparam ActivityDiamondBorderColor #4A5837 +skinparam packageStyle rectangle +skinparam packageBackgroundColor #F1F3EA +skinparam packageBorderColor #4A5837 +skinparam packageFontColor #14180C +skinparam classBackgroundColor #EBECE3 +skinparam classBorderColor #4A5837 +skinparam classFontColor #14180C +skinparam classAttributeFontColor #3F4724 +skinparam classStereotypeFontColor #4A5837 +skinparam interfaceBackgroundColor #DBEEDD +skinparam interfaceBorderColor #4A5837 +skinparam interfaceFontColor #14180C +skinparam enumBackgroundColor #E4E6D8 +skinparam enumBorderColor #4A5837 +skinparam enumFontColor #14180C diff --git a/pipeline/diagrams/planned/activity.puml b/pipeline/diagrams/planned/activity.puml index 0e62dd2..679268c 100644 --- a/pipeline/diagrams/planned/activity.puml +++ b/pipeline/diagrams/planned/activity.puml @@ -1,25 +1,7 @@ @startuml biergarten_activity -skinparam defaultFontName "DM Sans" +!include ../biergarten-weizen-theme.puml skinparam defaultFontSize 13 -skinparam titleFontName "Volkhov" skinparam titleFontSize 20 -skinparam backgroundColor #FCFCF7 -skinparam defaultFontColor #14180C -skinparam titleFontColor #14180C -skinparam ArrowColor #656F33 -skinparam activityStartColor #EBECE3 -skinparam activityEndColor #4A5837 -skinparam activityStopColor #4A5837 -skinparam ActivityBackgroundColor #EBECE3 -skinparam ActivityBorderColor #4A5837 -skinparam ActivityDiamondBackgroundColor #CBD2B5 -skinparam ActivityDiamondBorderColor #4A5837 -skinparam NoteBackgroundColor #DBEEDD -skinparam NoteFontColor #14180C -skinparam NoteBorderColor #4A5837 -skinparam SwimlaneBorderColor #4A5837 -skinparam SwimlaneBorderThickness 1 -skinparam monochrome reverse title The Biergarten Data Pipeline — Activity Diagram @@ -37,8 +19,8 @@ endif :Initialize SqliteExportService; note right Opens SQLite connection. - Begins a single transaction - covering all five fixture types. + (Transactions are now managed + per-phase via batching). end note :Create BoundedChannel log_ch; @@ -126,12 +108,18 @@ fork again end note fork again |SQLite Worker| + :BEGIN TRANSACTION; while (exp_ch has items?) is (yes) :Receive GeneratedUser; :ProcessUser(user); :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite"); :Append -> user_pool_; + if (Batch size reached?) then (yes) + :COMMIT & BEGIN; + else (no) + endif endwhile (no) + :COMMIT (Final); end fork |Orchestrator| @@ -145,23 +133,24 @@ end fork fork |Orchestrator| - :Loop: Send Locations -> loc_ch; + :Loop: Sample User from user_pool_ + and pair with Location; + :Send BreweryTask(Location, User) -> loc_ch; :Close loc_ch; fork again |LLM Worker| while (loc_ch has items?) is (yes) - :Receive Location; + :Receive BreweryTask(Location, User); - :GetLocationContextFromCache(location); + :GetLocationContextFromCache(task.location); note right Guaranteed cache hit from startup. end note - :GenerateBrewery(enriched_city, context)\nvia DataGenerator; + :GenerateBrewery(enriched_city, context, task.user)\nvia DataGenerator; note right - KV cache stays warm across all - brewery generations -- system prompt - does not change within this phase. + KV cache stays warm. + Brewery is linked to the sampled owner_user_id. end note :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm"); :Send GeneratedBrewery -> exp_ch; @@ -169,12 +158,18 @@ fork again :Close exp_ch; fork again |SQLite Worker| + :BEGIN TRANSACTION; while (exp_ch has items?) is (yes) :Receive GeneratedBrewery; :ProcessBrewery(brewery); :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite"); :Append -> brewery_pool_; + if (Batch size reached?) then (yes) + :COMMIT & BEGIN; + else (no) + endif endwhile (no) + :COMMIT (Final); end fork |Orchestrator| @@ -218,6 +213,7 @@ fork again :Close exp_ch; fork again |SQLite Worker| + :BEGIN TRANSACTION; while (exp_ch has items?) is (yes) :Receive BeersBundle; while (For each beer in bundle?) is (remaining) @@ -226,7 +222,12 @@ fork again :Append -> beer_pool_; endwhile (done) :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite"); + if (Batch size reached?) then (yes) + :COMMIT & BEGIN; + else (no) + endif endwhile (no) + :COMMIT (Final); end fork |Orchestrator| @@ -247,6 +248,7 @@ note right emerges from persona distribution. end note +:BEGIN TRANSACTION; while (For each GeneratedUser in user_pool_?) is (remaining) :CheckinsForUser(user, brewery_pool_.size()); while (For each checkin index?) is (remaining) @@ -256,8 +258,13 @@ while (For each GeneratedUser in user_pool_?) is (remaining) :ProcessCheckin(checkin); :PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite"); :Append -> checkin_pool_; + if (Batch size reached?) then (yes) + :COMMIT & BEGIN; + else (no) + endif endwhile (done) endwhile (done) +:COMMIT (Final); ' ═══════════════════════════════════════════ ' PHASE 3 — RATING GENERATION @@ -269,33 +276,41 @@ note right Rating skew modulated per persona. end note +:BEGIN TRANSACTION; while (For each GeneratedCheckin in checkin_pool_?) is (remaining) :Match brewery_id, select beer from beer_pool_\n(same brewery_id, biased by persona affinities); if (Beer exists for brewery?) then (yes) :GenerateRating(user, beer, checkin_id)\nvia DataGenerator; :ProcessRating(rating); :PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite"); + if (Batch size reached?) then (yes) + :COMMIT & BEGIN; + else (no) + endif else (no) :PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite"); :Skip -- brewery has no beers; endif endwhile (done) +:COMMIT (Final); ' ═══════════════════════════════════════════ ' TEARDOWN ' ═══════════════════════════════════════════ -|Main| +|Orchestrator| :Finalize SqliteExportService; note right - COMMIT covers all five fixture types. + Safely closes the DB connection. end note :Close log_ch; + +|Main| +:spdlog::info "Pipeline complete in X ms"; :Join Log Worker; note right Drain guarantees no LogEntry is dropped at shutdown. end note -:spdlog::info "Pipeline complete in X ms"; stop @enduml diff --git a/pipeline/diagrams/planned/class.puml b/pipeline/diagrams/planned/class.puml index 9d63cea..9692a89 100644 --- a/pipeline/diagrams/planned/class.puml +++ b/pipeline/diagrams/planned/class.puml @@ -3,220 +3,413 @@ ' ========================================== ' CONFIGURATION & STYLING ' ========================================== -skinparam classAttributeFontSize 13 - -' --- Typography --- -skinparam defaultFontName "DM Sans" -skinparam defaultFontSize 20 -skinparam titleFontName "Volkhov" +!include ../biergarten-weizen-theme.puml +skinparam classAttributeFontSize 9 +skinparam defaultFontSize 25 skinparam titleFontSize 30 -package "Domain" { - package "Domain Models" { +package "Domain: Models" { - class Location { - + city : std::string - + state_province : std::string - + iso3166_2 : std::string - + country : std::string - + iso3166_1 : std::string - + local_languages : std::vector - + latitude : double - + longitude : double - } - - class LocationContext { - + text : std::string - + completeness : Completeness - + char_count : size_t - } - - enum Completeness { - Full - Partial - Absent - } - - class EnrichedCity { - + location : Location - + context : LocationContext - } - - class BeerStyle { - + name : std::string - + description : std::string - + min_abv : float - + max_abv : float - + min_ibu : int - + max_ibu : int - } - - class BreweryResult { - + name_en : std::string - + description_en : std::string - + name_local : std::string - + description_local : std::string - } - - class BeerResult { - + name_en : std::string - + description_en : std::string - + name_local : std::string - + description_local : std::string - + style : std::string - + abv : float - + ibu : int - } - - class UserResult { - + username : std::string - + bio : std::string - + activity_weight : float - } - - class CheckinResult { - + checked_in_at : std::string - + note : std::string - } - - class RatingResult { - + score : float - + note : std::string - } - - class GeneratedBrewery { - + brewery_id : sqlite3_int64 - + location : Location - + brewery : BreweryResult - + context_completeness : LocationContext::Completeness - + generated_at : std::string - } - - class GeneratedBeer { - + beer_id : sqlite3_int64 - + brewery_id : sqlite3_int64 - + location : Location - + style : BeerStyle - + beer : BeerResult - + generated_at : std::string - } - - class GeneratedUser { - + user_id : sqlite3_int64 - + location : Location - + user : UserResult - + generated_at : std::string - } - - class GeneratedCheckin { - + checkin_id : sqlite3_int64 - + user_id : sqlite3_int64 - + brewery_id : sqlite3_int64 - + checkin : CheckinResult - + generated_at : std::string - } - - class GeneratedRating { - + user_id : sqlite3_int64 - + beer_id : sqlite3_int64 - + checkin_id : sqlite3_int64 - + rating : RatingResult - + generated_at : std::string - } - - - LocationContext *-- Completeness + class Location { + + city : std::string + + state_province : std::string + + iso3166_2 : std::string + + country : std::string + + iso3166_1 : std::string + + local_languages : std::vector + + latitude : double + + longitude : double } - package "Domain: Application Configuration"{ - class SamplingOptions { - + temperature : float = 1.0F - + top_p : float = 0.95F - + top_k : uint32_t = 64 - + n_ctx : uint32_t = 8192 - + seed : int = -1 - } - - class GeneratorOptions { - + model_path : std::filesystem::path - + use_mocked : bool = false - + sampling : SamplingOptions - } - - class PipelineOptions { - + output_path : std::filesystem::path - + log_path : std::filesystem::path - } - - class ApplicationOptions { - + generator : GeneratorOptions - + pipeline : PipelineOptions - } - - ' --- Domain Model Relationships --- - ApplicationOptions *-- GeneratorOptions - ApplicationOptions *-- PipelineOptions - GeneratorOptions *-- SamplingOptions + class LocationContext { + + text : std::string + + completeness : Completeness + + char_count : size_t } - ' ========================================== - ' DOMAIN POLICY - ' ========================================== - package "Domain Policy" { + enum Completeness { + Full + Partial + Absent + } - interface ContextStrategy <> { - + QueriesFor(loc : const Location&) : std::vector - + MaxContextChars() : size_t - } + class EnrichedCity { + + location : Location + + context : LocationContext + } - class BreweryContextStrategy { - + QueriesFor(loc : const Location&) : std::vector - + MaxContextChars() : size_t - } + class BeerStyle { + + name : std::string + + description : std::string + + min_abv : float + + max_abv : float + + min_ibu : int + + max_ibu : int + } - class BeerContextStrategy { - + QueriesFor(loc : const Location&) : std::vector - + MaxContextChars() : size_t - } + class BreweryResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + } - interface SamplingStrategy <> { - + Sample(locations : const std::vector&) : std::vector - } + class BeerResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + + style : std::string + + abv : float + + ibu : int + } - class UniformSamplingStrategy { - - sample_size_ : size_t - + Sample(locations : const std::vector&) : std::vector - } + class UserResult { + + username : std::string + + bio : std::string + + activity_weight : float + } - interface BeerSelectionStrategy <> { - + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span) : std::vector - } + class CheckinResult { + + checked_in_at : std::string + + note : std::string + } - class RandomBeerSelectionStrategy { - - rng_ : std::mt19937 - - min_beers_ : size_t - - max_beers_ : size_t - + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span) : std::vector - } + class RatingResult { + + score : float + + note : std::string + } - interface CheckinDistributionStrategy <> { - + AssignActivityWeights(users : std::vector&) : void - + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t - + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string - } + class GeneratedBrewery { + + brewery_id : sqlite3_int64 + + location : Location + + brewery : BreweryResult + + context_completeness : LocationContext::Completeness + + generated_at : std::string + } - class JCurveCheckinStrategy { - - rng_ : std::mt19937 - + AssignActivityWeights(users : std::vector&) : void - + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t - + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string - } + class GeneratedBeer { + + beer_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + location : Location + + style : BeerStyle + + beer : BeerResult + + generated_at : std::string + } + + class GeneratedUser { + + user_id : sqlite3_int64 + + location : Location + + user : UserResult + + generated_at : std::string + } + + class GeneratedCheckin { + + checkin_id : sqlite3_int64 + + user_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + checkin : CheckinResult + + generated_at : std::string + } + + class GeneratedRating { + + user_id : sqlite3_int64 + + beer_id : sqlite3_int64 + + checkin_id : sqlite3_int64 + + rating : RatingResult + + generated_at : std::string + } + + + LocationContext *-- Completeness +} + +package "Domain: Application Configuration"{ + class SamplingOptions { + + temperature : float = 1.0F + + top_p : float = 0.95F + + top_k : uint32_t = 64 + + n_ctx : uint32_t = 8192 + + seed : int = -1 + } + + class GeneratorOptions { + + model_path : std::filesystem::path + + use_mocked : bool = false + + sampling : SamplingOptions + } + + class PipelineOptions { + + output_path : std::filesystem::path + + log_path : std::filesystem::path + } + + class ApplicationOptions { + + generator : GeneratorOptions + + pipeline : PipelineOptions + } + + ' --- Domain Model Relationships --- + ApplicationOptions *-- GeneratorOptions + ApplicationOptions *-- PipelineOptions + GeneratorOptions *-- SamplingOptions +} + +package "Domain: Policy" { + + interface ContextStrategy <> { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + class BreweryContextStrategy { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + class BeerContextStrategy { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + interface SamplingStrategy <> { + + Sample(locations : const std::vector&) : std::vector + } + + class UniformSamplingStrategy { + - sample_size_ : size_t + + Sample(locations : const std::vector&) : std::vector + } + + interface BeerSelectionStrategy <> { + + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span) : std::vector + } + + class RandomBeerSelectionStrategy { + - rng_ : std::mt19937 + - min_beers_ : size_t + - max_beers_ : size_t + + SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span) : std::vector + } + + interface CheckinDistributionStrategy <> { + + AssignActivityWeights(users : std::vector&) : void + + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string + } + + class JCurveCheckinStrategy { + - rng_ : std::mt19937 + + AssignActivityWeights(users : std::vector&) : void + + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string } } -' ========================================== -' ORCHESTRATION -' ========================================== +package "Infrastructure: Logging" { + enum LogLevel { + Debug + Info + Warn + Error + } + + enum PipelinePhase { + Startup + UserGeneration + BreweryAndBeerGeneration + CheckinGeneration + RatingGeneration + Teardown + } + + class LogEntry { + + timestamp : std::chrono::system_clock::time_point + + level : LogLevel + + phase : PipelinePhase + + message : std::string + + city : std::optional + + entity_id : std::optional + + worker : std::optional + } + + interface Logger <> { + + Log(level, phase, message,\n city, entity_id, worker) : void + } + + class PipelineLogger { + - log_ch_ : BoundedChannel& + + Log(level, phase, message,\n city, entity_id, worker) : void + } + + class LogWorker { + - log_ch_ : BoundedChannel& + + Run() : void + - FormatTimestamp(tp) : std::string + - ToSpdlogLevel(level) : spdlog::level::level_enum + - ToString(phase) : std::string + } + + ' --- Logging Relationships --- + LogEntry *-- LogLevel + LogEntry *-- PipelinePhase + PipelineLogger ..> LogEntry : emits + LogWorker ..> LogEntry : consumes +} + +package "Infrastructure: Pipeline Channel" { + + class "BoundedChannel" as BoundedChannel { + - queue_ : std::queue + - mutex_ : std::mutex + - not_full_ : std::condition_variable + - not_empty_ : std::condition_variable + - capacity_ : size_t + - closed_ : bool + + Send(item : T) : void + + Receive() : std::optional + + Close() : void + } + +} + +package "Infrastructure: Data Preloading" { + + interface DataPreloader <> { + + LoadLocations(filepath : const std::filesystem::path&) : std::vector + + LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector + + LoadPersonas(filepath : const std::filesystem::path&) : std::vector + + LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry + } + + class JsonLoader { + + LoadLocations(filepath : const std::filesystem::path&) : std::vector + + LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector + + LoadPersonas(filepath : const std::filesystem::path&) : std::vector + + LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry + } + +} + +package "Infrastructure: Enrichment" { + + interface EnrichmentService <> { + + GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext + } + + class WikipediaService { + - client_ : std::unique_ptr + - extract_cache_ : std::unordered_map + + GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext + - FetchExtract(query : std::string_view) : std::string + } + + interface WebClient <> { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + + class CURLWebClient { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + +} + +package "Infrastructure: Data Generation" { + + interface DataGenerator <> { + + GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult + + GenerateBeer(brewery_id : sqlite3_int64,\n location : const Location&,\n context : const LocationContext&,\n style : const BeerStyle&) : BeerResult + + GenerateUser(location : const Location&) : UserResult + + GenerateCheckin(user : const GeneratedUser&,\n brewery : const GeneratedBrewery&,\n timestamp : const std::string&) : CheckinResult + + GenerateRating(user : const GeneratedUser&,\n beer : const GeneratedBeer&,\n checkin_id : sqlite3_int64) : RatingResult + } + + class MockGenerator { + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - DeterministicHash(location : const Location&) : size_t + } + + class LlamaGenerator { + - model_ : ModelHandle + - context_ : ContextHandle + - prompt_formatter_ : std::unique_ptr + - rng_ : std::mt19937 + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - Load(opts : const GeneratorOptions&) : void + - Infer(system_prompt, user_prompt,\n max_tokens, grammar) : std::string + - ValidateModelArchitecture() : void + } + + interface PromptFormatter <> { + + Format(system_prompt : std::string_view,\n user_prompt : std::string_view) : std::string + + ExpectedArchitecture() : std::string_view + } + + class Gemma4JinjaPromptFormatter { + + Format(...) : std::string + + ExpectedArchitecture() : std::string_view + } + +} + +package "Infrastructure: Data Export" { + + interface ExportService <> { + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + } + + class SqliteExportService { + - date_time_provider_ : std::unique_ptr + - db_handle_ : SqliteDatabaseHandle + - insert_location_stmt_ : SqliteStatementHandle + - insert_brewery_stmt_ : SqliteStatementHandle + - insert_beer_stmt_ : SqliteStatementHandle + - insert_user_stmt_ : SqliteStatementHandle + - insert_checkin_stmt_ : SqliteStatementHandle + - insert_rating_stmt_ : SqliteStatementHandle + - transaction_open_ : bool + - location_cache_ : std::unordered_map + - brewery_cache_ : std::unordered_map + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + - InitializeSchema() : void + - PrepareStatements() : void + - RollbackAndCloseNoThrow() : void + - FinalizeStatements() : void + } + + interface DateTimeProvider <> { + + GetUtcTimestamp() : std::string + } + + class SystemDateTimeProvider { + + GetUtcTimestamp() : std::string + } + +} + + class BiergartenPipelineOrchestrator { - preloader_ : std::unique_ptr @@ -243,216 +436,6 @@ class BiergartenPipelineOrchestrator { - RunRatingPhase() : void } -package "Infrastructure" { - - package "Logging" { - enum LogLevel { - Debug - Info - Warn - Error - } - - enum PipelinePhase { - Startup - UserGeneration - BreweryAndBeerGeneration - CheckinGeneration - RatingGeneration - Teardown - } - - class LogEntry { - + timestamp : std::chrono::system_clock::time_point - + level : LogLevel - + phase : PipelinePhase - + message : std::string - + city : std::optional - + entity_id : std::optional - + worker : std::optional - } - - interface Logger <> { - + Log(level, phase, message,\n city, entity_id, worker) : void - } - - class PipelineLogger { - - log_ch_ : BoundedChannel& - + Log(level, phase, message,\n city, entity_id, worker) : void - } - - class LogWorker { - - log_ch_ : BoundedChannel& - + Run() : void - - FormatTimestamp(tp) : std::string - - ToSpdlogLevel(level) : spdlog::level::level_enum - - ToString(phase) : std::string - } - - ' --- Logging Relationships --- - LogEntry *-- LogLevel - LogEntry *-- PipelinePhase - PipelineLogger ..> LogEntry : emits - LogWorker ..> LogEntry : consumes - } - - package "Pipeline Channel" { - - class "BoundedChannel" as BoundedChannel { - - queue_ : std::queue - - mutex_ : std::mutex - - not_full_ : std::condition_variable - - not_empty_ : std::condition_variable - - capacity_ : size_t - - closed_ : bool - + Send(item : T) : void - + Receive() : std::optional - + Close() : void - } - - } - - package "Data Preloading" { - - interface DataPreloader <> { - + LoadLocations(filepath : const std::filesystem::path&) : std::vector - + LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector - + LoadPersonas(filepath : const std::filesystem::path&) : std::vector - + LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry - } - - class JsonLoader { - + LoadLocations(filepath : const std::filesystem::path&) : std::vector - + LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector - + LoadPersonas(filepath : const std::filesystem::path&) : std::vector - + LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry - } - - } - - package "Enrichment" { - - interface EnrichmentService <> { - + GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext - } - - class WikipediaService { - - client_ : std::unique_ptr - - extract_cache_ : std::unordered_map - + GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext - - FetchExtract(query : std::string_view) : std::string - } - - interface WebClient <> { - + Get(url : const std::string&) : std::string - + UrlEncode(value : const std::string&) : std::string - } - - class CURLWebClient { - + Get(url : const std::string&) : std::string - + UrlEncode(value : const std::string&) : std::string - } - - } - - package "Data Generation" { - - interface DataGenerator <> { - + GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult - + GenerateBeer(brewery_id : sqlite3_int64,\n location : const Location&,\n context : const LocationContext&,\n style : const BeerStyle&) : BeerResult - + GenerateUser(location : const Location&) : UserResult - + GenerateCheckin(user : const GeneratedUser&,\n brewery : const GeneratedBrewery&,\n timestamp : const std::string&) : CheckinResult - + GenerateRating(user : const GeneratedUser&,\n beer : const GeneratedBeer&,\n checkin_id : sqlite3_int64) : RatingResult - } - - class MockGenerator { - + GenerateBrewery(...) : BreweryResult - + GenerateBeer(...) : BeerResult - + GenerateUser(...) : UserResult - + GenerateCheckin(...) : CheckinResult - + GenerateRating(...) : RatingResult - - DeterministicHash(location : const Location&) : size_t - } - - class LlamaGenerator { - - model_ : ModelHandle - - context_ : ContextHandle - - prompt_formatter_ : std::unique_ptr - - rng_ : std::mt19937 - + GenerateBrewery(...) : BreweryResult - + GenerateBeer(...) : BeerResult - + GenerateUser(...) : UserResult - + GenerateCheckin(...) : CheckinResult - + GenerateRating(...) : RatingResult - - Load(opts : const GeneratorOptions&) : void - - Infer(system_prompt, user_prompt,\n max_tokens, grammar) : std::string - - ValidateModelArchitecture() : void - } - - interface PromptFormatter <> { - + Format(system_prompt : std::string_view,\n user_prompt : std::string_view) : std::string - + ExpectedArchitecture() : std::string_view - } - - class Gemma4JinjaPromptFormatter { - + Format(...) : std::string - + ExpectedArchitecture() : std::string_view - } - - } - - package "Data Export" { - - interface ExportService <> { - + Initialize() : void - + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 - + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 - + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 - + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 - + ProcessRating(rating : const GeneratedRating&) : void - + Finalize() : void - } - - class SqliteExportService { - - date_time_provider_ : std::unique_ptr - - db_handle_ : SqliteDatabaseHandle - - insert_location_stmt_ : SqliteStatementHandle - - insert_brewery_stmt_ : SqliteStatementHandle - - insert_beer_stmt_ : SqliteStatementHandle - - insert_user_stmt_ : SqliteStatementHandle - - insert_checkin_stmt_ : SqliteStatementHandle - - insert_rating_stmt_ : SqliteStatementHandle - - transaction_open_ : bool - - location_cache_ : std::unordered_map - - brewery_cache_ : std::unordered_map - + Initialize() : void - + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 - + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 - + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 - + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 - + ProcessRating(rating : const GeneratedRating&) : void - + Finalize() : void - - InitializeSchema() : void - - PrepareStatements() : void - - RollbackAndCloseNoThrow() : void - - FinalizeStatements() : void - } - - interface DateTimeProvider <> { - + GetUtcTimestamp() : std::string - } - - class SystemDateTimeProvider { - + GetUtcTimestamp() : std::string - } - - } -} - -' ========================================== -' GLOBAL RELATIONSHIPS -' ========================================== - ' --- Orchestration Aggregations (Services & Strategies) --- BiergartenPipelineOrchestrator *-- DataPreloader BiergartenPipelineOrchestrator *-- EnrichmentService