Skip to content

Conversation

JiaqiWang18
Copy link
Contributor

@JiaqiWang18 JiaqiWang18 commented Oct 9, 2025

What changes were proposed in this pull request?

Create the API and implementation to allow users to define a sink in a SDP pipeline as outlined in the SPIP. A sink is an external location the pipeline can write data to.

dp.create_sink(
  "myParquetSink",
  format = "parquet",
  options = {"path": "${dir.getPath}"}
)

@dp.append_flow(
  target = "myParquetSink",
)
def mySinkFlow():
  return spark.readStream.table("src")

Why are the changes needed?

New Feature

Does this PR introduce any user-facing change?

New API for unrelased SDP

How was this patch tested?

New and existing tests to ensure sinks work e2e

Was this patch authored or co-authored using generative AI tooling?

No

@JiaqiWang18
Copy link
Contributor Author

@sryza ready for review

identifier: TableIdentifier,
format: String,
options: Map[String, String],
normalizedPath: Option[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does a sink need a normalizedPath?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah not needed, removed

override def toString: String = "VIEW"
}

private object SinkType extends DatasetType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sinks aren't datasets. I.e. the hierarchy is something like:

  • A table is a dataset
  • A view is a dataset
  • A dataset is an output
  • A sink is an output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we can just rename DatasetType to OutputType since the usage of this object in assertOutputIdentifierIsUnique does not distinguish between an output and a dataset.
Also renamed error message PIPELINE_DUPLICATE_IDENTIFIERS.DATASET to PIPELINE_DUPLICATE_IDENTIFIERS.OUTPUT since we also need to check for sink duplicate id.

…aph/SystemMetadataSuite.scala

Co-authored-by: Sandy Ryza <[email protected]>
get_active_graph_element_registry().register_output(table)


def create_sink(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops missed this on first pass: this needs docstring

@JiaqiWang18 JiaqiWang18 requested a review from sryza October 10, 2025 20:20
Copy link
Contributor

@sryza sryza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@sryza sryza closed this in e38a651 Oct 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants