Airflow-sensor-like Tasks #4156
-
I am currently considering Prefect as an alternative to Airflow. One of the most important Airflow features for me is sensors. In particular, I have used Airflow sensors to wait for files to land on OneDrive/SharePoint or for a reply in a Slack thread. I have not found any mention of sensors in the Prefect documentation, but I did find this discussion. API calls are not suited to my use cases. I have attempted to recreate Airflow's sensor behavior as follows:

```python
from datetime import datetime, timedelta

import pandas
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import FAIL
from prefect.schedules import IntervalSchedule

IMPORTANT_FILENAME = 'important_file.csv'

# Run the flow every 5 minutes, starting one second from now.
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=5),
)


# While the file is missing, the task fails and is retried
# up to 3 times, 1 minute apart.
@task(retry_delay=timedelta(minutes=1), max_retries=3)
def wait_for_data(filename: str) -> pandas.DataFrame:
    try:
        return pandas.read_csv(filepath_or_buffer=filename)
    except FileNotFoundError:
        raise FAIL(message=f'File `{filename}` does not exist.')


# to_csv() with no path returns the CSV as a string,
# which LocalResult writes under ./sensor.
@task(result=LocalResult(dir='./sensor', location='{today}'))
def save_data(data: pandas.DataFrame) -> str:
    return data.to_csv()


with Flow('sensors', schedule=schedule) as flow:
    data = wait_for_data(filename=IMPORTANT_FILENAME)
    save_data(data=data)

flow.register(project_name='demo')
```
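For comparison, what is being recreated here is essentially the "poke" loop of an Airflow sensor: check a condition every `poke_interval` seconds until it succeeds or `timeout` runs out. The sketch below is a minimal, library-free version of that loop, not Prefect or Airflow code. The `wait_until` function is hypothetical; its `poke_interval` and `timeout` parameters only borrow the names of Airflow's sensor parameters, and the `clock`/`sleep` arguments are injectable purely so the loop can be tested without really waiting. Unlike Airflow, it simply returns `False` on timeout instead of raising.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poke `condition` every `poke_interval` seconds until it returns True
    or the next poke would fall past `timeout`. Returns True if the
    condition was met, False on timeout."""
    deadline = clock() + timeout
    while True:
        if condition():
            return True
        # Give up rather than sleep past the deadline.
        if clock() + poke_interval > deadline:
            return False
        sleep(poke_interval)
```

For example, `wait_until(lambda: os.path.exists("important_file.csv"), poke_interval=60, timeout=180)` would block for up to about three minutes. In the flow above, `retry_delay` plays the role of `poke_interval`, and `max_retries=3` with a one-minute delay bounds the wait to roughly three minutes after the first attempt.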
-
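Hi @vsevolodbazhan - great question! You can recreate sensor-like behavior using the following pattern:

```python
import pendulum
from prefect import task
from prefect.engine.signals import RETRY


@task
def my_sensor(**kwargs):
    # Check some state of the world here; `condition_not_met` is a
    # placeholder for your own check (e.g. "the file has not landed yet").
    if condition_not_met:
        # RETRY puts the task into a Retrying state scheduled for
        # `start_time`, so it runs again 5 seconds from now.
        raise RETRY("Condition not met, retrying in 5 seconds.", start_time=pendulum.now().add(seconds=5))
```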
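The point of this pattern is that the task never sleeps: it raises `RETRY` with a `start_time`, and the runner, not the task code, decides when to run it again. The toy, library-free model below illustrates that control flow under stated assumptions. `Retry` is a hypothetical stand-in for Prefect's `RETRY` signal, `run_task` is a hypothetical scheduler that jumps a simulated clock to each requested `start_time`, and `make_sensor` pretends a file lands 12 seconds after the first run. It is a sketch of the idea, not how Prefect's runner is implemented.

```python
from datetime import datetime, timedelta
from typing import Callable, List


class Retry(Exception):
    """Hypothetical stand-in for Prefect's RETRY signal: 'run me again at start_time'."""

    def __init__(self, message: str, start_time: datetime) -> None:
        super().__init__(message)
        self.start_time = start_time


def run_task(task: Callable[[datetime], str], start: datetime, max_runs: int = 10) -> List[datetime]:
    """Hypothetical toy scheduler: run `task`; whenever it raises Retry,
    advance a simulated clock to the requested start_time and run it again.
    Returns the simulated times at which the task ran."""
    now = start
    run_times: List[datetime] = []
    for _ in range(max_runs):
        run_times.append(now)
        try:
            task(now)
            return run_times
        except Retry as signal:
            # The task never sleeps; it only says when it wants to run next.
            now = signal.start_time
    raise RuntimeError(f"Task did not succeed within {max_runs} runs.")


def make_sensor(file_lands_at: datetime) -> Callable[[datetime], str]:
    """Build a sensor whose condition is met once the simulated clock
    reaches `file_lands_at` (a made-up stand-in for a real file check)."""

    def my_sensor(now: datetime) -> str:
        if now < file_lands_at:
            raise Retry(
                "Condition not met, retrying in 5 seconds.",
                start_time=now + timedelta(seconds=5),
            )
        return "file found"

    return my_sensor


t0 = datetime(2021, 1, 1, 12, 0, 0)
runs = run_task(make_sensor(file_lands_at=t0 + timedelta(seconds=12)), start=t0)
print([int((t - t0).total_seconds()) for t in runs])  # prints [0, 5, 10, 15]
```

The toy scheduler caps the number of runs with `max_runs`; a real sensor built on this pattern will likely want a similar upper bound so that it cannot keep retrying forever.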