The MAAP Data Loader application is designed to synchronize data from various sources (local filesystems, S3 buckets, Google Drive) to a MongoDB database using unstructured.io (MAAP Partner).
Unstructured is a platform that specializes in processing and structuring unstructured data for seamless integration and analysis. It provides a flexible and efficient way to ingest, process, and store unstructured data, making it easily accessible for further analysis and use.
Key features include:
- Support for multiple data sources (local, S3, Google Drive)
- Configurable sync intervals
- Data processing pipeline including partitioning, chunking, and embedding
- RESTful API for controlling sync operations
- Customizable MongoDB ingestion with duplicate handling
This system is ideal for organizations needing to centralize and structure data from diverse sources, enabling seamless data integration and analysis.
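The duplicate handling mentioned above can be pictured with a small sketch. It assumes, without confirmation from the project, that the `id_fields` and `create_md5` options seen in the API payloads combine into a deterministic MD5 key so that re-syncing the same document overwrites the earlier copy. The helper names `document_key` and `upsert`, and the in-memory dictionary standing in for a MongoDB collection, are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, List


def document_key(doc: Dict[str, Any], id_fields: List[str]) -> str:
    """Hash the values of the chosen fields into a stable hex key (assumed scheme)."""
    # Serialize the selected values deterministically before hashing.
    material = json.dumps([doc.get(name) for name in id_fields], default=str)
    return hashlib.md5(material.encode("utf-8")).hexdigest()


def upsert(store: Dict[str, Dict[str, Any]], doc: Dict[str, Any], id_fields: List[str]) -> bool:
    """Insert or overwrite a document; return True only when the key was new."""
    key = document_key(doc, id_fields)
    is_new = key not in store
    store[key] = doc  # a later copy with the same key replaces the earlier one
    return is_new


# The dictionary below is a stand-in for a collection, not a MongoDB client.
store: Dict[str, Dict[str, Any]] = {}
first = upsert(store, {"field1": "a", "field2": 1, "text": "v1"}, ["field1", "field2"])
second = upsert(store, {"field1": "a", "field2": 1, "text": "v2"}, ["field1", "field2"])
```

With a real collection, the same idea would typically map to an upsert keyed on the hash, but the loader's actual behavior should be checked against its source.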
The MAAP Data Loader application follows a modular architecture, consisting of several key components:
- FastAPI Application: Provides RESTful API endpoints for controlling the sync process.
- Configuration Manager: Handles loading and parsing of the YAML configuration file.
- Data Pipeline: Orchestrates the data processing flow, including:
  - Source Connectors (Local, S3, Google Drive)
  - Indexer
  - Downloader
  - Partitioner
  - Chunker
  - Embedder
  - MongoDB Uploader
- MongoDB Integration: Manages connections and data ingestion to the MongoDB database.
- Background Task Manager: Handles periodic sync operations asynchronously.
Reference Architecture:
This architecture favors scalability, modularity, and ease of maintenance, and leaves room for future extensions and modifications to the system.
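To make the data pipeline concrete, here is a minimal sketch of stages applied in sequence. It is not the project's implementation: the functions `partition`, `chunk`, `embed`, and `run_pipeline`, the dictionary record format, and the toy splitting and embedding logic are all assumptions chosen for illustration. A real deployment would delegate these steps to Unstructured and an embedding model.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Stage = Callable[[Record], Record]


def partition(record: Record) -> Record:
    # Stand-in partitioner: split raw text into paragraph-like elements.
    elements = [p.strip() for p in record["raw"].split("\n\n") if p.strip()]
    return {**record, "elements": elements}


def chunk(record: Record, max_chars: int = 40) -> Record:
    # Stand-in chunker: pack elements together, starting a new chunk
    # whenever adding the next element would exceed max_chars.
    chunks: List[str] = []
    current = ""
    for element in record["elements"]:
        candidate = f"{current} {element}".strip()
        if current and len(candidate) > max_chars:
            chunks.append(current)
            current = element
        else:
            current = candidate
    if current:
        chunks.append(current)
    return {**record, "chunks": chunks}


def embed(record: Record) -> Record:
    # Stand-in embedder: one toy vector per chunk (its length as a float).
    vectors = [[float(len(c))] for c in record["chunks"]]
    return {**record, "embeddings": vectors}


def run_pipeline(record: Record, stages: List[Stage]) -> Record:
    # Apply each stage in order, feeding its output to the next stage.
    for stage in stages:
        record = stage(record)
    return record


result = run_pipeline(
    {"path": "notes.txt", "raw": "First paragraph.\n\nSecond paragraph."},
    [partition, chunk, embed],
)
```

Keeping every stage behind the same record-in, record-out signature is one way to get the modularity described above, since a stage can be swapped without touching its neighbors.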
To install and deploy the MAAP Data Loader application, follow these steps:
- Prerequisites:
  - Python 3.7 or higher
  - pip (Python package manager)
  - Access to a MongoDB instance
  - Docker
- Clone the repository:
      git clone https://github.com/maap-partners/maap-data-loader.git
      cd maap-data-loader
- Build the Docker image:
      make build-image
- Run the Docker image:
      make run-image
To access the application, use the following curl commands to interact with the API hosted on localhost:8182:
- Register Source
  - Endpoint: /register/source
  - Method: GET
  - Description: Registers a new data source for synchronization.
  - Curl Command:
    - Local Source:
          curl --location --request GET 'localhost:8182/register/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "local", "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
    - S3:
          curl --location --request GET 'localhost:8182/register/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "s3", "credentials": {"aws_access_key_id": "<your-aws-access-key-id>", "aws_secret_access_key": "<your-aws-secret-key>", "aws_session_token": "<your-aws-session-token>"}, "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
    - Google Drive:
          curl --location --request GET 'localhost:8182/register/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "google-drive", "credentials": {"gcp_service_account_key_string": "<gcp_service_account_key_string>", "google_drive_folder_id": "<google_drive_folder_id>"}, "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
  - Response:
        { "message": "Source registered successfully." }
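The same registration request can be issued from Python. The sketch below uses only the standard library and mirrors the field names from the curl examples above. The helper names `build_register_payload` and `register_source`, the `http://` scheme in the base URL, and the sample values (including the MongoDB URI and the 1536 embedding dimensions) are assumptions for illustration.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_register_payload(
    source_type: str,
    remote_url: str,
    mongodb_uri: str,
    database: str,
    collection: str,
    embedding_dimensions: int,
    credentials: Optional[Dict[str, str]] = None,
    sync_interval_seconds: int = 360,
) -> Dict[str, Any]:
    """Assemble a request body shaped like the curl examples."""
    source: Dict[str, Any] = {"source_type": source_type}
    if credentials:
        # Only the S3 and Google Drive examples carry credentials.
        source["credentials"] = credentials
    source["params"] = {
        "remote_url": remote_url,
        "chunking_strategy": "by_title",
        "chunk_max_characters": "1500",
        "chunk_overlap": "100",
    }
    return {
        "sync_interval_seconds": sync_interval_seconds,
        "source": source,
        "destination": {
            "mongodb_uri": mongodb_uri,
            "database": database,
            "collection": collection,
            "index_name": "default",
            "embedding_path": "embeddings",
            "embedding_dimensions": embedding_dimensions,
            "id_fields": ["field1", "field2"],
            "create_md5": True,
            "batch_size": 100,
        },
    }


def register_source(payload: Dict[str, Any], base_url: str = "http://localhost:8182") -> Dict[str, Any]:
    """Send the payload to /register/source using the documented GET method."""
    request = urllib.request.Request(
        f"{base_url}/register/source",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="GET",  # the endpoint is documented as GET with a JSON body
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Example values only; replace them with your own settings.
payload = build_register_payload(
    source_type="local",
    remote_url="/data/docs",
    mongodb_uri="mongodb://localhost:27017",
    database="maap",
    collection="documents",
    embedding_dimensions=1536,
)
```

Calling `register_source(payload)` against a running instance should return the response message shown above. For S3 or Google Drive, pass the matching `credentials` dictionary from the curl examples.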
- Delete Source
  - Endpoint: /delete/source
  - Method: GET
  - Description: Removes an existing data source from synchronization.
  - Curl Command:
    - Local Source:
          curl --location --request GET 'localhost:8182/delete/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "local", "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
    - S3:
          curl --location --request GET 'localhost:8182/delete/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "s3", "credentials": {"aws_access_key_id": "<your-aws-access-key-id>", "aws_secret_access_key": "<your-aws-secret-key>", "aws_session_token": "<your-aws-session-token>"}, "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
    - Google Drive:
          curl --location --request GET 'localhost:8182/delete/source' \
            --header 'Content-Type: application/json' \
            --data-raw '{"sync_interval_seconds": 360, "source": {"source_type": "google-drive", "credentials": {"gcp_service_account_key_string": "<gcp_service_account_key_string>", "google_drive_folder_id": "<google_drive_folder_id>"}, "params": {"remote_url": "<source-url-folder-path>", "chunking_strategy": "by_title", "chunk_max_characters": "1500", "chunk_overlap": "100"}}, "destination": {"mongodb_uri": "<your-mongodb-connection-string>", "database": "<your-db-name>", "collection": "<your-collection-name>", "index_name": "default", "embedding_path": "embeddings", "embedding_dimensions": <embedding-model-dims>, "id_fields": ["field1", "field2"], "create_md5": true, "batch_size": 100}}'
  - Response:
        { "message": "Source deleted successfully." }
The application will periodically sync data from the configured sources to the MongoDB destination based on the specified interval.
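The periodic behavior can be sketched with `asyncio`. This is an illustration of interval-based syncing in general, not the application's Background Task Manager: the function `periodic_sync`, its `max_runs` parameter (added so the loop can be stopped in tests), and the error handling policy are assumptions.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def periodic_sync(
    sync_once: Callable[[], Awaitable[None]],
    interval_seconds: float,
    max_runs: Optional[int] = None,
) -> int:
    """Run sync_once repeatedly, waiting interval_seconds between runs."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            await sync_once()
        except Exception as exc:
            # Assumed policy: report the failure and keep the schedule alive.
            print(f"sync failed: {exc}")
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return runs
```

A caller would pass a coroutine function that performs one sync pass and set `interval_seconds` to the registered `sync_interval_seconds` value, for example `asyncio.run(periodic_sync(sync_once, 360))`.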