This commit is contained in:
Aaron Po
2026-04-20 23:56:27 -04:00
parent 6657015ee3
commit bbe8970bf6
2 changed files with 285 additions and 218 deletions

View File

@@ -1,18 +1,26 @@
@startuml future_possible_activity
@startuml biergarten_activity
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 13
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam ActivityBackgroundColor #EAF0E8
skinparam ActivityBorderColor #547461
skinparam ActivityDiamondBackgroundColor #DCE8D8
skinparam ActivityDiamondBorderColor #547461
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461
skinparam backgroundColor #FCFCF7
skinparam defaultFontColor #14180C
skinparam titleFontColor #14180C
skinparam ArrowColor #656F33
skinparam activityStartColor #EBECE3
skinparam activityEndColor #4A5837
skinparam activityStopColor #4A5837
skinparam ActivityBackgroundColor #EBECE3
skinparam ActivityBorderColor #4A5837
skinparam ActivityDiamondBackgroundColor #CBD2B5
skinparam ActivityDiamondBorderColor #4A5837
skinparam NoteBackgroundColor #DBEEDD
skinparam NoteFontColor #14180C
skinparam NoteBorderColor #4A5837
skinparam SwimlaneBorderColor #4A5837
skinparam SwimlaneBorderThickness 1
skinparam monochrome reverse
title The Biergarten Data Pipeline — Activity Diagram
@@ -27,6 +35,25 @@ endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
:Initialize SqliteExportService;
note right
Opens SQLite connection.
Begins a single transaction
covering all five fixture types.
end note
:Create BoundedChannel<LogEntry> log_ch;
:Spawn Log Worker thread;
note right
Log worker drains log_ch for the
entire pipeline lifetime.
All workers emit LogEntry structs
via PipelineLogger — never spdlog directly.
end note
:BiergartenPipelineOrchestrator::Run();
|BiergartenPipelineOrchestrator::Run()|
:JsonLoader::LoadLocations("locations.json");
:JsonLoader::LoadBeerStyles("beer-styles.json");
:JsonLoader::LoadPersonas("personas.json");
@@ -42,17 +69,10 @@ end note
:EnrichmentService::PreWarmPersonaCache(personas);
note right
Persona descriptions do not need location context.
All persona Wikipedia/description lookups are
resolved and cached globally at startup.
All persona lookups are resolved and cached
globally at startup.
end note
:Initialize SqliteExportService;
note right
Opens SQLite connection.
Begins a single transaction
covering all five fixture types.
end note
:BiergartenPipelineOrchestrator::Run();
' ═══════════════════════════════════════════
' PHASE 0 — USER GENERATION
@@ -73,24 +93,27 @@ fork again
:IPersonaSelectionStrategy::SelectPersona(\n personas_palette_);
note right
Guaranteed cache hit from startup.
Returns a Persona struct with style_affinities,
abv_range, ibu_preference, checkin_weight.
Returns a Persona struct carrying
style_affinities, abv_range,
ibu_preference, checkin_weight.
end note
:NamesByCountry::SampleName(\n location.iso3166_1);
note right
Deterministic lookup — no LLM involved.
Name is selected from a pre-keyed table
Name selected from pre-keyed table
and passed into the generation prompt.
end note
:GenerateUser(location, persona, sampled_name)\nvia DataGenerator;
note right
LLM receives: Location fields + persona description
+ sampled name. Generates bio and preference
signals grounded in both.
LLM receives: Location fields + persona
description + sampled name. Generates
bio and preference signals grounded
in locale and persona.
end note
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm");
:Send GeneratedUser → llm_ch;
endwhile (no)
:Close llm_ch;
@@ -99,6 +122,7 @@ fork again
while (llm_ch has items?) is (yes)
:Receive GeneratedUser;
:ProcessUser(user) → sqlite3_int64;
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
:Append → user_pool_;
endwhile (no)
end fork
@@ -108,7 +132,6 @@ end fork
' ═══════════════════════════════════════════
' PHASE 1 — BREWERY & BEER GENERATION
' Combined into a single dependent unit of work.
' ═══════════════════════════════════════════
:RunBreweryAndBeerPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, llm_ch, exp_ch);
@@ -122,6 +145,7 @@ fork again
while (loc_ch has items?) is (yes)
:Receive Location;
:GetLocationContext(location,\nBreweryContextStrategy);
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, nullopt, "enrichment");
:Send EnrichedCity → llm_ch;
endwhile (no)
|Orchestrator|
@@ -145,12 +169,8 @@ fork again
:Attach GeneratedBeer to Brewery bundle;
endwhile (done)
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, brewery_id, "llm");
:Send BreweryWithBeers Bundle → exp_ch;
note right
Generation of the next brewery does not
begin until the current brewery and all
of its beers have completed.
end note
endwhile (no)
:Close exp_ch;
fork again
@@ -165,6 +185,8 @@ fork again
:ProcessBeer(beer) → sqlite3_int64;
:Append → beer_pool_;
endwhile (done)
:PipelineLogger::Log(Info,\n BreweryAndBeerGeneration,\n city, brewery_id, "sqlite");
endwhile (no)
end fork
@@ -177,16 +199,13 @@ end note
' ═══════════════════════════════════════════
' PHASE 2 — CHECKIN GENERATION
' Sequential now that Breweries/Beers are done.
' ═══════════════════════════════════════════
:RunCheckinPhase();
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
note right
Weights are seeded from each user's
persona.checkin_weight — high-activity
personas (craft enthusiasts) check in more,
casual personas less. J-curve profile
emerges from the persona distribution.
Weights seeded from each user's
persona.checkin_weight. J-curve profile
emerges from persona distribution.
end note
while (For each GeneratedUser in user_pool_?) is (remaining)
@@ -196,6 +215,7 @@ while (For each GeneratedUser in user_pool_?) is (remaining)
:Select brewery from brewery_pool_;
:GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator;
:ProcessCheckin(checkin) → sqlite3_int64;
:PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
:Append → checkin_pool_;
endwhile (done)
endwhile (done)
@@ -205,11 +225,9 @@ endwhile (done)
' ═══════════════════════════════════════════
:RunRatingPhase();
note right
Beer selection during rating is biased by
user.persona.style_affinities and abv_range —
users are more likely to rate beers matching
their persona profile. Rating skew (positive
with long tail) is also modulated per persona.
Beer selection biased by
user.persona.style_affinities and abv_range.
Rating skew modulated per persona.
end note
while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
@@ -217,7 +235,9 @@ while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
if (Beer exists for brewery?) then (yes)
:GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
:ProcessRating(rating);
:PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
else (no)
:PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
:Skip — brewery has no beers;
endif
endwhile (done)
@@ -230,6 +250,12 @@ endwhile (done)
note right
COMMIT covers all five fixture types.
end note
:Close log_ch;
:Join Log Worker;
note right
Joining the log worker after closing
log_ch lets it drain the channel, so no
LogEntry is dropped at shutdown.
end note
:spdlog::info "Pipeline complete in X ms";
stop