long_term_database_manager.py
import os
import shutil
import json
import datetime
import logging

import pytz

from remotes import KaggleUploader


class LongTermDatasetManager:
    """Compose scraper and parser outputs into a local dataset folder and
    push it to long-term remote storage (Kaggle by default)."""

    def __init__(
        self,
        long_term_db_target=KaggleUploader,
        app_folder=".",
        enabled_scrapers=None,
        enabled_file_types=None,
        dataset="israeli-supermarkets-2024",
    ):
        self.when = self._now()
        self.dataset = dataset
        self.enabled_scrapers = (
            "ALL" if not enabled_scrapers else ",".join(enabled_scrapers)
        )
        self.enabled_file_types = (
            "ALL" if not enabled_file_types else ",".join(enabled_file_types)
        )
        self.dataset_path = os.path.join(app_folder, self.dataset)
        self.remote_database = long_term_db_target(
            dataset_path=self.dataset_path, when=self.when
        )
        logging.info(f"Dataset path: {self.dataset_path}")

    def _now(self):
        """Return the current time in the Asia/Jerusalem timezone as a formatted string."""
        return datetime.datetime.now(pytz.timezone("Asia/Jerusalem")).strftime(
            "%d/%m/%Y, %H:%M:%S"
        )

    def read_parser_status(self, outputs_folder):
        """Describe every output file the parser reported as created.

        Each entry in parser-status.json is expected to carry a "store_enum"
        field and a "response" object with "file_was_created",
        "file_created_path", "files_to_process" and "files_types" fields.
        """
        with open(f"{outputs_folder}/parser-status.json", "r") as file:
            data = json.load(file)

        descriptions = []
        for entry in data:
            if "response" in entry and entry["response"]["file_was_created"]:
                descriptions.append(
                    {
                        "path": os.path.split(entry["response"]["file_created_path"])[
                            -1
                        ],
                        "description": f"{len(entry['response']['files_to_process'])} XML files from type {entry['response']['files_types']} published by '{entry['store_enum']}'",
                    }
                )
        return descriptions

    def read_scraper_status_files(self, status_folder):
        """Describe every scraper status JSON file found in the status folder."""
        descriptions = []
        for file in os.listdir(status_folder):
            if file.endswith(".json"):
                descriptions.append(
                    {
                        "path": file,
                        "description": f"Scraper status file for '{file}' execution.",
                    }
                )
        return descriptions

    def compose(self, outputs_folder, status_folder):
        """Assemble the local dataset folder.

        Writes dataset-metadata.json, copies the parser outputs and scraper
        status files into the dataset folder, and advances the remote index.
        """
        os.makedirs(self.dataset_path, exist_ok=True)

        with open(f"{self.dataset_path}/dataset-metadata.json", "w") as file:
            json.dump(
                {
                    "title": "Israeli Supermarkets 2024",
                    "id": f"erlichsefi/{self.dataset}",
                    "resources": [
                        {
                            "path": "index.json",
                            "description": "Index mapping between Kaggle versions and dataset creation times",
                        },
                        {
                            "path": "parser-status.json",
                            "description": "Parser status file",
                        },
                    ]
                    + self.read_parser_status(outputs_folder)
                    + self.read_scraper_status_files(status_folder),
                },
                file,
            )

        shutil.copytree(outputs_folder, self.dataset_path, dirs_exist_ok=True)
        shutil.copytree(status_folder, self.dataset_path, dirs_exist_ok=True)
        self.remote_database.increase_index()

    def upload(self):
        """Upload the composed dataset folder as a new version of the remote dataset.

        The version message records the update time and the scrapers and file
        types that were enabled for this run.
        """
        try:
            self.remote_database.upload_to_dataset(
                message=f"Update-Time: {self.when}, Scrapers:{self.enabled_scrapers}, Files:{self.enabled_file_types}"
            )
        except Exception as e:
            logging.critical(f"Error uploading file: {e}")
            raise ValueError(f"Error uploading file: {e}") from e

    def clean(self):
        """Delete the local dataset folder and ask the remote target to clean up."""
        shutil.rmtree(self.dataset_path)
        self.remote_database.clean()
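
A minimal usage sketch, assuming the default KaggleUploader remote; the folder names are hypothetical placeholders for the parser outputs (which must contain parser-status.json) and the scrapers' JSON status files:

from long_term_database_manager import LongTermDatasetManager

manager = LongTermDatasetManager(app_folder=".")

# Build the local dataset folder from this run's outputs.
manager.compose(outputs_folder="app_data/dumps", status_folder="app_data/status")

# Push a new dataset version with a timestamped message, then drop the local copy.
manager.upload()
manager.clean()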