Skip to content

Commit b8dd18d

Browse files
author
Ryan Calvin Barron
committed
NER entities to termite KG injection
1 parent 5948485 commit b8dd18d

15 files changed

+1726
-1767
lines changed

TELF/pipeline/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,6 @@
7070

7171
from .blocks.collect_hnmfk_leaf_block import CollectHNMFkLeafBlock
7272
from .blocks.termite_neo4j_block import TermiteNeo4jBlock
73-
from .blocks.termite_vector_block import TermiteVectorBlock
73+
from .blocks.termite_vector_block import TermiteVectorBlock
74+
75+
from .blocks.author_affiliation_tables import AffiliationsAndAuthorsBlock

TELF/pipeline/blocks/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@
6363
from .collect_hnmfk_leaf_block import CollectHNMFkLeafBlock
6464
from .termite_neo4j_block import TermiteNeo4jBlock
6565
from .termite_vector_block import TermiteVectorBlock
66+
from .author_affiliation_tables import AffiliationsAndAuthorsBlock

TELF/pipeline/blocks/artic_fox_block.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def run(self, bundle: DataBundle) -> None:
6868
model.load_model()
6969

7070
pipeline = ArcticFox(model=model, **self.init_settings)
71-
# pipeline.run_full_pipeline(data_df=df, vocab=vocabulary, **self.call_settings)
71+
pipeline.run_full_pipeline(data_df=df, vocab=vocabulary, **self.call_settings)
7272

