|
1 | | -# blocks/codependency_matrix_block.py |
2 | 1 | from __future__ import annotations |
| 2 | +# TELF/pipeline/blocks/beaver_codependency_matrix_block.py |
3 | 3 |
|
4 | 4 | from pathlib import Path |
5 | 5 | from typing import Any, Dict, Sequence, Tuple |
6 | 6 |
|
7 | | -import os, sparse |
8 | | -from ...pre_processing import Beaver |
9 | | -from ...helpers.file_system import load_file_as_dict |
| 7 | +import numpy as np |
| 8 | +import pickle |
| 9 | +import scipy.sparse as sp |
| 10 | +import sparse # pydata/sparse |
10 | 11 |
|
11 | 12 | from .base_block import AnimalBlock |
12 | 13 | from .data_bundle import DataBundle, SAVE_DIR_BUNDLE_KEY |
13 | 14 |
|
| 15 | +# Ensure Beaver.coauthor_tensor is patched to a robust version on import |
| 16 | +# from ...pre_processing import Beaver |
| 17 | + |
| 18 | +# TELF/pre_processing/Beaver/monkey_patch_coauthor_tensor.py |
| 19 | +""" |
| 20 | +Monkey-patch Beaver.coauthor_tensor to be robust to: |
| 21 | +- n_jobs <= 0 (n_jobs is accepted for signature compatibility but ignored; pairs are counted in a single process) |
| 22 | +- empty / missing authors (writes an empty but valid tensor) |
| 23 | +- missing 'year' column (uses 0) |
| 24 | +Also saves authors/time index maps for downstream consumers. |
| 25 | +""" |
| 26 | + |
| 27 | + |
| 28 | +import os |
| 29 | +import pickle |
| 30 | +from typing import Dict, List, Tuple |
| 31 | + |
| 32 | +import numpy as np |
| 33 | +import pandas as pd |
| 34 | +import sparse # pydata/sparse |
| 35 | + |
| 36 | +from ...pre_processing import Beaver |
| 37 | + |
| 38 | + |
def _save_coauthor_outputs(save_path, coo, a2i, t2i) -> None:
    """Write the tensor and both index maps into the ``save_path`` directory."""
    os.makedirs(save_path, exist_ok=True)
    sparse.save_npz(os.path.join(save_path, "coauthor.npz"), coo)
    with open(os.path.join(save_path, "authors_idx_map.p"), "wb") as f:
        pickle.dump(a2i, f)
    with open(os.path.join(save_path, "time_idx_map.p"), "wb") as f:
        pickle.dump(t2i, f)


