Skip to content

Commit 54fd2eb

Browse files
authored
fix: Allow split page logic to process files concurrently (#175)
# The Issue

`split_pdf_hook.py` does not support multiple concurrent files. This is because we store the split request tasks in `self.coroutines_to_execute[operation_id]`, where `operation_id` is just the string "partition". Therefore, if we send two concurrent docs using the same SDK instance, they'll both try to await the same list of coroutines. This could result in interleaved results, but mostly it breaks with `RuntimeError: coroutine is being awaited already` as soon as the second request gets ready to await its requests.

This will block anyone trying to use the new `partition_async` to fan out their PDFs. Note that the JS/TS client also has this issue.

# The fix

We need to use an actual unique id to index into `coroutines_to_execute`. In `before_request`, let's generate a UUID and build up the list of coroutines for this doc under that key. We need to pass this id to `after_success` in order to retrieve the results, so we set it as a header on the "dummy" request that's returned to the SDK.

# Testing

See the new integration test. We can verify the fix by sending two docs serially, then sending them concurrently with `asyncio.gather`, and confirming that the results are the same.
1 parent 4ac3b2d commit 54fd2eb

File tree

3 files changed

+91
-8
lines changed

3 files changed

+91
-8
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
## 0.26.0-dev
2+
3+
### Enhancements
4+
5+
### Features
6+
* Add `partition_async` as a non-blocking alternative to `partition`
7+
8+
### Fixes

_test_unstructured_client/integration/test_integration_freemium.py

+65
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pathlib import Path
55

66
import pytest
7+
from deepdiff import DeepDiff
78
from unstructured_client import UnstructuredClient
89
from unstructured_client.models import shared, operations
910
from unstructured_client.models.errors import SDKError, ServerError, HTTPValidationError
@@ -125,6 +126,70 @@ async def test_partition_async_returns_elements(client, doc_path):
125126
assert len(response.elements)
126127

127128

129+
@pytest.mark.asyncio
130+
async def test_partition_async_processes_concurrent_files(client, doc_path):
131+
"""
132+
Assert that partition_async can be used to send multiple files concurrently.
133+
Send two separate portions of the test doc, serially and then using asyncio.gather.
134+
The results for both runs should match.
135+
"""
136+
filename = "layout-parser-paper.pdf"
137+
138+
with open(doc_path / filename, "rb") as f:
139+
files = shared.Files(
140+
content=f.read(),
141+
file_name=filename,
142+
)
143+
144+
# Set up two SDK requests
145+
# For different page ranges
146+
requests = [
147+
operations.PartitionRequest(
148+
partition_parameters=shared.PartitionParameters(
149+
files=files,
150+
strategy="fast",
151+
languages=["eng"],
152+
split_pdf_page=True,
153+
split_pdf_page_range=[1, 3],
154+
)
155+
),
156+
operations.PartitionRequest(
157+
partition_parameters=shared.PartitionParameters(
158+
files=files,
159+
strategy="fast",
160+
languages=["eng"],
161+
split_pdf_page=True,
162+
split_pdf_page_range=[10, 12],
163+
)
164+
)
165+
]
166+
167+
serial_responses = []
168+
for req in requests:
169+
res = await client.general.partition_async(request=req)
170+
171+
assert res.status_code == 200
172+
serial_responses.append(res.elements)
173+
174+
concurrent_responses = []
175+
results = await asyncio.gather(
176+
client.general.partition_async(request=requests[0]),
177+
client.general.partition_async(request=requests[1])
178+
)
179+
180+
for res in results:
181+
assert res.status_code == 200
182+
concurrent_responses.append(res.elements)
183+
184+
diff = DeepDiff(
185+
t1=serial_responses,
186+
t2=concurrent_responses,
187+
ignore_order=True,
188+
)
189+
190+
assert len(diff) == 0
191+
192+
128193
def test_uvloop_partitions_without_errors(client, doc_path):
129194
async def call_api():
130195
filename = "layout-parser-paper-fast.pdf"

src/unstructured_client/_hooks/custom/split_pdf_hook.py

+18-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import math
8+
import uuid
89
from collections.abc import Awaitable
910
from typing import Any, Coroutine, Optional, Tuple, Union, cast
1011

@@ -198,7 +199,12 @@ def before_request(
198199
# This allows us to use an event loop in an env with an existing loop
199200
# Temporary fix until we can improve the async splitting behavior
200201
nest_asyncio.apply()
201-
operation_id = hook_ctx.operation_id
202+
203+
# This is our key into coroutines_to_execute
204+
# We need to pass it on to after_success so
205+
# we know which results are ours
206+
operation_id = str(uuid.uuid4())
207+
202208
content_type = request.headers.get("Content-Type")
203209

204210
request_content = request.read()
@@ -329,16 +335,20 @@ async def call_api_partial(page):
329335
self.coroutines_to_execute[operation_id].append(coroutine)
330336
set_index += 1
331337

338+
332339
# Return a dummy request for the SDK to use
333340
# This allows us to skip right to the AfterRequestHook and await all the calls
341+
# Also, pass the operation_id so after_success can await the right results
342+
334343
# Note: We need access to the async_client from the sdk_init hook in order to set
335344
# up a mock request like this.
336345
# For now, just make an extra request against our api, which should return 200.
337346
# dummy_request = httpx.Request("GET", "http://no-op")
338-
339-
dummy_request = httpx.Request("GET", "https://api.unstructuredapp.io/general/docs")
340-
341-
return dummy_request
347+
return httpx.Request(
348+
"GET",
349+
"https://api.unstructuredapp.io/general/docs",
350+
headers={"operation_id": operation_id},
351+
)
342352

343353
def _await_elements(
344354
self, operation_id: str) -> Optional[list]:
@@ -407,9 +417,9 @@ def after_success(
407417
combined response object; otherwise, the original response. Can return
408418
an exception if one occurred during execution.
409419
"""
410-
operation_id = hook_ctx.operation_id
411-
# Because in `before_request` method we skipped sending last page in parallel
412-
# we need to pass response, which contains last page, to `_await_elements` method
420+
# Grab the correct id out of the dummy request
421+
operation_id = response.request.headers.get("operation_id")
422+
413423
elements = self._await_elements(operation_id)
414424

415425
# if fails are disallowed, return the first failed response

0 commit comments

Comments
 (0)