Project Overview

ETL_C- delivers a configurable, high-performance pipeline for ingesting JSON data into PostgreSQL. It streamlines end-to-end ETL workflows by decoupling schema definitions and load order into JSON configuration files, while providing built-in performance metrics and multiple parsing strategies.

Why This Project Exists

  • Automate bulk JSON→PostgreSQL ingestion without hand-crafted SQL
  • Enable rapid schema mapping changes via declarative JSON configs
  • Benchmark and compare JSON parsers for optimal throughput

Primary Goals

  • Offer a single executable (main.cpp) that loads:
    • Extraction configuration (data sources, parsers, performance mode)
    • Schema and relationship definitions
    • Database connection and load order
  • Create or truncate tables, enforce foreign-key relationships, and insert transformed data
  • Report timing metrics for each ETL stage

When to Use

  • Loading large or nested JSON datasets into PostgreSQL
  • Iterating on schema designs without recompiling C++
  • Evaluating SIMDJSON vs. nlohmann::json parsing performance
  • Incorporating an out-of-the-box ETL tool into CI/CD pipelines

Key Features

  • JSON→PostgreSQL ingestion with:
    • nlohmann::json (default)
    • SIMDJSON “performance mode” for ultra-fast parsing
  • Declarative schema mapping:
    • Define tables, columns, types, and relationships in JSON
    • Control load order and optional table truncation
  • Automatic DDL generation and relationship enforcement
  • Detailed performance metrics per stage (extract, transform, load)
  • Dependency management via vcpkg (PostgreSQL client, JSON libraries)

Licensing

Released under the MIT License—use, modify, and distribute with no warranties.

Getting Started

This guide shows how to compile and run your first ETL job in minutes. You’ll install dependencies via vcpkg, build with CMake, prepare PostgreSQL, and execute the sample pipeline using the bundled JSON data.

1. Prerequisites

  • Git
  • CMake ≥ 3.15
  • A C++17 compiler (GCC, Clang or MSVC)
  • PostgreSQL server
  • vcpkg (with your project’s vcpkg-configuration.json in place)

2. Clone and Bootstrap vcpkg

# Clone the ETL project
git clone https://github.com/PerezO12/ETL_C-.git
cd ETL_C-

# Bootstrap vcpkg (if you haven't already)
git clone https://github.com/microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh   # or .\bootstrap-vcpkg.bat on Windows
cd ..

3. Install Dependencies via vcpkg

Create a vcpkg.json manifest in the project root (next to vcpkg-configuration.json):

{
  "name": "etl-project",
  "version-string": "1.0.0",
  "dependencies": [
    "nlohmann-json",
    "postgresql",
    "simdjson"
  ]
}

Then install:

./vcpkg/vcpkg install --triplet x64-linux
# or
./vcpkg/vcpkg install --triplet x64-windows

4. Configure and Build with CMake

mkdir build && cd build
cmake .. \
  -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake \
  -DCMAKE_BUILD_TYPE=Release
cmake --build . --config Release

This produces the etl_processor executable.

5. Prepare PostgreSQL

  1. Start your PostgreSQL server.

  2. Create a database and user:

    CREATE USER etl_user WITH PASSWORD 'etl_pass';
    CREATE DATABASE etl_db OWNER etl_user;
    
  3. Update config/config.json under the database section to match your credentials:

    {
      "database": {
        "host": "localhost",
        "port": 5432,
        "dbname": "etl_db",
        "user": "etl_user",
        "password": "etl_pass"
      },
      ...
    }
    

6. Execute the Sample Pipeline

Run the ETL processor pointing at the configuration and one of the sample data files:

# Using datos.json
./etl_processor ../config/config.json ../data/datos.json

# Or using datos2.json
./etl_processor ../config/config.json ../data/datos2.json

Upon completion you’ll see timing metrics for JSON extraction (nlohmann/json vs. simdjson) and confirmation of table creation and row inserts.

7. Verify Results

Connect with psql or any SQL client:

psql -h localhost -U etl_user -d etl_db

Run a simple query:

SELECT COUNT(*) FROM estudiantes;
SELECT * FROM asignaturas LIMIT 5;

You should see the transformed records loaded into your tables.

Next Steps

  • Explore config/config.json to adjust table mappings, relationships and transformations.
  • Extend src/main.cpp with custom extraction logic or new data sources.
  • Use --truncate or --no-relations flags (see README) to control the load process for testing.

Configuration Reference

This section details every field in config/config.json, guiding database setup, data loading, schema definitions, relationships, and ETL transformations.

database

Defines PostgreSQL connection parameters.

  • host (string): Database server host
  • port (string|number): TCP port (e.g. 5432)
  • dbname (string): Name of the database
  • user (string): Username for authentication
  • password (string): Password for authentication

Example

"database": {
  "host": "localhost",
  "port": 5432,
  "dbname": "university",
  "user": "etl_user",
  "password": "s3cret"
}

dataSources

Maps logical source names to input files or endpoints.

  • <sourceName> (object)
    • path (string): File system path or URL
    • format (string): Data format ("json")

Example

"dataSources": {
  "alumnos": {
    "path": "data/alumnos.json",
    "format": "json"
  },
  "profesores": {
    "path": "data/profesores.json",
    "format": "json"
  }
}

globalOptions

Controls load behavior.

  • loadOrder (array[string]): Sequence of table names to load (parents before children)
  • truncateBeforeLoad (boolean): If true, issues TRUNCATE … RESTART IDENTITY CASCADE before inserts

Example

"globalOptions": {
  "loadOrder": ["profesores","asignaturas","alumnos"],
  "truncateBeforeLoad": true
}

tables

Defines table schemas and column mappings.

Each entry key is a table name; value is its schema object:

  • generatedPK (boolean, default true): Add id SERIAL PRIMARY KEY if missing
  • naturalKey (array[string], optional): Column(s) for lookup by natural key
  • columns (array[object]): List of columns:
    • name (string): Column name
    • type (string): SQL type (e.g. "TEXT", "INT")
    • unique (boolean, optional): Add UNIQUE constraint
    • references (object, optional):
      • table (string): Referenced table
      • column (string): Referenced column

Example

"tables": {
  "profesores": {
    "generatedPK": true,
    "naturalKey": ["email"],
    "columns": [
      { "name": "nombre",    "type": "TEXT" },
      { "name": "email",     "type": "TEXT", "unique": true }
    ]
  },
  "asignaturas": {
    "columns": [
      { "name": "titulo",       "type": "TEXT" },
      { "name": "profesor_id",  "type": "INT", "references": { "table": "profesores", "column": "id" } }
    ]
  }
}

relationships

Defines many‐to‐many junctions between tables.

  • type (string): Must be "MANY_TO_MANY"
  • left (string): Left table name
  • right (string): Right table name
  • table (string): Junction table name

Example

"relationships": [
  {
    "type": "MANY_TO_MANY",
    "left": "alumnos",
    "right": "asignaturas",
    "table": "alumnos_asignaturas"
  }
]

transformations

Defines reusable string‐based ETL operations applied at runtime.

Each key is a transform name; value specifies:

  • input (string): Expected input type ("string")
  • output (string|array[string]): Output field(s) or type
  • logic (string): Identifier for the built‐in logic branch in applyTransformation

Example

"transformations": {
  "split_fullname": {
    "input": "string",
    "output": ["nombre","apellido"],
    "logic": "split_by_space"
  },
  "trim_uppercase": {
    "input": "string",
    "output": "string",
    "logic": "trim && uppercase"
  }
}

Accessing Configuration in Code

Load and navigate the configuration via ConfigLoader:

#include "services/ConfigLoader.hpp"
using nlohmann::json;

int main() {
    json cfg = ConfigLoader::load("config/config.json");

    // Database config
    auto db = cfg["database"];
    std::string host = db["host"].get<std::string>();

    // Load order
    auto loadOrder = cfg["globalOptions"]["loadOrder"]
        .get<std::vector<std::string>>();

    // Table schemas
    for (auto& [tableName, tblCfg] : cfg["tables"].items()) {
        bool genPK = tblCfg.value("generatedPK", true);
        // ...
    }

    // Transformations
    json transforms = cfg["transformations"];
}

Use these fields with SchemaManager, InsertValues, and applyTransformation to drive schema creation, data insertion, and ETL logic.

ETL Workflow & Core Concepts

This section outlines how data flows through the three ETL stages—Extraction, Transformation, Loading—using the core utilities, simdjson parsing, transformation rules, and batch database inserts. Each stage highlights extension points and performance tips.

1. Extraction

Extract raw JSON payloads and pull record arrays or scalar fields.

Key Functions

  • json extractData(const std::string& filePath)
    Uses nlohmann::json to parse a file into a json object.
  • simdjson::dom::element extractDataSimDjson(const std::string& filePath)
    High-performance parsing for large files.
  • std::vector<json> getJsonRecords(const json& data, const std::string& sourcePath, const std::string& rootPath = "")
    Recursively drills into dot-delimited paths to return array elements.
  • std::string getValueFromJson(const json& record, const std::string& jsonPath)
    Retrieves a scalar field as a string; empty if missing.

Example: Extract Orders

#include "etl/Extraction.hpp"
#include <iostream>

int main() {
  // Load JSON with simdjson for large file
  auto dom = extractDataSimDjson("data/orders.json");
  // Fall back to nlohmann::json for the small config file
  json config = extractData("config/etl_config.json");

  // Pull out an array at data.orders.items
  auto items = getJsonRecords(
    config,                    // using nlohmann::json here
    "orders.items",            // target array
    "data"                     // nested under top-level "data"
  );

  for (auto& item : items) {
    std::string id   = getValueFromJson(item, "id");
    std::string name = getValueFromJson(item, "name");
    std::cout << "Item " << id << ": " << name << "\n";
  }
}

Practical Tips

  • Use extractDataSimDjson for multi-GB files; fall back to extractData for configs.
  • Provide rootPath when your JSON nests the payload under a fixed key.
  • Always check the returned vector before proceeding, as in the sketch below.
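
For example, a minimal guard after extraction. The file and JSON paths reuse the illustrative ones from the Extract Orders example above; adjust them to your data:

#include "etl/Extraction.hpp"
#include <iostream>

int main() {
  json config = extractData("config/etl_config.json");
  auto items  = getJsonRecords(config, "orders.items", "data");

  // Bail out early if the path was wrong or the array is empty
  if (items.empty()) {
    std::cerr << "No records found at data.orders.items - check the path\n";
    return 1;
  }
  std::cout << "Extracted " << items.size() << " records\n";
  return 0;
}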

2. Transformation

Apply field-level mutations driven by a JSON configuration.

Key Functions

  • std::string applyTransformation(const std::string& value, const std::string& transformName, const json& transformsConfig)
    Applies a single string transform (split_by_space, trim_uppercase) and returns the result.
  • void applyTransformations(json& record, const json& transformsConfig)
    Iterates over a record’s fields, applies mapped transformations.

Example: Normalize Records

#include "etl/Transformation.hpp"
#include "etl/Extraction.hpp"
#include <iostream>

int main() {
  // Load raw records
  json raw = extractData("data/users.json");
  auto users = getJsonRecords(raw, "users");

  // Transformation rules
  json transforms = extractData("config/transforms.json");
  // transforms = {
  //   { "username_trim",   { {"logic","trim_uppercase"} } },
  //   { "fullname_split",  { {"logic","split_by_space"} } }
  // };

  for (auto& user : users) {
    // Mutates in place
    applyTransformations(user, transforms);
    std::cout << user.dump() << "\n";
  }
}

Practical Tips

  • Map field names to transform names in your ETL config; see the sketch after this list.
  • Extend by adding an else if (logic == "your_logic") branch in Transformation.cpp and updating the transformations JSON.
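
The helper below is a rough sketch of applying such a field-ops map (field name → list of transform names, the same shape used in the contribution guide) by hand. applyTransformations already performs this iteration internally; the map layout and the return value of applyTransformation are assumptions here:

#include "etl/Transformation.hpp"
#include <string>

// Sketch: apply a field-ops map (field -> list of transform names) to one record.
// The map layout is hypothetical; applyTransformation is assumed to return the
// transformed string (see the key functions above).
void applyFieldOps(json& record, const json& fieldOps, const json& transformsConfig) {
  for (auto& [field, ops] : fieldOps.items()) {
    if (!record.contains(field)) continue;   // skip fields absent from this record
    for (const auto& op : ops) {
      record[field] = applyTransformation(
          record[field].get<std::string>(),
          op.get<std::string>(),
          transformsConfig);
    }
  }
}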

3. Loading

Insert transformed records into PostgreSQL with batch operations and relationship handling.

Key Classes & Methods

  • QueryExecutor
    Wraps a PGconn*, exposes executeQuery, lookupId, selectQuery.
  • InsertValues
    • batchInsert(const std::string& table, const std::vector<json>& records, const json& tableConfig)
      Performs parameterized batch INSERT inside a transaction.
    • processRelationships(const json& relConfig)
      Establishes many-to-many links after inserts.

Example: Full ETL Pipeline

#include "etl/Extraction.hpp"
#include "etl/Transformation.hpp"
#include "database/QueryExecutor.hpp"
#include "database/InsertValues.hpp"
#include <libpq-fe.h>
#include <iostream>

int main() {
  // 1. DB setup
  PGconn* conn = PQconnectdb("dbname=mydb user=me");
  if (PQstatus(conn) != CONNECTION_OK) {
    std::cerr << "Connection failed: " << PQerrorMessage(conn) << "\n";
    PQfinish(conn);
    return 1;
  }
  QueryExecutor executor(conn);
  InsertValues loader(executor);

  // 2. Extraction
  json raw = extractData("data/products.json");
  auto records = getJsonRecords(raw, "products");

  // 3. Transformation
  json transforms = extractData("config/product_transforms.json");
  for (auto& rec : records) {
    applyTransformations(rec, transforms);
  }

  // 4. Loading
  json tableConfig = extractData("config/product_table.json");
  // tableConfig example:
  // { "batchSize": 500,
  //   "columns": [ { "name":"id","jsonPath":"$.id","type":"int" },
  //                { "name":"data","jsonPath":"$.info","type":"jsonb" } ] }

  try {
    loader.batchInsert("products", records, tableConfig);
    // Optional: process many-to-many relationships
    json relConfig = extractData("config/product_category_rel.json");
    loader.processRelationships(relConfig);
    std::cout << "ETL completed successfully\n";
  } catch (const std::exception& e) {
    std::cerr << "ETL failed: " << e.what() << "\n";
    PQfinish(conn);
    return 1;
  }
  PQfinish(conn);
  return 0;
}

Practical Tips

  • Cache foreign-key lookups (via lookupId) in your own map to reduce queries; see the sketch after this list.
  • Tune batchSize for optimal memory vs. network overhead.
  • Wrap the entire load in transactions; on exception, batchInsert auto-rolls back.
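
A small cache around the foreign-key lookup might look like the sketch below. The lookupId(table, column, value) signature is an assumption; adapt it to the declaration in QueryExecutor.hpp:

#include "database/QueryExecutor.hpp"
#include <map>
#include <string>
#include <utility>

// Sketch of a lookup cache keyed by (table, natural-key value).
// The executor.lookupId(table, column, value) call is assumed, not verified.
std::string cachedLookup(QueryExecutor& executor,
                         const std::string& table,
                         const std::string& column,
                         const std::string& value) {
  static std::map<std::pair<std::string, std::string>, std::string> cache;
  auto key = std::make_pair(table, value);
  if (auto it = cache.find(key); it != cache.end()) {
    return it->second;                       // cache hit: no query issued
  }
  std::string id = executor.lookupId(table, column, value);
  cache.emplace(key, id);
  return id;
}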

Extension & Optimization

  • Custom Extraction: Extend Extraction.cpp to support XML or CSV parsing alongside JSON.
  • New Transforms: Add logic blocks in Transformation.cpp and expose in your JSON spec.
  • Parallel Loading: Partition records by shard and run concurrent batchInsert calls on separate connections (see the sketch after this list).
  • Utility Helpers: Leverage Utility.hpp for string joins, splits, and key–value map creation during transformation or lookup phases.
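
A rough sketch of sharded loading, assuming each worker thread gets its own connection and its own QueryExecutor/InsertValues pair (none of these objects are shared across threads):

#include "database/QueryExecutor.hpp"
#include "database/InsertValues.hpp"
#include <libpq-fe.h>
#include <algorithm>
#include <string>
#include <thread>
#include <vector>

// Sketch: split records into shards and load each shard on its own connection.
// Error/status checks are omitted for brevity; "products" is the table from the
// pipeline example above.
void parallelLoad(const std::vector<json>& records, const json& tableConfig,
                  const std::string& conninfo, std::size_t shards) {
  if (shards == 0 || records.empty()) return;
  std::vector<std::thread> workers;
  std::size_t chunk = (records.size() + shards - 1) / shards;

  for (std::size_t s = 0; s < shards; ++s) {
    std::size_t begin = s * chunk;
    if (begin >= records.size()) break;
    std::size_t end = std::min(begin + chunk, records.size());
    std::vector<json> shard(records.begin() + begin, records.begin() + end);

    workers.emplace_back([shard = std::move(shard), &tableConfig, &conninfo]() {
      PGconn* conn = PQconnectdb(conninfo.c_str());   // one connection per worker
      QueryExecutor executor(conn);
      InsertValues loader(executor);
      loader.batchInsert("products", shard, tableConfig);
      PQfinish(conn);
    });
  }
  for (auto& w : workers) w.join();
}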

This workflow provides a modular ETL scaffold—swap in new parsers, enrich transformation logic, and scale loading by tuning batch sizes and caching. Use these core concepts to build robust, high-throughput pipelines.

Development & Contribution Guide

This guide helps contributors set up the project, follow coding standards, extend transformation logic, write tests, and submit pull requests.

1. Prerequisites & Setup

  1. Clone the repo
    git clone https://github.com/PerezO12/ETL_C-.git
    cd ETL_C-

  2. Bootstrap vcpkg and install dependencies

    ./vcpkg/bootstrap-vcpkg.sh
    ./vcpkg/vcpkg install nlohmann-json simdjson libpqxx libpq
    
  3. Configure and build with CMake

    mkdir build && cd build
    cmake .. \
      -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake \
      -DCMAKE_BUILD_TYPE=Release
    cmake --build . --parallel
    
  4. Verify executables

    ./bin/etl_processor --help
    

2. Project Layout

├── CMakeLists.txt               # Top-level build config
├── vcpkg-configuration.json
├── include/                     # Public headers
│   └── etl/
├── src/                         # Core implementation
│   ├── Transformation.cpp
│   └── …
├── tests/                       # Unit tests
│   └── TransformationTests.cpp
└── tools/                       # Scripts, CI configs

3. Coding Standards

  • Language: C++17
  • Formatting:
    • 2-space indent
    • snake_case for files and functions
    • PascalCase for types/classes
  • Headers: include guards or #pragma once
  • Error Handling: use exceptions (e.g., std::runtime_error)
  • Dependencies: reference via vcpkg; include <nlohmann/json.hpp>, <simdjson.h>, <pqxx/pqxx>
  • Commits: follow Conventional Commits
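
A small illustrative header (file and class names are hypothetical) that follows these conventions:

#pragma once
// json_flattener.hpp: hypothetical example of the conventions above

#include <nlohmann/json.hpp>
#include <stdexcept>
#include <string>

// Types and classes use PascalCase
class JsonFlattener {
public:
  // Functions use snake_case; errors are reported via exceptions
  std::string flatten_field(const nlohmann::json& record, const std::string& key) const {
    if (!record.contains(key)) {
      throw std::runtime_error("missing field: " + key);
    }
    return record.at(key).dump();
  }
};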

4. Extending Transformation Functions

All transformations live in src/Transformation.cpp and are driven by JSON configs.

Example: Add a reverse_string transform

  1. In Transformation.cpp, update applyTransformation:

    if (logic == "reverse_string") {
      std::string out = value;
      std::reverse(out.begin(), out.end());
      return out;
    }
    // existing branches...
    
  2. Update JSON config (transformations.json):

    {
      "reverse_string": { "logic": "reverse_string" }
    }
    
  3. Reference it in the field-ops map:

    {
      "username": ["trim_uppercase", "reverse_string"]
    }
    
  4. Apply in code:

    json transformDefs = /* load from transformations.json */;
    json fieldOps      = /* load field-ops config */;
    applyTransformations(record, transformDefs);
    applyTransformations(record, fieldOps);
    

5. Writing Tests

We use Catch2 (installed via vcpkg).

  1. Install Catch2:

    ./vcpkg/vcpkg install catch2
    
  2. Add test target in top‐level CMakeLists.txt:

    enable_testing()
    find_package(Catch2 CONFIG REQUIRED)
    add_executable(unit_tests
      tests/TransformationTests.cpp
    )
    target_link_libraries(unit_tests PRIVATE etl_core Catch2::Catch2)
    add_test(NAME TransformationTests COMMAND unit_tests)
    
  3. Example test (tests/TransformationTests.cpp):

    #define CATCH_CONFIG_MAIN
    #include <catch2/catch.hpp>
    #include "Transformation.hpp"
    using nlohmann::json;
    
    TEST_CASE("trim_uppercase works", "[transform]") {
      REQUIRE(
        applyTransformation("  abc ", "trim_uppercase",
                            json{{"logic","trim_uppercase"}})
        == "ABC"
      );
    }
    
  4. Run tests:

    cd build
    ctest --output-on-failure
    

6. Submitting Pull Requests

  1. Fork the repo and create a feature branch:

    git checkout -b feat/add-reverse-transform
    
  2. Implement code, add tests, update docs.

  3. Run full build and tests locally.

  4. Commit with a clear message:

    feat(transform): add reverse_string transformation
    
  5. Push and open a PR against main
    • Reference any related issue
    • Ensure CI (build + ctest) passes
    • Request reviews from maintainer team

Thank you for contributing!