|
1 | 1 | import logging
|
2 |
| -from datetime import datetime, timedelta |
| 2 | +from datetime import datetime |
3 | 3 |
|
4 | 4 | import requests
|
5 |
| -from airflow.sdk import Asset, AssetWatcher, Context, dag, task |
| 5 | +import tenacity |
| 6 | +from airflow.providers.http.hooks.http import HttpHook |
| 7 | +from airflow.sdk import Asset, AssetWatcher, Context, Variable, dag, task |
6 | 8 |
|
7 | 9 | from triggers.finance_report import FinanceReportTrigger
|
8 | 10 |
|
9 |
| -logger = logging.getLogger(__name__) |
| 11 | +# get the airflow.task logger |
| 12 | +task_logger = logging.getLogger("airflow.task") |
| 13 | + |
10 | 14 |
|
11 | 15 | finance_report_asset = Asset(
|
12 | 16 | name="finance_report",
|
13 | 17 | watchers=[
|
14 | 18 | AssetWatcher(
|
15 | 19 | name="finance_report_watcher",
|
16 | 20 | trigger=FinanceReportTrigger(
|
17 |
| - # poke_interval=86400, # 60*60*24 |
18 |
| - poke_interval=5, # 60*60*24 |
| 21 | + poke_interval=86400, # 60*60*24 |
19 | 22 | ),
|
20 | 23 | )
|
21 | 24 | ],
|
|
37 | 40 | },
|
38 | 41 | )
|
39 | 42 | def discord_message_notification():
|
40 |
| - """Send Discord Message""" |
| 43 | + """Send Discord Message.""" |
41 | 44 |
|
42 |
| - @task( |
43 |
| - retries=10, |
44 |
| - retry_delay=timedelta(seconds=10), |
45 |
| - ) |
| 45 | + @task |
46 | 46 | def send_discord_message(**context: Context) -> None:
|
47 | 47 | triggering_asset_events = context["triggering_asset_events"]
|
48 |
| - session = requests.session() |
49 |
| - logger.info(f"Receive asset events {triggering_asset_events}") |
50 | 48 | for asset_uri, asset_events in triggering_asset_events.items():
|
51 |
| - logger.info(f"Receive asset event from Asset uri={asset_uri}") |
| 49 | + task_logger.info(f"Receive asset event from Asset uri={asset_uri}") |
| 50 | + |
| 51 | + http_hook = HttpHook(method="POST", http_conn_id="discord_webhook") |
52 | 52 | for asset_event in asset_events: # type: ignore[attr-defined]
|
53 | 53 | if asset_event.extra.get("from_trigger", False):
|
54 | 54 | details = asset_event.extra["payload"]
|
55 | 55 | else:
|
56 | 56 | details = asset_event.extra
|
57 | 57 |
|
58 |
| - session.post( |
59 |
| - details.get("webhook_url"), |
60 |
| - json={ |
| 58 | + if not details: |
| 59 | + task_logger.error( |
| 60 | + f"Detail {details} cannot be empty. It's required to send discord message." |
| 61 | + ) |
| 62 | + continue |
| 63 | + |
| 64 | + task_logger.info("Start sending discord message") |
| 65 | + endpoint = Variable.get(details.get("webhook_endpoint_key")) |
| 66 | + http_hook.run_with_advanced_retry( |
| 67 | + endpoint=endpoint, |
| 68 | + data={ |
61 | 69 | "username": details.get("username"),
|
62 | 70 | "content": details.get("content"),
|
63 | 71 | },
|
| 72 | + _retry_args=dict( |
| 73 | + wait=tenacity.wait_random(min=1, max=10), |
| 74 | + stop=tenacity.stop_after_attempt(10), |
| 75 | + retry=tenacity.retry_if_exception_type( |
| 76 | + requests.exceptions.ConnectionError |
| 77 | + ), |
| 78 | + ), |
64 | 79 | )
|
| 80 | + task_logger.info("Discord message sent") |
65 | 81 |
|
66 | 82 | send_discord_message()
|
67 | 83 |
|
|
0 commit comments