feat: Add free-threaded Python 3.14t support with parallel processing (1.55x speedup) #256
- Add parallel processing module (json2xml/parallel.py) for concurrent XML conversion
- Implement parallel dict and list processing with thread-safe caching
- Add support for Python 3.14t free-threaded build (no-GIL)
- Achieve up to 1.55x speedup for medium datasets (100-1K items) on Python 3.14t

New Features:
- parallel parameter to enable/disable parallel processing (default: False)
- workers parameter to configure thread count (default: auto-detect)
- chunk_size parameter for list chunking (default: 100)
- Automatic free-threaded Python detection
- Smart fallback to serial processing for small datasets

Testing:
- Add 20 comprehensive parallel processing tests
- All 173 tests passing (153 original + 20 new)
- Zero regressions, full backward compatibility

Benchmarking:
- Add benchmark.py script for performance testing
- Benchmark results on Python 3.14 (GIL) and 3.14t (free-threaded)
- Medium datasets show 1.55x speedup on Python 3.14t

Documentation:
- Add FREE_THREADED_OPTIMIZATION_ANALYSIS.md with detailed analysis
- Add BENCHMARK_RESULTS.md with complete benchmark data
- Add docs/performance.rst for Sphinx documentation
- Update README.rst with performance section and usage examples
- Add implementation summaries and guides

Benchmark Results (Python 3.14t vs 3.14):
- Small (10 items): Serial processing (automatic fallback)
- Medium (100 items): 5.55ms vs 8.59ms serial (1.55x speedup)
- Large (1K items): Comparable performance
- XLarge (5K items): Comparable performance

Breaking Changes: None
Backward Compatibility: Full (parallel=False by default)

Amp-Thread-ID: https://ampcode.com/threads/T-9be8ca5d-f9ef-49cb-9913-b82d0f45dac2
Co-authored-by: Amp <[email protected]>
Reviewer's Guide

This PR introduces an opt-in parallel processing layer for json2xml using Python 3.14t's free-threaded mode. It adds a dedicated parallel module, extends core conversion functions to dispatch between serial and threaded implementations based on configuration, exposes new API parameters, and provides a full suite of tests, benchmarks, and updated documentation, all while preserving backward compatibility.

Sequence diagram for parallel dict conversion in dicttoxml:

```mermaid
sequenceDiagram
    participant Caller
    participant "dicttoxml.dicttoxml()"
    participant "parallel.convert_dict_parallel()"
    participant "ThreadPoolExecutor"
    participant "_convert_dict_item()"
    Caller->>"dicttoxml.dicttoxml()": call with parallel=True, obj is dict
    "dicttoxml.dicttoxml()"->>"parallel.convert_dict_parallel()": dispatch for dict
    "parallel.convert_dict_parallel()"->>"ThreadPoolExecutor": submit _convert_dict_item for each key
    "ThreadPoolExecutor"->>"_convert_dict_item()": process key/value in thread
    "_convert_dict_item()"-->>"ThreadPoolExecutor": return XML string
    "ThreadPoolExecutor"-->>"parallel.convert_dict_parallel()": collect results
    "parallel.convert_dict_parallel()"-->>"dicttoxml.dicttoxml()": return joined XML
    "dicttoxml.dicttoxml()"-->>Caller: return XML bytes
```
Sequence diagram for parallel list conversion in dicttoxml:

```mermaid
sequenceDiagram
    participant Caller
    participant "dicttoxml.dicttoxml()"
    participant "parallel.convert_list_parallel()"
    participant "ThreadPoolExecutor"
    participant "_convert_list_chunk()"
    Caller->>"dicttoxml.dicttoxml()": call with parallel=True, obj is list
    "dicttoxml.dicttoxml()"->>"parallel.convert_list_parallel()": dispatch for list
    "parallel.convert_list_parallel()"->>"ThreadPoolExecutor": submit _convert_list_chunk for each chunk
    "ThreadPoolExecutor"->>"_convert_list_chunk()": process chunk in thread
    "_convert_list_chunk()"-->>"ThreadPoolExecutor": return XML string
    "ThreadPoolExecutor"-->>"parallel.convert_list_parallel()": collect results
    "parallel.convert_list_parallel()"-->>"dicttoxml.dicttoxml()": return joined XML
    "dicttoxml.dicttoxml()"-->>Caller: return XML bytes
```
Class diagram for updated Json2xml and dicttoxml API:

```mermaid
classDiagram
    class Json2xml {
        +data: dict[str, Any] | None
        +pretty: bool
        +attr_type: bool
        +item_wrap: bool
        +root: str | None
        +parallel: bool
        +workers: int | None
        +chunk_size: int
        +to_xml() Any | None
    }
    class dicttoxml {
        +dicttoxml(
            obj: Any,
            ids: list[str] = [],
            custom_root: str = "root",
            attr_type: bool = True,
            item_func: Callable[[str], str] = default_item_func,
            cdata: bool = False,
            xml_namespaces: dict[str, Any],
            list_headers: bool = False,
            parallel: bool = False,
            workers: int | None = None,
            chunk_size: int = 100
        ) -> bytes
    }
    Json2xml --> dicttoxml : uses
```
Class diagram for new parallel processing module:

```mermaid
classDiagram
    class parallel {
        +is_free_threaded() bool
        +get_optimal_workers(workers: int | None) int
        +key_is_valid_xml_cached(key: str) bool
        +make_valid_xml_name_cached(key: str, attr: dict[str, Any]) tuple[str, dict[str, Any]]
        +convert_dict_parallel(
            obj: dict[str, Any],
            ids: list[str],
            parent: str,
            attr_type: bool,
            item_func: Callable[[str], str],
            cdata: bool,
            item_wrap: bool,
            list_headers: bool = False,
            workers: int | None = None,
            min_items_for_parallel: int = 10
        ) str
        +convert_list_parallel(
            items: Sequence[Any],
            ids: list[str] | None,
            parent: str,
            attr_type: bool,
            item_func: Callable[[str], str],
            cdata: bool,
            item_wrap: bool,
            list_headers: bool = False,
            workers: int | None = None,
            chunk_size: int = 100
        ) str
    }
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff             @@
##  master-freethreaded    #256    +/- ##
=======================================================
+ Coverage   99.30%   99.53%   +0.23%
=======================================================
  Files           3        4       +1
  Lines         288      432     +144
=======================================================
+ Hits          286      430     +144
  Misses          2        2
```
Hey there - I've reviewed your changes - here's some feedback:
- Consider exposing the `min_items_for_parallel` threshold as a constructor parameter (similar to `chunk_size`) rather than hard-coding it to 10 inside `convert_dict_parallel`, so users can tune when dict parallelization kicks in.
- Avoid dynamic imports within the hot path of `dicttoxml`: move `from json2xml.parallel import …` to module level to eliminate repetitive import overhead during conversion.
- Benchmarks show limited gains on large datasets due to string concatenation and pretty-printing overhead; consider profiling those hotspots and using more efficient string builders (e.g. `io.StringIO`) or streaming output to improve throughput beyond the medium-size sweet spot.
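The string-builder point above can be illustrated with a minimal, library-agnostic sketch; the `build_element` helper below is hypothetical, not part of json2xml:

```python
# Sketch: accumulate serialized fragments in a list and join once at the end,
# rather than growing a string with repeated `+=` (worst-case quadratic copying).
def build_element(tag: str, children: list[str]) -> str:
    parts: list[str] = [f"<{tag}>"]
    parts.extend(children)  # each child is an already-serialized XML fragment
    parts.append(f"</{tag}>")
    return "".join(parts)  # single O(n) concatenation

print(build_element("root", ["<a>1</a>", "<b>2</b>"]))
# → <root><a>1</a><b>2</b></root>
```

The same effect can be had with `io.StringIO.write()` in a loop; both avoid the intermediate copies that `result += fragment` creates on CPython when the string has more than one reference.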
## Individual Comments
### Comment 1
<location> `json2xml/dicttoxml.py:726-731` </location>
<code_context>
+
+ if root:
+ output.append('<?xml version="1.0" encoding="UTF-8" ?>')
+ if isinstance(obj, dict):
+ output_elem = convert_dict_parallel(
+ obj, ids, custom_root, attr_type, item_func, cdata, item_wrap,
+ list_headers=list_headers, workers=workers, min_items_for_parallel=10
+ )
+ elif isinstance(obj, Sequence):
+ output_elem = convert_list_parallel(
+ obj, ids, custom_root, attr_type, item_func, cdata, item_wrap,
</code_context>
<issue_to_address>
**issue (bug_risk):** Using Sequence to check for list-like objects may include strings and bytes.
Explicitly exclude str and bytes when checking for Sequence to prevent unintended handling of these types.
</issue_to_address>
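A minimal sketch of the guard this comment asks for (the `is_listlike` helper name is illustrative, not from the PR):

```python
from collections.abc import Sequence

def is_listlike(obj: object) -> bool:
    # str and bytes are themselves Sequences, so they must be excluded
    # explicitly or they would be dispatched to the list path and
    # iterated character-by-character / byte-by-byte.
    return isinstance(obj, Sequence) and not isinstance(obj, (str, bytes))

print(is_listlike([1, 2]), is_listlike((1, 2)), is_listlike("abc"), is_listlike(b"abc"))
# → True True False False
```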
### Comment 2
<location> `json2xml/parallel.py:14-21` </location>
<code_context>
+```python
+import sys
+
+def is_free_threaded() -> bool:
+ """Check if running on free-threaded Python build."""
+ return hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled()
</code_context>
<issue_to_address>
**suggestion:** Reliance on sys._is_gil_enabled() may break in future Python versions.
Document the use of this private attribute and consider adding a fallback to handle potential changes in future Python versions.
```suggestion
def is_free_threaded() -> bool:
"""
Check if running on free-threaded Python build (Python 3.13t).
Note:
This function relies on the private attribute `sys._is_gil_enabled`, which may change or be removed in future Python versions.
If the attribute is not present, or its semantics change, this function will fall back to assuming GIL is enabled.
Returns:
bool: True if running on free-threaded build, False otherwise.
"""
# Fallback: If attribute is missing or not callable, assume GIL is enabled.
gil_enabled = True
if hasattr(sys, '_is_gil_enabled'):
try:
gil_enabled = sys._is_gil_enabled()
except Exception:
pass
return not gil_enabled
```
</issue_to_address>
### Comment 3
<location> `json2xml/parallel.py:24` </location>
<code_context>
+ return hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled()
+
+
+def get_optimal_workers(workers: int | None = None) -> int:
+ """
+ Get the optimal number of worker threads.
</code_context>
<issue_to_address>
**suggestion (performance):** Limiting workers to min(4, cpu_count) for non-free-threaded Python may be too restrictive.
Making the worker limit configurable or informed by benchmarks could better utilize system resources on machines with many cores.
Suggested implementation:
```python
def get_optimal_workers(
workers: int | None = None,
max_workers_limit: int | None = None
) -> int:
"""
Get the optimal number of worker threads.
Args:
workers: Explicitly specified worker count. If None, auto-detect.
max_workers_limit: Optional cap for worker count on non-free-threaded Python.
Returns:
int: Number of worker threads to use.
"""
if workers is not None:
return max(1, workers)
```
```python
if workers is not None:
return max(1, workers)
import os
cpu_count = os.cpu_count() or 1
if is_free_threaded():
optimal = cpu_count
else:
# Use configurable limit or default to 4
limit = max_workers_limit if max_workers_limit is not None else 4
optimal = min(limit, cpu_count)
return max(1, optimal)
```
</issue_to_address>
### Comment 4
<location> `json2xml/parallel.py:217-226` </location>
<code_context>
+ if len(obj) < 10:
+ return convert_dict(obj, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
+
+ with ThreadPoolExecutor(max_workers=workers) as executor:
+ futures = []
+ for key, val in obj.items():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Using as_completed may result in out-of-order results.
If a future raises an exception, the current error handling may not be sufficient. Please add error handling for failed futures.
</issue_to_address>
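One standard way to address both concerns (ordering and failed futures) is to map each future back to its input index, so `as_completed` can be used for throughput while results are written into their original slots and worker exceptions are re-raised. This is a standalone sketch of the pattern; the function name is hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Sequence


def parallel_map_ordered(fn: Callable[[Any], Any], items: Sequence[Any], workers: int = 4) -> list[Any]:
    """Apply fn to items in a thread pool, preserving input order.

    Any exception raised in a worker is re-raised here via fut.result(),
    so failures are not silently dropped.
    """
    results: list[Any] = [None] * len(items)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for fut in as_completed(future_to_idx):
            # result() re-raises the worker's exception if one occurred
            results[future_to_idx[fut]] = fut.result()
    return results


print(parallel_map_ordered(lambda x: x * x, [1, 2, 3, 4]))
# → [1, 4, 9, 16]
```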
### Comment 5
<location> `tests/test_parallel.py:60-58` </location>
<code_context>
+ )
+ assert result_parallel == result_serial
+
+ def test_parallel_dict_large(self) -> None:
+ """Test parallel dict conversion with large data."""
+ data = {f"key{i}": f"value{i}" for i in range(20)}
+ result_parallel = convert_dict_parallel(
+ data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=4
+ )
+ result_serial = dicttoxml.convert_dict(
+ data, [], "root", True, dicttoxml.default_item_func, False, True, False
+ )
+ assert result_parallel == result_serial
+
+ def test_parallel_list_small(self) -> None:
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for error handling in parallel conversion functions.
Please add tests that check how parallel conversion functions handle invalid inputs and ensure appropriate exceptions are raised.
Suggested implementation:
```python
def test_parallel_dict_invalid_input(self) -> None:
"""Test parallel dict conversion with invalid input types."""
# Passing a list instead of a dict
invalid_data = ["not", "a", "dict"]
with pytest.raises(TypeError):
convert_dict_parallel(
invalid_data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2
)
# Passing None
with pytest.raises(TypeError):
convert_dict_parallel(
None, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2
)
def test_parallel_list_invalid_input(self) -> None:
"""Test parallel list conversion with invalid input types."""
# Passing a dict instead of a list
invalid_data = {"not": "a list"}
with pytest.raises(TypeError):
convert_list_parallel(
invalid_data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
)
# Passing None
with pytest.raises(TypeError):
convert_list_parallel(
None, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
)
def test_parallel_list_small(self) -> None:
"""Test parallel list conversion with small data (should fallback to serial)."""
data = ["item1", "item2", "item3"]
result_parallel = convert_list_parallel(
data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
)
result_serial = dicttoxml.convert_list(
data, [], "root", True, dicttoxml.default_item_func, False, True, False
)
assert result_parallel == result_serial
```
These tests assume that `convert_dict_parallel` and `convert_list_parallel` raise `TypeError` for invalid input types. If they raise a different exception (e.g., `ValueError`), adjust the `pytest.raises` argument accordingly.
You must also ensure that `pytest` is imported at the top of the file:
```python
import pytest
```
If not already present, add this import.
</issue_to_address>
### Comment 6
<location> `tests/test_parallel.py:202-211` </location>
<code_context>
+
+ assert result_parallel == result_serial
+
+ def test_parallel_with_special_characters(self) -> None:
+ """Test parallel processing with special XML characters."""
+ data = {
+ f"key{i}": f"value with <special> & \"characters\" {i}"
+ for i in range(15)
+ }
+
+ result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=4)
+ result_serial = dicttoxml.dicttoxml(data, parallel=False)
+
+ assert result_parallel == result_serial
+ assert b"<special>" in result_parallel
+ assert b"&" in result_parallel
+
+ def test_parallel_empty_data(self) -> None:
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for other XML edge cases, such as attributes, namespaces, and CDATA sections.
Adding tests for attributes, namespaces, and CDATA will help verify that parallel processing handles these XML features correctly and avoids subtle bugs.
Suggested implementation:
```python
def test_parallel_with_attributes(self) -> None:
"""Test parallel processing with XML attributes."""
# Simulate attribute handling using dicttoxml's attr_type feature
data = {
"person": {
"@id": "123",
"name": "Alice"
}
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2, attr_type=True)
result_serial = dicttoxml.dicttoxml(data, parallel=False, attr_type=True)
assert result_parallel == result_serial
assert b'id="123"' in result_parallel
def test_parallel_with_namespaces(self) -> None:
"""Test parallel processing with XML namespaces."""
# Simulate namespace handling by including a namespace in the tag
data = {
"ns:person": {
"name": "Bob"
}
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
result_serial = dicttoxml.dicttoxml(data, parallel=False)
assert result_parallel == result_serial
assert b"<ns:person>" in result_parallel
def test_parallel_with_cdata(self) -> None:
"""Test parallel processing with CDATA sections."""
# Simulate CDATA by including a value that should be wrapped in CDATA
data = {
"note": "<![CDATA[Some <cdata> content & more]]>"
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
result_serial = dicttoxml.dicttoxml(data, parallel=False)
assert result_parallel == result_serial
assert b"<![CDATA[Some <cdata> content & more]]>" in result_parallel
def test_parallel_empty_data(self) -> None:
"""Test parallel processing with empty data."""
data = {"key": "value"}
converter = Json2xml(data, parallel=True, workers=4)
result = converter.to_xml()
assert result is not None
```
- If your dicttoxml library or wrapper does not support attributes, namespaces, or CDATA natively, you may need to adjust the test data or parameters to match your implementation.
- Ensure that the dicttoxml.dicttoxml function supports the `attr_type` argument for attributes. If not, you may need to use a different approach or library for these tests.
- If your XML generator does not support CDATA sections, you may need to extend its functionality or mock the expected output.
</issue_to_address>
### Comment 7
<location> `tests/test_parallel.py:216-221` </location>
<code_context>
+ assert b"<special>" in result_parallel
+ assert b"&" in result_parallel
+
+ def test_parallel_empty_data(self) -> None:
+ """Test parallel processing with empty data."""
+ data = {"key": "value"}
+ converter = Json2xml(data, parallel=True, workers=4)
+ result = converter.to_xml()
+ assert result is not None
+
+ def test_parallel_with_none_workers(self) -> None:
</code_context>
<issue_to_address>
**nitpick (testing):** Consider adding assertions for the actual output of empty data conversion.
Asserting the exact output for empty data will make the test more reliable and help detect unintended changes in the output format.
</issue_to_address>
### Comment 8
<location> `tests/test_parallel.py:223-221` </location>
<code_context>
+ result = converter.to_xml()
+ assert result is not None
+
+ def test_parallel_with_none_workers(self) -> None:
+ """Test parallel processing with None workers (auto-detect)."""
+ data = {f"key{i}": f"value{i}" for i in range(20)}
+ converter = Json2xml(data, parallel=True, workers=None)
+ result = converter.to_xml()
+ assert result is not None
+
+ def test_parallel_dict_order_preserved(self) -> None:
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for chunk_size=None or invalid chunk_size values.
Testing chunk_size=None or invalid values (e.g., negative, zero) will verify that the code handles these scenarios correctly and applies appropriate defaults.
</issue_to_address>
### Comment 9
<location> `json2xml/dicttoxml.py:721` </location>
<code_context>
- )
- output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
+
+ if parallel:
+ from json2xml.parallel import convert_dict_parallel, convert_list_parallel
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the conversion logic into a single helper function to centralize dispatch and eliminate nested conditionals.
```python
# add this helper at module‐level to unify dispatching
def _dispatch_convert(
obj, ids, parent,
attr_type, item_func, cdata, item_wrap, list_headers,
parallel, workers, chunk_size
):
if parallel:
if isinstance(obj, dict):
return convert_dict_parallel(
obj, ids, parent,
attr_type=attr_type, item_func=item_func, cdata=cdata,
item_wrap=item_wrap, list_headers=list_headers,
workers=workers, min_items_for_parallel=10
)
if isinstance(obj, Sequence):
return convert_list_parallel(
obj, ids, parent,
attr_type=attr_type, item_func=item_func, cdata=cdata,
item_wrap=item_wrap, list_headers=list_headers,
workers=workers, chunk_size=chunk_size
)
# fallback to serial
return convert(
obj, ids,
attr_type, item_func, cdata, item_wrap,
parent=parent, list_headers=list_headers
)
def dicttoxml(
obj: ELEMENT,
root: bool = True,
custom_root: str = "root",
ids: list[int] | None = None,
attr_type: bool = True,
item_wrap: bool = True,
item_func: Callable[[str], str] = default_item_func,
cdata: bool = False,
xml_namespaces: dict[str, Any] = {},
list_headers: bool = False,
parallel: bool = False,
workers: int | None = None,
chunk_size: int = 100
) -> bytes:
# ... build namespace_str ...
# choose parent name
parent = custom_root if root else ""
# single dispatch point
output_elem = _dispatch_convert(
obj, ids, parent,
attr_type, item_func, cdata, item_wrap, list_headers,
parallel, workers, chunk_size
)
if root:
output = ['<?xml version="1.0" encoding="UTF-8" ?>']
output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
else:
output = [output_elem]
return "".join(output).encode("utf-8")
```
This removes the duplicated `if root`/`elif parallel` nesting and centralizes all conversion logic into one flat helper.
</issue_to_address>
### Comment 10
<location> `json2xml/parallel.py:49` </location>
<code_context>
+_validation_cache_lock = threading.Lock()
+_validation_cache: dict[str, bool] = {}
+
+def key_is_valid_xml_cached(key: str) -> bool:
+ """Thread-safe cached version of key_is_valid_xml."""
+ with _validation_cache_lock:
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the parallel functions to use a generic executor wrapper and functools.lru_cache for caching to reduce duplication and thread-safety code.
```suggestion
Consider collapsing the two “parallel” functions into one generic executor wrapper and swapping out the manual cache/lock for `functools.lru_cache`. This removes almost all duplication and thread‐safety boilerplate while preserving behavior.
1) Replace manual cache + lock with `lru_cache`:
```python
from functools import lru_cache
@lru_cache(maxsize=None)
def key_is_valid_xml(key: str) -> bool:
return dicttoxml.key_is_valid_xml(key)
```
2) Add a single helper to parallelize any function over a list of args:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
def _parallel_map(
fn: Callable,
args_list: list[tuple],
workers: int | None = None,
min_items: int = 0
) -> list[Any]:
if len(args_list) <= min_items:
return [fn(*args) for args in args_list]
workers = get_optimal_workers(workers)
results = [None] * len(args_list)
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_idx = {
executor.submit(fn, *args): idx
for idx, args in enumerate(args_list)
}
for fut in as_completed(future_to_idx):
results[future_to_idx[fut]] = fut.result()
return results
```
3) Refactor your two APIs to use `_parallel_map`. Example for dict:
```python
def convert_dict_parallel(...):
items = list(obj.items())
args = [
(key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
for key, val in items
]
parts = _parallel_map(_convert_dict_item, args, workers, min_items_for_parallel)
return "".join(parts)
```
And similarly for lists. This removes ~200 lines of duplicated pool setup, cache, and ordering logic.
```
</issue_to_address>
### Comment 11
<location> `json2xml/parallel.py:132` </location>
<code_context>
attr = {} if not ids else {"id": f"{dicttoxml.get_unique_id(parent)}"}
</code_context>
<issue_to_address>
**suggestion (code-quality):** Swap if/else branches of if expression to remove negation ([`swap-if-expression`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/swap-if-expression))
```suggestion
attr = {"id": f"{dicttoxml.get_unique_id(parent)}"} if ids else {}
```
<br/><details><summary>Explanation</summary>Negated conditions are more difficult to read than positive ones, so it is best
to avoid them where we can. By swapping the `if` and `else` conditions around we
can invert the condition and make it positive.
</details>
</issue_to_address>
### Comment 12
<location> `benchmark.py:61` </location>
<code_context>
def run_benchmarks():
"""Run comprehensive benchmarks."""
print("=" * 80)
print("json2xml Performance Benchmark")
print("=" * 80)
print(f"Python Version: {sys.version}")
print(f"Python Executable: {sys.executable}")
print(f"Free-threaded: {'Yes' if is_free_threaded() else 'No'}")
gil_status = "Disabled (Free-threaded)" if is_free_threaded() else "Enabled (Standard GIL)"
print(f"GIL Status: {gil_status}")
print("=" * 80)
print()
sizes = ["small", "medium", "large", "xlarge"]
for size in sizes:
print(f"\n{size.upper()} Dataset:")
print("-" * 80)
data = generate_test_data(size)
# Count items
if "users" in data:
item_count = len(data["users"])
elif "data" in data:
item_count = len(data["data"])
else:
item_count = len(data)
print(f"Items: {item_count}")
# Serial benchmark
serial_time = benchmark_conversion(data, parallel=False)
print(f"Serial: {serial_time*1000:.2f} ms")
# Parallel benchmarks with different worker counts
for workers in [2, 4, 8]:
parallel_time = benchmark_conversion(data, parallel=True, workers=workers, chunk_size=100)
speedup = serial_time / parallel_time
print(f"Parallel ({workers}w): {parallel_time*1000:.2f} ms (speedup: {speedup:.2f}x)")
print()
print("=" * 80)
print("Benchmark complete!")
print("=" * 80)
</code_context>
<issue_to_address>
**issue (code-quality):** Extract duplicate code into function ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>
### Comment 13
<location> `json2xml/parallel.py:39-42` </location>
<code_context>
def get_optimal_workers(workers: int | None = None) -> int:
"""
Get the optimal number of worker threads.
Args:
workers: Explicitly specified worker count. If None, auto-detect.
Returns:
int: Number of worker threads to use.
"""
if workers is not None:
return max(1, workers)
cpu_count = os.cpu_count() or 4
if is_free_threaded():
return cpu_count
else:
return min(4, cpu_count)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
return cpu_count if is_free_threaded() else min(4, cpu_count)
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def test_parallel_with_special_characters(self) -> None: | ||
| """Test parallel processing with special XML characters.""" | ||
| data = { | ||
| f"key{i}": f"value with <special> & \"characters\" {i}" | ||
| for i in range(15) | ||
| } | ||
|
|
||
| result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=4) | ||
| result_serial = dicttoxml.dicttoxml(data, parallel=False) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Consider adding tests for other XML edge cases, such as attributes, namespaces, and CDATA sections.
Adding tests for attributes, namespaces, and CDATA will help verify that parallel processing handles these XML features correctly and avoids subtle bugs.
Suggested implementation:
def test_parallel_with_attributes(self) -> None:
"""Test parallel processing with XML attributes."""
# Simulate attribute handling using dicttoxml's attr_type feature
data = {
"person": {
"@id": "123",
"name": "Alice"
}
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2, attr_type=True)
result_serial = dicttoxml.dicttoxml(data, parallel=False, attr_type=True)
assert result_parallel == result_serial
assert b'id="123"' in result_parallel
def test_parallel_with_namespaces(self) -> None:
"""Test parallel processing with XML namespaces."""
# Simulate namespace handling by including a namespace in the tag
data = {
"ns:person": {
"name": "Bob"
}
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
result_serial = dicttoxml.dicttoxml(data, parallel=False)
assert result_parallel == result_serial
assert b"<ns:person>" in result_parallel
def test_parallel_with_cdata(self) -> None:
"""Test parallel processing with CDATA sections."""
# Simulate CDATA by including a value that should be wrapped in CDATA
data = {
"note": "<![CDATA[Some <cdata> content & more]]>"
}
result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
result_serial = dicttoxml.dicttoxml(data, parallel=False)
assert result_parallel == result_serial
assert b"<![CDATA[Some <cdata> content & more]]>" in result_parallel
def test_parallel_empty_data(self) -> None:
"""Test parallel processing with empty data."""
data = {"key": "value"}
converter = Json2xml(data, parallel=True, workers=4)
result = converter.to_xml()
assert result is not None- If your dicttoxml library or wrapper does not support attributes, namespaces, or CDATA natively, you may need to adjust the test data or parameters to match your implementation.
- Ensure that the dicttoxml.dicttoxml function supports the
attr_typeargument for attributes. If not, you may need to use a different approach or library for these tests. - If your XML generator does not support CDATA sections, you may need to extend its functionality or mock the expected output.
| ) | ||
| output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>") | ||
|
|
||
| if parallel: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider refactoring the conversion logic into a single helper function to centralize dispatch and eliminate nested conditionals.
# add this helper at module‐level to unify dispatching
def _dispatch_convert(
obj, ids, parent,
attr_type, item_func, cdata, item_wrap, list_headers,
parallel, workers, chunk_size
):
if parallel:
if isinstance(obj, dict):
return convert_dict_parallel(
obj, ids, parent,
attr_type=attr_type, item_func=item_func, cdata=cdata,
item_wrap=item_wrap, list_headers=list_headers,
workers=workers, min_items_for_parallel=10
)
if isinstance(obj, Sequence):
return convert_list_parallel(
obj, ids, parent,
attr_type=attr_type, item_func=item_func, cdata=cdata,
item_wrap=item_wrap, list_headers=list_headers,
workers=workers, chunk_size=chunk_size
)
# fallback to serial
return convert(
obj, ids,
attr_type, item_func, cdata, item_wrap,
parent=parent, list_headers=list_headers
)
def dicttoxml(
obj: ELEMENT,
root: bool = True,
custom_root: str = "root",
ids: list[int] | None = None,
attr_type: bool = True,
item_wrap: bool = True,
item_func: Callable[[str], str] = default_item_func,
cdata: bool = False,
xml_namespaces: dict[str, Any] = {},
list_headers: bool = False,
parallel: bool = False,
workers: int | None = None,
chunk_size: int = 100
) -> bytes:
# ... build namespace_str ...
# choose parent name
parent = custom_root if root else ""
# single dispatch point
output_elem = _dispatch_convert(
obj, ids, parent,
attr_type, item_func, cdata, item_wrap, list_headers,
parallel, workers, chunk_size
)
if root:
output = ['<?xml version="1.0" encoding="UTF-8" ?>']
output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
else:
output = [output_elem]
return "".join(output).encode("utf-8")This removes the duplicated if root/elif parallel nesting and centralizes all conversion logic into one flat helper.
```python
_validation_cache_lock = threading.Lock()


def key_is_valid_xml_cached(key: str) -> bool:
```
issue (complexity): Consider refactoring the parallel functions to use a generic executor wrapper and functools.lru_cache for caching to reduce duplication and thread-safety code.
Consider collapsing the two "parallel" functions into one generic executor wrapper and swapping the manual cache/lock for `functools.lru_cache`. This removes almost all duplication and thread-safety boilerplate while preserving behavior.

1) Replace the manual cache and lock with `lru_cache`:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def key_is_valid_xml(key: str) -> bool:
    return dicttoxml.key_is_valid_xml(key)
```
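A standalone illustration of why `lru_cache` suffices here: CPython's `lru_cache` is thread-safe, so no explicit lock is needed. The regex validator below is a simplified stand-in, not json2xml's actual `key_is_valid_xml`.

```python
import re
from functools import lru_cache

# Stand-in validator for illustration only; json2xml's real
# key_is_valid_xml performs full XML-name validation.
_NAME_RE = re.compile(r"^[A-Za-z_][\w.\-]*$")

@lru_cache(maxsize=None)
def key_is_valid_xml(key: str) -> bool:
    return bool(_NAME_RE.match(key))

key_is_valid_xml("item")
key_is_valid_xml("item")  # second call is served from the cache
info = key_is_valid_xml.cache_info()
print(info.hits, info.misses)  # → 1 1
```

`cache_info()` confirms repeated validations of the same key hit the cache instead of re-running the regex.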
2) Add a single helper to parallelize any function over a list of argument tuples:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def _parallel_map(
    fn: Callable,
    args_list: list[tuple],
    workers: int | None = None,
    min_items: int = 0
) -> list[Any]:
    if len(args_list) <= min_items:
        return [fn(*args) for args in args_list]
    workers = get_optimal_workers(workers)
    results = [None] * len(args_list)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(fn, *args): idx
            for idx, args in enumerate(args_list)
        }
        for fut in as_completed(future_to_idx):
            results[future_to_idx[fut]] = fut.result()
    return results
```

3) Refactor your two APIs to use `_parallel_map`. Example for the dict variant:

```python
def convert_dict_parallel(...):
    items = list(obj.items())
    args = [
        (key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
        for key, val in items
    ]
    parts = _parallel_map(_convert_dict_item, args, workers, min_items_for_parallel)
    return "".join(parts)
```

And similarly for lists. This removes roughly 200 lines of duplicated pool setup, cache, and ordering logic.
```python
def run_benchmarks():
    """Run comprehensive benchmarks."""
    print("=" * 80)
```
issue (code-quality): Extract duplicate code into function (extract-duplicate-method)
```python
output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")

if parallel:
    from json2xml.parallel import convert_dict_parallel, convert_list_parallel
```
Code scanning / CodeQL — Cyclic import (Note): `json2xml.parallel`

Copilot Autofix (AI, 4 days ago)
The best way to fix the cyclic import is to refactor code such that the common logic that's required by both dicttoxml.py and parallel.py is moved to a new submodule or utility file (e.g., json2xml/utils.py or json2xml/shared.py). This common code can then be imported by both modules without creating a cycle. Specifically, move all functions that are imported in both directions (i.e., those in parallel.py that import from dicttoxml.py, and vice versa) into the new utility module, and change the imports in both files appropriately.
From the code provided, it looks like only the specific functions convert_dict_parallel and convert_list_parallel are imported from json2xml.parallel in this location. If, on inspection, json2xml.parallel imports any functions from dicttoxml.py (e.g., core conversion logic), those should be moved out to a third module and both files updated.
Since we are only allowed to edit code that is shown, and assuming that the import on line 749 is the problem, the immediate step is to "invert the dependency": Move the conversion helpers used for parallel execution from parallel.py into a new file (say shared.py), or within dicttoxml.py if they only use logic from there, and update references. In this context, assuming the original import can be changed to avoid the dependency, we can move the parallel conversion helpers to a new shared module (e.g., json2xml/parallel_helpers.py), import from there, and restrict the use of parallel.py as needed.
Within the snippet, only edit the import on line 749 and related usage; include comments for clarity. No change to core behavior should be made.
```diff
@@ -746,7 +746,7 @@
         should_use_parallel = False

     if should_use_parallel:
-        from json2xml.parallel import convert_dict_parallel, convert_list_parallel
+        from json2xml.parallel_helpers import convert_dict_parallel, convert_list_parallel  # Moved to break cyclical import

     if root:
         output.append('<?xml version="1.0" encoding="UTF-8" ?>')
```
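The "move shared code to a third module" fix can be demonstrated end to end with a throwaway package. All module and function names below are hypothetical stand-ins for the json2xml layout: both consumers import the shared helper, and neither imports the other, so no cycle can form.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Three hypothetical modules: `j2x_shared` holds the common helper,
# and the serial/parallel converters both depend only on it.
modules = {
    "j2x_shared.py": (
        "def key_is_valid_xml(key):\n"
        "    return key.isidentifier()\n"
    ),
    "j2x_serial.py": (
        "from j2x_shared import key_is_valid_xml\n"
        "def convert(key):\n"
        "    return f'<{key}/>' if key_is_valid_xml(key) else '<key/>'\n"
    ),
    "j2x_parallel.py": (
        "from j2x_shared import key_is_valid_xml\n"
        "def convert_many(keys):\n"
        "    return [f'<{k}/>' for k in keys if key_is_valid_xml(k)]\n"
    ),
}

tmp = Path(tempfile.mkdtemp())
for name, src in modules.items():
    (tmp / name).write_text(src)
sys.path.insert(0, str(tmp))

serial = importlib.import_module("j2x_serial")      # imports j2x_shared only
parallel = importlib.import_module("j2x_parallel")  # imports j2x_shared only
print(serial.convert("item"), parallel.convert_many(["a", "1b"]))
```

Because the dependency graph is a tree (two leaves pointing at one root) rather than a loop, import order no longer matters.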
- Change ids parameter type from list[int] to list[str] in dicttoxml function - Update convert_dict and convert_dict_parallel to accept list[str] | None - Fix test to use string list instead of int list for ids - Add None check in test_parallel.py before type narrowing result_bytes Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7 Co-authored-by: Amp <[email protected]>
…rage - Add tests for parallel processing without root element (dict, list, primitive) - Add tests for make_valid_xml_name_cached function with various edge cases - Add tests for parallel processing with sequences, None values, and error cases - Add tests for boolean and datetime values in parallel mode - Coverage improved from 94% to 99% Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7 Co-authored-by: Amp <[email protected]>
Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7 Co-authored-by: Amp <[email protected]>
…diff coverage - Add tests for get_xml_type with Decimal and Fraction (numbers.Number subclasses) - Add test for parallel processing with root and primitive values - Add test for get_optimal_workers in non-free-threaded mode - Coverage improved to 99% (393 total statements, only 2 uncovered) - All ruff and ty checks pass Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7 Co-authored-by: Amp <[email protected]>
```python
import os
import sys
import threading
```
Code scanning / CodeQL — Unused import (Note)

Copilot Autofix (AI, 4 days ago)
The correct fix is to remove the unused import statement for threading from the code (import threading on line 6 of json2xml/parallel.py). This resolves the warning without impacting any existing functionality since threading is not used anywhere in the snippets shown. No additional imports or changes are required.
```diff
@@ -3,7 +3,6 @@
 import os
 import sys
-import threading
 from collections.abc import Callable, Sequence
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import lru_cache
```
```python
if hasattr(sys, '_is_gil_enabled'):
    try:
        gil_enabled = sys._is_gil_enabled()
    except Exception:
```
Code scanning / CodeQL — Empty except (Note)

Copilot Autofix (AI, 4 days ago)
To fix the problem, the except block should not be left empty. The best solutions are to (a) log the exception, so that failures are noticed and can be diagnosed, or (b) add an explanatory comment to clarify why ignoring the exception is correct and intentional. Prefer (a) for most production code unless there is a very strong rationale for silent suppression. In this case, logging the exception is best. The built-in logging module is standard and should be used. Changes required:

- Add `import logging` to the imports if not present.
- In the except block, replace `pass` with a logging call (e.g., `logging.debug("Exception in sys._is_gil_enabled(): %s", e)`).
- Update the except line to `except Exception as e:`.

All changes are within the shown code.
```diff
@@ -4,6 +4,7 @@
 import os
 import sys
 import threading
+import logging
 from collections.abc import Callable, Sequence
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import lru_cache
@@ -26,8 +27,8 @@
     if hasattr(sys, '_is_gil_enabled'):
         try:
             gil_enabled = sys._is_gil_enabled()
-        except Exception:
-            pass
+        except Exception as e:
+            logging.debug("Exception in sys._is_gil_enabled(): %s", e)
     return not gil_enabled
```
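With the logging fix applied, the detection helper looks roughly like this. This is a self-contained sketch reconstructed from the diff, not the PR's exact code; `is_free_threaded` mirrors the helper in `json2xml/parallel.py`.

```python
import logging
import sys

def is_free_threaded() -> bool:
    """Return True when running on a free-threaded (no-GIL) build."""
    # sys._is_gil_enabled() exists on CPython 3.13+; on older
    # interpreters we conservatively assume the GIL is enabled.
    gil_enabled = True
    if hasattr(sys, "_is_gil_enabled"):
        try:
            gil_enabled = sys._is_gil_enabled()
        except Exception as e:
            logging.debug("Exception in sys._is_gil_enabled(): %s", e)
    return not gil_enabled

print(is_free_threaded())  # False on a regular GIL build
```

Logging at DEBUG level keeps the failure visible to anyone diagnosing why parallel mode never activates, without spamming normal runs.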
```python
    Returns:
        bool: True if the key is valid XML, False otherwise.
    """
    from json2xml import dicttoxml
```
Code scanning / CodeQL — Cyclic import (Note): `json2xml.dicttoxml`

Copilot Autofix (AI, 4 days ago)
To fix the cyclic import between `json2xml.parallel` and `json2xml.dicttoxml`, we should break the dependency loop. Since only certain functions require access to `dicttoxml` (`key_is_valid_xml_cached` and `make_valid_xml_name_cached`), and they are simply wrappers for functions in `dicttoxml`, the cleanest solution is to move these two functions from `parallel.py` into `dicttoxml.py`. This eliminates the need for `parallel.py` to import `dicttoxml` at all, breaking the loop. After moving these function definitions (along with their imports and docstrings), remove them completely from `parallel.py`.

Furthermore, wherever these functions are used outside `dicttoxml`, update their import location if needed (though per the instructions only the shown snippets are edited, so usages outside the given `parallel.py` code are not updated).

In this snippet, remove the following from `parallel.py`:

- The entire `key_is_valid_xml_cached` function (lines 63–76)
- The entire `make_valid_xml_name_cached` function (lines 78–107)

Note: the implementations can be safely moved into `dicttoxml.py`, but per the instructions we only edit the code shown, which is limited to `parallel.py`.
```diff
@@ -60,52 +60,8 @@
     return max(1, optimal)


-@lru_cache(maxsize=None)
-def key_is_valid_xml_cached(key: str) -> bool:
-    """
-    Thread-safe cached version of key_is_valid_xml.
-
-    Args:
-        key: The XML key to validate.
-
-    Returns:
-        bool: True if the key is valid XML, False otherwise.
-    """
-    from json2xml import dicttoxml
-    return dicttoxml.key_is_valid_xml(key)
-
-
-def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
-    """
-    Thread-safe cached version of make_valid_xml_name.
-
-    Args:
-        key: The key to validate.
-        attr: The attributes dictionary.
-
-    Returns:
-        tuple: Valid XML key and updated attributes.
-    """
-    from json2xml import dicttoxml
-    key = dicttoxml.escape_xml(key)
-
-    if key_is_valid_xml_cached(key):
-        return key, attr
-
-    if isinstance(key, int) or key.isdigit():
-        return f"n{key}", attr
-
-    if key_is_valid_xml_cached(key.replace(" ", "_")):
-        return key.replace(" ", "_"), attr
-
-    if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
-        return key, attr
-
-    attr["name"] = key
-    key = "key"
-    return key, attr
-
-
 def _convert_dict_item(
     key: str,
     val: Any,
```
```python
    Returns:
        tuple: Valid XML key and updated attributes.
    """
    from json2xml import dicttoxml
```
Code scanning / CodeQL — Cyclic import (Note): `json2xml.dicttoxml`

Copilot Autofix (AI, 4 days ago)
General approach: to fix a cyclic import, move interdependent functions between modules so the dependency is unidirectional, or refactor shared code into a new module that both can import without cyclic references.

Specific fix (best for this code): both cached XML helper functions in `parallel.py` (`key_is_valid_xml_cached` and `make_valid_xml_name_cached`) depend on `dicttoxml`. If only these helpers depend on `dicttoxml`, it is best to move them (including their decorators) into `dicttoxml.py`. This breaks the cycle: `parallel.py` will not need to import `dicttoxml`, and any functions in `dicttoxml` can call the helpers locally.

Changes to make:

- Remove the two helper functions (`key_is_valid_xml_cached` and `make_valid_xml_name_cached`) from `parallel.py`, replacing their calls with imports from `dicttoxml`.
- In `dicttoxml.py`, add these two helpers so they are accessible to any code needing them.
- In `parallel.py`, replace local usage of these helpers with imports from `dicttoxml`.

Needed for implementation:

- Move the function definitions (including docstrings and any decorators) to `dicttoxml.py`.
- Add import lines for the helpers in `parallel.py` where needed.
- Remove the function-local imports (`from json2xml import dicttoxml`) from within `parallel.py`.
```diff
@@ -60,52 +60,12 @@
     return max(1, optimal)


-@lru_cache(maxsize=None)
-def key_is_valid_xml_cached(key: str) -> bool:
-    """
-    Thread-safe cached version of key_is_valid_xml.
-
-    Args:
-        key: The XML key to validate.
-
-    Returns:
-        bool: True if the key is valid XML, False otherwise.
-    """
-    from json2xml import dicttoxml
-    return dicttoxml.key_is_valid_xml(key)
+# (Moved to dicttoxml.py. Remove from parallel.py.)


-def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
-    """
-    Thread-safe cached version of make_valid_xml_name.
-
-    Args:
-        key: The key to validate.
-        attr: The attributes dictionary.
-
-    Returns:
-        tuple: Valid XML key and updated attributes.
-    """
-    from json2xml import dicttoxml
-    key = dicttoxml.escape_xml(key)
-
-    if key_is_valid_xml_cached(key):
-        return key, attr
-
-    if isinstance(key, int) or key.isdigit():
-        return f"n{key}", attr
-
-    if key_is_valid_xml_cached(key.replace(" ", "_")):
-        return key.replace(" ", "_"), attr
-
-    if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
-        return key, attr
-
-    attr["name"] = key
-    key = "key"
-    return key, attr
+# (Moved to dicttoxml.py. Remove from parallel.py.)


 def _convert_dict_item(
     key: str,
     val: Any,
```
| """ | ||
| if not isinstance(obj, dict): | ||
| raise TypeError("obj must be a dict") | ||
| from json2xml import dicttoxml |
Code scanning / CodeQL — Cyclic import (Note): `json2xml.dicttoxml`

Copilot Autofix (AI, 4 days ago)
The best practice to break cyclic imports between modules is to ensure neither module directly depends on the other at import time. In this case, the import of `dicttoxml` is only needed inside a function (`convert_dict_parallel`), specifically to call `dicttoxml.convert_dict` when the number of items is below a threshold. A straightforward fix is to move the affected logic — the code for handling small dicts, i.e., the part that calls `dicttoxml.convert_dict` — out of `parallel.py` into `dicttoxml.py`, since conceptually the parallel features are extensions or variants of the main single-threaded conversion logic.

Here is the recommended approach:

- Move the full `convert_dict_parallel` function from `parallel.py` to `dicttoxml.py`.
- In `parallel.py`, remove the import of `dicttoxml` and the implementation of `convert_dict_parallel`.
- In `dicttoxml.py`, import or otherwise reference any helper functions needed from `parallel.py` (such as `_convert_dict_item`, `get_optimal_workers`) for parallel operation.
  - If those helpers are required, consider moving all parallel-related helpers (such as `_convert_dict_item`, `_convert_list_chunk`) to a new submodule (`json2xml.parallel_helpers` or similar), then import that submodule where needed, to avoid new long-term cycles.
- Update any usages or references accordingly.

Specific changes:

- In `parallel.py`: remove the local import (`from json2xml import dicttoxml`) and delete the entire `convert_dict_parallel` function.
- In `dicttoxml.py`: add the full `convert_dict_parallel` function, and import the required helpers.
- Optionally: if helpers must move to avoid new cycles, move them to a third helper module.

Since only the code for `parallel.py` was shared, changes are provided for that file; migration steps in `dicttoxml.py` can be done similarly.
```diff
@@ -215,41 +215,41 @@
     Returns:
         str: XML string.
     """
     if not isinstance(obj, dict):
         raise TypeError("obj must be a dict")
-    from json2xml import dicttoxml
-    if len(obj) < min_items_for_parallel:
-        return dicttoxml.convert_dict(
-            obj, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-        )
+    # REMOVED: convert_dict_parallel now lives in dicttoxml.py.
+    # See dicttoxml.convert_dict_parallel for the parallel implementation.

-    workers = get_optimal_workers(workers)
-    items = list(obj.items())
-    results: dict[int, str] = {}
-
-    with ThreadPoolExecutor(max_workers=workers) as executor:
-        future_to_idx = {
-            executor.submit(
-                _convert_dict_item,
-                key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-            ): idx
-            for idx, (key, val) in enumerate(items)
-        }
-
-        for future in as_completed(future_to_idx):
-            idx = future_to_idx[future]
-            try:
-                results[idx] = future.result()
-            except Exception as e:
-                # Cancel remaining futures
-                for f in future_to_idx:
-                    if not f.done():
-                        f.cancel()
-                raise e
-
-    return "".join(results[idx] for idx in range(len(items)))


 def _convert_list_chunk(
     items: Sequence[Any],
     ids: list[str] | None,
```
| """ | ||
| if not isinstance(items, Sequence) or isinstance(items, (str, bytes)): | ||
| raise TypeError("items must be a sequence (not str or bytes)") | ||
| from json2xml import dicttoxml |
Code scanning / CodeQL — Cyclic import (Note): `json2xml.dicttoxml`

Copilot Autofix (AI, 4 days ago)
To fix this cyclic import, the best approach is to break the dependency loop by decoupling `convert_list_parallel` from needing to import `dicttoxml` during execution. One common way is to shift this function (or the part of its logic that calls `dicttoxml.convert_list`) into the `dicttoxml` module itself, so that `parallel` does not need to reference `dicttoxml`.

Given that `convert_list_parallel` is a parallelized adaptation of `dicttoxml.convert_list`, it is reasonable to move `convert_list_parallel` into `json2xml/dicttoxml.py`, then have the rest of the code call it from there. The `parallel` module should then define only the parallelization utilities (e.g., chunking, thread-pool logic), not `convert_list_parallel` itself.

Within the code shown:

- Remove (or refactor out) the function `convert_list_parallel` from `json2xml/parallel.py`.
- Move this function into `dicttoxml.py` instead, since `dicttoxml` is the only code that calls its own `convert_list`.
- If `parallel.py` is solely a utility for threading/chunking and does not need to reference `dicttoxml`, this breaks the dependency cycle.
```diff
@@ -284,66 +284,5 @@
     )


-def convert_list_parallel(
-    items: Sequence[Any],
-    ids: list[str] | None,
-    parent: str,
-    attr_type: bool,
-    item_func: Callable[[str], str],
-    cdata: bool,
-    item_wrap: bool,
-    list_headers: bool = False,
-    workers: int | None = None,
-    chunk_size: int = 100
-) -> str:
-    """
-    Parallel version of convert_list that processes list chunks concurrently.
-
-    Args:
-        items: List to convert.
-        ids: List of unique IDs.
-        parent: Parent element name.
-        attr_type: Whether to include type attributes.
-        item_func: Function to generate item names.
-        cdata: Whether to wrap strings in CDATA.
-        item_wrap: Whether to wrap list items.
-        list_headers: Whether to repeat headers for lists.
-        workers: Number of worker threads (None for auto-detect).
-        chunk_size: Number of items per chunk.
-
-    Returns:
-        str: XML string.
-    """
-    if not isinstance(items, Sequence) or isinstance(items, (str, bytes)):
-        raise TypeError("items must be a sequence (not str or bytes)")
-    from json2xml import dicttoxml
-    if len(items) < chunk_size:
-        return dicttoxml.convert_list(
-            items, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-        )
-
-    workers = get_optimal_workers(workers)
-    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
-    results: dict[int, str] = {}
-
-    with ThreadPoolExecutor(max_workers=workers) as executor:
-        future_to_idx = {
-            executor.submit(
-                _convert_list_chunk,
-                chunk, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers, idx * chunk_size
-            ): idx
-            for idx, chunk in enumerate(chunks)
-        }
-
-        for future in as_completed(future_to_idx):
-            idx = future_to_idx[future]
-            try:
-                results[idx] = future.result()
-            except Exception as e:
-                # Cancel remaining futures
-                for f in future_to_idx:
-                    if not f.done():
-                        f.cancel()
-                raise e
-
-    return "".join(results[idx] for idx in range(len(chunks)))
+# NOTE: `convert_list_parallel` has been moved to `json2xml/dicttoxml.py` to break an import cycle.
+# Please import and use `json2xml.dicttoxml.convert_list_parallel`.
```
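The chunking strategy used by `convert_list_parallel` can be sketched standalone. Here `convert_chunk` is a stand-in for the real per-chunk XML conversion, and `executor.map` replaces the manual future-to-index bookkeeping because it already yields results in submission order:

```python
from concurrent.futures import ThreadPoolExecutor

def convert_chunk(chunk):
    # Stand-in for the real per-chunk XML conversion.
    return "".join(f"<item>{x}</item>" for x in chunk)

def convert_list_chunked(items, chunk_size=100, workers=None):
    if len(items) < chunk_size:
        return convert_chunk(items)  # serial fallback for small lists
    # Split the list into fixed-size chunks to amortize thread overhead.
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() yields results in submission order, so the joined
        # output matches the original list order.
        return "".join(executor.map(convert_chunk, chunks))

print(convert_list_chunked(list(range(5)), chunk_size=2))
```

Chunking (rather than one task per item) is what keeps the per-task overhead small enough for the thread pool to pay off on medium-sized lists.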
…ignore Amp-Thread-ID: https://ampcode.com/threads/T-17f644e1-8fd3-4bb2-b2c6-bcb9119cfc98 Co-authored-by: Amp <[email protected]>
…d code - Remove unused _dispatch_convert function from dicttoxml.py - Add tests for parallel processing with cdata and xml_namespaces - Add test for exception handling in is_free_threaded - Add test for unsupported types in parallel list processing - Fix type checker issues using cast instead of ignore Amp-Thread-ID: https://ampcode.com/threads/T-17f644e1-8fd3-4bb2-b2c6-bcb9119cfc98 Co-authored-by: Amp <[email protected]>
Amp-Thread-ID: https://ampcode.com/threads/T-a444a23f-4bec-49e7-8aba-c1ab93406ffb Co-authored-by: Amp <[email protected]>
- Change minimum Python version to 3.13 - Remove support for Python 3.10-3.12 and PyPy - Update CI matrix to test 3.13t, 3.14, 3.14t, 3.15.0-alpha.1 - Update classifiers to reflect supported versions - Upgrade code syntax with pyupgrade for 3.13+ - Update lint job to use Python 3.13 - Update AGENT.md documentation Amp-Thread-ID: https://ampcode.com/threads/T-df26d9ea-141e-401a-b305-46bb4870f559 Co-authored-by: Amp <[email protected]>
Amp-Thread-ID: https://ampcode.com/threads/T-1f1f15f6-c4ef-4e6e-896a-67a085d602b0 Co-authored-by: Amp <[email protected]>
Amp-Thread-ID: https://ampcode.com/threads/T-1f1f15f6-c4ef-4e6e-896a-67a085d602b0 Co-authored-by: Amp <[email protected]>
…port sorting - Update pytest from 7.0.1 to >=8.4.1 for Python 3.14 compatibility (fixes ast.Str removal) - Add pytest-xdist and pytest-cov to dependencies with proper version constraints - Fix import sorting in test_parallel.py (ruff I001) - All tests passing (199 tests, 99% coverage) - All linting and type checks passing
- Ensure both ruff and ty run in main CI workflow - Matches lint.yml workflow which has both checks - All code quality checks now run automatically
Summary
This PR adds parallel processing support for json2xml, leveraging Python 3.14t's free-threaded capabilities (no-GIL) to achieve up to 1.55x speedup for medium-sized datasets.
🚀 Key Features
📊 Benchmark Results
Tested on macOS ARM64 with Python 3.14.0 and Python 3.14.0t:
Medium Dataset (100 items) - Best Case
Key Findings:
See `BENCHMARK_RESULTS.md` for complete results.

💻 Usage
Basic Parallel Processing
Advanced Configuration
🔧 Implementation Details
New Files

- `json2xml/parallel.py` - Parallel processing module (318 lines)
- `tests/test_parallel.py` - Comprehensive test suite (20 tests)
- `benchmark.py` - Performance benchmarking tool
- `docs/performance.rst` - Sphinx documentation

Modified Files

- `json2xml/json2xml.py` - Added `parallel`, `workers`, `chunk_size` parameters
- `json2xml/dicttoxml.py` - Integrated parallel processing support
- `README.rst` - Added performance section with benchmarks
- `docs/index.rst` - Added performance documentation page

✅ Testing
All 173 tests passing (153 original + 20 new parallel tests)
```shell
pytest -v
# ============================= 173 passed in 0.14s ==============================
```

📚 Documentation
Complete documentation added:
- `FREE_THREADED_OPTIMIZATION_ANALYSIS.md` - Technical analysis and optimization strategy
- `BENCHMARK_RESULTS.md` - Detailed benchmark results and analysis
- `IMPLEMENTATION_SUMMARY.md` - Implementation details and architecture
- `docs/performance.rst` - Sphinx documentation for users
- Updated `README.rst` with usage examples and benchmark results

🎯 Performance Recommendations
When to Use Parallel Processing
Best for:
Not recommended for:
Optimal Configuration
🔄 Breaking Changes
None - This is a fully backward-compatible change: parallel processing is opt-in (`parallel=False` by default).
📋 Checklist
🎉 Conclusion
This PR makes json2xml ready for Python's free-threaded future while maintaining perfect compatibility with existing code. Users can now opt-in to parallel processing and see significant performance improvements on Python 3.14t!
Related Issues: N/A (proactive optimization)
Type: Feature Enhancement
Impact: Performance improvement, no breaking changes
Summary by Sourcery
Enable optional parallel JSON-to-XML conversion in json2xml leveraging Python 3.14t free-threaded mode, add thread-safe validation caching, provide benchmarks and documentation, and maintain full backward compatibility
New Features:

- Optional parallel JSON-to-XML conversion via `parallel`, `workers`, and `chunk_size` parameters

Enhancements:

- New `json2xml/parallel.py` module with concurrent dict and list conversion logic
- Parallel processing integrated into `dicttoxml` and `Json2xml` while preserving default serial behavior

Documentation:

- Benchmark results, performance analysis, and Sphinx documentation for the new parallel features

Tests:

- New `tests/test_parallel.py` covering detection, dict/list parallel conversion, nested data, and order preservation