This commit is contained in:
Aaron Po
2026-04-22 22:07:58 -04:00
parent d40ce34363
commit 8f5471d96c
5 changed files with 530 additions and 546 deletions

View File

@@ -21,7 +21,6 @@ skinparam SwimlaneBorderColor #4A5837
skinparam SwimlaneBorderThickness 1 skinparam SwimlaneBorderThickness 1
skinparam monochrome reverse skinparam monochrome reverse
title The Biergarten Data Pipeline — Activity Diagram title The Biergarten Data Pipeline — Activity Diagram
|Main| |Main|
@@ -35,7 +34,6 @@ endif
:Init CurlGlobalState & LlamaBackendState; :Init CurlGlobalState & LlamaBackendState;
:Build DI injector; :Build DI injector;
:Initialize SqliteExportService; :Initialize SqliteExportService;
note right note right
Opens SQLite connection. Opens SQLite connection.
@@ -49,47 +47,51 @@ note right
Log worker drains log_ch for the Log worker drains log_ch for the
entire pipeline lifetime. entire pipeline lifetime.
All workers emit LogEntry structs All workers emit LogEntry structs
via PipelineLogger — never spdlog directly. via PipelineLogger -- never spdlog directly.
end note end note
:BiergartenPipelineOrchestrator::Run(); :BiergartenPipelineOrchestrator::Run();
|BiergartenPipelineOrchestrator::Run()| |BiergartenPipelineOrchestrator::Run()|
:JsonLoader::LoadLocations("locations.json");
fork
:JsonLoader::LoadBeerStyles("beer-styles.json"); :JsonLoader::LoadBeerStyles("beer-styles.json");
:JsonLoader::LoadPersonas("personas.json");
:JsonLoader::LoadNamesByCountry("names-by-country.json");
:EnrichmentService::PreWarmBeerStyleCache(beer_styles); :EnrichmentService::PreWarmBeerStyleCache(beer_styles);
note right fork again
Beer styles do not need location context. :JsonLoader::LoadLocations("locations.json");
Wikipedia summaries for the entire palette are :EnrichmentService::PreWarmLocationCache(sampled_locations);
fetched and cached globally at startup. end fork
end note fork
:JsonLoader::LoadNamesByCountry("names-by-country.json");
:EnrichmentService::PreWarmPersonaCache(personas); fork again
note right :JsonLoader::LoadPersonas("personas.json");
Persona descriptions do not need location context. end fork
All persona lookups are resolved and cached
globally at startup.
end note
' ═══════════════════════════════════════════ ' ═══════════════════════════════════════════
' PHASE 0 — USER GENERATION ' PHASE 0 — USER GENERATION
' ═══════════════════════════════════════════ ' ═══════════════════════════════════════════
|Orchestrator| |Orchestrator|
:RunUserPhase(sampled_locations); :RunUserPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, llm_ch, exp_ch); :Create BoundedChannels\n(loc_ch, exp_ch);
fork fork
|Orchestrator| |Orchestrator|
:Loop: Send Locations → loc_ch; :Loop: Send Locations -> loc_ch;
:Close loc_ch; :Close loc_ch;
note right
Producer closes loc_ch.
LLM Worker while loop
terminates on empty + closed.
end note
fork again fork again
|LLM Worker| |LLM Worker|
while (loc_ch has items?) is (yes) while (loc_ch has items?) is (yes)
:Receive Location; :Receive Location;
:GetLocationContextFromCache(location);
note right
Guaranteed cache hit from startup.
end note
:IPersonaSelectionStrategy::SelectPersona(\n personas_palette_); :IPersonaSelectionStrategy::SelectPersona(\n personas_palette_);
note right note right
Guaranteed cache hit from startup. Guaranteed cache hit from startup.
@@ -100,30 +102,35 @@ fork again
:NamesByCountry::SampleName(\n location.iso3166_1); :NamesByCountry::SampleName(\n location.iso3166_1);
note right note right
Deterministic lookup — no LLM involved. Deterministic lookup -- no LLM involved.
Name selected from pre-keyed table Name selected from pre-keyed table
and passed into the generation prompt. and passed into the generation prompt.
end note end note
:GenerateUser(location, persona, sampled_name)\nvia DataGenerator; :GenerateUser(enriched_city, persona, sampled_name)\nvia DataGenerator;
note right note right
LLM receives: Location fields + persona LLM receives: EnrichedCity context + persona
description + sampled name. Generates description + sampled name. Generates
bio and preference signals grounded bio and preference signals grounded
in locale and persona. in locale and persona.
end note end note
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm"); :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm");
:Send GeneratedUser → llm_ch; :Send GeneratedUser -> exp_ch;
endwhile (no) endwhile (no)
:Close llm_ch; :Close exp_ch;
note right
Producer closes exp_ch.
SQLite Worker while loop
terminates on empty + closed.
end note
fork again fork again
|SQLite Worker| |SQLite Worker|
while (llm_ch has items?) is (yes) while (exp_ch has items?) is (yes)
:Receive GeneratedUser; :Receive GeneratedUser;
:ProcessUser(user) → sqlite3_int64; :ProcessUser(user);
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite"); :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
:Append → user_pool_; :Append -> user_pool_;
endwhile (no) endwhile (no)
end fork end fork
@@ -131,62 +138,94 @@ end fork
:Join LLM Worker, SQLite Worker; :Join LLM Worker, SQLite Worker;
' ═══════════════════════════════════════════ ' ═══════════════════════════════════════════
' PHASE 1 — BREWERY & BEER GENERATION ' PHASE 1a — BREWERY GENERATION
' ═══════════════════════════════════════════ ' ═══════════════════════════════════════════
:RunBreweryAndBeerPhase(sampled_locations); :RunBreweryPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, llm_ch, exp_ch); :Create BoundedChannels\n(loc_ch, exp_ch);
fork fork
|Orchestrator| |Orchestrator|
:Loop: Send Locations → loc_ch; :Loop: Send Locations -> loc_ch;
:Close loc_ch; :Close loc_ch;
fork again fork again
|Enrichment Workers (xN)| |LLM Worker|
while (loc_ch has items?) is (yes) while (loc_ch has items?) is (yes)
:Receive Location; :Receive Location;
:GetLocationContext(location,\nBreweryContextStrategy);
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, nullopt, "enrichment"); :GetLocationContextFromCache(location);
:Send EnrichedCity → llm_ch; note right
Guaranteed cache hit from startup.
end note
:GenerateBrewery(enriched_city, context)\nvia DataGenerator;
note right
KV cache stays warm across all
brewery generations -- system prompt
does not change within this phase.
end note
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm");
:Send GeneratedBrewery -> exp_ch;
endwhile (no) endwhile (no)
:Close exp_ch;
fork again
|SQLite Worker|
while (exp_ch has items?) is (yes)
:Receive GeneratedBrewery;
:ProcessBrewery(brewery);
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite");
:Append -> brewery_pool_;
endwhile (no)
end fork
|Orchestrator| |Orchestrator|
:Join Enrichment Workers; :Join LLM Worker, SQLite Worker;
:Close llm_ch; note right
brewery_pool_ is now fully populated.
Phase 1b may begin.
end note
' ═══════════════════════════════════════════
' PHASE 1b — BEER GENERATION
' ═══════════════════════════════════════════
:RunBeerPhase();
:Create BoundedChannels\n(brew_ch, exp_ch);
fork
|Orchestrator|
:Loop: Send Breweries -> brew_ch;
:Close brew_ch;
fork again fork again
|LLM Worker| |LLM Worker|
while (llm_ch has items?) is (yes) while (brew_ch has items?) is (yes)
:Receive EnrichedCity; :Receive GeneratedBrewery;
:GenerateBrewery(location, context)\nvia DataGenerator;
:IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_); :IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_);
while (For each selected BeerStyle?) is (remaining) while (For each selected BeerStyle?) is (remaining)
:GetStyleContextFromCache(style); :GetStyleContextFromCache(style);
note right note right
Guaranteed cache hit from startup. Guaranteed cache hit from startup.
KV cache stays warm across all
beer generations -- system prompt
does not change within this phase.
end note end note
:GenerateBeer(brewery, style_context)\nvia DataGenerator; :GenerateBeer(brewery, style_context)\nvia DataGenerator;
:Attach GeneratedBeer to Brewery bundle; :Attach GeneratedBeer to bundle;
endwhile (done) endwhile (done)
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, brewery_id, "llm"); :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "llm");
:Send BreweryWithBeers Bundle → exp_ch; :Send BeersBundle -> exp_ch;
endwhile (no) endwhile (no)
:Close exp_ch; :Close exp_ch;
fork again fork again
|SQLite Worker| |SQLite Worker|
while (exp_ch has items?) is (yes) while (exp_ch has items?) is (yes)
:Receive BreweryWithBeers Bundle; :Receive BeersBundle;
:ProcessBrewery(brewery) → brewery_id;
:Append → brewery_pool_;
while (For each beer in bundle?) is (remaining) while (For each beer in bundle?) is (remaining)
:Set beer.brewery_id = brewery_id; :Set beer.brewery_id from bundle;
:ProcessBeer(beer) → sqlite3_int64; :ProcessBeer(beer);
:Append → beer_pool_; :Append -> beer_pool_;
endwhile (done) endwhile (done)
:PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite");
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, brewery_id, "sqlite");
endwhile (no) endwhile (no)
end fork end fork
@@ -214,9 +253,9 @@ while (For each GeneratedUser in user_pool_?) is (remaining)
:TimestampFor(user, index); :TimestampFor(user, index);
:Select brewery from brewery_pool_; :Select brewery from brewery_pool_;
:GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator; :GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator;
:ProcessCheckin(checkin) → sqlite3_int64; :ProcessCheckin(checkin);
:PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite"); :PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
:Append → checkin_pool_; :Append -> checkin_pool_;
endwhile (done) endwhile (done)
endwhile (done) endwhile (done)
@@ -231,14 +270,14 @@ note right
end note end note
while (For each GeneratedCheckin in checkin_pool_?) is (remaining) while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
:Match brewery_id → select beer from beer_pool_\n(same brewery_id, biased by persona affinities); :Match brewery_id, select beer from beer_pool_\n(same brewery_id, biased by persona affinities);
if (Beer exists for brewery?) then (yes) if (Beer exists for brewery?) then (yes)
:GenerateRating(user, beer, checkin_id)\nvia DataGenerator; :GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
:ProcessRating(rating); :ProcessRating(rating);
:PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite"); :PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
else (no) else (no)
:PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite"); :PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
:Skip — brewery has no beers; :Skip -- brewery has no beers;
endif endif
endwhile (done) endwhile (done)

