diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 7eef717d02dbe..b939fdf2ee50b 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1209,6 +1209,8 @@ steps: - ./ci/plugins/mzcompose: composition: terraform run: aws-temporary + # Cleanup runs in pre-exit hook + args: [--no-cleanup] branches: "main v*.* lts-v* *aws* *tf* *terraform* *helm* *self-managed* *orchestratord*" - id: terraform-aws-upgrade @@ -1225,6 +1227,8 @@ steps: - ./ci/plugins/mzcompose: composition: terraform run: aws-upgrade + # Cleanup runs in pre-exit hook + args: [--no-cleanup] branches: "main v*.* lts-v* *aws* *tf* *terraform* *helm* *self-managed* *orchestratord*" - id: terraform-gcp @@ -1240,6 +1244,8 @@ steps: - ./ci/plugins/mzcompose: composition: terraform run: gcp-temporary + # Cleanup runs in pre-exit hook + args: [--no-cleanup] branches: "main v*.* lts-v* *gcp* *tf* *terraform* *helm* *self-managed* *orchestratord*" - id: terraform-azure @@ -1255,6 +1261,8 @@ steps: - ./ci/plugins/mzcompose: composition: terraform run: azure-temporary + # Cleanup runs in pre-exit hook + args: [--no-cleanup] branches: "main v*.* lts-v* *azure* *tf* *terraform* *helm* *self-managed* *orchestratord*" - group: "Output consistency" diff --git a/ci/plugins/mzcompose/hooks/pre-exit b/ci/plugins/mzcompose/hooks/pre-exit index 187ef6bc30f70..c24cb677edec0 100755 --- a/ci/plugins/mzcompose/hooks/pre-exit +++ b/ci/plugins/mzcompose/hooks/pre-exit @@ -134,14 +134,6 @@ if [ -n "${CI_COVERAGE_ENABLED:-}" ] && [ -z "${BUILDKITE_MZCOMPOSE_PLUGIN_SKIP_ fi fi -if [[ "$BUILDKITE_LABEL" =~ Terraform\ .* ]]; then - ci_unimportant_heading "terraform: Destroying leftover state in case job was cancelled or timed out..." - bin/ci-builder run stable terraform -chdir=test/terraform/aws-temporary destroy || true - bin/ci-builder run stable terraform -chdir=test/terraform/gcp-temporary destroy || true - PATH="$PWD/test/terraform/azure-temporary/venv/bin:$PATH" VIRTUAL_ENV="$PWD/test/terraform/azure-temporary/venv" bin/ci-builder run stable terraform -chdir=test/terraform/azure-temporary destroy || true -fi -rm -rf ~/.kube # Remove potential state from E2E Terraform tests - ci_unimportant_heading ":docker: Cleaning up after mzcompose" # docker-compose kill may fail attempting to kill containers @@ -156,12 +148,14 @@ fi ci_collapsed_heading ":docker: Purging all existing docker containers and volumes, regardless of origin" docker ps --all --quiet | xargs --no-run-if-empty docker rm --force --volumes -if [[ "$BUILDKITE_LABEL" =~ Terraform\ .* ]]; then - ci_unimportant_heading "terraform: Destroying leftover state in case job was cancelled or timed out..." 
- bin/ci-builder run stable terraform -chdir=test/terraform/aws-temporary destroy || true - bin/ci-builder run stable terraform -chdir=test/terraform/aws-upgrade destroy || true - bin/ci-builder run stable terraform -chdir=test/terraform/gcp-temporary destroy || true - bin/ci-builder run stable terraform -chdir=test/terraform/azure-temporary destroy || true +if [ "$BUILDKITE_STEP_KEY" = "terraform-aws" ]; then + run run aws-temporary --no-setup --no-test --no-run-mz-debug || CI_ANNOTATE_ERRORS_RESULT=1 +elif [ "$BUILDKITE_STEP_KEY" = "terraform-aws-upgrade" ]; then + run run aws-upgrade --no-setup --no-test --no-run-mz-debug || CI_ANNOTATE_ERRORS_RESULT=1 +elif [ "$BUILDKITE_STEP_KEY" = "terraform-gcp" ]; then + run run gcp-temporary --no-setup --no-test --no-run-mz-debug || CI_ANNOTATE_ERRORS_RESULT=1 +elif [ "$BUILDKITE_STEP_KEY" = "terraform-azure" ]; then + run run azure-temporary --no-setup --no-test --no-run-mz-debug || CI_ANNOTATE_ERRORS_RESULT=1 fi rm -rf ~/.kube # Remove potential state from E2E Terraform tests diff --git a/misc/python/materialize/mzcompose/services/testdrive.py b/misc/python/materialize/mzcompose/services/testdrive.py index 56a2dc001036b..51c566f9a28ab 100644 --- a/misc/python/materialize/mzcompose/services/testdrive.py +++ b/misc/python/materialize/mzcompose/services/testdrive.py @@ -64,6 +64,7 @@ def __init__( stop_grace_period: str = "120s", cluster_replica_size: dict[str, dict[str, Any]] | None = None, network_mode: str | None = None, + set_persist_urls: bool = True, ) -> None: depends_graph: dict[str, ServiceDependency] = {} @@ -161,27 +162,30 @@ def __init__( f"--fivetran-destination-files-path={fivetran_destination_files_path}" ) - if external_blob_store: - blob_store = "azurite" if blob_store_is_azure else "minio" - address = blob_store if external_blob_store == True else external_blob_store - persist_blob_url = ( - azure_blob_uri(address) - if blob_store_is_azure - else minio_blob_uri(address) - ) - entrypoint.append(f"--persist-blob-url={persist_blob_url}") - else: - entrypoint.append("--persist-blob-url=file:///mzdata/persist/blob") - - if external_metadata_store: - depends_graph[metadata_store] = {"condition": "service_healthy"} - entrypoint.append( - "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus" - ) - else: - entrypoint.append( - f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus" - ) + if set_persist_urls: + if external_blob_store: + blob_store = "azurite" if blob_store_is_azure else "minio" + address = ( + blob_store if external_blob_store == True else external_blob_store + ) + persist_blob_url = ( + azure_blob_uri(address) + if blob_store_is_azure + else minio_blob_uri(address) + ) + entrypoint.append(f"--persist-blob-url={persist_blob_url}") + else: + entrypoint.append("--persist-blob-url=file:///mzdata/persist/blob") + + if external_metadata_store: + depends_graph[metadata_store] = {"condition": "service_healthy"} + entrypoint.append( + "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus" + ) + else: + entrypoint.append( + f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus" + ) entrypoint.extend(entrypoint_extra) diff --git a/src/testdrive/src/action.rs b/src/testdrive/src/action.rs index d5920fbdeeea6..a582bb6f04b52 100644 --- a/src/testdrive/src/action.rs +++ b/src/testdrive/src/action.rs @@ -776,7 +776,7 @@ impl Run for PosCommand { consistency::skip_consistency_checks(builtin, 
state) } "check-shard-tombstone" => { - consistency::run_check_shard_tombstoned(builtin, state).await + consistency::run_check_shard_tombstone(builtin, state).await } "fivetran-destination" => { fivetran::run_destination_command(builtin, state).await diff --git a/src/testdrive/src/action/consistency.rs b/src/testdrive/src/action/consistency.rs index 9f962cfc5721c..ae5178ce45fb7 100644 --- a/src/testdrive/src/action/consistency.rs +++ b/src/testdrive/src/action/consistency.rs @@ -94,12 +94,12 @@ pub async fn run_consistency_checks(state: &State) -> Result Result { let shard_id = cmd.args.string("shard-id")?; - check_shard_tombstoned(state, &shard_id).await?; + check_shard_tombstone(state, &shard_id).await?; Ok(ControlFlow::Continue) } @@ -237,8 +237,8 @@ async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> { } /// Checks if the provided `shard_id` is a tombstone, returning an error if it's not. -async fn check_shard_tombstoned(state: &State, shard_id: &str) -> Result<(), anyhow::Error> { - println!("$ check-shard-tombstoned {shard_id}"); +async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> { + println!("$ check-shard-tombstone {shard_id}"); let (Some(consensus_uri), Some(blob_uri)) = (&state.persist_consensus_url, &state.persist_blob_url) diff --git a/test/terraform/mzcompose.py b/test/terraform/mzcompose.py index f960d387dd855..ab953a0d9447f 100644 --- a/test/terraform/mzcompose.py +++ b/test/terraform/mzcompose.py @@ -15,6 +15,7 @@ import os import signal import subprocess +import threading import time from collections.abc import Sequence from pathlib import Path @@ -35,6 +36,10 @@ Testdrive(), # overridden below ] +TD_CMD = [ + "--var=default-replica-size=25cc", + "--var=default-storage-size=25cc", +] COMPATIBLE_TESTDRIVE_FILES = [ "array.td", @@ -80,6 +85,8 @@ "joins.td", "jsonb.td", "list.td", + # Flaky on Azure: https://buildkite.com/materialize/nightly/builds/11906#019661aa-2f41-43e1-b08f-6195c66a7ab9 + # "load-generator-key-value.td", "logging.td", "map.td", "multijoins.td", @@ -88,6 +95,7 @@ "oid.td", "orms.td", "pg-catalog.td", + "quickstart.td", "runtime-errors.td", "search_path.td", "self-test.td", @@ -95,8 +103,11 @@ "subquery-scalar-errors.td", "system-functions.td", "test-skip-if.td", + "tpch.td", "type_char_quoted.td", "version.td", + # Hangs on GCP in check-shard-tombstone + # "webhook.td", ] @@ -116,8 +127,9 @@ def testdrive(no_reset: bool) -> Testdrive: return Testdrive( materialize_url="postgres://materialize@127.0.0.1:6875/materialize", materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize", - materialize_use_https=True, + materialize_use_https=False, no_consistency_checks=True, + set_persist_urls=False, network_mode="host", volume_workdir="../testdrive:/workdir", no_reset=no_reset, @@ -132,24 +144,48 @@ def get_tag(tag: str | None) -> str: return tag or f"v{ci_util.get_mz_version()}--pr.g{git.rev_parse('HEAD')}" -def mz_debug(env: dict[str, str] | None = None) -> None: - print("-- Running mz-debug") - run_ignore_error( - [ - "cargo", - "run", - "--bin", - "mz-debug", - "--", - "self-managed", - "--k8s-namespace", - "materialize-environment", - "--k8s-namespace", - "materialize", - ], - cwd=MZ_ROOT, - env=env, - ) +def build_mz_debug_async(env: dict[str, str] | None = None) -> threading.Thread: + def run(): + spawn.capture( + [ + "cargo", + "build", + "--bin", + "mz-debug", + ], + cwd=MZ_ROOT, + stderr=subprocess.STDOUT, + env=env, + ) + + thread = threading.Thread(target=run) + 
thread.start() + return thread + + +def run_mz_debug(env: dict[str, str] | None = None) -> None: + print("--- Running mz-debug") + try: + # mz-debug (and its compilation) is rather noisy, so ignore the output + spawn.capture( + [ + "cargo", + "run", + "--bin", + "mz-debug", + "--", + "self-managed", + "--k8s-namespace", + "materialize-environment", + "--k8s-namespace", + "materialize", + ], + cwd=MZ_ROOT, + stderr=subprocess.STDOUT, + env=env, + ) + except: + pass class AWS: @@ -579,6 +615,7 @@ def connect(self, c: Composition) -> None: with conn.cursor() as cur: # Required for some testdrive tests cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)") + cur.execute("ALTER SYSTEM SET enable_create_table_from_source = true") c.up("testdrive", persistent=True) c.testdrive( @@ -635,12 +672,24 @@ def workflow_aws_temporary(c: Composition, parser: WorkflowArgumentParser) -> No action=argparse.BooleanOptionalAction, help="Destroy the region at the end of the workflow.", ) + parser.add_argument( + "--test", + default=True, + action=argparse.BooleanOptionalAction, + help="Run the actual test part", + ) parser.add_argument( "--run-testdrive-files", default=True, action=argparse.BooleanOptionalAction, help="Run testdrive files", ) + parser.add_argument( + "--run-mz-debug", + default=True, + action=argparse.BooleanOptionalAction, + help="Run mz-debug", + ) parser.add_argument( "--tag", type=str, @@ -658,37 +707,44 @@ def workflow_aws_temporary(c: Composition, parser: WorkflowArgumentParser) -> No tag = get_tag(args.tag) path = MZ_ROOT / "test" / "terraform" / "aws-temporary" aws = AWS(path) + mz_debug_build_thread: threading.Thread | None = None try: + if args.run_mz_debug: + mz_debug_build_thread = build_mz_debug_async() aws.setup("aws-test", args.setup, tag) - print("--- Running tests") - with c.override(testdrive(no_reset=False)): - aws.connect(c) + if args.test: + print("--- Running tests") + with c.override(testdrive(no_reset=False)): + aws.connect(c) - with psycopg.connect( - "postgres://materialize@127.0.0.1:6875/materialize" - ) as conn: - with conn.cursor() as cur: - cur.execute("SELECT 1") - results = cur.fetchall() - assert results == [(1,)], results - cur.execute("SELECT mz_version()") - version = cur.fetchall()[0][0] - assert version.startswith(tag.split("--")[0] + " ") - with open( - MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" - ) as f: - content = yaml.load(f, Loader=yaml.Loader) - helm_chart_version = content["version"] - assert version.endswith( - f", helm chart: {helm_chart_version})" - ), f"Actual version: {version}, expected to contain {helm_chart_version}" - - if args.run_testdrive_files: - c.run_testdrive_files(*args.files) + with psycopg.connect( + "postgres://materialize@127.0.0.1:6875/materialize" + ) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + results = cur.fetchall() + assert results == [(1,)], results + cur.execute("SELECT mz_version()") + version = cur.fetchall()[0][0] + assert version.startswith(tag.split("--")[0] + " ") + with open( + MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" + ) as f: + content = yaml.load(f, Loader=yaml.Loader) + helm_chart_version = content["version"] + assert version.endswith( + f", helm chart: {helm_chart_version})" + ), f"Actual version: {version}, expected to contain {helm_chart_version}" + + if args.run_testdrive_files: + c.run_testdrive_files(*TD_CMD, *args.files) finally: aws.cleanup() - mz_debug() + if args.run_mz_debug: + assert mz_debug_build_thread + 
mz_debug_build_thread.join() + run_mz_debug() if args.cleanup: aws.destroy() @@ -708,6 +764,24 @@ def workflow_aws_upgrade(c: Composition, parser: WorkflowArgumentParser) -> None action=argparse.BooleanOptionalAction, help="Destroy the region at the end of the workflow.", ) + parser.add_argument( + "--test", + default=True, + action=argparse.BooleanOptionalAction, + help="Run the actual test part", + ) + parser.add_argument( + "--run-testdrive-files", + default=True, + action=argparse.BooleanOptionalAction, + help="Run testdrive files", + ) + parser.add_argument( + "--run-mz-debug", + default=True, + action=argparse.BooleanOptionalAction, + help="Run mz-debug", + ) parser.add_argument( "--tag", type=str, @@ -727,41 +801,49 @@ def workflow_aws_upgrade(c: Composition, parser: WorkflowArgumentParser) -> None tag = get_tag(args.tag) path = MZ_ROOT / "test" / "terraform" / "aws-upgrade" aws = AWS(path) + mz_debug_build_thread: threading.Thread | None = None try: + if args.run_mz_debug: + mz_debug_build_thread = build_mz_debug_async() aws.setup("aws-upgrade", args.setup, previous_tag) aws.upgrade(tag) - # Try waiting a bit, otherwise connection error, should be handled better - time.sleep(180) - print("--- Running tests") - with c.override(testdrive(no_reset=False)): - aws.connect(c) + if args.test: + # Try waiting a bit, otherwise connection error, should be handled better + time.sleep(180) + print("--- Running tests") + with c.override(testdrive(no_reset=False)): + aws.connect(c) - with psycopg.connect( - "postgres://materialize@127.0.0.1:6875/materialize" - ) as conn: - with conn.cursor() as cur: - cur.execute("SELECT 1") - results = cur.fetchall() - assert results == [(1,)], results - cur.execute("SELECT mz_version()") - version = cur.fetchall()[0][0] - assert version.startswith( - tag.split("--")[0] + " " - ), f"Version expected to start with {tag.split('--')[0]}, but is actually {version}" - with open( - MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" - ) as f: - content = yaml.load(f, Loader=yaml.Loader) - helm_chart_version = content["version"] - assert version.endswith( - f", helm chart: {helm_chart_version})" - ), f"Actual version: {version}, expected to contain {helm_chart_version}" - - c.run_testdrive_files(*args.files) + with psycopg.connect( + "postgres://materialize@127.0.0.1:6875/materialize" + ) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + results = cur.fetchall() + assert results == [(1,)], results + cur.execute("SELECT mz_version()") + version = cur.fetchall()[0][0] + assert version.startswith( + tag.split("--")[0] + " " + ), f"Version expected to start with {tag.split('--')[0]}, but is actually {version}" + with open( + MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" + ) as f: + content = yaml.load(f, Loader=yaml.Loader) + helm_chart_version = content["version"] + assert version.endswith( + f", helm chart: {helm_chart_version})" + ), f"Actual version: {version}, expected to contain {helm_chart_version}" + + if args.run_testdrive_files: + c.run_testdrive_files(*TD_CMD, *args.files) finally: aws.cleanup() - mz_debug() + if args.run_mz_debug: + assert mz_debug_build_thread + mz_debug_build_thread.join() + run_mz_debug() if args.cleanup: aws.destroy() @@ -898,12 +980,24 @@ def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> No action=argparse.BooleanOptionalAction, help="Destroy the region at the end of the workflow.", ) + parser.add_argument( + "--test", + default=True, + 
action=argparse.BooleanOptionalAction, + help="Run the actual test part", + ) parser.add_argument( "--run-testdrive-files", default=True, action=argparse.BooleanOptionalAction, help="Run testdrive files", ) + parser.add_argument( + "--run-mz-debug", + default=True, + action=argparse.BooleanOptionalAction, + help="Run mz-debug", + ) parser.add_argument( "--tag", type=str, @@ -934,7 +1028,10 @@ def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> No f.write(gcp_service_account_json) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(gcloud_creds_path) + mz_debug_build_thread: threading.Thread | None = None try: + if args.run_mz_debug: + mz_debug_build_thread = build_mz_debug_async() spawn.runv(["gcloud", "config", "set", "project", "materialize-ci"]) spawn.runv( @@ -1212,125 +1309,132 @@ def workflow_gcp_temporary(c: Composition, parser: WorkflowArgumentParser) -> No print(f"Failed to get logs for {pod_name}") raise ValueError("Never completed") - print("--- Running tests") - environmentd_name = spawn.capture( - [ - "kubectl", - "get", - "pods", - "-l", - "app=environmentd", - "-n", - "materialize-environment", - "-o", - "jsonpath={.items[*].metadata.name}", - ], - cwd=path, - ) - - balancerd_name = spawn.capture( - [ - "kubectl", - "get", - "pods", - "-l", - "app=balancerd", - "-n", - "materialize-environment", - "-o", - "jsonpath={.items[*].metadata.name}", - ], - cwd=path, - ) - # error: arguments in resource/name form must have a single resource and name - print(f"Got balancerd name: {balancerd_name}") - - environmentd_port_forward_process = subprocess.Popen( - [ - "kubectl", - "port-forward", - f"pod/{environmentd_name}", - "-n", - "materialize-environment", - "6877:6877", - "6878:6878", - ], - preexec_fn=os.setpgrp, - ) - balancerd_port_forward_process = subprocess.Popen( - [ - "kubectl", - "port-forward", - f"pod/{balancerd_name}", - "-n", - "materialize-environment", - "6875:6875", - "6876:6876", - ], - preexec_fn=os.setpgrp, - ) - time.sleep(10) + if args.test: + print("--- Running tests") + environmentd_name = spawn.capture( + [ + "kubectl", + "get", + "pods", + "-l", + "app=environmentd", + "-n", + "materialize-environment", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + cwd=path, + ) - with psycopg.connect( - "postgres://mz_system:materialize@127.0.0.1:6877/materialize", - autocommit=True, - ) as conn: - with conn.cursor() as cur: - # Required for some testdrive tests - cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)") + balancerd_name = spawn.capture( + [ + "kubectl", + "get", + "pods", + "-l", + "app=balancerd", + "-n", + "materialize-environment", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + cwd=path, + ) + # error: arguments in resource/name form must have a single resource and name + print(f"Got balancerd name: {balancerd_name}") - with c.override( - Testdrive( - materialize_url="postgres://materialize@127.0.0.1:6875/materialize", - materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize", - materialize_use_https=True, - no_consistency_checks=True, - network_mode="host", - volume_workdir="../testdrive:/workdir", - # For full testdrive support we'll need: - # kafka_url=... - # schema_registry_url=... - # aws_endpoint=... 
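+            # Run the port-forwards in their own process groups (os.setpgrp) so
+            # the finally block can kill them via os.killpg; the checks below
+            # reach environmentd (6877/6878) and balancerd (6875/6876) on localhost.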
+ environmentd_port_forward_process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{environmentd_name}", + "-n", + "materialize-environment", + "6877:6877", + "6878:6878", + ], + preexec_fn=os.setpgrp, ) - ): - c.up("testdrive", persistent=True) - c.testdrive( - dedent( - """ - > SELECT 1 - 1 - """ - ) + balancerd_port_forward_process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{balancerd_name}", + "-n", + "materialize-environment", + "6875:6875", + "6876:6876", + ], + preexec_fn=os.setpgrp, ) + time.sleep(10) with psycopg.connect( - "postgres://materialize@127.0.0.1:6875/materialize" + "postgres://mz_system:materialize@127.0.0.1:6877/materialize", + autocommit=True, ) as conn: with conn.cursor() as cur: - cur.execute("SELECT 1") - results = cur.fetchall() - assert results == [(1,)], results - cur.execute("SELECT mz_version()") - version = cur.fetchall()[0][0] - assert version.startswith(tag.split("--")[0] + " ") - with open( - MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" - ) as f: - content = yaml.load(f, Loader=yaml.Loader) - helm_chart_version = content["version"] - assert version.endswith( - f", helm chart: {helm_chart_version})" - ), f"Actual version: {version}, expected to contain {helm_chart_version}" - - if args.run_testdrive_files: - c.run_testdrive_files(*args.files) + # Required for some testdrive tests + cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)") + cur.execute( + "ALTER SYSTEM SET enable_create_table_from_source = true" + ) + + with c.override( + Testdrive( + materialize_url="postgres://materialize@127.0.0.1:6875/materialize", + materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize", + materialize_use_https=False, + no_consistency_checks=True, + network_mode="host", + volume_workdir="../testdrive:/workdir", + # For full testdrive support we'll need: + # kafka_url=... + # schema_registry_url=... + # aws_endpoint=... 
+ ) + ): + c.up("testdrive", persistent=True) + c.testdrive( + dedent( + """ + > SELECT 1 + 1 + """ + ) + ) + + with psycopg.connect( + "postgres://materialize@127.0.0.1:6875/materialize" + ) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + results = cur.fetchall() + assert results == [(1,)], results + cur.execute("SELECT mz_version()") + version = cur.fetchall()[0][0] + assert version.startswith(tag.split("--")[0] + " ") + with open( + MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" + ) as f: + content = yaml.load(f, Loader=yaml.Loader) + helm_chart_version = content["version"] + assert version.endswith( + f", helm chart: {helm_chart_version})" + ), f"Actual version: {version}, expected to contain {helm_chart_version}" + + if args.run_testdrive_files: + c.run_testdrive_files(*TD_CMD, *args.files) finally: if environmentd_port_forward_process: os.killpg(os.getpgid(environmentd_port_forward_process.pid), signal.SIGTERM) if balancerd_port_forward_process: os.killpg(os.getpgid(balancerd_port_forward_process.pid), signal.SIGTERM) - mz_debug() + if args.run_mz_debug: + assert mz_debug_build_thread + mz_debug_build_thread.join() + run_mz_debug() if args.cleanup: print("--- Cleaning up") @@ -1368,12 +1472,24 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> action=argparse.BooleanOptionalAction, help="Destroy the region at the end of the workflow.", ) + parser.add_argument( + "--test", + default=True, + action=argparse.BooleanOptionalAction, + help="Run the actual test part", + ) parser.add_argument( "--run-testdrive-files", default=True, action=argparse.BooleanOptionalAction, help="Run testdrive files", ) + parser.add_argument( + "--run-mz-debug", + default=True, + action=argparse.BooleanOptionalAction, + help="Run mz-debug", + ) parser.add_argument( "--tag", type=str, @@ -1405,7 +1521,10 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> env=venv_env, ) + mz_debug_build_thread: threading.Thread | None = None try: + if args.run_mz_debug: + mz_debug_build_thread = build_mz_debug_async() if os.getenv("CI"): username = os.getenv("AZURE_SERVICE_ACCOUNT_USERNAME") password = os.getenv("AZURE_SERVICE_ACCOUNT_PASSWORD") @@ -1586,7 +1705,7 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> stdin=yaml.dump(materialize_environment).encode(), env=venv_env, ) - for i in range(60): + for i in range(120): try: spawn.runv( [ @@ -1604,7 +1723,7 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> time.sleep(1) else: raise ValueError("Never completed") - for i in range(180): + for i in range(240): try: spawn.runv( ["kubectl", "get", "pods", "-n", "materialize-environment"], @@ -1706,130 +1825,137 @@ def workflow_azure_temporary(c: Composition, parser: WorkflowArgumentParser) -> ) raise ValueError("Never completed") - print("--- Running tests") - environmentd_name = spawn.capture( - [ - "kubectl", - "get", - "pods", - "-l", - "app=environmentd", - "-n", - "materialize-environment", - "-o", - "jsonpath={.items[*].metadata.name}", - ], - cwd=path, - env=venv_env, - ) - - balancerd_name = spawn.capture( - [ - "kubectl", - "get", - "pods", - "-l", - "app=balancerd", - "-n", - "materialize-environment", - "-o", - "jsonpath={.items[*].metadata.name}", - ], - cwd=path, - env=venv_env, - ) - # error: arguments in resource/name form must have a single resource and name - print(f"Got environmentd name: {environmentd_name}") - print(f"Got balancerd name: 
{balancerd_name}") - - environmentd_port_forward_process = subprocess.Popen( - [ - "kubectl", - "port-forward", - f"pod/{environmentd_name}", - "-n", - "materialize-environment", - "6877:6877", - "6878:6878", - ], - preexec_fn=os.setpgrp, - env=venv_env, - ) - balancerd_port_forward_process = subprocess.Popen( - [ - "kubectl", - "port-forward", - f"pod/{balancerd_name}", - "-n", - "materialize-environment", - "6875:6875", - "6876:6876", - ], - preexec_fn=os.setpgrp, - env=venv_env, - ) - time.sleep(10) + if args.test: + print("--- Running tests") + environmentd_name = spawn.capture( + [ + "kubectl", + "get", + "pods", + "-l", + "app=environmentd", + "-n", + "materialize-environment", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + cwd=path, + env=venv_env, + ) - with psycopg.connect( - "postgres://mz_system:materialize@127.0.0.1:6877/materialize", - autocommit=True, - ) as conn: - with conn.cursor() as cur: - # Required for some testdrive tests - cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)") + balancerd_name = spawn.capture( + [ + "kubectl", + "get", + "pods", + "-l", + "app=balancerd", + "-n", + "materialize-environment", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + cwd=path, + env=venv_env, + ) + # error: arguments in resource/name form must have a single resource and name + print(f"Got environmentd name: {environmentd_name}") + print(f"Got balancerd name: {balancerd_name}") - with c.override( - Testdrive( - materialize_url="postgres://materialize@127.0.0.1:6875/materialize", - materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize", - materialize_use_https=True, - no_consistency_checks=True, - network_mode="host", - volume_workdir="../testdrive:/workdir", - # For full testdrive support we'll need: - # kafka_url=... - # schema_registry_url=... - # aws_endpoint=... 
+ environmentd_port_forward_process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{environmentd_name}", + "-n", + "materialize-environment", + "6877:6877", + "6878:6878", + ], + preexec_fn=os.setpgrp, + env=venv_env, ) - ): - c.up("testdrive", persistent=True) - c.testdrive( - dedent( - """ - > SELECT 1 - 1 - """ - ) + balancerd_port_forward_process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{balancerd_name}", + "-n", + "materialize-environment", + "6875:6875", + "6876:6876", + ], + preexec_fn=os.setpgrp, + env=venv_env, ) + time.sleep(10) with psycopg.connect( - "postgres://materialize@127.0.0.1:6875/materialize" + "postgres://mz_system:materialize@127.0.0.1:6877/materialize", + autocommit=True, ) as conn: with conn.cursor() as cur: - cur.execute("SELECT 1") - results = cur.fetchall() - assert results == [(1,)], results - cur.execute("SELECT mz_version()") - version = cur.fetchall()[0][0] - assert version.startswith(tag.split("--")[0] + " ") - with open( - MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" - ) as f: - content = yaml.load(f, Loader=yaml.Loader) - helm_chart_version = content["version"] - assert version.endswith( - f", helm chart: {helm_chart_version})" - ), f"Actual version: {version}, expected to contain {helm_chart_version}" - - if args.run_testdrive_files: - c.run_testdrive_files(*args.files) + # Required for some testdrive tests + cur.execute("ALTER CLUSTER mz_system SET (REPLICATION FACTOR 2)") + cur.execute( + "ALTER SYSTEM SET enable_create_table_from_source = true" + ) + + with c.override( + Testdrive( + materialize_url="postgres://materialize@127.0.0.1:6875/materialize", + materialize_url_internal="postgres://mz_system:materialize@127.0.0.1:6877/materialize", + materialize_use_https=False, + no_consistency_checks=True, + network_mode="host", + volume_workdir="../testdrive:/workdir", + # For full testdrive support we'll need: + # kafka_url=... + # schema_registry_url=... + # aws_endpoint=... + ) + ): + c.up("testdrive", persistent=True) + c.testdrive( + dedent( + """ + > SELECT 1 + 1 + """ + ) + ) + + with psycopg.connect( + "postgres://materialize@127.0.0.1:6875/materialize" + ) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + results = cur.fetchall() + assert results == [(1,)], results + cur.execute("SELECT mz_version()") + version = cur.fetchall()[0][0] + assert version.startswith(tag.split("--")[0] + " ") + with open( + MZ_ROOT / "misc" / "helm-charts" / "operator" / "Chart.yaml" + ) as f: + content = yaml.load(f, Loader=yaml.Loader) + helm_chart_version = content["version"] + assert version.endswith( + f", helm chart: {helm_chart_version})" + ), f"Actual version: {version}, expected to contain {helm_chart_version}" + + if args.run_testdrive_files: + c.run_testdrive_files(*TD_CMD, *args.files) finally: if environmentd_port_forward_process: os.killpg(os.getpgid(environmentd_port_forward_process.pid), signal.SIGTERM) if balancerd_port_forward_process: os.killpg(os.getpgid(balancerd_port_forward_process.pid), signal.SIGTERM) - mz_debug(env=venv_env) + if args.run_mz_debug: + assert mz_debug_build_thread + mz_debug_build_thread.join() + run_mz_debug(env=venv_env) if args.cleanup: print("--- Cleaning up") diff --git a/test/testdrive/github-6942.td b/test/testdrive/github-6942.td index 3e02c960bcecf..f9993a4c9c7af 100644 --- a/test/testdrive/github-6942.td +++ b/test/testdrive/github-6942.td @@ -8,7 +8,7 @@ # by the Apache License, Version 2.0. 
> DROP CLUSTER IF EXISTS gh_6942_cluster CASCADE; -> CREATE CLUSTER gh_6942_cluster SIZE '1', REPLICATION FACTOR 1; +> CREATE CLUSTER gh_6942_cluster SIZE '${arg.default-storage-size}', REPLICATION FACTOR 1; > CREATE SOURCE auction_house IN CLUSTER gh_6942_cluster diff --git a/test/testdrive/webhook.td b/test/testdrive/webhook.td index 51a92f74d75fa..dbbf8ac1981da 100644 --- a/test/testdrive/webhook.td +++ b/test/testdrive/webhook.td @@ -11,9 +11,9 @@ $ set-arg-default replicas=1 # Exercises Webhook sources. -> CREATE CLUSTER webhook_cluster REPLICAS (r1 (SIZE '1')); +> CREATE CLUSTER webhook_cluster REPLICAS (r1 (SIZE '${arg.default-storage-size}')); -> CREATE CLUSTER webhook_compute REPLICAS (r1 (SIZE '1')); +> CREATE CLUSTER webhook_compute REPLICAS (r1 (SIZE '${arg.default-storage-size}')); > CREATE SOURCE webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;