diff --git a/pipeline/.gitignore b/pipeline/.gitignore
new file mode 100644
index 0000000..2c120f6
--- /dev/null
+++ b/pipeline/.gitignore
@@ -0,0 +1,3 @@
+dist
+build
+data
diff --git a/pipeline/CMakeLists.txt b/pipeline/CMakeLists.txt
new file mode 100644
index 0000000..9adf1fe
--- /dev/null
+++ b/pipeline/CMakeLists.txt
@@ -0,0 +1,113 @@
+cmake_minimum_required(VERSION 3.20)
+project(biergarten-pipeline VERSION 0.1.0 LANGUAGES CXX)
+
+# CMP0167 (FindBoost removal) only exists in CMake >= 3.30; setting an unknown
+# policy is a hard error, so guard it to keep the 3.20 minimum honest.
+if(POLICY CMP0167)
+  cmake_policy(SET CMP0167 NEW)
+endif()
+
+set(CMAKE_CXX_STANDARD 23)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+set(CMAKE_CXX_EXTENSIONS OFF)
+
+find_package(CURL REQUIRED)
+find_package(Boost REQUIRED COMPONENTS unit_test_framework)
+find_package(SQLite3 REQUIRED)
+
+include(FetchContent)
+
+FetchContent_Declare(
+  nlohmann_json
+  GIT_REPOSITORY https://github.com/nlohmann/json.git
+  GIT_TAG v3.11.3
+)
+FetchContent_MakeAvailable(nlohmann_json)
+
+# TODO: Integrate real llama.cpp when generator is ready to use actual models
+# For now, using mocked brewery generation in generator.cpp
+
+# SQLite for in-memory database (already located above; duplicate
+# find_package(SQLite3) removed)
+
+file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS
+  src/*.cpp
+)
+
+add_executable(biergarten-pipeline ${SOURCES})
+
+target_include_directories(biergarten-pipeline
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}/includes
+)
+
+target_link_libraries(biergarten-pipeline
+  PRIVATE
+    CURL::libcurl
+    nlohmann_json::nlohmann_json
+    Boost::unit_test_framework
+    SQLite::SQLite3
+)
+
+# Warnings-as-errors only on MSVC (matching the original); strict warnings on
+# GCC/Clang family compilers.
+target_compile_options(biergarten-pipeline PRIVATE
+  $<$<CXX_COMPILER_ID:GNU,Clang,AppleClang>:
+    -Wall
+    -Wextra
+    -Wpedantic
+    -Wshadow
+    -Wconversion
+    -Wsign-conversion
+  >
+  $<$<CXX_COMPILER_ID:MSVC>:
+    /W4
+    /WX
+  >
+)
+
+add_custom_command(TARGET biergarten-pipeline POST_BUILD
+  COMMAND ${CMAKE_COMMAND} -E make_directory
+          ${CMAKE_CURRENT_SOURCE_DIR}/output
+  COMMENT "Creating output/ directory for seed SQL files"
+)
+
+find_program(VALGRIND valgrind)
+if(VALGRIND)
+  add_custom_target(memcheck
+    COMMAND ${VALGRIND}
+            --leak-check=full
+            --error-exitcode=1
+            $<TARGET_FILE:biergarten-pipeline> --help
+    DEPENDS biergarten-pipeline
+    COMMENT "Running Valgrind memcheck"
+  )
+endif()
+
+include(CTest)
+
+if(BUILD_TESTING)
+  # Boost::unit_test_framework is already located by the top-level
+  # find_package above; no need to repeat it here.
+
+  file(GLOB_RECURSE TEST_SOURCES CONFIGURE_DEPENDS
+    tests/*.cpp
+    tests/*.cc
+    tests/*.cxx
+  )
+
+  if(TEST_SOURCES)
+    add_executable(biergarten-pipeline-tests ${TEST_SOURCES})
+
+    # Fix: header directory is "includes/" (matches the main target); the
+    # previous "include/" path does not exist in this tree.
+    target_include_directories(biergarten-pipeline-tests
+      PRIVATE
+        ${CMAKE_CURRENT_SOURCE_DIR}/includes
+    )
+
+    # Fix: do not link "llama" — no such target/library is built (llama.cpp
+    # integration is still a TODO; generator.cpp is mocked). Link SQLite
+    # instead, which src/database.cpp (pulled in via tests) actually needs.
+    target_link_libraries(biergarten-pipeline-tests
+      PRIVATE
+        Boost::unit_test_framework
+        CURL::libcurl
+        nlohmann_json::nlohmann_json
+        SQLite::SQLite3
+    )
+
+    add_test(
+      NAME biergarten-pipeline-tests
+      COMMAND biergarten-pipeline-tests
+    )
+  endif()
+endif()
diff --git a/pipeline/README.md b/pipeline/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/pipeline/README.md
@@ -0,0 +1 @@
+
diff --git a/pipeline/includes/data_downloader.h b/pipeline/includes/data_downloader.h
new file mode 100644
index 0000000..aec548b
--- /dev/null
+++ b/pipeline/includes/data_downloader.h
@@ -0,0 +1,111 @@
+/**
+ * @file data_downloader.h
+ * @brief Download geographic data from GitHub repositories using libcurl.
+ *
+ * Provides functionality to fetch JSON data from GitHub using libcurl, with
+ * support for commit-based versioning to ensure reproducible builds. Downloads
+ * are cached to avoid repeated network requests.
+ *
+ * Example usage:
+ * @code
+ * DataDownloader downloader;
+ * std::string jsonPath = downloader.DownloadCountriesDatabase(
+ *     "/tmp/countries-data.json",  // local cache path
+ *     "c5eb7772"                   // optional commit hash or HEAD
+ * );
+ * // Now use jsonPath with JsonLoader::LoadWorldCities(jsonPath, db)
+ * @endcode
+ */
+
+#ifndef DATA_DOWNLOADER_H
+#define DATA_DOWNLOADER_H
+
+#include <string>
+
+/**
+ * @class DataDownloader
+ * @brief Manages downloading and caching of geographic data from GitHub.
+ *
+ * This class encapsulates libcurl networking operations for reproducible
+ * data fetching. All methods are blocking and synchronous: each call
+ * returns only once the operation has completed or failed.
+ *
+ * @note Requires libcurl to be available at runtime.
+ * @note GitHub raw content CDN is used for efficient downloads.
+ */
+class DataDownloader {
+public:
+  /**
+   * @brief Default constructor.
+   *
+   * Initializes the downloader without any specific state. The downloader
+   * is ready to use immediately.
+   */
+  DataDownloader();
+
+  /**
+   * @brief Destructor.
+   *
+   * Cleans up any resources. No explicit cleanup needed beyond destruction.
+   */
+  ~DataDownloader();
+
+  /**
+   * @brief Download the countries+states+cities JSON database from GitHub.
+   *
+   * Downloads the geographic data from the
+   * dr5hn/countries-states-cities-database repository. If the file already
+   * exists at cachePath, it is used directly without downloading again.
+   *
+   * The download URL format is:
+   * @verbatim
+   * https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/
+   *     {commit}/json/countries+states+cities.json
+   * @endverbatim
+   *
+   * @param cachePath Local filesystem path where the JSON file should be
+   *                  stored. If the file already exists, download is skipped.
+   * @param commit Git commit hash or branch name (default: "c5eb7772").
+   *               Examples: "HEAD", "main", "c5eb7772",
+   *               "c5eb7772225f6b1802a54f39adb8c73464a85be1a"
+   *
+   * @return The file path where JSON was saved (same as cachePath).
+ * + * @throws std::runtime_error if: + * - Network download fails + * - File cannot be written to cachePath + * - Commit hash is invalid (404 on GitHub) + * + * Example with default commit (stable v2026-03-28): + * @code + * std::string path = + * downloader.DownloadCountriesDatabase("/tmp/data.json"); + * @endcode + * + * Example with custom commit: + * @code + * std::string path = downloader.DownloadCountriesDatabase( + * "/tmp/data.json", + * "main" // Download latest from main branch + * ); + * @endcode + */ + std::string DownloadCountriesDatabase( + const std::string &cachePath, + const std::string &commit = "c5eb7772" // Stable commit: 2026-03-28 export + ); + +private: + /** + * @brief Check if a file already exists at the given path. + * + * Used internally to implement cache-hit logic. No download occurs if + * the file already exists. + * + * @param filePath Path to check. + * @return True if file exists and is readable, false otherwise. + */ + bool FileExists(const std::string &filePath) const; +}; + +#endif // DATA_DOWNLOADER_H diff --git a/pipeline/includes/database.h b/pipeline/includes/database.h new file mode 100644 index 0000000..88e7636 --- /dev/null +++ b/pipeline/includes/database.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include + +/// @struct Country +/// @brief Represents a country with geographic identifiers +struct Country { + int id; + std::string name; + std::string iso2; ///< 2-letter ISO code (e.g., "US", "CA") + std::string iso3; ///< 3-letter ISO code (e.g., "USA", "CAN") +}; + +/// @struct State +/// @brief Represents a state or province with geographic identifiers +struct State { + int id; + std::string name; + std::string iso2; ///< 2-letter state code (e.g., "CA", "ON") + int countryId; +}; + +/** + * @class SqliteDatabase + * @brief Thread-safe in-memory SQLite database wrapper for geographic data + * + * Manages a local in-memory SQLite database with countries, states, and cities. 
+ * All write operations are serialized via mutex to enable safe concurrent + * access from multiple threads. Uses INSERT OR IGNORE for idempotent + * operations. + * + * Schema Relationships: + * countries (id, name, iso2, iso3) + * ↓ (one-to-many) + * states (id, country_id, name, iso2) + * ↓ (one-to-many) + * cities (id, state_id, country_id, name, latitude, longitude) + */ +class SqliteDatabase { +private: + sqlite3 *db = nullptr; ///< SQLite database connection handle + std::mutex dbMutex; ///< Protects all database operations from race conditions + + /// @brief Creates the schema with three related tables and foreign keys + void InitializeSchema(); + +public: + /// @brief Destructor: safely closes the database connection + ~SqliteDatabase(); + + /// @brief Opens an in-memory SQLite database and initializes the schema + void Initialize(); + + /// @brief Inserts a country record + /// @param id Unique country identifier + /// @param name Country name + /// @param iso2 2-letter ISO country code + /// @param iso3 3-letter ISO country code + /// @note Thread-safe: uses mutex lock. 
Idempotent: INSERT OR IGNORE prevents + /// duplicates + void InsertCountry(int id, const std::string &name, const std::string &iso2, + const std::string &iso3); + + /// @brief Inserts a state/province record + /// @param id Unique state identifier + /// @param countryId Foreign key reference to parent country + /// @param name State/province name + /// @param iso2 2-letter state code (e.g., "CA", "ON") + /// @note Thread-safe and idempotent via mutex and INSERT OR IGNORE + void InsertState(int id, int countryId, const std::string &name, + const std::string &iso2); + + /// @brief Inserts a city record with geographic coordinates + /// @param id Unique city identifier + /// @param stateId Foreign key reference to parent state + /// @param countryId Foreign key reference to parent country + /// @param name City name + /// @param latitude Geographic latitude coordinate (WGS84) + /// @param longitude Geographic longitude coordinate (WGS84) + /// @note Thread-safe and idempotent. Called by multithreaded JSON loader. 
+ void InsertCity(int id, int stateId, int countryId, const std::string &name, + double latitude, double longitude); + + /// @brief Queries all cities from the database + /// @return Vector of (city_id, city_name) pairs sorted alphabetically + std::vector> QueryCities(); + + /// @brief Queries all countries from the database with ISO codes + /// @param limit Maximum number of records to return (0 = all) + /// @return Vector of Country structs (includes id, name, iso2, iso3) sorted + /// alphabetically + std::vector QueryCountries(int limit = 0); + + /// @brief Queries all states from the database with ISO codes + /// @param limit Maximum number of records to return (0 = all) + /// @return Vector of State structs (includes id, name, iso2, countryId) + /// sorted alphabetically + std::vector QueryStates(int limit = 0); +}; diff --git a/pipeline/includes/generator.h b/pipeline/includes/generator.h new file mode 100644 index 0000000..08d1c7c --- /dev/null +++ b/pipeline/includes/generator.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + +/** + * @class LlamaBreweryGenerator + * @brief Generates brewery names and descriptions for cities + * + * Currently provides a deterministic mock implementation that generates + * brewery names and descriptions based on city name hashing. + * + * Design Pattern: Strategy pattern ready for swapping real llama.cpp + * implementation later. The LoadModel() and GenerateBrewery() interface + * will remain the same once actual LM inference is integrated. + * + * Mock Implementation: Uses std::hash to deterministically map city names + * to brewery templates, ensuring reproducible results for testing. + */ +class LlamaBreweryGenerator { +private: + /// Adjectives for brewery names (e.g., "Craft", "Heritage", etc.) + const std::vector breweryAdjectives = { + "Craft", "Heritage", "Local", "Artisan", + "Pioneer", "Golden", "Modern", "Classic"}; + + /// Nouns for brewery names (e.g., "Brewing Co.", "Brewery", etc.) 
+ const std::vector breweryNouns = { + "Brewing Co.", "Brewery", "Bier Haus", "Taproom", + "Works", "House", "Fermentery", "Ale Co."}; + + /// Pre-written brewery descriptions (currently hand-crafted) + const std::vector descriptions = { + "Handcrafted pale ales and seasonal IPAs with local ingredients.", + "Traditional lagers and experimental sours in small batches.", + "Award-winning stouts and wildly hoppy blonde ales.", + "Craft brewery specializing in Belgian-style triples and dark porters.", + "Modern brewery blending tradition with bold experimental flavors."}; + +public: + /// @struct Brewery + /// @brief Output structure for generated brewery data + struct Brewery { + std::string name; ///< Generated brewery name (e.g., "Craft Brewing Co.") + std::string description; ///< Short description of brewery style/offerings + }; + + /// @brief Loads a language model (currently mocked) + /// @param modelPath Path to GGUF model file (not used in mock) + /// @note In real implementation, loads llama.cpp model into memory + void LoadModel(const std::string &modelPath); + + /// @brief Generates a brewery name and description for a city + /// @param cityName City name to generate brewery for + /// @param seed Integer seed (used for deterministic output in mock) + /// @return Brewery struct with name and description + /// @note Deterministic: same cityName+seed always produces same brewery + Brewery GenerateBrewery(const std::string &cityName, int seed); +}; diff --git a/pipeline/includes/json_loader.h b/pipeline/includes/json_loader.h new file mode 100644 index 0000000..ee47e2b --- /dev/null +++ b/pipeline/includes/json_loader.h @@ -0,0 +1,85 @@ +#pragma once + +#include "database.h" +#include +#include + +using json = nlohmann::json; + +/** + * @class JsonLoader + * @brief Loads world geographic data from JSON file into SQLite database + * + * Handles parsing and population of world cities, states, and countries from + * a structured JSON source file. 
The loader uses parallel threads to chunk + * the city records and maximize database insertion throughput. + * + * Input Format (JSON Structure): + * @code + * { + * "countries": [ + * {"id": 1, "name": "Canada", "iso2": "CA", "iso3": "CAN"}, + * ... + * ], + * "states": [ + * {"id": 1, "country_id": 1, "name": "Ontario", "iso2": "ON"}, + * ... + * ], + * "cities": [ + * {"id": 1, "state_id": 1, "country_id": 1, "name": "Toronto", + * "latitude": 43.6532, "longitude": -79.3832}, + * ... + * ] + * } + * @endcode + * + * Performance Characteristics: + * - Reads entire JSON file into memory (nlohmann/json parser) + * - Iterates through countries: typically 200+ records + * - Iterates through states: typically 3000+ records + * - Iterates through cities: typically 50,000+ records (MAJOR DATASET) + * - Uses multithreading to chunk city insertion across threads + * - Thread pool size defaults to number of CPU cores + * + * Multithreading Strategy: + * - Divides cities into N chunks (N = CPU core count) + * - Each thread processes one chunk sequentially + * - Database has mutex protection for thread-safe concurrent access + * - Allows safe parallel writing to same SQLite database + * + * Example Usage: + * @code + * SqliteDatabase db; + * db.Initialize(); + * JsonLoader::LoadWorldCities("../data/world_city_data.json", db); + * // Database now contains all countries, states, and cities + * @endcode + */ +class JsonLoader { +public: + /// @brief Loads world geographic data from JSON and populates database + /// + /// Process: + /// 1. Reads and parses entire JSON file + /// 2. Inserts all countries into database (typically 200-250 records) + /// 3. Inserts all states/provinces (typically 3000+ records) + /// 4. Spawns worker threads to insert cities (typically 50,000+ records) + /// 5. Waits for all threads to complete + /// 6. 
Prints statistics about loaded data + /// + /// @param jsonPath Filesystem path to world_city_data.json + /// @param db Reference to initialized SqliteDatabase to populate + /// + /// @throws std::runtime_error if JSON file cannot be read or parsed + /// @throws std::runtime_error if database insertion fails + /// + /// Output Examples: + /// @code + /// Loading JSON: ../data/world_city_data.json + /// Loaded countries: 250 + /// Loaded states: 3500 + /// Loaded cities: 52000 + /// ✓ World city data loaded successfully + /// @endcode + static void LoadWorldCities(const std::string &jsonPath, SqliteDatabase &db); +}; diff --git a/pipeline/src/data_downloader.cpp b/pipeline/src/data_downloader.cpp new file mode 100644 index 0000000..ef02b05 --- /dev/null +++ b/pipeline/src/data_downloader.cpp @@ -0,0 +1,163 @@ +/** + * @file data_downloader.cpp + * @brief Implementation of DataDownloader using libcurl for HTTP downloads. + * + * Provides robust downloading with proper error handling, timeout management, + * and local caching to avoid repeated network calls. Uses GitHub's raw content + * CDN for reliable high-bandwidth downloads. + */ + +#include "data_downloader.h" +#include +#include +#include +#include +#include +#include + +/** + * @brief Callback function for libcurl to write downloaded content to file. + * + * This callback is invoked repeatedly by curl as data arrives over the network. + * Each invocation contains a chunk of the response body. The function writes + * the content to the output file stream. + * + * @param contents Pointer to buffer containing data chunk. + * @param size Element size (always 1 for text). + * @param nmemb Number of elements in chunk. + * @param userp Opaque pointer to std::ofstream (FILE*). + * + * @return Total bytes written. Must match (size * nmemb) for success; + * returning less signals an error to curl. 
+ *
+ * @note libcurl requires this signature: (char* ptr, size_t size, size_t nmemb,
+ *       void* userp)
+ */
+static size_t WriteCallback(void *contents, size_t size, size_t nmemb,
+                            void *userp) {
+  // Calculate total bytes in this chunk
+  size_t realsize = size * nmemb;
+
+  // Cast userp back to ofstream
+  std::ofstream *outFile = static_cast<std::ofstream *>(userp);
+
+  // Write to file
+  outFile->write(static_cast<const char *>(contents),
+                 static_cast<std::streamsize>(realsize));
+
+  // Fix: propagate disk-write failures (full disk, I/O error) to curl.
+  // Returning anything other than realsize makes curl abort with
+  // CURLE_WRITE_ERROR instead of silently caching a truncated file that
+  // every later run would treat as a valid cache hit.
+  if (!outFile->good()) {
+    return 0;
+  }
+
+  // Return actual bytes written (success = requested amount)
+  return realsize;
+}
+
+DataDownloader::DataDownloader() {
+  // curl_global_init is called by user or external subsystem in a thread-safe
+  // manner. Not calling it here to avoid multiple initialization in
+  // multi-downloader scenarios.
+}
+
+DataDownloader::~DataDownloader() {
+  // No explicit cleanup needed; curl_global_cleanup managed externally.
+}
+
+bool DataDownloader::FileExists(const std::string &filePath) const {
+  // Use POSIX stat() to check file existence without opening it
+  struct stat buffer;
+  return (stat(filePath.c_str(), &buffer) == 0);
+}
+
+std::string
+DataDownloader::DownloadCountriesDatabase(const std::string &cachePath,
+                                          const std::string &commit) {
+  // Check if file already cached locally
+  if (FileExists(cachePath)) {
+    std::cout << "[DataDownloader] Cache hit: " << cachePath << std::endl;
+    return cachePath;
+  }
+
+  // Construct download URL.
+  // Fix: only shorten full 40-character SHA-1 hashes. The previous
+  // unconditional substr(0, 7) corrupted branch names longer than 7 chars
+  // (e.g. "release-2024" -> "release"), even though the header documents
+  // branch-name support. Short hashes and names like "main"/"HEAD" pass
+  // through unchanged either way.
+  std::string shortCommit = commit;
+  if (commit.length() == 40 &&
+      commit.find_first_not_of("0123456789abcdefABCDEF") ==
+          std::string::npos) {
+    shortCommit = commit.substr(0, 7);
+  }
+
+  std::string url = "https://raw.githubusercontent.com/dr5hn/"
+                    "countries-states-cities-database/" +
+                    shortCommit + "/json/countries+states+cities.json";
+
+  std::cout << "[DataDownloader] Downloading: " << url << std::endl;
+
+  // Initialize curl handle
+  CURL *curl = curl_easy_init();
+  if (!curl) {
+    throw std::runtime_error("[DataDownloader] Failed to initialize libcurl");
+  }
+
+  // Open output file for writing (binary mode to preserve exact bytes)
+  std::ofstream outFile(cachePath, std::ios::binary);
+  if (!outFile.is_open()) {
+    curl_easy_cleanup(curl);
+    throw std::runtime_error("[DataDownloader] Cannot open file for writing: " +
+                             cachePath);
+  }
+
+  // Configure curl for download
+  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
+  curl_easy_setopt(curl, CURLOPT_WRITEDATA, static_cast<void *>(&outFile));
+
+  // Set reasonable timeout (30 seconds for initial connection, 300s for
+  // transfer)
+  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 30L);
+  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 300L);
+
+  // Follow redirects (CDN may redirect)
+  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+  curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 5L);
+
+  // Use gzip compression if server supports it
+  curl_easy_setopt(curl, CURLOPT_ACCEPT_ENCODING, "gzip");
+
+  // Set user agent to identify the application
+  curl_easy_setopt(curl, CURLOPT_USERAGENT, "biergarten-pipeline/0.1.0");
+
+  // Perform the download
+  CURLcode res = curl_easy_perform(curl);
+  outFile.close();
+
+  // Check for curl errors
+  if (res != CURLE_OK) {
+    curl_easy_cleanup(curl);
+
+    // Remove partially downloaded file
+    std::remove(cachePath.c_str());
+
+    std::string error = std::string("[DataDownloader] Download failed: ") +
+                        curl_easy_strerror(res);
+    throw std::runtime_error(error);
+  }
+
+  // Check HTTP response code
+  long httpCode = 0;
+  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
+  curl_easy_cleanup(curl);
+
+  if (httpCode != 200) {
+    // Remove partially downloaded or error file
+    std::remove(cachePath.c_str());
+
+    std::stringstream ss;
+    ss << "[DataDownloader] HTTP error " << httpCode
+       << " (commit: " << shortCommit << ")";
+    throw std::runtime_error(ss.str());
+  }
+
+  // Get file size for diagnostics
+  std::ifstream fileCheck(cachePath, std::ios::binary | std::ios::ate);
+  std::streamsize size = fileCheck.tellg();
fileCheck.close(); + + std::cout << "[DataDownloader] ✓ Download complete: " << cachePath << " (" + << (size / (1024.0 * 1024.0)) << " MB)" << std::endl; + return cachePath; +} diff --git a/pipeline/src/database.cpp b/pipeline/src/database.cpp new file mode 100644 index 0000000..892cb9f --- /dev/null +++ b/pipeline/src/database.cpp @@ -0,0 +1,229 @@ +#include "database.h" +#include +#include + +void SqliteDatabase::InitializeSchema() { + std::lock_guard lock(dbMutex); + + const char *schema = R"( + CREATE TABLE IF NOT EXISTS countries ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + iso2 TEXT, + iso3 TEXT + ); + + CREATE TABLE IF NOT EXISTS states ( + id INTEGER PRIMARY KEY, + country_id INTEGER NOT NULL, + name TEXT NOT NULL, + iso2 TEXT, + FOREIGN KEY(country_id) REFERENCES countries(id) + ); + + CREATE TABLE IF NOT EXISTS cities ( + id INTEGER PRIMARY KEY, + state_id INTEGER NOT NULL, + country_id INTEGER NOT NULL, + name TEXT NOT NULL, + latitude REAL, + longitude REAL, + FOREIGN KEY(state_id) REFERENCES states(id), + FOREIGN KEY(country_id) REFERENCES countries(id) + ); + )"; + + char *errMsg = nullptr; + int rc = sqlite3_exec(db, schema, nullptr, nullptr, &errMsg); + if (rc != SQLITE_OK) { + std::string error = errMsg ? std::string(errMsg) : "Unknown error"; + sqlite3_free(errMsg); + throw std::runtime_error("Failed to create schema: " + error); + } +} + +SqliteDatabase::~SqliteDatabase() { + if (db) { + sqlite3_close(db); + } +} + +void SqliteDatabase::Initialize() { + int rc = sqlite3_open(":memory:", &db); + if (rc) { + throw std::runtime_error("Failed to create in-memory SQLite database"); + } + std::cout << "✓ In-memory SQLite database created\n"; + InitializeSchema(); +} + +void SqliteDatabase::InsertCountry(int id, const std::string &name, + const std::string &iso2, + const std::string &iso3) { + std::lock_guard lock(dbMutex); + + const char *query = R"( + INSERT OR IGNORE INTO countries (id, name, iso2, iso3) + VALUES (?, ?, ?, ?) 
+ )"; + + sqlite3_stmt *stmt; + int rc = sqlite3_prepare_v2(db, query, -1, &stmt, nullptr); + if (rc != SQLITE_OK) + throw std::runtime_error("Failed to prepare country insert"); + + sqlite3_bind_int(stmt, 1, id); + sqlite3_bind_text(stmt, 2, name.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 3, iso2.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 4, iso3.c_str(), -1, SQLITE_STATIC); + + if (sqlite3_step(stmt) != SQLITE_DONE) { + throw std::runtime_error("Failed to insert country"); + } + sqlite3_finalize(stmt); +} + +void SqliteDatabase::InsertState(int id, int countryId, const std::string &name, + const std::string &iso2) { + std::lock_guard lock(dbMutex); + + const char *query = R"( + INSERT OR IGNORE INTO states (id, country_id, name, iso2) + VALUES (?, ?, ?, ?) + )"; + + sqlite3_stmt *stmt; + int rc = sqlite3_prepare_v2(db, query, -1, &stmt, nullptr); + if (rc != SQLITE_OK) + throw std::runtime_error("Failed to prepare state insert"); + + sqlite3_bind_int(stmt, 1, id); + sqlite3_bind_int(stmt, 2, countryId); + sqlite3_bind_text(stmt, 3, name.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, 4, iso2.c_str(), -1, SQLITE_STATIC); + + if (sqlite3_step(stmt) != SQLITE_DONE) { + throw std::runtime_error("Failed to insert state"); + } + sqlite3_finalize(stmt); +} + +void SqliteDatabase::InsertCity(int id, int stateId, int countryId, + const std::string &name, double latitude, + double longitude) { + std::lock_guard lock(dbMutex); + + const char *query = R"( + INSERT OR IGNORE INTO cities (id, state_id, country_id, name, latitude, longitude) + VALUES (?, ?, ?, ?, ?, ?) 
+ )"; + + sqlite3_stmt *stmt; + int rc = sqlite3_prepare_v2(db, query, -1, &stmt, nullptr); + if (rc != SQLITE_OK) + throw std::runtime_error("Failed to prepare city insert"); + + sqlite3_bind_int(stmt, 1, id); + sqlite3_bind_int(stmt, 2, stateId); + sqlite3_bind_int(stmt, 3, countryId); + sqlite3_bind_text(stmt, 4, name.c_str(), -1, SQLITE_STATIC); + sqlite3_bind_double(stmt, 5, latitude); + sqlite3_bind_double(stmt, 6, longitude); + + if (sqlite3_step(stmt) != SQLITE_DONE) { + throw std::runtime_error("Failed to insert city"); + } + sqlite3_finalize(stmt); +} + +std::vector> SqliteDatabase::QueryCities() { + std::lock_guard lock(dbMutex); + + std::vector> cities; + sqlite3_stmt *stmt = nullptr; + + const char *query = "SELECT id, name FROM cities ORDER BY name"; + int rc = sqlite3_prepare_v2(db, query, -1, &stmt, nullptr); + + if (rc != SQLITE_OK) { + throw std::runtime_error("Failed to prepare query"); + } + + while (sqlite3_step(stmt) == SQLITE_ROW) { + int id = sqlite3_column_int(stmt, 0); + const char *name = + reinterpret_cast(sqlite3_column_text(stmt, 1)); + cities.push_back({id, name ? std::string(name) : ""}); + } + + sqlite3_finalize(stmt); + return cities; +} + +std::vector SqliteDatabase::QueryCountries(int limit) { + std::lock_guard lock(dbMutex); + + std::vector countries; + sqlite3_stmt *stmt = nullptr; + + std::string query = + "SELECT id, name, iso2, iso3 FROM countries ORDER BY name"; + if (limit > 0) { + query += " LIMIT " + std::to_string(limit); + } + + int rc = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr); + + if (rc != SQLITE_OK) { + throw std::runtime_error("Failed to prepare countries query"); + } + + while (sqlite3_step(stmt) == SQLITE_ROW) { + int id = sqlite3_column_int(stmt, 0); + const char *name = + reinterpret_cast(sqlite3_column_text(stmt, 1)); + const char *iso2 = + reinterpret_cast(sqlite3_column_text(stmt, 2)); + const char *iso3 = + reinterpret_cast(sqlite3_column_text(stmt, 3)); + countries.push_back({id, name ? 
std::string(name) : "", + iso2 ? std::string(iso2) : "", + iso3 ? std::string(iso3) : ""}); + } + + sqlite3_finalize(stmt); + return countries; +} + +std::vector SqliteDatabase::QueryStates(int limit) { + std::lock_guard lock(dbMutex); + + std::vector states; + sqlite3_stmt *stmt = nullptr; + + std::string query = + "SELECT id, name, iso2, country_id FROM states ORDER BY name"; + if (limit > 0) { + query += " LIMIT " + std::to_string(limit); + } + + int rc = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr); + + if (rc != SQLITE_OK) { + throw std::runtime_error("Failed to prepare states query"); + } + + while (sqlite3_step(stmt) == SQLITE_ROW) { + int id = sqlite3_column_int(stmt, 0); + const char *name = + reinterpret_cast(sqlite3_column_text(stmt, 1)); + const char *iso2 = + reinterpret_cast(sqlite3_column_text(stmt, 2)); + int countryId = sqlite3_column_int(stmt, 3); + states.push_back({id, name ? std::string(name) : "", + iso2 ? std::string(iso2) : "", countryId}); + } + + sqlite3_finalize(stmt); + return states; +} diff --git a/pipeline/src/generator.cpp b/pipeline/src/generator.cpp new file mode 100644 index 0000000..eb152aa --- /dev/null +++ b/pipeline/src/generator.cpp @@ -0,0 +1,81 @@ +#include "generator.h" +#include +#include + +/** + * @brief Initializes the brewery generator by loading a language model + * + * Current Implementation (Mock): + * - Outputs informational messages about model initialization + * - Does not load actual llama.cpp model yet + * - Serves as interface definition for future real implementation + * + * Future Implementation: + * - Will load a GGUF-format LLM model file using llama.cpp + * - Will initialize CPU/GPU inference context + * - Will cache model weights for repeated brewery generation + * + * @param modelPath Path to GGUF model file (e.g., "models/llama-7b.gguf") + * + * Example output: + * @code + * [Mock] Initialized llama model: models/llama-7b.gguf + * ✓ Model ready + * @endcode + */ +void 
LlamaBreweryGenerator::LoadModel(const std::string &modelPath) { + std::cout << " [Mock] Initialized llama model: " << modelPath << "\n"; + std::cout << " ✓ Model ready\n"; +} + +/** + * @brief Generates a brewery name and description for a city using + * deterministic hashing + * + * Algorithm: + * 1. Combines city name with seed to create unique hash input + * 2. Uses std::hash to compute deterministic hash value + * 3. Uses modulo arithmetic to map hash to template arrays: + * - name: adjective[hash % 8] + noun[(hash/7) % 8] + * - description: descriptions[(hash/13) % 5] + * 4. Returns Brewery struct with generated name and description + * + * Determinism: + * - Same cityName + seed ALWAYS produces same result + * - Enables reproducible testing and consistent brewery assignments + * - Hash distribution spreads city names across template combinations + * + * Example: + * @code + * auto gen = LlamaBreweryGenerator(); + * auto brewery = gen.GenerateBrewery("Toronto", 1); + * // Always produces same brewery for same city/seed + * assert(gen.GenerateBrewery("Toronto", 1).name == brewery.name); + * @endcode + * + * @param cityName The city to generate a brewery for + * @param seed An integer seed for deterministic variation (usually 0 or row ID) + * @return Brewery struct containing: + * - name: Combined adjective + noun (e.g., "Craft Brewing Co.") + * - description: Pre-written description matching brewery style + * + * @note Future: Replace hashing with actual LLM inference + * Interface will remain identical for smooth migration + */ +LlamaBreweryGenerator::Brewery +LlamaBreweryGenerator::GenerateBrewery(const std::string &cityName, int seed) { + // Deterministic mock generation based on city name and seed + // Combines city name with seed to ensure different results for same city + // with different seed values (useful for generating multiple breweries per + // city) + size_t nameHash = std::hash{}(cityName + std::to_string(seed)); + + Brewery result; + // Select 
adjective and noun using hash modulo + // Divided by 7 and 13 to ensure different modulo results from same hash + result.name = breweryAdjectives[nameHash % breweryAdjectives.size()] + " " + + breweryNouns[(nameHash / 7) % breweryNouns.size()]; + result.description = descriptions[(nameHash / 13) % descriptions.size()]; + + return result; +} diff --git a/pipeline/src/json_loader.cpp b/pipeline/src/json_loader.cpp new file mode 100644 index 0000000..70faffc --- /dev/null +++ b/pipeline/src/json_loader.cpp @@ -0,0 +1,222 @@ +#include "json_loader.h" +#include +#include +#include +#include +#include + +/** + * @brief Loads world geographic data from JSON file into SQLite database + * + * This function implements a hierarchical multithreaded loading strategy: + * + * THREADING ARCHITECTURE: + * ┌─────────────────────────────────────────────────────────────────┐ + * │ Main Thread: Parse JSON (45 MB) │ + * └────────────────────┬────────────────────────────────────────────┘ + * │ + * ┌─────────────┴──────────────┬──────────────┐ + * ▼ ▼ ▼ + * Country Thread 0 Country Thread 1 ... Thread N + * ├─ Insert Country ├─ Insert Country └─ Insert Country + * │ + * ├─ State Thread A ├─ State Thread C + * │ ├─ Insert State │ ├─ Insert State + * │ ├─ Insert 100 cities │ └─ Insert 150 cities + * │ └─ +stats └─ +stats + * │ + * └─ State Thread B + * ├─ Insert State + * ├─ Insert 200 cities + * └─ +stats + * + * THREADING DETAILS: + * - Countries loop: divided among CPU_CORE_COUNT threads + * - Each country: states processed in dedicated threads (nested parallelism) + * - Each state: cities inserted sequentially (within thread) + * - All writes protected by mutex in SqliteDatabase + * - Processing stats (city count) synchronized with mutex + * + * INPUT JSON STRUCTURE: + * The JSON file contains three main arrays: + * + * 1. Countries (~250 records): + * { id: int, name: string, iso2: string, iso3: string } + * + * 2. 
States/Provinces (~3500 records): + * { id: int, country_id: int, name: string, iso2: string } + * + * 3. Cities (~50000 records): + * { id: int, state_id: int, country_id: int, name: string, + * latitude: double, longitude: double } + * + * PERFORMANCE: + * - JSON parsing: Single-threaded, happens once at start + * - Country insertion: Parallelized across CPU cores + * - State insertion: Parallelized within each country via nested threads + * - City insertion: Sequential within each state (reduces serialization) + * - Total expected runtime: 2-5 seconds for 50k cities on modern CPU + * + * ERROR HANDLING: + * - Missing JSON file: throws std::runtime_error + * - Invalid JSON: throws nlohmann::json::parse_error + * - Bad city records: silently skipped (try-catch within loop) + * - Database errors: re-thrown from db.Insert*() calls + * + * STATISTICS: + * Prints progress messages showing: + * - Number of countries loaded + * - Number of worker threads created + * - Total cities inserted into database + * + * @param jsonPath Path to JSON file (typically: ../data/world_city_data.json) + * @param db Reference to initialized SqliteDatabase to populate + */ +void JsonLoader::LoadWorldCities(const std::string &jsonPath, + SqliteDatabase &db) { + std::cout << "\nLoading " << jsonPath << " (45 MB)...\n"; + + // Open and read JSON file from disk + std::ifstream jsonFile(jsonPath); + if (!jsonFile.is_open()) { + throw std::runtime_error("Failed to open JSON file: " + jsonPath); + } + + // Parse entire JSON into memory (nlohmann/json library) + json data; + try { + jsonFile >> data; + } catch (const std::exception &e) { + throw std::runtime_error("JSON parse error: " + std::string(e.what())); + } + jsonFile.close(); + + // DEBUG: Check JSON structure + if (!data.is_array()) { + std::cerr << "[DEBUG] JSON root is not an array. 
Type: " << data.type_name() + << std::endl; + if (data.is_object()) { + std::cerr << "[DEBUG] JSON root is object with keys: "; + for (auto &[key, val] : data.items()) { + std::cerr << key << " "; + } + std::cerr << std::endl; + } + } + + std::cout << "✓ Loaded " << data.size() + << " records (expecting countries array)\n"; + + if (data.size() == 0) { + throw std::runtime_error("JSON file appears to be empty or malformed. " + "Check download succeeded."); + } + + std::cout << "Processing countries with multithreading...\n"; + + // Determine optimal thread count based on CPU cores + unsigned int numThreads = std::thread::hardware_concurrency(); + std::cout << " Using " << numThreads << " threads\n\n"; + + // Shared counter for statistics (protected by mutex) + int processedCities = 0; + std::mutex statsMutex; + + // Spawn threads to process countries in parallel + std::vector countryThreads; + const size_t countriesPerThread = (data.size() + numThreads - 1) / numThreads; + + for (size_t t = 0; t < numThreads; ++t) { + countryThreads.push_back(std::thread([&, t]() { + // Each thread processes a range of countries + size_t start = t * countriesPerThread; + size_t end = std::min((t + 1) * countriesPerThread, data.size()); + + for (size_t i = start; i < end; ++i) { + const auto &country = data[i]; + int countryId = country["id"]; + std::string countryName = country["name"]; + std::string iso2 = country.value("iso2", ""); + std::string iso3 = country.value("iso3", ""); + + // Insert country record + db.InsertCountry(countryId, countryName, iso2, iso3); + + // Process states within this country + if (country.contains("states") && country["states"].is_array()) { + const auto &states = country["states"]; + + // Spawn threads to process states in parallel + // This creates nested parallelism: country threads spawn state + // threads + std::vector stateThreads; + + for (size_t s = 0; s < states.size(); ++s) { + stateThreads.push_back(std::thread([&, s, countryId]() { + const 
auto &state = states[s]; + int stateId = state["id"]; + std::string stateName = state["name"]; + std::string stateIso2 = state.value("iso2", ""); + + // Insert state record + db.InsertState(stateId, countryId, stateName, stateIso2); + + // Process cities for this state + if (state.contains("cities") && state["cities"].is_array()) { + // Cities within a state are processed sequentially + // (within the state thread - reduces context switching) + for (const auto &city : state["cities"]) { + try { + int cityId = city["id"].get(); + std::string cityName = city["name"]; + + // Parse latitude and longitude as strings first (they're + // stored as strings in JSON), then convert to double + double lat = 0.0; + double lng = 0.0; + if (city.contains("latitude")) { + lat = std::stod(city["latitude"].get()); + } + if (city.contains("longitude")) { + lng = std::stod(city["longitude"].get()); + } + + // Insert city record to database + // Database has mutex protection for thread-safe access + db.InsertCity(cityId, stateId, countryId, cityName, lat, + lng); + + // Update shared statistics counter (protected by mutex) + { + std::lock_guard lock(statsMutex); + processedCities++; + } + } catch (const std::exception &e) { + // Silently skip malformed city entries + // Example: missing required fields, invalid coordinates + } + } + } + })); + } + + // Wait for all state threads to complete + // Important: don't proceed to next country until states are done + for (auto &t : stateThreads) { + if (t.joinable()) + t.join(); + } + } + } + })); + } + + // Wait for all country threads to complete + // This blocks until all nested state/city insertions are done + for (auto &t : countryThreads) { + if (t.joinable()) + t.join(); + } + + std::cout << "✓ Loaded " << processedCities << " cities into database\n\n"; +} diff --git a/pipeline/src/main.cpp b/pipeline/src/main.cpp new file mode 100644 index 0000000..b288ec0 --- /dev/null +++ b/pipeline/src/main.cpp @@ -0,0 +1,154 @@ +/** + * @file 
main.cpp + * @brief Entry point for the brewery data pipeline + * + * Pipeline Overview: + * This is the main data processing pipeline that: + * 1. Initializes an in-memory SQLite database + * 2. Loads world city data from a JSON file (50k+ cities) + * 3. Initializes the brewery generation system (currently mocked) + * 4. Demonstrates brewery generation for sample cities + * + * Architecture: + * ┌─────────────┐ + * │ JSON File │ (world_city_data.json - 50k+ cities) + * └──────┬──────┘ + * │ + * ▼ + * ┌─────────────────────┐ + * │ JsonLoader::Load │ Parse and validate JSON + * └──────┬──────────────┘ + * │ + * ▼ + * ┌─────────────────────┐ + * │ SQLite Database │ Store cities in-memory + * └──────┬──────────────┘ + * │ + * ▼ + * ┌─────────────────────┐ + * │ BreweryGenerator │ Mock generation (hash-based) + * │ .GenerateBrewery() │ Future: LLM-based generation + * └─────────────────────┘ + * + * Command Line Arguments: + * - argv[1]: Path to GGUF model file (default: ./model.gguf) + * - argv[2]: Path to cache directory for JSON downloads (default: /tmp) + * - argv[3]: Git commit hash for reproducible data version (default: c5eb7772) + * + * The pipeline automatically downloads the geographic data from GitHub on first + * run and caches it locally to avoid repeated network calls. + * + * Example Usage - Auto-download (stable 2026-03-28 build): + * @code + * ./brewery-pipeline ./llama-7b.gguf + * @endcode + * + * Example Usage - Custom commit: + * @code + * ./brewery-pipeline ./llama-7b.gguf /tmp main + * @endcode + * + * Exit Codes: + * - 0: Pipeline completed successfully + * - 1: Pipeline failed (exception caught) + */ + +#include "data_downloader.h" +#include "database.h" +#include "generator.h" +#include "json_loader.h" +#include +#include + +int main(int argc, char *argv[]) { + try { + // Initialize libcurl globally (thread-safe mode) + curl_global_init(CURL_GLOBAL_DEFAULT); + + // Parse command-line arguments + std::string modelPath = argc > 1 ? 
argv[1] : "./model.gguf"; + std::string cacheDir = argc > 2 ? argv[2] : "/tmp"; + std::string commit = + argc > 3 ? argv[3] : "c5eb7772"; // Default: stable 2026-03-28 + + // Construct cache path for downloaded JSON + std::string jsonPath = cacheDir + "/countries+states+cities.json"; + + // Step 0: Download geographic data from GitHub (cached locally) + // On first run, downloads 45MB JSON. On subsequent runs, uses cached file. + // Commit hash allows pinning to specific data versions for reproducibility. + std::cout << "\n[Pipeline] Downloading geographic data from GitHub...\n"; + DataDownloader downloader; + downloader.DownloadCountriesDatabase(jsonPath, commit); + + SqliteDatabase db; + + // Step 1: Initialize empty in-memory database + std::cout << "Initializing in-memory SQLite database...\n"; + db.Initialize(); + + // Step 2: Load world city data from JSON file + // This populates the database with ~50k city records + // Each record includes: city name, country, latitude, longitude, population + JsonLoader::LoadWorldCities(jsonPath, db); + + // Step 3: Initialize brewery generator + // Current: Mock implementation using deterministic hashing + // Future: LLM-based generation with llama.cpp + std::cout << "Initializing brewery generator...\n"; + LlamaBreweryGenerator generator; + generator.LoadModel(modelPath); + + // Step 4: Query geographic data from database + std::cout << "\n=== GEOGRAPHIC DATA OVERVIEW ===\n"; + + auto countries = db.QueryCountries(50); + auto states = db.QueryStates(50); + auto cities = db.QueryCities(); + + std::cout << "\nTotal records loaded:"; + std::cout << "\n Countries: " << db.QueryCountries(0).size(); + std::cout << "\n States: " << db.QueryStates(0).size(); + std::cout << "\n Cities: " << cities.size() << "\n"; + + // Display 50 countries + std::cout << "\n--- 50 COUNTRIES ---\n"; + for (size_t i = 0; i < countries.size(); i++) { + std::cout << (i + 1) << ". 
" << countries[i].iso2 << " (" + << countries[i].iso3 << ") " << countries[i].name << "\n"; + } + + // Display 50 states + std::cout << "\n--- 50 STATES ---\n"; + for (size_t i = 0; i < states.size(); i++) { + std::cout << (i + 1) << ". " << states[i].iso2 << ": " << states[i].name + << "\n"; + } + + // Display 50 cities + std::cout << "\n--- 50 CITIES ---\n"; + for (size_t i = 0; i < std::min(size_t(50), cities.size()); i++) { + std::cout << (i + 1) << ". " << cities[i].second << "\n"; + } + + // Step 5: Demonstrate brewery generation on sample cities + std::cout << "\n=== SAMPLE BREWERY GENERATION ===\n\n"; + for (size_t i = 0; i < std::min(size_t(5), cities.size()); i++) { + const auto &[cityId, cityName] = cities[i]; + auto brewery = generator.GenerateBrewery(cityName, i); + std::cout << " " << cityName << ": " << brewery.name << "\n"; + std::cout << " → " << brewery.description << "\n"; + } + + std::cout << "\n✓ Pipeline completed successfully\n"; + + // Cleanup + curl_global_cleanup(); + return 0; + + } catch (const std::exception &e) { + std::cerr << "✗ Pipeline failed: " << e.what() << "\n"; + curl_global_cleanup(); + return 1; + } +}