@willdonnelly willdonnelly commented Apr 22, 2025

Description:

I have only ever seen this once in production, but since we run discovery at a regular cadence it really ought to be bulletproof and not able to randomly cause a panic if the computed-columns info and the main column metadata are out of sync.



williamhbaker and others added 30 commits March 6, 2025 13:27
Reworks InfoSchema to make a representation of an "existing resource" available
as ExistingResource. As a matter of taste, I also renamed EndpointField to
ExistingField while doing this.

The new usage is that you must always push a resource to get a handle to an
ExistingResource that you can then add fields to. PushResource is idempotent
which simplifies handling for cases where the list of fields is not necessarily
in order with respect to resources.
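The idempotent-handle pattern described above can be sketched as follows. This is a minimal illustration, not the actual `materialize-boilerplate` API: the type layouts, the `Meta` field, and the method signatures here are assumptions for demonstration purposes.

```go
package main

import "fmt"

// ExistingField and ExistingResource mirror the shape described above;
// the real definitions in materialize-boilerplate may differ.
type ExistingField struct {
	Name string
	Type string
}

type ExistingResource struct {
	Path   []string
	Meta   any // extra metadata "smuggled in" by the InfoSchema creator
	Fields []ExistingField
}

type InfoSchema struct {
	resources map[string]*ExistingResource
}

// PushResource is idempotent: pushing the same path twice returns the
// same handle, so fields can arrive in any order relative to resources.
func (s *InfoSchema) PushResource(path ...string) *ExistingResource {
	key := fmt.Sprint(path)
	if r, ok := s.resources[key]; ok {
		return r
	}
	r := &ExistingResource{Path: path}
	s.resources[key] = r
	return r
}

func (r *ExistingResource) PushField(f ExistingField) {
	r.Fields = append(r.Fields, f)
}

func main() {
	is := &InfoSchema{resources: map[string]*ExistingResource{}}
	a := is.PushResource("main", "users")
	a.PushField(ExistingField{Name: "id", Type: "INT64"})
	b := is.PushResource("main", "users") // same handle as `a`
	b.PushField(ExistingField{Name: "email", Type: "STRING"})
	fmt.Println(len(a.Fields)) // both pushes landed on the one resource
}
```

A consumer that knows what concrete metadata was stored in `Meta` can recover it later with a type assertion, which is how `materialize-bigquery` avoids re-querying table metadata.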

The primary motivator for this is to allow creators of an InfoSchema to smuggle
in additional metadata about the resource which they may later use, in addition
to just the list of fields. Here, `materialize-bigquery` does that to avoid
having to query for table metadata again: Since it had to when building the
InfoSchema, it can just retrieve the metadata when needed using a type
assertion. The new Iceberg materialization will make similar use of this
capability since schema alterations are built on previous schemas, and you need
to know what the prior schema ID was when committing schema updates to prevent
conflicts.

There's also new handling for namespaces via PushNamespace that the new Iceberg
materialization will use, and perhaps more materializations will use that in the
future.
Modifies NewConstraints to take a raw field config as an input and possibly
return an error. Currently this isn't used for anything, but it's not hard to
imagine the field configuration being important when computing the constraint
for a new field, and since it's a raw JSON message, parsing it could result in
an error.

Aside from speculative uses, this is prep work for some related changes that
will be implemented in the following commit which creates a new Materializer
interface.
Creates a new Materializer interface, which describes pretty much everything a
fully-featured materialization needs to be able to do. Also, new
`Run{Spec,Validate,Apply,NewTransactor}` helper functions that will
use an implementation of a Materializer to run the connector according to the
terms of the existing `materialize-boilerplate`.

The main goal with this is to reduce the tedium of writing a materialization and
needing to do a whole lot of marshalling raw configurations into structs, and
generally writing the same kind of code every time when handling Spec, Validate,
and Apply requests. Many of the concepts are inspired by what we have in the
`materialize-sql` framework.

This interface is opt-in, and will be used by the new Iceberg materialization in
the commits that follow. The main collateral damage was splitting out the
applicable existing `apply.go` logic in a way that could be used by both the
`RunApply` helper and the existing `ApplyChanges` function.

I'm not 100% confident about the implementation of `type_mapping.go`, and may
modify this in the future to be more like `materialize-sql`, since right now it
is still fairly low-level and verbose.

Eventually I'd like to convert existing materializations to use this new system
since that would make it easier to add common behavioral changes (ref: some
upcoming protocol enhancements), but that will wait until it is proven a bit
more and has its bugs worked out.
This PrereqError has been lifted into the common "connector errors" package, so
SQL materializations can use it from there instead of duplicating it in
`materialize-sql`.
…rd updates

A new materialization connector that writes data to Iceberg tables via PySpark
scripts that are run on EMR serverless.

One interesting thing is that only REST catalogs are supported. AWS Glue is also
a popular catalog in our existing delta-updates-only Iceberg materialization.
Glue actually now has an Iceberg REST API, so it can be used in much the same
way as a REST catalog, with a little additional complication from needing to use
AWS' SigV4 authentication mechanism. Still, this is much simpler than needing an
entirely separate catalog.

Initially we're only supporting EMR serverless for compute, but it is
conceivable that other systems could be used to run PySpark scripts, or maybe
even something other than Spark. This version of the connector is _mostly_
written assuming EMR serverless is the only option, with the exception of the
compute configuration being a `oneOf`. Adding support for more compute systems
is perhaps something we will need to do in the future, but I didn't worry too
much about that right now.

Also of note is that for now, the materialization only has a standard updates
mode. Delta updates is actually a complicated topic, since there is no
idempotent support for "COPY INTO" operations with Spark, nor transaction
support of any kind. We could possibly make delta updates mode be at-least-once,
or write parquet files directly and modify table metadata similar to what we do
in our existing delta-updates only materialization. That is a potential future
improvement, and I'm curious to see if anybody even wants a delta updates mode.

I haven't added support for online column migrations yet, but that will come
soon.
Creates a helper program for working with Iceberg tables to assist with
troubleshooting and automated testing. It's a little more sophisticated than our
typical "fetch-changes" scripts that are used in integration tests since I found
it very useful outside of just test scenarios to see what is going on with an
Iceberg table.
Adds integration tests, a Dockerfile, and entries in the GitHub Actions file to
build the connector.
Adds some additional logging to assist with troubleshooting.
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.5 to 3.1.6.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](pallets/jinja@3.1.5...3.1.6)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
The maximum argument length for an EMR job is actually quite limited, around 10k
characters, so if the command input for a job gets very long it will fail. This
will happen if a significant number of bindings is associated with a
transaction, or even a single binding with a large number of fields, since all of
the fields and their types need to be provided to the script in a serialized
form, in addition to the query to execute.

The fix here is to write out the input to a temporary cloud storage file and
read that in the PySpark script. Rather than providing the input as an argument,
the input is now a URI to the input file.
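The stage-then-reference fix can be sketched like this. The `--input-uri` flag name and the uploader callback are illustrative assumptions, not the connector's actual interface; in the real connector the upload would go to S3.

```go
package main

import "fmt"

// jobArgs builds the arguments for an EMR job. Rather than inlining the
// serialized input, which can blow past EMR's roughly 10k-character
// argument limit for transactions with many bindings or many fields,
// the input is written to staging storage and only its URI is passed
// to the PySpark script.
func jobArgs(input []byte, upload func([]byte) (string, error)) ([]string, error) {
	uri, err := upload(input)
	if err != nil {
		return nil, fmt.Errorf("staging job input: %w", err)
	}
	return []string{"--input-uri", uri}, nil
}

func main() {
	// A fake uploader standing in for a real cloud storage client.
	upload := func(b []byte) (string, error) {
		return fmt.Sprintf("s3://staging-bucket/inputs/%d-bytes", len(b)), nil
	}
	args, _ := jobArgs([]byte(`{"query":"MERGE INTO ..."}`), upload)
	fmt.Println(args) // the args stay tiny no matter how large the input is
}
```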
Enhance the capture common module to support running multiple subtasks for incremental and backfill captures with independent states. This allows more flexible capture configurations where different subtasks can track their own progress and state within a single binding.
Enhance the Stripe source connector to support capture across multiple connected accounts. Key changes include:

- Add account_id field to Stripe object models
- Fetch connected account and platform account IDs during resource discovery
- Modify incremental and backfill fetch methods to support account-specific replication using `Stripe-Account` HTTP header
- Update resource generation to create subtasks for each connected account, each with its own state
- Add validation to require backfilling all collections when the `capture_connected_accounts` property changes
Debug logging for when documents are discarded because they are not for a
tracked operation type, or because the full document is empty. Both of these
event types should be fairly low-volume.

Also adds a sanity check when combining fragments from a split event to make
sure that a source MongoDB collection is determined, which could otherwise cause
a false negative when checking if that collection is among the tracked
collections. I have no present reason to think this will ever happen, but it
seems worthwhile to verify.
Extract mode is better for long-term consistency since it supports
schema changes, but it is really slow. Online mode is much faster and
gives users a better first experience. If they hit an issue with schema
changes, they can switch to extract mode, and theoretically they can
switch back to online mode after the schema changes are captured.
Modify resource bindings to handle cases where no connected accounts are provided. When no connected accounts are present, the methods now use a single fetch function with a None account_id instead of creating per-account fetch functions.
I have a temporary fix for the databricks driver bug in my fork:
mdibaiee/databricks-sql-go@c54f93a

I have not run this version against the databricks driver's full test
suite, but I have run our integration tests and I have also run it
against the code that would reproduce the bug, and confirmed the bug
does not happen with this version and the error is correctly surfaced.

I'm going to deploy this for one customer who is dealing with this issue
without merging it, after approval, to test it out. The customer's
pipeline has been struggling because of this issue as MERGE INTO queries
are too slow for large backfills.
Bumps [ring](https://github.com/briansmith/ring) from 0.17.8 to 0.17.13.
- [Changelog](https://github.com/briansmith/ring/blob/main/RELEASES.md)
- [Commits](https://github.com/briansmith/ring/commits)

---
updated-dependencies:
- dependency-name: ring
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
…ort multiple object types

Update the Engagement model and schema to include associations for contacts, companies, tickets, content, quotes, orders, emails, meetings, notes, tasks, carts, partner_clients, and marketing_event.
Just the code changes for implementing a bunch of tests of the
new connector, plus some bugfixes those tests turned up. This is
the cleaned up result after a few rounds of testing and fixing.
Alex-Bair and others added 25 commits April 17, 2025 15:40
…bjects

There are a few special custom objects that don't end in `__c`, and users
are asking us to support them. This commit adds a couple we're currently
being asked about.

Feeds for custom objects end in `__Feed` and always have a
`SystemModstamp` field.

Custom metadata end in `__mdt` and always have a `SystemModstamp` field.

Sharing rules for custom objects end in `__Share` and always have a
`LastModifiedDate` field.

There are other special, custom objects we don't support (like change
events ending in `__ChangeEvent`), but no one is asking about these and
it seems like these would need some additional special handling beyond
what's already in the connector (ex: there may be query restrictions or
a parent-child relationship).
When change streaming fails to catch up, we allow the change stream
to "skip ahead" and flag the binding as inconsistent so we can run
another backfill in the future.

But previously, if change streaming failed repeatedly during an
ongoing backfill, that backfill would keep getting interrupted and
rescheduled for 24 hours in the future. This meant that a really
flaky stream in conjunction with frequent task restarts could
prevent the backfill from ever getting anything done, because
it would keep on getting through just a small bit of the data,
then the task would restart, we'd see the 'Inconsistent: true'
flag, and we'd interrupt the backfill and schedule it to start
over from the top the next day.

The solution is that we should never interrupt an ongoing backfill,
once started we should see it through and then run _another_ one
afterwards (subject to our minimum delay between backfills) to
resolve the inconsistent flag.
When we're inconsistent and there's an ongoing backfill, we need
to preserve that inconsistent flag for later.
This commit makes two logging changes:
1. The sqlcapture logic will now maintain separate counts of the
   important event types and log the counts separately rather than
   a single opaque "events" count.
2. Fixes a handful of spots I noticed while testing the previous
   change where serialized JSON values were being logged as byte
   arrays `[65 66 67]` which is ugly. Most of these are new from
   the cursors-as-JSON refactoring I did yesterday, so I figured
   I'd just address those real quick while I was at it.
The delta updates mode for DynamoDB never really made sense, and was basically
assured to fail if & when multiple documents with the same key show up in the
same transaction, which is a thing that regularly happens.

There's no presently conceivable valid use case for delta updates with DynamoDB,
so this removes it from the resource configuration entirely.
Materializations do not support nullable keys in general, and
materialize-iceberg is no exception.

A non-required source collection field is allowed as a key if it has a default
value set, and so if a key field has been selected we know it will never be null
and should create the column for the key field accordingly. On a practical
level, this is necessary since key fields are created as identity columns, which
must be required.
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.37.0 to 0.38.0.
- [Commits](golang/net@v0.37.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
… length

The maximum allowed string byte size in Azure Fabric Warehouse is 1 MiB, which
is somewhat restrictive.

In cases where strings are larger than this, we'd like to know which field it is
for. The error message from Fabric doesn't tell us that of course, so this is a
built-in check of the lengths before we send them to Fabric.
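A minimal sketch of such a pre-flight check follows. The function name and the per-document map shape are assumptions for illustration; the real connector checks serialized field values before sending them to Fabric.

```go
package main

import "fmt"

// maxStringBytes is Fabric Warehouse's 1 MiB string limit.
const maxStringBytes = 1 << 20

// checkStringLengths reports which field holds an oversized string,
// since Fabric's own error message doesn't identify the field.
func checkStringLengths(doc map[string]string) error {
	for field, val := range doc {
		// len(val) counts bytes, matching Fabric's byte-size limit.
		if len(val) > maxStringBytes {
			return fmt.Errorf("field %q is %d bytes, exceeding the %d byte limit",
				field, len(val), maxStringBytes)
		}
	}
	return nil
}

func main() {
	err := checkStringLengths(map[string]string{"note": "short enough"})
	fmt.Println(err) // <nil>
}
```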
Adds the "tinyint1_as_bool" feature flag as a no-op, with tests
exercising the intended functionality so we can see that it works
as intended in the followup commit.
This implements the "tinyint1_as_bool" feature flag which was
added in the previous commit, however note that it's still not
the default behavior.
I've just spent the past couple of hours up to my eyeballs in
PostgreSQL XIDs and the different `txid_snapshot_foo(...)`
functions so I'm going to take this opportunity to clean up
and simplify some of the PostgreSQL CDC logic around XMIN
filtered backfills while it's fresh in my mind.

Strictly speaking only one of these changes is truly needed
ASAP, using `compareXID32` to do the appropriate circular
comparison for XMIN filtering in the result row processing.

The other stuff is just cleanups, because we don't need to be
using a complicated case statement based on whether or not we
are hitting a standby when the `txid_snapshot_xmin/xmax` bits
work perfectly fine on the master DB as well.
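The circular comparison mentioned above can be sketched as below. This is a generic modulo-2^32 comparison assuming the usual signed-difference trick; the real `compareXID32` may additionally handle PostgreSQL's reserved special XIDs.

```go
package main

import "fmt"

// compareXID32 compares two 32-bit PostgreSQL XIDs in circular
// (modulo 2^32) order, returning -1 if a precedes b, +1 if a follows
// b, and 0 if they're equal. Interpreting the wrapped difference as a
// signed int32 makes the comparison correct across XID wraparound.
func compareXID32(a, b uint32) int {
	switch d := int32(a - b); {
	case d < 0:
		return -1
	case d > 0:
		return 1
	default:
		return 0
	}
}

func main() {
	// 3 follows 0xFFFFFFF0 in circular order, even though 3 < 0xFFFFFFF0
	// as a plain integer, because the counter has wrapped around.
	fmt.Println(compareXID32(3, 0xFFFFFFF0)) // 1
}
```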
Two issues, both of which I'm a bit confused I failed to see in the
previous XMIN backfill tweaks PR, but whatever:

- The lower-bound filter clause needs to be `>=` rather than `>`
  in order for our XMIN backfill test case to pass correctly.
- The query generation snapshots need to be updated
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.5 to 3.1.6.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](pallets/jinja@3.1.5...3.1.6)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-version: 3.1.6
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
Buckets with dots in their names break SSL certificate validation, so don't
allow them to be input.
… error

At some indeterminate frequency, it seems that the staging bucket credentials
"expire" and subsequent queries that involve reading from S3 (which is ~all of
them) fail because of this.

This is a speculative change, but I am hoping that by periodically re-issuing
the various "SET credential" type of commands we can avoid these intermittent
errors.

It's quite difficult to test this locally in any meaningful way, but I have
verified that it at least doesn't break anything. So we will need to see if the
errors continue after this is deployed.
This is a no-op refactoring which introduces a new abstraction
that we'll need in a followup commit.

It also moves the code around a bit, because the obvious place
for the `mysqlClient` abstraction to live is in `database.go`
but I just cannot accept having `mysqlClient` in that file and
yet leaving the `mysqlDatabase` struct in `main.go`.
The go-mysql client library's handling of read/write timeouts is
kind of awful. Previously we disengaged all timeouts in favor of
implementing our own deadline on the initial connection, but it
turns out this leaves us at a real risk of indefinite hangs in
some cases if the network connection silently gets dropped.

So in this commit we take matters into our own hands and set a
deadline on the underlying net.Conn, implementing a timeout on
the entire query/execute operation. For most queries we have a
default timeout of 60s, and for backfills we override that with
a much more generous 6h timeout.
Once started, the network tunnel should never stop, and if it does we should
crash the connector and log an explanation.
The nullability evaluation for parquet schema columns was previously just plain
wrong, since it didn't account for a field that was always present but could be
an explicit null, so that's been fixed.

As an enhancement, it will now also consider a column to be required if its
types do not allow an explicit null & it has a default value. In this case it is
guaranteed that the field will always have a value that is not null, since if it
is absent the default will kick in. This makes things consistent with how the
boilerplate Apply handling does not drop NOT NULL constraints for fields with a
default value, which is important for Iceberg tables where a file with an
optional column cannot be appended to a table where that column is required.
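The corrected nullability rule reduces to a small predicate, sketched below with assumed parameter names; the real evaluation works over JSON schema types rather than three booleans.

```go
package main

import "fmt"

// columnRequired reports whether a parquet column should be required.
// A field whose types allow an explicit null can never be required,
// even if it is always present. Otherwise the column is required if
// the field always exists, or if a default value fills in any absence.
func columnRequired(alwaysExists, allowsNull, hasDefault bool) bool {
	if allowsNull {
		return false // an explicit null can always appear
	}
	return alwaysExists || hasDefault
}

func main() {
	fmt.Println(columnRequired(true, true, false))  // false: the old bug was treating this as required
	fmt.Println(columnRequired(false, false, true)) // true: the default guarantees a non-null value
}
```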
Same as we do for PostgreSQL.
I have only ever seen this once in production, but since we run
discovery at a regular cadence it really ought to be bulletproof
and not able to randomly cause a panic if the computed-columns
info and the main column metadata are out of sync.
@willdonnelly willdonnelly added the change:unplanned Unplanned change, useful for things like doc updates label Apr 22, 2025
@willdonnelly willdonnelly requested a review from a team April 22, 2025 19:36
// Skip the computed-columns info when the column is absent from the
// main column metadata, rather than panicking on the missing entry.
var info, ok = table.Columns[columnName]
if !ok {
	continue
}
Member
Do we have any idea why they are out of sync?

Member Author
I don't have a bulletproof explanation, but I have some observations:

  1. They're coming from different information tables. The main column listing is based on information_schema.columns while the computed-columns info comes from sys.columns. So it's theoretically possible those could differ due to permissions or something.
  2. The discovery queries aren't all run in a single transaction, so it's possible for them to be mismatched if the user is modifying a table at the precise moment we're running discovery.
  3. In the particular case I observed, the panic happened once and then didn't reoccur, so I suspect (2) was actually the case.

Member
@mdibaiee mdibaiee Apr 22, 2025
Isn't it better to error out and retry in this case until we get in-sync values? Otherwise we are giving them unstable discovery results which may change on a second run, I guess? 🤔

Member Author
No, the ordering of operations here is fine, I think. The main column metadata listing is authoritative, and if a column wasn't present in that listing then we simply shouldn't be augmenting it with the computed-column information either. The results are only "unstable" inasmuch as the user has just changed something and the next discovery run might contain an extra column, but that's the same situation we'd be in if the current discovery run had simply executed a few seconds sooner.

This is also consistent with how we handle columns-on-tables when assembling discovery results: if a table wasn't returned by the main "list tables" query then we skip augmenting it with more detailed information from the other queries.
