mirror of
https://github.com/aaronpo97/the-biergarten-app.git
synced 2026-05-31 17:53:59 +00:00
361 lines
11 KiB
Plaintext
361 lines
11 KiB
Plaintext
@startuml biergarten_activity
' Activity diagram of the Biergarten data pipeline.
' Lanes: Main (process entry/exit), Orchestrator (phase sequencing),
' LLM Worker (generation) and SQLite Worker (persistence).
' Flow: startup -> user -> brewery -> beer -> check-in/follow -> rating -> teardown.
!include ../biergarten-weizen-theme.puml
skinparam defaultFontSize 13
skinparam titleFontSize 20

title The Biergarten Data Pipeline — Activity Diagram

|Main|
start
:ParseArguments(argc, argv);
' Invalid arguments terminate the run before any global state is initialised.
if (Invalid args?) then (yes)
  :spdlog::error;
  stop
else (no)
endif
:Init OpenSSL global state & LlamaBackendState;
:Build DI injector;

:Initialize SqliteExportService;
note right
  Opens SQLite connection.
  (Transactions are now managed
  per-phase via batching).
end note

' The log channel and its worker are created before the orchestrator runs,
' so they are available for the whole pipeline.
:Create BoundedChannel<LogEntry> log_ch;
:Spawn Log Worker thread;
note right
  Log worker drains log_ch for the
  entire pipeline lifetime.
  All workers emit LogEntry structs
  via PipelineLogger -- never spdlog directly.
end note

:BiergartenPipelineOrchestrator::Run();
' Body of BiergartenPipelineOrchestrator::Run(). The lane is named
' "Orchestrator" to match every later section of this diagram, so the
' startup loads and the phases are drawn in one lane.
|Orchestrator|

' Reference data is loaded in two parallel pairs; beer styles and
' locations also pre-warm the EnrichmentService caches that later
' phases rely on.
' NOTE(review): sampled_locations is used below, but no sampling step is
' shown between LoadLocations and PreWarmLocationCache -- confirm where
' the sample is taken.
fork
  :JsonLoader::LoadBeerStyles("beer-styles.json");
  :EnrichmentService::PreWarmBeerStyleCache(beer_styles);
fork again
  :JsonLoader::LoadLocations("locations.json");
  :EnrichmentService::PreWarmLocationCache(sampled_locations);
end fork
fork
  :JsonLoader::LoadNamesByCountry("names-by-country.json");
fork again
  :JsonLoader::LoadPersonas("personas.json");
end fork

' ═══════════════════════════════════════════
' PHASE 0 — USER GENERATION
' ═══════════════════════════════════════════
' Three concurrent roles connected by two bounded channels:
'   Orchestrator --loc_ch--> LLM Worker --exp_ch--> SQLite Worker
' Each producer closes its channel when done; the consumer loop ends
' once the channel is both empty and closed.
|Orchestrator|
:RunUserPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);

fork
  |Orchestrator|
  :Loop: Send Locations -> loc_ch;
  :Close loc_ch;
  note right
    Producer closes loc_ch.
    LLM Worker while loop
    terminates on empty + closed.
  end note
fork again
  |LLM Worker|
  while (loc_ch has items?) is (yes)
    :Receive Location;

    :GetLocationContextFromCache(location);
    note right
      Guaranteed cache hit from startup.
    end note

    ' NOTE(review): the first line of the note below ("Guaranteed cache
    ' hit from startup.") looks copy-pasted from the location-cache note;
    ' startup shows JsonLoader::LoadPersonas but no persona cache
    ' pre-warm -- confirm the intended wording.
    :IPersonaSelectionStrategy::SelectPersona(\n personas_palette_);
    note right
      Guaranteed cache hit from startup.
      Returns a Persona struct carrying
      style_affinities, abv_range,
      ibu_preference, checkin_weight.
    end note

    :NamesByCountry::SampleName(\n location.iso3166_1);
    note right
      Deterministic lookup -- no LLM involved.
      Name selected from pre-keyed table
      and passed into the generation prompt.
    end note

    :GenerateUser(enriched_city, persona, sampled_name)\nvia DataGenerator;
    note right
      LLM receives: EnrichedCity context + persona
      description + sampled name. Generates
      bio and preference signals grounded
      in locale and persona.
    end note

    :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm");
    :Send GeneratedUser -> exp_ch;
  endwhile (no)
  :Close exp_ch;
  note right
    Producer closes exp_ch.
    SQLite Worker while loop
    terminates on empty + closed.
  end note
