-
Notifications
You must be signed in to change notification settings - Fork 15.3k
Dag processor consumes DagPriorityParsingRequest when relative file l… #48659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Dag processor consumes DagPriorityParsingRequest when relative file l… #48659
Conversation
Still work in progress test cases are yet to come |
da33e5e
to
2230fda
Compare
@@ -407,20 +407,35 @@ def _queue_requested_files_for_parsing(self) -> None: | |||
|
|||
@provide_session | |||
def _get_priority_files(self, session: Session = NEW_SESSION) -> list[DagFileInfo]: | |||
files: list[DagFileInfo] = [] | |||
files: set[DagFileInfo] = set() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this from a list to a set to ensure uniqueness. There may be cases where a DagPriorityParsingRequest is triggered for both a specific file and a bundle, potentially causing duplicates. Using a set guarantees that each file appears only once.
@jedcunningham please take a look at this one. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @shubham-pyc, it looks like there are conflicts in the migration files.
You can refer to the following guide for examples on how to resolve them easily during a rebase:
63040a3
to
a3d64dd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to fix CI and conflicts. Overall the direction looks good.
cd4d42c
to
acb0fde
Compare
@pierrejeambrun resolved conflicts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thanks! There are a couple of tests failing in the CI.
acb0fde
to
2c23a78
Compare
2c23a78
to
b1e9a7f
Compare
@pierrejeambrun fixed the CI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall that looks like the good direction.
@jedcunningham I know you've been working on related issues, any comment?
up. |
# under the License. | ||
|
||
""" | ||
make relative_fileloc nullable for reserialized all bundles. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make relative_fileloc nullable for reserialized all bundles. | |
Make DagPriorityParsingRuquest.relative_fileloc nullable. |
nit
|
||
def __init__(self, bundle_name: str, relative_fileloc: str) -> None: | ||
def __init__(self, bundle_name: str, relative_fileloc: str = "") -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def __init__(self, bundle_name: str, relative_fileloc: str = "") -> None: | |
def __init__(self, bundle_name: str, relative_fileloc: str | None = None) -> None: |
If we are allowing nullable, lets actually use nulls then - otherwise we don't really need to alter the table in the first place.
bool: True if relative_fileloc is None, indicating the whole folder should be parsed, | ||
False otherwise. | ||
""" | ||
return self.relative_fileloc == "" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return self.relative_fileloc == "" | |
return self.relative_fileloc is None |
rel_path=Path(request.relative_fileloc), bundle_name=bundle.name, bundle_path=bundle.path | ||
if request.parse_whole_folder(): | ||
# If relative_fileloc is null, get all files from DagModel | ||
dag_files = session.scalars( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably should refresh the bundle and relist the dags from disk instead of querying the db.
I think we can just refresh here, relist, and add, but it'd be worth giving it a bit more thought to make sure we aren't impacting other stuff by doing so.
Dag Processing consumes DagPriorityParsingRequest
Needed For: #48174
#47844