|
1 | 1 | """REST client handling, including google_sheetsStream base class.""" |
2 | 2 |
|
| 3 | +from http import HTTPStatus |
3 | 4 | from pathlib import Path |
| 5 | +from random import random |
4 | 6 | from typing import Any, Dict, Iterable, Optional |
| 7 | +from typing_extensions import override |
5 | 8 |
|
6 | 9 | import requests |
7 | 10 | from singer_sdk.helpers.jsonpath import extract_jsonpath |
8 | 11 | from singer_sdk.streams import RESTStream |
| 12 | +from singer_sdk.exceptions import RetriableAPIError |
9 | 13 |
|
10 | 14 | from tap_google_sheets.auth import ( |
11 | 15 | GoogleSheetsAuthenticator, |
@@ -102,3 +106,28 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: |
102 | 106 | """Parse the response and return an iterator of result rows.""" |
103 | 107 | # TODO: Parse response body and return a set of records. |
104 | 108 | yield from extract_jsonpath(self.records_jsonpath, input=response.json()) |
| 109 | + |
| 110 | + @override |
| 111 | + def backoff_wait_generator(self): |
| 112 | + """Custom exponential backoff for 429 responses.""" |
| 113 | + def _wait_time(retriable_api_error: RetriableAPIError): |
| 114 | + if retriable_api_error.response.status_code != HTTPStatus.TOO_MANY_REQUESTS: |
| 115 | + raise retriable_api_error |
| 116 | + |
| 117 | + attempt = retriable_api_error.attempt_number or 1 |
| 118 | + base_delay = min(2 ** (attempt - 1), 64) |
| 119 | + |
| 120 | + # Add up to 1s of jitter (recommeded by google) |
| 121 | + jitter = random.uniform(0, 1) |
| 122 | + |
| 123 | + wait_time = base_delay + jitter |
| 124 | + self.logger.warning( |
| 125 | + f"Rate limit hit (429). Retrying in {wait_time:.2f}s (attempt {attempt})" |
| 126 | + ) |
| 127 | + return wait_time |
| 128 | + |
| 129 | + return _wait_time |
| 130 | + |
| 131 | + @override |
| 132 | + def backoff_max_tries(self): |
| 133 | + return 8 |
0 commit comments