diff --git a/tests/test_services.py b/tests/test_services.py index c25454773..91abdf010 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -1019,14 +1019,18 @@ def test_ServiceGeoserver_effective_permissions(self): user group effective (detail) Service1 (gc-A-R) (gm-D-R) gc-A, gf-D, gi-D, gm-D Workspace1 (dp-A-R) (gf-A-R) dp-A, gf-A, gi-D, gm-D (no match on Workspace itself) - Layer1 (gi-A-R) dp-D, gf-A, gi-A, gm-D - Layer2 (gf-D-M) (gm-A-M) dp-D, gf-D, gi-A, gm-A (match > recursive) - Layer3 (gm-A-R) dp-D, gf-A, gi-A, gm-A (effective user > group) + Layer11 (gi-A-R) dp-D, gf-A, gi-A, gm-D + Layer12 (gf-D-M) (gm-A-M) dp-D, gf-D, gi-A, gm-A (match > recursive) + Layer13 (gm-A-R) dp-D, gf-A, gi-A, gm-A (effective user > group) [Layer4] dp-D, gf-A, gi-D (doesn't exist, only R apply) - Process1 dp-A, gf-D, gi-D (gf/gi don't apply on Process) - Process2 (dp-D-M) dp-D, gf-D, gi-D - Process3 (dp-D-M) dp-A, gf-D, gi-D (effective workspace user > group) + Process11 dp-A, gf-D, gi-D (gf/gi don't apply on Process) + Process12 (dp-D-M) dp-D, gf-D, gi-D + Process13 (dp-D-M) dp-A, gf-D, gi-D (effective workspace user > group) [Process4] dp-A, gf-D, gi-D + Workspace2 (gf-A-R) dp-D, gf-A, gi-D + Layer21 (gf-D-M) dp-D, gf-D, gi-D (revoke access, user > group) + Layer22 (gf-D-M) dp-D, gf-D, gi-D (revoke access, both groups) + Layer23 (gf-A-M) (gf-D-M) dp-D, gf-A, gi-D (allowed access, user > group) .. note:: Permissions that do not applied to a given sub-:term:`OWS` implementation are automatically denied. @@ -1035,15 +1039,19 @@ def test_ServiceGeoserver_effective_permissions(self): svc_type = ServiceGeoserver.service_type svc1_name = "unittest-service-geoserver-1" w1_name = "workspace1" + w2_name = "workspace2" wx_name = "fake-workspace" - l1_name = "layer1" - l2_name = "layer2" - l3_name = "layer3" - l4_name = "layer4" - p1_name = "process1" - p2_name = "process2" - p3_name = "process3" - p4_name = "process4" + l11_name = "layer11" + l12_name = "layer12" + l13_name = "layer13" + l14_name = "layer14" + l21_name = "layer21" + l22_name = "layer22" + l23_name = "layer23" + p11_name = "process11" + p12_name = "process12" + p13_name = "process13" + p14_name = "process14" utils.TestSetup.delete_TestService(self, svc1_name) svc1_id, w1_id = utils.TestSetup.create_TestServiceResourceTree( @@ -1054,42 +1062,71 @@ def test_ServiceGeoserver_effective_permissions(self): info = utils.TestSetup.create_TestResource( self, parent_resource_id=w1_id, - override_resource_name=l1_name, + override_resource_name=l11_name, override_resource_type=models.Layer.resource_type_name ) - l1_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + l11_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] info = utils.TestSetup.create_TestResource( self, parent_resource_id=w1_id, - override_resource_name=l2_name, + override_resource_name=l12_name, override_resource_type=models.Layer.resource_type_name ) - l2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + l12_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] info = utils.TestSetup.create_TestResource( self, parent_resource_id=w1_id, - override_resource_name=l3_name, + override_resource_name=l13_name, override_resource_type=models.Layer.resource_type_name ) - l3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + l13_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] info = utils.TestSetup.create_TestResource( self, parent_resource_id=w1_id, - override_resource_name=p2_name, + override_resource_name=p12_name, override_resource_type=models.Process.resource_type_name ) - p2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + p12_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] info = utils.TestSetup.create_TestResource( self, parent_resource_id=w1_id, - override_resource_name=p3_name, + override_resource_name=p13_name, override_resource_type=models.Process.resource_type_name ) - p3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + p13_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=svc1_id, + override_resource_name=w2_name, + override_resource_type=models.Workspace.resource_type_name + ) + w2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=w2_id, + override_resource_name=l21_name, + override_resource_type=models.Layer.resource_type_name + ) + l21_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=w2_id, + override_resource_name=l22_name, + override_resource_type=models.Layer.resource_type_name + ) + l22_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] + info = utils.TestSetup.create_TestResource( + self, + parent_resource_id=w2_id, + override_resource_name=l23_name, + override_resource_type=models.Layer.resource_type_name + ) + l23_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"] # create permissions gcAR = PermissionSet(Permission.GET_CAPABILITIES, Access.ALLOW, Scope.RECURSIVE) # noqa gfAR = PermissionSet(Permission.GET_FEATURE, Access.ALLOW, Scope.RECURSIVE) # noqa + gfAM = PermissionSet(Permission.GET_FEATURE, Access.ALLOW, Scope.MATCH) # noqa gfDM = PermissionSet(Permission.GET_FEATURE, Access.DENY, Scope.MATCH) # noqa giAR = PermissionSet(Permission.GET_FEATURE_INFO, Access.ALLOW, Scope.RECURSIVE) # noqa gmAM = PermissionSet(Permission.GET_MAP, Access.ALLOW, Scope.MATCH) # noqa @@ -1101,12 +1138,17 @@ def test_ServiceGeoserver_effective_permissions(self): utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=svc1_id, override_permission=gmDR) utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w1_id, override_permission=gfAR) utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=w1_id, override_permission=dpAR) - utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l1_id, override_permission=giAR) - utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l2_id, override_permission=gfDM) - utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l2_id, override_permission=gmAM) - utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l3_id, override_permission=gmAR) - utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=p2_id, override_permission=dpDM) - utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=p3_id, override_permission=dpDM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l11_id, override_permission=giAR) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l12_id, override_permission=gfDM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l12_id, override_permission=gmAM) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l13_id, override_permission=gmAR) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=p12_id, override_permission=dpDM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=p13_id, override_permission=dpDM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w2_id, override_permission=gfAR) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l21_id, override_permission=gfDM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l22_id, override_permission=gfDM) + utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l23_id, override_permission=gfAM) + utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l23_id, override_permission=gfDM) # login test user for which the permissions were set self.login_test_user() @@ -1144,20 +1186,24 @@ def _test(_path, _params, allow): w1_wfs_path = "{}/{}/wfs".format(svc_path, w1_name) w1_wms_path = "{}/{}/wms".format(svc_path, w1_name) w1_wps_path = "{}/{}/wps".format(svc_path, w1_name) + w2_wfs_path = "{}/{}/wfs".format(svc_path, w2_name) wx_wfs_path = "{}/{}/wfs".format(svc_path, wx_name) wx_wms_path = "{}/{}/wms".format(svc_path, wx_name) svc_wfs_path = "{}/wfs".format(svc_path) svc_wms_path = "{}/wms".format(svc_path) svc_wps_path = "{}/wps".format(svc_path) - wx_l1 = _scope(wx_name, l1_name) - w1_l1 = _scope(w1_name, l1_name) - w1_l2 = _scope(w1_name, l2_name) - w1_l3 = _scope(w1_name, l3_name) - w1_l4 = _scope(w1_name, l4_name) - w1_p1 = _scope(w1_name, p1_name) - w1_p2 = _scope(w1_name, p2_name) - w1_p3 = _scope(w1_name, p3_name) - w1_p4 = _scope(w1_name, p4_name) + wx_l1 = _scope(wx_name, l11_name) + w1_l1 = _scope(w1_name, l11_name) + w1_l2 = _scope(w1_name, l12_name) + w1_l3 = _scope(w1_name, l13_name) + w1_l4 = _scope(w1_name, l14_name) + w1_p1 = _scope(w1_name, p11_name) + w1_p2 = _scope(w1_name, p12_name) + w1_p3 = _scope(w1_name, p13_name) + w1_p4 = _scope(w1_name, p14_name) + w2_l1 = _scope(w2_name, l21_name) + w2_l2 = _scope(w2_name, l22_name) + w2_l3 = _scope(w2_name, l23_name) # Layer1, mismatching permission for WMS _test(svc_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False) @@ -1268,14 +1314,14 @@ def _test(_path, _params, allow): # valid WPS requests # - Workspace only expected to work when in path because 'identifier' is not linked to workspace # - Process2 is explicitly denied, so access still blocked even when resource is properly resolved. - _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p1_name}, allow=True) - _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p2_name}, allow=False) - _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p3_name}, allow=True) - _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p4_name}, allow=True) + _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p11_name}, allow=True) + _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p12_name}, allow=False) + _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p13_name}, allow=True) + _test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p14_name}, allow=True) # other cases all invalid since workspace cannot be resolved - _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p1_name}, allow=False) - _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p2_name}, allow=False) - _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p3_name}, allow=False) + _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p11_name}, allow=False) + _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p12_name}, allow=False) + _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p13_name}, allow=False) _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p1}, allow=False) _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p2}, allow=False) _test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p3}, allow=False) @@ -1285,16 +1331,16 @@ def _test(_path, _params, allow): # validate that effective resolution considering user/group priority and recursive/match scope priority work _test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l1}, allow=False) _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l1}, allow=False) - _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l1_name}, allow=False) + _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l11_name}, allow=False) _test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l2}, allow=True) _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l2}, allow=True) - _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l2_name}, allow=True) + _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l12_name}, allow=True) _test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l3}, allow=True) _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l3}, allow=True) - _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l3_name}, allow=True) + _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l13_name}, allow=True) _test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l4}, allow=False) _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l4}, allow=False) - _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l4_name}, allow=False) + _test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l14_name}, allow=False) # using either the 'typename' or 'typenames' parameter lets WFS retrieve layers indistinguishably alt_name = "typeName" @@ -1327,16 +1373,44 @@ def _add_both(value): # using multiple layers at the same time should validate all of them (all or nothing access) # order should not matter - w1_l1_w1_l1 = ",".join([w1_l1, w1_l1]) # resolved as duplicate, only processed once, allowed - w1_l1_w1_l2 = ",".join([w1_l1, w1_l2]) # W1-L1 is allowed, but not W1-L2, so both denied - w1_l1_w1_l3 = ",".join([w1_l1, w1_l3]) # both are allowed, so full request allowed as well - w1_l2_w1_l1 = ",".join([w1_l2, w1_l1]) - w1_l3_w1_l1 = ",".join([w1_l3, w1_l1]) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l1}, allow=True) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l2}, allow=False) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l3}, allow=True) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l2_w1_l1}, allow=False) - _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l3_w1_l1}, allow=True) + l11_l11 = ",".join([w1_l1, w1_l1]) # resolved as duplicate, only processed once, allowed + l11_l12 = ",".join([w1_l1, w1_l2]) # W1-L1 is allowed, but not W1-L2, so both denied + l11_l13 = ",".join([w1_l1, w1_l3]) # both are allowed, so full request allowed as well + l12_l11 = ",".join([w1_l2, w1_l1]) + l13_l11 = ",".join([w1_l3, w1_l1]) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l11}, allow=True) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l12}, allow=False) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l13}, allow=True) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l12_l11}, allow=False) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l13_l11}, allow=True) + + # validate revoking access when explicit denies are placed under previously allowed-recursive resources + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l1}, allow=False) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l2}, allow=False) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l3}, allow=True) + _test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l1}, allow=False) + _test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l2}, allow=False) + _test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l3}, allow=True) + + # mixing workspaces still work if only in scoped resource reference, + # but fail for isolated path workspace due to mismatch workspace reference in at least one case + # Summary (GetFeature): + # L11: A L21: D -> only allowed combinations are exclusively with: [L11, L13, L23] + # L12: D L22: D -> deny any combination when following are present: [L12, L21, L22] + # L13: A L23: A + for quantity in [2, 3, 4, 5]: + layer_permutes = itertools.permutations([w1_l1, w1_l2, w1_l3, w2_l1, w2_l2, w2_l3], quantity) + for layer_combo in layer_permutes: + query = ",".join(layer_combo) + allow = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2]) + _test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow) + + # multiple layers nested under same workspace will work for request with Workspace isolated path + # otherwise, automatic deny regardless if they were allowed during request without workspace in path + allow_w1 = allow and all(layer.startswith(w1_name) for layer in layer_combo) + allow_w2 = allow and all(layer.startswith(w2_name) for layer in layer_combo) + _test(w1_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w1) + _test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w2) @runner.MAGPIE_TEST_LOCAL