Skip to content

Commit

Permalink
Payments staging models refactor: settlements, micropayments, refunds (
Browse files Browse the repository at this point in the history
…#2994)

* fix deduping of same-timestamp rows

* rename cte

* handle micropayments in line with new approach

* dedupe settlements

* dedupe refunds
  • Loading branch information
lauriemerrell authored Oct 10, 2023
1 parent c5c71d5 commit 3a27a09
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 18 deletions.
32 changes: 31 additions & 1 deletion warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ models:
* `COMPLETE_VARIABLE_FARE`
* `INCOMPLETE_VARIABLE_FARE`
* `REFUND`
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *_content_hash
- &payments_key_full_uniqueness
name: _payments_key
description: |
Synthentic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.)
tests:
- not_null
- unique

- name: stg_littlepay__product_data
tests:
Expand Down Expand Up @@ -688,6 +700,12 @@ models:
description: This field is not in use.
- name: settlement_response_text
description: This field is not in use.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *_content_hash
- *payments_key_full_uniqueness

- name: stg_littlepay__settlements
tests:
Expand Down Expand Up @@ -731,6 +749,18 @@ models:
If the acquirer is Elavon, then this key will contain the second part of the string from `retrieval_reference_number`.
- name: settlement_requested_date_time_utc
description: Timestamp of when the settlement request was submitted to the acquirer.
description: |
Timestamp of when the settlement request was submitted to the acquirer.
Per October 2023 updates from Littlepay, it may be more appropriate
to interpret this field as a "last updated" value.
- name: acquirer
description: Identifies the acquirer used to settle the transaction.
- name: settlement_type
description: |
"DEBIT" or "CREDIT". "CREDIT" settlements are associated with refunds.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *_content_hash
- *payments_key_full_uniqueness
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'authorisations') }}
),

clean_columns_and_dedupe_files AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
{{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id,
Expand Down Expand Up @@ -49,7 +49,7 @@ add_keys_drop_full_dupes AS (
-- generate keys now that input columns have been trimmed & cast
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key,
FROM clean_columns_and_dedupe_files
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

Expand All @@ -59,12 +59,9 @@ add_keys_drop_full_dupes AS (
same_timestamp_simple_dupes AS (
SELECT
_payments_key,
TRUE AS to_drop,
COUNT(DISTINCT retrieval_reference_number) AS ct_rrn,
COUNT(*) AS ct
(COUNT(DISTINCT retrieval_reference_number) = 1 AND COUNT(*) > 1) AS drop_candidate,
FROM add_keys_drop_full_dupes
GROUP BY 1
HAVING ct > 1 AND ct_rrn = 1
),

stg_littlepay__authorisations AS (
Expand Down Expand Up @@ -93,7 +90,8 @@ stg_littlepay__authorisations AS (
FROM add_keys_drop_full_dupes
LEFT JOIN same_timestamp_simple_dupes
USING(_payments_key)
WHERE NOT COALESCE(to_drop, FALSE)
-- rows to drop are those where RRN is null and it's a duplicate
WHERE NOT drop_candidate OR retrieval_reference_number IS NOT NULL
)

SELECT * FROM stg_littlepay__authorisations
Original file line number Diff line number Diff line change
@@ -1,28 +1,70 @@
WITH source AS (
SELECT * FROM {{ littlepay_source('external_littlepay', 'micropayments') }}
SELECT * FROM {{ source('external_littlepay', 'micropayments') }}
),

stg_littlepay__micropayments AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('micropayment_id') }} AS micropayment_id,
{{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id,
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
{{ trim_make_empty_string_null('customer_id') }} AS customer_id,
{{ trim_make_empty_string_null('funding_source_vault_id') }} AS funding_source_vault_id,
TIMESTAMP(transaction_time) AS transaction_time,
TIMESTAMP({{ trim_make_empty_string_null('transaction_time') }}) AS transaction_time,
{{ trim_make_empty_string_null('payment_liability') }} AS payment_liability,
SAFE_CAST(charge_amount AS NUMERIC) AS charge_amount,
SAFE_CAST(nominal_amount AS NUMERIC) AS nominal_amount,
{{ trim_make_empty_string_null('currency_code') }} AS currency_code,
{{ trim_make_empty_string_null('type') }} AS type,
{{ trim_make_empty_string_null('charge_type') }} AS charge_type,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
ts,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['participant_id',
'aggregation_id', 'micropayment_id', 'customer_id', 'funding_source_vault_id', 'transaction_time',
'payment_liability', 'charge_amount', 'nominal_amount',
'currency_code', 'type', 'charge_type']) }} AS _content_hash,
FROM source
),

dedupe_rows AS (
SELECT *
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

stg_littlepay__micropayments AS (
SELECT
micropayment_id,
aggregation_id,
participant_id,
customer_id,
funding_source_vault_id,
transaction_time,
payment_liability,
charge_amount,
nominal_amount,
currency_code,
type,
charge_type,
_line_number,
`instance`,
extract_filename,
ts,
littlepay_export_ts,
FROM source
QUALIFY ROW_NUMBER() OVER (PARTITION BY micropayment_id ORDER BY littlepay_export_ts DESC, transaction_time DESC) = 1
littlepay_export_date,
_content_hash,
-- generate keys now that input columns have been trimmed & cast
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['micropayment_id']) }} AS _payments_key,
FROM dedupe_rows
-- completed variable fare payments have two rows with same micropayment id and different transaction times
-- we keep the second tap for these
QUALIFY ROW_NUMBER() OVER (PARTITION BY micropayment_id ORDER BY transaction_time DESC) = 1
)

