From f67c19a1bf33462263f568db6832345d85ddb4ac Mon Sep 17 00:00:00 2001 From: Eg0ra Date: Fri, 1 Nov 2024 11:43:43 +0700 Subject: [PATCH] Prepare tests for celery admin mixins Remove redundant pytest.params Improve import naming --- .../admin/mixins/base_mixin.py | 22 +++ .../admin/mixins/export_mixin.py | 53 +++---- .../admin/mixins/import_mixin.py | 90 ++++------- .../admin/model_admins/export_job_admin.py | 18 +-- .../admin/model_admins/import_job_admin.py | 5 +- .../admin/model_admins/mixins.py | 10 +- pyproject.toml | 2 +- .../test_admin/test_changelist_view.py | 21 +++ .../test_admin/test_export.py | 110 +++++++++++++ .../test_admin/test_import.py | 145 ++++++++++++++++++ .../integration_tests/test_api/test_export.py | 29 +--- .../integration_tests/test_api/test_import.py | 96 +++--------- test_project/tests/test_utils.py | 4 +- 13 files changed, 388 insertions(+), 217 deletions(-) create mode 100644 import_export_extensions/admin/mixins/base_mixin.py create mode 100644 test_project/tests/integration_tests/test_admin/test_changelist_view.py diff --git a/import_export_extensions/admin/mixins/base_mixin.py b/import_export_extensions/admin/mixins/base_mixin.py new file mode 100644 index 0000000..9aa0a22 --- /dev/null +++ b/import_export_extensions/admin/mixins/base_mixin.py @@ -0,0 +1,22 @@ +import typing + +from import_export import admin as import_export_admin + +from . import types + + +class BaseCeleryImportExportAdminMixin( + import_export_admin.ImportExportMixinBase, +): + """Extend base mixin with common logic for import/export.""" + + @property + def model_info(self) -> types.ModelInfo: + """Get info of model.""" + return types.ModelInfo( + meta=self.model._meta, + ) + + def get_context_data(self, **kwargs) -> dict[str, typing.Any]: + """Get context data.""" + return {} diff --git a/import_export_extensions/admin/mixins/export_mixin.py b/import_export_extensions/admin/mixins/export_mixin.py index 27b084e..b7d5df2 100644 --- a/import_export_extensions/admin/mixins/export_mixin.py +++ b/import_export_extensions/admin/mixins/export_mixin.py @@ -4,7 +4,6 @@ from django.core.handlers.wsgi import WSGIRequest from django.http import ( HttpResponse, - HttpResponseForbidden, HttpResponseRedirect, ) from django.shortcuts import get_object_or_404 @@ -12,17 +11,17 @@ from django.urls import re_path, reverse from django.utils.translation import gettext_lazy as _ -from import_export import admin as base_admin -from import_export import forms as base_forms -from import_export import mixins as base_mixins +from import_export import admin as import_export_admin +from import_export import forms as import_export_forms +from import_export import mixins as import_export_mixins from ... import models -from . import types +from . import base_mixin, types class CeleryExportAdminMixin( - base_mixins.BaseExportMixin, - base_admin.ImportExportMixinBase, + import_export_mixins.BaseExportMixin, + base_mixin.BaseCeleryImportExportAdminMixin, ): """Admin mixin for celery export. @@ -63,22 +62,13 @@ class CeleryExportAdminMixin( export_results_statuses = models.ExportJob.export_finished_statuses # Copy methods of mixin from original package to reuse it here - has_export_permission = base_admin.ExportMixin.has_export_permission + has_export_permission = ( + import_export_admin.ExportMixin.has_export_permission + ) - @property - def model_info(self) -> types.ModelInfo: - """Get info of exported model.""" - return types.ModelInfo( - meta=self.model._meta, - ) - - def get_context_data( - self, - request: WSGIRequest, - **kwargs, - ) -> dict[str, typing.Any]: - """Get context data.""" - return {} + def get_export_context_data(self, **kwargs): + """Get context data for export.""" + return self.get_context_data(**kwargs) def get_urls(self): """Return list of urls. @@ -130,7 +120,7 @@ def celery_export_action(self, request, *args, **kwargs): raise PermissionDenied formats = self.get_export_formats() - form = base_forms.ExportForm( + form = import_export_forms.ExportForm( formats=formats, resources=self.get_export_resource_classes(request), data=request.POST or None, @@ -158,7 +148,7 @@ def celery_export_action(self, request, *args, **kwargs): ) # GET: display Export Form - context = self.get_context_data(request) + context = self.get_export_context_data() context.update(self.admin_site.each_context(request)) context["title"] = _("Export") @@ -196,7 +186,7 @@ def export_job_status_view( job=job, ) - context = self.get_context_data(request) + context = self.get_export_context_data() job_url = reverse("admin:export_job_progress", args=(job.id,)) context["title"] = _("Export status") @@ -235,10 +225,7 @@ def export_job_results_view( job=job, ) - context = self.get_context_data(request) - - if request.method != "GET": - return HttpResponseForbidden() + context = self.get_export_context_data() # GET request, show export results context["title"] = _("Export results") @@ -293,8 +280,7 @@ def _redirect_to_export_status_page( ) url = reverse(url_name, kwargs=dict(job_id=job.id)) query = request.GET.urlencode() - if query: - url = f"{url}?{query}" + url = f"{url}?{query}" if query else url return HttpResponseRedirect(redirect_to=url) def _redirect_to_export_results_page( @@ -308,8 +294,7 @@ def _redirect_to_export_results_page( ) url = reverse(url_name, kwargs=dict(job_id=job.id)) query = request.GET.urlencode() - if query: - url = f"{url}?{query}" + url = f"{url}?{query}" if query else url return HttpResponseRedirect(redirect_to=url) def changelist_view( @@ -319,5 +304,5 @@ def changelist_view( ): """Add the check for permission to changelist template context.""" context = context or {} - context["has_export_permission"] = True + context["has_export_permission"] = self.has_export_permission(request) return super().changelist_view(request, context) diff --git a/import_export_extensions/admin/mixins/import_mixin.py b/import_export_extensions/admin/mixins/import_mixin.py index 3c6a54f..3b31a3a 100644 --- a/import_export_extensions/admin/mixins/import_mixin.py +++ b/import_export_extensions/admin/mixins/import_mixin.py @@ -1,7 +1,6 @@ import typing from django.conf import settings -from django.core.cache import cache from django.core.exceptions import PermissionDenied from django.core.handlers.wsgi import WSGIRequest from django.forms.forms import Form @@ -15,17 +14,17 @@ from django.urls import re_path, reverse from django.utils.translation import gettext_lazy as _ -from import_export import admin as base_admin -from import_export import mixins as base_mixins +from import_export import admin as import_export_admin +from import_export import mixins as import_export_mixins from ... import models from ..forms import ForceImportForm -from . import types +from . import base_mixin, types class CeleryImportAdminMixin( - base_mixins.BaseImportMixin, - base_admin.ImportExportMixinBase, + import_export_mixins.BaseImportMixin, + base_mixin.BaseCeleryImportExportAdminMixin, ): """Admin mixin for celery import. @@ -75,30 +74,17 @@ class CeleryImportAdminMixin( skip_admin_log = None # Copy methods of mixin from original package to reuse it here - generate_log_entries = base_admin.ImportMixin.generate_log_entries - get_skip_admin_log = base_admin.ImportMixin.get_skip_admin_log - has_import_permission = base_admin.ImportMixin.has_import_permission - _log_actions = base_admin.ImportMixin._log_actions - _create_log_entries = base_admin.ImportMixin._create_log_entries - _create_log_entry = base_admin.ImportMixin._create_log_entry - - @property - def model_info(self) -> types.ModelInfo: - """Get info of imported model.""" - return types.ModelInfo( - meta=self.model._meta, - ) - - def get_context_data( - self, - request: WSGIRequest, - **kwargs, - ) -> dict[str, typing.Any]: - """Get context data.""" - return {} + generate_log_entries = import_export_admin.ImportMixin.generate_log_entries + get_skip_admin_log = import_export_admin.ImportMixin.get_skip_admin_log + has_import_permission = ( + import_export_admin.ImportMixin.has_import_permission + ) + _log_actions = import_export_admin.ImportMixin._log_actions + _create_log_entries = import_export_admin.ImportMixin._create_log_entries + _create_log_entry = import_export_admin.ImportMixin._create_log_entry def get_import_context_data(self, **kwargs): - """Get context for import data.""" + """Get context data for import.""" return self.get_context_data(**kwargs) def get_urls(self): @@ -159,7 +145,7 @@ def celery_import_action( if not self.has_import_permission(request): raise PermissionDenied - context = self.get_context_data(request) + context = self.get_import_context_data() resource_classes = self.get_import_resource_classes(request) form = ForceImportForm( @@ -238,7 +224,7 @@ def celery_import_job_status_view( job=job, ) - context = self.get_context_data(request) + context = self.get_import_context_data() job_url = reverse("admin:import_job_progress", args=(job.id,)) context.update( dict( @@ -283,7 +269,7 @@ def celery_import_job_results_view( job=job, ) - context = self.get_context_data(request=request) + context = self.get_import_context_data() if request.method == "GET": # GET request, show parse results @@ -292,14 +278,14 @@ def celery_import_job_results_view( context["result"] = result context["title"] = _("Import results") - if job.import_status != models.ImportJob.ImportStatus.PARSED: + if job.import_status == models.ImportJob.ImportStatus.PARSED: + context["confirm_form"] = Form() + else: # display import form context["import_form"] = ForceImportForm( formats=self.get_import_formats(), resources=self.get_import_resource_classes(request), ) - else: - context["confirm_form"] = Form() context.update(self.admin_site.each_context(request)) context["opts"] = self.model_info.meta @@ -310,17 +296,21 @@ def celery_import_job_results_view( context, ) - # POST request. If data is invalid - error - if job.import_status != models.ImportJob.ImportStatus.PARSED: - return HttpResponseForbidden( - "Data invalid, before importing data " - "needs to be successfully parsed." - f"Current status: {job.import_status}", + # POST request + if job.import_status == models.ImportJob.ImportStatus.PARSED: + # start celery task for data importing + job.confirm_import() + return self._redirect_to_import_status_page( + request=request, + job=job, ) - # start celery task for data importing - job.confirm_import() - return self._redirect_to_import_status_page(request=request, job=job) + return HttpResponseForbidden( + "Data invalid, before importing data " + "needs to be successfully parsed. " + f"Current status: {job.import_status}", + ) + def create_import_job( self, @@ -384,20 +374,6 @@ def _redirect_to_results_page( if job.import_status != models.ImportJob.ImportStatus.PARSED: return HttpResponseRedirect(redirect_to=url) - # Redirections add one by one links to `redirect_to` - key = request.session.get("redirect_key", None) - session = request.session - if key: - links = cache.get(key) - try: - session["redirect_to"] = links[0] - del links[0] - cache.set(key, links) - except (TypeError, IndexError): - session.pop("redirect_to", None) - session.pop("redirect_key", None) - cache.delete(key) - return HttpResponseRedirect(redirect_to=url) def changelist_view( diff --git a/import_export_extensions/admin/model_admins/export_job_admin.py b/import_export_extensions/admin/model_admins/export_job_admin.py index 594a5bb..3a728ca 100644 --- a/import_export_extensions/admin/model_admins/export_job_admin.py +++ b/import_export_extensions/admin/model_admins/export_job_admin.py @@ -8,7 +8,7 @@ from django.urls import re_path from django.utils.translation import gettext_lazy as _ -from ... import models, tasks +from ... import models from .. import forms from . import mixins @@ -55,22 +55,6 @@ class ExportJobAdmin( "resource_kwargs", ) - def export_data_action( - self, - request: WSGIRequest, - obj: models.ExportJob, - ): - """Admin action for starting data export. - - Data export should auto start after export confirmation. But there - may be issues with celery and task did not start. So this action - for such cases. - - """ - tasks.export_data_task.delay(obj.id) - - export_data_action.label = _("Start Export") - def get_urls(self): """Add url to get current export job progress in JSON representation. diff --git a/import_export_extensions/admin/model_admins/import_job_admin.py b/import_export_extensions/admin/model_admins/import_job_admin.py index 2c5f4fa..f87d079 100644 --- a/import_export_extensions/admin/model_admins/import_job_admin.py +++ b/import_export_extensions/admin/model_admins/import_job_admin.py @@ -137,7 +137,7 @@ def import_job_progress_view( def _show_results( self, - obj: models.ImportJob | None = None, + obj: models.ImportJob, ) -> str: """Show results totals. @@ -149,9 +149,6 @@ def _show_results( Error: 0 """ - if not obj: - return "" - result_sections = [] for key, value in obj.result.totals.items(): status_template = f"{key.title()}: {value}" diff --git a/import_export_extensions/admin/model_admins/mixins.py b/import_export_extensions/admin/model_admins/mixins.py index 18f232d..b213fb0 100644 --- a/import_export_extensions/admin/model_admins/mixins.py +++ b/import_export_extensions/admin/model_admins/mixins.py @@ -4,7 +4,7 @@ from django.utils.module_loading import import_string from django.utils.translation import gettext_lazy as _ -from ... import models +from ...models.core import BaseJob class BaseImportExportJobAdminMixin: @@ -32,20 +32,20 @@ def has_delete_permission( """ return False - def _model(self, obj: models.ImportJob) -> str: + def _model(self, obj: BaseJob) -> str: """Add `model` field of import/export job.""" try: resource_class = import_string(obj.resource_path) model = resource_class.Meta.model._meta.verbose_name_plural # In case resource has no Meta or model we need to catch AttributeError - except (ImportError, AttributeError): + except (ImportError, AttributeError): # pragma: no cover model = _("Unknown") return model def get_from_content_type( self, - obj: models.ImportJob | models.ExportJob, - ) -> ContentType | None: + obj: BaseJob, + ) -> ContentType | None: # pragma: no cover """Shortcut to get object from content_type.""" content_type = obj.resource_kwargs.get("content_type") obj_id = obj.resource_kwargs.get("object_id") diff --git a/pyproject.toml b/pyproject.toml index 41aa93f..f357149 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -417,7 +417,7 @@ omit = [ ] [tool.coverage.report] -exclude_lines = [ +exclude_also = [ "def __repr__", "def __str__", "pass", diff --git a/test_project/tests/integration_tests/test_admin/test_changelist_view.py b/test_project/tests/integration_tests/test_admin/test_changelist_view.py new file mode 100644 index 0000000..00ceaca --- /dev/null +++ b/test_project/tests/integration_tests/test_admin/test_changelist_view.py @@ -0,0 +1,21 @@ +from django.contrib.auth.models import User +from django.test.client import Client +from django.urls import reverse + +from rest_framework import status + + +def test_changelist_view_permission_context( + client: Client, + superuser: User, +): + """Make sure changelist view has property to show import/export buttons.""" + client.force_login(superuser) + + response = client.get( + path=reverse("admin:fake_app_artist_changelist"), + ) + assert response.status_code == status.HTTP_200_OK + context = response.context + assert context["has_export_permission"] + assert context["has_import_permission"] diff --git a/test_project/tests/integration_tests/test_admin/test_export.py b/test_project/tests/integration_tests/test_admin/test_export.py index bc8935e..0f9420d 100644 --- a/test_project/tests/integration_tests/test_admin/test_export.py +++ b/test_project/tests/integration_tests/test_admin/test_export.py @@ -9,6 +9,7 @@ import pytest import pytest_mock from celery import states +from pytest_lazy_fixtures import lf from import_export_extensions.models import ExportJob from test_project.fake_app.factories import ArtistExportJobFactory @@ -54,6 +55,115 @@ def test_export_using_admin_model(client: Client, superuser: User): assert export_job.export_status == ExportJob.ExportStatus.EXPORTED +@pytest.mark.parametrize( + argnames=["view_name", "path_kwargs"], + argvalues=[ + pytest.param( + "admin:fake_app_artist_export", + None, + id="Test access to `celery_export_action`", + ), + pytest.param( + "admin:fake_app_artist_export_job_status", + {"job_id": lf("artist_export_job.pk")}, + id="Test access to `export_job_status_view`", + ), + pytest.param( + "admin:fake_app_artist_export_job_results", + {"job_id": lf("artist_export_job.pk")}, + id="Test access to `export_job_results_view`", + ), + ], +) +def test_export_using_admin_model_without_permissions( + client: Client, + superuser: User, + view_name: str, + path_kwargs: dict[str, str], + mocker: pytest_mock.MockerFixture, +): + """Test access to celery-export endpoints forbidden without permission.""" + client.force_login(superuser) + mocker.patch( + "test_project.fake_app.admin.ArtistAdmin.has_export_permission", + return_value=False, + ) + + response = client.get( + path=reverse( + view_name, + kwargs=path_kwargs, + ), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_celery_export_status_view_during_export( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test export status page when export in progress.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = ExportJob.ExportStatus.EXPORTING + artist_export_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_export_job_status", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + + expected_export_job_url = reverse( + "admin:export_job_progress", + kwargs={"job_id": artist_export_job.id}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.context["export_job_url"] == expected_export_job_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ExportJob.ExportStatus.CREATED, + ExportJob.ExportStatus.EXPORTING, + ExportJob.ExportStatus.CANCELLED, + ], +) +def test_celery_export_results_view_redirect_to_status_page( + client: Client, + superuser: User, + incorrect_job_status: ExportJob.ExportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test redirect to export status page when job in not results statuses.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.export_data_task.apply_async") + artist_export_job = ArtistExportJobFactory() + artist_export_job.export_status = incorrect_job_status + artist_export_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_export_job_results", + kwargs={"job_id": artist_export_job.pk}, + ), + ) + + expected_redirect_url = reverse( + "admin:fake_app_artist_export_job_status", + kwargs={"job_id": artist_export_job.pk}, + ) + assert response.status_code == status.HTTP_302_FOUND + assert response.url == expected_redirect_url + + @pytest.mark.django_db(transaction=True) def test_export_progress_during_export( client: Client, diff --git a/test_project/tests/integration_tests/test_admin/test_import.py b/test_project/tests/integration_tests/test_admin/test_import.py index 36f4ab8..9ad0ca1 100644 --- a/test_project/tests/integration_tests/test_admin/test_import.py +++ b/test_project/tests/integration_tests/test_admin/test_import.py @@ -10,6 +10,7 @@ import pytest import pytest_mock from celery import states +from pytest_lazy_fixtures import lf from import_export_extensions.models import ImportJob from test_project.fake_app.factories import ArtistImportJobFactory @@ -83,6 +84,150 @@ def test_import_using_admin_model( assert import_job.import_status == ImportJob.ImportStatus.IMPORTED +@pytest.mark.parametrize( + argnames=["view_name", "path_kwargs"], + argvalues=[ + pytest.param( + "admin:fake_app_artist_import", + None, + id="Test access to `celery_import_action`", + ), + pytest.param( + "admin:fake_app_artist_import_job_status", + {"job_id": lf("artist_import_job.pk")}, + id="Test access to `import_job_status_view`", + ), + pytest.param( + "admin:fake_app_artist_import_job_results", + {"job_id": lf("artist_import_job.pk")}, + id="Test access to `import_job_results_view`", + ), + ], +) +def test_import_using_admin_model_without_permissions( + client: Client, + superuser: User, + view_name: str, + path_kwargs: dict[str, str], + mocker: pytest_mock.MockerFixture, +): + """Test access to celery-import endpoints forbidden without permission.""" + client.force_login(superuser) + mocker.patch( + "test_project.fake_app.admin.ArtistAdmin.has_import_permission", + return_value=False, + ) + response = client.get( + path=reverse( + view_name, + kwargs=path_kwargs, + ), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_celery_import_status_view_during_import( + client: Client, + superuser: User, + mocker: pytest_mock.MockerFixture, +): + """Test import status page when import in progress.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory(skip_parse_step=True) + artist_import_job.import_status = ImportJob.ImportStatus.IMPORTING + artist_import_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_import_job_status", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + + expected_import_job_url = reverse( + "admin:import_job_progress", + kwargs={"job_id": artist_import_job.id}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.context["import_job_url"] == expected_import_job_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ImportJob.ImportStatus.CREATED, + ImportJob.ImportStatus.PARSING, + ImportJob.ImportStatus.PARSE_ERROR, + ImportJob.ImportStatus.CONFIRMED, + ImportJob.ImportStatus.IMPORTING, + ImportJob.ImportStatus.CANCELLED, + ], +) +def test_celery_import_results_view_redirect_to_status_page( + client: Client, + superuser: User, + incorrect_job_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Test redirect to import status page when job in not result status.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = incorrect_job_status + artist_import_job.save() + + response = client.get( + path=reverse( + "admin:fake_app_artist_import_job_results", + kwargs={"job_id": artist_import_job.pk}, + ), + ) + + expected_redirect_url = reverse( + "admin:fake_app_artist_import_job_status", + kwargs={"job_id": artist_import_job.pk}, + ) + assert response.status_code == status.HTTP_302_FOUND + assert response.url == expected_redirect_url + + +@pytest.mark.parametrize( + argnames="incorrect_job_status", + argvalues=[ + ImportJob.ImportStatus.INPUT_ERROR, + ImportJob.ImportStatus.IMPORT_ERROR, + ImportJob.ImportStatus.IMPORTED, + ], +) +def test_celery_import_results_confirm_forbidden( + client: Client, + superuser: User, + incorrect_job_status: ImportJob.ImportStatus, + mocker: pytest_mock.MockerFixture, +): + """Check that confirm from result page forbidden for not PARSED jobs.""" + client.force_login(superuser) + + mocker.patch("import_export_extensions.tasks.parse_data_task.apply_async") + artist_import_job = ArtistImportJobFactory() + artist_import_job.import_status = incorrect_job_status + artist_import_job.save() + + response = client.post( + path=reverse( + "admin:fake_app_artist_import_job_results", + kwargs={"job_id": artist_import_job.pk}, + ), + data={"confirm": "Confirm import"}, + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.usefixtures("existing_artist") def test_import_admin_has_same_formats( client: Client, diff --git a/test_project/tests/integration_tests/test_api/test_export.py b/test_project/tests/integration_tests/test_api/test_export.py index 79a26af..b972a62 100644 --- a/test_project/tests/integration_tests/test_api/test_export.py +++ b/test_project/tests/integration_tests/test_api/test_export.py @@ -70,16 +70,10 @@ def test_export_api_detail( @pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( - argnames=["allowed_cancel_status"], + argnames="allowed_cancel_status", argvalues=[ - pytest.param( - ExportJob.ExportStatus.CREATED, - id="Cancel job with `CREATED` status", - ), - pytest.param( - ExportJob.ExportStatus.EXPORTING, - id="Cancel job with `EXPORTING` status", - ), + ExportJob.ExportStatus.CREATED, + ExportJob.ExportStatus.EXPORTING, ], ) def test_export_api_cancel( @@ -103,20 +97,11 @@ def test_export_api_cancel( @pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( - argnames=["incorrect_job_status"], + argnames="incorrect_job_status", argvalues=[ - pytest.param( - ExportJob.ExportStatus.EXPORT_ERROR, - id="Cancel export job with `EXPORT_ERROR` status", - ), - pytest.param( - ExportJob.ExportStatus.EXPORTED, - id="Cancel export job with `EXPORTED` status", - ), - pytest.param( - ExportJob.ExportStatus.CANCELLED, - id="Cancel export job with `CANCELLED` status", - ), + ExportJob.ExportStatus.EXPORT_ERROR, + ExportJob.ExportStatus.EXPORTED, + ExportJob.ExportStatus.CANCELLED, ], ) def test_export_api_cancel_with_errors( diff --git a/test_project/tests/integration_tests/test_api/test_import.py b/test_project/tests/integration_tests/test_api/test_import.py index 4660bc5..6352ddf 100644 --- a/test_project/tests/integration_tests/test_api/test_import.py +++ b/test_project/tests/integration_tests/test_api/test_import.py @@ -197,44 +197,17 @@ def test_import_api_confirm_parsed_job( @pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( - argnames=["incorrect_job_status"], + argnames="incorrect_job_status", argvalues=[ - pytest.param( - ImportJob.ImportStatus.CREATED, - id="Confirm import job with `CREATED` status", - ), - pytest.param( - ImportJob.ImportStatus.PARSING, - id="Confirm import job with `PARSING` status", - ), - pytest.param( - ImportJob.ImportStatus.PARSE_ERROR, - id="Confirm import job with `PARSE_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.CONFIRMED, - id="Confirm import job with `CONFIRMED` status", - ), - pytest.param( - ImportJob.ImportStatus.INPUT_ERROR, - id="Confirm import job with `INPUT_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTING, - id="Confirm import job with `IMPORTING` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORT_ERROR, - id="Confirm import job with `IMPORT_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTED, - id="Confirm import job with `IMPORTED` status", - ), - pytest.param( - ImportJob.ImportStatus.CANCELLED, - id="Confirm import job with `CANCELLED` status", - ), + ImportJob.ImportStatus.CREATED, + ImportJob.ImportStatus.PARSING, + ImportJob.ImportStatus.PARSE_ERROR, + ImportJob.ImportStatus.CONFIRMED, + ImportJob.ImportStatus.INPUT_ERROR, + ImportJob.ImportStatus.IMPORTING, + ImportJob.ImportStatus.IMPORT_ERROR, + ImportJob.ImportStatus.IMPORTED, + ImportJob.ImportStatus.CANCELLED, ], ) def test_import_api_confirm_incorrect_job_status( @@ -262,24 +235,12 @@ def test_import_api_confirm_incorrect_job_status( @pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( - argnames=["allowed_cancel_status"], + argnames="allowed_cancel_status", argvalues=[ - pytest.param( - ImportJob.ImportStatus.CREATED, - id="Cancel import job with `CREATED` status", - ), - pytest.param( - ImportJob.ImportStatus.PARSING, - id="Cancel import job with `PARSING` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTING, - id="Cancel import job with `IMPORTING` status", - ), - pytest.param( - ImportJob.ImportStatus.CONFIRMED, - id="Cancel import job with `CONFIRMED` status", - ), + ImportJob.ImportStatus.CREATED, + ImportJob.ImportStatus.PARSING, + ImportJob.ImportStatus.IMPORTING, + ImportJob.ImportStatus.CONFIRMED, ], ) def test_import_api_cancel_job( @@ -303,28 +264,13 @@ def test_import_api_cancel_job( @pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( - argnames=["incorrect_job_status"], + argnames="incorrect_job_status", argvalues=[ - pytest.param( - ImportJob.ImportStatus.INPUT_ERROR, - id="Cancel import job with `INPUT_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.PARSE_ERROR, - id="Cancel import job with `PARSE_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORT_ERROR, - id="Cancel import job with `IMPORT_ERROR` status", - ), - pytest.param( - ImportJob.ImportStatus.IMPORTED, - id="Cancel import job with `IMPORTED` status", - ), - pytest.param( - ImportJob.ImportStatus.CANCELLED, - id="Cancel import job with `CANCELLED` status", - ), + ImportJob.ImportStatus.INPUT_ERROR, + ImportJob.ImportStatus.PARSE_ERROR, + ImportJob.ImportStatus.IMPORT_ERROR, + ImportJob.ImportStatus.IMPORTED, + ImportJob.ImportStatus.CANCELLED, ], ) def test_import_api_cancel_incorrect_job_status( diff --git a/test_project/tests/test_utils.py b/test_project/tests/test_utils.py index 34de96e..adde903 100644 --- a/test_project/tests/test_utils.py +++ b/test_project/tests/test_utils.py @@ -65,8 +65,8 @@ def test_clear_q_filter(): f"http://{AWS_STORAGE_BUCKET_NAME}.s3.region.com/dir/file.csv", "dir/file.csv", id=( - "File from s3 bucket if using virtual addressing style:" - "https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html" + # https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html + "File from s3 bucket if using virtual addressing style." ), ), ],