
[Bug]: Batch Processing with MemoryAdaptiveDispatcher Missing Crawl Outputs #975

Open
jacobshenn opened this issue Apr 11, 2025 · 0 comments
Labels
🐞 Bug Something isn't working 🩺 Needs Triage Needs attention of maintainers

crawl4ai version

0.5.0

Expected Behavior

All 680 product URLs passed to crawler.arun_many() should produce a corresponding result.extracted_content when crawling and extraction succeed, and each result should be saved to MongoDB as a new document. 680 unique URLs should therefore yield 680 unique MongoDB documents.

Current Behavior

The 680 unique URLs produce only 540 MongoDB documents. The MemoryAdaptiveDispatcher does not report any errors while processing the URLs, and every URL was shown as processed in the monitor. Is this a concurrent-processing issue or a race condition?

Is this reproducible?

Yes

Inputs Causing the Bug

- A list of 680 product URLs loaded from product_links.json (a sketch of the assumed file shape follows below)
- The bug occurs with any large URL set processed concurrently.
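
For reference, the script below reads the file with json.load(f).get('validated_links', []), so the assumed shape of product_links.json is roughly the following (the key name comes from the code; the URLs are placeholders):

{
  "validated_links": [
    "https://example.com/products/item-1",
    "https://example.com/products/item-2"
  ]
}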

Steps to Reproduce

Code snippets

import os
import asyncio
import json
from pydantic import BaseModel
from datetime import datetime, UTC
from crawl4ai import (
    AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode,
    LLMConfig, CrawlerMonitor, DisplayMode
)
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from pymongo import MongoClient

# MongoDB connection
client = MongoClient("YOUR_MONGO_URI")
collection = client["your_db"]["your_collection"]

class ProductMetadata(BaseModel):
    # Sample schema; fields omitted for brevity ("..." keeps the class body valid)
    ...

async def main():
    # Load URLs
    with open('product_links.json', 'r') as f:
        product_urls = json.load(f).get('validated_links', [])
    
    browser_cfg = BrowserConfig(
        browser_type="chromium",
        headless=True,
        java_script_enabled=True
    )

    llm_strategy = LLMExtractionStrategy(
        llm_config=LLMConfig(
            provider="openai/gpt-4o",
            api_token="YOUR_API_KEY",
            base_url="https://your-llm-api-url.com"
        ),
        schema=ProductMetadata.model_json_schema(),
        extraction_type="schema",
        instruction="(Sanitized prompt omitted for brevity)",
        chunk_token_threshold=1000,
        apply_chunking=False,
        input_format="markdown",
        extra_args={"temperature": 0.1, "max_tokens": 1000}
    )

    crawl_config = CrawlerRunConfig(
        extraction_strategy=llm_strategy,
        cache_mode=CacheMode.BYPASS,
        stream=True
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=80.0,   # pause dispatching above 80% system memory
        check_interval=1.0,              # seconds between memory checks
        max_session_permit=10,           # at most 10 concurrent crawl sessions
        monitor=CrawlerMonitor(enable_ui=True)
    )

    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        async for result in await crawler.arun_many(
            urls=product_urls,
            config=crawl_config,
            dispatcher=dispatcher
        ):
            if result.success:
                try:
                    data = json.loads(result.extracted_content)
                    if isinstance(data, list):
                        # Keep the candidate with the most non-empty, non-"N/A" fields
                        data = max(data, key=lambda x: sum(1 for v in x.values() if v and v != "N/A"))
                    data['extraction_date'] = datetime.now(UTC).isoformat()
                    data['source_url'] = result.url

                    # Dedupe on product_link: a repeated value updates the existing
                    # document instead of inserting a new one
                    existing = collection.find_one({"product_link": data.get("product_link")})
                    if existing:
                        collection.update_one({"_id": existing["_id"]}, {"$set": data})
                        print(f"Updated: {data.get('product_name')}")
                    else:
                        collection.insert_one(data)
                        print(f"Inserted: {data.get('product_name')}")

                    print(f"Memory Usage: {result.dispatch_result.memory_usage:.1f}MB")
                except Exception as e:
                    print(f"Processing error: {e}")
            else:
                print(f"Failed to crawl: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(main())
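
To narrow down whether the shortfall happens inside arun_many or in the Mongo write path, here is a minimal tally sketch (the helper name and print format are mine, not crawl4ai API; it reuses the same objects and streaming pattern as the script above):

async def tally_results(crawler, product_urls, crawl_config, dispatcher):
    # Count every outcome so dropped crawl results and collapsed upserts
    # can be told apart.
    seen, succeeded, failed = set(), 0, 0
    async for result in await crawler.arun_many(
        urls=product_urls, config=crawl_config, dispatcher=dispatcher
    ):
        seen.add(result.url)
        if result.success:
            succeeded += 1
        else:
            failed += 1
    never_yielded = set(product_urls) - seen
    print(f"submitted={len(product_urls)} yielded={len(seen)} "
          f"success={succeeded} failed={failed} never_yielded={len(never_yielded)}")
    for url in sorted(never_yielded):
        print("never yielded:", url)

If never_yielded is empty and success is close to 680 while the collection holds 540 documents, the gap would come from the update_one branch collapsing URLs that extract the same product_link rather than from the dispatcher; a non-empty never_yielded would instead point at results being dropped inside arun_many.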

OS

macOS

Python version

3.13.2

Browser

Chrome

Browser version

No response

Error logs & Screenshots (if applicable)

No error logs.
