From fd6ba35f68e9112017ac82a60a5563d0e2c0365b Mon Sep 17 00:00:00 2001 From: Aaron Po Date: Sun, 19 Apr 2026 23:10:08 -0400 Subject: [PATCH] add future plans --- .../diagrams/future-possible-activity.puml | 270 ++++++++++ .../future-possible-architecture.puml | 501 ++++++++++++++++++ 2 files changed, 771 insertions(+) create mode 100644 pipeline/diagrams/future-possible-activity.puml create mode 100644 pipeline/diagrams/future-possible-architecture.puml diff --git a/pipeline/diagrams/future-possible-activity.puml b/pipeline/diagrams/future-possible-activity.puml new file mode 100644 index 0000000..0c27701 --- /dev/null +++ b/pipeline/diagrams/future-possible-activity.puml @@ -0,0 +1,270 @@ +@startuml +skinparam style strictuml +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 14 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B +skinparam NoteBackgroundColor #EAF0E8 +skinparam NoteBorderColor #547461 +skinparam ActivityBackgroundColor #FAFCF9 +skinparam ActivityBorderColor #547461 +skinparam ActivityDiamondBackgroundColor #FAFCF9 +skinparam ActivityDiamondBorderColor #628A5B +skinparam ActivityBarColor #628A5B +skinparam SwimlaneBorderColor #547461 +skinparam SwimlaneBorderThickness 0.3 + +title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain) + +' ═════════════════════════════════════════════ +' STARTUP +' ═════════════════════════════════════════════ +|#F2F6F0|main.cc| +start +:ParseArguments(argc, argv); +if (Valid?) then (no) + :spdlog::error; + stop +else (yes) +endif +:Init CurlGlobalState & LlamaBackendState; +:Build DI injector; +note right + DataGenerator and IExportService + bound as shared_ptr — both the + orchestrator and mediator hold + a reference to the same instances. +end note +:Create BiergartenPipelineOrchestrator; +:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy); +:exporter->Initialize(); +:JsonLoader::LoadLocations("locations.json"); +:SamplingStrategy::Sample(all_locations); +:BiergartenPipelineOrchestrator::Run(); + +' ═════════════════════════════════════════════ +' PHASE 1 — USER GENERATION +' (independent, no FK dependencies) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 1: Users| +note + Users have no FK dependencies. + Generated first so the full pool + exists before checkin weights + are assigned. +end note + +:Spawn Thread U1 — UserProducer(sampled_locations); +:Spawn Thread U2 — UserExportConsumer(); +:Join U1, U2; +:Collect user_pool : std::vector; + +|#DCE8D8|Thread U1 — UserProducer| +while (For each Location?) is (remaining) + :generator->GenerateUser(location); + :user_channel_.Send(GeneratedUser); +endwhile (done) +:user_channel_.Close(); + +|#E0EAE0|Thread U2 — UserExportConsumer| +while (user_channel_.Receive()?) is (available) + :exporter->ProcessUser(user) : sqlite3_int64; + note right + Returns inserted row ID. + Stored back into GeneratedUser.user_id + so mediator pool carries live FKs. + end note + :Append to user_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 1: Users| +:mediator->OnUsersComplete(user_pool); +note right + Mediator receives the full committed + user pool. Sets users_ready_ = true. + Calls TryOpenCheckinGate() — + gate stays closed until breweries + are also ready. +end note + +' ═════════════════════════════════════════════ +' PHASE 2 — BREWERY GENERATION +' (depends on locations, runs after Phase 1) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 2: Breweries| +note + Runs after Phase 1 completes. + Could be parallelised with Phase 1 + in future — FK dependency is only + on checkins, not on users directly. +end note + +:Spawn Thread B1 — EnrichmentProducer(sampled_locations); +:Spawn Thread B2 — BreweryGenerationConsumer(); +:Spawn Thread B3 — BreweryExportConsumer(); +:Join B1, B2, B3; +:Collect brewery_pool : std::vector; + +|#DCE8D8|Thread B1 — EnrichmentProducer| +while (For each Location?) is (remaining) + :BreweryContextStrategy::QueriesFor(location); + :WikipediaService::GetLocationContext\n(location, brewery_context_strategy_); + if (failure?) then (yes) + :LocationContext{ Absent }; + else if (truncated?) then (yes) + :LocationContext{ Partial }; + else (no) + :LocationContext{ Full }; + endif + :enrichment_channel_.Send(EnrichedCity); +endwhile (done) +:enrichment_channel_.Close(); + +|#E5EDE1|Thread B2 — BreweryGenerationConsumer| +while (enrichment_channel_.Receive()?) is (available) + :generator->GenerateBrewery(location, context); + :brewery_channel_.Send(GeneratedBrewery); +endwhile (nullopt) +:brewery_channel_.Close(); + +|#E0EAE0|Thread B3 — BreweryExportConsumer| +while (brewery_channel_.Receive()?) is (available) + :exporter->ProcessBrewery(brewery) : sqlite3_int64; + :Append to brewery_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 2: Breweries| +:mediator->OnBreweriesComplete(brewery_pool); +note right + Mediator sets breweries_ready_ = true. + Calls TryOpenCheckinGate(). + Both flags now true — gate opens. + Mediator spawns checkin stage + asynchronously on its own thread. +end note + +' ═════════════════════════════════════════════ +' PHASE 3 — BEER GENERATION +' (depends on breweries) +' ═════════════════════════════════════════════ +|#EAF0E8|Orchestrator — Phase 3: Beers| +:RunBeerPhase(brewery_pool); + +:Spawn Thread R1 — BeerGenerationProducer(brewery_pool); +:Spawn Thread R2 — BeerExportConsumer(); +:Join R1, R2; +:Collect beer_pool : std::vector; + +|#DCE8D8|Thread R1 — BeerGenerationProducer| +while (For each GeneratedBrewery?) is (remaining) + :BeerContextStrategy::QueriesFor(location); + :WikipediaService::GetLocationContext\n(location, beer_context_strategy_); + :generator->GenerateBeer(brewery_id, location, context); + :beer_channel_.Send(GeneratedBeer); +endwhile (done) +:beer_channel_.Close(); + +|#E0EAE0|Thread R2 — BeerExportConsumer| +while (beer_channel_.Receive()?) is (available) + :exporter->ProcessBeer(beer) : sqlite3_int64; + :Append to beer_pool; +endwhile (nullopt) + +|#EAF0E8|Orchestrator — Phase 3: Beers| +:mediator->OnBeersComplete(beer_pool); +note right + Mediator stores beer_pool. + Rating stage can now reference + beer FKs. Ratings run after + checkins complete (checkin_id FK). +end note + +' ═════════════════════════════════════════════ +' MEDIATOR — CHECKIN GATE (triggered internally) +' Runs concurrently with Phase 3 +' ═════════════════════════════════════════════ +|#F0F5EE|Mediator — Checkin Stage (gated)| +note + This stage was triggered by + TryOpenCheckinGate() after both + OnUsersComplete and OnBreweriesComplete + fired. Runs concurrently with + Phase 3 beer generation. +end note + +:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_); +note right + J-curve weights assigned across + the full user population before + any checkins are generated. + Small cohort gets high weight; + long tail gets low weight. +end note + +while (For each GeneratedUser in pool?) is (remaining) + :strategy->CheckinsForUser(user, brewery_count); + while (For each checkin index?) is (remaining) + :strategy->TimestampFor(user, index); + note right + Bursty weekend/evening + distribution applied here. + end note + :Select brewery from brewery_pool_\n(weighted random); + :generator->GenerateCheckin(user, brewery, timestamp); + :exporter->ProcessCheckin(checkin) : sqlite3_int64; + :mediator->OnCheckinGenerated(checkin); + endwhile (done) +endwhile (done) + +:checkins_complete_ = true; +note right + Rating stage depends on checkin_id FK. + RunRatingStage() is called here, + after all checkins are committed. +end note + +' ═════════════════════════════════════════════ +' MEDIATOR — RATING STAGE +' Runs after checkins complete +' ═════════════════════════════════════════════ +|#F0F5EE|Mediator — Rating Stage| +note + Ratings reference user_id, beer_id, + and checkin_id. All three pools + are committed before this runs. + Strong positive skew applied + by RatingResult generation. +end note + +while (For each GeneratedCheckin?) is (remaining) + if (Beer available for this brewery?) then (yes) + :Select beer from beer_pool_\n(match brewery_id); + :generator->GenerateRating(user, beer, checkin_id); + :exporter->ProcessRating(rating); + :mediator->OnRatingGenerated(rating); + else (no) + :Skip — no beers for this brewery yet; + endif +endwhile (done) + +' ═════════════════════════════════════════════ +' TEARDOWN +' ═════════════════════════════════════════════ +|#F2F6F0|main.cc| +:Await mediator completion; +:exporter->Finalize(); +note right + Single COMMIT covers all five + fixture types: users, breweries, + beers, checkins, ratings. + All-or-nothing consistency. +end note +:spdlog::info "Pipeline complete"; +:return 0; +stop + +@enduml diff --git a/pipeline/diagrams/future-possible-architecture.puml b/pipeline/diagrams/future-possible-architecture.puml new file mode 100644 index 0000000..39983a2 --- /dev/null +++ b/pipeline/diagrams/future-possible-architecture.puml @@ -0,0 +1,501 @@ +@startuml +skinparam style strictuml +skinparam defaultFontName "DM Sans" +skinparam defaultFontSize 14 +skinparam titleFontName "Volkhov" +skinparam titleFontSize 20 +skinparam backgroundColor #FAFCF9 +skinparam defaultFontColor #28342A +skinparam titleFontColor #28342A +skinparam ArrowColor #628A5B + +skinparam class { + BackgroundColor #FAFCF9 + HeaderBackgroundColor #EAF0E8 + BorderColor #547461 + ArrowColor #628A5B + FontColor #28342A +} + +skinparam note { + BackgroundColor #EAF0E8 + BorderColor #547461 + FontColor #28342A +} + +skinparam package { + BackgroundColor #F2F6F0 + BorderColor #547461 + FontColor #28342A +} + +title The Biergarten Data Pipeline — Architecture v4 (Unified Orchestrator) + +' ───────────────────────────────────────────── +' DOMAIN: VALUE OBJECTS +' ───────────────────────────────────────────── +package "Domain: Value Objects & Contracts" { + + class Location { + + city : std::string + + state_province : std::string + + iso3166_2 : std::string + + country : std::string + + iso3166_1 : std::string + + local_languages : std::vector + + latitude : double + + longitude : double + } + + class LocationContext { + + text : std::string + + completeness : Completeness + + char_count : size_t + -- + <> Completeness + Full + Partial + Absent + } + + class EnrichedCity { + + location : Location + + context : LocationContext + } + + class BreweryResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + } + + class BeerResult { + + name_en : std::string + + description_en : std::string + + name_local : std::string + + description_local : std::string + + style : std::string + + abv : float + + ibu : int + } + + class UserResult { + + username : std::string + + bio : std::string + + activity_weight : float + } + note right of UserResult + activity_weight assigned by + ICheckinDistributionStrategy + after the full user pool is + committed. Drives J-curve + checkin volume per user. + end note + + class CheckinResult { + + checked_in_at : std::string + + note : std::string + } + + class RatingResult { + + score : float + + note : std::string + } + + class GeneratedBrewery { + + brewery_id : sqlite3_int64 + + location : Location + + brewery : BreweryResult + + context_completeness : LocationContext::Completeness + + generated_at : std::string + } + + class GeneratedBeer { + + beer_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + location : Location + + beer : BeerResult + + generated_at : std::string + } + + class GeneratedUser { + + user_id : sqlite3_int64 + + location : Location + + user : UserResult + + generated_at : std::string + } + + class GeneratedCheckin { + + checkin_id : sqlite3_int64 + + user_id : sqlite3_int64 + + brewery_id : sqlite3_int64 + + checkin : CheckinResult + + generated_at : std::string + } + + class GeneratedRating { + + user_id : sqlite3_int64 + + beer_id : sqlite3_int64 + + checkin_id : sqlite3_int64 + + rating : RatingResult + + generated_at : std::string + } + +} + +' ───────────────────────────────────────────── +' DOMAIN POLICY +' ───────────────────────────────────────────── +package "Domain Policy" { + + interface IContextStrategy <> { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + class BreweryContextStrategy { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + class BeerContextStrategy { + + QueriesFor(loc : const Location&) : std::vector + + MaxContextChars() : size_t + } + + interface ISamplingStrategy <> { + + Sample(locations : const std::vector&) : std::vector + } + + class UniformSamplingStrategy { + - sample_size_ : size_t + + Sample(locations : const std::vector&) : std::vector + } + + interface ICheckinDistributionStrategy <> { + + AssignActivityWeights(users : std::vector&) : void + + CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&, index : size_t) : std::string + } + note right of ICheckinDistributionStrategy + Injected into the orchestrator. + Owns all statistical policy: + J-curve weight assignment, + bursty weekend timestamps, + per-user checkin volume. + No mediator required to hold this — + the orchestrator calls it directly + before the checkin phase opens. + end note + + class JCurveCheckinStrategy { + - rng_ : std::mt19937 + + AssignActivityWeights(users : std::vector&) : void + + CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t + + TimestampFor(user : const GeneratedUser&, index : size_t) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: ENRICHMENT +' ───────────────────────────────────────────── +package "Infrastructure: Enrichment" { + + interface IEnrichmentService <> { + + GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext + } + + class WikipediaService { + - client_ : std::unique_ptr + - extract_cache_ : std::unordered_map + + GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext + - FetchExtract(query : std::string_view) : std::string + } + + interface WebClient <> { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + + class CURLWebClient { + + Get(url : const std::string&) : std::string + + UrlEncode(value : const std::string&) : std::string + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: GENERATION +' ───────────────────────────────────────────── +package "Infrastructure: Generation" { + + interface DataGenerator <> { + + GenerateBrewery(location : const Location&, context : const LocationContext&) : BreweryResult + + GenerateBeer(brewery_id : sqlite3_int64, location : const Location&, context : const LocationContext&) : BeerResult + + GenerateUser(location : const Location&) : UserResult + + GenerateCheckin(user : const GeneratedUser&, brewery : const GeneratedBrewery&, timestamp : const std::string&) : CheckinResult + + GenerateRating(user : const GeneratedUser&, beer : const GeneratedBeer&, checkin_id : sqlite3_int64) : RatingResult + } + + class MockGenerator { + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - DeterministicHash(location : const Location&) : size_t + } + + class LlamaGenerator { + - model_ : ModelHandle + - context_ : ContextHandle + - prompt_formatter_ : std::unique_ptr + - config_ : LlamaConfig + - rng_ : std::mt19937 + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + - Load(config : const LlamaConfig&) : void + - Infer(system_prompt, user_prompt, max_tokens, grammar) : std::string + - ValidateModelArchitecture() : void + } + + class RestGenerator { + - config_ : RestConfig + + GenerateBrewery(...) : BreweryResult + + GenerateBeer(...) : BeerResult + + GenerateUser(...) : UserResult + + GenerateCheckin(...) : CheckinResult + + GenerateRating(...) : RatingResult + } + note right of RestGenerator + Future REST-backed implementation. + Slots in at the DI root with zero + changes to orchestration logic. + end note + + interface IPromptFormatter <> { + + Format(system_prompt : std::string_view, user_prompt : std::string_view) : std::string + + ExpectedArchitecture() : std::string_view + } + + class Gemma4JinjaPromptFormatter { + + Format(...) : std::string + + ExpectedArchitecture() : std::string_view + } + + class LlamaConfig { + + model_path : std::string + + temperature : float + + top_p : float + + top_k : uint32_t + + n_ctx : uint32_t + + seed : int + } + + class RestConfig { + + endpoint : std::string + + api_key : std::string + + timeout : std::chrono::milliseconds + } + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: PIPELINE CHANNEL +' ───────────────────────────────────────────── +package "Infrastructure: Pipeline Channel" { + + class "BoundedChannel" as BoundedChannel { + - queue_ : std::queue + - mutex_ : std::mutex + - not_full_ : std::condition_variable + - not_empty_ : std::condition_variable + - capacity_ : size_t + - closed_ : bool + + Send(item : T) : void + + Receive() : std::optional + + Close() : void + } + note right of BoundedChannel + Used within each phase to + decouple production from export. + Phase boundaries are explicit + sequential barriers in the + orchestrator's Run() method — + not channel-mediated. + end note + +} + +' ───────────────────────────────────────────── +' INFRASTRUCTURE: EXPORT +' ───────────────────────────────────────────── +package "Infrastructure: Export" { + + interface IExportService <> { + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + } + note right of IExportService + Process* methods return + sqlite3_int64 row IDs. + Orchestrator uses these to + populate FK fields on all + downstream fixture types. + end note + + class SqliteExportService { + - date_time_provider_ : std::unique_ptr + - db_handle_ : SqliteDatabaseHandle + - insert_location_stmt_ : SqliteStatementHandle + - insert_brewery_stmt_ : SqliteStatementHandle + - insert_beer_stmt_ : SqliteStatementHandle + - insert_user_stmt_ : SqliteStatementHandle + - insert_checkin_stmt_ : SqliteStatementHandle + - insert_rating_stmt_ : SqliteStatementHandle + - transaction_open_ : bool + - location_cache_ : std::unordered_map + + Initialize() : void + + ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64 + + ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64 + + ProcessUser(user : const GeneratedUser&) : sqlite3_int64 + + ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64 + + ProcessRating(rating : const GeneratedRating&) : void + + Finalize() : void + - InitializeSchema() : void + - PrepareStatements() : void + - RollbackAndCloseNoThrow() : void + - FinalizeStatements() : void + } + note right of SqliteExportService + brewery_cache_ removed — row IDs + are now carried on GeneratedBrewery + and GeneratedBeer value objects + and threaded through by the + orchestrator directly. + end note + + interface IDateTimeProvider <> { + + GetUtcTimestamp() : std::string + } + + class SystemDateTimeProvider { + + GetUtcTimestamp() : std::string + } + +} + +' ───────────────────────────────────────────── +' ORCHESTRATION +' ───────────────────────────────────────────── +package "Orchestration" { + + class BiergartenPipelineOrchestrator { + - enrichment_service_ : std::unique_ptr + - generator_ : std::unique_ptr + - exporter_ : std::unique_ptr + - brewery_context_strategy_ : std::unique_ptr + - beer_context_strategy_ : std::unique_ptr + - sampling_strategy_ : std::unique_ptr + - checkin_strategy_ : std::unique_ptr + -- + - user_pool_ : std::vector + - brewery_pool_ : std::vector + - beer_pool_ : std::vector + - checkin_pool_ : std::vector + -- + + Run() : bool + - RunUserPhase(locations : const std::vector&) : void + - RunBreweryPhase(locations : const std::vector&) : void + - RunBeerPhase() : void + - RunCheckinPhase() : void + - RunRatingPhase() : void + } + note right of BiergartenPipelineOrchestrator + Single component owns all + sequencing. Run() reads as a + linear narrative: + 1. RunUserPhase + 2. RunBreweryPhase + 3. RunBeerPhase + 4. checkin_strategy_->AssignActivityWeights + 5. RunCheckinPhase + 6. RunRatingPhase + The checkin gate is an explicit + sequential barrier between steps + 3 and 5 — not a hidden internal + trigger in a separate object. + Pools are members: each phase + appends to them and the next + phase reads from them directly. + No mediator. No shared_ptr. + Ownership is unambiguous. + end note + + class JsonLoader { + + {static} LoadLocations(filepath : const std::filesystem::path&) : std::vector + } + +} + +' ───────────────────────────────────────────── +' RELATIONSHIPS +' ───────────────────────────────────────────── + +' Orchestration +BiergartenPipelineOrchestrator *-- IEnrichmentService : owns +BiergartenPipelineOrchestrator *-- DataGenerator : owns +BiergartenPipelineOrchestrator *-- IExportService : owns +BiergartenPipelineOrchestrator *-- ICheckinDistributionStrategy : owns +BiergartenPipelineOrchestrator *-- ISamplingStrategy : owns +BiergartenPipelineOrchestrator ..> JsonLoader : uses + +' Policy implementations +IContextStrategy <|.. BreweryContextStrategy : implements +IContextStrategy <|.. BeerContextStrategy : implements +ISamplingStrategy <|.. UniformSamplingStrategy : implements +ICheckinDistributionStrategy <|.. JCurveCheckinStrategy : implements + +' Enrichment +IEnrichmentService <|.. WikipediaService : implements +WikipediaService *-- WebClient : owns +WikipediaService ..> IContextStrategy : uses (parameter) +WebClient <|.. CURLWebClient : implements + +' Generation +DataGenerator <|.. MockGenerator : implements +DataGenerator <|.. LlamaGenerator : implements +DataGenerator <|.. RestGenerator : implements +LlamaGenerator *-- IPromptFormatter : owns +LlamaGenerator ..> LlamaConfig : constructed with +RestGenerator ..> RestConfig : constructed with +IPromptFormatter <|.. Gemma4JinjaPromptFormatter : implements + +' Export +IExportService <|.. SqliteExportService : implements +SqliteExportService *-- IDateTimeProvider : owns +IDateTimeProvider <|.. SystemDateTimeProvider : implements + +' Data flow +EnrichedCity *-- Location : contains +EnrichedCity *-- LocationContext : contains +GeneratedBrewery *-- Location : contains +GeneratedBrewery *-- BreweryResult : contains +GeneratedBeer *-- Location : contains +GeneratedBeer *-- BeerResult : contains +GeneratedUser *-- Location : contains +GeneratedUser *-- UserResult : contains +GeneratedCheckin *-- CheckinResult : contains +GeneratedRating *-- RatingResult : contains + +@enduml