
use concurrent.futures.Executor instead of multiprocessing pool to resolve conflict with duet #7938

Open
NoureldinYosri wants to merge 6 commits into quantumlib:main from NoureldinYosri:z_phase_threads

Conversation

@NoureldinYosri
Collaborator

@NoureldinYosri NoureldinYosri commented Mar 7, 2026

Fixes b/490175992

@NoureldinYosri NoureldinYosri requested review from a team, mrwojtek and vtomole as code owners March 7, 2026 00:39
@github-actions github-actions bot added the size: S 10< lines changed <50 label Mar 7, 2026
Collaborator

@eliottrosenberg eliottrosenberg left a comment


Fixes b/490175992 (discussed in quantumlib/ReCirq#461 (review))

@github-actions github-actions bot added size: M 50< lines changed <250 and removed size: S 10< lines changed <50 labels Mar 11, 2026
@codecov

codecov bot commented Mar 11, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 99.63%. Comparing base (1ac366c) to head (762dabc).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #7938   +/-   ##
=======================================
  Coverage   99.63%   99.63%           
=======================================
  Files        1108     1108           
  Lines       99571    99587   +16     
=======================================
+ Hits        99205    99226   +21     
+ Misses        366      361    -5     


Collaborator

@pavoljuhas pavoljuhas left a comment


Please check if the new ThreadPoolExecutor default is faster than a serial run. If it makes no difference or is worse, consider switching to a serial evaluation by default.

Also, it may be worthwhile to check whether creating the local multiprocessing.Pool with a spawn start method instead of fork would avoid the warnings in the bug.

Ref: https://docs.python.org/3.11/library/multiprocessing.html#multiprocessing.get_context
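A minimal sketch of the spawn-context suggestion above (not the PR's actual code; the `square` worker is a made-up example). `multiprocessing.get_context("spawn")` returns a context whose `Pool` starts fresh interpreter processes instead of forking, which avoids fork-related warnings at the cost of slower worker startup:

```python
# Sketch: create a Pool from a spawn context rather than the default fork
# start method. Worker functions must be picklable (defined at module level).
import multiprocessing


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    # ctx.Pool behaves like multiprocessing.Pool but uses spawn for workers.
    with ctx.Pool(4) as pool:
        results = pool.map(square, range(8))
    print(results)
```

Note the `__main__` guard is required with spawn, since each worker re-imports the main module.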


from __future__ import annotations

import concurrent.futures as cf
Collaborator


Nit - can we use the futures module name here for less indirection - like in

from concurrent import futures

      pool = num_workers_or_pool  # pragma: no cover
  elif num_workers_or_pool != 0:
-     pool = multiprocessing.Pool(num_workers_or_pool if num_workers_or_pool > 0 else None)
+     pool = cf.ThreadPoolExecutor(num_workers_or_pool if num_workers_or_pool > 0 else None)
Collaborator

@pavoljuhas pavoljuhas Mar 11, 2026


ThreadPoolExecutor is subject to GIL. Unless the mapped function spends a lot of time in numpy calls or waiting for IO, the execution would be the same as in a serial call or worse due to thread-switching overhead.

I made a quick test with many-term sums computed in series or in parallel with multiprocessing.Pool.map vs ThreadPoolExecutor.map. The ThreadPoolExecutor took about 2.5 times longer than a serial evaluation.

example timing code
def partial_sum(start_end: tuple[int, int]) -> float:
    total = 0
    for i in range(*start_end, 3):
        total += (-1) ** i * 1.0 / i
    return total


def tedious_sum(terms_count: int, mapfunc) -> float:
    total = sum(mapfunc(partial_sum, ((start, terms_count) for start in (1, 2, 3))))
    return total

# %timeit tedious_sum(10_000_000, map)
# 2.93 s ± 51.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# pool = multiprocessing.Pool(3)
# %timeit tedious_sum(10_000_000, pool.map)
# 1.01 s ± 25.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# tpx = concurrent.futures.ThreadPoolExecutor(3)
# %timeit tedious_sum(10_000_000, tpx.map)
# 8.46 s ± 1.11 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
 

Can you make a quick comparison of the z_phase_calibration_workflow duration with ThreadPoolExecutor compared to a serial run?

If they are comparable, I'd suggest making serial evaluation the default.

      random_state: cirq.RANDOM_STATE_OR_SEED_LIKE = None,
      atol: float = 1e-3,
-     num_workers_or_pool: int | multiprocessing.pool.Pool = -1,
+     num_workers_or_pool: int | multiprocessing.pool.Pool | cf.Executor = -1,
Collaborator


AFAICT, the code later needs only the Pool.map or Executor.map functions.

Would it be possible to change this to accept either an int for a number of workers or a parallel-map function?

Comment on lines +146 to +147
assert isinstance(pool, cf.Executor)
pool.shutdown()
Collaborator

@pavoljuhas pavoljuhas Mar 11, 2026


Consider wrapping the local pool in a contextlib.ExitStack context instead.
Otherwise there is a risk that the pool stays around if the function aborts on an exception.
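A minimal sketch of the ExitStack pattern being suggested (the function `run_calibration` and its body are hypothetical stand-ins for the workflow code): a locally created executor is registered on the stack, so it is shut down even if the body raises, while a serial path registers nothing:

```python
# Sketch: ExitStack guarantees the locally created executor is shut down on
# any exit path (normal return or exception), with no cleanup code at the end.
import concurrent.futures as cf
import contextlib


def run_calibration(num_workers: int) -> list[int]:
    with contextlib.ExitStack() as stack:
        if num_workers != 0:
            # Executor is a context manager; __exit__ calls shutdown(wait=True).
            pool = stack.enter_context(cf.ThreadPoolExecutor(num_workers))
            mapfunc = pool.map
        else:
            mapfunc = map  # serial fallback; nothing to clean up
        return list(mapfunc(lambda x: x + 1, range(5)))
```

This removes the need for an explicit `pool.shutdown()` call and the `isinstance` assert guarding it.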
