@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461
skinparam ActivityBackgroundColor #FAFCF9
skinparam ActivityBorderColor #547461
skinparam ActivityDiamondBackgroundColor #FAFCF9
skinparam ActivityDiamondBorderColor #628A5B
skinparam ActivityBarColor #628A5B
skinparam SwimlaneBorderColor #547461
skinparam SwimlaneBorderThickness 0.3

title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain)

' ═════════════════════════════════════════════
' STARTUP
' ═════════════════════════════════════════════

|#F2F6F0|main.cc|
start
:ParseArguments(argc, argv);
if (Valid?) then (no)
  :spdlog::error;
  stop
else (yes)
endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
note right
  DataGenerator and IExportService are bound as
  shared_ptr, so the orchestrator and the mediator
  hold references to the same instances.
end note
:Create BiergartenPipelineOrchestrator;
:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy);
:exporter->Initialize();
:JsonLoader::LoadLocations("locations.json");
:SamplingStrategy::Sample(all_locations);
:BiergartenPipelineOrchestrator::Run();

' ═════════════════════════════════════════════
' PHASE 1 — USER GENERATION
' (independent, no FK dependencies)
' ═════════════════════════════════════════════

|#EAF0E8|Orchestrator — Phase 1: Users|
note
  Users have no FK dependencies. They are
  generated first so the full pool exists
  before checkin weights are assigned.
end note
:Spawn Thread U1 — UserProducer(sampled_locations);
:Spawn Thread U2 — UserExportConsumer();
:Join U1, U2;
:Collect user_pool : std::vector;

|#DCE8D8|Thread U1 — UserProducer|
while (For each Location?) is (remaining)
  :generator->GenerateUser(location);
  :user_channel_.Send(GeneratedUser);
endwhile (done)
:user_channel_.Close();

|#E0EAE0|Thread U2 — UserExportConsumer|
while (user_channel_.Receive()?) is (available)
  :exporter->ProcessUser(user) : sqlite3_int64;
  note right
    Returns the inserted row ID, which is stored
    back into GeneratedUser.user_id so the pool
    handed to the mediator carries live FKs.
  end note
  :Append to user_pool;
endwhile (nullopt)

|#EAF0E8|Orchestrator — Phase 1: Users|
:mediator->OnUsersComplete(user_pool);
note right
  The mediator receives the full inserted user pool,
  sets users_ready_ = true and calls TryOpenCheckinGate().
  The gate stays closed until breweries are also ready.
end note

' ═════════════════════════════════════════════
' PHASE 2 — BREWERY GENERATION
' (depends on locations, runs after Phase 1)
' ═════════════════════════════════════════════

|#EAF0E8|Orchestrator — Phase 2: Breweries|
note
  Runs after Phase 1 completes. Breweries have no FK
  to users (the two pools only meet in checkins), so
  this phase could run in parallel with Phase 1 in future.
end note
:Spawn Thread B1 — EnrichmentProducer(sampled_locations);
:Spawn Thread B2 — BreweryGenerationConsumer();
:Spawn Thread B3 — BreweryExportConsumer();
:Join B1, B2, B3;
:Collect brewery_pool : std::vector;

|#DCE8D8|Thread B1 — EnrichmentProducer|
while (For each Location?) is (remaining)
  :BreweryContextStrategy::QueriesFor(location);
  :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_);
  if (failure?) then (yes)
    :LocationContext{ Absent };
  elseif (truncated?) then (yes)
    :LocationContext{ Partial };
  else (no)
    :LocationContext{ Full };
  endif
  :enrichment_channel_.Send(EnrichedCity);
endwhile (done)
:enrichment_channel_.Close();

|#E5EDE1|Thread B2 — BreweryGenerationConsumer|
while (enrichment_channel_.Receive()?) is (available)
  :generator->GenerateBrewery(location, context);
  :brewery_channel_.Send(GeneratedBrewery);
endwhile (nullopt)
:brewery_channel_.Close();

|#E0EAE0|Thread B3 — BreweryExportConsumer|
while (brewery_channel_.Receive()?) is (available)
  :exporter->ProcessBrewery(brewery) : sqlite3_int64;
  :Append to brewery_pool;
endwhile (nullopt)

|#EAF0E8|Orchestrator — Phase 2: Breweries|
:mediator->OnBreweriesComplete(brewery_pool);
note right
  The mediator sets breweries_ready_ = true and calls
  TryOpenCheckinGate(). Both flags are now true, so the
  gate opens and the mediator starts the checkin stage
  asynchronously on its own thread.
end note

' ═════════════════════════════════════════════
' PHASE 3 — BEER GENERATION
' (depends on breweries)
' ═════════════════════════════════════════════

|#EAF0E8|Orchestrator — Phase 3: Beers|
:RunBeerPhase(brewery_pool);
:Spawn Thread R1 — BeerGenerationProducer(brewery_pool);
:Spawn Thread R2 — BeerExportConsumer();
:Join R1, R2;
:Collect beer_pool : std::vector;

|#DCE8D8|Thread R1 — BeerGenerationProducer|
while (For each GeneratedBrewery?) is (remaining)
  :BeerContextStrategy::QueriesFor(location);
  :WikipediaService::GetLocationContext\n(location, beer_context_strategy_);
  :generator->GenerateBeer(brewery_id, location, context);
  :beer_channel_.Send(GeneratedBeer);
endwhile (done)
:beer_channel_.Close();

|#E0EAE0|Thread R2 — BeerExportConsumer|
while (beer_channel_.Receive()?) is (available)
  :exporter->ProcessBeer(beer) : sqlite3_int64;
  :Append to beer_pool;
endwhile (nullopt)

|#EAF0E8|Orchestrator — Phase 3: Beers|
:mediator->OnBeersComplete(beer_pool);
note right
  The mediator stores beer_pool, so the rating stage
  can now reference beer FKs. Ratings still wait for
  the checkin stage to finish (checkin_id FK).
end note

' ═════════════════════════════════════════════
' MEDIATOR — CHECKIN GATE (triggered internally)
' Runs concurrently with Phase 3
' ═════════════════════════════════════════════

|#F0F5EE|Mediator — Checkin Stage (gated)|
note
  Started by TryOpenCheckinGate() once both
  OnUsersComplete() and OnBreweriesComplete() have fired.
  Runs concurrently with Phase 3 beer generation.
end note
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
note right
  J-curve weights are assigned across the full user
  population before any checkins are generated:
  a small cohort gets high weight, and the long tail
  gets low weight.
end note
while (For each GeneratedUser in pool?) is (remaining)
  :strategy->CheckinsForUser(user, brewery_count);
  while (For each checkin index?) is (remaining)
    :strategy->TimestampFor(user, index);
    note right
      Bursty weekend/evening
      distribution applied here.
    end note
    :Select brewery from brewery_pool_\n(weighted random);
    :generator->GenerateCheckin(user, brewery, timestamp);
    :exporter->ProcessCheckin(checkin) : sqlite3_int64;
    :mediator->OnCheckinGenerated(checkin);
  endwhile (done)
endwhile (done)
:checkins_complete_ = true;
note right
  The rating stage depends on the checkin_id FK.
  RunRatingStage() is called here, once all checkins
  have been inserted and OnBeersComplete() has
  supplied the beer pool.
end note

' ═════════════════════════════════════════════
' MEDIATOR — RATING STAGE
' Runs after checkins complete and beers are stored
' ═════════════════════════════════════════════

|#F0F5EE|Mediator — Rating Stage|
note
  Ratings reference user_id, beer_id and checkin_id,
  so the user, beer and checkin pools are all inserted
  before this stage runs. RatingResult generation
  applies a strong positive skew.
end note
while (For each GeneratedCheckin?) is (remaining)
  if (Beer available for this brewery?) then (yes)
    :Select beer from beer_pool_\n(match brewery_id);
    :generator->GenerateRating(user, beer, checkin_id);
    :exporter->ProcessRating(rating);
    :mediator->OnRatingGenerated(rating);
  else (no)
    :Skip: no beers for this brewery;
  endif
endwhile (done)

' ═════════════════════════════════════════════
' TEARDOWN
' ═════════════════════════════════════════════

|#F2F6F0|main.cc|
:Await mediator completion;
:exporter->Finalize();
note right
  A single COMMIT covers all five fixture types:
  users, breweries, beers, checkins and ratings.
  All-or-nothing consistency.
end note
:spdlog::info "Pipeline complete";
:return 0;
stop
@enduml
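
Every phase in the diagram uses the same channel contract: producers call `Send()` and then `Close()`, and consumers loop on `Receive()` until it yields `std::nullopt`. The diagram does not show the channel type itself, so the following is a minimal sketch under that assumption: an unbounded queue guarded by a mutex and a condition variable. `Channel<T>` and `RunProducerConsumer` are hypothetical names; the project's real channel may be bounded or implemented differently.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical unbounded channel with the Send/Receive/Close contract
// shown in the diagram (not the project's actual class).
template <typename T>
class Channel {
 public:
  // Enqueue a value; returns false if the channel is already closed.
  bool Send(T value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (closed_) return false;
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
    return true;
  }

  // Block until a value is available. Returns std::nullopt only once the
  // channel is closed AND drained, so no sent item is ever dropped.
  std::optional<T> Receive() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return std::nullopt;
    T value = std::move(queue_.front());
    queue_.pop_front();
    return value;
  }

  // Mark end of stream and wake every blocked receiver.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool closed_ = false;
};

// Same shape as threads U1/U2: one producer, one consumer, Close() ends the loop.
std::vector<int> RunProducerConsumer(int count) {
  Channel<int> channel;
  std::vector<int> pool;  // plays the role of user_pool
  std::thread producer([&] {
    for (int i = 0; i < count; ++i) channel.Send(i);
    channel.Close();
  });
  std::thread consumer([&] {
    while (auto item = channel.Receive()) pool.push_back(*item);
  });
  producer.join();
  consumer.join();
  return pool;
}
```

Because `Receive()` waits for "non-empty or closed" and only reports end of stream when the queue is empty, items sent before `Close()` are always delivered. A bounded variant would add a second condition variable so `Send()` blocks while the queue is full.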
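
The checkin gate is the mediator behaviour the diagram describes most precisely: `OnUsersComplete()` and `OnBreweriesComplete()` each set a flag and call `TryOpenCheckinGate()`, and the checkin stage starts asynchronously once both flags are true. A minimal sketch, assuming one mutex guards the flags and the stage is passed in as a callable; `CheckinGate`, `IsOpen()` and `Await()` are hypothetical names (`Await()` stands in for the "Await mediator completion" teardown step).

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical two-flag gate: opens at most once, and runs the checkin
// stage on its own thread when the second readiness flag is set.
class CheckinGate {
 public:
  explicit CheckinGate(std::function<void()> checkin_stage)
      : checkin_stage_(std::move(checkin_stage)) {}

  // Join the worker so a running thread never outlives the gate.
  ~CheckinGate() { Await(); }

  CheckinGate(const CheckinGate&) = delete;
  CheckinGate& operator=(const CheckinGate&) = delete;

  void OnUsersComplete() {
    std::lock_guard<std::mutex> lock(mutex_);
    users_ready_ = true;
    TryOpenCheckinGate();
  }

  void OnBreweriesComplete() {
    std::lock_guard<std::mutex> lock(mutex_);
    breweries_ready_ = true;
    TryOpenCheckinGate();
  }

  bool IsOpen() {
    std::lock_guard<std::mutex> lock(mutex_);
    return gate_open_;
  }

  // Block until the checkin stage has finished (no-op if it never started).
  void Await() {
    std::thread worker;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      worker = std::move(worker_);
    }
    if (worker.joinable()) worker.join();
  }

 private:
  // Caller must hold mutex_. Starts the stage only when both flags are
  // set, and only the first time that happens.
  void TryOpenCheckinGate() {
    if (gate_open_ || !users_ready_ || !breweries_ready_) return;
    gate_open_ = true;
    worker_ = std::thread(checkin_stage_);
  }

  std::mutex mutex_;
  bool users_ready_ = false;
  bool breweries_ready_ = false;
  bool gate_open_ = false;
  std::function<void()> checkin_stage_;
  std::thread worker_;
};
```

Keeping `gate_open_` separate from the two readiness flags means the gate opens exactly once, whichever callback arrives last, so a duplicate notification cannot start a second checkin thread.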
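
The checkin stage names `AssignActivityWeights()` and a weighted random brewery pick but not their formulas. The sketch below assumes a rank-based power law, where the user at rank r gets weight proportional to 1/r^alpha, as one way to produce a J-curve (a few heavy users, a long light tail), and uses `std::discrete_distribution` for the weighted selection from `brewery_pool_`. The `alpha` parameter, the total checkin budget in `ExpectedCheckins()` and every signature here are hypothetical, not the project's `ICheckinDistributionStrategy` interface.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical J-curve: weight of the user at rank r (1-based) is
// proportional to 1 / r^alpha, then normalised so the weights sum to 1.
std::vector<double> AssignActivityWeights(std::size_t user_count, double alpha) {
  std::vector<double> weights(user_count);
  for (std::size_t i = 0; i < user_count; ++i) {
    weights[i] = 1.0 / std::pow(static_cast<double>(i + 1), alpha);
  }
  const double total = std::accumulate(weights.begin(), weights.end(), 0.0);
  for (double& w : weights) w /= total;
  return weights;
}

// Hypothetical helper: share of an assumed total checkin budget for one user.
std::size_t ExpectedCheckins(double weight, std::size_t total_checkins) {
  return static_cast<std::size_t>(
      std::lround(weight * static_cast<double>(total_checkins)));
}

// Weighted random index (e.g. into brewery_pool_); zero-weight entries
// are never selected.
std::size_t PickWeighted(const std::vector<double>& weights, std::mt19937& rng) {
  assert(!weights.empty());
  std::discrete_distribution<std::size_t> dist(weights.begin(), weights.end());
  return dist(rng);
}
```

Ranks here follow input order, so a real strategy would likely shuffle `user_pool_` first so that activity is not tied to generation order.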
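
The rating stage matches each checkin to a beer from the same brewery and skips breweries with no beers. One way to keep that lookup cheap is to index `beer_pool_` by `brewery_id` once, before the loop. This is a sketch with hypothetical stripped-down structs: `BeerRef` and `CheckinRef` stand in for the pipeline's generated types, `std::int64_t` stands in for `sqlite3_int64`, and `RatingPlan` holds the IDs that would be passed to `GenerateRating()`. It does not model the positive score skew.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for the pipeline's fixture types.
struct BeerRef { std::int64_t beer_id; std::int64_t brewery_id; };
struct CheckinRef { std::int64_t checkin_id; std::int64_t user_id; std::int64_t brewery_id; };
struct RatingPlan { std::int64_t checkin_id; std::int64_t user_id; std::int64_t beer_id; };

// Build brewery_id -> beer_ids once, so each per-checkin lookup is O(1) on average.
std::unordered_map<std::int64_t, std::vector<std::int64_t>> IndexBeersByBrewery(
    const std::vector<BeerRef>& beers) {
  std::unordered_map<std::int64_t, std::vector<std::int64_t>> index;
  for (const auto& beer : beers) index[beer.brewery_id].push_back(beer.beer_id);
  return index;
}

// For each checkin, pick a random beer from the same brewery, or skip the
// checkin when that brewery has no beers (the diagram's "no" branch).
std::vector<RatingPlan> PlanRatings(const std::vector<CheckinRef>& checkins,
                                    const std::vector<BeerRef>& beers,
                                    std::mt19937& rng) {
  const auto by_brewery = IndexBeersByBrewery(beers);
  std::vector<RatingPlan> plans;
  for (const auto& checkin : checkins) {
    auto it = by_brewery.find(checkin.brewery_id);
    if (it == by_brewery.end()) continue;
    const auto& candidates = it->second;  // never empty: built from real beers
    std::uniform_int_distribution<std::size_t> pick(0, candidates.size() - 1);
    plans.push_back({checkin.checkin_id, checkin.user_id, candidates[pick(rng)]});
  }
  return plans;
}
```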