This project implements a batch data ingestion pipeline that polls the PredictIt API for US political odds data. The goal is to demonstrate a production-ready system built with modern data stack tooling that can be used to generate analysis and identify trends.
The project follows a microservices architecture pattern: core processing services are deployed as AWS Lambda functions, raw data is stored in S3, and processed data is loaded into Snowflake and transformed for downstream analytics. The pipeline is orchestrated with Airflow and split into two DAGs for modularity, fetch and ingest, with fetch triggering ingest after successful completion.
AWS infrastructure is managed with Terraform, and CI/CD runs through GitHub Actions, ensuring reusable cloud infrastructure and robust code that is deployed as soon as it lands on main.
- 🐍 Python: API client, response validation & Lambda logic
- ☁️ AWS S3: Raw and validated data storage
- 🧬 AWS Lambda: Stateless functions for API fetching & validation
- ❄️ Snowflake: Data warehouse
- 🛠️ Terraform: Infrastructure as code
- 🔄 Airflow: Pipeline orchestration
- 🚀 GitHub Actions: CI/CD automation
predictit/
├── .github/workflows/ # GitHub Actions workflows
│ ├── deploy_lambdas.yml # Reusable workflow for Lambda continuous deployment
│ ├── test_and_deploy.yml # Main workflow to run tests and deploy to AWS
│ └── test.yml # Reusable workflow to check and format code and run tests
├── dags/ # Airflow DAGs for orchestration
│ ├── fetch.py # DAG to fetch data from PredictIt API
│ ├── ingest.py # DAG to ingest data into Snowflake
│ └── sql/ # Snowflake SQL scripts (DDL/COPY INTO/etc.)
│       # Note: SQL files have been placed here due to MWAA
│       # directory structure requirements
│
├── docs/ # Documentation and diagrams
│ ├── architecture-diagram.png # Pipeline architecture diagram
│ └── predictit_markets_erd.png # Data model entity relationship diagram
│
├── infrastructure/aws # Terraform modules
│ ├── iam/ # IAM roles and permissions
│ ├── lambdas/ # Lambda deployment resources
│ └── s3/ # S3 bucket
│
├── lambda_fetch/ # Fetch Lambda (PredictIt API ingestion)
│ ├── __init__.py
│ ├── Dockerfile
│ ├── lambda_function.py
│ └── requirements.txt
│
├── lambda_validate/ # Validate Lambda (schema/structure checks)
│ ├── __init__.py
│ ├── Dockerfile
│ ├── lambda_function.py
│ └── requirements.txt
│
├── src/ # Python source code
│ ├── __init__.py
│ ├── api.py # API fetch and request logic
│ └── validate.py # API response validation schema definitions
│
├── tests/ # Unit tests
│ ├── __init__.py
│ ├── test_api.py
│ ├── test_lambda_fetch.py
│ └── test_lambda_validate.py
│
├── main.tf # Root Terraform entry point
├── variables.tf # Terraform variables
├── requirements.txt # Python dependencies
└── README.md # Project documentation
- `lambda_fetch` and `lambda_validate` are containerised AWS Lambda functions built with Docker and deployed via ECR repos
- `src/` contains the Python source code for API fetching and validation logic
- `tests/` contains unit tests for key modules, built using pytest
- `infrastructure/` contains Terraform modules to provision the AWS infrastructure
- `.github/workflows` contains CI/CD pipelines that run tests, build and push images, and deploy to AWS
- lambda_fetch fetches the latest data from the PredictIt API and stores it in a staging layer in an S3 bucket
- lambda_validate picks up the staged response and validates it against the expected JSON pattern, moving it to a raw storage layer if valid
- Snowflake SQL scripts copy and flatten the JSON file into a staging table and populate the data warehouse model with new data
- Analytical SQL queries are available to generate sample analysis reports
- Airflow is used to schedule and orchestrate the pipeline from fetching to analysis
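To make the validation step concrete, here is a minimal sketch of the kind of structure check lambda_validate might perform before promoting a staged file to the raw layer. The key names below follow the PredictIt API's general market/contract shape, but the exact required fields and the `validate_response` helper are illustrative assumptions, not the project's actual schema (see src/validate.py for the real definitions):

```python
# Hypothetical sketch: verify a staged PredictIt API response has the
# expected top-level JSON shape before moving it to the raw layer.
# Field names are illustrative assumptions, not the project's real schema.
REQUIRED_MARKET_KEYS = {"id", "name", "contracts"}
REQUIRED_CONTRACT_KEYS = {"id", "name", "lastTradePrice"}

def validate_response(payload: dict) -> bool:
    """Return True if the payload matches the expected JSON shape."""
    markets = payload.get("markets")
    if not isinstance(markets, list) or not markets:
        return False
    for market in markets:
        # dict.keys() supports set comparison: every required key present?
        if not REQUIRED_MARKET_KEYS <= market.keys():
            return False
        for contract in market["contracts"]:
            if not REQUIRED_CONTRACT_KEYS <= contract.keys():
                return False
    return True
```

A boolean result like this maps cleanly onto the move-if-valid behaviour: the Lambda copies the object to the raw prefix on `True` and can raise or alert on `False`.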
The PredictIt data provides odds on answers to various political questions such as "Who will be the next president of the USA?". The questions are called "markets", and the answers to each market are "contracts". Users "trade" (bet) on the various outcomes, which moves the trading price (the odds). Markets and contracts can open and close at any time. Below is the modelled ER diagram for the data.
The model consists of two type 1 slowly changing dimension tables and a central fact table. SQL merge statements cover the opening and closing of markets/contracts, indicated by the EFFECTIVE_TS and EXPIRY_TS columns. The central FACT_PRICES table then has all the columns necessary to generate any kind of analysis.
Some design choices taken:
- FACT_PRICES.MARKET_ID is not strictly necessary as MARKET_ID can be derived from DIM_CONTRACTS
- To reflect a real-world system however, some denormalisation has been justified for performance
- In big data systems, this leads to fewer joins and faster query performance
- To enforce data integrity, a check constraint can be placed on FACT_PRICES.MARKET_ID = DIM_CONTRACTS.MARKET_ID
The data warehouse loading strategy is covered by the ingest pipeline. Both dimension and fact tables are loaded incrementally from staging tables, with MERGE INTO statements loading the dimension tables and an INSERT INTO statement for new price data. Please consult the SQL files for the specific code. An example loading strategy for DIM_CONTRACTS is shown below.
MERGE INTO markets.dim_contracts AS tgt
USING markets.stg_dim_contracts AS src
ON src.id = tgt.id
WHEN MATCHED AND src.date_end IS NOT NULL AND tgt.expiry_ts IS NULL THEN
UPDATE SET
tgt.expiry_ts = src.date_end,
tgt.is_open = FALSE
WHEN NOT MATCHED THEN
INSERT (
id,
market_id,
name,
short_name,
effective_ts
)
VALUES (
src.id,
src.market_id,
src.name,
src.short_name,
CURRENT_TIMESTAMP
);

Unit testing is conducted using pytest and covers all Python fetching and validation code. To run the tests:

pytest tests/

Test artifacts will also be generated during CI pipelines.
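As an illustration of the testing style, a pytest unit test for the flattening logic might look like the following. The `flatten_markets` helper and its field names are assumptions made for this sketch; the project's real tests live in tests/:

```python
# Illustrative pytest-style unit test. flatten_markets and its field
# names are hypothetical stand-ins for the project's parsing helpers.
def flatten_markets(payload: dict) -> list:
    """Flatten the nested markets/contracts JSON into one row per contract."""
    rows = []
    for market in payload.get("markets", []):
        for contract in market.get("contracts", []):
            rows.append({
                "market_id": market["id"],
                "contract_id": contract["id"],
                "price": contract["lastTradePrice"],
            })
    return rows

def test_flatten_markets_one_row_per_contract():
    payload = {"markets": [
        {"id": 1, "contracts": [{"id": 10, "lastTradePrice": 0.42},
                                {"id": 11, "lastTradePrice": 0.58}]},
    ]}
    rows = flatten_markets(payload)
    assert len(rows) == 2
    assert rows[0] == {"market_id": 1, "contract_id": 10, "price": 0.42}
```

Tests like this keep the Lambda handlers thin: the pure parsing and validation logic can be exercised without AWS credentials or network access.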
Terraform is used to provision the AWS infrastructure. Terraform was deliberately not used for Snowflake, as the Snowflake provider documentation states it should be used for RBAC only rather than database setup.
The infrastructure contains the following resources:
- S3 bucket
- IAM roles and policies for Lambda execution
- predictit_fetch and predictit_validate Lambda functions
- predictit_fetch and predictit_validate ECR repositories for the Lambda function images
The AWS infra is split into IAM, Lambdas and S3 modules. To set up the infrastructure you will need an AWS account and the AWS CLI configured with the relevant credentials. See the Terraform documentation for installation and usage.
To set up Terraform, ensure you have created a terraform.tfvars file at the project root with an s3_bucket_name variable to name the storage bucket. Then run:
terraform init

A dependency exists between the ECR repos and the Lambda functions, as the functions use the latest images in those repos. You must therefore create the ECR repos first. Follow these steps:
- Provision ECR repos only
terraform apply \
-target=module.lambdas.aws_ecr_repository.lambda_fetch \
  -target=module.lambdas.aws_ecr_repository.lambda_validate

- Build and push the Docker images to ECR (e.g. for lambda_fetch)
docker buildx build \
--platform linux/amd64 \
--provenance=false \
-t lambda_fetch \
-f ./lambda_fetch/Dockerfile .
aws ecr get-login-password | docker login --username AWS --password-stdin <account>.dkr.ecr.<region>.amazonaws.com
docker tag lambda_fetch <ecr_repo_uri>:latest
docker push <ecr_repo_uri>:latest

- Provision the remaining infrastructure
terraform apply

Snowflake DDL setup can be found at ./dags/sql/setup. One point of note is the storage integration to S3; follow the relevant Snowflake docs to set this up. Once done, the SQL statements can be run to create the database, schema, storage integration, file format, stage and tables.
The pipeline is orchestrated using Airflow. Two DAGs are present:
- fetch - covers API fetching and validation logic, triggers ingest
- ingest - ingests data into Snowflake data warehouse
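As a configuration sketch, the fetch-triggers-ingest pattern could be wired up with Airflow's TriggerDagRunOperator roughly as follows. Task IDs, the schedule, and the use of LambdaInvokeFunctionOperator are illustrative assumptions; see dags/fetch.py for the actual DAG:

```python
# Hypothetical sketch of the fetch DAG triggering ingest on success.
# Task names and schedule are illustrative; the real DAG lives in dags/fetch.py.
import pendulum
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator,
)

with DAG(
    dag_id="fetch",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    fetch = LambdaInvokeFunctionOperator(
        task_id="invoke_fetch_lambda",
        function_name="predictit_fetch",
    )
    validate = LambdaInvokeFunctionOperator(
        task_id="invoke_validate_lambda",
        function_name="predictit_validate",
    )
    # Only fire the ingest DAG once fetching and validation have succeeded.
    trigger_ingest = TriggerDagRunOperator(
        task_id="trigger_ingest",
        trigger_dag_id="ingest",
    )
    fetch >> validate >> trigger_ingest
```

Splitting the pipeline this way keeps the ingest DAG independently runnable, e.g. for replaying a Snowflake load without re-fetching from the API.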
The pipeline can be set to run on whatever schedule the analysis requires. Airflow would typically be deployed on AWS MWAA, but due to cost this is out of scope for this project. The Airflow configuration can, however, be tested using the aws-mwaa-local-runner.
CI/CD pipelines are stored in the .github/workflows directory. These cover testing and formatting of code and deployment of code changes to the Lambda functions.
The pipelines are configured to run on pushes to the main branch, or opening of pull requests to the main branch.
They ensure production environments are not undermined by bugs and are consistently up to date.
To make the pipeline more akin to a real-world system we could:
- Deploy the Airflow DAGs to a production environment
- Create analytics dashboards to present the trends
- Add monitoring through AWS CloudWatch or Airflow SLAs
- Extend the pipeline to support near real-time ingestion, or refactor to a streaming pipeline using Apache Kafka
Billy Moore

