Skip to content

Commit 9e42969

Browse files
author
Ryan Calvin Barron
committed
post processing (artic fox, peacock) edits for lynx
1 parent 6a44f40 commit 9e42969

File tree

11 files changed

+388
-425
lines changed

11 files changed

+388
-425
lines changed

TELF/applications/Lynx/frontend/pages/doc_view.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
for ii in selected_nodes["checked"]:
182182
directory = st.session_state.data_map[ii]["path"]
183183
peacock_dir = os.path.join(directory, "peacock")
184+
print( peacock_dir )
184185
files = find_files_by_extensions(peacock_dir, extensions=("html", "png"))
185186
with st.expander(st.session_state.data_map[ii]["label"]):
186187
open_explorer_button(peacock_dir, key=f"button_tab2_{ii}")

TELF/pipeline/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,4 @@
7373
from .blocks.termite_vector_block import TermiteVectorBlock
7474

7575
from .blocks.author_affiliation_tables import AffiliationsAndAuthorsBlock
76+
from .blocks.block_helpers.KernelServer import KernelTiedServer

TELF/pipeline/blocks/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,6 @@
6363
from .collect_hnmfk_leaf_block import CollectHNMFkLeafBlock
6464
from .termite_neo4j_block import TermiteNeo4jBlock
6565
from .termite_vector_block import TermiteVectorBlock
66-
from .author_affiliation_tables import AffiliationsAndAuthorsBlock
66+
from .author_affiliation_tables import AffiliationsAndAuthorsBlock
67+
68+
from .block_helpers.KernelServer import KernelTiedServer

TELF/pipeline/blocks/artic_fox_block.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
from pathlib import Path
2-
from typing import Dict, Sequence, Any, Tuple
2+
from typing import Dict, Any
33
from .base_block import AnimalBlock
44
from .data_bundle import DataBundle, SAVE_DIR_BUNDLE_KEY
55

66
from ...post_processing import ArcticFox
77
from ...factorization import HNMFk
88

9+
910
class ArticFoxBlock(AnimalBlock):
11+
"""
12+
Block wrapper for the ArcticFox post-process/label/stats pipeline.
13+
Use call_settings['steps'] to run any subset: ["post"], ["label"], ["stats"],
14+
or combinations like ["post","label"], ["label","stats"], ["post","stats"], ["post","label","stats"].
15+
If steps is None, legacy behavior uses the label_clusters/generate_stats booleans.
16+
"""
1017

11-
CANONICAL_NEEDS = ("df", 'vocabulary', "model_path",)
18+
CANONICAL_NEEDS = ("df", "vocabulary", "model_path")
1219

