Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dnr] SQL Server Source #31953

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
foobar, some progress
ParkMyCar committed Mar 19, 2025
commit fd07387347d941eb73e857f9cc1c2f0c6e896687
22 changes: 22 additions & 0 deletions src/sql-server-util/src/inspect.rs
Original file line number Diff line number Diff line change
@@ -65,6 +65,28 @@ pub async fn ensure_snapshot_isolation_enabled(client: &mut Client) -> Result<()
Ok(())
}

/// Fetches the current maximum log sequence number (LSN) as raw bytes.
///
/// Runs `SELECT sys.fn_cdc_get_max_lsn();` against the provided `client` and
/// copies the first column of the single returned row into an owned buffer.
/// The length of the returned bytes is not validated here.
///
/// # Errors
///
/// * Propagates any error from running the query or reading the column.
/// * [`SqlServerError::InvariantViolated`] if the query does not return
///   exactly one row.
/// * [`SqlServerError::InvalidSystemSetting`] if the single row holds no
///   value in its first column.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-cdc-get-max-lsn-transact-sql?view=sql-server-ver16>
pub async fn get_max_lsn(client: &mut Client) -> Result<Vec<u8>, SqlServerError> {
    static MAX_LSN_QUERY: &str = "SELECT sys.fn_cdc_get_max_lsn();";
    let result = client.simple_query(MAX_LSN_QUERY).await?;

    // Guard: the query must yield exactly one row.
    let row = match &result[..] {
        [row] => row,
        other => {
            return Err(SqlServerError::InvariantViolated(format!(
                "expected 1 row, got {other:?}"
            )));
        }
    };

    // The first column holds the LSN; a missing value is reported as a
    // misconfigured system setting rather than an invariant violation.
    match row.try_get::<&[u8], _>(0)? {
        Some(lsn) => Ok(lsn.to_vec()),
        None => Err(SqlServerError::InvalidSystemSetting {
            name: "max_lsn".to_string(),
            expected: "10 byte binary".to_string(),
            actual: format!("{result:?}"),
        }),
    }
}

/// Returns metadata about the columns from the specified table.
///
/// Note: The implementation of TryFrom for [`SqlServerColumnRaw`] relies on
2 changes: 1 addition & 1 deletion src/sql/src/pure.rs
Original file line number Diff line number Diff line change
@@ -991,7 +991,7 @@ async fn purify_create_source(
database: connection.database.into(),
};
retrieved_source_references = reference_client.get_source_references().await?;
tracing::debug!(?retrieved_source_references, "got source references");
tracing::info!(?retrieved_source_references, "got source references");

let subsources = sql_server::purify_source_exports(
&mut client,
3 changes: 1 addition & 2 deletions src/sql/src/pure/sql_server.rs
Original file line number Diff line number Diff line change
@@ -14,8 +14,7 @@ use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
use crate::plan::PlanError;
use crate::pure::{PurifiedSourceExport, RetrievedSourceReferences, SourceReferencePolicy};

// BTreeMap<UnresolvedItemName, PurifiedSourceExport>

/// Purify the requested [`ExternalReferences`] from the provided [`RetrievedSourceReferences`].
pub(super) async fn purify_source_exports(
client: &mut mz_sql_server_util::Client,
retrieved_references: &RetrievedSourceReferences,
36 changes: 33 additions & 3 deletions src/storage-types/src/sources/sql_server.rs
Original file line number Diff line number Diff line change
@@ -13,10 +13,12 @@ use std::fmt;
use std::sync::LazyLock;

use columnation::{Columnation, CopyRegion};
use mz_ore::future::InTask;
use mz_proto::{IntoRustIfSome, RustType};
use mz_repr::{CatalogItemId, Datum, GlobalId, RelationDesc, Row, ScalarType};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;

use crate::connections::inline::{
ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
@@ -51,9 +53,24 @@ pub struct SqlServerSource<C: ConnectionAccess = InlinedConnection> {
impl SqlServerSource<InlinedConnection> {
/// Determines the current upper bound of the upstream SQL Server's log, as
/// an [`Antichain`] containing a single [`Lsn`].
///
/// Resolves the connection configuration, opens a new client connection,
/// queries the maximum LSN via `mz_sql_server_util::inspect::get_max_lsn`,
/// and converts the raw bytes into an [`Lsn`].
///
/// # Errors
///
/// Returns an error if resolving the config, connecting, running the query,
/// or converting the raw bytes into an [`Lsn`] fails.
pub async fn fetch_write_frontier(
self,
// NOTE(review): the next three lines look like the removed (pre-change) side
// of the diff hunk, i.e. the previous `todo!()` stub; they are superseded by
// the parameter and return type that follow. Confirm against the commit.
_storage_configuration: &crate::configuration::StorageConfiguration,
) -> Result<timely::progress::Antichain<Lsn>, anyhow::Error> {
todo!()
storage_configuration: &crate::configuration::StorageConfiguration,
) -> Result<Antichain<Lsn>, anyhow::Error> {
// Resolve the connection details (including secrets) into a client config.
let config = self
.connection
.resolve_config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;
// `connect` returns the client plus a separate connection value that is
// awaited on a spawned task below.
let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
// TODO(sql_server1): Make spawning this task automatic.
mz_ore::task::spawn(|| "sql server connection", async move { conn.await });

// Fetch the maximum LSN as raw bytes, then validate it by converting to `Lsn`.
let maximum_raw_lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
let maximum_lsn = Lsn::try_from(&maximum_raw_lsn[..])?;

// The returned frontier consists of exactly this one LSN.
Ok(Antichain::from_elem(maximum_lsn))
}
}

@@ -224,6 +241,19 @@ impl RustType<ProtoSqlServerSourceExtras> for SqlServerSourceExtras {
)]
pub struct Lsn([u8; 10]);

/// Fallible conversion from a raw byte slice into an [`Lsn`].
///
/// Succeeds only when the slice converts into the fixed-size 10 byte array
/// that [`Lsn`] wraps; otherwise an error is returned.
impl TryFrom<&[u8]> for Lsn {
    // TODO(sql_server2): Add an InvalidLsnError type that can identify when
    // the LSN is reported as 0xNULL, which I think happens when CDC is not enabled.
    type Error = anyhow::Error;

    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
        // Slice-to-array conversion fails when the lengths differ; the
        // underlying error carries no extra detail, so it is discarded.
        match <[u8; 10]>::try_from(value) {
            Ok(bytes) => Ok(Lsn(bytes)),
            Err(_) => Err(anyhow::anyhow!("invalid SQL Server LSN")),
        }
    }
}

impl fmt::Display for Lsn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO(sql_server1): Pretty print this as the three components.
1 change: 1 addition & 0 deletions src/storage/src/source/sql_server.rs
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ impl SourceRender for SqlServerSource {
Option<TimelyStream<G, Probe<Self::Time>>>,
Vec<PressOnDropButton>,
) {
tracing::error!(source_exports = ?config.source_exports, ?self, "SQL SERVER SOURCE");
todo!()
}
}