FLINK-5725: Add extra Flink details to paasta status #4063


Merged
Changes from all commits
48 commits
8ec0c76
FLINK-5725: Add extra Flink details to paasta status
nleigh May 13, 2025
fab0eb5
FLINK-5725: Add extra Flink details to paasta status: git repo's
nleigh May 13, 2025
28381c5
FLINK-5725: Add extra Flink details to paasta status: log commands
nleigh May 13, 2025
1fb40c6
FLINK-5725: Add extra Flink details to paasta status: flink monitoring
nleigh May 13, 2025
56cdcc5
FLINK-5725: Add SUPPERREGION_TO_ECOSYSTEM_MAPPINGS
nleigh May 14, 2025
02c4363
FLINK-5725: Update souregraph to github link
nleigh May 14, 2025
7806ecf
FLINK-5725: Add owner information to flink paasta status verbose
nleigh May 14, 2025
ba32de6
FLINK-5725: Refactors
nleigh May 14, 2025
2a65cc6
FLINK-5725: Add flink pool information to flink paasta status verbose
nleigh May 14, 2025
fa93667
FLINK-5725: Add runbook information to flink paasta status verbose
nleigh May 14, 2025
16a304f
FLINK-5725: Fix missing return statement error
nleigh May 15, 2025
06319d8
FLINK-5725: Add Flink cost link to paasta status -v
nleigh May 16, 2025
7357296
Merge branch 'master' into u/nathanleigh/FLINK-5725/AddMoreDetailsToF…
nleigh May 23, 2025
024fdcc
FLINK-5725: Update yelp region -> ecosystem mapping logic
nleigh May 30, 2025
1718cb1
Merge branch 'master' into u/nathanleigh/FLINK-5725/AddMoreDetailsToF…
nleigh May 30, 2025
ce4de9f
FLINK-5725: Downgrade environment-tools
nleigh May 30, 2025
141a8ae
Merge remote-tracking branch 'origin/u/nathanleigh/FLINK-5725/AddMore…
nleigh May 30, 2025
d78be12
FLINK-5725: Mock convert_location_type return
nleigh May 30, 2025
2720594
FLINK-5725: Use 'fake-cluster' name in tests
nleigh Jun 4, 2025
d42bc6e
Update requirements-minimal.txt
nleigh Jun 4, 2025
c48c05b
FLINK-5725: Use existing helper functions
nleigh Jun 5, 2025
379592a
FLINK-5725: Use existing helper functions 2
nleigh Jun 5, 2025
8f16b33
Update paasta_tools/cli/cmds/status.py
nleigh Jun 5, 2025
4cf48ba
Merge branch 'master' into u/nathanleigh/FLINK-5725/AddMoreDetailsToF…
nleigh Jun 5, 2025
1e89fd5
Update paasta_tools/cli/cmds/status.py
nleigh Jun 5, 2025
68b42b6
FLINK-5725: Refactors and remove try/exception
nleigh Jun 5, 2025
da0226f
FLINK-5725: Move ecosytem function to utils
nleigh Jun 5, 2025
d7ac273
FLINK-5725: Fix tox issues
nleigh Jun 5, 2025
994e4d6
Update paasta_tools/flink_tools.py
nleigh Jun 5, 2025
33b03fa
Update paasta_tools/utils.py
nleigh Jun 5, 2025
022b9c4
FLINK-5725: Rename fake-cluster to fake_cluster
nleigh Jun 5, 2025
ffab533
Merge remote-tracking branch 'origin/u/nathanleigh/FLINK-5725/AddMore…
nleigh Jun 5, 2025
42fc583
Update paasta_tools/utils.py
nleigh Jun 6, 2025
571a4cd
Update paasta_tools/utils.py
nleigh Jun 6, 2025
377584d
Update paasta_tools/utils.py
nleigh Jun 6, 2025
22d2921
Update paasta_tools/cli/cmds/status.py
nleigh Jun 6, 2025
dbc5e66
Update paasta_tools/utils.py
nleigh Jun 6, 2025
e0388bb
FLINK-5725: Update test mock
nleigh Jun 6, 2025
4a93fdf
FLINK-5725: Fix tox issues
nleigh Jun 6, 2025
7138c66
Merge branch 'master' into u/nathanleigh/FLINK-5725/AddMoreDetailsToF…
nleigh Jun 10, 2025
399c0ad
Update paasta_tools/cli/cmds/status.py
nleigh Jun 10, 2025
b71546c
Update tests/test_utils.py
nleigh Jun 10, 2025
d5e6765
Update tests/test_utils.py
nleigh Jun 10, 2025
5bf589d
Update paasta_tools/cli/cmds/status.py
nleigh Jun 10, 2025
d946ba3
FLINK-5725: Fix indentation + refactor
nleigh Jun 10, 2025
c1792da
FLINK-5725: Remove if statement check
nleigh Jun 10, 2025
4ba30b3
FLINK-5725: Fix tests by populating flink_instance_config
nleigh Jun 10, 2025
55b4371
Update tests/test_utils.py
nleigh Jun 11, 2025
121 changes: 116 additions & 5 deletions paasta_tools/cli/cmds/status.py
@@ -63,13 +63,16 @@
from paasta_tools.flink_tools import get_flink_config_from_paasta_api_client
from paasta_tools.flink_tools import get_flink_jobs_from_paasta_api_client
from paasta_tools.flink_tools import get_flink_overview_from_paasta_api_client
from paasta_tools.flink_tools import load_flink_instance_config
from paasta_tools.flinkeks_tools import FlinkEksDeploymentConfig
from paasta_tools.flinkeks_tools import load_flinkeks_instance_config
from paasta_tools.kafkacluster_tools import KafkaClusterDeploymentConfig
from paasta_tools.kubernetes_tools import format_pod_event_messages
from paasta_tools.kubernetes_tools import format_tail_lines_for_kubernetes_pod
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfig
from paasta_tools.kubernetes_tools import KubernetesDeployStatus
from paasta_tools.kubernetes_tools import paasta_prefixed
from paasta_tools.monitoring_tools import get_runbook
from paasta_tools.monitoring_tools import get_team
from paasta_tools.monitoring_tools import list_teams
from paasta_tools.paasta_service_config_loader import PaastaServiceConfigLoader
@@ -763,19 +766,28 @@ def append_pod_status(pod_status, output: List[str]):
output.extend([f" {line}" for line in pods_table])


OUTPUT_HORIZONTAL_RULE = (
"=================================================================="
)


def _print_flink_status_from_job_manager(
service: str,
instance: str,
cluster: str,
output: List[str],
flink: Mapping[str, Any],
client: PaastaOApiClient,
system_paasta_config: "SystemPaastaConfig",
flink_instance_config: FlinkDeploymentConfig,
verbose: int,
) -> int:
status = flink.get("status")
if status is None:
output.append(PaastaColors.red(" Flink cluster is not available yet"))
return 1

# Print Flink Config SHA
# Since metadata should be available no matter the state, we show it first. If this errors out
# then we cannot really do much to recover, because cluster is not in usable state anyway
metadata = flink.get("metadata")
@@ -785,9 +797,32 @@ def _print_flink_status_from_job_manager(
raise ValueError(f"expected config sha on Flink, but received {metadata}")
if config_sha.startswith("config"):
config_sha = config_sha[6:]

output.append(f" Config SHA: {config_sha}")

if verbose:
# Print Flink repo links
output.append(f" Repo(git): https://github.yelpcorp.com/services/{service}")
output.append(
f" Repo(sourcegraph): https://sourcegraph.yelpcorp.com/services/{service}"
)

# Print Flink Pool information
flink_pool = flink_instance_config.get_pool()
output.append(f" Flink Pool: {flink_pool}")

# Print ownership information
flink_monitoring_team = flink_instance_config.get_team() or get_team(
overrides={}, service=service, soa_dir=DEFAULT_SOA_DIR
)
output.append(f" Owner: {flink_monitoring_team}")

# Print runbook information
flink_rb_for_instance = flink_instance_config.get_runbook() or get_runbook(
overrides={}, service=service, soa_dir=DEFAULT_SOA_DIR
)
output.append(f" Flink Runbook: {flink_rb_for_instance}")

# Print Flink Version
if status["state"] == "running":
try:
flink_config = get_flink_config_from_paasta_api_client(
@@ -805,17 +840,67 @@
else:
output.append(f" Flink version: {flink_config.flink_version}")

# Print Flink Dashboard URL
# Annotation "flink.yelp.com/dashboard_url" is populated by flink-operator
dashboard_url = metadata["annotations"].get("flink.yelp.com/dashboard_url")
output.append(f" URL: {dashboard_url}/")

if verbose:
# Print Flink config link resources
ecosystem = system_paasta_config.get_ecosystem_for_cluster(cluster)
output.append(
f" Yelpsoa configs: https://github.yelpcorp.com/sysgit/yelpsoa-configs/tree/master/{service}"
)
output.append(
f" Srv configs: https://github.yelpcorp.com/sysgit/srv-configs/tree/master/ecosystem/{ecosystem}/{service}"
)

output.append(OUTPUT_HORIZONTAL_RULE)

# Print Flink Log Commands
output.append(" Flink Log Commands:")
output.append(
f" Service: paasta logs -a 1h -c {cluster} -s {service} -i {instance}"
)
output.append(
f" Taskmanager: paasta logs -a 1h -c {cluster} -s {service} -i {instance}.TASKMANAGER"
)
output.append(
f" Jobmanager: paasta logs -a 1h -c {cluster} -s {service} -i {instance}.JOBMANAGER"
)
output.append(
f" Supervisor: paasta logs -a 1h -c {cluster} -s {service} -i {instance}.SUPERVISOR"
)

output.append(OUTPUT_HORIZONTAL_RULE)

# Print Flink Metrics Links
output.append(" Flink Monitoring:")
output.append(
f" Job Metrics: https://grafana.yelpcorp.com/d/flink-metrics/flink-job-metrics?orgId=1&var-datasource=Prometheus-flink&var-region=uswest2-{ecosystem}&var-service={service}&var-instance={instance}&var-job=All&from=now-24h&to=now"
)
output.append(
f" Container Metrics: https://grafana.yelpcorp.com/d/flink-container-metrics/flink-container-metrics?orgId=1&var-datasource=Prometheus-flink&var-region=uswest2-{ecosystem}&var-service={service}&var-instance={instance}&from=now-24h&to=now"
)
output.append(
f" JVM Metrics: https://grafana.yelpcorp.com/d/flink-jvm-metrics/flink-jvm-metrics?orgId=1&var-datasource=Prometheus-flink&var-region=uswest2-{ecosystem}&var-service={service}&var-instance={instance}&from=now-24h&to=now"
)

# Print Flink Costs Link
output.append(
f" Flink Cost: https://splunk.yelpcorp.com/en-US/app/yelp_computeinfra/paasta_service_utilization?form.service={service}&form.field1.earliest=-30d%40d&form.field1.latest=now&form.instance={instance}&form.cluster={cluster}"
)

output.append(OUTPUT_HORIZONTAL_RULE)

# Print Flink Cluster State
color = PaastaColors.green if status["state"] == "running" else PaastaColors.yellow
output.append(f" State: {color(status['state'].title())}")

# Print Flink Cluster Pod Info
pod_running_count = pod_evicted_count = pod_other_count = 0
# default for evicted in case where pod status is not available
evicted = f"{pod_evicted_count}"

for pod in status["pod_status"]:
if pod["phase"] == "Running":
pod_running_count += 1
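The tally above walks `status["pod_status"]` and buckets pods by phase; the diff is truncated before the evicted and fallthrough branches, so the exact eviction condition below is an assumption. A hedged, standalone sketch:

```python
from typing import Any, List, Mapping

# Hypothetical standalone version of the pod tally in
# _print_flink_status_from_job_manager; phase strings follow the
# Kubernetes pod lifecycle. The "Evicted" check is an assumption,
# since the diff cuts off before that branch.
def count_pod_phases(pod_status: List[Mapping[str, Any]]) -> Mapping[str, int]:
    counts = {"running": 0, "evicted": 0, "other": 0}
    for pod in pod_status:
        if pod["phase"] == "Running":
            counts["running"] += 1
        elif pod["phase"] == "Failed" and pod.get("reason") == "Evicted":
            counts["evicted"] += 1
        else:
            counts["other"] += 1
    return counts
```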
@@ -987,8 +1072,22 @@ def print_flink_status(
)
return 1

flink_instance_config = load_flink_instance_config(
service=service,
instance=instance,
cluster=cluster,
)

return _print_flink_status_from_job_manager(
service, instance, output, flink, client, verbose
service,
instance,
cluster,
output,
flink,
client,
system_paasta_config,
flink_instance_config,
verbose,
)


@@ -1013,9 +1112,21 @@ def print_flinkeks_status(
)
)
return 1

flink_eks_instance_config = load_flinkeks_instance_config(
service=service,
instance=instance,
cluster=cluster,
)
return _print_flink_status_from_job_manager(
service, instance, output, flink, client, verbose
service,
instance,
cluster,
output,
flink,
client,
system_paasta_config,
flink_eks_instance_config,
verbose,
)


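The verbose log-command section in this file repeats one `paasta logs` template per Flink component. A minimal standalone sketch (the function name and component list are illustrative, not part of the PR):

```python
from typing import List

# Component suffixes mirrored from the diff; the bare instance fetches
# logs for the whole service, while suffixed instances scope logs to a
# single Flink component.
FLINK_COMPONENTS = ["TASKMANAGER", "JOBMANAGER", "SUPERVISOR"]

def build_flink_log_commands(cluster: str, service: str, instance: str) -> List[str]:
    lines = ["  Flink Log Commands:"]
    lines.append(
        f"    Service: paasta logs -a 1h -c {cluster} -s {service} -i {instance}"
    )
    for component in FLINK_COMPONENTS:
        lines.append(
            f"    {component.title()}: paasta logs -a 1h -c {cluster} "
            f"-s {service} -i {instance}.{component}"
        )
    return lines
```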
18 changes: 18 additions & 0 deletions paasta_tools/flink_tools.py
@@ -62,6 +62,7 @@ class TaskManagerConfig(TypedDict, total=False):

class FlinkDeploymentConfigDict(LongRunningServiceConfigDict, total=False):
taskmanager: TaskManagerConfig
spot: bool


class FlinkDeploymentConfig(LongRunningServiceConfig):
@@ -111,6 +112,23 @@ def validate(
def get_replication_crit_percentage(self) -> int:
return self.config_dict.get("replication_threshold", 100)

def get_pool(self) -> Optional[str]:
"""
Derives the Flink pool for this instance from the 'spot' key in its
configuration data.

Returns:
The flink pool string: "flink" when 'spot' is explicitly false,
otherwise "flink-spot".
"""
spot_config = self.config_dict.get("spot", None)
if spot_config is False:
return "flink"
else:
# If unset or True, the Flink instance defaults to the flink-spot pool
return "flink-spot"


def load_flink_instance_config(
service: str,
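`get_pool`'s behavior is easy to exercise in isolation; this stand-in mirrors the method's logic on a plain dict (the helper name is hypothetical, not part of the PR):

```python
from typing import Optional

# Minimal stand-in for FlinkDeploymentConfig.get_pool from the diff:
# only an explicit `spot: false` opts an instance out of the spot pool.
def get_pool_from_config(config_dict: dict) -> Optional[str]:
    spot_config = config_dict.get("spot", None)
    if spot_config is False:
        return "flink"
    # Unset or True both default to the flink-spot pool.
    return "flink-spot"
```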
45 changes: 45 additions & 0 deletions paasta_tools/utils.py
@@ -76,6 +76,7 @@
import service_configuration_lib
from docker import APIClient
from docker.utils import kwargs_from_env
from environment_tools.type_utils import convert_location_type
from kazoo.client import KazooClient
from mypy_extensions import TypedDict
from service_configuration_lib import read_extra_service_information
@@ -490,6 +491,9 @@ def get_deploy_group(self) -> str:
def get_team(self) -> str:
return self.config_dict.get("monitoring", {}).get("team", None)

def get_runbook(self) -> str:
return self.config_dict.get("monitoring", {}).get("runbook", None)

def get_mem(self) -> float:
"""Gets the memory required from the service's configuration.

@@ -2802,6 +2806,47 @@ def get_enable_tron_tsc(self) -> bool:
def get_remote_run_duration_limit(self, default: int) -> int:
return self.config_dict.get("remote_run_duration_limit", default)

def get_ecosystem_for_cluster(self, cluster: str) -> Optional[str]:
"""
Convert a Kubernetes cluster's region information to an ecosystem name.

This function extracts 'yelp_region' from the cluster data and converts
it to an ecosystem identifier (e.g., 'uswest2-devc' -> 'devc') via
environment-tools; see y/habitat for what these location types mean.

Example:
convert_location_type(
location="uswest2-devc",
source_type="region",
desired_type="ecosystem",
)
# -> ["devc"]

NOTE: the 'yelp_region' value is a string like "uswest2-devc", which
loosely resembles an AWS region - but while you can go from a Yelp
region to an AWS region, the reverse is not true without additional data.
"""
kube_clusters_data = self.get_kube_clusters()
cluster_info = kube_clusters_data.get(cluster)
if cluster_info is not None:
yelp_region = cluster_info.get("yelp_region", None)
else:
# NOTE: this should never happen unless our kube metadata generator is broken
return None

result = convert_location_type(
location=yelp_region,
source_type="region",
desired_type="ecosystem",
)
if result:
return result[0]
else:
# NOTE: this should never happen unless we've gotten bad data
return None


def _run(
command: Union[str, List[str]],
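`get_ecosystem_for_cluster` depends on environment-tools habitat data; the sketch below substitutes a hardcoded region-to-ecosystem map, so the mapping and the standalone helper name are assumptions for illustration only:

```python
from typing import Dict, Optional

# Hypothetical stand-in for SystemPaastaConfig.get_ecosystem_for_cluster:
# the real code calls environment_tools.type_utils.convert_location_type,
# which resolves regions against habitat data; a hardcoded map suffices
# here to show the lookup-then-convert shape.
_REGION_TO_ECOSYSTEM: Dict[str, str] = {
    "uswest2-devc": "devc",
    "uswest1-prod": "prod",
}

def get_ecosystem_for_cluster(
    kube_clusters_data: Dict[str, Dict[str, str]], cluster: str
) -> Optional[str]:
    cluster_info = kube_clusters_data.get(cluster)
    if cluster_info is None:
        # Unknown cluster: should only happen if kube metadata is broken.
        return None
    yelp_region = cluster_info.get("yelp_region", "")
    return _REGION_TO_ECOSYSTEM.get(yelp_region)
```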
1 change: 1 addition & 0 deletions requirements-minimal.txt
@@ -13,6 +13,7 @@ cookiecutter >= 1.4.0
croniter
docker
dulwich >= 0.17.3
environment-tools
ephemeral-port-reserve >= 1.0.1
graphviz
grpcio
4 changes: 3 additions & 1 deletion requirements.txt
@@ -22,9 +22,10 @@ click==6.6
containerd==1.5.3
cookiecutter==1.4.0
croniter==1.3.4
decorator==4.1.2
decorator==4.3.0
docker==5.0.3
dulwich==0.17.3
environment-tools==1.1.3
ephemeral-port-reserve==1.1.0
future==0.16.0
google-auth==1.2.0
@@ -55,6 +56,7 @@ msgpack-python==0.5.6
multidict==4.7.6
mypy-extensions==0.4.1
nats-py==2.8.0
networkx==2.4
nulltype==2.3.1
oauthlib==3.1.0
objgraph==3.4.0