SELECT * FROM stg_littlepay__micropayments
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'refunds') }}
),

stg_littlepay__refunds AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('refund_id') }} AS refund_id,
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
Expand All @@ -28,11 +28,59 @@ stg_littlepay__refunds AS (
{{ safe_cast('settlement_status_time', 'DATE') }} AS settlement_status_time,
{{ trim_make_empty_string_null('settlement_reason_code') }} AS settlement_reason_code,
{{ trim_make_empty_string_null('settlement_response_text') }} AS settlement_response_text,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
ts,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
{{ dbt_utils.generate_surrogate_key(['participant_id',
'refund_id', 'aggregation_id', 'customer_id', 'micropayment_id', 'settlement_id',
'retrieval_reference_number', 'transaction_date', 'transaction_amount',
'proposed_amount', 'refund_amount', 'currency_code', 'status', 'initiator', 'reason', 'approval_status', 'issuer',
'issuer_comment', 'created_time', 'approved_time', 'settlement_status', 'settlement_status_time', 'settlement_reason_code',
'settlement_response_text']) }} AS _content_hash,
FROM source
),

stg_littlepay__refunds AS (
SELECT
refund_id,
participant_id,
customer_id,
micropayment_id,
aggregation_id,
settlement_id,
retrieval_reference_number,
transaction_date,
transaction_amount,
proposed_amount,
refund_amount,
currency_code,
status,
initiator,
reason,
approval_status,
issuer,
issuer_comment,
created_time,
approved_time,
settlement_status,
settlement_status_time,
settlement_reason_code,
settlement_response_text,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
ts,
littlepay_export_ts,
littlepay_export_date,
_content_hash,
-- generate keys now that input columns have been trimmed & cast
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
-- we have multiple rows for some refunds as the refund moves through different statuses; we should handle this later
{{ dbt_utils.generate_surrogate_key(['refund_id', 'approval_status']) }} AS _payments_key
FROM clean_columns
)

SELECT * FROM stg_littlepay__refunds
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'settlements') }}
),

stg_littlepay__settlements AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('settlement_id') }} AS settlement_id,
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
Expand All @@ -13,13 +13,68 @@ stg_littlepay__settlements AS (
{{ trim_make_empty_string_null('retrieval_reference_number') }} AS retrieval_reference_number,
{{ trim_make_empty_string_null('littlepay_reference_number') }} AS littlepay_reference_number,
{{ trim_make_empty_string_null('external_reference_number') }} AS external_reference_number,
{{ safe_cast('settlement_requested_date_time_utc', type_timestamp()) }} AS settlement_requested_date_time_utc,
{{ trim_make_empty_string_null('settlement_type') }} AS settlement_type,
-- as of 10/6/23, only ATN has record_updated_timestamp_utc
-- per communication from LP, that column is the new name of settlement_requested_date_time_utc
COALESCE(
{{ safe_cast('settlement_requested_date_time_utc', type_timestamp()) }},
{{ safe_cast('record_updated_timestamp_utc', type_timestamp()) }}
) AS settlement_requested_date_time_utc,
{{ trim_make_empty_string_null('acquirer') }} AS acquirer,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
-- TODO: add "new schema" columns that are present only for ATN as of 10/6/23
`instance`,
extract_filename,
ts,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
{{ dbt_utils.generate_surrogate_key(['participant_id',
'settlement_id', 'aggregation_id', 'customer_id', 'funding_source_id', 'transaction_amount',
'retrieval_reference_number', 'littlepay_reference_number', 'external_reference_number',
'settlement_type', 'settlement_requested_date_time_utc', 'acquirer']) }} AS _content_hash,
FROM source
),

dedupe_and_keys AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['settlement_id']) }} AS _payments_key
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

stg_littlepay__settlements AS (
SELECT
settlement_id,
participant_id,
aggregation_id,
customer_id,
funding_source_id,
transaction_amount,
retrieval_reference_number,
littlepay_reference_number,
external_reference_number,
settlement_type,
settlement_requested_date_time_utc,
acquirer,
_line_number,
-- TODO: add "new schema" columns that are present only for ATN as of 10/6/23
`instance`,
extract_filename,
ts,
_content_hash,
littlepay_export_ts,
littlepay_export_date,
_key,
_payments_key
FROM dedupe_and_keys
-- we have just one duplicate on settlement id; it's not associated with a refund
-- drop this one case so that we can continue testing for absolute uniqueness
-- if we get more cases, we can add a qualify to get latest appearance only
WHERE _key != "bc6dd0f735a1087b13b424a3c790fc4d"

)

SELECT * FROM stg_littlepay__settlements

0 comments on commit 3a27a09

Please sign in to comment.