Refactor BaseSensorOperator to support Reschedule mode with Task SDK #45580

Open
kaxil opened this issue Jan 11, 2025 · 0 comments
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk

kaxil commented Jan 11, 2025

BaseSensorOperator has some logic that reads max_tries from the Context and also accesses the DB directly. This won't work with the Task SDK, so DAGs like example_sensor_decorator will fail.

if self.reschedule:
    ti = context["ti"]
    max_tries: int = ti.max_tries or 0
    retries: int = self.retries or 0
    # If reschedule, use the start date of the first try (first try can be either the very
    # first execution of the task, or the first execution after the task was cleared).
    # If the first try's record was not saved due to the Exception occurred and the following
    # transaction rollback, the next available attempt should be taken
    # to prevent falling in the endless rescheduling
    first_try_number = max_tries - retries + 1
    with create_session() as session:
        start_date = session.scalar(
            select(TaskReschedule)
            .where(
                TaskReschedule.dag_id == ti.dag_id,
                TaskReschedule.task_id == ti.task_id,
                TaskReschedule.run_id == ti.run_id,
                TaskReschedule.map_index == ti.map_index,
                TaskReschedule.try_number >= first_try_number,
            )
            .order_by(TaskReschedule.id.asc())
            .with_only_columns(TaskReschedule.start_date)
            .limit(1)
        )
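
To make the lookup above easier to reason about without a database, here is a minimal pure-Python sketch of the same selection logic. `RescheduleRecord`, `first_try_number` and `first_reschedule_start_date` are hypothetical names introduced only for illustration; they are not part of Airflow or the Task SDK, and the in-memory list merely stands in for the `TaskReschedule` rows of one task instance.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class RescheduleRecord:
    """Hypothetical in-memory stand-in for one TaskReschedule row."""

    id: int
    try_number: int
    start_date: datetime


def first_try_number(max_tries: int | None, retries: int | None) -> int:
    # Same arithmetic as the excerpt; None is treated as 0.
    return (max_tries or 0) - (retries or 0) + 1


def first_reschedule_start_date(
    records: list[RescheduleRecord], first_try: int
) -> datetime | None:
    # Equivalent of: WHERE try_number >= first_try ORDER BY id ASC LIMIT 1
    candidates = [r for r in records if r.try_number >= first_try]
    if not candidates:
        return None
    return min(candidates, key=lambda r: r.id).start_date


# Illustrative values only: with max_tries=5 and retries=2 the first try is 4,
# so records from earlier tries (1..3) are ignored.
records = [
    RescheduleRecord(1, 3, datetime(2025, 1, 11, 9, 0, tzinfo=timezone.utc)),
    RescheduleRecord(2, 4, datetime(2025, 1, 11, 10, 0, tzinfo=timezone.utc)),
    RescheduleRecord(3, 4, datetime(2025, 1, 11, 10, 5, tzinfo=timezone.utc)),
]
start = first_reschedule_start_date(records, first_try_number(5, 2))
```

If the record for the first try is missing, the filter `try_number >= first_try` still picks the next available attempt, which is the behaviour the code comment describes.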


The max_tries part is easier, since we can pass it in the following code and re-generate the API client.

class TaskInstance(BaseModel):
    """Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""

    id: uuid.UUID
    task_id: str
    dag_id: str
    run_id: str
    try_number: int
    map_index: int | None = None
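
As a rough illustration of what passing max_tries through this schema could look like, the sketch below uses a standard-library dataclass as a stand-in for the Pydantic model so that it runs without extra dependencies. The `max_tries` field, the `TaskInstanceSketch` class and the `first_try_number_from_ti` helper are assumptions made for this example; the actual field name, type and default would be decided in the real schema.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass
class TaskInstanceSketch:
    """Dataclass stand-in for the TaskInstance schema, plus an assumed max_tries field."""

    id: uuid.UUID
    task_id: str
    dag_id: str
    run_id: str
    try_number: int
    max_tries: int  # assumed addition; not present in the schema quoted above
    map_index: int | None = None


def first_try_number_from_ti(ti: TaskInstanceSketch, retries: int | None) -> int:
    # With max_tries carried on the task instance payload, the sensor can
    # compute the first try number without touching the database.
    return (ti.max_tries or 0) - (retries or 0) + 1


ti = TaskInstanceSketch(
    id=uuid.uuid4(),
    task_id="wait_for_file",  # illustrative values only
    dag_id="example_dag",
    run_id="manual__2025-01-11",
    try_number=4,
    max_tries=5,
)
first_try = first_try_number_from_ti(ti, retries=2)
```

This only covers the max_tries half of the problem; the TaskReschedule lookup still needs a replacement for the direct session access.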

Replacing direct DB access in that BaseSensorOperator, on the other hand, needs to be figured out!

@dosubot dosubot bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Jan 11, 2025
@kaxil kaxil added area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk labels Jan 11, 2025