2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-quickbooks
source /usr/local/share/virtualenvs/tap-quickbooks/bin/activate
pip install -U pip setuptools
pip install .[dev]
pip install .[test]
- run:
name: 'JSON Validator'
command: |
1 change: 1 addition & 0 deletions README.md
@@ -37,6 +37,7 @@ This tap:
* Transfers
* VendorCredits
* Vendors
* ProfitAndLossReport
* DeletedObjects

- Includes a schema for each resource reflecting most recent tested data retrieved using the api. See [the schema folder](https://github.com/singer-io/tap-quickbooks/tree/master/tap_quickbooks/schemas) for details.
6 changes: 4 additions & 2 deletions setup.py
@@ -15,10 +15,12 @@
'requests_oauthlib==1.3.0',
],
extras_require={
'dev': [
'ipdb==0.11',
'test': [
'pylint==2.5.3',
'nose'
],
'dev': [
'ipdb'
]
},
entry_points='''
28 changes: 28 additions & 0 deletions tap_quickbooks/schemas/profit_loss_report.json
@@ -0,0 +1,28 @@
{
"type": [
"null",
"object"
],
"properties": {
"ReportDate": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"AccountingMethod": {
"type": [
"null",
"string"
]
},
"Details": {
"type": [
"null",
"object"
],
"properties": {}
}
}
}
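For orientation, a record that conforms to this schema might look like the following Python dict; the values are hypothetical and only illustrate the shape, not real API output.

# Hypothetical record matching profit_loss_report.json (illustrative values only).
example_record = {
    "ReportDate": "2021-07-01",        # populated by the tap from the report's StartDate column metadata
    "AccountingMethod": "Accrual",
    "Details": {                       # free-form object; the schema leaves "properties" empty
        "Total Income": "4.00",
        "Gross Profit": "4.00",
        "Total Expenses": "1.00",
        "Net Income": "3.00",
    },
}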
158 changes: 158 additions & 0 deletions tap_quickbooks/streams.py
@@ -1,6 +1,11 @@
import tap_quickbooks.query_builder as query_builder

import singer
from singer import utils
from singer.utils import strptime_to_utc, strftime
from datetime import timedelta

DATE_WINDOW_SIZE = 29

class Stream:
endpoint = '/v3/company/{realm_id}/query'
@@ -193,6 +198,158 @@ class Vendors(Stream):
table_name = 'Vendor'
additional_where = "Active IN (true, false)"

class ReportStream(Stream):
Reviewer (Contributor):
Please add more comments to the code to explain what is being done in each section.

savan-chovatiya (Contributor, Author), Oct 5, 2021:
Added more comments to the code to explain it better.

parsed_metadata = {
'dates': [],
'data': []
}
key_properties = ['ReportDate']
replication_method = 'INCREMENTAL'
# The replication key is ReportDate, constructed manually from the report data
replication_keys = ['ReportDate']

def sync(self):

is_start_date_used = False
params = {
'summarize_column_by': 'Days'
}

# Get bookmark for the stream
start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime')
if start_dttm_str is None:
start_dttm_str = self.config.get('start_date')
is_start_date_used = True

# Set start_date and end_date for the first date window (30 days) of API calls
start_dttm = strptime_to_utc(start_dttm_str)
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)
now_dttm = utils.now()

# Fetch records for a minimum of 30 days
# if the bookmark from the state file is used and it is less than 30 days in the past.
# Fetch records from start_date to the current date
# if start_date is used and it is less than 30 days in the past.
if end_dttm > now_dttm:
end_dttm = now_dttm
if not is_start_date_used:
start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE)

# Make API calls in 30-day date windows until the current time is reached
while start_dttm < now_dttm:
self.parsed_metadata = {
'dates': [],
'data': []
}

start_tm_str = str(start_dttm.date())
end_tm_str = str(end_dttm.date())

# Set date window
params["start_date"] = start_tm_str
params["end_date"] = end_tm_str

resp = self.client.get(self.endpoint, params=params)
self.parse_report_columns(resp.get('Columns', {})) # parse report columns from response's metadata
self.parse_report_rows(resp.get('Rows', {})) # parse report rows from response's metadata

reports = self.day_wise_reports() # get a report for each day from the parsed metadata
if reports:
for report in reports:
yield report
Reviewer (Contributor):
What is the difference between yield report and yield from report? I've seen yield from report used in many places.

savan-chovatiya (Contributor, Author), Oct 5, 2021:
  • yield from reports and yield inside a for loop both produce generators that emit every element of reports.

  • A Python generator is exhausted once it has been iterated over. Here, reports is a generator, and the tap uses the last report from reports to store the bookmark.

  • If we used yield from reports, there would be no way to retrieve the last report afterwards, so we use yield inside a for loop and rely on the loop's local variable report to retrieve the last report.
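For illustration only (this is not code from the PR), a minimal sketch of the difference:

def with_yield_from(items):
    # Delegates to the iterable; there is no loop variable to inspect afterwards.
    yield from items

def with_loop(items):
    for item in items:
        yield item
    # After the loop, `item` still refers to the last element that was yielded,
    # which is what the tap relies on when writing the bookmark.

print(list(with_yield_from([1, 2, 3])))  # [1, 2, 3]
print(list(with_loop([1, 2, 3])))        # [1, 2, 3]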

self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat())
singer.write_state(self.state)

# Set start_date and end_date of date window for next API call
start_dttm = end_dttm + timedelta(days=1) # one record is emitted per day, so start from the next day
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)

if end_dttm > now_dttm:
end_dttm = now_dttm

singer.write_state(self.state)

def parse_report_columns(self, pileOfColumns):
'''
Restructure the column data into a list of dates and update the self.parsed_metadata dictionary.
{
"dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
"data": []
}
Reference for report metadata: https://developer.intuit.com/app/developer/qbo/docs/api/accounting/report-entities/profitandloss
'''
columns = pileOfColumns.get('Column', [])
for column in columns:
metadatas = column.get('MetaData', [])
for metadata in metadatas:
if metadata['Name'] in ['StartDate']:
self.parsed_metadata['dates'].append(metadata['Value'])

def parse_report_rows(self, pileOfRows):
'''
Restructure data from the report response on a daily basis and update the self.parsed_metadata dictionary
{
"dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
"data": [ {
"name": "Total Income",
"values": ["4.00", "4.00", "4.00", "12.00"]
}, {
"name": "Gross Profit",
"values": ["4.00", "4.00", "4.00", "12.00"]
}, {
"name": "Total Expenses",
"values": ["1.00", "1.00", "1.00", "3.00"]
}, {
"name": "Net Income",
"values": ["3.00", "3.00", "3.00", "9.00"]
}]
}
Reference for report metadata: https://developer.intuit.com/app/developer/qbo/docs/api/accounting/report-entities/profitandloss
'''

if isinstance(pileOfRows, list):
for row in pileOfRows:
self.parse_report_rows(row)

else:

if 'Rows' in pileOfRows.keys():
self.parse_report_rows(pileOfRows['Rows'])

if 'Row' in pileOfRows.keys():
self.parse_report_rows(pileOfRows['Row'])
Comment on lines +316 to +320
Reviewer (Contributor):
What is the difference between Rows and Row here?

savan-chovatiya (Contributor, Author), Oct 5, 2021:
Rows is the top-level container of the profit and loss report, while Row holds the report's individual entries; see the documentation of the profit and loss report's metadata linked in the docstrings above.
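To make the distinction concrete, here is a trimmed, hypothetical fragment of a ProfitAndLoss response (values are illustrative, not real API output):

# Hypothetical, trimmed response fragment showing the structure parse_report_rows() walks.
rows_fragment = {
    "Rows": {                          # top-level wrapper around all report rows
        "Row": [                       # list of individual report entries
            {
                "Summary": {           # summary line for a section, e.g. "Gross Profit"
                    "ColData": [
                        {"value": "Gross Profit"},   # first entry is the row name
                        {"value": "4.00"},           # remaining entries are per-day values
                        {"value": "4.00"},
                        {"value": "12.00"},
                    ]
                }
            }
        ]
    }
}
# parse_report_rows() recurses through "Rows" -> "Row" -> "Summary" until it finds a
# dict containing "ColData", then records the row name and its per-column values.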


if 'Summary' in pileOfRows.keys():
self.parse_report_rows(pileOfRows['Summary'])

if 'ColData' in pileOfRows.keys():
entry_data = dict()
entry_data['name'] = pileOfRows['ColData'][0]['value']
vals = []
for column_value in pileOfRows['ColData'][1:]:
vals.append(column_value['value'])
entry_data['values'] = vals
self.parsed_metadata['data'].append(entry_data)

def day_wise_reports(self):
'''
Yield a record for every day, formed from the output of parse_report_columns and parse_report_rows
'''
for index, date in enumerate(self.parsed_metadata['dates']):
report = dict()
report['ReportDate'] = date
report['AccountingMethod'] = 'Accrual'
report['Details'] = {}

for data in self.parsed_metadata['data']:
report['Details'][data['name']] = data['values'][index]

yield report

class ProfitAndLossReport(ReportStream):
stream_name = 'profit_loss_report'
endpoint = '/v3/company/{realm_id}/reports/ProfitAndLoss'

class DeletedObjects(Stream):
endpoint = '/v3/company/{realm_id}/cdc'
stream_name = 'deleted_objects'
@@ -292,5 +449,6 @@ def parse_data_and_write(self, response):
"transfers": Transfers,
"vendor_credits": VendorCredits,
"vendors": Vendors,
"profit_loss_report": ProfitAndLossReport,
"deleted_objects": DeletedObjects
}
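As a standalone illustration of the 30-day windowing described in the sync() comments above (this sketch is not tap code and ignores the extra back-off applied when a bookmark is used):

from datetime import datetime, timedelta, timezone

DATE_WINDOW_SIZE = 29  # start + 29 days gives a 30-day inclusive window, as in streams.py

def date_windows(start, now):
    """Yield (start, end) date pairs roughly the way ReportStream.sync() advances them."""
    end = min(start + timedelta(days=DATE_WINDOW_SIZE), now)
    while start < now:
        yield start.date(), end.date()
        start = end + timedelta(days=1)   # one record per day is emitted, so the next window starts the following day
        end = min(start + timedelta(days=DATE_WINDOW_SIZE), now)

now = datetime(2021, 8, 15, tzinfo=timezone.utc)
for window in date_windows(datetime(2021, 7, 1, tzinfo=timezone.utc), now):
    print(window)
# Prints two windows: 2021-07-01 to 2021-07-30, then 2021-07-31 to 2021-08-15.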
27 changes: 26 additions & 1 deletion tests/base.py
@@ -1,5 +1,6 @@
import os
import unittest
import time
from datetime import datetime as dt
from datetime import timedelta

@@ -23,6 +24,12 @@ class TestQuickbooksBase(unittest.TestCase):
INCREMENTAL = "INCREMENTAL"
FULL = "FULL_TABLE"
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ
DATETIME_FMT = {
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S.000000Z",
"%Y-%m-%dT%H:%M:%S%z"
}

def setUp(self):
missing_envs = [x for x in [
@@ -89,6 +96,7 @@ def expected_check_streams():
"transfers",
"vendor_credits",
"vendors",
"profit_loss_report",
"deleted_objects"
}

@@ -97,7 +105,13 @@ def expected_metadata(self):

mdata = {}
for stream in self.expected_check_streams():
if stream == "deleted_objects":
if self.is_report_stream(stream):
mdata[stream] = {
self.PRIMARY_KEYS: {'ReportDate'},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {'ReportDate'},
}
elif stream == "deleted_objects":
mdata[stream] = {
self.PRIMARY_KEYS: {'Id', 'Type'},
self.REPLICATION_METHOD: self.INCREMENTAL,
@@ -230,3 +244,14 @@ def minimum_record_count_by_stream(self):
record_counts["vendors"] = 26

return record_counts

def dt_to_ts(self, dtime):
Reviewer (Contributor):
Raising an exception after this for loop would prevent the test from getting back a None and failing in a confusing way in the case where the datetime format is unaccounted for.
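For illustration, the suggested change might look like the sketch below (a hedged sketch against the DATETIME_FMT set and imports already in tests/base.py, not committed code):

def dt_to_ts(self, dtime):
    for date_format in self.DATETIME_FMT:
        try:
            return int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
        except ValueError:
            continue
    # Fail loudly instead of silently returning None for an unaccounted-for format.
    raise ValueError("Unrecognized datetime format: {!r}".format(dtime))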

for date_format in self.DATETIME_FMT:
try:
date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
return date_stripped
except ValueError:
continue

def is_report_stream(self, stream):
return stream in ["profit_loss_report"]
34 changes: 24 additions & 10 deletions tests/test_quickbooks_bookmarks.py
@@ -128,30 +128,44 @@ def test_run(self):
# bookmarked states (actual values)
first_bookmark_value = first_bookmark_key_value.get(sub_level_replication_key)
second_bookmark_value = second_bookmark_key_value.get(sub_level_replication_key)
# bookmarked values as utc for comparing against records
first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value)
second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
# bookmarked values as UTC epoch timestamps for comparing against records
first_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(first_bookmark_value))
second_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(second_bookmark_value))

# Verify the second sync bookmark is Equal to the first sync bookmark
self.assertEqual(second_bookmark_value, first_bookmark_value) # assumes no changes to data during test

# Verify the second sync records respect the previous (simulated) bookmark value
simulated_bookmark_value = new_state['bookmarks'][stream][sub_level_replication_key]
simulated_bookmark_value = self.dt_to_ts(new_state['bookmarks'][stream][sub_level_replication_key])

# Subtract 30 days (2592000 seconds) from the expected epoch time for report streams, since the tap syncs at least the last 30 days of data in the bookmark scenario
if self.is_report_stream(stream):
simulated_bookmark_value -= 2592000

for message in second_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertGreaterEqual(self.dt_to_ts(replication_key_value), simulated_bookmark_value,
msg="Second sync records do not repect the previous bookmark.")

# Verify the first sync bookmark value is the max replication key value for a given stream
for message in first_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(replication_key_value, first_bookmark_value_utc,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(self.dt_to_ts(replication_key_value), first_bookmark_value_utc,
msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced")

# Verify the second sync bookmark value is the max replication key value for a given stream
for message in second_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(replication_key_value, second_bookmark_value_utc,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(self.dt_to_ts(replication_key_value), second_bookmark_value_utc,
msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced")

# Verify the number of records in the 2nd sync is less then the first
6 changes: 5 additions & 1 deletion tests/test_quickbooks_pagination.py
@@ -75,7 +75,11 @@ def test_run(self):
self.assertLessEqual(expected_count, record_count)

# Verify the number of records exceeds the max_results (API limit)
pagination_threshold = int(self.get_properties().get(page_size_key))
if self.is_report_stream(stream):
# The tap makes API calls in 30-day windows for report streams
pagination_threshold = 30
else:
pagination_threshold = int(self.get_properties().get(page_size_key))
self.assertGreater(record_count, pagination_threshold,
msg="Record count not large enough to gaurantee pagination.")

16 changes: 12 additions & 4 deletions tests/test_quickbooks_start_date.py
@@ -89,7 +89,9 @@ def test_run(self):

# start dates
start_date_1 = self.get_properties()['start_date']
start_date_1_epoch = self.dt_to_ts(start_date_1)
start_date_2 = self.get_properties(original=False)['start_date']
start_date_2_epoch = self.dt_to_ts(start_date_2)

# Verify by stream that our first sync meets or exceeds the default record count
self.assertLessEqual(expected_first_sync_count, first_sync_count)
Expand All @@ -99,10 +101,16 @@ def test_run(self):

# Verify by stream that all records have a rep key that is equal to or greater than that sync's start_date
for message in first_sync_messages:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(rep_key_value, start_date_1,
if self.is_report_stream(stream):
rep_key_value = message.get('data').get('ReportDate')
else:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_1_epoch,
msg="A record was replicated with a replication key value prior to the start date")
for message in second_sync_messages:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(rep_key_value, start_date_2,
if self.is_report_stream(stream):
rep_key_value = message.get('data').get('ReportDate')
else:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_2_epoch,
msg="A record was replicated with a replication key value prior to the start date")