
Add memory-request replica property #32624 #32644


Draft · wants to merge 7 commits into base: potential/0-130-14
133 changes: 94 additions & 39 deletions ci/mkpipeline.py
@@ -28,6 +28,7 @@
import traceback
from collections import OrderedDict
from collections.abc import Iterable, Iterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

@@ -354,19 +355,27 @@ def visit(config: Any) -> None:
def switch_jobs_to_aws(pipeline: Any, priority: int) -> None:
"""Switch jobs to AWS if Hetzner is currently overloaded"""

branch = os.getenv("BUILDKITE_BRANCH")
# branch = os.getenv("BUILDKITE_BRANCH")

# If Hetzner is entirely broken, you have to take these actions to switch everything back to AWS:
# - CI_FORCE_SWITCH_TO_AWS env variable to 1
# - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to linux-aarch64-small in https://buildkite.com/materialize/test/settings/steps and other pipelines
# - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to linux-aarch64-small in ci/mkpipeline.sh
if not ui.env_is_truthy("CI_FORCE_SWITCH_TO_AWS", "0"):

stuck: set[str] = set()
# TODO(def-): Remove me when Hetzner fixes its aarch64 availability
stuck.add("aarch64")

if ui.env_is_truthy("CI_FORCE_SWITCH_TO_AWS", "0"):
stuck = set(["x86-64", "x86-64-dedi", "aarch64"])
else:
# TODO(def-): Reenable me when Hetzner fixes its aarch64 availability
# If priority has manually been set to be low, or on main branch, we can
# wait for agents to become available
if branch == "main" or priority < 0:
return
# if branch == "main" or priority < 0:
# return

# Consider Hetzner to be overloaded when at least 600 jobs exist with priority >= 0
# Consider Hetzner to be overloaded/broken when an important job is stuck waiting for an agent for > 60 minutes, per arch (x86-64/x86-64-dedi/aarch64)
try:
builds = generic_api.get_multiple(
"builds",
@@ -382,54 +391,100 @@ def switch_jobs_to_aws(pipeline: Any, priority: int) -> None:
max_fetches=None,
)

num_jobs = 0
for build in builds:
for job in build["jobs"]:
if "state" not in job:
continue
if "agent_query_rules" not in job:
continue
if job["state"] in ("scheduled", "running", "assigned", "accepted"):
queue = job["agent_query_rules"][0].removeprefix("queue=")
if not queue.startswith("hetzner-"):
continue
if job.get("priority", {}).get("number", 0) < 0:
continue
num_jobs += 1
print(f"Number of high-priority jobs on Hetzner: {num_jobs}")
if num_jobs < 600:
return
queue = job["agent_query_rules"][0].removeprefix("queue=")
if not queue.startswith("hetzner-"):
continue

# "hetzner-x86-64-dedi-4cpu-8gb" => "x86-64-dedi"
# "hetzner-x86-64-4cpu-8gb" => "x86-64"
# "hetzner-aarch64-4cpu-8gb" => "aarch64"
queue_prefix = "-".join(queue.split("-")[1:-2])
if queue_prefix in stuck:
continue

if job.get("state") != "scheduled":
continue

runnable = job.get("runnable_at")
if not runnable or job.get("started_at"):
continue
if datetime.now(timezone.utc) - datetime.fromisoformat(
runnable
) < timedelta(hours=1):
continue

print(
f"Job {job.get('id')} ({job.get('web_url')}) with priority {priority} is runnable since {runnable} on {queue}, considering {queue_prefix} stuck"
)
stuck.add(queue_prefix)
except Exception:
print("switch_jobs_to_aws failed, ignoring:")
traceback.print_exc()
return

if not stuck:
return

print(f"Queue prefixes stuck in Hetzner, switching to AWS or another arch: {stuck}")

def visit(config: Any) -> None:
if "agents" in config:
agent = config["agents"].get("queue", None)
if agent in ("hetzner-aarch64-4cpu-8gb", "hetzner-aarch64-2cpu-4gb"):
config["agents"]["queue"] = "linux-aarch64-small"
if agent == "hetzner-aarch64-8cpu-16gb":
config["agents"]["queue"] = "linux-aarch64"
if agent == "hetzner-aarch64-16cpu-32gb":
config["agents"]["queue"] = "linux-aarch64-medium"
if agent in (
"hetzner-x86-64-4cpu-8gb",
"hetzner-x86-64-2cpu-4gb",
"hetzner-x86-64-dedi-2cpu-8gb",
):
config["agents"]["queue"] = "linux-x86_64-small"
if agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-dedi-4cpu-16gb"):
config["agents"]["queue"] = "linux-x86_64"
if agent in ("hetzner-x86-64-16cpu-32gb", "hetzner-x86-64-dedi-8cpu-32gb"):
config["agents"]["queue"] = "linux-x86_64-medium"
if agent == "hetzner-x86-64-dedi-16cpu-64gb":
config["agents"]["queue"] = "linux-x86_64-large"
if agent in (
"hetzner-x86-64-dedi-32cpu-128gb",
"hetzner-x86-64-dedi-48cpu-192gb",
):
config["agents"]["queue"] = "builder-linux-x86_64"
if "aarch64" in stuck:
if "x86-64" not in stuck:
if agent == "hetzner-aarch64-2cpu-4gb":
config["agents"]["queue"] = "hetzner-x86-64-2cpu-4gb"
if config.get("depends_on") == "build-aarch64":
config["depends_on"] = "build-x86_64"
elif agent == "hetzner-aarch64-4cpu-8gb":
config["agents"]["queue"] = "hetzner-x86-64-4cpu-8gb"
if config.get("depends_on") == "build-aarch64":
config["depends_on"] = "build-x86_64"
elif agent == "hetzner-aarch64-8cpu-16gb":
config["agents"]["queue"] = "hetzner-x86-64-8cpu-16gb"
if config.get("depends_on") == "build-aarch64":
config["depends_on"] = "build-x86_64"
elif agent == "hetzner-aarch64-16cpu-32gb":
config["agents"]["queue"] = "hetzner-x86-64-16cpu-32gb"
if config.get("depends_on") == "build-aarch64":
config["depends_on"] = "build-x86_64"
else:
if agent in (
"hetzner-aarch64-4cpu-8gb",
"hetzner-aarch64-2cpu-4gb",
):
config["agents"]["queue"] = "linux-aarch64"
elif agent in (
"hetzner-aarch64-8cpu-16gb",
"hetzner-aarch64-16cpu-32gb",
):
config["agents"]["queue"] = "linux-aarch64-medium"
if "x86-64" in stuck:
if agent in ("hetzner-x86-64-4cpu-8gb", "hetzner-x86-64-2cpu-4gb"):
config["agents"]["queue"] = "linux-x86_64"
elif agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-16cpu-32gb"):
config["agents"]["queue"] = "linux-x86_64-medium"
if "x86-64-dedi" in stuck:
if agent == "hetzner-x86-64-dedi-2cpu-8gb":
config["agents"]["queue"] = "linux-x86_64"
elif agent == "hetzner-x86-64-dedi-4cpu-16gb":
config["agents"]["queue"] = "linux-x86_64-medium"
elif agent in (
"hetzner-x86-64-dedi-8cpu-32gb",
"hetzner-x86-64-dedi-16cpu-64gb",
):
config["agents"]["queue"] = "linux-x86_64-large"
elif agent in (
"hetzner-x86-64-dedi-32cpu-128gb",
"hetzner-x86-64-dedi-48cpu-192gb",
):
config["agents"]["queue"] = "builder-linux-x86_64"

for config in pipeline["steps"]:
if "trigger" in config or "wait" in config:
2 changes: 1 addition & 1 deletion ci/mkpipeline.sh
@@ -50,7 +50,7 @@ steps:
command: bin/ci-builder run min bin/pyactivate -m ci.mkpipeline $pipeline $@
priority: 200
agents:
queue: hetzner-aarch64-4cpu-8gb
queue: hetzner-x86-64-4cpu-8gb
retry:
automatic:
- exit_status: -1
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/builtin_table_updates.rs
@@ -2001,6 +2001,7 @@ impl CatalogState {
size,
ReplicaAllocation {
memory_limit,
memory_request: _,
cpu_limit,
disk_limit,
scale,
@@ -2025,7 +2026,6 @@ impl CatalogState {
u64::cast_from(*workers).into(),
cpu_limit.as_nanocpus().into(),
memory_bytes.into(),
// TODO(guswynn): disk size will be filled in later.
disk_bytes.into(),
(*credits_per_hour).into(),
]);
7 changes: 7 additions & 0 deletions src/catalog/src/config.rs
@@ -162,6 +162,7 @@ impl ClusterReplicaSizeMap {
name,
ReplicaAllocation {
memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale: 1,
@@ -183,6 +184,7 @@ impl ClusterReplicaSizeMap {
format!("{scale}-1"),
ReplicaAllocation {
memory_limit: None,
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale,
@@ -199,6 +201,7 @@ impl ClusterReplicaSizeMap {
format!("{scale}-{scale}"),
ReplicaAllocation {
memory_limit: None,
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale,
@@ -215,6 +218,7 @@ impl ClusterReplicaSizeMap {
format!("mem-{scale}"),
ReplicaAllocation {
memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale: 1,
@@ -232,6 +236,7 @@ impl ClusterReplicaSizeMap {
"2-4".to_string(),
ReplicaAllocation {
memory_limit: None,
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale: 2,
@@ -248,6 +253,7 @@ impl ClusterReplicaSizeMap {
"free".to_string(),
ReplicaAllocation {
memory_limit: None,
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale: 0,
@@ -265,6 +271,7 @@ impl ClusterReplicaSizeMap {
size.to_string(),
ReplicaAllocation {
memory_limit: None,
memory_request: None,
cpu_limit: None,
disk_limit: None,
scale: 1,
7 changes: 7 additions & 0 deletions src/controller/src/clusters.rs
@@ -72,6 +72,9 @@ pub struct ReplicaConfig {
pub struct ReplicaAllocation {
/// The memory limit for each process in the replica.
pub memory_limit: Option<MemoryLimit>,
/// The memory request for each process in the replica.
#[serde(default)]
pub memory_request: Option<MemoryLimit>,
/// The CPU limit for each process in the replica.
pub cpu_limit: Option<CpuLimit>,
/// The disk limit for each process in the replica.
@@ -112,6 +115,7 @@ fn test_replica_allocation_deserialization() {
{
"cpu_limit": 1.0,
"memory_limit": "10GiB",
"memory_request": "5GiB",
"disk_limit": "100MiB",
"scale": 16,
"workers": 1,
@@ -132,6 +136,7 @@ fn test_replica_allocation_deserialization() {
disk_limit: Some(DiskLimit(ByteSize::mib(100))),
disabled: false,
memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
memory_request: Some(MemoryLimit(ByteSize::gib(5))),
cpu_limit: Some(CpuLimit::from_millicpus(1000)),
cpu_exclusive: false,
is_cc: true,
@@ -166,6 +171,7 @@ fn test_replica_allocation_deserialization() {
disk_limit: Some(DiskLimit(ByteSize::mib(0))),
disabled: true,
memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
memory_request: None,
cpu_limit: Some(CpuLimit::from_millicpus(0)),
cpu_exclusive: true,
is_cc: true,
@@ -702,6 +708,7 @@ where
],
cpu_limit: location.allocation.cpu_limit,
memory_limit: location.allocation.memory_limit,
memory_request: location.allocation.memory_request,
scale: location.allocation.scale,
labels: BTreeMap::from([
("replica-id".into(), replica_id.to_string()),
2 changes: 1 addition & 1 deletion src/metrics/src/lgalloc.rs
@@ -189,7 +189,7 @@ pub async fn register_metrics_into(metrics_registry: &MetricsRegistry) {
let mut lgmetrics = LgMetrics::new(metrics_registry);

mz_ore::task::spawn(|| "lgalloc_stats_update", async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
let mut interval = tokio::time::interval(Duration::from_secs(86400));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
22 changes: 19 additions & 3 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -582,6 +582,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
args,
ports: ports_in,
memory_limit,
memory_request,
cpu_limit,
scale,
labels: labels_in,
@@ -644,17 +645,32 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
labels.insert(key.clone(), value.clone());
}
let mut limits = BTreeMap::new();
let mut requests = BTreeMap::new();
if let Some(memory_limit) = memory_limit {
limits.insert(
"memory".into(),
Quantity(memory_limit.0.as_u64().to_string()),
);
requests.insert(
"memory".into(),
Quantity(memory_limit.0.as_u64().to_string()),
);
}
if let Some(memory_request) = memory_request {
requests.insert(
"memory".into(),
Quantity(memory_request.0.as_u64().to_string()),
);
}
if let Some(cpu_limit) = cpu_limit {
limits.insert(
"cpu".into(),
Quantity(format!("{}m", cpu_limit.as_millicpus())),
);
requests.insert(
"cpu".into(),
Quantity(format!("{}m", cpu_limit.as_millicpus())),
);
}
let service = K8sService {
metadata: ObjectMeta {
@@ -931,7 +947,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
// Requests mirror the limits unless a lower memory request is
// configured. Keeping them equal preserves the `Guaranteed` QoS
// class for the pod; a lower memory request makes it `Burstable`.
limits: Some(limits.clone()),
requests: Some(limits.clone()),
requests: Some(requests.clone()),
}),
security_context: container_security_context.clone(),
env: Some(vec![
@@ -1128,8 +1144,8 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
claims: None,
// Requests mirror the limits unless a lower memory request is
// configured. Keeping them equal preserves the `Guaranteed` QoS
// class for the pod; a lower memory request makes it `Burstable`.
limits: Some(limits.clone()),
requests: Some(limits),
limits: Some(limits),
requests: Some(requests),
}),
volume_mounts: if !volume_mounts.is_empty() {
Some(volume_mounts)
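
The net effect of the orchestrator change: requests start out mirroring the limits, and a configured memory_request only lowers the memory request, so sizes without one behave exactly as before. A hedged sketch of that precedence, using plain u64 byte counts and string maps instead of MemoryLimit, CpuLimit, and the Kubernetes Quantity type (an illustration, not the orchestrator code itself):

use std::collections::BTreeMap;

/// Build simplified resource maps the way the orchestrator now does: limits
/// always carry the memory and CPU limits, requests mirror them, and a
/// configured memory request overrides only the memory request.
fn resource_maps(
    memory_limit: Option<u64>,
    memory_request: Option<u64>,
    cpu_millis: Option<u64>,
) -> (BTreeMap<String, String>, BTreeMap<String, String>) {
    let mut limits = BTreeMap::new();
    let mut requests = BTreeMap::new();
    if let Some(limit) = memory_limit {
        limits.insert("memory".to_string(), limit.to_string());
        requests.insert("memory".to_string(), limit.to_string());
    }
    if let Some(request) = memory_request {
        requests.insert("memory".to_string(), request.to_string());
    }
    if let Some(cpu) = cpu_millis {
        limits.insert("cpu".to_string(), format!("{cpu}m"));
        requests.insert("cpu".to_string(), format!("{cpu}m"));
    }
    (limits, requests)
}

fn main() {
    // A 10 GiB limit with a 5 GiB request: the pod asks for less than it may use.
    let (limits, requests) = resource_maps(Some(10 << 30), Some(5 << 30), Some(1000));
    assert_eq!(limits["memory"], (10u64 << 30).to_string());
    assert_eq!(requests["memory"], (5u64 << 30).to_string());

    // Without a memory_request the two maps stay identical, as before this change.
    let (limits, requests) = resource_maps(Some(10 << 30), None, Some(1000));
    assert_eq!(limits, requests);
}

When the memory request ends up lower than the limit, Kubernetes places the pod in the Burstable QoS class rather than Guaranteed; with no memory_request configured the pod remains Guaranteed as before.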
3 changes: 3 additions & 0 deletions src/orchestrator/src/lib.rs
@@ -203,6 +203,9 @@ pub struct ServiceConfig {
pub ports: Vec<ServicePort>,
/// An optional limit on the memory that the service can use.
pub memory_limit: Option<MemoryLimit>,
/// An optional request on the memory that the service can use. If unspecified,
/// the value of `memory_limit` is used as the request.
pub memory_request: Option<MemoryLimit>,
/// An optional limit on the CPU that the service can use.
pub cpu_limit: Option<CpuLimit>,
/// The number of copies of this service to run.