7373
status_value = "Done"
7474

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# blocks/affiliations_and_authors_block.py
2+
from __future__ import annotations
3+
4+
from pathlib import Path
5+
from typing import Dict, Sequence, Any, Optional, List, Tuple, Union
6+
7+
import pandas as pd
8+
9+
from .base_block import AnimalBlock
10+
from .data_bundle import DataBundle, SAVE_DIR_BUNDLE_KEY
11+
12+
# your helpers live in blocks/block_helpers/
13+
from .block_helpers.affiliation_partition import generate_top_affiliations_with_country
14+
from .block_helpers.author_partition import write_top_authors_by_cluster
15+
16+
17+
class AffiliationsAndAuthorsBlock(AnimalBlock):
    """
    Compute (affiliation, country, year) paper counts and top authors by cluster.

    ─────────────────────────────────────────────────────────────
    always needs : ('df',) – accepts a CSV path OR a pandas.DataFrame
    provides     : ('affiliations_df', 'affiliations_csv',
                    'authors_df', 'authors_csv')
    tag          : 'AffilsAndAuthors' (namespace for its outputs)

    Results are written under <bundle[SAVE_DIR_BUNDLE_KEY]>/<tag>/ .
    Checkpoints persist the two CSV paths so the block can be skipped on re-run.
    """

    # Canonical bundle key this block consumes; callers may override via `needs`.
    CANONICAL_NEEDS = ("df",)

    def __init__(
        self,
        *,
        needs: Sequence[str] = CANONICAL_NEEDS,
        provides: Sequence[str] = ("affiliations_df", "affiliations_csv",
                                   "authors_df", "authors_csv"),
        # Persist only the CSVs; DataFrames are rebuilt on load if needed.
        checkpoint_keys: Sequence[str] = ("affiliations_csv", "authors_csv"),
        conditional_needs: Sequence[tuple[str, Any]] = (),
        tag: str = "AffilsAndAuthors",
        # Defaults mirror the helper signatures in block_helpers/.
        init_settings: Dict[str, Any] | None = None,
        call_settings: Dict[str, Any] | None = None,
        **kw: Any,
    ) -> None:
        """
        Configure the block and register its needs/provides with AnimalBlock.

        Parameters
        ----------
        needs : bundle keys read by run(); defaults to ('df',).
        provides : bundle keys written by run() (namespaced under `tag`).
        checkpoint_keys : subset of provides persisted for re-run skipping.
        tag : namespace for outputs and the on-disk subdirectory name.
        init_settings / call_settings : user overrides merged over the
            defaults below via the base class's `_merge`.
            NOTE(review): `_merge` is assumed to give caller-supplied values
            precedence over defaults — confirm against AnimalBlock.
        """
        default_init: Dict[str, Any] = {}

        default_call: Dict[str, Any] = {
            # generate_top_affiliations_with_country(...)
            "min_total_papers": 20,
            "country_filter": None,       # exact match (including 'unknown') or None
            "partition_by_year": False,
            "per_year_output_dir": None,  # if None and partition_by_year=True → use <tag>/by_year

            # write_top_authors_by_cluster(...)
            "countries": None,            # list[str] or None
            "top_n": 10,
        }

        super().__init__(
            needs=needs,
            provides=provides,
            conditional_needs=list(conditional_needs or []),
            checkpoint_keys=checkpoint_keys,
            tag=tag,
            init_settings=self._merge(default_init, init_settings),
            call_settings=self._merge(default_call, call_settings),
            **kw,
        )

    # ─────────────────────────────────────────────────────────────
    # helpers
    # ─────────────────────────────────────────────────────────────
    def _ensure_input_csv(self, bundle: DataBundle) -> Path:
        """
        Accepts either a DataFrame or a path in bundle['df'].
        If a DataFrame, persist it to <save_dir>/<tag>/input.csv and return that path.
        """
        src = bundle[self.needs[0]]
        save_dir = Path(bundle[SAVE_DIR_BUNDLE_KEY]) / self.tag
        save_dir.mkdir(parents=True, exist_ok=True)

        if isinstance(src, pd.DataFrame):
            inp = save_dir / "input.csv"
            # utf-8-sig keeps the CSV readable in Excel (BOM prefix).
            src.to_csv(inp, index=False, encoding="utf-8-sig")
            return inp

        # Not a DataFrame: treat as a path. Let AnimalBlock's path rewriter
        # handle legacy numbered dirs.
        return Path(src)

    # ─────────────────────────────────────────────────────────────
    # work
    # ─────────────────────────────────────────────────────────────
    def run(self, bundle: DataBundle) -> None:
        """
        Produce the two output CSVs under <save_dir>/<tag>/ and publish
        both the CSV paths and the loaded DataFrames into the bundle
        (keys namespaced as '<tag>.<name>'). Registers the CSV paths as
        checkpoints so the block can be skipped on re-run.
        """
        df_path = self._ensure_input_csv(bundle)

        out_dir = Path(bundle[SAVE_DIR_BUNDLE_KEY]) / self.tag
        out_dir.mkdir(parents=True, exist_ok=True)

        # === 1) Affiliations with country (and per-year optional) ===
        affils_csv = out_dir / "affiliations_top.csv"
        per_year_dir = self.call_settings.get("per_year_output_dir")
        # Default per-year output location when partitioning is requested
        # but no explicit directory was configured.
        if self.call_settings.get("partition_by_year") and not per_year_dir:
            per_year_dir = out_dir / "by_year"

        generate_top_affiliations_with_country(
            df_path=df_path,
            affils_output_path=affils_csv,
            min_total_papers=int(self.call_settings["min_total_papers"]),
            country_filter=self.call_settings.get("country_filter"),
            partition_by_year=bool(self.call_settings.get("partition_by_year")),
            per_year_output_dir=per_year_dir,
        )

        # Read the CSV back for the bundle; fall back to an empty frame with
        # the expected schema if the helper produced no file.
        aff_df = pd.read_csv(affils_csv) if affils_csv.is_file() else pd.DataFrame(
            columns=["affiliation_name", "country", "year", "paper_count"]
        )

        # register checkpoints / provide
        self.register_checkpoint("affiliations_csv", affils_csv)
        bundle[f"{self.tag}.affiliations_csv"] = str(affils_csv)
        bundle[f"{self.tag}.affiliations_df"] = aff_df

        # === 2) Top authors by cluster =============================
        authors_csv = out_dir / "top_authors_by_cluster.csv"
        # Note: helper param name is COUNTY_NAMES (kept as-is to match the
        # helper's signature; presumably a typo for COUNTRY_NAMES upstream).
        result_df = write_top_authors_by_cluster(
            df_path=str(df_path),
            output_path=str(authors_csv),
            COUNTY_NAMES=self.call_settings.get("countries"),
            top_n=int(self.call_settings.get("top_n", 10)),
        )

        # register checkpoints / provide
        self.register_checkpoint("authors_csv", authors_csv)
        bundle[f"{self.tag}.authors_csv"] = str(authors_csv)
        bundle[f"{self.tag}.authors_df"] = result_df

