diff --git a/ci/mkpipeline.py b/ci/mkpipeline.py
index 5cb4e227ba909..49ac769601f31 100644
--- a/ci/mkpipeline.py
+++ b/ci/mkpipeline.py
@@ -28,6 +28,7 @@
 import traceback
 from collections import OrderedDict
 from collections.abc import Iterable, Iterator
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any
 
@@ -354,19 +355,27 @@ def visit(config: Any) -> None:
 
 def switch_jobs_to_aws(pipeline: Any, priority: int) -> None:
     """Switch jobs to AWS if Hetzner is currently overloaded"""
-    branch = os.getenv("BUILDKITE_BRANCH")
+    # branch = os.getenv("BUILDKITE_BRANCH")
 
     # If Hetzner is entirely broken, you have to take these actions to switch everything back to AWS:
     # - CI_FORCE_SWITCH_TO_AWS env variable to 1
     # - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to linux-aarch64-small in https://buildkite.com/materialize/test/settings/steps and other pipelines
    # - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to linux-aarch64-small in ci/mkpipeline.sh
-    if not ui.env_is_truthy("CI_FORCE_SWITCH_TO_AWS", "0"):
+
+    stuck: set[str] = set()
+    # TODO(def-): Remove me when Hetzner fixes its aarch64 availability
+    stuck.add("aarch64")
+
+    if ui.env_is_truthy("CI_FORCE_SWITCH_TO_AWS", "0"):
+        stuck = set(["x86-64", "x86-64-dedi", "aarch64"])
+    else:
+        # TODO(def-): Reenable me when Hetzner fixes its aarch64 availability
         # If priority has manually been set to be low, or on main branch, we can
         # wait for agents to become available
-        if branch == "main" or priority < 0:
-            return
+        # if branch == "main" or priority < 0:
+        #     return
 
-    # Consider Hetzner to be overloaded when at least 600 jobs exist with priority >= 0
+    # Consider Hetzner to be overloaded/broken when an important job is stuck waiting for an agent for > 60 minutes, per arch (x86-64/x86-64-dedi/aarch64)
     try:
         builds = generic_api.get_multiple(
             "builds",
@@ -382,54 +391,100 @@ def switch_jobs_to_aws(pipeline: Any, priority: int) -> None:
             max_fetches=None,
         )
 
-        num_jobs = 0
         for build in builds:
             for job in build["jobs"]:
                 if "state" not in job:
                     continue
                 if "agent_query_rules" not in job:
                     continue
-                if job["state"] in ("scheduled", "running", "assigned", "accepted"):
-                    queue = job["agent_query_rules"][0].removeprefix("queue=")
-                    if not queue.startswith("hetzner-"):
-                        continue
-                    if job.get("priority", {}).get("number", 0) < 0:
-                        continue
-                    num_jobs += 1
-        print(f"Number of high-priority jobs on Hetzner: {num_jobs}")
-        if num_jobs < 600:
-            return
+                queue = job["agent_query_rules"][0].removeprefix("queue=")
+                if not queue.startswith("hetzner-"):
+                    continue
+
+                # "hetzner-x86-64-dedi-4cpu-8gb" => "x86-64-dedi"
+                # "hetzner-x86-64-4cpu-8gb" => "x86-64"
+                # "hetzner-aarch64-4cpu-8gb" => "aarch64"
+                queue_prefix = "-".join(queue.split("-")[1:-2])
+                if queue_prefix in stuck:
+                    continue
+
+                if job.get("state") != "scheduled":
+                    continue
+
+                runnable = job.get("runnable_at")
+                if not runnable or job.get("started_at"):
+                    continue
+                if datetime.now(timezone.utc) - datetime.fromisoformat(
+                    runnable
+                ) < timedelta(hours=1):
+                    continue
+
+                print(
+                    f"Job {job.get('id')} ({job.get('web_url')}) with priority {priority} is runnable since {runnable} on {queue}, considering {queue_prefix} stuck"
+                )
+                stuck.add(queue_prefix)
     except Exception:
         print("switch_jobs_to_aws failed, ignoring:")
         traceback.print_exc()
         return
 
+    if not stuck:
+        return
+
+    print(f"Queue prefixes stuck in Hetzner, switching to AWS or another arch: {stuck}")
+
     def visit(config: Any) -> None:
         if "agents" in config:
             agent = config["agents"].get("queue", None)
-            if agent in ("hetzner-aarch64-4cpu-8gb", "hetzner-aarch64-2cpu-4gb"):
-                config["agents"]["queue"] = "linux-aarch64-small"
-            if agent == "hetzner-aarch64-8cpu-16gb":
-                config["agents"]["queue"] = "linux-aarch64"
-            if agent == "hetzner-aarch64-16cpu-32gb":
-                config["agents"]["queue"] = "linux-aarch64-medium"
-            if agent in (
-                "hetzner-x86-64-4cpu-8gb",
-                "hetzner-x86-64-2cpu-4gb",
-                "hetzner-x86-64-dedi-2cpu-8gb",
-            ):
-                config["agents"]["queue"] = "linux-x86_64-small"
-            if agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-dedi-4cpu-16gb"):
-                config["agents"]["queue"] = "linux-x86_64"
-            if agent in ("hetzner-x86-64-16cpu-32gb", "hetzner-x86-64-dedi-8cpu-32gb"):
-                config["agents"]["queue"] = "linux-x86_64-medium"
-            if agent == "hetzner-x86-64-dedi-16cpu-64gb":
-                config["agents"]["queue"] = "linux-x86_64-large"
-            if agent in (
-                "hetzner-x86-64-dedi-32cpu-128gb",
-                "hetzner-x86-64-dedi-48cpu-192gb",
-            ):
-                config["agents"]["queue"] = "builder-linux-x86_64"
+            if "aarch64" in stuck:
+                if "x86-64" not in stuck:
+                    if agent == "hetzner-aarch64-2cpu-4gb":
+                        config["agents"]["queue"] = "hetzner-x86-64-2cpu-4gb"
+                        if config.get("depends_on") == "build-aarch64":
+                            config["depends_on"] = "build-x86_64"
+                    elif agent == "hetzner-aarch64-4cpu-8gb":
+                        config["agents"]["queue"] = "hetzner-x86-64-4cpu-8gb"
+                        if config.get("depends_on") == "build-aarch64":
+                            config["depends_on"] = "build-x86_64"
+                    elif agent == "hetzner-aarch64-8cpu-16gb":
+                        config["agents"]["queue"] = "hetzner-x86-64-8cpu-16gb"
+                        if config.get("depends_on") == "build-aarch64":
+                            config["depends_on"] = "build-x86_64"
+                    elif agent == "hetzner-aarch64-16cpu-32gb":
+                        config["agents"]["queue"] = "hetzner-x86-64-16cpu-32gb"
+                        if config.get("depends_on") == "build-aarch64":
+                            config["depends_on"] = "build-x86_64"
+                else:
+                    if agent in (
+                        "hetzner-aarch64-4cpu-8gb",
+                        "hetzner-aarch64-2cpu-4gb",
+                    ):
+                        config["agents"]["queue"] = "linux-aarch64"
+                    elif agent in (
+                        "hetzner-aarch64-8cpu-16gb",
+                        "hetzner-aarch64-16cpu-32gb",
+                    ):
+                        config["agents"]["queue"] = "linux-aarch64-medium"
+            if "x86-64" in stuck:
+                if agent in ("hetzner-x86-64-4cpu-8gb", "hetzner-x86-64-2cpu-4gb"):
+                    config["agents"]["queue"] = "linux-x86_64"
+                elif agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-16cpu-32gb"):
+                    config["agents"]["queue"] = "linux-x86_64-medium"
+            if "x86-64-dedi" in stuck:
+                if agent == "hetzner-x86-64-dedi-2cpu-8gb":
+                    config["agents"]["queue"] = "linux-x86_64"
+                elif agent == "hetzner-x86-64-dedi-4cpu-16gb":
+                    config["agents"]["queue"] = "linux-x86_64-medium"
+                elif agent in (
+                    "hetzner-x86-64-dedi-8cpu-32gb",
+                    "hetzner-x86-64-dedi-16cpu-64gb",
+                ):
+                    config["agents"]["queue"] = "linux-x86_64-large"
+                elif agent in (
+                    "hetzner-x86-64-dedi-32cpu-128gb",
+                    "hetzner-x86-64-dedi-48cpu-192gb",
+                ):
+                    config["agents"]["queue"] = "builder-linux-x86_64"
 
     for config in pipeline["steps"]:
         if "trigger" in config or "wait" in config:
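Illustration, not part of the patch: how switch_jobs_to_aws derives the per-architecture queue prefix from a Hetzner agent queue name, applied to the example queue names taken from its own comments.

# Sketch only: mirrors the parsing in switch_jobs_to_aws above.
for queue in (
    "hetzner-x86-64-dedi-4cpu-8gb",  # => "x86-64-dedi"
    "hetzner-x86-64-4cpu-8gb",       # => "x86-64"
    "hetzner-aarch64-4cpu-8gb",      # => "aarch64"
):
    # Drop the leading "hetzner-" and the trailing "<n>cpu-<m>gb" components.
    queue_prefix = "-".join(queue.split("-")[1:-2])
    print(queue, "=>", queue_prefix)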
config["agents"].get("queue", None) - if agent in ("hetzner-aarch64-4cpu-8gb", "hetzner-aarch64-2cpu-4gb"): - config["agents"]["queue"] = "linux-aarch64-small" - if agent == "hetzner-aarch64-8cpu-16gb": - config["agents"]["queue"] = "linux-aarch64" - if agent == "hetzner-aarch64-16cpu-32gb": - config["agents"]["queue"] = "linux-aarch64-medium" - if agent in ( - "hetzner-x86-64-4cpu-8gb", - "hetzner-x86-64-2cpu-4gb", - "hetzner-x86-64-dedi-2cpu-8gb", - ): - config["agents"]["queue"] = "linux-x86_64-small" - if agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-dedi-4cpu-16gb"): - config["agents"]["queue"] = "linux-x86_64" - if agent in ("hetzner-x86-64-16cpu-32gb", "hetzner-x86-64-dedi-8cpu-32gb"): - config["agents"]["queue"] = "linux-x86_64-medium" - if agent == "hetzner-x86-64-dedi-16cpu-64gb": - config["agents"]["queue"] = "linux-x86_64-large" - if agent in ( - "hetzner-x86-64-dedi-32cpu-128gb", - "hetzner-x86-64-dedi-48cpu-192gb", - ): - config["agents"]["queue"] = "builder-linux-x86_64" + if "aarch64" in stuck: + if "x86-64" not in stuck: + if agent == "hetzner-aarch64-2cpu-4gb": + config["agents"]["queue"] = "hetzner-x86-64-2cpu-4gb" + if config.get("depends_on") == "build-aarch64": + config["depends_on"] = "build-x86_64" + elif agent == "hetzner-aarch64-4cpu-8gb": + config["agents"]["queue"] = "hetzner-x86-64-4cpu-8gb" + if config.get("depends_on") == "build-aarch64": + config["depends_on"] = "build-x86_64" + elif agent == "hetzner-aarch64-8cpu-16gb": + config["agents"]["queue"] = "hetzner-x86-64-8cpu-16gb" + if config.get("depends_on") == "build-aarch64": + config["depends_on"] = "build-x86_64" + elif agent == "hetzner-aarch64-16cpu-32gb": + config["agents"]["queue"] = "hetzner-x86-64-16cpu-32gb" + if config.get("depends_on") == "build-aarch64": + config["depends_on"] = "build-x86_64" + else: + if agent in ( + "hetzner-aarch64-4cpu-8gb", + "hetzner-aarch64-2cpu-4gb", + ): + config["agents"]["queue"] = "linux-aarch64" + elif agent in ( + "hetzner-aarch64-8cpu-16gb", + "hetzner-aarch64-16cpu-32gb", + ): + config["agents"]["queue"] = "linux-aarch64-medium" + if "x86-64" in stuck: + if agent in ("hetzner-x86-64-4cpu-8gb", "hetzner-x86-64-2cpu-4gb"): + config["agents"]["queue"] = "linux-x86_64" + elif agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-16cpu-32gb"): + config["agents"]["queue"] = "linux-x86_64-medium" + if "x86-64-dedi" in stuck: + if agent == "hetzner-x86-64-dedi-2cpu-8gb": + config["agents"]["queue"] = "linux-x86_64" + elif agent == "hetzner-x86-64-dedi-4cpu-16gb": + config["agents"]["queue"] = "linux-x86_64-medium" + elif agent in ( + "hetzner-x86-64-dedi-8cpu-32gb", + "hetzner-x86-64-dedi-16cpu-64gb", + ): + config["agents"]["queue"] = "linux-x86_64-large" + elif agent in ( + "hetzner-x86-64-dedi-32cpu-128gb", + "hetzner-x86-64-dedi-48cpu-192gb", + ): + config["agents"]["queue"] = "builder-linux-x86_64" for config in pipeline["steps"]: if "trigger" in config or "wait" in config: diff --git a/ci/mkpipeline.sh b/ci/mkpipeline.sh index 81e16ade79965..fdba148cb8eed 100755 --- a/ci/mkpipeline.sh +++ b/ci/mkpipeline.sh @@ -50,7 +50,7 @@ steps: command: bin/ci-builder run min bin/pyactivate -m ci.mkpipeline $pipeline $@ priority: 200 agents: - queue: hetzner-aarch64-4cpu-8gb + queue: hetzner-x86-64-4cpu-8gb retry: automatic: - exit_status: -1 diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index a6f55814470b3..52662b2b5cc8c 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ 
@@ -2001,6 +2001,7 @@ impl CatalogState {
                 size,
                 ReplicaAllocation {
                     memory_limit,
+                    memory_request: _,
                     cpu_limit,
                     disk_limit,
                     scale,
@@ -2025,7 +2026,6 @@ impl CatalogState {
                 u64::cast_from(*workers).into(),
                 cpu_limit.as_nanocpus().into(),
                 memory_bytes.into(),
-                // TODO(guswynn): disk size will be filled in later.
                 disk_bytes.into(),
                 (*credits_per_hour).into(),
             ]);
diff --git a/src/catalog/src/config.rs b/src/catalog/src/config.rs
index e779f2c2eaad7..176c8db93eabc 100644
--- a/src/catalog/src/config.rs
+++ b/src/catalog/src/config.rs
@@ -162,6 +162,7 @@ impl ClusterReplicaSizeMap {
                 name,
                 ReplicaAllocation {
                     memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
+                    memory_request: None,
                     cpu_limit: None,
                     disk_limit: None,
                     scale: 1,
@@ -183,6 +184,7 @@ impl ClusterReplicaSizeMap {
                 format!("{scale}-1"),
                 ReplicaAllocation {
                     memory_limit: None,
+                    memory_request: None,
                     cpu_limit: None,
                     disk_limit: None,
                     scale,
@@ -199,6 +201,7 @@ impl ClusterReplicaSizeMap {
                 format!("{scale}-{scale}"),
                 ReplicaAllocation {
                     memory_limit: None,
+                    memory_request: None,
                     cpu_limit: None,
                     disk_limit: None,
                     scale,
@@ -215,6 +218,7 @@ impl ClusterReplicaSizeMap {
                 format!("mem-{scale}"),
                 ReplicaAllocation {
                     memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
+                    memory_request: None,
                     cpu_limit: None,
                     disk_limit: None,
                     scale: 1,
@@ -232,6 +236,7 @@ impl ClusterReplicaSizeMap {
             "2-4".to_string(),
             ReplicaAllocation {
                 memory_limit: None,
+                memory_request: None,
                 cpu_limit: None,
                 disk_limit: None,
                 scale: 2,
@@ -248,6 +253,7 @@ impl ClusterReplicaSizeMap {
             "free".to_string(),
             ReplicaAllocation {
                 memory_limit: None,
+                memory_request: None,
                 cpu_limit: None,
                 disk_limit: None,
                 scale: 0,
@@ -265,6 +271,7 @@ impl ClusterReplicaSizeMap {
                 size.to_string(),
                 ReplicaAllocation {
                     memory_limit: None,
+                    memory_request: None,
                     cpu_limit: None,
                     disk_limit: None,
                     scale: 1,
diff --git a/src/controller/src/clusters.rs b/src/controller/src/clusters.rs
index d36c1b277bbf7..494ee9b5afdb7 100644
--- a/src/controller/src/clusters.rs
+++ b/src/controller/src/clusters.rs
@@ -72,6 +72,9 @@ pub struct ReplicaConfig {
 pub struct ReplicaAllocation {
     /// The memory limit for each process in the replica.
     pub memory_limit: Option<MemoryLimit>,
+    /// The memory request for each process in the replica.
+    #[serde(default)]
+    pub memory_request: Option<MemoryLimit>,
     /// The CPU limit for each process in the replica.
     pub cpu_limit: Option<CpuLimit>,
     /// The disk limit for each process in the replica.
@@ -112,6 +115,7 @@ fn test_replica_allocation_deserialization() {
         {
             "cpu_limit": 1.0,
             "memory_limit": "10GiB",
+            "memory_request": "5GiB",
            "disk_limit": "100MiB",
             "scale": 16,
             "workers": 1,
@@ -132,6 +136,7 @@ fn test_replica_allocation_deserialization() {
             disk_limit: Some(DiskLimit(ByteSize::mib(100))),
             disabled: false,
             memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
+            memory_request: Some(MemoryLimit(ByteSize::gib(5))),
             cpu_limit: Some(CpuLimit::from_millicpus(1000)),
             cpu_exclusive: false,
             is_cc: true,
@@ -166,6 +171,7 @@ fn test_replica_allocation_deserialization() {
             disk_limit: Some(DiskLimit(ByteSize::mib(0))),
             disabled: true,
             memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
+            memory_request: None,
             cpu_limit: Some(CpuLimit::from_millicpus(0)),
             cpu_exclusive: true,
             is_cc: true,
@@ -702,6 +708,7 @@ where
             ],
             cpu_limit: location.allocation.cpu_limit,
             memory_limit: location.allocation.memory_limit,
+            memory_request: location.allocation.memory_request,
             scale: location.allocation.scale,
             labels: BTreeMap::from([
                 ("replica-id".into(), replica_id.to_string()),
diff --git a/src/metrics/src/lgalloc.rs b/src/metrics/src/lgalloc.rs
index f9b2744749bf4..61e072ea42a14 100644
--- a/src/metrics/src/lgalloc.rs
+++ b/src/metrics/src/lgalloc.rs
@@ -189,7 +189,7 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry) {
     let mut lgmetrics = LgMetrics::new(metrics_registry);
 
     mz_ore::task::spawn(|| "lgalloc_stats_update", async move {
-        let mut interval = tokio::time::interval(Duration::from_secs(30));
+        let mut interval = tokio::time::interval(Duration::from_secs(86400));
         interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
         loop {
             interval.tick().await;
diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs
index e61f80578faad..86fb7409e9f85 100644
--- a/src/orchestrator-kubernetes/src/lib.rs
+++ b/src/orchestrator-kubernetes/src/lib.rs
@@ -582,6 +582,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
             args,
             ports: ports_in,
             memory_limit,
+            memory_request,
             cpu_limit,
             scale,
             labels: labels_in,
@@ -644,17 +645,32 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
             labels.insert(key.clone(), value.clone());
         }
         let mut limits = BTreeMap::new();
+        let mut requests = BTreeMap::new();
         if let Some(memory_limit) = memory_limit {
             limits.insert(
                 "memory".into(),
                 Quantity(memory_limit.0.as_u64().to_string()),
             );
+            requests.insert(
+                "memory".into(),
+                Quantity(memory_limit.0.as_u64().to_string()),
+            );
+        }
+        if let Some(memory_request) = memory_request {
+            requests.insert(
+                "memory".into(),
+                Quantity(memory_request.0.as_u64().to_string()),
+            );
         }
         if let Some(cpu_limit) = cpu_limit {
             limits.insert(
                 "cpu".into(),
                 Quantity(format!("{}m", cpu_limit.as_millicpus())),
             );
+            requests.insert(
+                "cpu".into(),
+                Quantity(format!("{}m", cpu_limit.as_millicpus())),
+            );
         }
         let service = K8sService {
             metadata: ObjectMeta {
@@ -931,7 +947,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
                     // Set both limits and requests to the same values, to ensure a
                     // `Guaranteed` QoS class for the pod.
                     limits: Some(limits.clone()),
-                    requests: Some(limits.clone()),
+                    requests: Some(requests.clone()),
                 }),
                 security_context: container_security_context.clone(),
                 env: Some(vec![
@@ -1128,8 +1144,8 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
                         claims: None,
                         // Set both limits and requests to the same values, to ensure a
                         // `Guaranteed` QoS class for the pod.
-                        limits: Some(limits.clone()),
-                        requests: Some(limits),
+                        limits: Some(limits),
+                        requests: Some(requests),
                     }),
                     volume_mounts: if !volume_mounts.is_empty() {
                         Some(volume_mounts)
diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs
index aae9d3a31a377..fa6698e38a8cb 100644
--- a/src/orchestrator/src/lib.rs
+++ b/src/orchestrator/src/lib.rs
@@ -203,6 +203,9 @@ pub struct ServiceConfig {
     pub ports: Vec<ServicePort>,
     /// An optional limit on the memory that the service can use.
     pub memory_limit: Option<MemoryLimit>,
+    /// An optional request on the memory that the service can use. If unspecified,
+    /// use the same value as `memory_limit`.
+    pub memory_request: Option<MemoryLimit>,
     /// An optional limit on the CPU that the service can use.
     pub cpu_limit: Option<CpuLimit>,
     /// The number of copies of this service to run.
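A note on how the new memory_request field flows through, sketched from the orchestrator-kubernetes change above and not part of the patch: the Kubernetes requests map starts out equal to the limits, memory_request (when present) overrides only the memory request, and the CPU request keeps mirroring the CPU limit. With the 10GiB limit / 5GiB request values from test_replica_allocation_deserialization:

# Sketch only: mirrors the limits/requests construction in
# src/orchestrator-kubernetes/src/lib.rs above, using the values from
# test_replica_allocation_deserialization.
memory_limit = 10 * 1024**3   # 10GiB, as produced by ByteSize::gib(10).as_u64()
memory_request = 5 * 1024**3  # 5GiB; None would leave the request equal to the limit
cpu_limit_millicpus = 1000

limits = {"memory": str(memory_limit), "cpu": f"{cpu_limit_millicpus}m"}
requests = dict(limits)                       # requests default to the limits
if memory_request is not None:
    requests["memory"] = str(memory_request)  # only the memory request is lowered

print(limits)    # {'memory': '10737418240', 'cpu': '1000m'}
print(requests)  # {'memory': '5368709120', 'cpu': '1000m'}

When memory_request is set below memory_limit, requests no longer equal limits, so such pods fall into the Burstable rather than the Guaranteed QoS class described by the surrounding comments; replica sizes that leave memory_request unset keep the previous behavior.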