def _safe_coauthor_tensor(
    self: Beaver,
    *,
    dataset: pd.DataFrame,
    target_columns: List[str],
    split_authors_with: str = ";",
    verbose: int = 0,
    save_path: str | None = None,
    n_nodes: int | None = None,
    n_jobs: int = 1,
    joblib_backend: str | None = None,
    authors_idx_map: Dict[str, int] | None = None,
    time_idx_map: Dict[int, int] | None = None,
    return_object: bool = False,
    output_mode: str | None = None,
):
    """Build a symmetric (author x author x time) co-occurrence count tensor.

    Each document contributes +1 to ``[u, v, t]`` and ``[v, u, t]`` for every
    unordered pair of distinct authors ``u, v`` it lists, where ``t`` is the
    index of the document's time value.

    Args:
        dataset: Frame holding the author and time columns. It is copied, so
            the caller's frame is never modified.
        target_columns: ``[author_column, time_column]``. A missing time
            column is filled with 0.
        split_authors_with: Separator between ids inside one author cell.
        save_path: Directory that receives ``coauthor.npz``,
            ``authors_idx_map.p`` and ``time_idx_map.p``. Nothing is written
            when falsy.
        authors_idx_map: Optional author -> index map. ``None`` or an empty
            mapping means "build it from the data" (sorted unique authors).
            Authors absent from a supplied map are skipped.
        time_idx_map: Optional time -> index map. ``None`` or an empty mapping
            means "build it from the data" (sorted unique times). Times absent
            from a supplied map fall back to the map's first index.
        return_object: Return the ``sparse.COO`` tensor instead of ``None``.
        verbose, n_nodes, n_jobs, joblib_backend, output_mode: Accepted only
            so existing call sites keep working; this implementation does not
            read them.

    Returns:
        The ``sparse.COO`` tensor when ``return_object`` is true, else ``None``.
    """
    from collections import Counter
    from itertools import combinations

    auth_col, time_col = target_columns
    df = dataset.copy()

    if time_col not in df.columns:
        df[time_col] = 0

    # Parse authors per document, dropping blank fragments.
    author_lists = (
        df[auth_col].fillna("")
        .astype(str)
        .str.split(split_authors_with)
        .apply(lambda lst: [a.strip() for a in lst if a and a.strip()])
        .tolist()
    )
    times = df[time_col].fillna(0).astype(int).tolist()

    # An empty mapping is treated like None: callers pass ``{}`` to mean
    # "no precomputed map", and honouring it literally would yield a 0 x 0
    # tensor (authors) or collapse every time onto index 0 (times).
    if not authors_idx_map:
        unique_authors = sorted({a for lst in author_lists for a in lst})
        a2i: Dict[str, int] = {a: i for i, a in enumerate(unique_authors)}
    else:
        a2i = dict(authors_idx_map)

    if not time_idx_map:
        unique_times = sorted(set(times)) or [0]
        t2i: Dict[int, int] = {t: i for i, t in enumerate(unique_times)}
    else:
        t2i = dict(time_idx_map)

    A = len(a2i)
    T = max(1, len(t2i))

    # Early exit: no authors -> empty but valid 3D tensor (0 x 0 x T).
    if A == 0:
        coo = sparse.COO(np.zeros((0, 0, T), dtype=np.float32))
        if save_path:
            _save_coauthor_outputs(save_path, coo, a2i, t2i)
        return coo if return_object else None

    # Times missing from a supplied map land on the map's first index.
    fallback_ti = next(iter(t2i.values()))

    weight: Counter = Counter()
    for lst, t in zip(author_lists, times):
        # De-duplicate within a document (order preserved): a repeated id
        # would otherwise create self-loops and double-count its edges.
        idxs = list(dict.fromkeys(a2i[a] for a in lst if a in a2i))
        if len(idxs) < 2:
            continue
        ti = t2i.get(int(t), fallback_ti)
        for u, v in combinations(idxs, 2):
            weight[(u, v, ti)] += 1
            weight[(v, u, ti)] += 1  # undirected

    if weight:
        coords = np.array(list(zip(*weight.keys())))
        data = np.array(list(weight.values()), dtype=np.float32)
    else:
        # No co-occurring pairs: build the empty tensor from empty coordinates
        # rather than materialising a dense A x A x T array of zeros.
        coords = np.empty((3, 0), dtype=np.intp)
        data = np.empty(0, dtype=np.float32)
    coo = sparse.COO(coords, data, shape=(A, A, T))

    if save_path:
        _save_coauthor_outputs(save_path, coo, a2i, t2i)

    return coo if return_object else None