fork again
  |SQLite Worker|
  ' Writes are batched: a transaction stays open across rows and is
  ' cycled (COMMIT & BEGIN) each time the batch size is reached.
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive GeneratedUser;
    :ProcessUser(user);
    :PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
    :Append -> user_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork

|Orchestrator|
:Join LLM Worker, SQLite Worker;

' ═══════════════════════════════════════════
' PHASE 1a — BREWERY GENERATION
' ═══════════════════════════════════════════
' Same three-role channel layout as the user phase. Each task pairs a
' location with a user sampled from user_pool_, so this phase runs only
' after the user phase workers have been joined.
:RunBreweryPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);

fork
  |Orchestrator|
  :Loop: Sample User from user_pool_\nand pair with Location;
  :Send BreweryTask(Location, User) -> loc_ch;
  :Close loc_ch;
fork again
  |LLM Worker|
  while (loc_ch has items?) is (yes)
    :Receive BreweryTask(Location, User);

    :GetLocationContextFromCache(task.location);
    note right
      Guaranteed cache hit from startup.
    end note

    :GenerateBrewery(enriched_city, context, task.user)\nvia DataGenerator;
    note right
      KV cache stays warm.
      Brewery is linked to the sampled owner_user_id.
    end note
    :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm");
    :Send GeneratedBrewery -> exp_ch;
  endwhile (no)
  :Close exp_ch;
fork again
  |SQLite Worker|
  ' Batched writes: the open transaction is cycled whenever the batch
  ' size is reached, with a final COMMIT after the channel drains.
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive GeneratedBrewery;
    :ProcessBrewery(brewery);
    :PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite");
    :Append -> brewery_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork

|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
  brewery_pool_ is now fully populated.
  Phase 1b may begin.
end note

' ═══════════════════════════════════════════
' PHASE 1b — BEER GENERATION
' ═══════════════════════════════════════════
' Breweries flow through brew_ch to the LLM Worker, which produces one
' BeersBundle per brewery; bundles flow through exp_ch to the SQLite Worker.
:RunBeerPhase();
:Create BoundedChannels\n(brew_ch, exp_ch);

fork
  |Orchestrator|
  :Loop: Send Breweries -> brew_ch;
  :Close brew_ch;
fork again
  |LLM Worker|
  while (brew_ch has items?) is (yes)
    :Receive GeneratedBrewery;
    :IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_);

    ' One beer is generated per selected style and collected into the
    ' brewery's bundle.
    while (For each selected BeerStyle?) is (remaining)
      :GetStyleContextFromCache(style);
      note right
        Guaranteed cache hit from startup.
        KV cache stays warm across all
        beer generations -- system prompt
        does not change within this phase.
      end note
      :GenerateBeer(brewery, style_context)\nvia DataGenerator;
      :Attach GeneratedBeer to bundle;
    endwhile (done)

    :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "llm");
    :Send BeersBundle -> exp_ch;
  endwhile (no)
  :Close exp_ch;
fork again
  |SQLite Worker|
  :BEGIN TRANSACTION;
  while (exp_ch has items?) is (yes)
    :Receive BeersBundle;
    while (For each beer in bundle?) is (remaining)
      :Set beer.brewery_id from bundle;
      :ProcessBeer(beer);
      :Append -> beer_pool_;
    endwhile (done)
    ' Logging and the batch-size check happen once per bundle, not once
    ' per beer.
    :PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite");
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (no)
  :COMMIT (Final);
end fork

|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
  Both brewery_pool_ and beer_pool_
  are now completely populated.
  Checkin and Follow phases may
  now run in parallel.
end note

