mirror of
https://github.com/aaronpo97/the-biergarten-app.git
synced 2026-06-01 01:54:00 +00:00
Compare commits
2 Commits
fd6ba35f68
...
532cb234fa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
532cb234fa | ||
|
|
aa46cf2b4b |
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -1,270 +1,277 @@
|
||||
@startuml
|
||||
skinparam style strictuml
|
||||
skinparam defaultFontName "DM Sans"
|
||||
skinparam defaultFontSize 14
|
||||
skinparam defaultFontSize 13
|
||||
skinparam titleFontName "Volkhov"
|
||||
skinparam titleFontSize 20
|
||||
skinparam backgroundColor #FAFCF9
|
||||
skinparam defaultFontColor #28342A
|
||||
skinparam titleFontColor #28342A
|
||||
skinparam ArrowColor #628A5B
|
||||
skinparam SequenceLifeLineBorderColor #547461
|
||||
skinparam SequenceParticipantBorderColor #547461
|
||||
skinparam SequenceParticipantBackgroundColor #EAF0E8
|
||||
skinparam SequenceBoxBorderColor #547461
|
||||
skinparam NoteBackgroundColor #EAF0E8
|
||||
skinparam NoteBorderColor #547461
|
||||
skinparam ActivityBackgroundColor #FAFCF9
|
||||
skinparam ActivityBorderColor #547461
|
||||
skinparam ActivityDiamondBackgroundColor #FAFCF9
|
||||
skinparam ActivityDiamondBorderColor #628A5B
|
||||
skinparam ActivityBarColor #628A5B
|
||||
skinparam SwimlaneBorderColor #547461
|
||||
skinparam SwimlaneBorderThickness 0.3
|
||||
skinparam SequenceDividerBackgroundColor #EAF0E8
|
||||
skinparam SequenceDividerBorderColor #547461
|
||||
|
||||
title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain)
|
||||
title The Biergarten Data Pipeline — Sequence Diagram v4 (Unified Orchestrator)
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
participant "main.cc" as main #F2F6F0
|
||||
participant "Orchestrator" as orch #EAF0E8
|
||||
participant "Thread U1\nUserProducer" as u1 #DCE8D8
|
||||
participant "Thread U2\nUserExportConsumer" as u2 #E0EAE0
|
||||
participant "Thread B1\nEnrichmentProducer" as b1 #DCE8D8
|
||||
participant "Thread B2\nBreweryGenerationConsumer" as b2 #E5EDE1
|
||||
participant "Thread B3\nBreweryExportConsumer" as b3 #E0EAE0
|
||||
participant "Thread R1\nBeerGenerationProducer" as r1 #DCE8D8
|
||||
participant "Thread R2\nBeerExportConsumer" as r2 #E0EAE0
|
||||
participant "Thread C1\nCheckinGenerationProducer" as c1 #DCE8D8
|
||||
participant "Thread C2\nCheckinExportConsumer" as c2 #E0EAE0
|
||||
participant "Thread G1\nRatingGenerationProducer" as g1 #DCE8D8
|
||||
participant "Thread G2\nRatingExportConsumer" as g2 #E0EAE0
|
||||
|
||||
' ─────────────────────────────────────────────
|
||||
' STARTUP
|
||||
' ═════════════════════════════════════════════
|
||||
|#F2F6F0|main.cc|
|
||||
start
|
||||
:ParseArguments(argc, argv);
|
||||
if (Valid?) then (no)
|
||||
:spdlog::error;
|
||||
stop
|
||||
else (yes)
|
||||
endif
|
||||
:Init CurlGlobalState & LlamaBackendState;
|
||||
:Build DI injector;
|
||||
note right
|
||||
DataGenerator and IExportService
|
||||
bound as shared_ptr — both the
|
||||
orchestrator and mediator hold
|
||||
a reference to the same instances.
|
||||
end note
|
||||
:Create BiergartenPipelineOrchestrator;
|
||||
:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy);
|
||||
:exporter->Initialize();
|
||||
:JsonLoader::LoadLocations("locations.json");
|
||||
:SamplingStrategy::Sample(all_locations);
|
||||
:BiergartenPipelineOrchestrator::Run();
|
||||
' ─────────────────────────────────────────────
|
||||
main -> main : ParseArguments(argc, argv)
|
||||
alt Invalid args
|
||||
main -> main : spdlog::error; stop
|
||||
end
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
' PHASE 1 — USER GENERATION
|
||||
' (independent, no FK dependencies)
|
||||
' ═════════════════════════════════════════════
|
||||
|#EAF0E8|Orchestrator — Phase 1: Users|
|
||||
note
|
||||
Users have no FK dependencies.
|
||||
Generated first so the full pool
|
||||
exists before checkin weights
|
||||
are assigned.
|
||||
main -> main : Init CurlGlobalState & LlamaBackendState
|
||||
main -> main : Build DI injector
|
||||
note right of main
|
||||
All dependencies bound with unique_ptr.
|
||||
LlamaConfig or RestConfig injected
|
||||
instead of ApplicationOptions.
|
||||
end note
|
||||
|
||||
:Spawn Thread U1 — UserProducer(sampled_locations);
|
||||
:Spawn Thread U2 — UserExportConsumer();
|
||||
:Join U1, U2;
|
||||
:Collect user_pool : std::vector<GeneratedUser>;
|
||||
main -> orch : exporter->Initialize()
|
||||
note right of orch
|
||||
Opens SQLite connection.
|
||||
Creates schema for all five fixture types
|
||||
in one DDL pass. Begins IMMEDIATE TRANSACTION.
|
||||
end note
|
||||
|
||||
|#DCE8D8|Thread U1 — UserProducer|
|
||||
while (For each Location?) is (remaining)
|
||||
:generator->GenerateUser(location);
|
||||
:user_channel_.Send(GeneratedUser);
|
||||
endwhile (done)
|
||||
:user_channel_.Close();
|
||||
main -> orch : JsonLoader::LoadLocations("locations.json")
|
||||
main -> orch : ISamplingStrategy::Sample(all_locations)
|
||||
main -> orch : BiergartenPipelineOrchestrator::Run()
|
||||
|
||||
|#E0EAE0|Thread U2 — UserExportConsumer|
|
||||
while (user_channel_.Receive()?) is (available)
|
||||
:exporter->ProcessUser(user) : sqlite3_int64;
|
||||
note right
|
||||
Returns inserted row ID.
|
||||
Stored back into GeneratedUser.user_id
|
||||
so mediator pool carries live FKs.
|
||||
' ─────────────────────────────────────────────
|
||||
' PHASE 1 — USERS
|
||||
' ─────────────────────────────────────────────
|
||||
== Phase 1 — Users (no FK dependencies) ==
|
||||
|
||||
orch -> u1 : spawn
|
||||
orch -> u2 : spawn
|
||||
|
||||
loop For each Location
|
||||
u1 -> u1 : generator->GenerateUser(location)
|
||||
u1 -> u2 : user_channel_.Send(GeneratedUser)
|
||||
end
|
||||
u1 -> u2 : user_channel_.Close()
|
||||
|
||||
loop user_channel_.Receive()
|
||||
u2 -> u2 : exporter->ProcessUser(user) : sqlite3_int64
|
||||
note right of u2
|
||||
Returns committed row ID.
|
||||
Stored on GeneratedUser.user_id.
|
||||
end note
|
||||
:Append to user_pool;
|
||||
endwhile (nullopt)
|
||||
u2 -> orch : Append to user_pool_
|
||||
end
|
||||
|
||||
|#EAF0E8|Orchestrator — Phase 1: Users|
|
||||
:mediator->OnUsersComplete(user_pool);
|
||||
note right
|
||||
Mediator receives the full committed
|
||||
user pool. Sets users_ready_ = true.
|
||||
Calls TryOpenCheckinGate() —
|
||||
gate stays closed until breweries
|
||||
are also ready.
|
||||
orch -> orch : join(U1, U2)
|
||||
note right of orch
|
||||
── BARRIER 1 ──
|
||||
user_pool_ is now fully committed.
|
||||
Phase 2 begins.
|
||||
end note
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
' PHASE 2 — BREWERY GENERATION
|
||||
' (depends on locations, runs after Phase 1)
|
||||
' ═════════════════════════════════════════════
|
||||
|#EAF0E8|Orchestrator — Phase 2: Breweries|
|
||||
note
|
||||
Runs after Phase 1 completes.
|
||||
Could be parallelised with Phase 1
|
||||
in future — FK dependency is only
|
||||
on checkins, not on users directly.
|
||||
' ─────────────────────────────────────────────
|
||||
' PHASE 2 — BREWERIES
|
||||
' ─────────────────────────────────────────────
|
||||
== Phase 2 — Breweries (depends on locations only) ==
|
||||
|
||||
orch -> b1 : spawn
|
||||
orch -> b2 : spawn
|
||||
orch -> b3 : spawn
|
||||
|
||||
loop For each Location
|
||||
b1 -> b1 : BreweryContextStrategy::QueriesFor(location)
|
||||
b1 -> b1 : WikipediaService::GetLocationContext(location,\nbrewery_context_strategy_)
|
||||
alt failure
|
||||
b1 -> b1 : LocationContext{ Absent }
|
||||
else truncated
|
||||
b1 -> b1 : LocationContext{ Partial }
|
||||
else success
|
||||
b1 -> b1 : LocationContext{ Full }
|
||||
end
|
||||
b1 -> b2 : enrichment_channel_.Send(EnrichedCity)
|
||||
note right of b1
|
||||
Blocks if channel full.
|
||||
Back-pressure against GPU consumer.
|
||||
end note
|
||||
end
|
||||
b1 -> b2 : enrichment_channel_.Close()
|
||||
|
||||
loop enrichment_channel_.Receive()
|
||||
alt context.completeness == Absent
|
||||
b2 -> b2 : spdlog::warn — proceeding with minimal prompt
|
||||
end
|
||||
b2 -> b2 : generator->GenerateBrewery(location, context)
|
||||
b2 -> b3 : brewery_channel_.Send(GeneratedBrewery)
|
||||
end
|
||||
b2 -> b3 : brewery_channel_.Close()
|
||||
|
||||
loop brewery_channel_.Receive()
|
||||
b3 -> b3 : exporter->ProcessBrewery(brewery) : sqlite3_int64
|
||||
note right of b3
|
||||
Row ID stored on GeneratedBrewery.brewery_id.
|
||||
No brewery_cache_ needed — orchestrator
|
||||
threads the ID forward directly.
|
||||
end note
|
||||
b3 -> orch : Append to brewery_pool_
|
||||
end
|
||||
|
||||
orch -> orch : join(B1, B2, B3)
|
||||
note right of orch
|
||||
── BARRIER 2 ──
|
||||
brewery_pool_ is now fully committed
|
||||
with live brewery_id values.
|
||||
Phase 3 begins.
|
||||
end note
|
||||
|
||||
:Spawn Thread B1 — EnrichmentProducer(sampled_locations);
|
||||
:Spawn Thread B2 — BreweryGenerationConsumer();
|
||||
:Spawn Thread B3 — BreweryExportConsumer();
|
||||
:Join B1, B2, B3;
|
||||
:Collect brewery_pool : std::vector<GeneratedBrewery>;
|
||||
' ─────────────────────────────────────────────
|
||||
' PHASE 3 — BEERS
|
||||
' ─────────────────────────────────────────────
|
||||
== Phase 3 — Beers (depends on brewery_pool_) ==
|
||||
|
||||
|#DCE8D8|Thread B1 — EnrichmentProducer|
|
||||
while (For each Location?) is (remaining)
|
||||
:BreweryContextStrategy::QueriesFor(location);
|
||||
:WikipediaService::GetLocationContext\n(location, brewery_context_strategy_);
|
||||
if (failure?) then (yes)
|
||||
:LocationContext{ Absent };
|
||||
else if (truncated?) then (yes)
|
||||
:LocationContext{ Partial };
|
||||
else (no)
|
||||
:LocationContext{ Full };
|
||||
endif
|
||||
:enrichment_channel_.Send(EnrichedCity);
|
||||
endwhile (done)
|
||||
:enrichment_channel_.Close();
|
||||
orch -> r1 : spawn
|
||||
orch -> r2 : spawn
|
||||
|
||||
|#E5EDE1|Thread B2 — BreweryGenerationConsumer|
|
||||
while (enrichment_channel_.Receive()?) is (available)
|
||||
:generator->GenerateBrewery(location, context);
|
||||
:brewery_channel_.Send(GeneratedBrewery);
|
||||
endwhile (nullopt)
|
||||
:brewery_channel_.Close();
|
||||
loop For each GeneratedBrewery in brewery_pool_
|
||||
r1 -> r1 : BeerContextStrategy::QueriesFor(location)
|
||||
r1 -> r1 : WikipediaService::GetLocationContext(location,\nbeer_context_strategy_)
|
||||
r1 -> r1 : generator->GenerateBeer(brewery.brewery_id,\nlocation, context)
|
||||
r1 -> r2 : beer_channel_.Send(GeneratedBeer)
|
||||
end
|
||||
r1 -> r2 : beer_channel_.Close()
|
||||
|
||||
|#E0EAE0|Thread B3 — BreweryExportConsumer|
|
||||
while (brewery_channel_.Receive()?) is (available)
|
||||
:exporter->ProcessBrewery(brewery) : sqlite3_int64;
|
||||
:Append to brewery_pool;
|
||||
endwhile (nullopt)
|
||||
loop beer_channel_.Receive()
|
||||
r2 -> r2 : exporter->ProcessBeer(beer) : sqlite3_int64
|
||||
note right of r2
|
||||
Row ID stored on GeneratedBeer.beer_id.
|
||||
end note
|
||||
r2 -> orch : Append to beer_pool_
|
||||
end
|
||||
|
||||
|#EAF0E8|Orchestrator — Phase 2: Breweries|
|
||||
:mediator->OnBreweriesComplete(brewery_pool);
|
||||
note right
|
||||
Mediator sets breweries_ready_ = true.
|
||||
Calls TryOpenCheckinGate().
|
||||
Both flags now true — gate opens.
|
||||
Mediator spawns checkin stage
|
||||
asynchronously on its own thread.
|
||||
orch -> orch : join(R1, R2)
|
||||
note right of orch
|
||||
── BARRIER 3 ──
|
||||
beer_pool_ is fully committed.
|
||||
All three upstream pools ready.
|
||||
end note
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
' PHASE 3 — BEER GENERATION
|
||||
' (depends on breweries)
|
||||
' ═════════════════════════════════════════════
|
||||
|#EAF0E8|Orchestrator — Phase 3: Beers|
|
||||
:RunBeerPhase(brewery_pool);
|
||||
' ─────────────────────────────────────────────
|
||||
' CHECKIN WEIGHT ASSIGNMENT
|
||||
' ─────────────────────────────────────────────
|
||||
== Checkin Weight Assignment ==
|
||||
|
||||
:Spawn Thread R1 — BeerGenerationProducer(brewery_pool);
|
||||
:Spawn Thread R2 — BeerExportConsumer();
|
||||
:Join R1, R2;
|
||||
:Collect beer_pool : std::vector<GeneratedBeer>;
|
||||
|
||||
|#DCE8D8|Thread R1 — BeerGenerationProducer|
|
||||
while (For each GeneratedBrewery?) is (remaining)
|
||||
:BeerContextStrategy::QueriesFor(location);
|
||||
:WikipediaService::GetLocationContext\n(location, beer_context_strategy_);
|
||||
:generator->GenerateBeer(brewery_id, location, context);
|
||||
:beer_channel_.Send(GeneratedBeer);
|
||||
endwhile (done)
|
||||
:beer_channel_.Close();
|
||||
|
||||
|#E0EAE0|Thread R2 — BeerExportConsumer|
|
||||
while (beer_channel_.Receive()?) is (available)
|
||||
:exporter->ProcessBeer(beer) : sqlite3_int64;
|
||||
:Append to beer_pool;
|
||||
endwhile (nullopt)
|
||||
|
||||
|#EAF0E8|Orchestrator — Phase 3: Beers|
|
||||
:mediator->OnBeersComplete(beer_pool);
|
||||
note right
|
||||
Mediator stores beer_pool.
|
||||
Rating stage can now reference
|
||||
beer FKs. Ratings run after
|
||||
checkins complete (checkin_id FK).
|
||||
end note
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
' MEDIATOR — CHECKIN GATE (triggered internally)
|
||||
' Runs concurrently with Phase 3
|
||||
' ═════════════════════════════════════════════
|
||||
|#F0F5EE|Mediator — Checkin Stage (gated)|
|
||||
note
|
||||
This stage was triggered by
|
||||
TryOpenCheckinGate() after both
|
||||
OnUsersComplete and OnBreweriesComplete
|
||||
fired. Runs concurrently with
|
||||
Phase 3 beer generation.
|
||||
end note
|
||||
|
||||
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
|
||||
note right
|
||||
J-curve weights assigned across
|
||||
the full user population before
|
||||
any checkins are generated.
|
||||
orch -> orch : ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_)
|
||||
note right of orch
|
||||
J-curve weights written onto
|
||||
GeneratedUser.user.activity_weight.
|
||||
Small cohort gets high weight;
|
||||
long tail gets low weight.
|
||||
Requires the full pool — this is why
|
||||
users were committed first.
|
||||
end note
|
||||
|
||||
while (For each GeneratedUser in pool?) is (remaining)
|
||||
:strategy->CheckinsForUser(user, brewery_count);
|
||||
while (For each checkin index?) is (remaining)
|
||||
:strategy->TimestampFor(user, index);
|
||||
note right
|
||||
Bursty weekend/evening
|
||||
' ─────────────────────────────────────────────
|
||||
' PHASE 4 — CHECKINS
|
||||
' ─────────────────────────────────────────────
|
||||
== Phase 4 — Check-ins (depends on user_pool_ + brewery_pool_) ==
|
||||
|
||||
orch -> c1 : spawn
|
||||
orch -> c2 : spawn
|
||||
|
||||
loop For each GeneratedUser in user_pool_
|
||||
c1 -> c1 : strategy->CheckinsForUser(user,\nbrewery_pool_.size())
|
||||
loop For each checkin index
|
||||
c1 -> c1 : strategy->TimestampFor(user, index)
|
||||
note right of c1
|
||||
Bursty weekend / evening
|
||||
distribution applied here.
|
||||
end note
|
||||
:Select brewery from brewery_pool_\n(weighted random);
|
||||
:generator->GenerateCheckin(user, brewery, timestamp);
|
||||
:exporter->ProcessCheckin(checkin) : sqlite3_int64;
|
||||
:mediator->OnCheckinGenerated(checkin);
|
||||
endwhile (done)
|
||||
endwhile (done)
|
||||
c1 -> c1 : Select brewery from brewery_pool_\n(weighted random by activity_weight)
|
||||
c1 -> c1 : generator->GenerateCheckin(user, brewery, timestamp)
|
||||
c1 -> c2 : checkin_channel_.Send(GeneratedCheckin)
|
||||
end
|
||||
end
|
||||
c1 -> c2 : checkin_channel_.Close()
|
||||
|
||||
:checkins_complete_ = true;
|
||||
note right
|
||||
Rating stage depends on checkin_id FK.
|
||||
RunRatingStage() is called here,
|
||||
after all checkins are committed.
|
||||
loop checkin_channel_.Receive()
|
||||
c2 -> c2 : exporter->ProcessCheckin(checkin) : sqlite3_int64
|
||||
note right of c2
|
||||
Row ID stored on GeneratedCheckin.checkin_id.
|
||||
end note
|
||||
c2 -> orch : Append to checkin_pool_
|
||||
end
|
||||
|
||||
orch -> orch : join(C1, C2)
|
||||
note right of orch
|
||||
── BARRIER 4 ──
|
||||
checkin_pool_ is fully committed.
|
||||
All FK dependencies for ratings satisfied.
|
||||
Phase 5 begins.
|
||||
end note
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
' MEDIATOR — RATING STAGE
|
||||
' Runs after checkins complete
|
||||
' ═════════════════════════════════════════════
|
||||
|#F0F5EE|Mediator — Rating Stage|
|
||||
note
|
||||
Ratings reference user_id, beer_id,
|
||||
and checkin_id. All three pools
|
||||
are committed before this runs.
|
||||
' ─────────────────────────────────────────────
|
||||
' PHASE 5 — RATINGS
|
||||
' ─────────────────────────────────────────────
|
||||
== Phase 5 — Ratings (depends on user_pool_ + beer_pool_ + checkin_pool_) ==
|
||||
|
||||
orch -> g1 : spawn
|
||||
orch -> g2 : spawn
|
||||
|
||||
loop For each GeneratedCheckin in checkin_pool_
|
||||
g1 -> g1 : Resolve GeneratedUser from user_pool_\n(match user_id)
|
||||
g1 -> g1 : Resolve GeneratedBeer from beer_pool_\n(match brewery_id, select one)
|
||||
alt Beer found for this brewery
|
||||
g1 -> g1 : generator->GenerateRating(user, beer,\ncheckin.checkin_id)
|
||||
note right of g1
|
||||
Strong positive skew applied
|
||||
by RatingResult generation.
|
||||
end note
|
||||
inside GenerateRating.
|
||||
end note
|
||||
g1 -> g2 : rating_channel_.Send(GeneratedRating)
|
||||
else No beer found
|
||||
g1 -> g1 : spdlog::warn — no beer for brewery,\nskipping rating
|
||||
end
|
||||
end
|
||||
g1 -> g2 : rating_channel_.Close()
|
||||
|
||||
while (For each GeneratedCheckin?) is (remaining)
|
||||
if (Beer available for this brewery?) then (yes)
|
||||
:Select beer from beer_pool_\n(match brewery_id);
|
||||
:generator->GenerateRating(user, beer, checkin_id);
|
||||
:exporter->ProcessRating(rating);
|
||||
:mediator->OnRatingGenerated(rating);
|
||||
else (no)
|
||||
:Skip — no beers for this brewery yet;
|
||||
endif
|
||||
endwhile (done)
|
||||
loop rating_channel_.Receive()
|
||||
g2 -> g2 : exporter->ProcessRating(rating)
|
||||
end
|
||||
|
||||
' ═════════════════════════════════════════════
|
||||
orch -> orch : join(G1, G2)
|
||||
|
||||
' ─────────────────────────────────────────────
|
||||
' TEARDOWN
|
||||
' ═════════════════════════════════════════════
|
||||
|#F2F6F0|main.cc|
|
||||
:Await mediator completion;
|
||||
:exporter->Finalize();
|
||||
note right
|
||||
Single COMMIT covers all five
|
||||
fixture types: users, breweries,
|
||||
beers, checkins, ratings.
|
||||
All-or-nothing consistency.
|
||||
' ─────────────────────────────────────────────
|
||||
== Teardown ==
|
||||
|
||||
orch -> main : return
|
||||
main -> main : exporter->Finalize()
|
||||
note right of main
|
||||
Single COMMIT covers all five fixture types:
|
||||
users, breweries, beers, checkins, ratings.
|
||||
All-or-nothing consistency per run.
|
||||
end note
|
||||
:spdlog::info "Pipeline complete";
|
||||
:return 0;
|
||||
stop
|
||||
main -> main : spdlog::info "Pipeline complete in X ms"
|
||||
main -> main : return 0
|
||||
|
||||
@enduml
|
||||
|
||||
@@ -29,7 +29,7 @@ skinparam package {
|
||||
FontColor #28342A
|
||||
}
|
||||
|
||||
title The Biergarten Data Pipeline — Architecture v4 (Unified Orchestrator)
|
||||
title The Biergarten Data Pipeline — Architecture (Unified Orchestrator)
|
||||
|
||||
' ─────────────────────────────────────────────
|
||||
' DOMAIN: VALUE OBJECTS
|
||||
|
||||
Reference in New Issue
Block a user