Skip to content

Commit 4667628

Browse files
committed
Try to make 0dt upsert trigger it
1 parent e20b920 commit 4667628

File tree

1 file changed

+37
-16
lines changed

1 file changed

+37
-16
lines changed

test/0dt/mzcompose.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,21 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
         )

         start_time = time.time()
+        c.testdrive(
+            dedent(
+                f"""
+                $ kafka-create-topic topic=kafka-large
+                """))
+        for i in range(repeats):
+            c.testdrive(
+                dedent(
+                    f"""
+                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
+                    key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
+                    """
+                )
+            )
         c.testdrive(
             dedent(
                 f"""
@@ -916,9 +931,6 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
                 > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
                 > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';

-                $ kafka-create-topic topic=kafka-large
-                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
-                key0A,key${{kafka-ingest.iteration}}:value0A,${{kafka-ingest.iteration}}
                 > CREATE SOURCE kafka_source
                   IN CLUSTER cluster
                   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-large-${{testdrive.seed}}');
@@ -931,24 +943,14 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
                 > CREATE VIEW kafka_source_cnt AS SELECT count(*) FROM kafka_source_tbl
                 > CREATE DEFAULT INDEX on kafka_source_cnt
                 > SELECT * FROM kafka_source_cnt
-                {count}
+                {count*repeats}
                 """
             )
         )
-        for i in range(1, repeats):
-            c.testdrive(
-                dedent(
-                    f"""
-                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
-                    key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
-                    > SELECT * FROM kafka_source_cnt
-                    {count*(i+1)}
-                    """
-                )
-            )

         elapsed = time.time() - start_time
         print(f"initial ingestion took {elapsed} seconds")
+        time.sleep(30)

         with c.override(
             Materialized(
@@ -959,7 +961,16 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
                 restart="on-failure",
                 external_metadata_store=True,
                 default_replication_factor=2,
-            )
+            ),
+            Testdrive(
+                materialize_url="postgres://materialize@mz_new:6875",
+                materialize_url_internal="postgres://materialize@mz_new:6877",
+                mz_service="mz_new",
+                materialize_params={"cluster": "cluster"},
+                no_reset=True,
+                seed=1,
+                default_timeout=DEFAULT_TIMEOUT,
+            ),
         ):
             c.up("mz_new")
             start_time = time.time()
@@ -974,6 +985,16 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
             elapsed = time.time() - start_time
             print(f"promotion took {elapsed} seconds")

+            for i in range(repeats):
+                c.testdrive(
+                    dedent(
+                        f"""
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
+                        key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
+                        """
+                    )
+                )
             start_time = time.time()
             result = c.sql_query("SELECT 1", service="mz_new")
             elapsed = time.time() - start_time

0 commit comments

Comments
 (0)