Skip to content

Commit

Permalink
add large matrix of geoserver workspace/layer test combinations
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Jan 25, 2022
1 parent 40dab93 commit 6304983
Showing 1 changed file with 134 additions and 60 deletions.
194 changes: 134 additions & 60 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,14 +1019,18 @@ def test_ServiceGeoserver_effective_permissions(self):
user group effective (detail)
Service1 (gc-A-R) (gm-D-R) gc-A, gf-D, gi-D, gm-D
Workspace1 (dp-A-R) (gf-A-R) dp-A, gf-A, gi-D, gm-D (no match on Workspace itself)
Layer1 (gi-A-R) dp-D, gf-A, gi-A, gm-D
Layer2 (gf-D-M) (gm-A-M) dp-D, gf-D, gi-A, gm-A (match > recursive)
Layer3 (gm-A-R) dp-D, gf-A, gi-A, gm-A (effective user > group)
Layer11 (gi-A-R) dp-D, gf-A, gi-A, gm-D
Layer12 (gf-D-M) (gm-A-M) dp-D, gf-D, gi-A, gm-A (match > recursive)
Layer13 (gm-A-R) dp-D, gf-A, gi-A, gm-A (effective user > group)
[Layer4] dp-D, gf-A, gi-D (doesn't exist, only R apply)
Process1 dp-A, gf-D, gi-D (gf/gi don't apply on Process)
Process2 (dp-D-M) dp-D, gf-D, gi-D
Process3 (dp-D-M) dp-A, gf-D, gi-D (effective workspace user > group)
Process11 dp-A, gf-D, gi-D (gf/gi don't apply on Process)
Process12 (dp-D-M) dp-D, gf-D, gi-D
Process13 (dp-D-M) dp-A, gf-D, gi-D (effective workspace user > group)
[Process4] dp-A, gf-D, gi-D
Workspace2 (gf-A-R) dp-D, gf-A, gi-D
Layer21 (gf-D-M) dp-D, gf-D, gi-D (revoke access, user > group)
Layer22 (gf-D-M) dp-D, gf-D, gi-D (revoke access, both groups)
Layer23 (gf-A-M) (gf-D-M) dp-D, gf-A, gi-D (allowed access, user > group)
.. note::
Permissions that do not applied to a given sub-:term:`OWS` implementation are automatically denied.
Expand All @@ -1035,15 +1039,19 @@ def test_ServiceGeoserver_effective_permissions(self):
svc_type = ServiceGeoserver.service_type
svc1_name = "unittest-service-geoserver-1"
w1_name = "workspace1"
w2_name = "workspace2"
wx_name = "fake-workspace"
l1_name = "layer1"
l2_name = "layer2"
l3_name = "layer3"
l4_name = "layer4"
p1_name = "process1"
p2_name = "process2"
p3_name = "process3"
p4_name = "process4"
l11_name = "layer11"
l12_name = "layer12"
l13_name = "layer13"
l14_name = "layer14"
l21_name = "layer21"
l22_name = "layer22"
l23_name = "layer23"
p11_name = "process11"
p12_name = "process12"
p13_name = "process13"
p14_name = "process14"

utils.TestSetup.delete_TestService(self, svc1_name)
svc1_id, w1_id = utils.TestSetup.create_TestServiceResourceTree(
Expand All @@ -1054,42 +1062,71 @@ def test_ServiceGeoserver_effective_permissions(self):
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w1_id,
override_resource_name=l1_name,
override_resource_name=l11_name,
override_resource_type=models.Layer.resource_type_name
)
l1_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
l11_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w1_id,
override_resource_name=l2_name,
override_resource_name=l12_name,
override_resource_type=models.Layer.resource_type_name
)
l2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
l12_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w1_id,
override_resource_name=l3_name,
override_resource_name=l13_name,
override_resource_type=models.Layer.resource_type_name
)
l3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
l13_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w1_id,
override_resource_name=p2_name,
override_resource_name=p12_name,
override_resource_type=models.Process.resource_type_name
)
p2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
p12_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w1_id,
override_resource_name=p3_name,
override_resource_name=p13_name,
override_resource_type=models.Process.resource_type_name
)
p3_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
p13_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=svc1_id,
override_resource_name=w2_name,
override_resource_type=models.Workspace.resource_type_name
)
w2_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w2_id,
override_resource_name=l21_name,
override_resource_type=models.Layer.resource_type_name
)
l21_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w2_id,
override_resource_name=l22_name,
override_resource_type=models.Layer.resource_type_name
)
l22_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]
info = utils.TestSetup.create_TestResource(
self,
parent_resource_id=w2_id,
override_resource_name=l23_name,
override_resource_type=models.Layer.resource_type_name
)
l23_id = utils.TestSetup.get_ResourceInfo(self, info)["resource_id"]

