11
11
from datetime import datetime , timedelta , timezone
12
12
from functools import cached_property
13
13
14
- from diworker .diworker .importers .base import CSVBaseReportImporter
14
+ from diworker .diworker .importers .base import (
15
+ CSVBaseReportImporter , CSV_REWRITE_DAYS
16
+ )
15
17
import tools .optscale_time as opttime
16
18
import pyarrow .parquet as pq
17
19
@@ -175,9 +177,22 @@ def get_update_fields(self):
175
177
'_rec_n'
176
178
]
177
179
180
@staticmethod
def _is_first_import_in_month(last_import_dt: datetime) -> bool:
    """Tell whether the last import happened in the previous calendar month.

    The current moment is taken from ``opttime.utcnow()``; only the year
    and month components of both datetimes are compared.

    :param last_import_dt: datetime of the previous import.
    :return: True when ``last_import_dt`` falls in the calendar month
        immediately preceding the current one (including the
        December -> January rollover), False otherwise. A gap of more
        than one month, or an import in the current month, yields False.
    """
    now = opttime.utcnow()
    # Number the months linearly so the year rollover needs no special
    # case: January only matches a last import made in December of the
    # previous year, not in any month of that year.
    months_elapsed = (
        (now.year - last_import_dt.year) * 12
        + now.month - last_import_dt.month
    )
    return months_elapsed == 1
187
+
178
188
def get_current_reports (self , reports_groups , last_import_modified_at ):
179
189
current_reports = defaultdict (list )
180
190
reports_count = 0
191
+ # during first report in the current month download all reports
192
+ # from the previous month to do full reimport
193
+ if self ._is_first_import_in_month (last_import_modified_at ):
194
+ last_import_modified_at = opttime .startmonth (
195
+ last_import_modified_at )
181
196
for date , reports in reports_groups .items ():
182
197
for report in reports :
183
198
if report .get ('LastModified' , - 1 ) > last_import_modified_at :
@@ -188,6 +203,18 @@ def get_current_reports(self, reports_groups, last_import_modified_at):
188
203
LOG .info ('Selected %s reports' , reports_count )
189
204
return current_reports
190
205
206
@cached_property
def min_date_import_threshold(self) -> datetime:
    """Return the earliest date from which expenses are re-imported.

    The value is derived from the cloud account's
    ``last_import_modified_at`` timestamp (0 when the key is absent),
    interpreted as UTC and truncated to midnight. It is computed once
    per instance because of ``cached_property``.

    :return: the first day of the last import's month when
        ``_is_first_import_in_month`` reports True for that day;
        otherwise the last import day minus ``CSV_REWRITE_DAYS`` days.
    """
    last_import_ts = self.cloud_acc.get('last_import_modified_at', 0)
    last_import_day = datetime.fromtimestamp(
        last_import_ts, tz=timezone.utc
    ).replace(hour=0, minute=0, second=0, microsecond=0)
    if not self._is_first_import_in_month(last_import_day):
        # regular run: only rewrite the trailing window of days
        return last_import_day - timedelta(days=CSV_REWRITE_DAYS)
    # first import in a new month: go back to the start of the month
    # of the last import so that whole month is imported again
    return last_import_day.replace(day=1)
217
+
191
218
def get_raw_upsert_filters (self , expense ):
192
219
filters = super ().get_raw_upsert_filters (expense )
193
220
filters .update ({
0 commit comments