11
11
12
12
from tap_dbt .client import DBTStream
13
13
14
- if t .TYPE_CHECKING :
15
- import requests
16
-
17
14
SCHEMAS_DIR = Path (__file__ ).parent / Path ("./schemas" )
18
15
19
16
20
- class DbtPaginator (BaseOffsetPaginator ):
21
- """dbt API paginator."""
22
-
23
- def has_more (self , response : requests .Response ) -> bool :
24
- """Returns True until there are no more pages to retrieve.
25
-
26
- The API returns an 'extra' key with information about pagination:
27
- "extra":{"filters":{"limit":100,"offset":2,"account_id":1},"order_by":"id","pagination":{"count":100,"total_count":209}}}
28
- """
29
- data = response .json ()
30
- extra = data .get ("extra" , {})
31
- filters = extra .get ("filters" , {})
32
- pagination = extra .get ("pagination" , {})
33
-
34
- offset = filters .get ("offset" , 0 )
35
- total_count = pagination .get ("total_count" )
36
- count = pagination .get ("count" )
37
-
38
- # The pagination has more records when:
39
- # total_count is still greater than count and offset combined
40
- return count + offset < total_count
41
-
42
-
43
17
class AccountBasedStream (DBTStream ):
44
18
"""A stream that requires an account ID."""
45
19
@@ -59,7 +33,7 @@ def partitions(self) -> list[dict]:
59
33
)
60
34
raise ValueError (errmsg )
61
35
62
- def get_new_paginator (self ) -> DbtPaginator :
36
+ def get_new_paginator (self ) -> BaseOffsetPaginator :
63
37
"""Return a new paginator instance for this stream."""
64
38
page_size = self .config ["page_size" ]
65
39
@@ -68,7 +42,7 @@ def get_new_paginator(self) -> DbtPaginator:
68
42
page_size ,
69
43
)
70
44
71
- return DbtPaginator (start_value = 0 , page_size = page_size )
45
+ return BaseOffsetPaginator (start_value = 0 , page_size = page_size )
72
46
73
47
def get_url_params (
74
48
self ,
0 commit comments