View File

@@ -1,50 +1,17 @@
@startuml future_possible_architecture @startuml
' ========================================== ' ==========================================
' CONFIGURATION & STYLING ' CONFIGURATION & STYLING
' ========================================== ' ==========================================
left to right direction skinparam classAttributeFontSize 13
skinparam linetype ortho
' --- Typography --- ' --- Typography ---
skinparam defaultFontName "DM Sans" skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14 skinparam defaultFontSize 20
skinparam titleFontName "Volkhov" skinparam titleFontName "Volkhov"
skinparam titleFontSize 20 skinparam titleFontSize 30
' --- Global Colors --- package "Domain" {
skinparam backgroundColor #FCFCF7
skinparam defaultFontColor #14180C
skinparam titleFontColor #14180C
skinparam ArrowColor #656F33
skinparam class {
BackgroundColor #EBECE3
HeaderBackgroundColor #CBD2B5
BorderColor #4A5837
ArrowColor #656F33
FontColor #14180C
}
skinparam package {
BackgroundColor #DBEEDD
BorderColor #4A5837
FontColor #14180C
}
skinparam note {
BackgroundColor #DBEEDD
BorderColor #4A5837
FontColor #14180C
}
skinparam monochrome reverse
title The Biergarten Data Pipeline — Planned Architecture
' ==========================================
' DOMAIN MODELS
' ==========================================
package "Domain Models" { package "Domain Models" {
class Location { class Location {
@@ -62,8 +29,9 @@ package "Domain Models" {
+ text : std::string + text : std::string
+ completeness : Completeness + completeness : Completeness
+ char_count : size_t + char_count : size_t
-- }
<<enum>> Completeness
enum Completeness {
Full Full
Partial Partial
Absent Absent
@@ -156,6 +124,11 @@ package "Domain Models" {
+ generated_at : std::string + generated_at : std::string
} }
LocationContext *-- Completeness
}
package "Domain: Application Configuration"{
class SamplingOptions { class SamplingOptions {
+ temperature : float = 1.0F + temperature : float = 1.0F
+ top_p : float = 0.95F + top_p : float = 0.95F
@@ -184,66 +157,8 @@ package "Domain Models" {
ApplicationOptions *-- GeneratorOptions ApplicationOptions *-- GeneratorOptions
ApplicationOptions *-- PipelineOptions ApplicationOptions *-- PipelineOptions
GeneratorOptions *-- SamplingOptions GeneratorOptions *-- SamplingOptions
LocationContext *-- Completeness
} }
' ==========================================
' LOGGING
' ==========================================
package "Logging" {
enum LogLevel {
Debug
Info
Warn
Error
}
enum PipelinePhase {
Startup
UserGeneration
BreweryAndBeerGeneration
CheckinGeneration
RatingGeneration
Teardown
}
class LogEntry {
+ timestamp : std::chrono::system_clock::time_point
+ level : LogLevel
+ phase : PipelinePhase
+ message : std::string
+ city : std::optional<std::string>
+ entity_id : std::optional<std::string>
+ worker : std::optional<std::string>
}
interface Logger <<interface>> {
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class PipelineLogger {
- log_ch_ : BoundedChannel<LogEntry>&
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class LogWorker {
- log_ch_ : BoundedChannel<LogEntry>&
+ Run() : void
- FormatTimestamp(tp) : std::string
- ToSpdlogLevel(level) : spdlog::level::level_enum
- ToString(phase) : std::string
}
' --- Logging Relationships ---
LogEntry *-- LogLevel
LogEntry *-- PipelinePhase
PipelineLogger ..> LogEntry : emits
LogWorker ..> LogEntry : consumes
}
' ========================================== ' ==========================================
' DOMAIN POLICY ' DOMAIN POLICY
' ========================================== ' ==========================================
@@ -296,21 +211,12 @@ package "Domain Policy" {
+ CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t + CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string + TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string
} }
} }
}
' ========================================== ' ==========================================
' ORCHESTRATION ' ORCHESTRATION
' ========================================== ' ==========================================
package "Orchestration" {
interface DataPreloader <<interface>> {
+ LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
+ LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle>
+ LoadPersonas(filepath : const std::filesystem::path&) : std::vector<Persona>
+ LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry
}
class BiergartenPipelineOrchestrator { class BiergartenPipelineOrchestrator {
- preloader_ : std::unique_ptr<DataPreloader> - preloader_ : std::unique_ptr<DataPreloader>
@@ -336,13 +242,84 @@ package "Orchestration" {
- RunCheckinPhase() : void - RunCheckinPhase() : void
- RunRatingPhase() : void - RunRatingPhase() : void
} }
package "Infrastructure" {
package "Logging" {
enum LogLevel {
Debug
Info
Warn
Error
} }
enum PipelinePhase {
Startup
UserGeneration
BreweryAndBeerGeneration
CheckinGeneration
RatingGeneration
Teardown
}
' ========================================== class LogEntry {
' INFRASTRUCTURE: PRELOADING + timestamp : std::chrono::system_clock::time_point
' ========================================== + level : LogLevel
package "Infrastructure: Preloading" { + phase : PipelinePhase
+ message : std::string
+ city : std::optional<std::string>
+ entity_id : std::optional<std::string>
+ worker : std::optional<std::string>
}
interface Logger <<interface>> {
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class PipelineLogger {
- log_ch_ : BoundedChannel<LogEntry>&
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class LogWorker {
- log_ch_ : BoundedChannel<LogEntry>&
+ Run() : void
- FormatTimestamp(tp) : std::string
- ToSpdlogLevel(level) : spdlog::level::level_enum
- ToString(phase) : std::string
}
' --- Logging Relationships ---
LogEntry *-- LogLevel
LogEntry *-- PipelinePhase
PipelineLogger ..> LogEntry : emits
LogWorker ..> LogEntry : consumes
}
package "Pipeline Channel" {
class "BoundedChannel<T>" as BoundedChannel {
- queue_ : std::queue<T>
- mutex_ : std::mutex
- not_full_ : std::condition_variable
- not_empty_ : std::condition_variable
- capacity_ : size_t
- closed_ : bool
+ Send(item : T) : void
+ Receive() : std::optional<T>
+ Close() : void
}
}
package "Data Preloading" {
interface DataPreloader <<interface>> {
+ LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
+ LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle>
+ LoadPersonas(filepath : const std::filesystem::path&) : std::vector<Persona>
+ LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry
}
class JsonLoader { class JsonLoader {
+ LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location> + LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
@@ -353,11 +330,7 @@ package "Infrastructure: Preloading" {
} }
package "Enrichment" {
' ==========================================
' INFRASTRUCTURE: ENRICHMENT
' ==========================================
package "Infrastructure: Enrichment" {
interface EnrichmentService <<interface>> { interface EnrichmentService <<interface>> {
+ GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext + GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext
@@ -382,11 +355,7 @@ package "Infrastructure: Enrichment" {
} }
package "Data Generation" {
' ==========================================
' INFRASTRUCTURE: GENERATION
' ==========================================
package "Infrastructure: Generation" {
interface DataGenerator <<interface>> { interface DataGenerator <<interface>> {
+ GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult + GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult
@@ -432,31 +401,7 @@ package "Infrastructure: Generation" {
} }
package "Data Export" {
' ==========================================
' INFRASTRUCTURE: PIPELINE CHANNEL
' ==========================================
package "Infrastructure: Pipeline Channel" {
class "BoundedChannel<T>" as BoundedChannel {
- queue_ : std::queue<T>
- mutex_ : std::mutex
- not_full_ : std::condition_variable
- not_empty_ : std::condition_variable
- capacity_ : size_t
- closed_ : bool
+ Send(item : T) : void
+ Receive() : std::optional<T>
+ Close() : void
}
}
' ==========================================
' INFRASTRUCTURE: EXPORT
' ==========================================
package "Infrastructure: Export" {
interface ExportService <<interface>> { interface ExportService <<interface>> {
+ Initialize() : void + Initialize() : void
@@ -502,7 +447,7 @@ package "Infrastructure: Export" {
} }
} }
}
' ========================================== ' ==========================================
' GLOBAL RELATIONSHIPS ' GLOBAL RELATIONSHIPS

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long