Skip to content

Commit 4cdc96e

Browse files
committed
[DOP-24570] Add tests for lineage excluding symlinks with no IO
1 parent 8dc8bda commit 4cdc96e

File tree

8 files changed

+285
-4
lines changed

8 files changed

+285
-4
lines changed

Makefile

+4-4
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ db-views: ##@DB Create views
6464
broker: broker-start ##@Broker Prepare broker (in docker)
6565

6666
broker-start: ##Broker Start broker
67-
docker compose -f docker-compose.test.yml up -d --wait broker $(DOCKER_COMPOSE_ARGS)
67+
docker compose -f docker-compose.test.yml --profile consumer up -d --wait $(DOCKER_COMPOSE_ARGS)
6868

6969

7070
test: test-db test-broker ##@Test Run tests
@@ -84,7 +84,7 @@ test-db-start: ##@TestDB Start database
8484
test-broker: test-broker-start ##@TestBroker Prepare broker (in docker)
8585

8686
test-broker-start: ##@TestBroker Start broker
87-
docker compose -f docker-compose.test.yml up -d --wait broker $(DOCKER_COMPOSE_ARGS)
87+
docker compose -f docker-compose.test.yml --profile consumer up -d --wait $(DOCKER_COMPOSE_ARGS)
8888

8989
test-ci: test-db test-broker ##@Test Run CI tests
9090
${POETRY} run coverage run -m pytest
@@ -93,7 +93,7 @@ test-check-fixtures: ##@Test Check declared fixtures
9393
${POETRY} run pytest --dead-fixtures $(PYTEST_ARGS)
9494

9595
test-cleanup: ##@Test Cleanup tests dependencies
96-
docker compose -f docker-compose.test.yml down $(ARGS)
96+
docker compose -f docker-compose.test.yml --profile all down --remove-orphans $(ARGS)
9797

9898

9999

@@ -110,7 +110,7 @@ prod: ##@Application Run production containers
110110
docker compose up -d
111111

112112
prod-cleanup: ##@Application Stop production containers
113-
docker compose down $(ARGS)
113+
docker compose down --remove-orphans $(ARGS)
114114

115115

116116
.PHONY: docs

docker-compose.test.yml

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ services:
2727
interval: 10s
2828
timeout: 5s
2929
retries: 5
30+
profiles:
31+
- consumer
32+
- all
3033

3134
keycloak:
3235
image: quay.io/keycloak/keycloak:latest
@@ -39,6 +42,9 @@ services:
3942
- 8080:8080
4043
volumes:
4144
- keycloak_data:/opt/keycloak/data
45+
profiles:
46+
- keycloak
47+
- all
4248

4349
volumes:
4450
postgres_test_data:

tests/test_server/fixtures/factories/address.py

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ async def create_address(
3030
else:
3131
address_kwargs = {"location_id": location_id}
3232
address = address_factory(**address_kwargs)
33+
del address.id
3334
async_session.add(address)
3435
await async_session.commit()
3536
await async_session.refresh(address)

tests/test_server/fixtures/factories/lineage.py

+29
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,35 @@ async def lineage_with_symlinks(
691691
await clean_db(async_session)
692692

693693

694+
@pytest_asyncio.fixture()
695+
async def lineage_with_unconnected_symlinks(
696+
lineage_with_depth: LineageResult,
697+
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],
698+
) -> AsyncGenerator[LineageResult, None]:
699+
# Same as lineage_with_depth, but each dataset has also a symlink,
700+
# not connected to any input or output.
701+
702+
lineage = lineage_with_depth
703+
704+
async with async_session_maker() as async_session:
705+
existing_datasets = lineage.datasets.copy()
706+
for dataset in existing_datasets:
707+
another_location = await create_location(async_session)
708+
another_dataset = await create_dataset(async_session, location_id=another_location.id)
709+
lineage.datasets.append(another_dataset)
710+
711+
metastore = [await make_symlink(async_session, another_dataset, dataset, DatasetSymlinkType.METASTORE)]
712+
lineage.dataset_symlinks.extend(metastore)
713+
714+
warehouse = [await make_symlink(async_session, dataset, another_dataset, DatasetSymlinkType.WAREHOUSE)]
715+
lineage.dataset_symlinks.extend(warehouse)
716+
717+
yield lineage
718+
719+
async with async_session_maker() as async_session:
720+
await clean_db(async_session)
721+
722+
694723
@pytest_asyncio.fixture
695724
async def duplicated_lineage_with_column_lineage(
696725
async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]],

tests/test_server/test_lineage/test_dataset_lineage.py

+67
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,73 @@ async def test_get_dataset_lineage_with_symlink(
950950
}
951951

952952

