[SPARK-52640][SDP] Propagate Python Source Code Location #51344
base: master
Conversation
I left a few comments, but this looks close to ready to merge to me.
CC @gengliangwang @hvanhovell in case either of you are also interested in taking a look.
python/pyspark/pipelines/spark_connect_graph_element_registry.py
@@ -49,35 +49,54 @@ class GraphRegistrationContext(
    flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
  }

  /**
These changes look independent from what the rest of this PR is doing?
I just added the scaladoc for the sake of future readers, but the changes to toDataflowGraph are relevant. toDataflowGraph is where we ensure all identifiers are fully qualified, and qualify them if not. It's intentional that once we fully qualify (or verify that the identifier is already fully qualified), we also update the associated query origin with the fully qualified identifier.
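For illustration, a minimal self-contained Scala sketch of that idea follows; QueryOrigin, TableDef, and objectName here are hypothetical stand-ins rather than the actual classes used by toDataflowGraph, and the qualification rules are simplified.

// Hypothetical stand-ins for illustration only; not the real Spark classes.
case class QueryOrigin(
    filePath: Option[String] = None,
    line: Option[Int] = None,
    objectName: Option[String] = None)

case class TableDef(identifierParts: Seq[String], origin: QueryOrigin)

object QualificationSketch {
  // Prepend default catalog/schema parts when the identifier is not already fully qualified.
  def fullyQualify(parts: Seq[String], catalog: String, schema: String): Seq[String] =
    parts match {
      case Seq(name)        => Seq(catalog, schema, name)
      case Seq(db, name)    => Seq(catalog, db, name)
      case alreadyQualified => alreadyQualified
    }

  // After qualifying (or verifying qualification), also refresh the associated
  // query origin so later events and exceptions report the fully qualified name.
  def qualifyAndUpdateOrigin(table: TableDef, catalog: String, schema: String): TableDef = {
    val qualified = fullyQualify(table.identifierParts, catalog, schema)
    table.copy(
      identifierParts = qualified,
      origin = table.origin.copy(objectName = Some(qualified.mkString("."))))
  }
}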
@@ -156,6 +156,10 @@ private[connect] object PipelinesHandler extends Logging {
          .filter(_.nonEmpty),
        properties = dataset.getTablePropertiesMap.asScala.toMap,
        baseOrigin = QueryOrigin(
          filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)(
nit: we can store filePath and line in variables or a method to avoid duplicated code
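One possible shape for that helper, sketched with a hypothetical stand-in for the SourceCodeLocation proto accessors (hasLineNumber/getLineNumber are assumed to mirror hasFileName/getFileName):

// Hypothetical stand-in for the generated SourceCodeLocation proto accessors.
trait SourceCodeLocation {
  def hasFileName: Boolean
  def getFileName: String
  def hasLineNumber: Boolean
  def getLineNumber: Int
}

case class QueryOrigin(filePath: Option[String] = None, line: Option[Int] = None)

object OriginConversionSketch {
  // A single conversion point, instead of repeating the Option.when(...) pairs
  // at every call site in the handler.
  def originFrom(loc: SourceCodeLocation): QueryOrigin =
    QueryOrigin(
      filePath = Option.when(loc.hasFileName)(loc.getFileName),
      line = Option.when(loc.hasLineNumber)(loc.getLineNumber))
}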
LGTM too.
LGTM!
What changes were proposed in this pull request?
Propagate source code location details (line number and file path) end-to-end for declarative pipelines. That is, collect this information from the Python REPL that registers SDP datasets/flows, propagate it through the appropriate Spark Connect handlers, and associate it with the appropriate datasets/flows in pipeline events/exceptions.
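As a purely illustrative sketch (hypothetical names, not the actual pipeline event schema), this is the kind of message the propagated location makes possible:

// Illustrative only: QueryOrigin and the message format are hypothetical stand-ins.
case class QueryOrigin(filePath: Option[String] = None, line: Option[Int] = None)

object EventMessageSketch {
  // Render a location as "file:line" when both pieces are available.
  def describe(origin: QueryOrigin): String =
    (origin.filePath, origin.line) match {
      case (Some(file), Some(ln)) => s"$file:$ln"
      case (Some(file), None)     => file
      case _                      => "<unknown location>"
    }

  def main(args: Array[String]): Unit = {
    val origin = QueryOrigin(filePath = Some("my_pipeline.py"), line = Some(42))
    // Prints: Flow 'sales_agg' failed (defined at my_pipeline.py:42)
    println(s"Flow 'sales_agg' failed (defined at ${describe(origin)})")
  }
}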
Why are the changes needed?
Better observability and debugging experience for users. Allows users to identify the exact lines that cause a particular exception.
Does this PR introduce any user-facing change?
Yes, we are populating source code information in the origin for pipeline events, which is user-facing. However, SDP has not yet been released in any Spark version.
How was this patch tested?
Added tests to org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.
Was this patch authored or co-authored using generative AI tooling?
No