diff --git a/geonode/base/api/permissions.py b/geonode/base/api/permissions.py index c5c36534234..2fd1c34b5c4 100644 --- a/geonode/base/api/permissions.py +++ b/geonode/base/api/permissions.py @@ -37,6 +37,7 @@ from guardian.shortcuts import get_objects_for_user from itertools import chain from guardian.shortcuts import get_groups_with_perms +from geonode.security.registry import permissions_registry logger = logging.getLogger(__name__) @@ -251,7 +252,7 @@ def has_permission(self, request, view): ) # getting the user permission for that resource - resource_perms = res.get_user_perms(request.user) + resource_perms = permissions_registry.get_perms(instance=res, user=request.user) groups = get_groups_with_perms(res, attach_perms=True) # we are making this because the request.user.groups sometimes returns empty si is not fully reliable diff --git a/geonode/base/api/serializers.py b/geonode/base/api/serializers.py index 96338ebf675..27a8a122a2e 100644 --- a/geonode/base/api/serializers.py +++ b/geonode/base/api/serializers.py @@ -525,7 +525,7 @@ def to_representation(self, instance): request = self.context.get("request", None) resource = ResourceBase.objects.get(pk=instance) return ( - permissions_registry.get_perms(instance=resource, user=request.user, include_virtual=True) + permissions_registry.get_perms(instance=resource, user=request.user) if request and request.user and resource else [] ) diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py index 6fa473ffcec..009acd52d7e 100644 --- a/geonode/base/api/tests.py +++ b/geonode/base/api/tests.py @@ -2377,10 +2377,7 @@ def test_resource_service_copy_with_perms_dataset_set_default_perms(self): self.assertTrue( "bobby" in "bobby" - in [ - x.username - for x in permissions_registry.get_perms(instance=resource, include_virtual=True).get("users", []) - ] + in [x.username for x in permissions_registry.get_perms(instance=resource).get("users", [])] ) # copying the resource, should remove the perms for bobby # only the default perms should be available @@ -2398,17 +2395,11 @@ def test_resource_service_copy_with_perms_dataset_set_default_perms(self): self.assertIsNotNone(_resource) self.assertNotIn( "bobby", - [ - x.username - for x in permissions_registry.get_perms(instance=_resource, include_virtual=True).get("users", []) - ], + [x.username for x in permissions_registry.get_perms(instance=_resource).get("users", [])], ) self.assertIn( "admin", - [ - x.username - for x in permissions_registry.get_perms(instance=_resource, include_virtual=True).get("users", []) - ], + [x.username for x in permissions_registry.get_perms(instance=_resource).get("users", [])], ) def test_resource_service_copy_with_perms_doc(self): @@ -3447,7 +3438,7 @@ def test_simple_resourcebase_can_be_created_by_resourcemanager(self): "groups": {anonymous_group: set(["view_resourcebase"])}, } - actual_perms = permissions_registry.get_perms(instance=resource, include_virtual=True).copy() + actual_perms = permissions_registry.get_perms(instance=resource).copy() self.assertIsNotNone(actual_perms) self.assertTrue(self.user in actual_perms["users"].keys()) self.assertTrue(anonymous_group in actual_perms["groups"].keys()) diff --git a/geonode/base/views.py b/geonode/base/views.py index 31f4bf2f677..f8a9741bdef 100644 --- a/geonode/base/views.py +++ b/geonode/base/views.py @@ -72,6 +72,7 @@ from geonode.base.forms import CategoryForm, TKeywordForm, ThesaurusAvailableForm from geonode.base.models import Thesaurus, TopicCategory +from geonode.security.registry import permissions_registry from .forms import ResourceBaseForm @@ -535,8 +536,7 @@ def resourcebase_embed(request, resourcebaseid, template="base/base_edit.html"): # Call this first in order to be sure "perms_list" is correct permissions_json = _perms_info_json(resourcebase_obj) - - perms_list = resourcebase_obj.get_user_perms(request.user) + perms_list = permissions_registry.get_perms(instance=resourcebase_obj, user=request.user) group = None if resourcebase_obj.group: diff --git a/geonode/documents/tests.py b/geonode/documents/tests.py index 7f678328e56..4de6e8ffd46 100644 --- a/geonode/documents/tests.py +++ b/geonode/documents/tests.py @@ -403,7 +403,7 @@ def test_set_document_permissions(self): # Test that previous permissions for users other than ones specified in # the perm_spec (and the document owner) were - current_perms = permissions_registry.get_perms(instance=document, include_virtual=True) + current_perms = permissions_registry.get_perms(instance=document) self.assertEqual(len(current_perms["users"]), 1) # Test that the User permissions specified in the perm_spec were diff --git a/geonode/geoapps/views.py b/geonode/geoapps/views.py index a20064f01b8..2d9277a24f0 100644 --- a/geonode/geoapps/views.py +++ b/geonode/geoapps/views.py @@ -46,6 +46,7 @@ from geonode.base.forms import CategoryForm, TKeywordForm, ThesaurusAvailableForm from geonode.base.models import Thesaurus, TopicCategory from geonode.utils import resolve_object +from geonode.security.registry import permissions_registry from .forms import GeoAppForm @@ -106,7 +107,7 @@ def geoapp_edit(request, geoappid, template="apps/app_edit.html"): # Call this first in order to be sure "perms_list" is correct permissions_json = _perms_info_json(geoapp_obj) - perms_list = geoapp_obj.get_user_perms(request.user) + perms_list = permissions_registry.get_perms(instance=geoapp_obj, user=request.user) group = None if geoapp_obj.group: diff --git a/geonode/geoserver/security.py b/geonode/geoserver/security.py index 5109cec0997..5cc93e77b93 100644 --- a/geonode/geoserver/security.py +++ b/geonode/geoserver/security.py @@ -350,7 +350,7 @@ def sync_resources_with_guardian(resource=None, force=False): batch = AutoPriorityBatch(gf_utils.get_first_available_priority(), f"Sync resources {dataset}") gf_utils.collect_delete_layer_rules(get_dataset_workspace(dataset), dataset.name, batch) - perm_spec = permissions_registry.get_perms(instance=dataset, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=dataset) # All the other users if "users" in perm_spec: for user, perms in perm_spec["users"].items(): diff --git a/geonode/geoserver/tests/integration.py b/geonode/geoserver/tests/integration.py index 39d8040e80f..06cb8a43e9a 100644 --- a/geonode/geoserver/tests/integration.py +++ b/geonode/geoserver/tests/integration.py @@ -42,6 +42,7 @@ from geonode.decorators import on_ogc_backend from geonode.base.models import TopicCategory, Link from geonode.geoserver.helpers import set_attributes_from_geoserver +from geonode.security.registry import permissions_registry LOCAL_TIMEOUT = 300 @@ -351,4 +352,5 @@ def test_default_anonymous_permissions(self): saved_dataset.delete() def get_user_resource_perms(self, instance, user): - return list(instance.get_user_perms(user).union(instance.get_self_resource().get_user_perms(user))) + return permissions_registry.get_perms(instance=instance, user=user) + # return list(instance.get_user_perms(user).union(instance.get_self_resource().get_user_perms(user))) diff --git a/geonode/groups/tests.py b/geonode/groups/tests.py index db9ce8e543a..5366ab515f1 100644 --- a/geonode/groups/tests.py +++ b/geonode/groups/tests.py @@ -335,7 +335,7 @@ def test_perms_info(self): # Add test to test perms being sent to the front end. layer = Dataset.objects.first() layer.set_default_permissions() - perms_info = permissions_registry.get_perms(instance=layer, include_virtual=True) + perms_info = permissions_registry.get_perms(instance=layer) # Ensure there is only one group 'anonymous' by default self.assertEqual(len(perms_info["groups"].keys()), 1) @@ -696,7 +696,7 @@ def test_group_activity_pages_render(self): try: # Add test to test perms being sent to the front end. dataset.set_default_permissions() - perms_info = permissions_registry.get_perms(instance=dataset, include_virtual=True) + perms_info = permissions_registry.get_perms(instance=dataset) # Ensure there is only one group 'anonymous' by default self.assertEqual(len(perms_info["groups"].keys()), 1) diff --git a/geonode/layers/tests.py b/geonode/layers/tests.py index 0d814cc972d..d0a9d304227 100644 --- a/geonode/layers/tests.py +++ b/geonode/layers/tests.py @@ -642,7 +642,7 @@ def test_assign_change_dataset_data_perm(self): layer = Dataset.objects.first() user = get_anonymous_user() layer.set_permissions({"users": {user.username: ["change_dataset_data"]}}) - perms = permissions_registry.get_perms(instance=layer, include_virtual=True) + perms = permissions_registry.get_perms(instance=layer) self.assertNotIn(user, perms["users"]) self.assertNotIn(user.username, perms["users"]) @@ -737,13 +737,13 @@ def test_surrogate_escape_string(self): def test_assign_remove_permissions(self): # Assing layer = Dataset.objects.all().first() - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) self.assertNotIn(get_user_model().objects.get(username="norman"), perm_spec["users"]) utils.set_datasets_permissions( "edit", resources_names=[layer.name], users_usernames=["norman"], delete_flag=False, verbose=True ) - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) _c = 0 if "users" in perm_spec: for _u in perm_spec["users"]: @@ -756,7 +756,7 @@ def test_assign_remove_permissions(self): utils.set_datasets_permissions( "read", resources_names=[layer.name], users_usernames=["norman"], delete_flag=True, verbose=True ) - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) _c = 0 if "users" in perm_spec: for _u in perm_spec["users"]: @@ -769,7 +769,7 @@ def test_assign_remove_permissions(self): def test_assign_remove_permissions_for_groups(self): # Assing layer = Dataset.objects.all().first() - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) group_profile = GroupProfile.objects.create(slug="group1", title="group1", access="public") self.assertNotIn(group_profile, perm_spec["groups"]) @@ -777,7 +777,7 @@ def test_assign_remove_permissions_for_groups(self): utils.set_datasets_permissions( "manage", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True ) - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) expected = { "change_dataset_data", "change_dataset_style", @@ -796,7 +796,7 @@ def test_assign_remove_permissions_for_groups(self): utils.set_datasets_permissions( "view", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True ) - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) expected = {"view_resourcebase"} # checking the perms list self.assertSetEqual(expected, set(perm_spec["groups"][group_profile.group])) @@ -805,7 +805,7 @@ def test_assign_remove_permissions_for_groups(self): utils.set_datasets_permissions( "view", resources_names=[layer.name], groups_names=["group1"], delete_flag=True, verbose=True ) - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) # checking the perms list self.assertTrue(group_profile.group not in perm_spec["groups"]) @@ -1816,7 +1816,7 @@ def _create_arguments(self, perms_type, mode="set"): def _assert_perms(self, expected_perms, dataset, username, assertion=True): dataset.refresh_from_db() - perms = permissions_registry.get_perms(instance=dataset, include_virtual=True) + perms = permissions_registry.get_perms(instance=dataset) if assertion: self.assertTrue(username in [user.username for user in perms["users"]]) actual = set( diff --git a/geonode/layers/views.py b/geonode/layers/views.py index f7907326936..90efbec41ae 100644 --- a/geonode/layers/views.py +++ b/geonode/layers/views.py @@ -61,6 +61,7 @@ from geonode.people.forms import ProfileForm from geonode.utils import check_ogc_backend, llbbox_to_mercator, resolve_object from geonode.geoserver.helpers import ogc_server_settings +from geonode.security.registry import permissions_registry if check_ogc_backend(geoserver.BACKEND_PACKAGE): from geonode.geoserver.helpers import gs_catalog @@ -646,7 +647,7 @@ def dataset_metadata_detail(request, layername, template="datasets/dataset_metad site_url = settings.SITEURL.rstrip("/") if settings.SITEURL.startswith("http") else settings.SITEURL register_event(request, "view_metadata", layer) - perms_list = layer.get_user_perms(request.user) + perms_list = permissions_registry.get_perms(instance=layer, user=request.user) return render( request, diff --git a/geonode/people/tests.py b/geonode/people/tests.py index 18ae6736fe0..46e4047c8b7 100644 --- a/geonode/people/tests.py +++ b/geonode/people/tests.py @@ -134,7 +134,7 @@ def test_set_unset_user_dataset_permissions(self): ) for layer in self.layers: user = get_user_model().objects.first() - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) self.assertFalse(user in perm_spec["users"], f"{layer} - {user}") @override_settings(ASYNC_SIGNALS=False) @@ -167,7 +167,7 @@ def test_set_unset_group_dataset_permissions(self): verbose=True, ) for layer in self.layers: - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) self.assertTrue(self.groups[0] in perm_spec["groups"]) @override_settings(ASYNC_SIGNALS=False) @@ -215,7 +215,7 @@ def test_unset_group_dataset_perms(self): verbose=True, ) for layer in self.layers: - perm_spec = permissions_registry.get_perms(instance=layer, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=layer) self.assertTrue(user not in perm_spec["users"]) def test_forgot_username(self): diff --git a/geonode/resource/manager.py b/geonode/resource/manager.py index 4f62f5307f5..e2bb8cb2d91 100644 --- a/geonode/resource/manager.py +++ b/geonode/resource/manager.py @@ -801,7 +801,7 @@ def _safe_assign_perm(perm, user_or_group, obj=None): uuid, instance=_resource, owner=owner, - permissions=permissions_registry.get_perms(instance=_resource, include_virtual=True), + permissions=permissions_registry.get_perms(instance=_resource), created=created, ): # This might not be a severe error. E.g. for datasets outside of local GeoServer diff --git a/geonode/security/models.py b/geonode/security/models.py index 914aab02317..f8e3b29d09f 100644 --- a/geonode/security/models.py +++ b/geonode/security/models.py @@ -451,7 +451,7 @@ def user_can(self, user, permission): """ Checks if a has a given permission to the resource. """ - user_perms = self.get_user_perms(user) + user_perms = permissions_registry.get_perms(instance=self, user=user) if permission not in user_perms: # TODO cater for permissions with syntax base.permission_codename diff --git a/geonode/security/registry.py b/geonode/security/registry.py index 41a42277668..81f84274281 100644 --- a/geonode/security/registry.py +++ b/geonode/security/registry.py @@ -54,7 +54,7 @@ def __check_item(self, item): def fixup_perms(self, instance, payload, include_virtual=True, *args, **kwargs): for handler in self.REGISTRY: - payload = handler.fixup_perms(instance, payload, include_virtual, *args, **kwargs) + payload = handler.fixup_perms(instance, payload, include_virtual=include_virtual, *args, **kwargs) return payload def get_perms(self, instance, user=None, include_virtual=True, *args, **kwargs): diff --git a/geonode/security/tests.py b/geonode/security/tests.py index 4197f7cb4a4..bcd690c0c54 100644 --- a/geonode/security/tests.py +++ b/geonode/security/tests.py @@ -939,7 +939,7 @@ def test_set_dataset_permissions(self): # Test that previous permissions for users other than ones specified in # the perm_spec (and the layers owner) were removed - current_perms = permissions_registry.get_perms(instance=layer, include_virtual=True) + current_perms = permissions_registry.get_perms(instance=layer) self.assertGreaterEqual(len(current_perms["users"]), 1) # Test that there are no duplicates on returned permissions @@ -1899,7 +1899,7 @@ def test_set_compact_permissions(self): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), @@ -1981,7 +1981,7 @@ def test_permissions_are_set_as_expected_resource_publishing_True(self): permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), @@ -2054,7 +2054,9 @@ def test_permissions_are_set_as_expected_admin_upload_resource_publishing_True(s permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [ + x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject) + ] self.assertSetEqual( set(expected_perms), set(perms_got), @@ -2125,7 +2127,7 @@ def test_permissions_are_set_as_expected_admin_upload_resource_publishing_False( permissions, expected = item self.resource.set_permissions(permissions) for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), @@ -2180,7 +2182,7 @@ def test_permissions_on_user_role_promotion_to_manager(self): sut.refresh_from_db() self.assertEqual(sut.role, "manager") for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" ) @@ -2212,7 +2214,7 @@ def test_permissions_on_user_role_demote_to_member(self): self.group_member: ["download_resourcebase", "view_resourcebase"], } for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" ) @@ -2245,7 +2247,7 @@ def test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_acti self.group_member: ["download_resourcebase", "view_resourcebase"], } for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" ) @@ -2301,7 +2303,7 @@ def test_permissions_on_user_role_promote_to_manager_only_RESOURCE_PUBLISHING_ac ], } for authorized_subject, expected_perms in expected.items(): - perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)] + perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)] self.assertSetEqual( set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}" ) @@ -2313,10 +2315,7 @@ def test_if_anonymoys_default_perms_is_false_should_not_assign_perms_to_user_gro """ resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member}) - self.assertFalse( - self.group_profile.group - in permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"].keys() - ) + self.assertFalse(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys()) @override_settings(DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION=False) def test_if_anonymoys_default_download_perms_is_false_should_not_assign_perms_to_user_group(self): @@ -2325,10 +2324,7 @@ def test_if_anonymoys_default_download_perms_is_false_should_not_assign_perms_to """ resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member}) - self.assertFalse( - self.group_profile.group - in permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"].keys() - ) + self.assertFalse(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys()) @override_settings(DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION=False) @override_settings(RESOURCE_PUBLISHING=True) @@ -2339,13 +2335,8 @@ def test_if_anonymoys_default_perms_is_false_should_assign_perms_to_user_group_i """ resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member}) - self.assertTrue( - self.group_profile.group - in permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"].keys() - ) - group_val = permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"][ - self.group_profile.group - ] + self.assertTrue(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys()) + group_val = permissions_registry.get_perms(instance=resource)["groups"][self.group_profile.group] self.assertSetEqual({"view_resourcebase", "download_resourcebase"}, set(group_val)) @override_settings(DEFAULT_ANONYMOUS_VIEW_PERMISSION=False) @@ -2360,13 +2351,8 @@ def test_if_anonymoys_default_perms_is_false_should_assign_perms_to_user_group_i resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member}) - self.assertTrue( - self.group_profile.group - in permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"].keys() - ) - group_val = permissions_registry.get_perms(instance=resource, include_virtual=True)["groups"][ - self.group_profile.group - ] + self.assertTrue(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys()) + group_val = permissions_registry.get_perms(instance=resource)["groups"][self.group_profile.group] self.assertSetEqual({"view_resourcebase", "download_resourcebase"}, set(group_val)) @@ -2427,7 +2413,7 @@ def setUp(self): assign_perm(perm, self.member_with_perms, self.resource.get_self_resource()) # Assert inital assignment of permissions to groups and users - resource_perm_specs = permissions_registry.get_perms(instance=self.resource, include_virtual=True) + resource_perm_specs = permissions_registry.get_perms(instance=self.resource) self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms) ) @@ -2476,7 +2462,7 @@ def test_owner_is_group_manager(self): # Admin publishes and approves the resource response = self.admin_approve_and_publish_resource() self.assertEqual(response.status_code, 200) - resource_perm_specs = permissions_registry.get_perms(instance=self.resource, include_virtual=True) + resource_perm_specs = permissions_registry.get_perms(instance=self.resource) # Once a resource has been published, the 'publish_resourcebase' permission should be removed anyway self.assertSetEqual( @@ -2487,7 +2473,7 @@ def test_owner_is_group_manager(self): # Admin un-approves and un-publishes the resource response = self.admin_unapprove_and_unpublish_resource() self.assertEqual(response.status_code, 200) - resource_perm_specs = permissions_registry.get_perms(instance=self.resource, include_virtual=True) + resource_perm_specs = permissions_registry.get_perms(instance=self.resource) self.assertSetEqual( set(resource_perm_specs["users"][self.author]), @@ -2497,7 +2483,7 @@ def test_owner_is_group_manager(self): GroupMember.objects.get(group=self.owner_group, user=self.author).demote() def assertions_for_approved_or_published_is_true(self): - resource_perm_specs = permissions_registry.get_perms(instance=self.resource, include_virtual=True) + resource_perm_specs = permissions_registry.get_perms(instance=self.resource) self.assertSetEqual(set(resource_perm_specs["users"][self.author]), set(self.owner_perms)) self.assertSetEqual( set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms) @@ -2514,7 +2500,7 @@ def assertions_for_approved_or_published_is_true(self): self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms)) def assertions_for_approved_and_published_is_false(self): - resource_perm_specs = permissions_registry.get_perms(instance=self.resource, include_virtual=True) + resource_perm_specs = permissions_registry.get_perms(instance=self.resource) self.assertSetEqual( set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms) ) @@ -2638,7 +2624,7 @@ def test_anonymous_user_is_stripped_off(self): assign_perm(perm, get_anonymous_user(), resource) assign_perm(perm, Group.objects.get(name="anonymous"), resource) - perm_spec = permissions_registry.get_perms(instance=resource, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=resource) anonymous_user_perm = perm_spec["users"].get(get_anonymous_user()) self.assertEqual(anonymous_user_perm, None, "Anynmous user wasn't removed") diff --git a/geonode/security/utils.py b/geonode/security/utils.py index 9a91ba5e42f..2038a03d06f 100644 --- a/geonode/security/utils.py +++ b/geonode/security/utils.py @@ -626,9 +626,7 @@ def get_permissions( if _resource: from geonode.security.registry import permissions_registry - perm_spec = _permissions or copy.deepcopy( - permissions_registry.get_perms(instance=_resource, include_virtual=True) - ) + perm_spec = _permissions or copy.deepcopy(permissions_registry.get_perms(instance=_resource)) # Sanity checks if isinstance(perm_spec, str): @@ -714,7 +712,7 @@ def set_group_member_permissions(user, group, role): ).filter(owner=user) _resources = queryset.iterator() for _r in _resources: - perm_spec = permissions_registry.get_perms(instance=_r, include_virtual=True) + perm_spec = permissions_registry.get_perms(instance=_r) if "users" not in perm_spec: perm_spec["users"] = {} if "groups" not in perm_spec: diff --git a/geonode/security/views.py b/geonode/security/views.py index 8a5ed3891f9..9c695c4fd9e 100644 --- a/geonode/security/views.py +++ b/geonode/security/views.py @@ -41,7 +41,7 @@ def _perms_info(obj): - return permissions_registry.get_perms(instance=obj, include_virtual=True) + return permissions_registry.get_perms(instance=obj) def _perms_info_json(obj): diff --git a/geonode/services/views.py b/geonode/services/views.py index c5c676721d5..3341fb7fcc1 100644 --- a/geonode/services/views.py +++ b/geonode/services/views.py @@ -39,6 +39,7 @@ from .models import Service from . import forms, enumerations from .serviceprocessors import get_service_handler +from geonode.security.registry import permissions_registry logger = logging.getLogger(__name__) @@ -139,7 +140,7 @@ def harvest_resources_handle_get(request, service, handler): {"id": "type-filter", "data_key": "type"}, ] - perms_list = service.get_user_perms(request.user) + perms_list = permissions_registry.get_perms(instance=service, user=request.user) result = render( request, @@ -249,7 +250,7 @@ def service_detail(request, service_id): permissions_json = _perms_info_json(service) - perms_list = service.get_user_perms(request.user) + perms_list = permissions_registry.get_perms(instance=service, user=request.user) harvested_resources_ids = [] if service.harvester: diff --git a/geonode/upload/tests/end2end/integration.py b/geonode/upload/tests/end2end/integration.py index f1fb6276729..e5fc6ec7cc2 100644 --- a/geonode/upload/tests/end2end/integration.py +++ b/geonode/upload/tests/end2end/integration.py @@ -41,6 +41,7 @@ from geonode.tests.utils import upload_step, Client from geonode.geoserver.helpers import ogc_server_settings, cascading_delete from geonode.geoserver.signals import gs_catalog +from geonode.security.registry import permissions_registry from geoserver.catalog import Catalog from gisdata import BAD_DATA @@ -695,7 +696,7 @@ def get_wms_timepositions(): resp, data = self.client.upload_file(thefile, perms='{"users": {"AnonymousUser": []}, "groups":{}}') _dataset = Dataset.objects.get(name=dataset_name) _user = get_user_model().objects.get(username="AnonymousUser") - self.assertEqual(_dataset.get_user_perms(_user).count(), 0) + self.assertEqual(permissions_registry.get_perms(instance=_dataset, user=_user).count(), 0) # initial state is no positions or info self.assertTrue(get_wms_timepositions() is None) diff --git a/geonode/upload/tests/end2end/test_end2end.py b/geonode/upload/tests/end2end/test_end2end.py index bd6e34761cc..e5e5de4aa72 100644 --- a/geonode/upload/tests/end2end/test_end2end.py +++ b/geonode/upload/tests/end2end/test_end2end.py @@ -130,7 +130,7 @@ def _assertimport( # check if the dynamic model is created if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False): - _schema_id = ModelSchema.objects.filter(name__icontains=initial_name.lower().replace(' ', '_')) + _schema_id = ModelSchema.objects.filter(name__icontains=initial_name.lower().replace(" ", "_")) self.assertTrue(_schema_id.exists()) schema_entity = _schema_id.first() self.assertTrue(FieldSchema.objects.filter(model_schema=schema_entity).exists()) @@ -141,7 +141,8 @@ def _assertimport( # check if the geonode resource exists resource = ResourceBase.objects.filter( - Q(alternate__icontains=f"geonode:{initial_name.lower().replace(' ', '_')}") | Q(alternate__icontains=initial_name.lower().replace(' ', '_')) + Q(alternate__icontains=f"geonode:{initial_name.lower().replace(' ', '_')}") + | Q(alternate__icontains=initial_name.lower().replace(" ", "_")) ) self.assertTrue(resource.exists()) diff --git a/geonode/upload/tests/integration.py b/geonode/upload/tests/integration.py index 4d9877f3a7c..4e2c113b483 100644 --- a/geonode/upload/tests/integration.py +++ b/geonode/upload/tests/integration.py @@ -42,6 +42,7 @@ from geonode.upload.utils import _ALLOW_TIME_STEP from geonode.geoserver.helpers import ogc_server_settings, cascading_delete from geonode.geoserver.signals import gs_catalog +from geonode.security.registry import permissions_registry from geoserver.catalog import Catalog from gisdata import BAD_DATA @@ -696,7 +697,7 @@ def get_wms_timepositions(): resp, data = self.client.upload_file(thefile, perms='{"users": {"AnonymousUser": []}, "groups":{}}') _dataset = Dataset.objects.get(name=dataset_name) _user = get_user_model().objects.get(username="AnonymousUser") - self.assertEqual(_dataset.get_user_perms(_user).count(), 0) + self.assertEqual(permissions_registry.get_perms(instance=_dataset, user=_user).count(), 0) # initial state is no positions or info self.assertTrue(get_wms_timepositions() is None)