
Commit 77515b2

Refactor dataprocessing
1 parent 4881e21 commit 77515b2

File tree

5 files changed

+147 −176 lines changed
Lines changed: 4 additions & 3 deletions
@@ -1,6 +1,4 @@
 defaults:
-  - method: simple
-
   - override hydra/job_logging: custom
   - _self_ # Primary config overrides values of configs from the Defaults List
 
@@ -10,4 +8,7 @@ hydra:
     dir: .
 
 project_name: "MyProject"
-use_remote_storage: false
+tags: ["${now:%Y-%m-%d}"]
+list_size: 5
+split_date: "2019/04/09"
+history_size: 5

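The new keys added above (list_size, split_date, history_size) are plain top-level values in the data-processing config. As a rough illustration only — the entrypoint script is not part of this commit, and the config_path/config_name below are assumptions — they could be consumed via Hydra like this:

# Hypothetical entrypoint sketch; decorator arguments are assumed, not taken from this diff.
import hydra
from omegaconf import DictConfig

from mypackage.data_processing.data_processing import load_raw_data, process_data


@hydra.main(config_path="../configs/data_processing", config_name="data_processing", version_base=None)
def main(cfg: DictConfig) -> None:
    interactions, impressions = load_raw_data()
    train, val, user_mapper, item_mapper = process_data(
        interactions,
        impressions,
        list_size=cfg.list_size,
        split_date=cfg.split_date,
        history_size=cfg.history_size,
    )


if __name__ == "__main__":
    main()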
configs/data_processing/method/bpr.yaml

Lines changed: 0 additions & 1 deletion
This file was deleted.

configs/data_processing/method/simple.yaml

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/mypackage/data_processing/data_processing.py

Lines changed: 129 additions & 156 deletions
@@ -15,8 +15,15 @@
 
 
 def load_raw_data() -> tuple[DataFrame, DataFrame]:
+    """
+    Load raw interaction and impression data from the local cache.
+    If the data is not available locally, download it from remote storage.
+
+    Returns:
+        tuple[DataFrame, DataFrame]: DataFrames containing interactions and impressions data.
+    """
     data_dir = get_cache_path() / "data-cw10m"
-    # Download data if necessary
+    # Download data if it doesn't exist locally
     if not data_dir.exists():
         log.info("Data are not available locally!")
         log.info(f"Downloading data from remote storage to {data_dir} ...")
@@ -36,28 +43,66 @@ def load_raw_data() -> tuple[DataFrame, DataFrame]:
     return interactions, impressions
 
 
-def process_simple(
+def process_data(
     interactions: DataFrame,
     impressions: DataFrame,
+    list_size: int,
+    split_date: str,
+    history_size: int,
 ) -> tuple[DataFrame, DataFrame, DataFrame, dict]:
-    interactions = _common(interactions, impressions)
-
-    # Split data (train: 2019/01/07-2019/04/13; val: 2019/04/14-2019/04/15)
-    split_date = dt.datetime(2019, 4, 14)
-    train = interactions[interactions["timestamp"] < split_date]
-    val = interactions[interactions["timestamp"] >= split_date]
-
-    # Keep lists with condition 0 < mean_target < 1
-    train_valid_lists = (
-        train.groupby("list_id")
-        .agg({"target": "mean"})
-        .rename(columns={"target": "mean"})
-        .reset_index()
+    """
+    Process and clean raw interaction and impression data, splitting it into training and validation sets.
+
+    Args:
+        interactions (DataFrame): Raw interactions data.
+        impressions (DataFrame): Raw impressions data.
+        list_size (int): Minimum list size to be included in the dataset.
+        split_date (str): Date string used to split train and validation sets (format: YYYY/MM/DD).
+        history_size (int): Number of past clicks to store as user history.
+
+    Returns:
+        tuple[DataFrame, DataFrame, DataFrame, DataFrame]:
+            - Processed training dataset.
+            - Processed validation dataset.
+            - User-to-index mapping DataFrame.
+            - Item-to-index mapping DataFrame.
+    """
+    data = _preprocess_data(interactions, impressions)
+
+    # Remove lists shorter than 'list_size'
+    data_valid_lists = (
+        data.groupby("list_id").size().reset_index().rename(columns={0: "list_size"})
     )
-    train_valid_lists = train_valid_lists[
-        (train_valid_lists["mean"] > 0) & (train_valid_lists["mean"] < 1)
-    ]
-    train = train.merge(train_valid_lists, "inner", "list_id")
+    data_valid_lists = data_valid_lists[data_valid_lists["list_size"] >= list_size]
+    data_valid_lists = data_valid_lists.drop(columns="list_size")
+    data = data.merge(data_valid_lists, "inner", "list_id")
+
+    # Split data (train: 2019/01/07-2019/04/08; val: 2019/04/09-2019/04/15)
+    split_date = dt.datetime.strptime(split_date, "%Y/%m/%d")
+    train = data[data["timestamp"] < split_date]
+    val = data[data["timestamp"] >= split_date]
+
+    # Remove overlapping lists between train and val
+    train_lists = train["list_id"].unique()
+    val_lists = val["list_id"].unique()
+    train_lists_expected = np.setdiff1d(train_lists, val_lists)
+    val_lists_expected = np.setdiff1d(val_lists, train_lists)
+    train = train[train["list_id"].isin(train_lists_expected)]
+    val = val[val["list_id"].isin(val_lists_expected)]
+
+    # Remove cold users from val
+    train_users = train["user"].unique()
+    val = val[val["user"].isin(train_users)]
+
+    # Remove lists containing cold items from val
+    train_items = train["item"].unique()
+    val_items = val["item"].unique()
+    cold_items = np.setdiff1d(val_items, train_items)
+    val["cold_item"] = val["item"].isin(cold_items)
+    n_cold_items_per_list = val.groupby("list_id")["cold_item"].sum()
+    valid_lists = n_cold_items_per_list[n_cold_items_per_list == 0].index
+    val = val[val["list_id"].isin(valid_lists)]
+    val = val.drop(columns="cold_item")
 
     # Prepare user/item to idx mappers based on train data
     unique_train_users = train["user"].unique()
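To make the new 'list_size' filter concrete, here is a small standalone check of the groupby-size pattern introduced above; the frame is toy data, not repository data:

# Toy data only; demonstrates dropping lists with fewer than `list_size` rows.
import pandas as pd

list_size = 3
toy = pd.DataFrame({"list_id": [1, 1, 1, 2], "item": [10, 11, 12, 20]})
sizes = toy.groupby("list_id").size().reset_index().rename(columns={0: "list_size"})
keep = sizes[sizes["list_size"] >= list_size].drop(columns="list_size")
toy = toy.merge(keep, "inner", "list_id")
print(toy["list_id"].unique())  # [1] -- list 2 has only one row and is removed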
@@ -69,153 +114,73 @@ def process_simple(
         {"item": unique_train_items, "item_idx": np.arange(unique_train_items.size)}
     )
 
-    # Map user/item to idx - it removes cold users and items from validation
-    train = train.merge(user_mapper, on="user", how="inner")
-    train = train.merge(item_mapper, on="item", how="inner")
-    val = val.merge(user_mapper, on="user", how="inner")
-    val = val.merge(item_mapper, on="item", how="inner")
-
-    # Keep lists with condition 0 < mean_target < 1
-    val_valid_lists = (
-        val.groupby("list_id")
-        .agg({"target": "mean"})
-        .rename(columns={"target": "mean"})
-        .reset_index()
+    # Create user_history column - list of last n clicked items per user
+    train_clicks = train.sort_values(by=["user", "timestamp"])
+    train_clicks = train_clicks[train_clicks["target"] == 1].reset_index(drop=True)
+
+    def last_clicks(series):
+        history = []
+        result = []
+        for item in series:
+            result.append(history.copy())  # Append the current state of history
+            if len(history) == history_size:
+                history.pop(0)  # Keep only the last n items
+            history.append(item)
+        # Pad with None if history is shorter than threshold
+        return [([None] * (history_size - len(h)) + h) for h in result]
+
+    # Apply function per user
+    train_clicks["user_history"] = train_clicks.groupby("user")["item"].transform(
+        last_clicks
     )
-    val_valid_lists = val_valid_lists[
-        (val_valid_lists["mean"] > 0) & (val_valid_lists["mean"] < 1)
-    ]
-    val = val.merge(val_valid_lists, "inner", "list_id")
+    train = train.merge(
+        train_clicks[["timestamp", "user", "user_history"]],
+        on=["timestamp", "user"],
+        how="left",
+    )
+    tmp = train_clicks.loc[
+        train_clicks.groupby("user")["timestamp"].idxmax()
+    ].reset_index(drop=True)
+    tmp = tmp[["user", "user_history"]]
+    val = val.merge(tmp, "inner", "user")
 
+    # Sort train and val by timestamp
     train = train.sort_values("timestamp").reset_index(drop=True)
     val = val.sort_values("timestamp").reset_index(drop=True)
 
-    # Select valid columns
-    train = train[["timestamp", "list_id", "user_idx", "item_idx", "target"]]
-    train.columns = ["timestamp", "list_id", "user", "item", "target"]
-    val = val[["timestamp", "list_id", "user_idx", "item_idx", "target"]]
-    val.columns = ["timestamp", "list_id", "user", "item", "target"]
-
-    # Mock test_data
-    test = val.copy()  # test set == validation set (should be changed in the future!)
-
-    # Prepare statistics
-    unique_val_users = val["user"].unique()
-    unique_val_items = val["item"].unique()
-    stats = {}
-    stats["train_n_users"] = unique_train_users.size
-    stats["train_n_items"] = unique_train_items.size
-    stats["train_n_lists"] = train["list_id"].nunique()
-    stats["train_n_clicks"] = int(train["target"].sum())
-    stats["train_n_impressions"] = len(train) - stats["train_n_clicks"]
-    stats["train_ctr"] = stats["train_n_clicks"] / stats["train_n_impressions"]
-    stats["val_n_users"] = unique_val_users.size
-    stats["val_n_items"] = unique_val_items.size
-    stats["val_n_lists"] = val["list_id"].nunique()
-    stats["val_n_clicks"] = int(val["target"].sum())
-    stats["val_n_impressions"] = len(val) - stats["val_n_clicks"]
-    stats["val_ctr"] = stats["val_n_clicks"] / stats["val_n_impressions"]
-
-    return train, val, test, stats, user_mapper, item_mapper
-
-
-def process_bpr(
-    interactions: DataFrame,
-    impressions: DataFrame,
-) -> tuple[DataFrame, DataFrame, DataFrame, dict]:
-    interactions = _common(interactions, impressions)
-
-    # Split data
-    split_date = dt.datetime(2019, 4, 14)
-    train = interactions[interactions["timestamp"] < split_date]
-    tmp0 = train.loc[train["target"] == 0, ["user", "item"]]
-    tmp1 = train.loc[train["target"] == 1, ["user", "item"]]
-    train = tmp0.merge(tmp1, "inner", "user", suffixes=("_neg", "_pos"))
-    val = interactions[interactions["timestamp"] >= split_date]
-
-    # Prepare user/item to idx mappers based on train data
-    unique_train_users = train["user"].unique()
-    # unique_users = train["user"].unique()
-    item_neg_set = set(train["item_neg"])
-    item_pos_set = set(train["item_pos"])
-    unique_train_items = pd.Series(list(item_neg_set | item_pos_set)).unique()
-
-    user_mapper = pd.DataFrame(
-        {"user": unique_train_users, "user_idx": np.arange(unique_train_users.size)}
-    )
-    item_mapper = pd.DataFrame(
-        {"item": unique_train_items, "item_idx": np.arange(unique_train_items.size)}
-    )
-
-    # Map user/item to idx and handle column names conflicts
-    train = train.merge(user_mapper, on="user", how="inner")
-    train = train[["user_idx", "item_neg", "item_pos"]].rename(
-        columns={"user_idx": "user"}
-    )
-    train = train.merge(item_mapper, left_on="item_neg", right_on="item", how="inner")
-    train = train[["user", "item_idx", "item_pos"]].rename(
-        columns={"item_idx": "item_neg"}
-    )
-    train = train.merge(item_mapper, left_on="item_pos", right_on="item", how="inner")
-    train = train[["user", "item_neg", "item_idx"]].rename(
-        columns={"item_idx": "item_pos"}
-    )
-
-    val = val.merge(user_mapper, on="user", how="inner")
-    val = val.merge(item_mapper, on="item", how="inner")
-
-    # Keep lists with condition 0 < mean_target < 1
-    val_valid_lists = (
-        val.groupby("list_id")
-        .agg({"target": "mean"})
-        .rename(columns={"target": "mean"})
-        .reset_index()
-    )
-    val_valid_lists = val_valid_lists[
-        (val_valid_lists["mean"] > 0) & (val_valid_lists["mean"] < 1)
-    ]
-    val = val.merge(val_valid_lists, "inner", "list_id")
-
-    val = val[["timestamp", "list_id", "user_idx", "item_idx", "target"]]
-    val = val.rename(columns={"user_idx": "user", "item_idx": "item"})
-
-    # Mock test_data
-    test = val.copy()  # test set == validation set (to change in the future!)
-
-    # Prepare statistics
-    unique_val_users = val["user"].unique()
-    unique_val_items = val["item"].unique()
-    stats = {}
-    stats["train_n_users"] = unique_train_users.size
-    stats["train_n_items"] = unique_train_items.size
-    stats["val_n_users"] = unique_val_users.size
-    stats["val_n_items"] = unique_val_items.size
-    stats["val_n_lists"] = val["list_id"].nunique()
-    stats["val_n_clicks"] = int(val["target"].sum())
-    stats["val_n_impressions"] = len(val) - stats["val_n_clicks"]
-    stats["val_ctr"] = stats["val_n_clicks"] / stats["val_n_impressions"]
-
-    return train, val, test, stats, user_mapper, item_mapper
+    return train, val, user_mapper, item_mapper
 
 
 def save_data(
     task: Task,
     train: DataFrame,
     val: DataFrame,
-    test: DataFrame,
-    stats: dict,
     user_mapper: DataFrame,
     item_mapper: DataFrame,
 ) -> None:
+    """
+    Save processed data and mappings as artifacts in ClearML.
+
+    Args:
+        task (Task): ClearML task instance.
+        train (DataFrame): Processed training dataset.
+        val (DataFrame): Processed validation dataset.
+        user_mapper (DataFrame): User-to-index mapping.
+        item_mapper (DataFrame): Item-to-index mapping.
+    """
     task.upload_artifact("train", train, extension_name=".parquet")
     task.upload_artifact("val", val, extension_name=".parquet")
-    task.upload_artifact("test", test, extension_name=".parquet")
-    task.upload_artifact("stats", stats)
     task.upload_artifact("user_mapper", user_mapper, extension_name=".parquet")
     task.upload_artifact("item_mapper", item_mapper, extension_name=".parquet")
 
 
 def _download_data(data_dir: Path) -> None:
+    """
+    Download interaction and impression data from remote S3 storage.
+
+    Args:
+        data_dir (Path): Local directory path to store downloaded data.
+    """
     s3 = s3fs.S3FileSystem()
     prefix = "s3://kf-north-bucket/data-science-template/data/contentwise/CW10M"
 
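The padding behaviour of the new last_clicks helper can be sanity-checked in isolation. The snippet below restates the same logic standalone, with history_size hard-coded to 2 instead of taken from the function argument:

# Standalone re-statement of the helper for illustration only.
history_size = 2


def last_clicks(series):
    history, result = [], []
    for item in series:
        result.append(history.copy())  # History as it was *before* this click
        if len(history) == history_size:
            history.pop(0)  # Keep only the last `history_size` items
        history.append(item)
    # Pad with None so every history has exactly `history_size` entries
    return [([None] * (history_size - len(h)) + h) for h in result]


print(last_clicks([10, 11, 12]))  # [[None, None], [None, 10], [10, 11]]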
@@ -236,10 +201,13 @@ def _download_data(data_dir: Path) -> None:
         df.to_parquet(f"{data_dir}/impressions-direct-link/{Path(p).name}")
 
 
-def _common(
+def _preprocess_data(
     interactions: DataFrame,
     impressions_dl: DataFrame,
 ) -> DataFrame:
+    """
+    Preprocess raw interactions and impressions data for further processing.
+    """
     # Select only movies from item types
     interactions = interactions[interactions["item_type"] == 0]
     # Select only clicks as an interaction type
@@ -250,28 +218,33 @@ def _common(
         unit="ms",
     )
 
+    # Assume that a user can have only one interaction at an exact timestamp
+    interactions = interactions.drop_duplicates(["utc_ts_milliseconds", "user_id"])
+
     impressions_dl = impressions_dl.explode("recommended_series_list")
     impressions_dl["recommended_series_list"] = pd.to_numeric(
         impressions_dl["recommended_series_list"]
     )
 
     # Join positive interactions (clicks) with negative interactions (impressions)
-    interactions = interactions.merge(impressions_dl, "inner", "recommendation_id")
+    data = interactions.merge(impressions_dl, "inner", "recommendation_id")
 
-    # Create unique id per (recommendation_id, user_id) pairs
-    interactions["list_id"] = pd.factorize(
-        interactions[["recommendation_id", "user_id"]].apply(tuple, axis=1)
+    # Create unique id per (utc_ts_milliseconds, recommendation_id, user_id)
+    data["list_id"] = pd.factorize(
+        data[["utc_ts_milliseconds", "recommendation_id", "user_id"]].apply(
+            tuple, axis=1
+        )
     )[0]
 
     # Mark positive interactions with 1 and negative with 0
-    interactions["target"] = np.where(
-        interactions["series_id"] == interactions["recommended_series_list"],
+    data["target"] = np.where(
+        data["series_id"] == data["recommended_series_list"],
         1,
         0,
     )
-    interactions["target"] = interactions["target"].astype("int32")
+    data["target"] = data["target"].astype("int32")
 
-    interactions = interactions[
+    data = data[
         [
             "utc_ts_milliseconds",
             "list_id",
@@ -280,6 +253,6 @@ def _common(
             "target",
         ]
     ]
-    interactions.columns = ["timestamp", "list_id", "user", "item", "target"]
+    data.columns = ["timestamp", "list_id", "user", "item", "target"]
 
-    return interactions
+    return data
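For reference, the list_id key in _preprocess_data now also includes the timestamp, so the same recommendation shown to the same user at a different time becomes a separate list. A toy example (made-up rows, not repository data) of the pd.factorize call used above:

# Toy rows only; shows how pd.factorize assigns one list_id per
# (utc_ts_milliseconds, recommendation_id, user_id) combination.
import pandas as pd

rows = pd.DataFrame(
    {
        "utc_ts_milliseconds": [1000, 1000, 2000],
        "recommendation_id": [7, 7, 7],
        "user_id": [1, 1, 1],
    }
)
rows["list_id"] = pd.factorize(
    rows[["utc_ts_milliseconds", "recommendation_id", "user_id"]].apply(tuple, axis=1)
)[0]
print(rows["list_id"].tolist())  # [0, 0, 1] -- same recommendation at a new timestamp gets a new id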
