Skip to content
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 79 additions & 17 deletions luxonis_ml/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ def __init__(
self.is_sweep = is_sweep
self.rank = rank
self.local_logs = {
"metrics": [],
"metric": [],
"params": {},
"images": [],
"artifacts": [],
"matrices": [],
"metrics": [],
}
self.mlflow_initialized = False

Expand Down Expand Up @@ -171,32 +173,44 @@ def log_to_mlflow(self, log_fn: Callable, *args, **kwargs) -> None:

def store_log_locally(self, log_fn: Callable, *args, **kwargs) -> None:
    """Stores log data locally if logging to MLflow fails.

    The payload is routed into the matching C{self.local_logs} buffer
    based on which MLflow logging function failed, so it can be
    re-played later by C{log_stored_logs_to_mlflow} or persisted by
    C{save_logs_locally}.

    @type log_fn: Callable
    @param log_fn: The logging callable whose invocation failed.
    @param args: The positional arguments that were passed to C{log_fn}.
    """
    # Compare against the already-bound MLflow client methods directly so
    # that this check does not re-trigger experiment reconnections.
    if log_fn == self._experiment["mlflow"].log_metric:
        self.local_logs["metric"].append(
            {"name": args[0], "value": args[1], "step": args[2]}
        )
    elif log_fn == self._experiment["mlflow"].log_metrics:
        self.local_logs["metrics"].append(
            {"metrics": args[0], "step": args[1]}
        )
    elif log_fn == self._experiment["mlflow"].log_params:
        self.local_logs["params"].update(args[0])
    elif log_fn == self._experiment["mlflow"].log_image:
        self.local_logs["images"].append(
            {"image_data": args[0], "name": args[1]}
        )
    elif log_fn == self.upload_artifact:
        # Store the path as a string so the buffer stays JSON-serializable.
        self.local_logs["artifacts"].append(
            {"path": str(args[0]), "name": args[1], "type": args[2]}
        )
    elif log_fn == self._experiment["mlflow"].log_dict:
        self.local_logs["matrices"].append(
            {"matrix": args[0], "name": args[1]}
        )

def log_stored_logs_to_mlflow(self):
"""Attempts to log any data stored in local_logs to MLflow."""
if not self.mlflow_initialized or not any(self.local_logs.values()):
return

try:
for metric in self.local_logs["metrics"]:
for metric in self.local_logs["metric"]:
self._experiment["mlflow"].log_metric(
metric["name"], metric["value"], metric["step"]
)
for metrics in self.local_logs["metrics"]:
self._experiment["mlflow"].log_metrics(
metrics["metrics"], metrics["step"]
)
if self.local_logs["params"]:
self._experiment["mlflow"].log_params(
self.local_logs["params"]
Expand All @@ -209,20 +223,26 @@ def log_stored_logs_to_mlflow(self):
self.upload_artifact(
Path(artifact["path"]), artifact["name"], artifact["type"]
)
for matrix in self.local_logs["matrices"]:
self._experiment["mlflow"].log_dict(
matrix["matrix"], matrix["name"]
)

self.local_logs = {
"metrics": [],
"params": {},
"images": [],
"artifacts": [],
"matrices": [],
"metric": [],
}
logger.info("Successfully re-logged stored logs to MLflow.")
except Exception as e:
logger.warning(f"Failed to re-log stored logs to MLflow: {e}")

def save_logs_locally(self):
"""Saves metrics, parameters, and artifacts to JSON and images
to separate files."""
"""Saves metrics, parameters, images, artifacts, and matrices
locally."""
run_dir = Path(self.save_directory) / self.run_name
image_dir = run_dir / "images"
artifact_dir = run_dir / "artifacts"
Expand All @@ -238,20 +258,25 @@ def save_logs_locally(self):
)
img["image_data"] = img_path # Replace data with path

# Save artifacts to local storage directory
for artifact in self.local_logs["artifacts"]:
artifact_path = Path(artifact["path"])
if artifact_path.exists():
local_path = artifact_dir / artifact_path.name
local_path.write_bytes(artifact_path.read_bytes())
artifact["path"] = str(local_path)

# Save logs to JSON file
with open(run_dir / "local_logs.json", "w") as f:
json.dump(
{
k: self.local_logs[k]
for k in ["metrics", "params", "images", "artifacts"]
for k in [
"metrics",
"metric",
"params",
"images",
"artifacts",
"matrices",
]
},
f,
)
Expand Down Expand Up @@ -381,7 +406,7 @@ def log_hyperparams(
if self.is_wandb:
self.experiment["wandb"].config.update(params)
if self.is_mlflow:
self.log_to_mlflow(self.experiment["mlflow"].log_params, params)
self.log_to_mlflow(self._experiment["mlflow"].log_params, params)

@rank_zero_only
def log_metric(self, name: str, value: float, step: int) -> None:
Expand All @@ -405,7 +430,7 @@ def log_metric(self, name: str, value: float, step: int) -> None:

if self.is_mlflow:
self.log_to_mlflow(
self.experiment["mlflow"].log_metric, name, value, step
self._experiment["mlflow"].log_metric, name, value, step
)

@rank_zero_only
Expand All @@ -424,7 +449,7 @@ def log_metrics(self, metrics: Dict[str, float], step: int) -> None:
self.experiment["wandb"].log(metrics)
if self.is_mlflow:
self.log_to_mlflow(
self.experiment["mlflow"].log_metrics, metrics, step
self._experiment["mlflow"].log_metrics, metrics, step
)

@rank_zero_only
Expand Down Expand Up @@ -455,7 +480,7 @@ def log_image(self, name: str, img: np.ndarray, step: int) -> None:
base_path, img_caption = name.rsplit("/", 1)
img_path = f"{base_path}/{step}/{img_caption}.png"
self.log_to_mlflow(
self.experiment["mlflow"].log_image, img, img_path
self._experiment["mlflow"].log_image, img, img_path
)

@rank_zero_only
Expand Down Expand Up @@ -491,7 +516,7 @@ def upload_artifact(
fs.put_file(
local_path=path,
remote_path=name or path.name,
mlflow_instance=self.experiment.get("mlflow"),
mlflow_instance=self._experiment.get("mlflow"),
)
except Exception as e:
logger.warning(f"Failed to upload artifact to MLflow: {e}")
Expand All @@ -500,6 +525,43 @@ def upload_artifact(
) # Stores details for retrying later
self.log_stored_logs_to_mlflow()

@rank_zero_only
def log_matrix(self, matrix: np.ndarray, name: str, step: int) -> None:
    """Logs matrix to the logging service.

    @type matrix: np.ndarray
    @param matrix: The confusion matrix to log.
    @type name: str
    @param name: The name used to log the matrix.
    @type step: int
    @param step: The current step.
    """
    if self.is_mlflow:
        # MLflow has no native matrix type, so serialize it as a JSON
        # dict holding the flattened values plus the original shape.
        payload = {
            "flat_array": matrix.flatten().tolist(),
            "shape": matrix.shape,
        }
        self.log_to_mlflow(
            self._experiment["mlflow"].log_dict,
            payload,
            f"{name}.json",
        )

    if self.is_tensorboard:
        # TensorBoard receives the matrix rendered as plain text.
        rendered = np.array2string(matrix, separator=", ")
        self.experiment["tensorboard"].add_text(name, rendered, step)

    if self.is_wandb:
        import wandb

        # Represent the matrix as a W&B table: one table row per matrix
        # row, prefixed with an explicit row-index column.
        header = ["Row Index"] + [
            f"Col {col}" for col in range(matrix.shape[1])
        ]
        table = wandb.Table(columns=header)
        for idx, row in enumerate(matrix):
            table.add_data(idx, *row)
        self.experiment["wandb"].log({f"{name}_table": table}, step=step)

@rank_zero_only
def log_images(self, imgs: Dict[str, np.ndarray], step: int) -> None:
"""Logs multiple images.
Expand Down
Loading