From 721910d7dacb47aefcba91e5d57f006858393634 Mon Sep 17 00:00:00 2001 From: Laurie Merrell Date: Thu, 5 Oct 2023 11:37:46 -0500 Subject: [PATCH 1/5] fix deduping of same-timestamp rows --- .../payments/littlepay/stg_littlepay__authorisations.sql | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql index 98ffadc13f..d25eee2122 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql @@ -59,12 +59,9 @@ add_keys_drop_full_dupes AS ( same_timestamp_simple_dupes AS ( SELECT _payments_key, - TRUE AS to_drop, - COUNT(DISTINCT retrieval_reference_number) AS ct_rrn, - COUNT(*) AS ct + (COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate, FROM add_keys_drop_full_dupes GROUP BY 1 - HAVING ct > 1 AND ct_rrn = 1 ), stg_littlepay__authorisations AS ( @@ -93,7 +90,8 @@ stg_littlepay__authorisations AS ( FROM add_keys_drop_full_dupes LEFT JOIN same_timestamp_simple_dupes USING(_payments_key) - WHERE NOT COALESCE(to_drop, FALSE) + -- rows to drop are those where RRN is null and it's a duplicate + WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL ) SELECT * FROM stg_littlepay__authorisations From 09dc1112f0b9f7fd48bb0bfc62d2ef4664795172 Mon Sep 17 00:00:00 2001 From: Laurie Merrell Date: Fri, 6 Oct 2023 10:37:05 -0500 Subject: [PATCH 2/5] rename cte --- .../payments/littlepay/stg_littlepay__authorisations.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql index d25eee2122..7fe38c7fec 100644 --- 
a/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__authorisations.sql @@ -2,7 +2,7 @@ WITH source AS ( SELECT * FROM {{ source('external_littlepay', 'authorisations') }} ), -clean_columns_and_dedupe_files AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('participant_id') }} AS participant_id, {{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id, @@ -49,7 +49,7 @@ add_keys_drop_full_dupes AS ( -- generate keys now that input columns have been trimmed & cast {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, {{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key, - FROM clean_columns_and_dedupe_files + FROM clean_columns {{ qualify_dedupe_full_duplicate_lp_rows() }} ), From 29b296c1bcbc23d2cefbd1194fa4474c355e353a Mon Sep 17 00:00:00 2001 From: Laurie Merrell Date: Fri, 6 Oct 2023 10:40:47 -0500 Subject: [PATCH 3/5] handle micropayments in line with new approach --- .../staging/payments/littlepay/_littlepay.yml | 11 ++++ .../stg_littlepay__micropayments.sql | 52 +++++++++++++++++-- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/warehouse/models/staging/payments/littlepay/_littlepay.yml b/warehouse/models/staging/payments/littlepay/_littlepay.yml index 506f2c8e4a..4ed07910c1 100644 --- a/warehouse/models/staging/payments/littlepay/_littlepay.yml +++ b/warehouse/models/staging/payments/littlepay/_littlepay.yml @@ -467,6 +467,17 @@ models: * `COMPLETE_VARIABLE_FARE` * `INCOMPLETE_VARIABLE_FARE` * `REFUND` + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *_content_hash + - name: _payments_key + description: | + Synthetic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.) 
+ tests: + - not_null + - unique - name: stg_littlepay__product_data tests: diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayments.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayments.sql index aac5552cb9..ce5f955b55 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayments.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayments.sql @@ -1,28 +1,70 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'micropayments') }} + SELECT * FROM {{ source('external_littlepay', 'micropayments') }} ), -stg_littlepay__micropayments AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('micropayment_id') }} AS micropayment_id, {{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id, {{ trim_make_empty_string_null('participant_id') }} AS participant_id, {{ trim_make_empty_string_null('customer_id') }} AS customer_id, {{ trim_make_empty_string_null('funding_source_vault_id') }} AS funding_source_vault_id, - TIMESTAMP(transaction_time) AS transaction_time, + TIMESTAMP({{ trim_make_empty_string_null('transaction_time') }}) AS transaction_time, {{ trim_make_empty_string_null('payment_liability') }} AS payment_liability, SAFE_CAST(charge_amount AS NUMERIC) AS charge_amount, SAFE_CAST(nominal_amount AS NUMERIC) AS nominal_amount, {{ trim_make_empty_string_null('currency_code') }} AS currency_code, {{ trim_make_empty_string_null('type') }} AS type, {{ trim_make_empty_string_null('charge_type') }} AS charge_type, + CAST(_line_number AS INTEGER) AS _line_number, + `instance`, + extract_filename, + ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ 
dbt_utils.generate_surrogate_key(['participant_id', + 'aggregation_id', 'micropayment_id', 'customer_id', 'funding_source_vault_id', 'transaction_time', + 'payment_liability', 'charge_amount', 'nominal_amount', + 'currency_code', 'type', 'charge_type']) }} AS _content_hash, + FROM source +), + +dedupe_rows AS ( + SELECT * + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__micropayments AS ( + SELECT + micropayment_id, + aggregation_id, + participant_id, + customer_id, + funding_source_vault_id, + transaction_time, + payment_liability, + charge_amount, + nominal_amount, + currency_code, + type, + charge_type, _line_number, `instance`, extract_filename, ts, littlepay_export_ts, - FROM source - QUALIFY ROW_NUMBER() OVER (PARTITION BY micropayment_id ORDER BY littlepay_export_ts DESC, transaction_time DESC) = 1 + littlepay_export_date, + _content_hash, + -- generate keys now that input columns have been trimmed & cast + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['micropayment_id']) }} AS _payments_key, + FROM dedupe_rows + -- completed variable fare payments have two rows with same micropayment id and different transaction times + -- we keep the second tap for these; ties on transaction_time are broken by latest export, then highest line number, so the kept row is deterministic + QUALIFY ROW_NUMBER() OVER (PARTITION BY micropayment_id ORDER BY transaction_time DESC, littlepay_export_ts DESC, _line_number DESC) = 1 ) SELECT * FROM stg_littlepay__micropayments From 53d36a50450aee6879762089a15ca0c4a8966094 Mon Sep 17 00:00:00 2001 From: Laurie Merrell Date: Fri, 6 Oct 2023 14:28:53 -0500 Subject: [PATCH 4/5] dedupe settlements --- .../staging/payments/littlepay/_littlepay.yml | 17 +++++- .../littlepay/stg_littlepay__settlements.sql | 61 ++++++++++++++++++- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git 
a/warehouse/models/staging/payments/littlepay/_littlepay.yml b/warehouse/models/staging/payments/littlepay/_littlepay.yml index 4ed07910c1..6e343a5be9 100644 --- 
a/warehouse/models/staging/payments/littlepay/_littlepay.yml +++ b/warehouse/models/staging/payments/littlepay/_littlepay.yml @@ -472,7 +472,8 @@ models: - *lp_line_number - *payments_input_row_key - *_content_hash - - name: _payments_key + - &payments_key_full_uniqueness + name: _payments_key description: | Synthetic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.) tests: @@ -742,6 +743,18 @@ models: If the acquirer is Elavon, then this key will contain the second part of the string from `retrieval_reference_number`. - name: settlement_requested_date_time_utc - description: Timestamp of when the settlement request was submitted to the acquirer. + description: | + Timestamp of when the settlement request was submitted to the acquirer. + Per October 2023 updates from Littlepay, it may be more appropriate + to interpret this field as a "last updated" value. - name: acquirer description: Identifies the acquirer used to settle the transaction. + - name: settlement_type + description: | + "DEBIT" or "CREDIT". "CREDIT" settlements are associated with refunds. 
+ - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *_content_hash + - *payments_key_full_uniqueness diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__settlements.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__settlements.sql index c46a26bd86..c68ab5f289 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__settlements.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__settlements.sql @@ -2,7 +2,7 @@ WITH source AS ( SELECT * FROM {{ source('external_littlepay', 'settlements') }} ), -stg_littlepay__settlements AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('settlement_id') }} AS settlement_id, {{ trim_make_empty_string_null('participant_id') }} AS participant_id, @@ -13,13 +13,68 @@ stg_littlepay__settlements AS ( {{ trim_make_empty_string_null('retrieval_reference_number') }} AS retrieval_reference_number, {{ trim_make_empty_string_null('littlepay_reference_number') }} AS littlepay_reference_number, {{ trim_make_empty_string_null('external_reference_number') }} AS external_reference_number, - {{ safe_cast('settlement_requested_date_time_utc', type_timestamp()) }} AS settlement_requested_date_time_utc, + {{ trim_make_empty_string_null('settlement_type') }} AS settlement_type, + -- as of 10/6/23, only ATN has record_updated_timestamp_utc + -- per communication from LP, that column is the new name of settlement_requested_date_time_utc + COALESCE( + {{ safe_cast('settlement_requested_date_time_utc', type_timestamp()) }}, + {{ safe_cast('record_updated_timestamp_utc', type_timestamp()) }} + ) AS settlement_requested_date_time_utc, {{ trim_make_empty_string_null('acquirer') }} AS acquirer, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, + -- TODO: add "new schema" columns that are present only for ATN as of 10/6/23 `instance`, extract_filename, ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ 
extract_littlepay_filename_date() }} AS littlepay_export_date, + {{ dbt_utils.generate_surrogate_key(['participant_id', + 'settlement_id', 'aggregation_id', 'customer_id', 'funding_source_id', 'transaction_amount', + 'retrieval_reference_number', 'littlepay_reference_number', 'external_reference_number', + 'settlement_type', 'settlement_requested_date_time_utc', 'acquirer']) }} AS _content_hash, FROM source +), + +dedupe_and_keys AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['settlement_id']) }} AS _payments_key + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__settlements AS ( + SELECT + settlement_id, + participant_id, + aggregation_id, + customer_id, + funding_source_id, + transaction_amount, + retrieval_reference_number, + littlepay_reference_number, + external_reference_number, + settlement_type, + settlement_requested_date_time_utc, + acquirer, + _line_number, + -- TODO: add "new schema" columns that are present only for ATN as of 10/6/23 + `instance`, + extract_filename, + ts, + _content_hash, + littlepay_export_ts, + littlepay_export_date, + _key, + _payments_key + FROM dedupe_and_keys + -- we have just one duplicate on settlement id; it's not associated with a refund + -- drop this one case so that we can continue testing for absolute uniqueness + -- if we get more cases, we can add a qualify to get latest appearance only + WHERE _key != "bc6dd0f735a1087b13b424a3c790fc4d" + ) SELECT * FROM stg_littlepay__settlements From 2666b9380946c8529624082ff5dacdd050a7de2f Mon Sep 17 00:00:00 2001 From: Laurie Merrell Date: Fri, 6 Oct 2023 15:01:19 -0500 Subject: [PATCH 5/5] dedupe refunds --- .../staging/payments/littlepay/_littlepay.yml | 6 +++ .../littlepay/stg_littlepay__refunds.sql | 52 ++++++++++++++++++- 2 files changed, 56 insertions(+), 2 
deletions(-) diff --git a/warehouse/models/staging/payments/littlepay/_littlepay.yml b/warehouse/models/staging/payments/littlepay/_littlepay.yml index 6e343a5be9..59c9d4bb38 100644 --- a/warehouse/models/staging/payments/littlepay/_littlepay.yml +++ b/warehouse/models/staging/payments/littlepay/_littlepay.yml @@ -700,6 +700,12 @@ models: description: This field is not in use. - name: settlement_response_text description: This field is not in use. + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *_content_hash + - *payments_key_full_uniqueness - name: stg_littlepay__settlements tests: diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__refunds.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__refunds.sql index 84ea9c9b34..48e62345dd 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__refunds.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__refunds.sql @@ -2,7 +2,7 @@ WITH source AS ( SELECT * FROM {{ source('external_littlepay', 'refunds') }} ), -stg_littlepay__refunds AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('refund_id') }} AS refund_id, {{ trim_make_empty_string_null('participant_id') }} AS participant_id, @@ -28,11 +28,59 @@ stg_littlepay__refunds AS ( {{ safe_cast('settlement_status_time', 'DATE') }} AS settlement_status_time, {{ trim_make_empty_string_null('settlement_reason_code') }} AS settlement_reason_code, {{ trim_make_empty_string_null('settlement_response_text') }} AS settlement_response_text, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, + {{ dbt_utils.generate_surrogate_key(['participant_id', + 'refund_id', 'aggregation_id', 'customer_id', 'micropayment_id', 'settlement_id', + 'retrieval_reference_number', 'transaction_date', 
'transaction_amount', + 'proposed_amount', 'refund_amount', 'currency_code', 'status', 'initiator', 'reason', 'approval_status', 'issuer', + 'issuer_comment', 'created_time', 'approved_time', 'settlement_status', 'settlement_status_time', 'settlement_reason_code', + 'settlement_response_text']) }} AS _content_hash, FROM source +), + +stg_littlepay__refunds AS ( + SELECT + refund_id, + participant_id, + customer_id, + micropayment_id, + aggregation_id, + settlement_id, + retrieval_reference_number, + transaction_date, + transaction_amount, + proposed_amount, + refund_amount, + currency_code, + status, + initiator, + reason, + approval_status, + issuer, + issuer_comment, + created_time, + approved_time, + settlement_status, + settlement_status_time, + settlement_reason_code, + settlement_response_text, + _line_number, + `instance`, + extract_filename, + ts, + littlepay_export_ts, + littlepay_export_date, + _content_hash, + -- generate keys now that input columns have been trimmed & cast + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + -- we have multiple rows for some refunds as the refund moves through different statuses; we should handle this later + {{ dbt_utils.generate_surrogate_key(['refund_id', 'approval_status']) }} AS _payments_key + FROM clean_columns ) SELECT * FROM stg_littlepay__refunds