1320
def __init__(
1421
self,
@@ -21,62 +28,64 @@ def __init__(
2128
call_settings: Dict[str, Any] = None,
2229
**kw,
2330
) -> None:
24-
31+
2532
self.col = col
2633
default_init = {
27-
'clean_cols_name': self.col,
28-
'embedding_model': "SCINCL",
34+
"clean_cols_name": self.col,
35+
"embedding_model": "SCINCL",
2936
}
3037
default_call = {
31-
'ollama_model': "llama3.2:3b-instruct-fp16", # Language model used for semantic label generation
32-
'label_clusters': True, # Enable automatic labeling of clusters
33-
'generate_stats': True, # Generate cluster-level statistics
34-
'process_parents': True, # Propagate labels or stats upward through the hierarchy
35-
'skip_completed': True, # Skip processing of nodes already labeled/stored
36-
'label_criteria': { # Rules to filter generated labels
37-
"minimum words": 2,
38-
"maximum words": 6
39-
},
40-
'label_info': { # Additional metadata to associate with generated labels
41-
"source": "Science"
42-
},
43-
'number_of_labels': 5 # Number of candidate labels to generate per node
38+
"ollama_model": "llama3.2:3b-instruct-fp16", # Language model used for semantic label generation
39+
"label_clusters": True, # Back-compat: used when steps is None
40+
"generate_stats": True, # Back-compat: used when steps is None
41+
"process_parents": True,
42+
"skip_completed": True,
43+
"label_criteria": {"minimum words": 2, "maximum words": 6},
44+
"label_info": {"source": "Science"},
45+
"number_of_labels": 5,
46+
# NEW: choose subset explicitly; None keeps legacy boolean behavior
47+
# Examples: ["post"], ["label"], ["stats"], ["post","label"], ["label","stats"], ["post","stats"], ["post","label","stats"]
48+
"steps": None,
4449
}
4550

4651
super().__init__(
47-
needs = needs,
48-
provides = provides,
52+
needs=needs,
53+
provides=provides,
4954
init_settings=self._merge(default_init, init_settings),
5055
call_settings=self._merge(default_call, call_settings),
5156
tag=tag,
5257
**kw,
5358
)
5459

55-
5660
def run(self, bundle: DataBundle) -> None:
61+
# Resolve inputs
5762
df = self.load_path(bundle[self.needs[0]])
5863
vocabulary = self.load_path(bundle[self.needs[1]])
59-
6064
raw_model_path = str(bundle[self.needs[2]])
61-
# Try to resolve to an absolute path for traceability; fall back to the raw string.
65+
6266
try:
6367
resolved_model_path = str(Path(raw_model_path).expanduser().resolve())
6468
except Exception:
6569
resolved_model_path = raw_model_path
6670

71+
# Load HNMFk model
6772
model = HNMFk(experiment_name=raw_model_path)
6873
model.load_model()
6974

75+
# Run selected steps (order enforced inside ArcticFox)
7076
pipeline = ArcticFox(model=model, **self.init_settings)
71-
pipeline.run_full_pipeline(data_df=df, vocab=vocabulary, **self.call_settings)
77+
pipeline.run_full_pipeline(
78+
data_df=df,
79+
vocab=vocabulary,
80+
**self.call_settings
81+
)
7282

83+
# Write a lightweight status checkpoint
7384
status_value = "Done"
74-
7585
if SAVE_DIR_BUNDLE_KEY in bundle:
7686
out_dir = Path(bundle[SAVE_DIR_BUNDLE_KEY]) / self.tag
7787
out_dir.mkdir(parents=True, exist_ok=True)
7888
status_file = out_dir / "status.txt"
79-
# Include model path info in the checkpointed status file
8089
status_file.write_text(
8190
f"status: {status_value}\n"
8291
f"model_path: {raw_model_path}\n"
@@ -86,7 +95,3 @@ def run(self, bundle: DataBundle) -> None:
8695
self.register_checkpoint(self.provides[0], status_file)
8796

8897
bundle[f"{self.tag}.{self.provides[0]}"] = status_value
89-
90-
91-
92-
TELF/pipeline/blocks/block_helpers/KernelServer.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
import os
import pathlib
import shlex
import signal
import subprocess
import time


class KernelTiedServer:
    """Run a background server process whose lifetime is tied to this kernel.

    The server is launched in its own process group, and a small ``bash``
    watchdog kills that group once the launching (kernel) process exits, so
    the server cannot outlive the notebook/kernel that started it.

    NOTE(review): POSIX-only — relies on process groups, ``os.killpg`` and a
    ``bash`` watchdog; will not work on Windows.
    """

    def __init__(self, cmd, *, cwd=None, env=None, log_dir=None):
        """
        cmd: list or string (string is shell-split)
        cwd: working dir
        env: dict of env vars
        log_dir: if set, stdout/stderr are appended to files here
        """
        # Accept either a pre-tokenized argv list or a shell-style string.
        if isinstance(cmd, str):
            cmd = shlex.split(cmd)
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.log_dir = pathlib.Path(log_dir).expanduser() if log_dir else None
        self.proc = None       # server subprocess (None until start())
        self.watchdog = None   # bash watchdog subprocess
        self.stdout_f = None   # open log handles (only when log_dir is set)
        self.stderr_f = None

    def start(self):
        """Launch the server and its watchdog; return ``self`` for chaining.

        Raises:
            RuntimeError: if the server is already running.
        """
        if self.proc and self.running:
            raise RuntimeError("Already running")
        if self.log_dir:
            self.log_dir.mkdir(parents=True, exist_ok=True)
            # Unbuffered append so tail() sees output promptly.
            self.stdout_f = open(self.log_dir / "server.out", "ab", buffering=0)
            self.stderr_f = open(self.log_dir / "server.err", "ab", buffering=0)

        # 1) Start the server in its own session/process group so the whole
        #    tree can be signalled with one killpg.  start_new_session is the
        #    thread-safe replacement for preexec_fn=os.setpgrp (see subprocess
        #    docs); either way the new PGID equals the child's PID.
        self.proc = subprocess.Popen(
            self.cmd,
            cwd=self.cwd,
            env=self.env,
            stdout=self.stdout_f or subprocess.DEVNULL,
            stderr=self.stderr_f or subprocess.DEVNULL,
            start_new_session=True,  # new PGID == proc.pid (POSIX)
        )

        # 2) Watchdog: poll until the kernel PID disappears, then TERM — and,
        #    after a grace period, KILL — the whole server process group.
        #    "--" is required so kill treats the negative PGID as a target
        #    rather than an option.
        kernel_pid = os.getpid()
        pgid = self.proc.pid
        script = f"""
        while kill -0 {kernel_pid} 2>/dev/null; do sleep 2; done
        kill -TERM -- -{pgid} 2>/dev/null
        sleep 5
        kill -KILL -- -{pgid} 2>/dev/null
        """
        self.watchdog = subprocess.Popen(
            ["bash", "-c", script],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        return self

    @property
    def running(self):
        """True while the server process exists and has not exited."""
        return self.proc is not None and self.proc.poll() is None

    def status(self):
        """Return a one-line human-readable status string."""
        if not self.proc:
            return "not started"
        rc = self.proc.poll()
        # PGID equals the PID because the child was started in a new session.
        return f"running (pid {self.proc.pid}, pgid {self.proc.pid})" if rc is None else f"exited rc={rc}"

    def stop(self, sig=signal.SIGTERM, hard_after=5):
        """Signal the server's process group, escalating to SIGKILL.

        sig: signal sent first (default SIGTERM)
        hard_after: seconds to wait before escalating to SIGKILL
        Also reaps the watchdog and closes/clears the log-file handles so a
        subsequent start() reopens them cleanly.  No-op if never started.
        """
        if not self.proc:
            return
        if self.running:
            try:
                os.killpg(self.proc.pid, sig)
            except ProcessLookupError:
                pass  # group already gone
            # Grace period, then hard-kill anything still alive.
            t0 = time.time()
            while self.running and time.time() - t0 < hard_after:
                time.sleep(0.2)
            if self.running:
                try:
                    os.killpg(self.proc.pid, signal.SIGKILL)
                except ProcessLookupError:
                    pass
        # Stop the watchdog and reap it so it does not linger as a zombie.
        if self.watchdog and self.watchdog.poll() is None:
            try:
                self.watchdog.terminate()
                self.watchdog.wait(timeout=2)
            except Exception:
                pass
        # Close log handles; drop the references so start() can reopen.
        for f in (self.stdout_f, self.stderr_f):
            try:
                if f:
                    f.close()
            except Exception:
                pass
        self.stdout_f = None
        self.stderr_f = None

    def tail(self, n=50, which="out"):
        """Print the last *n* lines of the stdout ("out") or stderr log."""
        if not self.log_dir:
            print("(no logs: set log_dir=...)")
            return
        path = self.log_dir / ("server.out" if which == "out" else "server.err")
        try:
            with open(path, "rb") as f:
                print(b"".join(f.readlines()[-n:]).decode(errors="replace"))
        except FileNotFoundError:
            print("(no log yet)")


# --- Example ---
# srv = KernelTiedServer(["zsh", "../../Lynx/start_lynx.sh"], log_dir="~/.logs/lynx").start()
# print(srv.status()); srv.tail(100)  # view last 100 lines of stdout
# srv.stop()  # stop manually
TELF/pipeline/blocks/block_helpers/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .KernelServer import KernelTiedServer

0 commit comments

Comments
 (0)