Skip to content

Conversation

@dabla
Copy link
Contributor

@dabla dabla commented Jan 8, 2026

This PR is related to the discussion I started on the devlist and which allows you to natively execute async code on PythonOperators.

There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3

Below an example which show you how it can be used with async hooks:

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
		 # Do something with result or accumulate it and return it as an XCom

This PR will fix additional remarks made by @kaxil on the original PR which has been reverted.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@dabla dabla changed the title Feature/async python operator Add support for async callables in PythonOperator Jan 8, 2026
@dabla dabla requested a review from uranusjr January 9, 2026 08:41
@dabla dabla force-pushed the feature/async-python-operator branch from 9124c9a to cd4ec26 Compare January 9, 2026 12:58
@dabla dabla closed this Jan 9, 2026
@dabla dabla force-pushed the feature/async-python-operator branch from ebf2873 to 0b341e6 Compare January 9, 2026 13:14
…epends on modified comms supervisor which cannot be backported to older Airflow versions

Add support for async callables in PythonOperator (apache#59087)

* refactor: Implemented BaseAsyncOperator in task-sdk

* refactor: Now PythonOperator extends BaseAsyncOperator

* refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions

---------

Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
(cherry picked from commit 9cab6fb)
@dabla dabla reopened this Jan 9, 2026
@dabla dabla force-pushed the feature/async-python-operator branch from 24536ae to cdecb12 Compare January 10, 2026 08:49
@dabla dabla force-pushed the feature/async-python-operator branch from 597f08a to f829c3e Compare January 10, 2026 08:58
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there, once newsfragment is added we are good

@dabla
Copy link
Contributor Author

dabla commented Jan 13, 2026

Almost there, once newsfragment is added we are good

And the newfragment ;-)

@dabla
Copy link
Contributor Author

dabla commented Jan 13, 2026

@kaxil If newsfragment is ok for you then I think it's finally ready to merge.

@kaxil
Copy link
Member

kaxil commented Jan 13, 2026

Static check is failing, worth looking at https://github.com/apache/airflow/blob/main/airflow-core/newsfragments/54505.significant.rst or other examples

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants