From 316217bd5c3cf0af0fcf7e9aa370d85f0039250e Mon Sep 17 00:00:00 2001
From: KashikaMahajan <61978407+kashikamahajan@users.noreply.github.com>
Date: Mon, 13 Oct 2025 15:22:23 -0500
Subject: [PATCH 1/4] Add files via upload

---
 README.md        | 180 ++++++++++++++++++++++++++++++++++++++-
 analytics.py     | 213 +++++++++++++++++++++++++++++++++++++++++++++++
 dashboard.py     |  78 +++++++++++++++++
 histogram.py     | 183 ++++++++++++++++++++++++++++++++++++++++
 hold_bucket.py   | 191 ++++++++++++++++++++++++++++++++++++++++++
 query.py         | 112 +++++++++++++++++++++++++
 requirements.txt |   6 ++
 summarise.py     |  95 +++++++++++++++++++++
 utils.py         |   5 ++
 9 files changed, 1062 insertions(+), 1 deletion(-)
 create mode 100644 analytics.py
 create mode 100644 dashboard.py
 create mode 100644 histogram.py
 create mode 100644 hold_bucket.py
 create mode 100644 query.py
 create mode 100644 requirements.txt
 create mode 100644 summarise.py
 create mode 100644 utils.py

diff --git a/README.md b/README.md
index 7c31fc4..fc17a9f 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,179 @@
-initcommit
+# CHTC Summer Research Facilitation Project
+
+**Fellow**: Kashika Mahajan
+**Mentors**: Andrew Owen, Ian Ross
+**Fellowship Dates**: May 19 – August 8, 2025
+
+________
+
+
+## 📚 Background
+
+Researchers using HTCondor often struggle to quickly understand how their computational workloads (clusters of jobs) are performing. Current interfaces expose too much raw data, making it difficult, especially for less experienced users, to diagnose issues such as jobs on hold, poor resource utilization, or unexpected failures.
+
+This project aimed to build tools that simplify job monitoring, flag issues, and offer meaningful insight into workload behavior using accessible metrics and clear visual feedback.
+
+________
+
+
+## 📁 Repository Structure
+
+ - dashboard.py     # Prints the status distribution of jobs in a cluster
+ - histogram.py     # Plots the job runtime distribution as an ASCII histogram
+ - hold_bucket.py   # Diagnoses held jobs and groups them by hold reason
+ - analytics.py     # Summarizes requested vs. actual CPU, memory, and disk usage
+ - summarise.py     # Tabulates selected job attributes for a cluster
+ - query.py         # Dumps a cluster's job history from Elasticsearch to CSV
+ - utils.py         # Shared helper functions
+ - requirements.txt # Python dependencies
+ - README.md        # This file
+
+________
+
+
+## ⚙️ Setup and Installation
+1. Clone this repository.
+2. Install the packages listed in `requirements.txt`.
+3. You must have access to:
+   - the HTCondor Python bindings
+   - Elasticsearch (if querying historical job data)
+
+   ⚠️ Note: Some tools require authentication to the CHTC Elasticsearch instance, which is currently not available to general users.
+
+________
+
+## 🚀 Usage Instructions
+
+Each tool is meant to be run as a standalone Python script that takes a cluster ID as input.
+
+Example command: `python dashboard.py <cluster_id>`
+
+________
+
+
+## Features and Deliverables
+1. Cluster Status Dashboard
+   Purpose: Quickly visualize job statuses (Idle, Running, Held, Completed)
+   Features:
+   - Combines data from both the queue and the job history
+   - Highlights abnormal patterns using ASCII charts
+
+### Example: Cluster Status Dashboard Output
+```
+Cluster 12345 Status Dashboard
+
+             Status | Bar                                                | Count |      %
+------------------------------------------------------------------------------------------
+               Idle | █████████████                                      |  8686 |  27.0%
+            Running |                                                    |   433 |   1.3%
+           Removing |                                                    |     0 |   0.0%
+          Completed |                                                    |     0 |   0.0%
+               Held | ███████████████████████████████████               | 23067 |  71.7%
+Transferring Output |                                                    |     0 |   0.0%
+          Suspended |                                                    |     0 |   0.0%
+
+```
+
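+The dashboard gets these numbers by merging per-status counts from the schedd's live queue with counts from its history. Below is a minimal sketch of that idea using the HTCondor Python bindings; it is illustrative only (`count_statuses` and `STATUS_NAMES` are names invented here, not part of the repository, and `dashboard.py` contains the full version with the ASCII bar rendering):
+
+```
+import htcondor
+
+# JobStatus integer codes as documented by HTCondor
+STATUS_NAMES = {1: "Idle", 2: "Running", 3: "Removing", 4: "Completed",
+                5: "Held", 6: "Transferring Output", 7: "Suspended"}
+
+def count_statuses(cluster_id):
+    """Return {status name: job count} for one cluster, combining queue and history."""
+    schedd = htcondor.Schedd()
+    counts = {name: 0 for name in STATUS_NAMES.values()}
+    constraint = f"ClusterId == {cluster_id}"
+    # Jobs still in the queue (idle, running, held, ...)
+    for ad in schedd.query(constraint=constraint, projection=["JobStatus"]):
+        counts[STATUS_NAMES[int(ad["JobStatus"])]] += 1
+    # Jobs that have already left the queue (completed or removed)
+    for ad in schedd.history(constraint, projection=["JobStatus"], match=-1):
+        counts[STATUS_NAMES[int(ad["JobStatus"])]] += 1
+    return counts
+```
+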
+2. Cluster Runtime Histogram
+   Purpose: Understand runtime variance across jobs
+   Features:
+   - Binned by percentiles
+   - Flags jobs with runtime < 10 min
+   - Can print the list of affected job IDs
+
+
+### Example: Cluster Runtime Histogram Output
+
+*(Screenshot of the colored ASCII runtime histogram output omitted.)*
+
+
+
+3. Hold Classifier
+   Purpose: Explain why jobs were held
+   Features:
+   - Clusters jobs by HoldReasonCode + HoldReasonSubCode
+   - Displays percentages and example reasons
+   - Includes a human-readable legend of hold codes
+
+### Example: Hold Classifier Output
+
+```
+
+Cluster ID: 12345
+Held Jobs in Cluster: 109
++---------------------+-----------+--------------------------+---------------------------------------------------------+
+| Hold Reason Label   | SubCode   | % of Held Jobs (Count)   | Example Reason                                          |
++=====================+===========+==========================+=========================================================+
+| StartdHeldJob       | 0         | 95.4% (104)              | Job failed to complete in 72 hrs                        |
++---------------------+-----------+--------------------------+---------------------------------------------------------+
+| JobExecuteExceeded  | 0         | 4.6% (5)                 | The job exceeded allowed execute duration of 3+00:00:00 |
++---------------------+-----------+--------------------------+---------------------------------------------------------+
+
+Legend:
+╒════════╤════════════════════╤════════════════════════════════════════════════════════════════════════════╕
+│   Code │ Label              │ Reason                                                                     │
+╞════════╪════════════════════╪════════════════════════════════════════════════════════════════════════════╡
+│     21 │ StartdHeldJob      │ The job was put on hold because WANT_HOLD in the machine policy was true.  │
+├────────┼────────────────────┼────────────────────────────────────────────────────────────────────────────┤
+│     47 │ JobExecuteExceeded │ The job's allowed execution time was exceeded.                             │
+╘════════╧════════════════════╧════════════════════════════════════════════════════════════════════════════╛
+
+```
+
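+The hold classifier first groups held jobs by their integer `HoldReasonCode` and then, within each code, buckets the free-text `HoldReason` strings by textual similarity so that near-identical messages are reported only once. A simplified sketch of that bucketing step is shown below; the real implementation, `bucket_reasons_with_subcodes` in `hold_bucket.py`, also carries the `HoldReasonSubCode` along with each reason, and the 0.7 threshold mirrors its default:
+
+```
+from difflib import SequenceMatcher
+
+def bucket_reasons(reasons, threshold=0.7):
+    """Group textually similar hold-reason strings into buckets."""
+    buckets = []
+    for reason in reasons:
+        for bucket in buckets:
+            # Compare against the first reason placed in this bucket
+            if SequenceMatcher(None, reason, bucket[0]).ratio() >= threshold:
+                bucket.append(reason)
+                break
+        else:
+            # No existing bucket is similar enough; start a new one
+            buckets.append([reason])
+    return buckets
+```
+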
+4. Resource Utilization Report
+   Purpose: Compare requested vs. actual usage
+   Features:
+   - Summarizes CPU, memory, and disk usage
+   - Adds flags for under- (<15%) or over- (>80%) utilization
+   - Includes bar charts and percentiles
+
+
+### Example: Resource Utilization Report Output
+```
+================================================================================
+                       HTCondor Cluster Resource Summary
+================================================================================
+          Cluster ID: 12345
+           Job Count: 748
+         Avg Runtime: 0:56:52
+
+                              Requested Resources
+================================================================================
+Memory (GiB)   :
+                0.49       GiB   1 job(s)
+                12.0       GiB   1 job(s)
+                50.0       GiB   746 job(s)
+
+Disk (GiB)     :
+                0.1        GiB   1 job(s)
+                10.0       GiB   1 job(s)
+                30.0       GiB   746 job(s)
+
+CPUs           :
+                1                2 job(s)
+                8                746 job(s)
+
+GPUs           : No data
+                              Number Summary Table
+================================================================================
+Resource (units)         :    Min     Q1  Median     Q3    Max StdDev
+--------------------------------------------------------------------------------
+Memory Used (GiB)        :    0.1    1.2     6.1   14.2   47.4   10.4
+Disk Used (GiB)          :    0.0    0.8     0.8    0.8    1.1    0.1
+CPU Usage (%)            :   0.0%  32.1%   35.8%  44.8%  85.5%  11.0%
+
+                              Overall Utilization
+================================================================================
+  Memory usage [██████                                            ] 12.2%
+  Disk usage [█                                                 ] 2.6%
+  CPU usage [█████████████████                                 ] 35.8%
+
+                                Efficiency Notes
+================================================================================
+  ⚠️ Memory usage is 12.2%
+  ⚠️ Disk usage is 2.6%
+  ✅ CPU usage is 35.8%
+
+                                 End of Summary
+================================================================================
+
+```
+
+
diff --git a/analytics.py b/analytics.py
new file mode 100644
index 0000000..7726eb2
--- /dev/null
+++ b/analytics.py
@@ -0,0 +1,213 @@
+import os
+import sys
+import csv
+import statistics
+from collections import Counter
+from datetime import timedelta
+from utils import safe_float
+
+"""
+This program provides a report on the resource requests and usage for a cluster.
+"""
+
+
+# Render a horizontal bar for a percentage value
+def bar(pct, width=50):
+    filled = int(pct / 100 * width)
+    return "[" + "█" * filled + " " * (width - filled) + f"] {pct:.1f}%"
+
+# Compute utilization as a percentage of the requested amount
+def efficiency(used, expected):
+    if not expected:
+        return 0.0
+    return (used / expected) * 100
+
+# Build one row of the number summary table (min/quartiles/max/std dev)
+def compute_usage_summary(data, label, percentage=False, unit=None):
+    if not data or len(data) < 2:
+        return f"{label:<25}: Not enough data"
+
+    data_sorted = sorted(data)
+    min_val = data_sorted[0]
+    q1 = statistics.quantiles(data_sorted, n=4)[0]
+    median = statistics.median(data_sorted)
+    q3 = statistics.quantiles(data_sorted, n=4)[2]
+    max_val = data_sorted[-1]
+    std_dev = statistics.stdev(data_sorted)
+
+    fmt = "{:.1f}%" if percentage else "{:.1f}"
+    return (
+        f"{label:<25}: "
+        f"{fmt.format(min_val):>6} {fmt.format(q1):>6} {fmt.format(median):>7} "
+        f"{fmt.format(q3):>6} {fmt.format(max_val):>6} {fmt.format(std_dev):>6}"
+    )
+
+# Print the table of requested resource values and how many jobs requested each
+def print_resource_table(name, values, unit=""):
+    if not values:
+        print(f"{name:<15}: No data")
+        return
+
+    counts = Counter(values)
+    print(f"{name:<15}:")
+    for val, count in sorted(counts.items()):
+        print(f"{'':<15} {val:<10} {unit:<5} {count} job(s)")
+    print()
+
+# Print the full report for a cluster
+def summarize(cluster_id):
+    script_dir = 
os.path.dirname(os.path.abspath(__file__)) + data_dir = os.path.join(script_dir, "cluster_data") + filepath = os.path.join(data_dir, f"cluster_{cluster_id}_jobs.csv") + + if not os.path.exists(filepath): + print(f"File not found: {filepath}") + sys.exit(1) + + with open(filepath, newline='', encoding='utf-8') as f: + jobs = list(csv.DictReader(f)) + + mem_requested, mem_used = [], [] + disk_requested, disk_used = [], [] + run_time, cpu_used_time = [], [] + runtimes = [] + cpu_requests = [] + gpu_requests = [] + + for job in jobs: + mem_req = safe_float(job.get("RequestMemory")) + mem_use = safe_float(job.get("ResidentSetSize_RAW")) + if mem_req: + mem_requested.append(round(mem_req / 1024, 2)) # Convert MiB to GiB + if mem_use: + mem_used.append(mem_use / 1024 / 1024) # Convert KiB to GiB + + disk_req = safe_float(job.get("RequestDisk")) + disk_use = safe_float(job.get("DiskUsage_RAW")) + if disk_req: + disk_requested.append(round(disk_req / (1024 * 1024), 2)) # Convert KiB to GiB + if disk_use: + disk_used.append(disk_use / (1024 * 1024)) # Convert KiB to GiB + + cpus = safe_float(job.get("RequestCpus")) + if cpus: + cpu_requests.append(int(cpus)) + + gpus = safe_float(job.get("RequestGpus")) + if gpus: + gpu_requests.append(int(gpus)) + + user_cpu = safe_float(job.get("RemoteUserCpu")) or 0 + sys_cpu = safe_float(job.get("RemoteSysCpu")) or 0 + wall_time = safe_float(job.get("RemoteWallClockTime")) + + if wall_time and cpus and (user_cpu or sys_cpu): + total_cpu_used = sys_cpu / cpus + cpu_used_time.append(total_cpu_used) + run_time.append(wall_time) + + if wall_time: + runtimes.append(wall_time) + + from statistics import median + + # Compute per-job efficiency lists + per_job_cpu_eff = [ + efficiency(cpu_used_time[i], run_time[i]) + for i in range(len(cpu_used_time)) + if run_time[i] + ] + + per_job_mem_eff = [ + efficiency(mem_used[i], mem_requested[i]) + for i in range(min(len(mem_used), len(mem_requested))) + if mem_requested[i] + ] + + per_job_disk_eff = [ + efficiency(disk_used[i], disk_requested[i]) + for i in range(min(len(disk_used), len(disk_requested))) + if disk_requested[i] + ] + + # Take medians + avg_cpu_eff = median(per_job_cpu_eff) if per_job_cpu_eff else 0 + avg_mem_eff = median(per_job_mem_eff) if per_job_mem_eff else 0 + avg_disk_eff = median(per_job_disk_eff) if per_job_disk_eff else 0 + + + total_jobs = len(jobs) + avg_runtime = statistics.mean(runtimes) if runtimes else 0 + avg_runtime_str = str(timedelta(seconds=int(avg_runtime))) if avg_runtime else "N/A" + + print("=" * 80) + print(f"{'HTCondor Cluster Resource Summary':^80}") + print("=" * 80) + print(f"{'Cluster ID':>20}: {cluster_id}") + print(f"{'Job Count':>20}: {total_jobs}") + print(f"{'Avg Runtime':>20}: {avg_runtime_str}") + print() + + print(f"{'Requested Resources':^80}") + print("=" * 80) + print_resource_table("Memory (GiB)", mem_requested, "GiB") + print_resource_table("Disk (GiB)", disk_requested, "GiB") + print_resource_table("CPUs", cpu_requests, "") + print_resource_table("GPUs", gpu_requests, "") + + print(f"{'Number Summary Table':^80}") + print("=" * 80) + print(f"{'Resource (units)':<25}: {'Min':>6} {'Q1':>6} {'Median':>7} {'Q3':>6} {'Max':>6} {'StdDev':>6}") + print("-" * 80) + + cpu_usages, mem_values, disk_values = [], [], [] + + for i in range(len(jobs)): + if i < len(cpu_used_time) and i < len(run_time) and run_time[i]: + cpu_usages.append(efficiency(cpu_used_time[i], run_time[i])) + if i < len(mem_used): + mem_values.append(mem_used[i]) + if i < len(disk_used): + 
disk_values.append(disk_used[i]) + + + print(compute_usage_summary(mem_values, "Memory Used (GiB)")) + print(compute_usage_summary(disk_values, "Disk Used (GiB)")) + print(compute_usage_summary(cpu_usages, "CPU Usage (%)", percentage=True)) + + + print() + + print(f"{'Overall Utilization':^80}") + print("=" * 80) + print(f" Memory usage {bar(avg_mem_eff)}") + print(f" Disk usage {bar(avg_disk_eff)}") + print(f" CPU usage {bar(avg_cpu_eff)}") + print() + + + # Gives human readable notes on the efficiency and also warnings + print(f"{'Efficiency Notes':^80}") + print("=" * 80) + + def warn(resource, efficiency): + if efficiency < 15 or efficiency > 80: + print(f" ⚠️ {resource} usage is {efficiency:.1f}%") + else: + print(f" βœ… {resource} usage is {efficiency:.1f}%") + + warn("Memory", avg_mem_eff) + warn("Disk", avg_disk_eff) + warn("CPU", avg_cpu_eff) + + + print() + print(f"{'End of Summary':^80}") + print("=" * 80) + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python htcondor_cluster_summary.py ") + sys.exit(1) + summarize(sys.argv[1]) diff --git a/dashboard.py b/dashboard.py new file mode 100644 index 0000000..c3ecc79 --- /dev/null +++ b/dashboard.py @@ -0,0 +1,78 @@ +import sys +import math +import htcondor +import classad + + +""" +This program provides an ASCII dashboard for the status of jobs in a cluster +""" + +# get data from the schedd +def fetch_counts(clusterId, job_states): + schedd = htcondor.Schedd() + counts = { state: 0 for state in job_states } + # history (finished jobs) + for ad in schedd.history( + constraint = f"ClusterId == {clusterId}", + projection = ["JobStatus"], + match = -1 + ): + counts[job_states[ad.eval("JobStatus")]] += 1 + # queue (running / pending jobs) + for ad in schedd.query( + constraint = f"ClusterId == {clusterId}", + projection = ["JobStatus"], + limit = -1 + ): + counts[job_states[ad.eval("JobStatus")-1]] += 1 + return counts + +#print the dashboard +def draw_bars(counts, job_states, bar_width=50): + # compute column widths + max_label_len = max(len(s) for s in job_states) + max_count = max(counts.values()) or 1 + count_width = len(str(max_count)) + per_width = len("100.0%") + total_count = sum(counts.values()) + + # header + header = ( + f"{'Status'.rjust(max_label_len)} | " + f"{'Bar'.ljust(bar_width)} | " + f"{'Count'.rjust(count_width)} | " + f"{'%'.rjust(per_width)}" + ) + print(header) + print("-" * len(header)) + + # rows + for state in job_states: + cnt = counts[state] + length = math.ceil( int(cnt / total_count * bar_width ) ) + bar = "β–ˆ" * length + per = cnt * 100 / total_count + + state_str = state.rjust(max_label_len) + bar_str = bar.ljust(bar_width) + cnt_str = str(cnt).rjust(count_width) + per_str = f"{per:5.1f}%".rjust(per_width) + + print(f"{state_str} | {bar_str} | {cnt_str} | {per_str}") + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python dashboard_once.py ") + sys.exit(1) + + clusterId = sys.argv[1] + job_states = [ + "Idle", "Running", "Removing", "Completed", + "Held", "Transferring Output", "Suspended" + ] + + counts = fetch_counts(clusterId, job_states) + print(f"\nCluster {clusterId} Status Dashboard\n") + draw_bars(counts, job_states) + diff --git a/histogram.py b/histogram.py new file mode 100644 index 0000000..ff3fd21 --- /dev/null +++ b/histogram.py @@ -0,0 +1,183 @@ +import sys +import os +import pandas as pd +import numpy as np +import subprocess +from datetime import datetime, timedelta + +""" +This program takes data from the cluster_data folder and gives an 
ASCII histogram +of the runtimes for a cluster. The runtimes are grouped by percentile range of the runtimes + +""" + +# Time limit for fast jobs, 600 by default +is_red = median_time < 600 + +# function to format seconds into human readable format +def format_seconds_human(seconds): + seconds = int(seconds) + if seconds == 0: + return "0s" + parts = [] + days, seconds = divmod(seconds, 86400) + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) + if days: parts.append(f"{days}d") + if hours: parts.append(f"{hours}h") + if minutes: parts.append(f"{minutes}m") + if seconds: parts.append(f"{seconds}s") + return ' '.join(parts) + + +# function to format seconds into human relative format +def format_epoch_human_relative(epoch_seconds): + try: + event_time = datetime.fromtimestamp(int(epoch_seconds)) + now = datetime.now() + delta = now - event_time + + if delta < timedelta(minutes=1): + return "just now" + elif delta < timedelta(hours=1): + minutes = int(delta.total_seconds() // 60) + return f"{minutes} minute{'s' if minutes != 1 else ''} ago" + elif delta < timedelta(days=1): + hours = int(delta.total_seconds() // 3600) + return f"{hours} hour{'s' if hours != 1 else ''} ago" + elif delta < timedelta(days=7): + days = delta.days + return f"{days} day{'s' if days != 1 else ''} ago" + elif delta < timedelta(days=30): + weeks = delta.days // 7 + return f"{weeks} week{'s' if weeks != 1 else ''} ago" + else: + return event_time.strftime("%Y-%m-%d") + except: + return "N/A" + + + +# function to print the output +def histogram(cluster_id, df, percentiles=10, max_width=20, show_fast_jobs=False): + if df.empty or "RemoteWallClockTime" not in df.columns: + print("[WARN] No valid data to plot.") + return + + cluster_id = cluster_id + runtimes = df["RemoteWallClockTime"].astype(float).to_numpy() + cluster_ids = df["ClusterId"].astype(str).to_numpy() + proc_ids = df["ProcId"].astype(str).to_numpy() + + percentiles_list = np.linspace(0, 100, percentiles + 1) + bin_edges = np.percentile(runtimes, percentiles_list) + counts, _ = np.histogram(runtimes, bins=bin_edges) + max_count = counts.max() + + print("\nHistogram of Job Runtimes by Percentiles:\n") + print(f"ClusterId: {cluster_id}\n") + + # Show submission and completion times + if "QDate" in df.columns and "CompletionDate" in df.columns: + submit_times = df["QDate"].dropna().astype(float) + completion_times = df["CompletionDate"].dropna().astype(float) + + if not submit_times.empty: + cluster_submit_time = format_epoch_human_relative(submit_times.min()) + print(f"First Submitted : {cluster_submit_time}") + else: + print("First Submitted : N/A") + + if not completion_times.empty: + cluster_completion_time = format_epoch_human_relative(completion_times.max()) + print(f"Last Completed : {cluster_completion_time}") + else: + print("Last Completed : N/A") + + print("") + + + pct_width = 11 + label_width = 30 + count_width = 7 + RED = "\033[91m" + RESET = "\033[0m" + + header = ( + f"{'Percentile':<{pct_width}}" + f"{'Time Range':<{label_width}}" + f"| {'Histogram':<{max_width}}" + f" {'# Jobs':>{count_width}}" + ) + print(header) + print("-" * len(header)) + + jobs_under_10_min_median = 0 + fast_job_ids = [] + + for i in range(len(counts)): + left = bin_edges[i] + right = bin_edges[i + 1] + + in_bin_mask = (runtimes >= left) & (runtimes <= right) if i == len(counts) - 1 else (runtimes >= left) & (runtimes < right) + in_bin_times = runtimes[in_bin_mask] + in_bin_clusters = cluster_ids[in_bin_mask] + in_bin_procs = 
proc_ids[in_bin_mask] + + median_time = np.median(in_bin_times) if len(in_bin_times) > 0 else 0 + + + color = RED if is_red else "" + if is_red: + jobs_under_10_min_median += len(in_bin_times) + fast_job_ids.extend([f"{cid}.{pid}" for cid, pid in zip(in_bin_clusters, in_bin_procs)]) + + left_label = format_seconds_human(left) + right_label = format_seconds_human(right) + time_range = f"{left_label:>10} - {right_label:>10}".rjust(label_width) + + pct_start = int(percentiles_list[i]) + pct_end = int(percentiles_list[i + 1]) + pct_range = f"{pct_start:02}–{pct_end:02}%".ljust(pct_width) + + bar = "β–ˆ" * int((counts[i] / max_count) * max_width) + colored_bar = f"{color}{bar:<{max_width}}{RESET}" + + print(f"{pct_range}{time_range} | {colored_bar} {counts[i]:>{count_width}}") + + print(f"\n{RED}Note:{RESET} Bars in red represent bins with median runtime < 10 minutes.") + print(f"{RED}Info:{RESET} Total number of jobs in such bins: {jobs_under_10_min_median}") + + if show_fast_jobs and fast_job_ids: + print(f"\nList of Job IDs with median runtime < 10 minutes:") + print(", ".join(fast_job_ids)) + + +# function to get data from the folder or call the query +def load_data_for_cluster(cluster_id): + path = f"cluster_data/cluster_{cluster_id}_jobs.csv" + if not os.path.exists(path): + print(f"[INFO] CSV for ClusterId {cluster_id} not found. Attempting to run query.py...") + try: + subprocess.run([sys.executable, "query.py", cluster_id], check=True) + except subprocess.CalledProcessError as e: + print(f"[ERROR] query.py failed: {e}") + return None + try: + df = pd.read_csv(path) + return df + except Exception as e: + print(f"[ERROR] Could not load CSV: {e}") + return None + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python histogram.py [printList]") + sys.exit(1) + + cluster_id = sys.argv[1] + print_list_flag = sys.argv[2].lower() in ("true", "yes", "1") if len(sys.argv) > 2 else False + + df = load_data_for_cluster(cluster_id) + if df is not None: + histogram(cluster_id, df, percentiles=10, max_width=20, show_fast_jobs=print_list_flag) diff --git a/hold_bucket.py b/hold_bucket.py new file mode 100644 index 0000000..f44083d --- /dev/null +++ b/hold_bucket.py @@ -0,0 +1,191 @@ +import sys +import htcondor +import classad +from difflib import SequenceMatcher +from tabulate import tabulate + + +""" +This program buckets and tabulates the held jobs for a cluster + +""" +# Global variable to keep track of total jobs +total_jobs = 0 + +# Mapping of HoldReasonCodes to their explanations +HOLD_REASON_CODES = { + 1: {"label": "UserRequest", "reason": "The user put the job on hold with condor_hold."}, + 3: {"label": "JobPolicy", "reason": "The PERIODIC_HOLD expression evaluated to True. 
Or, ON_EXIT_HOLD was true."}, + 4: {"label": "CorruptedCredential", "reason": "The credentials for the job are invalid."}, + 5: {"label": "JobPolicyUndefined", "reason": "A job policy expression evaluated to Undefined."}, + 6: {"label": "FailedToCreateProcess", "reason": "The condor_starter failed to start the executable."}, + 7: {"label": "UnableToOpenOutput", "reason": "The standard output file for the job could not be opened."}, + 8: {"label": "UnableToOpenInput", "reason": "The standard input file for the job could not be opened."}, + 9: {"label": "UnableToOpenOutputStream", "reason": "The standard output stream for the job could not be opened."}, + 10: {"label": "UnableToOpenInputStream", "reason": "The standard input stream for the job could not be opened."}, + 11: {"label": "InvalidTransferAck", "reason": "An internal HTCondor protocol error was encountered when transferring files."}, + 12: {"label": "TransferOutputError", "reason": "An error occurred while transferring job output files or self-checkpoint files."}, + 13: {"label": "TransferInputError", "reason": "An error occurred while transferring job input files."}, + 14: {"label": "IwdError", "reason": "The initial working directory of the job cannot be accessed."}, + 15: {"label": "SubmittedOnHold", "reason": "The user requested the job be submitted on hold."}, + 16: {"label": "SpoolingInput", "reason": "Input files are being spooled."}, + 17: {"label": "JobShadowMismatch", "reason": "A standard universe job is not compatible with the condor_shadow version available on the submitting machine."}, + 18: {"label": "InvalidTransferGoAhead", "reason": "An internal HTCondor protocol error was encountered when transferring files."}, + 19: {"label": "HookPrepareJobFailure", "reason": "_HOOK_PREPARE_JOB was defined but could not be executed or returned failure."}, + 20: {"label": "MissedDeferredExecutionTime", "reason": "The job missed its deferred execution time and therefore failed to run."}, + 21: {"label": "StartdHeldJob", "reason": "The job was put on hold because WANT_HOLD in the machine policy was true."}, + 22: {"label": "UnableToInitUserLog", "reason": "Unable to initialize job event log."}, + 23: {"label": "FailedToAccessUserAccount", "reason": "Failed to access user account."}, + 24: {"label": "NoCompatibleShadow", "reason": "No compatible shadow."}, + 25: {"label": "InvalidCronSettings", "reason": "Invalid cron settings."}, + 26: {"label": "SystemPolicy", "reason": "SYSTEM_PERIODIC_HOLD evaluated to true."}, + 27: {"label": "SystemPolicyUndefined", "reason": "The system periodic job policy evaluated to undefined."}, + 32: {"label": "MaxTransferInputSizeExceeded", "reason": "The maximum total input file transfer size was exceeded."}, + 33: {"label": "MaxTransferOutputSizeExceeded", "reason": "The maximum total output file transfer size was exceeded."}, + 34: {"label": "JobOutOfResources", "reason": "Memory usage exceeds a memory limit."}, + 35: {"label": "InvalidDockerImage", "reason": "Specified Docker image was invalid."}, + 36: {"label": "FailedToCheckpoint", "reason": "Job failed when sent the checkpoint signal it requested."}, + 43: {"label": "PreScriptFailed", "reason": "Pre script failed."}, + 44: {"label": "PostScriptFailed", "reason": "Post script failed."}, + 45: {"label": "SingularityTestFailed", "reason": "Test of singularity runtime failed before launching a job"}, + 46: {"label": "JobDurationExceeded", "reason": "The job's allowed duration was exceeded."}, + 47: {"label": "JobExecuteExceeded", "reason": "The 
job's allowed execution time was exceeded."}, + 48: {"label": "HookShadowPrepareJobFailure", "reason": "Prepare job shadow hook failed when it was executed; status code indicated job should be held."} +} + + +""" +Groups similar hold reason messages using fuzzy string matching (difflib.SequenceMatcher). +The default threshold has been kept as 0.7 as was found optimal by testing error messages + + Parameters: + reason_list (List[str]): List of textual hold reasons for the jobs. + subcodes (List[int]): Corresponding subcodes for the reasons. + threshold (float): Similarity ratio (between 0 and 1) above which reasons are considered similar. + + Returns: + List[List[Tuple[str, int]]]: A list of "buckets", where each bucket contains tuples of (reason, subcode) + that are textually similar to each other. +""" + +def bucket_reasons_with_subcodes(reason_list, subcodes, threshold=0.7): + buckets = [] + for reason, subcode in zip(reason_list, subcodes): + placed = False + for bucket in buckets: + ratio = SequenceMatcher(None, reason, bucket[0][0]).ratio() + if ratio >= threshold: + bucket.append((reason, subcode)) + placed = True + break + if not placed: + buckets.append([(reason, subcode)]) + return buckets + + + +""" +Queries the HTCondor schedd for held jobs in the specified cluster and groups them by their HoldReasonCode. +The function then sends the groups of jobs with the same code to be bucketed by string similarity in bucket_reasons_with_subcodes() + + Parameters: + cluster_id (str or int): The ID of the cluster to analyze. + + Returns: + Dict[int, List[Tuple[str, int]]]: A dictionary mapping each HoldReasonCode to a list of (HoldReason, HoldReasonSubCode) tuples. +""" +def group_by_code(cluster_id): + global total_jobs + schedd = htcondor.Schedd() + reasons_by_code = {} + + jobs_q = schedd.query(constraint=f"ClusterId == {cluster_id}") + total_jobs = len(jobs_q) + + for ad in schedd.query( + constraint=f"ClusterId == {cluster_id} && JobStatus == 5", + projection=["HoldReasonCode", "HoldReason", "HoldReasonSubCode"], + limit=-1 + ): + code = ad.eval("HoldReasonCode") + subcode = ad.eval("HoldReasonSubCode") + + reason = ad.eval("HoldReason").split('. ')[0] + if "Error from" in reason and ": " in reason: + parts = reason.split(": ", 1) + if len(parts) == 2: + reason = parts[1] + + reasons_by_code.setdefault(code, []).append((reason, subcode)) + + return reasons_by_code + + + + + +""" +Processes grouped hold reasons and prints: + - A table summarizing the percentage of jobs held for each bucketed reason. + - A legend explaining each HoldReasonCode. +The function also take the HoldReason string and stores only the error message. + +NOTE: The information about the slots which sent the error which can be a future feature + + Parameters: + reasons_by_code (Dict[int, List[Tuple[str, int]]]): Dictionary grouping hold reasons by HoldReasonCode. + cluster_id (str or int): The cluster ID being analyzed. 
+ +""" +def bucket_and_print_table(reasons_by_code, cluster_id): + print() + print("Cluster ID:", cluster_id) + + held_jobs = sum(len(pairs) for pairs in reasons_by_code.values()) + print("Held Jobs in Cluster:", held_jobs) + + example_rows = [] + seen_codes = set() + + for code, pairs in reasons_by_code.items(): + reasons, subcodes = zip(*pairs) + label = HOLD_REASON_CODES.get(code, {}).get("label", f"Code {code}") + seen_codes.add(code) + buckets = bucket_reasons_with_subcodes(reasons, subcodes) + for bucket in buckets: + example_reason, subcode = bucket[0] + percent = (len(bucket) / held_jobs) * 100 if held_jobs > 0 else 0 + example_rows.append([label, subcode, f"{percent:.1f}% ({len(bucket)})", example_reason]) + + headers = ["Hold Reason Label", "SubCode", "% of Held Jobs (Count)", "Example Reason"] + print(tabulate(example_rows, headers=headers, tablefmt="grid")) + + print("\nLegend:") + legend = [] + for code in sorted(seen_codes): + entry = HOLD_REASON_CODES.get(code, {}) + legend.append([code, entry.get("label", "Unknown"), entry.get("reason", "No description available.")]) + print(tabulate(legend, headers=["Code", "Label", "Reason"], tablefmt="fancy_grid")) + + + + + +""" +Main Execution Block: + - Parses the cluster ID from command-line arguments. + - Calls `group_by_code()` to gather held job reasons. + - Passes the results to `bucket_and_print_table()` to display the report. + +Example: + $ python condor_hold_bucket.py 123456 +""" +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python hold_bucket.py ") + sys.exit(1) + + cluster_id = sys.argv[1] + reasons_by_code = group_by_code(cluster_id) + bucket_and_print_table(reasons_by_code, cluster_id) + diff --git a/query.py b/query.py new file mode 100644 index 0000000..da014e9 --- /dev/null +++ b/query.py @@ -0,0 +1,112 @@ +import os +import csv +import sys +import elasticsearch + + +""" +Elasticsearch Cluster Job Dumper + +This script connects to the CHTC Elasticsearch server, fetches all jobs +for a given ClusterId (and optionally, a specific User), using the Scroll API +(to handle large result sets), and writes them into a CSV file inside the +'cluster_data/' directory. 
+ +Usage: + query.py [User] + +NOTE: You need authentication to access data from the Elasticsearch database, that is why the ES_USER and ES_PASS are blanked + +""" + +# Constants +ES_HOST = "https://elastic.osg.chtc.io/q" +ES_INDEX = "adstash-ospool-job-history-*" +MAX_RESULTS = 1000000 +SCROLL_DURATION = "5m" + +#Authenticaion to be filled +ES_USER = "*****" +ES_PASS = "************" + +def connect_to_elasticsearch(): + es = elasticsearch.Elasticsearch(ES_HOST, http_auth=(ES_USER, ES_PASS)) + if not es.ping(): + print("Error: Failed to connect to Elasticsearch.") + sys.exit(1) + return es + +def build_query(cluster_id, user=None): + filters = [{"match": {"ClusterId": cluster_id}}] + if user: + filters.append({"match": {"Owner": user}}) + + return { + "query": { + "bool": { + "must": filters + } + } + } + +def main(): + if len(sys.argv) < 2: + print("Usage: python dump_cluster_jobs.py [User]") + sys.exit(1) + + try: + cluster_id = int(sys.argv[1]) + except ValueError: + print("Error: ClusterId must be an integer.") + sys.exit(1) + + user = sys.argv[2] if len(sys.argv) > 2 else None + + es = connect_to_elasticsearch() + query = build_query(cluster_id, user) + + # Start scroll + response = es.search(index=ES_INDEX, body=query, scroll=SCROLL_DURATION) + scroll_id = response['_scroll_id'] + hits = response['hits']['hits'] + + all_hits = [] + fieldnames = set() + + while hits and len(all_hits) < MAX_RESULTS: + remaining = MAX_RESULTS - len(all_hits) + to_add = hits[:remaining] + all_hits.extend(to_add) + + for hit in to_add: + fieldnames.update(hit['_source'].keys()) + + if len(all_hits) >= MAX_RESULTS: + break + + response = es.scroll(scroll_id=scroll_id, scroll=SCROLL_DURATION) + scroll_id = response['_scroll_id'] + hits = response['hits']['hits'] + + # Output directory and file + output_dir = os.path.join(os.getcwd(), "cluster_data") + os.makedirs(output_dir, exist_ok=True) + + user_suffix = f"_{user}" if user else "" + csv_filename = os.path.join(output_dir, f"cluster_{cluster_id}{user_suffix}_jobs.csv") + + print(f"πŸ“‚ Writing to: {csv_filename}") + + with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=sorted(fieldnames)) + writer.writeheader() + for hit in all_hits: + writer.writerow(hit['_source']) + + print(f"Dumped {len(all_hits)} jobs for ClusterId {cluster_id}" + (f" and user '{user}'" if user else "") + f" to {csv_filename}") + + # Clean up scroll + es.clear_scroll(scroll_id=scroll_id) + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b77e8f1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +pandas +numpy +tabulate +elasticsearch +htcondor +classad diff --git a/summarise.py b/summarise.py new file mode 100644 index 0000000..a374bab --- /dev/null +++ b/summarise.py @@ -0,0 +1,95 @@ +import os +import csv +import sys +from tabulate import tabulate + +""" +This Python script prints a summary table for a given cluster's jobs +using selected parameters from the corresponding CSV in 'cluster_data/'. +It supports command-line parameter selection and auto-converts RAW values to GiB. 
+""" + +# finds the cluster data from the folder based on the clusterId +def load_job_data(cluster_id, folder="cluster_data"): + filepath = os.path.join(folder, f"cluster_{cluster_id}_jobs.csv") + if not os.path.exists(filepath): + print(f"File not found: {filepath}") + sys.exit(1) + + with open(filepath, newline='', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + jobs = list(reader) + + return jobs + +# safe conversion to float +def safe_float(val): + try: + return float(val) + except (ValueError, TypeError): + return None + +# check if all selected parameters exist in the CSV header +def validate_params(jobs, selected_params): + if not jobs: + return + + csv_keys = jobs[0].keys() + missing = [param for param in selected_params if param not in csv_keys] + + if missing: + print(f"Error: The following parameters were not found in the CSV data: {', '.join(missing)}") + print(f"Available columns: {', '.join(csv_keys)}") + sys.exit(1) + +# extract selected parameter values for each job +def extract_requested_vs_used(jobs, selected_params): + rows = [] + for job in jobs: + row = { + "ProcId": job.get("ProcId") + } + for param in selected_params: + raw_val = job.get(param) + val = safe_float(raw_val) + + # Use original value if not a float + if val is None: + val = raw_val + elif "RAW" in param: + val = val / 1024 # Convert MiB to GiB + + row[param] = val + + rows.append(row) + return rows + +# main execution logic +def main(): + if len(sys.argv) < 2: + print("Usage: python summarise.py [param1 param2 ...]") + sys.exit(1) + + cluster_id = sys.argv[1] + + # default list of attributes to summarise if none have been passed + selected_params = sys.argv[2:] if len(sys.argv) > 2 else [ + "RequestCpus", "CpusProvisioned", "RemoteSysCpu", + "RemoteUserCpu", "RemoteWallClockTime", "ResidentSetSize_RAW" + ] + + jobs = load_job_data(cluster_id) + validate_params(jobs, selected_params) + data = extract_requested_vs_used(jobs, selected_params) + total_jobs = len(data) + + if not data: + print("No valid job data found.") + return + + print(f"\nSummary for ClusterId: {cluster_id}") + print(f"Total Jobs: {total_jobs}\n") + print(tabulate(data, headers="keys", tablefmt="grid", floatfmt=".2f")) + +if __name__ == "__main__": + main() diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..123eafe --- /dev/null +++ b/utils.py @@ -0,0 +1,5 @@ +def safe_float(val): + try: + return float(val) + except (ValueError, TypeError): + return None From 5cc2aa4802eba0f8f0d55b4cb9034fd1f224106a Mon Sep 17 00:00:00 2001 From: Kashika Mahajan Date: Mon, 13 Oct 2025 15:39:00 -0500 Subject: [PATCH 2/4] test --- test | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 test diff --git a/test b/test new file mode 100644 index 0000000..e69de29 From 7953ec79bb6f6cc100c5c67d30e82903053dd7b9 Mon Sep 17 00:00:00 2001 From: Ian Ross Date: Tue, 14 Oct 2025 13:25:48 -0500 Subject: [PATCH 3/4] Delete test --- test | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 test diff --git a/test b/test deleted file mode 100644 index e69de29..0000000 From b390108904a5990e80a455a22b17a067bdb23d4e Mon Sep 17 00:00:00 2001 From: aowen-uwmad Date: Tue, 28 Oct 2025 15:16:31 -0500 Subject: [PATCH 4/4] Add note about internship --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index fc17a9f..451694e 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ **Mentors**: Andrew Owen, Ian Ross **Fellowship Dates**: May 19 – August 8, 2025 +Work on this 
project is continuing in Fall 2025 as an internship.
+
 ________