1 Commits

Author SHA1 Message Date
Aaron Po
fd6ba35f68 add future plans 2026-04-20 00:07:50 -04:00
2 changed files with 771 additions and 0 deletions

View File

@@ -0,0 +1,270 @@
@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461
skinparam ActivityBackgroundColor #FAFCF9
skinparam ActivityBorderColor #547461
skinparam ActivityDiamondBackgroundColor #FAFCF9
skinparam ActivityDiamondBorderColor #628A5B
skinparam ActivityBarColor #628A5B
skinparam SwimlaneBorderColor #547461
skinparam SwimlaneBorderThickness 0.3
title The Biergarten Data Pipeline — Activity Diagram v3 (Mediator + Full Fixture Chain)
' ═════════════════════════════════════════════
' STARTUP
' ═════════════════════════════════════════════
|#F2F6F0|main.cc|
start
:ParseArguments(argc, argv);
if (Valid?) then (no)
:spdlog::error;
stop
else (yes)
endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
note right
DataGenerator and IExportService
bound as shared_ptr — both the
orchestrator and mediator hold
a reference to the same instances.
end note
:Create BiergartenPipelineOrchestrator;
:Create BiergartenPipelineMediator\n(shared generator, shared exporter,\nJCurveCheckinStrategy);
:exporter->Initialize();
:JsonLoader::LoadLocations("locations.json");
:SamplingStrategy::Sample(all_locations);
:BiergartenPipelineOrchestrator::Run();
' ═════════════════════════════════════════════
' PHASE 1 — USER GENERATION
' (independent, no FK dependencies)
' ═════════════════════════════════════════════
|#EAF0E8|Orchestrator — Phase 1: Users|
note
Users have no FK dependencies.
Generated first so the full pool
exists before checkin weights
are assigned.
end note
:Spawn Thread U1 — UserProducer(sampled_locations);
:Spawn Thread U2 — UserExportConsumer();
:Join U1, U2;
:Collect user_pool : std::vector<GeneratedUser>;
|#DCE8D8|Thread U1 — UserProducer|
while (For each Location?) is (remaining)
:generator->GenerateUser(location);
:user_channel_.Send(GeneratedUser);
endwhile (done)
:user_channel_.Close();
|#E0EAE0|Thread U2 — UserExportConsumer|
while (user_channel_.Receive()?) is (available)
:exporter->ProcessUser(user) : sqlite3_int64;
note right
Returns inserted row ID.
Stored back into GeneratedUser.user_id
so mediator pool carries live FKs.
end note
:Append to user_pool;
endwhile (nullopt)
|#EAF0E8|Orchestrator — Phase 1: Users|
:mediator->OnUsersComplete(user_pool);
note right
Mediator receives the full committed
user pool. Sets users_ready_ = true.
Calls TryOpenCheckinGate() —
gate stays closed until breweries
are also ready.
end note
' ═════════════════════════════════════════════
' PHASE 2 — BREWERY GENERATION
' (depends on locations, runs after Phase 1)
' ═════════════════════════════════════════════
|#EAF0E8|Orchestrator — Phase 2: Breweries|
note
Runs after Phase 1 completes.
Could be parallelised with Phase 1
in future — FK dependency is only
on checkins, not on users directly.
end note
:Spawn Thread B1 — EnrichmentProducer(sampled_locations);
:Spawn Thread B2 — BreweryGenerationConsumer();
:Spawn Thread B3 — BreweryExportConsumer();
:Join B1, B2, B3;
:Collect brewery_pool : std::vector<GeneratedBrewery>;
|#DCE8D8|Thread B1 — EnrichmentProducer|
while (For each Location?) is (remaining)
:BreweryContextStrategy::QueriesFor(location);
:WikipediaService::GetLocationContext\n(location, brewery_context_strategy_);
if (failure?) then (yes)
:LocationContext{ Absent };
else if (truncated?) then (yes)
:LocationContext{ Partial };
else (no)
:LocationContext{ Full };
endif
:enrichment_channel_.Send(EnrichedCity);
endwhile (done)
:enrichment_channel_.Close();
|#E5EDE1|Thread B2 — BreweryGenerationConsumer|
while (enrichment_channel_.Receive()?) is (available)
:generator->GenerateBrewery(location, context);
:brewery_channel_.Send(GeneratedBrewery);
endwhile (nullopt)
:brewery_channel_.Close();
|#E0EAE0|Thread B3 — BreweryExportConsumer|
while (brewery_channel_.Receive()?) is (available)
:exporter->ProcessBrewery(brewery) : sqlite3_int64;
:Append to brewery_pool;
endwhile (nullopt)
|#EAF0E8|Orchestrator — Phase 2: Breweries|
:mediator->OnBreweriesComplete(brewery_pool);
note right
Mediator sets breweries_ready_ = true.
Calls TryOpenCheckinGate().
Both flags now true — gate opens.
Mediator spawns checkin stage
asynchronously on its own thread.
end note
' ═════════════════════════════════════════════
' PHASE 3 — BEER GENERATION
' (depends on breweries)
' ═════════════════════════════════════════════
|#EAF0E8|Orchestrator — Phase 3: Beers|
:RunBeerPhase(brewery_pool);
:Spawn Thread R1 — BeerGenerationProducer(brewery_pool);
:Spawn Thread R2 — BeerExportConsumer();
:Join R1, R2;
:Collect beer_pool : std::vector<GeneratedBeer>;
|#DCE8D8|Thread R1 — BeerGenerationProducer|
while (For each GeneratedBrewery?) is (remaining)
:BeerContextStrategy::QueriesFor(location);
:WikipediaService::GetLocationContext\n(location, beer_context_strategy_);
:generator->GenerateBeer(brewery_id, location, context);
:beer_channel_.Send(GeneratedBeer);
endwhile (done)
:beer_channel_.Close();
|#E0EAE0|Thread R2 — BeerExportConsumer|
while (beer_channel_.Receive()?) is (available)
:exporter->ProcessBeer(beer) : sqlite3_int64;
:Append to beer_pool;
endwhile (nullopt)
|#EAF0E8|Orchestrator — Phase 3: Beers|
:mediator->OnBeersComplete(beer_pool);
note right
Mediator stores beer_pool.
Rating stage can now reference
beer FKs. Ratings run after
checkins complete (checkin_id FK).
end note
' ═════════════════════════════════════════════
' MEDIATOR — CHECKIN GATE (triggered internally)
' Runs concurrently with Phase 3
' ═════════════════════════════════════════════
|#F0F5EE|Mediator — Checkin Stage (gated)|
note
This stage was triggered by
TryOpenCheckinGate() after both
OnUsersComplete and OnBreweriesComplete
fired. Runs concurrently with
Phase 3 beer generation.
end note
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
note right
J-curve weights assigned across
the full user population before
any checkins are generated.
Small cohort gets high weight;
long tail gets low weight.
end note
while (For each GeneratedUser in pool?) is (remaining)
:strategy->CheckinsForUser(user, brewery_count);
while (For each checkin index?) is (remaining)
:strategy->TimestampFor(user, index);
note right
Bursty weekend/evening
distribution applied here.
end note
:Select brewery from brewery_pool_\n(weighted random);
:generator->GenerateCheckin(user, brewery, timestamp);
:exporter->ProcessCheckin(checkin) : sqlite3_int64;
:mediator->OnCheckinGenerated(checkin);
endwhile (done)
endwhile (done)
:checkins_complete_ = true;
note right
Rating stage depends on checkin_id FK.
RunRatingStage() is called here,
after all checkins are committed.
end note
' ═════════════════════════════════════════════
' MEDIATOR — RATING STAGE
' Runs after checkins complete
' ═════════════════════════════════════════════
|#F0F5EE|Mediator — Rating Stage|
note
Ratings reference user_id, beer_id,
and checkin_id. All three pools
are committed before this runs.
Strong positive skew applied
by RatingResult generation.
end note
while (For each GeneratedCheckin?) is (remaining)
if (Beer available for this brewery?) then (yes)
:Select beer from beer_pool_\n(match brewery_id);
:generator->GenerateRating(user, beer, checkin_id);
:exporter->ProcessRating(rating);
:mediator->OnRatingGenerated(rating);
else (no)
:Skip — no beers for this brewery yet;
endif
endwhile (done)
' ═════════════════════════════════════════════
' TEARDOWN
' ═════════════════════════════════════════════
|#F2F6F0|main.cc|
:Await mediator completion;
:exporter->Finalize();
note right
Single COMMIT covers all five
fixture types: users, breweries,
beers, checkins, ratings.
All-or-nothing consistency.
end note
:spdlog::info "Pipeline complete";
:return 0;
stop
@enduml