953+
async def test_get_dataset_lineage_with_symlink_without_input_output(
954+
test_client: AsyncClient,
955+
async_session: AsyncSession,
956+
lineage_with_unconnected_symlinks: LineageResult,
957+
mocked_user: MockedUser,
958+
):
959+
lineage = lineage_with_unconnected_symlinks
960+
# Start from any dataset between J0 and J1, as it has both inputs and outputs
961+
dataset = lineage.datasets[1]
962+
963+
inputs = [input for input in lineage.inputs if input.dataset_id == dataset.id]
964+
assert inputs
965+
966+
outputs = [output for output in lineage.outputs if output.dataset_id == dataset.id]
967+
assert outputs
968+
969+
operation_ids = {input.operation_id for input in inputs} | {output.operation_id for output in outputs}
970+
operations = [operation for operation in lineage.operations if operation.id in operation_ids]
971+
assert operations
972+
973+
run_ids = {operation.run_id for operation in operations}
974+
runs = [run for run in lineage.runs if run.id in run_ids]
975+
assert runs
976+
977+
job_ids = {run.job_id for run in runs}
978+
jobs = [job for job in lineage.jobs if job.id in job_ids]
979+
assert jobs
980+
981+
[dataset] = await enrich_datasets([dataset], async_session)
982+
jobs = await enrich_jobs(jobs, async_session)
983+
runs = await enrich_runs(runs, async_session)
984+
since = min(run.created_at for run in runs)
985+
986+
response = await test_client.get(
987+
"v1/datasets/lineage",
988+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
989+
params={
990+
"since": since.isoformat(),
991+
"start_node_id": dataset.id,
992+
},
993+
)
994+
995+
assert response.status_code == HTTPStatus.OK, response.json()
996+
assert response.json() == {
997+
"relations": {
998+
"parents": run_parents_to_json(runs),
999+
"symlinks": [], # symlinks without inputs/outputs are excluded
1000+
"inputs": [
1001+
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
1002+
*inputs_to_json(merge_io_by_runs(inputs), granularity="RUN"),
1003+
],
1004+
"outputs": [
1005+
*outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
1006+
*outputs_to_json(merge_io_by_runs(outputs), granularity="RUN"),
1007+
],
1008+
"direct_column_lineage": [],
1009+
"indirect_column_lineage": [],
1010+
},
1011+
"nodes": {
1012+
"datasets": datasets_to_json([dataset]),
1013+
"jobs": jobs_to_json(jobs),
1014+
"runs": runs_to_json(runs),
1015+
"operations": {},
1016+
},
1017+
}
1018+
1019+
9531020
@pytest.mark.parametrize("dataset_index", [0, 1], ids=["output", "input"])
9541021
async def test_get_dataset_lineage_unmergeable_schema_and_output_type(
9551022
test_client: AsyncClient,

tests/test_server/test_lineage/test_job_lineage.py

+53
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,59 @@ async def test_get_job_lineage_with_symlinks(
783783
}
784784

785785

786+
async def test_get_job_lineage_with_symlink_without_input_output(
787+
test_client: AsyncClient,
788+
async_session: AsyncSession,
789+
lineage_with_unconnected_symlinks: LineageResult,
790+
mocked_user: MockedUser,
791+
):
792+
lineage = lineage_with_unconnected_symlinks
793+
794+
job = lineage.jobs[0]
795+
inputs = [input for input in lineage.inputs if input.job_id == job.id]
796+
assert inputs
797+
798+
outputs = [output for output in lineage.outputs if output.job_id == job.id]
799+
assert outputs
800+
801+
dataset_ids = {input.dataset_id for input in inputs} | {output.dataset_id for output in outputs}
802+
assert dataset_ids
803+
804+
datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids]
805+
assert datasets
806+
807+
[job] = await enrich_jobs([job], async_session)
808+
datasets = await enrich_datasets(datasets, async_session)
809+
since = min(run.created_at for run in lineage.runs if run.job_id == job.id)
810+
811+
response = await test_client.get(
812+
"v1/jobs/lineage",
813+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
814+
params={
815+
"since": since.isoformat(),
816+
"start_node_id": job.id,
817+
},
818+
)
819+
820+
assert response.status_code == HTTPStatus.OK, response.json()
821+
assert response.json() == {
822+
"relations": {
823+
"parents": [],
824+
"symlinks": [], # symlinks without inputs/outputs are excluded
825+
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
826+
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
827+
"direct_column_lineage": [],
828+
"indirect_column_lineage": [],
829+
},
830+
"nodes": {
831+
"datasets": datasets_to_json(datasets),
832+
"jobs": jobs_to_json([job]),
833+
"runs": {},
834+
"operations": {},
835+
},
836+
}
837+
838+
786839
async def test_get_job_lineage_unmergeable_inputs_and_outputs(
787840
test_client: AsyncClient,
788841
async_session: AsyncSession,

tests/test_server/test_lineage/test_operation_lineage.py

+64
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,70 @@ async def test_get_operation_lineage_with_symlinks(
640640
}
641641

642642

643+
async def test_get_operation_lineage_with_symlink_without_input_output(
644+
test_client: AsyncClient,
645+
async_session: AsyncSession,
646+
lineage_with_unconnected_symlinks: LineageResult,
647+
mocked_user: MockedUser,
648+
):
649+
lineage = lineage_with_unconnected_symlinks
650+
operation = lineage.operations[0]
651+
652+
inputs = [input for input in lineage.inputs if input.operation_id == operation.id]
653+
assert inputs
654+
655+
outputs = [output for output in lineage.outputs if output.operation_id == operation.id]
656+
assert outputs
657+
658+
dataset_ids = {input.dataset_id for input in inputs} | {output.dataset_id for output in outputs}
659+
assert dataset_ids
660+
661+
datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids]
662+
assert datasets
663+
664+
run = next(run for run in lineage.runs if run.id == operation.run_id)
665+
job = next(job for job in lineage.jobs if job.id == run.job_id)
666+
667+
[job] = await enrich_jobs([job], async_session)
668+
[run] = await enrich_runs([run], async_session)
669+
datasets = await enrich_datasets(datasets, async_session)
670+
671+
response = await test_client.get(
672+
"v1/operations/lineage",
673+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
674+
params={
675+
"since": run.created_at.isoformat(),
676+
"start_node_id": str(operation.id),
677+
},
678+
)
679+
680+
assert response.status_code == HTTPStatus.OK, response.json()
681+
assert response.json() == {
682+
"relations": {
683+
"parents": run_parents_to_json([run]) + operation_parents_to_json([operation]),
684+
"symlinks": [], # symlinks without inputs/outputs are excluded
685+
"inputs": [
686+
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
687+
*inputs_to_json(inputs, granularity="OPERATION"),
688+
*inputs_to_json(merge_io_by_runs(inputs), granularity="RUN"),
689+
],
690+
"outputs": [
691+
*outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
692+
*outputs_to_json(outputs, granularity="OPERATION"),
693+
*outputs_to_json(merge_io_by_runs(outputs), granularity="RUN"),
694+
],
695+
"direct_column_lineage": [],
696+
"indirect_column_lineage": [],
697+
},
698+
"nodes": {
699+
"datasets": datasets_to_json(datasets),
700+
"jobs": jobs_to_json([job]),
701+
"runs": runs_to_json([run]),
702+
"operations": operations_to_json([operation]),
703+
},
704+
}
705+
706+
643707
async def test_get_operation_lineage_with_empty_io_stats_and_schema(
644708
test_client: AsyncClient,
645709
async_session: AsyncSession,

tests/test_server/test_lineage/test_run_lineage.py

+61
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,67 @@ async def test_get_run_lineage_with_symlinks(
822822
}
823823

824824

825+
async def test_get_run_lineage_with_symlink_without_input_output(
826+
test_client: AsyncClient,
827+
async_session: AsyncSession,
828+
lineage_with_unconnected_symlinks: LineageResult,
829+
mocked_user: MockedUser,
830+
):
831+
lineage = lineage_with_unconnected_symlinks
832+
833+
run = lineage.runs[0]
834+
job = next(job for job in lineage.jobs if job.id == run.job_id)
835+
836+
inputs = [input for input in lineage.inputs if input.run_id == run.id]
837+
assert inputs
838+
839+
outputs = [output for output in lineage.outputs if output.run_id == run.id]
840+
assert outputs
841+
842+
dataset_ids = {input.dataset_id for input in inputs} | {output.dataset_id for output in outputs}
843+
assert dataset_ids
844+
845+
datasets = [dataset for dataset in lineage.datasets if dataset.id in dataset_ids]
846+
assert datasets
847+
848+
[job] = await enrich_jobs([job], async_session)
849+
[run] = await enrich_runs([run], async_session)
850+
datasets = await enrich_datasets(datasets, async_session)
851+
852+
response = await test_client.get(
853+
"v1/runs/lineage",
854+
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
855+
params={
856+
"since": run.created_at.isoformat(),
857+
"start_node_id": str(run.id),
858+
},
859+
)
860+
861+
assert response.status_code == HTTPStatus.OK, response.json()
862+
assert response.json() == {
863+
"relations": {
864+
"parents": run_parents_to_json([run]),
865+
"symlinks": [], # symlinks without inputs/outputs are excluded
866+
"inputs": [
867+
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
868+
*inputs_to_json(merge_io_by_runs(inputs), granularity="RUN"),
869+
],
870+
"outputs": [
871+
*outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
872+
*outputs_to_json(merge_io_by_runs(outputs), granularity="RUN"),
873+
],
874+
"direct_column_lineage": [],
875+
"indirect_column_lineage": [],
876+
},
877+
"nodes": {
878+
"datasets": datasets_to_json(datasets),
879+
"jobs": jobs_to_json([job]),
880+
"runs": runs_to_json([run]),
881+
"operations": {},
882+
},
883+
}
884+
885+
825886
async def test_get_run_lineage_unmergeable_inputs_and_outputs(
826887
test_client: AsyncClient,
827888
async_session: AsyncSession,

0 commit comments

Comments
 (0)