Skip to content

Commit

Permalink
Handle exceptions in GitDagBundle Auth (#45982)
Browse files Browse the repository at this point in the history
* Handle exceptions in GitDagBundle Auth

The exceptions that are raised during bundle initialization should
not break the dag processor.

* fixup! Handle exceptions in GitDagBundle Auth

* fixup! fixup! Handle exceptions in GitDagBundle Auth

* Update airflow/dag_processing/bundles/git.py

Co-authored-by: Jed Cunningham <[email protected]>

---------

Co-authored-by: Jed Cunningham <[email protected]>
  • Loading branch information
ephraimbuddy and jedcunningham authored Jan 25, 2025
1 parent dd2252d commit ac5d99c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
25 changes: 17 additions & 8 deletions airflow/dag_processing/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ def __init__(
self._dag_bundle_root_storage_path / "git" / (self.name + f"+{self.version or self.tracking_ref}")
)
self.git_conn_id = git_conn_id
self.hook = GitHook(git_conn_id=self.git_conn_id)
self.repo_url = self.hook.repo_url
self.repo_url: str | None = None
try:
self.hook = GitHook(git_conn_id=self.git_conn_id)
self.repo_url = self.hook.repo_url
except AirflowException as e:
self.log.error("Error creating GitHook: %s", e)

def _initialize(self):
self._clone_bare_repo_if_required()
Expand All @@ -133,7 +137,7 @@ def _initialize(self):

def initialize(self) -> None:
if not self.repo_url:
raise AirflowException(f"Connection {self.git_conn_id} doesn't have a git_repo_url")
raise AirflowException(f"Connection {self.git_conn_id} doesn't have a host url")
if isinstance(self.repo_url, os.PathLike):
self._initialize()
elif not self.repo_url.startswith("git@") or not self.repo_url.endswith(".git"):
Expand All @@ -155,6 +159,8 @@ def _clone_repo_if_required(self) -> None:
self.repo = Repo(self.repo_path)

def _clone_bare_repo_if_required(self) -> None:
if not self.repo_url:
raise AirflowException(f"Connection {self.git_conn_id} doesn't have a host url")
if not os.path.exists(self.bare_repo_path):
self.log.info("Cloning bare repository to %s", self.bare_repo_path)
Repo.clone_from(
Expand Down Expand Up @@ -213,10 +219,11 @@ def refresh(self) -> None:
self._fetch_bare_repo()
self.repo.remotes.origin.pull()

def _convert_git_ssh_url_to_https(self) -> str:
if not self.repo_url.startswith("git@"):
raise ValueError(f"Invalid git SSH URL: {self.repo_url}")
parts = self.repo_url.split(":")
@staticmethod
def _convert_git_ssh_url_to_https(url: str) -> str:
if not url.startswith("git@"):
raise ValueError(f"Invalid git SSH URL: {url}")
parts = url.split(":")
domain = parts[0].replace("git@", "https://")
repo_path = parts[1].replace(".git", "")
return f"{domain}/{repo_path}"
Expand All @@ -225,8 +232,10 @@ def view_url(self, version: str | None = None) -> str | None:
if not version:
return None
url = self.repo_url
if not url:
return None
if url.startswith("git@"):
url = self._convert_git_ssh_url_to_https()
url = self._convert_git_ssh_url_to_https(url)
parsed_url = urlparse(url)
host = parsed_url.hostname
if not host:
Expand Down
7 changes: 6 additions & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.configuration import conf
from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
from airflow.exceptions import AirflowException
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.models.dagbundle import DagBundleModel
Expand Down Expand Up @@ -433,7 +434,11 @@ def _refresh_dag_bundles(self):
# TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
# What if the cloning/refreshing took too long(longer than the dag processor timeout)
if not bundle.is_initialized:
bundle.initialize()
try:
bundle.initialize()
except AirflowException as e:
self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
continue
# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
with create_session() as session:
bundle_model: DagBundleModel = session.get(DagBundleModel, bundle.name)
Expand Down
4 changes: 1 addition & 3 deletions tests/dag_processing/test_dag_bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,7 @@ def test_raises_when_no_repo_url(self):
git_conn_id=CONN_NO_REPO_URL,
tracking_ref=GIT_DEFAULT_BRANCH,
)
with pytest.raises(
AirflowException, match=f"Connection {CONN_NO_REPO_URL} doesn't have a git_repo_url"
):
with pytest.raises(AirflowException, match=f"Connection {CONN_NO_REPO_URL} doesn't have a host url"):
bundle.initialize()

@mock.patch("airflow.dag_processing.bundles.git.GitHook")
Expand Down

0 comments on commit ac5d99c

Please sign in to comment.