diff --git a/pipeline/diagrams/activity-diagram.svg b/pipeline/diagrams/activity-diagram.svg
deleted file mode 100644
index 0ec4f31..0000000
--- a/pipeline/diagrams/activity-diagram.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/pipeline/diagrams/class-diagram.svg b/pipeline/diagrams/class-diagram.svg
deleted file mode 100644
index b7cb713..0000000
--- a/pipeline/diagrams/class-diagram.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml
index 0c27701..9cca054 100644
--- a/pipeline/diagrams/future-possible-activity.puml
+++ b/pipeline/diagrams/future-possible-activity.puml
@@ -1,270 +1,277 @@
@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
-skinparam defaultFontSize 14
+skinparam defaultFontSize 13
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
+skinparam SequenceLifeLineBorderColor #547461
+skinparam SequenceParticipantBorderColor #547461
+skinparam SequenceParticipantBackgroundColor #EAF0E8
+skinparam SequenceBoxBorderColor #547461
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461
-skinparam ActivityBackgroundColor #FAFCF9
-skinparam ActivityBorderColor #547461
-skinparam ActivityDiamondBackgroundColor #FAFCF9
-skinparam ActivityDiamondBorderColor #628A5B
-skinparam ActivityBarColor #628A5B
-skinparam SwimlaneBorderColor #547461
-skinparam SwimlaneBorderThickness 0.3
+skinparam SequenceDividerBackgroundColor #EAF0E8
+skinparam SequenceDividerBorderColor #547461
-title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain)
+title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator)
-' ═════════════════════════════════════════════
+participant "main.cc" as main #F2F6F0
+participant "Orchestrator" as orch #EAF0E8
+participant "Thread U1\nUserProducer" as u1 #DCE8D8
+participant "Thread U2\nUserExportConsumer" as u2 #E0EAE0
+participant "Thread B1\nEnrichmentProducer" as b1 #DCE8D8
+participant "Thread B2\nBreweryGenerationConsumer" as b2 #E5EDE1
+participant "Thread B3\nBreweryExportConsumer" as b3 #E0EAE0
+participant "Thread R1\nBeerGenerationProducer" as r1 #DCE8D8
+participant "Thread R2\nBeerExportConsumer" as r2 #E0EAE0
+participant "Thread C1\nCheckinGenerationProducer" as c1 #DCE8D8
+participant "Thread C2\nCheckinExportConsumer" as c2 #E0EAE0
+participant "Thread G1\nRatingGenerationProducer" as g1 #DCE8D8
+participant "Thread G2\nRatingExportConsumer" as g2 #E0EAE0
+
+' ─────────────────────────────────────────────
' STARTUP
-' ═════════════════════════════════════════════
-|#F2F6F0|main.cc|
-start
-:ParseArguments(argc, argv);
-if (Valid?) then (no)
- :spdlog::error;
- stop
-else (yes)
-endif
-:Init CurlGlobalState & LlamaBackendState;
-:Build DI injector;
-note right
- DataGenerator and IExportService
- bound as shared_ptr — both the
- orchestrator and mediator hold
- a reference to the same instances.
-end note
-:Create BiergartenPipelineOrchestrator;
-:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy);
-:exporter->Initialize();
-:JsonLoader::LoadLocations("locations.json");
-:SamplingStrategy::Sample(all_locations);
-:BiergartenPipelineOrchestrator::Run();
+' ─────────────────────────────────────────────
+main -> main : ParseArguments(argc, argv)
+alt Invalid args
+ main -> main : spdlog::error; stop
+end
-' ═════════════════════════════════════════════
-' PHASE 1 — USER GENERATION
-' (independent, no FK dependencies)
-' ═════════════════════════════════════════════
-|#EAF0E8|Orchestrator — Phase 1: Users|
-note
- Users have no FK dependencies.
- Generated first so the full pool
- exists before checkin weights
- are assigned.
+main -> main : Init CurlGlobalState & LlamaBackendState
+main -> main : Build DI injector
+note right of main
+ All dependencies bound with unique_ptr.
+ LlamaConfig or RestConfig injected
+ instead of ApplicationOptions.
end note
-:Spawn Thread U1 — UserProducer(sampled_locations);
-:Spawn Thread U2 — UserExportConsumer();
-:Join U1, U2;
-:Collect user_pool : std::vector;
+main -> orch : exporter->Initialize()
+note right of orch
+ Opens SQLite connection.
+ Creates schema for all five fixture types
+ in one DDL pass. Begins IMMEDIATE TRANSACTION.
+end note
-|#DCE8D8|Thread U1 — UserProducer|
-while (For each Location?) is (remaining)
- :generator->GenerateUser(location);
- :user_channel_.Send(GeneratedUser);
-endwhile (done)
-:user_channel_.Close();
+main -> orch : JsonLoader::LoadLocations("locations.json")
+main -> orch : ISamplingStrategy::Sample(all_locations)
+main -> orch : BiergartenPipelineOrchestrator::Run()
-|#E0EAE0|Thread U2 — UserExportConsumer|
-while (user_channel_.Receive()?) is (available)
- :exporter->ProcessUser(user) : sqlite3_int64;
- note right
- Returns inserted row ID.
- Stored back into GeneratedUser.user_id
- so mediator pool carries live FKs.
+' ─────────────────────────────────────────────
+' PHASE 1 — USERS
+' ─────────────────────────────────────────────
+== Phase 1 — Users (no FK dependencies) ==
+
+orch -> u1 : spawn
+orch -> u2 : spawn
+
+loop For each Location
+ u1 -> u1 : generator->GenerateUser(location)
+ u1 -> u2 : user_channel_.Send(GeneratedUser)
+end
+u1 -> u2 : user_channel_.Close()
+
+loop user_channel_.Receive()
+ u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64
+ note right of u2
+ Returns committed row ID.
+ Stored on GeneratedUser.user_id.
end note
- :Append to user_pool;
-endwhile (nullopt)
+ u2 -> orch : Append to user_pool_
+end
-|#EAF0E8|Orchestrator — Phase 1: Users|
-:mediator->OnUsersComplete(user_pool);
-note right
- Mediator receives the full committed
- user pool. Sets users_ready_ = true.
- Calls TryOpenCheckinGate() —
- gate stays closed until breweries
- are also ready.
+orch -> orch : join(U1, U2)
+note right of orch
+ ── BARRIER 1 ──
+ user_pool_ is now fully committed.
+ Phase 2 begins.
end note
-' ═════════════════════════════════════════════
-' PHASE 2 — BREWERY GENERATION
-' (depends on locations, runs after Phase 1)
-' ═════════════════════════════════════════════
-|#EAF0E8|Orchestrator — Phase 2: Breweries|
-note
- Runs after Phase 1 completes.
- Could be parallelised with Phase 1
- in future — FK dependency is only
- on checkins, not on users directly.
+' ─────────────────────────────────────────────
+' PHASE 2 — BREWERIES
+' ─────────────────────────────────────────────
+== Phase 2 — Breweries (depends on locations only) ==
+
+orch -> b1 : spawn
+orch -> b2 : spawn
+orch -> b3 : spawn
+
+loop For each Location
+ b1 -> b1 : BreweryContextStrategy::QueriesFor(location)
+ b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_)
+ alt failure
+ b1 -> b1 : LocationContext{ Absent }
+ else truncated
+ b1 -> b1 : LocationContext{ Partial }
+ else success
+ b1 -> b1 : LocationContext{ Full }
+ end
+ b1 -> b2 : enrichment_channel_.Send(EnrichedCity)
+ note right of b1
+ Blocks if channel full.
+ Back-pressure against GPU consumer.
+ end note
+end
+b1 -> b2 : enrichment_channel_.Close()
+
+loop enrichment_channel_.Receive()
+ alt context.completeness == Absent
+ b2 -> b2 : spdlog::warn — proceeding with minimal prompt
+ end
+ b2 -> b2 : generator->GenerateBrewery(location, context)
+ b2 -> b3 : brewery_channel_.Send(GeneratedBrewery)
+end
+b2 -> b3 : brewery_channel_.Close()
+
+loop brewery_channel_.Receive()
+ b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64
+ note right of b3
+ Row ID stored on GeneratedBrewery.brewery_id.
+ No brewery_cache_ needed — orchestrator
+ threads the ID forward directly.
+ end note
+ b3 -> orch : Append to brewery_pool_
+end
+
+orch -> orch : join(B1, B2, B3)
+note right of orch
+ ── BARRIER 2 ──
+ brewery_pool_ is now fully committed
+ with live brewery_id values.
+ Phase 3 begins.
end note
-:Spawn Thread B1 — EnrichmentProducer(sampled_locations);
-:Spawn Thread B2 — BreweryGenerationConsumer();
-:Spawn Thread B3 — BreweryExportConsumer();
-:Join B1, B2, B3;
-:Collect brewery_pool : std::vector;
+' ─────────────────────────────────────────────
+' PHASE 3 — BEERS
+' ─────────────────────────────────────────────
+== Phase 3 — Beers (depends on brewery_pool_) ==
-|#DCE8D8|Thread B1 — EnrichmentProducer|
-while (For each Location?) is (remaining)
- :BreweryContextStrategy::QueriesFor(location);
- :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_);
- if (failure?) then (yes)
- :LocationContext{ Absent };
- else if (truncated?) then (yes)
- :LocationContext{ Partial };
- else (no)
- :LocationContext{ Full };
- endif
- :enrichment_channel_.Send(EnrichedCity);
-endwhile (done)
-:enrichment_channel_.Close();
+orch -> r1 : spawn
+orch -> r2 : spawn
-|#E5EDE1|Thread B2 — BreweryGenerationConsumer|
-while (enrichment_channel_.Receive()?) is (available)
- :generator->GenerateBrewery(location, context);
- :brewery_channel_.Send(GeneratedBrewery);
-endwhile (nullopt)
-:brewery_channel_.Close();
+loop For each GeneratedBrewery in brewery_pool_
+ r1 -> r1 : BeerContextStrategy::QueriesFor(location)
+ r1 -> r1 : WikipediaService::GetLocationContext(location,\nbeer_context_strategy_)
+ r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nlocation, context)
+ r1 -> r2 : beer_channel_.Send(GeneratedBeer)
+end
+r1 -> r2 : beer_channel_.Close()
-|#E0EAE0|Thread B3 — BreweryExportConsumer|
-while (brewery_channel_.Receive()?) is (available)
- :exporter->ProcessBrewery(brewery) : sqlite3_int64;
- :Append to brewery_pool;
-endwhile (nullopt)
+loop beer_channel_.Receive()
+ r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64
+ note right of r2
+ Row ID stored on GeneratedBeer.beer_id.
+ end note
+ r2 -> orch : Append to beer_pool_
+end
-|#EAF0E8|Orchestrator — Phase 2: Breweries|
-:mediator->OnBreweriesComplete(brewery_pool);
-note right
- Mediator sets breweries_ready_ = true.
- Calls TryOpenCheckinGate().
- Both flags now true — gate opens.
- Mediator spawns checkin stage
- asynchronously on its own thread.
+orch -> orch : join(R1, R2)
+note right of orch
+ ── BARRIER 3 ──
+ beer_pool_ is fully committed.
+ All three upstream pools ready.
end note
-' ═════════════════════════════════════════════
-' PHASE 3 — BEER GENERATION
-' (depends on breweries)
-' ═════════════════════════════════════════════
-|#EAF0E8|Orchestrator — Phase 3: Beers|
-:RunBeerPhase(brewery_pool);
+' ─────────────────────────────────────────────
+' CHECKIN WEIGHT ASSIGNMENT
+' ─────────────────────────────────────────────
+== Checkin Weight Assignment ==
-:Spawn Thread R1 — BeerGenerationProducer(brewery_pool);
-:Spawn Thread R2 — BeerExportConsumer();
-:Join R1, R2;
-:Collect beer_pool : std::vector;
-
-|#DCE8D8|Thread R1 — BeerGenerationProducer|
-while (For each GeneratedBrewery?) is (remaining)
- :BeerContextStrategy::QueriesFor(location);
- :WikipediaService::GetLocationContext\n(location, beer_context_strategy_);
- :generator->GenerateBeer(brewery_id, location, context);
- :beer_channel_.Send(GeneratedBeer);
-endwhile (done)
-:beer_channel_.Close();
-
-|#E0EAE0|Thread R2 — BeerExportConsumer|
-while (beer_channel_.Receive()?) is (available)
- :exporter->ProcessBeer(beer) : sqlite3_int64;
- :Append to beer_pool;
-endwhile (nullopt)
-
-|#EAF0E8|Orchestrator — Phase 3: Beers|
-:mediator->OnBeersComplete(beer_pool);
-note right
- Mediator stores beer_pool.
- Rating stage can now reference
- beer FKs. Ratings run after
- checkins complete (checkin_id FK).
-end note
-
-' ═════════════════════════════════════════════
-' MEDIATOR — CHECKIN GATE (triggered internally)
-' Runs concurrently with Phase 3
-' ═════════════════════════════════════════════
-|#F0F5EE|Mediator — Checkin Stage (gated)|
-note
- This stage was triggered by
- TryOpenCheckinGate() after both
- OnUsersComplete and OnBreweriesComplete
- fired. Runs concurrently with
- Phase 3 beer generation.
-end note
-
-:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
-note right
- J-curve weights assigned across
- the full user population before
- any checkins are generated.
+orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_)
+note right of orch
+ J-curve weights written onto
+ GeneratedUser.user.activity_weight.
Small cohort gets high weight;
long tail gets low weight.
+ Requires the full pool — this is why
+ users were committed first.
end note
-while (For each GeneratedUser in pool?) is (remaining)
- :strategy->CheckinsForUser(user, brewery_count);
- while (For each checkin index?) is (remaining)
- :strategy->TimestampFor(user, index);
- note right
- Bursty weekend/evening
+' ─────────────────────────────────────────────
+' PHASE 4 — CHECKINS
+' ─────────────────────────────────────────────
+== Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) ==
+
+orch -> c1 : spawn
+orch -> c2 : spawn
+
+loop For each GeneratedUser in user_pool_
+ c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size())
+ loop For each checkin index
+ c1 -> c1 : strategy->TimestampFor(user, index)
+ note right of c1
+ Bursty weekend / evening
distribution applied here.
end note
- :Select brewery from brewery_pool_\n(weighted random);
- :generator->GenerateCheckin(user, brewery, timestamp);
- :exporter->ProcessCheckin(checkin) : sqlite3_int64;
- :mediator->OnCheckinGenerated(checkin);
- endwhile (done)
-endwhile (done)
+ c1 -> c1 : Select brewery from brewery_pool_\n(weighted random by activity_weight)
+ c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp)
+ c1 -> c2 : checkin_channel_.Send(GeneratedCheckin)
+ end
+end
+c1 -> c2 : checkin_channel_.Close()
-:checkins_complete_ = true;
-note right
- Rating stage depends on checkin_id FK.
- RunRatingStage() is called here,
- after all checkins are committed.
+loop checkin_channel_.Receive()
+ c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64
+ note right of c2
+ Row ID stored on GeneratedCheckin.checkin_id.
+ end note
+ c2 -> orch : Append to checkin_pool_
+end
+
+orch -> orch : join(C1, C2)
+note right of orch
+ ── BARRIER 4 ──
+ checkin_pool_ is fully committed.
+ All FK dependencies for ratings satisfied.
+ Phase 5 begins.
end note
-' ═════════════════════════════════════════════
-' MEDIATOR — RATING STAGE
-' Runs after checkins complete
-' ═════════════════════════════════════════════
-|#F0F5EE|Mediator — Rating Stage|
-note
- Ratings reference user_id, beer_id,
- and checkin_id. All three pools
- are committed before this runs.
- Strong positive skew applied
- by RatingResult generation.
-end note
+' ─────────────────────────────────────────────
+' PHASE 5 — RATINGS
+' ─────────────────────────────────────────────
+== Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) ==
-while (For each GeneratedCheckin?) is (remaining)
- if (Beer available for this brewery?) then (yes)
- :Select beer from beer_pool_\n(match brewery_id);
- :generator->GenerateRating(user, beer, checkin_id);
- :exporter->ProcessRating(rating);
- :mediator->OnRatingGenerated(rating);
- else (no)
- :Skip — no beers for this brewery yet;
- endif
-endwhile (done)
+orch -> g1 : spawn
+orch -> g2 : spawn
-' ═════════════════════════════════════════════
+loop For each GeneratedCheckin in checkin_pool_
+ g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id)
+ g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one)
+ alt Beer found for this brewery
+ g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id)
+ note right of g1
+ Strong positive skew applied
+ inside GenerateRating.
+ end note
+ g1 -> g2 : rating_channel_.Send(GeneratedRating)
+ else No beer found
+ g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating
+ end
+end
+g1 -> g2 : rating_channel_.Close()
+
+loop rating_channel_.Receive()
+ g2 -> g2 : exporter->ProcessRating(rating)
+end
+
+orch -> orch : join(G1, G2)
+
+' ─────────────────────────────────────────────
' TEARDOWN
-' ═════════════════════════════════════════════
-|#F2F6F0|main.cc|
-:Await mediator completion;
-:exporter->Finalize();
-note right
- Single COMMIT covers all five
- fixture types: users, breweries,
- beers, checkins, ratings.
- All-or-nothing consistency.
+' ─────────────────────────────────────────────
+== Teardown ==
+
+orch -> main : return
+main -> main : exporter->Finalize()
+note right of main
+ Single COMMIT covers all five fixture types:
+ users, breweries, beers, checkins, ratings.
+ All-or-nothing consistency per run.
end note
-:spdlog::info "Pipeline complete";
-:return 0;
-stop
+main -> main : spdlog::info "Pipeline complete in X ms"
+main -> main : return 0
@enduml