TDL-13967: Add ProfitAndLoss report stream #37
base: master
Changes from 12 commits
@@ -0,0 +1,28 @@
{
  "type": [
    "null",
    "object"
  ],
  "properties": {
    "ReportDate": {
      "type": [
        "null",
        "string"
      ],
      "format": "date-time"
    },
    "AccountingMethod": {
      "type": [
        "null",
        "string"
      ]
    },
    "Details": {
      "type": [
        "null",
        "object"
      ],
      "properties": {}
    }
  }
}
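For reference, a record emitted for this schema would look roughly like the following. This is a hedged illustration with invented values, assembled from the schema above and the day_wise_reports logic in streams.py further down, where Details is keyed by the report row names.

```python
# Hypothetical profit_loss_report record (values invented).
example_record = {
    "ReportDate": "2021-07-01",
    "AccountingMethod": "Accrual",
    "Details": {
        "Total Income": "4.00",
        "Gross Profit": "4.00",
        "Total Expenses": "1.00",
        "Net Income": "3.00",
    },
}
```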
@@ -1,6 +1,11 @@
import tap_quickbooks.query_builder as query_builder

import singer
from singer import utils
from singer.utils import strptime_to_utc, strftime
from datetime import timedelta

DATE_WINDOW_SIZE = 29

class Stream:
    endpoint = '/v3/company/{realm_id}/query'

@@ -193,6 +198,151 @@ class Vendors(Stream):
    table_name = 'Vendor'
    additional_where = "Active IN (true, false)"


class ReportStream(Stream):
    parsed_metadata = {
        'dates': [],
        'data': []
    }
    key_properties = ['ReportDate']
    replication_method = 'INCREMENTAL'
    # replication key is ReportDate, manually created from the report data
    replication_keys = ['ReportDate']

    def sync(self):
        is_start_date_used = False
        params = {
            'summarize_column_by': 'Days'
        }

        start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime')
        if start_dttm_str is None:
            start_dttm_str = self.config.get('start_date')
            is_start_date_used = True

        start_dttm = strptime_to_utc(start_dttm_str)
        end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)
        now_dttm = utils.now()

        # If the bookmark from the state file is used and it is less than 30 days away,
        # fetch records for a minimum of 30 days.
        # If the start_date is used and it is less than 30 days away,
        # fetch records from start_date up to the current date.
        if end_dttm > now_dttm:
            end_dttm = now_dttm
            if not is_start_date_used:
                start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE)

        while start_dttm < now_dttm:
            self.parsed_metadata = {
                'dates': [],
                'data': []
            }

            start_tm_str = str(start_dttm.date())
            end_tm_str = str(end_dttm.date())

            params["start_date"] = start_tm_str
            params["end_date"] = end_tm_str

            resp = self.client.get(self.endpoint, params=params)
            self.parse_report_columns(resp.get('Columns', {}))
            self.parse_report_rows(resp.get('Rows', {}))

            reports = self.day_wise_reports()
            if reports:
                for report in reports:
                    yield report
Contributor:
What is the difference between yield report and yield from report? I've seen yield from report used in many places.

Contributor (Author):
tap-quickbooks/tap_quickbooks/streams.py, line 260 in b6e8e4a
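For the question above, a minimal standalone sketch (not part of the diff): for plain iteration, delegating with yield from is equivalent to looping over the sub-generator and yielding each item, which is what the loop in sync() does explicitly.

```python
def numbers():
    yield 1
    yield 2

def loop_and_yield():
    # Explicit loop, as in sync() above: yields 1, then 2.
    for n in numbers():
        yield n

def delegate():
    # Delegation form: also yields 1, then 2.
    yield from numbers()

assert list(loop_and_yield()) == list(delegate()) == [1, 2]
```

The two forms differ when values are sent into the generator or when the sub-generator returns a value; here the explicit loop is needed anyway, because the loop body also writes a bookmark for every emitted report.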
                    self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat())
                    singer.write_state(self.state)

            start_dttm = end_dttm + timedelta(days=1)  # one record is emitted for every day, so start from the next day
            end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)

            if end_dttm > now_dttm:
                end_dttm = now_dttm

        singer.write_state(self.state)

    def parse_report_columns(self, pileOfColumns):
        '''
        Restructure the columns data into a list of dates and update the self.parsed_metadata dictionary.
        {
            "dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
            "data": []
        }
        '''
        columns = pileOfColumns.get('Column', [])
        for column in columns:
            metadatas = column.get('MetaData', [])
            for md in metadatas:
                if md['Name'] in ['StartDate']:
                    self.parsed_metadata['dates'].append(md['Value'])

    def parse_report_rows(self, pileOfRows):
        '''
        Restructure the data from the report response on a per-day basis and update the self.parsed_metadata dictionary.
        {
            "dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
            "data": [ {
                "name": "Total Income",
                "values": ["4.00", "4.00", "4.00", "12.00"]
            }, {
                "name": "Gross Profit",
                "values": ["4.00", "4.00", "4.00", "12.00"]
            }, {
                "name": "Total Expenses",
                "values": ["1.00", "1.00", "1.00", "3.00"]
            }, {
                "name": "Net Income",
                "values": ["3.00", "3.00", "3.00", "9.00"]
            }]
        }
        '''
        if isinstance(pileOfRows, list):
            for x in pileOfRows:
                self.parse_report_rows(x)

        else:
            if 'Rows' in pileOfRows.keys():
                self.parse_report_rows(pileOfRows['Rows'])

            if 'Row' in pileOfRows.keys():
                self.parse_report_rows(pileOfRows['Row'])
Comment on lines +316 to +320

Contributor:
What is the difference between Rows and Row here?
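For context on the question above, a hedged sketch of the nested shape the parser is written against. The key names (Rows, Row, Summary, ColData) are taken from parse_report_rows itself rather than from QuickBooks documentation, and the values are invented: Rows is a wrapper object whose Row key holds a list of row objects; each row can nest another Rows wrapper, carry a Summary row, or be a leaf with ColData.

```python
# Hypothetical report fragment shaped the way parse_report_rows() walks it.
report_rows_fragment = {
    "Rows": {              # wrapper object
        "Row": [           # list of row objects
            {
                # Leaf row: the first ColData entry is the row name, the rest are daily values.
                "ColData": [
                    {"value": "Total Income"},
                    {"value": "4.00"},
                    {"value": "4.00"},
                ]
            },
            {
                # A row can nest a further Rows wrapper and its own Summary row.
                "Rows": {"Row": []},
                "Summary": {
                    "ColData": [
                        {"value": "Gross Profit"},
                        {"value": "4.00"},
                        {"value": "4.00"},
                    ]
                },
            },
        ]
    }
}
```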
            if 'Summary' in pileOfRows.keys():
                self.parse_report_rows(pileOfRows['Summary'])

            if 'ColData' in pileOfRows.keys():
                d = dict()
                d['name'] = pileOfRows['ColData'][0]['value']
                vals = []
                for x in pileOfRows['ColData'][1:]:
                    vals.append(x['value'])
                d['values'] = vals
                self.parsed_metadata['data'].append(d)

    def day_wise_reports(self):
        '''
        Return a record for every day, built from the output of parse_report_columns and parse_report_rows.
        '''
        for index, date in enumerate(self.parsed_metadata['dates']):
            report = dict()
            report['ReportDate'] = date
            report['AccountingMethod'] = 'Accrual'
            report['Details'] = {}

            for data in self.parsed_metadata['data']:
                report['Details'][data['name']] = data['values'][index]

            yield report


class ProfitAndLossReport(ReportStream):
    stream_name = 'profit_loss_report'
    endpoint = '/v3/company/{realm_id}/reports/ProfitAndLoss'


class DeletedObjects(Stream):
    endpoint = '/v3/company/{realm_id}/cdc'
    stream_name = 'deleted_objects'

@@ -292,5 +442,6 @@ def parse_data_and_write(self, response):
    "transfers": Transfers,
    "vendor_credits": VendorCredits,
    "vendors": Vendors,
    "profit_loss_report": ProfitAndLossReport,
    "deleted_objects": DeletedObjects
}
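To make the date-window handling in sync() above concrete, here is a small standalone sketch of how the request windows advance. The constant matches the diff, while the bookmark and the "now" value are invented for illustration.

```python
from datetime import datetime, timedelta, timezone

DATE_WINDOW_SIZE = 29  # same constant as in the diff

start = datetime(2021, 7, 1, tzinfo=timezone.utc)   # hypothetical bookmark / start_date
now = datetime(2021, 9, 1, tzinfo=timezone.utc)     # hypothetical current time

end = start + timedelta(days=DATE_WINDOW_SIZE)
if end > now:
    end = now

while start < now:
    print(f"request window: {start.date()} .. {end.date()}")
    start = end + timedelta(days=1)                  # next window starts the following day
    end = start + timedelta(days=DATE_WINDOW_SIZE)
    if end > now:
        end = now
```

With these values the sketch prints three windows: 2021-07-01 .. 2021-07-30, 2021-07-31 .. 2021-08-29, and 2021-08-30 .. 2021-09-01.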
@@ -1,5 +1,6 @@
import os
import unittest
import time
from datetime import datetime as dt
from datetime import timedelta

@@ -23,6 +24,12 @@ class TestQuickbooksBase(unittest.TestCase):
    INCREMENTAL = "INCREMENTAL"
    FULL = "FULL_TABLE"
    START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"  # %H:%M:%SZ
    DATETIME_FMT = {
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.000000Z",
        "%Y-%m-%dT%H:%M:%S%z"
    }

    def setUp(self):
        missing_envs = [x for x in [

@@ -89,6 +96,7 @@ def expected_check_streams():
            "transfers",
            "vendor_credits",
            "vendors",
            "profit_loss_report",
            "deleted_objects"
        }

@@ -97,7 +105,13 @@ def expected_metadata(self):
        mdata = {}
        for stream in self.expected_check_streams():
            if stream == "deleted_objects":
            if self.is_report_stream(stream):
                mdata[stream] = {
                    self.PRIMARY_KEYS: {'ReportDate'},
                    self.REPLICATION_METHOD: self.INCREMENTAL,
                    self.REPLICATION_KEYS: {'ReportDate'},
                }
            elif stream == "deleted_objects":
                mdata[stream] = {
                    self.PRIMARY_KEYS: {'Id', 'Type'},
                    self.REPLICATION_METHOD: self.INCREMENTAL,

@@ -230,3 +244,14 @@ def minimum_record_count_by_stream(self):
        record_counts["vendors"] = 26

        return record_counts

    def dt_to_ts(self, dtime):
Contributor:
Raising an exception after this for loop would prevent the test from getting back a None and failing in a confusing way in the case where the datetime format is unaccounted for.
        for date_format in self.DATETIME_FMT:
            try:
                date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
                return date_stripped
            except ValueError:
                continue

    def is_report_stream(self, stream):
        return stream in ["profit_loss_report"]
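One possible shape for the reviewer's suggestion above, written as a standalone sketch rather than as code from this PR: raise after the loop so that an unrecognised format fails loudly instead of silently returning None.

```python
import time
from datetime import datetime as dt

DATETIME_FMT = {
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.000000Z",
    "%Y-%m-%dT%H:%M:%S%z",
}

def dt_to_ts(dtime):
    """Convert a datetime string to a Unix timestamp, trying each known format."""
    for date_format in DATETIME_FMT:
        try:
            return int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
        except ValueError:
            continue
    # Fail explicitly when no format matches, instead of returning None.
    raise ValueError(f"Unrecognised datetime format: {dtime!r}")
```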
Contributor:
Please add more comments to the code to explain at each section what is being done.

Contributor (Author):
Added more comments to the code for better code explanation.