diff --git a/pipeline/diagrams/future_possible_activity.svg b/pipeline/diagrams/future_possible_activity.svg new file mode 100644 index 0000000..7676296 --- /dev/null +++ b/pipeline/diagrams/future_possible_activity.svg @@ -0,0 +1 @@ +The Biergarten Data Pipeline — Activity DiagramThe Biergarten Data Pipeline — Activity DiagramParseArguments(argc, argv)spdlog::erroryesInvalid args?noInit CurlGlobalState & LlamaBackendStateBuild DI injectorJsonLoader::LoadLocations("locations.json")JsonLoader::LoadBeerStyles("beer-styles.json")NEW: Beer styles do not need location context.Wikipedia summaries for the entire palette arefetched and cached globally at startup.EnrichmentService::PreWarmBeerStyleCache(beer_styles)Opens SQLite connection.Begins a single transactioncovering all five fixture types.Initialize SqliteExportServiceBiergartenPipelineOrchestrator::Run()COMMIT covers all five fixture types.Finalize SqliteExportServicespdlog::info "Pipeline complete in X ms"RunUserPhase(sampled_locations)Create BoundedChannels(user_llm_ch, user_exp_ch)Loop: Send Locations → user_llm_chClose user_llm_chJoin LLM Worker, SQLite WorkerRunBreweryAndBeerPhase(sampled_locations)Create BoundedChannels(loc_ch, llm_ch, exp_ch)Loop: Send Locations → loc_chClose loc_chJoin Enrichment WorkersClose llm_chBoth brewery_pool_ and beer_pool_are now completely populated.Join LLM Worker, SQLite WorkerRunCheckinPhase()ICheckinDistributionStrategy::AssignActivityWeights(user_pool_)CheckinsForUser(user, brewery_pool_.size())TimestampFor(user, index)Select brewery from brewery_pool_GenerateCheckin(user, brewery, timestamp)via DataGeneratorProcessCheckin(checkin) → sqlite3_int64Append → checkin_pool_remainingFor each checkin index?doneremainingFor each GeneratedUser in user_pool_?doneRunRatingPhase()Match brewery_id → select beerfrom beer_pool_ (same brewery_id)Beer exists for brewery?yesnoGenerateRating(user, beer, checkin_id)via DataGeneratorProcessRating(rating)Skip — brewery has no beersremainingFor each GeneratedCheckin in checkin_pool_?doneReceive LocationGenerateUser(location)via DataGeneratorSend GeneratedUser → user_exp_chyesuser_llm_ch has items?noClose user_exp_chReceive EnrichedCityGenerateBrewery(location, context)via DataGeneratorIBeerSelectionStrategy::SelectStyles(brewery, beer_style_palette_)Guaranteed cache hit from startup.GetStyleContextFromCache(style)GenerateBeer(brewery, style_context)via DataGeneratorAttach GeneratedBeer to Brewery bundleremainingFor each selected BeerStyle?doneThe next generation of a brewery isentirely dependent on the currentbrewery and its beers completing.Send BreweryWithBeers Bundle → exp_chyesllm_ch has items?noClose exp_chReceive GeneratedUserProcessUser(user) → sqlite3_int64Append → user_pool_yesuser_exp_ch has items?noReceive BreweryWithBeers BundleProcessBrewery(brewery) → brewery_idAppend → brewery_pool_Set beer.brewery_id = brewery_idProcessBeer(beer) → sqlite3_int64Append → beer_pool_remainingFor each beer in bundle?doneyesexp_ch has items?noReceive LocationGetLocationContext(location,BreweryContextStrategy)Send EnrichedCity → llm_chyesloc_ch has items?noMainOrchestratorLLM WorkerSQLite WorkerEnrichment Workers (xN) \ No newline at end of file diff --git a/pipeline/diagrams/future_possible_architecture.svg b/pipeline/diagrams/future_possible_architecture.svg new file mode 100644 index 0000000..bf052f3 --- /dev/null +++ b/pipeline/diagrams/future_possible_architecture.svg @@ -0,0 +1 @@ +The Biergarten Data Pipeline — ArchitectureThe Biergarten Data Pipeline — ArchitectureDomain: Value Objects & ContractsDomain PolicyInfrastructure: EnrichmentInfrastructure: GenerationInfrastructure: Pipeline ChannelInfrastructure: ExportOrchestrationLocationcity : std::stringstate_province : std::stringiso3166_2 : std::stringcountry : std::stringiso3166_1 : std::stringlocal_languages : std::vector<std::string>latitude : doublelongitude : doubleLocationContexttext : std::stringcompleteness : Completenesschar_count : size_t«enum» CompletenessFullPartialAbsentEnrichedCitylocation : Locationcontext : LocationContextBeerStylename : std::stringdescription : std::stringmin_abv : floatmax_abv : floatmin_ibu : intmax_ibu : intLoaded once at startup frombeer-styles.json via JsonLoader.Passed as std::span<const BeerStyle>to IBeerSelectionStrategy.Generator receives the selectedstyle as a parameter — it neverreads the palette directly.BreweryResultname_en : std::stringdescription_en : std::stringname_local : std::stringdescription_local : std::stringBeerResultname_en : std::stringdescription_en : std::stringname_local : std::stringdescription_local : std::stringstyle : std::stringabv : floatibu : intUserResultusername : std::stringbio : std::stringactivity_weight : floatactivity_weight assigned byICheckinDistributionStrategyafter the full user pool iscommitted. Drives J-curvecheckin volume per user.CheckinResultchecked_in_at : std::stringnote : std::stringRatingResultscore : floatnote : std::stringGeneratedBrewerybrewery_id : sqlite3_int64location : Locationbrewery : BreweryResultcontext_completeness : LocationContext::Completenessgenerated_at : std::stringGeneratedBeerbeer_id : sqlite3_int64brewery_id : sqlite3_int64location : Locationbeer : BeerResultgenerated_at : std::stringGeneratedUseruser_id : sqlite3_int64location : Locationuser : UserResultgenerated_at : std::stringuser_id populated after SQLiteinsert. Live FK carried in poolfor checkin and rating references.GeneratedCheckincheckin_id : sqlite3_int64user_id : sqlite3_int64brewery_id : sqlite3_int64checkin : CheckinResultgenerated_at : std::stringGeneratedRatinguser_id : sqlite3_int64beer_id : sqlite3_int64checkin_id : sqlite3_int64rating : RatingResultgenerated_at : std::string«interface»IContextStrategyQueriesFor(loc : const Location&) : std::vector<std::string>MaxContextChars() : size_tBreweryContextStrategyQueriesFor(loc : const Location&) : std::vector<std::string>MaxContextChars() : size_tBeerContextStrategyQueriesFor(loc : const Location&) : std::vector<std::string>MaxContextChars() : size_t«interface»ISamplingStrategySample(locations : const std::vector<Location>&) : std::vector<Location>UniformSamplingStrategysample_size_ : size_tSample(locations : const std::vector<Location>&) : std::vector<Location>«interface»IBeerSelectionStrategySelectStyles(brewery : const GeneratedBrewery&,palette : std::span<const BeerStyle>) : std::vector<BeerStyle>Decides how many beers a brewerygets and which styles are selected.Count distribution and stylededuplication logic live here,not in the orchestrator or generator.RandomBeerSelectionStrategyrng_ : std::mt19937min_beers_ : size_tmax_beers_ : size_tSelectStyles(brewery : const GeneratedBrewery&,palette : std::span<const BeerStyle>) : std::vector<BeerStyle>Draws a random count in [min, max].Samples without replacement frompalette to avoid duplicate stylesper brewery.«interface»ICheckinDistributionStrategyAssignActivityWeights(users : std::vector<GeneratedUser>&) : voidCheckinsForUser(user : const GeneratedUser&,brewery_count : size_t) : size_tTimestampFor(user : const GeneratedUser&,index : size_t) : std::stringOwns all statistical policy:J-curve weight assignment,bursty weekend timestamps,per-user checkin volume.JCurveCheckinStrategyrng_ : std::mt19937AssignActivityWeights(users : std::vector<GeneratedUser>&) : voidCheckinsForUser(user : const GeneratedUser&,brewery_count : size_t) : size_tTimestampFor(user : const GeneratedUser&,index : size_t) : std::string«interface»IEnrichmentServiceGetLocationContext(loc : const Location&,strategy : const IContextStrategy&) : LocationContextWikipediaServiceclient_ : std::unique_ptr<WebClient>extract_cache_ : std::unordered_map<std::string, std::string>GetLocationContext(loc : const Location&,strategy : const IContextStrategy&) : LocationContextFetchExtract(query : std::string_view) : std::stringextract_cache_ keyed by query string.Beer pass gets near-100% cache hitssince locations were already fetchedduring the brewery pass.«interface»WebClientGet(url : const std::string&) : std::stringUrlEncode(value : const std::string&) : std::stringCURLWebClientGet(url : const std::string&) : std::stringUrlEncode(value : const std::string&) : std::string«interface»DataGeneratorGenerateBrewery(location : const Location&,context : const LocationContext&) : BreweryResultGenerateBeer(brewery_id : sqlite3_int64,location : const Location&,context : const LocationContext&,style : const BeerStyle&) : BeerResultGenerateUser(location : const Location&) : UserResultGenerateCheckin(user : const GeneratedUser&,brewery : const GeneratedBrewery&,timestamp : const std::string&) : CheckinResultGenerateRating(user : const GeneratedUser&,beer : const GeneratedBeer&,checkin_id : sqlite3_int64) : RatingResultGenerateBeer receives BeerStyleas a parameter. Style selectionand count decisions live inIBeerSelectionStrategy, not here.MockGeneratorGenerateBrewery(...) : BreweryResultGenerateBeer(...) : BeerResultGenerateUser(...) : UserResultGenerateCheckin(...) : CheckinResultGenerateRating(...) : RatingResultDeterministicHash(location : const Location&) : size_tLlamaGeneratormodel_ : ModelHandlecontext_ : ContextHandleprompt_formatter_ : std::unique_ptr<IPromptFormatter>config_ : LlamaConfigrng_ : std::mt19937GenerateBrewery(...) : BreweryResultGenerateBeer(...) : BeerResultGenerateUser(...) : UserResultGenerateCheckin(...) : CheckinResultGenerateRating(...) : RatingResultLoad(config : const LlamaConfig&) : voidInfer(system_prompt, user_prompt,max_tokens, grammar) : std::stringValidateModelArchitecture() : void«interface»IPromptFormatterFormat(system_prompt : std::string_view,user_prompt : std::string_view) : std::stringExpectedArchitecture() : std::string_viewGemma4JinjaPromptFormatterFormat(...) : std::stringExpectedArchitecture() : std::string_viewLlamaConfigmodel_path : std::stringtemperature : floattop_p : floattop_k : uint32_tn_ctx : uint32_tseed : intBoundedChannelTqueue_ : std::queue<T>mutex_ : std::mutexnot_full_ : std::condition_variablenot_empty_ : std::condition_variablecapacity_ : size_tclosed_ : boolSend(item : T) : voidReceive() : std::optional<T>Close() : voidUsed for user, brewery, andcheckin/rating phases.Beer phase uses a simplesequential loop — enrichmentis all cache hits, no fan-outneeded.«interface»IExportServiceInitialize() : voidProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64ProcessUser(user : const GeneratedUser&) : sqlite3_int64ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64ProcessRating(rating : const GeneratedRating&) : voidFinalize() : voidSqliteExportServicedate_time_provider_ : std::unique_ptr<IDateTimeProvider>db_handle_ : SqliteDatabaseHandleinsert_location_stmt_ : SqliteStatementHandleinsert_brewery_stmt_ : SqliteStatementHandleinsert_beer_stmt_ : SqliteStatementHandleinsert_user_stmt_ : SqliteStatementHandleinsert_checkin_stmt_ : SqliteStatementHandleinsert_rating_stmt_ : SqliteStatementHandletransaction_open_ : boollocation_cache_ : std::unordered_map<std::string, sqlite3_int64>brewery_cache_ : std::unordered_map<std::string, sqlite3_int64>Initialize() : voidProcessBrewery(brewery : const GeneratedBrewery&) : sqlite3_int64ProcessBeer(beer : const GeneratedBeer&) : sqlite3_int64ProcessUser(user : const GeneratedUser&) : sqlite3_int64ProcessCheckin(checkin : const GeneratedCheckin&) : sqlite3_int64ProcessRating(rating : const GeneratedRating&) : voidFinalize() : voidInitializeSchema() : voidPrepareStatements() : voidRollbackAndCloseNoThrow() : voidFinalizeStatements() : voidbrewery_cache_ restored.Keyed by location string forlocation deduplication, andby brewery identity for beerFK resolution without re-querying.«interface»IDateTimeProviderGetUtcTimestamp() : std::stringSystemDateTimeProviderGetUtcTimestamp() : std::stringBiergartenPipelineOrchestratorenrichment_service_ : std::unique_ptr<IEnrichmentService>generator_ : std::unique_ptr<DataGenerator>exporter_ : std::unique_ptr<IExportService>brewery_context_strategy_ : std::unique_ptr<IContextStrategy>beer_context_strategy_ : std::unique_ptr<IContextStrategy>sampling_strategy_ : std::unique_ptr<ISamplingStrategy>beer_selection_strategy_ : std::unique_ptr<IBeerSelectionStrategy>checkin_strategy_ : std::unique_ptr<ICheckinDistributionStrategy>beer_style_palette_ : std::vector<BeerStyle>user_pool_ : std::vector<GeneratedUser>brewery_pool_ : std::vector<GeneratedBrewery>beer_pool_ : std::vector<GeneratedBeer>checkin_pool_ : std::vector<GeneratedCheckin>Run() : boolRunUserPhase(locations : const std::vector<Location>&) : voidRunBreweryPhase(locations : const std::vector<Location>&) : voidRunBeerPhase() : voidRunCheckinPhase() : voidRunRatingPhase() : voidbeer_style_palette_ loaded onceat startup from beer-styles.json.Passed as std::span<const BeerStyle>to IBeerSelectionStrategy per brewery.RunBeerPhase() is a sequential loop —no channels, no fan-out. Enrichmentis cache hits; LLM is the only cost.JsonLoaderLoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle>LoadBeerStyles() added.Reads beer-styles.json onceat startup into the paletteheld by the orchestrator.ownsownsownsownsownsownsusesimplementsimplementsimplementsimplementsimplementsimplementsownsuses (parameter)implementsimplementsimplementsownsconstructed withimplementsimplementsownsimplementscontainscontainscontainscontainscontainscontainscontainscontainscontainscontains \ No newline at end of file