diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a59b6d47..e27eedfd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -107,6 +107,9 @@ jobs: - name: Run unit and functional tests with tox run: | tox + env: + # Increase from 1.7 to a greater value to avoid the PyTorch MPS backend running OOM. + PYTORCH_MPS_HIGH_WATERMARK_RATIO: 2.0 - name: Remove llama-cpp-python from cache if: always() diff --git a/requirements.txt b/requirements.txt index ec3c012d..356b7b7e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ # SPDX-License-Identifier: Apache-2.0 click>=8.1.7,<9.0.0 datasets>=2.18.0,<3.0.0 -docling[tesserocr]>=2.4.2,<=2.8.3; sys_platform != 'darwin' -docling>=2.4.2,<=2.8.3; sys_platform == 'darwin' -docling-parse>=2.0.0,<3.0.0 +docling-core[chunking]>=2.9.0 +docling[tesserocr]>=2.9.0; sys_platform != 'darwin' +docling>=2.9.0; sys_platform == 'darwin' GitPython>=3.1.42,<4.0.0 gguf>=0.6.0 httpx>=0.25.0,<1.0.0 diff --git a/src/instructlab/sdg/utils/chunkers.py b/src/instructlab/sdg/utils/chunkers.py index e256c969..e7bdf820 100644 --- a/src/instructlab/sdg/utils/chunkers.py +++ b/src/instructlab/sdg/utils/chunkers.py @@ -4,20 +4,23 @@ from typing import Dict, Iterable, List, Optional import json import logging +import os import re +import sys # Third Party from datasets import Dataset from docling.datamodel.base_models import InputFormat from docling.datamodel.document import ConversionResult from docling.datamodel.pipeline_options import ( + AcceleratorDevice, + AcceleratorOptions, EasyOcrOptions, OcrOptions, PdfPipelineOptions, TesseractOcrOptions, ) from langchain_text_splitters import Language, RecursiveCharacterTextSplitter -from tabulate import tabulate # First Party from instructlab.sdg.utils.model_formats import is_model_gguf, is_model_safetensors @@ -35,7 +38,12 @@ def _num_chars_from_tokens(num_tokens) -> int: return int(num_tokens * 4) # 1 token ~ 4 English character -def resolve_ocr_options() -> OcrOptions: +def resolve_ocr_options( + docling_model_path: Optional[Path] = None, +) -> Optional[OcrOptions]: + # Declare ocr_options explicitly as Optional[OcrOptions] + ocr_options: Optional[OcrOptions] = None + # First, attempt to use tesserocr try: ocr_options = TesseractOcrOptions() @@ -47,17 +55,29 @@ def resolve_ocr_options() -> OcrOptions: return ocr_options except ImportError: # No tesserocr, so try something else - pass + logger.warning("Tesseract not found, falling back to EasyOCR.") + try: - ocr_options = EasyOcrOptions() - # Keep easyocr models on the CPU instead of GPU - ocr_options.use_gpu = False + ocr_options = EasyOcrOptions( + lang=["en"], + use_gpu=None, + confidence_threshold=0.5, + model_storage_directory=str(docling_model_path), + recog_network="standard", + download_enabled=True, + ) + accelerator_options = AcceleratorOptions(device="cpu") # triggers torch loading, import lazily # pylint: disable=import-outside-toplevel # Third Party from docling.models.easyocr_model import EasyOcrModel - _ = EasyOcrModel(True, ocr_options) + _ = EasyOcrModel( + enabled=True, + artifacts_path=None, + options=ocr_options, + accelerator_options=accelerator_options, + ) return ocr_options except ImportError: # no easyocr either, so don't use any OCR @@ -126,8 +146,12 @@ def _init_docling_converter(self): artifacts_path=self.docling_model_path, do_ocr=False, ) - - ocr_options = resolve_ocr_options() + # deactivate MPS acceleration on Github CI + if os.getenv("CI") and sys.platform == "darwin": + 
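+            # GitHub Actions sets CI=true (and tox now passes it through via passenv),
+            # so only macOS CI runs are forced onto the CPU accelerator here; local
+            # macOS runs keep whatever device docling would normally pick.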
pipeline_options.accelerator_options = AcceleratorOptions( + device=AcceleratorDevice.CPU + ) + ocr_options = resolve_ocr_options(docling_model_path=self.docling_model_path) if ocr_options is not None: pipeline_options.do_ocr = True pipeline_options.ocr_options = ocr_options @@ -144,20 +168,32 @@ def chunk_documents(self) -> List: Returns: List: a list of chunks from the documents """ - - if self.document_paths == []: - return [] + # Move docling_core import inside method where it's used to avoid importing transformers at top level + # pylint: disable=import-outside-toplevel + # Third Party + from docling_core.transforms.chunker.hybrid_chunker import HybridChunker parsed_documents = self.converter.convert_all(self.document_paths) + all_chunks = [] + for conversion_result in parsed_documents: + doc = conversion_result.document + chunker = HybridChunker(tokenizer=self.tokenizer, max_tokens=500) + try: + chunk_iter = chunker.chunk(dl_doc=doc) + chunks = [chunker.serialize(chunk=chunk) for chunk in chunk_iter] + except Exception as e: # pylint: disable=broad-exception-caught + logger.error( + f"Error chunking document {conversion_result.input.file}: {e}" + ) + chunks = [] - docling_artifacts_path = self.export_documents(parsed_documents) - - docling_json_paths = list(docling_artifacts_path.glob("*.json")) - chunks = [] - for json_fp in docling_json_paths: - chunks.extend(self._process_parsed_docling_json(json_fp)) + fused_texts = self.fuse_texts(chunks, 200) + num_tokens_per_doc = _num_tokens_from_words(self.chunk_word_count) + chunk_size = _num_chars_from_tokens(num_tokens_per_doc) + final_chunks = chunk_markdowns(fused_texts, chunk_size) + all_chunks.extend(final_chunks) - return chunks + return all_chunks def _path_validator(self, path) -> Path: """ @@ -173,29 +209,6 @@ def _path_validator(self, path) -> Path: raise FileNotFoundError(f"{path} does not exist.") return path - def _process_parsed_docling_json(self, json_fp: Path) -> Dataset: - """ - Process the parsed docling json file and return a dataset. - Args: - json_fp (Path): Path to the parsed docling json file. - Returns: - List: a list of chunks built from the provided json file - """ - logger.info(f"Processing parsed docling json file: {json_fp}") - with open(json_fp, "r", encoding="utf-8") as f: - data = json.load(f) - - chunks = self.build_chunks_from_docling_json( - data, - max_token_per_chunk=500, - tokenizer=self.tokenizer, - ) - fused_texts = self.fuse_texts(chunks, 200) - - num_tokens_per_doc = _num_tokens_from_words(self.chunk_word_count) - chunk_size = _num_chars_from_tokens(num_tokens_per_doc) - return chunk_markdowns(fused_texts, chunk_size) - def fuse_texts( self, text_list: List, short_length_threshold: int = 130 ) -> List[str]: @@ -288,215 +301,6 @@ def get_token_count(self, text, tokenizer): """ return len(tokenizer.tokenize(text)) - def add_heading_formatting(self, text): - """ - Add heading formatting to the text if the first part is short. - Args: - text (str): The input text to format. - Returns: - str: Formatted text with headings applied. - """ - text = text.split(".") - - # Change this from hardcoded to something more flexible - if len(text) > 1 and len(text[0].split(" ")) < 3: - text = f"**{text[0]}**" + ".".join(text[1:]) - else: - text = ".".join(text) - return text - - def generate_table_from_parsed_rep(self, item): - """ - Generate the table from the parsed representation and return as a string. - Args: - item (dict): Parsed representation of a table. - Returns: - str: Formatted table as a string. 
- """ - caption = "" - if "text" in item: - caption = item["text"] - - data = item["data"] - - if len(data) <= 1 or len(data[0]) <= 1: - return "" - - table = [] - for _, row in enumerate(data): - trow = [] - for _, cell in enumerate(row): - trow.append(cell["text"]) - table.append(trow) - - table_text = tabulate(table, tablefmt="github") - if caption: - table_text += f"\nCaption: {caption}\n" - return table_text - - def get_table(self, json_book, table_ref): - """ - Retrieve a table from a document based on a reference string. - Args: - json_book (dict): JSON representation of the document. - table_ref (str): Reference path to the table within the document. - Returns: - str: Formatted table string. - """ - parts = table_ref.split("/") - table_text = self.generate_table_from_parsed_rep( - json_book[parts[1]][int(parts[2])] - ) - return table_text - - def get_table_page_number(self, json_book, idx): - """ - Get the page number of a table or other document element. - Args: - json_book (dict): JSON representation of the document. - idx (int): Index of the element in the document. - Returns: - int: Page number of the element. - """ - prev_page_num, next_page_num = None, None - for book_element in json_book["main-text"][idx - 1 :: -1]: - if "prov" in book_element and book_element["prov"]: - prev_page_num = book_element["prov"][0]["page"] - break - for book_element in json_book["main-text"][idx:]: - if "prov" in book_element and book_element["prov"]: - next_page_num = book_element["prov"][0]["page"] - break - if prev_page_num is not None and next_page_num is not None: - if prev_page_num == next_page_num: - return prev_page_num - return next_page_num - if prev_page_num is not None: - return prev_page_num - if next_page_num is not None: - return next_page_num - - def build_chunks_from_docling_json( - self, - json_book, - max_token_per_chunk, - tokenizer, - keep_same_page_thing_together=False, - chunking_criteria=None, - ): - """ - Build document chunks from a docling JSON representation. - Args: - json_book (dict): JSON document to process. - max_token_per_chunk (int): Maximum token count per chunk. - tokenizer (AutoTokenizer): Tokenizer instance to use. - keep_same_page_thing_together (bool): Whether to keep content on the same page together. - chunking_criteria (callable): Custom function for determining chunk breaks. - Returns: - list: List of document chunks. 
- """ - current_buffer = [] - document_chunks = [] - prev_page_number = None - book_title = None - - for idx, book_element in enumerate(json_book["main-text"]): - if book_element["type"] in [ - "page-footer", - "picture", - "reference", - "meta-data", - "figure", - "page-header", - ]: - continue - if book_element["type"] == "footnote": - current_book_page_number = book_element["prov"][0]["page"] - elif book_element["type"] in [ - "subtitle-level-1", - "paragraph", - "table", - "title", - "equation", - ]: # 'page-header', - if book_element["type"] == "table": - current_book_page_number = self.get_table_page_number( - json_book, idx - ) - book_text = self.get_table(json_book, book_element["$ref"]) - elif book_element["prov"]: - current_book_page_number = book_element["prov"][0][ - "page" - ] # TODO export to function to handle empty ["prov"] - book_text = book_element["text"] - else: - current_book_page_number = None - book_text = book_element["text"] - - if book_element["type"] == "subtitle-level-1": - if book_title is None: - book_title = book_text - book_text = f"# Title: **{book_text}**" - else: - book_text = f"## **{book_text}**" - - if book_element["type"] == "title": - book_text = f"# **{book_text}**" - if book_element["type"] == "page-header": - book_text = f"Page Header: **{book_text}**\n\n" - - if chunking_criteria is not None: - # custom break function that can be used to chunk document - if chunking_criteria(book_text): - document_chunks.append("\n\n".join(current_buffer)) - current_buffer = [] - elif ( - prev_page_number is not None - and prev_page_number != current_book_page_number - ) and keep_same_page_thing_together: - document_chunks.append("\n\n".join(current_buffer)) - current_buffer = [] - else: - if ( - self.get_token_count("\n\n".join(current_buffer), tokenizer) - >= max_token_per_chunk - and len(current_buffer) > 1 - ): - chunk_text = "\n\n".join(current_buffer[:-1]) - logger.debug( - f"Current chunk size {self.get_token_count(chunk_text, tokenizer)} and max is {max_token_per_chunk}" - ) - - document_chunks.append("\n\n".join(current_buffer[:-1])) - - if ( - self.get_token_count(current_buffer[-1], tokenizer) - >= max_token_per_chunk - ): - logger.debug( - f"The following text was dropped from the document because it was too long to fit into a single context for synthetic data generation: {current_buffer[-1]}" - ) - document_chunks.append(current_buffer[-1]) - current_buffer = [] - else: - current_buffer = current_buffer[-1:] - - if book_element["type"] == "paragraph": - book_text = self.add_heading_formatting(book_text) - if "## References" in book_text or "## Acknowledgements" in book_text: - # For research papers we ignore everything after this sections - break - current_buffer.append(book_text) - - try: - prev_page_number = current_book_page_number - except Exception as e: # pylint: disable=broad-exception-caught - logger.error(f"Error processing book element: {book_element}, {str(e)}") - - if "\n\n".join(current_buffer) not in document_chunks: - document_chunks.append("\n\n".join(current_buffer)) - return document_chunks - def export_documents(self, converted_docs: Iterable[ConversionResult]): """Write converted documents to json files @@ -522,11 +326,11 @@ def export_documents(self, converted_docs: Iterable[ConversionResult]): # Export Deep Search document JSON format: with (docling_artifacts_path / f"{doc_filename}.json").open("w") as fp: - fp.write(json.dumps(doc.legacy_document.export_to_dict())) + fp.write(json.dumps(doc.document.export_to_dict())) # Export 
Markdown format: with (docling_artifacts_path / f"{doc_filename}.md").open("w") as fp: - fp.write(doc.legacy_document.export_to_markdown()) + fp.write(doc.document.export_to_markdown()) else: logger.info(f"Document {doc.input.file} failed to convert.") failure_count += 1 diff --git a/src/instructlab/sdg/utils/taxonomy.py b/src/instructlab/sdg/utils/taxonomy.py index ed0d7940..c6a9799c 100644 --- a/src/instructlab/sdg/utils/taxonomy.py +++ b/src/instructlab/sdg/utils/taxonomy.py @@ -13,7 +13,6 @@ from datasets import Dataset # pylint: disable=no-name-in-module -from docling_parse.docling_parse import pdf_parser_v1 from instructlab.schema.taxonomy import DEFAULT_TAXONOMY_FOLDERS as TAXONOMY_FOLDERS from instructlab.schema.taxonomy import ( TaxonomyMessageFormat, @@ -27,9 +26,6 @@ # Local from .chunkers import DocumentChunker -# Initialize the pdf parser -PDFParser = pdf_parser_v1() - logger = logging.getLogger(__name__) @@ -126,9 +122,9 @@ def _get_documents( source: Dict[str, Union[str, List[str]]], skip_checkout: bool = False, document_output_dir: Path = None, -) -> Tuple[List[str], List[Path]]: +) -> Tuple[List[Path], List[Path]]: """ - Retrieve the content of files (Markdown and PDF) from a Git repository. + Retrieve file paths (Markdown and PDFs) from a Git repository. Args: source (dict): Source info containing repository URL, commit hash, and list of file patterns. @@ -136,8 +132,8 @@ def _get_documents( document_output_dir (Path, optional): Directory to clone the repository into. Defaults to current directory. Returns: - Tuple[List[str], List[Path]]: - - List of document contents (Markdown as text and PDFs as extracted text). + Tuple[List[Path], List[Path]]: + - List of file paths for valid documents (Markdown and PDFs). - List of corresponding file paths. Raises: @@ -154,12 +150,10 @@ def _get_documents( if not skip_checkout and commit_hash: repo.git.checkout(commit_hash) - file_contents = [] filepaths = [] logger.info("Processing files...") for pattern in file_patterns: - # Use glob to find files matching the pattern matched_files = glob.glob( os.path.join(repo.working_dir, pattern), recursive=True ) @@ -169,76 +163,13 @@ def _get_documents( if os.path.isfile(file_path): logger.info(f"Processing file: {file_path}") try: - if file_path.lower().endswith(".md"): - # Process Markdown files - with open(file_path, "r", encoding="utf-8") as file: - content = file.read() - if _string_contains_html(content): - logging.warning( - f"Provided markdown file {file_path} contains HTML contents, which is currently unsupported as a part of markdown" - "NOTE: Continuing this might affect your data generation quality." - "To get best results please format your markdown documents without the use of HTML or use a different document filetype." 
- ) - file_contents.append(content) - filepaths.append(Path(file_path)) - logger.info( - f"Appended Markdown content from {file_path}" - ) - - elif file_path.lower().endswith(".pdf"): - # Process PDF files using docling_parse's pdf_parser_v1 - doc_key = f"key_{os.path.basename(file_path)}" # Unique document key - logger.info(f"Loading PDF document from {file_path}") - - success = PDFParser.load_document(doc_key, file_path) - if not success: - logger.warning( - f"Failed to load PDF document: {file_path}" - ) - continue - - num_pages = PDFParser.number_of_pages(doc_key) - logger.info(f"PDF '{file_path}' has {num_pages} pages.") - - pdf_text = "" - - for page in range(num_pages): - try: - json_doc = PDFParser.parse_pdf_from_key_on_page( - doc_key, page - ) - if "pages" not in json_doc or not json_doc["pages"]: - logger.warning( - f"Page {page + 1} could not be parsed in '{file_path}'" - ) - continue - - json_page = json_doc["pages"][0] - - # Extract text from cells - for cell in json_page.get("cells", []): - text = cell.get("content", {}).get( - "rnormalized", "" - ) - if text.strip(): # Only append non-empty text - pdf_text += text.strip() + "\n" - except Exception as page_error: # pylint: disable=broad-exception-caught - logger.warning( - f"Error parsing page {page + 1} of '{file_path}': {page_error}" - ) - continue - - if pdf_text: - file_contents.append(pdf_text) - filepaths.append(Path(file_path)) - - # Unload the document to free memory - PDFParser.unload_document(doc_key) - logger.info(f"Unloaded PDF document: {file_path}") - + if file_path.lower().endswith((".md", ".pdf")): + filepaths.append(Path(file_path)) + logger.info(f"Added file path: {file_path}") else: logger.info(f"Skipping unsupported file type: {file_path}") - except Exception as file_error: # pylint: disable=broad-exception-caught + # pylint: disable=broad-exception-caught + except Exception as file_error: logger.error( f"Error processing file '{file_path}': {file_error}" ) @@ -246,8 +177,8 @@ def _get_documents( else: logger.info(f"Skipping non-file path: {file_path}") - if file_contents: - return file_contents, filepaths + if filepaths: + return filepaths, filepaths raise SystemExit("Couldn't find knowledge documents") except (OSError, git.exc.GitCommandError, FileNotFoundError) as e: @@ -281,17 +212,17 @@ def _read_taxonomy_file( task_description = contents.get("task_description", None) domain = contents.get("domain") documents = contents.get("document") - document_contents, doc_filepaths = None, None + doc_filepaths, _ = None, None if documents: os.makedirs(document_output_dir, exist_ok=True) unique_output_dir = mkdtemp( prefix=f"{leaf_node_path}_", dir=document_output_dir ) - document_contents, doc_filepaths = _get_documents( + doc_filepaths, _ = _get_documents( source=documents, document_output_dir=unique_output_dir, ) - logger.debug("Content from git repo fetched") + logger.debug("File paths from git repo fetched") for seed_example in contents.get("seed_examples"): context = seed_example.get("context", "") @@ -302,7 +233,6 @@ def _read_taxonomy_file( "questions_and_answers": question_answer_list, "context": context, "taxonomy_path": tax_path, - "documents": document_contents, "filepaths": doc_filepaths, "domain": domain, "document_outline": contents.get("document_outline"), @@ -319,7 +249,6 @@ def _read_taxonomy_file( "output": answer, "taxonomy_path": tax_path, "task_description": task_description, - "document": documents, "domain": domain, } ) @@ -493,7 +422,7 @@ def leaf_node_to_samples( docling_model_path=None, 
): samples = [] - if leaf_node and leaf_node[0].get("documents"): + if leaf_node and leaf_node[0].get("filepaths"): samples = _knowledge_leaf_node_to_samples( leaf_node, server_ctx_size, diff --git a/tests/functional/test_chunkers.py b/tests/functional/test_chunkers.py index f06db504..58e48cec 100644 --- a/tests/functional/test_chunkers.py +++ b/tests/functional/test_chunkers.py @@ -1,13 +1,20 @@ # Standard from pathlib import Path +import logging import os +import sys # Third Party import pytest +import torch # First Party from instructlab.sdg.utils.chunkers import DocumentChunker +# Set up logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + # Constants for Test Directory and Test Documents TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "testdata") TEST_DOCUMENTS = { @@ -25,26 +32,56 @@ def test_paths(): } -@pytest.fixture +@pytest.fixture(scope="module") def tokenizer_model_name(): """Fixture to return the path to the tokenizer model.""" return os.path.join(TEST_DATA_DIR, "models/instructlab/granite-7b-lab") +@pytest.fixture(scope="module", autouse=True) +def force_cpu_on_macos_ci(): + """Force CPU usage on macOS CI environments.""" + logger.debug("=== Starting CPU force fixture ===") + is_macos = sys.platform == "darwin" + if is_macos: + logger.info("Forcing CPU usage on macOS CI environment") + # Force CPU as default device + os.environ["PYTORCH_DEVICE"] = "cpu" + torch.set_default_device("cpu") + + # Disable MPS + os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "0" + + logger.debug(f"Current device: {os.getenv('PYTORCH_DEVICE', 'not set')}") + logger.debug(f"Available PyTorch devices: CPU={torch.cuda.is_available()}") + logger.debug("=== Finished CPU force fixture ===") + + yield + + @pytest.mark.parametrize( - "doc_type, expected_chunks, contains_text", + "document_type, expected_chunks", [ - ("pdf", 9, "Phoenix is a minor constellation"), - ("md", 7, None), # Assuming there's no specific text to check in Markdown + ("pdf", 9), + ("md", 7), ], ) def test_chunk_documents( - tmp_path, tokenizer_model_name, test_paths, doc_type, expected_chunks, contains_text + tmp_path, + tokenizer_model_name, + test_paths, + document_type, + expected_chunks, ): """ Generalized test function for chunking documents. + + Verifies that: + - The number of chunks is greater than the expected minimum. + - No chunk is empty. + - Each chunk's length is less than 2500 characters. """ - document_path = test_paths[doc_type] + document_path = test_paths[document_type] chunker = DocumentChunker( document_paths=[document_path], output_dir=tmp_path, @@ -53,8 +90,13 @@ def test_chunk_documents( chunk_word_count=500, ) chunks = chunker.chunk_documents() - assert len(chunks) > expected_chunks - if contains_text: - assert contains_text in chunks[0] + + # Check that we have more chunks than expected. + assert ( + len(chunks) > expected_chunks + ), f"Expected more than {expected_chunks} chunks, got {len(chunks)}" + + # Check that no chunk is empty and each chunk's length is within the allowed limit. for chunk in chunks: - assert len(chunk) < 2500 + assert chunk, "Chunk should not be empty" + assert len(chunk) < 2500, f"Chunk length {len(chunk)} exceeds maximum allowed" diff --git a/tox.ini b/tox.ini index cf912fd8..f3f9d829 100644 --- a/tox.ini +++ b/tox.ini @@ -12,6 +12,8 @@ description = run tests (unit, unitcov, functional) # are huge. This reduces venv from 5.7 GB to 1.5 GB. 
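+# CI (set by GitHub Actions) must reach the test env so chunkers.py can detect
+# CI runs on macOS and fall back to the CPU accelerator.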
setenv = PIP_EXTRA_INDEX_URL=https://download.pytorch.org/whl/cpu +passenv = + CI package = wheel wheel_build_env = pkg deps = -r requirements-dev.txt
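
A minimal sketch of the docling >= 2.9 flow this patch switches to: convert a document with
DocumentConverter, then split it with docling-core's HybridChunker, which is what the new
chunk_documents() does internally. The input file name and the "instructlab/granite-7b-lab"
tokenizer id below are illustrative assumptions, not values taken from this repository; in the
patch the equivalent calls are driven through DocumentChunker with its own pipeline, OCR, and
accelerator options.

# Sketch of the docling >= 2.9 convert-then-chunk flow adopted above.
# File name and tokenizer id are placeholders.
from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker.hybrid_chunker import HybridChunker

converter = DocumentConverter()            # default pipeline options
result = converter.convert("sample.pdf")   # ConversionResult; .document is the DoclingDocument

chunker = HybridChunker(tokenizer="instructlab/granite-7b-lab", max_tokens=500)
chunk_iter = chunker.chunk(dl_doc=result.document)
chunks = [chunker.serialize(chunk=chunk) for chunk in chunk_iter]
print(f"{len(chunks)} chunks; first chunk starts: {chunks[0][:80] if chunks else '<none>'}")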