' ═══════════════════════════════════════════
' PHASE 2 — CHECKIN + FOLLOW GENERATION
' (parallel — both depend only on user_pool_
' and brewery_pool_ being fully populated)
' ═══════════════════════════════════════════
' NOTE(review): both branches below issue BEGIN TRANSACTION while running
' in parallel. The startup section shows a single SQLite connection being
' opened; SQLite does not allow a second BEGIN inside an open transaction
' on the same connection -- confirm how the two branches are isolated
' (separate connections, a write lock, or serialised writes).
' NOTE(review): the follow branch's note says ActivityWeightedFollowStrategy
' reads user.activity_weight, while the check-in branch runs
' AssignActivityWeights(user_pool_) concurrently -- presumably that call
' sets the same field; confirm there is no ordering dependency.
fork
  |Orchestrator|
  :RunCheckinPhase();
  :ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
  note right
    Weights seeded from each user's
    persona.checkin_weight. J-curve profile
    emerges from persona distribution.
  end note

  :BEGIN TRANSACTION;
  while (For each GeneratedUser in user_pool_?) is (remaining)
    :CheckinsForUser(user, brewery_pool_.size());
    while (For each checkin index?) is (remaining)
      :TimestampFor(user, index);
      :Select brewery from brewery_pool_;
      :GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator;
      :ProcessCheckin(checkin);
      :PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
      :Append -> checkin_pool_;
      if (Batch size reached?) then (yes)
        :COMMIT & BEGIN;
      else (no)
      endif
    endwhile (done)
  endwhile (done)
  :COMMIT (Final);

fork again
  |Orchestrator|
  :RunFollowPhase();
  :IFollowGenerationStrategy::\nAssignFollowWeights(user_pool_);
  note right
    For RandomFollowStrategy, weights
    are uniform. For ActivityWeightedFollowStrategy,
    weights derived from user.activity_weight
    so high-activity users attract more followers.
  end note

  :BEGIN TRANSACTION;
  :IFollowGenerationStrategy::\nGenerateFollows(user_pool_);
  note right
    Self-follow constraint (follower_id != followed_id)
    enforced here and at the DB schema level.
  end note
  while (For each GeneratedFollow?) is (remaining)
    :ProcessFollow(follow);
    :PipelineLogger::Log(Info, FollowGeneration,\n nullopt, follower_id, "sqlite");
    :Append -> follow_pool_;
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  endwhile (done)
  :COMMIT (Final);

end fork

|Orchestrator|
:Join CheckinPhase, FollowPhase;
note right
  checkin_pool_ and follow_pool_
  are now fully populated.
  Rating phase may begin.
end note

' ═══════════════════════════════════════════
' PHASE 3 — RATING GENERATION
' ═══════════════════════════════════════════
' Iterates checkin_pool_ and tries to produce one rating per check-in.
' A check-in whose brewery has no beer in beer_pool_ is logged at Warn
' level and skipped.
:RunRatingPhase();
note right
  Beer selection biased by
  user.persona.style_affinities and abv_range.
  Rating skew modulated per persona.
end note

:BEGIN TRANSACTION;
while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
  :Match brewery_id, select beer from beer_pool_\n(same brewery_id, biased by persona affinities);
  if (Beer exists for brewery?) then (yes)
    :GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
    :ProcessRating(rating);
    :PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
    ' The batch check sits on the success path only; skipped check-ins
    ' write nothing.
    if (Batch size reached?) then (yes)
      :COMMIT & BEGIN;
    else (no)
    endif
  else (no)
    :PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
    :Skip -- brewery has no beers;
  endif
endwhile (done)
:COMMIT (Final);

' ═══════════════════════════════════════════
' TEARDOWN
' ═══════════════════════════════════════════
' Order: close the database, close log_ch so the log worker's drain loop
' can finish, then join the log worker from Main before stopping.
|Orchestrator|
:Finalize SqliteExportService;
note right
  Safely closes the DB connection.
end note
:Close log_ch;

|Main|
:spdlog::info "Pipeline complete in X ms";
:Join Log Worker;
note right
  Drain guarantees no LogEntry is
  dropped at shutdown.
end note
stop

@enduml