This commit is contained in:
Aaron Po
2026-04-20 03:31:42 -04:00
parent fd6ba35f68
commit aa46cf2b4b
3 changed files with 230 additions and 225 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,270 +1,277 @@
@startuml @startuml
skinparam style strictuml skinparam style strictuml
skinparam defaultFontName "DM Sans" skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14 skinparam defaultFontSize 13
skinparam titleFontName "Volkhov" skinparam titleFontName "Volkhov"
skinparam titleFontSize 20 skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9 skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B skinparam ArrowColor #628A5B
skinparam SequenceLifeLineBorderColor #547461
skinparam SequenceParticipantBorderColor #547461
skinparam SequenceParticipantBackgroundColor #EAF0E8
skinparam SequenceBoxBorderColor #547461
skinparam NoteBackgroundColor #EAF0E8 skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461 skinparam NoteBorderColor #547461
skinparam ActivityBackgroundColor #FAFCF9 skinparam SequenceDividerBackgroundColor #EAF0E8
skinparam ActivityBorderColor #547461 skinparam SequenceDividerBorderColor #547461
skinparam ActivityDiamondBackgroundColor #FAFCF9
skinparam ActivityDiamondBorderColor #628A5B
skinparam ActivityBarColor #628A5B
skinparam SwimlaneBorderColor #547461
skinparam SwimlaneBorderThickness 0.3
title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain) title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator)
' ═════════════════════════════════════════════ participant "main.cc" as main #F2F6F0
participant "Orchestrator" as orch #EAF0E8
participant "Thread U1\nUserProducer" as u1 #DCE8D8
participant "Thread U2\nUserExportConsumer" as u2 #E0EAE0
participant "Thread B1\nEnrichmentProducer" as b1 #DCE8D8
participant "Thread B2\nBreweryGenerationConsumer" as b2 #E5EDE1
participant "Thread B3\nBreweryExportConsumer" as b3 #E0EAE0
participant "Thread R1\nBeerGenerationProducer" as r1 #DCE8D8
participant "Thread R2\nBeerExportConsumer" as r2 #E0EAE0
participant "Thread C1\nCheckinGenerationProducer" as c1 #DCE8D8
participant "Thread C2\nCheckinExportConsumer" as c2 #E0EAE0
participant "Thread G1\nRatingGenerationProducer" as g1 #DCE8D8
participant "Thread G2\nRatingExportConsumer" as g2 #E0EAE0
' ─────────────────────────────────────────────
' STARTUP ' STARTUP
' ═════════════════════════════════════════════ ' ─────────────────────────────────────────────
|#F2F6F0|main.cc| main -> main : ParseArguments(argc, argv)
start alt Invalid args
:ParseArguments(argc, argv); main -> main : spdlog::error; stop
if (Valid?) then (no) end
:spdlog::error;
stop
else (yes)
endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
note right
DataGenerator and IExportService
bound as shared_ptr — both the
orchestrator and mediator hold
a reference to the same instances.
end note
:Create BiergartenPipelineOrchestrator;
:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy);
:exporter->Initialize();
:JsonLoader::LoadLocations("locations.json");
:SamplingStrategy::Sample(all_locations);
:BiergartenPipelineOrchestrator::Run();
' ═════════════════════════════════════════════ main -> main : Init CurlGlobalState & LlamaBackendState
' PHASE 1 — USER GENERATION main -> main : Build DI injector
' (independent, no FK dependencies) note right of main
' ═════════════════════════════════════════════ All dependencies bound with unique_ptr.
|#EAF0E8|Orchestrator — Phase 1: Users| LlamaConfig or RestConfig injected
note instead of ApplicationOptions.
Users have no FK dependencies.
Generated first so the full pool
exists before checkin weights
are assigned.
end note end note
:Spawn Thread U1 — UserProducer(sampled_locations); main -> orch : exporter->Initialize()
:Spawn Thread U2 — UserExportConsumer(); note right of orch
:Join U1, U2; Opens SQLite connection.
:Collect user_pool : std::vector<GeneratedUser>; Creates schema for all five fixture types
in one DDL pass. Begins IMMEDIATE TRANSACTION.
|#DCE8D8|Thread U1 — UserProducer|
while (For each Location?) is (remaining)
:generator->GenerateUser(location);
:user_channel_.Send(GeneratedUser);
endwhile (done)
:user_channel_.Close();
|#E0EAE0|Thread U2 — UserExportConsumer|
while (user_channel_.Receive()?) is (available)
:exporter->ProcessUser(user) : sqlite3_int64;
note right
Returns inserted row ID.
Stored back into GeneratedUser.user_id
so mediator pool carries live FKs.
end note
:Append to user_pool;
endwhile (nullopt)
|#EAF0E8|Orchestrator — Phase 1: Users|
:mediator->OnUsersComplete(user_pool);
note right
Mediator receives the full committed
user pool. Sets users_ready_ = true.
Calls TryOpenCheckinGate() —
gate stays closed until breweries
are also ready.
end note end note
' ═════════════════════════════════════════════ main -> orch : JsonLoader::LoadLocations("locations.json")
' PHASE 2 — BREWERY GENERATION main -> orch : ISamplingStrategy::Sample(all_locations)
' (depends on locations, runs after Phase 1) main -> orch : BiergartenPipelineOrchestrator::Run()
' ═════════════════════════════════════════════
|#EAF0E8|Orchestrator — Phase 2: Breweries| ' ─────────────────────────────────────────────
note ' PHASE 1 — USERS
Runs after Phase 1 completes. ' ─────────────────────────────────────────────
Could be parallelised with Phase 1 == Phase 1 — Users (no FK dependencies) ==
in future — FK dependency is only
on checkins, not on users directly. orch -> u1 : spawn
orch -> u2 : spawn
loop For each Location
u1 -> u1 : generator->GenerateUser(location)
u1 -> u2 : user_channel_.Send(GeneratedUser)
end
u1 -> u2 : user_channel_.Close()
loop user_channel_.Receive()
u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64
note right of u2
Returns committed row ID.
Stored on GeneratedUser.user_id.
end note
u2 -> orch : Append to user_pool_
end
orch -> orch : join(U1, U2)
note right of orch
── BARRIER 1 ──
user_pool_ is now fully committed.
Phase 2 begins.
end note end note
:Spawn Thread B1 — EnrichmentProducer(sampled_locations); ' ─────────────────────────────────────────────
:Spawn Thread B2 — BreweryGenerationConsumer(); ' PHASE 2 — BREWERIES
:Spawn Thread B3 — BreweryExportConsumer(); ' ─────────────────────────────────────────────
:Join B1, B2, B3; == Phase 2 — Breweries (depends on locations only) ==
:Collect brewery_pool : std::vector<GeneratedBrewery>;
|#DCE8D8|Thread B1 — EnrichmentProducer| orch -> b1 : spawn
while (For each Location?) is (remaining) orch -> b2 : spawn
:BreweryContextStrategy::QueriesFor(location); orch -> b3 : spawn
:WikipediaService::GetLocationContext\n(location, brewery_context_strategy_);
if (failure?) then (yes)
:LocationContext{ Absent };
else if (truncated?) then (yes)
:LocationContext{ Partial };
else (no)
:LocationContext{ Full };
endif
:enrichment_channel_.Send(EnrichedCity);
endwhile (done)
:enrichment_channel_.Close();
|#E5EDE1|Thread B2 — BreweryGenerationConsumer| loop For each Location
while (enrichment_channel_.Receive()?) is (available) b1 -> b1 : BreweryContextStrategy::QueriesFor(location)
:generator->GenerateBrewery(location, context); b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_)
:brewery_channel_.Send(GeneratedBrewery); alt failure
endwhile (nullopt) b1 -> b1 : LocationContext{ Absent }
:brewery_channel_.Close(); else truncated
b1 -> b1 : LocationContext{ Partial }
else success
b1 -> b1 : LocationContext{ Full }
end
b1 -> b2 : enrichment_channel_.Send(EnrichedCity)
note right of b1
Blocks if channel full.
Back-pressure against GPU consumer.
end note
end
b1 -> b2 : enrichment_channel_.Close()
|#E0EAE0|Thread B3 — BreweryExportConsumer| loop enrichment_channel_.Receive()
while (brewery_channel_.Receive()?) is (available) alt context.completeness == Absent
:exporter->ProcessBrewery(brewery) : sqlite3_int64; b2 -> b2 : spdlog::warn — proceeding with minimal prompt
:Append to brewery_pool; end
endwhile (nullopt) b2 -> b2 : generator->GenerateBrewery(location, context)
b2 -> b3 : brewery_channel_.Send(GeneratedBrewery)
end
b2 -> b3 : brewery_channel_.Close()
|#EAF0E8|Orchestrator — Phase 2: Breweries| loop brewery_channel_.Receive()
:mediator->OnBreweriesComplete(brewery_pool); b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64
note right note right of b3
Mediator sets breweries_ready_ = true. Row ID stored on GeneratedBrewery.brewery_id.
Calls TryOpenCheckinGate(). No brewery_cache_ needed — orchestrator
Both flags now true — gate opens. threads the ID forward directly.
Mediator spawns checkin stage end note
asynchronously on its own thread. b3 -> orch : Append to brewery_pool_
end
orch -> orch : join(B1, B2, B3)
note right of orch
── BARRIER 2 ──
brewery_pool_ is now fully committed
with live brewery_id values.
Phase 3 begins.
end note end note
' ═════════════════════════════════════════════ ' ─────────────────────────────────────────────
' PHASE 3 — BEER GENERATION ' PHASE 3 — BEERS
' (depends on breweries) ' ─────────────────────────────────────────────
' ═════════════════════════════════════════════ == Phase 3 — Beers (depends on brewery_pool_) ==
|#EAF0E8|Orchestrator — Phase 3: Beers|
:RunBeerPhase(brewery_pool);
:Spawn Thread R1 — BeerGenerationProducer(brewery_pool); orch -> r1 : spawn
:Spawn Thread R2 — BeerExportConsumer(); orch -> r2 : spawn
:Join R1, R2;
:Collect beer_pool : std::vector<GeneratedBeer>;
|#DCE8D8|Thread R1 — BeerGenerationProducer| loop For each GeneratedBrewery in brewery_pool_
while (For each GeneratedBrewery?) is (remaining) r1 -> r1 : BeerContextStrategy::QueriesFor(location)
:BeerContextStrategy::QueriesFor(location); r1 -> r1 : WikipediaService::GetLocationContext(location,\nbeer_context_strategy_)
:WikipediaService::GetLocationContext\n(location, beer_context_strategy_); r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nlocation, context)
:generator->GenerateBeer(brewery_id, location, context); r1 -> r2 : beer_channel_.Send(GeneratedBeer)
:beer_channel_.Send(GeneratedBeer); end
endwhile (done) r1 -> r2 : beer_channel_.Close()
:beer_channel_.Close();
|#E0EAE0|Thread R2 — BeerExportConsumer| loop beer_channel_.Receive()
while (beer_channel_.Receive()?) is (available) r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64
:exporter->ProcessBeer(beer) : sqlite3_int64; note right of r2
:Append to beer_pool; Row ID stored on GeneratedBeer.beer_id.
endwhile (nullopt) end note
r2 -> orch : Append to beer_pool_
end
|#EAF0E8|Orchestrator — Phase 3: Beers| orch -> orch : join(R1, R2)
:mediator->OnBeersComplete(beer_pool); note right of orch
note right ── BARRIER 3 ──
Mediator stores beer_pool. beer_pool_ is fully committed.
Rating stage can now reference All three upstream pools ready.
beer FKs. Ratings run after
checkins complete (checkin_id FK).
end note end note
' ═════════════════════════════════════════════ ' ─────────────────────────────────────────────
' MEDIATOR — CHECKIN GATE (triggered internally) ' CHECKIN WEIGHT ASSIGNMENT
' Runs concurrently with Phase 3 ' ─────────────────────────────────────────────
' ═════════════════════════════════════════════ == Checkin Weight Assignment ==
|#F0F5EE|Mediator — Checkin Stage (gated)|
note
This stage was triggered by
TryOpenCheckinGate() after both
OnUsersComplete and OnBreweriesComplete
fired. Runs concurrently with
Phase 3 beer generation.
end note
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_)
note right note right of orch
J-curve weights assigned across J-curve weights written onto
the full user population before GeneratedUser.user.activity_weight.
any checkins are generated.
Small cohort gets high weight; Small cohort gets high weight;
long tail gets low weight. long tail gets low weight.
Requires the full pool — this is why
users were committed first.
end note end note
while (For each GeneratedUser in pool?) is (remaining) ' ─────────────────────────────────────────────
:strategy->CheckinsForUser(user, brewery_count); ' PHASE 4 — CHECKINS
while (For each checkin index?) is (remaining) ' ─────────────────────────────────────────────
:strategy->TimestampFor(user, index); == Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) ==
note right
orch -> c1 : spawn
orch -> c2 : spawn
loop For each GeneratedUser in user_pool_
c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size())
loop For each checkin index
c1 -> c1 : strategy->TimestampFor(user, index)
note right of c1
Bursty weekend / evening Bursty weekend / evening
distribution applied here. distribution applied here.
end note end note
:Select brewery from brewery_pool_\n(weighted random); c1 -> c1 : Select brewery from brewery_pool_\n(weighted random by activity_weight)
:generator->GenerateCheckin(user, brewery, timestamp); c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp)
:exporter->ProcessCheckin(checkin) : sqlite3_int64; c1 -> c2 : checkin_channel_.Send(GeneratedCheckin)
:mediator->OnCheckinGenerated(checkin); end
endwhile (done) end
endwhile (done) c1 -> c2 : checkin_channel_.Close()
:checkins_complete_ = true; loop checkin_channel_.Receive()
note right c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64
Rating stage depends on checkin_id FK. note right of c2
RunRatingStage() is called here, Row ID stored on GeneratedCheckin.checkin_id.
after all checkins are committed. end note
c2 -> orch : Append to checkin_pool_
end
orch -> orch : join(C1, C2)
note right of orch
── BARRIER 4 ──
checkin_pool_ is fully committed.
All FK dependencies for ratings satisfied.
Phase 5 begins.
end note end note
' ═════════════════════════════════════════════ ' ─────────────────────────────────────────────
' MEDIATOR — RATING STAGE ' PHASE 5 — RATINGS
' Runs after checkins complete ' ─────────────────────────────────────────────
' ═════════════════════════════════════════════ == Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) ==
|#F0F5EE|Mediator — Rating Stage|
note orch -> g1 : spawn
Ratings reference user_id, beer_id, orch -> g2 : spawn
and checkin_id. All three pools
are committed before this runs. loop For each GeneratedCheckin in checkin_pool_
g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id)
g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one)
alt Beer found for this brewery
g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id)
note right of g1
Strong positive skew applied Strong positive skew applied
by RatingResult generation. inside GenerateRating.
end note end note
g1 -> g2 : rating_channel_.Send(GeneratedRating)
else No beer found
g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating
end
end
g1 -> g2 : rating_channel_.Close()
while (For each GeneratedCheckin?) is (remaining) loop rating_channel_.Receive()
if (Beer available for this brewery?) then (yes) g2 -> g2 : exporter->ProcessRating(rating)
:Select beer from beer_pool_\n(match brewery_id); end
:generator->GenerateRating(user, beer, checkin_id);
:exporter->ProcessRating(rating);
:mediator->OnRatingGenerated(rating);
else (no)
:Skip — no beers for this brewery yet;
endif
endwhile (done)
' ═════════════════════════════════════════════ orch -> orch : join(G1, G2)
' ─────────────────────────────────────────────
' TEARDOWN ' TEARDOWN
' ═════════════════════════════════════════════ ' ─────────────────────────────────────────────
|#F2F6F0|main.cc| == Teardown ==
:Await mediator completion;
:exporter->Finalize(); orch -> main : return
note right main -> main : exporter->Finalize()
Single COMMIT covers all five note right of main
fixture types: users, breweries, Single COMMIT covers all five fixture types:
beers, checkins, ratings. users, breweries, beers, checkins, ratings.
All-or-nothing consistency. All-or-nothing consistency per run.
end note end note
:spdlog::info "Pipeline complete"; main -> main : spdlog::info "Pipeline complete in X ms"
:return 0; main -> main : return 0
stop
@enduml @enduml