# create permissions
gcAR = PermissionSet(Permission.GET_CAPABILITIES, Access.ALLOW, Scope.RECURSIVE) # noqa
gfAR = PermissionSet(Permission.GET_FEATURE, Access.ALLOW, Scope.RECURSIVE) # noqa
gfAM = PermissionSet(Permission.GET_FEATURE, Access.ALLOW, Scope.MATCH) # noqa
gfDM = PermissionSet(Permission.GET_FEATURE, Access.DENY, Scope.MATCH) # noqa
giAR = PermissionSet(Permission.GET_FEATURE_INFO, Access.ALLOW, Scope.RECURSIVE) # noqa
gmAM = PermissionSet(Permission.GET_MAP, Access.ALLOW, Scope.MATCH) # noqa
Expand All @@ -1101,12 +1138,17 @@ def test_ServiceGeoserver_effective_permissions(self):
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=svc1_id, override_permission=gmDR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w1_id, override_permission=gfAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=w1_id, override_permission=dpAR)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l1_id, override_permission=giAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l2_id, override_permission=gfDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l2_id, override_permission=gmAM)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l3_id, override_permission=gmAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=p2_id, override_permission=dpDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=p3_id, override_permission=dpDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l11_id, override_permission=giAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l12_id, override_permission=gfDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l12_id, override_permission=gmAM)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l13_id, override_permission=gmAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=p12_id, override_permission=dpDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=p13_id, override_permission=dpDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=w2_id, override_permission=gfAR)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l21_id, override_permission=gfDM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l22_id, override_permission=gfDM)
utils.TestSetup.create_TestUserResourcePermission(self, override_resource_id=l23_id, override_permission=gfAM)
utils.TestSetup.create_TestGroupResourcePermission(self, override_resource_id=l23_id, override_permission=gfDM)

# login test user for which the permissions were set
self.login_test_user()
Expand Down Expand Up @@ -1144,20 +1186,24 @@ def _test(_path, _params, allow):
w1_wfs_path = "{}/{}/wfs".format(svc_path, w1_name)
w1_wms_path = "{}/{}/wms".format(svc_path, w1_name)
w1_wps_path = "{}/{}/wps".format(svc_path, w1_name)
w2_wfs_path = "{}/{}/wfs".format(svc_path, w2_name)
wx_wfs_path = "{}/{}/wfs".format(svc_path, wx_name)
wx_wms_path = "{}/{}/wms".format(svc_path, wx_name)
svc_wfs_path = "{}/wfs".format(svc_path)
svc_wms_path = "{}/wms".format(svc_path)
svc_wps_path = "{}/wps".format(svc_path)
wx_l1 = _scope(wx_name, l1_name)
w1_l1 = _scope(w1_name, l1_name)
w1_l2 = _scope(w1_name, l2_name)
w1_l3 = _scope(w1_name, l3_name)
w1_l4 = _scope(w1_name, l4_name)
w1_p1 = _scope(w1_name, p1_name)
w1_p2 = _scope(w1_name, p2_name)
w1_p3 = _scope(w1_name, p3_name)
w1_p4 = _scope(w1_name, p4_name)
wx_l1 = _scope(wx_name, l11_name)
w1_l1 = _scope(w1_name, l11_name)
w1_l2 = _scope(w1_name, l12_name)
w1_l3 = _scope(w1_name, l13_name)
w1_l4 = _scope(w1_name, l14_name)
w1_p1 = _scope(w1_name, p11_name)
w1_p2 = _scope(w1_name, p12_name)
w1_p3 = _scope(w1_name, p13_name)
w1_p4 = _scope(w1_name, p14_name)
w2_l1 = _scope(w2_name, l21_name)
w2_l2 = _scope(w2_name, l22_name)
w2_l3 = _scope(w2_name, l23_name)