# Apply the patch at import time: importing this module rebinds
# Beaver.coauthor_tensor for every Beaver instance in the process.
Beaver.coauthor_tensor = _safe_coauthor_tensor  # type: ignore[misc]
| 135 | + |
14 | 136 |
|
class CodependencyMatrixBlock(AnimalBlock):
    """
    Build a (flattened) co-dependency matrix from a column of
    semicolon-separated ids + 'year'.

    The block runs ``Beaver.coauthor_tensor`` on the input frame, sums the
    resulting 3-mode tensor over its last axis, and publishes the 2D matrix
    together with the node ids in matrix order.

    needs:    ['df']
    provides: ['X', 'node_ids']
    """

    def __init__(
        self,
        *,
        col: str,
        needs: Sequence[str] = ("df",),
        provides: Sequence[str] = ("X", "node_ids"),
        tag: str = "BeaverCodependencyMatrix",
        conditional_needs: Sequence[Tuple[str, Any]] = (),
        init_settings: Dict[str, Any] | None = None,
        call_settings: Dict[str, Any] | None = None,
        **kw: Any,
    ) -> None:
        """Store the id column and forward merged settings to the base block.

        Args:
            col: Name of the dataframe column holding the separated ids.
            needs: Bundle keys read by ``run``; the first one is the frame.
            provides: Bundle keys written by ``run`` (matrix, then node ids).
            tag: Block tag; also used as the output sub-directory name.
            conditional_needs: Forwarded unchanged to the base block.
            init_settings: Keyword arguments for the ``Beaver`` constructor.
            call_settings: Keyword arguments for ``Beaver.coauthor_tensor``;
                merged over the defaults ``split_authors_with=";"`` and
                ``n_jobs=1``.
            **kw: Any further base-block options.
        """
        self.col = col
        default_init: Dict[str, Any] = {}
        default_call: Dict[str, Any] = {"split_authors_with": ";", "n_jobs": 1}
        super().__init__(
            needs=needs,
            provides=provides,
            conditional_needs=conditional_needs,
            tag=tag,
            init_settings={**default_init, **(init_settings or {})},
            call_settings={**default_call, **(call_settings or {})},
            **kw,
        )

    def run(self, bundle: DataBundle) -> None:
        """Compute the matrix and node ids and store them in ``bundle``.

        Writes ``bundle[provides[0]]`` (scipy CSR matrix) and
        ``bundle[provides[1]]`` (list of node ids).
        """
        raw = bundle[self.needs[0]]
        # A path-like entry is loaded through the block's load_path helper
        # (presumably provided by the base class -- it is not defined here).
        df = self.load_path(raw) if isinstance(raw, (str, Path)) else raw

        # Ensure 'year' exists (Beaver expects it for the 3-mode tensor).
        # Copy first so the frame held by the bundle is not modified.
        if "year" not in df.columns:
            df = df.copy()
            df["year"] = 0

        out_dir = Path(bundle[SAVE_DIR_BUNDLE_KEY]) / self.tag
        out_dir.mkdir(parents=True, exist_ok=True)

        # Run Beaver to materialize a 3D co-author tensor (A x A x T).
        # init_settings must reach the constructor; otherwise the settings
        # accepted by __init__ would be silently ignored.
        beaver = Beaver(**self.init_settings)
        cfg = dict(self.call_settings)
        cfg.update(
            {
                "dataset": df,
                "target_columns": [self.col, "year"],
                "save_path": str(out_dir),
            }
        )
        beaver.coauthor_tensor(**cfg)

        # Load the tensor and flatten the T mode -> 2D A x A.
        coo3: "sparse.COO" = sparse.load_npz(out_dir / "coauthor.npz")
        coo2 = coo3.sum(axis=2)

        # Convert to scipy.sparse (CSR).
        if hasattr(coo2, "coords") and hasattr(coo2, "data"):
            rows, cols = coo2.coords[0], coo2.coords[1]
            X = sp.csr_matrix((coo2.data, (rows, cols)), shape=coo2.shape)
        else:
            # Fallback for a dense result.
            X = sp.csr_matrix(np.asarray(coo2))

        # Node id order from Beaver's authors_idx_map if present.
        a_map_path = out_dir / "authors_idx_map.p"
        if a_map_path.exists():
            # NOTE: pickle.load can execute arbitrary code. This file is
            # expected to be the one coauthor_tensor just wrote into out_dir;
            # do not point the save directory at untrusted content.
            with open(a_map_path, "rb") as f:
                a2i: Dict[str, int] = pickle.load(f)
            node_ids = [None] * len(a2i)
            for a, idx in a2i.items():
                node_ids[idx] = a
        else:
            # Fallback: derive from df (order might differ from Beaver).
            ids = (
                df[self.col]
                .dropna()
                .astype(str)
                .str.split(cfg.get("split_authors_with", ";"))
                .explode()
                .str.strip()
            )
            node_ids = sorted(set(ids.loc[ids != ""]))

        # Publish outputs (top-level keys; WolfBlock expects this).
        bundle[self.provides[0]] = X
        bundle[self.provides[1]] = node_ids
0 commit comments