View File

@@ -0,0 +1,501 @@
@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam class {
BackgroundColor #FAFCF9
HeaderBackgroundColor #EAF0E8
BorderColor #547461
ArrowColor #628A5B
FontColor #28342A
}
skinparam note {
BackgroundColor #EAF0E8
BorderColor #547461
FontColor #28342A
}
skinparam package {
BackgroundColor #F2F6F0
BorderColor #547461
FontColor #28342A
}
title The Biergarten Data Pipeline — Architecture v4 (Unified Orchestrator)
' ─────────────────────────────────────────────
' DOMAIN: VALUE OBJECTS
' ─────────────────────────────────────────────
package "Domain: Value Objects & Contracts" {
class Location {
+ city : std::string
+ state_province : std::string
+ iso3166_2 : std::string
+ country : std::string
+ iso3166_1 : std::string
+ local_languages : std::vector<std::string>
+ latitude : double
+ longitude : double
}
class LocationContext {
+ text : std::string
+ completeness : Completeness
+ char_count : size_t
--
<<enum>> Completeness
Full
Partial
Absent
}
class EnrichedCity {
+ location : Location
+ context : LocationContext
}
class BreweryResult {
+ name_en : std::string
+ description_en : std::string
+ name_local : std::string
+ description_local : std::string
}
class BeerResult {
+ name_en : std::string
+ description_en : std::string
+ name_local : std::string
+ description_local : std::string
+ style : std::string
+ abv : float
+ ibu : int
}
class UserResult {
+ username : std::string
+ bio : std::string
+ activity_weight : float
}
note right of UserResult
activity_weight assigned by
ICheckinDistributionStrategy
after the full user pool is
committed. Drives J-curve
checkin volume per user.
end note
class CheckinResult {
+ checked_in_at : std::string
+ note : std::string
}
class RatingResult {
+ score : float
+ note : std::string
}
class GeneratedBrewery {
+ brewery_id : sqlite3_int64
+ location : Location
+ brewery : BreweryResult
+ context_completeness : LocationContext::Completeness
+ generated_at : std::string
}
class GeneratedBeer {
+ beer_id : sqlite3_int64
+ brewery_id : sqlite3_int64
+ location : Location
+ beer : BeerResult
+ generated_at : std::string
}
class GeneratedUser {
+ user_id : sqlite3_int64
+ location : Location
+ user : UserResult
+ generated_at : std::string
}
class GeneratedCheckin {
+ checkin_id : sqlite3_int64
+ user_id : sqlite3_int64
+ brewery_id : sqlite3_int64
+ checkin : CheckinResult
+ generated_at : std::string
}
class GeneratedRating {
+ user_id : sqlite3_int64
+ beer_id : sqlite3_int64
+ checkin_id : sqlite3_int64
+ rating : RatingResult
+ generated_at : std::string
}
}
' ─────────────────────────────────────────────
' DOMAIN POLICY
' ─────────────────────────────────────────────
package "Domain Policy" {
interface IContextStrategy <<interface>> {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
class BreweryContextStrategy {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
class BeerContextStrategy {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
interface ISamplingStrategy <<interface>> {
+ Sample(locations : const std::vector<Location>&) : std::vector<Location>
}
class UniformSamplingStrategy {
- sample_size_ : size_t
+ Sample(locations : const std::vector<Location>&) : std::vector<Location>
}
interface ICheckinDistributionStrategy <<interface>> {
+ AssignActivityWeights(users : std::vector<GeneratedUser>&) : void
+ CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&, index : size_t) : std::string
}
note right of ICheckinDistributionStrategy
Injected into the orchestrator.
Owns all statistical policy:
J-curve weight assignment,
bursty weekend timestamps,
per-user checkin volume.
No mediator required to hold this —
the orchestrator calls it directly
before the checkin phase opens.
end note
class JCurveCheckinStrategy {
- rng_ : std::mt19937
+ AssignActivityWeights(users : std::vector<GeneratedUser>&) : void
+ CheckinsForUser(user : const GeneratedUser&, brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&, index : size_t) : std::string
}
}
' ─────────────────────────────────────────────
' INFRASTRUCTURE: ENRICHMENT
' ─────────────────────────────────────────────
package "Infrastructure: Enrichment" {
interface IEnrichmentService <<interface>> {
+ GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext
}
class WikipediaService {
- client_ : std::unique_ptr<WebClient>
- extract_cache_ : std::unordered_map<std::string, std::string>
+ GetLocationContext(loc : const Location&, strategy : const IContextStrategy&) : LocationContext
- FetchExtract(query : std::string_view) : std::string
}
interface WebClient <<interface>> {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
class CURLWebClient {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
}
' ─────────────────────────────────────────────
' INFRASTRUCTURE: GENERATION
' ─────────────────────────────────────────────
package "Infrastructure: Generation" {
interface DataGenerator <<interface>> {
+ GenerateBrewery(location : const Location&, context : const LocationContext&) : BreweryResult
+ GenerateBeer(brewery_id : sqlite3_int64, location : const Location&, context : const LocationContext&) : BeerResult
+ GenerateUser(location : const Location&) : UserResult
+ GenerateCheckin(user : const GeneratedUser&, brewery : const GeneratedBrewery&, timestamp : const std::string&) : CheckinResult
+ GenerateRating(user : const GeneratedUser&, beer : const GeneratedBeer&, checkin_id : sqlite3_int64) : RatingResult
}
class MockGenerator {
+ GenerateBrewery(...) : BreweryResult
+ GenerateBeer(...) : BeerResult
+ GenerateUser(...) : UserResult
+ GenerateCheckin(...) : CheckinResult
+ GenerateRating(...) : RatingResult
- DeterministicHash(location : const Location&) : size_t
}
class LlamaGenerator {
- model_ : ModelHandle
- context_ : ContextHandle
- prompt_formatter_ : std::unique_ptr<IPromptFormatter>
- config_ : LlamaConfig
- rng_ : std::mt19937
+ GenerateBrewery(...) : BreweryResult
+ GenerateBeer(...) : BeerResult
+ GenerateUser(...) : UserResult
+ GenerateCheckin(...) : CheckinResult
+ GenerateRating(...) : RatingResult
- Load(config : const LlamaConfig&) : void
- Infer(system_prompt, user_prompt, max_tokens, grammar) : std::string
- ValidateModelArchitecture() : void
}
class RestGenerator {
- config_ : RestConfig
+ GenerateBrewery(...) : BreweryResult
+ GenerateBeer(...) : BeerResult
+ GenerateUser(...) : UserResult
+ GenerateCheckin(...) : CheckinResult
+ GenerateRating(...) : RatingResult
}
note right of RestGenerator
Future REST-backed implementation.
Slots in at the DI root with zero
changes to orchestration logic.
end note
interface IPromptFormatter <<interface>> {
+ Format(system_prompt : std::string_view, user_prompt : std::string_view) : std::string
+ ExpectedArchitecture() : std::string_view
}
class Gemma4JinjaPromptFormatter {
+ Format(...) : std::string
+ ExpectedArchitecture() : std::string_view
}
class LlamaConfig {
+ model_path : std::string
+ temperature : float
+ top_p : float
+ top_k : uint32_t
+ n_ctx : uint32_t
+ seed : int
}
class RestConfig {
+ endpoint : std::string
+ api_key : std::string
+ timeout : std::chrono::milliseconds
}
}
' ─────────────────────────────────────────────
' INFRASTRUCTURE: PIPELINE CHANNEL
' ─────────────────────────────────────────────
package "Infrastructure: Pipeline Channel" {
class "BoundedChannel<T>" as BoundedChannel {
- queue_ : std::queue<T>
- mutex_ : std::mutex
- not_full_ : std::condition_variable
- not_empty_ : std::condition_variable
- capacity_ : size_t
- closed_ : bool
+ Send(item : T) : void
+ Receive() : std::optional<T>
+ Close() : void
}
note right of BoundedChannel
Used within each phase to
decouple production from export.
Phase boundaries are explicit
sequential barriers in the
orchestrator's Run() method —
not channel-mediated.
end note
}
' ─────────────────────────────────────────────
' INFRASTRUCTURE: EXPORT
' ─────────────────────────────────────────────
package "Infrastructure: Export" {
interface IExportService <<interface>> {
+ Initialize() : void
+ ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64
+ ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64
+ ProcessUser(user : const GeneratedUser&) : sqlite3_int64
+ ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64
+ ProcessRating(rating : const GeneratedRating&) : void
+ Finalize() : void
}
note right of IExportService
Process* methods return
sqlite3_int64 row IDs.
Orchestrator uses these to
populate FK fields on all
downstream fixture types.
end note
class SqliteExportService {
- date_time_provider_ : std::unique_ptr<IDateTimeProvider>
- db_handle_ : SqliteDatabaseHandle
- insert_location_stmt_ : SqliteStatementHandle
- insert_brewery_stmt_ : SqliteStatementHandle
- insert_beer_stmt_ : SqliteStatementHandle
- insert_user_stmt_ : SqliteStatementHandle
- insert_checkin_stmt_ : SqliteStatementHandle
- insert_rating_stmt_ : SqliteStatementHandle
- transaction_open_ : bool
- location_cache_ : std::unordered_map<std::string, sqlite3_int64>
+ Initialize() : void
+ ProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64
+ ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64
+ ProcessUser(user : const GeneratedUser&) : sqlite3_int64
+ ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64
+ ProcessRating(rating : const GeneratedRating&) : void
+ Finalize() : void
- InitializeSchema() : void
- PrepareStatements() : void
- RollbackAndCloseNoThrow() : void
- FinalizeStatements() : void
}
note right of SqliteExportService
brewery_cache_ removed — row IDs
are now carried on GeneratedBrewery
and GeneratedBeer value objects
and threaded through by the
orchestrator directly.
end note
interface IDateTimeProvider <<interface>> {
+ GetUtcTimestamp() : std::string
}
class SystemDateTimeProvider {
+ GetUtcTimestamp() : std::string
}
}
' ─────────────────────────────────────────────
' ORCHESTRATION
' ─────────────────────────────────────────────
package "Orchestration" {
class BiergartenPipelineOrchestrator {
- enrichment_service_ : std::unique_ptr<IEnrichmentService>
- generator_ : std::unique_ptr<DataGenerator>
- exporter_ : std::unique_ptr<IExportService>
- brewery_context_strategy_ : std::unique_ptr<IContextStrategy>
- beer_context_strategy_ : std::unique_ptr<IContextStrategy>
- sampling_strategy_ : std::unique_ptr<ISamplingStrategy>
- checkin_strategy_ : std::unique_ptr<ICheckinDistributionStrategy>
--
- user_pool_ : std::vector<GeneratedUser>
- brewery_pool_ : std::vector<GeneratedBrewery>
- beer_pool_ : std::vector<GeneratedBeer>
- checkin_pool_ : std::vector<GeneratedCheckin>
--
+ Run() : bool
- RunUserPhase(locations : const std::vector<Location>&) : void
- RunBreweryPhase(locations : const std::vector<Location>&) : void
- RunBeerPhase() : void
- RunCheckinPhase() : void
- RunRatingPhase() : void
}
note right of BiergartenPipelineOrchestrator
Single component owns all
sequencing. Run() reads as a
linear narrative:
1. RunUserPhase
2. RunBreweryPhase
3. RunBeerPhase
4. checkin_strategy_->AssignActivityWeights
5. RunCheckinPhase
6. RunRatingPhase
The checkin gate is an explicit
sequential barrier between steps
3 and 5 — not a hidden internal
trigger in a separate object.
Pools are members: each phase
appends to them and the next
phase reads from them directly.
No mediator. No shared_ptr.
Ownership is unambiguous.
end note
class JsonLoader {
+ {static} LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
}
}
' ─────────────────────────────────────────────
' RELATIONSHIPS
' ─────────────────────────────────────────────
' Orchestration
BiergartenPipelineOrchestrator *-- IEnrichmentService : owns
BiergartenPipelineOrchestrator *-- DataGenerator : owns
BiergartenPipelineOrchestrator *-- IExportService : owns
BiergartenPipelineOrchestrator *-- ICheckinDistributionStrategy : owns
BiergartenPipelineOrchestrator *-- ISamplingStrategy : owns
BiergartenPipelineOrchestrator ..> JsonLoader : uses
' Policy implementations
IContextStrategy <|.. BreweryContextStrategy : implements
IContextStrategy <|.. BeerContextStrategy : implements
ISamplingStrategy <|.. UniformSamplingStrategy : implements
ICheckinDistributionStrategy <|.. JCurveCheckinStrategy : implements
' Enrichment
IEnrichmentService <|.. WikipediaService : implements
WikipediaService *-- WebClient : owns
WikipediaService ..> IContextStrategy : uses (parameter)
WebClient <|.. CURLWebClient : implements
' Generation
DataGenerator <|.. MockGenerator : implements
DataGenerator <|.. LlamaGenerator : implements
DataGenerator <|.. RestGenerator : implements
LlamaGenerator *-- IPromptFormatter : owns
LlamaGenerator ..> LlamaConfig : constructed with
RestGenerator ..> RestConfig : constructed with
IPromptFormatter <|.. Gemma4JinjaPromptFormatter : implements
' Export
IExportService <|.. SqliteExportService : implements
SqliteExportService *-- IDateTimeProvider : owns
IDateTimeProvider <|.. SystemDateTimeProvider : implements
' Data flow
EnrichedCity *-- Location : contains
EnrichedCity *-- LocationContext : contains
GeneratedBrewery *-- Location : contains
GeneratedBrewery *-- BreweryResult : contains
GeneratedBeer *-- Location : contains
GeneratedBeer *-- BeerResult : contains
GeneratedUser *-- Location : contains
GeneratedUser *-- UserResult : contains
GeneratedCheckin *-- CheckinResult : contains
GeneratedRating *-- RatingResult : contains
@enduml