Files
the-biergarten-app/pipeline/diagrams/future-possible-activity.puml
Aaron Po aa46cf2b4b updates
2026-04-20 03:31:42 -04:00

278 lines
9.7 KiB
Plaintext

@startuml
' ── Theme ─────────────────────────────────────
' Strict-UML base style first; every later skinparam refines it.
skinparam style strictuml
skinparam backgroundColor #FAFCF9
skinparam ArrowColor #628A5B

' Body text
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 13
skinparam defaultFontColor #28342A

' Title text
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam titleFontColor #28342A

' Notes
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461

' Sequence-specific elements (each key below expands to sequence<Key>)
skinparam sequence {
  LifeLineBorderColor #547461
  ParticipantBorderColor #547461
  ParticipantBackgroundColor #EAF0E8
  BoxBorderColor #547461
  DividerBackgroundColor #EAF0E8
  DividerBorderColor #547461
}

title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator)
' ── Lifelines (declaration order = column order, left to right) ──
participant main as "main.cc" #F2F6F0
participant orch as "Orchestrator" #EAF0E8
' Phase 1 — users
participant u1 as "Thread U1\nUserProducer" #DCE8D8
participant u2 as "Thread U2\nUserExportConsumer" #E0EAE0
' Phase 2 — breweries
participant b1 as "Thread B1\nEnrichmentProducer" #DCE8D8
participant b2 as "Thread B2\nBreweryGenerationConsumer" #E5EDE1
participant b3 as "Thread B3\nBreweryExportConsumer" #E0EAE0
' Phase 3 — beers
participant r1 as "Thread R1\nBeerGenerationProducer" #DCE8D8
participant r2 as "Thread R2\nBeerExportConsumer" #E0EAE0
' Phase 4 — check-ins
participant c1 as "Thread C1\nCheckinGenerationProducer" #DCE8D8
participant c2 as "Thread C2\nCheckinExportConsumer" #E0EAE0
' Phase 5 — ratings
participant g1 as "Thread G1\nRatingGenerationProducer" #DCE8D8
participant g2 as "Thread G2\nRatingExportConsumer" #E0EAE0
' ─────────────────────────────────────────────
' STARTUP
' ─────────────────────────────────────────────
' Everything before Run() is invoked by main itself. The exporter, loader
' and sampler are called by main, so they are drawn as self-calls on main
' (matching exporter->Finalize() in TEARDOWN), not as messages to orch.
main -> main : ParseArguments(argc, argv)
alt Invalid args
main -> main : spdlog::error; stop
end
main -> main : Init CurlGlobalState & LlamaBackendState
main -> main : Build DI injector
note right of main
All dependencies bound with unique_ptr.
LlamaConfig or RestConfig injected
instead of ApplicationOptions.
end note
main -> main : exporter->Initialize()
note right of main
Opens SQLite connection.
Creates schema for all five fixture types
in one DDL pass. Begins IMMEDIATE TRANSACTION.
end note
main -> main : JsonLoader::LoadLocations("locations.json")
main -> main : ISamplingStrategy::Sample(all_locations)
main -> orch : BiergartenPipelineOrchestrator::Run()
' ─────────────────────────────────────────────
' PHASE 1 — USERS
' ─────────────────────────────────────────────
' All Process* calls run inside the single IMMEDIATE TRANSACTION begun by
' exporter->Initialize(); nothing is COMMITted until exporter->Finalize().
' Row IDs are therefore "inserted", not "committed", at each barrier.
== Phase 1 — Users (no FK dependencies) ==
orch -> u1 : spawn
orch -> u2 : spawn
loop For each Location
u1 -> u1 : generator->GenerateUser(location)
u1 -> u2 : user_channel_.Send(GeneratedUser)
end
u1 -> u2 : user_channel_.Close()
loop user_channel_.Receive()
u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64
note right of u2
Returns the inserted row ID
(valid inside the open transaction).
Stored on GeneratedUser.user_id.
end note
u2 -> orch : Append to user_pool_
end
orch -> orch : join(U1, U2)
note right of orch
── BARRIER 1 ──
Every user_pool_ row is inserted
(COMMIT deferred to Finalize).
Phase 2 begins.
end note
' ─────────────────────────────────────────────
' PHASE 2 — BREWERIES
' ─────────────────────────────────────────────
' Three-stage pipeline: enrichment (B1) -> LLM generation (B2) -> export (B3).
' Inserts share the run-wide transaction; COMMIT happens only at Finalize().
== Phase 2 — Breweries (depends on locations only) ==
orch -> b1 : spawn
orch -> b2 : spawn
orch -> b3 : spawn
loop For each Location
b1 -> b1 : BreweryContextStrategy::QueriesFor(location)
b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_)
alt failure
b1 -> b1 : LocationContext{ Absent }
else truncated
b1 -> b1 : LocationContext{ Partial }
else success
b1 -> b1 : LocationContext{ Full }
end
b1 -> b2 : enrichment_channel_.Send(EnrichedCity)
note right of b1
Blocks if channel full:
the GPU-bound consumer exerts
back-pressure on enrichment.
end note
end
b1 -> b2 : enrichment_channel_.Close()
loop enrichment_channel_.Receive()
alt context.completeness == Absent
b2 -> b2 : spdlog::warn — proceeding with minimal prompt
end
b2 -> b2 : generator->GenerateBrewery(location, context)
b2 -> b3 : brewery_channel_.Send(GeneratedBrewery)
end
b2 -> b3 : brewery_channel_.Close()
loop brewery_channel_.Receive()
b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64
note right of b3
Row ID stored on GeneratedBrewery.brewery_id.
No brewery_cache_ needed — orchestrator
threads the ID forward directly.
end note
b3 -> orch : Append to brewery_pool_
end
orch -> orch : join(B1, B2, B3)
note right of orch
── BARRIER 2 ──
brewery_pool_ is fully inserted
with live brewery_id values
(COMMIT deferred to Finalize).
Phase 3 begins.
end note
' ─────────────────────────────────────────────
' PHASE 3 — BEERS
' ─────────────────────────────────────────────
' The loop variable is a GeneratedBrewery, so the location used for context
' lookup comes from that brewery (the bare name "location" was unbound here).
== Phase 3 — Beers (depends on brewery_pool_) ==
orch -> r1 : spawn
orch -> r2 : spawn
loop For each GeneratedBrewery in brewery_pool_
r1 -> r1 : BeerContextStrategy::QueriesFor(brewery.location)
r1 -> r1 : WikipediaService::GetLocationContext(brewery.location,\nbeer_context_strategy_)
r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nbrewery.location, context)
r1 -> r2 : beer_channel_.Send(GeneratedBeer)
end
r1 -> r2 : beer_channel_.Close()
loop beer_channel_.Receive()
r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64
note right of r2
Row ID stored on GeneratedBeer.beer_id.
end note
r2 -> orch : Append to beer_pool_
end
orch -> orch : join(R1, R2)
note right of orch
── BARRIER 3 ──
beer_pool_ is fully inserted
(COMMIT deferred to Finalize).
All three upstream pools ready.
end note
' ─────────────────────────────────────────────
' CHECKIN WEIGHT ASSIGNMENT
' ─────────────────────────────────────────────
' Runs on the orchestrator thread between Barrier 3 and Phase 4.
== Checkin Weight Assignment ==
orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_)
note right of orch
J-curve weights written onto
GeneratedUser.user.activity_weight.
Small cohort gets high weight;
long tail gets low weight.
Requires the full pool — this is why
all users were exported first.
end note
' ─────────────────────────────────────────────
' PHASE 4 — CHECKINS
' ─────────────────────────────────────────────
== Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) ==
orch -> c1 : spawn
orch -> c2 : spawn
loop For each GeneratedUser in user_pool_
c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size())
loop For each checkin index
c1 -> c1 : strategy->TimestampFor(user, index)
note right of c1
Bursty weekend / evening
distribution applied here.
end note
' NOTE(review): this label previously read "weighted random by activity_weight".
' activity_weight is a single per-user value (set in the weight-assignment
' step), so it cannot weight a choice among breweries. Presumably it scales
' the CheckinsForUser count instead. TODO confirm the intended brewery weighting.
c1 -> c1 : Select brewery from brewery_pool_\n(weighted random)
c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp)
c1 -> c2 : checkin_channel_.Send(GeneratedCheckin)
end
end
c1 -> c2 : checkin_channel_.Close()
loop checkin_channel_.Receive()
c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64
note right of c2
Row ID stored on GeneratedCheckin.checkin_id.
end note
c2 -> orch : Append to checkin_pool_
end
orch -> orch : join(C1, C2)
note right of orch
── BARRIER 4 ──
checkin_pool_ is fully inserted
(COMMIT deferred to Finalize).
All FK dependencies for ratings satisfied.
Phase 5 begins.
end note
' ─────────────────────────────────────────────
' PHASE 5 — RATINGS
' ─────────────────────────────────────────────
' Last phase: no downstream consumer needs rating row IDs, so ProcessRating's
' result is not pooled. A closing barrier note keeps parity with phases 1-4.
== Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) ==
orch -> g1 : spawn
orch -> g2 : spawn
loop For each GeneratedCheckin in checkin_pool_
g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id)
g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one)
alt Beer found for this brewery
g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id)
note right of g1
Strong positive skew applied
inside GenerateRating.
end note
g1 -> g2 : rating_channel_.Send(GeneratedRating)
else No beer found
g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating
end
end
g1 -> g2 : rating_channel_.Close()
loop rating_channel_.Receive()
g2 -> g2 : exporter->ProcessRating(rating)
end
orch -> orch : join(G1, G2)
note right of orch
── BARRIER 5 ──
All generated ratings inserted
(COMMIT deferred to Finalize).
Teardown begins.
end note
' ═════════════════════════════════════════════
' TEARDOWN — control returns to main, which commits the run and exits
' ═════════════════════════════════════════════
== Teardown ==
orch -> main : return
main -> main : exporter->Finalize()
note right of main : Single COMMIT covers all five fixture types:\nusers, breweries, beers, checkins, ratings.\nAll-or-nothing consistency per run.
main -> main : spdlog::info "Pipeline complete in X ms"
main -> main : return 0
@enduml