@startuml biergarten_activity
!include ../biergarten-weizen-theme.puml

skinparam defaultFontSize 13
skinparam titleFontSize 20

title The Biergarten Data Pipeline — Activity Diagram

|Main|
start
:ParseArguments(argc, argv);
if (Invalid args?) then (yes)
  :spdlog::error;
  stop
else (no)
endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
:Initialize SqliteExportService;
note right
  Opens the SQLite connection.
  Transactions are managed per phase
  via batched commits.
end note
:Create BoundedChannel log_ch;
:Spawn Log Worker thread;
note right
  Log worker drains log_ch for the entire
  pipeline lifetime. All workers emit LogEntry
  structs via PipelineLogger -- never spdlog directly.
end note
:BiergartenPipelineOrchestrator::Run();

|BiergartenPipelineOrchestrator::Run()|
fork
  :JsonLoader::LoadBeerStyles("beer-styles.json");
  :EnrichmentService::PreWarmBeerStyleCache(beer_styles);
fork again
  :JsonLoader::LoadLocations("locations.json");
  :EnrichmentService::PreWarmLocationCache(sampled_locations);
end fork
fork
  :JsonLoader::LoadNamesByCountry("names-by-country.json");
fork again
  :JsonLoader::LoadPersonas("personas.json");
end fork

' ═══════════════════════════════════════════
' PHASE 0 — USER GENERATION
' ═══════════════════════════════════════════
|Orchestrator|
:RunUserPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);
fork
  |Orchestrator|
  :Loop: Send Locations -> loc_ch;
  :Close loc_ch;
  note right
    Producer closes loc_ch.
    The LLM Worker's while loop exits once
    loc_ch is both empty and closed.
  end note
fork again
  |LLM Worker|
  while (loc_ch has items?) is (yes)
    :Receive Location;
    :GetLocationContextFromCache(location);
    note right
      Guaranteed cache hit from startup.
    end note
    :IPersonaSelectionStrategy::SelectPersona(\n personas_palette_);
    note right
      Selects from personas_palette_,
      loaded from personas.json at startup.
      Returns a Persona struct carrying
      style_affinities, abv_range,
      ibu_preference, checkin_weight.
    end note
    :NamesByCountry::SampleName(\n location.iso3166_1);
    note right
      Deterministic lookup -- no LLM involved.
      The name is selected from a pre-keyed table
      and passed into the generation prompt.
    end note
    :GenerateUser(enriched_city, persona, sampled_name)\nvia DataGenerator;
    note right
      LLM receives: EnrichedCity context +
      persona description + sampled name.
      Generates bio and preference signals
      grounded in locale and persona.
    end note
    :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm");
    :Send GeneratedUser -> exp_ch;
  endwhile (no)
  :Close exp_ch;
  note right
    Producer closes exp_ch.
    The SQLite Worker's while loop exits once
    exp_ch is both empty and closed.
  end note
fork again
  |SQLite Worker|
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive GeneratedUser;
    :ProcessUser(user);
    :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
    :Append -> user_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;

' ═══════════════════════════════════════════
' PHASE 1a — BREWERY GENERATION
' ═══════════════════════════════════════════
:RunBreweryPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);
fork
  |Orchestrator|
  :Loop: Sample User from user_pool_,\npair with Location, and send\nBreweryTask(Location, User) -> loc_ch;
  :Close loc_ch;
fork again
  |LLM Worker|
  while (loc_ch has items?) is (yes)
    :Receive BreweryTask(Location, User);
    :GetLocationContextFromCache(task.location);
    note right
      Guaranteed cache hit from startup.
    end note
    :GenerateBrewery(enriched_city, context, task.user)\nvia DataGenerator;
    note right
      KV cache stays warm. Brewery is linked
      to the sampled owner_user_id.
    end note
    :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm");
    :Send GeneratedBrewery -> exp_ch;
  endwhile (no)
  :Close exp_ch;
fork again
  |SQLite Worker|
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive GeneratedBrewery;
    :ProcessBrewery(brewery);
    :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite");
    :Append -> brewery_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
  brewery_pool_ is now fully populated.
  Phase 1b may begin.
end note

' ═══════════════════════════════════════════
' PHASE 1b — BEER GENERATION
' ═══════════════════════════════════════════
:RunBeerPhase();
:Create BoundedChannels\n(brew_ch, exp_ch);
fork
  |Orchestrator|
  :Loop: Send Breweries -> brew_ch;
  :Close brew_ch;
fork again
  |LLM Worker|
  while (brew_ch has items?) is (yes)
    :Receive GeneratedBrewery;
    :IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_);
    while (For each selected BeerStyle?) is (remaining)
      :GetStyleContextFromCache(style);
      note right
        Guaranteed cache hit from startup.
        KV cache stays warm across all beer
        generations -- system prompt does not
        change within this phase.
      end note
      :GenerateBeer(brewery, style_context)\nvia DataGenerator;
      :Attach GeneratedBeer to bundle;
    endwhile (done)
    :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "llm");
    :Send BeersBundle -> exp_ch;
  endwhile (no)
  :Close exp_ch;
fork again
  |SQLite Worker|
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive BeersBundle;
    while (For each beer in bundle?) is (remaining)
      :Set beer.brewery_id from bundle;
      :ProcessBeer(beer);
      :Append -> beer_pool_;
    endwhile (done)
    :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite");
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
  Both brewery_pool_ and beer_pool_ are now
  fully populated. Checkin and Follow
  phases may now run in parallel.
end note

' ═══════════════════════════════════════════
' PHASE 2 — CHECKIN + FOLLOW GENERATION
' (run in parallel: checkins need user_pool_ and
' brewery_pool_ fully populated; follows need only user_pool_)
' ═══════════════════════════════════════════
fork
  |Orchestrator|
  :RunCheckinPhase();
  :ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
  note right
    Weights are seeded from each user's
    persona.checkin_weight. The J-curve profile
    emerges from the persona distribution.
  end note
  :BEGIN TRANSACTION;
  while (For each GeneratedUser in user_pool_?) is (remaining)
    :CheckinsForUser(user, brewery_pool_.size());
    while (For each checkin index?) is (remaining)
      :TimestampFor(user, index);
      :Select brewery from brewery_pool_;
      :GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator;
      :ProcessCheckin(checkin);
      :PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
      :Append -> checkin_pool_;
      if (Batch size reached?) then (yes)
        :COMMIT & BEGIN;
      else (no)
      endif
    endwhile (done)
  endwhile (done)
  :COMMIT (Final);
fork again
  |Orchestrator|
  :RunFollowPhase();
  :IFollowGenerationStrategy::\nAssignFollowWeights(user_pool_);
  note right
    For RandomFollowStrategy, weights are uniform.
    For ActivityWeightedFollowStrategy, weights are
    derived from user.activity_weight so that
    high-activity users attract more followers.
  end note
  :BEGIN TRANSACTION;
  :IFollowGenerationStrategy::\nGenerateFollows(user_pool_);
  note right
    The self-follow constraint (follower_id != followed_id)
    is enforced here and at the DB schema level.
  end note
  while (For each GeneratedFollow?) is (remaining)
    :ProcessFollow(follow);
    :PipelineLogger::Log(Info, FollowGeneration,\n nullopt, follower_id, "sqlite");
    :Append -> follow_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (done)
  :COMMIT (Final);
end fork
|Orchestrator|
:Join CheckinPhase, FollowPhase;
note right
  checkin_pool_ and follow_pool_ are now
  fully populated. Rating phase may begin.
end note

' ═══════════════════════════════════════════
' PHASE 3 — RATING GENERATION
' ═══════════════════════════════════════════
:RunRatingPhase();
note right
  Beer selection is biased by
  user.persona.style_affinities and abv_range.
  Rating skew is modulated per persona.
end note
:BEGIN TRANSACTION;
while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
  :Find beers in beer_pool_ matching\ncheckin.brewery_id;
  if (Any beers for brewery?) then (yes)
    :Select beer, biased by\npersona affinities;
    :GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
    :ProcessRating(rating);
    :PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  else (no)
    :PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
    :Skip -- brewery has no beers;
  endif
endwhile (done)
:COMMIT (Final);

' ═══════════════════════════════════════════
' TEARDOWN
' ═══════════════════════════════════════════
|Orchestrator|
:Finalize SqliteExportService;
note right
  Safely closes the DB connection.
end note
:Close log_ch;
|Main|
:spdlog::info "Pipeline complete in X ms";
:Join Log Worker;
note right
  The Log Worker drains log_ch until it is
  empty and closed, so no LogEntry is
  dropped at shutdown.
end note
stop
@enduml
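/'
The notes on loc_ch and exp_ch describe a close-then-drain protocol: the producer calls Close, and the consumer's while loop ends only once the channel is both empty and closed. The following is a minimal sketch of how a BoundedChannel could provide that behavior with a mutex and two condition variables. The method names Send, Receive and Close, and the std::optional return from Receive, are assumptions; the diagram does not show the real class interface.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical sketch of BoundedChannel<T>; Send/Receive/Close are assumed
// names, not the pipeline's confirmed API.
template <typename T>
class BoundedChannel {
 public:
  explicit BoundedChannel(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  // Blocks while the buffer is full. Returns false if the channel was
  // closed before the item could be enqueued.
  bool Send(T item) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [&] { return closed_ || items_.size() < capacity_; });
    if (closed_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the buffer is empty and still open. Returns std::nullopt
  // only when the channel is both closed and drained, which is the
  // consumer's signal to leave its while loop.
  std::optional<T> Receive() {
    std::unique_lock<std::mutex> lock(mu_);
    not_empty_.wait(lock, [&] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;  // closed and drained
    T item = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();
    return item;
  }

  // Called once by the producer; wakes every blocked sender and receiver.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      closed_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
  bool closed_ = false;
};
```

A worker would then loop with while (auto item = ch.Receive()) { ... }, which corresponds to the "has items?" decisions in the diagram. Because Receive still hands out buffered items after Close, the same behavior lets the Log Worker flush every queued LogEntry after "Close log_ch".
'/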
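/'
Every SQLite step in the diagram follows the same batching pattern. It issues one BEGIN TRANSACTION, then a COMMIT and a fresh BEGIN whenever the batch size is reached, and a final COMMIT after the loop. The following is a minimal sketch of that pattern, assuming a hypothetical BatchedTransaction helper and an exec callback that stands in for whatever runs raw SQL on the export connection (for example a wrapper around sqlite3_exec). Neither the helper nor the callback is part of SqliteExportService as shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper mirroring BEGIN / "COMMIT & BEGIN" / "COMMIT (Final)".
// `Exec` is an assumed stand-in for executing SQL on the export connection.
class BatchedTransaction {
 public:
  using Exec = std::function<void(const std::string&)>;

  BatchedTransaction(Exec exec, std::size_t batch_size)
      : exec_(std::move(exec)), batch_size_(batch_size) {
    assert(batch_size_ > 0);
    exec_("BEGIN TRANSACTION");
  }

  // Call once per unit written (ProcessUser, ProcessBrewery, ...).
  // Commits and reopens the transaction when the batch is full.
  void RowWritten() {
    if (++pending_ == batch_size_) {
      exec_("COMMIT");
      exec_("BEGIN TRANSACTION");
      pending_ = 0;
    }
  }

  // Final commit once the input is exhausted; later calls are no-ops.
  void Finish() {
    if (finished_) return;
    exec_("COMMIT");
    finished_ = true;
  }

 private:
  Exec exec_;
  std::size_t batch_size_;
  std::size_t pending_ = 0;
  bool finished_ = false;
};
```

In autocommit mode SQLite commits every statement as its own transaction. Grouping rows into batches amortizes that cost and still bounds how much uncommitted work is pending at any time. In the beer phase the diagram checks the batch size once per BeersBundle, so there the counter would advance per bundle rather than per beer.
'/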
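/'
The Follow phase notes say that ActivityWeightedFollowStrategy derives weights from user.activity_weight so that high-activity users attract more followers, and that no user may follow themselves. One way to satisfy both is weighted sampling without replacement with the follower's own weight zeroed out. The following sketch assumes that users are identified by their index in user_pool_ and that each user has a fixed follows_per_user budget. GenerateWeightedFollows and FollowEdge are hypothetical names, not the pipeline's interface.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical edge type; indices refer to positions in user_pool_.
struct FollowEdge {
  std::size_t follower;
  std::size_t followed;
};

// Hypothetical sketch: weighted sampling without replacement, with the
// follower's own weight zeroed so a self-follow can never be drawn.
std::vector<FollowEdge> GenerateWeightedFollows(
    const std::vector<double>& weights, std::size_t follows_per_user,
    std::mt19937& rng) {
  assert(std::all_of(weights.begin(), weights.end(),
                     [](double w) { return w >= 0.0; }));
  std::vector<FollowEdge> edges;
  for (std::size_t follower = 0; follower < weights.size(); ++follower) {
    std::vector<double> remaining = weights;
    remaining[follower] = 0.0;  // enforce follower != followed
    for (std::size_t k = 0; k < follows_per_user; ++k) {
      const double total =
          std::accumulate(remaining.begin(), remaining.end(), 0.0);
      if (total <= 0.0) break;  // no eligible targets left
      std::discrete_distribution<std::size_t> pick(remaining.begin(),
                                                   remaining.end());
      const std::size_t followed = pick(rng);
      // Defensive guard: never emit an index whose weight is zero
      // (self or already followed), whatever the distribution returns.
      if (remaining[followed] <= 0.0) continue;
      edges.push_back({follower, followed});
      remaining[followed] = 0.0;  // without replacement: no duplicate edges
    }
  }
  return edges;
}
```

If all weights are equal, this reduces to uniform sampling, which is how the diagram describes RandomFollowStrategy. The schema-level constraint mentioned in the diagram still acts as the backstop if a strategy ever emits a self-follow.
'/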