Skip to content

Commit

Permalink
Add tests for export admin actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Eg0ra committed Oct 24, 2024
1 parent dc0ce20 commit 06ffe14
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 13 deletions.
21 changes: 9 additions & 12 deletions import_export_extensions/admin/model_admins/export_job_admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

from http import HTTPStatus
import http

from django.contrib import admin, messages
from django.core.handlers.wsgi import WSGIRequest
Expand Down Expand Up @@ -96,7 +96,7 @@ def export_job_progress_view(
except self.export_job_model.DoesNotExist as error:
return JsonResponse(
dict(validation_error=error.args[0]),
status=HTTPStatus.NOT_FOUND,
status=http.HTTPStatus.NOT_FOUND,
)

response_data = dict(status=job.export_status.title())
Expand Down Expand Up @@ -131,7 +131,9 @@ def get_readonly_fields(self, request, obj=None):
Some fields are editable for new ExportJob.
"""
readonly_fields = [
base_readonly_fields = super().get_readonly_fields(request, obj)
readonly_fields = (
*base_readonly_fields,
"export_status",
"traceback",
"file_format_path",
Expand All @@ -140,15 +142,10 @@ def get_readonly_fields(self, request, obj=None):
"export_finished",
"error_message",
"_model",
]
if obj:
readonly_fields.extend(
[
"resource_path",
"data_file",
"resource_kwargs",
],
)
"resource_path",
"data_file",
"resource_kwargs",
)

return readonly_fields

Expand Down
163 changes: 162 additions & 1 deletion test_project/tests/integration_tests/test_admin/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,165 @@ def test_export_progress_with_failed_celery_task(
assert (
artist_export_job.export_status == ExportJob.ExportStatus.EXPORT_ERROR
)
assert artist_export_job.error_message == expected_error_message


@pytest.mark.django_db(transaction=True)
def test_cancel_export_admin_action(
client: Client,
superuser: User,
mocker: pytest_mock.MockerFixture,
):
"""Test `cancel_export` via admin action."""
client.force_login(superuser)

revoke_mock = mocker.patch("celery.current_app.control.revoke")
export_data_mock = mocker.patch(
"import_export_extensions.models.ExportJob.export_data",
)
job: ExportJob = ArtistExportJobFactory()

response = client.post(
reverse("admin:import_export_extensions_exportjob_changelist"),
data={
"action": "cancel_jobs",
"_selected_action": [job.pk],
},
)
job.refresh_from_db()

assert response.status_code == status.HTTP_302_FOUND
assert job.export_status == ExportJob.ExportStatus.CANCELLED
assert (
response.wsgi_request._messages._queued_messages[0].message
== f"Export of {job} canceled"
)
export_data_mock.assert_called_once()
revoke_mock.assert_called_once_with(job.export_task_id, terminate=True)


@pytest.mark.django_db(transaction=True)
def test_cancel_export_admin_action_with_incorrect_export_job_status(
client: Client,
superuser: User,
mocker: pytest_mock.MockerFixture,
):
"""Test `cancel_export` via admin action with wrong export job status."""
client.force_login(superuser)

revoke_mock = mocker.patch("celery.current_app.control.revoke")
job: ExportJob = ArtistExportJobFactory()

expected_error_message = f"ExportJob with id {job.pk} has incorrect status"

response = client.post(
reverse("admin:import_export_extensions_exportjob_changelist"),
data={
"action": "cancel_jobs",
"_selected_action": [job.pk],
},
)
job.refresh_from_db()

assert response.status_code == status.HTTP_302_FOUND
assert job.export_status == ExportJob.ExportStatus.EXPORTED
assert (
expected_error_message
in response.wsgi_request._messages._queued_messages[0].message
)
revoke_mock.assert_not_called()


@pytest.mark.parametrize(
argnames=["job_status", "expected_fieldsets"],
argvalues=[
pytest.param(
ExportJob.ExportStatus.CREATED,
(
(
"export_status",
"_model",
"created",
"export_started",
"export_finished",
),
),
id="Get fieldsets for job in status CREATED",
),
pytest.param(
ExportJob.ExportStatus.EXPORTED,
(
(
"export_status",
"_model",
"created",
"export_started",
"export_finished",
),
("data_file",),
),
id="Get fieldsets for job in status EXPORTED",
),
pytest.param(
ExportJob.ExportStatus.EXPORTING,
(
(
"export_status",
"export_progressbar",
),
),
id="Get fieldsets for job in status EXPORTING",
),
pytest.param(
ExportJob.ExportStatus.EXPORT_ERROR,
(
(
"export_status",
"_model",
"created",
"export_started",
"export_finished",
),
(
"error_message",
"traceback",
),
),
id="Get fieldsets for job in status EXPORT_ERROR",
),
],
)
def test_get_fieldsets_by_export_job_status(
client: Client,
superuser: User,
job_status: ExportJob.ExportStatus,
expected_fieldsets: tuple[tuple[str]],
mocker: pytest_mock.MockerFixture,
):
"""Test that appropriate fieldsets returned for different job statuses."""
client.force_login(superuser)

mocker.patch(
"import_export_extensions.models.ExportJob.export_data",
)
job: ExportJob = ArtistExportJobFactory()
job.export_status = job_status
job.save()

response = client.get(
reverse(
"admin:import_export_extensions_exportjob_change",
kwargs={"object_id": job.pk},
),
)

fieldsets = response.context["adminform"].fieldsets
fields = [fields["fields"] for _, fields in fieldsets]

assert tuple(fields) == (
*expected_fieldsets,
(
"resource_path",
"resource_kwargs",
"file_format_path",
),
)

0 comments on commit 06ffe14

Please sign in to comment.