TELF/pipeline/blocks/block_helpers/__init__.py

Whitespace-only changes.
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import ast
2+
import json
3+
from pathlib import Path
4+
import pandas as pd
5+
6+
UNKNOWN_COUNTRY = "unknown"
7+
8+
def _parse_affiliations_with_country(raw) -> list[tuple[str, str]]:
9+
"""
10+
Parse a JSON/Python-literal dict into [(name, country), …].
11+
Always returns a country (missing ⇒ 'unknown'). Returns [] if unparseable.
12+
Expected shape (examples):
13+
'{"0": {"name": "MIT", "country": "United States"}}'
14+
'{0: {"name": "LANL"}}'
15+
"""
16+
if raw is None:
17+
return []
18+
try:
19+
if pd.isna(raw): # safe even if raw isn't a pandas scalar
20+
return []
21+
except Exception:
22+
pass
23+
24+
if isinstance(raw, dict):
25+
parsed = raw
26+
else:
27+
s = str(raw).strip()
28+
if s in ("", "{}", "[]"):
29+
return []
30+
try:
31+
parsed = json.loads(s)
32+
except Exception:
33+
try:
34+
parsed = ast.literal_eval(s)
35+
except Exception:
36+
return []
37+
38+
if not isinstance(parsed, dict):
39+
return []
40+
41+
out: list[tuple[str, str]] = []
42+
for info in parsed.values():
43+
if not isinstance(info, dict):
44+
continue
45+
name = info.get("name")
46+
if not isinstance(name, str) or not name.strip():
47+
continue
48+
country = info.get("country")
49+
if isinstance(country, str):
50+
country = country.strip() or UNKNOWN_COUNTRY
51+
elif country is None:
52+
country = UNKNOWN_COUNTRY
53+
else:
54+
country = str(country).strip() or UNKNOWN_COUNTRY
55+
out.append((name.strip(), country))
56+
return out
57+
58+
def generate_top_affiliations_with_country(
    df_path: str | Path,
    affils_output_path: str | Path,
    min_total_papers: int = 20,
    country_filter: str | None = None,   # ← filter to exactly this country (or 'unknown')
    partition_by_year: bool = False,     # ← also emit one CSV per year
    per_year_output_dir: str | Path | None = None,
):
    """
    Compute per-(affiliation, country, year) paper counts from a papers CSV.

    Reads ``df_path`` (must have 'year' and 'affiliations' columns), keeps
    affiliations whose total paper count (within the current filter scope)
    is >= ``min_total_papers``, and writes the consolidated counts to
    ``affils_output_path``. Every row carries a 'country' value; entries
    missing a country in the source are labeled 'unknown'.

    Parameters
    ----------
    df_path : input CSV path.
    affils_output_path : output CSV path ('.csv' appended if no suffix).
    min_total_papers : minimum total papers for an affiliation to be kept.
    country_filter : exact country string to restrict to (including
        'unknown'), or None for all countries.
    partition_by_year : if True, also write one CSV per year.
    per_year_output_dir : directory for per-year files; defaults to the
        consolidated CSV's parent directory.

    Raises
    ------
    KeyError : if the 'year' or 'affiliations' column is missing.
    """
    df = pd.read_csv(df_path)

    if 'year' not in df.columns:
        raise KeyError("Expected a 'year' column.")
    # Coerce to numeric years and drop rows whose year is unparseable.
    df['year'] = pd.to_numeric(df['year'], errors='coerce')
    df = df[df['year'].notna()].copy()
    df['year'] = df['year'].astype(int)

    if 'affiliations' not in df.columns:
        raise KeyError("Expected an 'affiliations' column.")

    # 1) Parse affiliations → list of (name, country or 'unknown')
    df_aff = df.copy()
    df_aff['affil_tuples'] = df_aff['affiliations'].apply(_parse_affiliations_with_country)

    # 2) Explode into one row per (paper, affiliation_name, country)
    exploded_aff = df_aff.explode('affil_tuples')
    exploded_aff = exploded_aff[exploded_aff['affil_tuples'].notna()].copy()
    if exploded_aff.empty:
        # FIX: with zero parsed tuples, building a DataFrame from an empty
        # list yields 0 columns and the two-column assignment below raises
        # "Columns must be same length as key". Create the columns
        # explicitly so the empty case flows through and still writes an
        # (empty) output CSV.
        exploded_aff['affiliation_name'] = pd.Series(dtype=object)
        exploded_aff['country'] = pd.Series(dtype=object)
    else:
        exploded_aff[['affiliation_name', 'country']] = pd.DataFrame(
            exploded_aff['affil_tuples'].tolist(), index=exploded_aff.index
        )
    # Defensive fill (should already be set by parser)
    exploded_aff['country'] = exploded_aff['country'].fillna(UNKNOWN_COUNTRY)

    # 3) Optional: restrict to one specific country
    if country_filter is not None:
        exploded_aff = exploded_aff[exploded_aff['country'] == country_filter].copy()

    # 4) Totals per affiliation (within current filter scope)
    total_per_aff = (
        exploded_aff
        .groupby(['affiliation_name', 'country'])
        .size()
        .reset_index(name='total_papers')
        .sort_values('total_papers', ascending=False)
    )

    print("=== Affiliations",
          f"in [{country_filter}]" if country_filter is not None else "(all countries)",
          "with their total paper counts ===")
    print(total_per_aff.head(20).to_string(index=False))
    print("───────────────────────────────────────────────────────────────────────────\n")

    # 5) Keep affiliations with ≥ min_total_papers
    top_affils = total_per_aff[total_per_aff['total_papers'] >= min_total_papers][
        ['affiliation_name', 'country']
    ]

    if top_affils.empty:
        print(f"No affiliation{' in ' + country_filter if country_filter else ''} "
              f"meets ≥ {min_total_papers} total papers.")
        aff_year_counts = pd.DataFrame(columns=['affiliation_name','country','year','paper_count'])
    else:
        # 6) Per-year counts for top affiliations
        exploded_aff_top = exploded_aff.merge(top_affils, on=['affiliation_name','country'], how='inner')
        aff_year_counts = (
            exploded_aff_top
            .groupby(['affiliation_name','country','year'])
            .size()
            .reset_index(name='paper_count')
            .sort_values(['affiliation_name','country','year'])
            .reset_index(drop=True)
        )

    # 7) Write consolidated CSV (utf-8-sig keeps it Excel-friendly)
    affils_output_path = Path(affils_output_path)
    affils_output_path.parent.mkdir(parents=True, exist_ok=True)
    if affils_output_path.suffix == "":
        affils_output_path = affils_output_path.with_suffix(".csv")
    aff_year_counts.to_csv(affils_output_path, index=False, encoding="utf-8-sig")

    print(f"Wrote {len(aff_year_counts)} rows to {affils_output_path} "
          f"(≥ {min_total_papers} papers"
          f"{', country=' + country_filter if country_filter is not None else ', all countries'})")

    # 8) Optional: one file per year (same columns)
    if partition_by_year and not aff_year_counts.empty:
        out_dir = Path(per_year_output_dir) if per_year_output_dir else affils_output_path.parent
        out_dir.mkdir(parents=True, exist_ok=True)
        stem = affils_output_path.stem
        suffix = affils_output_path.suffix or ".csv"
        for yr in sorted(aff_year_counts['year'].unique()):
            yr_df = aff_year_counts[aff_year_counts['year'] == yr]
            yr_path = out_dir / f"{stem}.year={yr}{suffix}"
            yr_df.to_csv(yr_path, index=False, encoding="utf-8-sig")
            print(f"→ Wrote {len(yr_df)} rows for year {yr} to {yr_path}")
159+
160+
# # All countries in output (missing → 'unknown'), consolidated CSV only
161+
# generate_top_affiliations_with_country(
162+
# "papers.csv", "out/affiliations_top.csv", min_total_papers=20
163+
# )
164+
165+
# # Only the United States (others excluded), plus per-year files
166+
# generate_top_affiliations_with_country(
167+
# "papers.csv", "out/affiliations_top.csv",
168+
# min_total_papers=10,
169+
# country_filter="United States",
170+
# partition_by_year=True,
171+
# per_year_output_dir="out/by_year"
172+
# )
173+
174+
# # Only entries whose country was missing in the source (now labeled 'unknown')
175+
# generate_top_affiliations_with_country(
176+
# "papers.csv", "out/affiliations_unknown.csv",
177+
# min_total_papers=5,
178+
# country_filter="unknown"
179+
# )

0 commit comments

Comments
 (0)