# Layer1, mismatching permission for WMS
_test(svc_wms_path, {"request": Permission.GET_FEATURE.title, "layers": w1_l1}, allow=False)
Expand Down Expand Up @@ -1268,14 +1314,14 @@ def _test(_path, _params, allow):
# valid WPS requests
# - Workspace only expected to work when in path because 'identifier' is not linked to workspace
# - Process2 is explicitly denied, so access still blocked even when resource is properly resolved.
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p1_name}, allow=True)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p2_name}, allow=False)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p3_name}, allow=True)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p4_name}, allow=True)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p11_name}, allow=True)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p12_name}, allow=False)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p13_name}, allow=True)
_test(w1_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p14_name}, allow=True)
# other cases all invalid since workspace cannot be resolved
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p1_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p2_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p3_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p11_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p12_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": p13_name}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p1}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p2}, allow=False)
_test(svc_wps_path, {"request": Permission.DESCRIBE_PROCESS.title, "identifier": w1_p3}, allow=False)
Expand All @@ -1285,16 +1331,16 @@ def _test(_path, _params, allow):
# validate that effective resolution considering user/group priority and recursive/match scope priority work
_test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l1}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l1}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l1_name}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l11_name}, allow=False)
_test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l2}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l2}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l2_name}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l12_name}, allow=True)
_test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l3}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l3}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l3_name}, allow=True)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l13_name}, allow=True)
_test(svc_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l4}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": w1_l4}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l4_name}, allow=False)
_test(w1_wms_path, {"request": Permission.GET_MAP.title, "layers": l14_name}, allow=False)

# using either the 'typename' or 'typenames' parameter lets WFS retrieve layers indistinguishably
alt_name = "typeName"
Expand Down Expand Up @@ -1327,16 +1373,44 @@ def _add_both(value):

# using multiple layers at the same time should validate all of them (all or nothing access)
# order should not matter
w1_l1_w1_l1 = ",".join([w1_l1, w1_l1]) # resolved as duplicate, only processed once, allowed
w1_l1_w1_l2 = ",".join([w1_l1, w1_l2]) # W1-L1 is allowed, but not W1-L2, so both denied
w1_l1_w1_l3 = ",".join([w1_l1, w1_l3]) # both are allowed, so full request allowed as well
w1_l2_w1_l1 = ",".join([w1_l2, w1_l1])
w1_l3_w1_l1 = ",".join([w1_l3, w1_l1])
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l1}, allow=True)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l2}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l1_w1_l3}, allow=True)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l2_w1_l1}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w1_l3_w1_l1}, allow=True)
l11_l11 = ",".join([w1_l1, w1_l1]) # resolved as duplicate, only processed once, allowed
l11_l12 = ",".join([w1_l1, w1_l2]) # W1-L1 is allowed, but not W1-L2, so both denied
l11_l13 = ",".join([w1_l1, w1_l3]) # both are allowed, so full request allowed as well
l12_l11 = ",".join([w1_l2, w1_l1])
l13_l11 = ",".join([w1_l3, w1_l1])
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l11}, allow=True)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l12}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l11_l13}, allow=True)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l12_l11}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": l13_l11}, allow=True)

# validate revoking access when explicit denies are placed under previously allowed-recursive resources
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l1}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l2}, allow=False)
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l3}, allow=True)
_test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l1}, allow=False)
_test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l2}, allow=False)
_test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": w2_l3}, allow=True)

# mixing workspaces still work if only in scoped resource reference,
# but fail for isolated path workspace due to mismatch workspace reference in at least one case
# Summary (GetFeature):
# L11: A L21: D -> only allowed combinations are exclusively with: [L11, L13, L23]
# L12: D L22: D -> deny any combination when following are present: [L12, L21, L22]
# L13: A L23: A
for quantity in [2, 3, 4, 5]:
layer_permutes = itertools.permutations([w1_l1, w1_l2, w1_l3, w2_l1, w2_l2, w2_l3], quantity)
for layer_combo in layer_permutes:
query = ",".join(layer_combo)
allow = not any(layer_deny in layer_combo for layer_deny in [w1_l2, w2_l1, w2_l2])
_test(svc_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow)

# multiple layers nested under same workspace will work for request with Workspace isolated path
# otherwise, automatic deny regardless if they were allowed during request without workspace in path
allow_w1 = allow and all(layer.startswith(w1_name) for layer in layer_combo)
allow_w2 = allow and all(layer.startswith(w2_name) for layer in layer_combo)
_test(w1_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w1)
_test(w2_wfs_path, {"request": Permission.GET_FEATURE.title, "typeNames": query}, allow=allow_w2)


@runner.MAGPIE_TEST_LOCAL
Expand Down

0 comments on commit 6304983

Please sign in to comment.