From 248a51b35f1ad6a29a6735eb3a906ad2c7699a59 Mon Sep 17 00:00:00 2001 From: Aaron Po Date: Wed, 1 Apr 2026 21:35:02 -0400 Subject: [PATCH] cleanup --- pipeline/includes/database.h | 10 ++++-- pipeline/src/data_downloader.cpp | 5 ++- pipeline/src/database.cpp | 29 ++++++++++++++--- pipeline/src/json_loader.cpp | 55 ++++++-------------------------- pipeline/src/main.cpp | 25 +++++++++++---- pipeline/src/stream_parser.cpp | 29 ++++++++++------- 6 files changed, 82 insertions(+), 71 deletions(-) diff --git a/pipeline/includes/database.h b/pipeline/includes/database.h index d3648c3..df94d4c 100644 --- a/pipeline/includes/database.h +++ b/pipeline/includes/database.h @@ -39,8 +39,14 @@ public: /// @brief Closes the SQLite connection if initialized. ~SqliteDatabase(); - /// @brief Opens the in-memory database and creates schema objects. - void Initialize(); + /// @brief Opens the SQLite database at dbPath and creates schema objects. + void Initialize(const std::string &dbPath = ":memory:"); + + /// @brief Starts a database transaction for batched writes. + void BeginTransaction(); + + /// @brief Commits the active database transaction. + void CommitTransaction(); /// @brief Inserts a country row. void InsertCountry(int id, const std::string &name, const std::string &iso2, diff --git a/pipeline/src/data_downloader.cpp b/pipeline/src/data_downloader.cpp index d8f271a..9be4508 100644 --- a/pipeline/src/data_downloader.cpp +++ b/pipeline/src/data_downloader.cpp @@ -1,10 +1,10 @@ #include "data_downloader.h" #include #include +#include #include #include #include -#include static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp) { @@ -19,8 +19,7 @@ DataDownloader::DataDownloader() {} DataDownloader::~DataDownloader() {} bool DataDownloader::FileExists(const std::string &filePath) const { - struct stat buffer; - return (stat(filePath.c_str(), &buffer) == 0); + return std::filesystem::exists(filePath); } std::string diff --git a/pipeline/src/database.cpp b/pipeline/src/database.cpp index 2a18bb9..2748ccb 100644 --- a/pipeline/src/database.cpp +++ b/pipeline/src/database.cpp @@ -48,15 +48,36 @@ SqliteDatabase::~SqliteDatabase() { } } -void SqliteDatabase::Initialize() { - int rc = sqlite3_open(":memory:", &db); +void SqliteDatabase::Initialize(const std::string &dbPath) { + int rc = sqlite3_open(dbPath.c_str(), &db); if (rc) { - throw std::runtime_error("Failed to create in-memory SQLite database"); + throw std::runtime_error("Failed to open SQLite database: " + dbPath); } - spdlog::info("OK: In-memory SQLite database created"); + spdlog::info("OK: SQLite database opened: {}", dbPath); InitializeSchema(); } +void SqliteDatabase::BeginTransaction() { + std::lock_guard lock(dbMutex); + char *err = nullptr; + if (sqlite3_exec(db, "BEGIN TRANSACTION", nullptr, nullptr, &err) != + SQLITE_OK) { + std::string msg = err ? err : "unknown"; + sqlite3_free(err); + throw std::runtime_error("BeginTransaction failed: " + msg); + } +} + +void SqliteDatabase::CommitTransaction() { + std::lock_guard lock(dbMutex); + char *err = nullptr; + if (sqlite3_exec(db, "COMMIT", nullptr, nullptr, &err) != SQLITE_OK) { + std::string msg = err ? err : "unknown"; + sqlite3_free(err); + throw std::runtime_error("CommitTransaction failed: " + msg); + } +} + void SqliteDatabase::InsertCountry(int id, const std::string &name, const std::string &iso2, const std::string &iso3) { diff --git a/pipeline/src/json_loader.cpp b/pipeline/src/json_loader.cpp index 929e08a..1d27176 100644 --- a/pipeline/src/json_loader.cpp +++ b/pipeline/src/json_loader.cpp @@ -1,65 +1,32 @@ #include "json_loader.h" #include "stream_parser.h" -#include "work_queue.h" -#include #include #include -#include -#include void JsonLoader::LoadWorldCities(const std::string &jsonPath, SqliteDatabase &db) { auto startTime = std::chrono::high_resolution_clock::now(); - spdlog::info("\nLoading {} (streaming RapidJSON SAX + producer-consumer)...", - jsonPath); + spdlog::info("\nLoading {} (streaming RapidJSON SAX)...", jsonPath); - const unsigned int QUEUE_CAPACITY = 1000; - WorkQueue queue(QUEUE_CAPACITY); + db.BeginTransaction(); - spdlog::info("Creating worker thread pool..."); - - unsigned int numWorkers = std::thread::hardware_concurrency(); - if (numWorkers == 0) - numWorkers = 4; // Fallback if unavailable - spdlog::info(" Spawning {} worker threads", numWorkers); - - std::vector workers; - std::atomic citiesProcessed{0}; - - for (unsigned int i = 0; i < numWorkers; ++i) { - workers.push_back(std::thread([&]() { - unsigned long localCount = 0; - while (auto record = queue.pop()) { - db.InsertCity(record->id, record->state_id, record->country_id, - record->name, record->latitude, record->longitude); - localCount++; - } - citiesProcessed += localCount; - })); - } - - spdlog::info("Streaming cities into worker queue..."); - - unsigned long totalCities = 0; + size_t citiesProcessed = 0; StreamingJsonParser::Parse( - jsonPath, db, [&](const CityRecord &record) { queue.push(record); }, + jsonPath, db, + [&](const CityRecord &record) { + db.InsertCity(record.id, record.state_id, record.country_id, + record.name, record.latitude, record.longitude); + citiesProcessed++; + }, [&](size_t current, size_t total) { if (current % 10000 == 0 && current > 0) { spdlog::info(" [Progress] Parsed {} cities...", current); } - totalCities = current; }); spdlog::info(" OK: Parsed all cities from JSON"); - queue.shutdown_queue(); - - spdlog::info("Waiting for worker threads to complete..."); - for (auto &worker : workers) { - if (worker.joinable()) { - worker.join(); - } - } + db.CommitTransaction(); auto endTime = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast( @@ -74,7 +41,5 @@ void JsonLoader::LoadWorldCities(const std::string &jsonPath, static_cast(duration.count()) : 0LL; spdlog::info("Throughput: {} cities/sec", throughput); - spdlog::info("Worker pool: {} threads", numWorkers); - spdlog::info("Queue capacity: {}", QUEUE_CAPACITY); spdlog::info("=======================================\n"); } diff --git a/pipeline/src/main.cpp b/pipeline/src/main.cpp index 2376d1d..287dc26 100644 --- a/pipeline/src/main.cpp +++ b/pipeline/src/main.cpp @@ -3,8 +3,13 @@ #include "generator.h" #include "json_loader.h" #include +#include #include +static bool FileExists(const std::string &filePath) { + return std::filesystem::exists(filePath); +} + int main(int argc, char *argv[]) { try { curl_global_init(CURL_GLOBAL_DEFAULT); @@ -15,17 +20,25 @@ int main(int argc, char *argv[]) { argc > 3 ? argv[3] : "c5eb7772"; // Default: stable 2026-03-28 std::string jsonPath = cacheDir + "/countries+states+cities.json"; + std::string dbPath = cacheDir + "/biergarten-pipeline.db"; - spdlog::info("\n[Pipeline] Downloading geographic data from GitHub..."); - DataDownloader downloader; - downloader.DownloadCountriesDatabase(jsonPath, commit); + bool hasJsonCache = FileExists(jsonPath); + bool hasDbCache = FileExists(dbPath); SqliteDatabase db; - spdlog::info("Initializing in-memory SQLite database..."); - db.Initialize(); + spdlog::info("Initializing SQLite database at {}...", dbPath); + db.Initialize(dbPath); - JsonLoader::LoadWorldCities(jsonPath, db); + if (hasDbCache && hasJsonCache) { + spdlog::info("[Pipeline] Cache hit: skipping download and parse"); + } else { + spdlog::info("\n[Pipeline] Downloading geographic data from GitHub..."); + DataDownloader downloader; + downloader.DownloadCountriesDatabase(jsonPath, commit); + + JsonLoader::LoadWorldCities(jsonPath, db); + } spdlog::info("Initializing brewery generator..."); LlamaBreweryGenerator generator; diff --git a/pipeline/src/stream_parser.cpp b/pipeline/src/stream_parser.cpp index c5579a0..432ea50 100644 --- a/pipeline/src/stream_parser.cpp +++ b/pipeline/src/stream_parser.cpp @@ -1,10 +1,10 @@ #include "stream_parser.h" #include "database.h" -#include +#include +#include #include #include #include -#include using namespace rapidjson; @@ -196,32 +196,39 @@ void StreamingJsonParser::Parse( spdlog::info(" Streaming parse of {}...", filePath); - std::ifstream file(filePath, std::ios::binary); - if (!file.is_open()) { + FILE *file = std::fopen(filePath.c_str(), "rb"); + if (!file) { throw std::runtime_error("Failed to open JSON file: " + filePath); } - std::stringstream buffer; - buffer << file.rdbuf(); - file.close(); - std::string json_str = buffer.str(); - size_t total_size = json_str.length(); + size_t total_size = 0; + if (std::fseek(file, 0, SEEK_END) == 0) { + long file_size = std::ftell(file); + if (file_size > 0) { + total_size = static_cast(file_size); + } + std::rewind(file); + } CityRecordHandler::ParseContext ctx{&db, onCity, onProgress, 0, total_size, 0, 0}; CityRecordHandler handler(ctx); Reader reader; - StringStream ss(json_str.c_str()); + char buf[65536]; + FileReadStream frs(file, buf, sizeof(buf)); - if (!reader.Parse(ss, handler)) { + if (!reader.Parse(frs, handler)) { ParseErrorCode errCode = reader.GetParseErrorCode(); size_t errOffset = reader.GetErrorOffset(); + std::fclose(file); throw std::runtime_error(std::string("JSON parse error at offset ") + std::to_string(errOffset) + " (code: " + std::to_string(errCode) + ")"); } + std::fclose(file); + spdlog::info(" OK: Parsed {} countries, {} states, {} cities", ctx.countries_inserted, ctx.states_inserted, ctx.cities_emitted); }