Skip to content

Commit

Permalink
Enhancements to Freshdesk Stream Handling (#15)
Browse files Browse the repository at this point in the history
* Better error handling and tracing - Func overriding

* Handling 300 pages pagination is has_more func - Class FreshdeskPaginator

* Making tickets_abridged sorted asc by updated_at

* Error response fix

* Schema change tickets.json, association_type is number or null

* 300 page API  -pagination handling

* Fixed JSONDecodeError and enumeration of error

---------

Co-authored-by: prashantvikram <[email protected]>
  • Loading branch information
Prashantvik and Prashantvik authored Mar 12, 2024
1 parent 2d54409 commit 902dca5
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 18 deletions.
79 changes: 62 additions & 17 deletions tap_freshdesk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from typing import Any, Callable, Iterable, TYPE_CHECKING, Generator

import requests
from http import HTTPStatus
from urllib.parse import urlparse
from singer_sdk.authenticators import BasicAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
Expand All @@ -20,6 +22,7 @@

class FreshdeskStream(RESTStream):
"""freshdesk stream class."""

name: str
records_jsonpath = "$.[*]" # Or override `parse_response`.
primary_keys = ["id"]
Expand All @@ -33,11 +36,11 @@ def path(self) -> str:
"""
'groups' -> '/groups'
"""
return f'/{self.name}'
return f"/{self.name}"

@property
def schema_filepath(self) -> Path | None:
return SCHEMAS_DIR / f'{self.name}.json'
return SCHEMAS_DIR / f"{self.name}.json"

# OR use a dynamic url_base:
@property
Expand All @@ -46,7 +49,6 @@ def url_base(self) -> str:
domain = self.config["domain"]
return f"https://{domain}.freshdesk.com/api/v2"


@property
def authenticator(self) -> BasicAuthenticator:
"""Return a new authenticator object.
Expand Down Expand Up @@ -117,11 +119,11 @@ def get_url_params(
A dictionary of URL query parameters.
"""
params: dict = {}
embeds = self.config.get('embeds')
embeds = self.config.get("embeds")
if embeds:
embed_fields = embeds.get(self.name, [])
if embed_fields: # i.e. 'stats,company,sla_policy'
params['include'] = ','.join(embed_fields)
if embed_fields: # i.e. 'stats,company,sla_policy'
params["include"] = ",".join(embed_fields)
return params

def prepare_request_payload(
Expand Down Expand Up @@ -167,33 +169,76 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None:
"""
# TODO: Delete this method if not needed.
return row

def get_new_paginator(self) -> SinglePagePaginator:
return SinglePagePaginator()

def backoff_wait_generator(self) -> Generator[float, None, None]:
return self.backoff_runtime(value=self._wait_for)

@staticmethod
def _wait_for(exception) -> int:
"""
When 429 thrown, header contains the time to wait before
the next call is allowed, rather than use exponential backoff"""
return int(exception.response.headers['Retry-After'])
return int(exception.response.headers["Retry-After"])

def backoff_jitter(self, value: float) -> float:
return value

# Handling error, overriding this method over RESTStream's Class
def response_error_message(self, response: requests.Response) -> str:
"""Build error message for invalid http statuses.
WARNING - Override this method when the URL path may contain secrets or PII
Args:
response: A :class:`requests.Response` object.
Returns:
str: The error message
"""
full_path = urlparse(response.url).path or self.path
error_type = (
"Client"
if HTTPStatus.BAD_REQUEST
<= response.status_code
< HTTPStatus.INTERNAL_SERVER_ERROR
else "Server"
)

error_details = []
if response.status_code >= 400:
print(f"Error Response: {response.status_code} {response.reason}")
try:
error_data = response.json()
errors = error_data.get("errors")
for index, error in enumerate(errors):
message = error.get("message", "Unknown")
field = error.get("field", "Unknown")
error_details.append(
f"Error {index + 1}: Message - {message}, Field - {field}"
)
except requests.exceptions.JSONDecodeError:
return "Error: Unable to parse JSON error response"

return (
f"{response.status_code} {error_type} Error: "
f"{response.reason} for path: {full_path}. "
f"Error via function response_error_message : {'. '.join(error_details)}."
)


class FreshdeskPaginator(BasePageNumberPaginator):

def has_more(self, response: Response) -> bool:
"""
There is no 'has more' indicator for this stream.
If there are no results on this page, then this is 'last' page,
If there are no results on this page, then this is 'last' page,
(even though technically the page before was the last, there was no way to tell).
"""
return len(response.json())
return len(response.json()) != 0 and self.current_value < 300


class PagedFreshdeskStream(FreshdeskStream):

Expand All @@ -213,11 +258,11 @@ def get_url_params(
"""
context = context or {}
params = super().get_url_params(context, next_page_token)
params['per_page'] = 100
params["per_page"] = 100
if next_page_token:
params["page"] = next_page_token
if 'updated_since' not in context:
params['updated_since'] = self.get_starting_timestamp(context)
if "updated_since" not in context:
params["updated_since"] = self.get_starting_timestamp(context)
return params

def get_new_paginator(self) -> BasePageNumberPaginator:
Expand Down
2 changes: 1 addition & 1 deletion tap_freshdesk/schemas/tickets.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"association_type": {
"type": [
"null",
"string"
"number"
]
},
"associated_tickets_count": {
Expand Down
38 changes: 38 additions & 0 deletions tap_freshdesk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,44 @@ def path(self) -> str:
def schema_filepath(self) -> Path | None:
return SCHEMAS_DIR / 'tickets.json'

@property
def is_sorted(self) -> bool:
"""Expect stream to be sorted.
When `True`, incremental streams will attempt to resume if unexpectedly
interrupted.
Returns:
`True` if stream is sorted. Defaults to `False`.
"""
return True

def get_url_params(
self,
context: dict | None,
next_page_token: Any | None,
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
context = context or {}
params = super().get_url_params(context, next_page_token)
params['per_page'] = 100
# Adding these parameters for sorting
params['order_type'] = "asc"
params['order_by'] = "updated_at"
if next_page_token:
params["page"] = next_page_token
if 'updated_since' not in context:
params['updated_since'] = self.get_starting_timestamp(context)
return params

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
context = context or {}
records = self.request_records(context=context)
Expand Down

0 comments on commit 902dca5

Please sign in to comment.